Batch ingestion of data into Apache Pinot using dimension tables.
Dimension tables are a special kind of offline table from which data can be looked up via the lookup UDF, providing join-like functionality.
Dimension tables are replicated on all the hosts for a given tenant to allow faster lookups. Because every host holds a full copy, these tables must be small.
A dimension table cannot be part of a hybrid table.
Configure dimension tables using the following properties in the table configuration:
isDimTable
: Set to true.
segmentsConfig.segmentPushType
: Set to REFRESH.
dimensionTableConfig.disablePreload
: By default, dimension tables are preloaded to allow for fast lookups. Set to true to trade off speed for memory by storing only the segment reference and docID. Otherwise, the whole row is stored in the dimension table hash map.
controller.dimTable.maxSize
: Determines the maximum size quota for a dimension table in a cluster. Table creation will fail if the storage quota exceeds this maximum size.
dimensionFieldSpecs
: To look up dimension values, dimension tables need a primary key. For details, see dimensionFieldSpecs.
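As a rough illustration, the properties above might be combined as in the following sketch. It is not a complete table config: the table name `teams` and the helper `check_dim_table_config` are hypothetical, and the nesting of `segmentPushType` under `segmentsConfig` and of `disablePreload` under `dimensionTableConfig` is inferred from the dotted property names.

```python
import json

# Hypothetical dimension table config. Only the keys named in the list above
# come from the documentation; the table name is an illustrative assumption.
dim_table_config = {
    "tableName": "teams",      # hypothetical table name
    "tableType": "OFFLINE",    # dimension tables are offline tables
    "isDimTable": True,
    "segmentsConfig": {"segmentPushType": "REFRESH"},
    "dimensionTableConfig": {"disablePreload": False},
}


def check_dim_table_config(config):
    """Return a list of problems found in a dimension table config dict."""
    problems = []
    if config.get("isDimTable") is not True:
        problems.append("isDimTable must be true")
    if config.get("tableType") != "OFFLINE":
        problems.append("dimension tables must be offline tables")
    push_type = config.get("segmentsConfig", {}).get("segmentPushType")
    if push_type != "REFRESH":
        problems.append("segmentsConfig.segmentPushType must be REFRESH")
    return problems


if __name__ == "__main__":
    # Print the config as JSON, followed by any problems the check finds.
    print(json.dumps(dim_table_config, indent=2))
    print(check_dim_table_config(dim_table_config))
```

The check only covers the constraints stated in this section; it does not validate the schema or the primary key.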
Batch ingestion of data into Apache Pinot using Apache Hadoop.
Pinot supports Apache Hadoop as a processor to create and push segment files to the database. The Pinot distribution is bundled with the Hadoop code to process your files and convert and upload them to Pinot.
You can follow the wiki to build Pinot from source. The resulting JAR file can be found in pinot/target/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar
Next, change the execution config in the job spec to the following:
You can check out the sample job spec here.
Finally, execute the Hadoop job using the following command:
Ensure the environment variables PINOT_ROOT_DIR and PINOT_VERSION are set properly.
Some users need to massage their data (for example, by partitioning, sorting, or resizing) before creating and pushing segments to Pinot.
The MapReduce job SegmentPreprocessingJob is the best fit for this use case, whether the input data is in Avro or ORC format.
The example below shows how to use SegmentPreprocessingJob.
In Hadoop properties, set the following to enable this job:
In the table config, use preprocessing.operations to specify the operations that you'd like to enable in the MR job, and then specify the configs for those operations:
Minimum number of reducers (preprocessing.num.reducers). Optional. Used when partitioning is disabled and resizing is enabled. This parameter avoids producing too many small input files for Pinot, which would leave Pinot servers holding too many small segments and using too many threads.
Maximum number of records per reducer. Optional. Unlike preprocessing.num.reducers, this parameter avoids producing too few large input files for Pinot, which would forfeit the advantage of multi-threading when querying. When not set, each reducer generates one output file. When set (for example, to M), the original output file is split into multiple files, each containing at most M records. It applies whether or not partitioning is enabled.
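The effect of the maximum-records limit can be worked through with a small calculation. The sketch below is only an illustration of the stated bound (at most M records per output file); the function name `split_output_sizes` is hypothetical, and the actual job may distribute records across files differently.

```python
def split_output_sizes(num_records, max_records_per_file):
    """Return the record counts of the files produced when one reducer output
    of num_records records is split so that no file exceeds the limit."""
    if num_records < 0 or max_records_per_file <= 0:
        raise ValueError("num_records must be >= 0 and the limit must be > 0")
    # Fill as many files as possible, then put the leftover in a final file.
    full_files, remainder = divmod(num_records, max_records_per_file)
    sizes = [max_records_per_file] * full_files
    if remainder:
        sizes.append(remainder)
    return sizes


if __name__ == "__main__":
    # One reducer output of 10 records with M = 4.
    print(split_output_sizes(10, 4))
```

With 10 records and M = 4, the single reducer output becomes three files, none larger than 4 records.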
For more details on this MR job, refer to this document.
Batch ingestion of data into Apache Pinot using Apache Flink.
Pinot supports Apache Flink as a processing framework to push segment files to the database.
The Pinot distribution contains an Apache Flink sink function, PinotSinkFunction, that can be used as part of an Apache Flink application (streaming or batch) to write directly into a designated Pinot database.
Here is an example code snippet that shows how to use PinotSinkFunction in a Flink streaming application:
PinotSinkFunction uses mostly the TableConfig object to infer the batch ingestion configuration to start a SegmentWriter and SegmentUploader to communicate with the Pinot cluster.
Note that even though the Flink application in the above example runs in streaming mode, the data is still batched together and flushed/uploaded to Pinot once the flush threshold is reached. It is not a direct streaming write into Pinot.
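The batch-then-flush behavior described above can be pictured with a toy model. This is not the PinotSinkFunction API: the class `BufferingSink`, its method names, and the flush-on-close behavior are all assumptions made for illustration.

```python
class BufferingSink:
    """Toy stand-in for a sink that batches records before uploading.

    Not the PinotSinkFunction API: the class, its methods, and the
    flush-on-close behavior are assumptions for illustration only.
    """

    def __init__(self, flush_threshold):
        if flush_threshold <= 0:
            raise ValueError("flush_threshold must be positive")
        self.flush_threshold = flush_threshold
        self.buffer = []
        self.uploaded_batches = []  # each entry stands for one uploaded segment

    def invoke(self, record):
        """Buffer one record and flush once the threshold is reached."""
        self.buffer.append(record)
        if len(self.buffer) >= self.flush_threshold:
            self.flush()

    def flush(self):
        """Turn the buffered records into one batch and 'upload' it."""
        if self.buffer:
            self.uploaded_batches.append(list(self.buffer))
            self.buffer.clear()

    def close(self):
        """Flush whatever is left when the stream ends (assumed behavior)."""
        self.flush()


if __name__ == "__main__":
    sink = BufferingSink(flush_threshold=3)
    for record in range(7):
        sink.invoke(record)
    sink.close()
    print(sink.uploaded_batches)
```

Records arrive one at a time, as in a streaming job, but nothing is uploaded until a full batch has accumulated.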
Here is an example table config.
The only required configurations are:
"outputDirURI"
: The location where PinotSinkFunction should write the constructed segment file.
"push.controllerUri"
: The URL of the Pinot cluster (controller) that PinotSinkFunction should communicate with.
The rest of the configurations are standard for any Pinot table.
Batch ingestion of data into Apache Pinot.
With batch ingestion, you create a table using data already present in a file system such as S3. This is particularly useful when you want to use Pinot to query large datasets with minimal latency or to test out new features using a simple data file.
To ingest data from a filesystem, perform the following steps, which are described in more detail on this page:
Create schema configuration
Create table configuration
Upload schema and table configs
Upload data
Batch ingestion currently supports the following mechanisms to upload the data:
Standalone
Here's an example using standalone local processing.
First, create a table using the following CSV data.
In our data, the only column on which aggregations can be performed is score. Also, timestampInEpoch is the only timestamp column. So, in our schema, we keep score as a metric and timestampInEpoch as the timestamp column.
Here, we have also defined two extra fields: format and granularity. The format specifies the formatting of our timestamp column in the data source. Currently, it's in milliseconds, so we've specified 1:MILLISECONDS:EPOCH.
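To make the format string concrete, here is a minimal sketch that splits a value such as 1:MILLISECONDS:EPOCH into its three parts and converts an epoch value to a UTC datetime. The function names and the unit table are hypothetical helpers, and reading the string as size:unit:format is the interpretation assumed here; only EPOCH formats are handled.

```python
from datetime import datetime, timezone

# Milliseconds per unit for a few common units (an illustrative subset).
MILLIS_PER_UNIT = {
    "MILLISECONDS": 1,
    "SECONDS": 1_000,
    "MINUTES": 60_000,
    "HOURS": 3_600_000,
    "DAYS": 86_400_000,
}


def parse_format(spec):
    """Split a 'size:unit:format' string into (int size, unit, format)."""
    size, unit, time_format = spec.split(":", 2)
    return int(size), unit, time_format


def epoch_value_to_datetime(value, spec):
    """Convert a raw epoch column value to a UTC datetime, given its format."""
    size, unit, time_format = parse_format(spec)
    if time_format != "EPOCH":
        raise ValueError("only EPOCH formats are handled in this sketch")
    millis = value * size * MILLIS_PER_UNIT[unit]
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc)


if __name__ == "__main__":
    print(parse_format("1:MILLISECONDS:EPOCH"))
    print(epoch_value_to_datetime(86_400_000, "1:MILLISECONDS:EPOCH"))
```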
We define a table transcript and map the schema created in the previous step to the table. For batch data, we keep the tableType as OFFLINE.
Now that we have both the configs, upload them and create a table by running the following command:
Check the table config and schema in the REST API to make sure they were successfully uploaded.
We now have an empty table in Pinot. Next, upload the CSV file to this empty table.
A table is composed of multiple segments. The segments can be created in the following three ways:
Minion-based ingestion
Upload API
Ingestion jobs
There are 2 controller APIs that can be used for a quick ingestion test using a small file.
When these APIs are invoked, the controller has to download the file and build the segment locally.
Hence, these APIs are NOT meant for production environments or for large input files.
This API creates a segment using the given file and pushes it to Pinot. All steps happen on the controller.
Example usage:
To upload a JSON file data.json to a table called foo_OFFLINE, use the command below.
Note that query params need to be URLEncoded. For example, {"inputFormat":"json"} in the command below needs to be converted to %7B%22inputFormat%22%3A%22json%22%7D.
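As a quick check of the encoding described above, the following sketch uses Python's standard library to URL-encode the JSON value. It covers only the encoding step; building and sending the full request is left out.

```python
import json
from urllib.parse import quote, unquote

batch_config = {"inputFormat": "json"}

# Compact separators keep the JSON free of spaces, matching the example value.
raw = json.dumps(batch_config, separators=(",", ":"))

# safe="" percent-encodes every reserved character, including "/" and ":".
encoded = quote(raw, safe="")

if __name__ == "__main__":
    print(raw)
    print(encoded)
    # Decoding restores the original JSON text.
    print(unquote(encoded) == raw)
```

The encoded value printed here is the same string shown in the note above.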
The batchConfigMapStr can be used to pass in additional properties needed for decoding the file. For example, for CSV, you may need to provide the delimiter.
This API creates a segment using the file at the given URI and pushes it to Pinot. Properties to access the FS need to be provided in the batchConfigMap. All steps happen on the controller. Example usage:
Segments can be created and uploaded using tasks known as DataIngestionJobs. A job also needs a config of its own. We call this config the JobSpec.
For our CSV file and table, the JobSpec should look like this:
Now that we have the job spec for our table transcript, we can trigger the job using the following command:
Once the job successfully finishes, head over to the query console and start playing with the data.
There are 3 ways to upload a Pinot segment:
Segment tar push
Segment URI push
Segment metadata push
This is the original and default push mechanism.
Tar push requires the segment to be stored locally or to be openable as an InputStream on PinotFS, so that the entire segment tar file can be streamed to the controller.
The push job will:
Upload the entire segment tar file to the Pinot controller.
Pinot controller will:
Save the segment into the controller segment directory (local or any PinotFS).
Extract segment metadata.
Add the segment to the table.
This push mechanism requires the segment tar file to be stored on a deep store with a globally accessible segment tar URI.
URI push is lightweight on the client side, but the controller does the same amount of work as in tar push.
The push job will:
POST this segment tar URI to the Pinot controller.
Pinot controller will:
Download segment from the URI and save it to controller segment directory (local or any PinotFS).
Extract segment metadata.
Add the segment to the table.
This push mechanism also requires the segment tar file to be stored on a deep store with a globally accessible segment tar URI.
Metadata push is lightweight on the controller side: the controller does not download the segment from the deep store.
The push job will:
Download the segment based on URI.
Extract metadata.
Upload metadata to the Pinot Controller.
Pinot Controller will:
Add the segment to the table based on the metadata.
4. Segment Metadata Push with copyToDeepStore
This extends the original Segment Metadata Push to cases where the segments are pushed to a location that is not used as the deep store. The ingestion job can still do a metadata push but asks the Pinot Controller to copy the segments into the deep store. These use cases usually arise when the ingestion jobs don't have direct access to the deep store but still want to use metadata push for its efficiency, and so use a staging location to keep the segments temporarily.
NOTE: The staging location and the deep store must use the same storage scheme, for example both on S3. This is because the copy is done via the PinotFS.copyDir interface, which assumes so, and also because the copy happens on the storage system side, so segments don't need to go through the Pinot Controller at all.
To make this work, grant Pinot controllers access to the staging location. For example, on AWS, this may require adding an access policy like this example for the controller EC2 instances:
Then use metadata push and add one extra config like this one:
Pinot supports atomic updates only at the segment level. When data consisting of multiple segments is pushed to a table, the segments are replaced one at a time, so queries to the broker during the upload phase may produce inconsistent results due to interleaving of old and new data.
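The interleaving effect can be reproduced with a toy model. The sketch below is not Pinot's implementation: the segment names, the row values, and both functions are hypothetical, and a plain dictionary stands in for the table.

```python
def replace_one_at_a_time(table, new_segments, query):
    """Swap segments in one by one and record what a query sees after each swap."""
    observed = []
    for name, rows in new_segments.items():
        table[name] = rows  # each single-segment swap is atomic on its own
        observed.append(query(table))
    return observed


def total_score(table):
    """A stand-in query: sum every row across all segments."""
    return sum(sum(rows) for rows in table.values())


# Hypothetical old and new versions of a three-segment table.
old_segments = {"seg_0": [10], "seg_1": [20], "seg_2": [30]}
new_segments = {"seg_0": [11], "seg_1": [21], "seg_2": [31]}

if __name__ == "__main__":
    table = dict(old_segments)
    print("before:", total_score(table))
    print("during:", replace_one_at_a_time(table, new_segments, total_score))
```

The totals observed after the first and second swaps match neither the old data nor the new data, which is the inconsistency a query can hit during a multi-segment push.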
When Pinot segment files are created in external systems (Hadoop, Spark, etc.), there are several ways to push the data to the Pinot controller and server:
Push segment to other systems and implement your own segment fetcher to pull data from those systems.
Since Pinot is written in Java, you can set the following basic Java configurations to tune the segment runner job:
Log4j2 file location with -Dlog4j2.configurationFile
Plugin directory location with -Dplugins.dir=/opt/pinot/plugins
JVM props, like -Xmx8g -Xms4G
If you are using Docker, you can set these options in the JAVA_OPTS variable.
You can set -D mapreduce.map.memory.mb=8192 to set the mapper memory size when submitting the Hadoop job.
You can add the config spark.executor.memory to tune the memory usage for segment creation when submitting the Spark job.