Batch ingestion of data into Apache Pinot using Apache Flink.
Pinot supports Apache Flink as a processing framework to push segment files to the database.
The Pinot distribution contains an Apache Flink SinkFunction that can be used as part of an Apache Flink application (streaming or batch) to write directly into a designated Pinot database.
Here is an example code snippet to show how to utilize the PinotSinkFunction in a Flink streaming application:
As in the example shown above, the only required information from the Pinot side is the table schema and the table config.
For a more detailed, executable example, refer to the quick start example.
PinotSinkFunction relies mostly on the TableConfig object to infer the batch ingestion configuration, which it uses to start a SegmentWriter and a SegmentUploader that communicate with the Pinot cluster.
Note that even though the Flink application in the above example runs in streaming mode, the data is still batched together and flushed/uploaded to Pinot once the flush threshold is reached. It is not a direct streaming write into Pinot.
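The buffer-then-flush behaviour described in the note can be sketched in a few lines of Python. This is only an illustration of the batching idea, not the PinotSinkFunction implementation: the class name BufferingSink, the flush_threshold parameter, and the upload callback are all hypothetical.

```python
from typing import Callable, Dict, List

Record = Dict[str, object]


class BufferingSink:
    """Hypothetical sink that batches records and hands them off in chunks."""

    def __init__(self, flush_threshold: int, upload: Callable[[List[Record]], None]):
        self.flush_threshold = flush_threshold
        self.upload = upload
        self.buffer: List[Record] = []

    def invoke(self, record: Record) -> None:
        # Records are only buffered here; nothing reaches the target yet.
        self.buffer.append(record)
        if len(self.buffer) >= self.flush_threshold:
            self.flush()

    def flush(self) -> None:
        # Hand the buffered records off as one batch (one "segment").
        if self.buffer:
            self.upload(list(self.buffer))
            self.buffer.clear()


uploaded_batches: List[List[Record]] = []
sink = BufferingSink(flush_threshold=3, upload=uploaded_batches.append)
for i in range(7):
    sink.invoke({"id": i})
sink.flush()  # flush whatever is left when the job ends

batch_sizes = [len(batch) for batch in uploaded_batches]
print(batch_sizes)
```

Seven records with a threshold of three are handed off as three batches, which is why the write is batch-oriented even when the records arrive one at a time.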
Here is an example table config:
The only required configurations are:
"outputDirURI": where PinotSinkFunction should write the constructed segment files.
"push.controllerUri": the URL of the Pinot cluster (controller) that PinotSinkFunction should communicate with.
The rest of the configurations are standard for any Pinot table.
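As a rough aid, the two required settings can be checked before submitting a job. The sketch below assumes the batch settings live in the first entry of ingestionConfig.batchIngestionConfig.batchConfigMaps in the table config; that layout, the table name, the sample values, and the helper missing_batch_keys are assumptions made for illustration and should be checked against your Pinot version.

```python
import json

# Assumed table config fragment; the values are placeholders.
EXAMPLE_TABLE_CONFIG = json.loads("""
{
  "tableName": "exampleTable",
  "tableType": "OFFLINE",
  "ingestionConfig": {
    "batchIngestionConfig": {
      "batchConfigMaps": [
        {
          "outputDirURI": "/tmp/pinotoutput",
          "push.controllerUri": "http://localhost:9000"
        }
      ]
    }
  }
}
""")

REQUIRED_KEYS = ("outputDirURI", "push.controllerUri")


def missing_batch_keys(table_config: dict) -> list:
    """Return the required batch keys absent from the first batch config map."""
    maps = (
        table_config.get("ingestionConfig", {})
        .get("batchIngestionConfig", {})
        .get("batchConfigMaps", [])
    )
    first_map = maps[0] if maps else {}
    return [key for key in REQUIRED_KEYS if key not in first_map]


print(missing_batch_keys(EXAMPLE_TABLE_CONFIG))
```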
Batch ingestion of backfill data into Apache Pinot.
Pinot batch ingestion involves two parts: a routine ingestion job (hourly/daily) and backfill. The following example shows how routine batch ingestion works for a Pinot offline table:
High-level description
1. Organize raw data into buckets (e.g. /var/pinot/airlineStats/rawdata/2014/01/01). Each bucket typically contains several files (e.g. /var/pinot/airlineStats/rawdata/2014/01/01/airlineStats_data_2014-01-01_0.avro).
2. Run a Pinot batch ingestion job that points to a specific date folder, such as ‘/var/pinot/airlineStats/rawdata/2014/01/01’. The segment generation job converts each Avro file in that folder into a Pinot segment for that day and gives it a unique name.
3. Run a Pinot segment push job to upload those segments, with their unique names, via a Controller API.
IMPORTANT: The segment name uniquely identifies a segment in Pinot. If the controller gets an upload request for a segment whose name already exists, it will attempt to replace the existing segment with the new one.
This newly uploaded data can now be queried in Pinot. However, sometimes users will make changes to the raw data which need to be reflected in Pinot. This process is known as 'Backfill'.
Pinot supports data modification only at the segment level, which means you must replace entire segments to backfill data. The high-level idea is to repeat steps 2 (segment generation) and 3 (segment upload) mentioned above:
Backfill jobs must run at the same granularity as the daily job. For example, to backfill data for 2014/01/01, specify that day's input folder for your backfill job (e.g. ‘/var/pinot/airlineStats/rawdata/2014/01/01’).
The backfill job will then generate segments with the same name as the original job (with the new data).
When uploading those segments to Pinot, the controller will replace the old segments with the new ones (segment names act like primary keys within Pinot) one by one.
Backfill jobs expect at least as many data files on the backfill date as the original run had, so that the segment generation job creates at least as many segments as the original run.
For example, assume table airlineStats has 2 segments (airlineStats_2014-01-01_2014-01-01_0, airlineStats_2014-01-01_2014-01-01_1) on date 2014/01/01 and the backfill input directory contains only 1 input file. The segment generation job will then create just one segment: airlineStats_2014-01-01_2014-01-01_0. After the segment push job, only segment airlineStats_2014-01-01_2014-01-01_0 is replaced, and the stale data in segment airlineStats_2014-01-01_2014-01-01_1 is still there.
If the raw data is modified in such a way that the original time bucket has fewer input files than the first ingestion run, the backfill will fail to replace all of the original segments.
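The replace-by-name behaviour, and the stale segment left behind in the example above, can be modelled with a plain dictionary keyed by segment name. This is a toy model only: push_segments is a hypothetical helper, not a Pinot API, and the segment contents are placeholder strings.

```python
from typing import Dict


def push_segments(table: Dict[str, str], new_segments: Dict[str, str]) -> None:
    """Toy upload: a segment whose name already exists replaces the old one."""
    table.update(new_segments)


# State after the original ingestion run for 2014/01/01 (two input files).
table = {
    "airlineStats_2014-01-01_2014-01-01_0": "original data, file 0",
    "airlineStats_2014-01-01_2014-01-01_1": "original data, file 1",
}

# The backfill input directory holds a single file, so one segment is built.
backfill = {
    "airlineStats_2014-01-01_2014-01-01_0": "corrected data, file 0",
}

push_segments(table, backfill)

# Any segment the backfill did not regenerate still serves the old data.
stale = sorted(name for name in table if name not in backfill)
print(stale)
```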
Batch ingestion of data into Apache Pinot using Apache Hadoop.
Pinot supports Apache Hadoop as a processor to create and push segment files to the database. The Pinot distribution is bundled with the Hadoop code to process your files and convert and upload them to Pinot.
You can build Pinot from source by following the build instructions. The resulting JAR file can be found in pinot/target/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar
Next, you need to change the execution config in the job spec to the following:
You can check out the sample job spec here.
Finally, execute the Hadoop job using the following command:
Ensure environment variables PINOT_ROOT_DIR and PINOT_VERSION are set properly.
Some use cases require data to be preprocessed (for example partitioned, sorted, or resized) before segments are created and pushed to Pinot.
The MapReduce job called SegmentPreprocessingJob is the best fit for this use case, regardless of whether the input data is in AVRO or ORC format.
The example below shows how to use SegmentPreprocessingJob.
In Hadoop properties, set the following to enable this job:
In the table config, specify the operations that you'd like to enable in the MR job in preprocessing.operations, and then specify the exact configs for those operations:
Minimum number of reducers. Optional. Used only when partitioning is disabled and resizing is enabled. This parameter avoids producing too many small input files for Pinot, which would leave Pinot servers holding too many small segments and using too many threads.
Maximum number of records per reducer. Optional. Unlike “preprocessing.num.reducers”, this parameter avoids producing too few large input files for Pinot, which would lose the advantage of multi-threading when querying. When not set, each reducer generates one output file. When set (e.g. to M), the original output file is split into multiple files, each containing at most M records. This applies whether or not partitioning is enabled.
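The effect of the maximum-records setting can be illustrated with a small chunking function. This sketch only mirrors the "at most M records per output file" rule described above; split_output and its arguments are hypothetical and are not part of SegmentPreprocessingJob.

```python
from typing import List, Sequence


def split_output(records: Sequence[int], max_records_per_file: int) -> List[List[int]]:
    """Split one reducer's output into files of at most max_records_per_file records."""
    if max_records_per_file <= 0:
        raise ValueError("max_records_per_file must be positive")
    return [
        list(records[start:start + max_records_per_file])
        for start in range(0, len(records), max_records_per_file)
    ]


reducer_output = list(range(10))          # ten records from a single reducer
files = split_output(reducer_output, 4)   # M = 4
sizes = [len(f) for f in files]
print(sizes)
```

With ten records and M = 4, the reducer's single output becomes three files, the last one holding the remainder.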
For more details on this MR job, refer to its documentation.
Batch ingestion of data into Apache Pinot using dimension tables.
Dimension tables are a special kind of offline table from which data can be looked up via the lookup UDF, providing join-like functionality.
To allow faster lookups, a table marked as a dimension table is replicated on all the hosts for a given tenant, which means that these tables must be small in size.
A dimension table cannot be part of a hybrid table.
Configure dimension tables using the following properties in the table configuration:
isDimTable: Set to true.
segmentsConfig.segmentPushType: Set to REFRESH.
dimensionTableConfig.disablePreload: By default, dimension tables are preloaded to allow for fast lookups. Set to true to trade off speed for memory by storing only the segment reference and docID. Otherwise, the whole row is stored in the dimension table hash map.
controller.dimTable.maxSize: Determines the maximum size quota for a dimension table in a cluster. Table creation will fail if the storage quota exceeds this maximum size.
dimensionFieldSpecs: To look up dimension values, dimension tables need a primary key. For details, see dimensionFieldSpecs.
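The speed-versus-memory trade-off behind dimensionTableConfig.disablePreload can be sketched with a small in-memory model. This is an illustration of the idea only, not Pinot's implementation: the class DimensionTableIndex, the segment name, and the sample rows are all hypothetical.

```python
from typing import Dict, List, Optional

Row = Dict[str, object]


class DimensionTableIndex:
    """Toy primary-key index over the rows of a small, fully replicated table."""

    def __init__(self, segments: Dict[str, List[Row]], primary_key: str,
                 disable_preload: bool = False):
        self._segments = segments
        self._disable_preload = disable_preload
        self._index: Dict[object, object] = {}
        for segment_name, rows in segments.items():
            for doc_id, row in enumerate(rows):
                if disable_preload:
                    # Less memory: keep only a pointer to where the row lives.
                    self._index[row[primary_key]] = (segment_name, doc_id)
                else:
                    # Faster lookups: keep a copy of the whole row in the map.
                    self._index[row[primary_key]] = dict(row)

    def lookup(self, key: object, column: str) -> Optional[object]:
        entry = self._index.get(key)
        if entry is None:
            return None
        if self._disable_preload:
            segment_name, doc_id = entry
            row = self._segments[segment_name][doc_id]  # extra read per lookup
        else:
            row = entry
        return row.get(column)


segments = {
    "dimCarriers_0": [
        {"carrierCode": "AA", "carrierName": "Carrier A"},
        {"carrierCode": "DL", "carrierName": "Carrier D"},
    ]
}
preloaded = DimensionTableIndex(segments, "carrierCode")
by_reference = DimensionTableIndex(segments, "carrierCode", disable_preload=True)
print(preloaded.lookup("DL", "carrierName"), by_reference.lookup("DL", "carrierName"))
```

Both modes return the same value for a given key; they differ only in what the hash map holds, which is why a primary key is required in either case.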
Batch ingestion of data into Apache Pinot using Apache Spark.
Pinot supports Apache Spark (2.x and 3.x) as a processor to create and push segment files to the database. The Pinot distribution is bundled with the Spark code to process your files and convert and upload them to Pinot.
To set up Spark, do one of the following:
Use the Spark-Pinot Connector. For more information, see the connector's documentation.
Follow the instructions below.
You can build Pinot from source by following the build instructions. The resulting JAR file can be found in pinot/target/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar
If you do build Pinot from source, consider opting into the build-shaded-jar profile with -Pbuild-shaded-jar. While Pinot does not bundle Spark into its jar, it does bundle certain Hadoop libraries.
Next, you need to change the execution config in the job spec to the following:
To run Spark ingestion, you need the following jars in your classpath:
pinot-batch-ingestion-spark plugin jar: available in the plugins-external directory in the package
pinot-all jar: available in the lib directory in the package
These jars can be specified using spark.driver.extraClassPath or any other option.
For loading any other plugins that you want to use, use:
The complete spark-submit command should look like this:
Ensure environment variables PINOT_ROOT_DIR and PINOT_VERSION are set properly.
Note: You should change the master to yarn and deploy-mode to cluster for production environments.
If you want to run the Spark job in cluster mode on a YARN/EMR cluster, the following needs to be done: