arrow-left

All pages
gitbookPowered by GitBook
1 of 6

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Batch Ingestion

Batch ingestion of data into Apache Pinot.

With batch ingestion you create a table using data already present in a file system such as S3. This is particularly useful when you want to use Pinot to query across large data with minimal latency or to test out new features using a simple data file.

To ingest data from a filesystem, perform the following steps, which are described in more detail in this page:

  1. Create schema configuration

  2. Create table configuration

  3. Upload schema and table configs

  4. Upload data

Batch ingestion currently supports the following mechanisms to upload the data:

  • Standalone

Here's an example using standalone local processing.

First, create a table using the following CSV data.

hashtag
Create schema configuration

In our data, the only column on which aggregations can be performed is score. Secondly, timestampInEpoch is the only timestamp column. So, on our schema, we keep score as metric and timestampInEpoch as timestamp column.

Here, we have also defined two extra fields: format and granularity. The format specifies the formatting of our timestamp column in the data source. Currently, it's in milliseconds, so we've specified 1:MILLISECONDS:EPOCH.

hashtag
Create table configuration

We define a table transcript and map the schema created in the previous step to the table. For batch data, we keep the tableType as OFFLINE.

hashtag
Upload schema and table configs

Now that we have both the configs, upload them and create a table by running the following command:

Check out the table config and schema in the \[Rest API] to make sure it was successfully uploaded.

hashtag
Upload data

We now have an empty table in Pinot. Next, upload the CSV file to this empty table.

A table is composed of multiple segments. The segments can be created in the following three ways:

  • Minion based ingestion\

  • Upload API\

  • Ingestion jobs

hashtag
Minion-based ingestion

Refer to

hashtag
Upload API

There are 2 controller APIs that can be used for a quick ingestion test using a small file.

triangle-exclamation

When these APIs are invoked, the controller has to download the file and build the segment locally.

Hence, these APIs are NOT meant for production environments and for large input files.

hashtag
/ingestFromFile

This API creates a segment using the given file and pushes it to Pinot. All steps happen on the controller.

Example usage:

To upload a JSON file data.json to a table called foo_OFFLINE, use below command

Note that query params need to be URLEncoded. For example, {"inputFormat":"json"} in the command below needs to be converted to %7B%22inputFormat%22%3A%22json%22%7D.

The batchConfigMapStr can be used to pass in additional properties needed for decoding the file. For example, in case of csv, you may need to provide the delimiter

hashtag
/ingestFromURI

This API creates a segment using file at the given URI and pushes it to Pinot. Properties to access the FS need to be provided in the batchConfigMap. All steps happen on the controller. Example usage:

hashtag
Ingestion jobs

Segments can be created and uploaded using tasks known as DataIngestionJobs. A job also needs a config of its own. We call this config the JobSpec.

For our CSV file and table, the JobSpec should look like this:

For more detail, refer to .

Now that we have the job spec for our table transcript, we can trigger the job using the following command:

Once the job successfully finishes, head over to the \[query console] and start playing with the data.

hashtag
Segment push job type

There are 3 ways to upload a Pinot segment:

  • Segment tar push

  • Segment URI push

  • Segment metadata push

hashtag
Segment tar push

This is the original and default push mechanism.

Tar push requires the segment to be stored locally or can be opened as an InputStream on PinotFS. So we can stream the entire segment tar file to the controller.

The push job will:

  1. Upload the entire segment tar file to the Pinot controller.

Pinot controller will:

  1. Save the segment into the controller segment directory(Local or any PinotFS).

  2. Extract segment metadata.

  3. Add the segment to the table.

hashtag
Segment URI push

This push mechanism requires the segment tar file stored on a deep store with a globally accessible segment tar URI.

URI push is light-weight on the client-side, and the controller side requires equivalent work as the tar push.

The push job will:

  1. POST this segment tar URI to the Pinot controller.

Pinot controller will:

  1. Download segment from the URI and save it to controller segment directory (local or any PinotFS).

  2. Extract segment metadata.

  3. Add the segment to the table.

hashtag
Segment metadata push

This push mechanism also requires the segment tar file stored on a deep store with a globally accessible segment tar URI.

Metadata push is light-weight on the controller side, there is no deep store download involves from the controller side.

The push job will:

  1. Download the segment based on URI.

  2. Extract metadata.

  3. Upload metadata to the Pinot Controller.

Pinot Controller will:

  1. Add the segment to the table based on the metadata.

4. Segment Metadata Push with copyToDeepStore

This extends the original Segment Metadata Push for cases, where the segments are pushed to a location not used as deep store. The ingestion job can still do metadata push but ask Pinot Controller to copy the segments into deep store. Those use cases usually happen when the ingestion jobs don't have direct access to deep store but still want to use metadata push for its efficiency, thus using a staging location to keep the segments temporarily.

NOTE: the staging location and deep store have to use same storage scheme, like both on s3. This is because the copy is done via PinotFS.copyDir interface that assumes so; but also because this does copy at storage system side, so segments don't need to go through Pinot Controller at all.

To make this work, grant Pinot controllers access to the staging location. For example on AWS, this may require adding an access policy like this example for the controller EC2 instances:

Then use metadata push to add one extra config like this one:

hashtag
Consistent data push and rollback

Pinot supports atomic update on segment level, which means that when data consisting of multiple segments are pushed to a table, as segments are replaced one at a time, queries to the broker during this upload phase may produce inconsistent results due to interleaving of old and new data.

See for how to enable this feature.

hashtag
Segment fetchers

When Pinot segment files are created in external systems (Hadoop/spark/etc), there are several ways to push those data to the Pinot controller and server:

  1. Push segment to shared NFS and let pinot pull segment files from the location of that NFS. See .

  2. Push segment to a Web server and let pinot pull segment files from the Web server with HTTP/HTTPS link. See .

  3. Push segment to PinotFS(HDFS/S3/GCS/ADLS) and let pinot pull segment files from PinotFS URI. See and .

The first three options are supported out of the box within the Pinot package. As long your remote jobs send Pinot controller with the corresponding URI to the files, it will pick up the file and allocate it to proper Pinot servers and brokers. To enable Pinot support for PinotFS, you'll need to provide configuration and proper Hadoop dependencies.

hashtag
Persistence

By default, Pinot does not come with a storage layer, so all the data sent, won't be stored in case of a system crash. In order to persistently store the generated segments, you will need to change controller and server configs to add deep storage. Checkout for all the info and related configs.

hashtag
Tuning

hashtag
Standalone

Since pinot is written in Java, you can set the following basic Java configurations to tune the segment runner job -

  • Log4j2 file location with -Dlog4j2.configurationFile

  • Plugin directory location with -Dplugins.dir=/opt/pinot/plugins

  • JVM props, like -Xmx8g -Xms4G

If you are using the docker, you can set the following under JAVA_OPTS variable.

hashtag
Hadoop

You can set -D mapreduce.map.memory.mb=8192 to set the mapper memory size when submitting the Hadoop job.

hashtag
Spark

You can add config spark.executor.memory to tune the memory usage for segment creation when submitting the Spark job.

Spark

Batch ingestion of data into Apache Pinot using Apache Spark.

Pinot supports Apache Spark (2.x and 3.x) as a processor to create and push segment files to the database. Pinot distribution is bundled with the Spark code to process your files and convert and upload them to Pinot.

To set up Spark, do one of the following:

  • Use the Spark-Pinot Connector. For more information, see the .

Push segment to other systems and implement your own segment fetcher to pull data from those systems.

Hadoop
Spark
SegmentGenerationAndPushTask
Ingestion job spec
Consistent Push and Rollback
Segment URI Pusharrow-up-right
Segment URI Pusharrow-up-right
Segment URI Pusharrow-up-right
Segment Metadata Pusharrow-up-right
PinotFS
File systems
Follow the instructions below.

You can follow the wiki to build Pinot from source. The resulting JAR file can be found in pinot/target/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar

If you do build Pinot from Source, you should consider opting into using the build-shaded-jar jar profile with -Pbuild-shaded-jar. While Pinot does not bundle spark into its jar, it does bundle certain hadoop libraries.

Next, you need to change the execution config in the job spec to the following:

To run Spark ingestion, you need the following jars in your classpath

  • pinot-batch-ingestion-spark plugin jar - available in plugins-external directory in the package

  • pinot-all jar - available in lib directory in the package

These jars can be specified using spark.driver.extraClassPath or any other option.

For loading any other plugins that you want to use, use:

The complete spark-submit command should look like this:

Ensure environment variables PINOT_ROOT_DIR and PINOT_VERSION are set properly.

Note: You should change the master to yarn and deploy-mode to cluster for production environments.

circle-info

We have stopped including spark-core dependency in our jars post 0.10.0 release. Users can try 0.11.0-SNAPSHOT and later versions of pinot-batch-ingestion-spark in case of any runtime issues. You can either build from source or download latest master build jars.

hashtag
Running in Cluster Mode on YARN

If you want to run the spark job in cluster mode on YARN/EMR cluster, the following needs to be done -

  • Build Pinot from source with option -DuseProvidedHadoop

  • Copy Pinot binaries to S3, HDFS or any other distributed storage that is accessible from all nodes.

  • Copy Ingestion spec YAML file to S3, HDFS or any other distributed storage. Mention this path as part of --files argument in the command

  • Add --jars options that contain the s3/hdfs paths to all the required plugin and pinot-all jar

  • Point classPath to spark working directory. Generally, just specifying the jar names without any paths works. Same should be done for main jar as well as the spec YAML file

Example

circle-check

For Spark 3.x, replace pinot-batch-ingestion-spark-2.4 with pinot-batch-ingestion-spark-3.2 in all places in the commands. Also, ensure the classpath in ingestion spec is changed from org.apache.pinot.plugin.ingestion.batch.spark. to org.apache.pinot.plugin.ingestion.batch.spark3.

hashtag
FAQ

Q - I am getting the following exception - Class has been compiled by a more recent version of the Java Runtime (class file version 55.0), this version of the Java Runtime only recognizes class file versions up to 52.0

Since 0.8.0 release, Pinot binaries are compiled with JDK 11. If you are using Spark along with Hadoop 2.7+, you need to use the Java8 version of Pinot. Currently, you need to build jdk 8 version from source.

Q - I am not able to find pinot-batch-ingestion-spark jar.

For Pinot version prior to 0.10.0, the spark plugin is located in plugin dir of binary distribution. For 0.10.0 and later, it is located in pinot-external dir.

Q - Spark is not able to find the jars leading to java.nio.file.NoSuchFileException

This means the classpath for spark job has not been configured properly. If you are running spark in a distributed environment such as Yarn or k8s, make sure both spark.driver.classpath and spark.executor.classpath are set. Also, the jars in driver.classpath should be added to --jars argument in spark-submit so that spark can distribute those jars to all the nodes in your cluster. You also need to take provide appropriate scheme with the file path when running the jar. In this doc, we have used local:\\ but it can be different depending on your cluster setup.

Q - Spark job failing while pushing the segments.

It can be because of misconfigured controllerURI in job spec yaml file. If the controllerURI is correct, make sure it is accessible from all the nodes of your YARN or k8s cluster.

Q - My data gets overwritten during ingestion.

Set segmentPushType to APPEND in the tableConfig.

If already set to APPEND, this is likely due to a missing timeColumnName in your table config. If you can't provide a time column, use our segment name generation configs in ingestion spec. Generally using inputFile segment name generator should fix your issue.

Q - I am getting java.lang.RuntimeException: java.io.IOException: Failed to create directory: pinot-plugins-dir-0/plugins/*

Removing -Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins from spark.driver.extraJavaOptions should fix this. As long as plugins are mentioned in classpath and jars argument it should not be an issue.

Q - Getting Class not found: exception

Check if extraClassPath arguments contain all the plugin jars for both driver and executors. Also, all the plugin jars are mentioned in the --jars argument. If both of these are correct, check if the extraClassPath contains local filesystem classpaths and not s3 or hdfs or any other distributed file system classpaths.

ReadMearrow-up-right
studentID,firstName,lastName,gender,subject,score,timestampInEpoch
200,Lucy,Smith,Female,Maths,3.8,1570863600000
200,Lucy,Smith,Female,English,3.5,1571036400000
201,Bob,King,Male,Maths,3.2,1571900400000
202,Nick,Young,Male,Physics,3.6,1572418800000
{
  "schemaName": "transcript",
  "dimensionFieldSpecs": [
    {
      "name": "studentID",
      "dataType": "INT"
    },
    {
      "name": "firstName",
      "dataType": "STRING"
    },
    {
      "name": "lastName",
      "dataType": "STRING"
    },
    {
      "name": "gender",
      "dataType": "STRING"
    },
    {
      "name": "subject",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "score",
      "dataType": "FLOAT"
    }
  ],
  "dateTimeFieldSpecs": [{
    "name": "timestampInEpoch",
    "dataType": "LONG",
    "format" : "1:MILLISECONDS:EPOCH",
    "granularity": "1:MILLISECONDS"
  }]
}
{
  "tableName": "transcript",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "replication": 1,
    "timeColumnName": "timestampInEpoch",
    "timeType": "MILLISECONDS",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": 365
  },
  "tenants": {
    "broker":"DefaultTenant",
    "server":"DefaultTenant"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP"
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    },
    "continueOnError": true,
    "rowTimeValueCheck": true,
    "segmentTimeValueCheck": false

  },
  "metadata": {}
}
bin/pinot-admin.sh AddTable \\
  -tableConfigFile /path/to/table-config.json \\
  -schemaFile /path/to/table-schema.json -exec
curl -X POST -F [email protected] \
  -H "Content-Type: multipart/form-data" \
  "http://localhost:9000/ingestFromFile?tableNameWithType=foo_OFFLINE&
  batchConfigMapStr={"inputFormat":"json"}"
curl -X POST -F [email protected] \
  -H "Content-Type: multipart/form-data" \
  "http://localhost:9000/ingestFromFile?tableNameWithType=foo_OFFLINE&
batchConfigMapStr={
  "inputFormat":"csv",
  "recordReader.prop.delimiter":"|"
}"
curl -X POST "http://localhost:9000/ingestFromURI?tableNameWithType=foo_OFFLINE
&batchConfigMapStr={
  "inputFormat":"json",
  "input.fs.className":"org.apache.pinot.plugin.filesystem.S3PinotFS",
  "input.fs.prop.region":"us-central",
  "input.fs.prop.accessKey":"foo",
  "input.fs.prop.secretKey":"bar"
}
&sourceURIStr=s3://test.bucket/path/to/json/data/data.json"
executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
  segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner'

# Recommended to set jobType to SegmentCreationAndMetadataPush for production environment where Pinot Deep Store is configured  
jobType: SegmentCreationAndTarPush

inputDirURI: '/tmp/pinot-quick-start/rawdata/'
includeFileNamePattern: 'glob:**/*.csv'
outputDirURI: '/tmp/pinot-quick-start/segments/'
overwriteOutput: true
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'transcript'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'
pushJobSpec:
  pushAttempts: 2
  pushRetryIntervalMillis: 1000
bin/pinot-admin.sh LaunchDataIngestionJob \\
    -jobSpecFile /tmp/pinot-quick-start/batch-job-spec.yaml
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "s3:ListAllMyBuckets",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": "s3:*",
            "Resource": [
                "arn:aws:s3:::metadata-push-staging",
                "arn:aws:s3:::metadata-push-staging/*"
            ]
        }
    ]
}
...
jobType: SegmentCreationAndMetadataPush
...
outputDirURI: 's3://metadata-push-staging/stagingDir/'
...
pushJobSpec:
  copyToDeepStoreForMetadataPush: true
...
# executionFrameworkSpec: Defines ingestion jobs to be running.
executionFrameworkSpec:

  # name: execution framework name
  name: 'spark'

  # segmentGenerationJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner'

  # segmentTarPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner'

  # segmentUriPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner'

  #segmentMetadataPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface
  segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentMetadataPushJobRunner'

  # extraConfigs: extra configs for execution framework.
  extraConfigs:

    # stagingDir is used in distributed filesystem to host all the segments then move this directory entirely to output directory.
    stagingDir: your/local/dir/staging
spark.driver.extraClassPath =>
pinot-batch-ingestion-spark-${PINOT_VERSION}-shaded.jar:pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar
spark.driver.extraJavaOptions =>
-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins
export PINOT_VERSION=0.10.0
export PINOT_DISTRIBUTION_DIR=/path/to/apache-pinot-${PINOT_VERSION}-bin

spark-submit //
--class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand //
--master local --deploy-mode client //
--conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins" //
--conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" //
-conf "spark.executor.extraClassPath=${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" //
local://${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar -jobSpecFile /path/to/spark_job_spec.yaml
spark-submit //
--class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand //
--master yarn --deploy-mode cluster //
--conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins" //
--conf "spark.driver.extraClassPath=pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar:pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" //
--conf "spark.executor.extraClassPath=pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar:pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" //
--jars "${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar,${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar"
--files s3://path/to/spark_job_spec.yaml
local://pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar -jobSpecFile spark_job_spec.yaml

Flink

Batch ingestion of data into Apache Pinot using Apache Flink.

Pinot supports Apache Flink as a processing framework to push segment files to the database.

Pinot distribution contains an Apache Flink SinkFunctionarrow-up-right that can be used as part of the Apache Flink application (Streaming or Batch) to directly write into a designated Pinot database.

hashtag
Example

hashtag
Flink application

Here is an example code snippet to show how to utilize the in a Flink streaming application:

As in the example shown above, the only required information from the Pinot side is the table and the table .

For a more detailed executable, refer to the .

hashtag
Table Config

PinotSinkFunction uses mostly the TableConfig object to infer the batch ingestion configuration to start a SegmentWriter and SegmentUploader to communicate with the Pinot cluster.

Note that even though in the above example Flink application is running in streaming mode, the data is still batch together and flush/upload to Pinot once the flush threshold is reached. It is not a direct streaming write into Pinot.

Here is an example table config

the only required configurations are:

  • "outputDirURI": where PinotSinkFunction should write the constructed segment file to

  • "push.controllerUri": which Pinot cluster (controller) URL PinotSinkFunction should communicate with.

The rest of the configurations are standard for any Pinot table.

Backfill Data

Batch ingestion of backfill data into Apache Pinot.

hashtag
Introduction

Pinot batch ingestion involves two parts: routine ingestion job(hourly/daily) and backfill. Here are some examples to show how routine batch ingestion works in Pinot offline table:

  • Batch Ingestion Overviewarrow-up-right

High-level description

  1. Organize raw data into buckets (eg: /var/pinot/airlineStats/rawdata/2014/01/01). Each bucket typically contains several files (eg: /var/pinot/airlineStats/rawdata/2014/01/01/airlineStats_data_2014-01-01_0.avro)

  2. Run a Pinot batch ingestion job, which points to a specific date folder like ‘/var/pinot/airlineStats/rawdata/2014/01/01’. The segment generation job will convert each such avro file into a Pinot segment for that day and give it a unique name.

  3. Run Pinot segment push job to upload those segments with those uniques names via a Controller API

circle-info

IMPORTANT: The segment name is the unique identifier used to uniquely identify that segment in Pinot. If the controller gets an upload request for a segment with the same name - it will attempt to replace it with the new one.

This newly uploaded data can now be queried in Pinot. However, sometimes users will make changes to the raw data which need to be reflected in Pinot. This process is known as 'Backfill'.

hashtag
How to backfill data in Pinot

Pinot supports data modification only at the segment level, which means you must update entire segments for doing backfills. The high level idea is to repeat steps 2 (segment generation) and 3 (segment upload) mentioned above:

  • Backfill jobs must run at the same granularity as the daily job. E.g., if you need to backfill data for 2014/01/01, specify that input folder for your backfill job (e.g.: ‘/var/pinot/airlineStats/rawdata/2014/01/01’)

  • The backfill job will then generate segments with the same name as the original job (with the new data).

  • When uploading those segments to Pinot, the controller will replace the old segments with the new ones (segment names act like primary keys within Pinot) one by one.

hashtag
Edge case example

Backfill jobs expect the same number of (or more) data files on the backfill date. So the segment generation job will create the same number of (or more) segments than the original run.

For example, assuming table airlineStats has 2 segments(airlineStats_2014-01-01_2014-01-01_0, airlineStats_2014-01-01_2014-01-01_1) on date 2014/01/01 and the backfill input directory contains only 1 input file. Then the segment generation job will create just one segment: airlineStats_2014-01-01_2014-01-01_0. After the segment push job, only segment airlineStats_2014-01-01_2014-01-01_0 got replaced and stale data in segment airlineStats_2014-01-01_2014-01-01_1 are still there.

If the raw data is modified in such a way that the original time bucket has fewer input files than the first ingestion run, backfill will fail.

Dimension table

Batch ingestion of data into Apache Pinot using dimension tables.

Dimension tables are a special kind of offline tables from which data can be looked up via the lookup UDF, providing join-like functionality.

Dimension tables are replicated on all the hosts for a given tenant to allow faster lookups.

To mark an offline table as a dimension table, isDimTable should be set to true and segmentsConfig.segementPushType should be set to REFRESH in the table config, like this:

{
  "OFFLINE": {
    "tableName": "dimBaseballTeams_OFFLINE",
    "tableType": "OFFLINE",
    "segmentsConfig": {
      "schemaName": "dimBaseballTeams",
      "segmentPushType": "REFRESH"
    },
    "metadata": {},
    "quota": {
      "storage": "200M"
    },
    "isDimTable": true
  }
}

As dimension tables are used to perform lookups of dimension values, they are required to have a primary key (can be a composite key).

{
  "dimensionFieldSpecs": [
    {
      "dataType": "STRING",
      "name": "teamID"
    },
    {
      "dataType": "STRING",
      "name": "teamName"
    }
  ],
  "schemaName": "dimBaseballTeams",
  "primaryKeyColumns": ["teamID"]
}

When a table is marked as a dimension table, it will be replicated on all the hosts, which means that these tables must be small in size.

The maximum size quota for a dimension table in a cluster is controlled by the controller.dimTable.maxSize controller property. Table creation will fail if the storage quota exceeds this maximum size.

A dimension table cannot be part of a .

Batch Ingestion in Practicearrow-up-right
hybrid table
PinotSinkFunctionarrow-up-right
schema
config
quick start examplearrow-up-right
// some environmental setup
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Row> srcRows = execEnv.addSource(new FlinkKafkaConsumer<Row>(...));
RowTypeInfo typeInfo = new RowTypeInfo(
  new TypeInformation[]{Types.FLOAT, Types.FLOAT, Types.STRING, Types.STRING},
  new String[]{"lon", "lat", "address", "name"});


// add processing logic for the data stream for example:
DataStream<Row> processedRows = srcRow.keyBy(r -> r.getField(0));
...

// configurations for PinotSinkFunction
Schema pinotSchema = ...
TableConfig pinotTableConfig = ...
processedRows.addSink(new PinotSinkFunction<>(
  new FlinkRowGenericRowConverter(typeInfo), 
  pinotTableConfig,
  pinotSchema);

// execute the program
execEnv.execute();
{
  "tableName" : "tbl_OFFLINE",
  "tableType" : "OFFLINE",
  "segmentsConfig" : {
    // ...
  },
  "tenants" : {
    // ...
  },
  "tableIndexConfig" : {
    // ....
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "HOURLY", 
      "batchConfigMaps": [
        {
          "outputDirURI": "file://path/to/flink/segmentwriter/output/dir",
          "overwriteOutput": "false",
          "push.controllerUri": "https://target.pinot.cluster.controller.url"
        }
      ]
    }
  }
}

Hadoop

Batch ingestion of data into Apache Pinot using Apache Hadoop.

hashtag
Segment Creation and Push

Pinot supports Apache Hadooparrow-up-right as a processor to create and push segment files to the database. Pinot distribution is bundled with the Spark code to process your files and convert and upload them to Pinot.

You can follow the wiki to build Pinot from source. The resulting JAR file can be found in pinot/target/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar

Next, you need to change the execution config in the job spec to the following -

You can check out the sample job spec here.

Finally execute the hadoop job using the command -

Ensure environment variables PINOT_ROOT_DIR and PINOT_VERSION are set properly.

hashtag
Data Preprocessing before Segment Creation

We’ve seen some requests that data should be massaged (like partitioning, sorting, resizing) before creating and pushing segments to Pinot.

The MapReduce job called SegmentPreprocessingJob would be the best fit for this use case, regardless of whether the input data is of AVRO or ORC format.

Check the below example to see how to use SegmentPreprocessingJob.

In Hadoop properties, set the following to enable this job:

In table config, specify the operations in preprocessing.operations that you'd like to enable in the MR job, and then specify the exact configs regarding those operations:

hashtag
preprocessing.num.reducers

Minimum number of reducers. Optional. Fetched when partitioning gets disabled and resizing is enabled. This parameter is to avoid having too many small input files for Pinot, which leads to the case where Pinot server is holding too many small segments, causing too many threads.

hashtag
preprocessing.max.num.records.per.file

Maximum number of records per reducer. Optional.Unlike, “preprocessing.num.reducers”, this parameter is to avoid having too few large input files for Pinot, which misses the advantage of muti-threading when querying. When not set, each reducer will finally generate one output file. When set (e.g. M), the original output file will be split into multiple files and each new output file contains at most M records. It does not matter whether partitioning is enabled or not.

For more details on this MR job, refer to this .

documentarrow-up-right
# executionFrameworkSpec: Defines ingestion jobs to be running.
executionFrameworkSpec:

    # name: execution framework name
  name: 'hadoop'

  # segmentGenerationJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunner'

  # segmentTarPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentTarPushJobRunner'

  # segmentUriPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentUriPushJobRunner'

  # segmentMetadataPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
  segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentMetadataPushJobRunner'

    # extraConfigs: extra configs for execution framework.
  extraConfigs:

    # stagingDir is used in distributed filesystem to host all the segments then move this directory entirely to output directory.
    stagingDir: your/local/dir/staging
export PINOT_VERSION=0.10.0
export PINOT_DISTRIBUTION_DIR=${PINOT_ROOT_DIR}/build/
export HADOOP_CLIENT_OPTS="-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml"

hadoop jar  \\
        ${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \\
        org.apache.pinot.tools.admin.PinotAdministrator \\
        LaunchDataIngestionJob \\
        -jobSpecFile ${PINOT_DISTRIBUTION_DIR}/examples/batch/airlineStats/hadoopIngestionJobSpec.yaml
enable.preprocessing = true
preprocess.path.to.output = <output_path>
{
    "OFFLINE": {
        "metadata": {
            "customConfigs": {
                “preprocessing.operations”: “resize, partition, sort”, // To enable the following preprocessing operations
                "preprocessing.max.num.records.per.file": "100",       // To enable resizing
                "preprocessing.num.reducers": "3"                      // To enable resizing
            }
        },
        ...
        "tableIndexConfig": {
            "aggregateMetrics": false,
            "autoGeneratedInvertedIndex": false,
            "bloomFilterColumns": [],
            "createInvertedIndexDuringSegmentGeneration": false,
            "invertedIndexColumns": [],
            "loadMode": "MMAP",
            "nullHandlingEnabled": false,
            "segmentPartitionConfig": {       // To enable partitioning
                "columnPartitionMap": {
                    "item": {
                        "functionName": "murmur",
                        "numPartitions": 4
                    }
                }
            },
            "sortedColumn": [                // To enable sorting
                "actorId"
            ],
            "streamConfigs": {}
        },
        "tableName": "tableName_OFFLINE",
        "tableType": "OFFLINE",
        "tenants": {
            ...
        }
    }
}