Pinot Input Format

Pinot Input Format is a set of plugins for reading data from files during data ingestion. The plugins fall into two types: record readers (for batch jobs) and decoders (for streaming ingestion).

Currently supported Pinot Input Formats:

  • Batch

    • Avro

    • CSV

    • JSON

    • ORC

    • Parquet

    • Thrift

  • Streaming

    • Avro

    • Avro Confluent

      • To use the Avro Confluent stream decoder, the streamConfigs section of tableIndexConfig in the realtime table configuration should point to the Avro Confluent stream decoder. Here is an example configuration:

"streamConfigs": {
  "streamType": "kafka",
  "stream.kafka.consumer.type": "LowLevel",
  "stream.kafka.topic.name": "kafka-topic",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
  "stream.kafka.decoder.prop.schema.registry.rest.url": "http://<schema registry host>:8081",
  "stream.kafka.zk.broker.url": "<zk broker url>/",
  "stream.kafka.broker.list": "<kafka broker url>",
  "realtime.segment.flush.threshold.time": "24h",
  "realtime.segment.flush.threshold.size": "0",
  "realtime.segment.flush.desired.size": "150M",
  "stream.kafka.consumer.prop.auto.isolation.level": "read_committed",
  "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
  "stream.kafka.consumer.prop.group.id": "<group id>",
  "stream.kafka.consumer.prop.client.id": "<client id>"
}
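As a quick sanity check before uploading a table config, the decoder-related keys from the example above can be verified with a few lines of Python. This is only a sketch: the function name `check_confluent_stream_configs` is hypothetical, and the set of keys it treats as required is an assumption taken from the example configuration, not from Pinot's own validation logic.

```python
# Key names and the decoder class are copied from the example streamConfigs above.
CONFLUENT_DECODER = (
    "org.apache.pinot.plugin.inputformat.avro.confluent."
    "KafkaConfluentSchemaRegistryAvroMessageDecoder"
)
DECODER_KEY = "stream.kafka.decoder.class.name"
REGISTRY_URL_KEY = "stream.kafka.decoder.prop.schema.registry.rest.url"
TOPIC_KEY = "stream.kafka.topic.name"


def check_confluent_stream_configs(stream_configs):
    """Return a list of problems found in a streamConfigs dict (empty if none).

    Hypothetical helper: which keys count as required is an assumption
    based on the example configuration, not on Pinot's validation rules.
    """
    problems = []
    if stream_configs.get(DECODER_KEY) != CONFLUENT_DECODER:
        problems.append("decoder class is not the Avro Confluent decoder")
    url = stream_configs.get(REGISTRY_URL_KEY, "")
    if not url.startswith(("http://", "https://")):
        problems.append("schema registry REST URL is missing or not http(s)")
    if not stream_configs.get(TOPIC_KEY):
        problems.append("topic name is missing")
    return problems


if __name__ == "__main__":
    example = {
        DECODER_KEY: CONFLUENT_DECODER,
        REGISTRY_URL_KEY: "http://<schema registry host>:8081",
        TOPIC_KEY: "kafka-topic",
    }
    # Print each problem on its own line; prints nothing for the example above.
    for problem in check_confluent_stream_configs(example):
        print(problem)
```

A check like this only catches missing or mistyped keys; it does not confirm that the schema registry is reachable or that the topic exists.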

  • Protocol Buffers

    • To ingest data in Protocol Buffers format, add the following config to the ingestion spec:

    executionFrameworkSpec:
      name: 'standalone'
      segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
      segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
      segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
    jobType: SegmentCreationAndTarPush
    inputDirURI: 'file:///path/to/input'
    includeFileNamePattern: 'glob:**/*.parquet'
    excludeFileNamePattern: 'glob:**/*.avro'
    outputDirURI: 'file:///path/to/output'
    overwriteOutput: true
    pinotFSSpecs:
      - scheme: file
        className: org.apache.pinot.spi.filesystem.LocalPinotFS
    recordReaderSpec:
      dataFormat: 'proto'
      className: 'org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReader'
      configClassName: 'org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReaderConfig'
      configs:
        descriptorFile: 'file:///path/to/sample.desc'
    tableSpec:
      tableName: 'myTable'
      schemaURI: 'http://localhost:9000/tables/myTable/schema'
      tableConfigURI: 'http://localhost:9000/tables/myTable'
    pinotClusterSpecs:
      - controllerURI: 'localhost:9000'
    pushJobSpec:
      pushAttempts: 2

The descriptorFile contains all of the descriptors of a .proto file. It should be a URI pointing to the location of the .desc file for the corresponding .proto file. You can generate the descriptor file from a .proto file using the following command:

protoc -I=/directory/containing/proto/files --include_imports --descriptor_set_out=/path/to/sample.desc /path/to/sample.proto
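Because the protoc invocation is sensitive to spacing between flags, it can help to assemble it as an argument list rather than typing it by hand. The sketch below builds the command from three paths; the helper name `protoc_descriptor_command` is hypothetical, and only the flags shown in the command above are used.

```python
import shlex


def protoc_descriptor_command(proto_dir, proto_file, desc_out):
    """Build the protoc argument list that writes a descriptor set file.

    Hypothetical helper: it only assembles the arguments and does not
    check that protoc is installed or that the paths exist.
    """
    return [
        "protoc",
        f"-I={proto_dir}",                   # directory searched for .proto imports
        "--include_imports",                 # bundle descriptors of imported files too
        f"--descriptor_set_out={desc_out}",  # where the .desc file is written
        proto_file,
    ]


if __name__ == "__main__":
    cmd = protoc_descriptor_command(
        "/directory/containing/proto/files",
        "/path/to/sample.proto",
        "/path/to/sample.desc",
    )
    # Show the command as a single shell-safe string.
    print(shlex.join(cmd))
    # To run it (requires protoc on PATH), pass the list to
    # subprocess.run(cmd, check=True).
```

Passing the list directly to `subprocess.run` avoids shell quoting issues when paths contain spaces.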
