This guide shows you how to ingest a stream of records into a Pinot table.
Apache Pinot lets users consume data from streams and push it directly into the database. This process is called stream ingestion. Stream ingestion makes it possible to query data within seconds of publication.
Stream ingestion supports checkpoints to prevent data loss.
To set up stream ingestion, perform the following steps, which are described in more detail on this page:
Create schema configuration
Create table configuration
Create ingestion configuration
Upload table and schema spec
Here's an example where we assume the data to be ingested is in the following format:
The schema defines the fields along with their data types. The schema also defines whether fields serve as dimensions, metrics, or timestamps. For more details on schema configuration, see creating a schema.
For our sample data, the schema configuration looks like this:
The next step is to create a table where all the ingested data will flow and can be queried. For details about each table component, see the table reference.
The table configuration contains an ingestion configuration (ingestionConfig), which specifies how to ingest streaming data into Pinot. For details, see the ingestion configuration reference.
For our sample data and schema, the table config will look like this:
Now that we have our table and schema configurations, let's upload them to the Pinot cluster. As soon as the configs are uploaded, Pinot will start ingesting available records from the topic.
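One way to upload the configs is through the controller's REST API. The Python sketch below assumes a controller at http://localhost:9000 (the default port) and uses the POST /schemas and POST /tables endpoints. The helper names `build_upload_request` and `upload` are hypothetical.

```python
import json
import urllib.request

# Assumption: the controller listens on localhost:9000 (the default port).
CONTROLLER = "http://localhost:9000"


def build_upload_request(path: str, config: dict) -> urllib.request.Request:
    """Build a POST request that sends a JSON config to the controller."""
    return urllib.request.Request(
        url=f"{CONTROLLER}{path}",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def upload(schema: dict, table_config: dict) -> None:
    # Upload the schema first, because the table config refers to it by name.
    for path, cfg in (("/schemas", schema), ("/tables", table_config)):
        with urllib.request.urlopen(build_upload_request(path, cfg)) as resp:
            print(path, resp.status)
```

The schema is uploaded before the table config because table creation fails if the referenced schema does not exist yet.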
In some scenarios, messages in the input stream arrive in bursts. This can cause long GC pauses on the Pinot servers or affect the ingestion rate of other real-time tables on the same server. If this happens, throttle the consumption rate during stream ingestion to better manage overall performance.
You can tune stream consumption throttling with the stream config topic.consumption.rate.limit, which sets the upper bound on the message rate for the entire topic.
Here is a sample configuration for consumption throttling:
Some things to keep in mind while tuning this config are:
Because this configuration applies to the entire topic, the rate is internally divided by the number of partitions in the topic and applied to each partition's consumer.
In a multi-tenant deployment (more than one table on the same server instance), make sure that the rate limit on one table doesn't starve consumption for another table. Multiple tables on the same server is the common case, so you may need to re-tune the throttling threshold for all the streaming tables.
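The Python sketch below shows where the limit sits in a stream config and how the topic-wide value translates into a per-partition limit when it is divided evenly across partitions. The topic name and the limit value of 1000 messages per second are hypothetical.

```python
# Hypothetical stream config fragment. Only "topic.consumption.rate.limit" comes
# from this guide; the other keys are typical Kafka stream config entries.
stream_configs = {
    "streamType": "kafka",
    "stream.kafka.topic.name": "events",
    "topic.consumption.rate.limit": "1000",  # messages per second, whole topic
}


def per_partition_rate(stream_configs: dict, num_partitions: int) -> float:
    """Approximate per-partition limit: the topic-wide limit split evenly."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    topic_limit = float(stream_configs["topic.consumption.rate.limit"])
    return topic_limit / num_partitions
```

With 4 partitions, each partition consumer would be throttled to roughly 250 messages per second. This is why adding partitions to a topic quietly lowers the per-consumer budget.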
Once throttling is enabled for a table, you can verify by searching for a log that looks similar to:
In addition, you can monitor consumption rate utilization with the metric CONSUMPTION_QUOTA_UTILIZATION.
Note that changes to topic.consumption.rate.limit in the stream config do NOT take effect immediately. Pinot picks up the new configuration from the next consuming segment. To enforce the new configuration right away, trigger the forceCommit API. Refer to Pause Stream Ingestion for more details.
You can also write an ingestion plugin if the platform you are using is not supported out of the box. For a walkthrough, see Stream Ingestion Plugin.
In some scenarios, you may want to pause real-time ingestion while your table remains available for queries. For example, if there is a problem with stream ingestion, you may want queries to keep running on the already-ingested data while you troubleshoot the issue. In these scenarios, first issue a Pause request to a controller host. When you have finished troubleshooting the stream, issue another request to the controller to resume consumption.
When a Pause request is issued, the controller instructs the real-time servers hosting your table to commit their consuming segments immediately. However, the commit process may take some time to complete. Note that Pause and Resume requests are async. An OK response means that the instructions for pausing or resuming have been successfully sent to the real-time servers. To find out whether consumption has actually stopped or resumed, issue a pause status request.
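The pause, resume, and pause status calls can be sketched as plain HTTP requests. This sketch assumes a controller at http://localhost:9000 and the endpoints POST /tables/{tableName}/pauseConsumption, POST /tables/{tableName}/resumeConsumption, and GET /tables/{tableName}/pauseStatus. Check these against your controller's Swagger page. The `consumption_request` helper is hypothetical.

```python
import urllib.parse
import urllib.request

CONTROLLER = "http://localhost:9000"  # assumed controller address


def consumption_request(table: str, action: str) -> urllib.request.Request:
    """Build a pause, resume, or pause-status request for a real-time table."""
    endpoints = {
        "pause": ("POST", "pauseConsumption"),
        "resume": ("POST", "resumeConsumption"),
        "status": ("GET", "pauseStatus"),
    }
    method, suffix = endpoints[action]  # KeyError for unknown actions
    table_path = urllib.parse.quote(table, safe="")
    return urllib.request.Request(
        f"{CONTROLLER}/tables/{table_path}/{suffix}", method=method
    )
```

Because pause and resume are async, a typical flow sends "pause" and then polls "status" until the response reports that consumption has stopped.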
It's worth noting that consuming segments on real-time servers are stored in volatile memory, and their resources are allocated when the consuming segments are first created. These resources cannot be altered if consumption parameters are changed midway through consumption. It may take hours before these changes take effect. Furthermore, if the parameters are changed in an incompatible way (for example, changing the underlying stream with a completely new set of offsets, or changing the stream endpoint from which to consume messages), it will result in the table getting into an error state.
The pause and resume feature is helpful in these instances. When a pause request is issued by the operator, consuming segments are committed without starting new mutable segments. Instead, new mutable segments are started only when the resume request is issued. This mechanism provides the operators as well as developers with more flexibility. It also enables Pinot to be more resilient to the operational and functional constraints imposed by underlying streams.
Another feature, Force Commit, uses the primitives of the pause and resume feature. When the operator issues a force commit request, the current mutable segments are committed and new ones are started right away. Operators can use this feature to make all compatible table config parameter changes take effect immediately.
(v 0.12.0+) Once submitted, the forceCommit API returns a jobId that can be used to get the current progress of the forceCommit operation. A sample response and status API call:
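The round trip can be sketched as follows. The sketch assumes a controller at http://localhost:9000, a forceCommit response that carries the job ID in a forceCommitJobId field, and a status endpoint at GET /tables/forceCommitStatus/{jobId}. Verify these names against your controller's Swagger page. Both helpers are hypothetical.

```python
import json

CONTROLLER = "http://localhost:9000"  # assumed controller address


def force_commit_url(table: str) -> str:
    """URL to POST to in order to force-commit a table's consuming segments."""
    return f"{CONTROLLER}/tables/{table}/forceCommit"


def status_url_from_response(body: str) -> str:
    """Extract the job ID from a forceCommit response and build the status URL.

    Assumption: the job ID is returned under the "forceCommitJobId" key.
    """
    job_id = json.loads(body)["forceCommitJobId"]
    return f"{CONTROLLER}/tables/forceCommitStatus/{job_id}"
```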
The forceCommit request triggers a regular commit before the consuming segments reach their end criteria, so it follows the same mechanism as a regular commit. It is a one-shot request and is not retried automatically on failure. However, it is idempotent, so you can keep issuing it until it succeeds.
This API is async: it doesn't wait for the segment commit to complete. Instead, Pinot writes a status entry to ZK that records when the request was issued and which consuming segments it included. Pinot compares the consuming segments tracked in the status entry with the latest IdealState to indicate the progress of forceCommit. However, this status is not updated or deleted when the commit succeeds or fails, so it can become stale. Currently, ZK keeps the most recent 100 status entries, and the oldest ones are deleted only when the total number is about to exceed 100.
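The progress check described above can be illustrated with a small comparison. This sketch treats the IdealState as a mapping of segment name to {instance: state}, which mirrors the shape of IdealState map fields. A tracked segment counts as pending while any replica is still CONSUMING. This is an illustration of the idea, not Pinot's implementation.

```python
from __future__ import annotations


def segments_yet_to_commit(
    tracked: list[str], ideal_state: dict[str, dict[str, str]]
) -> list[str]:
    """Return tracked segments that are still CONSUMING on any replica.

    ideal_state maps segment name -> {instance name -> state} (assumed shape).
    Segments missing from the IdealState are treated as no longer consuming.
    """
    pending = []
    for segment in tracked:
        states = ideal_state.get(segment, {})
        if "CONSUMING" in states.values():
            pending.append(segment)
    return pending
```

Because the ZK status entry is never updated, this comparison is the only signal of progress. It also explains why an old entry can look "stuck" if its segments were later deleted or renamed.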
For incompatible parameter changes, an option is added to the resume request to handle the case of a completely new set of offsets. Operators can now follow a three-step process: First, issue a pause request. Second, change the consumption parameters. Finally, issue the resume request with the appropriate option. These steps will preserve the old data and allow the new data to be consumed immediately. All through the operation, queries will continue to be served.
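The three steps can be sketched as an ordered list of controller calls. This sketch assumes that the resume option is the consumeFrom query parameter with the values smallest or largest, and that the config change is made with PUT /tables/{tableName}. The helper name is hypothetical, and the controller address is assumed.

```python
from __future__ import annotations

CONTROLLER = "http://localhost:9000"  # assumed controller address


def incompatible_change_plan(
    table: str, consume_from: str = "smallest"
) -> list[tuple[str, str]]:
    """List the (HTTP method, URL) calls for pause -> update config -> resume.

    Assumption: the resume option is the "consumeFrom" query parameter.
    """
    if consume_from not in ("smallest", "largest"):
        raise ValueError(f"unsupported consumeFrom value: {consume_from}")
    base = f"{CONTROLLER}/tables/{table}"
    return [
        ("POST", f"{base}/pauseConsumption"),
        ("PUT", base),  # body: the updated table config JSON
        ("POST", f"{base}/resumeConsumption?consumeFrom={consume_from}"),
    ]
```

Wait for the pause status to report that consumption has stopped before you send the PUT. The pause request itself is async.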
If a Pinot table is configured to consume using a Low Level (partition-based) stream type, then it is possible that the partitions of the table change over time. In Kafka, for example, the number of partitions may increase. In Kinesis, the number of partitions may increase or decrease -- some partitions could be merged to create a new one, or existing partitions split to create new ones.
Pinot runs a periodic task called RealtimeSegmentValidationManager that monitors such changes and starts consumption on new partitions (or stops consumption from old ones) as necessary. Because this periodic task runs on the controller, Pinot may take some time to recognize new partitions and start consuming from them. As a result, data from new partitions may appear later in the results that Pinot returns.
To recognize new partitions sooner, trigger the periodic task manually.
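A manual trigger can be sketched as a single controller call. This sketch assumes a controller at http://localhost:9000 and a GET /periodictask/run endpoint that accepts taskname, tableName, and type query parameters. Confirm the endpoint on your controller's Swagger page. The helper is hypothetical.

```python
from urllib.parse import urlencode

CONTROLLER = "http://localhost:9000"  # assumed controller address


def run_validation_task_url(table: str) -> str:
    """Build the URL that asks the controller to run the validation task now.

    Assumption: GET /periodictask/run with taskname/tableName/type parameters.
    """
    params = {
        "taskname": "RealtimeSegmentValidationManager",
        "tableName": table,
        "type": "REALTIME",
    }
    return f"{CONTROLLER}/periodictask/run?{urlencode(params)}"
```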
Often, it is important to understand the rate of ingestion of data into your real-time table. This is commonly done by looking at the consumption lag of the consumer. The lag itself can be observed in many dimensions. Pinot supports observing consumption lag along the offset dimension and time dimension, whenever applicable (as it depends on the specifics of the connector).
You can observe the ingestion status of a connector by querying either the /consumingSegmentsInfo API or the table's /debug API, as shown below:
A sample response from a Kafka-based real-time table is shown below. The ingestion status is displayed for each of the CONSUMING segments in the table.
| Field | Description |
| --- | --- |
| currentOffsetsMap | Current consuming offset position per partition |
| latestUpstreamOffsetMap | (Wherever applicable) Latest offset found in the upstream topic partition |
| recordsLagMap | (Wherever applicable) How far the current record's offset or pointer is behind the latest upstream record. This is calculated as the difference between the latestUpstreamOffset and currentOffset for the partition when the lag computation request is made. |
| recordsAvailabilityLagMap | (Wherever applicable) How soon after its ingestion upstream a record was consumed by Pinot. This is calculated as the difference between the time the record was consumed and the time at which the record was ingested upstream. |
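To show how recordsLagMap relates to the other two maps, the helper below recomputes the per-partition lag as latestUpstreamOffset minus currentOffset. It assumes a dictionary that holds currentOffsetsMap and latestUpstreamOffsetMap, keyed by partition ID strings with numeric offset strings as values, as in a Kafka-based table. The function name is hypothetical.

```python
from __future__ import annotations


def records_lag(partition_offset_info: dict) -> dict[str, int]:
    """Recompute per-partition record lag as latestUpstreamOffset - currentOffset.

    Assumes numeric offset strings (as with Kafka). Partitions without an
    upstream offset are skipped, since lag is only reported where applicable.
    """
    current = partition_offset_info.get("currentOffsetsMap", {})
    latest = partition_offset_info.get("latestUpstreamOffsetMap", {})
    return {
        partition: int(latest[partition]) - int(offset)
        for partition, offset in current.items()
        if partition in latest
    }
```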
Real-time ingestion includes 3 stages of message processing: Decode, Transform, and Index.
In each of these stages, a failure can happen which may or may not result in an ingestion failure. The following metrics are available to investigate ingestion issues:
Decode stage -> an error here is recorded as INVALID_REALTIME_ROWS_DROPPED
Transform stage -> possible errors here are:
When a message gets dropped due to the FILTER transform, it is recorded as REALTIME_ROWS_FILTERED
When the transform pipeline sets the $INCOMPLETE_RECORD_KEY$ key in the message, it is recorded as INCOMPLETE_REALTIME_ROWS_CONSUMED, but only when the continueOnError configuration is enabled. If continueOnError is not enabled, the ingestion fails.
Index stage -> When a failure occurs at this stage, the ingestion typically stops and the partition is marked as ERROR.
There is yet another metric, ROWS_WITH_ERROR, which is the sum of all error counts in the 3 stages above.
Furthermore, the metric REALTIME_CONSUMPTION_EXCEPTIONS is incremented whenever a transient or permanent stream exception is seen during consumption.
These metrics can be used to understand why ingestion failed for a particular table partition before diving into the server logs.
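As a triage aid, the stage-to-metric mapping above can be encoded in a lookup table. In this hypothetical sketch, `metric_deltas` holds the increase of each metric over some time window. How you collect those values depends on your metrics setup and is assumed here. Index-stage failures are not listed because they surface as a partition in ERROR state rather than as a dedicated metric.

```python
from __future__ import annotations

# Mapping taken from the stage descriptions above.
METRIC_STAGE = {
    "INVALID_REALTIME_ROWS_DROPPED": "decode",
    "REALTIME_ROWS_FILTERED": "transform (FILTER)",
    "INCOMPLETE_REALTIME_ROWS_CONSUMED": "transform (incomplete record)",
    "REALTIME_CONSUMPTION_EXCEPTIONS": "stream consumption",
}


def suspect_stages(metric_deltas: dict[str, int]) -> list[str]:
    """Return the stages whose metrics increased, in a stable order."""
    return [
        METRIC_STAGE[name]
        for name in METRIC_STAGE
        if metric_deltas.get(name, 0) > 0
    ]
```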
This guide shows you how to ingest a stream of records from an Apache Kafka topic into a Pinot table.
Learn how to ingest data from Kafka, a stream processing platform. You should have a local cluster up and running, following the instructions in .
Let's start by downloading Kafka to our local machine.
To pull down the latest Docker image, run the following command:
Download Kafka from and then extract it:
Next we'll spin up a Kafka broker:
Note: The --network pinot-demo flag is optional and assumes that you have a Docker network named pinot-demo that you want to connect the Kafka container to.
On one terminal window run this command:
Start Zookeeper
And on another window, run this command:
Start Kafka Broker
We're going to generate some JSON messages from the terminal using the following script:
datagen.py
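A minimal sketch of such a generator follows. It assumes that each message has the ts (epoch milliseconds), uuid, and count fields used by the events table. Unlike an endless generator, this version emits a bounded batch: 100 events by default, or the count passed as its first argument.

```python
import datetime
import json
import random
import sys
import uuid


def make_event() -> dict:
    """Build one event with the ts, uuid, and count fields of the events table."""
    return {
        "ts": int(datetime.datetime.now().timestamp() * 1000),  # epoch millis
        "uuid": str(uuid.uuid4()),
        "count": random.randint(0, 1000),
    }


def main(limit: int) -> None:
    # One JSON document per line, which suits piping into a console producer.
    for _ in range(limit):
        print(json.dumps(make_event()), flush=True)


if __name__ == "__main__":
    # Optional first argument: how many events to emit (default 100).
    main(int(sys.argv[1]) if len(sys.argv) > 1 else 100)
```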
If you run this script (python datagen.py), you'll see the following output:
Let's now pipe that stream of messages into Kafka, by running the following command:
We can check how many messages have been ingested by running the following command:
Output
And we can print out the messages themselves by running the following command:
Output
A schema defines what fields are present in the table along with their data types in JSON format.
Create a file called /tmp/pinot/schema-stream.json and add the following content to it.
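If you prefer to generate the file, the Python sketch below writes a schema for the events table. The field types are assumptions based on the data generator: ts is a TIMESTAMP in epoch milliseconds, uuid is a STRING dimension, and count is an INT metric.

```python
import json
import os

# Sketch of /tmp/pinot/schema-stream.json; field types are assumptions.
SCHEMA = {
    "schemaName": "events",
    "dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],
    "metricFieldSpecs": [{"name": "count", "dataType": "INT"}],
    "dateTimeFieldSpecs": [
        {
            "name": "ts",
            "dataType": "TIMESTAMP",
            "format": "1:MILLISECONDS:EPOCH",
            "granularity": "1:MILLISECONDS",
        }
    ],
}


def write_schema(path: str = "/tmp/pinot/schema-stream.json") -> None:
    """Write the schema as pretty-printed JSON, creating the directory if needed."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(SCHEMA, f, indent=2)
```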
A table is a logical abstraction that represents a collection of related data. It is composed of columns and rows (known as documents in Pinot). The table config defines the table's properties in JSON format.
Create a file called /tmp/pinot/table-config-stream.json and add the following content to it.
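The Python sketch below writes a minimal real-time table config with the stream settings under ingestionConfig. Several values are assumptions that you will need to adapt to your environment and Pinot version: the broker address kafka:9092, the topic name events, the kafka20 consumer factory, the JSON decoder class, and the flush thresholds.

```python
import json
import os

KAFKA_STREAM = {
    "streamType": "kafka",
    "stream.kafka.topic.name": "events",
    "stream.kafka.broker.list": "kafka:9092",  # assumed broker address
    "stream.kafka.consumer.type": "lowlevel",
    "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
    "stream.kafka.consumer.factory.class.name":
        "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
    "stream.kafka.decoder.class.name":
        "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
    "realtime.segment.flush.threshold.rows": "0",
    "realtime.segment.flush.threshold.time": "24h",
    "realtime.segment.flush.threshold.segment.size": "100M",
}

TABLE_CONFIG = {
    "tableName": "events",
    "tableType": "REALTIME",
    "segmentsConfig": {
        "timeColumnName": "ts",
        "schemaName": "events",
        "replicasPerPartition": "1",
    },
    "tenants": {},
    "tableIndexConfig": {"loadMode": "MMAP"},
    "ingestionConfig": {"streamIngestionConfig": {"streamConfigMaps": [KAFKA_STREAM]}},
    "metadata": {},
}


def write_table_config(path: str = "/tmp/pinot/table-config-stream.json") -> None:
    """Write the table config as pretty-printed JSON."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(TABLE_CONFIG, f, indent=2)
```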
Create the table and schema by running the appropriate command below:
Pinot supports two versions of the Kafka library for low-level consumers: kafka-0.9 and kafka-2.x.
After release 0.10.0, we started shading Kafka packages inside Pinot. If you are using our latest tagged Docker images or a master build, replace org.apache.kafka with shaded.org.apache.kafka in your table config.
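Kafka class names often appear inside config values such as sasl.jaas.config. The hypothetical helper below rewrites every unshaded org.apache.kafka reference in a stream config. A negative lookbehind leaves values that are already shaded untouched, so running the helper twice is harmless.

```python
import re

# Match org.apache.kafka only when it is not already preceded by "shaded.".
_UNSHADED = re.compile(r"(?<!shaded\.)org\.apache\.kafka")


def shade_kafka_classes(stream_configs: dict) -> dict:
    """Return a copy of stream_configs with Kafka classes renamed to the shaded package."""
    return {
        key: _UNSHADED.sub("shaded.org.apache.kafka", value)
        if isinstance(value, str)
        else value
        for key, value in stream_configs.items()
    }
```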
Update the table config for the low-level consumer: change stream.kafka.consumer.factory.class.name from org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory to org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory.
Pinot does not support high-level Kafka consumers (HLC). Pinot uses low-level consumers to ensure accurate results, reduce operational complexity, support scalability, and minimize storage overhead.
Here is an example config that uses SSL-based authentication to talk with Kafka and the schema registry. Notice that there are two sets of SSL options. The options starting with ssl. are for the Kafka consumer. The options starting with stream.kafka.decoder.prop.schema.registry. are for the SchemaRegistryClient used by KafkaConfluentSchemaRegistryAvroMessageDecoder.
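To make the two sets of options concrete, the sketch below shows a stream config fragment and a hypothetical helper that separates the options by prefix. All paths and the registry URL are placeholders.

```python
from __future__ import annotations

REGISTRY_PREFIX = "stream.kafka.decoder.prop.schema.registry."

# Placeholder values; only the key prefixes matter for this sketch.
EXAMPLE_STREAM_CONFIGS = {
    "streamType": "kafka",
    "security.protocol": "SSL",
    "ssl.truststore.location": "/path/to/kafka.truststore.jks",
    "ssl.keystore.location": "/path/to/kafka.keystore.jks",
    "stream.kafka.decoder.prop.schema.registry.rest.url": "https://schema-registry:8081",
    "stream.kafka.decoder.prop.schema.registry.ssl.truststore.location":
        "/path/to/registry.truststore.jks",
}


def split_ssl_options(
    stream_configs: dict[str, str],
) -> tuple[dict[str, str], dict[str, str]]:
    """Split options into (Kafka consumer ssl.* options, schema-registry options).

    Registry options are returned with the registry prefix stripped; keys
    matching neither group (for example security.protocol) are ignored.
    """
    consumer: dict[str, str] = {}
    registry: dict[str, str] = {}
    for key, value in stream_configs.items():
        if key.startswith(REGISTRY_PREFIX):
            registry[key[len(REGISTRY_PREFIX):]] = value
        elif key.startswith("ssl."):
            consumer[key] = value
    return consumer, registry
```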
The connector with Kafka library 2.0+ supports Kafka transactions. Transaction support is controlled by the config kafka.isolation.level in the Kafka stream config, which can be read_committed or read_uncommitted (default). Setting it to read_committed ingests only transactionally committed messages from the Kafka stream.
For example,
Note that the default value of this config is read_uncommitted, which reads all messages. Also, this config supports low-level consumers only.
Here is an example config that uses SASL_SSL-based authentication to talk with Kafka and the schema registry. Notice that there are two sets of SSL options. Some are for the Kafka consumer. The ones starting with stream.kafka.decoder.prop.schema.registry. are for the SchemaRegistryClient used by KafkaConfluentSchemaRegistryAvroMessageDecoder.
Pinot's Kafka connector supports automatically extracting record headers and metadata into the Pinot table columns. The following table shows the mapping for record header/metadata to Pinot table column names:
To enable metadata extraction in a Kafka table, set the stream config metadata.populate to true.
In addition to this, if you want to use any of these columns in your table, you have to list them explicitly in your table's schema.
For example, if you want to add only the offset and key as dimension columns in your Pinot table, you can list them in the schema as follows:
Once the schema is updated, these columns behave like any other Pinot column. You can apply ingestion transforms and/or define indexes on them.
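Both changes can be sketched together in Python. The sketch assumes the full stream config key is stream.kafka.metadata.populate, which follows the stream.<type>. prefix used by the Pulsar property stream.pulsar.metadata.populate. The column names come from the Kafka metadata mapping, where every column is a String. The helper is hypothetical.

```python
import copy

# Columns from the Kafka metadata mapping; all are extracted as strings.
METADATA_COLUMNS = ["__key", "__metadata$offset"]


def enable_kafka_metadata(schema: dict, stream_configs: dict) -> tuple:
    """Return copies of schema and stream_configs with metadata extraction enabled."""
    schema = copy.deepcopy(schema)
    stream_configs = dict(stream_configs)
    stream_configs["stream.kafka.metadata.populate"] = "true"  # assumed full key
    dims = schema.setdefault("dimensionFieldSpecs", [])
    existing = {spec["name"] for spec in dims}
    for name in METADATA_COLUMNS:
        if name not in existing:
            dims.append({"name": name, "dataType": "STRING"})
    return schema, stream_configs
```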
To avoid errors like The Avro schema must be provided, designate the location of the schema in your streamConfigs section. For example, if your current section contains the following:
Then add the key "stream.kafka.decoder.prop.schema", followed by a value that denotes the location of your schema.
This guide shows you how to ingest a stream of records from an Apache Pulsar topic into a Pinot table.
Pinot supports consuming data from Apache Pulsar via the pinot-pulsar plugin. You need to enable this plugin so that Pulsar-specific libraries are present in the classpath.
Enable the Pulsar plugin with the following config at the time of Pinot setup: -Dplugins.include=pinot-pulsar
The pinot-pulsar plugin is not part of the official 0.10.0 binary. You can download the plugin from and add it to the libs or plugins directory in Pinot.
Here is a sample Pulsar stream config. You can use the streamConfigs section from this sample and adapt it for your table.
You can change the following Pulsar-specific configurations for your tables:
Also, make sure to change the broker URL from pulsar://localhost:6650 to pulsar+ssl://localhost:6650 so that secure connections are used.
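A minimal Pulsar streamConfigs map can be sketched with only the keys from the Pulsar property table. The sketch also shows the URL scheme switch for secure connections. Decoder and consumer-factory settings are left out and must be added for a real table. The helper name and defaults are hypothetical.

```python
def pulsar_stream_config(
    topic: str, brokers: str = "localhost:6650", use_tls: bool = False
) -> dict:
    """Build a minimal Pulsar streamConfigs map.

    brokers is a comma-separated host:port list; the scheme is prefixed once,
    as in a Pulsar service URL. Use use_tls=True for pulsar+ssl:// connections.
    """
    scheme = "pulsar+ssl" if use_tls else "pulsar"
    return {
        "streamType": "pulsar",
        "stream.pulsar.topic.name": topic,
        "stream.pulsar.bootstrap.servers": f"{scheme}://{brokers}",
    }
```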
Pinot currently relies on Pulsar client version 2.7.2. Make sure the Pulsar broker is compatible with this client version.
The following table shows the mapping for record header/metadata to Pinot table column names:
To enable metadata extraction in a Pulsar table, set the stream config metadata.populate to true. The fields eventTime, publishTime, brokerPublishTime, and key are populated by default. To extract additional fields from the Pulsar Message, set the metadataFields config to a comma-separated list of fields to populate. Fields are referenced by their field name in the Pulsar Message. For example, setting:
This makes the __metadata$messageId, __metadata$messageBytes, __metadata$eventTime, and __metadata$topicName fields available for mapping to columns in the Pinot schema.
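The naming rule can be illustrated with a small helper that lists the columns produced by the default fields plus a metadataFields value. Following the Pulsar mapping table, key maps to __key and every other field maps to __metadata$<field>. Header columns (__header$<name>) depend on each message and are not listed. The helper is hypothetical.

```python
from __future__ import annotations

DEFAULT_FIELDS = ("eventTime", "publishTime", "brokerPublishTime")


def pulsar_metadata_columns(metadata_fields: str = "") -> list[str]:
    """List the columns available when metadata.populate is true.

    key -> __key (populated by default); each default or extra field -> __metadata$<field>.
    Duplicates are dropped, keeping first-seen order.
    """
    extra = [f.strip() for f in metadata_fields.split(",") if f.strip()]
    columns = ["__key"]
    for field in (*DEFAULT_FIELDS, *extra):
        column = f"__metadata${field}"
        if column not in columns:
            columns.append(column)
    return columns
```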
In addition to this, if you want to use any of these columns in your table, you have to list them explicitly in your table's schema.
For example, if you want to add only the offset and key as dimension columns in your Pinot table, you can list them in the schema as follows:
Once the schema is updated, these columns behave like any other Pinot column. You can apply ingestion transforms and/or define indexes on them.
This guide shows you how to ingest a stream of records from an Amazon Kinesis topic into a Pinot table.
To ingest events from an Amazon Kinesis stream into Pinot, set the following configs into your table config:
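As a sketch, the Kinesis stream settings can be assembled from the properties described in this guide. accessKey and secretKey are deliberately left out so that the AWS default credential provider chain supplies credentials. Decoder and consumer-factory settings are also omitted and must be added for a real table. The helper and its defaults are hypothetical.

```python
VALID_ITERATOR_TYPES = {
    "LATEST",
    "TRIM_HORIZON",
    "AT_SEQUENCE_NUMBER",
    "AFTER_SEQUENCE_NUMBER",
}


def kinesis_stream_config(
    stream_name: str,
    region: str,
    shard_iterator_type: str = "LATEST",
    max_records_to_fetch: int = 20,
) -> dict:
    """Build a Kinesis streamConfigs map without inline credentials."""
    if shard_iterator_type not in VALID_ITERATOR_TYPES:
        raise ValueError(f"unknown shardIteratorType: {shard_iterator_type}")
    if max_records_to_fetch <= 0:
        raise ValueError("max_records_to_fetch must be positive")
    return {
        "streamType": "kinesis",
        "stream.kinesis.topic.name": stream_name,
        "region": region,
        "shardIteratorType": shard_iterator_type,
        "maxRecordsToFetch": str(max_records_to_fetch),
    }
```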
where the Kinesis specific properties are:
Environment variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (RECOMMENDED, since they are recognized by all the AWS SDKs and the CLI except for .NET), or AWS_ACCESS_KEY and AWS_SECRET_KEY (only recognized by the Java SDK)
Java system properties - aws.accessKeyId and aws.secretKey
Web Identity Token credentials from the environment or container
Credential profiles file at the default location (~/.aws/credentials), shared by all AWS SDKs and the AWS CLI
Credentials delivered through the Amazon EC2 container service, if the AWS_CONTAINER_CREDENTIALS_RELATIVE_URI environment variable is set and the security manager has permission to access the variable
Instance profile credentials delivered through the Amazon EC2 metadata service
Although you can also specify the accessKey and secretKey in the properties above, we don't recommend this insecure method. Use it only for non-production proof-of-concept (POC) setups. You can also specify other AWS fields, such as AWS_SESSION_TOKEN, as environment variables or in the config.
ShardID has the format "shardId-000000000001". We use the numeric part as the partitionId. Our partitionId variable is an integer, so if shardIds grow beyond Integer.MAX_VALUE, they will overflow the partitionId space.
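The mapping and its overflow limit can be illustrated in Python. This is an illustration of the rule described above, not Pinot's code. Here the overflow case raises an error, whereas a Java int would silently wrap.

```python
INT_MAX = 2**31 - 1  # Java Integer.MAX_VALUE


def shard_to_partition_id(shard_id: str) -> int:
    """Map a Kinesis shard ID such as "shardId-000000000001" to an int partition ID.

    Raises OverflowError when the numeric part does not fit in a Java int.
    """
    prefix = "shardId-"
    if not shard_id.startswith(prefix):
        raise ValueError(f"unexpected shard ID format: {shard_id}")
    value = int(shard_id[len(prefix):])
    if value > INT_MAX:
        raise OverflowError(f"{shard_id} does not fit in a Java int")
    return value
```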
Segment-size-based thresholds for segment completion will not work, because they assume that partition "0" always exists. However, once shard 0 is split or merged, partition 0 no longer exists.
Learn how to apply indexes to a Pinot table. This guide assumes that you have followed the guide.
Pinot supports a series of different indexes that can be used to optimize query performance. In this guide, we'll learn how to add indexes to the events table that we set up in the guide.
If no indexes are applied to the columns in a Pinot segment, the query engine needs to scan through every document, checking whether that document meets the filter criteria provided in a query. This can be a slow process if there are a lot of documents to scan.
When indexes are applied, the query engine can more quickly work out which documents satisfy the filter criteria, reducing the time it takes to execute the query.
By default, Pinot creates a forward index for every column. The forward index generally stores documents in insertion order.
However, before flushing the segment, Pinot does a single pass over every column to see whether the data is sorted. If data is sorted, Pinot creates a sorted (forward) index for that column instead of the forward index.
For real-time tables you can also explicitly tell Pinot that one of the columns should be sorted. For more details, see the [Sorted Index Documentation](https://docs.pinot.apache.org/basics/indexing/forward-index#real-time-tables).
For filtering documents within a segment, Pinot supports the following indexing techniques:
Inverted index - Used for exact lookups.
Range index - Used for range queries.
Text index - Used for phrase, term, boolean, prefix, or regex queries.
Geospatial index - Based on H3, a hexagon-based hierarchical gridding. Used for finding points that exist within a certain distance from another point.
JSON index - Used for querying columns in JSON documents.
Star-Tree index - Pre-aggregates results across multiple columns.
Let's see how we can apply these indexing techniques to our data. To recap, the events table has the following fields:
We might want to write queries that filter on the ts and uuid columns, so these are the columns on which we would want to configure indexes.
Since the data we're ingesting into the Kafka topic is implicitly ordered by timestamp, the ts column already has a sorted index. This means that any queries that filter on this column are already optimised.
That leaves us with the uuid column.
We're going to add an inverted index to the uuid column so that queries that filter on that column return more quickly. To do this, add the following line to the tableIndexConfig section:
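The line to add is "invertedIndexColumns": ["uuid"]. If you manage the table config programmatically, the hypothetical helper below makes the same change on a copy of the config. It does not add a duplicate if the column is already listed.

```python
import copy


def add_inverted_index(table_config: dict, column: str) -> dict:
    """Return a copy of table_config with column added to invertedIndexColumns."""
    updated = copy.deepcopy(table_config)
    index_config = updated.setdefault("tableIndexConfig", {})
    columns = index_config.setdefault("invertedIndexColumns", [])
    if column not in columns:
        columns.append(column)
    return updated
```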
Copy the following to the clipboard:
/tmp/pinot/table-config-stream.json
Once you've done that, you'll need to click Reload All Segments and then Yes to apply the indexing change to all segments.
The following query returns the indexes defined on the uuid column:
Output
The inverted-index property shows that the index has been applied.
You can now run some queries that filter on the uuid column, as shown below:
You'll need to change the uuid value to one that exists in your database, because our script generates the UUIDs randomly.
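Such a query can also be sent to the broker over HTTP. This sketch assumes a broker at http://localhost:8099 (the default query port) and its POST /query/sql endpoint, which takes a JSON body with a sql field. Single quotes in the value are doubled so that the SQL string literal stays well-formed. Both helpers are hypothetical.

```python
import json
import urllib.request

BROKER = "http://localhost:8099"  # assumed broker address


def uuid_query(uuid_value: str, limit: int = 10) -> str:
    """Build a SQL query filtering on uuid, escaping single quotes."""
    escaped = uuid_value.replace("'", "''")
    return f"SELECT * FROM events WHERE uuid = '{escaped}' LIMIT {int(limit)}"


def build_query_request(sql: str) -> urllib.request.Request:
    """Build a POST request for the broker's SQL endpoint."""
    return urllib.request.Request(
        f"{BROKER}/query/sql",
        data=json.dumps({"sql": sql}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```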
Navigate to and click on the events
table to run a query that shows the first 10 rows in this table.
Querying the events table
This connector is also suitable for Kafka lib versions higher than 2.0.0. In , changing the kafka.lib.version from 2.0.0 to 2.1.1 makes this connector work with Kafka 2.1.1.
Remember to follow the when updating schema of an existing table!
There is a standalone utility to generate the schema from an Avro file. See [infer the pinot schema from the avro schema and JSON data]() for details.
The Pinot-Pulsar connector supports authentication using security tokens. To generate a token, follow the instructions in . Once the token is generated, add the following property to streamConfigs to send an authentication token with each request:
The Pinot-Pulsar connector also supports authentication using OAuth2, for example when connecting to a StreamNative Pulsar cluster. For more information, see how to . Once OAuth2 is configured, you can add the following properties to streamConfigs:
The Pinot-Pulsar connector also supports TLS for encrypted connections. You can follow to enable TLS on your Pulsar cluster. Then enable TLS in the Pulsar connector by providing the location of the trust certificate file generated in the previous step.
For other table and stream configurations, you can head over to
Pinot's Pulsar connector supports automatically extracting record headers and metadata into the Pinot table columns. Pulsar supports a large amount of per-record metadata. Reference the for the meaning of the metadata fields.
Remember to follow the when updating schema of an existing table!
Kinesis supports authentication using the . The credential provider looks for the credentials in the following order:
You must provide all read access level permissions for Pinot to work with an AWS Kinesis data stream. See the for details.
Navigate to , click on Edit Table, paste the next table config, and then click Save.
We can check that the index has been applied to all our segments by querying Pinot's REST API. You can find Swagger documentation at .
We're using the to extract the fields that we're interested in.
| Kafka record | Pinot table column | Comment |
| --- | --- | --- |
| Record key: any type <K> | __key : String | For simplicity of design, we assume that the record key is always a UTF-8 encoded String |
| Record headers: Map<String, String> | Each header key is listed as a separate column: __header$HeaderKeyName : String | For simplicity of design, we directly map the string headers from the Kafka record to Pinot table columns |
| Record metadata - offset : long | __metadata$offset : String | |
| Record metadata - partition : int | __metadata$partition : String | |
| Record metadata - recordTimestamp : long | __metadata$recordTimestamp : String | |
| Property | Description |
| --- | --- |
| streamType | This should be set to "pulsar" |
| stream.pulsar.topic.name | Your Pulsar topic name |
| stream.pulsar.bootstrap.servers | Comma-separated broker list for Apache Pulsar |
| stream.pulsar.metadata.populate | Set to true to populate metadata |
| stream.pulsar.metadata.fields | Set to a comma-separated list of metadata fields |
| Pulsar message | Pinot table column | Comment | Extracted by default |
| --- | --- | --- | --- |
| key : String | __key : String | | Yes |
| properties : Map<String, String> | Each header key is listed as a separate column: __header$HeaderKeyName : String | | Yes |
| publishTime : Long | __metadata$publishTime : String | Publish time as determined by the producer | Yes |
| brokerPublishTime : Optional | __metadata$brokerPublishTime : String | Publish time as determined by the broker | Yes |
| eventTime : Long | __metadata$eventTime : String | | Yes |
| messageId : MessageId -> String | __metadata$messageId : String | String representation of the MessageId field. The format is ledgerId:entryId:partitionIndex | |
| messageId : MessageId -> bytes | __metadata$messageBytes : String | Base64-encoded version of the bytes returned from calling MessageId.toByteArray() | |
| producerName : String | __metadata$producerName : String | | |
| schemaVersion : byte[] | __metadata$schemaVersion : String | Base64-encoded value | |
| sequenceId : Long | __metadata$sequenceId : String | | |
| orderingKey : byte[] | __metadata$orderingKey : String | Base64-encoded value | |
| size : Integer | __metadata$size : String | | |
| topicName : String | __metadata$topicName : String | | |
| index : String | __metadata$index : String | | |
| redeliveryCount : Integer | __metadata$redeliveryCount : String | | |
| Property | Description |
| --- | --- |
| streamType | This should be set to "kinesis" |
| stream.kinesis.topic.name | Kinesis stream name |
| region | Kinesis region, e.g. us-west-1 |
| accessKey | Kinesis access key |
| secretKey | Kinesis secret key |
| shardIteratorType | Set to LATEST to consume only new records, TRIM_HORIZON to start from the earliest sequence number, or AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER to start consumption from a particular sequence number |
| maxRecordsToFetch | ... Default is 20. |
ts
uuid
count