Import Data

This section is an overview of the various options for importing data into Pinot.

There are multiple options for importing data into Pinot. These guides are ready-made examples that walk you step by step through importing records into Pinot, supported by our plugin architecture.

These guides are meant to get you up and running with imported data as quickly as possible. Pinot supports multiple file input formats without needing to change anything other than the file name. Each example imports a ready-made dataset, so you can see how things work without bringing your own data.

Pinot Batch Ingestion

Spark
Hadoop

Pinot Stream Ingestion

This guide will show you how to import data using stream ingestion from Apache Kafka topics.

This guide will show you how to import data using stream ingestion with upsert.

Pinot File Systems

By default, Pinot does not come with a storage layer, so the data you send will not be retained after a system crash. To store the generated segments persistently, change the controller and server configs to add deep storage. Check out File systems for all the details and related configs.

These guides will show you how to import data as well as persist it in the file systems.

Pinot Input Formats

These guides will show you how to import data from a Pinot supported input format.

This guide will show you how to handle complex types in the ingested data, such as map and array.

Apache Kafka
Stream Ingestion with Upsert
File systems
Amazon S3
Azure Data Lake Storage
Google Cloud Storage
HDFS
Input formats
Complex Type (Array, Map) Handling

Dimension Table

Dimension tables in Apache Pinot.

Dimension tables are a special kind of offline table from which data can be looked up via the lookup UDF, providing join-like functionality.

Dimension tables are replicated on all the hosts for a given tenant to allow faster lookups.

To mark an offline table as a dimension table, isDimTable should be set to true and segmentsConfig.segmentPushType should be set to REFRESH in the table config, as shown below:

{
  "OFFLINE": {
    "tableName": "dimBaseballTeams_OFFLINE",
    "tableType": "OFFLINE",
    "segmentsConfig": {
      "schemaName": "dimBaseballTeams",
      "segmentPushType": "REFRESH"
    },
    "metadata": {},
    "quota": {
      "storage": "200M"
    },
    "isDimTable": true
  }
}

As dimension tables are used to perform lookups of dimension values, they are required to have a primary key (can be a composite key).

{
  "dimensionFieldSpecs": [
    {
      "dataType": "STRING",
      "name": "teamID"
    },
    {
      "dataType": "STRING",
      "name": "teamName"
    }
  ],
  "schemaName": "dimBaseballTeams",
  "primaryKeyColumns": ["teamID"]
}

When a table is marked as a dimension table, it will be replicated on all the hosts, which means that these tables must be small in size.

The maximum size quota for a dimension table in a cluster is controlled by the controller.dimTable.maxSize controller property. Table creation will fail if the storage quota exceeds this maximum size.
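The requirements above can be checked before a table is submitted. The following is a minimal sketch, not Pinot's own validation: the helper names are hypothetical, the quota parser assumes binary units for strings such as "200M", and the maximum size is passed in by the caller rather than read from controller.dimTable.maxSize.

```python
import re

# Binary multipliers are an assumption; the source only shows the string "200M".
_UNITS = {"K": 1024, "M": 1024**2, "G": 1024**3, "T": 1024**4}


def parse_storage_size(text):
    """Convert a quota string such as '200M' into bytes."""
    match = re.fullmatch(r"(\d+(?:\.\d+)?)\s*([KMGT])B?", text.strip().upper())
    if match is None:
        raise ValueError(f"unrecognized storage size: {text!r}")
    return int(float(match.group(1)) * _UNITS[match.group(2)])


def dim_table_problems(table_config, schema, max_size_bytes):
    """Return a list of reasons the configs would not qualify as a dimension table."""
    problems = []
    if table_config.get("isDimTable") is not True:
        problems.append("isDimTable must be true")
    if table_config.get("segmentsConfig", {}).get("segmentPushType") != "REFRESH":
        problems.append("segmentsConfig.segmentPushType must be REFRESH")
    if not schema.get("primaryKeyColumns"):
        problems.append("schema needs primaryKeyColumns")
    storage = table_config.get("quota", {}).get("storage")
    if storage is not None and parse_storage_size(storage) > max_size_bytes:
        problems.append("storage quota exceeds the dimension table maximum")
    return problems


# The table config and schema from the examples above (inner OFFLINE config only).
table_config = {
    "tableName": "dimBaseballTeams_OFFLINE",
    "tableType": "OFFLINE",
    "segmentsConfig": {"schemaName": "dimBaseballTeams", "segmentPushType": "REFRESH"},
    "quota": {"storage": "200M"},
    "isDimTable": True,
}
schema = {"schemaName": "dimBaseballTeams", "primaryKeyColumns": ["teamID"]}

print(dim_table_problems(table_config, schema, parse_storage_size("200M")))
```

An empty list means none of the listed requirements is violated; the controller remains the authority on whether table creation succeeds.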

A dimension table cannot be part of a hybrid table.

Backfill Data

Introduction

Pinot batch ingestion involves two parts: routine ingestion jobs (hourly/daily) and backfill. Here are some tutorials on how routine batch ingestion works for a Pinot offline table:

  • Batch Ingestion Overview

High Level Idea

  1. Organize raw data into buckets (e.g. /var/pinot/airlineStats/rawdata/2014/01/01). Each bucket typically contains several files (e.g. /var/pinot/airlineStats/rawdata/2014/01/01/airlineStats_data_2014-01-01_0.avro).

  2. Run a Pinot batch ingestion job that points to a specific date folder, such as '/var/pinot/airlineStats/rawdata/2014/01/01'. The segment generation job converts each Avro file into a Pinot segment for that day and gives it a unique name.

  3. Run a Pinot segment push job to upload those segments, with their unique names, via a Controller API.

IMPORTANT: The segment name uniquely identifies a segment in Pinot. If the controller receives an upload request for a segment whose name already exists, it attempts to replace the existing segment with the new one.

This newly uploaded data can now be queried in Pinot. However, sometimes users will make changes to the raw data which need to be reflected in Pinot. This process is known as 'Backfill'.

How to Backfill data in Pinot

Pinot supports data modification only at the segment level, which means a backfill must replace entire segments. The high-level idea is to repeat steps 2 (segment generation) and 3 (segment upload) mentioned above:

  • Backfill jobs must run at the same granularity as the daily job. E.g., if you need to backfill data for 2014/01/01, specify that input folder for your backfill job (e.g. '/var/pinot/airlineStats/rawdata/2014/01/01').

  • The backfill job will then generate segments with the same names as the original job (with the new data).

  • When those segments are uploaded to Pinot, the controller replaces the old segments with the new ones one by one (segment names act like primary keys within Pinot).

Edge case

Backfill jobs expect the same number of (or more) data files on the backfill date, so that the segment generation job creates at least as many segments as the original run.

E.g., assume table airlineStats has 2 segments (airlineStats_2014-01-01_2014-01-01_0, airlineStats_2014-01-01_2014-01-01_1) for date 2014/01/01 and the backfill input directory contains only 1 input file. The segment generation job then creates just one segment: airlineStats_2014-01-01_2014-01-01_0. After the segment push job, only segment airlineStats_2014-01-01_2014-01-01_0 is replaced, and the stale data in segment airlineStats_2014-01-01_2014-01-01_1 is still there.

If the raw data is modified in such a way that the original time bucket has fewer input files than the first ingestion run, the backfill will not fully replace the old data.
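The replace-by-name behavior and the edge case can be illustrated with a small simulation. This is a sketch only: a plain dictionary stands in for the controller's segment store, and the function names and payload strings are hypothetical.

```python
def push_segments(store, segments):
    """Upload segments; a segment whose name already exists is replaced."""
    for name, data in segments.items():
        store[name] = data
    return store


def stale_segment_names(original_names, backfill_names):
    """Segments from the original run that the backfill did not regenerate."""
    return sorted(set(original_names) - set(backfill_names))


# Original run: two input files produced two segments for 2014/01/01.
original = {
    "airlineStats_2014-01-01_2014-01-01_0": "original data, file 0",
    "airlineStats_2014-01-01_2014-01-01_1": "original data, file 1",
}
# Backfill run: only one input file, so only one segment is generated.
backfill = {
    "airlineStats_2014-01-01_2014-01-01_0": "corrected data, file 0",
}

store = push_segments({}, original)
store = push_segments(store, backfill)

for name in stale_segment_names(original, backfill):
    print("still serving old data:", name, "->", store[name])
```

Comparing the segment names of the two runs before pushing is one way to notice that a backfill would leave stale segments behind.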

File Systems

This section contains a collection of short guides to show you how to import from a Pinot supported file system.

FileSystem is an abstraction provided by Pinot to access data in distributed file systems (DFS).

Pinot uses distributed file systems for the following purposes:

  • Batch Ingestion Job - To read the input data (CSV, Avro, Thrift, etc.) and to write generated segments to the DFS.

  • Controller - When a segment is uploaded to the controller, the controller saves it in the configured DFS.

  • Server - When a server is notified of a new segment, it copies the segment from the remote DFS to its local node using the DFS abstraction.

Supported File Systems

Pinot lets you choose a distributed file system provider. The following file systems are supported by Pinot:

Enabling a File System

To use a distributed file system, you need to enable plugins. To do that, specify the plugin directory and include the required plugins -

Now you can change the filesystem in the controller and server configs as shown below:

scheme refers to the prefix used in the URI of the filesystem, e.g. for the URI s3://bucket/path/to/file, the scheme is s3.
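As an illustration of how the scheme maps onto the property names used in this section, here is a short sketch. The helper names are hypothetical; only the property name patterns come from the controller and server config listing in this document.

```python
from urllib.parse import urlparse


def fs_scheme(uri):
    """Return the scheme (URI prefix) that selects the PinotFS implementation."""
    return urlparse(uri).scheme


def storage_config_keys(node, scheme):
    """Property names to set for a node ('controller' or 'server') and a scheme."""
    return [
        f"pinot.{node}.storage.factory.class.{scheme}",
        f"pinot.{node}.segment.fetcher.protocols",
        f"pinot.{node}.segment.fetcher.{scheme}.class",
    ]


scheme = fs_scheme("s3://bucket/path/to/file")
for key in storage_config_keys("controller", scheme):
    print(key)
```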

You can also change the filesystem during ingestion. In the ingestion job spec, specify the filesystem with the following config:

Spark

Pinot supports Apache Spark as a processor to create and push segment files to the database. The Pinot distribution is bundled with the Spark code to process your files, convert them, and upload them to Pinot.

You can follow the wiki to build the Pinot distribution from source. The resulting JAR file can be found in pinot/target/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar

Next, you need to change the execution config in the job spec to the following -

You can check out the sample job spec here.

To run Spark ingestion, you need the following jars in your classpath

hybrid table
Batch Ingestion in Practice
Azure Data Lake Storage
Amazon S3
Google Cloud Storage
HDFS

  • pinot-batch-ingestion-spark plugin jar - available in the plugins-external directory in the package

  • pinot-all jar - available in the lib directory in the package

These jars can be specified using spark.driver.extraClassPath or any other option.

For loading any other plugins that you want to use, you can use -

The complete spark-submit command should look as follows

Please ensure environment variables PINOT_DISTRIBUTION_DIR and PINOT_VERSION are set properly.

Note: You should change the master to yarn and deploy-mode to cluster for production environments

We have stopped including the spark-core dependency in our jars after the 0.10.0 release. If you run into runtime issues, try 0.11.0-SNAPSHOT or later versions of pinot-batch-ingestion-spark. You can either build from source or download the latest master build jars.

For Pinot versions prior to 0.10.0, the Spark plugin is located in ${PINOT_DISTRIBUTION_DIR}/plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/pinot-batch-ingestion-spark-${PINOT_VERSION}-shaded.jar

    wiki
    job spec
    -Dplugins.dir=/opt/pinot/plugins -Dplugins.include=pinot-plugin-to-include-1,pinot-plugin-to-include-2
    #CONTROLLER
    
    pinot.controller.storage.factory.class.[scheme]=className of the pinot file systems
    pinot.controller.segment.fetcher.protocols=file,http,[scheme]
    pinot.controller.segment.fetcher.[scheme].class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    #SERVER
    
    pinot.server.storage.factory.class.[scheme]=className of the pinot file systems
    pinot.server.segment.fetcher.protocols=file,http,[scheme]
    pinot.server.segment.fetcher.[scheme].class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    pinotFSSpecs:
      - scheme: file
        className: org.apache.pinot.spi.filesystem.LocalPinotFS
    # executionFrameworkSpec: Defines ingestion jobs to be running.
    executionFrameworkSpec:
    
      # name: execution framework name
      name: 'spark'
    
      # segmentGenerationJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
      segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner'
    
      # segmentTarPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
      segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner'
    
      # segmentUriPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
      segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner'
    
      # segmentMetadataPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
      segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentMetadataPushJobRunner'
    
      # extraConfigs: extra configs for execution framework.
      extraConfigs:
    
        # stagingDir is used in distributed filesystem to host all the segments then move this directory entirely to output directory.
        stagingDir: your/local/dir/staging
    spark.driver.extraClassPath =>
    pinot-batch-ingestion-spark-${PINOT_VERSION}-shaded.jar:pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar
    spark.driver.extraJavaOptions =>
    -Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins
    export PINOT_VERSION=0.10.0
    export PINOT_DISTRIBUTION_DIR=/path/to/apache-pinot-${PINOT_VERSION}-bin
    
    spark-submit \
    --class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
    --master local --deploy-mode client \
    --conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins" \
    --conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark/pinot-batch-ingestion-spark-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
    --conf "spark.executor.extraClassPath=${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark/pinot-batch-ingestion-spark-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
    local://${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar -jobSpecFile /path/to/spark_job_spec.yaml

    Batch Ingestion

    Batch ingestion allows users to create a table using data already present in a file system such as S3. This is particularly useful for the cases where the user wants to utilize Pinot's ability to query large data with minimal latency or test out new features using a simple data file.

    Ingesting data from a filesystem involves the following steps -

    1. Define Schema

    2. Define Table Config

    3. Upload Schema and Table configs

    4. Upload data

    Batch ingestion currently supports the following mechanisms to upload the data -

    • Standalone

    • Hadoop

    • Spark

    Here we'll take a look at the standalone local processing to get you started.

    Let's create a table for the following CSV data source.

    Create Schema Configuration

    In our data, the only column on which aggregations can be performed is score. In addition, timestampInEpoch is the only timestamp column. So, in our schema, we keep score as a metric and timestampInEpoch as the timestamp column.

    Here, we have also defined two extra fields: format and granularity. The format specifies the formatting of our timestamp column in the data source. The timestamps are currently in milliseconds, hence we have specified 1:MILLISECONDS:EPOCH.
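To see what a value in 1:MILLISECONDS:EPOCH format represents, the following sketch converts the first timestampInEpoch value from the sample CSV in this document into a UTC datetime and back. The function names are hypothetical and the conversion is plain Python, not a Pinot API.

```python
from datetime import datetime, timezone


def epoch_millis_to_utc(value):
    """Interpret a 1:MILLISECONDS:EPOCH value as a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(value / 1000, tz=timezone.utc)


def utc_to_epoch_millis(moment):
    """Convert a timezone-aware datetime back to milliseconds since the epoch."""
    return int(moment.timestamp() * 1000)


first_row_timestamp = 1570863600000  # timestampInEpoch of the first CSV row
print(epoch_millis_to_utc(first_row_timestamp).isoformat())
```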

    Create Table Configuration

    We define a table transcript and map the schema created in the previous step to the table. For batch data, we keep the tableType as OFFLINE.

    Upload Schema and Table

    Now that we have both the configs, we can simply upload them and create a table. To achieve that, just run the command -

    Check out the table config and schema in the [Rest API] to make sure it was successfully uploaded.

    Upload data

    We now have an empty table in Pinot. As the next step, we will upload our CSV file to this table.

    A table is composed of multiple segments. Segments can be created in three ways:

    1. Minion based ingestion

    2. Upload API

    3. Ingestion jobs

    Minion Based Ingestion

    Refer to SegmentGenerationAndPushTask.

    Upload API

    There are 2 Controller APIs that can be used for a quick ingestion test using a small file.

    When these APIs are invoked, the controller has to download the file and build the segment locally. Hence, these APIs are NOT meant for production environments or for large input files.

    /ingestFromFile

    This API creates a segment using the given file and pushes it to Pinot. All steps happen on the controller. Example usage:

    To upload a JSON file data.json to a table called foo_OFFLINE, use the command below.

    Note that query params need to be URL-encoded. For example, {"inputFormat":"json"} in the command below needs to be converted to %7B%22inputFormat%22%3A%22json%22%7D.

    The batchConfigMapStr can be used to pass in additional properties needed for decoding the file. For example, in the case of CSV, you may need to provide the delimiter.
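The encoding step can be done with Python's standard library. This is a minimal sketch; encode_batch_config and decode_batch_config are hypothetical helper names, and only the property names inputFormat and recordReader.prop.delimiter come from this document.

```python
import json
from urllib.parse import quote, unquote


def encode_batch_config(config):
    """Serialize a batch config map compactly and percent-encode it for a query string."""
    compact = json.dumps(config, separators=(",", ":"))
    return quote(compact, safe="")


def decode_batch_config(encoded):
    """Reverse of encode_batch_config, useful for checking what will be sent."""
    return json.loads(unquote(encoded))


print(encode_batch_config({"inputFormat": "json"}))

csv_config = {"inputFormat": "csv", "recordReader.prop.delimiter": "|"}
print(encode_batch_config(csv_config))
```

The encoded string is what goes after batchConfigMapStr= in the request URL.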

    /ingestFromURI

    This API creates a segment using the file at the given URI and pushes it to Pinot. Properties to access the file system need to be provided in the batchConfigMap. All steps happen on the controller. Example usage:

    Ingestion Jobs

    Segments can be created and uploaded using tasks known as DataIngestionJobs. A job also needs a config of its own. We call this config the JobSpec.

    For our CSV file and table, the job spec should look like below.

    You can refer to Ingestion Job Spec for more details.

    Now that we have the job spec for our table transcript, we can trigger the job using the following command

    Once the job has successfully finished, you can head over to the [query console] and start playing with the data.

    Segment Push Job Type

    There are 3 ways to upload a Pinot segment:

    1. Segment Tar Push

    This is the original and default push mechanism.

    Tar push requires the segment to be stored locally or to be readable as an InputStream on PinotFS, so that the entire segment tar file can be streamed to the controller.

    The push job will:

    1. Upload the entire segment tar file to the Pinot controller.

    Pinot controller will:

    1. Save the segment into the controller segment directory (local or any PinotFS).

    2. Extract segment metadata.

    3. Add the segment to the table.

    2. Segment URI Push

    This push mechanism requires the segment tar file to be stored on a deep store with a globally accessible segment tar URI.

    URI push is lightweight on the client side, while the controller does the same amount of work as for tar push.

    The push job will:

    1. POST this segment Tar URI to the Pinot controller.

    Pinot controller will:

    1. Download the segment from the URI and save it to the controller segment directory (local or any PinotFS).

    2. Extract segment metadata.

    3. Add the segment to the table.

    3. Segment Metadata Push

    This push mechanism also requires the segment tar file to be stored on a deep store with a globally accessible segment tar URI.

    Metadata push is lightweight on the controller side: the controller does not download anything from the deep store.

    The push job will:

    1. Download the segment based on URI.

    2. Extract metadata.

    3. Upload metadata to the Pinot Controller.

    Pinot Controller will:

    1. Add the segment to the table based on the metadata.
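The division of work between the push job and the controller in the three mechanisms above can be summarized as data. This is only a descriptive sketch of the steps listed in this section; the PushMode class, its field names, and the dictionary keys are hypothetical and do not correspond to Pinot classes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PushMode:
    name: str
    job_sends: str                   # what the push job transfers to the controller
    needs_deep_store_uri: bool       # segment tar must already sit on a deep store
    controller_handles_tar: bool     # controller receives or downloads the full tar


PUSH_MODES = {
    "tar": PushMode("Segment Tar Push", "segment tar file", False, True),
    "uri": PushMode("Segment URI Push", "segment tar URI", True, True),
    "metadata": PushMode("Segment Metadata Push", "segment metadata", True, False),
}


def modes_light_on_controller():
    """Names of push modes where the controller never touches the full segment tar."""
    return [mode.name for mode in PUSH_MODES.values() if not mode.controller_handles_tar]


print(modes_light_on_controller())
```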

    Segment Fetchers

    When Pinot segment files are created in external systems (Hadoop, Spark, etc.), there are several ways to push that data to the Pinot controller and servers:

    1. Push the segment to a shared NFS and let Pinot pull segment files from that NFS location. See Segment URI Push.

    2. Push the segment to a web server and let Pinot pull segment files from the web server via an HTTP/HTTPS link. See Segment URI Push.

    3. Push the segment to PinotFS (HDFS/S3/GCS/ADLS) and let Pinot pull segment files from the PinotFS URI. See Segment URI Push and Segment Metadata Push.

    The first three options are supported out of the box within the Pinot package. As long as your remote jobs send the Pinot controller the corresponding URI to the files, it will pick up the files and allocate them to the proper Pinot servers and brokers. To enable Pinot support for PinotFS, you will need to provide the PinotFS configuration and the proper Hadoop dependencies.

    Persistence

    By default, Pinot does not come with a storage layer, so the data you send will not be retained after a system crash. To store the generated segments persistently, change the controller and server configs to add deep storage. Check out File systems for all the details and related configs.

    Tuning

    Standalone

    Since Pinot is written in Java, you can set the following basic Java configurations to tune the segment runner job -

    • Log4j2 file location with -Dlog4j2.configurationFile

    • Plugin directory location with -Dplugins.dir=/opt/pinot/plugins

    • JVM props, like -Xmx8g -Xms4G

    If you are using Docker, you can set these options in the JAVA_OPTS variable.

    Hadoop

    You can set -D mapreduce.map.memory.mb=8192 to set the mapper memory size when submitting the Hadoop job.

    Spark

    You can add config spark.executor.memory to tune the memory usage for segment creation when submitting the Spark job.

    Google Cloud Storage

    This guide shows you how to import data from GCP (Google Cloud Platform).

    You can enable Google Cloud Storage using the plugin pinot-gcs. In the controller or server, add the config -

    By default Pinot loads all the plugins, so you can just drop this plugin there. Also, if you specify -Dplugins.include, you need to list all the plugins you want to use, e.g. pinot-json, pinot-avro, pinot-kafka-2.0...

    The GCS file system provides the following options -

    • projectId - The name of the Google Cloud Platform project under which you have created your storage bucket.

    • gcpKey - Location of the JSON file containing GCP keys. You can refer to Creating and managing service account keys to download the keys.

    Each of these properties should be prefixed by pinot.[node].storage.factory.class.gs. where node is either controller or server depending on the config

    e.g.

    Examples

    Job spec

    Controller config

    Server config

    Minion config

    Amazon Kinesis

    To ingest events from an Amazon Kinesis stream into Pinot, set the following configs into the table config

    where the Kinesis specific properties are:

    Property
    Description

    Apache Pulsar

    Pinot supports consuming data from Apache Pulsar via the pinot-pulsar plugin. You need to enable this plugin so that Pulsar-specific libraries are present in the classpath.

    You can enable the Pulsar plugin with the following config at the time of Pinot setup: -Dplugins.include=pinot-pulsar

    The pinot-pulsar plugin is not part of the official 0.10.0 binary. You can download the plugin from our external repository.

    Hadoop

    Segment Creation and Push

    Pinot supports Apache Hadoop as a processor to create and push segment files to the database. The Pinot distribution is bundled with the Hadoop code to process your files, convert them, and upload them to Pinot.

    You can follow the [wiki] to build the Pinot distribution from source. The resulting JAR file can be found in pinot/target/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar

    Azure Data Lake Storage

    This guide shows you how to import data from files stored in Azure Data Lake Storage Gen2 (ADLS Gen2).

    You can enable Azure Data Lake Storage using the plugin pinot-adls. In the controller or server, add the config -

    By default Pinot loads all the plugins, so you can just drop this plugin there. Also, if you specify -Dplugins.include, you need to list all the plugins you want to use, e.g. pinot-json, pinot-avro, pinot-kafka-2.0...

    -Dplugins.dir=/opt/pinot/plugins -Dplugins.include=pinot-gcs

    Push segment to other systems and implement your own segment fetcher to pull data from those systems.

    Hadoop
    Spark
    SegmentGenerationAndPushTask
    Ingestion Job Spec
    Segment URI Push
    Segment URI Push
    Segment URI Push
    Segment Metadata Push
    PinotFS
    File systems
    Creating and managing service account keys

    Kinesis region e.g. us-west-1

    • accessKey - Kinesis access key

    • secretKey - Kinesis secret key

    • shardIteratorType - Set to LATEST to consume only new records, TRIM_HORIZON for the earliest sequence number, or AT_SEQUENCE_NUMBER and AFTER_SEQUENCE_NUMBER to start consumption from a particular sequence number

    • maxRecordsToFetch - ... Default is 20.

    Kinesis supports authentication using the DefaultCredentialsProviderChain. The credential provider looks for the credentials in the following order -

    • Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (RECOMMENDED since they are recognized by all the AWS SDKs and CLI except for .NET), or AWS_ACCESS_KEY and AWS_SECRET_KEY (only recognized by Java SDK)

    • Java System Properties - aws.accessKeyId and aws.secretKey

    • Web Identity Token credentials from the environment or container

    • Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI

    • Credentials delivered through the Amazon EC2 container service if the AWS_CONTAINER_CREDENTIALS_RELATIVE_URI environment variable is set and the security manager has permission to access the variable

    • Instance profile credentials delivered through the Amazon EC2 metadata service

    You can also specify the accessKey and secretKey using the properties. However, this method is not secure and should be used only for POC setups. You can also specify other AWS fields, such as AWS_SESSION_TOKEN, as environment variables or in the config.

    Limitations

    1. ShardID is of the format "shardId-000000000001". We use the numeric part as partitionId. Our partitionId variable is an integer, so if shardIds grow beyond Integer.MAX_VALUE, it will overflow.

    2. Segment-size-based thresholds for segment completion will not work. They assume that partition "0" always exists. However, once shard 0 is split or merged, partition 0 no longer exists.
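The first limitation can be made concrete with a short sketch of the shard ID to partition ID mapping it describes. This is an illustration in Python, not the connector's code; the function names are hypothetical, and INT_MAX mirrors Java's Integer.MAX_VALUE.

```python
INT_MAX = 2**31 - 1  # Java Integer.MAX_VALUE


def shard_number(shard_id):
    """Extract the numeric part of an ID such as 'shardId-000000000001'."""
    prefix, _, digits = shard_id.partition("-")
    if prefix != "shardId" or not digits.isdigit():
        raise ValueError(f"unexpected shard id format: {shard_id!r}")
    return int(digits)


def fits_in_partition_id(shard_id):
    """True when the numeric part can be stored in a 32-bit signed integer."""
    return shard_number(shard_id) <= INT_MAX


print(shard_number("shardId-000000000001"))
print(fits_in_partition_id("shardId-2147483648"))
```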

    • streamType - This should be set to "kinesis"

    • stream.kinesis.topic.name - Kinesis stream name

    region

    and add it to libs or plugins directory in pinot.

    Set up Pulsar table

    A sample Pulsar stream config to ingest data should look as follows. You can use the streamConfigs section from this sample and make changes for your corresponding table.

    Pulsar configuration options

    You can change the following Pulsar-specific configurations for your tables:

    • streamType - This should be set to "pulsar"

    • stream.pulsar.topic.name - Your Pulsar topic name

    • stream.pulsar.bootstrap.servers - Comma-separated broker list for Apache Pulsar

    Authentication

    The Pinot-Pulsar connector supports authentication using security tokens. You can generate the token by following the official Pulsar documentation. Once generated, you can add the following property to streamConfigs to send the auth token with each request

    TLS support

    The Pinot-Pulsar connector also supports TLS for encrypted connections. You can follow the official Pulsar documentation to enable TLS on your Pulsar cluster. Once done, you can enable TLS in the Pulsar connector by providing the location of the trust certificate file generated in the previous step.

    Also, make sure to change the broker URL from pulsar://localhost:6650 to pulsar+ssl://localhost:6650 so that secure connections are used.
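The scheme change described above can be sketched as a small helper. The function name to_tls_broker_url is hypothetical; it only rewrites the URL prefix and does not configure TLS itself.

```python
def to_tls_broker_url(broker_url):
    """Rewrite a pulsar:// broker URL to its pulsar+ssl:// form."""
    scheme, separator, rest = broker_url.partition("://")
    if not separator:
        raise ValueError(f"not a broker URL: {broker_url!r}")
    if scheme == "pulsar+ssl":
        return broker_url  # already using the secure scheme
    if scheme != "pulsar":
        raise ValueError(f"unexpected scheme: {scheme!r}")
    return "pulsar+ssl://" + rest


print(to_tls_broker_url("pulsar://localhost:6650"))
```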

    For other table and stream configurations, you can head over to the Table configuration Reference.

    Supported Pulsar versions

    Pinot currently relies on Pulsar client version 2.7.2. Users should make sure the Pulsar broker is compatible with this client version.

    Apache Pulsar
    our external repository
    Next, you need to change the execution config in the job spec to the following -

    You can check out the sample job spec here.

    Finally, execute the Hadoop job using the command -

    Please ensure environment variables PINOT_ROOT_DIR and PINOT_VERSION are set properly.

    Data Preprocessing before Segment Creation

    We've seen some requests that data be massaged (e.g. partitioned, sorted, resized) before creating and pushing segments to Pinot.

    The MapReduce job called SegmentPreprocessingJob would be the best fit for this use case, regardless of whether the input data is in Avro or ORC format.

    Check the below example to see how to use SegmentPreprocessingJob.

    In Hadoop properties, set the following to enable this job:

    In table config, specify the operations in preprocessing.operations that you'd like to enable in the MR job, and then specify the exact configs regarding those operations:

    preprocessing.num.reducers

    Minimum number of reducers. Optional. Fetched when partitioning gets disabled and resizing is enabled. This parameter avoids producing too many small input files for Pinot, which would leave Pinot servers holding too many small segments and using too many threads.

    preprocessing.max.num.records.per.file

    Maximum number of records per reducer. Optional. Unlike "preprocessing.num.reducers", this parameter avoids producing too few large input files for Pinot, which would miss out on the advantage of multi-threading when querying. When not set, each reducer generates one output file. When set (e.g. to M), the original output file is split into multiple files, each containing at most M records. It does not matter whether partitioning is enabled or not.
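The splitting rule can be illustrated with a short sketch. This is not the MapReduce job's implementation; split_reducer_output is a hypothetical function that only mirrors the "at most M records per file" behavior described above.

```python
def split_reducer_output(records, max_records_per_file=None):
    """Split one reducer's records into output files of at most M records each.

    With no limit set, the reducer produces a single output file.
    """
    records = list(records)
    if max_records_per_file is None:
        return [records]
    if max_records_per_file <= 0:
        raise ValueError("max_records_per_file must be positive")
    return [
        records[start:start + max_records_per_file]
        for start in range(0, len(records), max_records_per_file)
    ]


rows = list(range(10))
print([len(chunk) for chunk in split_reducer_output(rows)])
print([len(chunk) for chunk in split_reducer_output(rows, max_records_per_file=4)])
```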

    For more details on this MR job, please refer to this document.

    Apache Hadoop

    Azure Blob Storage provides the following options -

    • accountName - Name of the Azure account under which the storage is created

    • accessKey - Access key required for authentication

    • fileSystemName - Name of the filesystem to use, i.e. the container name (a container name is similar to a bucket name in S3)

    • enableChecksum - Enable MD5 checksum for verification. Default is false.

    Each of these properties should be prefixed by pinot.[node].storage.factory.class.adl2. where node is either controller or server depending on the config

    e.g.

    Examples

    Job spec

    Controller config

    Server config

    Minion config

    studentID,firstName,lastName,gender,subject,score,timestampInEpoch
    200,Lucy,Smith,Female,Maths,3.8,1570863600000
    200,Lucy,Smith,Female,English,3.5,1571036400000
    201,Bob,King,Male,Maths,3.2,1571900400000
    202,Nick,Young,Male,Physics,3.6,1572418800000
    {
      "schemaName": "transcript",
      "dimensionFieldSpecs": [
        {
          "name": "studentID",
          "dataType": "INT"
        },
        {
          "name": "firstName",
          "dataType": "STRING"
        },
        {
          "name": "lastName",
          "dataType": "STRING"
        },
        {
          "name": "gender",
          "dataType": "STRING"
        },
        {
          "name": "subject",
          "dataType": "STRING"
        }
      ],
      "metricFieldSpecs": [
        {
          "name": "score",
          "dataType": "FLOAT"
        }
      ],
      "dateTimeFieldSpecs": [{
        "name": "timestampInEpoch",
        "dataType": "LONG",
        "format" : "1:MILLISECONDS:EPOCH",
        "granularity": "1:MILLISECONDS"
      }]
    }
    {
      "tableName": "transcript",
      "tableType": "OFFLINE",
      "segmentsConfig": {
        "replication": 1,
        "timeColumnName": "timestampInEpoch",
        "timeType": "MILLISECONDS",
        "retentionTimeUnit": "DAYS",
        "retentionTimeValue": 365
      },
      "tenants": {
        "broker":"DefaultTenant",
        "server":"DefaultTenant"
      },
      "tableIndexConfig": {
        "loadMode": "MMAP"
      },
      "ingestionConfig": {
        "batchIngestionConfig": {
          "segmentIngestionType": "APPEND",
          "segmentIngestionFrequency": "DAILY"
        }
      },
      "metadata": {}
    }
    bin/pinot-admin.sh AddTable \
      -tableConfigFile /path/to/table-config.json \
      -schemaFile /path/to/table-schema.json -exec
    curl -X POST -F file=@data.json \
      -H "Content-Type: multipart/form-data" \
      "http://localhost:9000/ingestFromFile?tableNameWithType=foo_OFFLINE&
      batchConfigMapStr={"inputFormat":"json"}"
    curl -X POST -F file=@data.csv \
      -H "Content-Type: multipart/form-data" \
      "http://localhost:9000/ingestFromFile?tableNameWithType=foo_OFFLINE&
    batchConfigMapStr={
      "inputFormat":"csv",
      "recordReader.prop.delimiter":"|"
    }"
    curl -X POST "http://localhost:9000/ingestFromURI?tableNameWithType=foo_OFFLINE
    &batchConfigMapStr={
      "inputFormat":"json",
      "input.fs.className":"org.apache.pinot.plugin.filesystem.S3PinotFS",
      "input.fs.prop.region":"us-central",
      "input.fs.prop.accessKey":"foo",
      "input.fs.prop.secretKey":"bar"
    }
    &sourceURIStr=s3://test.bucket/path/to/json/data/data.json"
    executionFrameworkSpec:
      name: 'standalone'
      segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
      segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
      segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
      segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner'
    
    # Recommended to set jobType to SegmentCreationAndMetadataPush for production environment where Pinot Deep Store is configured  
    jobType: SegmentCreationAndTarPush
    
    inputDirURI: '/tmp/pinot-quick-start/rawdata/'
    includeFileNamePattern: 'glob:**/*.csv'
    outputDirURI: '/tmp/pinot-quick-start/segments/'
    overwriteOutput: true
    pinotFSSpecs:
      - scheme: file
        className: org.apache.pinot.spi.filesystem.LocalPinotFS
    recordReaderSpec:
      dataFormat: 'csv'
      className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
      configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
    tableSpec:
      tableName: 'transcript'
    pinotClusterSpecs:
      - controllerURI: 'http://localhost:9000'
    pushJobSpec:
      pushAttempts: 2
      pushRetryIntervalMillis: 1000
    bin/pinot-admin.sh LaunchDataIngestionJob \
        -jobSpecFile /tmp/pinot-quick-start/batch-job-spec.yaml
    pinot.controller.storage.factory.gs.projectId=test-project
    executionFrameworkSpec:
        name: 'standalone'
        segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
        segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
        segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
    jobType: SegmentCreationAndTarPush
    inputDirURI: 'gs://my-bucket/path/to/input/directory/'
    outputDirURI: 'gs://my-bucket/path/to/output/directory/'
    overwriteOutput: true
    pinotFSSpecs:
        - scheme: gs
          className: org.apache.pinot.plugin.filesystem.GcsPinotFS
          configs:
            projectId: 'my-project'
            gcpKey: 'path-to-gcp json key file'
    recordReaderSpec:
        dataFormat: 'csv'
        className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
        configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
    tableSpec:
        tableName: 'students'
    pinotClusterSpecs:
        - controllerURI: 'http://localhost:9000'
    controller.data.dir=gs://path/to/data/directory/
    controller.local.temp.dir=/path/to/local/temp/directory
    controller.enable.split.commit=true
    pinot.controller.storage.factory.class.gs=org.apache.pinot.plugin.filesystem.GcsPinotFS
    pinot.controller.storage.factory.gs.projectId=my-project
    pinot.controller.storage.factory.gs.gcpKey=path/to/gcp/key.json
    pinot.controller.segment.fetcher.protocols=file,http,gs
    pinot.controller.segment.fetcher.gs.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    pinot.server.instance.enable.split.commit=true
    pinot.server.storage.factory.class.gs=org.apache.pinot.plugin.filesystem.GcsPinotFS
    pinot.server.storage.factory.gs.projectId=my-project
    pinot.server.storage.factory.gs.gcpKey=path/to/gcp/key.json
    pinot.server.segment.fetcher.protocols=file,http,gs
    pinot.server.segment.fetcher.gs.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    pinot.minion.storage.factory.class.gs=org.apache.pinot.plugin.filesystem.GcsPinotFS
    pinot.minion.storage.factory.gs.projectId=my-project
    pinot.minion.storage.factory.gs.gcpKey=path/to/gcp/key.json
    pinot.minion.segment.fetcher.protocols=file,http,gs
    pinot.minion.segment.fetcher.gs.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    {
      "tableName": "kinesisTable",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "timeColumnName": "timestamp",
        "replicasPerPartition": "1"
      },
      "tenants": {},
      "tableIndexConfig": {
        "loadMode": "MMAP",
        "streamConfigs": {
          "streamType": "kinesis",
          "stream.kinesis.topic.name": "<your kinesis stream name>",
          "region": "<your region>",
          "accessKey": "<your access key>",
          "secretKey": "<your secret key>",
          "shardIteratorType": "AFTER_SEQUENCE_NUMBER",
          "stream.kinesis.consumer.type": "lowlevel",
          "stream.kinesis.fetch.timeout.millis": "30000",
          "stream.kinesis.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
          "stream.kinesis.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
          "realtime.segment.flush.threshold.rows": "1000000",
          "realtime.segment.flush.threshold.time": "6h"
        }
      },
      "metadata": {
        "customConfigs": {}
      }
    }
    {
      "tableName": "pulsarTable",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "timeColumnName": "timestamp",
        "replicasPerPartition": "1"
      },
      "tenants": {},
      "tableIndexConfig": {
        "loadMode": "MMAP",
        "streamConfigs": {
          "streamType": "pulsar",
          "stream.pulsar.topic.name": "<your pulsar topic name>",
          "stream.pulsar.bootstrap.servers": "pulsar://localhost:6650,pulsar://localhost:6651",
          "stream.pulsar.consumer.prop.auto.offset.reset" : "smallest",
          "stream.pulsar.consumer.type": "lowlevel",
          "stream.pulsar.fetch.timeout.millis": "30000",
          "stream.pulsar.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
          "stream.pulsar.consumer.factory.class.name": "org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
          "realtime.segment.flush.threshold.rows": "1000000",
          "realtime.segment.flush.threshold.time": "6h"
        }
      },
      "metadata": {
        "customConfigs": {}
      }
    }
    "stream.pulsar.authenticationToken":"your-auth-token"
    "stream.pulsar.tlsTrustCertsFilePath": "/path/to/ca.cert.pem"
    # executionFrameworkSpec: Defines ingestion jobs to be running.
    executionFrameworkSpec:
    
        # name: execution framework name
      name: 'hadoop'
    
      # segmentGenerationJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
      segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunner'
    
      # segmentTarPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
      segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentTarPushJobRunner'
    
      # segmentUriPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
      segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentUriPushJobRunner'
    
      # segmentMetadataPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
      segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentMetadataPushJobRunner'
    
        # extraConfigs: extra configs for execution framework.
      extraConfigs:
    
        # stagingDir is used in distributed filesystem to host all the segments then move this directory entirely to output directory.
        stagingDir: your/local/dir/staging
    export PINOT_VERSION=0.10.0
    export PINOT_DISTRIBUTION_DIR=${PINOT_ROOT_DIR}/build/
    export HADOOP_CLIENT_OPTS="-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml"
    
    hadoop jar \
            ${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \
            org.apache.pinot.tools.admin.PinotAdministrator \
            LaunchDataIngestionJob \
            -jobSpecFile ${PINOT_DISTRIBUTION_DIR}/examples/batch/airlineStats/hadoopIngestionJobSpec.yaml
    enable.preprocessing = true
    preprocess.path.to.output = <output_path>
    {
        "OFFLINE": {
            "metadata": {
                "customConfigs": {
                    "preprocessing.operations": "resize, partition, sort", // To enable the following preprocessing operations
                    "preprocessing.max.num.records.per.file": "100",       // To enable resizing
                    "preprocessing.num.reducers": "3"                      // To enable resizing
                }
            },
            ...
            "tableIndexConfig": {
                "aggregateMetrics": false,
                "autoGeneratedInvertedIndex": false,
                "bloomFilterColumns": [],
                "createInvertedIndexDuringSegmentGeneration": false,
                "invertedIndexColumns": [],
                "loadMode": "MMAP",
                "nullHandlingEnabled": false,
                "segmentPartitionConfig": {       // To enable partitioning
                    "columnPartitionMap": {
                        "item": {
                            "functionName": "murmur",
                            "numPartitions": 4
                        }
                    }
                },
                "sortedColumn": [                // To enable sorting
                    "actorId"
                ],
                "streamConfigs": {}
            },
            "tableName": "tableName_OFFLINE",
            "tableType": "OFFLINE",
            "tenants": {
                ...
            }
        }
    }
    -Dplugins.dir=/opt/pinot/plugins -Dplugins.include=pinot-adls
    pinot.controller.storage.factory.adl2.accountName=test-user
    executionFrameworkSpec:
        name: 'standalone'
        segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
        segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
        segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
    jobType: SegmentCreationAndTarPush
    inputDirURI: 'abfs://path/to/input/directory/'
    outputDirURI: 'abfs://path/to/output/directory/'
    overwriteOutput: true
    pinotFSSpecs:
        - scheme: adl2
          className: org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS
          configs:
            accountName: 'my-account'
            accessKey: 'foo-bar-1234'
            fileSystemName: 'fs-name'
    recordReaderSpec:
        dataFormat: 'csv'
        className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
        configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
    tableSpec:
        tableName: 'students'
    pinotClusterSpecs:
        - controllerURI: 'http://localhost:9000'
    controller.data.dir=abfs://path/to/data/directory/
    controller.local.temp.dir=/path/to/local/temp/directory
    controller.enable.split.commit=true
    pinot.controller.storage.factory.class.adl2=org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS
    pinot.controller.storage.factory.adl2.accountName=my-account
    pinot.controller.storage.factory.adl2.accessKey=foo-bar-1234
    pinot.controller.storage.factory.adl2.fileSystemName=fs-name
    pinot.controller.segment.fetcher.protocols=file,http,adl2
    pinot.controller.segment.fetcher.adl2.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    pinot.server.instance.enable.split.commit=true
    pinot.server.storage.factory.class.adl2=org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS
    pinot.server.storage.factory.adl2.accountName=my-account
    pinot.server.storage.factory.adl2.accessKey=foo-bar-1234
    pinot.server.storage.factory.adl2.fileSystemName=fs-name
    pinot.server.segment.fetcher.protocols=file,http,adl2
    pinot.server.segment.fetcher.adl2.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    storage.factory.class.adl2=org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS
    storage.factory.adl2.accountName=my-account
    storage.factory.adl2.fileSystemName=fs-name
    storage.factory.adl2.accessKey=foo-bar-1234
    segment.fetcher.protocols=file,http,adl2
    segment.fetcher.adl2.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher

    Amazon S3

    You can enable the Amazon S3 filesystem backend by including the pinot-s3 plugin.

    circle-info

    By default, Pinot loads all the plugins, so you can just drop this plugin into the plugins directory. If you specify -Dplugins.include, you must list every plugin you want to use, e.g. pinot-json, pinot-avro, pinot-kafka-2.0...

    You can also configure the S3 filesystem using the following options:

    Prefix each of these properties with pinot.[node].storage.factory.s3., where node is either controller or server, depending on the config.

    e.g.

    The S3 filesystem supports authentication using the DefaultCredentialsProviderChain. The credential provider looks for credentials in the following order:

    • Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (RECOMMENDED since they are recognized by all the AWS SDKs and CLI except for .NET), or AWS_ACCESS_KEY and AWS_SECRET_KEY (only recognized by Java SDK)

    • Java System Properties - aws.accessKeyId and aws.secretKey

    You can also specify the accessKey and secretKey using the properties. However, this method is not secure and should be used only for POC setups.
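The prefix rule above can be sketched as a small helper that renders filesystem options as property lines. This is a minimal illustration, not part of Pinot: the function name `storage_factory_properties` is hypothetical, and accepting `minion` as a node type is an assumption based on the minion config examples in this guide.

```python
def storage_factory_properties(node: str, scheme: str, options: dict[str, str]) -> list[str]:
    """Render options as `pinot.<node>.storage.factory.<scheme>.<key>=<value>` lines."""
    # `minion` is accepted here as an assumption; the text names controller and server.
    if node not in ("controller", "server", "minion"):
        raise ValueError(f"unexpected node type: {node!r}")
    prefix = f"pinot.{node}.storage.factory.{scheme}."
    return [f"{prefix}{key}={value}" for key, value in options.items()]


# Example: the region option for a controller using the S3 filesystem.
lines = storage_factory_properties("controller", "s3", {"region": "ap-southeast-1"})
print("\n".join(lines))
```

Keeping the options in a plain mapping makes it easy to emit the same settings for each node type without retyping the prefix.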

    hashtag
    Examples

    hashtag
    Job spec

    hashtag
    Controller config

    hashtag
    Server config

    hashtag
    Minion config

    HDFS

    This guide shows you how to import data from HDFS.

    You can enable the Hadoop DFS filesystem using the pinot-hdfs plugin. In the controller or server, add the config:

    circle-info

    By default, Pinot loads all the plugins, so you can just drop this plugin into the plugins directory. If you specify -Dplugins.include, you must list every plugin you want to use, e.g. pinot-json, pinot-avro, pinot-kafka-2.0...

    -Dplugins.dir=/opt/pinot/plugins -Dplugins.include=pinot-s3

    S3 filesystem configuration options (Configuration - Description):

    region - The AWS data center region in which the bucket is located.

    accessKey - (Optional) AWS access key required for authentication. Use this only for testing purposes, because these keys are not stored as secrets.

    secretKey - (Optional) AWS secret key required for authentication. Use this only for testing purposes, because these keys are not stored as secrets.

    endpoint - (Optional) Override endpoint for the S3 client.

    disableAcl - If this is set to false, the bucket owner is granted full access to the objects created by Pinot. Default value is true.

    serverSideEncryption - (Optional) The server-side encryption algorithm used when storing this object in Amazon S3 (currently supports aws:kms). Set to null to disable SSE.

    ssekmsKeyId - (Optional, but required when serverSideEncryption=aws:kms) Specifies the AWS KMS key ID to use for object encryption. All GET and PUT requests for an object protected by AWS KMS will fail if not made via SSL or using SigV4.

    ssekmsEncryptionContext - (Optional) Specifies the AWS KMS Encryption Context to use for object encryption. The value of this header is a base64-encoded UTF-8 string holding JSON with the encryption context key-value pairs.

    The S3 credential lookup order (DefaultCredentialsProviderChain) continues with:

  • Web Identity Token credentials from the environment or container

  • Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI

  • Credentials delivered through the Amazon EC2 container service if the AWS_CONTAINER_CREDENTIALS_RELATIVE_URI environment variable is set and the security manager has permission to access the variable

  • Instance profile credentials delivered through the Amazon EC2 metadata service

    The HDFS implementation provides the following options:

    • hadoop.conf.path: Absolute path of the directory containing Hadoop XML configuration files, such as hdfs-site.xml and core-site.xml.

    • hadoop.write.checksum: Create a checksum while pushing an object. Default is false.

    • hadoop.kerberos.principle

    • hadoop.kerberos.keytab

    Prefix each of these properties with pinot.[node].storage.factory.hdfs., where node is either controller or server, depending on the config.

    The Kerberos configs should be used only if your Hadoop installation is secured with Kerberos. See the Hadoop Kerberos guide for how to generate Kerberos security identification.

    You will also need to provide proper Hadoop dependencies jars from your Hadoop installation to your Pinot startup scripts.

    hashtag
    Push HDFS segment to Pinot Controller

    To push HDFS segment files to the Pinot controller, ensure you have the proper Hadoop configuration described in the previous part. Your remote segment creation/push job can then send the HDFS path of the newly created segment files to the Pinot controller and let it download the files.

    For example, the following curl request to the controller notifies it to download segment files to the proper table:

    hashtag
    Examples

    hashtag
    Job spec

    Standalone Job:

    Hadoop Job:

    hashtag
    Controller config

    hashtag
    Server config

    hashtag
    Minion config

    Hadoop DFS
    pinot.controller.storage.factory.s3.region=ap-southeast-1
    executionFrameworkSpec:
        name: 'standalone'
        segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
        segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
        segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
    jobType: SegmentCreationAndTarPush
    inputDirURI: 's3://pinot-bucket/pinot-ingestion/batch-input/'
    outputDirURI: 's3://pinot-bucket/pinot-ingestion/batch-output/'
    overwriteOutput: true
    pinotFSSpecs:
        - scheme: s3
          className: org.apache.pinot.plugin.filesystem.S3PinotFS
          configs:
            region: 'ap-southeast-1'
    recordReaderSpec:
        dataFormat: 'csv'
        className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
        configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
    tableSpec:
        tableName: 'students'
    pinotClusterSpecs:
        - controllerURI: 'http://localhost:9000'
    controller.data.dir=s3://path/to/data/directory/
    controller.local.temp.dir=/path/to/local/temp/directory
    controller.enable.split.commit=true
    pinot.controller.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
    pinot.controller.storage.factory.s3.region=ap-southeast-1
    pinot.controller.segment.fetcher.protocols=file,http,s3
    pinot.controller.segment.fetcher.s3.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    pinot.server.instance.enable.split.commit=true
    pinot.server.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
    pinot.server.storage.factory.s3.region=ap-southeast-1
    pinot.server.segment.fetcher.protocols=file,http,s3
    pinot.server.segment.fetcher.s3.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    pinot.minion.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
    pinot.minion.storage.factory.s3.region=ap-southeast-1
    pinot.minion.segment.fetcher.protocols=file,http,s3
    pinot.minion.segment.fetcher.s3.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    -Dplugins.dir=/opt/pinot/plugins -Dplugins.include=pinot-hdfs
    export HADOOP_HOME=/local/hadoop/
    export HADOOP_VERSION=2.7.1
    export HADOOP_GUAVA_VERSION=11.0.2
    export HADOOP_GSON_VERSION=2.2.4
    export CLASSPATH_PREFIX="${HADOOP_HOME}/share/hadoop/hdfs/hadoop-hdfs-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/common/lib/hadoop-annotations-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/common/lib/hadoop-auth-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/common/hadoop-common-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/common/lib/guava-${HADOOP_GUAVA_VERSION}.jar:${HADOOP_HOME}/share/hadoop/common/lib/gson-${HADOOP_GSON_VERSION}.jar"
    curl -X POST -H "UPLOAD_TYPE:URI" -H "DOWNLOAD_URI:hdfs://nameservice1/hadoop/path/to/segment/file.
    executionFrameworkSpec:
        name: 'standalone'
        segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
        segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
        segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
    jobType: SegmentCreationAndTarPush
    inputDirURI: 'hdfs:///path/to/input/directory/'
    outputDirURI: 'hdfs:///path/to/output/directory/'
    includeFileNamePattern: 'glob:**/*.csv'
    overwriteOutput: true
    pinotFSSpecs:
        - scheme: hdfs
          className: org.apache.pinot.plugin.filesystem.HadoopPinotFS
          configs:
            hadoop.conf.path: 'path/to/conf/directory/' 
    recordReaderSpec:
        dataFormat: 'csv'
        className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
        configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
    tableSpec:
        tableName: 'students'
    pinotClusterSpecs:
        - controllerURI: 'http://localhost:9000'
    executionFrameworkSpec:
        name: 'hadoop'
        segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunner'
        segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentTarPushJobRunner'
        segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentUriPushJobRunner'
        extraConfigs:
          stagingDir: 'hdfs:///path/to/staging/directory/'
    jobType: SegmentCreationAndTarPush
    inputDirURI: 'hdfs:///path/to/input/directory/'
    outputDirURI: 'hdfs:///path/to/output/directory/'
    includeFileNamePattern: 'glob:**/*.csv'
    overwriteOutput: true
    pinotFSSpecs:
        - scheme: hdfs
          className: org.apache.pinot.plugin.filesystem.HadoopPinotFS
          configs:
            hadoop.conf.path: '/etc/hadoop/conf/' 
    recordReaderSpec:
        dataFormat: 'csv'
        className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
        configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
    tableSpec:
        tableName: 'students'
    pinotClusterSpecs:
        - controllerURI: 'http://localhost:9000'
    controller.data.dir=hdfs://path/to/data/directory/
    controller.local.temp.dir=/path/to/local/temp/directory
    controller.enable.split.commit=true
    pinot.controller.storage.factory.class.hdfs=org.apache.pinot.plugin.filesystem.HadoopPinotFS
    pinot.controller.storage.factory.hdfs.hadoop.conf.path=path/to/conf/directory/
    pinot.controller.segment.fetcher.protocols=file,http,hdfs
    pinot.controller.segment.fetcher.hdfs.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    pinot.controller.segment.fetcher.hdfs.hadoop.kerberos.principle=<your kerberos principal>
    pinot.controller.segment.fetcher.hdfs.hadoop.kerberos.keytab=<your kerberos keytab>
    pinot.server.instance.enable.split.commit=true
    pinot.server.storage.factory.class.hdfs=org.apache.pinot.plugin.filesystem.HadoopPinotFS
    pinot.server.storage.factory.hdfs.hadoop.conf.path=path/to/conf/directory/
    pinot.server.segment.fetcher.protocols=file,http,hdfs
    pinot.server.segment.fetcher.hdfs.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    pinot.server.segment.fetcher.hdfs.hadoop.kerberos.principle=<your kerberos principal>
    pinot.server.segment.fetcher.hdfs.hadoop.kerberos.keytab=<your kerberos keytab>
    storage.factory.class.hdfs=org.apache.pinot.plugin.filesystem.HadoopPinotFS
    storage.factory.hdfs.hadoop.conf.path=path/to/conf/directory
    segment.fetcher.protocols=file,http,hdfs
    segment.fetcher.hdfs.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    segment.fetcher.hdfs.hadoop.kerberos.principle=<your kerberos principal>
    segment.fetcher.hdfs.hadoop.kerberos.keytab=<your kerberos keytab>

    Complex Type (Array, Map) Handling

    Complex-type handling in Apache Pinot.

    It's common for ingested data to have a complex structure. For example, Avro schemas have records and arrays, and JSON data has objects and arrays. The Apache Pinot data model supports primitive data types (including int, long, float, double, string, bytes), as well as limited multi-value types such as an array of primitive types. Such simple data types allow Pinot to build fast indexing structures for good query performance, but complex structures require some handling. There are two general options: convert the complex-type data into a JSON string and then build a JSON index, or use the built-in complex-type handling rules in the ingestion config.

    On this page, we show how to handle complex-type structures with these two approaches, using the example data in the following figure, which is a field group from the Meetup events Quickstart example. Note that this object has two child fields, and the child group is a nested array with elements of object type.

    Example JSON data

    hashtag
    Handle the complex type with JSON indexing

    Apache Pinot provides a powerful JSON index to accelerate value lookup and filtering for the column. To convert an object group with complex type to JSON, add the following config to the table config.

    Note that the transformConfigs config transforms the object group into a JSON string group_json, and the jsonIndexColumns config then creates the JSON index on it. To read the full spec, see this file. Also note that group is a reserved keyword in SQL and therefore needs to be quoted in transformFunction.

    circle-info

    The columnName can't use the same name as any of the fields in the source JSON data e.g. if our source data contains the field group and we want to transform the data in that field before persisting it, the destination column name would need to be something different, like group_json.

    Additionally, you need to override the maxLength of the field group_json in the schema because, by default, a string column has a limited length. For example,

    For the full spec, see this file.
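The two pieces described above can be sketched as follows: a table config fragment that derives group_json and indexes it, and a schema field that raises maxLength. This is a minimal sketch built as Python dictionaries and printed as JSON. The `jsonFormat` transform, the `STRING` data type, and the maxLength value are assumptions for illustration, and `colliding_columns` is a hypothetical helper that checks the naming rule from the note.

```python
import json

# Table config fragment: derive a JSON string column from the source field
# `group`, then build a JSON index on the derived column.
table_config_fragment = {
    "ingestionConfig": {
        "transformConfigs": [
            {
                "columnName": "group_json",
                # `group` is quoted because it is a reserved SQL keyword.
                "transformFunction": 'jsonFormat("group")',
            }
        ]
    },
    "tableIndexConfig": {"jsonIndexColumns": ["group_json"]},
}

# Schema field for the derived column, with maxLength raised above the default.
# The data type and the limit value are assumptions for illustration.
schema_field = {"name": "group_json", "dataType": "STRING", "maxLength": 2147483647}


def colliding_columns(table_config: dict, source_fields: set[str]) -> list[str]:
    """Return transform destination columns that reuse a source field name."""
    transforms = table_config["ingestionConfig"]["transformConfigs"]
    return [t["columnName"] for t in transforms if t["columnName"] in source_fields]


print(json.dumps(table_config_fragment, indent=2))
print(json.dumps(schema_field, indent=2))
print(colliding_columns(table_config_fragment, {"group", "event_id"}))
```

Running a check like `colliding_columns` before creating the table catches the case where a transform writes to a column that already exists in the source data.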

    With this, you can start to query the nested fields under group. For details about the supported JSON functions, see the guide.
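As an illustration of such a query, the sketch below builds a JSON_MATCH filter on the group_json column. The table name `meetupRsvp`, the JSON path, and the value are assumptions for illustration, and `json_match_filter` is a hypothetical helper. The main point is the quoting: single quotes inside the filter expression are doubled because the expression is itself a SQL string literal.

```python
def json_match_filter(column: str, json_path: str, value: str) -> str:
    """Build a JSON_MATCH predicate comparing one JSON path to a string value."""
    # Escape quotes in the value for the inner literal first.
    inner = value.replace("'", "''")
    predicate = f'"{json_path}"=\'{inner}\''
    # The whole predicate is passed as a SQL string literal, so double quotes again.
    escaped = predicate.replace("'", "''")
    return f"JSON_MATCH({column}, '{escaped}')"


query = (
    "SELECT group_json FROM meetupRsvp WHERE "
    + json_match_filter("group_json", "$.group_topics[*].topic_name", "Technology")
    + " LIMIT 10"
)
print(query)
```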

    hashtag
    Handle the complex type with ingestion configurations

    Though JSON indexing is a handy way to process the complex types, there are some limitations:

    • It’s not performant to group by or order by a JSON field, because JSON_EXTRACT_SCALAR is needed to extract the values in the GROUP BY and ORDER BY clauses, which invokes the function evaluation.

    • For cases where you want to use Pinot's multi-column functions such as DISTINCTCOUNTMV

    Alternatively, from Pinot 0.8, you can use the complex-type handling in ingestion configurations to flatten and unnest the complex structure and convert them into primitive types. Then you can reduce the complex-type data into a flattened Pinot table, and query it via SQL. With the inbuilt processing rules, you do not need to write ETL jobs in another compute framework such as Flink or Spark.

    To process this complex-type, you can add the configuration complexTypeConfig to the ingestionConfig. For example:

    With the complexTypeConfig, all the map objects will be flattened to direct fields automatically. And with unnestFields, a record with the nested collection will unnest into multiple records. For instance, the example in the beginning will transform into two rows with this configuration example.

    Note that

    • The nested field group_id under group is flattened to the field group.group_id. The default delimiter is a period (.); you can choose another delimiter by changing the delimiter configuration under complexTypeConfig. This flattening rule also applies to the maps in the collections to be unnested.

    You can find the full spec of the table config here and the table schema here.
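The flattening and unnesting rules described above can be simulated in a few lines. This is a simplified sketch of the idea, not Pinot's implementation: the config keys `fieldsToUnnest` and `delimiter` come from the text, while the sample record values and the handling of empty collections (the row is kept as-is) are assumptions.

```python
from typing import Any

# Keys mirror the complexTypeConfig options named in the text.
complex_type_config = {"fieldsToUnnest": ["group.group_topics"], "delimiter": "."}


def flatten(record: dict[str, Any], delimiter: str, prefix: str = "") -> dict[str, Any]:
    """Flatten nested maps into top-level fields joined by the delimiter."""
    flat: dict[str, Any] = {}
    for key, value in record.items():
        name = f"{prefix}{delimiter}{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, delimiter, name))
        else:
            flat[name] = value
    return flat


def unnest(record: dict[str, Any], config: dict[str, Any]) -> list[dict[str, Any]]:
    """Flatten a record, then emit one row per element of each unnested collection."""
    delimiter = config["delimiter"]
    rows = [flatten(record, delimiter)]
    for field in config["fieldsToUnnest"]:
        next_rows = []
        for row in rows:
            collection = row.get(field)
            if not isinstance(collection, list) or not collection:
                next_rows.append(row)  # nothing to unnest; keep the row (assumption)
                continue
            base = {k: v for k, v in row.items() if k != field}
            for element in collection:
                if isinstance(element, dict):
                    # Maps inside the collection are flattened under the field name.
                    expanded = flatten(element, delimiter, field)
                else:
                    expanded = {field: element}
                next_rows.append({**base, **expanded})
        rows = next_rows
    return rows


# Hypothetical record shaped like the example: `group` has a scalar child and
# a nested array of objects.
record = {
    "group": {
        "group_id": 1,
        "group_topics": [
            {"urlkey": "tech", "topic_name": "Technology"},
            {"urlkey": "data", "topic_name": "Data"},
        ],
    }
}

rows = unnest(record, complex_type_config)
for row in rows:
    print(row)
```

Each output row repeats the flattened parent fields (such as group.group_id) next to the fields of one array element (such as group.group_topics.urlkey), which is why one input record becomes two rows here.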

    With the flattening/unnesting, you can then query the table with primitive values using the SQL query like:

    Note . is a reserved character in SQL, so you need to quote the flattened column.
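A small sketch of the quoting rule: each flattened column name is wrapped in double quotes before it goes into the query. The helper `quote_identifier` and the table name `meetupRsvp` are assumptions for illustration.

```python
def quote_identifier(name: str) -> str:
    """Wrap a column name in double quotes, doubling any embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


# Flattened column names contain the `.` delimiter, so each one is quoted.
columns = ["group.group_topics.urlkey", "group.group_id"]
select_list = ", ".join(quote_identifier(column) for column in columns)
query = f"SELECT {select_list} FROM meetupRsvp LIMIT 10"
print(query)
```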

    hashtag
    Infer the Pinot schema from the Avro schema and JSON data

    When there are complex structures, it could be challenging and tedious to figure out the Pinot schema manually. To help the schema inference, Pinot provides utility tools to take the Avro schema or JSON data as input and output the inferred Pinot schema.

    To infer the Pinot schema from an Avro schema, you can use a command like the following:

    Note you can input configurations like fieldsToUnnest similar to the ones in complexTypeConfig. And this will simulate the complex-type handling rules on the Avro schema and output the Pinot schema in the file specified in outputDir.

    Similarly, you can use the command like the following to infer the Pinot schema from a file of JSON objects.

    You can check out an example of this run in this PR.

    Input formats

    This section contains a collection of guides that will show you how to import data from a Pinot supported input format.

    Pinot offers support for various popular input formats during ingestion. By changing the input format, you can reduce the time spent doing serialization-deserialization and speed up the ingestion.

    hashtag
    Configuring input formats

    The input format can be changed using the recordReaderSpec config in the ingestion job spec.

  • The nested array group_topics under group is unnested to the top level, converting the output into a collection of two rows. Note the handling of the nested field within group_topics, and the eventual top-level field group.group_topics.urlkey. All the collections to unnest must be included in the fieldsToUnnest configuration.
  • For collections not specified in fieldsToUnnest, the ingestion by default serializes them into JSON strings, except for arrays of primitive values, which are ingested as multi-value columns by default. This behavior is defined by the collectionNotUnnestedToJson config, which defaults to NON_PRIMITIVE. Other values are (1) ALL, which also converts arrays of primitive values to JSON strings; and (2) NONE, which does no conversion and leaves it to users to handle the collections with transform functions.

    Flattened/unnested data
    The config consists of the following keys:
    • dataFormat - Name of the data format to consume.

    • className - name of the class that implements the RecordReader interface. This class is used for parsing the data.

    • configClassName - name of the class that implements the RecordReaderConfig interface. This class is used to parse the values mentioned in configs.

    • configs - key-value pairs for format-specific configs. This field can be left out.

    hashtag
    Supported input formats

    Pinot supports multiple input formats out of the box. To switch between formats, you only need to specify the corresponding record reader and its associated custom configs.

    hashtag
    CSV

    The CSV record reader supports the following configs:

    fileFormat - can be one of default, rfc4180, excel, tdf, mysql

    header - header of the file. The column names should be separated by the delimiter mentioned in the config.

    delimiter - the character separating the columns

    multiValueDelimiter - the character separating multiple values in a single column. This can be used to split a column into a list.

    Your CSV file may have raw text fields that cannot be reliably delimited using any character. In this case, explicitly set the multiValueDelimiter field to empty in the ingestion config: multiValueDelimiter: ''
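To make the roles of delimiter, header, and multiValueDelimiter concrete, here is a rough Python simulation built on the standard csv module. It is an illustrative sketch, not the behavior of CSVRecordReader itself: the function name `read_csv_records` and the sample data are hypothetical.

```python
import csv
import io


def read_csv_records(text, delimiter=",", multi_value_delimiter=";", header=None):
    """Parse CSV text into dict records, splitting multi-value cells into lists."""
    rows = list(csv.reader(io.StringIO(text), delimiter=delimiter))
    # Use the configured header if given, otherwise take the first line of the file.
    columns = header.split(delimiter) if header else rows.pop(0)
    records = []
    for row in rows:
        record = {}
        for name, raw in zip(columns, row):
            if multi_value_delimiter and multi_value_delimiter in raw:
                record[name] = raw.split(multi_value_delimiter)
            else:
                record[name] = raw
        records.append(record)
    return records


data = "id,tags\n1,a;b\n2,c\n"
split_records = read_csv_records(data)
# An empty multi-value delimiter keeps raw text fields intact.
raw_records = read_csv_records(data, multi_value_delimiter="")
```

Here `split_records[0]["tags"]` is the list `["a", "b"]`, while `raw_records[0]["tags"]` stays the single string `"a;b"`, which mirrors why an empty multiValueDelimiter is useful for free-text columns.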

    hashtag
    AVRO

    The Avro record reader converts the data in the file to a GenericRecord. A Java class or .avro file is not required.

    hashtag
    JSON

    hashtag
    Thrift

    Thrift requires the class generated from the .thrift file to parse the data. The .class file should be available in Pinot's classpath. You can put the files in the lib/ folder of the Pinot distribution directory.

    hashtag
    Parquet

    ParquetRecordReader does not read the Parquet INT96 and DECIMAL types.

    Use ParquetNativeRecordReader to handle the INT96 and DECIMAL types.

    | Parquet Data Type | Java Data Type | Comment |
    | --- | --- | --- |
    | INT96 | INT64 | Parquet INT96 type converts nanoseconds to Pinot INT64 type of milliseconds |
    | DECIMAL | DOUBLE | |
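As a worked example of the INT96 row above, the following Python sketch converts a raw INT96 timestamp to epoch milliseconds. It assumes the layout commonly written by Hive, Impala, and Spark (8 little-endian bytes of nanoseconds within the day followed by 4 little-endian bytes holding the Julian day number). That layout and the function name are assumptions for illustration; the source does not show how Pinot performs the conversion.

```python
import struct

JULIAN_DAY_OF_UNIX_EPOCH = 2_440_588  # Julian day number of 1970-01-01
MILLIS_PER_DAY = 86_400_000
NANOS_PER_MILLI = 1_000_000


def int96_to_epoch_millis(raw: bytes) -> int:
    """Convert a 12-byte INT96 timestamp to milliseconds since the Unix epoch."""
    if len(raw) != 12:
        raise ValueError("INT96 values are exactly 12 bytes")
    # "<qi": little-endian signed 64-bit nanos-of-day, then signed 32-bit Julian day.
    nanos_of_day, julian_day = struct.unpack("<qi", raw)
    days_since_epoch = julian_day - JULIAN_DAY_OF_UNIX_EPOCH
    return days_since_epoch * MILLIS_PER_DAY + nanos_of_day // NANOS_PER_MILLI


# One day and 1.5 ms after the epoch; sub-millisecond precision is truncated.
example = struct.pack("<qi", 1_500_000, 2_440_589)
millis = int96_to_epoch_millis(example)
```

Note that nanosecond precision is lost in the conversion, which is the point of the table's comment about INT64 milliseconds.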

    hashtag
    ORC

    The ORC record reader supports the following data types:

    | ORC Data Type | Java Data Type |
    | --- | --- |
    | BOOLEAN | String |
    | SHORT | Integer |
    | INT | Integer |
    | LONG | Long |
    | FLOAT | Float |

    In LIST and MAP types, the objects should belong to one of the data types supported by Pinot.

    hashtag
    Protocol Buffers

    The reader requires a descriptor file to deserialize the data present in the files. You can generate the descriptor file (.desc) from the .proto file using the following command:

    json_meetupRsvp_realtime_table_config.json
    {
      "ingestionConfig": {
        "transformConfigs": [
          {
            "columnName": "group_json",
            "transformFunction": "jsonFormat(\"group\")"
          }
        ]
      },
      ...
      "tableIndexConfig": {
        "loadMode": "MMAP",
        "noDictionaryColumns": [
          "group_json"
        ],
        "jsonIndexColumns": [
          "group_json"
        ]
      }
    }
    json_meetupRsvp_realtime_table_schema.json
    {
      {
          "name": "group_json",
          "dataType": "JSON",
          "maxLength": 2147483647
        }
        ...
    }
    complexTypeHandling_meetupRsvp_realtime_table_config.json
    {
      "ingestionConfig": {
        "complexTypeConfig": {
          "delimiter": ".",
          "fieldsToUnnest": ["group.group_topics"],
          "collectionNotUnnestedToJson": "NON_PRIMITIVE"
        }
      }
    }
    SELECT "group.group_topics.urlkey", 
           "group.group_topics.topic_name", 
           "group.group_id" 
    FROM meetupRsvp
    LIMIT 10
    bin/pinot-admin.sh AvroSchemaToPinotSchema \
      -timeColumnName fields.hoursSinceEpoch \
      -avroSchemaFile /tmp/test.avsc \
      -pinotSchemaName myTable \
      -outputDir /tmp/test \
      -fieldsToUnnest entries
    bin/pinot-admin.sh JsonToPinotSchema \
      -timeColumnName hoursSinceEpoch \
      -jsonFile /tmp/test.json \
      -pinotSchemaName myTable \
      -outputDir /tmp/test \
      -fieldsToUnnest payload.commits
    recordReaderSpec:
      dataFormat: 'csv'
      className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
      configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
      configs:
        key1: 'value1'
        key2: 'value2'
    dataFormat: 'csv'
    className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
    configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
    configs:
      fileFormat: 'default' # should be one of default, rfc4180, excel, tdf, mysql
      header: 'columnName separated by delimiter'
      delimiter: ','
      multiValueDelimiter: '-'
    dataFormat: 'avro'
    className: 'org.apache.pinot.plugin.inputformat.avro.AvroRecordReader'
    dataFormat: 'json'
    className: 'org.apache.pinot.plugin.inputformat.json.JSONRecordReader'
    dataFormat: 'thrift'
    className: 'org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReader'
    configs:
      thriftClass: 'ParserClassName'
    dataFormat: 'parquet'
    className: 'org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader'
    dataFormat: 'parquet'
    className: 'org.apache.pinot.plugin.inputformat.parquet.ParquetNativeRecordReader'
    dataFormat: 'orc'
    className: 'org.apache.pinot.plugin.inputformat.orc.ORCRecordReader'
    dataFormat: 'proto'
    className: 'org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReader'
    configs:
      descriptorFile: 'file:///path/to/sample.desc'
    protoc --include_imports --descriptor_set_out=/absolute/path/to/output.desc /absolute/path/to/input.proto

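    | ORC Data Type | Java Data Type |
    | --- | --- |
    | DOUBLE | Double |
    | STRING | String |
    | VARCHAR | String |
    | CHAR | String |
    | LIST | Object[] |
    | MAP | Map<Object, Object> |
    | DATE | Long |
    | TIMESTAMP | Long |
    | BINARY | byte[] |
    | BYTE | Integer |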

    Stream ingestion

    Apache Pinot lets users consume data from streams and push it directly into the database, in a process known as stream ingestion. Stream ingestion makes it possible to query data within seconds of publication.

    Stream ingestion supports checkpoints to prevent data loss.

    Setting up Stream ingestion involves the following steps:

    1. Create schema configuration

    2. Create table configuration

    3. Upload table and schema spec

    Let's take a look at each of the steps in more detail.

    Let us assume the data to be ingested is in the following format:

    hashtag
    Create Schema Configuration

    The schema defines the fields along with their data types. It also defines whether fields serve as dimensions, metrics, or timestamps. For more details on schema configuration, see creating a schema.

    For our sample data, the schema configuration looks like this:

    hashtag
    Create Table Configuration

    The next step is to create a table where all the ingested data will flow and can be queried. Unlike batch ingestion, the table configuration for real-time ingestion also triggers the data ingestion job. For a more detailed overview of tables, see the table reference.

    The real-time table configuration consists of the following fields:

    • tableName - The name of the table where the data should flow

    • tableType - The internal type for the table. Should always be set to REALTIME for realtime ingestion

    • segmentsConfig -

    Config key
    Description
    Supported values

    The following flush threshold settings are also supported:

    Config key
    Description
    Supported values

    You can also specify additional configs for the consumer by prefixing the key with stream.[streamType], where streamType is the name of the streaming platform (for example, kafka).

    For our sample data and schema, the table config will look like this:

    hashtag
    Upload schema and table config

    Now that we have our table and schema configurations, let's upload them to the Pinot cluster. As soon as the configs are uploaded, Pinot will start ingesting available records from the topic.

    hashtag
    Custom Ingestion Support

    We are working on support for other ingestion platforms, but you can also write your own ingestion plugin if your platform is not supported out of the box. For a walkthrough, see Stream Ingestion Plugin.

    Stream Ingestion with Upsert

    Upsert support in Apache Pinot.

    Pinot provides native support for upsert during real-time ingestion (v0.6.0+). Upsert is useful in scenarios where records need to be modified, such as correcting a ride fare or updating a delivery status.

    Building on full upsert support, Pinot also enables partial upsert use cases (v0.8.0+). With partial upsert, users only need to specify the columns whose values change and can ignore the others.

    To enable upsert on a Pinot table, you need to make a few changes to the table configuration as well as to the input stream.

    hashtag
    Define the primary key in the schema

    To update a record, a primary key is needed to uniquely identify the record. To define a primary key, add the field primaryKeyColumns to the schema definition. For example, the schema definition of UpsertMeetupRSVP in the quick start example has this definition.

    Note this field expects a list of columns, as the primary key can be composite.

    When two records with the same primary key are ingested, the record with the greater event time (as defined by the time column) is used. When records have the same primary key and the same event time, the order is not determined. In most cases the later ingested record is used, but this may not hold when the table has a sorted column.
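The resolution rule above can be sketched in a few lines of Python. This is a simplified model for illustration only, not Pinot's implementation: the function name `apply_full_upsert` and the sample records are hypothetical, and ties on the comparison value are resolved in favor of the later ingested record, which the text says happens in most cases but is not guaranteed.

```python
def apply_full_upsert(records, primary_key_columns, comparison_column):
    """Keep, per primary key, the record with the greatest comparison value."""
    latest = {}
    for record in records:
        # The primary key may be composite, so build a tuple of its columns.
        key = tuple(record[column] for column in primary_key_columns)
        current = latest.get(key)
        # ">=" lets a later ingested record win a tie (an assumption, see above).
        if current is None or record[comparison_column] >= current[comparison_column]:
            latest[key] = record
    return list(latest.values())


events = [
    {"event_id": "e1", "mtime": 100, "rsvp_count": 1},
    {"event_id": "e2", "mtime": 100, "rsvp_count": 5},
    {"event_id": "e1", "mtime": 200, "rsvp_count": 2},
    {"event_id": "e1", "mtime": 150, "rsvp_count": 9},  # arrives late, older event time
]
visible = apply_full_upsert(events, ["event_id"], "mtime")
```

In this run, the query-visible record for e1 is the one with mtime 200, even though a record with mtime 150 was ingested after it.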

    hashtag
    Partition the input stream by the primary key

    An important requirement for a Pinot upsert table is that the input stream is partitioned by the primary key. For Kafka messages, this means the producer must set the key in the send API. If the original stream is not partitioned by the primary key, a stream processing job (e.g. Flink) is needed to shuffle and repartition the input stream into a partitioned one for Pinot's ingestion.
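The reason keyed messages satisfy this requirement is that a key-based partitioner maps equal keys to the same partition. The Python sketch below demonstrates the idea with a CRC32 hash. This is an assumption chosen for a self-contained example; it is not the hash Kafka's default partitioner uses, and `partition_for` is a hypothetical helper.

```python
import zlib
from collections import defaultdict


def partition_for(primary_key: str, num_partitions: int) -> int:
    """Map a primary key to a partition deterministically (illustrative hash)."""
    return zlib.crc32(primary_key.encode("utf-8")) % num_partitions


def partition_stream(records, key_column, num_partitions):
    """Group records by partition so that all updates of a key stay together."""
    partitions = defaultdict(list)
    for record in records:
        partitions[partition_for(str(record[key_column]), num_partitions)].append(record)
    return dict(partitions)


stream = [
    {"event_id": "e1", "rsvp_count": 1},
    {"event_id": "e2", "rsvp_count": 1},
    {"event_id": "e1", "rsvp_count": 3},
]
by_partition = partition_stream(stream, "event_id", num_partitions=4)
```

Because both e1 records land in the same partition, a single server sees every update for that key and can resolve the latest one locally.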

    hashtag
    Enable upsert in the table configurations

    There are a few configurations needed in the table configurations to enable upsert.

    hashtag
    Upsert mode

    For append-only tables, the upsert mode defaults to NONE. To enable full upsert, set the mode to FULL. For example:

    Pinot added partial upsert support in v0.8.0. To enable partial upsert, set the mode to PARTIAL and specify partialUpsertStrategies for the partial upsert columns. Since v0.10.0, defaultPartialUpsertStrategy is available as the default merge strategy for columns without a specified strategy. For example:

    Pinot supports the following partial upsert strategies -

    Strategy
    Description
    Note: If you don't specify a strategy for a given column, its value is always overwritten by the new value by default. In v0.10.0+, defaultPartialUpsertStrategy was added to change this default; its default value is OVERWRITE.

    hashtag
    Comparison Column

    By default, Pinot uses the value in the time column to determine the latest record. That means, for two records with the same primary key, the record with the larger value in the time column is picked as the latest update. However, there are cases where users need another column to determine the order. In such cases, use the option comparisonColumn to override the column used for comparison. For example:

    For a partial upsert table, out-of-order events won't be consumed and indexed. For example, for two records with the same primary key, if the record with the smaller value of the comparison column arrives later than the other record, it will be skipped.

    hashtag
    Use strictReplicaGroup for routing

    An upsert Pinot table can use only the low-level consumer for the input streams. As a result, it uses the partitioned replica-group assignment for the segments. Moreover, upsert poses the additional requirement that all segments of the same partition must be served from the same server to ensure data consistency across the segments. Accordingly, it requires strictReplicaGroup as the routing strategy. To use it, configure instanceSelectorType in Routing as follows:

    hashtag
    Limitations

    There are some limitations for the upsert Pinot tables.

    First, the high-level consumer is not allowed for the input stream ingestion, which means stream.kafka.consumer.type must be lowLevel.

    Second, the star-tree index cannot be used for indexing, as the star-tree index performs pre-aggregation during the ingestion.

    Third, unlike append-only tables, a Pinot partial upsert table does not consume and index out-of-order events. These late events are skipped.

    hashtag
    Best practices

    Unlike other real-time tables, an upsert table uses more memory because it needs to keep track of the record locations in memory. As a result, it's important to plan the capacity beforehand and monitor the resource usage. Here are some recommended practices for using upsert tables.

    • Create the Kafka topic with more partitions. The number of Kafka partitions determines the number of partitions of the Pinot table. The more partitions the Kafka topic has, the more Pinot servers you can distribute the Pinot table to, and therefore the more you can scale the table horizontally.

    • An upsert table maintains an in-memory map from the primary key to the record location, so it's recommended to use a simple primary key type and avoid composite primary keys to save memory. In addition, consider the hashFunction config in the upsert config, which can be MD5 or MURMUR3, to store the 128-bit hash code of the primary key instead. This is useful when your primary key takes more space. Keep in mind that hashing may introduce collisions, though the chance is very low.
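To see why hashing helps with large keys, the following Python sketch stores a fixed 16-byte (128-bit) MD5 digest in place of the serialized primary key. It is a minimal illustration under assumptions: the helper name, the way key parts are joined, and the sample key are hypothetical, and MURMUR3 is omitted because it is not in the Python standard library.

```python
import hashlib


def primary_key_hash(primary_key_values, hash_function="MD5"):
    """Return the bytes used as the map key for a (possibly composite) primary key."""
    # Join the key parts with a separator byte; this serialization is an assumption.
    serialized = "\x00".join(str(value) for value in primary_key_values).encode("utf-8")
    if hash_function == "NONE":
        return serialized
    if hash_function == "MD5":
        return hashlib.md5(serialized).digest()  # always 16 bytes = 128 bits
    raise ValueError(f"unsupported hash function in this sketch: {hash_function}")


long_key = ["customer-0000000000000000000000000042", "order-2021-11-05T10:15:30.000Z"]
raw_size = len(primary_key_hash(long_key, "NONE"))
hashed_size = len(primary_key_hash(long_key, "MD5"))

# The in-memory map from key to record location, keyed by the digest.
record_locations = {primary_key_hash(long_key): ("segment_0", 17)}
```

The hashed key is always 16 bytes regardless of how long the original key is, which is where the memory saving comes from; the trade-off is the small collision probability mentioned above.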

    hashtag
    Example

    Putting these together, the table configuration of the quick start example looks like the following:

    hashtag
    Quick Start

    To illustrate how full upsert works, the Pinot binary comes with a quick start example. Use the following command to create a realtime upsert table meetupRSVP.

    You can also run the partial upsert demo with the following command.

    As soon as data flows into the stream, the Pinot table will consume it and it will be ready for querying. Head over to the Query Console to check out the realtime data.

    For partial upsert, you can see that only the values of the configured columns change, based on the specified partial upsert strategy.

    An example for partial upsert is shown below: each event_id stays unique during ingestion, while the value of rsvp_count is incremented.

    To see the difference from the append-only table, you can use the query option skipUpsert to skip the upsert effect in the query result.

    tableIndexConfig - defines which columns to index and the type of index to use. For the full configuration, see [Indexing Configs]. It has the following required fields:

    • loadMode - specifies how the segments should be loaded. Should be heap or mmap. The two modes differ as follows:

      • mmap: Segments are loaded onto memory-mapped files. This is the default mode.

      • heap: Segments are loaded into direct memory. Note that 'heap' here is a legacy misnomer and does not imply JVM heap. Use this mode only when you want faster performance than memory-mapped files and are sure that you will never run into OOM.

    • streamConfig - specifies the data source along with the necessary configs to start consuming the real-time data. The streamConfig can be thought of as the equivalent of the job spec for batch ingestion. The following options are supported:

    | Config key | Description | Supported values |
    | --- | --- | --- |
    | stream.[streamType].decoder.class.name | Name of the class to be used for parsing the data. The class should implement the org.apache.pinot.spi.stream.StreamMessageDecoder interface. | String. Available options: org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder; org.apache.pinot.plugin.inputformat.avro.KafkaAvroMessageDecoder |
    | stream.[streamType].consumer.factory.class.name | Name of the factory class to be used to provide the appropriate implementation of the low-level and high-level consumers as well as the metadata. | String. Available options: org.apache.pinot.plugin.stream.kafka09.KafkaConsumerFactory; org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory |
    | stream.[streamType].consumer.prop.auto.offset.reset | Determines the offset from which to start the ingestion. | smallest; largest; or a timestamp in milliseconds |
    | topic.consumption.rate.limit | Determines the upper bound for the consumption rate for the whole topic. A consumption rate limiter is beneficial when the stream message rate has a bursty pattern that leads to long GC pauses on the Pinot servers. The rate limiter can also be considered a safeguard against excessive ingestion into realtime tables. | Double. The value should be greater than zero. |
    | streamType | The streaming platform from which to consume the data. | kafka |
    | stream.[streamType].consumer.type | Whether to use the per-partition low-level consumer or the high-level stream consumer. | lowLevel - consume data from each partition with offset management; highLevel - consume data without control over the partitions |
    | stream.[streamType].topic.name | The data source (e.g. topic, data stream) from which to consume the data. | String |

    | Config key | Description |
    | --- | --- |
    | realtime.segment.flush.threshold.time | Time threshold for which the realtime segment is kept open before it is completed. Note that this time should be smaller than the Kafka retention period configured for the corresponding topic. |
    | realtime.segment.flush.threshold.rows | Row count flush threshold for realtime segments. This behaves in a similar way for HLC and LLC. For HLC, since there is only one consumer per server, this size is used as the size of the consumption buffer and determines after how many rows we flush to disk. For example, if this threshold is set to two million rows, then a high-level consumer would have a buffer size of two million. If this value is set to 0, then the consumers adjust the number of rows consumed by a partition such that the size of the completed segment is the desired size (unless threshold.time is reached first). |
    | realtime.segment.flush.threshold.segment.size | The desired size of a completed realtime segment. This config is used only if realtime.segment.flush.threshold.rows is set to 0. |
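The interplay of the three flush thresholds can be summarized as a small decision function. The Python sketch below is a deliberately simplified reading of the descriptions above, not Pinot's actual segment completion logic: in particular, Pinot is described as adjusting the row count to reach the desired size, whereas this sketch simply compares a hypothetical size estimate, and all parameter names are made up for illustration.

```python
def should_complete_segment(rows_consumed, elapsed_ms, threshold_rows,
                            threshold_time_ms, estimated_segment_bytes=0,
                            desired_segment_bytes=0):
    """Decide whether a consuming segment should be completed (simplified)."""
    # The time threshold applies regardless of the other settings.
    if elapsed_ms >= threshold_time_ms:
        return True
    # A positive row threshold means a plain row-count check.
    if threshold_rows > 0:
        return rows_consumed >= threshold_rows
    # threshold_rows == 0: fall back to the desired completed-segment size.
    return desired_segment_bytes > 0 and estimated_segment_bytes >= desired_segment_bytes


# Settings taken from the sample table config: 50000 rows or 3600000 ms.
row_limit_hit = should_complete_segment(50_000, 10_000, 50_000, 3_600_000)
time_limit_hit = should_complete_segment(120, 3_600_000, 50_000, 3_600_000)
still_consuming = should_complete_segment(120, 10_000, 50_000, 3_600_000)
```

The first two calls report that the segment should be completed (row limit and time limit respectively), while the third keeps consuming.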


    stream.[streamType].decoder.class.name

    Set up a dashboard over the metric pinot.server.upsertPrimaryKeysCount.tableName to watch the number of primary keys in a table partition. It's useful for tracking its growth which is proportional to the memory usage growth.

  • Capacity planning. It's useful to plan the capacity beforehand to ensure you will not run into resource constraints later. A simple way is to measure the number of primary keys in the Kafka throughput per partition and multiply it by the space cost per primary key to approximate the memory usage. A heap dump is also useful to check the memory usage so far on an upsert table instance.
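The capacity estimate described above is simple arithmetic, shown here as a short Python sketch. All numbers and names are hypothetical placeholders: the per-key byte cost in particular is an assumption and should be replaced with a value measured on your own cluster, for example from a heap dump.

```python
def estimate_unique_keys(new_keys_per_second, retention_seconds):
    """Approximate the number of distinct primary keys held per partition."""
    return int(new_keys_per_second * retention_seconds)


def estimate_upsert_map_bytes(unique_keys_per_partition, partitions_on_server,
                              bytes_per_key_entry):
    """Approximate memory used by the primary-key map on one server."""
    return unique_keys_per_partition * partitions_on_server * bytes_per_key_entry


# Hypothetical inputs: 100 new keys/s per partition, 1 day retention,
# 4 partitions hosted on the server, 64 bytes per map entry (assumed).
keys_per_partition = estimate_unique_keys(100, 24 * 60 * 60)
total_bytes = estimate_upsert_map_bytes(keys_per_partition, 4, 64)
total_gib = total_bytes / 1024 ** 3
```

With these placeholder inputs the estimate is 8,640,000 keys per partition and roughly 2.2 GB for the server, which gives a starting point to compare against the primary key count metric.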

    | Strategy | Description |
    | --- | --- |
    | OVERWRITE | Overwrite the column of the last record |
    | INCREMENT | Add the new value to the existing values |
    | APPEND | Add the new item to the Pinot unordered set |
    | UNION | Add the new item to the Pinot unordered set if not exists |
    | IGNORE | Ignore the new value, keep the existing value (v0.10.0+) |
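The partial upsert strategies listed above can be modeled with a short Python sketch. This is an illustration under assumptions rather than Pinot's merger code: the function names are hypothetical, multi-value columns are represented as Python lists, and treating a missing (None) incoming value as "keep the existing value" is an assumption not stated in the text.

```python
def merge_value(strategy, existing, incoming):
    """Merge one column value according to a partial upsert strategy."""
    if incoming is None:
        return existing  # assumption: a missing new value keeps the old one
    if existing is None:
        return incoming
    if strategy == "OVERWRITE":
        return incoming
    if strategy == "IGNORE":
        return existing
    if strategy == "INCREMENT":
        return existing + incoming
    if strategy == "APPEND":
        return list(existing) + list(incoming)
    if strategy == "UNION":
        merged = list(existing)
        for item in incoming:
            if item not in merged:
                merged.append(item)
        return merged
    raise ValueError(f"unknown strategy: {strategy}")


def merge_record(previous, incoming, strategies, default_strategy="OVERWRITE"):
    """Merge an incoming record into the previous record with the same key."""
    merged = dict(previous)
    for column, value in incoming.items():
        strategy = strategies.get(column, default_strategy)
        merged[column] = merge_value(strategy, previous.get(column), value)
    return merged


strategies = {"rsvp_count": "INCREMENT", "group_name": "UNION", "venue_name": "APPEND"}
previous = {"event_id": "e1", "rsvp_count": 1, "group_name": ["a"],
            "venue_name": ["v1"], "mtime": 1}
incoming = {"event_id": "e1", "rsvp_count": 2, "group_name": ["a", "b"],
            "venue_name": ["v1"], "mtime": 2}
merged = merge_record(previous, incoming, strategies)
```

After the merge, rsvp_count is 3, group_name is `["a", "b"]`, venue_name is `["v1", "v1"]`, and mtime is overwritten to 2 because it falls back to the default OVERWRITE strategy.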

    Query the upsert table
    Query the partial upsert table
    Explain partial upsert table
    Disable the upsert during query via query option
    {"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"Maths","score":3.8,"timestamp":1571900400000}
    {"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"History","score":3.5,"timestamp":1571900400000}
    {"studentID":207,"firstName":"Bob","lastName":"Lewis","gender":"Male","subject":"Maths","score":3.2,"timestamp":1571900400000}
    {"studentID":207,"firstName":"Bob","lastName":"Lewis","gender":"Male","subject":"Chemistry","score":3.6,"timestamp":1572418800000}
    {"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Geography","score":3.8,"timestamp":1572505200000}
    {"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"English","score":3.5,"timestamp":1572505200000}
    {"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Maths","score":3.2,"timestamp":1572678000000}
    {"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Physics","score":3.6,"timestamp":1572678000000}
    {"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"Maths","score":3.8,"timestamp":1572678000000}
    {"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"English","score":3.5,"timestamp":1572678000000}
    {"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"History","score":3.2,"timestamp":1572854400000}
    {"studentID":212,"firstName":"Nick","lastName":"Young","gender":"Male","subject":"History","score":3.6,"timestamp":1572854400000}
    /tmp/pinot-quick-start/transcript-schema.json
    {
      "schemaName": "transcript",
      "dimensionFieldSpecs": [
        {
          "name": "studentID",
          "dataType": "INT"
        },
        {
          "name": "firstName",
          "dataType": "STRING"
        },
        {
          "name": "lastName",
          "dataType": "STRING"
        },
        {
          "name": "gender",
          "dataType": "STRING"
        },
        {
          "name": "subject",
          "dataType": "STRING"
        }
      ],
      "metricFieldSpecs": [
        {
          "name": "score",
          "dataType": "FLOAT"
        }
      ],
      "dateTimeFieldSpecs": [{
        "name": "timestamp",
        "dataType": "LONG",
        "format" : "1:MILLISECONDS:EPOCH",
        "granularity": "1:MILLISECONDS"
      }]
    }
    {
      "tableName": "transcript",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "timeColumnName": "timestamp",
        "timeType": "MILLISECONDS",
        "schemaName": "transcript",
        "replicasPerPartition": "1"
      },
      "tenants": {},
      "tableIndexConfig": {
        "loadMode": "MMAP",
        "streamConfigs": {
          "streamType": "kafka",
          "stream.kafka.consumer.type": "lowlevel",
          "stream.kafka.topic.name": "transcript-topic",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
          "stream.kafka.broker.list": "localhost:9876",
          "realtime.segment.flush.threshold.time": "3600000",
          "realtime.segment.flush.threshold.rows": "50000",
          "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
        }
      },
      "metadata": {
        "customConfigs": {}
      }
    }
    docker run \
        --network=pinot-demo \
        -v /tmp/pinot-quick-start:/tmp/pinot-quick-start \
        --name pinot-streaming-table-creation \
        apachepinot/pinot:latest AddTable \
        -schemaFile /tmp/pinot-quick-start/transcript-schema.json \
        -tableConfigFile /tmp/pinot-quick-start/transcript-table-realtime.json \
        -controllerHost pinot-quickstart \
        -controllerPort 9000 \
        -exec
    bin/pinot-admin.sh AddTable \
        -schemaFile /path/to/transcript-schema.json \
        -tableConfigFile /path/to/transcript-table-realtime.json \
        -exec
    upsert_meetupRsvp_schema.json
    {
        "primaryKeyColumns": ["event_id"]
    }
    upsert mode: full
    {
      "upsertConfig": {
        "mode": "FULL"
      }
    }
    upsert mode: partial (v0.8.0)
    {
      "upsertConfig": {
        "mode": "PARTIAL",
        "partialUpsertStrategies":{
          "rsvp_count": "INCREMENT",
          "group_name": "UNION",
          "venue_name": "APPEND"
        }
      }
    }
    upsert mode: partial (v0.10.0+)
    {
      "upsertConfig": {
        "mode": "PARTIAL",
        "defaultPartialUpsertStrategy": "OVERWRITE",
        "partialUpsertStrategies":{
          "rsvp_count": "INCREMENT",
          "group_name": "UNION",
          "venue_name": "APPEND"
        }
      }
    }
    comparison column
    {
      "upsertConfig": {
        "mode": "FULL",
        "comparisonColumn": "anotherTimeColumn",
        "hashFunction": "NONE"
      }
    }
    routing
    {
      "routing": {
        "instanceSelectorType": "strictReplicaGroup"
      }
    }
    upsert_meetupRsvp_realtime_table_config.json
    {
      "tableName": "meetupRsvp",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "timeColumnName": "mtime",
        "timeType": "MILLISECONDS",
        "retentionTimeUnit": "DAYS",
        "retentionTimeValue": "1",
        "segmentPushType": "APPEND",
        "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
        "schemaName": "meetupRsvp",
        "replicasPerPartition": "1"
      },
      "tenants": {},
      "tableIndexConfig": {
        "loadMode": "MMAP",
        "streamConfigs": {
          "streamType": "kafka",
          "stream.kafka.consumer.type": "lowLevel",
          "stream.kafka.topic.name": "meetupRSVPEvents",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
          "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
          "stream.kafka.zk.broker.url": "localhost:2191/kafka",
          "stream.kafka.broker.list": "localhost:19092",
          "realtime.segment.flush.threshold.rows": 30
        }
      },
      "metadata": {
        "customConfigs": {}
      },
      "routing": {
        "instanceSelectorType": "strictReplicaGroup"
      },
      "upsertConfig": {
        "mode": "FULL"
      }
    }
    # stop previous quick start cluster, if any
    bin/quick-start-upsert-streaming.sh
    # stop previous quick start cluster, if any
    bin/quick-start-partial-upsert-streaming.sh
    Additional decoder classes:
    • org.apache.pinot.plugin.inputformat.avro.SimpleAvroMessageDecoder
    • org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder

    Additional consumer factory classes:
    • org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory
    • org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory

    Apache Kafka

    This guide shows you how to ingest a stream of records from an Apache Kafka topic into a Pinot table.

    hashtag
    Introduction

    In this guide, you'll learn how to import data into Pinot using Apache Kafka for real-time stream ingestion. Pinot has out-of-the-box real-time ingestion support for Kafka.

    Let's set up a demo Kafka cluster locally and create a sample topic, transcript-topic.

    Start Kafka

    Create a Kafka Topic

    Start Kafka

    Start the Kafka cluster on port 9876 using the same Zookeeper from the quick-start examples.

    Create a Kafka topic

    Download the latest Kafka. Create a topic.

    hashtag
    Creating Schema Configuration

    We will publish the data in the same format as mentioned in the Stream ingestion docs, so you can use the same schema mentioned under Create Schema Configuration.

    hashtag
    Creating a table configuration

    Next, create the real-time table configuration for the transcript table described by the schema from the previous step.

    For Kafka, we use streamType as kafka. Currently only JSON format is supported, but you can easily write your own decoder by implementing the StreamMessageDecoder interface. You can then make your decoder class available by putting the jar file in the plugins directory.

    The lowLevel consumer reads data per partition, whereas the highLevel consumer uses the Kafka high-level consumer to read data from the whole stream. The highLevel consumer has no control over which partition to read at a particular moment.

    For Kafka versions below 2.X, use org.apache.pinot.plugin.stream.kafka09.KafkaConsumerFactory

    For Kafka version 2.X and above, use org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory

    You can set the offset to:

    • smallest to start the consumer from the earliest offset

    • largest to start the consumer from the latest offset

    • a timestamp in milliseconds to start the consumer from the offset after the timestamp

    The resulting configuration should look as follows -

    hashtag
    Upgrade from Kafka 0.9 connector to Kafka 2.x connector

    • Update the table config for both high-level and low-level consumers: change stream.kafka.consumer.factory.class.name from org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory to org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory.

    • If using the stream (high) level consumer, also add the config stream.kafka.hlc.bootstrap.server to tableIndexConfig.streamConfigs. This config should be the URI of the Kafka broker list, e.g. localhost:9092.

    hashtag
    How to consume from higher Kafka version?

    This connector is also suitable for Kafka lib versions higher than 2.0.0. In the Kafka 2.0 connector pom.xml, changing kafka.lib.version from 2.0.0 to 2.1.1 makes this connector work with Kafka 2.1.1.

    hashtag
    How to consume transactional-committed Kafka messages

    The connector with Kafka lib 2.0+ supports Kafka transactions. Transaction support is controlled by the config kafka.isolation.level in the Kafka stream config, which can be read_committed or read_uncommitted (default). Setting it to read_committed ingests only the transactionally committed messages in the Kafka stream.

    hashtag
    Upload schema and table

    Now that we have our table and schema configurations, let's upload them to the Pinot cluster. As soon as the real-time table is created, it will begin ingesting available records from the Kafka topic.

    hashtag
    Add sample data to the Kafka topic

    We will publish data in the following format to Kafka. Save the data in a file named transcript.json.

    Push the sample JSON into the transcript-topic Kafka topic using the Kafka console producer. This adds the 12 records in the transcript.json file to the topic.

    hashtag
    Ingesting streaming data

    As soon as data flows into the stream, the Pinot table will consume it and it will be ready for querying. Head over to the Query Console to check out the real-time data.

    hashtag
    Some more Kafka ingestion configs

    hashtag
    Use Kafka Partition(Low) Level Consumer with SSL

    Here is an example config that uses SSL-based authentication to talk with Kafka and the schema registry. Notice there are two sets of SSL options: the ones starting with ssl. are for the Kafka consumer, and the ones starting with stream.kafka.decoder.prop.schema.registry. are for the SchemaRegistryClient used by KafkaConfluentSchemaRegistryAvroMessageDecoder.

    hashtag
    Ingest transactionally committed messages only from Kafka

    With Kafka consumer 2.0, you can ingest transactionally committed messages only by configuring kafka.isolation.level to read_committed. For example,

    Note that the default value of this config is read_uncommitted, which reads all messages. Also, this config is supported by the low-level consumer only.
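
As a small illustration of the two allowed values, the sketch below adds the setting to a stream config dictionary and rejects anything else. `with_isolation_level` is a hypothetical helper, and the key is spelled stream.kafka.isolation.level on the assumption that it follows the example table config in this guide.

```python
VALID_ISOLATION_LEVELS = ("read_committed", "read_uncommitted")


def with_isolation_level(stream_configs, level="read_uncommitted"):
    """Return a copy of stream_configs with the isolation level set."""
    if level not in VALID_ISOLATION_LEVELS:
        raise ValueError(f"unsupported isolation level: {level}")
    updated = dict(stream_configs)
    # Key name assumed from the example table config in this guide.
    updated["stream.kafka.isolation.level"] = level
    return updated


base = {"streamType": "kafka", "stream.kafka.consumer.type": "lowlevel"}
committed_only = with_isolation_level(base, "read_committed")
```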

    hashtag
    Use Kafka Level Consumer with SASL_SSL

    Here is an example config that uses SASL_SSL-based authentication to talk to Kafka and the schema registry. Notice there are two sets of options: some are for the Kafka consumer, and those starting with stream.kafka.decoder.prop.schema.registry. are for the SchemaRegistryClient used by KafkaConfluentSchemaRegistryAvroMessageDecoder.
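
The sasl.jaas.config value is a string with quoted credentials, which then has to be escaped again inside the JSON table config. The sketch below shows one way to build it. `jaas_config` is a hypothetical helper, the backslash escaping of quotes inside the JAAS string is an assumption, and the credentials are the placeholder values from the example config.

```python
import json


def jaas_config(login_module, username, password):
    """Build a sasl.jaas.config value with quoted credentials."""

    def quote(value):
        # Assumed escaping: backslashes and double quotes are backslash-escaped.
        return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'

    return (
        f"{login_module} required "
        f"username={quote(username)} password={quote(password)};"
    )


value = jaas_config(
    "org.apache.kafka.common.security.scram.ScramLoginModule",
    "kafkausername",
    "kafkapassword",
)
# json.dumps adds the JSON-level escaping needed inside the table config.
print(json.dumps({"sasl.jaas.config": value}))
```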

    bin/pinot-admin.sh  StartKafka -zkAddress=localhost:2123/kafka -port 9876
    docker run \
        --network pinot-demo --name=kafka \
        -e KAFKA_ZOOKEEPER_CONNECT=pinot-quickstart:2123/kafka \
        -e KAFKA_BROKER_ID=0 \
        -e KAFKA_ADVERTISED_HOST_NAME=kafka \
        -d wurstmeister/kafka:latest
    docker exec \
      -t kafka \
      /opt/kafka/bin/kafka-topics.sh \
      --zookeeper pinot-quickstart:2123/kafka \
      --partitions=1 --replication-factor=1 \
      --create --topic transcript-topic
    bin/kafka-topics.sh --create --bootstrap-server localhost:9876 --replication-factor 1 --partitions 1 --topic transcript-topic
    /tmp/pinot-quick-start/transcript-table-realtime.json
    {
      "tableName": "transcript",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "timeColumnName": "timestamp",
        "timeType": "MILLISECONDS",
        "schemaName": "transcript",
        "replicasPerPartition": "1"
      },
      "tenants": {},
      "tableIndexConfig": {
        "loadMode": "MMAP",
        "streamConfigs": {
          "streamType": "kafka",
          "stream.kafka.consumer.type": "lowlevel",
          "stream.kafka.topic.name": "transcript-topic",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
          "stream.kafka.broker.list": "localhost:9876",
          "realtime.segment.flush.threshold.time": "3600000",
          "realtime.segment.flush.threshold.rows": "50000",
          "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
        }
      },
      "metadata": {
        "customConfigs": {}
      }
    }
    docker run \
        --network=pinot-demo \
        -v /tmp/pinot-quick-start:/tmp/pinot-quick-start \
        --name pinot-streaming-table-creation \
        apachepinot/pinot:latest AddTable \
        -schemaFile /tmp/pinot-quick-start/transcript-schema.json \
        -tableConfigFile /tmp/pinot-quick-start/transcript-table-realtime.json \
        -controllerHost pinot-quickstart \
        -controllerPort 9000 \
        -exec
    bin/pinot-admin.sh AddTable \
        -schemaFile /tmp/pinot-quick-start/transcript-schema.json \
        -tableConfigFile /tmp/pinot-quick-start/transcript-table-realtime.json \
        -exec
    transcript.json
    {"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"Maths","score":3.8,"timestamp":1571900400000}
    {"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"History","score":3.5,"timestamp":1571900400000}
    {"studentID":207,"firstName":"Bob","lastName":"Lewis","gender":"Male","subject":"Maths","score":3.2,"timestamp":1571900400000}
    {"studentID":207,"firstName":"Bob","lastName":"Lewis","gender":"Male","subject":"Chemistry","score":3.6,"timestamp":1572418800000}
    {"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Geography","score":3.8,"timestamp":1572505200000}
    {"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"English","score":3.5,"timestamp":1572505200000}
    {"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Maths","score":3.2,"timestamp":1572678000000}
    {"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Physics","score":3.6,"timestamp":1572678000000}
    {"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"Maths","score":3.8,"timestamp":1572678000000}
    {"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"English","score":3.5,"timestamp":1572678000000}
    {"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"History","score":3.2,"timestamp":1572854400000}
    {"studentID":212,"firstName":"Nick","lastName":"Young","gender":"Male","subject":"History","score":3.6,"timestamp":1572854400000}
    bin/kafka-console-producer.sh \
        --broker-list localhost:9876 \
        --topic transcript-topic < transcript.json
    SELECT * FROM transcript
      {
        "tableName": "transcript",
        "tableType": "REALTIME",
        "segmentsConfig": {
          "timeColumnName": "timestamp",
          "timeType": "MILLISECONDS",
          "schemaName": "transcript",
          "replicasPerPartition": "1"
        },
        "tenants": {},
        "tableIndexConfig": {
          "loadMode": "MMAP",
          "streamConfigs": {
            "streamType": "kafka",
            "stream.kafka.consumer.type": "LowLevel",
            "stream.kafka.topic.name": "transcript-topic",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
            "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
            "stream.kafka.zk.broker.url": "localhost:2191/kafka",
            "stream.kafka.broker.list": "localhost:9876",
            "schema.registry.url": "",
            "security.protocol": "SSL",
            "ssl.truststore.location": "",
            "ssl.keystore.location": "",
            "ssl.truststore.password": "",
            "ssl.keystore.password": "",
            "ssl.key.password": "",
            "stream.kafka.decoder.prop.schema.registry.rest.url": "",
            "stream.kafka.decoder.prop.schema.registry.ssl.truststore.location": "",
            "stream.kafka.decoder.prop.schema.registry.ssl.keystore.location": "",
            "stream.kafka.decoder.prop.schema.registry.ssl.truststore.password": "",
            "stream.kafka.decoder.prop.schema.registry.ssl.keystore.password": "",
            "stream.kafka.decoder.prop.schema.registry.ssl.keystore.type": "",
            "stream.kafka.decoder.prop.schema.registry.ssl.truststore.type": "",
            "stream.kafka.decoder.prop.schema.registry.ssl.key.password": "",
            "stream.kafka.decoder.prop.schema.registry.ssl.protocol": ""
          }
        },
        "metadata": {
          "customConfigs": {}
        }
      }
      {
        "tableName": "transcript",
        "tableType": "REALTIME",
        "segmentsConfig": {
          "timeColumnName": "timestamp",
          "timeType": "MILLISECONDS",
          "schemaName": "transcript",
          "replicasPerPartition": "1"
        },
        "tenants": {},
        "tableIndexConfig": {
          "loadMode": "MMAP",
          "streamConfigs": {
            "streamType": "kafka",
            "stream.kafka.consumer.type": "LowLevel",
            "stream.kafka.topic.name": "transcript-topic",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
            "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
            "stream.kafka.zk.broker.url": "localhost:2191/kafka",
            "stream.kafka.broker.list": "localhost:9876",
            "stream.kafka.isolation.level": "read_committed"
          }
        },
        "metadata": {
          "customConfigs": {}
        }
      }
    "streamConfigs": {
            "streamType": "kafka",
            "stream.kafka.consumer.type": "lowlevel",
            "stream.kafka.topic.name": "mytopic",
            "stream.kafka.consumer.prop.auto.offset.reset": "largest",
            "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
            "stream.kafka.broker.list": "kafka-broker-host:9092",
            "stream.kafka.schema.registry.url": "https://xxx",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
            "stream.kafka.decoder.prop.schema.registry.rest.url": "https://xxx",
            "stream.kafka.decoder.prop.basic.auth.credentials.source": "USER_INFO",
            "stream.kafka.decoder.prop.schema.registry.basic.auth.user.info": "schema_registry_username:schema_registry_password",
            "sasl.mechanism": "PLAIN",
            "security.protocol": "SASL_SSL",
            "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"kafkausername\" password=\"kafkapassword\";",
            "realtime.segment.flush.threshold.rows": "0",
            "realtime.segment.flush.threshold.time": "24h",
            "realtime.segment.flush.autotune.initialRows": "3000000",
            "realtime.segment.flush.threshold.segment.size": "500M"
          },