This guide shows you how to ingest a stream of records from an Apache Kafka topic into a Pinot table.
In this guide, you'll learn how to import data into Pinot using Apache Kafka for real-time stream ingestion. Pinot has out-of-the-box real-time ingestion support for Kafka.
Let's setup a demo Kafka cluster locally, and create a sample topic transcript-topic
Start Kafka
Create a Kafka Topic
Start Kafka
Start Kafka cluster on port 9092
using the same Zookeeper from the quick-start examples.
Create a Kafka topic
Download the latest Kafka. Create a topic.
We will publish the data in the same format as mentioned in the Stream ingestion docs. So you can use the same schema mentioned under Create Schema Configuration.
The real-time table configuration for the transcript
table described in the schema from the previous step.
For Kafka, we use streamType as kafka
. Currently only JSON format is supported but you can easily write your own decoder by extending the StreamMessageDecoder
interface. You can then access your decoder class by putting the jar file in plugins
directory
The lowLevel
consumer reads data per partition whereas the highLevel
consumer utilises Kafka high level consumer to read data from the whole stream. It doesn't have the control over which partition to read at a particular momemt.
For Kafka versions below 2.X, use org.apache.pinot.plugin.stream.kafka09.KafkaConsumerFactory
For Kafka version 2.X and above, use
org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory
You can set the offset to -
smallest
to start consumer from the earliest offset
largest
to start consumer from the latest offset
timestamp in format yyyy-MM-dd'T'HH:mm:ss.SSSZ
to start the consumer from the offset after the timestamp.
datetime duration or period
to start the consumer from the offset after the period eg., '2d'.
The resulting configuration should look as follows -
Now that we have our table and schema configurations, let's upload them to the Pinot cluster. As soon as the real-time table is created, it will begin ingesting available records from the Kafka topic.
We will publish data in the following format to Kafka. Let us save the data in a file named as transcript.json
.
Push sample JSON into the transcript-topic
Kafka topic, using the Kafka console producer. This will add 12 records to the topic described in the transcript.json
file.
Checkin Kafka docker container
Publish messages to the target topic
As soon as data flows into the stream, the Pinot table will consume it and it will be ready for querying. Head over to the Query Console to checkout the real-time data.
Pinot supports 2 major generations of Kafka library - kafka-0.9 and kafka-2.x for both high and low level consumers.
Post release 0.10.0, we have started shading kafka packages inside Pinot. If you are using our latest
tagged docker images or master
build, you should replace org.apache.kafka
with shaded.org.apache.kafka
in your table config.
Update table config for both high level and low level consumer: Update config: stream.kafka.consumer.factory.class.name
from org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory
to org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory
.
If using Stream(High) level consumer: Please also add config stream.kafka.hlc.bootstrap.server
into tableIndexConfig.streamConfigs
. This config should be the URI of Kafka broker lists, e.g. localhost:9092
.
This connector is also suitable for Kafka lib version higher than 2.0.0
. In Kafka 2.0 connector pom.xml, change the kafka.lib.version
from 2.0.0
to 2.1.1
will make this Connector working with Kafka 2.1.1
.
Here is an example config which uses SSL based authentication to talk with kafka and schema-registry. Notice there are two sets of SSL options, ones starting with ssl.
are for kafka consumer and ones with stream.kafka.decoder.prop.schema.registry.
are for SchemaRegistryClient
used by KafkaConfluentSchemaRegistryAvroMessageDecoder
.
The connector with Kafka library 2.0+ supports Kafka transactions. The transaction support is controlled by config kafka.isolation.level
in Kafka stream config, which can be read_committed
or read_uncommitted
(default). Setting it to read_committed
will ingest transactionally committed messages in Kafka stream only.
For example,
Note that the default value of this config read_uncommitted
to read all messages. Also, this config supports low-level consumer only.
Here is an example config which uses SASL_SSL based authentication to talk with kafka and schema-registry. Notice there are two sets of SSL options, some for kafka consumer and ones with stream.kafka.decoder.prop.schema.registry.
are for SchemaRegistryClient
used by KafkaConfluentSchemaRegistryAvroMessageDecoder
.
Pinot's Kafka connector now supports automatically extracting record headers and metadata into the Pinot table columns. The following table shows the mapping for record header/metadata to Pinot table column names:
In order to enable the metadata extraction in a Kafka table, you can set the stream config metadata.populate
to true
.
In addition to this, if you want to actually use any of these columns in your table, you have to list them explicitly in your table's schema.
For example, if you want to add only the offset and key as dimension columns in your Pinot table, it can listed in the schema as follows:
Once the schema is updated, these columns are similar to any other pinot column. You can apply ingestion transforms and / or define indexes on them.
Don't forget to follow the schema evolution guidelines when updating schema of an existing table!
Kafka Record | Pinot Table Column | Description |
---|---|---|
Record key: any type <K>
__key
: String
For simplicity of design, we assume that the record key is always a UTF-8 encoded String
Record Headers: Map<String, String>
Each header key is listed as a separate column:
__header$HeaderKeyName
: String
For simplicity of design, we directly map the string headers from kafka record to pinot table column
Record metadata - offset : long
__metadata$offset
: String
Record metadata - recordTimestamp : long
__metadata$recordTimestamp
: String
To ingest events from an Amazon Kinesis stream into Pinot, set the following configs into the table config
where the Kinesis specific properties are:
Kinesis supports authentication using the DefaultCredentialsProviderChain. The credential provider looks for the credentials in the following order -
Environment Variables - AWS_ACCESS_KEY_ID
and AWS_SECRET_ACCESS_KEY
(RECOMMENDED since they are recognized by all the AWS SDKs and CLI except for .NET), or AWS_ACCESS_KEY
and AWS_SECRET_KEY
(only recognized by Java SDK)
Java System Properties - aws.accessKeyId
and aws.secretKey
Web Identity Token credentials from the environment or container
Credential profiles file at the default location (~/.aws/credentials)
shared by all AWS SDKs and the AWS CLI
Credentials delivered through the Amazon EC2 container service if AWS_CONTAINER_CREDENTIALS_RELATIVE_URI
environment variable is set and security manager has permission to access the variable,
Instance profile credentials delivered through the Amazon EC2 metadata service
You can also specify the accessKey and secretKey using the properties. However, this method is not secure and should be used only for POC setups. You can also specify other aws fields such as AWS_SESSION_TOKEN as environment variables and config and it will work.
ShardID is of the format "shardId-000000000001". We use the numeric part as partitionId. Our partitionId variable is integer. If shardIds grow beyond Integer.MAX_VALUE, we will overflow
Segment size based thresholds for segment completion will not work. It assumes that partition "0" always exists. However, once the shard 0 is split/merged, we will no longer have partition 0.
Pinot supports consuming data from Apache Pulsar via pinot-pulsar
plugin. You need to enable this plugin so that Pulsar specific libraries are present in the classpath.
You can enable pulsar plugin with the following config at the time of Pinot setup
-Dplugins.include=pinot-pulsar
pinot-pulsar
plugin is not part of official 0.10.0 binary. You can download the plugin from our external repository and add it to libs
or plugins
directory in pinot.
A sample Pulsar stream config to ingest data should look as follows. You can use the streamConfigs
section from this sample and make changes for your corresponding table.
You can change the following Pulsar specifc configurations for your tables
Pinot-Pulsar connector supports authentication using the security tokens. You can generate the token by following the official Pulsar documentaton. Once generated, you can add the following property to streamConfigs
to add auth token for each request
Pinot-pulsar connecor also supports TLS for encrypted connections. You can follow the official pulsar documentation to enable TLS on your pulsar cluster. Once done, you can enable TLS in pulsar connector by providing the trust certificate file location generated in the previous step.
Also, make sure to change the brokers url from pulsar://localhost:6650
to pulsar+ssl://localhost:6650
so that secure connections are used.
For other table and stream configurations, you can headover to Table configuration Reference
PInot currently relies on Pulsar client version 2.7.2. Users should make sure the Pulsar broker is compatible with the this client version.
Apache Pinot lets users consume data from streams and push it directly into the database, in a process known as stream ingestion. Stream Ingestion makes it possible to query data within seconds of publication.
Stream Ingestion provides support for checkpoints for preventing data loss.
Setting up Stream ingestion involves the following steps:
Create schema configuration
Create table configuration
Upload table and schema spec
Let's take a look at each of the steps in more detail.
Let us assume the data to be ingested is in the following format:
Schema defines the fields along with their data types. The schema also defines whether fields serve as dimensions
, metrics
or timestamp
. For more details on schema configuration, see creating a schema.
For our sample data, the schema configuration looks like this:
The next step is to create a table where all the ingested data will flow and can be queried. Unlike batch ingestion, table configuration for real-time ingestion also triggers the data ingestion job. For a more detailed overview of tables, see the table reference.
The real-time table configuration consists of the following fields:
tableName - The name of the table where the data should flow
tableType - The internal type for the table. Should always be set to REALTIME
for realtime ingestion
segmentsConfig -
tableIndexConfig - defines which column to use for indexing along with the type of index. For full configuration, see [Indexing Configs]. It has the following required fields -
loadMode - specifies how the segments should be loaded. Should beheap
or mmap
. Here's the difference between both the configs
mmap: Segments are loaded onto memory-mapped files. This is the default mode.
heap: Segments are loaded into direct memory. Note, 'heap' here is a legacy misnomer, and it does not imply JVM heap. This mode should only be used when we want faster performance than memory-mapped files, and are also sure that we will never run into OOM.
streamConfig - specifies the data source along with the necessary configs to start consuming the real-time data. The streamConfig can be thought of as the equivalent to the job spec for batch ingestion. The following options are supported:
The following flush threshold settings are also supported:
You can also specify additional configs for the consumer directly into the streamConfigs.
For our sample data and schema, the table config will look like this:
Now that we have our table and schema configurations, let's upload them to the Pinot cluster. As soon as the configs are uploaded, pinot will start ingesting available records from the topic.
There are some scenarios where the message rate in the input stream has a bursty nature which can lead to long GC pauses on the Pinot servers or affect the ingestion rate of other realtime tables on the same server. In such scenarios, you should throttle the consumption rate during stream ingestion.
Stream consumption throttling can be tuned using the stream config topic.consumption.rate.limit
which indicates the upper bound on the message rate for the entire topic.
Here is the sample configuration on how to configure the consumption throttling:
Some things to keep in mind while tuning this config are:
Since this config applied to the entire topic, internally, this rate is divided by the number of partitions in the topic and applied to each partition's consumer.
In case of multi-tenant deployment (where you have more than 1 table in the same server instance), you need to make sure that the rate limit on one table doesn't step on/starve the rate limiting of another table. So, when there is more than 1 table on the same server (which is most likely to happen), you may need to re-tune the throttling threshold for all the streaming tables.
Once throttling is enabled for a table, you can verify by searching for a log that looks similar to:
In addition, you can monitor the consumption rate utilization with the metric COSUMPTION_QUOTA_UTILIZATION
.
Note that any configuration change for topic.consumption.rate.limit
in the stream config will NOT take effect immediately. The new configuration will be picked up from the next consuming segment. In order to enforce the new configuration, you need to trigger forceCommit APIs. Please refer to Pause Stream Ingestion for more details.
We are working on support for other ingestion platforms, but you can also write your own ingestion plugin if it is not supported out of the box. For a walkthrough, see Stream Ingestion Plugin.
There are some scenarios in which you may want to pause the realtime ingestion while your table is available for queries. For example if there is a problem with the stream ingestion, while you are troubleshooting the issue, you still want the queries to be executed on the already ingested data. For these scenarios, you can first issue a Pause request to a Controller host. After troubleshooting with the stream is done, you can issue another request to Controller to resume the consumption.
When a Pause request is issued, Controller instructs the realtime servers hosting your table to commit their consuming segments immediately. However, the commit process may take some time to complete. Please note that Pause and Resume requests are async. OK response means that instructions for pausing or resuming has been successfully sent to the realtime server. If you want to know if the consumptions actually stopped or resumed, you can issue a pause status request.
It's worth noting that consuming segments on realtime servers are stored in volatile memory, and their resources are allocated when the consuming segments are first created. These resources cannot be altered if consumption parameters are changed midway through consumption. It may therefore take hours before these changes take effect. Furthermore, if the parameters are changed in an incompatible way (for example, changing the underlying stream with a completely new set of offsets, or changing the stream endpoint from which to consume messages, etc.), it will result in the table getting into an error state.
Pause and resume feature comes to the rescue here. When a Pause request is issued by the operator, consuming segments are committed without starting new mutables ones. Instead, new mutable segments are started only when the Resume request is issued. This mechanism provides the operators as well as developers with more flexibility. It also enables Pinot to be more resilient to the operational and functional constraints imposed by underlying streams.
There is another feature called "Force Commit" which utilizes the primitives of pause and resume feature. When the operator issues a force commit request, the current mutable segments will be committed and new ones started right away. Operators can now use this feature for all compatible table config parameter changes to take effect immediately.
For incompatible parameter changes, an option is added to the resume request to handle the case of a completely new set of offsets. Operators can now follow a three-step process: First, issue a Pause request. Second, change the consumption parameters. Finally, issue the Resume request with the appropriate option. These steps will preserve the old data and allow the new data to be consumed immediately. All through the operation, queries will continue to be served.
If a Pinot table is configured to consume using a Low Level (partition-based) stream type, then it is possible that the partitions of the table change over time. In Kafka, for example, the number of partitions may increase. In Kinesis, the number of partitions may increase or decrease -- some partitions could be merged to create a new one, or existing partitions split to create new ones.
Pinot runs a periodic task called RealtimeSegmentValidationManager
that monitors such changes and starts consumption on new partitions (or stops consumptions from old ones) as necessary. Since this is a periodic task that is run on the controller, it may take some time for Pinot to recognize new partitions and start consuming from them. This may delay the data in new partitions appearing in the results that pinot returns.
If it is desired to recognize the new partitions sooner, then you can manually trigger the periodic task so as to recognize such data immediately.
Often, it is important to understand the rate of ingestion of data into your realtime table. This is commonly done by looking at the consumption "lag" of the consumer. The lag itself can be observed in many dimensions. Pinot supports observing consumption lag along the offset dimension and time dimension, whenever applicable (as it depends on the specifics of the connector).
The ingestion status of a connector can be observed by querying either the /consumingSegmentsInfo
API or the table's /debug
API, as shown below:
A sample response from a Kafka based realtime table is shown below. The ingestion status is displayed for each of the CONSUMING segments in the table.
Property | Description |
---|---|
Property | Description |
---|---|
Config key | Description | Supported values |
---|---|---|
Config key | Description | Supported values |
---|---|---|
Term | Description |
---|---|
streamType
This should be set to "kinesis"
stream.kinesis.topic.name
Kinesis stream name
region
Kinesis region e.g. us-west-1
accessKey
Kinesis access key
secretKey
Kinesis secret key
shardIteratorType
Set to LATEST to consume only new records, TRIM_HORIZON for earliest sequence number, AT_SEQUENCE_NUMBER and AFTER_SEQUENCE_NUMBER to start consumptions from a particular sequence number
maxRecordsToFetch
... Default is 20.
streamType
This should be set to "pulsar"
stream.pulsar.topic.name
Your pulsar topic name
stream.pulsar.bootstrap.servers
Comma-seperated broker list for Apache Pulsar
streamType
The streaming platform from which to consume the data
kafka
stream.[streamType].consumer.type
Whether to use per partition low-level consumer or high-level stream consumer
lowLevel
- Consume data from each partition with offset management
highLevel
- Consume data without control over the partitions
stream.[streamType].topic.name
The datasource (e.g. topic, data stream) from which to consume the data
String
stream.[streamType].decoder.class.name
Name of the class to be used for parsing the data. The class should implement org.apache.pinot.spi.stream.StreamMessageDecoder
interface
String. Available options:
org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder
org.apache.pinot.plugin.inputformat.avro.KafkaAvroMessageDecoder
org.apache.pinot.plugin.inputformat.avro.SimpleAvroMessageDecoder
org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder
stream.[streamType].consumer.factory.class.name
Name of the factory class to be used to provide the appropriate implementation of low level and high level consumer as well as the metadata
String. Available options:
org.apache.pinot.plugin.stream.kafka09.KafkaConsumerFactory
org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory
org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory
org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory
stream.[streamType].consumer.prop.auto.offset.reset
Determines the offset from which to start the ingestion
smallest
largest
or
timestamp in milliseconds
topic.consumption.rate.limit
Determines the upper bound for consumption rate for the whole topic. Having a consumption rate limiter is beneficial in case the stream message rate has a bursty pattern which leads to long GC pauses on the Pinot servers. The rate limiter can also be considered as a safeguard against excessive ingestion of realtime tables.
Double. The values should be greater than zero.
realtime.segment.flush.threshold.time
Time threshold that will keep the realtime segment open for before we complete the segment. Noted that this time should be smaller than the Kafka retention period configured for the corresponding topic.
realtime.segment.flush.threshold.rows
Row count flush threshold for realtime segments. This behaves in a similar way for HLC and LLC. For HLC,
since there is only one consumer per server, this size is used as the size of the consumption buffer and determines after how many rows we flush to disk. For example, if this threshold is set to two million rows,
then a high level consumer would have a buffer size of two million.
If this value is set to 0, then the consumers adjust the number of rows consumed by a partition such that the size of the completed segment is the desired size (unless
threshold.time is reached first)
realtime.segment.flush.threshold.segment.size
The desired size of a completed realtime segment. This config is used only if realtime.segment.flush.threshold.rows
is set to 0.
currentOffsetsMap
Current consuming offset position per partition
latestUpstreamOffsetMap
(Wherever applicable) Latest offset found in the upstream topic partition
recordsLagMap
(Whenever applicable) Defines how far behind the current record's offset / pointer is from upstream latest record. This is calculated as the difference between the latestUpstreamOffset
and currentOffset
for the partition when the lag computation request is made.
recordsAvailabilityLagMap
(Whenever applicable) Defines how soon after record ingestion was the record consumed by Pinot. This is calculated as the difference between the time the record was consumed and the time at which the record was ingested upstream.