Ingest streaming data from Apache Kafka
This guide shows you how to ingest a stream of records from an Apache Kafka topic into a Pinot table. Before you start, you should have a local Pinot cluster up and running, following the instructions in Set up a cluster.
Install and Launch Kafka
Let's start by getting Kafka running on our local machine, either with Docker or with the launcher scripts from a local install.

Docker

To pull down the latest Docker image, run the following command:

docker pull wurstmeister/kafka:latest

Next we'll spin up a Kafka broker:

docker run --network pinot-demo --name=kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181/kafka -e KAFKA_BROKER_ID=0 -e KAFKA_ADVERTISED_HOST_NAME=kafka wurstmeister/kafka:latest

Note: The --network pinot-demo flag is optional and assumes that you have a Docker network named pinot-demo that you want to connect the Kafka container to.

Launcher scripts

Download Kafka from kafka.apache.org/quickstart#quickstart_download and then extract it:

tar -xzf kafka_2.13-3.7.0.tgz
cd kafka_2.13-3.7.0

In one terminal window, start ZooKeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

And in another window, start the Kafka broker:

bin/kafka-server-start.sh config/server.properties

Data Source
We're going to generate some JSON messages from the terminal using the following script:

datagen.py

import datetime
import uuid
import random
import json

while True:
    ts = int(datetime.datetime.now().timestamp() * 1000)
    id = str(uuid.uuid4())
    count = random.randint(0, 1000)
    print(json.dumps({"ts": ts, "uuid": id, "count": count}))
If you run this script (python datagen.py), you'll see the following output:
Ingesting Data into Kafka
Let's now pipe that stream of messages into Kafka by running the following command:
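One way to do this is to pipe the script's output into the Kafka console producer. This is a sketch, not the guide's exact command: the topic name events and the broker address localhost:9092 are assumptions (use kafka:9092 if you're producing from inside the pinot-demo Docker network).

```shell
# Pipe generated JSON messages into the "events" topic (topic and broker are assumed)
python datagen.py | bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic events
```

Leave this running; it produces messages continuously until you interrupt it.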
We can check how many messages have been ingested by running the following command:
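One common way to count messages is the GetOffsetShell tool, which prints the latest offset per partition; summing the offsets gives the message count. The topic name events and broker address are assumptions:

```shell
# Print the latest offset for each partition of the "events" topic (names assumed)
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic events
```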
Output
And we can print out the messages themselves by running the following command:
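A typical way to inspect the messages is the Kafka console consumer, reading the topic from the beginning. Again, the topic name and broker address are assumptions:

```shell
# Print all messages in the "events" topic from the start (names assumed)
bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic events \
  --from-beginning
```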
Output
Schema
A schema defines what fields are present in the table along with their data types in JSON format.
Create a file called /tmp/pinot/schema-stream.json and add the following content to it.
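A plausible schema matching the fields produced by datagen.py (ts, uuid, count) looks like the following. The field specs here are an illustrative sketch, not the guide's canonical file:

```json
{
  "schemaName": "events",
  "dimensionFieldSpecs": [
    {"name": "uuid", "dataType": "STRING"}
  ],
  "metricFieldSpecs": [
    {"name": "count", "dataType": "INT"}
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
```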
Table Config
A table is a logical abstraction that represents a collection of related data. It is composed of columns and rows (known as documents in Pinot). The table config defines the table's properties in JSON format.
Create a file called /tmp/pinot/table-config-stream.json and add the following content to it.
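A minimal REALTIME table config for this topic might look like the following sketch. The broker address kafka:9092 assumes the Docker setup above; the consumer factory and decoder class names shown are the plugin-style names used by recent Pinot releases:

```json
{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replicasPerPartition": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "events",
      "stream.kafka.broker.list": "kafka:9092",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder"
    }
  },
  "tenants": {},
  "metadata": {}
}
```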
Create schema and table
Create the table and schema by running the appropriate command below:
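If you're running Pinot via the launcher scripts, a command along these lines creates both (the controller host and port are assumed to be localhost:9000):

```shell
# Register the schema and table with the Pinot controller (host/port assumed)
bin/pinot-admin.sh AddTable \
  -schemaFile /tmp/pinot/schema-stream.json \
  -tableConfigFile /tmp/pinot/table-config-stream.json \
  -controllerHost localhost \
  -controllerPort 9000 \
  -exec
```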
Querying
Navigate to localhost:9000/#/query and click on the events table to run a query that shows the first 10 rows in this table.
Querying the events table
Kafka ingestion guidelines
Kafka versions in Pinot
Pinot supports two versions of the Kafka library: kafka-0.9 and kafka-2.x for low level consumers.
Since release 0.10.0, Kafka packages have been shaded inside Pinot. If you are using our latest tagged Docker images or a master build, replace org.apache.kafka with shaded.org.apache.kafka in your table config.
Upgrade from Kafka 0.9 connector to Kafka 2.x connector
Update the table config for the low-level consumer: change stream.kafka.consumer.factory.class.name from org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory to org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory.
Pinot does not support using high-level Kafka consumers (HLC). Pinot uses low-level consumers to ensure accurate results, reduce operational complexity, improve scalability, and minimize storage overhead.
How to consume from a Kafka version > 2.0.0
This connector is also suitable for Kafka library versions higher than 2.0.0. For example, in the Kafka 2.0 connector pom.xml, changing kafka.lib.version from 2.0.0 to 2.1.1 makes this connector work with Kafka 2.1.1.
Kafka configurations in Pinot
Use Kafka partition (low) level consumer with SSL
Here is an example config which uses SSL-based authentication to talk to Kafka and the schema registry. Notice there are two sets of SSL options: the ones starting with ssl. are for the Kafka consumer, and the ones starting with stream.kafka.decoder.prop.schema.registry. are for the SchemaRegistryClient used by KafkaConfluentSchemaRegistryAvroMessageDecoder.
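A sketch of such a streamConfigs block follows; hostnames, file paths, and passwords are placeholders, and the class names are the plugin-style names used by recent Pinot releases:

```json
"streamConfigs": {
  "streamType": "kafka",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.topic.name": "mytopic",
  "stream.kafka.broker.list": "kafka:9092",
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
  "stream.kafka.decoder.prop.schema.registry.rest.url": "https://schema-registry:8081",
  "security.protocol": "SSL",
  "ssl.truststore.location": "/path/to/kafka.truststore.jks",
  "ssl.truststore.password": "<truststore-password>",
  "ssl.keystore.location": "/path/to/kafka.keystore.jks",
  "ssl.keystore.password": "<keystore-password>",
  "stream.kafka.decoder.prop.schema.registry.ssl.truststore.location": "/path/to/registry.truststore.jks",
  "stream.kafka.decoder.prop.schema.registry.ssl.truststore.password": "<truststore-password>"
}
```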
Use Confluent Schema Registry with JSON encoded messages
If your Kafka messages are JSON-encoded and registered with Confluent Schema Registry, use the KafkaConfluentSchemaRegistryJsonMessageDecoder. This decoder uses the Confluent KafkaJsonSchemaDeserializer to decode messages whose JSON schemas are managed by the registry.
When to use this decoder
Your Kafka producer serializes messages using the Confluent JSON Schema serializer.
Your JSON schemas are registered in Confluent Schema Registry.
You want schema validation and evolution support for JSON messages.
If your messages are Avro-encoded and registered with Schema Registry, use KafkaConfluentSchemaRegistryAvroMessageDecoder instead (shown in the SSL example above). If your messages are plain JSON without a schema registry, use JSONMessageDecoder.
Example table config
The key configuration properties for this decoder are:
stream.kafka.decoder.class.name -- Set to org.apache.pinot.plugin.inputformat.json.confluent.KafkaConfluentSchemaRegistryJsonMessageDecoder.

stream.kafka.decoder.prop.schema.registry.rest.url -- The URL of the Confluent Schema Registry.
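Putting these together, a streamConfigs fragment might look like the following sketch (topic name, broker, and registry URL are placeholders):

```json
"streamConfigs": {
  "streamType": "kafka",
  "stream.kafka.topic.name": "events",
  "stream.kafka.broker.list": "kafka:9092",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.confluent.KafkaConfluentSchemaRegistryJsonMessageDecoder",
  "stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081"
}
```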
Authentication
This decoder supports the same authentication options as the Avro schema registry decoder. You can configure SSL or SASL_SSL authentication for both the Kafka consumer and the Schema Registry client using the stream.kafka.decoder.prop.schema.registry.* properties. See the SSL example and SASL_SSL example above for details.
For Schema Registry basic authentication, add the following properties:
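A sketch of these properties follows; they mirror the Confluent SchemaRegistryClient's basic-auth settings, and the credentials shown are placeholders:

```json
"stream.kafka.decoder.prop.schema.registry.basic.auth.credentials.source": "USER_INFO",
"stream.kafka.decoder.prop.schema.registry.basic.auth.user.info": "<username>:<password>"
```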
This decoder was added in Pinot 1.4. Make sure your Pinot deployment is running version 1.4 or later.
Consume transactionally-committed messages
The connector with Kafka library 2.0+ supports Kafka transactions. Transaction support is controlled by the config kafka.isolation.level in the Kafka stream config, which can be read_committed or read_uncommitted (default). Setting it to read_committed will ingest only transactionally committed messages from the Kafka stream.
For example,
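A minimal streamConfigs fragment with this setting might look like the following sketch (other required stream settings are elided):

```json
"streamConfigs": {
  "streamType": "kafka",
  "stream.kafka.consumer.type": "lowlevel",
  "kafka.isolation.level": "read_committed"
}
```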
Note that the default value of this config is read_uncommitted, which reads all messages. Also, this config is supported for the low-level consumer only.
Use Kafka partition (low) level consumer with SASL_SSL
Here is an example config which uses SASL_SSL-based authentication to talk to Kafka and the schema registry. Notice there are two sets of options: the SASL ones are for the Kafka consumer, and the ones starting with stream.kafka.decoder.prop.schema.registry. are for the SchemaRegistryClient used by KafkaConfluentSchemaRegistryAvroMessageDecoder.
Extract record headers as Pinot table columns
Pinot's Kafka connector supports automatically extracting record headers and metadata into the Pinot table columns. The following table shows the mapping for record header/metadata to Pinot table column names:
| Kafka record part | Pinot table column | Notes |
| --- | --- | --- |
| Record key: any type <K> | __key : String | For simplicity of design, we assume that the record key is always a UTF-8 encoded string |
| Record headers: Map<String, String> | Each header key is listed as a separate column: __header$HeaderKeyName : String | For simplicity of design, we directly map the string headers from the Kafka record to Pinot table columns |
| Record metadata - offset : long | __metadata$offset : String | |
| Record metadata - partition : int | __metadata$partition : String | |
| Record metadata - recordTimestamp : long | __metadata$recordTimestamp : String | |
In order to enable the metadata extraction in a Kafka table, you can set the stream config metadata.populate to true.
In addition to this, if you want to use any of these columns in your table, you have to list them explicitly in your table's schema.
For example, if you want to add only the offset and key as dimension columns in your Pinot table, they can be listed in the schema as follows:
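A sketch of the relevant dimensionFieldSpecs entries (the rest of the schema is elided):

```json
"dimensionFieldSpecs": [
  {"name": "__key", "dataType": "STRING"},
  {"name": "__metadata$offset", "dataType": "STRING"}
]
```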
Once the schema is updated, these columns behave like any other Pinot column. You can apply ingestion transforms and/or define indexes on them.
Remember to follow the schema evolution guidelines when updating schema of an existing table!
Tell Pinot where to find an Avro schema
There is a standalone utility to generate the schema from an Avro file. See [infer the pinot schema from the avro schema and JSON data](https://docs.pinot.apache.org/basics/data-import/complex-type#infer-the-pinot-schema-from-the-avro-schema-and-json-data) for details.
To avoid errors like The Avro schema must be provided, designate the location of the schema in your streamConfigs section. For example, if your current section contains the following:
Then add this key: "stream.kafka.decoder.prop.schema", followed by a value that denotes the location of your schema.
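As an illustrative sketch (the URL shown is a hypothetical location, not one fixed by this guide), the entry might look like:

```json
"stream.kafka.decoder.prop.schema": "http://example.com/schemas/events.avsc"
```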
Subset partition ingestion
By default, a Pinot REALTIME table consumes from all partitions of the configured Kafka topic. In some scenarios you may want a table to consume only a subset of the topic's partitions. The stream.kafka.partition.ids setting lets you specify exactly which Kafka partitions a table should consume.
When to use subset partition ingestion
Split-topic ingestion -- Multiple Pinot tables share the same Kafka topic, and each table is responsible for a different set of partitions. This is useful when the same topic contains logically distinct data partitioned by key, and you want separate tables (or indexes) for each partition group.
Multi-table partition assignment -- You want to distribute the partitions of a high-throughput topic across several Pinot tables for workload isolation, independent scaling, or different retention policies.
Selective consumption -- You only need data from specific partitions of a topic (for example, partitions that correspond to a particular region or tenant).
Configuration
Add stream.kafka.partition.ids to the streamConfigMaps entry in your table config. The value is a comma-separated list of Kafka partition IDs (zero-based integers):
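A sketch of where this setting lives in the table config (topic name and broker address are assumptions):

```json
"ingestionConfig": {
  "streamIngestionConfig": {
    "streamConfigMaps": [
      {
        "streamType": "kafka",
        "stream.kafka.topic.name": "events",
        "stream.kafka.broker.list": "kafka:9092",
        "stream.kafka.partition.ids": "0,2,5"
      }
    ]
  }
}
```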
When this setting is present, Pinot will consume only from the listed partitions. When it is absent or blank, Pinot consumes from all partitions of the topic (the default behavior).
Example: splitting a topic across two tables
Suppose you have a Kafka topic called events with two partitions (0 and 1). You can create two Pinot tables, each consuming from one partition:
Table events_part_0:
Table events_part_1:
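A sketch of the first table's config fragment follows (other table settings are elided); events_part_1 is identical except that its tableName differs and stream.kafka.partition.ids is "1":

```json
{
  "tableName": "events_part_0",
  "tableType": "REALTIME",
  "ingestionConfig": {
    "streamIngestionConfig": {
      "streamConfigMaps": [
        {
          "streamType": "kafka",
          "stream.kafka.topic.name": "events",
          "stream.kafka.partition.ids": "0"
        }
      ]
    }
  }
}
```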
Validation rules and limitations
- Partition IDs must be non-negative integers. Negative values will cause a validation error.
- Non-integer values (e.g. "abc") will cause a validation error.
- Duplicate IDs are silently deduplicated. For example, "0,2,0,5" is treated as "0,2,5".
- The partition IDs are sorted internally for stable ordering, regardless of the order specified in the config.
- The configured partition IDs are validated against the actual Kafka topic metadata at table creation time. If a specified partition ID does not exist in the topic, an error is raised.
- When using subset partition ingestion with multiple tables consuming from the same topic, ensure that the partition assignments do not overlap if you want each record to be consumed by exactly one table. Pinot does not enforce non-overlapping partition assignments across tables.
- Whitespace around partition IDs and commas is trimmed (e.g., " 0 , 2 , 5 " is valid).