Pinot segments can be created offline on Hadoop, or via command line from data files. The controller REST endpoint can then be used to add the segment to the table to which it belongs. Pinot segments can also be created by ingesting data from real-time sources (such as Kafka).
Offline Pinot workflow
To create Pinot segments on Hadoop, a workflow can be created to complete the following steps:
Pre-aggregate, clean up and prepare the data, writing it as Avro format files in a single HDFS directory
Create segments
Upload segments to the Pinot cluster
Step one can be done using your favorite tool (such as Pig, Hive or Spark); Pinot provides two MapReduce jobs for steps two and three.
Create a job properties configuration file, such as the one below:
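The exact property names depend on the Pinot version; the sketch below follows the older pinot-hadoop job convention, and the paths, table name and controller hosts are placeholders:

```properties
# === Index segment creation job config ===

# Directory containing the input Avro files (on HDFS)
path.to.input=/user/pinot/input/data
# Directory where the generated segments will be written
path.to.output=/user/pinot/output
# Local path to the table schema file
path.to.schema=flights-schema.json
# Name of the table for which to generate segments
segment.table.name=flights

# === Segment tar push job config ===

# Comma-separated list of controller hosts to push the segments to
push.to.hosts=controller_host_0,controller_host_1
# Port on which the controllers listen
push.to.port=8088
```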
The Pinot Hadoop module contains a job that you can incorporate into your workflow to generate Pinot segments.
You can then use the SegmentTarPush job to push segments via the controller REST API.
Here is how you can create Pinot segments from standard formats like CSV/JSON/AVRO.
Follow the steps described in the section on Compiling the code to build Pinot. Locate pinot-admin.sh in pinot-tools/target/pinot-tools-pkg/bin/pinot-admin.sh.
Create a top level directory containing all the CSV/JSON/AVRO files that need to be converted into segments.
The file name extensions are expected to be the same as the format name (i.e. .csv, .json or .avro), and are case insensitive. Note that the converter expects the .csv extension even if the data is delimited using tabs or spaces.
Prepare a schema file describing the schema of the input data. The schema needs to be in JSON format. See example later in this section.
Specifically for the CSV format, an optional CSV config file can be provided (also in JSON format). This is used to configure parameters such as the delimiter and header for the CSV file. A detailed description of this follows below.
Run the pinot-admin command to generate the segments. The command can be invoked as follows. Options within “[ ]” are optional. For -format, the default value is AVRO.
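A minimal invocation might look like the sketch below; the table name, segment name and paths are placeholders, and the exact option names can vary across Pinot versions (check the CreateSegment command's help output):

```bash
bin/pinot-admin.sh CreateSegment \
  -dataDir /path/to/input \
  [-format CSV] \
  -outDir /path/to/output/segments \
  -tableName myTable \
  -segmentName myTable_segment_0 \
  -schemaFile /path/to/myTable-schema.json \
  [-readerConfigFile /path/to/csv_config.json] \
  [-overwrite]
```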
To configure various parameters for CSV a config file in JSON format can be provided. This file is optional, as are each of its parameters. When not provided, default values used for these parameters are described below:
fileFormat: Specify one of the following. Default is EXCEL.
EXCEL
MYSQL
RFC4180
TDF
header: If the input CSV file does not contain a header, it can be specified using this field. Note that if this is specified, the input file is expected to not contain a header row, or else it will result in a parse error. The columns in the header must be delimited by the same delimiter character as the rest of the CSV file.
delimiter: Use this to specify a delimiter character. The default value is “,”.
multiValueDelimiter: Use this to specify a delimiter character for each value in multi-valued columns. The default value is “;”.
Below is a sample config file.
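A sketch of such a file, using the parameters described above (the header column names are placeholders):

```json
{
  "fileFormat": "EXCEL",
  "header": "col1,col2,col3",
  "delimiter": "\t",
  "multiValueDelimiter": ";"
}
```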
Sample Schema:
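For illustration, a Pinot schema has the following shape; the column names below are placeholders, showing the dimension, metric and date-time field specs:

```json
{
  "schemaName": "myTable",
  "dimensionFieldSpecs": [
    { "name": "playerName", "dataType": "STRING" },
    { "name": "teamID", "dataType": "STRING" }
  ],
  "metricFieldSpecs": [
    { "name": "hits", "dataType": "INT" }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "daysSinceEpoch",
      "dataType": "INT",
      "format": "1:DAYS:EPOCH",
      "granularity": "1:DAYS"
    }
  ]
}
```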
You can use curl to push a segment to pinot:
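Roughly as follows; the exact upload endpoint and form field name may differ across Pinot versions, and the host, port and file name are placeholders:

```bash
curl -X POST -F segment=@myTable_segment_0.tar.gz \
  http://localhost:9000/segments
```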
Alternatively you can use the pinot-admin.sh utility to upload one or more segments:
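For example (controller host/port and the segment directory path are placeholders):

```bash
bin/pinot-admin.sh UploadSegment \
  -controllerHost localhost \
  -controllerPort 9000 \
  -segmentDir /path/to/segmentDirectoryPath
```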
The command uploads all the segments found in segmentDirectoryPath. The segments can be either tar-compressed (in which case each segment is a file under segmentDirectoryPath) or uncompressed (in which case each segment is a directory under segmentDirectoryPath).
Schema evolution occurs over time. As business requirements evolve, and data formats or structures need to change, use Pinot to keep your schemas up-to-date. If you're just starting out with schemas in Pinot, see how to create a new schema for a Pinot table.
In this tutorial, you'll learn how to add a new column to your schema, load data to the updated schema, run a query to test the updated schema, and backfill data.
Pinot only supports adding new columns to a schema. To drop a column or change the column name or data type, you must create a new table.
Before you get started, you must have a Pinot cluster up and running, and a baseballStats table (created when you set up a Pinot cluster using the Quickstart option). For more information, see how to start running Pinot and set up a cluster using the Quickstart option.
Fetch the existing schema using the controller API:
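For example, assuming the controller runs on localhost:9000:

```bash
curl localhost:9000/schemas/baseballStats > baseballStats.schema
```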
Edit the baseballStats.schema file to include a new column at the end of the schema. For example, here we're adding a new column called yearsOfExperience with a dataType of INT and a defaultNullValue of 1.
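Assuming the new column is appended to the dimensionFieldSpecs section, the added entry would look like this:

```json
{
  "name": "yearsOfExperience",
  "dataType": "INT",
  "defaultNullValue": 1
}
```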
Update the schema using the following command:
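For example, using the pinot-admin utility (controller host/port flags omitted for a local Quickstart setup):

```bash
bin/pinot-admin.sh AddSchema \
  -schemaFile baseballStats.schema \
  -exec
```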
After you add the new column to your schema, reload the consuming segments.
(Real-time tables only): Open the Server config, and set pinot.server.instance.reload.consumingSegment to true.
To ensure the new yearsOfExperience column shows up, run the following command to reload the table segments. When checking the reload status afterwards, be sure to use the reloadJobId returned for your table:
Command
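A sketch of the reload call, assuming the controller runs on localhost:9000:

```bash
curl -X POST "http://localhost:9000/segments/baseballStats/reload"
```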
Response
This triggers a reload operation on each of the servers hosting the table's segments. The API response has a reloadJobId that you can use to monitor the status of the reload operation using the segment reload status API.
Reloading a segment shouldn't impact in-flight queries. New segments are reloaded to replace existing segments only after an existing segment isn't serving any in-flight queries.
Command
Response
For real-time consuming segments, the reload is performed as force commit, which commits the current consuming segment and loads it as an immutable segment. A new consuming segment is created after the current one is committed, and picks up the changes in the table config and schema.
Upsert and dedup config changes cannot be applied via reload because they change table-level (cross-segment) metadata management. To apply these changes, the servers need to be restarted.
In some cases, for example if the transform function evaluation fails or references a column that isn't part of the segment being reloaded, the reload operation may not successfully apply the transform. In these cases, the reload status API will still report success, but querying the new columns may not work. Review the server reload logs to identify these cases.
After reloading the segments, run the following to query the new column:
Command
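For example, a query can be sent to the broker's SQL endpoint (assuming the broker runs on localhost:8099):

```bash
curl -H "Content-Type: application/json" -X POST \
  -d '{"sql": "SELECT yearsOfExperience, count(*) FROM baseballStats GROUP BY yearsOfExperience LIMIT 10"}' \
  http://localhost:8099/query/sql
```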
Response
As you can see, the query returns the defaultNullValue for the newly added column. To populate this column with real values, re-run the batch ingestion job for the past dates to backfill the data.
Backfilling data does not work for real-time tables. You can convert a real-time table to a hybrid table by adding an offline table with the same name, and then backfilling the offline table to fill in values for the newly added column. For more information, see hybrid tables.
In practice, we need to run Pinot data ingestion as a pipeline or a scheduled job.
Assuming the Pinot distribution is already built, you can find several sample table layouts inside the examples directory.
Usually each table has its own directory, like airlineStats. Inside the table directory, a rawdata directory is created to hold all the input data.
Typically, for data events with a timestamp, we partition the data and store it in daily folders. E.g. a typical layout would follow this pattern: rawdata/%yyyy%/%mm%/%dd%/[daily_input_files].
Create a batch ingestion job spec file to describe how to ingest the data.
Below is an example (also located at examples/batch/airlineStats/ingestionJobSpec.yaml).
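A trimmed-down sketch of such a standalone job spec is shown below; check the bundled example file for the authoritative version, as the class names and URIs here are only illustrative:

```yaml
executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: 'examples/batch/airlineStats/rawdata'
includeFileNamePattern: 'glob:**/*.avro'
outputDirURI: 'examples/batch/airlineStats/segments'
overwriteOutput: true
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'avro'
  className: 'org.apache.pinot.plugin.inputformat.avro.AvroRecordReader'
tableSpec:
  tableName: 'airlineStats'
  schemaURI: 'http://localhost:9000/tables/airlineStats/schema'
  tableConfigURI: 'http://localhost:9000/tables/airlineStats'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'
```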
The command below will create the example table in the Pinot cluster.
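For example (the schema and table config file names are assumed to follow the bundled airlineStats example layout):

```bash
bin/pinot-admin.sh AddTable \
  -schemaFile examples/batch/airlineStats/airlineStats_schema.json \
  -tableConfigFile examples/batch/airlineStats/airlineStats_offline_table_config.json \
  -exec
```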
The command below will kick off the ingestion job to generate Pinot segments and push them into the cluster.
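For example:

```bash
bin/pinot-admin.sh LaunchDataIngestionJob \
  -jobSpecFile examples/batch/airlineStats/ingestionJobSpec.yaml
```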
After the job finishes, segments are stored in examples/batch/airlineStats/segments, following the same layout as the input directory.
The example below runs in Spark local mode. You can download the Spark distribution and start it by running:
Build the latest Pinot distribution following this Wiki.
The command below shows how to use spark-submit to submit a Spark job using pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar.
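A sketch of such a submission, assuming PINOT_DISTRIBUTION_DIR and PINOT_VERSION are exported and the bundled Spark job spec is used (adjust master, paths and class path for your environment):

```bash
${SPARK_HOME}/bin/spark-submit \
  --class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
  --master "local[2]" \
  --deploy-mode client \
  --conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins" \
  --conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
  ${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \
  -jobSpecFile ${PINOT_DISTRIBUTION_DIR}/examples/batch/airlineStats/sparkIngestionJobSpec.yaml
```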
Sample Spark ingestion job spec yaml (also located at examples/batch/airlineStats/sparkIngestionJobSpec.yaml):
Ensure the parameters PINOT_ROOT_DIR and PINOT_VERSION are set properly.
Ensure you set:
spark.driver.extraJavaOptions => -Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins
Or put all the required plugin jars on the CLASSPATH, then set -Dplugins.dir=${CLASSPATH}
spark.driver.extraClassPath => pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar
The command below shows how to use the hadoop jar command to run a Hadoop job using pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar.
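A sketch of such an invocation, assuming PINOT_DISTRIBUTION_DIR and PINOT_VERSION are set and JAVA_OPTS is configured as described below:

```bash
hadoop jar \
  ${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \
  org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
  -jobSpecFile ${PINOT_DISTRIBUTION_DIR}/examples/batch/airlineStats/hadoopIngestionJobSpec.yaml
```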
Sample Hadoop ingestion job spec yaml (also located at examples/batch/airlineStats/hadoopIngestionJobSpec.yaml):
Ensure the parameters PINOT_ROOT_DIR and PINOT_VERSION are set properly.
You can set the environment variable JAVA_OPTS to modify:
Log4j2 file location with -Dlog4j2.configurationFile
Plugin directory location with -Dplugins.dir=/opt/pinot/plugins
JVM props, like -Xmx8g -Xms4G
Note that you need to configure all three of the above together in JAVA_OPTS. If you only set JAVA_OPTS="-Xmx4g", then plugins.dir is left empty, which will usually cause the job to fail.
For example:
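A sketch combining all three settings (the log4j2 file location and plugin directory are placeholders for your own paths):

```bash
export JAVA_OPTS="-Xms4G -Xmx8g -Dlog4j2.configurationFile=/path/to/log4j2.xml -Dplugins.dir=/opt/pinot/plugins"
```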
You can also add your customized JAVA_OPTS if necessary.
Configure AliCloud Object Storage Service (OSS) as Pinot deep storage
OSS can be used as HDFS deep storage for Apache Pinot without implementing an OSS file system plugin. Follow the steps below:
1. Configure the hdfs-site.xml and core-site.xml files. After that, put these configurations under any path, then set the value of the pinot.<node>.storage.factory.oss.hadoop.conf config on the controller/server configs to this path.
For hdfs-site.xml, you do not have to provide any configuration. For core-site.xml, you have to provide the OSS access/secret and bucket configurations, as shown below:
2. In order to access OSS, find your HDFS jars related to OSS and put them under PINOT_DIR/lib. You can use the jars below, but be careful about versions to avoid conflicts.
smartdata-aliyun-oss
smartdata-hadoop-common
guava
3. Set the OSS deep storage configs in controller.conf and server.conf:
Controller config
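A sketch of the relevant controller settings, assuming the OSS-enabled Hadoop configuration from step 1 lives at /path/to/oss/hadoop/conf/ and the bucket name is a placeholder (the server config is analogous with pinot.server.* keys):

```properties
controller.data.dir=oss://your-bucket/path/to/segments
pinot.controller.storage.factory.class.oss=org.apache.pinot.plugin.filesystem.HadoopPinotFS
pinot.controller.storage.factory.oss.hadoop.conf=/path/to/oss/hadoop/conf/
pinot.controller.segment.fetcher.protocols=file,http,oss
pinot.controller.segment.fetcher.oss.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
```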
Server config
Example Job Spec
Using the same HDFS deep storage configs and jars, you can read data from OSS, then create segments and push them back to OSS. An example standalone batch ingestion job spec is shown below:
In order to set up Pinot in Docker to use S3 as the deep store, we need to add extra configs for the Controller and Server.
The sections below prepare 3 config files under /tmp/pinot-s3-docker to mount into the containers.
Below is a sample controller.conf file.
Configure controller.data.dir to point to your S3 bucket. All the uploaded segments will be stored there.
And add s3 as a pinot storage with configs:
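A sketch of those settings (the region is a placeholder; the fetcher entries mirror the protocol/class settings described later in this section):

```properties
pinot.controller.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
pinot.controller.storage.factory.s3.region=us-west-2
pinot.controller.segment.fetcher.protocols=file,http,s3
pinot.controller.segment.fetcher.s3.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
```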
Regarding AWS credentials, we also follow the convention of DefaultAWSCredentialsProviderChain.
You can specify AccessKey and Secret using:
Environment variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (RECOMMENDED since they are recognized by all the AWS SDKs and the CLI except for .NET), or AWS_ACCESS_KEY and AWS_SECRET_KEY (only recognized by the Java SDK)
Java system properties - aws.accessKeyId and aws.secretKey
Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI
Configure AWS credentials in the Pinot config files, e.g. set pinot.controller.storage.factory.s3.accessKey and pinot.controller.storage.factory.s3.secretKey in the config file. (Not recommended)
Add s3 to pinot.controller.segment.fetcher.protocols and set pinot.controller.segment.fetcher.s3.class to org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
Then start pinot controller with:
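A sketch of the Docker command, assuming a pinot-demo network, a ZooKeeper container named pinot-zk, and the config directory prepared above (image tag, credentials and names are placeholders):

```bash
docker run --rm -ti \
  --network=pinot-demo \
  --name pinot-controller \
  --env AWS_ACCESS_KEY_ID=<aws-access-key-id> \
  --env AWS_SECRET_ACCESS_KEY=<aws-secret-access-key> \
  --mount type=bind,source=/tmp/pinot-s3-docker,target=/tmp/pinot-s3-docker \
  apachepinot/pinot:latest StartController \
  -configFileName /tmp/pinot-s3-docker/controller.conf \
  -zkAddress pinot-zk:2181
```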
The broker is simple; you can just start it with the default config:
Below is a sample server.conf file.
Similar to the controller config, also set the s3 configs in the Pinot server.
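A sketch of the corresponding server settings (the region is a placeholder):

```properties
pinot.server.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
pinot.server.storage.factory.s3.region=us-west-2
pinot.server.segment.fetcher.protocols=file,http,s3
pinot.server.segment.fetcher.s3.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
```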
Then start pinot server with:
In this demo, we just use the airlineStats table as an example, which is already packaged inside the Docker image.
You can also mount your table conf and schema files to the container and run it.
Below is a sample standalone ingestion job spec with certain notable changes:
jobType is SegmentCreationAndMetadataPush (this job type bypasses having the controller download the segment)
inputDirURI is set to an S3 location s3://my.bucket/batch/airlineStats/rawdata/
outputDirURI is set to an S3 location s3://my.bucket/output/airlineStats/segments
Add a new PinotFs under pinotFSSpecs
Sample ingestionJobSpec.yaml
Launch the data ingestion job:
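A sketch of launching the job from a container on the same network, with the job spec mounted from the prepared config directory (names and credentials are placeholders):

```bash
docker run --rm -ti \
  --network=pinot-demo \
  --env AWS_ACCESS_KEY_ID=<aws-access-key-id> \
  --env AWS_SECRET_ACCESS_KEY=<aws-secret-access-key> \
  --mount type=bind,source=/tmp/pinot-s3-docker,target=/tmp/pinot-s3-docker \
  apachepinot/pinot:latest LaunchDataIngestionJob \
  -jobSpecFile /tmp/pinot-s3-docker/ingestionJobSpec.yaml
```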
One of the primary advantages of using Pinot is its pluggable architecture. The plugins make it easy to add support for any third-party system, which can be an execution framework, a filesystem or an input format.
In this tutorial, we will use three such plugins to easily ingest data and push it to our pinot cluster. The plugins we will be using are -
pinot-batch-ingestion-spark
pinot-s3
pinot-parquet
You can check out Batch Ingestion, File systems and Input formats for all the available plugins.
We are using the following tools and frameworks for this tutorial -
Apache Spark 2.4.0 (although any Spark 2.x version should work)
Apache Parquet 1.8.2
We need to get input data to ingest first. For our demo, we'll just create some small parquet files and upload them to our S3 bucket. The easiest way is to create CSV files and then convert them to parquet. CSV makes it human-readable and thus easier to modify input in case of some failure in our demo. We will call this file students.csv
Now, we'll create parquet files from the above CSV file using Spark. Since this is a small program, we will use the Spark shell instead of writing full-fledged Spark code.
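A minimal sketch of what you might run in the Spark shell (paths are placeholders and the exact read options depend on your CSV layout):

```scala
// Read the CSV written earlier and write it back out as Parquet
val df = spark.read.option("header", "true").csv("/path/to/students.csv")
df.coalesce(1).write.mode("overwrite").parquet("/path/to/batch_input")
```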
The .parquet files can now be found in the /path/to/batch_input directory. You can now upload this directory to S3 either using the UI or by running the command:
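For example, with the AWS CLI (the bucket and prefix are placeholders):

```bash
aws s3 cp /path/to/batch_input s3://my-bucket/batch-input/ --recursive
```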
We need to create a table to query the data that will be ingested. All tables in pinot are associated with a schema. You can check out Table configuration and Schema configuration for more details on creating configurations.
For our demo, we will have the following schema and table configs
We can now upload these configurations to Pinot and create an empty table. We will be using the pinot-admin.sh CLI for this purpose.
You can check out Command-Line Interface (CLI) for all the available commands.
Our table will now be available in the Pinot data explorer
Now that our data is available in S3 and our table exists in Pinot, we can start the process of ingesting the data. Data ingestion in Pinot involves the following steps -
Read data and generate compressed segment files from input
Upload the compressed segment files to output location
Push the location of the segment files to the controller
Once the location is available to the controller, it can notify the servers to download the segment files and populate the tables.
The above steps can be performed using any distributed executor of your choice such as Hadoop, Spark, Flink etc. For this demo we will be using Apache Spark to execute the steps.
Pinot provides runners for Spark out of the box. So as a user, you don't need to write a single line of code. You can write runners for any other executor using our provided interfaces.
Firstly, we will create a job spec configuration file for our data ingestion process.
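A condensed sketch of such a spec, combining the Spark runners with the S3 filesystem and Parquet reader plugins; the bucket, region, table name and controller URI are placeholders:

```yaml
executionFrameworkSpec:
  name: 'spark'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner'
  extraConfigs:
    stagingDir: 's3://my-bucket/staging/'
jobType: SegmentCreationAndTarPush
inputDirURI: 's3://my-bucket/batch-input/'
includeFileNamePattern: 'glob:**/*.parquet'
outputDirURI: 's3://my-bucket/batch-output/'
pinotFSSpecs:
  - scheme: s3
    className: org.apache.pinot.plugin.filesystem.S3PinotFS
    configs:
      region: 'us-west-2'
recordReaderSpec:
  dataFormat: 'parquet'
  className: 'org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader'
tableSpec:
  tableName: 'students'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'
```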
In the job spec, we have set the execution framework to spark and configured the appropriate runners for each of our steps. We also need a temporary stagingDir for our Spark job; this directory is cleaned up after the job has executed.
We also provide the S3 filesystem and Parquet reader implementations to use in the config. You can refer to Ingestion Job Spec for the complete list of configurations.
We can now run our spark job to execute all the steps and populate data in pinot.
You can go through the FAQ section of our Spark ingestion guide in case you face any errors.
Voila! Now our data is successfully ingested. Let's try to query it from Pinot's broker
If everything went right, you should receive the following output
The commands below are based on the Pinot distribution binary.
In order to set up Pinot to use S3 as the deep store, we need to add extra configs for the Controller and Server.
Below is a sample controller.conf file.
Configure controller.data.dir to point to your S3 bucket. All the uploaded segments will be stored there.
And add s3 as a pinot storage with configs:
Regarding AWS credentials, we also follow the convention of DefaultAWSCredentialsProviderChain.
You can specify AccessKey and Secret using:
Environment variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (RECOMMENDED since they are recognized by all the AWS SDKs and the CLI except for .NET), or AWS_ACCESS_KEY and AWS_SECRET_KEY (only recognized by the Java SDK)
Java system properties - aws.accessKeyId and aws.secretKey
Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI
Configure AWS credentials in the Pinot config files, e.g. set pinot.controller.storage.factory.s3.accessKey and pinot.controller.storage.factory.s3.secretKey in the config file. (Not recommended)
Add s3 to pinot.controller.segment.fetcher.protocols and set pinot.controller.segment.fetcher.s3.class to org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
If you want to grant full control to the bucket owner, then add this to the config:
Then start pinot controller with:
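A sketch of starting the controller against a local ZooKeeper with the config file above (the ZooKeeper address and config path are placeholders):

```bash
bin/pinot-admin.sh StartController \
  -zkAddress localhost:2181 \
  -configFileName /path/to/controller.conf
```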
The broker is simple; you can just start it with the default config:
Below is a sample server.conf file.
Similar to the controller config, also set the s3 configs in the Pinot server.
If you want to grant full control to the bucket owner, then add this to the config:
Then start the Pinot server with:
In this demo, we just use the airlineStats table as an example.
Create the table with the command below:
Below is a sample standalone ingestion job spec with certain notable changes:
jobType is SegmentCreationAndUriPush
inputDirURI is set to an S3 location s3://my.bucket/batch/airlineStats/rawdata/
outputDirURI is set to an S3 location s3://my.bucket/output/airlineStats/segments
Add a new PinotFs under pinotFSSpecs
For library versions < 0.6.0, set segmentUriPrefix to [scheme]://[bucket.name], e.g. s3://my.bucket; from version 0.6.0 onwards, you can put an empty string or just omit segmentUriPrefix.
Sample ingestionJobSpec.yaml
Below is a sample job output:
Follow this page to set up a local Spark cluster.
Below is a sample Spark ingestion job spec.
Submit the Spark job with the ingestion job spec:
Below is a sample snapshot of the S3 location for the controller:
Below is a sample download URI in the PropertyStore. We expect the segment download URI to start with s3://.