Input formats

This section contains a collection of guides that show you how to import data from each of the input formats Pinot supports.

Pinot supports various popular input formats during ingestion. Changing the input format can reduce the time spent on serialization and deserialization and speed up ingestion.

Configuring input formats

The input format can be changed using the recordReaderSpec config in the ingestion job spec.

The config consists of the following keys:

  • dataFormat - name of the data format to consume.

  • className - name of the class that implements the RecordReader interface. This class is used for parsing the data.

  • configClassName - name of the class that implements the RecordReaderConfig interface. This class is used to parse the values mentioned in configs.

  • configs - key-value pairs for format-specific configs. This field is optional.

For example:

    recordReaderSpec:
      dataFormat: 'csv'
      className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
      configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
      configs:
        key1: 'value1'
        key2: 'value2'

To configure the input format for realtime ingestion, add the corresponding decoder and its properties to the streamConfigs section of the table config JSON:

    "streamConfigs": {
        "streamType": "foo_bar",
        "stream.foo_bar.decoder.class.name": "org.apache.pinot.plugin.inputformat.csv.CSVMessageDecoder",
        "stream.foo_bar.decoder.prop.key1": "value1",
        "stream.foo_bar.decoder.prop.key2": "value2"
    }

Supported input formats

Pinot supports multiple input formats out of the box. You just need to specify the corresponding record reader and the associated custom configs to switch between formats.

CSV

Batch ingestion spec:

    dataFormat: 'csv'
    className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
    configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
    configs:
      fileFormat: 'default' # should be one of default, rfc4180, excel, tdf, mysql
      header: 'columnNames separated by delimiter'
      delimiter: ','
      multiValueDelimiter: '-'

Streaming (table config):

    "streamType": "kafka",
    "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.csv.CSVMessageDecoder",
    "stream.kafka.decoder.prop.delimiter": ",",
    "stream.kafka.decoder.prop.multiValueDelimiter": "-"

Supported Configs

fileFormat - can be one of default, rfc4180, excel, tdf, mysql.

header - header of the file. The column names should be separated by the delimiter mentioned in the config.

delimiter - the character separating the columns.

multiValueDelimiter - the character separating multiple values in a single column. This can be used to split a column into a list.

nullValueString - use this to specify how NULL values are represented in your CSV files. The default is the empty string, which is interpreted as NULL.

Note: Your CSV file may have raw text fields that cannot be reliably delimited using any character. In this case, explicitly set the multiValueDelimiter field to empty in the ingestion config: multiValueDelimiter: ''
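
For illustration, here is a hypothetical CSV file and a matching spec that exercises these configs (the file contents, column names, and the N/A null marker are made up for this example):

    id,name,tags,score
    1,alpha,red-blue,0.5
    2,beta,green,N/A

    recordReaderSpec:
      dataFormat: 'csv'
      className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
      configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
      configs:
        delimiter: ','
        multiValueDelimiter: '-'
        nullValueString: 'N/A'

With this spec, the tags value in the first row is ingested as the multi-value list ['red', 'blue'], and the score in the second row is treated as NULL.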

AVRO

The Avro record reader converts the data in the file to a GenericRecord. A Java class or .avro file is not required.

Batch ingestion spec:

    dataFormat: 'avro'
    className: 'org.apache.pinot.plugin.inputformat.avro.AvroRecordReader'

You can also specify a Kafka schema registry for Avro records in a stream.

Streaming (table config):

    "streamType": "kafka",
    "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder",
    "stream.kafka.decoder.prop.schema.registry.rest.url": "http://localhost:2222/schemaRegistry",
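
For batch files, no class is needed because every Avro data file embeds the writer's schema in its header, and the reader builds GenericRecord instances directly from it. For reference, that embedded schema is ordinary Avro schema JSON along these lines (a hypothetical example, not something you need to supply):

    {
      "type": "record",
      "name": "SampleRecord",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "value", "type": "double"}
      ]
    }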

JSON

Batch ingestion spec:

    dataFormat: 'json'
    className: 'org.apache.pinot.plugin.inputformat.json.JSONRecordReader'

Streaming (table config):

    "streamType": "kafka",
    "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",

Thrift

    dataFormat: 'thrift'
    className: 'org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReader'
    configs:
        thriftClass: 'ParserClassName'

Note: Thrift requires the class generated from the .thrift file to parse the data. The compiled .class file should be available on Pinot's classpath. You can put the files in the lib/ folder of the Pinot distribution directory.
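
A minimal sketch of that workflow, assuming a hypothetical sample.thrift and placeholder paths (adjust them for your setup):

    # Generate Java sources from the Thrift IDL (outputs to gen-java/)
    thrift --gen java sample.thrift
    # Compile against the Thrift runtime library
    javac -cp /path/to/libthrift.jar gen-java/com/example/SampleRecord.java
    # Package the compiled classes and drop the jar onto Pinot's classpath
    jar cf sample-thrift.jar -C gen-java .
    cp sample-thrift.jar /path/to/apache-pinot/lib/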

Parquet

    dataFormat: 'parquet'
    className: 'org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader'

Warning: The above class doesn't read the Parquet INT96 and DECIMAL types. Use the class below to handle INT96 and DECIMAL types:

    dataFormat: 'parquet'
    className: 'org.apache.pinot.plugin.inputformat.parquet.ParquetNativeRecordReader'

Parquet Data Type | Java Data Type | Comment
------------------|----------------|--------
INT96             | INT64          | Parquet INT96 type converts nanoseconds to Pinot INT64 type of milliseconds
DECIMAL           | DOUBLE         |

ORC

    dataFormat: 'orc'
    className: 'org.apache.pinot.plugin.inputformat.orc.ORCRecordReader'

ORC record reader supports the following data types:

ORC Data Type | Java Data Type
--------------|---------------
BOOLEAN       | String
SHORT         | Integer
INT           | Integer
LONG          | Integer
FLOAT         | Float
DOUBLE        | Double
STRING        | String
VARCHAR       | String
CHAR          | String
LIST          | Object[]
MAP           | Map<Object, Object>
DATE          | Long
TIMESTAMP     | Long
BINARY        | byte[]
BYTE          | Integer

Note: In LIST and MAP types, the object should only belong to one of the data types supported by Pinot.

Protocol Buffers

Batch ingestion spec:

    dataFormat: 'proto'
    className: 'org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReader'
    configs:
        descriptorFile: 'file:///path/to/sample.desc'
        protoClassName: 'Metrics'

The reader requires a descriptor file to deserialize the data present in the files. You can generate the descriptor file (.desc) from the .proto file using the command:

    protoc --include_imports --descriptor_set_out=/absolute/path/to/output.desc /absolute/path/to/input.proto

Streaming (table config):

    "streamType": "kafka",
    "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder",
    "stream.kafka.decoder.prop.descriptorFile": "file:///tmp/Workspace/protobuf/metrics.desc",
    "stream.kafka.decoder.prop.protoClassName": "Metrics"
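
The configs above point protoClassName at a message named Metrics. For orientation, here is a minimal hypothetical metrics.proto that would match (the field names are invented for this example):

    syntax = "proto3";

    message Metrics {
      string name = 1;
      int64 timestamp = 2;
      double value = 3;
    }

Running the protoc command above on this file produces the .desc descriptor that the reader and decoder consume.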

Descriptor file in DFS

The descriptor file needs to be present on all Pinot server machines for ingestion to work. Alternatively, you can upload the descriptor file to a DFS such as S3, GCS, etc. and mention that path in the configs. Note that you'll also need to specify the filesystem config for the scheme in the Pinot configuration or ingestion spec as well.

    recordReaderSpec:
      dataFormat: 'proto'
      className: 'org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReader'
      configs:
        descriptorFile: 's3://path/to/sample.desc'

Filesystem config:

    pinotFSSpecs:
      - scheme: s3
        className: org.apache.pinot.plugin.filesystem.S3PinotFS
        configs:
          region: 'us-west-1'

Streaming (table config):

    "streamType": "kafka",
    "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder",
    "stream.kafka.decoder.prop.descriptorFile": "s3://tmp/Workspace/protobuf/metrics.desc",
    "stream.kafka.decoder.prop.protoClassName": "Metrics"
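
The same pattern applies to other supported filesystems. A hedged sketch for GCS, assuming the GCS plugin and its projectId/gcpKey settings (all values are placeholders):

    pinotFSSpecs:
      - scheme: gs
        className: org.apache.pinot.plugin.filesystem.GcsPinotFS
        configs:
          projectId: 'my-project'
          gcpKey: '/path/to/gcp-key.json'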

Both proto2 and proto3 formats are supported by the reader.

Schema Registry

The Protobuf reader also supports the Confluent schema registry. With the schema registry you don't need to create and upload a descriptor file; the schema is fetched from the registry using the metadata present in the Kafka message. The only prerequisite is that your messages are serialized using io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer in the producer.

    "streamType": "kafka",
    "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.protobuf.KafkaConfluentSchemaRegistryProtoBufMessageDecoder",
    "stream.kafka.decoder.prop.schema.registry.rest.url": "http://localhost:2222/schemaRegistry",
    "stream.kafka.decoder.prop.cached.schema.map.capacity": 1000
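
On the producer side, that prerequisite looks roughly like the following sketch (the topic name, registry URL, and the Metrics class reuse the hypothetical examples above):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class MetricsProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            // Serializing with KafkaProtobufSerializer registers the schema and
            // embeds the registry metadata that Pinot's decoder relies on.
            props.put("value.serializer",
                "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
            props.put("schema.registry.url", "http://localhost:2222/schemaRegistry");

            // Build one hypothetical Metrics record (generated by protoc above)
            Metrics metric = Metrics.newBuilder()
                .setName("cpu.load")
                .setTimestamp(System.currentTimeMillis())
                .setValue(0.42)
                .build();

            try (Producer<String, Metrics> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("metrics-topic", metric));
            }
        }
    }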
