This guide shows you how to ingest a stream of records from an Amazon Kinesis stream into a Pinot table.

To ingest events from an Amazon Kinesis stream into Pinot, set the following configs in the table config,
where the Kinesis-specific properties are:

| Property | Description |
| --- | --- |
| streamType | This should be set to "kinesis" |
| stream.kinesis.topic.name | Kinesis stream name |
| region | Kinesis region, e.g. us-west-1 |
| accessKey | Kinesis access key |
| secretKey | Kinesis secret key |
| shardIteratorType | Set to LATEST to consume only new records, TRIM_HORIZON for the earliest sequence number, or AT_SEQUENCE_NUMBER / AFTER_SEQUENCE_NUMBER to start consumption from a particular sequence number |
| maxRecordsToFetch | … Default is 20. |
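For reference, here is a minimal sketch of the stream config entries for a Kinesis table, built from the properties above. The stream name, region, and flush thresholds are placeholders; adjust them for your setup and place the map in your table config's stream config section.

```json
{
  "streamType": "kinesis",
  "stream.kinesis.topic.name": "my-kinesis-stream",
  "region": "us-west-1",
  "shardIteratorType": "LATEST",
  "maxRecordsToFetch": "20",
  "stream.kinesis.consumer.type": "lowLevel",
  "stream.kinesis.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
  "stream.kinesis.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
  "realtime.segment.flush.threshold.time": "6h",
  "realtime.segment.flush.threshold.rows": "1000000"
}
```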
Kinesis supports authentication using the DefaultCredentialsProviderChain. The credential provider looks for the credentials in the following order:

1. Environment variables: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (RECOMMENDED, since they are recognized by all the AWS SDKs and the CLI except for .NET), or AWS_ACCESS_KEY and AWS_SECRET_KEY (only recognized by the Java SDK)
2. Java system properties: aws.accessKeyId and aws.secretKey
3. Web identity token credentials from the environment or container
4. Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI
5. Credentials delivered through the Amazon EC2 container service, if the AWS_CONTAINER_CREDENTIALS_RELATIVE_URI environment variable is set and the security manager has permission to access the variable
6. Instance profile credentials delivered through the Amazon EC2 metadata service
Although you can also specify the accessKey and secretKey in the properties above, we don't recommend this insecure method. Use it only for non-production proof-of-concept (POC) setups. You can also set other AWS fields, such as AWS_SESSION_TOKEN, as environment variables or in the config, and they will work.
Limitations:

ShardID is of the format "shardId-000000000001". We use the numeric part as the partitionId. Our partitionId variable is an integer, so if shardIds grow beyond Integer.MAX_VALUE, we will overflow into the partitionId space.

Segment-size-based thresholds for segment completion will not work. The implementation assumes that partition "0" always exists; however, once shard 0 is split or merged, we will no longer have partition 0.
This guide shows you how to ingest a stream of records into a Pinot table.
Apache Pinot lets users consume data from streams and push it directly into the database. This process is called stream ingestion. Stream ingestion makes it possible to query data within seconds of publication.
Stream ingestion supports checkpointing to prevent data loss.
To set up Stream ingestion, perform the following steps, which are described in more detail in this page:
Create schema configuration
Create table configuration
Create ingestion configuration
Upload table and schema spec
Here's an example where we assume the data to be ingested is in the following format:
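For illustration, assume each record in the stream is a JSON document like the following (this matches the transcript example used later in this guide):

```json
{
  "studentID": 205,
  "firstName": "Natalie",
  "lastName": "Jones",
  "gender": "Female",
  "subject": "Maths",
  "score": 3.8,
  "timestampInEpoch": 1571900400000
}
```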
The schema defines the fields along with their data types. The schema also defines whether fields serve as dimensions, metrics, or timestamp. For more details on schema configuration, see creating a schema.
For our sample data, the schema configuration looks like this:
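A schema along these lines fits the sample record above; the field names and types are taken from that record, so adjust them to your data:

```json
{
  "schemaName": "transcript",
  "dimensionFieldSpecs": [
    {"name": "studentID", "dataType": "INT"},
    {"name": "firstName", "dataType": "STRING"},
    {"name": "lastName", "dataType": "STRING"},
    {"name": "gender", "dataType": "STRING"},
    {"name": "subject", "dataType": "STRING"}
  ],
  "metricFieldSpecs": [
    {"name": "score", "dataType": "FLOAT"}
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "timestampInEpoch",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
```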
The next step is to create a table where all the ingested data will flow and can be queried. For details about each table component, see the table reference.
The ingestion configuration (ingestionConfig) specifies how to ingest streaming data into Pinot. First, include a subsection for streamConfigMaps. Next, decide whether to skip table errors with continueOnError and whether to validate time values with rowTimeValueCheck and segmentTimeValueCheck. See details about these ingestionConfig options in the streamConfigMaps and Additional ingestion configs tables below.
streamConfigMaps

| Config key | Description | Supported values |
| --- | --- | --- |
| streamType | The streaming platform to ingest data from | kafka |
| stream.[streamType].consumer.type | Whether to use a per-partition low-level consumer or a high-level stream consumer | lowLevel: consume data from each partition with offset management. highLevel: consume data without control over the partitions. |
| stream.[streamType].topic.name | Topic or data source to ingest data from | String |
| stream.[streamType].broker.list | List of brokers | |
| stream.[streamType].decoder.class.name | Name of the class to parse the data. The class should implement the org.apache.pinot.spi.stream.StreamMessageDecoder interface. | String. Available options: org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder, org.apache.pinot.plugin.inputformat.avro.KafkaAvroMessageDecoder, org.apache.pinot.plugin.inputformat.avro.SimpleAvroMessageDecoder, org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder, org.apache.pinot.plugin.inputformat.csv.CSVMessageDecoder, org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder |
| stream.[streamType].consumer.factory.class.name | Name of the factory class that provides the appropriate implementation of the low-level and high-level consumer, as well as the metadata | String. Available options: org.apache.pinot.plugin.stream.kafka09.KafkaConsumerFactory, org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory, org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory, org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory |
| stream.[streamType].consumer.prop.auto.offset.reset | Determines the offset from which to start the ingestion | smallest, largest, or a timestamp in milliseconds |
| stream.[streamType].decoder.prop.format | Specifies the data format to ingest via a stream. The value of this property should match the format of the data in the stream. | JSON |
| realtime.segment.flush.threshold.time | Maximum elapsed time after which a consuming segment is persisted. Note that this time should be smaller than the Kafka retention period configured for the corresponding topic. | String, such as 1d or 4h30m. Default is 6h (six hours). |
| realtime.segment.flush.threshold.rows | The maximum number of rows to consume before persisting the consuming segment. If this value is set to 0, the configuration looks to realtime.segment.flush.threshold.segment.size below. | Default is 5,000,000 |
| realtime.segment.flush.threshold.segment.size | Desired size of the completed segments. This value is used when realtime.segment.flush.threshold.rows is set to 0. | String, such as 150M or 1.1G. Default is 200M (200 megabytes). |
Additional ingestion configs

| Config key | Description |
| --- | --- |
| continueOnError | Set to true to skip any row indexing error and move on to the next row. Otherwise, an error evaluating a transform or filter function may block ingestion (real-time or offline) and result in data loss or corruption. Consider your use case to determine if it's preferable to set this option to false and fail the ingestion if an error occurs, to maintain data integrity. |
| rowTimeValueCheck | Set to true to validate the time column values ingested during segment upload. Validates that each row of data in a segment matches the specified time format and falls within a valid time range (1971-2071). If the value doesn't meet both criteria, Pinot replaces the value with null. This option ensures that the time values are strictly increasing and that there are no duplicates or gaps in the data. |
| segmentTimeValueCheck | Set to true to validate that the time range of the segment falls between 1971 and 2071. This option ensures data segments stored in the system are correct and consistent. |
For our sample data and schema, the table config, including the ingestionConfig, looks like this:
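A minimal sketch, assuming a local Kafka broker at localhost:9092 and the transcript-topic topic used elsewhere in this guide; tune the flush thresholds and the error-handling flags for your workload:

```json
{
  "tableName": "transcript",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "timestampInEpoch",
    "schemaName": "transcript",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP"
  },
  "ingestionConfig": {
    "streamIngestionConfig": {
      "streamConfigMaps": [
        {
          "streamType": "kafka",
          "stream.kafka.topic.name": "transcript-topic",
          "stream.kafka.broker.list": "localhost:9092",
          "stream.kafka.consumer.type": "lowLevel",
          "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
          "realtime.segment.flush.threshold.rows": "0",
          "realtime.segment.flush.threshold.time": "24h",
          "realtime.segment.flush.threshold.segment.size": "50M"
        }
      ]
    },
    "continueOnError": true,
    "rowTimeValueCheck": true,
    "segmentTimeValueCheck": false
  },
  "metadata": {}
}
```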
Now that we have our table and schema configurations, let's upload them to the Pinot cluster. As soon as the configs are uploaded, Pinot will start ingesting available records from the topic.
There are some scenarios where the message rate in the input stream can come in bursts which can lead to long GC pauses on the Pinot servers or affect the ingestion rate of other real-time tables on the same server. If this happens to you, throttle the consumption rate during stream ingestion to better manage overall performance.
Stream consumption throttling can be tuned using the stream config topic.consumption.rate.limit, which indicates the upper bound on the message rate for the entire topic.
Here is the sample configuration on how to configure the consumption throttling:
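A sketch of a streamConfigMaps entry with throttling enabled; the limit of 1000 messages per second is purely illustrative:

```json
{
  "streamType": "kafka",
  "stream.kafka.topic.name": "transcript-topic",
  "stream.kafka.broker.list": "localhost:9092",
  "stream.kafka.consumer.type": "lowLevel",
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
  "topic.consumption.rate.limit": "1000"
}
```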
Some things to keep in mind while tuning this config are:
Since this configuration applies to the entire topic, the rate is internally divided by the number of partitions in the topic and applied to each partition's consumer.
In a multi-tenant deployment (where more than one table lives on the same server instance), make sure that the rate limit on one table doesn't starve the rate limiting of another table. When there is more than one table on the same server (which is most likely to happen), you may need to re-tune the throttling threshold for all the streaming tables.
Once throttling is enabled for a table, you can verify it by searching the server logs for the corresponding rate limiter message.
In addition, you can monitor the consumption rate utilization with the metric CONSUMPTION_QUOTA_UTILIZATION.
Note that any configuration change for topic.consumption.rate.limit in the stream config will NOT take effect immediately. The new configuration will be picked up from the next consuming segment. In order to enforce the new configuration, you need to trigger the forceCommit API. Refer to Pause Stream Ingestion for more details.
You can also write an ingestion plugin if the platform you are using is not supported out of the box. For a walkthrough, see Stream Ingestion Plugin.
There are some scenarios in which you may want to pause real-time ingestion while your table remains available for queries. For example, if there is a problem with the stream ingestion and, while you are troubleshooting the issue, you still want queries to be executed on the already ingested data. For these scenarios, you can first issue a Pause request to a Controller host. After you have finished troubleshooting the stream, you can issue another request to the Controller to resume consumption.
When a Pause request is issued, the controller instructs the real-time servers hosting your table to commit their consuming segments immediately. However, the commit process may take some time to complete. Note that Pause and Resume requests are async. An OK response means that the instructions for pausing or resuming have been successfully sent to the real-time server. If you want to know whether consumption has actually stopped or resumed, issue a pause status request.
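As a sketch (controller host, port, and table name are placeholders), the controller exposes REST endpoints for these operations:

```bash
# Pause consumption for the table
curl -X POST "http://localhost:9000/tables/transcript/pauseConsumption"

# Resume consumption later
curl -X POST "http://localhost:9000/tables/transcript/resumeConsumption"

# Check whether consumption has actually stopped or resumed
curl -X GET "http://localhost:9000/tables/transcript/pauseStatus"
```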
It's worth noting that consuming segments on real-time servers are stored in volatile memory, and their resources are allocated when the consuming segments are first created. These resources cannot be altered if consumption parameters are changed midway through consumption. It may take hours before these changes take effect. Furthermore, if the parameters are changed in an incompatible way (for example, changing the underlying stream with a completely new set of offsets, or changing the stream endpoint from which to consume messages), it will result in the table getting into an error state.
The pause and resume feature is helpful in these instances. When a pause request is issued by the operator, consuming segments are committed without starting new mutable segments. Instead, new mutable segments are started only when the resume request is issued. This mechanism provides the operators as well as developers with more flexibility. It also enables Pinot to be more resilient to the operational and functional constraints imposed by underlying streams.
There is another feature called Force Commit, which utilizes the primitives of the pause and resume feature. When the operator issues a force commit request, the current mutable segments will be committed and new ones started right away. Operators can now use this feature for all compatible table config parameter changes to take effect immediately.
(v 0.12.0+) Once submitted, the forceCommit API returns a jobId that can be used to get the current progress of the forceCommit operation. For example:
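A sketch of triggering a force commit via the controller; the host and table name are placeholders, and the exact response shape and the status endpoint path can vary by version, so check your controller's Swagger UI:

```bash
# Commit the current consuming segments and start new ones immediately
curl -X POST "http://localhost:9000/tables/transcript/forceCommit"
# The JSON response includes a jobId; pass that jobId to the forceCommit status API
# (see the controller Swagger UI for the exact status endpoint in your version).
```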
The forceCommit request just triggers a regular commit before the consuming segments reach their end criteria, so it follows the same mechanism as a regular commit. It is a one-shot request and is not retried automatically upon failure, but it is idempotent, so it can be reissued until it succeeds if needed.

This API is async, as it doesn't wait for the segment commit to complete. However, a status entry is put in ZK to track when the request was issued and which consuming segments were included. The consuming segments tracked in the status entry are compared with the latest IdealState to indicate the progress of the forceCommit. This status is not updated or deleted upon commit success or failure, so it can become stale. Currently, the most recent 100 status entries are kept in ZK, and the oldest ones are deleted only when the total number is about to exceed 100.
For incompatible parameter changes, an option is added to the resume request to handle the case of a completely new set of offsets. Operators can now follow a three-step process: First, issue a pause request. Second, change the consumption parameters. Finally, issue the resume request with the appropriate option. These steps will preserve the old data and allow the new data to be consumed immediately. All through the operation, queries will continue to be served.
If a Pinot table is configured to consume using a Low Level (partition-based) stream type, then it is possible that the partitions of the table change over time. In Kafka, for example, the number of partitions may increase. In Kinesis, the number of partitions may increase or decrease -- some partitions could be merged to create a new one, or existing partitions split to create new ones.
Pinot runs a periodic task called RealtimeSegmentValidationManager that monitors such changes and starts consumption on new partitions (or stops consumption from old ones) as necessary. Since this is a periodic task run on the controller, it may take some time for Pinot to recognize new partitions and start consuming from them. This may delay the data in new partitions appearing in the results that Pinot returns.
To recognize new partitions sooner, manually trigger the periodic task so that such data is picked up immediately.
Often, it is important to understand the rate of ingestion of data into your real-time table. This is commonly done by looking at the consumption lag of the consumer. The lag itself can be observed in many dimensions. Pinot supports observing consumption lag along the offset dimension and time dimension, whenever applicable (as it depends on the specifics of the connector).
The ingestion status of a connector can be observed by querying either the /consumingSegmentsInfo API or the table's /debug API, as shown below:
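For example (host, port, and table name are placeholders; check the controller Swagger UI for the full set of query parameters):

```bash
# Consuming segments info for a real-time table
curl -X GET "http://localhost:9000/tables/transcript/consumingSegmentsInfo"

# Table debug info, which includes ingestion status
curl -X GET "http://localhost:9000/debug/tables/transcript"
```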
A sample response from a Kafka-based real-time table is shown below. The ingestion status is displayed for each of the CONSUMING segments in the table.
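An abridged, illustrative response is sketched below; the exact field nesting can differ slightly across versions, but the four maps described in the table that follows are the ones to look at:

```json
{
  "_segmentToConsumingInfoMap": {
    "transcript__0__0__20240101T0000Z": [
      {
        "serverName": "Server_pinot-server-0_8098",
        "consumerState": "CONSUMING",
        "partitionOffsetInfo": {
          "currentOffsetsMap": { "0": "13" },
          "latestUpstreamOffsetMap": { "0": "14" },
          "recordsLagMap": { "0": "1" },
          "recordsAvailabilityLagMap": { "0": "171" }
        }
      }
    ]
  }
}
```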
| Field | Description |
| --- | --- |
| currentOffsetsMap | Current consuming offset position per partition |
| latestUpstreamOffsetMap | (Wherever applicable) Latest offset found in the upstream topic partition |
| recordsLagMap | (Whenever applicable) Defines how far behind the current record's offset / pointer is from the upstream latest record. This is calculated as the difference between the latestUpstreamOffset and currentOffset for the partition when the lag computation request is made. |
| recordsAvailabilityLagMap | (Whenever applicable) Defines how soon after record ingestion the record was consumed by Pinot. This is calculated as the difference between the time the record was consumed and the time at which the record was ingested upstream. |
Real-time ingestion includes 3 stages of message processing: Decode, Transform, and Index.
In each of these stages, a failure can happen which may or may not result in an ingestion failure. The following metrics are available to investigate ingestion issues:
- Decode stage: an error here is recorded as INVALID_REALTIME_ROWS_DROPPED.
- Transform stage: possible errors here are:
  - When a message gets dropped due to the FILTER transform, it is recorded as REALTIME_ROWS_FILTERED.
  - When the transform pipeline sets the $INCOMPLETE_RECORD_KEY$ key in the message, it is recorded as INCOMPLETE_REALTIME_ROWS_CONSUMED, but only when the continueOnError configuration is enabled. If continueOnError is not enabled, the ingestion fails.
- Index stage: when there is a failure at this stage, the ingestion typically stops and marks the partition as ERROR.

There is yet another metric called ROWS_WITH_ERROR, which is the sum of all error counts in the 3 stages above.

Furthermore, the metric REALTIME_CONSUMPTION_EXCEPTIONS gets incremented whenever there is a transient or permanent stream exception seen during consumption.
These metrics can be used to understand why ingestion failed for a particular table partition before diving into the server logs.
This guide shows you how to ingest a stream of records from an Apache Pulsar topic into a Pinot table.
Pinot supports consuming data from Pulsar via the pinot-pulsar plugin. You need to enable this plugin so that the Pulsar-specific libraries are present in the classpath.
Enable the Pulsar plugin with the following config at the time of Pinot setup: -Dplugins.include=pinot-pulsar
The pinot-pulsar plugin is not part of the official 0.10.0 binary. You can download the plugin and add it to the libs or plugins directory in Pinot.
Here is a sample Pulsar stream config. You can use the streamConfigs section from this sample and make changes for your corresponding table.
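A minimal sketch of the streamConfigs contents, assuming a local Pulsar broker and the transcript-topic topic; all values are placeholders to adjust:

```json
{
  "streamType": "pulsar",
  "stream.pulsar.topic.name": "transcript-topic",
  "stream.pulsar.bootstrap.servers": "pulsar://localhost:6650",
  "stream.pulsar.consumer.type": "lowLevel",
  "stream.pulsar.consumer.prop.auto.offset.reset": "smallest",
  "stream.pulsar.consumer.factory.class.name": "org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
  "stream.pulsar.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
  "realtime.segment.flush.threshold.rows": "1000000",
  "realtime.segment.flush.threshold.time": "6h"
}
```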
You can change the following Pulsar-specific configurations for your tables.
Also, make sure to change the broker URL from pulsar://localhost:6650 to pulsar+ssl://localhost:6650 so that secure connections are used.
Pinot currently relies on Pulsar client version 2.7.2. Make sure the Pulsar broker is compatible with this client version.
The following table shows the mapping for record header/metadata to Pinot table column names:
In order to enable the metadata extraction in a Pulsar table, set the stream config metadata.populate to true. The fields eventTime, publishTime, brokerPublishTime, and key are populated by default. If you would like to extract additional fields from the Pulsar message, populate the metadataFields config with a comma-separated list of fields to populate. The fields are referenced by the field name in the Pulsar message. For example, setting the following:
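A sketch of the relevant stream config entries; the field identifiers in the list are assumed to match the Pulsar message field names shown in the mapping table at the end of this guide, so verify them against your Pinot version:

```json
"stream.pulsar.metadata.populate": "true",
"stream.pulsar.metadata.fields": "messageId,messageBytes,eventTime,topicName"
```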
This will make the __metadata$messageId, __metadata$messageBytes, __metadata$eventTime, and __metadata$topicName fields available for mapping to columns in the Pinot schema.
In addition to this, if you want to use any of these columns in your table, you have to list them explicitly in your table's schema.
For example, if you want to add only the key and one of the metadata fields as dimension columns in your Pinot table, they can be listed in the schema as follows:
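A sketch of the corresponding schema excerpt, using the key and the messageId metadata column as examples (both map to strings per the Pulsar mapping table at the end of this guide):

```json
"dimensionFieldSpecs": [
  {"name": "__key", "dataType": "STRING"},
  {"name": "__metadata$messageId", "dataType": "STRING"}
]
```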
Once the schema is updated, these columns are treated like any other Pinot column: you can apply ingestion transforms and/or define indexes on them.
This guide shows you how to ingest a stream of records from an Apache Kafka topic into a Pinot table.
In this page, you'll learn how to import data into Pinot using Apache Kafka for real-time stream ingestion. Pinot has out-of-the-box real-time ingestion support for Kafka.
Let's set up a demo Kafka cluster locally and create a sample topic, transcript-topic.

Start Kafka: start the Kafka cluster on port 9092, using the same Zookeeper instance from the quick start.

Create a Kafka topic: download the latest Kafka release and create the topic, as shown below.
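One way to do this with the Kafka CLI scripts; the partition count is illustrative, and older Kafka releases use --zookeeper localhost:2181 instead of --bootstrap-server:

```bash
# From the Kafka installation directory
bin/kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --replication-factor 1 \
  --partitions 1 \
  --topic transcript-topic
```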
We will publish the data in the same format as the earlier example, so you can reuse the same schema.
Next, create the real-time table configuration for the transcript table described by the schema from the previous step.
The lowLevel consumer reads data per partition, whereas the highLevel consumer uses the Kafka high-level consumer to read data from the whole stream and doesn't have control over which partition is read at a particular moment.
For Kafka versions below 2.X, use org.apache.pinot.plugin.stream.kafka09.KafkaConsumerFactory.
For Kafka version 2.X and above, use org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory.
You can set the offset to:

- smallest, to start the consumer from the earliest offset
- largest, to start the consumer from the latest offset
- a timestamp in the format yyyy-MM-dd'T'HH:mm:ss.SSSZ, to start the consumer from the offset after the timestamp
- a datetime duration or period, to start the consumer from the offset after the period, e.g. '2d'
The resulting configuration should look as follows -
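A sketch of such a config, placing the stream settings under tableIndexConfig.streamConfigs (the older but still supported location; the ingestionConfig.streamIngestionConfig.streamConfigMaps form shown earlier in this guide works as well):

```json
{
  "tableName": "transcript",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "timestampInEpoch",
    "schemaName": "transcript",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "transcript-topic",
      "stream.kafka.broker.list": "localhost:9092",
      "stream.kafka.consumer.type": "lowLevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
      "realtime.segment.flush.threshold.rows": "0",
      "realtime.segment.flush.threshold.time": "24h",
      "realtime.segment.flush.threshold.segment.size": "50M"
    }
  },
  "metadata": {}
}
```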
Now that we have our table and schema configurations, let's upload them to the Pinot cluster. As soon as the real-time table is created, it will begin ingesting available records from the Kafka topic.
We will publish data in the following format to Kafka. Let's save the data in a file named transcript.json.
Push the sample JSON into the transcript-topic Kafka topic using the Kafka console producer. This will add the 12 records described in the transcript.json file to the topic.
Exec into the Kafka docker container, then publish the messages to the target topic, for example:
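A sketch using the console producer bundled with Kafka; newer releases use --bootstrap-server instead of --broker-list:

```bash
bin/kafka-console-producer.sh \
  --broker-list localhost:9092 \
  --topic transcript-topic < transcript.json
```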
Pinot supports 2 major generations of Kafka library - kafka-0.9 and kafka-2.x for both high and low level consumers.
Post release 0.10.0, we have started shading kafka packages inside Pinot. If you are using our latest tagged docker images or master build, you should replace org.apache.kafka with shaded.org.apache.kafka in your table config.
Update the table config for both high-level and low-level consumers: change stream.kafka.consumer.factory.class.name from org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory to org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory.
If using the stream (high) level consumer, also add the config stream.kafka.hlc.bootstrap.server to tableIndexConfig.streamConfigs. This config should be the URI of the Kafka broker list, e.g. localhost:9092.
Here is an example config which uses SSL-based authentication to talk with Kafka and the schema registry. Notice there are two sets of SSL options: the ones starting with ssl. are for the Kafka consumer, and the ones starting with stream.kafka.decoder.prop.schema.registry. are for the SchemaRegistryClient used by KafkaConfluentSchemaRegistryAvroMessageDecoder.
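A sketch along those lines; the broker address, registry URL, keystore/truststore paths, and passwords are placeholders, the key names follow the two prefixes described above, and the standard Kafka client property security.protocol is included as well:

```json
{
  "streamType": "kafka",
  "stream.kafka.topic.name": "transcript-topic",
  "stream.kafka.broker.list": "kafka-broker:9093",
  "stream.kafka.consumer.type": "lowLevel",
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
  "stream.kafka.decoder.prop.schema.registry.rest.url": "https://schema-registry:8081",
  "stream.kafka.decoder.prop.schema.registry.ssl.truststore.location": "/path/to/schema-registry.truststore.jks",
  "stream.kafka.decoder.prop.schema.registry.ssl.truststore.password": "changeit",
  "security.protocol": "SSL",
  "ssl.truststore.location": "/path/to/kafka.client.truststore.jks",
  "ssl.truststore.password": "changeit",
  "ssl.keystore.location": "/path/to/kafka.client.keystore.jks",
  "ssl.keystore.password": "changeit"
}
```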
The connector with Kafka library 2.0+ supports Kafka transactions. The transaction support is controlled by the config kafka.isolation.level in the Kafka stream config, which can be read_committed or read_uncommitted (default). Setting it to read_committed will ingest only transactionally committed messages from the Kafka stream. For example:
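A partial sketch of the stream config with other entries omitted:

```json
{
  "streamType": "kafka",
  "stream.kafka.topic.name": "transcript-topic",
  "kafka.isolation.level": "read_committed"
}
```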
Note that the default value of this config, read_uncommitted, reads all messages. Also, this config is supported for the low-level consumer only.
Here is an example config which uses SASL_SSL-based authentication to talk with Kafka and the schema registry. As with the SSL example above, there are two sets of options: the SASL/SSL ones for the Kafka consumer, and the ones starting with stream.kafka.decoder.prop.schema.registry. for the SchemaRegistryClient used by KafkaConfluentSchemaRegistryAvroMessageDecoder.
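A partial sketch of the consumer-side SASL settings; these are standard Kafka client properties, the username and password are placeholders, and the schema registry entries follow the same pattern as in the SSL example above:

```json
"security.protocol": "SASL_SSL",
"sasl.mechanism": "PLAIN",
"sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"alice\" password=\"secret\";",
"stream.kafka.decoder.prop.schema.registry.rest.url": "https://schema-registry:8081"
```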
Pinot's Kafka connector supports automatically extracting record headers and metadata into the Pinot table columns. The following table shows the mapping for record header/metadata to Pinot table column names:
In order to enable the metadata extraction in a Kafka table, you can set the stream config metadata.populate to true.
In addition to this, if you want to use any of these columns in your table, you have to list them explicitly in your table's schema.
For example, if you want to add only the offset and key as dimension columns in your Pinot table, they can be listed in the schema as follows:
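A sketch of the corresponding schema excerpt (both columns are strings, per the Kafka mapping table at the end of this guide):

```json
"dimensionFieldSpecs": [
  {"name": "__key", "dataType": "STRING"},
  {"name": "__metadata$offset", "dataType": "STRING"}
]
```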
Once the schema is updated, these columns are treated like any other Pinot column: you can apply ingestion transforms and/or define indexes on them.
To avoid errors like "The Avro schema must be provided", designate the location of the schema in your streamConfigs section by adding the key "stream.kafka.decoder.prop.schema" with a value that denotes the location of your schema.
You can also specify additional configurations for the consumer directly in streamConfigMaps. For example, for Kafka streams, add any of the standard Kafka consumer configs to pass them directly to the Kafka consumer.
The Pinot-Pulsar connector supports authentication using security tokens. You can generate a token by following the official Pulsar documentation. Once generated, you can add the corresponding property to streamConfigs to attach the auth token to each request.
The Pinot-Pulsar connector also supports TLS for encrypted connections. You can follow the official Pulsar documentation to enable TLS on your Pulsar cluster. Once done, enable TLS in the Pulsar connector by providing the location of the trust certificate file generated in the previous step.
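A sketch of the relevant streamConfigs entries. The property names below are the ones commonly documented for the pinot-pulsar connector, so treat them as assumptions and verify them against your Pinot version; note that the broker URL switches to the pulsar+ssl scheme when TLS is enabled:

```json
"stream.pulsar.bootstrap.servers": "pulsar+ssl://localhost:6650",
"stream.pulsar.authenticationToken": "<your-auth-token>",
"stream.pulsar.tlsTrustCertsFilePath": "/path/to/ca.cert.pem"
```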
For other table and stream configurations, head over to the table configuration reference.
Pinot's Pulsar connector supports automatically extracting record headers and metadata into Pinot table columns. Pulsar supports a large amount of per-record metadata; refer to the official Pulsar documentation for the meaning of the metadata fields.
Remember to follow the schema evolution guidelines when updating the schema of an existing table!
For Kafka, we use kafka as the streamType. See the decoder classes listed above for the available options. You can also write your own decoder by implementing the StreamMessageDecoder interface and putting the jar file in the plugins directory.
As soon as data flows into the stream, the Pinot table will consume it and it will be ready for querying. Head over to the Query Console to check out the real-time data.
This connector is also suitable for Kafka lib versions higher than 2.0.0. Changing kafka.lib.version in the plugin build from 2.0.0 to 2.1.1 will make this connector work with Kafka 2.1.1.
Remember to follow the schema evolution guidelines when updating the schema of an existing table!
There is a standalone utility to generate the schema from an Avro file. See "infer the Pinot schema from the Avro schema and JSON data" for details.
| Property | Description |
| --- | --- |
| streamType | This should be set to "pulsar" |
| stream.pulsar.topic.name | Your Pulsar topic name |
| stream.pulsar.bootstrap.servers | Comma-separated broker list for Apache Pulsar |
| stream.pulsar.metadata.populate | Set to true to populate metadata |
| stream.pulsar.metadata.fields | Set to a comma-separated list of metadata fields |
| Pulsar message | Pinot table column | Description | Available by default |
| --- | --- | --- | --- |
| key : String | __key : String | | Yes |
| properties : Map<String, String> | Each header key is listed as a separate column: __header$HeaderKeyName : String | | Yes |
| publishTime : Long | __metadata$publishTime : String | Publish time as determined by the producer | Yes |
| brokerPublishTime : Optional | __metadata$brokerPublishTime : String | Publish time as determined by the broker | Yes |
| eventTime : Long | __metadata$eventTime : String | | Yes |
| messageId : MessageId -> String | __metadata$messageId : String | String representation of the MessageId field. The format is ledgerId:entryId:partitionIndex | |
| messageId : MessageId -> bytes | __metadata$messageBytes : String | Base64 encoded version of the bytes returned from calling MessageId.toByteArray() | |
| producerName : String | __metadata$producerName : String | | |
| schemaVersion : byte[] | __metadata$schemaVersion : String | Base64 encoded value | |
| sequenceId : Long | __metadata$sequenceId : String | | |
| orderingKey : byte[] | __metadata$orderingKey : String | Base64 encoded value | |
| size : Integer | __metadata$size : String | | |
| topicName : String | __metadata$topicName : String | | |
| index : String | __metadata$index : String | | |
| redeliveryCount : Integer | __metadata$redeliveryCount : String | | |
| Kafka record | Pinot table column | Comments |
| --- | --- | --- |
| Record key: any type <K> | __key : String | For simplicity of design, we assume that the record key is always a UTF-8 encoded String |
| Record Headers: Map<String, String> | Each header key is listed as a separate column: __header$HeaderKeyName : String | For simplicity of design, we directly map the string headers from the Kafka record to Pinot table columns |
| Record metadata - offset : long | __metadata$offset : String | |
| Record metadata - recordTimestamp : long | __metadata$recordTimestamp : String | |