Stream ingestion
Apache Pinot lets users consume data from streams and push it directly into the database, a process known as stream ingestion. Stream ingestion makes it possible to query data within seconds of publication.
Stream ingestion supports checkpoints to prevent data loss.
Setting up stream ingestion involves the following steps:
  1. Create schema configuration
  2. Create table configuration
  3. Upload table and schema spec
Let's take a look at each of the steps in more detail.
Let us assume the data to be ingested is in the following format:
{"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"Maths","score":3.8,"timestamp":1571900400000}
{"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"History","score":3.5,"timestamp":1571900400000}
{"studentID":207,"firstName":"Bob","lastName":"Lewis","gender":"Male","subject":"Maths","score":3.2,"timestamp":1571900400000}
{"studentID":207,"firstName":"Bob","lastName":"Lewis","gender":"Male","subject":"Chemistry","score":3.6,"timestamp":1572418800000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Geography","score":3.8,"timestamp":1572505200000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"English","score":3.5,"timestamp":1572505200000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Maths","score":3.2,"timestamp":1572678000000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Physics","score":3.6,"timestamp":1572678000000}
{"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"Maths","score":3.8,"timestamp":1572678000000}
{"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"English","score":3.5,"timestamp":1572678000000}
{"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"History","score":3.2,"timestamp":1572854400000}
{"studentID":212,"firstName":"Nick","lastName":"Young","gender":"Male","subject":"History","score":3.6,"timestamp":1572854400000}
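As a quick sanity check on this format, the sketch below parses one of the sample records with Python's standard library and converts its timestamp field (epoch milliseconds) to a UTC datetime. The variable names are illustrative only and are not part of Pinot.

```python
import json
from datetime import datetime, timezone

# One record copied from the sample data above.
raw = (
    '{"studentID":205,"firstName":"Natalie","lastName":"Jones",'
    '"gender":"Female","subject":"Maths","score":3.8,'
    '"timestamp":1571900400000}'
)

record = json.loads(raw)

# The timestamp field holds epoch milliseconds; fromtimestamp expects seconds.
event_time = datetime.fromtimestamp(record["timestamp"] / 1000, tz=timezone.utc)

print(record["studentID"], record["subject"], record["score"])
print(event_time.isoformat())  # 2019-10-24T07:00:00+00:00
```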

Create Schema Configuration

A schema defines the fields along with their data types. It also defines whether each field serves as a dimension, a metric, or a timestamp. For more details on schema configuration, see creating a schema.
For our sample data, the schema configuration looks like this:
/tmp/pinot-quick-start/transcript-schema.json
{
  "schemaName": "transcript",
  "dimensionFieldSpecs": [
    {
      "name": "studentID",
      "dataType": "INT"
    },
    {
      "name": "firstName",
      "dataType": "STRING"
    },
    {
      "name": "lastName",
      "dataType": "STRING"
    },
    {
      "name": "gender",
      "dataType": "STRING"
    },
    {
      "name": "subject",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "score",
      "dataType": "FLOAT"
    }
  ],
  "dateTimeFieldSpecs": [{
    "name": "timestamp",
    "dataType": "LONG",
    "format": "1:MILLISECONDS:EPOCH",
    "granularity": "1:MILLISECONDS"
  }]
}
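To see how the schema relates to the sample data, here is a minimal Python sketch that checks one record against the field specs. The PINOT_TO_PYTHON mapping and the field_types and check_record helpers are hypothetical and written purely for illustration; Pinot performs its own type handling during ingestion.

```python
import json

# Field specs copied from the schema above, trimmed to name and data type.
schema = {
    "schemaName": "transcript",
    "dimensionFieldSpecs": [
        {"name": "studentID", "dataType": "INT"},
        {"name": "firstName", "dataType": "STRING"},
        {"name": "lastName", "dataType": "STRING"},
        {"name": "gender", "dataType": "STRING"},
        {"name": "subject", "dataType": "STRING"},
    ],
    "metricFieldSpecs": [{"name": "score", "dataType": "FLOAT"}],
    "dateTimeFieldSpecs": [{"name": "timestamp", "dataType": "LONG"}],
}

# Hypothetical mapping from Pinot data types to acceptable Python types.
PINOT_TO_PYTHON = {
    "INT": (int,),
    "LONG": (int,),
    "FLOAT": (int, float),
    "STRING": (str,),
}


def field_types(schema):
    """Collect {field name: data type} across all field spec groups."""
    types = {}
    for group in ("dimensionFieldSpecs", "metricFieldSpecs", "dateTimeFieldSpecs"):
        for spec in schema.get(group, []):
            types[spec["name"]] = spec["dataType"]
    return types


def check_record(record, schema):
    """Return a list of problems; an empty list means the record fits."""
    problems = []
    for name, data_type in field_types(schema).items():
        if name not in record:
            problems.append(f"missing field: {name}")
            continue
        value = record[name]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, PINOT_TO_PYTHON[data_type]):
            problems.append(
                f"{name}: expected {data_type}, got {type(value).__name__}"
            )
    return problems


record = json.loads(
    '{"studentID":207,"firstName":"Bob","lastName":"Lewis","gender":"Male",'
    '"subject":"Maths","score":3.2,"timestamp":1571900400000}'
)
print(check_record(record, schema))
```

An empty list means every field declared in the schema is present in the record with a compatible type.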

Create Table Configuration

The next step is to create a table where all the ingested data will flow and can be queried. Unlike batch ingestion, table configuration for real-time ingestion also triggers the data ingestion job. For a more detailed overview of tables, see the table reference.
The real-time table configuration consists of the following fields:
  • tableName - The name of the table where the data should flow
  • tableType - The internal type for the table. Should always be set to REALTIME for realtime ingestion
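  • segmentsConfig - Segment-level settings for the table. In the sample table config below, it holds timeColumnName, timeType, schemaName, and replicasPerPartition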
  • tableIndexConfig - Defines which columns to index and the type of index to use. For the full configuration, see [Indexing Configs]. It has the following required fields:
    • loadMode - Specifies how the segments should be loaded. Should be heap or mmap. The two modes differ as follows:
      • mmap: Segments are loaded onto memory-mapped files. This is the default mode.
      • heap: Segments are loaded into direct memory. Note that 'heap' here is a legacy misnomer and does not imply JVM heap. Use this mode only when you need faster performance than memory-mapped files and are sure you will never run into OOM.
    • streamConfigs - Specifies the data source along with the configs needed to start consuming the real-time data. The streamConfigs block can be thought of as the equivalent of the job spec for batch ingestion. The following options are supported:
| Config key | Description | Supported values |
| --- | --- | --- |
| streamType | The streaming platform from which to consume the data | kafka |
| stream.[streamType].consumer.type | Whether to use the per-partition low-level consumer or the high-level stream consumer | lowLevel: consume data from each partition with offset management; highLevel: consume data without control over the partitions |
| stream.[streamType].topic.name | The data source (e.g. topic, data stream) from which to consume the data | String |
| stream.[streamType].decoder.class.name | Name of the class used to parse the data. The class should implement the org.apache.pinot.spi.stream.StreamMessageDecoder interface | String. Available options: org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder; org.apache.pinot.plugin.inputformat.avro.KafkaAvroMessageDecoder; org.apache.pinot.plugin.inputformat.avro.SimpleAvroMessageDecoder; org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder |
| stream.[streamType].consumer.factory.class.name | Name of the factory class that provides the appropriate implementation of the low-level and high-level consumers as well as the metadata | String. Available options: org.apache.pinot.plugin.stream.kafka09.KafkaConsumerFactory; org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory; org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory; org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory |
| stream.[streamType].consumer.prop.auto.offset.reset | Determines the offset from which to start the ingestion | smallest, largest, or a timestamp in milliseconds |
| topic.consumption.rate.limit | Determines the upper bound on the consumption rate for the whole topic. A consumption rate limiter is beneficial when the stream message rate is bursty, which leads to long GC pauses on the Pinot servers. The rate limiter can also be considered a safeguard against excessive ingestion into realtime tables. | Double. The value should be greater than zero. |
The following flush threshold settings are also supported:
| Config key | Description | Supported values |
| --- | --- | --- |
| realtime.segment.flush.threshold.time | Maximum time a realtime segment is kept open before it is completed. Note that this time should be smaller than the Kafka retention period configured for the corresponding topic. | |
| realtime.segment.flush.threshold.rows | Row count flush threshold for realtime segments. It behaves similarly for the high-level consumer (HLC) and the low-level consumer (LLC). For HLC, since there is only one consumer per server, this value is used as the size of the consumption buffer and determines after how many rows data is flushed to disk. For example, if the threshold is set to two million rows, a high-level consumer has a buffer size of two million. If this value is set to 0, the consumers adjust the number of rows consumed per partition so that the completed segment reaches the desired size (unless threshold.time is reached first). | |
| realtime.segment.flush.threshold.segment.size | The desired size of a completed realtime segment. This config is used only if realtime.segment.flush.threshold.rows is set to 0. | |
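The interaction between these thresholds can be summarized with a simplified model. The Python sketch below is not Pinot's implementation: the should_flush function and its parameters are hypothetical, and it only mirrors the rules described above. In particular, Pinot is described as adjusting the row count to reach the desired size, whereas this sketch compares an assumed size estimate directly.

```python
def should_flush(rows_consumed, elapsed_ms, estimated_size_bytes,
                 threshold_rows, threshold_time_ms, desired_size_bytes):
    """Simplified model of when a consuming segment would be completed."""
    # The time threshold applies regardless of the row or size settings.
    if elapsed_ms >= threshold_time_ms:
        return True
    # A nonzero row threshold is checked on its own.
    if threshold_rows > 0:
        return rows_consumed >= threshold_rows
    # With the row threshold set to 0, the desired segment size is used.
    return estimated_size_bytes >= desired_size_bytes


# Thresholds from the sample table config in this guide:
# 50000 rows and 3600000 ms (one hour).
print(should_flush(rows_consumed=50000, elapsed_ms=60_000,
                   estimated_size_bytes=0, threshold_rows=50000,
                   threshold_time_ms=3_600_000, desired_size_bytes=0))

# Row threshold of 0: the size-based rule decides (sizes are made-up values).
print(should_flush(rows_consumed=10, elapsed_ms=60_000,
                   estimated_size_bytes=100, threshold_rows=0,
                   threshold_time_ms=3_600_000, desired_size_bytes=200))
```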
You can also specify additional configs for the consumer by prefixing the key with stream.[streamType], where streamType is the name of the streaming platform (e.g. kafka).
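The key naming convention can be sketched in Python as follows. The stream_configs helper is hypothetical and exists only to show how the stream.[streamType] prefix is applied; the keys and values are the ones used in the sample config in this guide.

```python
def stream_configs(stream_type, consumer_props, extra=None):
    """Build a stream config dict, prefixing keys with stream.<streamType>."""
    configs = {"streamType": stream_type}
    prefix = f"stream.{stream_type}."
    for key, value in consumer_props.items():
        configs[prefix + key] = value
    # Keys such as the flush thresholds are added without the prefix.
    configs.update(extra or {})
    return configs


configs = stream_configs(
    "kafka",
    {
        "consumer.type": "lowlevel",
        "topic.name": "transcript-topic",
        "broker.list": "localhost:9876",
        "consumer.prop.auto.offset.reset": "smallest",
    },
    extra={"realtime.segment.flush.threshold.rows": "50000"},
)

for key in sorted(configs):
    print(f"{key} = {configs[key]}")
```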
For our sample data and schema, the table config will look like this:
{
  "tableName": "transcript",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "timestamp",
    "timeType": "MILLISECONDS",
    "schemaName": "transcript",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.topic.name": "transcript-topic",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.broker.list": "localhost:9876",
      "realtime.segment.flush.threshold.time": "3600000",
      "realtime.segment.flush.threshold.rows": "50000",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}
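Before uploading, it can help to cross-check the table config against the schema. The following Python sketch uses a hypothetical cross_check helper (not part of Pinot) to confirm that schemaName matches, that timeColumnName refers to a field declared in dateTimeFieldSpecs, and that tableType is REALTIME. The dictionaries are trimmed copies of the configs above.

```python
def cross_check(table_config, schema):
    """Return a list of inconsistencies between a table config and a schema."""
    issues = []
    segments = table_config.get("segmentsConfig", {})
    if segments.get("schemaName") != schema.get("schemaName"):
        issues.append("schemaName does not match the schema")
    time_fields = {spec["name"] for spec in schema.get("dateTimeFieldSpecs", [])}
    if segments.get("timeColumnName") not in time_fields:
        issues.append("timeColumnName is not a dateTime field in the schema")
    if table_config.get("tableType") != "REALTIME":
        issues.append("tableType should be REALTIME for stream ingestion")
    return issues


# Trimmed copies of the schema and table config shown in this guide.
schema = {
    "schemaName": "transcript",
    "dateTimeFieldSpecs": [{"name": "timestamp", "dataType": "LONG"}],
}
table_config = {
    "tableName": "transcript",
    "tableType": "REALTIME",
    "segmentsConfig": {
        "timeColumnName": "timestamp",
        "schemaName": "transcript",
    },
}

print(cross_check(table_config, schema))
```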

Upload schema and table config

Now that we have our table and schema configurations, let's upload them to the Pinot cluster. As soon as the configs are uploaded, Pinot will start ingesting available records from the topic.
Docker
docker run \
    --network=pinot-demo \
    -v /tmp/pinot-quick-start:/tmp/pinot-quick-start \
    --name pinot-streaming-table-creation \
    apachepinot/pinot:latest AddTable \
    -schemaFile /tmp/pinot-quick-start/transcript-schema.json \
    -tableConfigFile /tmp/pinot-quick-start/transcript-table-realtime.json \
    -controllerHost pinot-quickstart \
    -controllerPort 9000 \
    -exec
Launcher Script
bin/pinot-admin.sh AddTable \
    -schemaFile /path/to/transcript-schema.json \
    -tableConfigFile /path/to/transcript-table-realtime.json \
    -exec
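If you script the upload, the launcher command can be assembled programmatically. The Python sketch below only builds and prints the command line using the flags shown above; it does not run anything. The add_table_command helper and its default arguments are assumptions made for illustration, and the paths are the same placeholders used in the launcher example.

```python
import shlex


def add_table_command(schema_file, table_config_file,
                      controller_host=None, controller_port=None,
                      admin_script="bin/pinot-admin.sh"):
    """Assemble the AddTable argument list from the flags shown above."""
    args = [
        admin_script, "AddTable",
        "-schemaFile", schema_file,
        "-tableConfigFile", table_config_file,
    ]
    # The controller flags are optional here, as in the launcher example.
    if controller_host is not None:
        args += ["-controllerHost", controller_host]
    if controller_port is not None:
        args += ["-controllerPort", str(controller_port)]
    args.append("-exec")
    return args


cmd = add_table_command(
    "/path/to/transcript-schema.json",
    "/path/to/transcript-table-realtime.json",
)
print(shlex.join(cmd))
```

The resulting list could then be passed to subprocess.run(cmd, check=True) from the Pinot installation directory to execute it.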

Custom Ingestion Support

We are working on support for other ingestion platforms. If your platform is not supported out of the box, you can write your own ingestion plugin. For a walkthrough, see Stream Ingestion Plugin.