Note: The --network pinot-demo flag is optional and assumes that you have a Docker network named pinot-demo that you want to connect the Kafka container to.
A table is a logical abstraction that represents a collection of related data. It is composed of columns and rows (known as documents in Pinot). The table config defines the table's properties in JSON format.
Create a file called /tmp/pinot/table-config-stream.json and add the following content to it.
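Below is a minimal sketch of what such a REALTIME table config could look like, assuming a table and topic named events, a JSON payload, a time column named ts, and a Kafka broker reachable at kafka:9092. Adjust these names to your setup and add any options you need:

```json
{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "schemaName": "events",
    "timeColumnName": "ts",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "events",
      "stream.kafka.broker.list": "kafka:9092",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "realtime.segment.flush.threshold.rows": "1000000",
      "realtime.segment.flush.threshold.time": "6h"
    }
  },
  "metadata": {}
}
```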
Pinot supports two versions of the Kafka library for low-level consumers: kafka-0.9 and kafka-2.x.
Post release 0.10.0, we have started shading Kafka packages inside Pinot. If you are using our latest tagged Docker images or the master build, replace org.apache.kafka with shaded.org.apache.kafka in your table config.
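For example, a consumer property whose value references a Kafka class would change as sketched below. The sasl.jaas.config key and the credentials are only an illustration; apply the same renaming to whichever properties in your config reference org.apache.kafka classes:

```json
"sasl.jaas.config": "shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"alice\" password=\"secret\";"
```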
Upgrade from Kafka 0.9 connector to Kafka 2.x connector
Update the table config for the low-level consumer: change stream.kafka.consumer.factory.class.name from org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory to org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory.
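In streamConfigs, the updated entry looks roughly like this (only the relevant key is shown):

```json
"streamConfigs": {
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory"
}
```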
Pinot does not support using high-level Kafka consumers (HLC). Pinot uses low-level consumers to ensure accurate results, manage operational complexity and scalability, and minimize storage overhead.
How to consume from a Kafka version > 2.0.0
Kafka configurations in Pinot
Use Kafka partition (low) level consumer with SSL
Here is an example config that uses SSL-based authentication to talk to Kafka and Schema Registry. Notice that there are two sets of SSL options: the ones starting with ssl. are for the Kafka consumer, and the ones starting with stream.kafka.decoder.prop.schema.registry. are for the SchemaRegistryClient used by KafkaConfluentSchemaRegistryAvroMessageDecoder.
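A sketch of such a streamConfigs section (inside tableIndexConfig) is shown below. The topic name, broker list, URLs, keystore paths, and passwords are placeholders; verify the exact property names against the Pinot docs for your version:

```json
"streamConfigs": {
  "streamType": "kafka",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.topic.name": "mytopic",
  "stream.kafka.broker.list": "kafka1:9092,kafka2:9092",
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
  "security.protocol": "SSL",
  "ssl.truststore.location": "/path/to/kafka.client.truststore.jks",
  "ssl.truststore.password": "changeit",
  "ssl.keystore.location": "/path/to/kafka.client.keystore.jks",
  "ssl.keystore.password": "changeit",
  "ssl.key.password": "changeit",
  "stream.kafka.decoder.prop.schema.registry.rest.url": "https://schema-registry.example.com:8081",
  "stream.kafka.decoder.prop.schema.registry.ssl.truststore.location": "/path/to/schema-registry.truststore.jks",
  "stream.kafka.decoder.prop.schema.registry.ssl.truststore.password": "changeit",
  "stream.kafka.decoder.prop.schema.registry.ssl.keystore.location": "/path/to/schema-registry.keystore.jks",
  "stream.kafka.decoder.prop.schema.registry.ssl.keystore.password": "changeit"
}
```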
The connector with Kafka library 2.0+ supports Kafka transactions. Transaction support is controlled by the config kafka.isolation.level in the Kafka stream config, which can be read_committed or read_uncommitted (default). Setting it to read_committed will ingest only transactionally committed messages from the Kafka stream.
Note that the default value, read_uncommitted, reads all messages. Also, this config is supported for the low-level consumer only.
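For example, to ingest only committed messages, add the key to your streamConfigs (only the relevant entry is shown):

```json
"streamConfigs": {
  "kafka.isolation.level": "read_committed"
}
```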
Use Kafka partition (low) level consumer with SASL_SSL
Here is an example config that uses SASL_SSL-based authentication to talk to Kafka and Schema Registry. As in the SSL example, there are two sets of options: the security.protocol, sasl., and ssl. options are for the Kafka consumer, and the ones starting with stream.kafka.decoder.prop.schema.registry. are for the SchemaRegistryClient used by KafkaConfluentSchemaRegistryAvroMessageDecoder.
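A sketch of such a streamConfigs section is shown below. The topic, brokers, credentials, and keystore paths are placeholders, the SASL mechanism is assumed to be PLAIN, and the shaded. prefix applies only to builds with shaded Kafka packages (drop it otherwise); verify the exact property names against the Pinot docs for your version:

```json
"streamConfigs": {
  "streamType": "kafka",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.topic.name": "mytopic",
  "stream.kafka.broker.list": "kafka1:9092,kafka2:9092",
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
  "security.protocol": "SASL_SSL",
  "sasl.mechanism": "PLAIN",
  "sasl.jaas.config": "shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"alice\" password=\"secret\";",
  "ssl.truststore.location": "/path/to/kafka.client.truststore.jks",
  "ssl.truststore.password": "changeit",
  "stream.kafka.decoder.prop.schema.registry.rest.url": "https://schema-registry.example.com:8081",
  "stream.kafka.decoder.prop.schema.registry.ssl.truststore.location": "/path/to/schema-registry.truststore.jks",
  "stream.kafka.decoder.prop.schema.registry.ssl.truststore.password": "changeit"
}
```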
Pinot's Kafka connector supports automatically extracting record headers and metadata into Pinot table columns. The following table shows the mapping from record headers/metadata to Pinot table column names:
| Kafka Record | Pinot Table Column | Description |
| --- | --- | --- |
| Record key: any type <K> | __key : String | For simplicity of design, we assume that the record key is always a UTF-8 encoded String |
| Record Headers: Map<String, String> | Each header key is listed as a separate column: __header$HeaderKeyName : String | For simplicity of design, we directly map the string headers from the Kafka record to Pinot table columns |
| Record metadata - offset : long | __metadata$offset : String | |
| Record metadata - partition : int | __metadata$partition : String | |
| Record metadata - recordTimestamp : long | __metadata$recordTimestamp : String | |
To enable metadata extraction in a Kafka table, set the stream config metadata.populate to true.
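For the Kafka plugin this key typically appears with the stream.kafka. prefix in streamConfigs (an assumption to verify against your Pinot version):

```json
"streamConfigs": {
  "stream.kafka.metadata.populate": "true"
}
```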
In addition to this, if you want to use any of these columns in your table, you have to list them explicitly in your table's schema.
For example, if you want to add only the offset and key as dimension columns in your Pinot table, they can be listed in the schema as follows:
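A sketch of the relevant part of the schema, listing just those two columns:

```json
"dimensionFieldSpecs": [
  {
    "name": "__key",
    "dataType": "STRING"
  },
  {
    "name": "__metadata$offset",
    "dataType": "STRING"
  }
]
```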
Once the schema is updated, these columns behave like any other Pinot column. You can apply ingestion transforms and/or define indexes on them.
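For instance, to add an inverted index on the key column, you could extend tableIndexConfig roughly like this (a sketch; only the relevant option is shown):

```json
"tableIndexConfig": {
  "invertedIndexColumns": ["__key"]
}
```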
Tell Pinot where to find an Avro schema
To avoid errors like The Avro schema must be provided, designate the location of the schema in your streamConfigs section. For example, if your current section contains the following:
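For example, a streamConfigs section like the sketch below has no schema configured yet (the topic, broker, and decoder class are placeholders for illustration):

```json
"streamConfigs": {
  "streamType": "kafka",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.topic.name": "events",
  "stream.kafka.broker.list": "kafka:9092",
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.SimpleAvroMessageDecoder"
}
```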
Then add the key "stream.kafka.decoder.prop.schema" followed by a value that denotes the location of your schema.
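A sketch of the same section with the key added (the value is a placeholder; supply your own schema location):

```json
"streamConfigs": {
  "streamType": "kafka",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.topic.name": "events",
  "stream.kafka.broker.list": "kafka:9092",
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.SimpleAvroMessageDecoder",
  "stream.kafka.decoder.prop.schema": "<your Avro schema location>"
}
```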
Navigate to the Query Console and click on the events table to run a query that shows the first 10 rows in this table.
Querying the events table
This connector is also suitable for Kafka lib versions higher than 2.0.0. Changing kafka.lib.version from 2.0.0 to 2.1.1 in the connector's pom.xml makes this connector work with Kafka 2.1.1.
Remember to follow the schema evolution guidelines when updating the schema of an existing table!
There is a standalone utility to generate the schema from an Avro file. See [infer the pinot schema from the avro schema and JSON data]() for details.