Use S3 as Deep Storage for Pinot
The commands below are based on the Pinot distribution binary.

Setup Pinot Cluster

To set up Pinot to use S3 as the deep store, we need to add extra configs for the Controller and Server.

Start Controller

Below is a sample controller.conf file.
Please configure controller.data.dir to point to your S3 bucket; all uploaded segments will be stored there.
Then add S3 as a Pinot storage with the following configs:
```
pinot.controller.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
pinot.controller.storage.factory.s3.region=us-west-2
```
Regarding AWS credentials, we also follow the convention of DefaultAWSCredentialsProviderChain.
You can specify the AccessKey and Secret using:
  • Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (RECOMMENDED since they are recognized by all the AWS SDKs and CLI except for .NET), or AWS_ACCESS_KEY and AWS_SECRET_KEY (only recognized by Java SDK)
  • Java System Properties - aws.accessKeyId and aws.secretKey
  • Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI
  • Configure AWS credentials in Pinot config files, e.g. set pinot.controller.storage.factory.s3.accessKey and pinot.controller.storage.factory.s3.secretKey in the config file (not recommended):
```
pinot.controller.storage.factory.s3.accessKey=****************LFVX
pinot.controller.storage.factory.s3.secretKey=****************gfhz
```
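If you use the environment-variable option instead, export the standard AWS variables before starting the controller. The values below are the placeholder credentials from the AWS documentation:

```bash
# Placeholder credentials from the AWS docs; substitute your own.
export AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
export AWS_SECRET_ACCESS_KEY=wJalrXUtnFXEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
```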
Add s3 to pinot.controller.segment.fetcher.protocols and set pinot.controller.segment.fetcher.s3.class to org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher:
```
controller.data.dir=s3://my.bucket/pinot-data/pinot-s3-example/controller-data
controller.local.temp.dir=/tmp/pinot-tmp-data/
controller.zk.str=localhost:2181
controller.host=127.0.0.1
controller.port=9000
controller.helix.cluster.name=pinot-s3-example
pinot.controller.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
pinot.controller.storage.factory.s3.region=us-west-2

pinot.controller.segment.fetcher.protocols=file,http,s3
pinot.controller.segment.fetcher.s3.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
```
If you want to grant full control to the bucket owner, then add this to the config:

```
pinot.controller.storage.factory.s3.disableAcl=false
```
Then start the Pinot controller with:

```bash
bin/pinot-admin.sh StartController -configFileName conf/controller.conf
```
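As a quick sanity check, you can hit the controller's health endpoint (assuming the default controller port 9000 configured above):

```bash
# Expected to return OK once the controller is up.
curl http://localhost:9000/health
```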

Start Broker

The broker needs no S3-specific configs, so you can just start it with the defaults:

```bash
bin/pinot-admin.sh StartBroker -zkAddress localhost:2181 -clusterName pinot-s3-example
```

Start Server

Below is a sample server.conf file.
Similar to the controller config, please also set the S3 configs for the Pinot server:
```
pinot.server.netty.port=8098
pinot.server.adminapi.port=8097
pinot.server.instance.dataDir=/tmp/pinot-tmp/server/index
pinot.server.instance.segmentTarDir=/tmp/pinot-tmp/server/segmentTars

pinot.server.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
pinot.server.storage.factory.s3.region=us-west-2
pinot.server.segment.fetcher.protocols=file,http,s3
pinot.server.segment.fetcher.s3.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
```
If you want to grant full control to the bucket owner, then add this to the config:

```
pinot.server.storage.factory.s3.disableAcl=false
```
Then start the Pinot server with:

```bash
bin/pinot-admin.sh StartServer -configFileName conf/server.conf -zkAddress localhost:2181 -clusterName pinot-s3-example
```
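To confirm the server registered with the cluster, you can list the instances known to the controller (assuming the controller REST API at localhost:9000):

```bash
# Returns the controller/broker/server instances registered in the cluster.
curl http://localhost:9000/instances
```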

Setup Table

In this demo, we just use the airlineStats table as an example.
Create the table with the command below:

```bash
bin/pinot-admin.sh AddTable -schemaFile examples/batch/airlineStats/airlineStats_schema.json -tableConfigFile examples/batch/airlineStats/airlineStats_offline_table_config.json -exec
```
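To verify the table was created, you can list the tables via the controller (again assuming localhost:9000):

```bash
# The response should include "airlineStats".
curl http://localhost:9000/tables
```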

Set up Ingestion Jobs

Standalone Job

Below is a sample standalone ingestion job spec with a few notable settings:
  • jobType is SegmentCreationAndUriPush
  • inputDirURI is set to an S3 location s3://my.bucket/batch/airlineStats/rawdata/
  • outputDirURI is set to an S3 location s3://my.bucket/output/airlineStats/segments
  • Add a new PinotFS under pinotFSSpecs:

    ```yaml
    - scheme: s3
      className: org.apache.pinot.plugin.filesystem.S3PinotFS
      configs:
        region: 'us-west-2'
    ```
  • For library versions < 0.6.0, set segmentUriPrefix to [scheme]://[bucket.name], e.g. s3://my.bucket. From version 0.6.0 onward, you can set it to an empty string or simply omit segmentUriPrefix.
Sample ingestionJobSpec.yaml
```yaml
# executionFrameworkSpec: Defines ingestion jobs to be running.
executionFrameworkSpec:

  # name: execution framework name
  name: 'standalone'

  # segmentGenerationJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentGenerationJobRunner interface.
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'

  # segmentTarPushJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentTarPushJobRunner interface.
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'

  # segmentUriPushJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentUriPushJobRunner interface.
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'

# jobType: Pinot ingestion job type.
# Supported job types are:
#   'SegmentCreation'
#   'SegmentTarPush'
#   'SegmentUriPush'
#   'SegmentCreationAndTarPush'
#   'SegmentCreationAndUriPush'
jobType: SegmentCreationAndUriPush

# inputDirURI: Root directory of input data, expected to have scheme configured in PinotFS.
inputDirURI: 's3://my.bucket/batch/airlineStats/rawdata/'

# includeFileNamePattern: include file name pattern, supported glob pattern.
# Sample usage:
#   'glob:*.avro' will include all avro files just under the inputDirURI, not sub directories;
#   'glob:**/*.avro' will include all the avro files under inputDirURI recursively.
includeFileNamePattern: 'glob:**/*.avro'

# excludeFileNamePattern: exclude file name pattern, supported glob pattern.
# Sample usage:
#   'glob:*.avro' will exclude all avro files just under the inputDirURI, not sub directories;
#   'glob:**/*.avro' will exclude all the avro files under inputDirURI recursively.
# _excludeFileNamePattern: ''

# outputDirURI: Root directory of output segments, expected to have scheme configured in PinotFS.
outputDirURI: 's3://my.bucket/examples/output/airlineStats/segments'

# overwriteOutput: Overwrite output segments if existed.
overwriteOutput: true

# pinotFSSpecs: defines all related Pinot file systems.
pinotFSSpecs:

  - # scheme: used to identify a PinotFS.
    # E.g. local, hdfs, dbfs, etc
    scheme: file

    # className: Class name used to create the PinotFS instance.
    # E.g.
    #   org.apache.pinot.spi.filesystem.LocalPinotFS is used for local filesystem
    #   org.apache.pinot.plugin.filesystem.AzurePinotFS is used for Azure Data Lake
    #   org.apache.pinot.plugin.filesystem.HadoopPinotFS is used for HDFS
    className: org.apache.pinot.spi.filesystem.LocalPinotFS

  - scheme: s3
    className: org.apache.pinot.plugin.filesystem.S3PinotFS
    configs:
      region: 'us-west-2'

# recordReaderSpec: defines all record readers
recordReaderSpec:

  # dataFormat: Record data format, e.g. 'avro', 'parquet', 'orc', 'csv', 'json', 'thrift' etc.
  dataFormat: 'avro'

  # className: Corresponding RecordReader class name.
  # E.g.
  #   org.apache.pinot.plugin.inputformat.avro.AvroRecordReader
  #   org.apache.pinot.plugin.inputformat.csv.CSVRecordReader
  #   org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader
  #   org.apache.pinot.plugin.inputformat.json.JSONRecordReader
  #   org.apache.pinot.plugin.inputformat.orc.ORCRecordReader
  #   org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReader
  className: 'org.apache.pinot.plugin.inputformat.avro.AvroRecordReader'

# tableSpec: defines table name and where to fetch corresponding table config and table schema.
tableSpec:

  # tableName: Table name
  tableName: 'airlineStats'

  # schemaURI: defines where to read the table schema, supports PinotFS or HTTP.
  # E.g.
  #   hdfs://path/to/table_schema.json
  #   http://localhost:9000/tables/myTable/schema
  schemaURI: 'http://localhost:9000/tables/airlineStats/schema'

  # tableConfigURI: defines where to read the table config.
  # Supports using PinotFS or HTTP.
  # E.g.
  #   hdfs://path/to/table_config.json
  #   http://localhost:9000/tables/myTable
  # Note that the API to read Pinot table config directly from pinot controller contains a JSON wrapper.
  # The real table config is the object under the field 'OFFLINE'.
  tableConfigURI: 'http://localhost:9000/tables/airlineStats'

# pinotClusterSpecs: defines the Pinot Cluster Access Point.
pinotClusterSpecs:
  - # controllerURI: used to fetch table/schema information and data push.
    # E.g. http://localhost:9000
    controllerURI: 'http://localhost:9000'

# pushJobSpec: defines segment push job related configuration.
pushJobSpec:

  # pushAttempts: number of attempts for push job, default is 1, which means no retry.
  pushAttempts: 2

  # pushRetryIntervalMillis: retry wait Ms, default to 1 second.
  pushRetryIntervalMillis: 1000

  # For Pinot version < 0.6.0, use [scheme]://[bucket.name] as prefix.
  # E.g. s3://my.bucket
  segmentUriPrefix: 's3://my.bucket'
  segmentUriSuffix: ''
```
Launch the job with:

```bash
bin/pinot-admin.sh LaunchDataIngestionJob -jobSpecFile ~/temp/pinot/pinot-s3-test/ingestionJobSpec.yaml
```

Below is a sample job output:
```
2020/08/18 16:11:03.521 INFO [IngestionJobLauncher] [main] SegmentGenerationJobSpec:
!!org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec
excludeFileNamePattern: null
executionFrameworkSpec: {extraConfigs: null, name: standalone, segmentGenerationJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner,
  segmentTarPushJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner,
  segmentUriPushJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner}
includeFileNamePattern: glob:**/*.avro
inputDirURI: s3://my.bucket/batch/airlineStats/rawdata/
jobType: SegmentUriPush
outputDirURI: s3://my.bucket/examples/output/airlineStats/segments
overwriteOutput: true
pinotClusterSpecs:
- {controllerURI: 'http://localhost:9000'}
pinotFSSpecs:
- {className: org.apache.pinot.spi.filesystem.LocalPinotFS, configs: null, scheme: file}
- className: org.apache.pinot.plugin.filesystem.S3PinotFS
  configs: {region: us-west-2}
  scheme: s3
pushJobSpec: {pushAttempts: 2, pushParallelism: 1, pushRetryIntervalMillis: 1000,
  segmentUriPrefix: '', segmentUriSuffix: ''}
recordReaderSpec: {className: org.apache.pinot.plugin.inputformat.avro.AvroRecordReader,
  configClassName: null, configs: null, dataFormat: avro}
segmentNameGeneratorSpec: null
tableSpec: {schemaURI: 'http://localhost:9000/tables/airlineStats/schema', tableConfigURI: 'http://localhost:9000/tables/airlineStats',
  tableName: airlineStats}

2020/08/18 16:11:03.531 INFO [IngestionJobLauncher] [main] Trying to create instance for class org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner
2020/08/18 16:11:03.654 INFO [PinotFSFactory] [main] Initializing PinotFS for scheme file, classname org.apache.pinot.spi.filesystem.LocalPinotFS
2020/08/18 16:11:03.656 INFO [PinotFSFactory] [main] Initializing PinotFS for scheme s3, classname org.apache.pinot.plugin.filesystem.S3PinotFS
2020/08/18 16:11:05.520 INFO [SegmentPushUtils] [main] Start sending table airlineStats segment URIs: [s3://my.bucket/examples/output/airlineStats/segments/2014/01/01/airlineStats_OFFLINE_16071_16071_0.tar.gz, s3://my.bucket/examples/output/airlineStats/segments/2014/01/02/airlineStats_OFFLINE_16072_16072_1.tar.gz, s3://my.bucket/examples/output/airlineStats/segments/2014/01/03/airlineStats_OFFLINE_16073_16073_2.tar.gz, s3://my.bucket/examples/output/airlineStats/segments/2014/01/04/airlineStats_OFFLINE_16074_16074_3.tar.gz, s3://my.bucket/examples/output/airlineStats/segments/2014/01/05/airlineStats_OFFLINE_16075_16075_4.tar.gz] to locations: [org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec@...95f]
2020/08/18 16:11:05.521 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/01/airlineStats_OFFLINE_16071_16071_0.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:09.356 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:09.358 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/01/airlineStats_OFFLINE_16071_16071_0.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16071_16071_0 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:09.359 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/02/airlineStats_OFFLINE_16072_16072_1.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:09.824 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:09.825 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/02/airlineStats_OFFLINE_16072_16072_1.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16072_16072_1 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:09.825 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/03/airlineStats_OFFLINE_16073_16073_2.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:10.500 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:10.501 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/03/airlineStats_OFFLINE_16073_16073_2.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16073_16073_2 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:10.501 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/04/airlineStats_OFFLINE_16074_16074_3.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:10.967 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:10.968 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/04/airlineStats_OFFLINE_16074_16074_3.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16074_16074_3 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:10.969 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/05/airlineStats_OFFLINE_16075_16075_4.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:11.420 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:11.420 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/05/airlineStats_OFFLINE_16075_16075_4.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16075_16075_4 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:11.421 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/06/airlineStats_OFFLINE_16076_16076_5.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:11.872 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:11.873 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/06/airlineStats_OFFLINE_16076_16076_5.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16076_16076_5 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:11.877 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/07/airlineStats_OFFLINE_16077_16077_6.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:12.293 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:12.294 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/07/airlineStats_OFFLINE_16077_16077_6.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16077_16077_6 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:12.295 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/08/airlineStats_OFFLINE_16078_16078_7.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:12.672 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:12.673 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/08/airlineStats_OFFLINE_16078_16078_7.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16078_16078_7 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:12.674 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/09/airlineStats_OFFLINE_16079_16079_8.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:13.048 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:13.050 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/09/airlineStats_OFFLINE_16079_16079_8.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16079_16079_8 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:13.051 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/10/airlineStats_OFFLINE_16080_16080_9.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:13.483 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:13.485 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/10/airlineStats_OFFLINE_16080_16080_9.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16080_16080_9 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:13.486 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/11/airlineStats_OFFLINE_16081_16081_10.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:14.080 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:14.081 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/11/airlineStats_OFFLINE_16081_16081_10.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16081_16081_10 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:14.082 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/12/airlineStats_OFFLINE_16082_16082_11.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:14.477 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:14.477 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/12/airlineStats_OFFLINE_16082_16082_11.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16082_16082_11 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:14.478 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/13/airlineStats_OFFLINE_16083_16083_12.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:14.865 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:14.866 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/13/airlineStats_OFFLINE_16083_16083_12.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16083_16083_12 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:14.867 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/14/airlineStats_OFFLINE_16084_16084_13.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:15.257 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:15.258 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/14/airlineStats_OFFLINE_16084_16084_13.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16084_16084_13 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:15.258 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/15/airlineStats_OFFLINE_16085_16085_14.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:15.917 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:15.919 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/15/airlineStats_OFFLINE_16085_16085_14.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16085_16085_14 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:15.919 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/16/airlineStats_OFFLINE_16086_16086_15.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:16.719 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:16.720 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/16/airlineStats_OFFLINE_16086_16086_15.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16086_16086_15 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:16.720 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/17/airlineStats_OFFLINE_16087_16087_16.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:17.346 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:17.347 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/17/airlineStats_OFFLINE_16087_16087_16.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16087_16087_16 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:17.347 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/18/airlineStats_OFFLINE_16088_16088_17.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:17.815 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:17.816 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/18/airlineStats_OFFLINE_16088_16088_17.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16088_16088_17 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:17.816 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/19/airlineStats_OFFLINE_16089_16089_18.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:18.389 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:18.389 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/19/airlineStats_OFFLINE_16089_16089_18.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16089_16089_18 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:18.390 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/20/airlineStats_OFFLINE_16090_16090_19.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:18.978 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:18.978 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/20/airlineStats_OFFLINE_16090_16090_19.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16090_16090_19 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:18.979 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/21/airlineStats_OFFLINE_16091_16091_20.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:19.586 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:19.587 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/21/airlineStats_OFFLINE_16091_16091_20.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16091_16091_20 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:19.589 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/22/airlineStats_OFFLINE_16092_16092_21.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:20.087 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:20.087 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/22/airlineStats_OFFLINE_16092_16092_21.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16092_16092_21 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:20.088 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/23/airlineStats_OFFLINE_16093_16093_22.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:20.550 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:20.551 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/23/airlineStats_OFFLINE_16093_16093_22.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16093_16093_22 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:20.552 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/24/airlineStats_OFFLINE_16094_16094_23.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:20.978 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:20.979 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/24/airlineStats_OFFLINE_16094_16094_23.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16094_16094_23 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:20.979 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/25/airlineStats_OFFLINE_16095_16095_24.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:21.626 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:21.628 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/25/airlineStats_OFFLINE_16095_16095_24.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16095_16095_24 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:21.628 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/26/airlineStats_OFFLINE_16096_16096_25.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:22.121 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:22.122 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/26/airlineStats_OFFLINE_16096_16096_25.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16096_16096_25 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:22.123 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/27/airlineStats_OFFLINE_16097_16097_26.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:22.679 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:22.679 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/27/airlineStats_OFFLINE_16097_16097_26.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16097_16097_26 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:22.680 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/28/airlineStats_OFFLINE_16098_16098_27.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:23.373 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:23.374 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/28/airlineStats_OFFLINE_16098_16098_27.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16098_16098_27 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:23.375 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/29/airlineStats_OFFLINE_16099_16099_28.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:23.787 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:23.788 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/29/airlineStats_OFFLINE_16099_16099_28.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16099_16099_28 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:23.788 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/30/airlineStats_OFFLINE_16100_16100_29.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:24.298 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:24.299 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/30/airlineStats_OFFLINE_16100_16100_29.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16100_16100_29 of table: airlineStats_OFFLINE"}
2020/08/18 16:11:24.299 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/31/airlineStats_OFFLINE_16101_16101_30.tar.gz to location: http://localhost:9000 for
2020/08/18 16:11:24.987 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
2020/08/18 16:11:24.987 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/31/airlineStats_OFFLINE_16101_16101_30.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16101_16101_30 of table: airlineStats_OFFLINE"}
```
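Once the segments are pushed, a quick way to sanity-check the data is to run a count query through pinot-admin; this sketch assumes the broker is running on its default port 8099:

```bash
# Hypothetical sanity check; adjust broker host/port to your setup.
bin/pinot-admin.sh PostQuery \
  -brokerHost localhost \
  -brokerPort 8099 \
  -query "select count(*) from airlineStats"
```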

Spark Job

Setup Spark Cluster (Skip if you already have one)

Please follow this page to set up a local Spark cluster.
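If you just need a throwaway local cluster, a minimal setup looks roughly like the following; the Spark version here is an illustrative assumption, so pick one compatible with your Pinot build:

```bash
# Download and unpack a Spark distribution (version is an assumption).
wget https://archive.apache.org/dist/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
tar -xzf spark-2.4.6-bin-hadoop2.7.tgz
export SPARK_HOME="$(pwd)/spark-2.4.6-bin-hadoop2.7"
```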

Submit Spark Job

Below is a sample Spark ingestion job spec:

```yaml
# executionFrameworkSpec: Defines ingestion jobs to be running.
executionFrameworkSpec:

  # name: execution framework name
  name: 'spark'

  # segmentGenerationJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentGenerationJobRunner interface.
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner'

  # segmentTarPushJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentTarPushJobRunner interface.
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner'

  # segmentUriPushJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentUriPushJobRunner interface.
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner'

# jobType: Pinot ingestion job type.
# Supported job types are:
#   'SegmentCreation'
#   'SegmentTarPush'
#   'SegmentUriPush'
#   'SegmentCreationAndTarPush'
#   'SegmentCreationAndUriPush'
jobType: SegmentCreationAndUriPush

# inputDirURI: Root directory of input data, expected to have scheme configured in PinotFS.
inputDirURI: 's3://my.bucket/batch/airlineStats/rawdata/'

# includeFileNamePattern: include file name pattern, supported glob pattern.
# Sample usage:
#   'glob:*.avro' will include all avro files just under the inputDirURI, not sub directories;
#   'glob:**/*.avro' will include all the avro files under inputDirURI recursively.
includeFileNamePattern: 'glob:**/*.avro'

# excludeFileNamePattern: exclude file name pattern, supported glob pattern.
# Sample usage:
#   'glob:*.avro' will exclude all avro files just under the inputDirURI, not sub directories;
#   'glob:**/*.avro' will exclude all the avro files under inputDirURI recursively.
# _excludeFileNamePattern: ''

# outputDirURI: Root directory of output segments, expected to have scheme configured in PinotFS.
outputDirURI: 's3://my.bucket/examples/output/airlineStats/segments'

# overwriteOutput: Overwrite output segments if existed.
overwriteOutput: true

# pinotFSSpecs: defines all related Pinot file systems.
pinotFSSpecs:

  - # scheme: used to identify a PinotFS.
    # E.g. local, hdfs, dbfs, etc
    scheme: file

    # className: Class name used to create the PinotFS instance.
    # E.g.
    #   org.apache.pinot.spi.filesystem.LocalPinotFS is used for local filesystem
    #   org.apache.pinot.plugin.filesystem.AzurePinotFS is used for Azure Data Lake
    #   org.apache.pinot.plugin.filesystem.HadoopPinotFS is used for HDFS
    className: org.apache.pinot.plugin.filesystem.HadoopPinotFS
    configs:
      'hadoop.conf.path': ''

  - scheme: s3
    className: org.apache.pinot.plugin.filesystem.S3PinotFS
    configs:
      region: 'us-west-2'

# recordReaderSpec: defines all record readers
recordReaderSpec:

  # dataFormat: Record data format, e.g. 'avro', 'parquet', 'orc', 'csv', 'json', 'thrift' etc.
  dataFormat: 'avro'

  # className: Corresponding RecordReader class name.
  # E.g.
  #   org.apache.pinot.plugin.inputformat.avro.AvroRecordReader
  #   org.apache.pinot.plugin.inputformat.csv.CSVRecordReader
  #   org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader
  #   org.apache.pinot.plugin.inputformat.json.JSONRecordReader
  #   org.apache.pinot.plugin.inputformat.orc.ORCRecordReader
  #   org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReader
  className: 'org.apache.pinot.plugin.inputformat.avro.AvroRecordReader'

# tableSpec: defines table name and where to fetch corresponding table config and table schema.
tableSpec:

  # tableName: Table name
  tableName: 'airlineStats'

  # schemaURI: defines where to read the table schema, supports PinotFS or HTTP.
  # E.g.
  #   hdfs://path/to/table_schema.json
  #   http://localhost:9000/tables/myTable/schema
  schemaURI: 'http://localhost:9000/tables/airlineStats/schema'

  # tableConfigURI: defines where to read the table config.
  # Supports using PinotFS or HTTP.
  # E.g.
  #   hdfs://path/to/table_config.json
  #   http://localhost:9000/tables/myTable
  # Note that the API to read Pinot table config directly from pinot controller contains a JSON wrapper.
  # The real table config is the object under the field 'OFFLINE'.
  tableConfigURI: 'http://localhost:9000/tables/airlineStats'

# segmentNameGeneratorSpec: defines how to init a SegmentNameGenerator.
segmentNameGeneratorSpec:

  # type: Currently supported types are 'simple' and 'normalizedDate'.
  type: normalizedDate

  # configs: Configs to init SegmentNameGenerator.
  configs:
    segment.name.prefix: 'airlineStats_batch'
    exclude.sequence.id: true

# pinotClusterSpecs: defines the Pinot Cluster Access Point.
pinotClusterSpecs:
  - # controllerURI: used to fetch table/schema information and data push.
    # E.g. http://localhost:9000
    controllerURI: 'http://localhost:9000'

# pushJobSpec: defines segment push job related configuration.
pushJobSpec:

  # pushParallelism: push job parallelism, default is 1.
  pushParallelism: 2

  # pushAttempts: number of attempts for push job, default is 1, which means no retry.
  pushAttempts: 2

  # pushRetryIntervalMillis: retry wait Ms, default to 1 second.
  pushRetryIntervalMillis: 1000
```
Submit the Spark job with the ingestion job spec:

```bash
${SPARK_HOME}/bin/spark-submit \
  --class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
  --master "local[2]" \
  --deploy-mode client \
  --conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml" \
  --conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
  local://${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \
  -jobSpecFile ${PINOT_DISTRIBUTION_DIR}/examples/sparkIngestionJobSpec.yaml
```
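The command above assumes PINOT_DISTRIBUTION_DIR and PINOT_VERSION are already exported; for example (the version and path are illustrative):

```bash
# Adjust to match your Pinot version and install location.
export PINOT_VERSION=0.6.0
export PINOT_DISTRIBUTION_DIR=/path/to/apache-pinot-incubating-${PINOT_VERSION}-bin
```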

Sample Results/Snapshots

Below is a sample snapshot of the S3 location for the controller:

(Image: sample S3 controller storage)
Below is a sample download URI in the PropertyStore; we expect the segment download URI to start with s3://.

(Image: sample segment download URI in PropertyStore)
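One way to spot-check this is to fetch a segment's metadata from the controller and look at the segment.download.url field; the endpoint below is a sketch and may vary by Pinot version:

```bash
# Hypothetical check: inspect the download URI for one of the pushed segments.
curl "http://localhost:9000/segments/airlineStats/airlineStats_OFFLINE_16071_16071_0/metadata"
```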