arrow-left

All pages
gitbookPowered by GitBook
1 of 1

Loading...

Pinot Input Format

Pinot Input format is a set of plugins with the goal of reading data from files during data ingestion. It can be split into two additional types: record encoders (for batch jobs) and decoders (for ingestion).

Currently supported Pinot Input Formats:

  • Batch

    • Avro

    • CSV

    • JSON

    • ORC

    • PARQUET

    • THRIFT

  • Streaming

    • Avro

    • Avro Confluent

  • Protocol Buffers To ingest data in protocol buffers format, the following config needs to be added in the ingestion spec

The descriptorFile contains all of the descriptors of a .proto file. It should be an URI pointing to the location of the .desc file for a corresponding .proto file. You can generate the descriptor file from a .proto file using the command

protoc -I=/directory/containing/proto/files--include_imports -- descriptor_set_out=/path/to/sample.desc /path/to/sample.proto

To use the avro confluent stream decoder, the realtime table configuration should point to the streamConfigs section of tableIndexConfig should point to the avro confluent stream decoder. Here is an example configuration:

"streamConfigs": {
  "streamType": "kafka",
  "stream.kafka.consumer.type": "LowLevel",
  "stream.kafka.topic.name": "kafka-topic",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
  "stream.kafka.decoder.prop.schema.registry.rest.url": "http://<schema registry host>:8081",
  "stream.kafka.zk.broker.url": "<zk broker url>/",
  "stream.kafka.broker.list": "<kafka broker url>",
  "realtime.segment.flush.threshold.time": "24h",
  "realtime.segment.flush.threshold.size": "0",
  "realtime.segment.flush.desired.size": "150M",
  "stream.kafka.consumer.prop.auto.isolation.level": "read_committed",
  "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
  "stream.kafka.consumer.prop.group.id": "<group id>",
  "stream.kafka.consumer.prop.client.id": "<client id>"
}
executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: 'file:///path/to/input'
includeFileNamePattern: 'glob:**/*.parquet'
excludeFileNamePattern: 'glob:**/*.avro'
outputDirURI: 'file:///path/to/output'
overwriteOutput: true
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'proto'
  className: 'org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReaderConfig'
  configs:
    descriptorFile: 'file:///path/to/sample.desc
tableSpec:
  tableName: 'myTable'
  schemaURI: 'http://localhost:9000/tables/myTable/schema'
  tableConfigURI: 'http://localhost:9000/tables/myTable'
pinotClusterSpecs:
  - controllerURI: 'localhost:9000'
pushJobSpec:
  pushAttempts: 2