
Tutorials

Use OSS as Deep Storage for Pinot

Configure AliCloud Object Storage Service (OSS) as Pinot deep storage

OSS can be used as HDFS-based deep storage for Apache Pinot without implementing an OSS file system plugin. Follow the steps below:

1. Configure the hdfs-site.xml and core-site.xml files, put them under any desired path, and then set the pinot.<node>.storage.factory.oss.hadoop.conf.path config on the controller/server to this path.

For hdfs-site.xml, you do not have to provide any configuration:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
</configuration>

For core-site.xml, you have to provide the OSS access/secret and bucket configurations, as shown in the core-site.xml sample below.

2. In order to access OSS, find the HDFS jars related to OSS and put them under PINOT_DIR/lib. You can use the jars below, but be careful about versions to avoid conflicts (see the sketch after this list):

  • smartdata-aliyun-oss

  • smartdata-hadoop-common

  • guava
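
For example, a rough sketch of copying these jars into place could look like the following (the source paths are placeholders for wherever the OSS/EMR jars live on your machine, not actual locations):

# Copy the OSS-related HDFS jars into Pinot's lib directory (paths are placeholders).
export PINOT_DIR=/path/to/apache-pinot-bin
cp /path/to/oss-jars/smartdata-aliyun-oss-*.jar ${PINOT_DIR}/lib/
cp /path/to/oss-jars/smartdata-hadoop-common-*.jar ${PINOT_DIR}/lib/
cp /path/to/oss-jars/guava-*.jar ${PINOT_DIR}/lib/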

3. Set the OSS deep storage configs in controller.conf and server.conf, as shown below.

Example Job Spec

Using the same HDFS deep storage configs and jars, you can read data from OSS, then create segments and push them back to OSS. An example standalone batch ingestion job spec is shown below, after the core-site.xml, controller, and server config samples.

core-site.xml:

<?xml version="1.0"?>
<configuration>
    <property>
	      <name>fs.defaultFS</name>
	      <value>oss://your-bucket-name/</value>
	  </property>
    <property>
        <name>fs.oss.accessKeyId</name>
        <value>your-access-key-id</value>
    </property>
    <property>
        <name>fs.oss.accessKeySecret</name>
        <value>your-access-key-secret</value>
    </property>
    <property>
        <name>fs.oss.impl</name>
        <value>com.aliyun.emr.fs.oss.OssFileSystem</value>
    </property>
    <property>
        <name>fs.oss.endpoint</name>
        <value>your-oss-endpoint</value>
    </property>
</configuration>

Controller config:

controller.data.dir=oss://your-bucket-name/path/to/segments
controller.local.temp.dir=/path/to/local/temp/directory
controller.enable.split.commit=true
pinot.controller.storage.factory.class.oss=org.apache.pinot.plugin.filesystem.HadoopPinotFS
pinot.controller.storage.factory.oss.hadoop.conf.path=path/to/conf/directory/
pinot.controller.segment.fetcher.protocols=file,http,oss
pinot.controller.segment.fetcher.oss.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher

Server config:

pinot.server.instance.enable.split.commit=true
pinot.server.storage.factory.class.oss=org.apache.pinot.plugin.filesystem.HadoopPinotFS
pinot.server.storage.factory.oss.hadoop.conf.path=path/to/conf/directory/
pinot.server.segment.fetcher.protocols=file,http,oss
pinot.server.segment.fetcher.oss.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher

Example job spec:

executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
  segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner'
jobType: SegmentCreationAndMetadataPush
inputDirURI: 'oss://your-bucket-name/input'
includeFileNamePattern: 'glob:**/*.csv'
outputDirURI: 'oss://your-bucket-name/output'
overwriteOutput: true
pinotFSSpecs:
  - scheme: oss
    className: org.apache.pinot.plugin.filesystem.HadoopPinotFS
    configs:
      hadoop.conf.path: '/path/to/hadoop/conf'
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'transcript'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'
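
Assuming the job spec above is saved as oss_job_spec.yaml (a placeholder name), the standalone job can be launched with the same admin command used for the other ingestion jobs in these tutorials:

bin/pinot-admin.sh LaunchDataIngestionJob -jobSpecFile /path/to/oss_job_spec.yaml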

Ingest Parquet Files from S3 Using Spark

One of the primary advantages of using Pinot is its pluggable architecture. The plugins make it easy to add support for any third-party system, which can be an execution framework, a filesystem, or an input format.

In this tutorial, we will use three such plugins to easily ingest data and push it to our pinot cluster. The plugins we will be using are -

  • pinot-batch-ingestion-spark

  • pinot-s3

  • pinot-parquet

You can check out Batch Ingestion, File systems, and Input formats for all the available plugins.

Setup

We are using the following tools and frameworks for this tutorial -

  • Apache Spark 2.4.0 (Although any Spark 2.X/3.X should work)

  • Apache Parquet 1.8.2

Check out the Spark Ingestion Plugin page for the latest configuration and FAQs.

Input Data

We need to get input data to ingest first. For our demo, we'll just create some small parquet files and upload them to our S3 bucket. The easiest way is to create CSV files and then convert them to parquet. CSV keeps the data human-readable and thus easier to modify in case of some failure in our demo. We will call this file students.csv.

Now, we'll create parquet files from the above CSV file using Spark. Since this is a small program, we will be using the Spark shell instead of writing full-fledged Spark code.

The .parquet files can now be found in the /path/to/batch_input directory. You can now upload this directory to S3, either using the S3 UI or by running the command:

Create Schema and Table

We need to create a table to query the data that will be ingested. All tables in Pinot are associated with a schema. You can check out Table configuration and Schema configuration for more details on creating configurations.

For our demo, we will have the following schema and table configs

We can now upload these configurations to Pinot and create an empty table. We will be using the pinot-admin.sh CLI for this purpose.

You can check out the Command-Line Interface (CLI) documentation for all the available commands.

Our table will now be available in the Pinot data explorer.

Ingest Data

Now that our data is available in S3 and the table has been created in Pinot, we can start the process of ingesting the data. Data ingestion in Pinot involves the following steps -

  • Read data and generate compressed segment files from input

  • Upload the compressed segment files to output location

  • Push the location of the segment files to the controller

Once the location is available to the controller, it can notify the servers to download the segment files and populate the tables.

The above steps can be performed using any distributed executor of your choice such as Hadoop, Spark, Flink etc. For this demo we will be using Apache Spark to execute the steps.

Pinot provides runners for Spark out of the box. So as a user, you don't need to write a single line of code. You can write runners for any other executor using our provided interfaces.

Firstly, we will create a job spec configuration file for our data ingestion process.

In the job spec, we have set the execution framework to spark and configured the appropriate runners for each of our steps. We also need a temporary stagingDir for our Spark job. This directory is cleaned up after our job has executed.

We also provide the S3 filesystem and Parquet reader implementations to use in the config. You can refer to the Ingestion Job Spec documentation for the complete list of configurations.

We can now run our spark job to execute all the steps and populate data in pinot.

You can go through the FAQ section of our Spark ingestion guide in case you face any errors.

Voila! Now our data is successfully ingested. Let's try to query it from Pinot's broker.

If everything went right, you should receive the following output:

Schema Evolution

So far, you've seen how to create a new schema for a Pinot table. In this tutorial, we'll see how to evolve the schema (e.g. add a new column to the schema). This guide assumes you have a Pinot cluster up and running (e.g. as described in https://docs.pinot.apache.org/basics/getting-started/running-pinot-locally). We will also assume there's an existing table baseballStats created as part of the batch quick start.

Pinot only allows adding new columns to the schema. To drop a column, or to change a column's name or data type, a new table has to be created.

Get the existing schema

Let's begin by first fetching the existing schema. We can do this using the controller API:

Add a new column

Let's add a new column at the end of the schema, something like this (by editing baseballStats.schema):

In this example, we're adding a new column called yearsOfExperience with a default value of 1.

Update the schema

You can now update the schema using the following command:

Please note: this will not be reflected immediately. You can use the following command to reload the table segments for the new column to show up:

This will trigger a reload operation on each of the servers hosting the table's segments. The API response has a reloadJobId which can be used to monitor the status of the reload operation using the segment reload status API.

After the reload, now you can query the new column as shown below:

Real-Time Pinot table: In case of real-time tables, make sure the "pinot.server.instance.reload.consumingSegment" config is set to true inside the Server config. Without this, the current consuming segment(s) will not reflect the default null value for newly added columns.
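
For reference, that is a single property in server.conf (a minimal snippet; the rest of the server config stays unchanged):

pinot.server.instance.reload.consumingSegment=true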

Note that the real values for the newly added columns won't be reflected within the current consuming segment(s). The next consuming segment(s) will start consuming the real values.

Derived Column

New columns can be added with ingestion transforms. If all the source columns for the new column exist in the schema, the transformed values will be generated for the new column instead of filling default values. Note that the derived column and its data type must first be defined in the schema before making changes in the table config for the ingestion transform.
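
As a sketch of how this can look in the table config (the column name and Groovy expression here are hypothetical, not part of any example in this guide), the transform is declared under ingestionConfig once the derived column exists in the schema:

{
  "ingestionConfig": {
    "transformConfigs": [
      {
        "columnName": "fullName",
        "transformFunction": "Groovy({firstName + ' ' + lastName}, firstName, lastName)"
      }
    ]
  }
}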

Backfilling the Data

As you can observe, the current query returns the defaultNullValue for the newly added column. In order to populate this column with real values, you will need to re-run the batch ingestion job for the past dates.
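
For example, assuming the offline data was originally loaded with a standalone batch ingestion job (the spec path below is a placeholder), re-running that job for the affected dates regenerates and re-pushes the segments with the new column populated:

bin/pinot-admin.sh LaunchDataIngestionJob -jobSpecFile /path/to/your_batch_job_spec.yaml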

Real-Time Pinot table: Backfilling data does not work for real-time tables. If you only have a real-time table, you can convert it to a hybrid table by adding an offline counterpart that uses the same schema. Then you can backfill the offline table and fill in values for the newly added column. More on hybrid tables can be found in the hybrid tables documentation.

students.csv:
timestampInEpoch,id,name,age,score
1597044264380,1,david,15,98
1597044264381,2,henry,16,97
1597044264382,3,katie,14,99
1597044264383,4,catelyn,15,96
1597044264384,5,emma,13,93
1597044264390,6,john,15,100
1597044264396,7,isabella,13,89
1597044264399,8,linda,17,91
1597044264502,9,mark,16,67
1597044264670,10,tom,14,78
scala> val df = spark.read.format("csv").option("header", true).load("path/to/students.csv")
scala> df.write.option("compression","none").mode("overwrite").parquet("/path/to/batch_input/")
aws s3 cp /path/to/batch_input s3://my-bucket/batch-input/ --recursive
student_schema.json
{
    "schemaName": "students",
    "dimensionFieldSpecs": [
        {
            "name": "id",
            "dataType": "INT"
        },
        {
            "name": "name",
            "dataType": "STRING"
        },
        {
            "name": "age",
            "dataType": "INT"
        }
    ],
    "metricFieldSpecs": [
        {
            "name": "score",
            "dataType": "INT"
        }
    ],
    "dateTimeFieldSpecs": [
        {
            "name": "timestampInEpoch",
            "dataType": "LONG",
            "format": "1:MILLISECONDS:EPOCH",
            "granularity": "1:MILLISECONDS"
        }
    ]
}
student_table.json
{
    "tableName": "students",
    "segmentsConfig": {
        "timeColumnName": "timestampInEpoch",
        "timeType": "MILLISECONDS",
        "replication": "1",
        "schemaName": "students"
    },
    "tableIndexConfig": {
        "invertedIndexColumns": [],
        "loadMode": "MMAP"
    },
    "tenants": {
        "broker": "DefaultTenant",
        "server": "DefaultTenant"
    },
    "tableType": "OFFLINE",
    "metadata": {}
}
pinot-admin.sh AddTable -tableConfigFile /path/to/student_table.json -schemaFile /path/to/student_schema.json -controllerHost localhost -controllerPort 9000 -exec
spark_job_spec.yaml
executionFrameworkSpec:
  name: 'spark'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner'
  segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentMetadataPushJobRunner'
  extraConfigs:
      stagingDir: s3://my-bucket/spark/staging/
# jobType: Pinot ingestion job type.
# Supported job types are:
#   'SegmentCreation'
#   'SegmentTarPush'
#   'SegmentUriPush'
#   'SegmentCreationAndTarPush'
#   'SegmentCreationAndUriPush'
#   'SegmentCreationAndMetadataPush'
jobType: SegmentCreationAndMetadataPush
inputDirURI: 's3://my-bucket/path/to/batch-input/'
outputDirURI: 's3://my-bucket/path/to/batch-output/'
overwriteOutput: true
pinotFSSpecs:
  - scheme: s3
    className: org.apache.pinot.plugin.filesystem.S3PinotFS
    configs:    
      region: 'us-west-2'
recordReaderSpec:
  dataFormat: 'parquet'
  className: 'org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader'
tableSpec:
  tableName: 'students'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'
pushJobSpec:
  pushParallelism: 2
  pushAttempts: 2
  pushRetryIntervalMillis: 1000
export PINOT_VERSION=0.10.0
export PINOT_DISTRIBUTION_DIR=/path/to/apache-pinot-${PINOT_VERSION}-bin

spark-submit \
--class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
--master local --deploy-mode client \
--conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins -Dplugins.include=pinot-s3,pinot-parquet -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml" \
--conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark/pinot-batch-ingestion-spark-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar:${PINOT_DISTRIBUTION_DIR}/plugins/pinot-file-system/pinot-s3/pinot-s3-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/plugins/pinot-input-format/pinot-parquet/pinot-parquet-${PINOT_VERSION}-shaded.jar" \
--conf "spark.executor.extraClassPath=${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark/pinot-batch-ingestion-spark-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar:${PINOT_DISTRIBUTION_DIR}/plugins/pinot-file-system/pinot-s3/pinot-s3-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/plugins/pinot-input-format/pinot-parquet/pinot-parquet-${PINOT_VERSION}-shaded.jar" \
local://${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar -jobSpecFile /path/to/spark_job_spec.yaml
bin/pinot-admin.sh PostQuery -brokerHost localhost -brokerPort 8000 -queryType sql -query "SELECT * FROM students LIMIT 10"
{
  "resultTable": {
    "dataSchema": {
      "columnNames": [
        "age",
        "id",
        "name",
        "score",
        "timestampInEpoch"
      ],
      "columnDataTypes": [
        "INT",
        "INT",
        "STRING",
        "INT",
        "LONG"
      ]
    },
    "rows": [
      [
        15,
        1,
        "david",
        98,
        1597044264380
      ],
      [
        16,
        2,
        "henry",
        97,
        1597044264381
      ],
      [
        14,
        3,
        "katie",
        99,
        1597044264382
      ],
      [
        15,
        4,
        "catelyn",
        96,
        1597044264383
      ],
      [
        13,
        5,
        "emma",
        93,
        1597044264384
      ],
      [
        15,
        6,
        "john",
        100,
        1597044264390
      ],
      [
        13,
        7,
        "isabella",
        89,
        1597044264396
      ],
      [
        17,
        8,
        "linda",
        91,
        1597044264399
      ],
      [
        16,
        9,
        "mark",
        67,
        1597044264502
      ],
      [
        14,
        10,
        "tom",
        78,
        1597044264670
      ]
    ]
  },
  "exceptions": [],
  "numServersQueried": 1,
  "numServersResponded": 1,
  "numSegmentsQueried": 1,
  "numSegmentsProcessed": 1,
  "numSegmentsMatched": 1,
  "numConsumingSegmentsQueried": 0,
  "numDocsScanned": 10,
  "numEntriesScannedInFilter": 0,
  "numEntriesScannedPostFilter": 50,
  "numGroupsLimitReached": false,
  "totalDocs": 10,
  "timeUsedMs": 11,
  "segmentStatistics": [],
  "traceInfo": {},
  "minConsumingFreshnessTimeMs": 0
}
bin/pinot-admin.sh AddSchema -schemaFile baseballStats.schema -exec
$ curl -F schemaName=@baseballStats.schema localhost:9000/schemas
$ curl localhost:9000/schemas/baseballStats > baseballStats.schema
{
  "schemaName" : "baseballStats",
  "dimensionFieldSpecs" : [ {
  
    ...
    
    }, {
    "name" : "myNewColumn",
    "dataType" : "INT",
    "defaultNullValue": 1
  } ]
}
$ curl -X POST localhost:9000/segments/baseballStats/reload

{"baseballStats_OFFLINE":{"reloadJobId":"98ad3705-58f3-47d0-a02d-d66dc66a9567","reloadJobMetaZKStorageStatus":"SUCCESS","numMessagesSent":"3"}}
$ curl -X GET localhost:9000/segments/segmentReloadStatus/98ad3705-58f3-47d0-a02d-d66dc66a9567

{
  "estimatedTimeRemainingInMinutes": 0,
  "timeElapsedInMinutes": 0.17655,
  "totalServersQueried": 3,
  "successCount": 12,
  "totalSegmentCount": 12,
  "totalServerCallsFailed": 0,
  "metadata": {
    "jobId": "98ad3705-58f3-47d0-a02d-d66dc66a9567",
    "messageCount": "3",
    "submissionTimeMs": "1661753088066",
    "jobType": "RELOAD_ALL_SEGMENTS",
    "tableName": "baseballStats_OFFLINE"
  }
}
$ bin/pinot-admin.sh PostQuery \
  -queryType sql \
  -brokerPort 8000 \
  -query "select playerID, yearsOfExperience from baseballStats limit 10" 2>/dev/null
Executing command: PostQuery -brokerHost 192.168.86.234 -brokerPort 8000 -queryType sql -query select playerID, yearsOfExperience from baseballStats limit 10
Result: {"resultTable":{"dataSchema":{"columnNames":["playerID","yearsOfExperience"],"columnDataTypes":["STRING","INT"]},"rows":[["aardsda01",1],["aardsda01",1],["aardsda01",1],["aardsda01",1],["aardsda01",1],["aardsda01",1],["aardsda01",1],["aaronha01",1],["aaronha01",1],["aaronha01",1]]},"exceptions":[],"numServersQueried":1,"numServersResponded":1,"numSegmentsQueried":1,"numSegmentsProcessed":1,"numSegmentsMatched":1,"numConsumingSegmentsQueried":0,"numDocsScanned":10,"numEntriesScannedInFilter":0,"numEntriesScannedPostFilter":20,"numGroupsLimitReached":false,"totalDocs":97889,"timeUsedMs":3,"segmentStatistics":[],"traceInfo":{},"minConsumingFreshnessTimeMs":0}

Creating Pinot Segments

Creating Pinot segments

Pinot segments can be created offline on Hadoop, or via command line from data files. The controller REST endpoint can then be used to add the segment to the table to which the segment belongs. Pinot segments can also be created by ingesting data from real-time sources (such as Kafka).

Creating segments using hadoop

Offline Pinot workflow

To create Pinot segments on Hadoop, a workflow can be created to complete the following steps:

  1. Pre-aggregate, clean up and prepare the data, writing it as Avro format files in a single HDFS directory

  2. Create segments

  3. Upload segments to the Pinot cluster

Step one can be done using your favorite tool (such as Pig, Hive or Spark). Pinot provides two MapReduce jobs to do steps two and three.

Configuring the job

Create a job properties configuration file, such as one below:

Executing the job

The Pinot Hadoop module contains a job that you can incorporate into your workflow to generate Pinot segments.

You can then use the SegmentTarPush job to push segments via the controller REST API.

Creating Pinot segments outside of Hadoop

Here is how you can create Pinot segments from standard formats like CSV/JSON/AVRO.

  1. Follow the steps described in the section on Compiling the code to build Pinot. Locate pinot-admin.sh in pinot-tools/target/pinot-tools-pkg/bin/pinot-admin.sh.

  2. Create a top level directory containing all the CSV/JSON/AVRO files that need to be converted into segments.

  3. The file name extensions are expected to be the same as the format name (i.e. .csv, .json or .avro), and are case insensitive. Note that the converter expects the .csv extension even if the data is delimited using tabs or spaces instead.

  4. Prepare a schema file describing the schema of the input data. The schema needs to be in JSON format. See the example later in this section.

  5. Specifically for CSV format, an optional csv config file can be provided (also in JSON format). This is used to configure parameters like the delimiter/header for the CSV file etc. A detailed description of this follows below.

Run the pinot-admin command to generate the segments. The command can be invoked as follows. Options within “[ ]” are optional. For -format, the default value is AVRO.

To configure various parameters for CSV, a config file in JSON format can be provided. This file is optional, as are each of its parameters. When not provided, the default values used for these parameters are described below:

  • fileFormat: Specify one of the following. Default is EXCEL.

    1. EXCEL

    2. MYSQL

    3. RFC4180

    4. TDF

  • header: If the input CSV file does not contain a header, it can be specified using this field. Note, if this is specified, then the input file is expected to not contain the header row, or else it will result in a parse error. The columns in the header must be delimited by the same delimiter character as the rest of the CSV file.

  • delimiter: Use this to specify a delimiter character. The default value is “,”.

  • multiValueDelimiter: Use this to specify a delimiter character for each value in multi-valued columns. The default value is “;”.

Below is a sample config file.

Sample Schema:

Pushing offline segments to Pinot

You can use curl to push a segment to pinot:

Alternatively you can use the pinot-admin.sh utility to upload one or more segments:

The command uploads all the segments found in segmentDirectoryPath. The segments could be either tar-compressed (in which case it is a file under segmentDirectoryPath) or uncompressed (in which case it is a directory under segmentDirectoryPath).

    # === Index segment creation job config ===
    
    # path.to.input: Input directory containing Avro files
    path.to.input=/user/pinot/input/data
    
    # path.to.output: Output directory containing Pinot segments
    path.to.output=/user/pinot/output
    
    # path.to.schema: Schema file for the table, stored locally
    path.to.schema=flights-schema.json
    
    # segment.table.name: Name of the table for which to generate segments
    segment.table.name=flights
    
    # === Segment tar push job config ===
    
    # push.to.hosts: Comma separated list of controllers host names to which to push
    push.to.hosts=controller_host_0,controller_host_1
    
    # push.to.port: The port on which the controller runs
    push.to.port=8888
    mvn clean install -DskipTests -Pbuild-shaded-jar
    hadoop jar pinot-hadoop-<version>-SNAPSHOT-shaded.jar SegmentCreation job.properties
    hadoop jar pinot-hadoop-<version>-SNAPSHOT-shaded.jar SegmentTarPush job.properties
    bin/pinot-admin.sh CreateSegment -dataDir <input_data_dir> [-format [CSV/JSON/AVRO]] [-readerConfigFile <csv_config_file>] [-generatorConfigFile <generator_config_file>] -segmentName <segment_name> -schemaFile <input_schema_file> -tableName <table_name> -outDir <output_data_dir> [-overwrite]
    {
      "fileFormat": "EXCEL",
      "header": "col1,col2,col3,col4",
      "delimiter": "\t",
      "multiValueDelimiter": ","
    }
    {
      "schemaName": "flights",
      "dimensionFieldSpecs": [
        {
          "name": "flightNumber",
          "dataType": "LONG"
        },
        {
          "name": "tags",
          "dataType": "STRING",
          "singleValueField": false
        }
      ],
      "metricFieldSpecs": [
        {
          "name": "price",
          "dataType": "DOUBLE"
        }
      ],
      "timeFieldSpec": {
        "incomingGranularitySpec": {
          "name": "daysSinceEpoch",
          "dataType": "INT",
          "timeType": "DAYS"
        }
      }
    }
    curl -X POST -F segment=@<segment-tar-file-path> http://controllerHost:controllerPort/segments
    pinot-tools/target/pinot-tools-pkg/bin//pinot-admin.sh UploadSegment -controllerHost <hostname> -controllerPort <port> -segmentDir <segmentDirectoryPath>

    Use S3 and Pinot in Docker

    Setup Pinot Cluster

    In order to set up Pinot in Docker to use S3 as the deep store, we need to add extra configs for the Controller and Server.

    Create a docker network

    Start Zookeeper

    Prepare Pinot configuration files

    The sections below prepare 3 config files under /tmp/pinot-s3-docker to be mounted into the containers.

    Start Controller

    Below is a sample controller.conf file.

    Please configure controller.data.dir to point to your S3 bucket. All the uploaded segments will be stored there.

    And add s3 as a Pinot storage with the following configs:

    Regarding AWS credentials, we also follow the convention of DefaultAWSCredentialsProviderChain.

    You can specify the AccessKey and Secret using:

    • Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (RECOMMENDED since they are recognized by all the AWS SDKs and CLI except for .NET), or AWS_ACCESS_KEY and AWS_SECRET_KEY (only recognized by Java SDK)

    • Java System Properties - aws.accessKeyId and aws.secretKey

    • Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI

    • Configure AWS credentials in Pinot config files, e.g. set pinot.controller.storage.factory.s3.accessKey and pinot.controller.storage.factory.s3.secretKey in the config file. (Not recommended)

    Add s3 to pinot.controller.segment.fetcher.protocols and set pinot.controller.segment.fetcher.s3.class to org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher.

    Then start pinot controller with:

    Start Broker

    The broker is simple; you can just start it with the default settings:

    Start Server

    Below is a sample server.conf file.

    Similar to the controller config, please also set the s3 configs in the Pinot server.

    Then start pinot server with:

    Setup Table

    In this demo, we just use the airlineStats table as an example, which is already packaged inside the docker image.

    You can also mount your table conf and schema files to the container and run it.
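
    For example, a hypothetical variant of the airlineStats AddTable command (shown with the other docker commands below) that mounts your own table config and schema could look like this sketch (the /path/to/my-configs directory and file names are placeholders):

    docker run --rm -ti \
        --name pinot-add-my-table \
        --network=pinot-demo \
        --mount type=bind,source=/path/to/my-configs,target=/my-configs \
        apachepinot/pinot:0.6.0-SNAPSHOT-ca8545b29-20201105-jdk11 AddTable \
        -controllerHost pinot-controller \
        -controllerPort 9000 \
        -schemaFile /my-configs/my_table_schema.json \
        -tableConfigFile /my-configs/my_table_offline_table_config.json \
        -exec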

    Set up Ingestion Jobs

    Standalone Job

    Below is a sample standalone ingestion job spec with certain notable changes:

    • jobType is SegmentCreationAndMetadataPush (this job type bypasses the segment download on the controller)

    • inputDirURI is set to an S3 location s3://my.bucket/batch/airlineStats/rawdata/

    • outputDirURI is set to an S3 location s3://my.bucket/output/airlineStats/segments

    • Add a new PinotFs under pinotFSSpecs

    Sample ingestionJobSpec.yaml

    Launch the data ingestion job:

    - scheme: s3
      className: org.apache.pinot.plugin.filesystem.S3PinotFS
      configs:
        region: 'us-west-2'
    docker network create -d bridge pinot-demo
    docker run \
        --name zookeeper \
        --restart always \
        --network=pinot-demo \
        -d zookeeper:3.5.6
    /tmp/pinot-s3-docker/
                         controller.conf
                         server.conf
                         ingestionJobSpec.yaml
    pinot.controller.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
    pinot.controller.storage.factory.s3.region=us-west-2
    pinot.role=controller
    pinot.controller.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
    pinot.controller.storage.factory.s3.region=us-west-2
    controller.data.dir=s3://<my-bucket>/pinot-data/pinot-s3-example-docker/controller-data/
    controller.local.temp.dir=/tmp/pinot-tmp-data/
    controller.helix.cluster.name=pinot-s3-example-docker
    controller.zk.str=zookeeper:2181
    controller.port=9000
    controller.enable.split.commit=true
    pinot.controller.segment.fetcher.protocols=file,http,s3
    pinot.controller.segment.fetcher.s3.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    docker run --rm -ti \
        --name pinot-controller \
        --network=pinot-demo \
        -p 9000:9000 \
        --env AWS_ACCESS_KEY_ID=<aws-access-key-id> \
        --env AWS_SECRET_ACCESS_KEY=<aws-secret-access-key> \
        --mount type=bind,source=/tmp/pinot-s3-docker,target=/tmp \
        apachepinot/pinot:0.6.0-SNAPSHOT-ca8545b29-20201105-jdk11 StartController \
        -configFileName /tmp/controller.conf
    docker run --rm -ti \
        --name pinot-broker \
        --network=pinot-demo \
        --env AWS_ACCESS_KEY_ID=<aws-access-key-id> \
        --env AWS_SECRET_ACCESS_KEY=<aws-secret-access-key> \
        apachepinot/pinot:0.6.0-SNAPSHOT-ca8545b29-20201105-jdk11 StartBroker \
        -zkAddress zookeeper:2181 -clusterName pinot-s3-example-docker
    pinot.server.netty.port=8098
    pinot.server.adminapi.port=8097
    pinot.server.instance.dataDir=/tmp/pinot-tmp/server/index
    pinot.server.instance.segmentTarDir=/tmp/pinot-tmp/server/segmentTars
    
    
    pinot.server.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
    pinot.server.storage.factory.s3.region=us-west-2
    pinot.server.segment.fetcher.protocols=file,http,s3
    pinot.server.segment.fetcher.s3.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    docker run --rm -ti \
        --name pinot-server \
        --network=pinot-demo \
        --env AWS_ACCESS_KEY_ID=<aws-access-key-id> \
        --env AWS_SECRET_ACCESS_KEY=<aws-secret-access-key> \
        --mount type=bind,source=/tmp/pinot-s3-docker,target=/tmp \
        apachepinot/pinot:0.6.0-SNAPSHOT-ca8545b29-20201105-jdk11 StartServer \
        -zkAddress zookeeper:2181 -clusterName pinot-s3-example-docker \
        -configFileName /tmp/server.conf
    docker run --rm -ti \
        --name pinot-ingestion-job \
        --network=pinot-demo \
        apachepinot/pinot:0.6.0-SNAPSHOT-ca8545b29-20201105-jdk11 AddTable \
        -controllerHost pinot-controller \
        -controllerPort 9000 \
        -schemaFile examples/batch/airlineStats/airlineStats_schema.json \
        -tableConfigFile examples/batch/airlineStats/airlineStats_offline_table_config.json \
        -exec
    # executionFrameworkSpec: Defines ingestion jobs to be running.
    executionFrameworkSpec:
    
      # name: execution framework name
      name: 'standalone'
    
      # segmentGenerationJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentGenerationJobRunner interface.
      segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
    
      # segmentTarPushJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentTarPushJobRunner interface.
      segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
    
      # segmentUriPushJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentUriPushJobRunner interface.
      segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
    
      # segmentMetadataPushJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.IngestionJobRunner interface.
      segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner'
    
    # jobType: Pinot ingestion job type.
    # Supported job types are:
    #   'SegmentCreation'
    #   'SegmentTarPush'
    #   'SegmentUriPush'
    #   'SegmentCreationAndTarPush'
    #   'SegmentCreationAndUriPush'
    jobType: SegmentCreationAndMetadataPush
    
    # inputDirURI: Root directory of input data, expected to have scheme configured in PinotFS.
    inputDirURI: 's3://<my-bucket>/pinot-data/rawdata/airlineStats/rawdata/'
    
    # includeFileNamePattern: include file name pattern, supported glob pattern.
    # Sample usage:
    #   'glob:*.avro' will include all avro files just under the inputDirURI, not sub directories;
    #   'glob:**/*.avro' will include all the avro files under inputDirURI recursively.
    includeFileNamePattern: 'glob:**/*.avro'
    
    # excludeFileNamePattern: exclude file name pattern, supported glob pattern.
    # Sample usage:
    #   'glob:*.avro' will exclude all avro files just under the inputDirURI, not sub directories;
    #   'glob:**/*.avro' will exclude all the avro files under inputDirURI recursively.
    # _excludeFileNamePattern: ''
    
    # outputDirURI: Root directory of output segments, expected to have scheme configured in PinotFS.
    outputDirURI: 's3://<my-bucket>/pinot-data/pinot-s3-docker/segments/airlineStats'
    
    # segmentCreationJobParallelism: The parallelism to create segments.
    segmentCreationJobParallelism: 5
    
    # overwriteOutput: Overwrite output segments if existed.
    overwriteOutput: true
    
    # pinotFSSpecs: defines all related Pinot file systems.
    pinotFSSpecs:
    
      - # scheme: used to identify a PinotFS.
        # E.g. local, hdfs, dbfs, etc
        scheme: file
    
        # className: Class name used to create the PinotFS instance.
        # E.g.
        #   org.apache.pinot.spi.filesystem.LocalPinotFS is used for local filesystem
        #   org.apache.pinot.plugin.filesystem.AzurePinotFS is used for Azure Data Lake
        #   org.apache.pinot.plugin.filesystem.HadoopPinotFS is used for HDFS
        className: org.apache.pinot.spi.filesystem.LocalPinotFS
    
    
      - scheme: s3
        className: org.apache.pinot.plugin.filesystem.S3PinotFS
        configs:
          region: 'us-west-2'
    
    # recordReaderSpec: defines all record reader
    recordReaderSpec:
    
      # dataFormat: Record data format, e.g. 'avro', 'parquet', 'orc', 'csv', 'json', 'thrift' etc.
      dataFormat: 'avro'
    
      # className: Corresponding RecordReader class name.
      # E.g.
      #   org.apache.pinot.plugin.inputformat.avro.AvroRecordReader
      #   org.apache.pinot.plugin.inputformat.csv.CSVRecordReader
      #   org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader
      #   org.apache.pinot.plugin.inputformat.json.JSONRecordReader
      #   org.apache.pinot.plugin.inputformat.orc.ORCRecordReader
      #   org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReader
      className: 'org.apache.pinot.plugin.inputformat.avro.AvroRecordReader'
    
    # tableSpec: defines table name and where to fetch corresponding table config and table schema.
    tableSpec:
    
      # tableName: Table name
      tableName: 'airlineStats'
    
      # schemaURI: defines where to read the table schema, supports PinotFS or HTTP.
      # E.g.
      #   hdfs://path/to/table_schema.json
      #   http://localhost:9000/tables/myTable/schema
      schemaURI: 'http://pinot-controller:9000/tables/airlineStats/schema'
    
      # tableConfigURI: defines where to read the table config.
      # Supports using PinotFS or HTTP.
      # E.g.
      #   hdfs://path/to/table_config.json
      #   http://localhost:9000/tables/myTable
      # Note that the API to read Pinot table config directly from pinot controller contains a JSON wrapper.
      # The real table config is the object under the field 'OFFLINE'.
      tableConfigURI: 'http://pinot-controller:9000/tables/airlineStats'
    
    # pinotClusterSpecs: defines the Pinot Cluster Access Point.
    pinotClusterSpecs:
      - # controllerURI: used to fetch table/schema information and data push.
        # E.g. http://localhost:9000
        controllerURI: 'http://pinot-controller:9000'
    
    # pushJobSpec: defines segment push job related configuration.
    pushJobSpec:
    
      # pushAttempts: number of attempts for push job, default is 1, which means no retry.
      pushAttempts: 2
    
      # pushRetryIntervalMillis: retry wait Ms, default to 1 second.
      pushRetryIntervalMillis: 1000
    
    docker run --rm -ti \
        --name pinot-ingestion-job \
        --network=pinot-demo \
        --env AWS_ACCESS_KEY_ID=<aws-access-key-id> \
        --env AWS_SECRET_ACCESS_KEY=<aws-secret-access-key> \
        --mount type=bind,source=/tmp/pinot-s3-docker,target=/tmp \
        apachepinot/pinot:0.6.0-SNAPSHOT-ca8545b29-20201105-jdk11 LaunchDataIngestionJob \
        -jobSpecFile /tmp/ingestionJobSpec.yaml
    pinot.controller.storage.factory.s3.accessKey=****************LFVX
    pinot.controller.storage.factory.s3.secretKey=****************gfhz

    Use S3 as Deep Storage for Pinot

    The commands below are based on the Pinot distribution binary.

    Setup Pinot Cluster

    In order to set up Pinot to use S3 as the deep store, we need to add extra configs for the Controller and Server.

    Start Controller

    Below is a sample controller.conf file.

    Please configure controller.data.dir to point to your S3 bucket. All the uploaded segments will be stored there.

    And add s3 as a Pinot storage with the following configs:

    Regarding AWS credentials, we also follow the convention of DefaultAWSCredentialsProviderChain.

    You can specify the AccessKey and Secret using:

    • Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (RECOMMENDED since they are recognized by all the AWS SDKs and CLI except for .NET), or AWS_ACCESS_KEY and AWS_SECRET_KEY (only recognized by Java SDK)

    • Java System Properties - aws.accessKeyId and aws.secretKey

    • Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI

    • Configure AWS credentials in Pinot config files, e.g. set pinot.controller.storage.factory.s3.accessKey and pinot.controller.storage.factory.s3.secretKey in the config file. (Not recommended)

    Add s3 to pinot.controller.segment.fetcher.protocols and set pinot.controller.segment.fetcher.s3.class to org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher.

    If you want to grant full control to the bucket owner, then add this to the config:

    Then start pinot controller with:

    Start Broker

    The broker is simple; you can just start it with the default settings:

    Start Server

    Below is a sample server.conf file.

    Similar to the controller config, please also set the s3 configs in the Pinot server.

    If you want to grant full control to the bucket owner, then add this to the config:

    Then start pinot server with:

    Setup Table

    In this demo, we just use the airlineStats table as an example.

    Create the table with the command below:

    Set up Ingestion Jobs

    Standalone Job

    Below is a sample standalone ingestion job spec with certain notable changes:

    • jobType is SegmentCreationAndUriPush

    • inputDirURI is set to an S3 location s3://my.bucket/batch/airlineStats/rawdata/

    • outputDirURI is set to an S3 location s3://my.bucket/output/airlineStats/segments

    • Add a new PinotFs under pinotFSSpecs

    • For library version < 0.6.0, please set segmentUriPrefix to [scheme]://[bucket.name], e.g. s3://my.bucket; from version 0.6.0, you can put an empty string or just ignore segmentUriPrefix.

    Sample ingestionJobSpec.yaml

    Below is a sample job output:

    Spark Job

    Setup Spark Cluster (Skip if you already have one)

    Please follow this page to set up a local Spark cluster.
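
    As a rough sketch (assuming a pre-built Spark 2.4.x distribution has already been downloaded and unpacked; the path below is a placeholder), a local standalone Spark cluster can be started with the scripts shipped in the Spark distribution:

    # Start a standalone Spark master and attach one local worker to it.
    cd /path/to/spark-2.4.0-bin-hadoop2.7
    ./sbin/start-master.sh
    ./sbin/start-slave.sh spark://$(hostname):7077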

    Submit Spark Job

    Below is a sample Spark ingestion job spec.

    Submit the Spark job with the ingestion job spec:

    Sample Results/Snapshots

    Below is a sample snapshot of the S3 location for the controller (image: Sample S3 Controller Storage):
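
    Since the screenshot is not reproduced here, a quick way to inspect the same location is to list the controller data directory with the AWS CLI (bucket and prefix taken from the controller.data.dir configured above):

    aws s3 ls --recursive s3://my.bucket/pinot-data/pinot-s3-example/controller-data/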

    Below is a sample segment download URI in PropertyStore; we expect the download URI to start with s3:// (image: Sample segment download URI in PropertyStore).


    - scheme: s3
      className: org.apache.pinot.plugin.filesystem.S3PinotFS
      configs:
        region: 'us-west-2'
    pinot.controller.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
    pinot.controller.storage.factory.s3.region=us-west-2
    controller.data.dir=s3://my.bucket/pinot-data/pinot-s3-example/controller-data
    controller.local.temp.dir=/tmp/pinot-tmp-data/
    controller.zk.str=localhost:2181
    controller.host=127.0.0.1
    controller.port=9000
    controller.helix.cluster.name=pinot-s3-example
    pinot.controller.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
    pinot.controller.storage.factory.s3.region=us-west-2
    
    pinot.controller.segment.fetcher.protocols=file,http,s3
    pinot.controller.segment.fetcher.s3.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    pinot.controller.storage.factory.s3.disableAcl=false
    bin/pinot-admin.sh StartController -configFileName conf/controller.conf
    bin/pinot-admin.sh StartBroker -zkAddress localhost:2181 -clusterName pinot-s3-example
    pinot.server.netty.port=8098
    pinot.server.adminapi.port=8097
    pinot.server.instance.dataDir=/tmp/pinot-tmp/server/index
    pinot.server.instance.segmentTarDir=/tmp/pinot-tmp/server/segmentTars
    
    
    pinot.server.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
    pinot.server.storage.factory.s3.region=us-west-2
    pinot.server.segment.fetcher.protocols=file,http,s3
    pinot.server.segment.fetcher.s3.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    pinot.server.storage.factory.s3.disableAcl=false
    bin/pinot-admin.sh StartServer -configFileName conf/server.conf -zkAddress localhost:2181 -clusterName pinot-s3-example
    bin/pinot-admin.sh AddTable  -schemaFile examples/batch/airlineStats/airlineStats_schema.json -tableConfigFile examples/batch/airlineStats/airlineStats_offline_table_config.json -exec
    # executionFrameworkSpec: Defines ingestion jobs to be running.
    executionFrameworkSpec:
    
      # name: execution framework name
      name: 'standalone'
    
      # segmentGenerationJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentGenerationJobRunner interface.
      segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
    
      # segmentTarPushJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentTarPushJobRunner interface.
      segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
    
      # segmentUriPushJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentUriPushJobRunner interface.
      segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
    
    # jobType: Pinot ingestion job type.
    # Supported job types are:
    #   'SegmentCreation'
    #   'SegmentTarPush'
    #   'SegmentUriPush'
    #   'SegmentCreationAndTarPush'
    #   'SegmentCreationAndUriPush'
    #jobType: SegmentCreationAndUriPush
    jobType: SegmentCreationAndUriPush
    # inputDirURI: Root directory of input data, expected to have scheme configured in PinotFS.
    inputDirURI: 's3://my.bucket/batch/airlineStats/rawdata/'
    
    # includeFileNamePattern: include file name pattern, supported glob pattern.
    # Sample usage:
    #   'glob:*.avro' will include all avro files just under the inputDirURI, not sub directories;
    #   'glob:**/*.avro' will include all the avro files under inputDirURI recursively.
    includeFileNamePattern: 'glob:**/*.avro'
    
    # excludeFileNamePattern: exclude file name pattern, supported glob pattern.
    # Sample usage:
    #   'glob:*.avro' will exclude all avro files just under the inputDirURI, not sub directories;
    #   'glob:**/*.avro' will exclude all the avro files under inputDirURI recursively.
    # _excludeFileNamePattern: ''
    
    # outputDirURI: Root directory of output segments, expected to have scheme configured in PinotFS.
    outputDirURI: 's3://my.bucket/examples/output/airlineStats/segments'
    
    # overwriteOutput: Overwrite output segments if existed.
    overwriteOutput: true
    
    # pinotFSSpecs: defines all related Pinot file systems.
    pinotFSSpecs:
    
      - # scheme: used to identify a PinotFS.
        # E.g. local, hdfs, dbfs, etc
        scheme: file
    
        # className: Class name used to create the PinotFS instance.
        # E.g.
        #   org.apache.pinot.spi.filesystem.LocalPinotFS is used for local filesystem
        #   org.apache.pinot.plugin.filesystem.AzurePinotFS is used for Azure Data Lake
        #   org.apache.pinot.plugin.filesystem.HadoopPinotFS is used for HDFS
        className: org.apache.pinot.spi.filesystem.LocalPinotFS
    
    
      - scheme: s3
        className: org.apache.pinot.plugin.filesystem.S3PinotFS
        configs:
          region: 'us-west-2'
    
    # recordReaderSpec: defines all record reader
    recordReaderSpec:
    
      # dataFormat: Record data format, e.g. 'avro', 'parquet', 'orc', 'csv', 'json', 'thrift' etc.
      dataFormat: 'avro'
    
      # className: Corresponding RecordReader class name.
      # E.g.
      #   org.apache.pinot.plugin.inputformat.avro.AvroRecordReader
      #   org.apache.pinot.plugin.inputformat.csv.CSVRecordReader
      #   org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader
      #   org.apache.pinot.plugin.inputformat.json.JSONRecordReader
      #   org.apache.pinot.plugin.inputformat.orc.ORCRecordReader
      #   org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReader
      className: 'org.apache.pinot.plugin.inputformat.avro.AvroRecordReader'
    
    # tableSpec: defines table name and where to fetch corresponding table config and table schema.
    tableSpec:
    
      # tableName: Table name
      tableName: 'airlineStats'
    
      # schemaURI: defines where to read the table schema, supports PinotFS or HTTP.
      # E.g.
      #   hdfs://path/to/table_schema.json
      #   http://localhost:9000/tables/myTable/schema
      schemaURI: 'http://localhost:9000/tables/airlineStats/schema'
    
      # tableConfigURI: defines where to read the table config.
      # Supports using PinotFS or HTTP.
      # E.g.
      #   hdfs://path/to/table_config.json
      #   http://localhost:9000/tables/myTable
      # Note that the API to read Pinot table config directly from pinot controller contains a JSON wrapper.
      # The real table config is the object under the field 'OFFLINE'.
      tableConfigURI: 'http://localhost:9000/tables/airlineStats'
    
    # pinotClusterSpecs: defines the Pinot Cluster Access Point.
    pinotClusterSpecs:
      - # controllerURI: used to fetch table/schema information and data push.
        # E.g. http://localhost:9000
        controllerURI: 'http://localhost:9000'
    
    # pushJobSpec: defines segment push job related configuration.
    pushJobSpec:
    
      # pushAttempts: number of attempts for push job, default is 1, which means no retry.
      pushAttempts: 2
    
      # pushRetryIntervalMillis: retry wait Ms, default to 1 second.
      pushRetryIntervalMillis: 1000
    
      # For Pinot version < 0.6.0, use [scheme]://[bucket.name] as prefix.
      # E.g. s3://my.bucket
      segmentUriPrefix: 's3://my.bucket'
      segmentUriSuffix: ''
    bin/pinot-admin.sh LaunchDataIngestionJob -jobSpecFile  ~/temp/pinot/pinot-s3-test/ingestionJobSpec.yaml
    2020/08/18 16:11:03.521 INFO [IngestionJobLauncher] [main] SegmentGenerationJobSpec:
    !!org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec
    excludeFileNamePattern: null
    executionFrameworkSpec: {extraConfigs: null, name: standalone, segmentGenerationJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner,
      segmentTarPushJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner,
      segmentUriPushJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner}
    includeFileNamePattern: glob:**/*.avro
    inputDirURI: s3://my.bucket/batch/airlineStats/rawdata/
    jobType: SegmentUriPush
    outputDirURI: s3://my.bucket/examples/output/airlineStats/segments
    overwriteOutput: true
    pinotClusterSpecs:
    - {controllerURI: 'http://localhost:9000'}
    pinotFSSpecs:
    - {className: org.apache.pinot.spi.filesystem.LocalPinotFS, configs: null, scheme: file}
    - className: org.apache.pinot.plugin.filesystem.S3PinotFS
      configs: {region: us-west-2}
      scheme: s3
    pushJobSpec: {pushAttempts: 2, pushParallelism: 1, pushRetryIntervalMillis: 1000,
      segmentUriPrefix: '', segmentUriSuffix: ''}
    recordReaderSpec: {className: org.apache.pinot.plugin.inputformat.avro.AvroRecordReader,
      configClassName: null, configs: null, dataFormat: avro}
    segmentNameGeneratorSpec: null
    tableSpec: {schemaURI: 'http://localhost:9000/tables/airlineStats/schema', tableConfigURI: 'http://localhost:9000/tables/airlineStats',
      tableName: airlineStats}
    
    2020/08/18 16:11:03.531 INFO [IngestionJobLauncher] [main] Trying to create instance for class org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner
    2020/08/18 16:11:03.654 INFO [PinotFSFactory] [main] Initializing PinotFS for scheme file, classname org.apache.pinot.spi.filesystem.LocalPinotFS
    2020/08/18 16:11:03.656 INFO [PinotFSFactory] [main] Initializing PinotFS for scheme s3, classname org.apache.pinot.plugin.filesystem.S3PinotFS
    2020/08/18 16:11:05.520 INFO [SegmentPushUtils] [main] Start sending table airlineStats segment URIs: [s3://my.bucket/examples/output/airlineStats/segments/2014/01/01/airlineStats_OFFLINE_16071_16071_0.tar.gz, s3://my.bucket/examples/output/airlineStats/segments/2014/01/02/airlineStats_OFFLINE_16072_16072_1.tar.gz, s3://my.bucket/examples/output/airlineStats/segments/2014/01/03/airlineStats_OFFLINE_16073_16073_2.tar.gz, s3://my.bucket/examples/output/airlineStats/segments/2014/01/04/airlineStats_OFFLINE_16074_16074_3.tar.gz, s3://my.bucket/examples/output/airlineStats/segments/2014/01/05/airlineStats_OFFLINE_16075_16075_4.tar.gz] to locations: [org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec@4e07b95f]
    2020/08/18 16:11:05.521 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/01/airlineStats_OFFLINE_16071_16071_0.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:09.356 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:09.358 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/01/airlineStats_OFFLINE_16071_16071_0.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16071_16071_0 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:09.359 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/02/airlineStats_OFFLINE_16072_16072_1.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:09.824 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:09.825 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/02/airlineStats_OFFLINE_16072_16072_1.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16072_16072_1 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:09.825 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/03/airlineStats_OFFLINE_16073_16073_2.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:10.500 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:10.501 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/03/airlineStats_OFFLINE_16073_16073_2.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16073_16073_2 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:10.501 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/04/airlineStats_OFFLINE_16074_16074_3.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:10.967 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:10.968 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/04/airlineStats_OFFLINE_16074_16074_3.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16074_16074_3 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:10.969 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/05/airlineStats_OFFLINE_16075_16075_4.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:11.420 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:11.420 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/05/airlineStats_OFFLINE_16075_16075_4.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16075_16075_4 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:11.421 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/06/airlineStats_OFFLINE_16076_16076_5.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:11.872 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:11.873 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/06/airlineStats_OFFLINE_16076_16076_5.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16076_16076_5 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:11.877 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/07/airlineStats_OFFLINE_16077_16077_6.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:12.293 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:12.294 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/07/airlineStats_OFFLINE_16077_16077_6.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16077_16077_6 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:12.295 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/08/airlineStats_OFFLINE_16078_16078_7.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:12.672 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:12.673 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/08/airlineStats_OFFLINE_16078_16078_7.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16078_16078_7 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:12.674 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/09/airlineStats_OFFLINE_16079_16079_8.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:13.048 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:13.050 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/09/airlineStats_OFFLINE_16079_16079_8.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16079_16079_8 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:13.051 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/10/airlineStats_OFFLINE_16080_16080_9.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:13.483 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:13.485 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/10/airlineStats_OFFLINE_16080_16080_9.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16080_16080_9 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:13.486 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/11/airlineStats_OFFLINE_16081_16081_10.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:14.080 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:14.081 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/11/airlineStats_OFFLINE_16081_16081_10.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16081_16081_10 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:14.082 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/12/airlineStats_OFFLINE_16082_16082_11.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:14.477 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:14.477 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/12/airlineStats_OFFLINE_16082_16082_11.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16082_16082_11 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:14.478 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/13/airlineStats_OFFLINE_16083_16083_12.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:14.865 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:14.866 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/13/airlineStats_OFFLINE_16083_16083_12.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16083_16083_12 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:14.867 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/14/airlineStats_OFFLINE_16084_16084_13.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:15.257 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:15.258 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/14/airlineStats_OFFLINE_16084_16084_13.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16084_16084_13 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:15.258 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/15/airlineStats_OFFLINE_16085_16085_14.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:15.917 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:15.919 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/15/airlineStats_OFFLINE_16085_16085_14.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16085_16085_14 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:15.919 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/16/airlineStats_OFFLINE_16086_16086_15.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:16.719 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:16.720 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/16/airlineStats_OFFLINE_16086_16086_15.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16086_16086_15 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:16.720 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/17/airlineStats_OFFLINE_16087_16087_16.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:17.346 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:17.347 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/17/airlineStats_OFFLINE_16087_16087_16.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16087_16087_16 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:17.347 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/18/airlineStats_OFFLINE_16088_16088_17.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:17.815 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:17.816 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/18/airlineStats_OFFLINE_16088_16088_17.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16088_16088_17 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:17.816 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/19/airlineStats_OFFLINE_16089_16089_18.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:18.389 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:18.389 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/19/airlineStats_OFFLINE_16089_16089_18.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16089_16089_18 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:18.390 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/20/airlineStats_OFFLINE_16090_16090_19.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:18.978 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:18.978 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/20/airlineStats_OFFLINE_16090_16090_19.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16090_16090_19 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:18.979 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/21/airlineStats_OFFLINE_16091_16091_20.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:19.586 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:19.587 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/21/airlineStats_OFFLINE_16091_16091_20.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16091_16091_20 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:19.589 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/22/airlineStats_OFFLINE_16092_16092_21.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:20.087 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:20.087 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/22/airlineStats_OFFLINE_16092_16092_21.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16092_16092_21 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:20.088 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/23/airlineStats_OFFLINE_16093_16093_22.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:20.550 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:20.551 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/23/airlineStats_OFFLINE_16093_16093_22.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16093_16093_22 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:20.552 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/24/airlineStats_OFFLINE_16094_16094_23.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:20.978 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:20.979 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/24/airlineStats_OFFLINE_16094_16094_23.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16094_16094_23 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:20.979 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/25/airlineStats_OFFLINE_16095_16095_24.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:21.626 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:21.628 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/25/airlineStats_OFFLINE_16095_16095_24.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16095_16095_24 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:21.628 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/26/airlineStats_OFFLINE_16096_16096_25.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:22.121 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:22.122 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/26/airlineStats_OFFLINE_16096_16096_25.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16096_16096_25 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:22.123 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/27/airlineStats_OFFLINE_16097_16097_26.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:22.679 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:22.679 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/27/airlineStats_OFFLINE_16097_16097_26.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16097_16097_26 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:22.680 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/28/airlineStats_OFFLINE_16098_16098_27.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:23.373 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:23.374 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/28/airlineStats_OFFLINE_16098_16098_27.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16098_16098_27 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:23.375 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/29/airlineStats_OFFLINE_16099_16099_28.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:23.787 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:23.788 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/29/airlineStats_OFFLINE_16099_16099_28.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16099_16099_28 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:23.788 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/30/airlineStats_OFFLINE_16100_16100_29.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:24.298 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:24.299 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/30/airlineStats_OFFLINE_16100_16100_29.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16100_16100_29 of table: airlineStats_OFFLINE"}
    2020/08/18 16:11:24.299 INFO [SegmentPushUtils] [main] Sending table airlineStats segment URI: s3://my.bucket/examples/output/airlineStats/segments/2014/01/31/airlineStats_OFFLINE_16101_16101_30.tar.gz to location: http://localhost:9000 for
    2020/08/18 16:11:24.987 INFO [FileUploadDownloadClient] [main] Sending request: http://localhost:9000/v2/segments to controller: localhost, version: Unknown
    2020/08/18 16:11:24.987 INFO [SegmentPushUtils] [main] Response for pushing table airlineStats segment uri s3://my.bucket/examples/output/airlineStats/segments/2014/01/31/airlineStats_OFFLINE_16101_16101_30.tar.gz to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16101_16101_30 of table: airlineStats_OFFLINE"}
    # executionFrameworkSpec: Defines ingestion jobs to be running.
    executionFrameworkSpec:
    
      # name: execution framework name
      name: 'spark'
    
      # segmentGenerationJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentGenerationJobRunner interface.
      segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner'
    
      # segmentTarPushJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentTarPushJobRunner interface.
      segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner'
    
      # segmentUriPushJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentUriPushJobRunner interface.
      segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner'
    
    # jobType: Pinot ingestion job type.
    # Supported job types are:
    #   'SegmentCreation'
    #   'SegmentTarPush'
    #   'SegmentUriPush'
    #   'SegmentCreationAndTarPush'
    #   'SegmentCreationAndUriPush'
    jobType: SegmentCreationAndUriPush
    
    # inputDirURI: Root directory of input data, expected to have scheme configured in PinotFS.
    inputDirURI: 's3://my.bucket/batch/airlineStats/rawdata/'
    
    # includeFileNamePattern: include file name pattern, supported glob pattern.
    # Sample usage:
    #   'glob:*.avro' will include all avro files just under the inputDirURI, not sub directories;
    #   'glob:**/*.avro' will include all the avro files under inputDirURI recursively.
    includeFileNamePattern: 'glob:**/*.avro'
    
    # excludeFileNamePattern: exclude file name pattern, supported glob pattern.
    # Sample usage:
    #   'glob:*.avro' will exclude all avro files just under the inputDirURI, not sub directories;
    #   'glob:**/*.avro' will exclude all the avro files under inputDirURI recursively.
    # _excludeFileNamePattern: ''
    
    # outputDirURI: Root directory of output segments, expected to have scheme configured in PinotFS.
    outputDirURI: 's3://my.bucket/examples/output/airlineStats/segments'
    
    # overwriteOutput: Overwrite output segments if existed.
    overwriteOutput: true
    
    # pinotFSSpecs: defines all related Pinot file systems.
    pinotFSSpecs:
    
      - # scheme: used to identify a PinotFS.
        # E.g. local, hdfs, dbfs, etc
        scheme: file
    
        # className: Class name used to create the PinotFS instance.
        # E.g.
        #   org.apache.pinot.spi.filesystem.LocalPinotFS is used for local filesystem
        #   org.apache.pinot.plugin.filesystem.AzurePinotFS is used for Azure Data Lake
        #   org.apache.pinot.plugin.filesystem.HadoopPinotFS is used for HDFS
        className: org.apache.pinot.plugin.filesystem.HadoopPinotFS
        configs:
          'hadoop.conf.path': ''
    
      - scheme: s3
        className: org.apache.pinot.plugin.filesystem.S3PinotFS
        configs:
          region: 'us-west-2'
    
    # recordReaderSpec: defines all record reader
    recordReaderSpec:
    
      # dataFormat: Record data format, e.g. 'avro', 'parquet', 'orc', 'csv', 'json', 'thrift' etc.
      dataFormat: 'avro'
    
      # className: Corresponding RecordReader class name.
      # E.g.
      #   org.apache.pinot.plugin.inputformat.avro.AvroRecordReader
      #   org.apache.pinot.plugin.inputformat.csv.CSVRecordReader
      #   org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader
      #   org.apache.pinot.plugin.inputformat.json.JSONRecordReader
      #   org.apache.pinot.plugin.inputformat.orc.ORCRecordReader
      #   org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReader
      className: 'org.apache.pinot.plugin.inputformat.avro.AvroRecordReader'
    
    # tableSpec: defines table name and where to fetch corresponding table config and table schema.
    tableSpec:
    
      # tableName: Table name
      tableName: 'airlineStats'
    
      # schemaURI: defines where to read the table schema, supports PinotFS or HTTP.
      # E.g.
      #   hdfs://path/to/table_schema.json
      #   http://localhost:9000/tables/myTable/schema
      schemaURI: 'http://localhost:9000/tables/airlineStats/schema'
    
  # tableConfigURI: defines where to read the table config.
      # Supports using PinotFS or HTTP.
      # E.g.
      #   hdfs://path/to/table_config.json
      #   http://localhost:9000/tables/myTable
      # Note that the API to read Pinot table config directly from pinot controller contains a JSON wrapper.
      # The real table config is the object under the field 'OFFLINE'.
      tableConfigURI: 'http://localhost:9000/tables/airlineStats'
    
    # segmentNameGeneratorSpec: defines how to init a SegmentNameGenerator.
    segmentNameGeneratorSpec:
    
      # type: Current supported type is 'simple' and 'normalizedDate'.
      type: normalizedDate
    
      # configs: Configs to init SegmentNameGenerator.
      configs:
        segment.name.prefix: 'airlineStats_batch'
        exclude.sequence.id: true
    
    # pinotClusterSpecs: defines the Pinot Cluster Access Point.
    pinotClusterSpecs:
      - # controllerURI: used to fetch table/schema information and data push.
        # E.g. http://localhost:9000
        controllerURI: 'http://localhost:9000'
    
    # pushJobSpec: defines segment push job related configuration.
    pushJobSpec:
    
      # pushParallelism: push job parallelism, default is 1.
      pushParallelism: 2
    
      # pushAttempts: number of attempts for push job, default is 1, which means no retry.
      pushAttempts: 2
    
      # pushRetryIntervalMillis: retry wait Ms, default to 1 second.
      pushRetryIntervalMillis: 1000
    ${SPARK_HOME}/bin/spark-submit \
      --class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
      --master "local[2]" \
      --deploy-mode client \
      --conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml" \
      --conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
      local://${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \
      -jobSpecFile ${PINOT_DISTRIBUTION_DIR}/examples/sparkIngestionJobSpec.yaml
    pinot.controller.storage.factory.s3.accessKey=****************LFVX
    pinot.controller.storage.factory.s3.secretKey=****************gfhz

    Batch Data Ingestion In Practice

    In practice, we need to run Pinot data ingestion as a pipeline or a scheduled job.
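For example, one simple way to schedule the standalone job is a small wrapper script driven by cron. The sketch below only illustrates the idea; the installation path, log location, and schedule are assumptions, while the LaunchDataIngestionJob command and job spec path come from the example further down this page.

#!/bin/bash
# run_airline_stats_ingestion.sh - hypothetical wrapper for a scheduled ingestion run.
# Assumes the Pinot distribution is installed under /opt/pinot and the example job spec exists there.
cd /opt/pinot
bin/pinot-admin.sh LaunchDataIngestionJob \
  -jobSpecFile examples/batch/airlineStats/ingestionJobSpec.yaml

A crontab entry such as 0 1 * * * /opt/pinot/run_airline_stats_ingestion.sh >> /var/log/pinot/ingestion.log 2>&1 would then run the ingestion once a day.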

Assuming the Pinot distribution is already built, you can find several sample table layouts inside the examples directory.

    Table Layout

Usually each table gets its own directory, such as airlineStats.

Inside the table directory, a rawdata directory is created to hold all the input data.

Typically, for data events with a timestamp, we partition the data and store it in daily folders. For example, a typical layout would follow this pattern: rawdata/%yyyy%/%mm%/%dd%/[daily_input_files].

    Configuring batch ingestion job

    Create a batch ingestion job spec file to describe how to ingest the data.

Below is an example (also located at examples/batch/airlineStats/ingestionJobSpec.yaml).

    Executing the job

The command below creates the example table in the Pinot cluster.

The command below kicks off the ingestion job to generate Pinot segments and push them to the cluster.

After the job finishes, the segments are stored in examples/batch/airlineStats/segments, following the same layout as the input directory.

    Executing the job using Spark

The example below runs in Spark local mode. You can download a Spark distribution and start it by running:

Build the latest Pinot distribution following this guide.

The command below shows how to use spark-submit to submit a Spark job using pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar.

Sample Spark ingestion job spec YAML (also located at examples/batch/airlineStats/sparkIngestionJobSpec.yaml):

Please ensure the parameters PINOT_ROOT_DIR and PINOT_VERSION are set properly.

Please ensure you set

• spark.driver.extraJavaOptions =>

  -Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins

  Or put all the required plugins jars to CLASSPATH, then set -Dplugins.dir=${CLASSPATH}

• spark.driver.extraClassPath =>

  pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar

    Executing the job using Hadoop

The command below shows how to use the hadoop jar command to run a Hadoop job using pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar.

Sample Hadoop ingestion job spec YAML (also located at examples/batch/airlineStats/hadoopIngestionJobSpec.yaml):

Please ensure the parameters PINOT_ROOT_DIR and PINOT_VERSION are set properly.

Tuning

You can set the environment variable JAVA_OPTS to modify:

    • Log4j2 file location with -Dlog4j2.configurationFile

    • Plugin directory location with -Dplugins.dir=/opt/pinot/plugins

    • JVM props, like -Xmx8g -Xms4G

Please note that you need to configure all three of the above together in JAVA_OPTS. If you only set JAVA_OPTS="-Xmx4g", then plugins.dir is left empty, which will usually cause the job to fail.

    E.g.

    You can also add your customized JAVA_OPTS if necessary.
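For a non-Docker run, a minimal sketch of the same idea, assuming bin/pinot-admin.sh picks up JAVA_OPTS as described above, could look like this (heap sizes are illustrative):

export JAVA_OPTS="-Xms4G -Xmx8G -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml -Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins"
cd ${PINOT_DISTRIBUTION_DIR}
bin/pinot-admin.sh LaunchDataIngestionJob -jobSpecFile examples/batch/airlineStats/ingestionJobSpec.yaml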


    /var/pinot/airlineStats/rawdata/2014/01/01/airlineStats_data_2014-01-01.avro
    /var/pinot/airlineStats/rawdata/2014/01/02/airlineStats_data_2014-01-02.avro
    /var/pinot/airlineStats/rawdata/2014/01/03/airlineStats_data_2014-01-03.avro
    /var/pinot/airlineStats/rawdata/2014/01/04/airlineStats_data_2014-01-04.avro
    /var/pinot/airlineStats/rawdata/2014/01/05/airlineStats_data_2014-01-05.avro
    /var/pinot/airlineStats/rawdata/2014/01/06/airlineStats_data_2014-01-06.avro
    /var/pinot/airlineStats/rawdata/2014/01/07/airlineStats_data_2014-01-07.avro
    /var/pinot/airlineStats/rawdata/2014/01/08/airlineStats_data_2014-01-08.avro
    /var/pinot/airlineStats/rawdata/2014/01/09/airlineStats_data_2014-01-09.avro
    /var/pinot/airlineStats/rawdata/2014/01/10/airlineStats_data_2014-01-10.avro
    /var/pinot/airlineStats/rawdata/2014/01/11/airlineStats_data_2014-01-11.avro
    /var/pinot/airlineStats/rawdata/2014/01/12/airlineStats_data_2014-01-12.avro
    /var/pinot/airlineStats/rawdata/2014/01/13/airlineStats_data_2014-01-13.avro
    /var/pinot/airlineStats/rawdata/2014/01/14/airlineStats_data_2014-01-14.avro
    /var/pinot/airlineStats/rawdata/2014/01/15/airlineStats_data_2014-01-15.avro
    /var/pinot/airlineStats/rawdata/2014/01/16/airlineStats_data_2014-01-16.avro
    /var/pinot/airlineStats/rawdata/2014/01/17/airlineStats_data_2014-01-17.avro
    /var/pinot/airlineStats/rawdata/2014/01/18/airlineStats_data_2014-01-18.avro
    /var/pinot/airlineStats/rawdata/2014/01/19/airlineStats_data_2014-01-19.avro
    /var/pinot/airlineStats/rawdata/2014/01/20/airlineStats_data_2014-01-20.avro
    /var/pinot/airlineStats/rawdata/2014/01/21/airlineStats_data_2014-01-21.avro
    /var/pinot/airlineStats/rawdata/2014/01/22/airlineStats_data_2014-01-22.avro
    /var/pinot/airlineStats/rawdata/2014/01/23/airlineStats_data_2014-01-23.avro
    /var/pinot/airlineStats/rawdata/2014/01/24/airlineStats_data_2014-01-24.avro
    /var/pinot/airlineStats/rawdata/2014/01/25/airlineStats_data_2014-01-25.avro
    /var/pinot/airlineStats/rawdata/2014/01/26/airlineStats_data_2014-01-26.avro
    /var/pinot/airlineStats/rawdata/2014/01/27/airlineStats_data_2014-01-27.avro
    /var/pinot/airlineStats/rawdata/2014/01/28/airlineStats_data_2014-01-28.avro
    /var/pinot/airlineStats/rawdata/2014/01/29/airlineStats_data_2014-01-29.avro
    /var/pinot/airlineStats/rawdata/2014/01/30/airlineStats_data_2014-01-30.avro
    /var/pinot/airlineStats/rawdata/2014/01/31/airlineStats_data_2014-01-31.avro
    # executionFrameworkSpec: Defines ingestion jobs to be running.
    executionFrameworkSpec:
    
      # name: execution framework name
      name: 'standalone'
    
      # segmentGenerationJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentGenerationJobRunner interface.
      segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
    
      # segmentTarPushJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentTarPushJobRunner interface.
      segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
    
      # segmentUriPushJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentUriPushJobRunner interface.
      segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
    
    # jobType: Pinot ingestion job type.
    # Supported job types are:
    #   'SegmentCreation'
    #   'SegmentTarPush'
    #   'SegmentUriPush'
    #   'SegmentCreationAndTarPush'
    #   'SegmentCreationAndUriPush'
    jobType: SegmentCreationAndTarPush
    
    # inputDirURI: Root directory of input data, expected to have scheme configured in PinotFS.
    inputDirURI: 'examples/batch/airlineStats/rawdata'
    
    # includeFileNamePattern: include file name pattern, supported glob pattern.
    # Sample usage:
    #   'glob:*.avro' will include all avro files just under the inputDirURI, not sub directories;
#   'glob:**/*.avro' will include all the avro files under inputDirURI recursively.
    includeFileNamePattern: 'glob:**/*.avro'
    
    # excludeFileNamePattern: exclude file name pattern, supported glob pattern.
    # Sample usage:
    #   'glob:*.avro' will exclude all avro files just under the inputDirURI, not sub directories;
#   'glob:**/*.avro' will exclude all the avro files under inputDirURI recursively.
    # _excludeFileNamePattern: ''
    
    # outputDirURI: Root directory of output segments, expected to have scheme configured in PinotFS.
    outputDirURI: 'examples/batch/airlineStats/segments'
    
    # overwriteOutput: Overwrite output segments if existed.
    overwriteOutput: true
    
    # pinotFSSpecs: defines all related Pinot file systems.
    pinotFSSpecs:
    
      - # scheme: used to identify a PinotFS.
        # E.g. local, hdfs, dbfs, etc
        scheme: file
    
        # className: Class name used to create the PinotFS instance.
        # E.g.
        #   org.apache.pinot.spi.filesystem.LocalPinotFS is used for local filesystem
        #   org.apache.pinot.plugin.filesystem.AzurePinotFS is used for Azure Data Lake
        #   org.apache.pinot.plugin.filesystem.HadoopPinotFS is used for HDFS
        className: org.apache.pinot.spi.filesystem.LocalPinotFS
    
    # recordReaderSpec: defines all record reader
    recordReaderSpec:
    
      # dataFormat: Record data format, e.g. 'avro', 'parquet', 'orc', 'csv', 'json', 'thrift' etc.
      dataFormat: 'avro'
    
      # className: Corresponding RecordReader class name.
      # E.g.
      #   org.apache.pinot.plugin.inputformat.avro.AvroRecordReader
      #   org.apache.pinot.plugin.inputformat.csv.CSVRecordReader
      #   org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader
      #   org.apache.pinot.plugin.inputformat.json.JSONRecordReader
      #   org.apache.pinot.plugin.inputformat.orc.ORCRecordReader
      #   org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReader
      className: 'org.apache.pinot.plugin.inputformat.avro.AvroRecordReader'
    
    # tableSpec: defines table name and where to fetch corresponding table config and table schema.
    tableSpec:
    
      # tableName: Table name
      tableName: 'airlineStats'
    
      # schemaURI: defines where to read the table schema, supports PinotFS or HTTP.
      # E.g.
      #   hdfs://path/to/table_schema.json
      #   http://localhost:9000/tables/myTable/schema
      schemaURI: 'http://localhost:9000/tables/airlineStats/schema'
    
  # tableConfigURI: defines where to read the table config.
      # Supports using PinotFS or HTTP.
      # E.g.
      #   hdfs://path/to/table_config.json
      #   http://localhost:9000/tables/myTable
      # Note that the API to read Pinot table config directly from pinot controller contains a JSON wrapper.
      # The real table config is the object under the field 'OFFLINE'.
      tableConfigURI: 'http://localhost:9000/tables/airlineStats'
    
    # segmentNameGeneratorSpec: defines how to init a SegmentNameGenerator.
    segmentNameGeneratorSpec:
    
      # type: Current supported type is 'simple' and 'normalizedDate'.
      type: normalizedDate
    
      # configs: Configs to init SegmentNameGenerator.
      configs:
        segment.name.prefix: 'airlineStats_batch'
        exclude.sequence.id: true
    
    # pinotClusterSpecs: defines the Pinot Cluster Access Point.
    pinotClusterSpecs:
      - # controllerURI: used to fetch table/schema information and data push.
        # E.g. http://localhost:9000
        controllerURI: 'http://localhost:9000'
    
    # pushJobSpec: defines segment push job related configuration.
    pushJobSpec:
    
      # pushAttempts: number of attempts for push job, default is 1, which means no retry.
      pushAttempts: 2
    
      # pushRetryIntervalMillis: retry wait Ms, default to 1 second.
      pushRetryIntervalMillis: 1000
    bin/pinot-admin.sh AddTable  -schemaFile examples/batch/airlineStats/airlineStats_schema.json -tableConfigFile examples/batch/airlineStats/airlineStats_offline_table_config.json -exec
    bin/pinot-admin.sh LaunchDataIngestionJob -jobSpecFile examples/batch/airlineStats/ingestionJobSpec.yaml
    /var/pinot/airlineStats/segments/2014/01/01/airlineStats_batch_2014-01-01_2014-01-01.tar.gz
    /var/pinot/airlineStats/segments/2014/01/02/airlineStats_batch_2014-01-02_2014-01-02.tar.gz
    /var/pinot/airlineStats/segments/2014/01/03/airlineStats_batch_2014-01-03_2014-01-03.tar.gz
    /var/pinot/airlineStats/segments/2014/01/04/airlineStats_batch_2014-01-04_2014-01-04.tar.gz
    /var/pinot/airlineStats/segments/2014/01/05/airlineStats_batch_2014-01-05_2014-01-05.tar.gz
    /var/pinot/airlineStats/segments/2014/01/06/airlineStats_batch_2014-01-06_2014-01-06.tar.gz
    /var/pinot/airlineStats/segments/2014/01/07/airlineStats_batch_2014-01-07_2014-01-07.tar.gz
    /var/pinot/airlineStats/segments/2014/01/08/airlineStats_batch_2014-01-08_2014-01-08.tar.gz
    /var/pinot/airlineStats/segments/2014/01/09/airlineStats_batch_2014-01-09_2014-01-09.tar.gz
    /var/pinot/airlineStats/segments/2014/01/10/airlineStats_batch_2014-01-10_2014-01-10.tar.gz
    /var/pinot/airlineStats/segments/2014/01/11/airlineStats_batch_2014-01-11_2014-01-11.tar.gz
    /var/pinot/airlineStats/segments/2014/01/12/airlineStats_batch_2014-01-12_2014-01-12.tar.gz
    /var/pinot/airlineStats/segments/2014/01/13/airlineStats_batch_2014-01-13_2014-01-13.tar.gz
    /var/pinot/airlineStats/segments/2014/01/14/airlineStats_batch_2014-01-14_2014-01-14.tar.gz
    /var/pinot/airlineStats/segments/2014/01/15/airlineStats_batch_2014-01-15_2014-01-15.tar.gz
    /var/pinot/airlineStats/segments/2014/01/16/airlineStats_batch_2014-01-16_2014-01-16.tar.gz
    /var/pinot/airlineStats/segments/2014/01/17/airlineStats_batch_2014-01-17_2014-01-17.tar.gz
    /var/pinot/airlineStats/segments/2014/01/18/airlineStats_batch_2014-01-18_2014-01-18.tar.gz
    /var/pinot/airlineStats/segments/2014/01/19/airlineStats_batch_2014-01-19_2014-01-19.tar.gz
    /var/pinot/airlineStats/segments/2014/01/20/airlineStats_batch_2014-01-20_2014-01-20.tar.gz
    /var/pinot/airlineStats/segments/2014/01/21/airlineStats_batch_2014-01-21_2014-01-21.tar.gz
    /var/pinot/airlineStats/segments/2014/01/22/airlineStats_batch_2014-01-22_2014-01-22.tar.gz
    /var/pinot/airlineStats/segments/2014/01/23/airlineStats_batch_2014-01-23_2014-01-23.tar.gz
    /var/pinot/airlineStats/segments/2014/01/24/airlineStats_batch_2014-01-24_2014-01-24.tar.gz
    /var/pinot/airlineStats/segments/2014/01/25/airlineStats_batch_2014-01-25_2014-01-25.tar.gz
    /var/pinot/airlineStats/segments/2014/01/26/airlineStats_batch_2014-01-26_2014-01-26.tar.gz
    /var/pinot/airlineStats/segments/2014/01/27/airlineStats_batch_2014-01-27_2014-01-27.tar.gz
    /var/pinot/airlineStats/segments/2014/01/28/airlineStats_batch_2014-01-28_2014-01-28.tar.gz
    /var/pinot/airlineStats/segments/2014/01/29/airlineStats_batch_2014-01-29_2014-01-29.tar.gz
    /var/pinot/airlineStats/segments/2014/01/30/airlineStats_batch_2014-01-30_2014-01-30.tar.gz
    /var/pinot/airlineStats/segments/2014/01/31/airlineStats_batch_2014-01-31_2014-01-31.tar.gz
    wget https://downloads.apache.org/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
    tar xvf spark-2.4.6-bin-hadoop2.7.tgz
    cd spark-2.4.6-bin-hadoop2.7
    ./bin/spark-shell --master 'local[2]'
    # executionFrameworkSpec: Defines ingestion jobs to be running.
    executionFrameworkSpec:
    
      # name: execution framework name
      name: 'spark'
    
      # segmentGenerationJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentGenerationJobRunner interface.
      segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner'
    
      # segmentTarPushJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentTarPushJobRunner interface.
      segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner'
    
      # segmentUriPushJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentUriPushJobRunner interface.
      segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner'
    
      # extraConfigs: extra configs for execution framework.
      extraConfigs:
    
        # stagingDir is used in distributed filesystem to host all the segments then move this directory entirely to output directory.
        stagingDir: examples/batch/airlineStats/staging
    
    # jobType: Pinot ingestion job type.
    # Supported job types are:
    #   'SegmentCreation'
    #   'SegmentTarPush'
    #   'SegmentUriPush'
    #   'SegmentCreationAndTarPush'
    #   'SegmentCreationAndUriPush'
    jobType: SegmentCreationAndTarPush
    
    # inputDirURI: Root directory of input data, expected to have scheme configured in PinotFS.
    inputDirURI: 'examples/batch/airlineStats/rawdata'
    
    # includeFileNamePattern: include file name pattern, supported glob pattern.
    # Sample usage:
    #   'glob:*.avro' will include all avro files just under the inputDirURI, not sub directories;
#   'glob:**/*.avro' will include all the avro files under inputDirURI recursively.
    includeFileNamePattern: 'glob:**/*.avro'
    
    # excludeFileNamePattern: exclude file name pattern, supported glob pattern.
    # Sample usage:
    #   'glob:*.avro' will exclude all avro files just under the inputDirURI, not sub directories;
#   'glob:**/*.avro' will exclude all the avro files under inputDirURI recursively.
    # excludeFileNamePattern: ''
    
    # outputDirURI: Root directory of output segments, expected to have scheme configured in PinotFS.
    outputDirURI: 'examples/batch/airlineStats/segments'
    
    # overwriteOutput: Overwrite output segments if existed.
    overwriteOutput: true
    
    # pinotFSSpecs: defines all related Pinot file systems.
    pinotFSSpecs:
    
      - # scheme: used to identify a PinotFS.
        # E.g. local, hdfs, dbfs, etc
        scheme: file
    
        # className: Class name used to create the PinotFS instance.
        # E.g.
        #   org.apache.pinot.spi.filesystem.LocalPinotFS is used for local filesystem
        #   org.apache.pinot.plugin.filesystem.AzurePinotFS is used for Azure Data Lake
        #   org.apache.pinot.plugin.filesystem.HadoopPinotFS is used for HDFS
        className: org.apache.pinot.plugin.filesystem.HadoopPinotFS
    
    # recordReaderSpec: defines all record reader
    recordReaderSpec:
    
      # dataFormat: Record data format, e.g. 'avro', 'parquet', 'orc', 'csv', 'json', 'thrift' etc.
      dataFormat: 'avro'
    
      # className: Corresponding RecordReader class name.
      # E.g.
      #   org.apache.pinot.plugin.inputformat.avro.AvroRecordReader
      #   org.apache.pinot.plugin.inputformat.csv.CSVRecordReader
      #   org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader
      #   org.apache.pinot.plugin.inputformat.json.JSONRecordReader
      #   org.apache.pinot.plugin.inputformat.orc.ORCRecordReader
      #   org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReader
      className: 'org.apache.pinot.plugin.inputformat.avro.AvroRecordReader'
    
    # tableSpec: defines table name and where to fetch corresponding table config and table schema.
    tableSpec:
    
      # tableName: Table name
      tableName: 'airlineStats'
    
      # schemaURI: defines where to read the table schema, supports PinotFS or HTTP.
      # E.g.
      #   hdfs://path/to/table_schema.json
      #   http://localhost:9000/tables/myTable/schema
      schemaURI: 'http://localhost:9000/tables/airlineStats/schema'
    
  # tableConfigURI: defines where to read the table config.
      # Supports using PinotFS or HTTP.
      # E.g.
      #   hdfs://path/to/table_config.json
      #   http://localhost:9000/tables/myTable
      # Note that the API to read Pinot table config directly from pinot controller contains a JSON wrapper.
      # The real table config is the object under the field 'OFFLINE'.
      tableConfigURI: 'http://localhost:9000/tables/airlineStats'
    
    # segmentNameGeneratorSpec: defines how to init a SegmentNameGenerator.
    segmentNameGeneratorSpec:
    
      # type: Current supported type is 'simple' and 'normalizedDate'.
      type: normalizedDate
    
      # configs: Configs to init SegmentNameGenerator.
      configs:
        segment.name.prefix: 'airlineStats_batch'
        exclude.sequence.id: true
    
    # pinotClusterSpecs: defines the Pinot Cluster Access Point.
    pinotClusterSpecs:
      - # controllerURI: used to fetch table/schema information and data push.
        # E.g. http://localhost:9000
        controllerURI: 'http://localhost:9000'
    
    # pushJobSpec: defines segment push job related configuration.
    pushJobSpec:
    
      # pushParallelism: push job parallelism, default is 1.
      pushParallelism: 2
    
      # pushAttempts: number of attempts for push job, default is 1, which means no retry.
      pushAttempts: 2
    
      # pushRetryIntervalMillis: retry wait Ms, default to 1 second.
      pushRetryIntervalMillis: 1000
    export PINOT_VERSION=0.10.0-SNAPSHOT
    export PINOT_DISTRIBUTION_DIR=${PINOT_ROOT_DIR}/build/
    cd ${PINOT_DISTRIBUTION_DIR}
    ${SPARK_HOME}/bin/spark-submit \
      --class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
      --master "local[2]" \
      --deploy-mode client \
      --conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml" \
      --conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
      local://${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \
      -jobSpecFile ${PINOT_DISTRIBUTION_DIR}/examples/batch/airlineStats/sparkIngestionJobSpec.yaml
    # executionFrameworkSpec: Defines ingestion jobs to be running.
    executionFrameworkSpec:
    
      # name: execution framework name
      name: 'hadoop'
    
      # segmentGenerationJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentGenerationJobRunner interface.
      segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunner'
    
      # segmentTarPushJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentTarPushJobRunner interface.
      segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentTarPushJobRunner'
    
      # segmentUriPushJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentUriPushJobRunner interface.
      segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentUriPushJobRunner'
    
      # extraConfigs: extra configs for execution framework.
      extraConfigs:
    
        # stagingDir is used in distributed filesystem to host all the segments then move this directory entirely to output directory.
        stagingDir: examples/batch/airlineStats/staging
    
    # jobType: Pinot ingestion job type.
    # Supported job types are:
    #   'SegmentCreation'
    #   'SegmentTarPush'
    #   'SegmentUriPush'
    #   'SegmentCreationAndTarPush'
    #   'SegmentCreationAndUriPush'
    jobType: SegmentCreationAndTarPush
    
    # inputDirURI: Root directory of input data, expected to have scheme configured in PinotFS.
    inputDirURI: 'examples/batch/airlineStats/rawdata'
    
    # includeFileNamePattern: include file name pattern, supported glob pattern.
    # Sample usage:
    #   'glob:*.avro' will include all avro files just under the inputDirURI, not sub directories;
#   'glob:**/*.avro' will include all the avro files under inputDirURI recursively.
    includeFileNamePattern: 'glob:**/*.avro'
    
    # excludeFileNamePattern: exclude file name pattern, supported glob pattern.
    # Sample usage:
    #   'glob:*.avro' will exclude all avro files just under the inputDirURI, not sub directories;
#   'glob:**/*.avro' will exclude all the avro files under inputDirURI recursively.
    # _excludeFileNamePattern: ''
    
    # outputDirURI: Root directory of output segments, expected to have scheme configured in PinotFS.
    outputDirURI: 'examples/batch/airlineStats/segments'
    
    # overwriteOutput: Overwrite output segments if existed.
    overwriteOutput: true
    
    # pinotFSSpecs: defines all related Pinot file systems.
    pinotFSSpecs:
    
      - # scheme: used to identify a PinotFS.
        # E.g. local, hdfs, dbfs, etc
        scheme: file
    
        # className: Class name used to create the PinotFS instance.
        # E.g.
        #   org.apache.pinot.spi.filesystem.LocalPinotFS is used for local filesystem
        #   org.apache.pinot.plugin.filesystem.AzurePinotFS is used for Azure Data Lake
        #   org.apache.pinot.plugin.filesystem.HadoopPinotFS is used for HDFS
        className: org.apache.pinot.plugin.filesystem.HadoopPinotFS
    
    # recordReaderSpec: defines all record reader
    recordReaderSpec:
    
      # dataFormat: Record data format, e.g. 'avro', 'parquet', 'orc', 'csv', 'json', 'thrift' etc.
      dataFormat: 'avro'
    
      # className: Corresponding RecordReader class name.
      # E.g.
      #   org.apache.pinot.plugin.inputformat.avro.AvroRecordReader
      #   org.apache.pinot.plugin.inputformat.csv.CSVRecordReader
      #   org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader
      #   org.apache.pinot.plugin.inputformat.json.JSONRecordReader
      #   org.apache.pinot.plugin.inputformat.orc.ORCRecordReader
      #   org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReader
      className: 'org.apache.pinot.plugin.inputformat.avro.AvroRecordReader'
    
    # tableSpec: defines table name and where to fetch corresponding table config and table schema.
    tableSpec:
    
      # tableName: Table name
      tableName: 'airlineStats'
    
      # schemaURI: defines where to read the table schema, supports PinotFS or HTTP.
      # E.g.
      #   hdfs://path/to/table_schema.json
      #   http://localhost:9000/tables/myTable/schema
      schemaURI: 'http://localhost:9000/tables/airlineStats/schema'
    
  # tableConfigURI: defines where to read the table config.
      # Supports using PinotFS or HTTP.
      # E.g.
      #   hdfs://path/to/table_config.json
      #   http://localhost:9000/tables/myTable
      # Note that the API to read Pinot table config directly from pinot controller contains a JSON wrapper.
      # The real table config is the object under the field 'OFFLINE'.
      tableConfigURI: 'http://localhost:9000/tables/airlineStats'
    
    # segmentNameGeneratorSpec: defines how to init a SegmentNameGenerator.
    segmentNameGeneratorSpec:
    
      # type: Current supported type is 'simple' and 'normalizedDate'.
      type: normalizedDate
    
      # configs: Configs to init SegmentNameGenerator.
      configs:
        segment.name.prefix: 'airlineStats_batch'
        exclude.sequence.id: true
    
    # pinotClusterSpecs: defines the Pinot Cluster Access Point.
    pinotClusterSpecs:
      - # controllerURI: used to fetch table/schema information and data push.
        # E.g. http://localhost:9000
        controllerURI: 'http://localhost:9000'
    
    # pushJobSpec: defines segment push job related configuration.
    pushJobSpec:
    
      # pushParallelism: push job parallelism, default is 1.
      pushParallelism: 2
    
      # pushAttempts: number of attempts for push job, default is 1, which means no retry.
      pushAttempts: 2
    
      # pushRetryIntervalMillis: retry wait Ms, default to 1 second.
      pushRetryIntervalMillis: 1000
    export PINOT_VERSION=0.10.0-SNAPSHOT
    export PINOT_DISTRIBUTION_DIR=${PINOT_ROOT_DIR}/build/
    export HADOOP_CLIENT_OPTS="-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml"
    hadoop jar  \
            ${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \
            org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
            -jobSpecFile ${PINOT_DISTRIBUTION_DIR}/examples/batch/airlineStats/hadoopIngestionJobSpec.yaml
    docker run --rm -ti -e JAVA_OPTS="-Xms8G -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-admin-log4j2.xml  -Dplugins.dir=/opt/pinot/plugins" --name pinot-data-ingestion-job apachepinot/pinot:latest LaunchDataIngestionJob -jobSpecFile /path/to/ingestion_job_spec.yaml