Batch Data Ingestion In Practice
In practice, we need to run Pinot data ingestion as a pipeline or a scheduled job.
Assuming pinot-distribution is already built, you can find several sample table layouts inside the examples directory.
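For example, one common approach is a daily cron entry that invokes the ingestion job for the previous day's data. A minimal sketch, where the install path, job spec path, schedule, and log location are all illustrative assumptions to substitute with your own:

# Hypothetical crontab entry: run the ingestion job every day at 01:00
# and append output to a log file (all paths are assumptions).
0 1 * * * cd /opt/pinot && bin/pinot-admin.sh LaunchDataIngestionJob -jobSpecFile /opt/pinot/jobspecs/airlineStats_job_spec.yaml >> /var/log/pinot/ingestion.log 2>&1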

Table Layout

Usually each table gets its own directory, such as airlineStats.
Inside the table directory, a rawdata directory is created to hold all the input data.
Typically, for data events with a timestamp, we partition the data and store it in daily folders. E.g. a typical layout would follow this pattern: rawdata/%yyyy%/%mm%/%dd%/[daily_input_files]. (A short shell sketch of such a daily landing step follows the listing below.)
/var/pinot/airlineStats/rawdata/2014/01/01/airlineStats_data_2014-01-01.avro
/var/pinot/airlineStats/rawdata/2014/01/02/airlineStats_data_2014-01-02.avro
/var/pinot/airlineStats/rawdata/2014/01/03/airlineStats_data_2014-01-03.avro
/var/pinot/airlineStats/rawdata/2014/01/04/airlineStats_data_2014-01-04.avro
/var/pinot/airlineStats/rawdata/2014/01/05/airlineStats_data_2014-01-05.avro
/var/pinot/airlineStats/rawdata/2014/01/06/airlineStats_data_2014-01-06.avro
/var/pinot/airlineStats/rawdata/2014/01/07/airlineStats_data_2014-01-07.avro
/var/pinot/airlineStats/rawdata/2014/01/08/airlineStats_data_2014-01-08.avro
/var/pinot/airlineStats/rawdata/2014/01/09/airlineStats_data_2014-01-09.avro
/var/pinot/airlineStats/rawdata/2014/01/10/airlineStats_data_2014-01-10.avro
/var/pinot/airlineStats/rawdata/2014/01/11/airlineStats_data_2014-01-11.avro
/var/pinot/airlineStats/rawdata/2014/01/12/airlineStats_data_2014-01-12.avro
/var/pinot/airlineStats/rawdata/2014/01/13/airlineStats_data_2014-01-13.avro
/var/pinot/airlineStats/rawdata/2014/01/14/airlineStats_data_2014-01-14.avro
/var/pinot/airlineStats/rawdata/2014/01/15/airlineStats_data_2014-01-15.avro
/var/pinot/airlineStats/rawdata/2014/01/16/airlineStats_data_2014-01-16.avro
/var/pinot/airlineStats/rawdata/2014/01/17/airlineStats_data_2014-01-17.avro
/var/pinot/airlineStats/rawdata/2014/01/18/airlineStats_data_2014-01-18.avro
/var/pinot/airlineStats/rawdata/2014/01/19/airlineStats_data_2014-01-19.avro
/var/pinot/airlineStats/rawdata/2014/01/20/airlineStats_data_2014-01-20.avro
/var/pinot/airlineStats/rawdata/2014/01/21/airlineStats_data_2014-01-21.avro
/var/pinot/airlineStats/rawdata/2014/01/22/airlineStats_data_2014-01-22.avro
/var/pinot/airlineStats/rawdata/2014/01/23/airlineStats_data_2014-01-23.avro
/var/pinot/airlineStats/rawdata/2014/01/24/airlineStats_data_2014-01-24.avro
/var/pinot/airlineStats/rawdata/2014/01/25/airlineStats_data_2014-01-25.avro
/var/pinot/airlineStats/rawdata/2014/01/26/airlineStats_data_2014-01-26.avro
/var/pinot/airlineStats/rawdata/2014/01/27/airlineStats_data_2014-01-27.avro
/var/pinot/airlineStats/rawdata/2014/01/28/airlineStats_data_2014-01-28.avro
/var/pinot/airlineStats/rawdata/2014/01/29/airlineStats_data_2014-01-29.avro
/var/pinot/airlineStats/rawdata/2014/01/30/airlineStats_data_2014-01-30.avro
/var/pinot/airlineStats/rawdata/2014/01/31/airlineStats_data_2014-01-31.avro
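The sketch below shows how a daily pipeline step might land files into this layout; the base directory, the upstream staging path, and the GNU date invocation are assumptions for illustration:

# Compute yesterday's partition path (GNU date assumed) and move input files there.
DATE=$(date -d "yesterday" +%Y/%m/%d)        # e.g. 2014/01/01
BASE=/var/pinot/airlineStats/rawdata         # assumed base directory
mkdir -p "${BASE}/${DATE}"
mv /tmp/upstream/airlineStats_data_*.avro "${BASE}/${DATE}/"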

Configuring batch ingestion job

Create a batch ingestion job spec file to describe how to ingest the data.
Below is an example (also located at examples/batch/airlineStats/ingestionJobSpec.yaml):
# executionFrameworkSpec: Defines the framework that runs the ingestion job.
executionFrameworkSpec:

  # name: execution framework name
  name: 'standalone'

  # segmentGenerationJobRunnerClassName: class name implementing the org.apache.pinot.spi.batch.ingestion.runner.SegmentGenerationJobRunner interface.
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'

  # segmentTarPushJobRunnerClassName: class name implementing the org.apache.pinot.spi.batch.ingestion.runner.SegmentTarPushJobRunner interface.
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'

  # segmentUriPushJobRunnerClassName: class name implementing the org.apache.pinot.spi.batch.ingestion.runner.SegmentUriPushJobRunner interface.
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'

# jobType: Pinot ingestion job type.
# Supported job types are:
#   'SegmentCreation'
#   'SegmentTarPush'
#   'SegmentUriPush'
#   'SegmentCreationAndTarPush'
#   'SegmentCreationAndUriPush'
jobType: SegmentCreationAndTarPush

# inputDirURI: Root directory of input data, expected to have a scheme configured in PinotFS.
inputDirURI: 'examples/batch/airlineStats/rawdata'

# includeFileNamePattern: include file name pattern; glob patterns are supported.
# Sample usage:
#   'glob:*.avro' will include all avro files just under the inputDirURI, not sub directories;
#   'glob:**/*.avro' will include all the avro files under inputDirURI recursively.
includeFileNamePattern: 'glob:**/*.avro'

# excludeFileNamePattern: exclude file name pattern; glob patterns are supported.
# Sample usage:
#   'glob:*.avro' will exclude all avro files just under the inputDirURI, not sub directories;
#   'glob:**/*.avro' will exclude all the avro files under inputDirURI recursively.
# excludeFileNamePattern: ''

# outputDirURI: Root directory of output segments, expected to have a scheme configured in PinotFS.
outputDirURI: 'examples/batch/airlineStats/segments'

# overwriteOutput: Overwrite output segments if they exist.
overwriteOutput: true

# pinotFSSpecs: defines all related Pinot file systems.
pinotFSSpecs:

  - # scheme: used to identify a PinotFS.
    # E.g. local, hdfs, dbfs, etc.
    scheme: file

    # className: Class name used to create the PinotFS instance.
    # E.g.
    #   org.apache.pinot.spi.filesystem.LocalPinotFS is used for local filesystem
    #   org.apache.pinot.plugin.filesystem.AzurePinotFS is used for Azure Data Lake
    #   org.apache.pinot.plugin.filesystem.HadoopPinotFS is used for HDFS
    className: org.apache.pinot.spi.filesystem.LocalPinotFS

# recordReaderSpec: defines the record reader.
recordReaderSpec:

  # dataFormat: Record data format, e.g. 'avro', 'parquet', 'orc', 'csv', 'json', 'thrift' etc.
  dataFormat: 'avro'

  # className: Corresponding RecordReader class name.
  # E.g.
  #   org.apache.pinot.plugin.inputformat.avro.AvroRecordReader
  #   org.apache.pinot.plugin.inputformat.csv.CSVRecordReader
  #   org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader
  #   org.apache.pinot.plugin.inputformat.json.JSONRecordReader
  #   org.apache.pinot.plugin.inputformat.orc.ORCRecordReader
  #   org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReader
  className: 'org.apache.pinot.plugin.inputformat.avro.AvroRecordReader'

# tableSpec: defines table name and where to fetch the corresponding table config and table schema.
tableSpec:

  # tableName: Table name
  tableName: 'airlineStats'

  # schemaURI: defines where to read the table schema; supports PinotFS or HTTP.
  # E.g.
  #   hdfs://path/to/table_schema.json
  #   http://localhost:9000/tables/myTable/schema
  schemaURI: 'http://localhost:9000/tables/airlineStats/schema'

  # tableConfigURI: defines where to read the table config.
  # Supports using PinotFS or HTTP.
  # E.g.
  #   hdfs://path/to/table_config.json
  #   http://localhost:9000/tables/myTable
  # Note that the API to read the Pinot table config directly from the pinot controller contains a JSON wrapper.
  # The real table config is the object under the field 'OFFLINE'.
  tableConfigURI: 'http://localhost:9000/tables/airlineStats'

# segmentNameGeneratorSpec: defines how to init a SegmentNameGenerator.
segmentNameGeneratorSpec:

  # type: Currently supported types are 'simple' and 'normalizedDate'.
  type: normalizedDate

  # configs: Configs to init SegmentNameGenerator.
  configs:
    segment.name.prefix: 'airlineStats_batch'
    exclude.sequence.id: true

# pinotClusterSpecs: defines the Pinot Cluster Access Point.
pinotClusterSpecs:
  - # controllerURI: used to fetch table/schema information and push data.
    # E.g. http://localhost:9000
    controllerURI: 'http://localhost:9000'

# pushJobSpec: defines segment push job related configuration.
pushJobSpec:

  # pushAttempts: number of attempts for the push job; default is 1, which means no retry.
  pushAttempts: 2

  # pushRetryIntervalMillis: retry wait time in ms; default is 1 second.
  pushRetryIntervalMillis: 1000
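As the tableConfigURI comment above notes, the controller's response wraps the real table config under the 'OFFLINE' field. A quick way to inspect what the job will actually read, assuming jq is installed for pretty-printing:

curl -s http://localhost:9000/tables/airlineStats | jq '.OFFLINE'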

Executing the job

The command below creates the example table in the Pinot cluster.
bin/pinot-admin.sh AddTable -schemaFile examples/batch/airlineStats/airlineStats_schema.json -tableConfigFile examples/batch/airlineStats/airlineStats_offline_table_config.json -exec
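To confirm the table was registered, you can list the tables known to the controller; this is just a sanity check, not part of the ingestion flow (jq assumed installed):

curl -s http://localhost:9000/tables | jq .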
The command below kicks off the ingestion job to generate Pinot segments and push them to the cluster.
bin/pinot-admin.sh LaunchDataIngestionJob -jobSpecFile examples/batch/airlineStats/ingestionJobSpec.yaml
After the job finishes, segments are stored in examples/batch/airlineStats/segments, following the same layout as the input directory.
/var/pinot/airlineStats/segments/2014/01/01/airlineStats_batch_2014-01-01_2014-01-01.tar.gz
/var/pinot/airlineStats/segments/2014/01/02/airlineStats_batch_2014-01-02_2014-01-02.tar.gz
/var/pinot/airlineStats/segments/2014/01/03/airlineStats_batch_2014-01-03_2014-01-03.tar.gz
/var/pinot/airlineStats/segments/2014/01/04/airlineStats_batch_2014-01-04_2014-01-04.tar.gz
/var/pinot/airlineStats/segments/2014/01/05/airlineStats_batch_2014-01-05_2014-01-05.tar.gz
/var/pinot/airlineStats/segments/2014/01/06/airlineStats_batch_2014-01-06_2014-01-06.tar.gz
/var/pinot/airlineStats/segments/2014/01/07/airlineStats_batch_2014-01-07_2014-01-07.tar.gz
/var/pinot/airlineStats/segments/2014/01/08/airlineStats_batch_2014-01-08_2014-01-08.tar.gz
/var/pinot/airlineStats/segments/2014/01/09/airlineStats_batch_2014-01-09_2014-01-09.tar.gz
/var/pinot/airlineStats/segments/2014/01/10/airlineStats_batch_2014-01-10_2014-01-10.tar.gz
/var/pinot/airlineStats/segments/2014/01/11/airlineStats_batch_2014-01-11_2014-01-11.tar.gz
/var/pinot/airlineStats/segments/2014/01/12/airlineStats_batch_2014-01-12_2014-01-12.tar.gz
/var/pinot/airlineStats/segments/2014/01/13/airlineStats_batch_2014-01-13_2014-01-13.tar.gz
/var/pinot/airlineStats/segments/2014/01/14/airlineStats_batch_2014-01-14_2014-01-14.tar.gz
/var/pinot/airlineStats/segments/2014/01/15/airlineStats_batch_2014-01-15_2014-01-15.tar.gz
/var/pinot/airlineStats/segments/2014/01/16/airlineStats_batch_2014-01-16_2014-01-16.tar.gz
/var/pinot/airlineStats/segments/2014/01/17/airlineStats_batch_2014-01-17_2014-01-17.tar.gz
/var/pinot/airlineStats/segments/2014/01/18/airlineStats_batch_2014-01-18_2014-01-18.tar.gz
/var/pinot/airlineStats/segments/2014/01/19/airlineStats_batch_2014-01-19_2014-01-19.tar.gz
/var/pinot/airlineStats/segments/2014/01/20/airlineStats_batch_2014-01-20_2014-01-20.tar.gz
/var/pinot/airlineStats/segments/2014/01/21/airlineStats_batch_2014-01-21_2014-01-21.tar.gz
/var/pinot/airlineStats/segments/2014/01/22/airlineStats_batch_2014-01-22_2014-01-22.tar.gz
/var/pinot/airlineStats/segments/2014/01/23/airlineStats_batch_2014-01-23_2014-01-23.tar.gz
/var/pinot/airlineStats/segments/2014/01/24/airlineStats_batch_2014-01-24_2014-01-24.tar.gz
/var/pinot/airlineStats/segments/2014/01/25/airlineStats_batch_2014-01-25_2014-01-25.tar.gz
/var/pinot/airlineStats/segments/2014/01/26/airlineStats_batch_2014-01-26_2014-01-26.tar.gz
/var/pinot/airlineStats/segments/2014/01/27/airlineStats_batch_2014-01-27_2014-01-27.tar.gz
/var/pinot/airlineStats/segments/2014/01/28/airlineStats_batch_2014-01-28_2014-01-28.tar.gz
/var/pinot/airlineStats/segments/2014/01/29/airlineStats_batch_2014-01-29_2014-01-29.tar.gz
/var/pinot/airlineStats/segments/2014/01/30/airlineStats_batch_2014-01-30_2014-01-30.tar.gz
/var/pinot/airlineStats/segments/2014/01/31/airlineStats_batch_2014-01-31_2014-01-31.tar.gz
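As a final sanity check, you can query the table through a broker to confirm the pushed segments are queryable; this sketch assumes a broker listening on the default port 8099:

curl -s -H "Content-Type: application/json" -X POST \
    -d '{"sql":"SELECT count(*) FROM airlineStats"}' \
    http://localhost:8099/query/sql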

Executing the job using Spark

The example below runs in Spark local mode. You can download a Spark distribution and start it by running:
wget https://downloads.apache.org/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
tar xvf spark-2.4.6-bin-hadoop2.7.tgz
cd spark-2.4.6-bin-hadoop2.7
./bin/spark-shell --master 'local[2]'
Build the latest Pinot distribution following this Wiki.
The command below shows how to use spark-submit to submit a Spark job using pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar.
Sample Spark ingestion job spec yaml (also located at examples/batch/airlineStats/sparkIngestionJobSpec.yaml):
# executionFrameworkSpec: Defines the framework that runs the ingestion job.
executionFrameworkSpec:

  # name: execution framework name
  name: 'spark'

  # segmentGenerationJobRunnerClassName: class name implementing the org.apache.pinot.spi.batch.ingestion.runner.SegmentGenerationJobRunner interface.
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner'

  # segmentTarPushJobRunnerClassName: class name implementing the org.apache.pinot.spi.batch.ingestion.runner.SegmentTarPushJobRunner interface.
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner'

  # segmentUriPushJobRunnerClassName: class name implementing the org.apache.pinot.spi.batch.ingestion.runner.SegmentUriPushJobRunner interface.
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner'

  # extraConfigs: extra configs for the execution framework.
  extraConfigs:

    # stagingDir: a directory on the distributed filesystem used to host all the segments; the whole directory is then moved to the output directory.
    stagingDir: examples/batch/airlineStats/staging

# jobType: Pinot ingestion job type.
# Supported job types are:
#   'SegmentCreation'
#   'SegmentTarPush'
#   'SegmentUriPush'
#   'SegmentCreationAndTarPush'
#   'SegmentCreationAndUriPush'
jobType: SegmentCreationAndTarPush

# inputDirURI: Root directory of input data, expected to have a scheme configured in PinotFS.
inputDirURI: 'examples/batch/airlineStats/rawdata'

# includeFileNamePattern: include file name pattern; glob patterns are supported.
# Sample usage:
#   'glob:*.avro' will include all avro files just under the inputDirURI, not sub directories;
#   'glob:**/*.avro' will include all the avro files under inputDirURI recursively.
includeFileNamePattern: 'glob:**/*.avro'

# excludeFileNamePattern: exclude file name pattern; glob patterns are supported.
# Sample usage:
#   'glob:*.avro' will exclude all avro files just under the inputDirURI, not sub directories;
#   'glob:**/*.avro' will exclude all the avro files under inputDirURI recursively.
# excludeFileNamePattern: ''

# outputDirURI: Root directory of output segments, expected to have a scheme configured in PinotFS.
outputDirURI: 'examples/batch/airlineStats/segments'

# overwriteOutput: Overwrite output segments if they exist.
overwriteOutput: true

# pinotFSSpecs: defines all related Pinot file systems.
pinotFSSpecs:

  - # scheme: used to identify a PinotFS.
    # E.g. local, hdfs, dbfs, etc.
    scheme: file

    # className: Class name used to create the PinotFS instance.
    # E.g.
    #   org.apache.pinot.spi.filesystem.LocalPinotFS is used for local filesystem
    #   org.apache.pinot.plugin.filesystem.AzurePinotFS is used for Azure Data Lake
    #   org.apache.pinot.plugin.filesystem.HadoopPinotFS is used for HDFS
    className: org.apache.pinot.plugin.filesystem.HadoopPinotFS

# recordReaderSpec: defines the record reader.
recordReaderSpec:

  # dataFormat: Record data format, e.g. 'avro', 'parquet', 'orc', 'csv', 'json', 'thrift' etc.
  dataFormat: 'avro'

  # className: Corresponding RecordReader class name.
  # E.g.
  #   org.apache.pinot.plugin.inputformat.avro.AvroRecordReader
  #   org.apache.pinot.plugin.inputformat.csv.CSVRecordReader
  #   org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader
  #   org.apache.pinot.plugin.inputformat.json.JSONRecordReader
  #   org.apache.pinot.plugin.inputformat.orc.ORCRecordReader
  #   org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReader
  className: 'org.apache.pinot.plugin.inputformat.avro.AvroRecordReader'

# tableSpec: defines table name and where to fetch the corresponding table config and table schema.
tableSpec:

  # tableName: Table name
  tableName: 'airlineStats'

  # schemaURI: defines where to read the table schema; supports PinotFS or HTTP.
  # E.g.
  #   hdfs://path/to/table_schema.json
  #   http://localhost:9000/tables/myTable/schema
  schemaURI: 'http://localhost:9000/tables/airlineStats/schema'

  # tableConfigURI: defines where to read the table config.
  # Supports using PinotFS or HTTP.
  # E.g.
  #   hdfs://path/to/table_config.json
  #   http://localhost:9000/tables/myTable
  # Note that the API to read the Pinot table config directly from the pinot controller contains a JSON wrapper.
  # The real table config is the object under the field 'OFFLINE'.
  tableConfigURI: 'http://localhost:9000/tables/airlineStats'

# segmentNameGeneratorSpec: defines how to init a SegmentNameGenerator.
segmentNameGeneratorSpec:

  # type: Currently supported types are 'simple' and 'normalizedDate'.
  type: normalizedDate

  # configs: Configs to init SegmentNameGenerator.
  configs:
    segment.name.prefix: 'airlineStats_batch'
    exclude.sequence.id: true

# pinotClusterSpecs: defines the Pinot Cluster Access Point.
pinotClusterSpecs:
  - # controllerURI: used to fetch table/schema information and push data.
    # E.g. http://localhost:9000
    controllerURI: 'http://localhost:9000'

# pushJobSpec: defines segment push job related configuration.
pushJobSpec:

  # pushParallelism: push job parallelism, default is 1.
  pushParallelism: 2

  # pushAttempts: number of attempts for the push job; default is 1, which means no retry.
  pushAttempts: 2

  # pushRetryIntervalMillis: retry wait time in ms; default is 1 second.
  pushRetryIntervalMillis: 1000
Please ensure the parameters PINOT_ROOT_DIR and PINOT_VERSION are set properly.
Please ensure you set:
  • spark.driver.extraJavaOptions => -Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins
    (or put all the required plugin jars on the CLASSPATH, then set -Dplugins.dir=${CLASSPATH})
  • spark.driver.extraClassPath => pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar
export PINOT_VERSION=0.5.0-SNAPSHOT
export PINOT_DISTRIBUTION_DIR=${PINOT_ROOT_DIR}/pinot-distribution/target/apache-pinot-${PINOT_VERSION}-bin/apache-pinot-${PINOT_VERSION}-bin
cd ${PINOT_DISTRIBUTION_DIR}
${SPARK_HOME}/bin/spark-submit \
  --class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
  --master "local[2]" \
  --deploy-mode client \
  --conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml" \
  --conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
  local://${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \
  -jobSpecFile ${PINOT_DISTRIBUTION_DIR}/examples/batch/airlineStats/sparkIngestionJobSpec.yaml
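For running on an actual cluster rather than local mode, here is a hedged sketch: the master, deploy-mode, and executor-side options below are assumptions to adapt for your environment, and the jar and plugins directory must be reachable from every node.

# Hedged sketch of a cluster submission (adjust master, deploy-mode, and paths):
${SPARK_HOME}/bin/spark-submit \
  --class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
  --master yarn \
  --deploy-mode cluster \
  --conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins" \
  --conf "spark.executor.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins" \
  --jars ${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \
  local://${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \
  -jobSpecFile ${PINOT_DISTRIBUTION_DIR}/examples/batch/airlineStats/sparkIngestionJobSpec.yaml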

Executing the job using Hadoop

The command below shows how to use the hadoop jar command to run a Hadoop job using pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar.
Sample Hadoop ingestion job spec yaml (also located at examples/batch/airlineStats/hadoopIngestionJobSpec.yaml):
# executionFrameworkSpec: Defines the framework that runs the ingestion job.
executionFrameworkSpec:

  # name: execution framework name
  name: 'hadoop'

  # segmentGenerationJobRunnerClassName: class name implementing the org.apache.pinot.spi.batch.ingestion.runner.SegmentGenerationJobRunner interface.
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunner'

  # segmentTarPushJobRunnerClassName: class name implementing the org.apache.pinot.spi.batch.ingestion.runner.SegmentTarPushJobRunner interface.
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentTarPushJobRunner'

  # segmentUriPushJobRunnerClassName: class name implementing the org.apache.pinot.spi.batch.ingestion.runner.SegmentUriPushJobRunner interface.
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentUriPushJobRunner'

  # extraConfigs: extra configs for the execution framework.
  extraConfigs:

    # stagingDir: a directory on the distributed filesystem used to host all the segments; the whole directory is then moved to the output directory.
    stagingDir: examples/batch/airlineStats/staging

# jobType: Pinot ingestion job type.
# Supported job types are:
#   'SegmentCreation'
#   'SegmentTarPush'
#   'SegmentUriPush'
#   'SegmentCreationAndTarPush'
#   'SegmentCreationAndUriPush'
jobType: SegmentCreationAndTarPush

# inputDirURI: Root directory of input data, expected to have a scheme configured in PinotFS.
inputDirURI: 'examples/batch/airlineStats/rawdata'

# includeFileNamePattern: include file name pattern; glob patterns are supported.
# Sample usage:
#   'glob:*.avro' will include all avro files just under the inputDirURI, not sub directories;
#   'glob:**/*.avro' will include all the avro files under inputDirURI recursively.
includeFileNamePattern: 'glob:**/*.avro'

# excludeFileNamePattern: exclude file name pattern; glob patterns are supported.
# Sample usage:
#   'glob:*.avro' will exclude all avro files just under the inputDirURI, not sub directories;
#   'glob:**/*.avro' will exclude all the avro files under inputDirURI recursively.
# excludeFileNamePattern: ''

# outputDirURI: Root directory of output segments, expected to have a scheme configured in PinotFS.
outputDirURI: 'examples/batch/airlineStats/segments'

# overwriteOutput: Overwrite output segments if they exist.
overwriteOutput: true

# pinotFSSpecs: defines all related Pinot file systems.
pinotFSSpecs:

  - # scheme: used to identify a PinotFS.
    # E.g. local, hdfs, dbfs, etc.
    scheme: file

    # className: Class name used to create the PinotFS instance.
    # E.g.
    #   org.apache.pinot.spi.filesystem.LocalPinotFS is used for local filesystem
    #   org.apache.pinot.plugin.filesystem.AzurePinotFS is used for Azure Data Lake
    #   org.apache.pinot.plugin.filesystem.HadoopPinotFS is used for HDFS
    className: org.apache.pinot.plugin.filesystem.HadoopPinotFS

# recordReaderSpec: defines the record reader.
recordReaderSpec:

  # dataFormat: Record data format, e.g. 'avro', 'parquet', 'orc', 'csv', 'json', 'thrift' etc.
  dataFormat: 'avro'

  # className: Corresponding RecordReader class name.
  # E.g.
  #   org.apache.pinot.plugin.inputformat.avro.AvroRecordReader
  #   org.apache.pinot.plugin.inputformat.csv.CSVRecordReader
  #   org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader
  #   org.apache.pinot.plugin.inputformat.json.JSONRecordReader
  #   org.apache.pinot.plugin.inputformat.orc.ORCRecordReader
  #   org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReader
  className: 'org.apache.pinot.plugin.inputformat.avro.AvroRecordReader'

# tableSpec: defines table name and where to fetch the corresponding table config and table schema.
tableSpec:

  # tableName: Table name
  tableName: 'airlineStats'

  # schemaURI: defines where to read the table schema; supports PinotFS or HTTP.
  # E.g.
  #   hdfs://path/to/table_schema.json
  #   http://localhost:9000/tables/myTable/schema
  schemaURI: 'http://localhost:9000/tables/airlineStats/schema'

  # tableConfigURI: defines where to read the table config.
  # Supports using PinotFS or HTTP.
  # E.g.
  #   hdfs://path/to/table_config.json
  #   http://localhost:9000/tables/myTable
  # Note that the API to read the Pinot table config directly from the pinot controller contains a JSON wrapper.
  # The real table config is the object under the field 'OFFLINE'.
  tableConfigURI: 'http://localhost:9000/tables/airlineStats'

# segmentNameGeneratorSpec: defines how to init a SegmentNameGenerator.
segmentNameGeneratorSpec:

  # type: Currently supported types are 'simple' and 'normalizedDate'.
  type: normalizedDate

  # configs: Configs to init SegmentNameGenerator.
  configs:
    segment.name.prefix: 'airlineStats_batch'
    exclude.sequence.id: true

# pinotClusterSpecs: defines the Pinot Cluster Access Point.
pinotClusterSpecs:
  - # controllerURI: used to fetch table/schema information and push data.
    # E.g. http://localhost:9000
    controllerURI: 'http://localhost:9000'

# pushJobSpec: defines segment push job related configuration.
pushJobSpec:

  # pushParallelism: push job parallelism, default is 1.
  pushParallelism: 2

  # pushAttempts: number of attempts for the push job; default is 1, which means no retry.
  pushAttempts: 2

  # pushRetryIntervalMillis: retry wait time in ms; default is 1 second.
  pushRetryIntervalMillis: 1000
Please ensure the parameters PINOT_ROOT_DIR and PINOT_VERSION are set properly.
export PINOT_VERSION=0.5.0-SNAPSHOT
export PINOT_DISTRIBUTION_DIR=${PINOT_ROOT_DIR}/pinot-distribution/target/apache-pinot-${PINOT_VERSION}-bin/apache-pinot-${PINOT_VERSION}-bin
export HADOOP_CLIENT_OPTS="-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml"
hadoop jar \
  ${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \
  org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
  -jobSpecFile ${PINOT_DISTRIBUTION_DIR}/examples/batch/airlineStats/hadoopIngestionJobSpec.yaml

Tuning

You can set the environment variable JAVA_OPTS to modify:
  • Log4j2 file location with -Dlog4j2.configurationFile
  • Plugin directory location with -Dplugins.dir=/opt/pinot/plugins
  • JVM props, like -Xmx8g -Xms4G
Please note that you need to configure all three of the above together in JAVA_OPTS. If you only set JAVA_OPTS="-Xmx4g", then plugins.dir is left empty, which usually causes the job to fail.
E.g.
docker run --rm -ti -e JAVA_OPTS="-Xms8G -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-admin-log4j2.xml -Dplugins.dir=/opt/pinot/plugins" --name pinot-data-ingestion-job apachepinot/pinot:latest LaunchDataIngestionJob -jobSpecFile /path/to/ingestion_job_spec.yaml
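For non-Docker runs, the same three settings can be combined in JAVA_OPTS before invoking pinot-admin.sh; a minimal sketch, assuming PINOT_DISTRIBUTION_DIR is set as in the earlier sections and the job spec path is your own:

# All three concerns (heap, log4j2 config, plugins dir) set in one JAVA_OPTS.
export JAVA_OPTS="-Xms4G -Xmx8G -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml -Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins"
bin/pinot-admin.sh LaunchDataIngestionJob -jobSpecFile /path/to/ingestion_job_spec.yaml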
You can also add your customized JAVA_OPTS if necessary.