Batch ingestion lets users create a table from data already present in a file system such as S3. This is particularly useful when you want to use Pinot's ability to query large datasets with minimal latency, or to test new features using a simple data file.
Ingesting data from a file system involves the following steps:
Define Schema
Define Table Config
Upload Schema and Table configs
Upload data
Batch ingestion currently supports three mechanisms to upload the data: standalone, Hadoop, and Spark.
Here we'll take a look at standalone local processing to get you started.
Let's create a table for the following CSV data source.
studentID,firstName,lastName,gender,subject,score,timestampInEpoch
200,Lucy,Smith,Female,Maths,3.8,1570863600000
200,Lucy,Smith,Female,English,3.5,1571036400000
201,Bob,King,Male,Maths,3.2,1571900400000
202,Nick,Young,Male,Physics,3.6,1572418800000
In our data, score is the only column on which aggregations can be performed, and timestampInEpoch is the only timestamp column. So in our schema, we define score as a metric and timestampInEpoch as the timestamp column.
{
  "schemaName": "transcript",
  "dimensionFieldSpecs": [
    {"name": "studentID", "dataType": "INT"},
    {"name": "firstName", "dataType": "STRING"},
    {"name": "lastName", "dataType": "STRING"},
    {"name": "gender", "dataType": "STRING"},
    {"name": "subject", "dataType": "STRING"}
  ],
  "metricFieldSpecs": [
    {"name": "score", "dataType": "FLOAT"}
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "timestampInEpoch",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
Here, we have also defined two extra fields: format and granularity. The format specifies how the timestamp column is formatted in the data source. Our values are in epoch milliseconds, so we specify 1:MILLISECONDS:EPOCH.
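To make the field roles concrete, here is a minimal Python sketch (not part of Pinot) that coerces CSV rows to the data types declared in the schema above and interprets timestampInEpoch as epoch milliseconds. The PY_TYPES mapping and the helper names coerce_row and epoch_millis_to_utc are illustrative assumptions, not Pinot APIs.

```python
import csv
import io
from datetime import datetime, timezone

# Field name -> dataType, copied from the schema above.
SCHEMA_TYPES = {
    "studentID": "INT",
    "firstName": "STRING",
    "lastName": "STRING",
    "gender": "STRING",
    "subject": "STRING",
    "score": "FLOAT",
    "timestampInEpoch": "LONG",
}

# Hypothetical mapping from Pinot data types to Python constructors.
PY_TYPES = {"INT": int, "LONG": int, "FLOAT": float, "STRING": str}

# Two rows taken from the sample CSV data.
CSV_DATA = """studentID,firstName,lastName,gender,subject,score,timestampInEpoch
200,Lucy,Smith,Female,Maths,3.8,1570863600000
201,Bob,King,Male,Maths,3.2,1571900400000
"""


def coerce_row(row):
    """Convert every CSV string value to the type its schema field declares."""
    return {name: PY_TYPES[SCHEMA_TYPES[name]](value) for name, value in row.items()}


def epoch_millis_to_utc(millis):
    """Interpret a 1:MILLISECONDS:EPOCH value as a UTC datetime."""
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc)


rows = [coerce_row(r) for r in csv.DictReader(io.StringIO(CSV_DATA))]
first = rows[0]
print(first["score"], epoch_millis_to_utc(first["timestampInEpoch"]).isoformat())
```

A value that cannot be converted (for example, text in the score column) raises a ValueError here, which is a quick way to spot rows that do not fit the schema before running an ingestion job.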
We define a table transcript and map the schema created in the previous step to the table. For batch data, we keep the tableType as OFFLINE.
{
  "tableName": "transcript",
  "segmentsConfig": {
    "timeColumnName": "timestampInEpoch",
    "timeType": "MILLISECONDS",
    "replication": "1",
    "schemaName": "transcript"
  },
  "tableIndexConfig": {
    "invertedIndexColumns": [],
    "loadMode": "MMAP"
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "tableType": "OFFLINE",
  "metadata": {}
}
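The table config refers back to the schema in two places: schemaName and timeColumnName. The following Python sketch checks that the two configs agree before you upload them. It is only an illustration of how the configs relate. The function check_table_against_schema is hypothetical, the dictionaries are trimmed copies of the configs above, and this is not a substitute for whatever validation Pinot itself performs.

```python
# Trimmed copies of the schema and table config shown above.
schema = {
    "schemaName": "transcript",
    "dateTimeFieldSpecs": [
        {"name": "timestampInEpoch", "dataType": "LONG",
         "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS"}
    ],
}

table = {
    "tableName": "transcript",
    "segmentsConfig": {
        "timeColumnName": "timestampInEpoch",
        "timeType": "MILLISECONDS",
        "replication": "1",
        "schemaName": "transcript",
    },
    "tableType": "OFFLINE",
}


def check_table_against_schema(table, schema):
    """Return a list of human-readable problems; an empty list means consistent."""
    problems = []
    segments = table.get("segmentsConfig", {})
    if segments.get("schemaName") != schema.get("schemaName"):
        problems.append("segmentsConfig.schemaName does not match the schema name")
    time_columns = {spec["name"] for spec in schema.get("dateTimeFieldSpecs", [])}
    if segments.get("timeColumnName") not in time_columns:
        problems.append("timeColumnName is not a date-time field in the schema")
    if table.get("tableType") != "OFFLINE":
        problems.append("batch tables in this guide use tableType OFFLINE")
    return problems


print(check_table_against_schema(table, schema))
```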
Now that we have both configs, we can upload them and create a table. To do that, run the following command:
bin/pinot-admin.sh AddTable \
  -tableConfigFile /path/to/table-config.json \
  -schemaFile /path/to/table-schema.json -exec
Check the table config and schema in the REST API to make sure they were uploaded successfully.
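One way to script this check is sketched below in Python. It assumes the controller runs at http://localhost:9000 and exposes GET /schemas/{schemaName} and GET /tables/{tableName}. Treat both the address and the endpoint paths as assumptions and confirm them against the REST API page of your own controller. The helper names config_urls and fetch_json are hypothetical.

```python
import json
import urllib.request

CONTROLLER = "http://localhost:9000"  # assumption: a locally running controller


def config_urls(name, controller=CONTROLLER):
    """Build the (assumed) controller URLs for a schema and a table of this name."""
    base = controller.rstrip("/")
    return {"schema": f"{base}/schemas/{name}", "table": f"{base}/tables/{name}"}


def fetch_json(url, timeout=10):
    """GET a URL and parse the response body as JSON (needs a running controller)."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.load(response)


# Only the URLs are printed here, so the sketch runs without a cluster.
for kind, url in config_urls("transcript").items():
    print(kind, url)
```

With a cluster running, calling fetch_json on each URL should return the stored config, and an HTTP error would suggest the upload did not succeed.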
We now have an empty table in Pinot. As the next step, we will upload our CSV file to this table. A table is composed of multiple segments.
The segments are created and uploaded using tasks known as DataIngestionJobs. A job also needs a config of its own. We call this config the JobSpec.
For our CSV file and table, the job spec should look like the following.
executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: '/tmp/pinot-quick-start/rawdata/'
includeFileNamePattern: 'glob:**/*.csv'
outputDirURI: '/tmp/pinot-quick-start/segments/'
overwriteOutput: true
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'transcript'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'
You can refer to Ingestion Job Spec for more details.
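In the job spec, inputDirURI and includeFileNamePattern together decide which files the job reads. The Python sketch below approximates that selection for a local directory so you can preview which files would be picked up. Pinot evaluates the pattern with Java glob semantics, so Path.rglob is only an approximation, and find_input_files is a hypothetical helper rather than a Pinot API.

```python
import tempfile
from pathlib import Path


def find_input_files(input_dir, suffix=".csv"):
    """Recursively list files under input_dir whose names end with suffix."""
    return sorted(p for p in Path(input_dir).rglob(f"*{suffix}") if p.is_file())


# Build a throwaway directory tree to demonstrate the selection.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "nested").mkdir()
    (root / "transcript.csv").write_text("studentID\n200\n")
    (root / "nested" / "more.csv").write_text("studentID\n201\n")
    (root / "notes.txt").write_text("not picked up\n")
    found = [p.relative_to(root).as_posix() for p in find_input_files(root)]
    print(found)
```

To preview a real job, point find_input_files at the directory named in inputDirURI, for example /tmp/pinot-quick-start/rawdata/.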
Now that we have the job spec for our table transcript, we can trigger the job using the following command:
bin/pinot-admin.sh LaunchDataIngestionJob \
  -jobSpecFile /tmp/pinot-quick-start/batch-job-spec.yml
Once the job has finished successfully, you can head over to the query console and start playing with the data.
There are 3 ways to upload a Pinot segment: segment tar push, segment URI push, and segment metadata push.
Segment tar push is the original and default push mechanism.
Tar push requires the segment to be stored locally or to be readable as an InputStream on PinotFS, so that the entire segment tar file can be streamed to the controller.
The push job will:
Upload the entire segment tar file to the Pinot controller.
Pinot controller will:
Save the segment into the controller segment directory (local or any PinotFS).
Extract segment metadata.
Add the segment to the table.
Segment URI push requires the segment tar file to be stored on a deep store with a globally accessible segment tar URI.
URI push is lightweight on the client side, while the controller does the same amount of work as for tar push.
The push job will:
POST this segment Tar URI to the Pinot controller.
Pinot controller will:
Download the segment from the URI and save it to the controller segment directory (local or any PinotFS).
Extract segment metadata.
Add the segment to the table.
Segment metadata push also requires the segment tar file to be stored on a deep store with a globally accessible segment tar URI.
Metadata push is lightweight on the controller side: the controller does not download anything from the deep store.
The push job will:
Download the segment based on URI.
Extract metadata.
Upload metadata to the Pinot Controller.
Pinot Controller will:
Add the segment to the table based on the metadata.
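The difference between the three push mechanisms is mostly about who moves the segment bytes. The Python sketch below is a toy in-memory model of the steps described above, not Pinot's implementation. ToyController, extract_metadata, the segmentUri key, and the s3://example-bucket URI are all hypothetical stand-ins chosen for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class ToyController:
    """Hypothetical stand-in for a Pinot controller; not a real Pinot class."""
    segment_dir: dict = field(default_factory=dict)  # segment name -> tar bytes
    table: dict = field(default_factory=dict)        # segment name -> metadata
    payload_bytes: int = 0                           # bytes sent by push jobs
    downloads: int = 0                               # deep store downloads


def extract_metadata(name, tar_bytes):
    """Stand-in for reading metadata out of a segment tar file."""
    return {"segmentName": name, "tarSizeBytes": len(tar_bytes)}


def tar_push(ctrl, name, tar_bytes):
    ctrl.payload_bytes += len(tar_bytes)           # job uploads the whole tar
    ctrl.segment_dir[name] = tar_bytes             # controller saves it
    ctrl.table[name] = extract_metadata(name, tar_bytes)


def uri_push(ctrl, name, uri, deep_store):
    ctrl.payload_bytes += len(uri.encode())        # job posts only the URI
    tar_bytes = deep_store[uri]                    # controller downloads the tar
    ctrl.downloads += 1
    ctrl.segment_dir[name] = tar_bytes
    ctrl.table[name] = extract_metadata(name, tar_bytes)


def metadata_push(ctrl, name, uri, deep_store):
    tar_bytes = deep_store[uri]                    # push job downloads the tar
    metadata = extract_metadata(name, tar_bytes)   # push job extracts metadata
    metadata["segmentUri"] = uri                   # assumption: URI sent along
    ctrl.payload_bytes += len(repr(metadata).encode())
    ctrl.table[name] = metadata                    # controller never downloads


uri = "s3://example-bucket/transcript_0.tar.gz"    # hypothetical deep store URI
deep_store = {uri: b"x" * 1_000_000}               # fake 1 MB segment tar

results = {}
ctrl = ToyController()
tar_push(ctrl, "transcript_0", deep_store[uri])
results["tar_push"] = (ctrl.payload_bytes, ctrl.downloads)
for push in (uri_push, metadata_push):
    ctrl = ToyController()
    push(ctrl, "transcript_0", uri, deep_store)
    results[push.__name__] = (ctrl.payload_bytes, ctrl.downloads)
print(results)  # (bytes sent to controller, controller downloads) per mechanism
```

In this model, tar push sends the full segment to the controller, URI push sends a short URI but triggers a controller download, and metadata push sends a small payload with no controller download at all.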
When Pinot segment files are created in external systems (Hadoop, Spark, etc.), there are several ways to push that data to the Pinot controller and servers:
Push the segment to a shared NFS and let Pinot pull segment files from that NFS location. See Segment URI Push.
Push the segment to a web server and let Pinot pull segment files from the web server over an HTTP/HTTPS link. See Segment URI Push.
Push the segment to PinotFS (HDFS/S3/GCS/ADLS) and let Pinot pull segment files from the PinotFS URI. See Segment URI Push and Segment Metadata Push.
Push segment to other systems and implement your own segment fetcher to pull data from those systems.
The first three options are supported out of the box within the Pinot package. As long as your remote jobs send the Pinot controller the corresponding URI to the files, it will pick up the files and allocate them to the proper Pinot servers and brokers. To enable Pinot support for PinotFS, you will need to provide the PinotFS configuration and the proper Hadoop dependencies.
By default, Pinot does not come with a storage layer, so the data sent to it won't be retained in case of a system crash. To store the generated segments persistently, you will need to change the controller and server configs to add deep storage. Check out File systems for all the info and related configs.
Since Pinot is written in Java, you can set the following basic Java configurations to tune the segment runner job:
Log4j2 file location with -Dlog4j2.configurationFile
Plugin directory location with -Dplugins.dir=/opt/pinot/plugins
JVM props, like -Xmx8g -Xms4G
If you are using Docker, you can set these options in the JAVA_OPTS variable.
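As a rough illustration, the Python sketch below assembles the options listed above into a single JAVA_OPTS value. The plugin directory and heap sizes are the example values from the list. The log4j2 path /path/to/log4j2.xml is a placeholder, and build_java_opts is a hypothetical helper, not part of Pinot.

```python
import shlex


def build_java_opts(log4j2_file, plugins_dir="/opt/pinot/plugins", xmx="8g", xms="4G"):
    """Join the JVM options discussed above into one space-separated string."""
    options = [
        f"-Dlog4j2.configurationFile={log4j2_file}",  # Log4j2 config location
        f"-Dplugins.dir={plugins_dir}",               # plugin directory location
        f"-Xmx{xmx}",                                 # maximum heap size
        f"-Xms{xms}",                                 # initial heap size
    ]
    return " ".join(options)


java_opts = build_java_opts("/path/to/log4j2.xml")  # placeholder path
# Print a shell-safe export line that could be pasted into a terminal.
print(f"export JAVA_OPTS={shlex.quote(java_opts)}")
```

The same string can be passed to a container as the value of the JAVA_OPTS environment variable. Adjust the heap sizes to the memory actually available to the job.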
You can set -D mapreduce.map.memory.mb=8192 to set the mapper memory size when submitting the Hadoop job.
You can add the config spark.executor.memory to tune the memory usage for segment creation when submitting the Spark job.