
Ingest records with dynamic schemas

Storing records with dynamic schemas in a table with a fixed schema.

Some domains (e.g., logging) generate records where each record can have a different set of keys, whereas Pinot tables have a relatively static schema. For records with varying keys, it's impractical to store each field in its own table column. However, most (if not all) fields may be important, so fields should not be dropped unnecessarily.

The SchemaConformingTransformer is a RecordTransformer that can transform records with dynamic schemas such that they can be ingested in a table with a static schema. The transformer primarily takes record fields that don't exist in the schema and stores them in a type of catchall field.

For example, consider this record:

{
  "timestamp": 1687786535928,
  "hostname": "host1",
  "HOSTNAME": "host1",
  "level": "INFO",
  "message": "Started processing job1",
  "tags": {
    "platform": "data",
    "service": "serializer",
    "params": {
      "queueLength": 5,
      "timeout": 299,
      "userData_noIndex": {
        "nth": 99
      }
    }
  }
}

Let's say the table's schema contains the following fields:

  • timestamp

  • hostname

  • level

  • message

  • tags.platform

  • tags.service

  • indexableExtras

  • unindexableExtras

Without this transformer, the HOSTNAME field and the entire tags field would be dropped when storing the record in the table. However, with this transformer, the record would be transformed into the following:

{
  "timestamp": 1687786535928,
  "hostname": "host1",
  "level": "INFO",
  "message": "Started processing job1",
  "tags.platform": "data",
  "tags.service": "serializer",
  "indexableExtras": {
    "tags": {
      "params": {
        "queueLength": 5,
        "timeout": 299
      }
    }
  },
  "unindexableExtras": {
    "tags": {
      "userData_noIndex": {
        "nth": 99
      }
    }
  }
}

Notice that the transformer does the following:

  • Flattens nested fields which exist in the schema, like tags.platform

  • Drops some fields like HOSTNAME, where HOSTNAME must be listed as a field in the config option fieldPathsToDrop

  • Moves fields that don't exist in the schema and have the suffix _noIndex into the unindexableExtras field

  • Moves any remaining fields that don't exist in the schema into the indexableExtras field

The unindexableExtras field allows the transformer to separate fields that don't need indexing (because they are only retrieved, not searched) from those that do.

SchemaConformingTransformer Configuration

To use the transformer, add the schemaConformingTransformerConfig option in the ingestionConfig section of your table configuration, as shown in the following example:

{
  "ingestionConfig": {
    "schemaConformingTransformerConfig": {
      "indexableExtrasField": "extras",
      "unindexableExtrasField": "extrasNoIndex",
      "unindexableFieldSuffix": "_no_index",
      "fieldPathsToDrop": [
        "HOSTNAME"
      ]
    }
  }
}

Available configuration options are listed in SchemaConformingTransformerConfig.
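Fields that land in the catchall columns remain queryable. As a rough sketch (assuming the indexableExtras column from the schema above is stored with the JSON data type, and using a hypothetical table name logs), Pinot's JSON functions can filter on a value nested inside it:

SELECT "message"
FROM logs
WHERE JSON_EXTRACT_SCALAR(indexableExtras, '$.tags.params.queueLength', 'INT', 0) > 3
LIMIT 10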


    Input formats

    This section contains a collection of guides that will show you how to import data from a Pinot-supported input format.

    Pinot offers support for various popular input formats during ingestion. By changing the input format, you can reduce the time spent doing serialization-deserialization and speed up the ingestion.

    Configuring input formats

To change the input format, adjust the recordReaderSpec config in the ingestion job specification.

recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
  configs:
    key1: 'value1'
    key2: 'value2'

    The configuration consists of the following keys:

  • dataFormat: Name of the data format to consume.

  • className: Name of the class that implements the RecordReader interface. This class is used for parsing the data.

  • configClassName: Name of the class that implements the RecordReaderConfig interface. This class is used to parse the values mentioned in configs.

  • configs: Key-value pairs for format-specific configurations. This field is optional.

    Supported input formats

    Pinot supports multiple input formats out of the box. Specify the corresponding readers and the associated custom configurations to switch between formats.

    CSV

    CSV Record Reader supports the following configs:

  • fileFormat: default, rfc4180, excel, tdf, mysql

  • header: Header of the file. The column names should be separated by the delimiter mentioned in the configuration.

  • delimiter: The character separating the columns.

  • multiValueDelimiter: The character separating multiple values in a single column. This can be used to split a column into a list.

  • skipHeader: Skip the header record in the file. Boolean.

  • ignoreEmptyLines: Ignore empty lines (instead of filling them with default values). Boolean.

  • ignoreSurroundingSpaces: Ignore spaces around column names and values. Boolean.

  • quoteCharacter: Single character used for quotes in CSV files.

  • recordSeparator: Character used to separate records in the input file. Default is \n or \r\n depending on the platform.

  • nullStringValue: String value that represents null in CSV files. Default is empty string.

  • skipUnParseableLines: Skip lines that cannot be parsed. Note that this would result in data loss. Boolean.

For example:

dataFormat: 'csv'
className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
configs:
  fileFormat: 'default' # should be one of default, rfc4180, excel, tdf, mysql
  header: 'columnName separated by delimiter'
  delimiter: ','
  multiValueDelimiter: '-'

Note: Your CSV file may have raw text fields that cannot be reliably delimited using any character. In this case, explicitly set the multiValueDelimiter field to empty in the ingestion config: multiValueDelimiter: ''
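As a minimal sketch (assuming the CSV recordReaderSpec shown above), the configs block would then look roughly like this:

configs:
  fileFormat: 'default'
  multiValueDelimiter: ''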

    Avro

The Avro record reader converts the data in the file to a GenericRecord. A Java class or .avro file is not required. By default, the Avro record reader only supports primitive types. To enable support for the rest of the Avro data types, set enableLogicalTypes to true:

dataFormat: 'avro'
className: 'org.apache.pinot.plugin.inputformat.avro.AvroRecordReader'
configs:
  enableLogicalTypes: true

We use the following conversion table to translate between Avro and Pinot data types. The conversions are done using the official Avro methods present in org.apache.avro.Conversions.
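For instance, a decimal logical type is declared in an Avro schema roughly as follows (a hypothetical field definition, not part of the example above); per the table below, it is converted to Pinot's BYTES type:

{
  "name": "price",
  "type": {
    "type": "bytes",
    "logicalType": "decimal",
    "precision": 10,
    "scale": 2
  }
}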

Avro Data Type → Pinot Data Type

  • INT → INT
  • LONG → LONG
  • FLOAT → FLOAT
  • DOUBLE → DOUBLE
  • BOOLEAN → BOOLEAN
  • STRING → STRING
  • ENUM → STRING
  • BYTES → BYTES
  • FIXED → BYTES
  • MAP → JSON
  • ARRAY → JSON
  • RECORD → JSON
  • UNION → JSON
  • DECIMAL → BYTES
  • UUID → STRING
  • DATE → STRING (yyyy-MM-dd format)
  • TIME_MILLIS → STRING (HH:mm:ss.SSS format)
  • TIME_MICROS → STRING (HH:mm:ss.SSSSSS format)
  • TIMESTAMP_MILLIS → TIMESTAMP
  • TIMESTAMP_MICROS → TIMESTAMP

JSON

dataFormat: 'json'
className: 'org.apache.pinot.plugin.inputformat.json.JSONRecordReader'

    Thrift

dataFormat: 'thrift'
className: 'org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReader'
configs:
  thriftClass: 'ParserClassName'

Note: Thrift requires the class generated from the .thrift file to parse the data. The .class file should be available in Pinot's classpath. You can put the files in the lib/ folder of the Pinot distribution directory.
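For example, dropping the jar that contains the generated Thrift class into the distribution's lib/ directory might look like this (the jar name and paths are illustrative):

cp /path/to/my-thrift-models.jar apache-pinot-<version>-bin/lib/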

    Parquet

Since the 0.11.0 release, the Parquet record reader determines whether to use ParquetAvroRecordReader or ParquetNativeRecordReader to read records. The reader looks for the parquet.avro.schema or avro.schema key in the Parquet file footer and, if present, uses the Avro reader.

dataFormat: 'parquet'
className: 'org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader'

You can change the record reader manually in case of a misconfiguration:

dataFormat: 'parquet'
className: 'org.apache.pinot.plugin.inputformat.parquet.ParquetNativeRecordReader'

Important: For the support of DECIMAL and other Parquet native data types, always use ParquetNativeRecordReader.

For ParquetAvroRecordReader, you can refer to the Avro section above for the type conversions. ParquetNativeRecordReader uses the following conversions:

Parquet Data Type → Pinot Data Type

  • INT96 → LONG (Parquet INT96 type converts nanoseconds to Pinot INT64 type of milliseconds)
  • INT64 → LONG
  • INT32 → INT
  • FLOAT → FLOAT
  • DOUBLE → DOUBLE
  • BINARY → BYTES
  • FIXED-LEN-BYTE-ARRAY → BYTES
  • DECIMAL → DOUBLE
  • ENUM → STRING
  • UTF8 → STRING
  • REPEATED → MULTIVALUE/MAP (if the Parquet original type is LIST, it is converted to a MULTIVALUE column; otherwise, a MAP column)

    ORC

The ORC record reader supports the following data types:

ORC Data Type → Java Data Type

  • BOOLEAN → String
  • SHORT → Integer
  • INT → Integer
  • LONG → Integer
  • FLOAT → Float
  • DOUBLE → Double
  • STRING → String
  • VARCHAR → String
  • CHAR → String
  • LIST → Object[]
  • MAP → Map<Object, Object>
  • DATE → Long
  • TIMESTAMP → Long
  • BINARY → byte[]
  • BYTE → Integer

Note: In LIST and MAP types, the object should only belong to one of the data types supported by Pinot.

dataFormat: 'orc'
className: 'org.apache.pinot.plugin.inputformat.orc.ORCRecordReader'

    Protocol Buffers

The reader requires a descriptor file to deserialize the data present in the files. You can generate the descriptor file (.desc) from the .proto file using the following command:

protoc --include_imports --descriptor_set_out=/absolute/path/to/output.desc /absolute/path/to/input.proto

dataFormat: 'proto'
className: 'org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReader'
configs:
  descriptorFile: 'file:///path/to/sample.desc'


    Complex Type (Array, Map) Handling

    Complex type handling in Apache Pinot.

Commonly, ingested data has a complex structure. For example, Avro schemas have records and arrays, while JSON supports objects and arrays.

Apache Pinot's data model supports primitive data types (including int, long, float, double, BigDecimal, string, bytes) and limited multi-value types, such as an array of primitive types. Simple data types allow Pinot to build fast indexing structures for good query performance, but they do require some handling of complex structures.

    There are two options for complex type handling:

    • Convert the complex-type data into a JSON string and then build a JSON index.

    • Use the built-in complex-type handling rules in the ingestion configuration.

On this page, we'll show how to handle these complex-type structures with each of these two approaches. We will process some example data, consisting of the field group from the Meetup events Quickstart example.

    This object has two child fields and the child group is a nested array with elements of object type.

    JSON indexing

Apache Pinot provides a powerful JSON index to accelerate the value lookup and filtering for the column. To convert an object group with complex type to JSON, add the following to your table configuration (json_meetupRsvp_realtime_table_config.json):

{
  "ingestionConfig": {
    "transformConfigs": [
      {
        "columnName": "group_json",
        "transformFunction": "jsonFormat(\"group\")"
      }
    ]
  },
  ...
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "noDictionaryColumns": [
      "group_json"
    ],
    "jsonIndexColumns": [
      "group_json"
    ]
  }
}

The config transformConfigs transforms the object group to a JSON string group_json, which then creates the JSON indexing with the configuration jsonIndexColumns. To read the full spec, see meetupRsvpJson_realtime_table_config.json.

    Also, note that group is a reserved keyword in SQL and therefore needs to be quoted in transformFunction.

Note: The columnName can't use the same name as any of the fields in the source JSON data. For example, if our source data contains the field group and we want to transform the data in that field before persisting it, the destination column name would need to be something different, like group_json.

Note: You do not need to worry about the maxLength of the field group_json on the schema, because the JSON data type does not have a maxLength and will not be truncated. This is true even though JSON is stored as a string internally.

The schema will look like this (json_meetupRsvp_realtime_table_schema.json):

{
  ...
  "dimensionFieldSpecs": [
    {
      "name": "group_json",
      "dataType": "JSON"
    },
    ...
  ]
}

For the full specification, see json_meetupRsvp_schema.json.

With this, you can start to query the nested fields under group. For more details about the supported JSON functions, see this guide.
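For illustration, a query against the JSON column might look like the following (a sketch only; the table name meetupRsvp is taken from the query example further below, and the group_id path is assumed from the example data):

SELECT JSON_EXTRACT_SCALAR(group_json, '$.group_id', 'LONG', 0) AS group_id
FROM meetupRsvp
WHERE JSON_MATCH(group_json, '"$.group_id" IS NOT NULL')
LIMIT 10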

    Ingestion configurations

    Though JSON indexing is a handy way to process the complex types, there are some limitations:

• It's not performant to group by or order by a JSON field, because JSON_EXTRACT_SCALAR is needed to extract the values in the GROUP BY and ORDER BY clauses, which invokes the function evaluation (see the sketch after this list).

• It does not work with Pinot's multi-column functions, such as DISTINCTCOUNTMV.
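To illustrate the first limitation, grouping on a nested value requires wrapping it in JSON_EXTRACT_SCALAR, which is evaluated per row (a sketch only, reusing the group_json column and the meetupRsvp table from the example above):

SELECT JSON_EXTRACT_SCALAR(group_json, '$.group_id', 'LONG', 0) AS group_id,
       COUNT(*)
FROM meetupRsvp
GROUP BY JSON_EXTRACT_SCALAR(group_json, '$.group_id', 'LONG', 0)
LIMIT 10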

    Alternatively, from Pinot 0.8, you can use the complex-type handling in ingestion configurations to flatten and unnest the complex structure and convert them into primitive types. Then you can reduce the complex-type data into a flattened Pinot table, and query it via SQL. With the built-in processing rules, you do not need to write ETL jobs in another compute framework such as Flink or Spark.

To process this complex type, you can add the configuration complexTypeConfig to the ingestionConfig. For example (complexTypeHandling_meetupRsvp_realtime_table_config.json):

{
  "ingestionConfig": {
    "complexTypeConfig": {
      "delimiter": ".",
      "fieldsToUnnest": ["group.group_topics"],
      "collectionNotUnnestedToJson": "NON_PRIMITIVE"
    }
  }
}

With the complexTypeConfig, all the map objects will be flattened to direct fields automatically. And with fieldsToUnnest, a record with a nested collection will unnest into multiple records. For instance, the example at the beginning will transform into two rows with this configuration example.

    Note that:

  • The nested field group_id under group is flattened to group.group_id. The default value of the delimiter is '.'. You can choose another delimiter by specifying the configuration delimiter under complexTypeConfig. This flattening rule also applies to maps in the collections to be unnested.

  • The nested array group_topics under group is unnested into the top level, and the output is converted to a collection of two rows. Note the handling of the nested field within group_topics, and the eventual top-level field group.group_topics.urlkey. All the collections to unnest shall be included in the configuration fieldsToUnnest.

  • Collections not specified in fieldsToUnnest will be serialized into a JSON string, except for arrays of primitive values, which will be ingested as multi-value columns by default. The behavior is defined by the collectionNotUnnestedToJson config, which takes the following values:

    • NON_PRIMITIVE - Converts the array to a multi-value column. (default)

    • ALL - Converts the array of primitive values to a JSON string.

    • NONE - Does not do any conversion.

You can find the full specifications of the table config here and the table schema here.

You can then query the table with primitive values using the following SQL query:

SELECT "group.group_topics.urlkey",
       "group.group_topics.topic_name",
       "group.group_id"
FROM meetupRsvp
LIMIT 10

Note: . (dot) is a reserved character in SQL, so you need to quote the flattened columns in the query.

    Infer the Pinot schema from the Avro schema and JSON data

    When there are complex structures, it can be challenging and tedious to figure out the Pinot schema manually. To help with schema inference, Pinot provides utility tools to take the Avro schema or JSON data as input and output the inferred Pinot schema.

To infer the Pinot schema from the Avro schema, you can use a command like this:

bin/pinot-admin.sh AvroSchemaToPinotSchema \
  -timeColumnName fields.hoursSinceEpoch \
  -avroSchemaFile /tmp/test.avsc \
  -pinotSchemaName myTable \
  -outputDir /tmp/test \
  -fieldsToUnnest entries

Note that you can input configurations like fieldsToUnnest, similar to the ones in complexTypeConfig. This will simulate the complex-type handling rules on the Avro schema and output the Pinot schema in the file specified in outputDir.

Similarly, you can use a command like the following to infer the Pinot schema from a file of JSON objects:

bin/pinot-admin.sh JsonToPinotSchema \
  -timeColumnName hoursSinceEpoch \
  -jsonFile /tmp/test.json \
  -pinotSchemaName myTable \
  -outputDir /tmp/test \
  -fieldsToUnnest payload.commits

You can check out an example of this run in this PR.
