To ingest events from an Amazon Kinesis stream into Pinot, set the following configs in the table config. The Kinesis-specific properties are:
streamType - This should be set to "kinesis"
stream.kinesis.topic.name - Kinesis stream name
region - Kinesis region, e.g. us-west-1
accessKey - Kinesis access key
secretKey - Kinesis secret key
shardIteratorType - Set to LATEST to consume only new records, TRIM_HORIZON to start from the earliest sequence number, or AT_SEQUENCE_NUMBER / AFTER_SEQUENCE_NUMBER to start consumption from a particular sequence number
maxRecordsToFetch - Maximum number of records to fetch in a single request. Default is 20.
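For reference, a minimal streamConfigs sketch for a Kinesis table might look like the following. The stream name, region, and flush thresholds are placeholders to adjust for your setup; accessKey and secretKey are omitted here on the assumption that credentials come from the default AWS credential provider chain described below.

```json
"streamConfigs": {
  "streamType": "kinesis",
  "stream.kinesis.topic.name": "my-kinesis-stream",
  "region": "us-west-2",
  "shardIteratorType": "LATEST",
  "maxRecordsToFetch": "20",
  "stream.kinesis.consumer.type": "lowlevel",
  "stream.kinesis.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
  "stream.kinesis.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
  "realtime.segment.flush.threshold.rows": "1000000",
  "realtime.segment.flush.threshold.time": "6h"
}
```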
Kinesis supports authentication using the DefaultCredentialsProviderChain. The credential provider looks for credentials in the following order:
Environment variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (RECOMMENDED since they are recognized by all AWS SDKs and the CLI except for .NET), or AWS_ACCESS_KEY and AWS_SECRET_KEY (only recognized by the Java SDK)
Java system properties - aws.accessKeyId and aws.secretKey
Web Identity Token credentials from the environment or container
Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI
Credentials delivered through the Amazon EC2 container service, if the AWS_CONTAINER_CREDENTIALS_RELATIVE_URI environment variable is set and the security manager has permission to access the variable
Instance profile credentials delivered through the Amazon EC2 metadata service
You can also specify the accessKey and secretKey directly in the stream properties. However, this method is not secure and should be used only for POC setups. Other AWS fields, such as AWS_SESSION_TOKEN, can also be supplied as environment variables or in the config, and they will be picked up.
ShardID is of the format "shardId-000000000001". We use the numeric part as the partition ID. Since the partition ID is stored as an integer, shard IDs that grow beyond Integer.MAX_VALUE will overflow.
Segment-size-based thresholds for segment completion will not work, because they assume that partition "0" always exists. Once shard 0 is split or merged, there is no longer a partition 0.
Apache Pinot lets users consume data from streams and push it directly into the database, in a process known as stream ingestion. Stream Ingestion makes it possible to query data within seconds of publication.
Stream ingestion supports checkpointing to prevent data loss.
Setting up Stream ingestion involves the following steps:
Create schema configuration
Create table configuration
Upload table and schema spec
Let's take a look at each of the steps in more detail.
Let us assume the data to be ingested is in the following format:
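For illustration, assume each record is a JSON document like the one below. The field names here are hypothetical and simply match the transcript example used in the Kafka guide later in this document.

```json
{
  "studentID": 205,
  "firstName": "Natalie",
  "lastName": "Jones",
  "gender": "Female",
  "subject": "Maths",
  "score": 3.8,
  "timestampInEpoch": 1571900400000
}
```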
The schema defines the fields along with their data types. The schema also defines whether fields serve as dimensions, metrics, or timestamp. For more details on schema configuration, see creating a schema.
For our sample data, the schema configuration looks like this:
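A sketch of the schema, assuming the hypothetical fields shown above (several dimensions, one metric, and an epoch-millisecond timestamp):

```json
{
  "schemaName": "transcript",
  "dimensionFieldSpecs": [
    {"name": "studentID", "dataType": "INT"},
    {"name": "firstName", "dataType": "STRING"},
    {"name": "lastName", "dataType": "STRING"},
    {"name": "gender", "dataType": "STRING"},
    {"name": "subject", "dataType": "STRING"}
  ],
  "metricFieldSpecs": [
    {"name": "score", "dataType": "FLOAT"}
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "timestampInEpoch",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
```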
The next step is to create a table where all the ingested data will flow and can be queried. Unlike batch ingestion, table configuration for real-time ingestion also triggers the data ingestion job. For a more detailed overview of tables, see the table reference.
The real-time table configuration consists of the following fields:
tableName - The name of the table where the data should flow
tableType - The internal type for the table. Should always be set to REALTIME for realtime ingestion
segmentsConfig - contains segment-level configuration such as the time column, replication, and retention
tableIndexConfig - defines which column to use for indexing along with the type of index. For full configuration, see Indexing Configs. It has the following required fields -
loadMode - specifies how the segments should be loaded. Should be heap or mmap. Here's the difference between the two modes:
mmap: Segments are loaded onto memory-mapped files. This is the default mode.
heap: Segments are loaded into direct memory. Note, 'heap' here is a legacy misnomer, and it does not imply JVM heap. This mode should only be used when we want faster performance than memory-mapped files, and are also sure that we will never run into OOM.
streamConfig - specifies the data source along with the necessary configs to start consuming the real-time data. The streamConfig can be thought of as the equivalent to the job spec for batch ingestion. The following options are supported:
streamType - The streaming platform from which to consume the data, e.g. kafka
stream.[streamType].consumer.type - Whether to use per-partition low-level consumer or high-level stream consumer. lowLevel - consume data from each partition with offset management. highLevel - consume data without control over the partitions
stream.[streamType].topic.name - The datasource (e.g. topic, data stream) from which to consume the data. String
stream.[streamType].decoder.class.name - Name of the class to be used for parsing the data. The class should implement the org.apache.pinot.spi.stream.StreamMessageDecoder interface. String. Available options:
org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder
org.apache.pinot.plugin.inputformat.avro.KafkaAvroMessageDecoder
org.apache.pinot.plugin.inputformat.avro.SimpleAvroMessageDecoder
org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder
stream.[streamType].consumer.factory.class.name - Name of the factory class to be used to provide the appropriate implementation of low-level and high-level consumer as well as the metadata. String. Available options:
org.apache.pinot.plugin.stream.kafka09.KafkaConsumerFactory
org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory
org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory
org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory
stream.[streamType].consumer.prop.auto.offset.reset - Determines the offset from which to start the ingestion. Allowed values: smallest, largest, or a timestamp in milliseconds
topic.consumption.rate.limit - Determines the upper bound for the consumption rate for the whole topic. Having a consumption rate limiter is beneficial in case the stream message rate has a bursty pattern which leads to long GC pauses on the Pinot servers. The rate limiter can also be considered a safeguard against excessive ingestion into realtime tables. Double; the value should be greater than zero
The following flush threshold settings are also supported:
realtime.segment.flush.threshold.time - Time threshold for which the realtime segment will be kept open before we complete the segment. Note that this time should be smaller than the Kafka retention period configured for the corresponding topic.
realtime.segment.flush.threshold.rows - Row count flush threshold for realtime segments. This behaves in a similar way for HLC and LLC. For HLC, since there is only one consumer per server, this size is used as the size of the consumption buffer and determines after how many rows we flush to disk. For example, if this threshold is set to two million rows, then a high-level consumer would have a buffer size of two million. If this value is set to 0, then the consumers adjust the number of rows consumed by a partition such that the size of the completed segment is the desired size (unless threshold.time is reached first).
realtime.segment.flush.threshold.segment.size - The desired size of a completed realtime segment. This config is used only if realtime.segment.flush.threshold.rows is set to 0.
You can also specify additional configs for the consumer by prefixing the key with stream.[streamType], where streamType is the name of the streaming platform, e.g. kafka.
For our sample data and schema, the table config will look like this:
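A sketch of a real-time table config for the hypothetical transcript data, assuming a local Kafka broker at localhost:9876 and the JSON decoder; stream.kafka.broker.list is the broker list needed by the low-level Kafka consumer, and the flush thresholds are illustrative:

```json
{
  "tableName": "transcript",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "timestampInEpoch",
    "schemaName": "transcript",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.topic.name": "transcript-topic",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.broker.list": "localhost:9876",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "realtime.segment.flush.threshold.rows": "0",
      "realtime.segment.flush.threshold.time": "24h",
      "realtime.segment.flush.threshold.segment.size": "50M"
    }
  },
  "metadata": {}
}
```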
Now that we have our table and schema configurations, let's upload them to the Pinot cluster. As soon as the configs are uploaded, Pinot will start ingesting available records from the topic.
We are working on support for other ingestion platforms, but you can also write your own ingestion plugin if it is not supported out of the box. For a walkthrough, see Stream Ingestion Plugin.
This guide shows you how to ingest a stream of records from an Apache Kafka topic into a Pinot table.
In this guide, you'll learn how to import data into Pinot using Apache Kafka for real-time stream ingestion. Pinot has out-of-the-box real-time ingestion support for Kafka.
Let's set up a demo Kafka cluster locally and create a sample topic, transcript-topic.
Start Kafka
Start the Kafka cluster on port 9876, using the same Zookeeper from the quick-start examples.
Create a Kafka topic
Download the latest Kafka release and create a topic named transcript-topic, as shown in the commands below.
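A sketch of the commands, assuming you run the first one from the Pinot distribution directory and the second from a downloaded Kafka distribution; the Zookeeper address passed to StartKafka is an assumption, so point it at the Zookeeper started by your quick-start:

```bash
# Start a local Kafka broker on port 9876, reusing the quick-start Zookeeper
bin/pinot-admin.sh StartKafka -zkAddress=localhost:2123/kafka -port 9876

# From the Kafka distribution: create the sample topic with a single partition
bin/kafka-topics.sh --create --bootstrap-server localhost:9876 \
  --replication-factor 1 --partitions 1 --topic transcript-topic
```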
We will publish the data in the same format as mentioned in the Stream ingestion docs. So you can use the same schema mentioned under Create Schema Configuration.
Next, we create the real-time table configuration for the transcript table described in the schema from the previous step.
For Kafka, we use streamType as kafka. Currently, only the JSON format is supported out of the box, but you can easily write your own decoder by extending the StreamMessageDecoder interface. You can then access your decoder class by putting the jar file in the plugins directory.
The lowLevel consumer reads data per partition, whereas the highLevel consumer utilises the Kafka high-level consumer to read data from the whole stream. It doesn't have control over which partition to read at a particular moment.
For Kafka versions below 2.X, use org.apache.pinot.plugin.stream.kafka09.KafkaConsumerFactory. For Kafka version 2.X and above, use org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory.
You can set the offset to:
smallest - to start the consumer from the earliest offset
largest - to start the consumer from the latest offset
timestamp in milliseconds - to start the consumer from the offset after the timestamp
The resulting configuration should look as follows -
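A sketch of the streamConfigs section for the transcript table under these assumptions (broker port, offset reset, and flush thresholds are illustrative; the rest of the table config follows the sample shown earlier in this document):

```json
"streamConfigs": {
  "streamType": "kafka",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.topic.name": "transcript-topic",
  "stream.kafka.broker.list": "localhost:9876",
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
  "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
  "realtime.segment.flush.threshold.rows": "0",
  "realtime.segment.flush.threshold.segment.size": "100M"
}
```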
Update the table config for both high-level and low-level consumers: change stream.kafka.consumer.factory.class.name from org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory to org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory.
If using the stream (high) level consumer, also add the config stream.kafka.hlc.bootstrap.server to tableIndexConfig.streamConfigs. This config should be the URI of the Kafka broker list, e.g. localhost:9092.
This connector also works with Kafka library versions higher than 2.0.0. In the Kafka 2.0 connector pom.xml, changing kafka.lib.version from 2.0.0 to 2.1.1 will make this connector work with Kafka 2.1.1.
The connector with Kafka lib 2.0+ supports Kafka transactions. Transaction support is controlled by the config kafka.isolation.level in the Kafka stream config, which can be read_committed or read_uncommitted (default). Setting it to read_committed will ingest only transactionally committed messages from the Kafka stream.
Now that we have our table and schema configurations, let's upload them to the Pinot cluster. As soon as the real-time table is created, it will begin ingesting available records from the Kafka topic.
We will publish data in the following format to Kafka. Let us save the data in a file named transcript.json.
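A hypothetical transcript.json, with one JSON record per line so it can be piped straight into the console producer (only a couple of the records are shown here):

```json
{"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"Maths","score":3.8,"timestampInEpoch":1571900400000}
{"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"History","score":3.5,"timestampInEpoch":1571900400000}
```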
Push the sample JSON into the transcript-topic Kafka topic using the Kafka console producer. This will add the 12 records described in the transcript.json file to the topic.
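A sketch of the producer command, assuming the Kafka distribution directory and the broker port used above:

```bash
# Stream the contents of transcript.json into the transcript-topic topic
bin/kafka-console-producer.sh --broker-list localhost:9876 \
  --topic transcript-topic < transcript.json
```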
As soon as data flows into the stream, the Pinot table will consume it and it will be ready for querying. Head over to the Query Console to check out the real-time data.
Here is an example config which uses SSL-based authentication to talk with Kafka and the schema registry. Notice there are two sets of SSL options: the ones starting with ssl. are for the Kafka consumer, and the ones starting with stream.kafka.decoder.prop.schema.registry. are for the SchemaRegistryClient used by KafkaConfluentSchemaRegistryAvroMessageDecoder.
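A minimal sketch under these assumptions: the registry URL, keystore/truststore paths, and passwords are placeholders, and the schema-registry SSL property names are assumed to follow the stream.kafka.decoder.prop.schema.registry. prefix described above.

```json
"streamConfigs": {
  "streamType": "kafka",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.topic.name": "mytopic",
  "stream.kafka.broker.list": "kafka:9093",
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
  "stream.kafka.decoder.prop.schema.registry.rest.url": "https://schema-registry.example.com",
  "stream.kafka.decoder.prop.schema.registry.ssl.truststore.location": "/path/to/registry.truststore.jks",
  "stream.kafka.decoder.prop.schema.registry.ssl.truststore.password": "changeit",
  "security.protocol": "SSL",
  "ssl.truststore.location": "/path/to/kafka.truststore.jks",
  "ssl.truststore.password": "changeit",
  "ssl.keystore.location": "/path/to/kafka.keystore.jks",
  "ssl.keystore.password": "changeit"
}
```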
With the Kafka consumer 2.0, you can ingest transactionally committed messages only by configuring kafka.isolation.level to read_committed. For example,
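A sketch of the relevant streamConfigs; following the stream.[streamType] prefix convention described earlier, the key is written here as stream.kafka.isolation.level, which is an assumption worth verifying against your connector version.

```json
"streamConfigs": {
  "streamType": "kafka",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.topic.name": "transcript-topic",
  "stream.kafka.broker.list": "localhost:9876",
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
  "stream.kafka.isolation.level": "read_committed"
}
```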
Note that the default value of this config is read_uncommitted, which reads all messages. Also, this config is supported for the low-level consumer only.
Here is an example config which uses SASL_SSL-based authentication to talk with Kafka and the schema registry. Notice there are again two sets of options: some are for the Kafka consumer, and the ones starting with stream.kafka.decoder.prop.schema.registry. are for the SchemaRegistryClient used by KafkaConfluentSchemaRegistryAvroMessageDecoder.
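Relative to the SSL example above, only the consumer-side security properties change. A sketch of the fragment to merge into streamConfigs, with placeholder credentials (the schema-registry properties stay the same):

```json
"security.protocol": "SASL_SSL",
"sasl.mechanism": "PLAIN",
"sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"alice\" password=\"secret\";",
"ssl.truststore.location": "/path/to/kafka.truststore.jks",
"ssl.truststore.password": "changeit"
```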
Pinot supports consuming data from Apache Pulsar via the pinot-pulsar plugin. You need to enable this plugin so that Pulsar-specific libraries are present in the classpath.
You can enable the Pulsar plugin with the following config at the time of Pinot setup: -Dplugins.include=pinot-pulsar
The pinot-pulsar plugin is not part of the official 0.10.0 binary. You can download the plugin from our external repository and add it to the libs or plugins directory in Pinot.
A sample Pulsar stream config to ingest data should look as follows. You can use the streamConfigs section from this sample and make changes for your corresponding table.
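A sketch of such a streamConfigs section, assuming a local Pulsar broker and the JSON decoder; the topic name, offset reset, and flush thresholds are placeholders:

```json
"streamConfigs": {
  "streamType": "pulsar",
  "stream.pulsar.topic.name": "transcript-topic",
  "stream.pulsar.bootstrap.servers": "pulsar://localhost:6650",
  "stream.pulsar.consumer.type": "lowlevel",
  "stream.pulsar.consumer.prop.auto.offset.reset": "smallest",
  "stream.pulsar.consumer.factory.class.name": "org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
  "stream.pulsar.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
  "realtime.segment.flush.threshold.rows": "1000000",
  "realtime.segment.flush.threshold.time": "6h"
}
```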
You can change the following Pulsar-specific configurations for your tables:
streamType - This should be set to "pulsar"
stream.pulsar.topic.name - Your Pulsar topic name
stream.pulsar.bootstrap.servers - Comma-separated broker list for Apache Pulsar
The Pinot-Pulsar connector supports authentication using security tokens. You can generate a token by following the official Pulsar documentation. Once generated, you can add the following property to streamConfigs to add the auth token to each request:
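Assuming the property name used by the pinot-pulsar plugin is stream.pulsar.authenticationToken (worth verifying for your Pinot version), the addition would look like:

```json
"stream.pulsar.authenticationToken": "your-auth-token"
```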
The Pinot-Pulsar connector also supports TLS for encrypted connections. You can follow the official Pulsar documentation to enable TLS on your Pulsar cluster. Once done, you can enable TLS in the Pulsar connector by providing the location of the trust certificate file generated in the previous step. Also, make sure to change the broker URL from pulsar://localhost:6650 to pulsar+ssl://localhost:6650 so that secure connections are used.
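Assuming the trust-certificate property is stream.pulsar.tlsTrustCertsFilePath (again, verify against your Pinot version), the streamConfigs additions would look like:

```json
"stream.pulsar.bootstrap.servers": "pulsar+ssl://localhost:6650",
"stream.pulsar.tlsTrustCertsFilePath": "/path/to/ca.cert.pem"
```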
For other table and stream configurations, head over to the Table Configuration Reference.
Pinot currently relies on Pulsar client version 2.7.2. Users should make sure the Pulsar broker is compatible with this client version.