To set up Pinot in Docker to use S3 as the deep store, we need to add extra configs for the Controller and Server.
The sections below prepare three config files under /tmp/pinot-s3-docker to mount into the containers.
Below is a sample controller.conf file.
Please point controller.data.dir to your S3 bucket; all uploaded segments will be stored there.
Then add S3 as a Pinot storage with configs:
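The snippet below is a minimal sketch of those controller settings; the bucket name, path, and region are placeholders to replace with your own (controller.local.temp.dir is an additional, commonly used setting for local staging):

```properties
controller.data.dir=s3://my.bucket/pinot-data/controller-data
controller.local.temp.dir=/tmp/pinot-tmp-data
pinot.controller.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
pinot.controller.storage.factory.s3.region=us-west-2
```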
For AWS credentials, we follow the convention of DefaultAWSCredentialsProviderChain. You can specify the access key and secret using:
Environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (recommended, since they are recognized by all AWS SDKs and the CLI except for .NET), or AWS_ACCESS_KEY and AWS_SECRET_KEY (recognized only by the Java SDK)
Java system properties aws.accessKeyId and aws.secretKey
A credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI
AWS credentials in the Pinot config files, e.g. pinot.controller.storage.factory.s3.accessKey and pinot.controller.storage.factory.s3.secretKey (not recommended)
Add s3 to pinot.controller.segment.fetcher.protocols and set pinot.controller.segment.fetcher.s3.class to org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher.
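Put together, the fetcher settings look like this:

```properties
pinot.controller.segment.fetcher.protocols=file,http,s3
pinot.controller.segment.fetcher.s3.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
```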
Then start pinot controller with:
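A sketch of the Docker command, assuming the apachepinot/pinot image, a pinot-demo Docker network, and that the ZooKeeper address is set inside controller.conf; adjust names, tags, and credentials to your setup:

```bash
docker run --rm -ti \
  --network=pinot-demo \
  --name pinot-controller \
  -p 9000:9000 \
  --env AWS_ACCESS_KEY_ID=<your-access-key-id> \
  --env AWS_SECRET_ACCESS_KEY=<your-secret-access-key> \
  --mount type=bind,source=/tmp/pinot-s3-docker,target=/tmp/pinot-s3-docker \
  apachepinot/pinot:latest StartController \
  -configFileName /tmp/pinot-s3-docker/controller.conf
```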
The broker is simple; you can just start it with the defaults:
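For example, under the same assumptions as above (pinot-demo network, ZooKeeper container named pinot-zookeeper):

```bash
docker run --rm -ti \
  --network=pinot-demo \
  --name pinot-broker \
  apachepinot/pinot:latest StartBroker \
  -zkAddress pinot-zookeeper:2181
```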
Below is a sample server.conf file. Similar to the controller config, please also set the S3 configs in the Pinot server.
Then start pinot server with:
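A sketch, again assuming the apachepinot/pinot image and a ZooKeeper container named pinot-zookeeper:

```bash
docker run --rm -ti \
  --network=pinot-demo \
  --name pinot-server \
  --env AWS_ACCESS_KEY_ID=<your-access-key-id> \
  --env AWS_SECRET_ACCESS_KEY=<your-secret-access-key> \
  --mount type=bind,source=/tmp/pinot-s3-docker,target=/tmp/pinot-s3-docker \
  apachepinot/pinot:latest StartServer \
  -zkAddress pinot-zookeeper:2181 \
  -configFileName /tmp/pinot-s3-docker/server.conf
```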
In this demo, we just use the airlineStats table as an example, which is already packaged inside the Docker image.
You can also mount your own table config and schema files into the container and run it.
Below is a sample standalone ingestion job spec with a few notable changes:
jobType is SegmentCreationAndMetadataPush (this job pushes segment metadata, so the controller does not need to download the whole segment)
inputDirURI is set to an S3 location: s3://my.bucket/batch/airlineStats/rawdata/
outputDirURI is set to an S3 location: s3://my.bucket/output/airlineStats/segments
A new PinotFS is added under pinotFSSpecs
Sample ingestionJobSpec.yaml
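A trimmed sketch of such a spec (bucket, paths, region, and controller URI are placeholders; the full sample file is longer):

```yaml
executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner'
jobType: SegmentCreationAndMetadataPush
inputDirURI: 's3://my.bucket/batch/airlineStats/rawdata/'
includeFileNamePattern: 'glob:**/*.avro'
outputDirURI: 's3://my.bucket/output/airlineStats/segments'
overwriteOutput: true
pinotFSSpecs:
  - scheme: s3
    className: org.apache.pinot.plugin.filesystem.S3PinotFS
    configs:
      region: 'us-west-2'
recordReaderSpec:
  dataFormat: 'avro'
  className: 'org.apache.pinot.plugin.inputformat.avro.AvroRecordReader'
tableSpec:
  tableName: 'airlineStats'
pinotClusterSpecs:
  - controllerURI: 'http://pinot-controller:9000'
```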
Launch the data ingestion job:
The commands below are based on the Pinot distribution binary.
To set up Pinot to use S3 as the deep store, we need to add extra configs for the Controller and Server.
Below is a sample controller.conf file.
Please point controller.data.dir to your S3 bucket; all uploaded segments will be stored there.
Then add S3 as a Pinot storage with configs:
For AWS credentials, we follow the convention of DefaultAWSCredentialsProviderChain. You can specify the access key and secret using:
Environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (recommended, since they are recognized by all AWS SDKs and the CLI except for .NET), or AWS_ACCESS_KEY and AWS_SECRET_KEY (recognized only by the Java SDK)
Java system properties aws.accessKeyId and aws.secretKey
A credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI
AWS credentials in the Pinot config files, e.g. pinot.controller.storage.factory.s3.accessKey and pinot.controller.storage.factory.s3.secretKey (not recommended)
Add s3 to pinot.controller.segment.fetcher.protocols and set pinot.controller.segment.fetcher.s3.class to org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher.
If you want to grant full control to the bucket owner, then add this to the config:
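A one-line sketch (verify the key against your Pinot release):

```properties
pinot.controller.storage.factory.s3.disableAcl=false
```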
Then start pinot controller with:
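For example, assuming the ZooKeeper address is set inside the controller config file:

```bash
bin/pinot-admin.sh StartController -configFileName /path/to/controller.conf
```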
The broker is simple; you can just start it with the defaults:
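For example, assuming ZooKeeper runs on localhost:

```bash
bin/pinot-admin.sh StartBroker -zkAddress localhost:2181
```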
Below is a sample server.conf file. Similar to the controller config, please also set the S3 configs in the Pinot server.
If you want to grant full control to the bucket owner, then add this to the config:
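A one-line sketch (verify the key against your Pinot release):

```properties
pinot.server.storage.factory.s3.disableAcl=false
```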
Then start pinot server with:
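For example:

```bash
bin/pinot-admin.sh StartServer -zkAddress localhost:2181 -configFileName /path/to/server.conf
```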
In this demo, we just use the airlineStats table as an example.
Create the table with the command below:
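A sketch using the airlineStats example configs shipped with the distribution (controller host and port default to localhost:9000):

```bash
bin/pinot-admin.sh AddTable \
  -schemaFile examples/batch/airlineStats/airlineStats_schema.json \
  -tableConfigFile examples/batch/airlineStats/airlineStats_offline_table_config.json \
  -exec
```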
Below is a sample standalone ingestion job spec with a few notable changes:
jobType is SegmentCreationAndUriPush
inputDirURI is set to an S3 location: s3://my.bucket/batch/airlineStats/rawdata/
outputDirURI is set to an S3 location: s3://my.bucket/output/airlineStats/segments
A new PinotFS is added under pinotFSSpecs
For library versions < 0.6.0, please set segmentUriPrefix to [scheme]://[bucket.name], e.g. s3://my.bucket; from version 0.6.0 onward, you can leave segmentUriPrefix empty or omit it.
Sample ingestionJobSpec.yaml
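Only the parts that differ from the metadata-push spec earlier are sketched below; everything else follows the same structure:

```yaml
jobType: SegmentCreationAndUriPush
inputDirURI: 's3://my.bucket/batch/airlineStats/rawdata/'
outputDirURI: 's3://my.bucket/output/airlineStats/segments'
pushJobSpec:
  # Only needed for Pinot < 0.6.0; leave empty (or omit) from 0.6.0 onwards.
  segmentUriPrefix: 's3://my.bucket'
  segmentUriSuffix: ''
```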
Below is a sample job output:
Please follow this page to set up a local Spark cluster.
Below is a sample Spark ingestion job spec.
Submit the Spark job with the ingestion job spec:
Below is a sample snapshot of the S3 location for the controller:
Below is a sample download URI in the PropertyStore. We expect the segment download URI to start with s3://.
Pinot segments can be created offline on Hadoop, or via command line from data files. The controller REST endpoint can then be used to add the segment to the table to which it belongs. Pinot segments can also be created by ingesting data from real-time sources (such as Kafka).
Offline Pinot workflow
To create Pinot segments on Hadoop, a workflow can be created to complete the following steps:
Pre-aggregate, clean up and prepare the data, writing it as Avro format files in a single HDFS directory
Create segments
Upload segments to the Pinot cluster
Step one can be done using your favorite tool (such as Pig, Hive or Spark). Pinot provides two MapReduce jobs to do steps two and three.
Create a job properties configuration file, such as one below:
The Pinot Hadoop module contains a job that you can incorporate into your workflow to generate Pinot segments.
You can then use the SegmentTarPush job to push segments via the controller REST API.
Here is how you can create Pinot segments from standard formats like CSV/JSON/AVRO.
Create a top level directory containing all the CSV/JSON/AVRO files that need to be converted into segments.
The file name extensions are expected to be the same as the format name (i.e. .csv, .json or .avro), and are case insensitive. Note that the converter expects the .csv extension even if the data is delimited using tabs or spaces instead.
Prepare a schema file describing the schema of the input data. The schema needs to be in JSON format. See example later in this section.
Specifically for CSV format, an optional csv config file can be provided (also in JSON format). This is used to configure parameters like the delimiter/header for the CSV file etc. A detailed description of this follows below.
Run the pinot-admin command to generate the segments. The command can be invoked as follows (options within "[ ]" are optional; for -format, the default value is AVRO).
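A sketch of the invocation; the flag names below follow older pinot-admin releases and may differ in yours, so check pinot-admin.sh CreateSegment -help for the exact options:

```bash
bin/pinot-admin.sh CreateSegment \
  -dataDir /path/to/input \
  -outDir /path/to/output/segments \
  -tableName myTable \
  -segmentName myTable_0 \
  -schemaFile /path/to/schema.json \
  [-format CSV] \
  [-readerConfigFile /path/to/csv-config.json] \
  [-overwrite]
```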
To configure various parameters for CSV, a config file in JSON format can be provided. This file is optional, as is each of its parameters. When not provided, the default values for these parameters are as described below:
fileFormat: Specify one of the following. Default is EXCEL.
EXCEL
MYSQL
RFC4180
TDF
header: If the input CSV file does not contain a header, it can be specified using this field. Note that if this is specified, the input file should not contain a header row, or it will result in a parse error. The columns in the header must be delimited by the same delimiter character as the rest of the CSV file.
delimiter: Use this to specify a delimiter character. The default value is “,”.
multiValueDelimiter: Use this to specify a delimiter character for each value in multi-valued columns. The default value is “;”.
Below is a sample config file.
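A sketch combining the parameters described above (the header value is a placeholder):

```json
{
  "fileFormat": "EXCEL",
  "header": "col1,col2,col3",
  "delimiter": "\t",
  "multiValueDelimiter": ","
}
```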
Sample Schema:
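A minimal sketch of a Pinot schema in JSON; the column names and types here are purely illustrative:

```json
{
  "schemaName": "myTable",
  "dimensionFieldSpecs": [
    { "name": "name", "dataType": "STRING" },
    { "name": "tags", "dataType": "STRING", "singleValueField": false }
  ],
  "metricFieldSpecs": [
    { "name": "score", "dataType": "INT" }
  ],
  "dateTimeFieldSpecs": [
    { "name": "ts", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }
  ]
}
```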
You can use curl to push a segment to pinot:
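A sketch of the multipart upload to the controller's /segments endpoint (host, port, and the form-field name may need adjusting for your release):

```bash
curl -X POST -F segment=@/path/to/mySegment.tar.gz http://localhost:9000/segments
```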
Alternatively you can use the pinot-admin.sh utility to upload one or more segments:
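For example:

```bash
bin/pinot-admin.sh UploadSegment \
  -controllerHost localhost \
  -controllerPort 9000 \
  -segmentDir /path/to/segmentDirectoryPath
```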
The command uploads all the segments found in segmentDirectoryPath. The segments could be either tar-compressed (in which case each is a file under segmentDirectoryPath) or uncompressed (in which case each is a directory under segmentDirectoryPath).
Follow the steps described in the section on building Pinot. Locate pinot-admin.sh in pinot-tools/target/pinot-tools-pkg/bin/pinot-admin.sh.
So far, you've seen how to create a new schema for a Pinot table. In this tutorial, we'll see how to evolve the schema (e.g. add a new column to the schema). This guide assumes you have a Pinot cluster up and running (e.g. as described in https://docs.pinot.apache.org/basics/getting-started/running-pinot-locally). We will also assume there's an existing table baseballStats created as part of the batch quick start.
Pinot only allows adding new columns to the schema. To drop a column, or to change a column's name or data type, a new table has to be created.
For a newly added column to become queryable in Pinot, you need to reload all segments using the reload segments API.
As far as values for this newly added column are concerned, all existing records in the table will get the defaultNullValue configured for this column.
If you have a scenario to backfill actual values, re-ingestion is needed.
If the newly added column is a derived column, the values will be auto-derived from the dependent columns.
Reloading of each segment is expected to happen gracefully without impacting in-flight queries. When reloading a segment, a new segment will be loaded and will replace the existing segment. The replaced segment will be dropped only after its reference count reaches 0, i.e. when the segment is not serving any in-flight queries.
For the real-time consuming segment, reload is performed as a force commit, which commits the current consuming segment and loads it as an immutable segment. A new consuming segment will be created after the current one is committed and will pick up the changes in the table config and schema.
Upsert and dedup config changes cannot be applied via reload because they change the table-level (cross-segment) metadata management. To apply these changes, the servers need to be restarted.
Let's begin by first fetching the existing schema. We can do this using the controller API:
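For example, with the controller on localhost:9000:

```bash
curl localhost:9000/schemas/baseballStats > baseballStats.schema
```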
Let's add a new column at the end of the schema, something like this (by editing baseballStats.schema):
In this example, we're adding a new column called yearsOfExperience
with a default value of 1.
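A sketch of the added field spec, assuming it is added as an INT field; adjust the data type and field section to your needs:

```json
{
  "name": "yearsOfExperience",
  "dataType": "INT",
  "defaultNullValue": 1
}
```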
You can now update the schema using the following command:
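For example:

```bash
bin/pinot-admin.sh AddSchema -schemaFile baseballStats.schema -exec
```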
Please note that this will not be reflected immediately. You need to reload the table's segments for the new column to show up, which can be done as follows:
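For example:

```bash
curl -X POST localhost:9000/segments/baseballStats/reload
```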
This will trigger a reload operation on each of the servers hosting the table's segments. The API response has a reloadJobId, which can be used to monitor the status of the reload operation using the segment reload status API.
The reloadJobId and the segmentReloadStatus API below are only available starting from release 0.11.0 or from this commit.
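A sketch of the status check (the exact path may differ by release; <reloadJobId> is the value returned by the reload call):

```bash
curl localhost:9000/segments/segmentReloadStatus/<reloadJobId>
```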
After the reload, you can query the new column as shown below:
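For example:

```sql
select playerName, yearsOfExperience from baseballStats limit 10
```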
Real-Time Pinot table: In case of real-time tables, make sure the "pinot.server.instance.reload.consumingSegment" config is set to true inside Server config. Without this, the current consuming segment(s) won't be reloaded (force committed).
New columns can be added with ingestion transforms. If all the source columns for the new column exist in the schema, the transformed values will be generated for the new column instead of filling default values. Note that the derived column and its data type need to be defined in the schema before making changes to the table config for the ingestion transform.
As you can observe, the current query returns the defaultNullValue for the newly added column. In order to populate this column with real values, you will need to re-run the batch ingestion job for the past dates.
Real-Time Pinot table: Backfilling data does not work for real-time tables. If you only have a real-time table, you can convert it to a hybrid table, by adding an offline counterpart that uses the same schema. Then you can backfill the offline table and fill in values for the newly added column. More on hybrid tables here.
One of the primary advantages of using Pinot is its pluggable architecture. The plugins make it easy to add support for any third-party system, whether an execution framework, a filesystem, or an input format.
In this tutorial, we will use three such plugins to easily ingest data and push it to our pinot cluster. The plugins we will be using are -
pinot-batch-ingestion-spark
pinot-s3
pinot-parquet
You can check out Batch Ingestion, File systems and Input formats for all the available plugins.
We are using the following tools and frameworks for this tutorial -
Apache Spark 2.4.0 (Although any spark 2.X should work)
Apache Parquet 1.8.2
We need to get input data to ingest first. For our demo, we'll just create some small parquet files and upload them to our S3 bucket. The easiest way is to create CSV files and then convert them to parquet. CSV is human-readable, which makes it easier to modify the input in case of a failure during our demo. We will call this file students.csv.
Now, we'll create parquet files from the above CSV file using Spark. Since this is a small program, we will use the Spark shell instead of writing full-fledged Spark code.
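A sketch of the conversion in spark-shell; the input and output paths are placeholders:

```scala
// Read the CSV with a header row and write it back out as Parquet.
val df = spark.read.option("header", "true").option("inferSchema", "true").csv("/path/to/students.csv")
df.write.parquet("/path/to/batch_input")
```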
The .parquet files can now be found in the /path/to/batch_input directory. You can now upload this directory to S3, either using the S3 UI or by running the command:
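For example, with the AWS CLI (bucket and prefix are placeholders):

```bash
aws s3 cp /path/to/batch_input s3://my-bucket/batch-input/ --recursive
```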
We need to create a table to query the data that will be ingested. All tables in pinot are associated with a schema. You can check out Table configuration and Schema configuration for more details on creating configurations.
For our demo, we will have the following schema and table configs
We can now upload these configurations to Pinot and create an empty table. We will be using the pinot-admin.sh CLI for this purpose.
You can check out Command-Line Interface (CLI) for all the available commands.
Our table will now be available in the Pinot data explorer
Now that our data is available in S3 and we have the tables in Pinot, we can start the process of ingesting the data. Data ingestion in Pinot involves the following steps -
Read data and generate compressed segment files from input
Upload the compressed segment files to output location
Push the location of the segment files to the controller
Once the location is available to the controller, it can notify the servers to download the segment files and populate the tables.
The above steps can be performed using any distributed executor of your choice such as Hadoop, Spark, Flink etc. For this demo we will be using Apache Spark to execute the steps.
Pinot provides runners for Spark out of the box. So as a user, you don't need to write a single line of code. You can write runners for any other executor using our provided interfaces.
Firstly, we will create a job spec configuration file for our data ingestion process.
In the job spec, we have kept the execution framework as spark and configured the appropriate runners for each of our steps. We also need a temporary stagingDir for our Spark job. This directory is cleaned up after our job has executed.
We also provide the S3 filesystem and Parquet reader implementations to use in the config. You can refer to Ingestion Job Spec for the complete list of configurations.
We can now run our spark job to execute all the steps and populate data in pinot.
You can go through the FAQ section of our Spark ingestion guide in case you face any errors.
Voila! Our data is now successfully ingested. Let's try to query it from Pinot's broker:
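A sketch of a broker query over HTTP, assuming the table was named students in the table config above and the broker listens on the default port 8099:

```bash
curl -H "Content-Type: application/json" -X POST \
  -d '{"sql":"SELECT * FROM students LIMIT 10"}' \
  http://localhost:8099/query/sql
```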
If everything went right, you should receive the following output
In practice, we need to run Pinot data ingestion as a pipeline or a scheduled job.
Assuming the Pinot distribution is already built, you can find several sample table layouts inside the examples directory.
Usually each table deserves its own directory, like airlineStats.
Inside the table directory, rawdata is created to hold all the input data.
Typically, for data events with timestamps, we partition the data and store it in daily folders, e.g. a typical layout would follow the pattern rawdata/%yyyy%/%mm%/%dd%/[daily_input_files].
Create a batch ingestion job spec file to describe how to ingest the data. Below is an example (also located at examples/batch/airlineStats/ingestionJobSpec.yaml).
The command below will create the example table in the Pinot cluster.
The command below will kick off the ingestion job to generate Pinot segments and push them into the cluster.
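For example:

```bash
bin/pinot-admin.sh LaunchDataIngestionJob \
  -jobSpecFile examples/batch/airlineStats/ingestionJobSpec.yaml
```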
After the job finishes, segments are stored in examples/batch/airlineStats/segments, following the same layout as the input directory.
The example below runs in Spark local mode. You can download the Spark distribution and start it by running:
Build latest Pinot Distribution following this Wiki.
The command below shows how to use spark-submit to submit a Spark job using pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar.
Sample Spark ingestion job spec yaml (also located at examples/batch/airlineStats/sparkIngestionJobSpec.yaml):
Please ensure the parameters PINOT_ROOT_DIR and PINOT_VERSION are set properly.
Please ensure you set:
spark.driver.extraJavaOptions => -Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins
Or put all the required plugin jars on the CLASSPATH, then set -Dplugins.dir=${CLASSPATH}
spark.driver.extraClassPath => pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar
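A sketch of the spark-submit invocation combining the settings above; PINOT_VERSION and the directory layout are placeholders for your build:

```bash
export PINOT_VERSION=0.10.0
export PINOT_DISTRIBUTION_DIR=${PINOT_ROOT_DIR}/build

spark-submit \
  --class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
  --master "local[2]" \
  --deploy-mode client \
  --conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins" \
  --conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
  local://${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \
  -jobSpecFile ${PINOT_DISTRIBUTION_DIR}/examples/batch/airlineStats/sparkIngestionJobSpec.yaml
```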
The command below shows how to use the hadoop jar command to run a Hadoop job using pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar.
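A sketch of the invocation; the log4j2 file path and directory layout are assumptions to adapt to your environment:

```bash
export HADOOP_CLIENT_OPTS="-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml"

hadoop jar \
  ${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \
  org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
  -jobSpecFile ${PINOT_DISTRIBUTION_DIR}/examples/batch/airlineStats/hadoopIngestionJobSpec.yaml
```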
Sample Hadoop ingestion job spec yaml (also located at examples/batch/airlineStats/hadoopIngestionJobSpec.yaml):
Please ensure the parameters PINOT_ROOT_DIR and PINOT_VERSION are set properly.
You can set the environment variable JAVA_OPTS to modify:
Log4j2 file location with -Dlog4j2.configurationFile
Plugin directory location with -Dplugins.dir=/opt/pinot/plugins
JVM props, like -Xmx8g -Xms4G
Please note that you need to configure all three of the above together in JAVA_OPTS. If you only set JAVA_OPTS="-Xmx4g", then plugins.dir will be empty, which will usually cause the job to fail.
E.g.
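A sketch that sets all three together (the log4j2 file path is an assumption):

```bash
export JAVA_OPTS="-Xms4G -Xmx8G -Dlog4j2.configurationFile=conf/pinot-ingestion-job-log4j2.xml -Dplugins.dir=/opt/pinot/plugins"
```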
You can also add your customized JAVA_OPTS if necessary.
Configure AliCloud Object Storage Service (OSS) as Pinot deep storage
OSS can be used as HDFS deep storage for Apache Pinot without implementing an OSS file system plugin. You should follow the steps below:
1. Configure the hdfs-site.xml and core-site.xml files. After that, put these configurations under any desired path, then set the value of the pinot.<node>.storage.factory.oss.hadoop.conf config on the controller/server configs to this path.
For hdfs-site.xml, you do not have to provide any configuration.
For core-site.xml, you have to provide the OSS access/secret and bucket configurations, like below:
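A sketch of the core-site.xml entries; the property names below follow the common Hadoop OSS connector convention (fs.oss.*) and should be verified against the OSS jars you actually use:

```xml
<configuration>
  <property>
    <name>fs.oss.accessKeyId</name>
    <value>your-access-key-id</value>
  </property>
  <property>
    <name>fs.oss.accessKeySecret</name>
    <value>your-access-key-secret</value>
  </property>
  <property>
    <name>fs.oss.endpoint</name>
    <value>oss-cn-hangzhou.aliyuncs.com</value>
  </property>
</configuration>
```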
2. In order to access OSS, find your HDFS jars related to OSS and put them under PINOT_DIR/lib. You can use the jars below, but be careful about versions to avoid conflicts.
smartdata-aliyun-oss
smartdata-hadoop-common
guava
3. Set the OSS deep storage configs in controller.conf and server.conf:
Controller config
Server config
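A sketch of both configs, following the same pattern as the S3 setup above (bucket and paths are placeholders; the oss.hadoop.conf value should point to the directory holding the XML files from step 1):

```properties
# Controller
controller.data.dir=oss://your-bucket/pinot-data/controller-data
controller.local.temp.dir=/tmp/pinot-tmp-data
pinot.controller.storage.factory.class.oss=org.apache.pinot.plugin.filesystem.HadoopPinotFS
pinot.controller.storage.factory.oss.hadoop.conf=/path/to/hadoop/conf
pinot.controller.segment.fetcher.protocols=file,http,oss
pinot.controller.segment.fetcher.oss.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher

# Server
pinot.server.storage.factory.class.oss=org.apache.pinot.plugin.filesystem.HadoopPinotFS
pinot.server.storage.factory.oss.hadoop.conf=/path/to/hadoop/conf
pinot.server.segment.fetcher.protocols=file,http,oss
pinot.server.segment.fetcher.oss.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
```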
Example Job Spec
Using the same HDFS deep storage configs and jars, you can read data from OSS, then create segments and push them back to OSS. An example standalone batch ingestion job could look like the below: