This feature is supported after the 0.11.0 release. Reference PR: https://github.com/apache/pinot/pull/8557
Ensure you have available Pinot Minion instances deployed within the cluster.
Pinot version is 0.11.0 or above
Parse the query to extract the table name, the directory URI, and the list of options for the ingestion job.
Call the controller's minion task execution API endpoint to schedule the task on a minion.
The response schema consists of the table name and the task job ID.
INSERT INTO [database.]table FROM FILE dataDirURI OPTION ( k=v ) [, OPTION (k=v)]*
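As a rough illustration of the syntax above, the following Python sketch assembles such a statement from its parts. The helper name `build_insert_from_file`, the single-quoting of the directory URI, and the option key used in the example are assumptions made for illustration; they are not taken from the Pinot source.

```python
def build_insert_from_file(table, data_dir_uri, options, database=None):
    """Assemble an INSERT INTO ... FROM FILE statement from its parts.

    Assumption: the directory URI is wrapped in single quotes.
    """
    if not options:
        raise ValueError("the syntax expects at least one OPTION clause")
    target = f"{database}.{table}" if database else table
    option_clauses = ", ".join(
        f"OPTION({key}={value})" for key, value in options.items()
    )
    return f"INSERT INTO {target} FROM FILE '{data_dir_uri}' {option_clauses}"


# Hypothetical table name, URI, and option key.
statement = build_insert_from_file(
    "myTable", "file:///tmp/rawdata/", {"taskName": "myTask"}
)
print(statement)
```

Building the statement programmatically mainly helps when the list of options is long or generated from another config.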
We are actively developing this feature...
The details will be revealed soon.
Pinot supports Apache Flink as a processing framework to push segment files to the database.
The Pinot distribution contains an Apache Flink SinkFunction that can be used as part of an Apache Flink application (streaming or batch) to write directly into a designated Pinot database.
Here is an example code snippet showing how to use the PinotSinkFunction in a Flink streaming application:
PinotSinkFunction relies mostly on the TableConfig object to infer the batch ingestion configuration, which it uses to start a SegmentWriter and a SegmentUploader that communicate with the Pinot cluster.
Note that even though the Flink application in the above example runs in streaming mode, the data is still batched together and flushed/uploaded to Pinot once the flush threshold is reached. It is not a direct streaming write into Pinot.
Here is an example table config.
The only required configurations are:
"outputDirURI": where PinotSinkFunction should write the constructed segment file to.
"push.controllerUri": which Pinot cluster (controller) URL PinotSinkFunction should communicate with.
The rest of the configurations are standard for any Pinot table.
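To make the two required settings concrete, here is a small Python sketch that checks a table config for them. It assumes the settings live under ingestionConfig.batchIngestionConfig.batchConfigMaps; the helper name, table name, and URLs are hypothetical.

```python
REQUIRED_BATCH_KEYS = ("outputDirURI", "push.controllerUri")


def missing_sink_configs(table_config):
    """Return the required batch config keys that are absent or empty."""
    # Assumption: batch settings are nested under ingestionConfig.batchIngestionConfig.
    batch_maps = (
        table_config.get("ingestionConfig", {})
        .get("batchIngestionConfig", {})
        .get("batchConfigMaps", [])
    )
    merged = {}
    for config_map in batch_maps:
        merged.update(config_map)
    return [key for key in REQUIRED_BATCH_KEYS if not merged.get(key)]


# Hypothetical table config fragment, for illustration only.
table_config = {
    "tableName": "myTable_OFFLINE",
    "ingestionConfig": {
        "batchIngestionConfig": {
            "batchConfigMaps": [
                {
                    "outputDirURI": "file:///tmp/flink/segments",
                    "push.controllerUri": "http://localhost:9000",
                }
            ]
        }
    },
}
print(missing_sink_configs(table_config))  # []
```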
As shown in the example above, the only required information from the Pinot side is the table schema and the table config.
For a more detail executable please refer to the .
Dimension tables in Apache Pinot.
Dimension tables are a special kind of offline table from which data can be looked up via the lookup UDF, providing join-like functionality.
Dimension tables are replicated on all the hosts for a given tenant to allow faster lookups.
To mark an offline table as a dimension table, isDimTable should be set to true and segmentsConfig.segmentPushType should be set to REFRESH in the table config, as shown below:
As dimension tables are used to perform lookups of dimension values, they are required to have a primary key (can be a composite key).
When a table is marked as a dimension table, it will be replicated on all the hosts, which means that these tables must be small in size.
The maximum size quota for a dimension table in a cluster is controlled by the controller.dimTable.maxSize controller property. Table creation will fail if the storage quota exceeds this maximum size.
A dimension table cannot be part of a hybrid table.
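The requirements in this section can be summarized as a small pre-flight check. The Python sketch below is only an illustration: the function name, the table and column names, and the placement of isDimTable at the top level of the table config are assumptions, and the push type key is assumed to be spelled segmentPushType.

```python
def dim_table_problems(table_config, schema):
    """Return a list of problems; an empty list means all checks passed."""
    problems = []
    if table_config.get("tableType") != "OFFLINE":
        problems.append("dimension tables must be OFFLINE tables")
    if table_config.get("isDimTable") is not True:
        problems.append("isDimTable must be set to true")
    push_type = table_config.get("segmentsConfig", {}).get("segmentPushType")
    if push_type != "REFRESH":
        problems.append("segmentsConfig.segmentPushType must be REFRESH")
    if not schema.get("primaryKeyColumns"):
        problems.append("the schema must declare primaryKeyColumns")
    return problems


# Hypothetical table and schema, used only for illustration.
table_config = {
    "tableName": "dimTeams",
    "tableType": "OFFLINE",
    "isDimTable": True,
    "segmentsConfig": {"segmentPushType": "REFRESH"},
}
schema = {"schemaName": "dimTeams", "primaryKeyColumns": ["teamID"]}
print(dim_table_problems(table_config, schema))  # []
```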
Pinot supports Apache Hadoop as a processor to create and push segment files to the database. The Pinot distribution is bundled with the Hadoop code to process your files and convert and upload them to Pinot.
You can follow the [wiki] to build pinot distribution from source. The resulting JAR file can be found in pinot/target/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar
Next, you need to change the execution config in the job spec to the following -
You can check out the sample job spec here.
Finally execute the hadoop job using the command -
Please ensure the environment variables PINOT_ROOT_DIR and PINOT_VERSION are set properly.
We’ve seen some requests that data should be massaged (like partitioning, sorting, resizing) before creating and pushing segments to Pinot.
The MapReduce job called SegmentPreprocessingJob would be the best fit for this use case, regardless of whether the input data is in AVRO or ORC format.
Check the example below to see how to use SegmentPreprocessingJob.
In Hadoop properties, set the following to enable this job:
In the table config, specify the operations in preprocessing.operations that you'd like to enable in the MR job, and then specify the exact configs for those operations:
Minimum number of reducers. Optional. Used when partitioning is disabled and resizing is enabled. This parameter avoids producing too many small input files for Pinot, which would leave the Pinot server holding too many small segments and using too many threads.
Maximum number of records per reducer. Optional. Unlike "preprocessing.num.reducers", this parameter avoids producing too few large input files for Pinot, which would lose the advantage of multi-threading when querying. When not set, each reducer generates one output file. When set (e.g. to M), the original output file is split into multiple files, and each new output file contains at most M records. This applies whether or not partitioning is enabled.
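The splitting behavior described for the maximum-records setting can be pictured with a short Python sketch. This is a conceptual illustration only, not the MapReduce job's implementation; the function name is hypothetical.

```python
def split_records(records, max_records_per_file):
    """Split one reducer's output into chunks of at most max_records_per_file."""
    if max_records_per_file <= 0:
        raise ValueError("max_records_per_file must be positive")
    return [
        records[start:start + max_records_per_file]
        for start in range(0, len(records), max_records_per_file)
    ]


# Seven records with M = 3 yield three output files.
print(split_records(list(range(7)), 3))  # [[0, 1, 2], [3, 4, 5], [6]]
```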
For more details on this MR job, please refer to this document.
Pinot batch ingestion involves two parts: the routine ingestion job (hourly/daily) and backfill. Here are some tutorials on how routine batch ingestion works in a Pinot offline table:
High Level Idea
Organize raw data into buckets (eg: /var/pinot/airlineStats/rawdata/2014/01/01). Each bucket typically contains several files (eg: /var/pinot/airlineStats/rawdata/2014/01/01/airlineStats_data_2014-01-01_0.avro)
Run a Pinot batch ingestion job, which points to a specific date folder like ‘/var/pinot/airlineStats/rawdata/2014/01/01’. The segment generation job will convert each such avro file into a Pinot segment for that day and give it a unique name.
Run the Pinot segment push job to upload those segments with those unique names via a Controller API.
IMPORTANT: The segment name is the unique identifier used to uniquely identify that segment in Pinot. If the controller gets an upload request for a segment with the same name - it will attempt to replace it with the new one.
This newly uploaded data can now be queried in Pinot. However, sometimes users will make changes to the raw data which need to be reflected in Pinot. This process is known as 'Backfill'.
Pinot supports data modification only at the segment level, which means we should update entire segments for doing backfills. The high level idea is to repeat steps 2 (segment generation) and 3 (segment upload) mentioned above:
Backfill jobs must run at the same granularity as the daily job. E.g., if you need to backfill data for 2014/01/01, specify that input folder for your backfill job (e.g.: ‘/var/pinot/airlineStats/rawdata/2014/01/01’)
The backfill job will then generate segments with the same name as the original job (with the new data).
When uploading those segments to Pinot, the controller will replace the old segments with the new ones (segment names act like primary keys within Pinot) one by one.
Backfill jobs expect the same number of (or more) data files on the backfill date, so that the segment generation job creates at least as many segments as the original run.
E.g., assume the table airlineStats has 2 segments (airlineStats_2014-01-01_2014-01-01_0 and airlineStats_2014-01-01_2014-01-01_1) for date 2014/01/01, and the backfill input directory contains only 1 input file. The segment generation job will then create just one segment: airlineStats_2014-01-01_2014-01-01_0. After the segment push job, only segment airlineStats_2014-01-01_2014-01-01_0 is replaced, and the stale data in segment airlineStats_2014-01-01_2014-01-01_1 is still there.
In case the raw data is modified in such a way that the original time bucket has fewer input files than the first ingestion run, backfill will fail.
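The replace-by-name behavior, and the stale-segment pitfall from the airlineStats example, can be modeled in a few lines of Python. This is a simplified simulation with a dictionary standing in for the table; it is not how the controller is implemented.

```python
def push_segments(table_segments, new_segments):
    """Upload segments by name: same-name segments are replaced, others are added.

    Returns the names of the segments that were replaced.
    """
    replaced = [name for name in new_segments if name in table_segments]
    table_segments.update(new_segments)
    return replaced


# Original daily run produced two segments for 2014/01/01.
table = {
    "airlineStats_2014-01-01_2014-01-01_0": "original data, file 0",
    "airlineStats_2014-01-01_2014-01-01_1": "original data, file 1",
}
# The backfill input directory has only one file, so only one segment is generated.
backfill = {"airlineStats_2014-01-01_2014-01-01_0": "corrected data"}

replaced = push_segments(table, backfill)
stale = sorted(set(table) - set(backfill))
print(replaced)  # ['airlineStats_2014-01-01_2014-01-01_0']
print(stale)     # ['airlineStats_2014-01-01_2014-01-01_1']
```

The second segment keeps its original contents because no uploaded segment carried its name.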
Deduplication support in Apache Pinot.
Pinot provides native support for deduplication during real-time ingestion (v0.11.0+).
To enable dedup on a Pinot table, there are a couple of table configuration and schema changes needed.
There are certain mandatory configurations needed in order to be able to enable dedup.
To be able to dedup records, a primary key is needed to uniquely identify a given record. To define a primary key, add the field primaryKeyColumns to the schema definition.
Note that this field expects a list of columns, as the primary key can be composite.
While ingesting a record, if its primary key is found to be already present, the record will be dropped.
An important requirement for the Pinot dedup table is to partition the input stream by the primary key. For Kafka messages, this means the producer must set the key in the send API. If the original stream is not partitioned, then a stream processing job (e.g. Flink) is needed to shuffle and repartition the input stream into a partitioned one for Pinot's ingestion.
The dedup Pinot table can use only the low-level consumer for the input streams. As a result, it uses the partitioned replica-group assignment for the segments. Moreover, dedup poses the additional requirement that all segments of the same partition must be served from the same server to ensure data consistency across the segments. Accordingly, it requires strictReplicaGroup as the routing strategy. To use that, configure instanceSelectorType in Routing as follows:
The high-level consumer is not allowed for the input stream ingestion, which means stream.kafka.consumer.type must be lowLevel.
The incoming stream must be partitioned by the primary key so that all records with a given primary key are consumed by the same Pinot server instance.
To enable dedup for a REALTIME table, add the following to the table config.
Supported values for hashFunction are NONE, MD5 and MURMUR3, with the default being NONE.
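To picture how primary-key dedup and the hashFunction setting interact, here is a conceptual Python sketch. It is not Pinot's implementation: the class name is hypothetical, only NONE and MD5 are modeled (MURMUR3 is not in the Python standard library), and the way composite keys are serialized is an assumption.

```python
import hashlib


class DedupSketch:
    """Tracks seen primary keys and drops records whose key was already ingested."""

    def __init__(self, hash_function="NONE"):
        if hash_function not in ("NONE", "MD5"):
            raise ValueError("this sketch only models NONE and MD5")
        self.hash_function = hash_function
        self.seen = set()

    def _stored_key(self, primary_key):
        # Assumption: composite keys are joined with a NUL separator.
        raw = "\x00".join(str(part) for part in primary_key).encode("utf-8")
        if self.hash_function == "MD5":
            return hashlib.md5(raw).digest()  # fixed 16 bytes (128 bits)
        return raw

    def ingest(self, record, primary_key_columns):
        """Return True if the record is kept, False if it is dropped as a duplicate."""
        key = self._stored_key(tuple(record[c] for c in primary_key_columns))
        if key in self.seen:
            return False
        self.seen.add(key)
        return True


# Hypothetical records and column names.
dedup = DedupSketch(hash_function="MD5")
records = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}, {"id": 1, "v": "c"}]
print([dedup.ingest(r, ["id"]) for r in records])  # [True, True, False]
```

With MD5, each stored key has a fixed size regardless of how large the original primary key is, which is the memory trade-off the hashFunction setting is about.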
Unlike other real-time tables, a dedup table takes up more memory resources, as it needs to keep track of the primary key and its corresponding segment reference in memory. As a result, it's important to plan the capacity beforehand and monitor the resource usage. Here are some recommended practices for using a dedup table.
Create the Kafka topic with more partitions. The number of Kafka partitions determines the number of partitions of the Pinot table. The more partitions you have in the Kafka topic, the more Pinot servers you can distribute the Pinot table to, and therefore the more you can scale the table horizontally.
The dedup table maintains an in-memory map from the primary key to the segment reference, so it's recommended to use a simple primary key type and avoid composite primary keys to save memory. In addition, consider the hashFunction config in the dedup config, which can be MD5 or MURMUR3, to store the 128-bit hashcode of the primary key instead. This is useful when your primary key takes more space. Keep in mind that this hash may introduce collisions, though the chance is very low.
Monitoring: Set up a dashboard over the metric pinot.server.dedupPrimaryKeysCount.tableName to watch the number of primary keys in a table partition. It's useful for tracking its growth, which is proportional to the memory usage growth.
Capacity planning: It's useful to plan the capacity beforehand to ensure you will not run into resource constraints later. A simple way is to measure the number of primary keys in the Kafka throughput per partition and multiply it by the per-key space cost to approximate the memory usage. A heap dump is also useful for checking the memory usage so far on a dedup table instance.
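The capacity-planning arithmetic can be written down as a back-of-the-envelope calculation. In the Python sketch below, every input (key rate, time window, bytes per key entry) is a placeholder you would measure yourself; the function name and the sample numbers are hypothetical.

```python
def estimate_dedup_key_bytes(new_keys_per_second, window_seconds, bytes_per_key_entry):
    """Approximate memory for one partition: number of keys times cost per key."""
    unique_keys = new_keys_per_second * window_seconds
    return unique_keys * bytes_per_key_entry


# Placeholder inputs: 1,000 new keys/s over one day, 64 bytes per map entry.
per_partition = estimate_dedup_key_bytes(1_000, 86_400, 64)
print(per_partition)                    # 5529600000
print(round(per_partition / 1e9, 2))    # 5.53 (GB, decimal)
```

Multiply the per-partition figure by the number of partitions hosted on a server to get a rough per-server estimate, then compare it against a heap dump from a running instance.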
Pinot supports Apache Spark as a processor to create and push segment files to the database. The Pinot distribution is bundled with the Spark code to process your files and convert and upload them to Pinot.
We support both Spark 2.X and 3.X.
You can follow the wiki to build pinot distribution from source. The resulting JAR file can be found in pinot/target/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar
Next, you need to change the execution config in the job spec to the following -
You can check out the sample job spec here.
To run Spark ingestion, you need the following jars in your classpath:
pinot-batch-ingestion-spark plugin jar - available in the plugins-external directory in the package
pinot-all jar - available in the lib directory in the package
These jars can be specified using spark.driver.extraClassPath or any other option.
For loading any other plugins that you want to use, you can use -
The complete spark-submit command should look as follows
Please ensure the environment variables PINOT_ROOT_DIR and PINOT_VERSION are set properly.
Note: You should change the master to yarn and deploy-mode to cluster for production environments.
We have stopped including the spark-core dependency in our jars post the 0.10.0 release. Users can try 0.11.0-SNAPSHOT and later versions of pinot-batch-ingestion-spark in case of any runtime issues. You can either build from source or download the latest master build jars.
If you want to run the Spark job in cluster mode on a YARN/EMR cluster, the following needs to be done -
Build Pinot from source with the option -DuseProvidedHadoop
Copy the Pinot binaries to S3, HDFS or any other distributed storage that is accessible from all nodes.
Copy the ingestion spec YAML file to S3, HDFS or any other distributed storage. Mention this path as part of the --files argument in the command.
Add --jars options that contain the S3/HDFS paths to all the required plugin jars and the pinot-all jar.
Point classPath to the Spark working directory. Generally, just specifying the jar names without any paths works. The same should be done for the main jar as well as the spec YAML file.
Example
For Spark 3.x, replace pinot-batch-ingestion-spark-2.4 with pinot-batch-ingestion-spark-3.2 in all places in the commands.
Also, ensure the classpath in the ingestion spec is changed from org.apache.pinot.plugin.ingestion.batch.spark. to org.apache.pinot.plugin.ingestion.batch.spark3.
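The two Spark 3.x substitutions are plain text replacements, so they can be applied to a command or job spec with a small script. The Python sketch below is one way to do it; the function name is hypothetical, and the runner class name in the example is only a sample input.

```python
SPARK2_JAR = "pinot-batch-ingestion-spark-2.4"
SPARK3_JAR = "pinot-batch-ingestion-spark-3.2"
SPARK2_PACKAGE = "org.apache.pinot.plugin.ingestion.batch.spark."
SPARK3_PACKAGE = "org.apache.pinot.plugin.ingestion.batch.spark3."


def to_spark3(text):
    """Rewrite Spark 2.x jar and package references to their Spark 3.x forms."""
    text = text.replace(SPARK2_JAR, SPARK3_JAR)
    return text.replace(SPARK2_PACKAGE, SPARK3_PACKAGE)


# Sample job spec line; the class name is used for illustration.
spec_line = (
    "segmentGenerationJobRunnerClassName: "
    "org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner"
)
print(to_spark3(spec_line))
```

Running the function twice gives the same result as running it once, because the rewritten package prefix no longer matches the Spark 2.x prefix.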
Q - I am getting the following exception - Class has been compiled by a more recent version of the Java Runtime (class file version 55.0), this version of the Java Runtime only recognizes class file versions up to 52.0
Since 0.8.0 release, Pinot binaries are compiled with JDK 11. If you are using Spark along with Hadoop 2.7+, you need to use the java8 version of pinot. Currently, you need to build jdk 8 version from source.
Q - I am not able to find the pinot-batch-ingestion-spark jar.
For Pinot versions prior to 0.10.0, the Spark plugin is located in the plugin dir of the binary distribution. For 0.10.0 and later, it is located in the plugins-external dir.
Q - Spark is not able to find the jars, leading to java.nio.file.NoSuchFileException
This means the classpath for the Spark job has not been configured properly. If you are running Spark in a distributed environment such as YARN or k8s, make sure both spark.driver.extraClassPath and spark.executor.extraClassPath are set. Also, the jars in the driver classpath should be added to the --jars argument in spark-submit so that Spark can distribute those jars to all the nodes in your cluster. You also need to provide the appropriate scheme with the file path when running the jar. In this doc, we have used local:// but it can be different depending on your cluster setup.
Q - The Spark job is failing while pushing the segments.
This can be caused by a misconfigured controllerURI in the job spec YAML file. If the controllerURI is correct, make sure it is accessible from all the nodes of your YARN or k8s cluster.
Q - My data gets overwritten during ingestion.
Set segmentPushType to APPEND in the tableConfig.
If it is already set to APPEND, this is likely due to a missing timeColumnName in your table config. If you can't provide a time column, please use our segment name generation configs in the ingestion spec. Generally, using the inputFile segment name generator should fix your issue.
Q - I am getting java.lang.RuntimeException: java.io.IOException: Failed to create directory: pinot-plugins-dir-0/plugins/*
Removing -Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins from spark.driver.extraJavaOptions should fix this. As long as the plugins are mentioned in the classpath and the jars argument, it should not be an issue.
Q - Getting a Class not found: exception
Please check that the extraClassPath arguments contain all the plugin jars for both the driver and the executors. Also check that all the plugin jars are mentioned in the --jars argument. If both of these are correct, please check that the extraClassPath contains local filesystem classpaths and not S3, HDFS or any other distributed file system classpaths.
This section is an overview of the various options for importing data into Pinot.
There are multiple options for importing data into Pinot. These guides are ready-made examples that show you step-by-step instructions for importing records into Pinot, supported by our .
These guides are meant to get you up and running with imported data as quick as possible. Pinot supports multiple file input formats without needing to change anything other than the file name. Each example imports a ready-made dataset so you can see how things work without needing to bring your own dataset.
This guide will show you how to import data using stream ingestion from Apache Kafka topics.
This guide will show you how to import data using stream ingestion with upsert.
This guide will show you how to import data using stream ingestion with deduplication.
By default, Pinot does not come with a storage layer, so none of the data sent will be stored in case of a system crash. To persistently store the generated segments, you will need to change the controller and server configs to add a deep storage. Check out the file systems documentation for all the info and related configs.
These guides will show you how to import data as well as persist it in the file systems.
These guides will show you how to import data from a Pinot supported input format.
This guide will show you how to handle the complex type in the ingested data, such as map and array.
Batch ingestion allows users to create a table using data already present in a file system such as S3. This is particularly useful for the cases where the user wants to utilize Pinot's ability to query large data with minimal latency or test out new features using a simple data file.
Ingesting data from a filesystem involves the following steps -
Define Schema
Define Table Config
Upload Schema and Table configs
Upload data
Batch Ingestion currently supports the following mechanisms to upload the data -
Standalone
Here we'll take a look at the standalone local processing to get you started.
Let's create a table for the following CSV data source.
In our data, the only column on which aggregations can be performed is score. Secondly, timestampInEpoch is the only timestamp column. So, on our schema, we keep score as metric and timestampInEpoch as timestamp column.
Here, we have also defined two extra fields - format and granularity. The format specifies the formatting of our timestamp column in the data source. Currently, it is in milliseconds hence we have specified 1:MILLISECONDS:EPOCH
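As a sketch of what such a schema might look like, the Python snippet below builds it as a dictionary and serializes it to JSON. Only score and timestampInEpoch come from the text above; the schema name transcript, the data types, the granularity value, and the single dimension column are assumptions added to make the example complete.

```python
import json

schema = {
    "schemaName": "transcript",
    # Hypothetical dimension column, included only so the schema is non-trivial.
    "dimensionFieldSpecs": [{"name": "studentID", "dataType": "INT"}],
    # score is the only column on which aggregations are performed.
    "metricFieldSpecs": [{"name": "score", "dataType": "FLOAT"}],
    # timestampInEpoch holds epoch milliseconds.
    "dateTimeFieldSpecs": [
        {
            "name": "timestampInEpoch",
            "dataType": "LONG",
            "format": "1:MILLISECONDS:EPOCH",
            "granularity": "1:MILLISECONDS",
        }
    ],
}

schema_json = json.dumps(schema, indent=2)
print(schema_json)
```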
We define a table transcript and map the schema created in the previous step to the table. For batch data, we keep the tableType as OFFLINE.
Now that we have both the configs, we can simply upload them and create a table. To achieve that, just run the command -
Check out the table config and schema in the [Rest API] to make sure it was successfully uploaded.
We now have an empty table in pinot. So as the next step we will upload our CSV file to this table.
A table is composed of multiple segments. The segments can be created in three ways:
1) Minion based ingestion
2) Upload API
3) Ingestion jobs
There are 2 Controller APIs that can be used for a quick ingestion test using a small file.
When these APIs are invoked, the controller has to download the file and build the segment locally.
Hence, these APIs are NOT meant for production environments and for large input files.
This API creates a segment using the given file and pushes it to Pinot. All steps happen on the controller. Example usage:
To upload a JSON file data.json to a table called foo_OFFLINE, use the command below.
Note that query params need to be URLEncoded. For example, {"inputFormat":"json"} in the command below needs to be converted to %7B%22inputFormat%22%3A%22json%22%7D.
The batchConfigMapStr can be used to pass in additional properties needed for decoding the file. For example, in the case of CSV, you may need to provide the delimiter.
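The URL encoding step can be done with Python's standard library rather than by hand. In this sketch, the helper name is hypothetical, and the CSV delimiter property key is an assumption used for illustration.

```python
import json
from urllib.parse import quote, unquote


def encode_batch_config(batch_config):
    """Serialize a batch config map compactly and percent-encode it for a query string."""
    compact = json.dumps(batch_config, separators=(",", ":"))
    return quote(compact, safe="")


encoded = encode_batch_config({"inputFormat": "json"})
print(encoded)  # %7B%22inputFormat%22%3A%22json%22%7D

# Assumed property key for the CSV delimiter; check it against your Pinot version.
csv_encoded = encode_batch_config(
    {"inputFormat": "csv", "recordReader.prop.delimiter": "|"}
)
print(unquote(csv_encoded))
```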
This API creates a segment using file at the given URI and pushes it to Pinot. Properties to access the FS need to be provided in the batchConfigMap. All steps happen on the controller. Example usage:
Segments can be created and uploaded using tasks known as DataIngestionJobs. A job also needs a config of its own. We call this config the JobSpec.
For our CSV file and table, the job spec should look like below.
Now that we have the job spec for our table transcript, we can trigger the job using the following command.
Once the job has successfully finished, you can head over to the [query console] and start playing with the data.
There are 4 ways to upload a Pinot segment:
1. Segment Tar Push
This is the original and default push mechanism.
Tar push requires the segment to be stored locally or can be opened as an InputStream on PinotFS. So we can stream the entire segment tar file to the controller.
The push job will:
Upload the entire segment tar file to the Pinot controller.
Pinot controller will:
Save the segment into the controller segment directory(Local or any PinotFS).
Extract segment metadata.
Add the segment to the table.
2. Segment URI Push
This push mechanism requires the segment tar file to be stored on a deep store with a globally accessible segment tar URI.
URI push is lightweight on the client side, while the controller side requires work equivalent to the Tar push.
The push job will:
POST this segment Tar URI to the Pinot controller.
Pinot controller will:
Download segment from the URI and save it to controller segment directory(Local or any PinotFS).
Extract segment metadata.
Add the segment to the table.
3. Segment Metadata Push
This push mechanism also requires the segment tar file to be stored on a deep store with a globally accessible segment tar URI.
Metadata push is lightweight on the controller side; there is no deep store download involved on the controller side.
The push job will:
Download the segment based on URI.
Extract metadata.
Upload metadata to the Pinot Controller.
Pinot Controller will:
Add the segment to the table based on the metadata.
4. Segment Metadata Push with copyToDeepStore
This extends the original Segment Metadata Push for cases where the segments are pushed to a location that is not used as the deep store. The ingestion job can still do a metadata push but ask the Pinot controller to copy the segments into the deep store. These use cases usually arise when the ingestion jobs don't have direct access to the deep store but still want to use metadata push for its efficiency, and therefore use a staging location to keep the segments temporarily.
NOTE: the staging location and the deep store have to use the same storage scheme, e.g. both on S3. This is because the copy is done via the PinotFS.copyDir interface, which assumes so, and also because the copy happens on the storage system side, so segments don't need to go through the Pinot controller at all.
To make this work, first grant the Pinot controllers access to the staging location. For example, on AWS, this may mean adding an access policy like the one below for the controller EC2 instances:
Then use metadata push but add one extra config like below:
Pinot supports atomic updates only at the segment level. When data consisting of multiple segments is pushed to a table, segments are replaced one at a time, so queries to the broker during this upload phase may produce inconsistent results due to interleaving of old and new data.
When Pinot segment files are created in external systems (Hadoop/Spark/etc.), there are several ways to push that data to the Pinot controller and server:
Push segment to other systems and implement your own segment fetcher to pull data from those systems.
Since pinot is written in Java, you can set the following basic java configurations to tune the segment runner job -
Log4j2 file location with -Dlog4j2.configurationFile
Plugin directory location with -Dplugins.dir=/opt/pinot/plugins
JVM props, like -Xmx8g -Xms4G
If you are using Docker, you can set the following under the JAVA_OPTS variable.
You can set -D mapreduce.map.memory.mb=8192 to set the mapper memory size when submitting the Hadoop job.
You can add the config spark.executor.memory to tune the memory usage for segment creation when submitting the Spark job.
You can refer to for more details.
See for how to enable this feature.
Push segment to shared NFS and let pinot pull segment files from the location of that NFS. See .
Push segment to a Web server and let pinot pull segment files from the Web server with HTTP/HTTPS link. See .
Push segment to PinotFS(HDFS/S3/GCS/ADLS) and let pinot pull segment files from PinotFS URI. See and .
The first three options are supported out of the box within the Pinot package. As long as your remote jobs send the Pinot controller the corresponding URI to the files, it will pick up the files and allocate them to the proper Pinot servers and brokers. To enable Pinot support for PinotFS, you will need to provide the PinotFS configuration and proper Hadoop dependencies.
By default, Pinot does not come with a storage layer, so none of the data sent will be stored in case of a system crash. To persistently store the generated segments, you will need to change the controller and server configs to add deep storage. Check out the file systems documentation for all the info and related configs.
This section contains a collection of short guides to show you how to import from a Pinot supported file system.
FileSystem is an abstraction provided by Pinot to access data in distributed file systems (DFS).
Pinot uses distributed file systems for the following purposes:
Batch Ingestion Job - To read the input data (CSV, Avro, Thrift, etc.) and to write generated segments to DFS
Controller - When a segment is uploaded to the controller, the controller saves it in the DFS configured.
Server - When a server is notified of a new segment, it copies the segment from the remote DFS to its local node using the DFS abstraction.
Pinot lets you choose a distributed file system provider. The following file systems are supported by Pinot:
To use a distributed file system, you need to enable plugins. To do that, specify the plugin directory and include the required plugins -
Now you can proceed to change the filesystem in the controller and server config as shown below:
scheme refers to the prefix used in the URI of the filesystem, e.g. for the URI s3://bucket/path/to/file, the scheme is s3.
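If you want to check which scheme a given URI carries, Python's standard URL parser extracts it directly. The helper name and the HDFS URI below are illustrative only.

```python
from urllib.parse import urlparse


def filesystem_scheme(uri):
    """Return the scheme prefix of a filesystem URI, or '' if there is none."""
    return urlparse(uri).scheme


print(filesystem_scheme("s3://bucket/path/to/file"))       # s3
print(filesystem_scheme("hdfs://namenode:8020/segments"))  # hdfs
```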
You can also change the filesystem during ingestion. In the ingestion job spec, specify the filesystem with the following config:
This guide shows you how to import data from HDFS.
You can enable the Hadoop DFS using the plugin pinot-hdfs. In the controller or server, add the config:
By default Pinot loads all the plugins, so you can just drop this plugin there. Also, if you specify -Dplugins.include, you need to list all the plugins you want to use, e.g. pinot-json, pinot-avro, pinot-kafka-2.0...
HDFS implementation provides the following options -
hadoop.conf.path: Absolute path of the directory containing Hadoop XML configuration files, such as hdfs-site.xml and core-site.xml.
hadoop.write.checksum: Create a checksum while pushing an object. Default is false.
hadoop.kerberos.principle
hadoop.kerberos.keytab
Each of these properties should be prefixed by pinot.[node].storage.factory.class.hdfs. where node is either controller or server, depending on the config.
The kerberos configs should be used only if your Hadoop installation is secured with Kerberos. Please check the Hadoop Kerberos guide on how to generate Kerberos security identification.
You will also need to provide proper Hadoop dependencies jars from your Hadoop installation to your Pinot startup scripts.
To push HDFS segment files to Pinot controller, you just need to ensure you have proper Hadoop configuration as we mentioned in the previous part. Then your remote segment creation/push job can send the HDFS path of your newly created segment files to the Pinot Controller and let it download the files.
For example, the following curl requests to Controller will notify it to download segment files to the proper table:
Standalone Job:
Hadoop Job:
This guide shows you how to ingest a stream of records from an Apache Kafka topic into a Pinot table.
In this guide, you'll learn how to import data into Pinot using Apache Kafka for real-time stream ingestion. Pinot has out-of-the-box real-time ingestion support for Kafka.
Let's set up a demo Kafka cluster locally and create a sample topic, transcript-topic.
Start Kafka
Create a Kafka Topic
Start Kafka
Start the Kafka cluster on port 9092 using the same Zookeeper from the quick-start examples.
Create a Kafka topic
Download the latest Kafka. Create a topic.
We will publish the data in the same format as mentioned in the Stream ingestion docs. So you can use the same schema mentioned under Create Schema Configuration.
This is the real-time table configuration for the transcript table, which uses the schema from the previous step.
For Kafka, we use streamType as kafka. Currently only JSON format is supported, but you can easily write your own decoder by extending the StreamMessageDecoder interface. You can then access your decoder class by putting the jar file in the plugins directory.
The lowLevel consumer reads data per partition, whereas the highLevel consumer uses the Kafka high-level consumer to read data from the whole stream. It does not have control over which partition to read at a particular moment.
For Kafka versions below 2.X, use org.apache.pinot.plugin.stream.kafka09.KafkaConsumerFactory
For Kafka version 2.X and above, use org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory
You can set the offset to one of the following:
smallest: start the consumer from the earliest offset
largest: start the consumer from the latest offset
a timestamp in the format yyyy-MM-dd'T'HH:mm:ss.SSSZ: start the consumer from the offset after the timestamp
a datetime duration or period: start the consumer from the offset after the period, e.g. '2d'
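To illustrate the four kinds of offset values, the Python sketch below classifies a given string. It is not Pinot's parser: the function name is hypothetical, the period pattern (digits followed by d, h, m or s) is an assumption, and the timestamp check maps the Java-style pattern above onto Python's strptime directives.

```python
import re
from datetime import datetime

# Assumption: periods look like '2d', '12h', '30m' or '45s'.
PERIOD_PATTERN = re.compile(r"^\d+[dhms]$")
# Python equivalent of yyyy-MM-dd'T'HH:mm:ss.SSSZ (e.g. 2022-06-01T10:00:00.000+0000).
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%f%z"


def classify_offset_criteria(value):
    """Return 'smallest', 'largest', 'period' or 'timestamp' for an offset value."""
    if value in ("smallest", "largest"):
        return value
    if PERIOD_PATTERN.match(value):
        return "period"
    try:
        datetime.strptime(value, TIMESTAMP_FORMAT)
    except ValueError:
        raise ValueError(f"unrecognized offset criteria: {value!r}") from None
    return "timestamp"


for candidate in ("smallest", "2d", "2022-06-01T10:00:00.000+0000"):
    print(candidate, "->", classify_offset_criteria(candidate))
```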
The resulting configuration should look as follows -
Now that we have our table and schema configurations, let's upload them to the Pinot cluster. As soon as the real-time table is created, it will begin ingesting available records from the Kafka topic.
We will publish data in the following format to Kafka. Let us save the data in a file named transcript.json.
Push the sample JSON into the transcript-topic Kafka topic using the Kafka console producer. This will add the 12 records described in the transcript.json file to the topic.
Log in to the Kafka Docker container
Publish messages to the target topic
As soon as data flows into the stream, the Pinot table will consume it and it will be ready for querying. Head over to the Query Console to check out the real-time data.
Pinot supports 2 major generations of Kafka library - kafka-0.9 and kafka-2.x for both high and low level consumers.
Post release 0.10.0, we have started shading Kafka packages inside Pinot. If you are using our latest tagged Docker images or a master build, you should replace org.apache.kafka with shaded.org.apache.kafka in your table config.
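The package rename can be applied to a stream config map with a short script. The Python sketch below is a hedged illustration: the function name and the config keys in the example are hypothetical, and the negative lookbehind is there so that values that are already shaded are left alone.

```python
import re

# Match org.apache.kafka only when it is not already prefixed with "shaded.".
_UNSHADED_KAFKA = re.compile(r"(?<!shaded\.)org\.apache\.kafka")


def shade_kafka_packages(stream_configs):
    """Return a copy of the config map with Kafka package names shaded."""
    return {
        key: _UNSHADED_KAFKA.sub("shaded.org.apache.kafka", value)
        if isinstance(value, str)
        else value
        for key, value in stream_configs.items()
    }


# Hypothetical stream config entries, for illustration only.
configs = {
    "example.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
    "stream.kafka.consumer.factory.class.name":
        "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
}
print(shade_kafka_packages(configs))
```

Pinot's own classes (org.apache.pinot...) are not touched, since only the org.apache.kafka prefix is rewritten.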
Update the table config for both high-level and low-level consumers: change stream.kafka.consumer.factory.class.name from org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory to org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory.
If using the stream (high) level consumer: also add the config stream.kafka.hlc.bootstrap.server to tableIndexConfig.streamConfigs. This config should be the URI of the Kafka broker list, e.g. localhost:9092.
This connector is also suitable for Kafka library versions higher than 2.0.0. In the Kafka 2.0 connector pom.xml, changing kafka.lib.version from 2.0.0 to 2.1.1 will make this connector work with Kafka 2.1.1.
Here is an example config which uses SSL based authentication to talk with kafka and schema-registry. Notice there are two sets of SSL options, ones starting with ssl.
are for kafka consumer and ones with stream.kafka.decoder.prop.schema.registry.
are for SchemaRegistryClient
used by KafkaConfluentSchemaRegistryAvroMessageDecoder
.
The connector with Kafka library 2.0+ supports Kafka transactions. The transaction support is controlled by config kafka.isolation.level
in Kafka stream config, which can be read_committed
or read_uncommitted
(default). Setting it to read_committed
will ingest transactionally committed messages in Kafka stream only.
For example,
Note that the default value of this config is read_uncommitted, which reads all messages. Also, this config is supported by the low-level consumer only.
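As a minimal sketch of the transaction setting described above, the Python snippet below adds the isolation level to a stream config fragment. The key name `stream.kafka.isolation.level` (the documented config with the usual `stream.kafka.` prefix) and the helper `with_isolation_level` are assumptions for illustration, so check them against your Pinot version.

```python
import json

ALLOWED_ISOLATION_LEVELS = {"read_committed", "read_uncommitted"}

def with_isolation_level(stream_configs, level="read_uncommitted"):
    """Return a copy of stream_configs with the Kafka isolation level set."""
    if level not in ALLOWED_ISOLATION_LEVELS:
        raise ValueError(f"unsupported isolation level: {level}")
    # The text above says this setting applies to the low-level consumer only.
    if stream_configs.get("stream.kafka.consumer.type") != "lowLevel":
        raise ValueError("isolation level requires the low-level consumer")
    updated = dict(stream_configs)
    # Assumed key name: the documented config with the stream prefix.
    updated["stream.kafka.isolation.level"] = level
    return updated

base = {
    "streamType": "kafka",
    "stream.kafka.consumer.type": "lowLevel",
    "stream.kafka.topic.name": "transcript-topic",
}
committed_only = with_isolation_level(base, "read_committed")
print(json.dumps(committed_only, indent=2))
```

The helper returns a copy, so the original fragment keeps the default behavior of reading all messages.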
Here is an example config which uses SASL_SSL based authentication to talk with kafka and schema-registry. Notice there are two sets of SSL options, some for kafka consumer and ones with stream.kafka.decoder.prop.schema.registry.
are for SchemaRegistryClient
used by KafkaConfluentSchemaRegistryAvroMessageDecoder
.
Pinot's Kafka connector now supports automatically extracting record headers and metadata into the Pinot table columns. The following table shows the mapping for record header/metadata to Pinot table column names:
Record key: any type <K>
__key
: String
For simplicity of design, we assume that the record key is always a UTF-8 encoded String
Record Headers: Map<String, String>
Each header key is listed as a separate column:
__header$HeaderKeyName
: String
For simplicity of design, we directly map the string headers from kafka record to pinot table column
Record metadata - offset : long
__metadata$offset
: String
Record metadata - recordTimestamp : long
__metadata$recordTimestamp
: String
In order to enable the metadata extraction in a Kafka table, you can set the stream config metadata.populate
to true
.
In addition to this, if you want to actually use any of these columns in your table, you have to list them explicitly in your table's schema.
For example, if you want to add only the offset and key as dimension columns in your Pinot table, they can be listed in the schema as follows:
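A minimal sketch of such a schema fragment, generated in Python from the column mapping above. The helper names `header_column` and `metadata_dimension_fields` are hypothetical, and the header key `traceId` is made up for illustration; the column names and the String type come from the mapping table.

```python
import json

def header_column(header_key):
    """Column name for a Kafka record header, per the mapping table above."""
    return f"__header${header_key}"

def metadata_dimension_fields(include_key=True, include_offset=True, header_keys=()):
    """Build dimensionFieldSpecs entries for the selected metadata columns."""
    names = []
    if include_key:
        names.append("__key")
    if include_offset:
        names.append("__metadata$offset")
    names.extend(header_column(key) for key in header_keys)
    # Every extracted column is mapped to a String column in the table above.
    return [{"name": name, "dataType": "STRING"} for name in names]

fields = metadata_dimension_fields()
print(json.dumps({"dimensionFieldSpecs": fields}, indent=2))
print(header_column("traceId"))
```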
Once the schema is updated, these columns are similar to any other pinot column. You can apply ingestion transforms and / or define indexes on them.
Don't forget to follow the schema evolution guidelines when updating schema of an existing table!
Apache Pinot lets users consume data from streams and push it directly into the database, in a process known as stream ingestion. Stream Ingestion makes it possible to query data within seconds of publication.
Stream Ingestion provides support for checkpoints for preventing data loss.
Setting up Stream ingestion involves the following steps:
Create schema configuration
Create table configuration
Upload table and schema spec
Let's take a look at each of the steps in more detail.
Let us assume the data to be ingested is in the following format:
Schema defines the fields along with their data types. The schema also defines whether fields serve as dimensions
, metrics
or timestamp
. For more details on schema configuration, see creating a schema.
For our sample data, the schema configuration looks like this:
The next step is to create a table where all the ingested data will flow and can be queried. Unlike batch ingestion, table configuration for real-time ingestion also triggers the data ingestion job. For a more detailed overview of tables, see the table reference.
The real-time table configuration consists of the following fields:
tableName - The name of the table where the data should flow
tableType - The internal type for the table. Should always be set to REALTIME
for realtime ingestion
segmentsConfig -
tableIndexConfig - defines which column to use for indexing along with the type of index. For full configuration, see [Indexing Configs]. It has the following required fields -
loadMode - specifies how the segments should be loaded. Should be heap
or mmap
. Here's the difference between both the configs
mmap: Segments are loaded onto memory-mapped files. This is the default mode.
heap: Segments are loaded into direct memory. Note, 'heap' here is a legacy misnomer, and it does not imply JVM heap. This mode should only be used when we want faster performance than memory-mapped files, and are also sure that we will never run into OOM.
streamConfig - specifies the data source along with the necessary configs to start consuming the real-time data. The streamConfig can be thought of as the equivalent to the job spec for batch ingestion. The following options are supported:
streamType
The streaming platform from which to consume the data
kafka
stream.[streamType].consumer.type
Whether to use per partition low-level consumer or high-level stream consumer
lowLevel
- Consume data from each partition with offset management
highLevel
- Consume data without control over the partitions
stream.[streamType].topic.name
The datasource (e.g. topic, data stream) from which to consume the data
String
stream.[streamType].decoder.class.name
Name of the class to be used for parsing the data. The class should implement org.apache.pinot.spi.stream.StreamMessageDecoder
interface
String. Available options:
org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder
org.apache.pinot.plugin.inputformat.avro.KafkaAvroMessageDecoder
org.apache.pinot.plugin.inputformat.avro.SimpleAvroMessageDecoder
org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder
stream.[streamType].consumer.factory.class.name
Name of the factory class to be used to provide the appropriate implementation of low level and high level consumer as well as the metadata
String. Available options:
org.apache.pinot.plugin.stream.kafka09.KafkaConsumerFactory
org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory
org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory
org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory
stream.[streamType].consumer.prop.auto.offset.reset
Determines the offset from which to start the ingestion
smallest
largest
or
timestamp in milliseconds
topic.consumption.rate.limit
Determines the upper bound for consumption rate for the whole topic. Having a consumption rate limiter is beneficial in case the stream message rate has a bursty pattern which leads to long GC pauses on the Pinot servers. The rate limiter can also be considered as a safeguard against excessive ingestion of realtime tables.
Double. The values should be greater than zero.
The following flush threshold settings are also supported:
realtime.segment.flush.threshold.time
Maximum time to keep a realtime segment open before it is completed. Note that this time should be smaller than the Kafka retention period configured for the corresponding topic.
realtime.segment.flush.threshold.rows
Row count flush threshold for realtime segments. This behaves in a similar way for HLC and LLC. For HLC,
since there is only one consumer per server, this size is used as the size of the consumption buffer and determines after how many rows we flush to disk. For example, if this threshold is set to two million rows,
then a high level consumer would have a buffer size of two million.
If this value is set to 0, then the consumers adjust the number of rows consumed by a partition such that the size of the completed segment is the desired size (unless
threshold.time is reached first)
realtime.segment.flush.threshold.segment.size
The desired size of a completed realtime segment. This config is used only if realtime.segment.flush.threshold.rows
is set to 0.
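The interaction between these three thresholds can be sketched as follows. This is an illustrative Python helper, not Pinot code: `describe_flush_policy` is a hypothetical name, and the values `24h` and `150M` are example settings chosen for the sketch.

```python
def describe_flush_policy(stream_configs):
    """Summarize which flush thresholds apply, following the rules above."""
    rows = int(stream_configs["realtime.segment.flush.threshold.rows"])
    policy = {"time": stream_configs.get("realtime.segment.flush.threshold.time")}
    if rows == 0:
        # rows == 0: the desired segment size drives the flush,
        # unless the time threshold is reached first.
        policy["mode"] = "segment_size"
        policy["target"] = stream_configs.get(
            "realtime.segment.flush.threshold.segment.size")
    else:
        # A non-zero row count is used directly as the flush threshold.
        policy["mode"] = "rows"
        policy["target"] = rows
    return policy

size_based = describe_flush_policy({
    "realtime.segment.flush.threshold.rows": "0",
    "realtime.segment.flush.threshold.time": "24h",
    "realtime.segment.flush.threshold.segment.size": "150M",
})
row_based = describe_flush_policy({
    "realtime.segment.flush.threshold.rows": "2000000",
    "realtime.segment.flush.threshold.time": "24h",
})
print(size_based)
print(row_based)
```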
You can also specify additional configs for the consumer directly into the streamConfigs.
For our sample data and schema, the table config will look like this:
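As a rough sketch only, the Python snippet below assembles a real-time table config from the options listed above and prints it as JSON. The table name `transcript`, the broker key `stream.kafka.broker.list` with the address `localhost:9092`, and the threshold values are assumptions for illustration; `segmentsConfig` is left empty here and needs real values for an actual table.

```python
import json

stream_configs = {
    "streamType": "kafka",
    "stream.kafka.consumer.type": "lowLevel",
    "stream.kafka.topic.name": "transcript-topic",
    "stream.kafka.decoder.class.name":
        "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
    "stream.kafka.consumer.factory.class.name":
        "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
    # Assumed key and address for the Kafka brokers.
    "stream.kafka.broker.list": "localhost:9092",
    "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
    # Example thresholds: rows set to 0 so the segment size is used.
    "realtime.segment.flush.threshold.rows": "0",
    "realtime.segment.flush.threshold.time": "24h",
    "realtime.segment.flush.threshold.segment.size": "150M",
}

table_config = {
    "tableName": "transcript",  # assumed table name
    "tableType": "REALTIME",
    "segmentsConfig": {},       # placeholder; fill in for a real table
    "tableIndexConfig": {
        "loadMode": "MMAP",
        "streamConfigs": stream_configs,
    },
}
print(json.dumps(table_config, indent=2))
```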
Now that we have our table and schema configurations, let's upload them to the Pinot cluster. As soon as the configs are uploaded, pinot will start ingesting available records from the topic.
There are some scenarios where the message rate in the input stream has a bursty nature which can lead to long GC pauses on the Pinot servers or affect the ingestion rate of other realtime tables on the same server. In such scenarios, you should throttle the consumption rate during stream ingestion.
Stream consumption throttling can be tuned using the stream config topic.consumption.rate.limit
which indicates the upper bound on the message rate for the entire topic.
Here is the sample configuration on how to configure the consumption throttling:
Some things to keep in mind while tuning this config are:
Since this config applies to the entire topic, the rate is internally divided by the number of partitions in the topic and applied to each partition's consumer.
In case of multi-tenant deployment (where you have more than 1 table in the same server instance), you need to make sure that the rate limit on one table doesn't step on/starve the rate limiting of another table. So, when there is more than 1 table on the same server (which is most likely to happen), you may need to re-tune the throttling threshold for all the streaming tables.
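The per-partition arithmetic from the first point can be sketched in a few lines of Python. The function name and the example numbers (a limit of 1000 on a 4-partition topic) are assumptions for illustration.

```python
def per_partition_rate_limit(topic_rate_limit, num_partitions):
    """Split a topic-level rate limit evenly across partition consumers."""
    if topic_rate_limit <= 0:
        raise ValueError("the rate limit should be greater than zero")
    if num_partitions <= 0:
        raise ValueError("the topic needs at least one partition")
    return topic_rate_limit / num_partitions

stream_configs = {"topic.consumption.rate.limit": "1000"}
rate = per_partition_rate_limit(
    float(stream_configs["topic.consumption.rate.limit"]), 4)
print(rate)  # 250.0
```

With these numbers, each partition consumer is limited to a quarter of the topic-level rate.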
Once throttling is enabled for a table, you can verify by searching for a log that looks similar to:
In addition, you can monitor the consumption rate utilization with the metric CONSUMPTION_QUOTA_UTILIZATION
.
Note that any configuration change for topic.consumption.rate.limit
in the stream config will NOT take effect immediately. The new configuration will be picked up from the next consuming segment. In order to enforce the new configuration, you need to trigger forceCommit APIs. Please refer to Pause Stream Ingestion for more details.
We are working on support for other ingestion platforms, but you can also write your own ingestion plugin if it is not supported out of the box. For a walkthrough, see Stream Ingestion Plugin.
There are some scenarios in which you may want to pause the realtime ingestion while your table is available for queries. For example if there is a problem with the stream ingestion, while you are troubleshooting the issue, you still want the queries to be executed on the already ingested data. For these scenarios, you can first issue a Pause request to a Controller host. After troubleshooting with the stream is done, you can issue another request to Controller to resume the consumption.
When a Pause request is issued, the Controller instructs the realtime servers hosting your table to commit their consuming segments immediately. However, the commit process may take some time to complete. Please note that Pause and Resume requests are asynchronous. An OK response means that the instructions for pausing or resuming have been successfully sent to the realtime servers. If you want to know whether consumption has actually stopped or resumed, you can issue a pause status request.
It's worth noting that consuming segments on realtime servers are stored in volatile memory, and their resources are allocated when the consuming segments are first created. These resources cannot be altered if consumption parameters are changed midway through consumption. It may therefore take hours before these changes take effect. Furthermore, if the parameters are changed in an incompatible way (for example, changing the underlying stream with a completely new set of offsets, or changing the stream endpoint from which to consume messages, etc.), it will result in the table getting into an error state.
The pause and resume feature addresses this problem. When a Pause request is issued by the operator, consuming segments are committed without starting new mutable ones. Instead, new mutable segments are started only when the Resume request is issued. This mechanism provides operators as well as developers with more flexibility. It also enables Pinot to be more resilient to the operational and functional constraints imposed by underlying streams.
There is another feature called "Force Commit" which utilizes the primitives of pause and resume feature. When the operator issues a force commit request, the current mutable segments will be committed and new ones started right away. Operators can now use this feature for all compatible table config parameter changes to take effect immediately.
For incompatible parameter changes, an option is added to the resume request to handle the case of a completely new set of offsets. Operators can now follow a three-step process: First, issue a Pause request. Second, change the consumption parameters. Finally, issue the Resume request with the appropriate option. These steps will preserve the old data and allow the new data to be consumed immediately. All through the operation, queries will continue to be served.
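The three-step process can be sketched as a small Python plan builder. It only constructs request descriptions and sends nothing. The endpoint names (`pauseConsumption`, `resumeConsumption`, `pauseStatus`, `forceCommit`), the `consumeFrom` option, the controller address and the table name are assumptions for illustration, so confirm them against your controller's API listing before use.

```python
from urllib.parse import quote, urlencode

# Assumed endpoint names and HTTP methods for the controller API.
ACTION_METHODS = {
    "pauseConsumption": "POST",
    "resumeConsumption": "POST",
    "pauseStatus": "GET",
    "forceCommit": "POST",
}

def controller_request(base_url, table, action, **params):
    """Build a (method, url) pair for a pause/resume style controller call."""
    if action not in ACTION_METHODS:
        raise ValueError(f"unknown action: {action}")
    url = f"{base_url.rstrip('/')}/tables/{quote(table)}/{action}"
    if params:
        url += "?" + urlencode(params)
    return ACTION_METHODS[action], url

def incompatible_change_plan(base_url, table, consume_from="smallest"):
    """Pause, change parameters by hand, then resume from a chosen offset."""
    return [
        controller_request(base_url, table, "pauseConsumption"),
        ("MANUAL", "update the consumption parameters in the table config"),
        controller_request(base_url, table, "resumeConsumption",
                           consumeFrom=consume_from),
    ]

plan = incompatible_change_plan("http://localhost:9000", "transcript")
for step in plan:
    print(step)
```

Because Pause and Resume are asynchronous, a real script would also poll the pause status between the steps.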
If a Pinot table is configured to consume using a Low Level (partition-based) stream type, then it is possible that the partitions of the table change over time. In Kafka, for example, the number of partitions may increase. In Kinesis, the number of partitions may increase or decrease -- some partitions could be merged to create a new one, or existing partitions split to create new ones.
Pinot runs a periodic task called RealtimeSegmentValidationManager
that monitors such changes and starts consumption on new partitions (or stops consumptions from old ones) as necessary. Since this is a periodic task that is run on the controller, it may take some time for Pinot to recognize new partitions and start consuming from them. This may delay the data in new partitions appearing in the results that pinot returns.
If it is desired to recognize the new partitions sooner, then you can manually trigger the periodic task so as to recognize such data immediately.
Often, it is important to understand the rate of ingestion of data into your realtime table. This is commonly done by looking at the consumption "lag" of the consumer. The lag itself can be observed in many dimensions. Pinot supports observing consumption lag along the offset dimension and time dimension, whenever applicable (as it depends on the specifics of the connector).
The ingestion status of a connector can be observed by querying either the /consumingSegmentsInfo
API or the table's /debug
API, as shown below:
A sample response from a Kafka based realtime table is shown below. The ingestion status is displayed for each of the CONSUMING segments in the table.
currentOffsetsMap
Current consuming offset position per partition
latestUpstreamOffsetMap
(Wherever applicable) Latest offset found in the upstream topic partition
recordsLagMap
(Whenever applicable) Defines how far behind the current record's offset / pointer is from upstream latest record. This is calculated as the difference between the latestUpstreamOffset
and currentOffset
for the partition when the lag computation request is made.
recordsAvailabilityLagMap
(Whenever applicable) Defines how soon after record ingestion was the record consumed by Pinot. This is calculated as the difference between the time the record was consumed and the time at which the record was ingested upstream.
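To make the recordsLagMap definition concrete, here is a minimal Python sketch that recomputes the record lag from the two offset maps. The sample offsets are made up, and representing offsets as strings keyed by partition is an assumption about the response shape.

```python
def records_lag(current_offsets, latest_upstream_offsets):
    """Per-partition lag: latest upstream offset minus current offset."""
    lag = {}
    for partition, current in current_offsets.items():
        latest = latest_upstream_offsets.get(partition)
        if latest is None:
            # Lag is only available where the upstream offset is known.
            continue
        lag[partition] = int(latest) - int(current)
    return lag

segment_status = {
    "currentOffsetsMap": {"0": "10", "1": "25"},
    "latestUpstreamOffsetMap": {"0": "15", "1": "25"},
}
lag = records_lag(segment_status["currentOffsetsMap"],
                  segment_status["latestUpstreamOffsetMap"])
print(lag)  # {'0': 5, '1': 0}
```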
You can enable Amazon S3 Filesystem backend by including the plugin pinot-s3
.
By default Pinot loads all the plugins, so you can just drop this plugin there. Also, if you specify -Dplugins.include
, you need to put all the plugins you want to use, e.g. pinot-json
, pinot-avro
, pinot-kafka-2.0...
You can also configure the S3 filesystem using the following options:
Configuration
Description
region
The AWS Data center region in which the bucket is located
accessKey
(Optional) AWS access key required for authentication. This should only be used for testing purposes as we don't store these keys in secret.
secretKey
(Optional) AWS secret key required for authentication. This should only be used for testing purposes as we don't store these keys in secret.
endpoint
(Optional) Override endpoint for s3 client.
disableAcl
If this is set to false
, bucket owner is granted full access to the objects created by pinot. Default value is true
.
serverSideEncryption
(Optional) The server-side encryption algorithm used when storing this object in Amazon S3 (Now supports aws:kms
), set to null to disable SSE.
ssekmsKeyId
(Optional, but required when serverSideEncryption=aws:kms
) Specifies the AWS KMS key ID to use for object encryption. All GET and PUT requests for an object protected by AWS KMS will fail if not made via SSL or using SigV4.
ssekmsEncryptionContext
(Optional) Specifies the AWS KMS Encryption Context to use for object encryption. The value of this header is a base64-encoded UTF-8 string holding JSON with the encryption context key-value pairs.
Each of these properties should be prefixed by pinot.[node].storage.factory.s3.
where node
is either controller
or server
depending on the config
e.g.
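A small Python sketch of how the prefix combines with the options above to form property lines. The helper name `s3_storage_properties` and the region value `us-west-2` are assumptions for illustration.

```python
def s3_storage_properties(node, options):
    """Render S3 filesystem options as prefixed Pinot property lines."""
    if node not in ("controller", "server"):
        raise ValueError("node must be 'controller' or 'server'")
    prefix = f"pinot.{node}.storage.factory.s3."
    return [f"{prefix}{name}={value}" for name, value in options.items()]

lines = s3_storage_properties(
    "controller", {"region": "us-west-2", "disableAcl": "false"})
print("\n".join(lines))
```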
S3 Filesystem supports authentication using the DefaultCredentialsProviderChain. The credential provider looks for the credentials in the following order -
Environment Variables - AWS_ACCESS_KEY_ID
and AWS_SECRET_ACCESS_KEY
(RECOMMENDED since they are recognized by all the AWS SDKs and CLI except for .NET), or AWS_ACCESS_KEY
and AWS_SECRET_KEY
(only recognized by Java SDK)
Java System Properties - aws.accessKeyId
and aws.secretKey
Web Identity Token credentials from the environment or container
Credential profiles file at the default location (~/.aws/credentials)
shared by all AWS SDKs and the AWS CLI
Credentials delivered through the Amazon EC2 container service if AWS_CONTAINER_CREDENTIALS_RELATIVE_URI
environment variable is set and security manager has permission to access the variable,
Instance profile credentials delivered through the Amazon EC2 metadata service
You can also specify the accessKey and secretKey using the properties. However, this method is not secure and should be used only for POC setups.
To ingest events from an Amazon Kinesis stream into Pinot, set the following configs into the table config
where the Kinesis specific properties are:
Environment Variables - AWS_ACCESS_KEY_ID
and AWS_SECRET_ACCESS_KEY
(RECOMMENDED since they are recognized by all the AWS SDKs and CLI except for .NET), or AWS_ACCESS_KEY
and AWS_SECRET_KEY
(only recognized by Java SDK)
Java System Properties - aws.accessKeyId
and aws.secretKey
Web Identity Token credentials from the environment or container
Credential profiles file at the default location (~/.aws/credentials)
shared by all AWS SDKs and the AWS CLI
Credentials delivered through the Amazon EC2 container service if AWS_CONTAINER_CREDENTIALS_RELATIVE_URI
environment variable is set and security manager has permission to access the variable,
Instance profile credentials delivered through the Amazon EC2 metadata service
You can also specify the accessKey and secretKey using the properties. However, this method is not secure and should be used only for POC setups. You can also specify other aws fields such as AWS_SESSION_TOKEN as environment variables and config and it will work.
ShardID is of the format "shardId-000000000001". We use the numeric part as partitionId. Our partitionId variable is an integer. If shardIds grow beyond Integer.MAX_VALUE, the partitionId will overflow.
Segment size based thresholds for segment completion will not work. It assumes that partition "0" always exists. However, once the shard 0 is split/merged, we will no longer have partition 0.
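The first limitation can be illustrated with a short Python sketch of the shardId-to-partitionId mapping. This is not the plugin's code; the function name is hypothetical, and the overflow is raised as an explicit error here so that the limit is visible.

```python
INT_MAX = 2**31 - 1  # Java's Integer.MAX_VALUE

def shard_to_partition_id(shard_id):
    """Use the numeric part of a Kinesis shardId as the partition id."""
    prefix, _, number = shard_id.partition("-")
    if prefix != "shardId" or not number.isdigit():
        raise ValueError(f"unexpected shardId format: {shard_id}")
    partition_id = int(number)
    if partition_id > INT_MAX:
        # A 32-bit integer cannot hold this value.
        raise OverflowError(f"partition id {partition_id} exceeds {INT_MAX}")
    return partition_id

print(shard_to_partition_id("shardId-000000000001"))  # 1
```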
Complex-type handling in Apache Pinot.
It's common for ingested data to have a complex structure. For example, Avro schemas have records and arrays and JSON supports objects and arrays.
Apache Pinot's data model supports primitive data types (including int, long, float, double, BigDecimal, string, bytes), as well as limited multi-value types such as an array of primitive types (multi-valued BigDecimal type is not supported). Such simple data types allow Pinot to build fast indexing structures for good query performance, but they require some handling of complex structures.
Support for BIG_DECIMAL
type is added after release 0.10.0
.
There are in general two options for such handling:
Convert the complex-type data into JSON string and then build a JSON index
Use the inbuilt complex-type handling rules in the ingestion config.
On this page, we'll show how to handle this complex-type structure with these two approaches, to process the example data in the following figure, which is a field group
from the Meetup events Quickstart example.
This object has two child fields and the child group
is a nested array with elements of object type.
Apache Pinot provides a powerful JSON index to accelerate the value lookup and filtering for the column. To convert an object group
with complex type to JSON, you can add the following config to table config.
The config transformConfigs
transforms the object group
to a JSON string group_json
, which then creates the JSON indexing with config jsonIndexColumns
. To read the full spec, see json_meetupRsvp_realtime_table_config.json.
Also, note that group
is a reserved keyword in SQL and therefore needs to be quoted in transformFunction
.
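As a minimal sketch, the Python snippet below builds a table config fragment of this shape and prints it as JSON. The transform function name `jsonFormat` and the placement of `transformConfigs` under `ingestionConfig` are assumptions here; the referenced full spec is the authoritative version.

```python
import json

table_config_fragment = {
    "ingestionConfig": {
        "transformConfigs": [
            {
                "columnName": "group_json",
                # "group" is quoted because it is a reserved SQL keyword.
                "transformFunction": 'jsonFormat("group")',
            }
        ]
    },
    "tableIndexConfig": {
        # Build a JSON index on the derived string column.
        "jsonIndexColumns": ["group_json"],
    },
}
print(json.dumps(table_config_fragment, indent=2))
```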
The columnName
can't use the same name as any of the fields in the source JSON data e.g. if our source data contains the field group
and we want to transform the data in that field before persisting it, the destination column name would need to be something different, like group_json
.
Additionally, you need to overwrite the maxLength
of the field group_json
on the schema, because by default, a string column has a limited length. For example,
For the full spec, see json_meetupRsvp_schema.json.
With this, you can start to query the nested fields under group
. For details about the supported JSON functions, see the guide.
Though JSON indexing is a handy way to process the complex types, there are some limitations:
It’s not performant to group by or order by a JSON field, because JSON_EXTRACT_SCALAR
is needed to extract the values in the GROUP BY and ORDER BY clauses, which invokes the function evaluation.
For cases that you want to use Pinot's multi-column functions such as DISTINCTCOUNTMV
Alternatively, from Pinot 0.8, you can use the complex-type handling in ingestion configurations to flatten and unnest the complex structure and convert them into primitive types. Then you can reduce the complex-type data into a flattened Pinot table, and query it via SQL. With the inbuilt processing rules, you do not need to write ETL jobs in another compute framework such as Flink or Spark.
To process this complex type, you can add the configuration complexTypeConfig
to the ingestionConfig
. For example:
With the complexTypeConfig
, all the map objects will be flattened to direct fields automatically. And with fieldsToUnnest
, a record with the nested collection will unnest into multiple records. For instance, the example at the beginning will transform into two rows with this configuration example.
Note that
The nested field group_id
under group
is flattened to group.group_id
. The default value of the delimiter is .
You can choose another delimiter by specifying the configuration delimiter
under complexTypeConfig
. This flattening rule also applies to maps in the collections to be unnested.
The nested array group_topics
under group
is unnested into the top-level, and converts the output to a collection of two rows. Note the handling of the nested field within group_topics
, and the eventual top-level field of group.group_topics.urlkey
. All the collections to unnest shall be included in the configuration fieldsToUnnest
.
Collections not specified in fieldsToUnnest
will be serialized into JSON string, except for the array of primitive values, which will be ingested as a multi-value column by default. The behavior is defined by the collectionNotUnnestedToJson
config, which takes the following values:
NON_PRIMITIVE
- Converts the array to a multi-value column. (default)
ALL
- Converts the array of primitive values to JSON string.
NONE
- Does not do any conversion.
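The flattening and unnesting rules above can be approximated in plain Python. This is an illustrative sketch of the described behavior with the default NON_PRIMITIVE setting, not Pinot's implementation; the function names and the sample record values are made up.

```python
import json

def flatten(value, prefix, delimiter, out):
    """Flatten nested maps into out, joining keys with the delimiter."""
    if isinstance(value, dict):
        for key, child in value.items():
            name = f"{prefix}{delimiter}{key}" if prefix else key
            flatten(child, name, delimiter, out)
    else:
        out[prefix] = value
    return out

def is_primitive_array(value):
    return isinstance(value, list) and all(
        not isinstance(item, (dict, list)) for item in value)

def transform(record, fields_to_unnest=(), delimiter="."):
    """Flatten maps, unnest the listed collections, serialize the rest."""
    rows = [flatten(record, "", delimiter, {})]
    for field in fields_to_unnest:
        expanded = []
        for row in rows:
            elements = row.get(field)
            if not isinstance(elements, list) or not elements:
                expanded.append(row)
                continue
            rest = {k: v for k, v in row.items() if k != field}
            for element in elements:
                new_row = dict(rest)
                # Maps inside the unnested collection are flattened too.
                flatten(element, field, delimiter, new_row)
                expanded.append(new_row)
        rows = expanded
    for row in rows:
        for key, value in row.items():
            # Primitive arrays stay multi-value; other collections become JSON.
            if isinstance(value, list) and not is_primitive_array(value):
                row[key] = json.dumps(value)
    return rows

record = {
    "group": {
        "group_id": 1,
        "group_topics": [{"urlkey": "a"}, {"urlkey": "b"}],
    },
    "tags": ["x", "y"],
    "venues": [{"city": "Oslo"}],
}
rows = transform(record, fields_to_unnest=["group.group_topics"])
for row in rows:
    print(row)
```

The single input record becomes two rows, one per element of `group.group_topics`, each carrying `group.group_id` and its own `group.group_topics.urlkey`.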
You can find the full spec of the table config here and the table schema here.
You can then query the table with primitive values using the following SQL query:
.
is a reserved character in SQL, so you need to quote the flattened columns in the query.
When there are complex structures, it can be challenging and tedious to figure out the Pinot schema manually. To help with schema inference, Pinot provides utility tools to take the Avro schema or JSON data as input and output the inferred Pinot schema.
To infer the Pinot schema from an Avro schema, you can use a command like the following:
Note you can input configurations like fieldsToUnnest
similar to the ones in complexTypeConfig
. And this will simulate the complex-type handling rules on the Avro schema and output the Pinot schema in the file specified in outputDir
.
Similarly, you can use a command like the following to infer the Pinot schema from a file of JSON objects.
You can check out an example of this run in this PR.
Pinot supports consuming data from Apache Pulsar via the pinot-pulsar
plugin. You need to enable this plugin so that Pulsar specific libraries are present in the classpath.
You can enable pulsar plugin with the following config at the time of Pinot setup
-Dplugins.include=pinot-pulsar
pinot-pulsar
plugin is not part of official 0.10.0 binary. You can download the plugin from and add it to libs
or plugins
directory in pinot.
A sample Pulsar stream config to ingest data should look as follows. You can use the streamConfigs
section from this sample and make changes for your corresponding table.
You can change the following Pulsar-specific configurations for your tables
Also, make sure to change the brokers url from pulsar://localhost:6650
to pulsar+ssl://localhost:6650
so that secure connections are used.
Pinot currently relies on Pulsar client version 2.7.2. Users should make sure the Pulsar broker is compatible with this client version.
Upsert support in Apache Pinot.
Pinot provides native support for upsert during real-time ingestion (v0.6.0+). There are scenarios in which records need modification, such as correcting a ride fare or updating a delivery status.
Building on full upsert support, Pinot also enables another category of use cases with partial upsert (v0.8.0+). With partial upsert, users only need to specify the columns whose values change and can ignore the others.
To enable upsert on a Pinot table, there are a couple of configurations to make on the table configurations as well as on the input stream.
To update a record, a primary key is needed to uniquely identify the record. To define a primary key, add the field primaryKeyColumns
to the schema definition. For example, the schema definition of UpsertMeetupRSVP
in the quick start example has this definition.
Note this field expects a list of columns, as the primary key can be composite.
When two records with the same primary key are ingested, the record with the greater comparison value (timeColumn by default) is used. When records have the same primary key and event time, the order is not determined. In most cases, the later ingested record will be used, but this may not hold when the table has a column to sort by.
Partition the input stream by the primary key
An important requirement for the Pinot upsert table is to partition the input stream by the primary key. For Kafka messages, this means the producer shall set the key in the API. If the original stream is not partitioned, then a streaming processing job (e.g. Flink) is needed to shuffle and repartition the input stream into a partitioned one for Pinot's ingestion.
There are a few configurations needed in the table configurations to enable upsert.
Full upsert
The upsert mode defaults to NONE
for realtime tables. To enable the full upsert, set the mode
to FULL
for the full update. FULL upsert means that a new record will replace the older record completely if they have same primary key. Example config:
Partial upserts
Partial upsert support is also added in release-0.8.0
. With this feature, users can choose to update only specific columns and ignore the rest.
To enable the partial upsert, set the mode
to PARTIAL
and specify partialUpsertStrategies
for partial upsert columns. Since release-0.10.0
, OVERWRITE
is used as the default strategy for columns without a specified strategy. defaultPartialUpsertStrategy
is also introduced to change the default strategy for all columns. For example:
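A minimal sketch of such an upsert config, built in Python and printed as JSON. The helper name is hypothetical, and `INCREMENT` for the `rsvp_count` column is an assumed strategy name used for illustration; only `OVERWRITE` is named in the text above.

```python
import json

def partial_upsert_config(strategies, default_strategy="OVERWRITE"):
    """Build an upsertConfig fragment for partial upsert."""
    return {
        "upsertConfig": {
            "mode": "PARTIAL",
            # Applied to every column without an explicit strategy.
            "defaultPartialUpsertStrategy": default_strategy,
            "partialUpsertStrategies": dict(strategies),
        }
    }

# Assumed strategy name for a counter column.
config = partial_upsert_config({"rsvp_count": "INCREMENT"})
print(json.dumps(config, indent=2))
```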
Pinot supports the following partial upsert strategies:
With partial upsert, if the value is null
in either the existing record or the new coming record, Pinot will ignore the upsert strategy and the null
value:
(null
, newValue) -> newValue
(oldValue, null
) -> oldValue
(null
, null
) -> null
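The null-handling rules above amount to a small wrapper around whichever strategy is configured. The Python sketch below models a strategy as a plain function; `overwrite` and `add` are illustrative stand-ins, not Pinot classes.

```python
def merge_partial(old_value, new_value, strategy):
    """Apply a partial upsert strategy, ignoring it when a side is null."""
    if old_value is None:
        return new_value  # (null, newValue) -> newValue, (null, null) -> null
    if new_value is None:
        return old_value  # (oldValue, null) -> oldValue
    return strategy(old_value, new_value)

def overwrite(old_value, new_value):
    return new_value

def add(old_value, new_value):
    # Hypothetical increment-style strategy for numeric columns.
    return old_value + new_value

print(merge_partial(None, 5, add))         # 5
print(merge_partial(3, None, overwrite))   # 3
print(merge_partial(None, None, add))      # None
print(merge_partial(3, 5, add))            # 8
```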
By default, Pinot uses the value in the time column (timeColumn
in tableConfig) to determine the latest record. That means, for two records with the same primary key, the record with the larger value of the time column is picked as the latest update. However, there are cases when users need to use another column to determine the order. In such case, you can use option comparisonColumn
to override the column used for comparison. For example,
For partial upsert table, the out-of-order events won't be consumed and indexed. For example, for two records with the same primary key, if the record with the smaller value of the comparison column came later than the other record, it will be skipped.
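The comparison rule can be sketched as a pass over incoming records that keeps, for each primary key, the record with the greatest comparison value. This Python model is illustrative only: the column name `mtime` and the sample events are made up, and on ties it keeps the later arrival even though Pinot does not guarantee that order.

```python
def apply_upserts(records, primary_key_columns, comparison_column):
    """Return the record kept for each primary key after upsert."""
    latest = {}
    for record in records:
        # The primary key can be composite, so it is modeled as a tuple.
        key = tuple(record[column] for column in primary_key_columns)
        current = latest.get(key)
        if current is None or record[comparison_column] >= current[comparison_column]:
            latest[key] = record
        # Otherwise the record is out of order and does not replace the
        # current one.
    return latest

events = [
    {"event_id": "e1", "rsvp_count": 1, "mtime": 100},
    {"event_id": "e2", "rsvp_count": 4, "mtime": 105},
    {"event_id": "e1", "rsvp_count": 2, "mtime": 110},
    {"event_id": "e1", "rsvp_count": 9, "mtime": 90},  # arrives late
]
latest = apply_upserts(events, ["event_id"], "mtime")
for key, record in latest.items():
    print(key, record)
```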
Upsert snapshot support is also added in release-0.12.0
. To enable the snapshot, set the enableSnapshot
to true
. For example:
Upsert maintains in-memory metadata recording which docIds are valid in each segment (ValidDocIndexes). This metadata is lost during server restarts and must be recreated. ValidDocIndexes cannot be recovered easily after out-of-TTL primary keys are removed. Enabling snapshots addresses this problem by storing and recovering validDocIds snapshots for immutable segments. We recommend that you enable this feature to speed up server boot times during restarts.
The lifecycle of validDocIds snapshots is as follows:
If snapshot is enabled, load validDocIds from the snapshot when adding segments.
If snapshot is not enabled, delete any existing validDocIds snapshots when adding segments.
If snapshot is enabled, persist the validDocIds snapshot for immutable segments when removing a segment.
There are some limitations for the upsert Pinot tables.
The high-level consumer is not allowed for the input stream ingestion, which means stream.[consumerName].consumer.type
must always be lowLevel
.
The star-tree index cannot be used for indexing, as the star-tree index performs pre-aggregation during the ingestion.
Unlike append-only tables, a Pinot partial upsert table does not consume and index out-of-order events (events whose comparison value is less than the latest available value). These late events are skipped.
Unlike other real-time tables, Upsert table takes up more memory resources as it needs to bookkeep the record locations in memory. As a result, it's important to plan the capacity beforehand, and monitor the resource usage. Here are some recommended practices of using Upsert table.
The number of partitions in the input stream determines the number of partitions of the Pinot table. The more partitions the input topic/stream has, the more Pinot servers you can distribute the table to, and therefore the further you can scale the table horizontally. Note that you can't increase the number of partitions later for upsert-enabled tables, so start with enough partitions (at least 2-3X the number of Pinot servers).
An upsert table maintains an in-memory map from the primary key to the record location, so it's recommended to use a simple primary key type and avoid composite primary keys to reduce memory cost. In addition, consider the hashFunction config in the upsert config, which can be MD5 or MURMUR3, to store a 128-bit hash code of the primary key instead. This is useful when your primary key takes more space. Keep in mind that hashing may introduce collisions, though the chance is very low.
Set up a dashboard over the metric pinot.server.upsertPrimaryKeysCount.tableName to watch the number of primary keys in a table partition. It's useful for tracking growth, which is proportional to memory usage growth. The total memory usage by upsert is roughly (primaryKeysCount * (sizeOfKeyInBytes + 24)) bytes.
It's useful to plan the capacity beforehand to ensure you will not run into resource constraints later. A simple way is to measure the rate of the primary keys in the input stream per partition and extrapolate the data to a specific time period (based on table retention) to approximate the memory usage. A heap dump is also useful to check the memory usage so far on an upsert table instance.
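The capacity estimate described in the practices above can be sketched in a few lines of Python. This is a back-of-the-envelope illustration only: the ingestion rate, retention period and 64-byte key size are made-up inputs, and the 16-byte figure for hashed keys simply follows from a 128-bit hash.

```python
import hashlib

# Per-key overhead in bytes, taken from the rough formula quoted above.
PER_KEY_OVERHEAD_BYTES = 24
# A 128-bit hash (as produced by MD5 or MURMUR3) occupies 16 bytes.
HASHED_KEY_BYTES = 16


def estimate_upsert_memory_bytes(primary_keys_count, size_of_key_in_bytes):
    """Rough estimate: primaryKeysCount * (sizeOfKeyInBytes + 24)."""
    return primary_keys_count * (size_of_key_in_bytes + PER_KEY_OVERHEAD_BYTES)


def project_primary_keys(new_keys_per_second, retention_days):
    """Extrapolate the number of distinct keys over the retention period."""
    return int(new_keys_per_second * retention_days * 24 * 60 * 60)


# Hypothetical inputs: 50 new keys/second per partition, 7 days of retention.
keys = project_primary_keys(new_keys_per_second=50, retention_days=7)
raw_bytes = estimate_upsert_memory_bytes(keys, size_of_key_in_bytes=64)
hashed_bytes = estimate_upsert_memory_bytes(keys, HASHED_KEY_BYTES)

# Sanity check that an MD5 digest really is 16 bytes long.
md5_digest_length = len(hashlib.md5(b"example-primary-key").digest())

print(f"projected keys per partition: {keys}")
print(f"raw 64-byte keys: {raw_bytes / 1e9:.2f} GB")
print(f"hashed keys:      {hashed_bytes / 1e9:.2f} GB")
```

Comparing the two estimates gives a quick feel for whether enabling hashFunction is worth it for a given key size.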
Putting these together, the table configuration of the quick start example is as follows:
Pinot server maintains a primary key to record location map across all the segments served in an upsert-enabled table. As a result, when updating the config for an existing upsert table (e.g. change the columns in the primary key, change the comparison column), servers need to be restarted in order to apply the changes and rebuild the map.
To illustrate how full upsert works, the Pinot binary comes with a quick start example. Use the following command to create a realtime upsert table meetupRSVP.
You can also run the partial upsert demo with the following command:
As soon as data flows into the stream, the Pinot table will consume it and it will be ready for querying. Head over to the Query Console to check out the realtime data.
For partial upsert, you can see that only the values of the configured columns change, based on the specified partial upsert strategy.
An example for partial upsert is shown below: each event_id remains unique during ingestion, while the value of rsvp_count is incremented.
To see the difference from a non-upsert table, you can use the query option skipUpsert to skip the upsert effect in the query result.
Can I change primary key columns in an existing upsert table?
Yes, you can add columns to or remove columns from the primary key, as long as the input stream is partitioned on one of the primary key columns. However, you need to restart all Pinot servers so that they can rebuild the primary key to record location map with the new columns.
This guide shows you how to import data from GCP (Google Cloud Platform).
You can enable Google Cloud Storage using the plugin pinot-gcs. In the controller or server, add the config -
By default Pinot loads all the plugins, so you can just drop this plugin there. Also, if you specify -Dplugins.include, you need to list all the plugins you want to use, e.g. pinot-json, pinot-avro, pinot-kafka-2.0...
The GCP filesystem provides the following options -
projectId - The name of the Google Cloud Platform project under which you have created your storage bucket.
gcpKey - Location of the JSON file containing GCP keys. You can refer to the GCP documentation to download the keys.
Each of these properties should be prefixed by pinot.[node].storage.factory.class.gs. where node is either controller or server depending on the config
e.g.
Kinesis supports authentication using the . The credential provider looks for the credentials in the following order -
The Pinot-Pulsar connector supports authentication using security tokens. You can generate the token by following the Pulsar documentation. Once generated, add the following property to streamConfigs to send the auth token with each request:
The Pinot-Pulsar connector also supports TLS for encrypted connections. You can follow the Pulsar documentation to enable TLS on your Pulsar cluster. Once done, you can enable TLS in the Pulsar connector by providing the location of the trust certificate file generated in the previous step.
For other table and stream configurations, you can head over to
The upsert Pinot table can use only the low-level consumer for the input streams. As a result, it uses the for the segments. Moreover, upsert poses the additional requirement that all segments of the same partition must be served from the same server to ensure the data consistency across the segments. Accordingly, it requires to use strictReplicaGroup
as the routing strategy. To use that, configure instanceSelectorType in Routing as the following:
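As a minimal sketch, the routing fragment can be built as a Python dict and printed as JSON. The lower-case routing key used for nesting is an assumption based on the Routing section named above; only instanceSelectorType and strictReplicaGroup come from the text.

```python
import json

# Assumed layout: the selector sits under a "routing" section of the table config.
routing_config = {
    "routing": {
        "instanceSelectorType": "strictReplicaGroup"
    }
}

routing_json = json.dumps(routing_config, indent=2)
print(routing_json)
```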
| Property | Description |
| --- | --- |
| streamType | This should be set to "pulsar" |
| stream.pulsar.topic.name | Your Pulsar topic name |
| stream.pulsar.bootstrap.servers | Comma-separated broker list for Apache Pulsar |
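The three Pulsar properties above could be assembled into a streamConfigs map along the following lines. This is an illustrative sketch: the helper name, the topic name and the broker address are placeholders, and a real table needs further stream settings that are not listed here.

```python
import json


def build_pulsar_stream_configs(topic_name, bootstrap_servers):
    """Return only the Pulsar-specific properties listed above (hypothetical helper)."""
    return {
        "streamType": "pulsar",
        "stream.pulsar.topic.name": topic_name,
        # Comma-separated broker list, passed through unchanged.
        "stream.pulsar.bootstrap.servers": bootstrap_servers,
    }


# Placeholder values for illustration only.
pulsar_configs = build_pulsar_stream_configs(
    topic_name="my-topic",
    bootstrap_servers="pulsar://localhost:6650",
)
print(json.dumps(pulsar_configs, indent=2))
```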
| Strategy | Description |
| --- | --- |
| OVERWRITE | Overwrite the column of the last record |
| INCREMENT | Add the new value to the existing values |
| APPEND | Add the new item to the Pinot unordered set |
| UNION | Add the new item to the Pinot unordered set if not exists |
| IGNORE | Ignore the new value, keep the existing value (v0.10.0+) |
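To make the strategies above concrete, here is a small Python model of how each one could combine an existing value with an incoming one. It is a sketch of the described semantics, not Pinot's implementation: multi-value columns are modelled as Python lists, and treating unlisted columns as OVERWRITE is an assumption made for the example.

```python
def merge_value(strategy, existing, new):
    """Combine one column value according to a partial upsert strategy."""
    if strategy == "OVERWRITE":
        return new
    if strategy == "INCREMENT":
        return existing + new
    if strategy == "APPEND":
        return list(existing) + list(new)
    if strategy == "UNION":
        merged = list(existing)
        for item in new:
            if item not in merged:
                merged.append(item)
        return merged
    if strategy == "IGNORE":
        return existing
    raise ValueError(f"unknown strategy: {strategy}")


def merge_record(existing, incoming, strategies, default_strategy="OVERWRITE"):
    """Merge an incoming record into the existing record for the same primary key."""
    merged = dict(existing)
    for column, new_value in incoming.items():
        if column not in existing:
            merged[column] = new_value
            continue
        strategy = strategies.get(column, default_strategy)
        merged[column] = merge_value(strategy, existing[column], new_value)
    return merged


# Hypothetical records sharing the primary key event_id = "e1".
existing_record = {"event_id": "e1", "rsvp_count": 3, "groups": ["a"], "venue": "Hall 1"}
incoming_record = {"event_id": "e1", "rsvp_count": 2, "groups": ["a", "b"], "venue": "Hall 2"}
column_strategies = {"rsvp_count": "INCREMENT", "groups": "UNION", "venue": "IGNORE"}

merged_record = merge_record(existing_record, incoming_record, column_strategies)
print(merged_record)
```

In this run rsvp_count is summed, groups gains only the missing item, and venue keeps its original value.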
| Property | Description |
| --- | --- |
| streamType | This should be set to "kinesis" |
| stream.kinesis.topic.name | Kinesis stream name |
| region | Kinesis region, e.g. us-west-1 |
| accessKey | Kinesis access key |
| secretKey | Kinesis secret key |
| shardIteratorType | Set to LATEST to consume only new records, TRIM_HORIZON for the earliest sequence number, AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER to start consumption from a particular sequence number |
| maxRecordsToFetch | ... Default is 20. |
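A sketch of how the Kinesis properties above might be collected and checked before being placed in streamConfigs. The helper and its validation are illustrative assumptions, the stream name and credentials are placeholders, and all values are written as strings purely as a convention for this example.

```python
import json

# The four iterator types named in the table above.
VALID_SHARD_ITERATOR_TYPES = {
    "LATEST",
    "TRIM_HORIZON",
    "AT_SEQUENCE_NUMBER",
    "AFTER_SEQUENCE_NUMBER",
}


def build_kinesis_stream_configs(stream_name, region, access_key, secret_key,
                                 shard_iterator_type="LATEST",
                                 max_records_to_fetch=20):
    """Return the Kinesis-specific properties (hypothetical helper)."""
    if shard_iterator_type not in VALID_SHARD_ITERATOR_TYPES:
        raise ValueError(f"unsupported shardIteratorType: {shard_iterator_type}")
    return {
        "streamType": "kinesis",
        "stream.kinesis.topic.name": stream_name,
        "region": region,
        "accessKey": access_key,
        "secretKey": secret_key,
        "shardIteratorType": shard_iterator_type,
        "maxRecordsToFetch": str(max_records_to_fetch),
    }


# Placeholder values for illustration only.
kinesis_configs = build_kinesis_stream_configs(
    stream_name="my-stream",
    region="us-west-1",
    access_key="<access-key>",
    secret_key="<secret-key>",
)
print(json.dumps(kinesis_configs, indent=2))
```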
This section contains a collection of guides that will show you how to import data from a Pinot supported input format.
Pinot offers support for various popular input formats during ingestion. By changing the input format, you can reduce the time spent doing serialization-deserialization and speed up the ingestion.
The input format can be changed using the recordReaderSpec config in the ingestion job spec.
The config consists of the following keys:
dataFormat - Name of the data format to consume.
className - Name of the class that implements the RecordReader interface. This class is used for parsing the data.
configClassName - Name of the class that implements the RecordReaderConfig interface. This class is used to parse the values mentioned in configs.
configs - Key-value pairs for format-specific configs. This field can be left out.
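For illustration, the four keys above can be put together as follows for a CSV input. The fully qualified class names are assumed to be those of the CSV input format plugin and should be checked against your Pinot distribution; the configs values are arbitrary examples.

```python
import json

# Assumed class names for the CSV plugin; verify them against your distribution.
record_reader_spec = {
    "dataFormat": "csv",
    "className": "org.apache.pinot.plugin.inputformat.csv.CSVRecordReader",
    "configClassName": "org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig",
    # Optional, format-specific settings.
    "configs": {
        "fileFormat": "default",
        "delimiter": ",",
        "multiValueDelimiter": ";",
    },
}

print(json.dumps({"recordReaderSpec": record_reader_spec}, indent=2))
```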
Pinot supports multiple input formats out of the box. You just need to specify the corresponding readers and the associated custom configs to switch between the formats.
CSV Record Reader supports the following configs -
fileFormat - can be one of default, rfc4180, excel, tdf, mysql
header - header of the file. The column names should be separated by the delimiter mentioned in the config
delimiter - the character separating the columns
multiValueDelimiter - the character separating multiple values in a single column. This can be used to split a column into a list.
Supported from 0.11 release -
skipHeader - skip header record in the file. Boolean
ignoreEmptyLines - ignore empty lines instead of consuming them and filling with default values. Boolean
ignoreSurroundingSpaces - ignore spaces around column names and values. Boolean
quoteCharacter - single character that is used for quotes in CSV files
recordSeparator - character used to separate records in the input file. Default is \n or \r\n depending on the platform.
nullStringValue - string value that represents null in CSV files. Default is empty string.
Your CSV file may have raw text fields that cannot be reliably delimited using any character. In this case, explicitly set the multiValueDelimiter field to empty in the ingestion config.
multiValueDelimiter: ''
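The effect of an empty multi-value delimiter can be pictured with a toy parser. This is not the Pinot CSV reader, only a sketch of the idea: with a delimiter set, a cell containing it is split into a list, and with the delimiter empty, free text is left untouched.

```python
def parse_cell(cell, multi_value_delimiter):
    """Split a cell into multiple values only when a delimiter is configured."""
    # An empty delimiter disables splitting entirely.
    if multi_value_delimiter and multi_value_delimiter in cell:
        return cell.split(multi_value_delimiter)
    return cell


split_result = parse_cell("red;green;blue", ";")
raw_text_with_delimiter = parse_cell("Great talk; would attend again", ";")
raw_text_without_delimiter = parse_cell("Great talk; would attend again", "")

print(split_result)
print(raw_text_with_delimiter)
print(raw_text_without_delimiter)
```

The second call shows the problem the setting avoids: a free-text field is broken apart because it happens to contain the delimiter.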
The Avro record reader converts the data in the file to a GenericRecord. A Java class or .avro file is not required. By default the Avro record reader only supports primitive types. You can set enableLogicalTypes to true to enable support for the rest of the Avro data types.
We use the following conversion table to translate between Avro and Pinot data types. The conversions are done using the official Avro methods present in org.apache.avro.Conversions.
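| Avro Data Type | Pinot Data Type | Comment |
| --- | --- | --- |
| INT | INT | |
| LONG | LONG | |
| FLOAT | FLOAT | |
| DOUBLE | DOUBLE | |
| BOOLEAN | BOOLEAN | |
| STRING | STRING | |
| ENUM | STRING | |
| BYTES | BYTES | |
| FIXED | BYTES | |
| MAP | JSON | |
| ARRAY | JSON | |
| RECORD | JSON | |
| UNION | JSON | |
| DECIMAL | BYTES | |
| UUID | STRING | |
| DATE | STRING | yyyy-MM-dd format |
| TIME_MILLIS | STRING | HH:mm:ss.SSS format |
| TIME_MICROS | STRING | HH:mm:ss.SSSSSS format |
| TIMESTAMP_MILLIS | TIMESTAMP | |
| TIMESTAMP_MICROS | TIMESTAMP | |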
Thrift requires the class generated from the .thrift file to parse the data. The .class file should be available in Pinot's classpath. You can put the files in the lib/ folder of the Pinot distribution directory.
Since the 0.11.0 release, the Parquet record reader determines whether to use ParquetAvroRecordReader or ParquetNativeRecordReader to read records. The reader looks for the parquet.avro.schema or avro.schema key in the Parquet file footer and, if present, uses the Avro reader.
Users can however change the record reader manually in case of a misconfiguration.
For the support of DECIMAL and other Parquet native data types, always use ParquetNativeRecordReader.
| Parquet Data Type | Pinot Data Type | Comment |
| --- | --- | --- |
| INT96 | LONG | Parquet INT96 type converts nanoseconds to Pinot INT64 type of milliseconds |
| INT64 | LONG | |
| INT32 | INT | |
| FLOAT | FLOAT | |
| DOUBLE | DOUBLE | |
| BINARY | BYTES | |
| FIXED-LEN-BYTE-ARRAY | BYTES | |
| DECIMAL | DOUBLE | |
| ENUM | STRING | |
| UTF8 | STRING | |
| REPEATED | MULTIVALUE/MAP (represented as MV) | If the Parquet original type is LIST, it is converted to a MULTIVALUE column, otherwise a MAP column. |
For ParquetAvroRecordReader, you can refer to the Avro section above for the type conversions.
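The INT96 row in the Parquet table above mentions a nanoseconds-to-milliseconds conversion. The sketch below shows one way such a conversion can work, assuming the common INT96 timestamp layout (8 bytes of nanoseconds within the day followed by a 4-byte Julian day number, both little-endian). It illustrates the arithmetic and is not taken from Pinot's reader.

```python
import struct

JULIAN_DAY_OF_UNIX_EPOCH = 2440588  # Julian day number of 1970-01-01
MILLIS_PER_DAY = 86_400_000
NANOS_PER_MILLI = 1_000_000


def int96_to_epoch_millis(raw):
    """Convert a 12-byte INT96 timestamp to milliseconds since the Unix epoch."""
    if len(raw) != 12:
        raise ValueError("INT96 values must be exactly 12 bytes")
    # "<qi": little-endian signed 64-bit nanos-of-day, then signed 32-bit Julian day.
    nanos_of_day, julian_day = struct.unpack("<qi", raw)
    days_since_epoch = julian_day - JULIAN_DAY_OF_UNIX_EPOCH
    return days_since_epoch * MILLIS_PER_DAY + nanos_of_day // NANOS_PER_MILLI


# One day after the epoch plus 1.5 ms; the sub-millisecond part is truncated.
sample = struct.pack("<qi", 1_500_000, JULIAN_DAY_OF_UNIX_EPOCH + 1)
sample_millis = int96_to_epoch_millis(sample)
print(sample_millis)
```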
ORC record reader supports the following data types -
| ORC Data Type | Java Data Type |
| --- | --- |
| BOOLEAN | String |
| SHORT | Integer |
| INT | Integer |
| LONG | Integer |
| FLOAT | Float |
| DOUBLE | Double |
| STRING | String |
| VARCHAR | String |
| CHAR | String |
| LIST | Object[] |
| MAP | Map<Object, Object> |
| DATE | Long |
| TIMESTAMP | Long |
| BINARY | byte[] |
| BYTE | Integer |
In LIST and MAP types, the object should only belong to one of the data types supported by Pinot.
The reader requires a descriptor file to deserialize the data present in the files. You can generate the descriptor file (.desc) from the .proto file using the command -
This guide shows you how to import data from files stored in Azure Data Lake Storage Gen2 (ADLS Gen2).
You can enable Azure Data Lake Storage using the plugin pinot-adls. In the controller or server, add the config -
By default Pinot loads all the plugins, so you can just drop this plugin there. Also, if you specify -Dplugins.include, you need to list all the plugins you want to use, e.g. pinot-json, pinot-avro, pinot-kafka-2.0...
Azure Blob Storage provides the following options -
accountName - name of the Azure account under which the storage is created
accessKey - access key required for authentication
fileSystemName - name of the filesystem to use, i.e. the container name (a container name is similar to a bucket name in S3)
enableChecksum - enable MD5 checksum for verification. Default is false.
Each of these properties should be prefixed by pinot.[node].storage.factory.class.adl2. where node is either controller or server depending on the config
e.g.