
Import Data

This page lists options for importing data into Apache Pinot™ with links to detailed instructions with examples.

There are multiple options for importing data into Apache Pinot™. The pages in this section provide step-by-step instructions for importing records into Pinot, supported by our plugin architecture. The intent is to get you up and running with imported data as quickly as possible.

Pinot supports multiple file input formats without needing to change anything other than the file name. Each example imports a ready-made dataset so you can see how things work without needing to find or create your own dataset.

Pinot Batch Ingestion

These guides show you how to import data from popular big data platforms.

Pinot Stream Ingestion

This guide shows you how to import data using stream ingestion from Apache Kafka topics.

This guide shows you how to import data using stream ingestion with upsert.

This guide shows you how to import data using stream ingestion with deduplication.

This guide shows you how to import data using stream ingestion with CLP.

Pinot file systems

By default, Pinot does not come with a storage layer, so the data you send will not be stored persistently and will be lost if the system crashes. To persistently store the generated segments, you need to update the controller and server configs to add deep storage. See the file system guides below for the details and related configs.

These guides show you how to import data and persist it in these file systems.

Pinot input formats

This guide shows you how to import data from various Pinot-supported input formats.

This guide shows you how to handle the complex type in the ingested data, such as map and array.

This guide shows you how to handle records with dynamic schemas, like JSON log events.

Reloading and uploading existing Pinot segments

This guide shows you how to reload Pinot segments from your deep store.

This guide shows you how to upload Pinot segments from an old, closed Pinot instance.

From Query Console

Insert a file into Pinot from Query Console

This feature is supported after the 0.11.0 release. Reference PR: https://github.com/apache/pinot/pull/8557

Prerequisite


  • Ensure you have available Pinot Minion instances deployed within the cluster.

  • Pinot version is 0.11.0 or above

    How it works

    1. Pinot parses the query to get the table name, the directory URI, and the list of options for the ingestion job.

    2. Pinot calls the controller's minion task execution API endpoint to schedule the task on a minion.

    3. The response contains the table name and the task job ID.

    Usage Syntax

    INSERT INTO [database.]table FROM FILE dataDirURI OPTION ( k=v ) [, OPTION (k=v)]*

    Example

    Insert Rows into Pinot

    We are actively developing this feature...

    The details will be revealed soon.

    SET taskName = 'myTask-s3';
    SET input.fs.className = 'org.apache.pinot.plugin.filesystem.S3PinotFS';
    SET input.fs.prop.accessKey = 'my-key';
    SET input.fs.prop.secretKey = 'my-secret';
    SET input.fs.prop.region = 'us-west-2';
    INSERT INTO "baseballStats"
    FROM FILE 's3://my-bucket/public_data_set/baseballStats/rawdata/'

    Flink

    Batch ingestion of data into Apache Pinot using Apache Flink.

    Pinot supports Apache Flink as a processing framework to push segment files to the database.

    The Pinot distribution contains an Apache Flink SinkFunction that can be used as part of an Apache Flink application (streaming or batch) to write directly into a designated Pinot database.

    Example

    Flink application

    Here is an example code snippet showing how to use the PinotSinkFunction in a Flink streaming application:

    As the example shows, the only required information from the Pinot side is the table schema and the table config.

    For a more detailed executable example, refer to the quick start example.

    Table Config

    PinotSinkFunction mostly uses the TableConfig object to infer the batch ingestion configuration and to start a SegmentWriter and SegmentUploader to communicate with the Pinot cluster.

    Note that even though the Flink application in the example above runs in streaming mode, the data is still batched together and flushed/uploaded to Pinot once the flush threshold is reached. It is not a direct streaming write into Pinot.

    Here is an example table config; the only required configurations are:

    • "outputDirURI": where PinotSinkFunction should write the constructed segment file to

    • "push.controllerUri": which Pinot cluster (controller) URL PinotSinkFunction should communicate with.

    The rest of the configurations are standard for any Pinot table.

    Dimension table

    Batch ingestion of data into Apache Pinot using dimension tables.

    Dimension tables are a special kind of offline table from which data can be looked up via the lookup UDF, providing join-like functionality.

    Dimension tables are replicated on all the hosts for a given tenant to allow faster lookups. When a table is marked as a dimension table, it will be replicated on all the hosts, which means that these tables must be small in size.

    A dimension table cannot be part of a hybrid table.
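    As a quick illustration, once a dimension table such as dimBaseballTeams (configured below) is loaded, a fact table like baseballStats can resolve team names at query time with the lookUp UDF; the exact columns used here are only an assumption based on the example schema on this page:

    SELECT playerName,
           teamID,
           -- look up teamName in the dimension table, joining on teamID
           lookUp('dimBaseballTeams', 'teamName', 'teamID', teamID) AS teamName
    FROM baseballStats
    LIMIT 10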

    Configure dimension tables using the following properties in the table configuration:

    • isDimTable: Set to true.

    • segmentsConfig.segmentPushType: Set to REFRESH.

    • dimensionTableConfig.disablePreload: By default, dimension tables are preloaded to allow for fast lookups. Set to true to trade off speed for memory by storing only the segment reference and docID. Otherwise, the whole row is stored in the Dimension table hash map.

    • controller.dimTable.maxSize: Determines the maximum size quota for a dimension table in a cluster. Table creation will fail if the storage quota exceeds this maximum size.

    • dimensionFieldSpecs: To look up dimension values, dimension tables need a primary key. For details, see dimensionFieldSpecs.

    Example dimension table configuration

    // some environmental setup
    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Row> srcRows = execEnv.addSource(new FlinkKafkaConsumer<Row>(...));
    RowTypeInfo typeInfo = new RowTypeInfo(
      new TypeInformation[]{Types.FLOAT, Types.FLOAT, Types.STRING, Types.STRING},
      new String[]{"lon", "lat", "address", "name"});
    
    
    // add processing logic for the data stream for example:
    DataStream<Row> processedRows = srcRows.keyBy(r -> r.getField(0));
    ...
    
    // configurations for PinotSinkFunction
    Schema pinotSchema = ...
    TableConfig pinotTableConfig = ...
    processedRows.addSink(new PinotSinkFunction<>(
      new FlinkRowGenericRowConverter(typeInfo), 
      pinotTableConfig,
      pinotSchema));
    
    // execute the program
    execEnv.execute();
    {
      "tableName" : "tbl_OFFLINE",
      "tableType" : "OFFLINE",
      "segmentsConfig" : {
        // ...
      },
      "tenants" : {
        // ...
      },
      "tableIndexConfig" : {
        // ....
      },
      "ingestionConfig": {
        "batchIngestionConfig": {
          "segmentIngestionType": "APPEND",
          "segmentIngestionFrequency": "HOURLY", 
          "batchConfigMaps": [
            {
              "outputDirURI": "file://path/to/flink/segmentwriter/output/dir",
              "overwriteOutput": "false",
              "push.controllerUri": "https://target.pinot.cluster.controller.url"
            }
          ]
        }
      }
    }
    
    {
      "OFFLINE": {
        "tableName": "dimBaseballTeams_OFFLINE",
        "tableType": "OFFLINE",
        "segmentsConfig": {
          "schemaName": "dimBaseballTeams",
          "segmentPushType": "REFRESH"
        },
        "metadata": {},
        "quota": {
          "storage": "200M"
        },
        "isDimTable": true
        }.
        "dimensionTableConfig": {
          "disablePreload": true
          }
      }
    }
    ...
    {
      "dimensionFieldSpecs": [
        {
          "dataType": "STRING",
          "name": "teamID"
        },
        {
          "dataType": "STRING",
          "name": "teamName"
        }
      ],
      "schemaName": "dimBaseballTeams",
      "primaryKeyColumns": ["teamID"]
    }

    Reload a table segment

    Reload a table segment in Apache Pinot.

    When Pinot writes data to segments in a table, it saves those segments to a deep store location specified in your table configuration, such as a storage drive or Amazon S3 bucket.


    If a new column is added to your table or schema configuration during ingestion, incorrect data may appear in the consuming segment(s). To ensure accurate values are reloaded, see how to add a new column during ingestion.

    Use the Pinot Controller API to reload segments

    To reload all segments from a table, use:

    POST /segments/{tableName}/reload

    To reload a specific segment from a table, use:

    POST /segments/{tableName}/{segmentName}/reload

    A successful API call returns the following response:

    {
        "status": "200"
    }
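    For example, with a controller running at localhost:9000 (as used elsewhere in these guides) and the events table from the stream ingestion guide, the call might look like this sketch:

    curl -X POST "http://localhost:9000/segments/events/reload"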

    Use the Pinot Admin Console to reload segments

    To use the Pinot Admin Console, do the following:

    1. From the left navigation menu, select Cluster Manager.

    2. Under TENANTS, select the Tenant Name.

    3. From the list of tables in the tenant, select the Table Name.

    4. Do one of the following:

    • To reload all segments, under OPERATIONS, click Reload All Segments.

    • To reload a specific segment, under SEGMENTS, select the Segment Name, and then in the new OPERATIONS section, select Reload Segment.

    Backfill Data

    Batch ingestion of backfill data into Apache Pinot.

    Introduction

    Pinot batch ingestion involves two parts: a routine ingestion job (hourly/daily) and backfill. Here are some examples that show how routine batch ingestion works for a Pinot offline table:

  • Batch Ingestion in Practice

  • High-level description

    1. Organize raw data into buckets (e.g., /var/pinot/airlineStats/rawdata/2014/01/01). Each bucket typically contains several files (e.g., /var/pinot/airlineStats/rawdata/2014/01/01/airlineStats_data_2014-01-01_0.avro).

    2. Run a Pinot batch ingestion job that points to a specific date folder, such as /var/pinot/airlineStats/rawdata/2014/01/01. The segment generation job converts each such Avro file into a Pinot segment for that day and gives it a unique name.

    3. Run the Pinot segment push job to upload those segments, with those unique names, via a controller API.

    IMPORTANT: The segment name is the unique identifier used to identify a segment in Pinot. If the controller gets an upload request for a segment with the same name, it replaces the existing segment with the new one.

    This newly uploaded data can now be queried in Pinot. However, sometimes users will make changes to the raw data which need to be reflected in Pinot. This process is known as 'Backfill'.

    How to backfill data in Pinot

    Pinot supports data modification only at the segment level, which means you must replace entire segments to perform backfills. The high-level idea is to repeat steps 2 (segment generation) and 3 (segment upload) described above; a minimal job-spec sketch follows the list below:

    • Backfill jobs must run at the same granularity as the daily job. E.g., if you need to backfill data for 2014/01/01, specify that input folder for your backfill job (e.g., /var/pinot/airlineStats/rawdata/2014/01/01).

    • The backfill job will then generate segments with the same name as the original job (with the new data).

    • When uploading those segments to Pinot, the controller will replace the old segments with the new ones (segment names act like primary keys within Pinot) one by one.
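    As a hedged sketch (paths, table name, and reader settings are illustrative), a standalone backfill job spec pointed at a single date folder might look like this, reusing the job spec format shown later on this page:

    executionFrameworkSpec:
        name: 'standalone'
        segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
        segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
    jobType: SegmentCreationAndTarPush
    # Point the job at the exact time bucket being backfilled
    inputDirURI: '/var/pinot/airlineStats/rawdata/2014/01/01/'
    outputDirURI: '/var/pinot/airlineStats/segments/2014/01/01/'
    includeFileNamePath: 'glob:**/*.avro'
    overwriteOutput: true
    recordReaderSpec:
        dataFormat: 'avro'
        className: 'org.apache.pinot.plugin.inputformat.avro.AvroRecordReader'
    tableSpec:
        tableName: 'airlineStats'
    pinotClusterSpecs:
        - controllerURI: 'http://localhost:9000'

    Because the segment names are derived the same way as in the original run, pushing these segments replaces the originals for that date.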

    Edge case example

    Backfill jobs expect the same number of (or more) data files for the backfill date, so the segment generation job will create the same number of (or more) segments as the original run.

    For example, assume table airlineStats has 2 segments (airlineStats_2014-01-01_2014-01-01_0, airlineStats_2014-01-01_2014-01-01_1) for date 2014/01/01 and the backfill input directory contains only 1 input file. Then the segment generation job creates just one segment: airlineStats_2014-01-01_2014-01-01_0. After the segment push job, only segment airlineStats_2014-01-01_2014-01-01_0 is replaced, and the stale data in segment airlineStats_2014-01-01_2014-01-01_1 remains.

    If the raw data is modified in such a way that the original time bucket has fewer input files than the first ingestion run, backfill will fail.


    Hadoop

    Batch ingestion of data into Apache Pinot using Apache Hadoop.

    Segment Creation and Push

    Pinot supports Apache Hadoop as a processor to create and push segment files to the database. The Pinot distribution is bundled with the Hadoop code to process your files, convert them, and upload them to Pinot.

    You can follow the wiki to build Pinot from source. The resulting JAR file can be found in pinot/target/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar

    Next, you need to change the execution config in the job spec to the following:

    You can check out the sample job spec here.

    Finally, execute the Hadoop job using the following command:

    Ensure environment variables PINOT_ROOT_DIR and PINOT_VERSION are set properly.

    Data Preprocessing before Segment Creation

    We've seen requests that data be preprocessed (for example, partitioned, sorted, or resized) before creating and pushing segments to Pinot.

    The MapReduce job SegmentPreprocessingJob is the best fit for this use case, regardless of whether the input data is in Avro or ORC format.

    See the example below for how to use SegmentPreprocessingJob.

    In Hadoop properties, set the following to enable this job:

    In table config, specify the operations in preprocessing.operations that you'd like to enable in the MR job, and then specify the exact configs regarding those operations:

    preprocessing.num.reducers

    Minimum number of reducers. Optional. Used when partitioning is disabled and resizing is enabled. This parameter avoids producing too many small files for Pinot, which would leave the Pinot server holding too many small segments and running too many threads.

    preprocessing.max.num.records.per.file

    Maximum number of records per reducer. Optional. Unlike preprocessing.num.reducers, this parameter avoids producing too few large files for Pinot, which would lose the advantage of multi-threading when querying. When not set, each reducer generates one output file. When set (e.g., to M), the original output file is split into multiple files, and each new output file contains at most M records. It does not matter whether partitioning is enabled or not.

    For more details on this MR job, refer to this document.

    HDFS

    This guide shows you how to import data from HDFS.

    Enable the Hadoop distributed file system (HDFS) using the pinot-hdfs plugin. In the controller or server, add the config:


    By default, Pinot loads all plugins, so you can just drop this plugin in the plugins directory. If you specify -Dplugins.include, you must list all the plugins you want to use, e.g., pinot-json, pinot-avro, pinot-kafka-2.0...

    Spark

    Batch ingestion of data into Apache Pinot using Apache Spark.

    Pinot supports Apache Spark (2.x and 3.x) as a processor to create and push segment files to the database. Pinot distribution is bundled with the Spark code to process your files and convert and upload them to Pinot.

    To set up Spark, do one of the following:

    • Use the Spark-Pinot Connector. For more information, see the ReadMe.

    • Follow the instructions below.

    Segment compaction on upserts

    Use segment compaction on upsert-enabled real-time tables.

    Overview of segment compaction

    Compacting a segment replaces the completed segment with a compacted segment that contains only the latest version of records. For more information about how to use upserts on a real-time table in Pinot, see Stream Ingestion with Upsert.

    The Pinot upsert feature stores all versions of a record ingested into immutable segments on disk. Even though the previous versions are not queried, they continue to add storage overhead. To remove older records (no longer used in query results) and reclaim storage space, Pinot segments need to be compacted periodically. Segment compaction is done via a new minion task. To schedule Pinot tasks periodically, see the Minion documentation.

    File Systems

    This section contains a collection of short guides to show you how to import data from a Pinot-supported file system.

    FileSystem is an abstraction provided by Pinot to access data stored in distributed file systems (DFS).

    Pinot uses distributed file systems for the following purposes:

    • Batch ingestion job: To read the input data (CSV, Avro, Thrift, etc.) and to write generated segments to DFS.

    • Controller: When a segment is uploaded to the controller, the controller saves it in the configured DFS.

    • Server: When a server is notified of a new segment, the server copies the segment from the remote DFS to its local node using the DFS abstraction.


    HDFS implementation provides the following options:

    • hadoop.conf.path: Absolute path of the directory containing Hadoop XML configuration files, such as hdfs-site.xml, core-site.xml .

    • hadoop.write.checksum: Create checksum while pushing an object. Default is false

    • hadoop.kerberos.principle

    • hadoop.kerberos.keytab

    Each of these properties should be prefixed by pinot.[node].storage.factory.class.hdfs. where node is either controller or server depending on the config

    The Kerberos configs should be used only if your Hadoop installation is secured with Kerberos. Refer to the Hadoop in secure mode documentation for information on how to secure Hadoop using Kerberos.

    You must provide the proper Hadoop dependency jars from your Hadoop installation to your Pinot startup scripts.

    Push HDFS segment to Pinot Controller

    To push HDFS segment files to Pinot controller, send the HDFS path of your newly created segment files to the Pinot Controller. The controller will download the files.

    This example curl request tells the controller to download the segment files for the given table:

    Examples

    Job spec

    Standalone Job:

    Hadoop Job:

    Controller config

    Server config

    Minion config


    You can follow the wiki to build Pinot from source. The resulting JAR file can be found in pinot/target/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar

    If you do build Pinot from source, consider opting into the build-shaded-jar profile with -Pbuild-shaded-jar. While Pinot does not bundle Spark into its jar, it does bundle certain Hadoop libraries.

    Next, you need to change the execution config in the job spec to the following:

    To run Spark ingestion, you need the following jars in your classpath

    • pinot-batch-ingestion-spark plugin jar - available in plugins-external directory in the package

    • pinot-all jar - available in lib directory in the package

    These jars can be specified using spark.driver.extraClassPath or any other option.

    For loading any other plugins that you want to use, use:

    The complete spark-submit command should look like this:

    Ensure environment variables PINOT_ROOT_DIR and PINOT_VERSION are set properly.

    Note: You should change the master to yarn and deploy-mode to cluster for production environments.


    We stopped including the spark-core dependency in our jars after the 0.10.0 release. If you hit runtime issues, use 0.11.0-SNAPSHOT or a later version of pinot-batch-ingestion-spark. You can either build from source or download the latest master build jars.

    Running in Cluster Mode on YARN

    If you want to run the Spark job in cluster mode on a YARN/EMR cluster, do the following:

    • Build Pinot from source with option -DuseProvidedHadoop

    • Copy Pinot binaries to S3, HDFS or any other distributed storage that is accessible from all nodes.

    • Copy the ingestion spec YAML file to S3, HDFS, or any other distributed storage, and pass this path in the --files argument of the command.

    • Add the --jars option containing the s3/hdfs paths to all the required plugin jars and the pinot-all jar.

    • Point the classpath to the Spark working directory. Generally, just specifying the jar names without any paths works. Do the same for the main jar as well as the spec YAML file.

    Example


    For Spark 3.x, replace pinot-batch-ingestion-spark-2.4 with pinot-batch-ingestion-spark-3.2 in all places in the commands. Also, ensure the classpath in ingestion spec is changed from org.apache.pinot.plugin.ingestion.batch.spark. to org.apache.pinot.plugin.ingestion.batch.spark3.

    FAQ

    Q - I am getting the following exception - Class has been compiled by a more recent version of the Java Runtime (class file version 55.0), this version of the Java Runtime only recognizes class file versions up to 52.0

    Since 0.8.0 release, Pinot binaries are compiled with JDK 11. If you are using Spark along with Hadoop 2.7+, you need to use the Java8 version of Pinot. Currently, you need to build jdk 8 version from source.

    Q - I am not able to find pinot-batch-ingestion-spark jar.

    For Pinot versions prior to 0.10.0, the Spark plugin is located in the plugins dir of the binary distribution. For 0.10.0 and later, it is located in the plugins-external dir.

    Q - Spark is not able to find the jars leading to java.nio.file.NoSuchFileException

    This means the classpath for the Spark job has not been configured properly. If you are running Spark in a distributed environment such as YARN or k8s, make sure both spark.driver.classpath and spark.executor.classpath are set. Also, the jars in driver.classpath should be added to the --jars argument in spark-submit so that Spark can distribute those jars to all the nodes in your cluster. You also need to provide the appropriate scheme with the file path when running the jar. In this doc, we have used local:// but it can be different depending on your cluster setup.

    Q - Spark job failing while pushing the segments.

    This can be caused by a misconfigured controllerURI in the job spec YAML file. If the controllerURI is correct, make sure it is accessible from all the nodes of your YARN or k8s cluster.

    Q - My data gets overwritten during ingestion.

    Set segmentPushType to APPEND in the tableConfig.

    If already set to APPEND, this is likely due to a missing timeColumnName in your table config. If you can't provide a time column, use our segment name generation configs in ingestion spec. Generally using inputFile segment name generator should fix your issue.
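    For reference, the relevant part of the table config might look like this sketch (the time column name is illustrative):

    "segmentsConfig": {
      "segmentPushType": "APPEND",
      "timeColumnName": "ts"
    }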

    Q - I am getting java.lang.RuntimeException: java.io.IOException: Failed to create directory: pinot-plugins-dir-0/plugins/*

    Removing -Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins from spark.driver.extraJavaOptions should fix this. As long as plugins are mentioned in classpath and jars argument it should not be an issue.

    Q - Getting Class not found: exception

    Check that the extraClassPath arguments contain all the plugin jars for both driver and executors, and that all the plugin jars are listed in the --jars argument. If both of these are correct, check whether extraClassPath contains local filesystem classpaths and not s3, hdfs, or other distributed file system classpaths.

    # executionFrameworkSpec: Defines ingestion jobs to be running.
    executionFrameworkSpec:
    
        # name: execution framework name
      name: 'hadoop'
    
      # segmentGenerationJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
      segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunner'
    
      # segmentTarPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
      segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentTarPushJobRunner'
    
      # segmentUriPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
      segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentUriPushJobRunner'
    
      # segmentMetadataPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
      segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentMetadataPushJobRunner'
    
        # extraConfigs: extra configs for execution framework.
      extraConfigs:
    
        # stagingDir is used in distributed filesystem to host all the segments then move this directory entirely to output directory.
        stagingDir: your/local/dir/staging
    export PINOT_VERSION=0.10.0
    export PINOT_DISTRIBUTION_DIR=${PINOT_ROOT_DIR}/build/
    export HADOOP_CLIENT_OPTS="-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml"
    
    hadoop jar \
            ${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \
            org.apache.pinot.tools.admin.PinotAdministrator \
            LaunchDataIngestionJob \
            -jobSpecFile ${PINOT_DISTRIBUTION_DIR}/examples/batch/airlineStats/hadoopIngestionJobSpec.yaml
    enable.preprocessing = true
    preprocess.path.to.output = <output_path>
    {
        "OFFLINE": {
            "metadata": {
                "customConfigs": {
                    "preprocessing.operations": "resize, partition, sort",  // To enable the following preprocessing operations
                    "preprocessing.max.num.records.per.file": "100",       // To enable resizing
                    "preprocessing.num.reducers": "3"                      // To enable resizing
                }
            },
            ...
            "tableIndexConfig": {
                "aggregateMetrics": false,
                "autoGeneratedInvertedIndex": false,
                "bloomFilterColumns": [],
                "createInvertedIndexDuringSegmentGeneration": false,
                "invertedIndexColumns": [],
                "loadMode": "MMAP",
                "nullHandlingEnabled": false,
                "segmentPartitionConfig": {       // To enable partitioning
                    "columnPartitionMap": {
                        "item": {
                            "functionName": "murmur",
                            "numPartitions": 4
                        }
                    }
                },
                "sortedColumn": [                // To enable sorting
                    "actorId"
                ],
                "streamConfigs": {}
            },
            "tableName": "tableName_OFFLINE",
            "tableType": "OFFLINE",
            "tenants": {
                ...
            }
        }
    }
    -Dplugins.dir=/opt/pinot/plugins -Dplugins.include=pinot-hdfs
    export HADOOP_HOME=/local/hadoop/
    export HADOOP_VERSION=2.7.1
    export HADOOP_GUAVA_VERSION=11.0.2
    export HADOOP_GSON_VERSION=2.2.4
    export CLASSPATH_PREFIX="${HADOOP_HOME}/share/hadoop/hdfs/hadoop-hdfs-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/common/lib/hadoop-annotations-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/common/lib/hadoop-auth-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/common/hadoop-common-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/common/lib/guava-${HADOOP_GUAVA_VERSION}.jar:${HADOOP_HOME}/share/hadoop/common/lib/gson-${HADOOP_GSON_VERSION}.jar"
    curl -X POST -H "UPLOAD_TYPE:URI" -H "DOWNLOAD_URI:hdfs://nameservice1/hadoop/path/to/segment/file.
    executionFrameworkSpec:
        name: 'standalone'
        segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
        segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
        segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
    jobType: SegmentCreationAndTarPush
    inputDirURI: 'hdfs:///path/to/input/directory/'
    outputDirURI: 'hdfs:///path/to/output/directory/'
    includeFileNamePath: 'glob:**/*.csv'
    overwriteOutput: true
    pinotFSSpecs:
        - scheme: hdfs
          className: org.apache.pinot.plugin.filesystem.HadoopPinotFS
          configs:
            hadoop.conf.path: 'path/to/conf/directory/' 
    recordReaderSpec:
        dataFormat: 'csv'
        className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
        configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
    tableSpec:
        tableName: 'students'
    pinotClusterSpecs:
        - controllerURI: 'http://localhost:9000'
    executionFrameworkSpec:
        name: 'hadoop'
        segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunner'
        segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentTarPushJobRunner'
        segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentUriPushJobRunner'
        extraConfigs:
          stagingDir: 'hdfs:///path/to/staging/directory/'
    jobType: SegmentCreationAndTarPush
    inputDirURI: 'hdfs:///path/to/input/directory/'
    outputDirURI: 'hdfs:///path/to/output/directory/'
    includeFileNamePath: 'glob:**/*.csv'
    overwriteOutput: true
    pinotFSSpecs:
        - scheme: hdfs
          className: org.apache.pinot.plugin.filesystem.HadoopPinotFS
          configs:
            hadoop.conf.path: '/etc/hadoop/conf/' 
    recordReaderSpec:
        dataFormat: 'csv'
        className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
        configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
    tableSpec:
        tableName: 'students'
    pinotClusterSpecs:
        - controllerURI: 'http://localhost:9000'
    controller.data.dir=hdfs://path/to/data/directory/
    controller.local.temp.dir=/path/to/local/temp/directory
    controller.enable.split.commit=true
    pinot.controller.storage.factory.class.hdfs=org.apache.pinot.plugin.filesystem.HadoopPinotFS
    pinot.controller.storage.factory.hdfs.hadoop.conf.path=path/to/conf/directory/
    pinot.controller.segment.fetcher.protocols=file,http,hdfs
    pinot.controller.segment.fetcher.hdfs.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    pinot.controller.segment.fetcher.hdfs.hadoop.kerberos.principle=<your kerberos principal>
    pinot.controller.segment.fetcher.hdfs.hadoop.kerberos.keytab=<your kerberos keytab>
    pinot.server.instance.enable.split.commit=true
    pinot.server.storage.factory.class.hdfs=org.apache.pinot.plugin.filesystem.HadoopPinotFS
    pinot.server.storage.factory.hdfs.hadoop.conf.path=path/to/conf/directory/
    pinot.server.segment.fetcher.protocols=file,http,hdfs
    pinot.server.segment.fetcher.hdfs.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    pinot.server.segment.fetcher.hdfs.hadoop.kerberos.principle=<your kerberos principal>
    pinot.server.segment.fetcher.hdfs.hadoop.kerberos.keytab=<your kerberos keytab>
    storage.factory.class.hdfs=org.apache.pinot.plugin.filesystem.HadoopPinotFS
    storage.factory.hdfs.hadoop.conf.path=path/to/conf/directory
    segment.fetcher.protocols=file,http,hdfs
    segment.fetcher.hdfs.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    segment.fetcher.hdfs.hadoop.kerberos.principle=<your kerberos principal>
    segment.fetcher.hdfs.hadoop.kerberos.keytab=<your kerberos keytab>
    # executionFrameworkSpec: Defines ingestion jobs to be running.
    executionFrameworkSpec:
    
      # name: execution framework name
      name: 'spark'
    
      # segmentGenerationJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
      segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner'
    
      # segmentTarPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
      segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner'
    
      # segmentUriPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
      segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner'
    
      #segmentMetadataPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface
      segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentMetadataPushJobRunner'
    
      # extraConfigs: extra configs for execution framework.
      extraConfigs:
    
        # stagingDir is used in distributed filesystem to host all the segments then move this directory entirely to output directory.
        stagingDir: your/local/dir/staging
    spark.driver.extraClassPath =>
    pinot-batch-ingestion-spark-${PINOT_VERSION}-shaded.jar:pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar
    spark.driver.extraJavaOptions =>
    -Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins
    export PINOT_VERSION=0.10.0
    export PINOT_DISTRIBUTION_DIR=/path/to/apache-pinot-${PINOT_VERSION}-bin
    
    spark-submit \
    --class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
    --master local --deploy-mode client \
    --conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins" \
    --conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
    --conf "spark.executor.extraClassPath=${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
    local://${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar -jobSpecFile /path/to/spark_job_spec.yaml
    spark-submit \
    --class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
    --master yarn --deploy-mode cluster \
    --conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins" \
    --conf "spark.driver.extraClassPath=pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar:pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
    --conf "spark.executor.extraClassPath=pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar:pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
    --jars "${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar,${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
    --files s3://path/to/spark_job_spec.yaml \
    local://pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar -jobSpecFile spark_job_spec.yaml

    Compact segments on upserts in a real-time table

    To compact segments on upserts, complete the following steps:

    1. Ensure task scheduling is enabled and a minion is available.

    2. Add the following to your table configuration. These configurations (except schedule) determine which segments to compact.

    "task": {
      "taskTypeConfigsMap": {
        "UpsertCompactionTask": {
          "schedule": "0 */5 * ? * *",
          "bufferTimePeriod": "7d",
          "invalidRecordsThresholdPercent": "30",
          "invalidRecordsThresholdCount": "100000",
          "tableMaxNumTasks": "100",
          "validDocIdsType": "SNAPSHOT"
        }
      }
    }

    • bufferTimePeriod: To compact segments as soon as they are complete, set this to "0d". To delay compaction, specify how long to wait after a segment completes (the configuration above delays compaction by 7 days, "7d").

    • invalidRecordsThresholdPercent (Optional) Limits the older records allowed in the completed segment as a percentage of the total number of records in the segment. In the example above, the completed segment may be selected for compaction when 30% of the records in the segment are old.

    • invalidRecordsThresholdCount (Optional) Limits the older records allowed in the completed segment by record count. In the example above, if the segment contains more than 100K records, it may be selected for compaction.

    • tableMaxNumTasks (Optional) Limits the number of tasks allowed to be scheduled.

    • validDocIdsType (Optional) Specifies the source of validDocIds to fetch when running the data compaction. The valid types are SNAPSHOT, IN_MEMORY, IN_MEMORY_WITH_DELETE

      • SNAPSHOT: Default validDocIds type. This indicates that the validDocIds bitmap is loaded from the snapshot from the Pinot segment. UpsertConfig's enableSnapshot must be enabled for this type.

    WARNING: Using an in-memory based validDocIds type (IN_MEMORY, IN_MEMORY_WITH_DELETE) is dangerous, as it does not guarantee consistency in some edge cases (e.g., fetching the validDocIds bitmap while the server is restarting and updating validDocIds).


    Because segment compaction is an expensive operation, we do not recommend setting invalidRecordsThresholdPercent and invalidRecordsThresholdCount too low (close to 1). By default, all configurations above are 0, so no thresholds are applied.

    Example

    The following example includes a dataset with 24M records and 240K unique keys that have each been duplicated 100 times. After ingesting the data, there are 6 segments (5 completed segments and 1 consuming segment) with a total estimated size of 22.8MB.

    Example dataset

    Submitting the query “set skipUpsert=true; select count(*) from transcript_upsert” before compaction produces 24,000,000 results:

    Results before segment compaction

    After the compaction tasks are complete, the Minion Task Manager UI reports the following.

    Minion compaction task completed

    Segment compaction generates a task for each segment to compact. Five tasks were generated in this case because 90% of the records (3.6–4.5M records) in the completed segments are considered ready for compaction, exceeding the configured thresholds.


    If a completed segment only contains old records, Pinot immediately deletes the segment (rather than creating a task to compact it).

    Submitting the query again shows the count matches the set of 240K unique keys.

    Results after segment compaction

    Once segment compaction has completed, the total number of segments remains the same and the total estimated size drops to 2.77MB.


    To further improve query latency, merge small segments into larger ones.


    Supported file systems

    Pinot lets you choose a distributed file system provider. The following file systems are supported by Pinot:

    • Amazon S3

    • Google Cloud Storage

    • HDFS

    • Azure Data Lake Storage

    Enabling a file system

    To use a distributed file system, you need to enable plugins. To do that, specify the plugin directory and include the required plugins:

    -Dplugins.dir=/opt/pinot/plugins -Dplugins.include=pinot-plugin-to-include-1,pinot-plugin-to-include-2

    You can change the file system in the controller and server configuration. In the following configuration example, the URI is s3://bucket/path/to/file and scheme refers to the file system URI prefix s3.

    #CONTROLLER

    pinot.controller.storage.factory.class.[scheme]=className of the Pinot file system
    pinot.controller.segment.fetcher.protocols=file,http,[scheme]
    pinot.controller.segment.fetcher.[scheme].class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher

    #SERVER

    pinot.server.storage.factory.class.[scheme]=className of the Pinot file system
    pinot.server.segment.fetcher.protocols=file,http,[scheme]
    pinot.server.segment.fetcher.[scheme].class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher

    You can also change the file system during ingestion. In the ingestion job spec, specify the file system with the following configuration:

    pinotFSSpecs
      - scheme: file
        className: org.apache.pinot.spi.filesystem.LocalPinotFS

    Batch Ingestion

    Batch ingestion of data into Apache Pinot.

    With batch ingestion you create a table using data already present in a file system such as S3. This is particularly useful when you want to use Pinot to query across large data with minimal latency or to test out new features using a simple data file.

    To ingest data from a filesystem, perform the following steps, which are described in more detail in this page:

    1. Create schema configuration

    Amazon S3

    This guide shows you how to import data from files stored in Amazon S3.

    Enable the file system backend by including the pinot-s3 plugin. In the controller or server configuration, add the config:


    By default, Pinot loads all plugins, so you can just drop this plugin in the plugins directory. If you specify -Dplugins.include, you must list all the plugins you want to use, e.g., pinot-json, pinot-avro, ...

    Ingest streaming data from Amazon Kinesis

    This guide shows you how to ingest a stream of records from an Amazon Kinesis topic into a Pinot table.

    To ingest events from an Amazon Kinesis stream into Pinot, set the following configs in your table config:

    {
      "tableName": "kinesisTable",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "timeColumnName": "timestamp",
        "replicasPerPartition": "1"
      },
      "tenants": {},
      "tableIndexConfig": {
        "loadMode": "MMAP",
        "streamConfigs": {
          "streamType": "kinesis",
          "stream.kinesis.topic.name": "<your kinesis stream name>",
          "region": "<your region>",
          "accessKey": "<your access key>",
          "secretKey": "<your secret key>",
          "shardIteratorType": "AFTER_SEQUENCE_NUMBER",
          "stream.kinesis.consumer.type": "lowlevel",
          "stream.kinesis.fetch.timeout.millis": "30000",
          "stream.kinesis.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
          "stream.kinesis.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
          "realtime.segment.flush.threshold.rows": "1000000",
          "realtime.segment.flush.threshold.time": "6h"
        }
      },
      "metadata": {
        "customConfigs": {}
      }
    }

    where the Kinesis-specific properties are:

    • streamType: This should be set to "kinesis".

    • stream.kinesis.topic.name: Kinesis stream name.

    • region: Kinesis region, e.g. us-west-1.

    • accessKey: Kinesis access key.

    • secretKey: Kinesis secret key.

    • shardIteratorType: Set to LATEST to consume only new records, TRIM_HORIZON for the earliest sequence number, or AT_SEQUENCE_NUMBER / AFTER_SEQUENCE_NUMBER to start consumption from a particular sequence number.

    • maxRecordsToFetch: ... Default is 20.

    Configure indexes

    Learn how to apply indexes to a Pinot table. This guide assumes that you have followed the Ingest data from Apache Kafka guide.

    Pinot supports a series of different indexes that can be used to optimize query performance. In this guide, we'll learn how to add indexes to the events table that we set up in the Ingest data from Apache Kafka guide.

    Why do we need indexes?

    If no indexes are applied to the columns in a Pinot segment, the query engine needs to scan through every document, checking whether that document meets the filter criteria provided in a query. This can be a slow process if there are a lot of documents to scan.

    Azure Data Lake Storage

    This guide shows you how to import data from files stored in Azure Data Lake Storage Gen2 (ADLS Gen2).

    Enable Azure Data Lake Storage using the pinot-adls plugin. In the controller or server, add the config:

    -Dplugins.dir=/opt/pinot/plugins -Dplugins.include=pinot-adls


    By default, Pinot loads all plugins, so you can just drop this plugin in the plugins directory. If you specify -Dplugins.include, you must list all the plugins you want to use, e.g., pinot-json, pinot-avro, pinot-kafka-2.0...

    Stream ingestion with Dedup

    Deduplication support in Apache Pinot.

    Pinot provides native support for deduplication (dedup) during real-time ingestion (v0.11.0+).

    Prerequisites for enabling dedup

    To enable dedup on a Pinot table, make the following table configuration and schema changes:

    "task": {
      "taskTypeConfigsMap": {
        "UpsertCompactionTask": {
          "schedule": "0 */5 * ? * *",
          "bufferTimePeriod": "7d",
          "invalidRecordsThresholdPercent": "30",
          "invalidRecordsThresholdCount": "100000",
          "tableMaxNumTasks": "100",
          "validDocIdsType": "SNAPSHOT"
        }
      }
    }


    Kinesis supports authentication using the DefaultCredentialsProviderChain. The credential provider looks for the credentials in the following order (see the example after this list):

    • Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (RECOMMENDED since they are recognized by all the AWS SDKs and CLI except for .NET), or AWS_ACCESS_KEY and AWS_SECRET_KEY (only recognized by Java SDK)

    • Java System Properties - aws.accessKeyId and aws.secretKey

    • Web Identity Token credentials from the environment or container

    • Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI

    • Credentials delivered through the Amazon EC2 container service, if the AWS_CONTAINER_CREDENTIALS_RELATIVE_URI environment variable is set and the security manager has permission to access the variable.

    • Instance profile credentials delivered through the Amazon EC2 metadata service
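    For example, the environment-variable option from the list above could be set as follows (values are placeholders):

    export AWS_ACCESS_KEY_ID=<your access key id>
    export AWS_SECRET_ACCESS_KEY=<your secret access key>
    # Optional, if you are using temporary credentials
    export AWS_SESSION_TOKEN=<your session token>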


    You must provide all read access level permissions for Pinot to work with an AWS Kinesis data stream. See the AWS documentation for details.

    Although you can also specify the accessKey and secretKey in the properties above, we don't recommend this insecure method; use it only for non-production proof-of-concept (POC) setups. You can also specify other AWS fields, such as AWS_SESSION_TOKEN, as environment variables or in the config, and they will work.

    Limitations

    1. ShardID is of the format "shardId-000000000001". We use the numeric part as the partitionId. Our partitionId variable is an integer, so if shardIds grow beyond Integer.MAX_VALUE, we will overflow the partitionId space.

    2. Segment-size-based thresholds for segment completion will not work. They assume that partition "0" always exists; however, once shard 0 is split or merged, partition 0 will no longer exist.


    When indexes are applied, the query engine can more quickly work out which documents satisfy the filter criteria, reducing the time it takes to execute the query.

    What indexes does Pinot support?

    By default, Pinot creates a forward index for every column. The forward index generally stores documents in insertion order.

    However, before flushing the segment, Pinot does a single pass over every column to see whether the data is sorted. If data is sorted, Pinot creates a sorted (forward) index for that column instead of the forward index.

    For real-time tables you can also explicitly tell Pinot that one of the columns should be sorted. For more details, see the [Sorted Index Documentation](https://docs.pinot.apache.org/basics/indexing/forward-index#real-time-tables).
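    For example, declaring the sorted column in tableIndexConfig for a real-time table might look like this sketch (ts is the time column used by the events table later in this guide):

    "tableIndexConfig": {
      "sortedColumn": ["ts"]
    }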

    For filtering documents within a segment, Pinot supports the following indexing techniques:

    • Inverted index: Used for exact lookups.

    • Range index: Used for range queries.

    • Text index: Used for phrase, term, boolean, prefix, or regex queries.

    • Geospatial index: Based on H3, a hexagon-based hierarchical gridding. Used for finding points that exist within a certain distance from another point.

    • JSON index: Used for querying columns in JSON documents.

    • Star-Tree index: Pre-aggregates results across multiple columns.

    View events table

    Let's see how we can apply these indexing techniques to our data. To recap, the events table has the following fields:

    • ts (date time field)

    • uuid (dimension field)

    • count (metric field)

    We might want to write queries that filter on the ts and uuid columns, so these are the columns on which we would want to configure indexes.

    Since the data we're ingesting into the Kafka topic is all implicitly ordered by timestamp, the ts column already has a sorted index. This means that any queries that filter on this column are already optimized.

    So that leaves us with the uuid column.

    Add an inverted index

    We're going to add an inverted index to the uuid column so that queries that filter on that column return more quickly. We need to add the following line to the tableIndexConfig section:

    "invertedIndexColumns": ["uuid"]

    Copy the following to the clipboard:

    /tmp/pinot/table-config-stream.json

    Navigate to localhost:9000/#/tenants/table/events_REALTIME, click Edit Table, paste the updated table config, and then click Save.

    Once you've done that, you'll need to click Reload All Segments and then Yes to apply the indexing change to all segments.

    Check the index has been applied

    We can check that the index has been applied to all our segments by querying Pinot's REST API. You can find Swagger documentation at localhost:9000/help.

    The following query will return the indexes defined on the uuid column:

    Output

    We're using the jq command line JSON processor to extract the fields that we're interested in.

    We can see from looking at the inverted-index property that the index has been applied.

    Querying

    You can now run some queries that filter on the uuid column, as shown below:

    You'll need to change the actual uuid value to a value that exists in your database, because the UUIDs are generated randomly by our script.


    Azure Blob Storage provides the following options:

    • accountName: Name of the Azure account under which the storage is created.

    • accessKey: Access key required for the authentication.

    • fileSystemName: Name of the file system to use, for example, the container name (similar to the bucket name in S3).

    • enableChecksum: Enable MD5 checksum for verification. Default is false.

    Each of these properties should be prefixed by pinot.[node].storage.factory.class.adl2., where node is either controller or server depending on the config, like this:

    pinot.controller.storage.factory.class.adl2.accountName=test-user

    Examples

    Job spec

    Controller config

    Server config

    Minion config

    Define the primary key in the schema

    To be able to dedup records, a primary key is needed to uniquely identify a given record. To define a primary key, add the field primaryKeyColumns to the schema definition.

    Note this field expects a list of columns, as the primary key can be composite.

    While ingesting a record, if its primary key is found to be already present, the record will be dropped.
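    A minimal schema sketch with a primary key (the schema and field names are illustrative):

    {
      "schemaName": "orders",
      "dimensionFieldSpecs": [
        { "name": "orderId", "dataType": "STRING" },
        { "name": "customerId", "dataType": "STRING" }
      ],
      "primaryKeyColumns": ["orderId"]
    }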

    Partition the input stream by the primary key

    An important requirement for a Pinot dedup table is to partition the input stream by the primary key. For Kafka messages, this means the producer should set the key in the send API. If the original stream is not partitioned, then a stream processing job (e.g., Flink) is needed to shuffle and repartition the input stream into a partitioned one for Pinot's ingestion.
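    For example, a Kafka producer can key each message by the record's primary key so that all records with the same key land in the same partition (a sketch; the topic, key, and payload are illustrative):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class KeyedProducerExample {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
          String primaryKey = "order-1234"; // the record's primary key
          String payload = "{\"orderId\": \"order-1234\", \"amount\": 42}";
          // Keying by the primary key sends all versions of this record
          // to the same Kafka partition, and hence the same Pinot server.
          producer.send(new ProducerRecord<>("orders", primaryKey, payload));
        }
      }
    }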

    Use strictReplicaGroup for routing

    The dedup Pinot table can use only the low-level consumer for the input streams. As a result, it uses the partitioned replica-group assignment for the segments. Moreover, dedup poses the additional requirement that all segments of the same partition must be served from the same server to ensure data consistency across the segments. Accordingly, it requires strictReplicaGroup as the routing strategy. To use that, configure instanceSelectorType in Routing as follows:
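    A sketch of that routing setting in the table config:

    "routing": {
      "instanceSelectorType": "strictReplicaGroup"
    }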

    Other limitations

    • The high-level consumer is not allowed for the input stream ingestion, which means stream.kafka.consumer.type must be lowLevel.

    • The incoming stream must be partitioned by the primary key, such that all records with a given primaryKey are consumed by the same Pinot server instance.

    Enable dedup in the table configurations

    To enable dedup for a REALTIME table, add the following to the table config.
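    A hedged sketch of what that dedup block looks like (exact key names may differ slightly by Pinot version; hashFunction is described just below):

    "dedupConfig": {
      "dedupEnabled": true,
      "hashFunction": "NONE"
    }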

    Supported values for hashFunction are NONE, MD5 and MURMUR3, with the default being NONE.

    Best practices

    Unlike other real-time tables, a dedup table takes up more memory because it needs to keep track of the primary key and its corresponding segment reference in memory. As a result, it's important to plan capacity beforehand and monitor resource usage. Here are some recommended practices for using dedup tables.

    • Create the Kafka topic with more partitions. The number of Kafka partitions determines the partition count of the Pinot table. The more partitions you have in the Kafka topic, the more Pinot servers you can distribute the Pinot table to, and therefore the more you can scale the table horizontally.

    • A dedup table maintains an in-memory map from the primary key to the segment reference, so it's recommended to use a simple primary key type and avoid composite primary keys to save memory. In addition, consider the hashFunction config in the dedup config, which can be MD5 or MURMUR3, to store the 128-bit hashcode of the primary key instead. This is useful when the primary key takes more space. Keep in mind that this hash may introduce collisions, though the chance is very low.

    • Monitoring: Set up a dashboard over the metric pinot.server.dedupPrimaryKeysCount.tableName to watch the number of primary keys in a table partition. It's useful for tracking growth, which is proportional to the growth in memory usage.

    • Capacity planning: It's useful to plan capacity beforehand to ensure you don't run into resource constraints later. A simple way is to measure the rate of primary keys in the Kafka throughput per partition and multiply by the primary key space cost to approximate the memory usage. A heap dump is also useful to check the memory usage so far on a dedup table instance.

    {
      "tableName": "events",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "timeColumnName": "ts",
        "schemaName": "events",
        "replicasPerPartition": "1"
      },
      "tenants": {},
      "tableIndexConfig": {
        "invertedIndexColumns": ["uuid"],
        "loadMode": "MMAP",
        "streamConfigs": {
          "streamType": "kafka",
          "stream.kafka.consumer.type": "lowlevel",
          "stream.kafka.topic.name": "events",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
          "stream.kafka.broker.list": "kafka:9092",
          "realtime.segment.flush.threshold.rows": "0",
          "realtime.segment.flush.threshold.time": "24h",
          "realtime.segment.flush.threshold.segment.size": "50M",
          "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
        }
      },
      "metadata": {
        "customConfigs": {}
      }
    }
    curl -X GET "http://localhost:9000/segments/events/metadata?columns=uuid" \
      -H "accept: application/json" 2>/dev/null | 
      jq '.[] | [.segmentName, .indexes]'
    [
      "events__0__1__20220214T1106Z",
      {
        "uuid": {
          "bloom-filter": "NO",
          "dictionary": "YES",
          "forward-index": "YES",
          "inverted-index": "YES",
          "null-value-vector-reader": "NO",
          "range-index": "NO",
          "json-index": "NO"
        }
      }
    ]
    [
      "events__0__0__20220214T1053Z",
      {
        "uuid": {
          "bloom-filter": "NO",
          "dictionary": "YES",
          "forward-index": "YES",
          "inverted-index": "YES",
          "null-value-vector-reader": "NO",
          "range-index": "NO",
          "json-index": "NO"
        }
      }
    ]
    SELECT * 
    FROM events 
    WHERE uuid = 'f4a4f'
    LIMIT 10
    -Dplugins.dir=/opt/pinot/plugins -Dplugins.include=pinot-adls
    pinot.controller.storage.factory.class.adl2.accountName=test-user
    executionFrameworkSpec:
        name: 'standalone'
        segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
        segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
        segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
    jobType: SegmentCreationAndTarPush
    inputDirURI: 'adl2://path/to/input/directory/'
    outputDirURI: 'adl2://path/to/output/directory/'
    overwriteOutput: true
    pinotFSSpecs:
        - scheme: adl2
          className: org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS
          configs:
            accountName: 'my-account'
            accessKey: 'foo-bar-1234'
            fileSystemName: 'fs-name'
    recordReaderSpec:
        dataFormat: 'csv'
        className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
        configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
    tableSpec:
        tableName: 'students'
    pinotClusterSpecs:
        - controllerURI: 'http://localhost:9000'
    controller.data.dir=adl2://path/to/data/directory/
    controller.local.temp.dir=/path/to/local/temp/directory
    controller.enable.split.commit=true
    pinot.controller.storage.factory.class.adl2=org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS
    pinot.controller.storage.factory.adl2.accountName=my-account
    pinot.controller.storage.factory.adl2.accessKey=foo-bar-1234
    pinot.controller.storage.factory.adl2.fileSystemName=fs-name
    pinot.controller.segment.fetcher.protocols=file,http,adl2
    pinot.controller.segment.fetcher.adl2.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    pinot.server.instance.enable.split.commit=true
    pinot.server.storage.factory.class.adl2=org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS
    pinot.server.storage.factory.adl2.accountName=my-account
    pinot.server.storage.factory.adl2.accessKey=foo-bar-1234
    pinot.server.storage.factory.adl2.fileSystemName=fs-name
    pinot.server.segment.fetcher.protocols=file,http,adl2
    pinot.server.segment.fetcher.adl2.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    storage.factory.class.adl2=org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS
    storage.factory.adl2.accountName=my-account
    storage.factory.adl2.fileSystemName=fs-name
    storage.factory.adl2.accessKey=foo-bar-1234
    segment.fetcher.protocols=file,http,adl2
    segment.fetcher.adl2.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    schemaWithPK.json
    {
        "primaryKeyColumns": ["id"]
    }
    routing
    {
      "routing": {
        "instanceSelectorType": "strictReplicaGroup"
      }
    }
    tableConfigWithDedup.json
    { 
     ...
      "dedupConfig": { 
            "dedupEnabled": true, 
            "hashFunction": "NONE" 
       }, 
     ...
    }

• IN_MEMORY: Indicates that the validDocIds bitmap is loaded from the real-time server's memory.

• IN_MEMORY_WITH_DELETE: Indicates that the validDocIds bitmap is read from the real-time server's memory, and that the valid document IDs take deleted records into account. UpsertConfig's deleteRecordColumn must be provided for this type.

  • Create table configuration
  • Upload schema and table configs

  • Upload data

Batch ingestion currently supports the following mechanisms to upload the data:

    • Standalone

    • Hadoop

    • Spark

    Here's an example using standalone local processing.

    First, create a table using the following CSV data.

    hashtag
    Create schema configuration

In our data, the only column on which aggregations can be performed is score. Also, timestampInEpoch is the only timestamp column. So, in our schema, we keep score as a metric and timestampInEpoch as the timestamp column.

    Here, we have also defined two extra fields: format and granularity. The format specifies the formatting of our timestamp column in the data source. Currently, it's in milliseconds, so we've specified 1:MILLISECONDS:EPOCH.

    hashtag
    Create table configuration

    We define a table transcript and map the schema created in the previous step to the table. For batch data, we keep the tableType as OFFLINE.

    hashtag
    Upload schema and table configs

    Now that we have both the configs, upload them and create a table by running the following command:

Check out the table config and schema in the Rest API to make sure they were successfully uploaded.

    hashtag
    Upload data

    We now have an empty table in Pinot. Next, upload the CSV file to this empty table.

    A table is composed of multiple segments. The segments can be created in the following three ways:

• Minion based ingestion

    • Upload API

    • Ingestion jobs

    hashtag
    Minion-based ingestion

    Refer to SegmentGenerationAndPushTask

    hashtag
    Upload API

    There are 2 controller APIs that can be used for a quick ingestion test using a small file.

    triangle-exclamation

    When these APIs are invoked, the controller has to download the file and build the segment locally.

Hence, these APIs are NOT meant for production environments or for large input files.

    hashtag
    /ingestFromFile

    This API creates a segment using the given file and pushes it to Pinot. All steps happen on the controller.

    Example usage:

To upload a JSON file data.json to a table called foo_OFFLINE, use the command below.

    Note that query params need to be URLEncoded. For example, {"inputFormat":"json"} in the command below needs to be converted to %7B%22inputFormat%22%3A%22json%22%7D.
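For reference, here is a sketch of the same request with the batchConfigMapStr value already URL-encoded (it assumes the data.json example described above):

curl -X POST -F file=@data.json \
  -H "Content-Type: multipart/form-data" \
  "http://localhost:9000/ingestFromFile?tableNameWithType=foo_OFFLINE&batchConfigMapStr=%7B%22inputFormat%22%3A%22json%22%7D"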

The batchConfigMapStr can be used to pass in additional properties needed for decoding the file. For example, in the case of CSV, you may need to provide the delimiter:

    hashtag
    /ingestFromURI

    This API creates a segment using file at the given URI and pushes it to Pinot. Properties to access the FS need to be provided in the batchConfigMap. All steps happen on the controller. Example usage:

    hashtag
    Ingestion jobs

    Segments can be created and uploaded using tasks known as DataIngestionJobs. A job also needs a config of its own. We call this config the JobSpec.

    For our CSV file and table, the JobSpec should look like this:

    For more detail, refer to Ingestion job spec.

    Now that we have the job spec for our table transcript, we can trigger the job using the following command:

Once the job successfully finishes, head over to the query console and start playing with the data.

    hashtag
    Segment push job type

    There are 3 ways to upload a Pinot segment:

    • Segment tar push

    • Segment URI push

    • Segment metadata push

    hashtag
    Segment tar push

    This is the original and default push mechanism.

Tar push requires the segment to be stored locally or to be accessible as an InputStream on PinotFS, so the entire segment tar file can be streamed to the controller.

    The push job will:

    1. Upload the entire segment tar file to the Pinot controller.

    Pinot controller will:

1. Save the segment into the controller segment directory (local or any PinotFS).

    2. Extract segment metadata.

    3. Add the segment to the table.

    hashtag
    Segment URI push

This push mechanism requires the segment tar file to be stored on a deep store with a globally accessible segment tar URI.

    URI push is lightweight on the client side, while the controller side does the same amount of work as for tar push.

    The push job will:

    1. POST this segment tar URI to the Pinot controller.

    Pinot controller will:

    1. Download segment from the URI and save it to controller segment directory (local or any PinotFS).

    2. Extract segment metadata.

    3. Add the segment to the table.

    hashtag
    Segment metadata push

This push mechanism also requires the segment tar file to be stored on a deep store with a globally accessible segment tar URI.

    Metadata push is lightweight on the controller side; there is no deep store download involved on the controller side.

    The push job will:

    1. Download the segment based on URI.

    2. Extract metadata.

    3. Upload metadata to the Pinot Controller.

    Pinot Controller will:

    1. Add the segment to the table based on the metadata.

hashtag
    Segment metadata push with copyToDeepStore

This extends the original segment metadata push for cases where the segments are pushed to a location that is not used as the deep store. The ingestion job can still do a metadata push but ask the Pinot controller to copy the segments into the deep store. These use cases usually arise when the ingestion jobs don't have direct access to the deep store but still want to use metadata push for its efficiency, so they use a staging location to keep the segments temporarily.

NOTE: the staging location and the deep store have to use the same storage scheme, for example both on S3. This is because the copy is done via the PinotFS.copyDir interface, which assumes the same scheme; it also means the copy happens on the storage system side, so segments don't need to go through the Pinot controller at all.

    To make this work, grant Pinot controllers access to the staging location. For example on AWS, this may require adding an access policy like this example for the controller EC2 instances:

    Then use metadata push to add one extra config like this one:

    hashtag
    Consistent data push and rollback

Pinot supports atomic updates at the segment level, which means that when data consisting of multiple segments is pushed to a table, segments are replaced one at a time; queries served by the broker during this upload phase may therefore produce inconsistent results due to the interleaving of old and new data.

    See Consistent Push and Rollback for how to enable this feature.

    hashtag
    Segment fetchers

When Pinot segment files are created in external systems (Hadoop, Spark, etc.), there are several ways to push that data to the Pinot controller and server:

    1. Push segment to shared NFS and let pinot pull segment files from the location of that NFS. See Segment URI Pusharrow-up-right.

    2. Push segment to a Web server and let pinot pull segment files from the Web server with HTTP/HTTPS link. See Segment URI Pusharrow-up-right.

    3. Push segment to PinotFS(HDFS/S3/GCS/ADLS) and let pinot pull segment files from PinotFS URI. See Segment URI Pusharrow-up-right and Segment Metadata Pusharrow-up-right.

    4. Push segment to other systems and implement your own segment fetcher to pull data from those systems.

The first three options are supported out of the box within the Pinot package. As long as your remote jobs send the Pinot controller the corresponding URIs to the files, it will pick up the files and allocate them to the proper Pinot servers and brokers. To enable Pinot support for PinotFS, you'll need to provide the PinotFS configuration and proper Hadoop dependencies.

    hashtag
    Persistence

By default, Pinot does not come with a storage layer, so the data sent won't be stored in case of a system crash. In order to persistently store the generated segments, you will need to change the controller and server configs to add deep storage. Check out File systems for all the info and related configs.

    hashtag
    Tuning

    hashtag
    Standalone

Since Pinot is written in Java, you can set the following basic Java configurations to tune the segment runner job:

    • Log4j2 file location with -Dlog4j2.configurationFile

    • Plugin directory location with -Dplugins.dir=/opt/pinot/plugins

    • JVM props, like -Xmx8g -Xms4G

If you are using Docker, you can set these under the JAVA_OPTS variable.
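For example, a sketch of running a standalone ingestion job through the Pinot Docker image with custom JVM settings (the image tag, mounted paths, and job spec location are assumptions; adjust them to your setup):

docker run --rm -ti \
  -v /tmp/pinot-quick-start:/tmp/pinot-quick-start \
  -e JAVA_OPTS="-Xms4G -Xmx8g -Dplugins.dir=/opt/pinot/plugins" \
  apachepinot/pinot:latest LaunchDataIngestionJob \
  -jobSpecFile /tmp/pinot-quick-start/batch-job-spec.yaml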

    hashtag
    Hadoop

    You can set -D mapreduce.map.memory.mb=8192 to set the mapper memory size when submitting the Hadoop job.

    hashtag
    Spark

    You can add config spark.executor.memory to tune the memory usage for segment creation when submitting the Spark job.
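For example, a sketch of a Spark submission with increased executor memory (the main class, jar path, and job spec location are assumptions based on a typical Pinot distribution; adjust them to your environment):

spark-submit \
  --class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
  --master yarn --deploy-mode cluster \
  --conf "spark.executor.memory=8g" \
  /path/to/pinot-all-jar-with-dependencies.jar \
  -jobSpecFile /path/to/spark-job-spec.yaml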


    You can configure the S3 file system using the following options:

    Configuration
    Description

    region

    The AWS Data center region in which the bucket is located

    accessKey

(Optional) AWS access key required for authentication. This should only be used for testing, since the key is kept in plain-text configuration rather than in a secret store.

    secretKey

(Optional) AWS secret key required for authentication. This should only be used for testing, since the key is kept in plain-text configuration rather than in a secret store.

    endpoint

    (Optional) Override endpoint for s3 client.

    disableAcl

If this is set to false, the bucket owner is granted full access to the objects created by Pinot. Default value is true.

Each of these properties should be prefixed by pinot.[node].storage.factory.s3., where node is either controller or server depending on the component being configured, e.g. pinot.controller.storage.factory.s3.region=ap-southeast-1.

    S3 Filesystem supports authentication using the DefaultCredentialsProviderChainarrow-up-right. The credential provider looks for the credentials in the following order -

    • Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (RECOMMENDED since they are recognized by all the AWS SDKs and CLI except for .NET), or AWS_ACCESS_KEY and AWS_SECRET_KEY (only recognized by Java SDK)

    • Java System Properties - aws.accessKeyId and aws.secretKey

    • Web Identity Token credentials from the environment or container

    • Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI

• Credentials delivered through the Amazon EC2 container service if the AWS_CONTAINER_CREDENTIALS_RELATIVE_URI environment variable is set and the security manager has permission to access the variable.

    • Instance profile credentials delivered through the Amazon EC2 metadata service

You can also specify the accessKey and secretKey using configuration properties. However, this method is not secure and should be used only for POC setups.
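For example, on the controller this might look like the following sketch (placeholder values; the equivalent properties apply under the pinot.server prefix):

pinot.controller.storage.factory.s3.accessKey=AKIAXXXXXXXXEXAMPLE
pinot.controller.storage.factory.s3.secretKey=your-secret-key-for-testing-only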

    hashtag
    Examples

    hashtag
    Job spec

    hashtag
    Controller config

    hashtag
    Server config

    hashtag
    Minion config

    Amazon S3arrow-up-right

    Stream ingestion with CLP

    Support for encoding fields with CLP during ingestion.

    circle-exclamation

    This is an experimental feature. Configuration options and usage may change frequently until it is stabilized.

    When performing stream ingestion of JSON records using Kafka, users can encode specific fields with CLParrow-up-right by using a CLP-specific StreamMessageDecoder.

    CLP is a compressor designed to encode unstructured log messages in a way that makes them more compressible while retaining the ability to search them. It does this by decomposing the message into three fields:

    • the message's static text, called a log type;

    • repetitive variable values, called dictionary variables; and

    • non-repetitive variable values (called encoded variables since we encode them specially if possible).

    Searches are similarly decomposed into queries on the individual fields.

    circle-info

    Although CLP is designed for log messages, other unstructured text like file paths may also benefit from its encoding.

    For example, consider this JSON record:

If the user specifies that the fields message and logPath should be encoded with CLP, then the StreamMessageDecoder will output:

In the fields with the _logtype suffix, \x11 is a placeholder for an integer variable, \x12 is a placeholder for a dictionary variable, and \x13 is a placeholder for a float variable. In message_encodedVars, the float variable 0.335 is encoded as an integer using CLP's custom encoding.

    All remaining fields are processed in the same way as they are in org.apache.pinot.plugin.inputformat.json.JSONRecordExtractor. Specifically, fields in the table's schema are extracted from each record and any remaining fields are dropped.

    hashtag
    Configuration

    hashtag
    Table Index

    Assuming the user wants to encode message and logPath as in the example, they should change/add the following settings to their tableIndexConfig (we omit irrelevant settings for brevity):

    • stream.kafka.decoder.prop.fieldsForClpEncoding is a comma-separated list of names for fields that should be encoded with CLP.

• We use variable-length dictionaries for the logtype and dictionary variables since their length can vary significantly.

    hashtag
    Schema

    For the table's schema, users should configure the CLP-encoded fields as follows (we omit irrelevant settings for brevity):

    • We use the maximum possible length for the logtype and dictionary variable columns.

    • The dictionary and encoded variable columns are multi-valued columns.

    hashtag
    Searching and decoding CLP-encoded fields

To decode CLP-encoded fields, use CLPDECODE.

To search CLP-encoded fields, you can combine CLPDECODE with LIKE. Note that this may decrease performance when querying a large number of rows.
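For example, assuming a hypothetical table named clpLogs with the message_* columns from the example above, and assuming CLPDECODE takes the logtype, dictionary-variable, and encoded-variable columns, such a search might look like this sketch:

SELECT CLPDECODE(message_logtype, message_dictionaryVars, message_encodedVars) AS message
FROM clpLogs
WHERE CLPDECODE(message_logtype, message_dictionaryVars, message_encodedVars) LIKE '%container_15%'
LIMIT 10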

We are working to integrate efficient searches on CLP-encoded columns as another UDF. The development of this feature is being tracked in this design doc.

    Ingest streaming data from Apache Pulsar

    This guide shows you how to ingest a stream of records from an Apache Pulsar topic into a Pinot table.

Pinot supports consuming data from Apache Pulsar via the pinot-pulsar plugin. You need to enable this plugin so that the Pulsar-specific libraries are present in the classpath.

    Enable the Pulsar plugin with the following config at the time of Pinot setup: -Dplugins.include=pinot-pulsar

    circle-info

    The pinot-pulsar

    Upload a table segment

    Upload a table segment in Apache Pinot.

    This procedure uploads one or more table segments that have been stored as Pinot segment binary files outside of Apache Pinot, such as if you had to close an original Pinot cluster and create a new one.

    Choose one of the following:

    • If your data is in a location that uses HDFS, create a segment fetcher.

    • If your data is on a host where you have SSH access, use the Pinot Admin script.

    studentID,firstName,lastName,gender,subject,score,timestampInEpoch
    200,Lucy,Smith,Female,Maths,3.8,1570863600000
    200,Lucy,Smith,Female,English,3.5,1571036400000
    201,Bob,King,Male,Maths,3.2,1571900400000
    202,Nick,Young,Male,Physics,3.6,1572418800000
    {
      "schemaName": "transcript",
      "dimensionFieldSpecs": [
        {
          "name": "studentID",
          "dataType": "INT"
        },
        {
          "name": "firstName",
          "dataType": "STRING"
        },
        {
          "name": "lastName",
          "dataType": "STRING"
        },
        {
          "name": "gender",
          "dataType": "STRING"
        },
        {
          "name": "subject",
          "dataType": "STRING"
        }
      ],
      "metricFieldSpecs": [
        {
          "name": "score",
          "dataType": "FLOAT"
        }
      ],
      "dateTimeFieldSpecs": [{
        "name": "timestampInEpoch",
        "dataType": "LONG",
        "format" : "1:MILLISECONDS:EPOCH",
        "granularity": "1:MILLISECONDS"
      }]
    }
    {
      "tableName": "transcript",
      "tableType": "OFFLINE",
      "segmentsConfig": {
        "replication": 1,
        "timeColumnName": "timestampInEpoch",
        "timeType": "MILLISECONDS",
        "retentionTimeUnit": "DAYS",
        "retentionTimeValue": 365
      },
      "tenants": {
        "broker":"DefaultTenant",
        "server":"DefaultTenant"
      },
      "tableIndexConfig": {
        "loadMode": "MMAP"
      },
      "ingestionConfig": {
        "batchIngestionConfig": {
          "segmentIngestionType": "APPEND",
          "segmentIngestionFrequency": "DAILY"
        },
        "continueOnError": true,
        "rowTimeValueCheck": true,
        "segmentTimeValueCheck": false
    
      },
      "metadata": {}
    }
bin/pinot-admin.sh AddTable \
  -tableConfigFile /path/to/table-config.json \
      -schemaFile /path/to/table-schema.json -exec
curl -X POST -F file=@data.json \
      -H "Content-Type: multipart/form-data" \
      "http://localhost:9000/ingestFromFile?tableNameWithType=foo_OFFLINE&
      batchConfigMapStr={"inputFormat":"json"}"
curl -X POST -F file=@data.csv \
      -H "Content-Type: multipart/form-data" \
      "http://localhost:9000/ingestFromFile?tableNameWithType=foo_OFFLINE&
    batchConfigMapStr={
      "inputFormat":"csv",
      "recordReader.prop.delimiter":"|"
    }"
    curl -X POST "http://localhost:9000/ingestFromURI?tableNameWithType=foo_OFFLINE
    &batchConfigMapStr={
      "inputFormat":"json",
      "input.fs.className":"org.apache.pinot.plugin.filesystem.S3PinotFS",
      "input.fs.prop.region":"us-central",
      "input.fs.prop.accessKey":"foo",
      "input.fs.prop.secretKey":"bar"
    }
    &sourceURIStr=s3://test.bucket/path/to/json/data/data.json"
    executionFrameworkSpec:
      name: 'standalone'
      segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
      segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
      segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
      segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner'
    
    # Recommended to set jobType to SegmentCreationAndMetadataPush for production environment where Pinot Deep Store is configured  
    jobType: SegmentCreationAndTarPush
    
    inputDirURI: '/tmp/pinot-quick-start/rawdata/'
    includeFileNamePattern: 'glob:**/*.csv'
    outputDirURI: '/tmp/pinot-quick-start/segments/'
    overwriteOutput: true
    pinotFSSpecs:
      - scheme: file
        className: org.apache.pinot.spi.filesystem.LocalPinotFS
    recordReaderSpec:
      dataFormat: 'csv'
      className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
      configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
    tableSpec:
      tableName: 'transcript'
    pinotClusterSpecs:
      - controllerURI: 'http://localhost:9000'
    pushJobSpec:
      pushAttempts: 2
      pushRetryIntervalMillis: 1000
bin/pinot-admin.sh LaunchDataIngestionJob \
        -jobSpecFile /tmp/pinot-quick-start/batch-job-spec.yaml
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "s3:ListAllMyBuckets",
                "Resource": "*"
            },
            {
                "Effect": "Allow",
                "Action": "s3:*",
                "Resource": [
                    "arn:aws:s3:::metadata-push-staging",
                    "arn:aws:s3:::metadata-push-staging/*"
                ]
            }
        ]
    }
    ...
    jobType: SegmentCreationAndMetadataPush
    ...
    outputDirURI: 's3://metadata-push-staging/stagingDir/'
    ...
    pushJobSpec:
      copyToDeepStoreForMetadataPush: true
    ...
    -Dplugins.dir=/opt/pinot/plugins -Dplugins.include=pinot-s3
    pinot.controller.storage.factory.s3.region=ap-southeast-1
    executionFrameworkSpec:
        name: 'standalone'
        segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
        segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
        segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
    jobType: SegmentCreationAndTarPush
    inputDirURI: 's3://pinot-bucket/pinot-ingestion/batch-input/'
    outputDirURI: 's3://pinot-bucket/pinot-ingestion/batch-output/'
    overwriteOutput: true
    pinotFSSpecs:
        - scheme: s3
          className: org.apache.pinot.plugin.filesystem.S3PinotFS
          configs:
            region: 'ap-southeast-1'
    recordReaderSpec:
        dataFormat: 'csv'
        className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
        configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
    tableSpec:
        tableName: 'students'
    pinotClusterSpecs:
        - controllerURI: 'http://localhost:9000'
    controller.data.dir=s3://path/to/data/directory/
    controller.local.temp.dir=/path/to/local/temp/directory
    controller.enable.split.commit=true
    pinot.controller.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
    pinot.controller.storage.factory.s3.region=ap-southeast-1
    pinot.controller.segment.fetcher.protocols=file,http,s3
    pinot.controller.segment.fetcher.s3.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    pinot.server.instance.enable.split.commit=true
    pinot.server.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
    pinot.server.storage.factory.s3.region=ap-southeast-1
    pinot.server.storage.factory.s3.httpclient.maxConnections=50
    pinot.server.storage.factory.s3.httpclient.socketTimeout=30s
    pinot.server.storage.factory.s3.httpclient.connectionTimeout=2s
    pinot.server.storage.factory.s3.httpclient.connectionTimeToLive=0s
    pinot.server.storage.factory.s3.httpclient.connectionAcquisitionTimeout=10s
    pinot.server.segment.fetcher.protocols=file,http,s3
    pinot.server.segment.fetcher.s3.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    pinot.minion.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
    pinot.minion.storage.factory.s3.region=ap-southeast-1
    pinot.minion.segment.fetcher.protocols=file,http,s3
    pinot.minion.segment.fetcher.s3.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher

    serverSideEncryption

(Optional) The server-side encryption algorithm used when storing objects in Amazon S3 (currently aws:kms is supported); set to null to disable SSE.

    ssekmsKeyId

    (Optional, but required when serverSideEncryption=aws:kms) Specifies the AWS KMS key ID to use for object encryption. All GET and PUT requests for an object protected by AWS KMS will fail if not made via SSL or using SigV4.

    ssekmsEncryptionContext

    (Optional) Specifies the AWS KMS Encryption Context to use for object encryption. The value of this header is a base64-encoded UTF-8 string holding JSON with the encryption context key-value pairs.

    Flink
    variable-length dictionariesarrow-up-right
    CLPDECODEarrow-up-right
    design docarrow-up-right

    Before you upload, do the following:

    1. Create a schema configurationarrow-up-right or confirm one exists that matches the segment you want to upload.

    2. Create a table configuration or confirm one exists that matches the segment you want to upload.

    3. (If needed) Upload the schema and table configs.

    hashtag
    Create a segment fetcher

    If the data is in a location using HDFS, you can create a segment fetcher, which will push segment files from external systems such as those running Hadoop or Spark. It is possible to implement your own segment fetcher for other systems with an external jar by implementing a class that extends this interface.

    hashtag
    Use the Pinot Admin script to upload segments

    To do this, you need to create a JobSpec configuration file. For details, see Ingestion job spec. This file defines the job, including things like the job type, the input directory or URI, and the table name that the segments will be connected to.

    You can upload a Pinot segment using several methods:

    • Segment tar push

    • Segment URI push

    • Segment metadata push

    hashtag
    Segment tar push

    This is the original and default push mechanism. It requires the segment to be stored locally, or that the segment can be opened as an InputStream on PinotFS, so we can stream the entire segment tar file to the controller.

    The push job will upload the entire segment tar file to the Pinot controller.

    The Pinot controller will save the segment into the controller segment directory (Local or any PinotFS), then extract segment metadata, and add the segment to the table.

    While you can create a JobSpec for this job, in simple instances you can push without one.

    Upload segment files to your Pinot server from controller using the Pinot Admin script as follows:

    All options should be prefixed with - (hyphen)

    Option
    Description

    controllerHost

    Hostname or IP address of the controller

    controllerPort

    Port of the controller

    segmentDir

    Local directory containing segment files

    tableName

    Name of the table to push the segments into

    hashtag
    Segment URI push

This push mechanism requires the segment tar file to be stored on a deep store with a globally accessible segment tar URI.

URI push is lightweight on the client side, while the controller side does the same amount of work as for tar push.

    The push job posts this segment tar URI to the Pinot controller.

    The Pinot controller saves the segment into the controller segment directory (local or any PinotFS), then extracts segment metadata, and adds the segment to the table.

    Upload segment files to your Pinot server using the JobSpec you create and the Pinot Admin script as follows:

    hashtag
    Segment metadata push

This push mechanism also requires the segment tar file to be stored on a deep store with a globally accessible segment tar URI.

Metadata push is lightweight on the controller side. There is no deep store download involved on the controller side.

The push job downloads the segment based on the URI, extracts the metadata, and uploads the metadata to the Pinot controller.

    The Pinot controller adds the segment to the table based on the metadata.

    Upload segment metadata to your Pinot server using the JobSpec you create and the Pinot Admin script as follows:

    {
      "timestamp": 1672531200000,
      "message": "INFO Task task_12 assigned to container: [ContainerID:container_15], operation took 0.335 seconds. 8 tasks remaining.",
      "logPath": "/mnt/data/application_123/container_15/stdout"
    }
    {
      "timestamp": 1672531200000,
      "message_logtype": "INFO Task \\x12 assigned to container: [ContainerID:\\x12], operation took \\x13 seconds. \\x11 tasks remaining.",
      "message_dictionaryVars": [
        "task_12",
        "container_15"
      ],
      "message_encodedVars": [
        1801439850948198735,
        8
      ],
      "logPath_logtype": "/mnt/data/\\x12/\\x12/stdout",
      "logPath_dictionaryVars": [
        "application_123",
        "container_15"
      ],
      "logPath_encodedVars": []
    }
    {
      "tableIndexConfig": {
        "streamConfigs": {
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.clplog.CLPLogMessageDecoder",
          "stream.kafka.decoder.prop.fieldsForClpEncoding": "message,logPath"
        },
        "varLengthDictionaryColumns": [
          "message_logtype",
          "message_dictionaryVars",
          "logPath_logtype",
          "logPath_dictionaryVars"
        ]
      }
    }
    {
      "dimensionFieldSpecs": [
        {
          "name": "message_logtype",
          "dataType": "STRING",
          "maxLength": 2147483647
        },
        {
          "name": "message_encodedVars",
          "dataType": "LONG",
          "singleValueField": false
        },
        {
          "name": "message_dictionaryVars",
          "dataType": "STRING",
          "maxLength": 2147483647,
          "singleValueField": false
        },
    {
      "name": "logPath_logtype",
      "dataType": "STRING",
      "maxLength": 2147483647
    },
    {
      "name": "logPath_encodedVars",
      "dataType": "LONG",
      "singleValueField": false
    },
    {
      "name": "logPath_dictionaryVars",
      "dataType": "STRING",
      "maxLength": 2147483647,
      "singleValueField": false
    }
      ]
    }
pinot-admin.sh AddTable \
  -tableConfigFile /path/to/table-config.json \
  -schemaFile /path/to/table-schema.json -exec
pinot-admin.sh UploadSegment \
  -controllerHost localhost \
  -controllerPort 9000 \
  -segmentDir /path/to/local/dir \
  -tableName myTable
pinot-admin.sh LaunchDataIngestionJob \
    -jobSpecFile /file/location/my-job-spec.yaml
pinot-admin.sh LaunchDataIngestionJob \
    -jobSpecFile /file/location/my-job-spec.yaml
plugin is not part of the official 0.10.0 binary. You can download the plugin from our external repository and add it to the libs or plugins directory in Pinot.

    hashtag
    Set up Pulsar table

    Here is a sample Pulsar stream config. You can use the streamConfigs section from this sample and make changes for your corresponding table.

    hashtag
    Pulsar configuration options

You can change the following Pulsar-specific configurations for your tables:

    Property
    Description

    streamType

    This should be set to "pulsar"

    stream.pulsar.topic.name

    Your pulsar topic name

    stream.pulsar.bootstrap.servers

    Comma-separated broker list for Apache Pulsar

    stream.pulsar.metadata.populate

Set to true to populate metadata

    stream.pulsar.metadata.fields

Set to a comma-separated list of metadata fields to populate

    hashtag
    Authentication

    The Pinot-Pulsar connector supports authentication using security tokens. To generate a token, follow the instructions in Pulsar documentationarrow-up-right. Once generated, add the following property to streamConfigs to add an authentication token for each request:

    hashtag
    OAuth2 Authentication

    The Pinot-Pulsar connector supports authentication using OAuth2, for example, if connecting to a StreamNative Pulsar cluster. For more information, see how to Configure OAuth2 authentication in Pulsar clientsarrow-up-right. Once configured, you can add the following properties to streamConfigs:

    hashtag
    TLS support

The Pinot-Pulsar connector also supports TLS for encrypted connections. You can follow the official pulsar documentationarrow-up-right to enable TLS on your Pulsar cluster. Once done, you can enable TLS in the Pulsar connector by providing the location of the trust certificate file generated in the previous step.

Also, make sure to change the broker URL from pulsar://localhost:6650 to pulsar+ssl://localhost:6650 so that secure connections are used.

For other table and stream configurations, head over to the Table configuration Reference.

    hashtag
    Supported Pulsar versions

Pinot currently relies on Pulsar client version 2.7.2. Make sure the Pulsar broker is compatible with this client version.

    hashtag
    Extract record headers as Pinot table columns

    Pinot's Pulsar connector supports automatically extracting record headers and metadata into the Pinot table columns. Pulsar supports a large amount of per-record metadata. Reference the official Pulsar documentationarrow-up-right for the meaning of the metadata fields.

    The following table shows the mapping for record header/metadata to Pinot table column names:

    Pulsar Message
    Pinot table Column
    Comments
    Available By Default

    key : String

    __key : String

    Yes

    properties : Map<String, String>

    Each header key is listed as a separate column: __header$HeaderKeyName : String

    Yes

    publishTime : Long

    __metadata$publishTime : String

    In order to enable the metadata extraction in a Pulsar table, set the stream config metadata.populate to true. The fields eventTime, publishTime, brokerPublishTime, and key are populated by default. If you would like to extract additional fields from the Pulsar Message, populate the metadataFields config with a comma separated list of fields to populate. The fields are referenced by the field name in the Pulsar Message. For example, setting:

This will make the __metadata$messageId, __metadata$messageBytes, __metadata$eventTime, and __metadata$topicName fields available for mapping to columns in the Pinot schema.

    In addition to this, if you want to use any of these columns in your table, you have to list them explicitly in your table's schema.

For example, if you want to add only the offset and key as dimension columns in your Pinot table, they can be listed in the schema as follows:

Once the schema is updated, these columns are treated like any other Pinot column. You can apply ingestion transforms and/or define indexes on them.

    circle-info

    Remember to follow the schema evolution guidelines when updating schema of an existing table!

    Apache Pulsararrow-up-right
    our external repositoryarrow-up-right

    Google Cloud Storage

    This guide shows you how to import data from GCP (Google Cloud Platform).

    Enable the Google Cloud Storagearrow-up-right using the pinot-gcs plugin. In the controller or server, add the config:

    circle-info

By default, Pinot loads all the plugins, so you can just drop this plugin there. Also, if you specify -Dplugins.include, you need to put all the plugins you want to use, e.g. pinot-json, pinot-avro, pinot-kafka-2.0...

The Google Cloud Storage file system provides the following options:

    • projectId - The name of the Google Cloud Platform project under which you have created your storage bucket.

• gcpKey - Location of the JSON file containing the GCP keys. You can refer to Creating and managing service account keys to download the keys.

    Each of these properties should be prefixed by pinot.[node].storage.factory.class.gs. where node is either controller or server depending on the configuration, like this:

    hashtag
    Examples

    hashtag
    Job spec

    hashtag
    Controller config

    hashtag
    Server config

    hashtag
    Minion config

    Complex Type (Array, Map) Handling

    Complex type handling in Apache Pinot.

Commonly, ingested data has a complex structure. For example, Avro schemas have records and arrays, while JSON supports objects and arrays.

Apache Pinot's data model supports primitive data types (including int, long, float, double, BigDecimal, string, bytes) and limited multi-value types, such as an array of primitive types. Simple data types allow Pinot to build fast indexing structures for good query performance, but they do require some handling of complex structures.

    There are two options for complex type handling:

    • Convert the complex-type data into a JSON string and then build a JSON index.

    Ingest records with dynamic schemas

    Storing records with dynamic schemas in a table with a fixed schema.

    Some domains (e.g., logging) generate records where each record can have a different set of keys, whereas Pinot tables have a relatively static schema. For records with varying keys, it's impractical to store each field in its own table column. However, most (if not all) fields may be important, so fields should not be dropped unnecessarily.

The SchemaConformingTransformer is a RecordTransformer that can transform records with dynamic schemas so that they can be ingested into a table with a static schema. The transformer primarily takes record fields that don't exist in the schema and stores them in a type of catchall field.

    For example, consider this record:

    Let's say the table's schema contains the following fields:

    {
      "tableName": "pulsarTable",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "timeColumnName": "timestamp",
        "replicasPerPartition": "1"
      },
      "tenants": {},
      "tableIndexConfig": {
        "loadMode": "MMAP",
        "streamConfigs": {
          "streamType": "pulsar",
          "stream.pulsar.topic.name": "<your pulsar topic name>",
          "stream.pulsar.bootstrap.servers": "pulsar://localhost:6650,pulsar://localhost:6651",
          "stream.pulsar.consumer.prop.auto.offset.reset" : "smallest",
          "stream.pulsar.consumer.type": "lowlevel",
          "stream.pulsar.fetch.timeout.millis": "30000",
          "stream.pulsar.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
          "stream.pulsar.consumer.factory.class.name": "org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
          "realtime.segment.flush.threshold.rows": "1000000",
          "realtime.segment.flush.threshold.time": "6h"
        }
      },
      "metadata": {
        "customConfigs": {}
      }
    }
    "stream.pulsar.authenticationToken":"your-auth-token"
    "stream.pulsar.issuerUrl": "https://auth.streamnative.cloud"
    "stream.pulsar.credsFilePath": "file:///path/to/private_creds_file
    "stream.pulsar.audience": "urn:sn:pulsar:test:test-cluster"
    "stream.pulsar.tlsTrustCertsFilePath": "/path/to/ca.cert.pem"
    
    "streamConfigs": {
      ...
            "stream.pulsar.metadata.populate": "true",
            "stream.pulsar.metadata.fields": "messageId,messageIdBytes,eventTime,topicName",
      ...
    }
      "dimensionFieldSpecs": [
        {
          "name": "__key",
          "dataType": "STRING"
        },
        {
          "name": "__metadata$messageId",
          "dataType": "STRING"
        },
        ...
      ],
    -Dplugins.dir=/opt/pinot/plugins -Dplugins.include=pinot-gcs

    publish time as determined by the producer

    Yes

    brokerPublishTime: Optional

    __metadata$brokerPublishTime : String

    publish time as determined by the broker

    Yes

    eventTime : Long

    __metadata$eventTime : String

    Yes

    messageId : MessageId -> String

    __metadata$messageId : String

String representation of the MessageId field. The format is ledgerId:entryId:partitionIndex

    messageId : MessageId -> bytes

    __metadata$messageBytes : String

    Base64 encoded version of the bytes returned from calling MessageId.toByteArray()

    producerName : String

    __metadata$producerName : String

    schemaVersion : byte[]

    __metadata$schemaVersion : String

    Base64 encoded value

    sequenceId : Long

    __metadata$sequenceId : String

    orderingKey : byte[]

    __metadata$orderingKey : String

    Base64 encoded value

    size : Integer

    __metadata$size : String

    topicName : String

    __metadata$topicName : String

    index : String

    __metadata$index : String

    redeliveryCount : Integer

    __metadata$redeliveryCount : String

    Creating and managing service account keysarrow-up-right

• timestamp

  • hostname

  • level

  • message

  • tags.platform

  • tags.service

  • indexableExtras

  • unindexableExtras

Without this transformer, the HOSTNAME field and the entire tags field would be dropped when storing the record in the table. However, with this transformer, the record would be transformed into the following:

    Notice that the transformer does the following:

    • Flattens nested fields which exist in the schema, like tags.platform

    • Drops some fields like HOSTNAME, where HOSTNAME must be listed as a field in the config option fieldPathsToDrop

    • Moves fields that don't exist in the schema and have the suffix _noIndex into the unindexableExtras field

    • Moves any remaining fields that don't exist in the schema into the indexableExtras field

    The unindexableExtras field allows the transformer to separate fields that don't need indexing (because they are only retrieved, not searched) from those that do.

    hashtag
    SchemaConformingTransformer Configuration

    To use the transformer, add the schemaConformingTransformerConfig option in the ingestionConfig section of your table configuration, as shown in the following example.

    For example:

    Available configuration options are listed in SchemaConformingTransformerConfigarrow-up-right.

    SchemaConformingTransformerarrow-up-right
    RecordTransformerarrow-up-right
    pinot.controller.storage.factory.class.gs.projectId=test-project
    executionFrameworkSpec:
        name: 'standalone'
        segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
        segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
        segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
    jobType: SegmentCreationAndTarPush
    inputDirURI: 'gs://my-bucket/path/to/input/directory/'
    outputDirURI: 'gs://my-bucket/path/to/output/directory/'
    overwriteOutput: true
    pinotFSSpecs:
        - scheme: gs
          className: org.apache.pinot.plugin.filesystem.GcsPinotFS
          configs:
            projectId: 'my-project'
            gcpKey: 'path-to-gcp json key file'
    recordReaderSpec:
        dataFormat: 'csv'
        className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
        configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
    tableSpec:
        tableName: 'students'
    pinotClusterSpecs:
        - controllerURI: 'http://localhost:9000'
    controller.data.dir=gs://path/to/data/directory/
    controller.local.temp.dir=/path/to/local/temp/directory
    controller.enable.split.commit=true
    pinot.controller.storage.factory.class.gs=org.apache.pinot.plugin.filesystem.GcsPinotFS
    pinot.controller.storage.factory.gs.projectId=my-project
    pinot.controller.storage.factory.gs.gcpKey=path/to/gcp/key.json
    pinot.controller.segment.fetcher.protocols=file,http,gs
    pinot.controller.segment.fetcher.gs.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    pinot.server.instance.enable.split.commit=true
    pinot.server.storage.factory.class.gs=org.apache.pinot.plugin.filesystem.GcsPinotFS
    pinot.server.storage.factory.gs.projectId=my-project
    pinot.server.storage.factory.gs.gcpKey=path/to/gcp/key.json
    pinot.server.segment.fetcher.protocols=file,http,gs
    pinot.server.segment.fetcher.gs.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    pinot.minion.storage.factory.class.gs=org.apache.pinot.plugin.filesystem.GcsPinotFS
    pinot.minion.storage.factory.gs.projectId=my-project
    pinot.minion.storage.factory.gs.gcpKey=path/to/gcp/key.json
    pinot.minion.segment.fetcher.protocols=file,http,gs
    pinot.minion.segment.fetcher.gs.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
    {
      "timestamp": 1687786535928,
      "hostname": "host1",
      "HOSTNAME": "host1",
      "level": "INFO",
      "message": "Started processing job1",
      "tags": {
        "platform": "data",
        "service": "serializer",
        "params": {
          "queueLength": 5,
          "timeout": 299,
          "userData_noIndex": {
            "nth": 99
          }
        }
      }
    }
    {
      "timestamp": 1687786535928,
      "hostname": "host1",
      "level": "INFO",
      "message": "Started processing job1",
      "tags.platform": "data",
      "tags.service": "serializer",
      "indexableExtras": {
        "tags": {
          "params": {
            "queueLength": 5,
            "timeout": 299
          }
        }
      },
      "unindexableExtras": {
        "tags": {
          "userData_noIndex": {
            "nth": 99
          }
        }
      }
    }
    {
      "ingestionConfig": {
        "schemaConformingTransformerConfig": {
          "indexableExtrasField": "extras",
          "unindexableExtrasField": "extrasNoIndex",
          "unindexableFieldSuffix": "_no_index",
          "fieldPathsToDrop": [
            "HOSTNAME"
          ]
        }
      }
    }
  • Use the built-in complex-type handling rules in the ingestion configuration.

On this page, we'll show how to handle these complex-type structures with each of these two approaches. We will process some example data, consisting of the field group from the Meetup events Quickstart examplearrow-up-right.

    This object has two child fields and the child group is a nested array with elements of object type.

    Example JSON data

    hashtag
    JSON indexing

    Apache Pinot provides a powerful JSON index to accelerate the value lookup and filtering for the column. To convert an object group with complex type to JSON, add the following to your table configuration.

The transformConfigs config transforms the object group into a JSON string group_json, which is then indexed according to the jsonIndexColumns configuration. To read the full spec, see meetupRsvpJson_realtime_table_config.jsonarrow-up-right.

    Also, note that group is a reserved keyword in SQL and therefore needs to be quoted in transformFunction.

    circle-info

    The columnName can't use the same name as any of the fields in the source JSON data, for example, if our source data contains the field group and we want to transform the data in that field before persisting it, the destination column name would need to be something different, like group_json.

    circle-info

    Note that you do not need to worry about the maxLength of the field group_json on the schema, because "JSON" data type does not have a maxLength and will not be truncated. This is true even though "JSON" is stored as a string internally.

    The schema will look like this:

    For the full specification, see json_meetupRsvp_schema.jsonarrow-up-right.

With this, you can start to query the nested fields under group. For more details about the supported JSON functions, see this guide.

    hashtag
    Ingestion configurations

    Though JSON indexing is a handy way to process the complex types, there are some limitations:

    • It’s not performant to group by or order by a JSON field, because JSON_EXTRACT_SCALAR is needed to extract the values in the GROUP BY and ORDER BY clauses, which invokes the function evaluation.

    • It does not work with Pinot's multi-column functionsarrow-up-right such as DISTINCTCOUNTMV.

    Alternatively, from Pinot 0.8, you can use the complex-type handling in ingestion configurations to flatten and unnest the complex structure and convert them into primitive types. Then you can reduce the complex-type data into a flattened Pinot table, and query it via SQL. With the built-in processing rules, you do not need to write ETL jobs in another compute framework such as Flink or Spark.

    To process this complex type, you can add the configuration complexTypeConfig to the ingestionConfig. For example:

With complexTypeConfig, all the map objects will be flattened to direct fields automatically. And with fieldsToUnnest, a record with a nested collection will be unnested into multiple records. For instance, the example at the beginning will transform into two rows with this configuration example.

    Flattened/unnested data

    Note that:

• The nested field group_id under group is flattened to group.group_id. The default delimiter is . (dot). You can choose another delimiter by specifying the configuration delimiter under complexTypeConfig. This flattening rule also applies to maps in the collections to be unnested.

    • The nested array group_topics under group is unnested into the top-level, and converts the output to a collection of two rows. Note the handling of the nested field within group_topics, and the eventual top-level field of group.group_topics.urlkey. All the collections to unnest shall be included in the configuration fieldsToUnnest.

• Collections not specified in fieldsToUnnest will be serialized into a JSON string, except for arrays of primitive values, which will be ingested as a multi-value column by default. The behavior is defined by the collectionNotUnnestedToJson config, which takes the following values:

      • NON_PRIMITIVE- Converts the array to a multi-value column. (default)

    You can find the full specifications of the table config herearrow-up-right and the table schema herearrow-up-right.

    You can then query the table with primitive values using the following SQL query:

    circle-info

    . is a reserved character in SQL, so you need to quote the flattened columns in the query.

    hashtag
    Infer the Pinot schema from the Avro schema and JSON data

    When there are complex structures, it can be challenging and tedious to figure out the Pinot schema manually. To help with schema inference, Pinot provides utility tools to take the Avro schema or JSON data as input and output the inferred Pinot schema.

    To infer the Pinot schema from Avro schema, you can use a command like this:

    Note you can input configurations like fieldsToUnnest similar to the ones in complexTypeConfig. And this will simulate the complex-type handling rules on the Avro schema and output the Pinot schema in the file specified in outputDir.

    Similarly, you can use the command like the following to infer the Pinot schema from a file of JSON objects.

    You can check out an example of this run in this PRarrow-up-right.

    recordsarrow-up-right
    arraysarrow-up-right
    objectsarrow-up-right
    arraysarrow-up-right
    json_meetupRsvp_realtime_table_config.json
    {
        "ingestionConfig":{
          "transformConfigs": [
            {
              "columnName": "group_json",
              "transformFunction": "jsonFormat(\"group\")"
            }
          ],
        },
        ...
        "tableIndexConfig": {
        "loadMode": "MMAP",
        "noDictionaryColumns": [
          "group_json"
        ],
        "jsonIndexColumns": [
          "group_json"
        ]
      },
    
    }
    json_meetupRsvp_realtime_table_schema.json
    {
      {
          "name": "group_json",
          "dataType": "JSON",
        }
        ...
    }
    complexTypeHandling_meetupRsvp_realtime_table_config.json
    {
      "ingestionConfig": {    
        "complexTypeConfig": {
          "delimiter": ".",
          "fieldsToUnnest": ["group.group_topics"],
          "collectionNotUnnestedToJson": "NON_PRIMITIVE"
        }
      }
    }
    SELECT "group.group_topics.urlkey", 
           "group.group_topics.topic_name", 
           "group.group_id" 
    FROM meetupRsvp
    LIMIT 10
    bin/pinot-admin.sh AvroSchemaToPinotSchema \
      -timeColumnName fields.hoursSinceEpoch \
      -avroSchemaFile /tmp/test.avsc \
      -pinotSchemaName myTable \
      -outputDir /tmp/test \
      -fieldsToUnnest entries
    bin/pinot-admin.sh JsonToPinotSchema \
      -timeColumnName hoursSinceEpoch \
      -jsonFile /tmp/test.json \
      -pinotSchemaName myTable \
      -outputDir /tmp/test \
      -fieldsToUnnest payload.commits


    Ingest streaming data from Apache Kafka

    This guide shows you how to ingest a stream of records from an Apache Kafka topic into a Pinot table.

    Learn how to ingest data from Kafka, a stream processing platform. You should have a local cluster up and running, following the instructions in Set up a cluster.

    hashtag
    Install and Launch Kafka

    Let's start by downloading Kafka to our local machine.

    To pull down the latest Docker image, run the following command:

    docker pull wurstmeister/kafka:latest

    Download Kafka from kafka.apache.org/quickstart#quickstart_download and then extract it:

    Next we'll spin up a Kafka broker:

    Note: The --network pinot-demo flag is optional and assumes that you have a Docker network named pinot-demo that you want to connect the Kafka container to.

    On one terminal window run this command:

    Start Zookeeper

    And on another window, run this command:

    Start Kafka Broker

    hashtag
    Data Source

    We're going to generate some JSON messages from the terminal using the following script:

    datagen.py

    If you run this script (python datagen.py), you'll see the following output:

    hashtag
    Ingesting Data into Kafka

    Let's now pipe that stream of messages into Kafka, by running the following command:

    We can check how many messages have been ingested by running the following command:

    Output

    And we can print out the messages themselves by running the following command

    Output

    hashtag
    Schema

    A schema defines what fields are present in the table along with their data types in JSON format.

    Create a file called /tmp/pinot/schema-stream.json and add the following content to it.

    hashtag
    Table Config

    A table is a logical abstraction that represents a collection of related data. It is composed of columns and rows (known as documents in Pinot). The table config defines the table's properties in JSON format.

    Create a file called /tmp/pinot/table-config-stream.json and add the following content to it.

    hashtag
    Create schema and table

    Create the table and schema by running the appropriate command below:

    hashtag
    Querying

    Navigate to localhost:9000/#/query and click on the events table to run a query that shows the first 10 rows in this table.

    Querying the events table

    hashtag
    Kafka ingestion guidelines

    hashtag
    Kafka versions in Pinot

    Pinot supports two versions of the Kafka library: kafka-0.9 and kafka-2.x for low level consumers.

    circle-info

    Post release 0.10.0, we have started shading kafka packages inside Pinot. If you are using our latest tagged docker images or master build, you should replace org.apache.kafka with shaded.org.apache.kafka in your table config.

    hashtag
    Upgrade from Kafka 0.9 connector to Kafka 2.x connector

    • Update the table config for the low-level consumer: change stream.kafka.consumer.factory.class.name from org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory to org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory.
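    For illustration, here is a minimal streamConfigs fragment with the updated factory class; all other properties are omitted and unchanged:

    "streamConfigs": {
      ...
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory",
      ...
    }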

    circle-info

    Pinot does not support using high-level Kafka consumers (HLC). Pinot uses low-level consumers to ensure accurate results, keep operational complexity and scalability manageable, and minimize storage overhead.

    hashtag
    How to consume from a Kafka version > 2.0.0

    This connector also works with Kafka library versions higher than 2.0.0. In the Kafka 2.0 connector pom.xml, changing kafka.lib.version from 2.0.0 to 2.1.1 makes this connector work with Kafka 2.1.1.

    hashtag
    Kafka configurations in Pinot

    hashtag
    Use Kafka partition (low) level consumer with SSL

    Here is an example config that uses SSL-based authentication to talk to Kafka and the schema registry. Notice there are two sets of SSL options: the ones starting with ssl. are for the Kafka consumer, and the ones starting with stream.kafka.decoder.prop.schema.registry. are for the SchemaRegistryClient used by KafkaConfluentSchemaRegistryAvroMessageDecoder.

    hashtag
    Consume transactionally-committed messages

    The connector with Kafka library 2.0+ supports Kafka transactions. The transaction support is controlled by config kafka.isolation.level in Kafka stream config, which can be read_committed or read_uncommitted (default). Setting it to read_committed will ingest transactionally committed messages in Kafka stream only.

    For example,

    Note that the default value of this config is read_uncommitted, which reads all messages. Also, this config is supported for the low-level consumer only.

    hashtag
    Use Kafka partition (low) level consumer with SASL_SSL

    Here is an example config that uses SASL_SSL based authentication to talk to Kafka and the schema registry. Notice there are two sets of SSL options: some are for the Kafka consumer, and the ones starting with stream.kafka.decoder.prop.schema.registry. are for the SchemaRegistryClient used by KafkaConfluentSchemaRegistryAvroMessageDecoder.

    hashtag
    Extract record headers as Pinot table columns

    Pinot's Kafka connector supports automatically extracting record headers and metadata into the Pinot table columns. The following table shows the mapping for record header/metadata to Pinot table column names:

    Kafka Record | Pinot Table Column | Description
    --- | --- | ---
    Record key: any type <K> | __key : String | For simplicity of design, we assume that the record key is always a UTF-8 encoded String
    Record Headers: Map<String, String> | Each header key is listed as a separate column: __header$HeaderKeyName : String | For simplicity of design, we directly map the string headers from kafka record to pinot table column
    Record metadata - offset : long | __metadata$offset : String |
    Record metadata - partition : int | __metadata$partition : String |
    Record metadata - recordTimestamp : long | __metadata$recordTimestamp : String |

    In order to enable the metadata extraction in a Kafka table, you can set the stream config metadata.populate to true.
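    As a rough sketch, the stream config entry could look like the fragment below; the fully qualified property name stream.kafka.metadata.populate is an assumption for the Kafka connector (the prefix follows the stream type):

    "streamConfigs": {
      ...
      "stream.kafka.metadata.populate": "true"
    }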

    In addition to this, if you want to use any of these columns in your table, you have to list them explicitly in your table's schema.

    For example, if you want to add only the offset and key as dimension columns in your Pinot table, they can be listed in the schema as follows:

    Once the schema is updated, these columns behave like any other Pinot column. You can apply ingestion transforms and/or define indexes on them.

    circle-info

    Remember to follow the schema evolution guidelines when updating the schema of an existing table!

    hashtag
    Tell Pinot where to find an Avro schema

    There is a standalone utility to generate the schema from an Avro file. See Infer the Pinot schema from the Avro schema and JSON data for details.

    To avoid errors like The Avro schema must be provided, designate the location of the schema in your streamConfigs section. For example, if your current section contains the following:

    Then add this key: "stream.kafka.decoder.prop.schema", followed by a value that denotes the location of your schema.
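    For instance, the resulting section could look like the sketch below, where the value is a placeholder for the location of your schema:

    "streamConfigs": {
      ...
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.SimpleAvroMessageDecoder",
      "stream.kafka.decoder.prop.schema": "<location of your Avro schema>",
      ...
    }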


    tar -xzf kafka_2.13-3.7.0.tgz
    cd kafka_2.13-3.7.0
    docker run --network pinot-demo --name=kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181/kafka -e KAFKA_BROKER_ID=0 -e KAFKA_ADVERTISED_HOST_NAME=kafka wurstmeister/kafka:latest
    bin/zookeeper-server-start.sh config/zookeeper.properties
    bin/kafka-server-start.sh config/server.properties

    Stream ingestion

    This guide shows you how to ingest a stream of records into a Pinot table.

    Apache Pinot lets users consume data from streams and push it directly into the database. This process is called stream ingestion. Stream ingestion makes it possible to query data within seconds of publication.

    Stream ingestion provides support for checkpoints for preventing data loss.

    To set up Stream ingestion, perform the following steps, which are described in more detail in this page:

    1. Create schema configuration

    import datetime
    import uuid
    import random
    import json
    
    while True:
        ts = int(datetime.datetime.now().timestamp()* 1000)
        id = str(uuid.uuid4())
        count = random.randint(0, 1000)
        print(
            json.dumps({"ts": ts, "uuid": id, "count": count})
        )
    
    {"ts": 1644586485807, "uuid": "93633f7c01d54453a144", "count": 807}
    {"ts": 1644586485836, "uuid": "87ebf97feead4e848a2e", "count": 41}
    {"ts": 1644586485866, "uuid": "960d4ffa201a4425bb18", "count": 146}
    python datagen.py | docker exec -i kafka /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic events;
    python datagen.py | bin/kafka-console-producer.sh --bootstrap-server localhost:9092  --topic events;
    docker exec -i kafka kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic events
    kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic events
    events:0:11940
    docker exec -i kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic events
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic events
    ...
    {"ts": 1644586485807, "uuid": "93633f7c01d54453a144", "count": 807}
    {"ts": 1644586485836, "uuid": "87ebf97feead4e848a2e", "count": 41}
    {"ts": 1644586485866, "uuid": "960d4ffa201a4425bb18", "count": 146}
    ...
    {
      "schemaName": "events",
      "dimensionFieldSpecs": [
        {
          "name": "uuid",
          "dataType": "STRING"
        }
      ],
      "metricFieldSpecs": [
        {
          "name": "count",
          "dataType": "INT"
        }
      ],
      "dateTimeFieldSpecs": [{
        "name": "ts",
        "dataType": "TIMESTAMP",
        "format" : "1:MILLISECONDS:EPOCH",
        "granularity": "1:MILLISECONDS"
      }]
    }
    {
      "tableName": "events",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "timeColumnName": "ts",
        "schemaName": "events",
        "replicasPerPartition": "1"
      },
      "tenants": {},
      "tableIndexConfig": {
        "loadMode": "MMAP",
        "streamConfigs": {
          "streamType": "kafka",
          "stream.kafka.consumer.type": "lowlevel",
          "stream.kafka.topic.name": "events",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
          "stream.kafka.broker.list": "kafka:9092",
          "realtime.segment.flush.threshold.rows": "0",
          "realtime.segment.flush.threshold.time": "24h",
          "realtime.segment.flush.threshold.segment.size": "50M",
          "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
        }
      },
      "metadata": {
        "customConfigs": {}
      }
    }
    docker run --rm -ti  --network=pinot-demo  -v /tmp/pinot:/tmp/pinot  apachepinot/pinot:1.0.0 AddTable  -schemaFile /tmp/pinot/schema-stream.json  -tableConfigFile /tmp/pinot/table-config-stream.json  -controllerHost pinot-controller  -controllerPort 9000 -exec
    bin/pinot-admin.sh AddTable -schemaFile /tmp/pinot/schema-stream.json -tableConfigFile /tmp/pinot/table-config-stream.json
      {
        "tableName": "transcript",
        "tableType": "REALTIME",
        "segmentsConfig": {
        "timeColumnName": "timestamp",
        "timeType": "MILLISECONDS",
        "schemaName": "transcript",
        "replicasPerPartition": "1"
        },
        "tenants": {},
        "tableIndexConfig": {
          "loadMode": "MMAP",
          "streamConfigs": {
            "streamType": "kafka",
            "stream.kafka.consumer.type": "LowLevel",
            "stream.kafka.topic.name": "transcript-topic",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
            "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
            "stream.kafka.zk.broker.url": "pinot-zookeeper:2191/kafka",
            "stream.kafka.broker.list": "localhost:9092",
            "schema.registry.url": "",
            "security.protocol": "SSL",
            "ssl.truststore.location": "",
            "ssl.keystore.location": "",
            "ssl.truststore.password": "",
            "ssl.keystore.password": "",
            "ssl.key.password": "",
            "stream.kafka.decoder.prop.schema.registry.rest.url": "",
            "stream.kafka.decoder.prop.schema.registry.ssl.truststore.location": "",
            "stream.kafka.decoder.prop.schema.registry.ssl.keystore.location": "",
            "stream.kafka.decoder.prop.schema.registry.ssl.truststore.password": "",
            "stream.kafka.decoder.prop.schema.registry.ssl.keystore.password": "",
            "stream.kafka.decoder.prop.schema.registry.ssl.keystore.type": "",
            "stream.kafka.decoder.prop.schema.registry.ssl.truststore.type": "",
            "stream.kafka.decoder.prop.schema.registry.ssl.key.password": "",
            "stream.kafka.decoder.prop.schema.registry.ssl.protocol": ""
          }
        },
        "metadata": {
          "customConfigs": {}
        }
      }
      {
        "tableName": "transcript",
        "tableType": "REALTIME",
        "segmentsConfig": {
        "timeColumnName": "timestamp",
        "timeType": "MILLISECONDS",
        "schemaName": "transcript",
        "replicasPerPartition": "1"
        },
        "tenants": {},
        "tableIndexConfig": {
          "loadMode": "MMAP",
          "streamConfigs": {
            "streamType": "kafka",
            "stream.kafka.consumer.type": "LowLevel",
            "stream.kafka.topic.name": "transcript-topic",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
            "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
            "stream.kafka.zk.broker.url": "pinot-zookeeper:2191/kafka",
            "stream.kafka.broker.list": "kafka:9092",
            "stream.kafka.isolation.level": "read_committed"
          }
        },
        "metadata": {
          "customConfigs": {}
        }
      }
    "streamConfigs": {
            "streamType": "kafka",
            "stream.kafka.consumer.type": "lowlevel",
            "stream.kafka.topic.name": "mytopic",
            "stream.kafka.consumer.prop.auto.offset.reset": "largest",
            "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
            "stream.kafka.broker.list": "kafka:9092",
            "stream.kafka.schema.registry.url": "https://xxx",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
            "stream.kafka.decoder.prop.schema.registry.rest.url": "https://xxx",
            "stream.kafka.decoder.prop.basic.auth.credentials.source": "USER_INFO",
            "stream.kafka.decoder.prop.schema.registry.basic.auth.user.info": "schema_registry_username:schema_registry_password",
            "sasl.mechanism": "PLAIN" ,
            "security.protocol": "SASL_SSL" ,
            "sasl.jaas.config":"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"kafkausername\" password=\"kafkapassword\";",
            "realtime.segment.flush.threshold.rows": "0",
            "realtime.segment.flush.threshold.time": "24h",
            "realtime.segment.flush.autotune.initialRows": "3000000",
            "realtime.segment.flush.threshold.segment.size": "500M"
          },
      "dimensionFieldSpecs": [
        {
          "name": "__key",
          "dataType": "STRING"
        },
        {
          "name": "__metadata$offset",
          "dataType": "STRING"
        },
        {
          "name": "__metadata$partition",
          "dataType": "STRING"
        },
        ...
      ],
    ...
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.topic.name": "",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.SimpleAvroMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.broker.list": "",
      "stream.kafka.consumer.prop.auto.offset.reset": "largest"
      ...
    }

    2. Create table configuration

    3. Create ingestion configuration

    4. Upload table and schema spec

    Here's an example where we assume the data to be ingested is in the following format:

    hashtag
    Create schema configuration

    The schema defines the fields along with their data types. The schema also defines whether fields serve as dimensions, metrics, or timestamps. For more details on schema configuration, see creating a schema.

    For our sample data, the schema configuration looks like this:

    hashtag
    Create table configuration with ingestion configuration

    The next step is to create a table where all the ingested data will flow and can be queried. For details about each table component, see the table reference.

    The table configuration contains an ingestion configuration (ingestionConfig), which specifies how to ingest streaming data into Pinot. For details, see the ingestion configuration reference.

    hashtag
    Example table config with ingestionConfig

    For our sample data and schema, the table config will look like this:

    hashtag
    Upload schema and table config

    Now that we have our table and schema configurations, let's upload them to the Pinot cluster. As soon as the configs are uploaded, Pinot will start ingesting available records from the topic.

    hashtag
    Tune the stream config

    hashtag
    Throttle stream consumption

    There are some scenarios where the message rate in the input stream can come in bursts which can lead to long GC pauses on the Pinot servers or affect the ingestion rate of other real-time tables on the same server. If this happens to you, throttle the consumption rate during stream ingestion to better manage overall performance.

    Stream consumption throttling can be tuned using the stream config topic.consumption.rate.limit which indicates the upper bound on the message rate for the entire topic.

    Here is the sample configuration on how to configure the consumption throttling:

    Some things to keep in mind while tuning this config are:

    • Since this configuration is applied to the entire topic, internally this rate is divided by the number of partitions in the topic and applied to each partition's consumer.

    • In case of multi-tenant deployment (where you have more than 1 table in the same server instance), you need to make sure that the rate limit on one table doesn't step on/starve the rate limiting of another table. So, when there is more than 1 table on the same server (which is most likely to happen), you may need to re-tune the throttling threshold for all the streaming tables.

    Once throttling is enabled for a table, you can verify by searching for a log that looks similar to:

    In addition, you can monitor the consumption rate utilization with the metric CONSUMPTION_QUOTA_UTILIZATION.

    Note that any configuration change for topic.consumption.rate.limit in the stream config will NOT take effect immediately. The new configuration will be picked up from the next consuming segment. In order to enforce the new configuration, you need to trigger forceCommit APIs. Refer to Pause Stream Ingestion for more details.

    hashtag
    Custom ingestion support

    You can also write an ingestion plugin if the platform you are using is not supported out of the box. For a walkthrough, see Stream Ingestion Plugin.

    hashtag
    Pause stream ingestion

    There are some scenarios in which you may want to pause the real-time ingestion while your table is available for queries. For example, if there is a problem with the stream ingestion and, while you are troubleshooting the issue, you still want the queries to be executed on the already ingested data. For these scenarios, you can first issue a Pause request to a Controller host. After you finish troubleshooting the stream, you can issue another request to the Controller to resume consumption.

    When a Pause request is issued, the controller instructs the real-time servers hosting your table to commit their consuming segments immediately. However, the commit process may take some time to complete. Note that Pause and Resume requests are async. An OK response means that instructions for pausing or resuming have been successfully sent to the real-time server. If you want to know whether the consumption has actually stopped or resumed, issue a pause status request.

    It's worth noting that consuming segments on real-time servers are stored in volatile memory, and their resources are allocated when the consuming segments are first created. These resources cannot be altered if consumption parameters are changed midway through consumption. It may take hours before these changes take effect. Furthermore, if the parameters are changed in an incompatible way (for example, changing the underlying stream with a completely new set of offsets, or changing the stream endpoint from which to consume messages), it will result in the table getting into an error state.

    The pause and resume feature is helpful in these instances. When a pause request is issued by the operator, consuming segments are committed without starting new mutable segments. Instead, new mutable segments are started only when the resume request is issued. This mechanism provides the operators as well as developers with more flexibility. It also enables Pinot to be more resilient to the operational and functional constraints imposed by underlying streams.

    There is another feature called Force Commit which utilizes the primitives of the pause and resume feature. When the operator issues a force commit request, the current mutable segments will be committed and new ones started right away. Operators can now use this feature for all compatible table config parameter changes to take effect immediately.

    (v 0.12.0+) Once submitted, the forceCommit API returns a jobId that can be used to get the current progress of the forceCommit operation. A sample response and status API call:

    circle-info

    The forceCommit request just triggers a regular commit before the consuming segments reach their end criteria, so it follows the same mechanism as a regular commit. It is a one-shot request and is not retried automatically upon failure, but it is idempotent, so you may keep issuing it until it succeeds if needed.

    This API is async, as it doesn't wait for the segment commit to complete. However, a status entry is put in ZK to track when the request was issued and which consuming segments were included. The consuming segments tracked in the status entry are compared with the latest IdealState to indicate the progress of the forceCommit. However, this status is not updated or deleted upon commit success or failure, so it can become stale. Currently, the most recent 100 status entries are kept in ZK, and the oldest ones are only deleted when the total number is about to exceed 100.

    For incompatible parameter changes, an option is added to the resume request to handle the case of a completely new set of offsets. Operators can now follow a three-step process: First, issue a pause request. Second, change the consumption parameters. Finally, issue the resume request with the appropriate option. These steps will preserve the old data and allow the new data to be consumed immediately. All through the operation, queries will continue to be served.

    hashtag
    Handle partition changes in streams

    If a Pinot table is configured to consume using a Low Level (partition-based) stream type, then it is possible that the partitions of the table change over time. In Kafka, for example, the number of partitions may increase. In Kinesis, the number of partitions may increase or decrease -- some partitions could be merged to create a new one, or existing partitions split to create new ones.

    Pinot runs a periodic task called RealtimeSegmentValidationManager that monitors such changes and starts consumption on new partitions (or stops consumption of old ones) as necessary. Since this is a periodic task that runs on the controller, it may take some time for Pinot to recognize new partitions and start consuming from them. This may delay the data in new partitions from appearing in the results that Pinot returns.

    If you want to recognize the new partitions sooner, then manually trigger the periodic task so as to recognize such data immediately.

    hashtag
    Infer ingestion status of real-time tables

    Often, it is important to understand the rate of ingestion of data into your real-time table. This is commonly done by looking at the consumption lag of the consumer. The lag itself can be observed in many dimensions. Pinot supports observing consumption lag along the offset dimension and time dimension, whenever applicable (as it depends on the specifics of the connector).

    The ingestion status of a connector can be observed by querying either the /consumingSegmentsInfo API or the table's /debug API, as shown below:

    A sample response from a Kafka-based real-time table is shown below. The ingestion status is displayed for each of the CONSUMING segments in the table.

    Term | Description
    --- | ---
    currentOffsetsMap | Current consuming offset position per partition
    latestUpstreamOffsetMap | (Wherever applicable) Latest offset found in the upstream topic partition
    recordsLagMap | (Whenever applicable) Defines how far behind the current record's offset / pointer is from upstream latest record. This is calculated as the difference between the latestUpstreamOffset and currentOffset for the partition when the lag computation request is made.
    recordsAvailabilityLagMap | (Whenever applicable) Defines how soon after record ingestion was the record consumed by Pinot. This is calculated as the difference between the time the record was consumed and the time at which the record was ingested upstream.

    hashtag
    Monitor real-time ingestion

    Real-time ingestion includes 3 stages of message processing: Decode, Transform, and Index.

    In each of these stages, a failure can happen which may or may not result in an ingestion failure. The following metrics are available to investigate ingestion issues:

    1. Decode stage -> an error here is recorded as INVALID_REALTIME_ROWS_DROPPED

    2. Transform stage -> possible errors here are:

      1. When a message gets dropped due to the FILTER transform, it is recorded as REALTIME_ROWS_FILTERED

      2. When the transform pipeline sets the $INCOMPLETE_RECORD_KEY$ key in the message, it is recorded as INCOMPLETE_REALTIME_ROWS_CONSUMED, but only when the continueOnError configuration is enabled. If continueOnError is not enabled, the ingestion fails.

    3. Index stage -> When there is failure at this stage, the ingestion typically stops and marks the partition as ERROR.

    There is yet another metric called ROWS_WITH_ERROR which is the sum of all error counts in the 3 stages above.

    Furthermore, the metric REALTIME_CONSUMPTION_EXCEPTIONS gets incremented whenever there is a transient/permanent stream exception seen during consumption.

    These metrics can be used to understand why ingestion failed for a particular table partition before diving into the server logs.

    docker run \
        --network=pinot-demo \
        -v /tmp/pinot-quick-start:/tmp/pinot-quick-start \
        --name pinot-streaming-table-creation \
        apachepinot/pinot:latest AddTable \
        -schemaFile /tmp/pinot-quick-start/transcript-schema.json \
        -tableConfigFile /tmp/pinot-quick-start/transcript-table-realtime.json \
        -controllerHost pinot-quickstart \
        -controllerPort 9000 \
        -exec
    bin/pinot-admin.sh AddTable \
        -schemaFile /path/to/transcript-schema.json \
        -tableConfigFile /path/to/transcript-table-realtime.json \
        -exec
    {"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"Maths","score":3.8,"timestamp":1571900400000}
    {"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"History","score":3.5,"timestamp":1571900400000}
    {"studentID":207,"firstName":"Bob","lastName":"Lewis","gender":"Male","subject":"Maths","score":3.2,"timestamp":1571900400000}
    {"studentID":207,"firstName":"Bob","lastName":"Lewis","gender":"Male","subject":"Chemistry","score":3.6,"timestamp":1572418800000}
    {"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Geography","score":3.8,"timestamp":1572505200000}
    {"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"English","score":3.5,"timestamp":1572505200000}
    {"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Maths","score":3.2,"timestamp":1572678000000}
    {"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Physics","score":3.6,"timestamp":1572678000000}
    {"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"Maths","score":3.8,"timestamp":1572678000000}
    {"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"English","score":3.5,"timestamp":1572678000000}
    {"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"History","score":3.2,"timestamp":1572854400000}
    {"studentID":212,"firstName":"Nick","lastName":"Young","gender":"Male","subject":"History","score":3.6,"timestamp":1572854400000}
    /tmp/pinot-quick-start/transcript-schema.json
    {
      "schemaName": "transcript",
      "dimensionFieldSpecs": [
        {
          "name": "studentID",
          "dataType": "INT"
        },
        {
          "name": "firstName",
          "dataType": "STRING"
        },
        {
          "name": "lastName",
          "dataType": "STRING"
        },
        {
          "name": "gender",
          "dataType": "STRING"
        },
        {
          "name": "subject",
          "dataType": "STRING"
        }
      ],
      "metricFieldSpecs": [
        {
          "name": "score",
          "dataType": "FLOAT"
        }
      ],
      "dateTimeFieldSpecs": [{
        "name": "timestamp",
        "dataType": "LONG",
        "format" : "1:MILLISECONDS:EPOCH",
        "granularity": "1:MILLISECONDS"
      }]
    }
    {
      "tableName": "transcript",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "timeColumnName": "timestamp",
        "timeType": "MILLISECONDS",
        "schemaName": "transcript",
        "replicasPerPartition": "1"
      },
      "tenants": {},
      "tableIndexConfig": {
        "loadMode": "MMAP",
      },
      "metadata": {
        "customConfigs": {}
      },
      "ingestionConfig": {
        "streamIngestionConfig": {
            "streamConfigMaps": [
              {
                "realtime.segment.flush.threshold.rows": "0",
                "stream.kafka.decoder.prop.format": "JSON",
                "key.serializer": "org.apache.kafka.common.serialization.ByteArraySerializer",
                "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
                "streamType": "kafka",
                "value.serializer": "org.apache.kafka.common.serialization.ByteArraySerializer",
                "stream.kafka.consumer.type": "LOWLEVEL",
                "realtime.segment.flush.threshold.segment.rows": "50000",
                "stream.kafka.broker.list": "localhost:9876",
                "realtime.segment.flush.threshold.time": "3600000",
                "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
                "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
                "stream.kafka.topic.name": "transcript-topic"
              }
            ]
          },
          "transformConfigs": [],
          "continueOnError": true,
          "rowTimeValueCheck": true,
          "segmentTimeValueCheck": false
        },
        "isDimTable": false
    }
    {
      "tableName": "transcript",
      "tableType": "REALTIME",
      ...
      "ingestionConfig": {
        "streamIngestionConfig": {
          "streamConfigMaps": [
            {
              "streamType": "kafka",
              "stream.kafka.consumer.type": "lowlevel",
              "stream.kafka.topic.name": "transcript-topic",
              ...
              "topic.consumption.rate.limit": 1000
            }
          ]
        }
      },
      ...
    }
    A consumption rate limiter is set up for topic <topic_name> in table <tableName> with rate limit: <rate_limit> (topic rate limit: <topic_rate_limit>, partition count: <partition_count>)
    $ curl -X POST {controllerHost}/tables/{tableName}/forceCommit
    $ curl -X POST {controllerHost}/tables/{tableName}/pauseConsumption
    $ curl -X POST {controllerHost}/tables/{tableName}/resumeConsumption
    $ curl -X POST {controllerHost}/tables/{tableName}/pauseStatus
    $ curl -X POST {controllerHost}/tables/{tableName}/forceCommit
    $ curl -X POST {controllerHost}/tables/{tableName}/forceCommit
    {
      "forceCommitJobId": "6757284f-b75b-45ce-91d8-a277bdbc06ae",
      "forceCommitStatus": "SUCCESS",
      "jobMetaZKWriteStatus": "SUCCESS"
    }
    
    $ curl -X GET {controllerHost}/tables/forceCommitStatus/6757284f-b75b-45ce-91d8-a277bdbc06ae
    {
      "jobId": "6757284f-b75b-45ce-91d8-a277bdbc06ae",
      "segmentsForceCommitted": "[\"airlineStats__0__0__20230119T0700Z\",\"airlineStats__1__0__20230119T0700Z\",\"airlineStats__2__0__20230119T0700Z\"]",
      "submissionTimeMs": "1674111682977",
      "numberOfSegmentsYetToBeCommitted": 0,
      "jobType": "FORCE_COMMIT",
      "segmentsYetToBeCommitted": [],
      "tableName": "airlineStats_REALTIME"
    }
    $ curl -X POST {controllerHost}/tables/{tableName}/resumeConsumption?resumeFrom=smallest
    $ curl -X POST {controllerHost}/tables/{tableName}/resumeConsumption?resumeFrom=largest
    # GET /tables/{tableName}/consumingSegmentsInfo
    curl -X GET "http://<controller_url:controller_admin_port>/tables/meetupRsvp/consumingSegmentsInfo" -H "accept: application/json"
    
    # GET /debug/tables/{tableName}
    curl -X GET "http://localhost:9000/debug/tables/meetupRsvp?type=REALTIME&verbosity=1" -H "accept: application/json"
    {
      "_segmentToConsumingInfoMap": {
        "meetupRsvp__0__0__20221019T0639Z": [
          {
            "serverName": "Server_192.168.0.103_7000",
            "consumerState": "CONSUMING",
            "lastConsumedTimestamp": 1666161593904,
            "partitionToOffsetMap": { // <<-- Deprecated. See currentOffsetsMap for same info
              "0": "6"
            },
            "partitionOffsetInfo": {
              "currentOffsetsMap": {
                "0": "6" // <-- Current consumer position
              },
              "latestUpstreamOffsetMap": {
                "0": "6"  // <-- Upstream latest position
              },
              "recordsLagMap": {
                "0": "0"  // <-- Lag, in terms of #records behind latest
              },
              "recordsAvailabilityLagMap": {
                "0": "2"  // <-- Lag, in terms of time
              }
            }
          }
        ],

    Input formats

    This section contains a collection of guides that will show you how to import data from a Pinot-supported input format.

    Pinot offers support for various popular input formats during ingestion. By changing the input format, you can reduce the time spent doing serialization-deserialization and speed up the ingestion.

    hashtag
    Configuring input formats

    To change the input format, adjust the recordReaderSpec config in the ingestion job specification.

    The configuration consists of the following keys:
    • dataFormat: Name of the data format to consume.

    • className: Name of the class that implements the RecordReader interface. This class is used for parsing the data.

    • configClassName: Name of the class that implements the RecordReaderConfig interface. This class is used to parse the values mentioned in configs.

    • configs: Key-value pair for format-specific configurations. This field is optional.

    hashtag
    Supported input formats

    Pinot supports multiple input formats out of the box. Specify the corresponding readers and the associated custom configurations to switch between formats.

    hashtag
    CSV

    CSV Record Reader supports the following configs:

    • fileFormat: default, rfc4180, excel, tdf, mysql

    • header: Header of the file. The columnNames should be separated by the delimiter mentioned in the configuration.

    • delimiter: The character separating the columns.

    • multiValueDelimiter: The character separating multiple values in a single column. This can be used to split a column into a list.

    • skipHeader: Skip header record in the file. Boolean.

    • ignoreEmptyLines: Ignore empty lines (instead of filling them with default values). Boolean.

    • ignoreSurroundingSpaces: ignore spaces around column names and values. Boolean

    • quoteCharacter: Single character used for quotes in CSV files.

    • recordSeparator: Character used to separate records in the input file. Default is \n or \r\n depending on the platform.

    • nullStringValue: String value that represents null in CSV files. Default is empty string.

    • skipUnParseableLines : Skip lines that cannot be parsed. Note that this would result in data loss. Boolean.

    circle-info

    Your CSV file may have raw text fields that cannot be reliably delimited using any character. In this case, explicitly set the multiValueDelimiter field to empty in the ingestion config. multiValueDelimiter: ''

    hashtag
    Avro

    The Avro record reader converts the data in the file to a GenericRecord. A Java class or .avro file is not required. By default, the Avro record reader only supports primitive types. To enable support for the rest of the Avro data types, set enableLogicalTypes to true.

    We use the following conversion table to translate between Avro and Pinot data types. The conversions are done using the official Avro methods present in org.apache.avro.Conversions.

    Avro Data Type | Pinot Data Type | Comment
    --- | --- | ---
    INT | INT |
    LONG | LONG |
    FLOAT | FLOAT |
    DOUBLE | DOUBLE |
    BOOLEAN | BOOLEAN |
    STRING | STRING |
    ENUM | STRING |
    BYTES | BYTES |
    FIXED | BYTES |
    MAP | JSON |
    ARRAY | JSON |
    RECORD | JSON |
    UNION | JSON |
    DECIMAL | BYTES |
    UUID | STRING |
    DATE | STRING | yyyy-MM-dd format
    TIME_MILLIS | STRING | HH:mm:ss.SSS format
    TIME_MICROS | STRING | HH:mm:ss.SSSSSS format
    TIMESTAMP_MILLIS | TIMESTAMP |
    TIMESTAMP_MICROS | TIMESTAMP |

    hashtag
    JSON

    hashtag
    Thrift

    circle-info

    Thrift requires the class generated from the .thrift file to parse the data. The .class file should be available in Pinot's classpath. You can put the files in the lib/ folder of the Pinot distribution directory.

    hashtag
    Parquet

    Since the 0.11.0 release, the Parquet record reader determines whether to use ParquetAvroRecordReader or ParquetNativeRecordReader to read records. The reader looks for the parquet.avro.schema or avro.schema key in the parquet file footer, and if present, uses the Avro reader.

    You can change the record reader manually in case of a misconfiguration.

    circle-exclamation

    For the support of DECIMAL and other parquet native data types, always use ParquetNativeRecordReader.

    Parquet Data Type | Pinot Data Type | Comment
    --- | --- | ---
    INT96 | LONG | ParquetINT96 type converts nanoseconds to Pinot INT64 type of milliseconds
    INT64 | LONG |
    INT32 | INT |
    FLOAT | FLOAT |
    DOUBLE | DOUBLE |
    BINARY | BYTES |
    FIXED-LEN-BYTE-ARRAY | BYTES |
    DECIMAL | DOUBLE |
    ENUM | STRING |
    UTF8 | STRING |
    REPEATED | MULTIVALUE/MAP (represented as MV) | If the parquet original type is LIST, it is converted to a MULTIVALUE column, otherwise a MAP column.

    For ParquetAvroRecordReader , you can refer to the Avro section above for the type conversions.

    hashtag
    ORC

    ORC record reader supports the following data types -

    ORC Data Type | Java Data Type
    --- | ---
    BOOLEAN | String
    SHORT | Integer
    INT | Integer
    LONG | Integer
    FLOAT | Float
    DOUBLE | Double
    STRING | String
    VARCHAR | String
    CHAR | String
    LIST | Object[]
    MAP | Map<Object, Object>
    DATE | Long
    TIMESTAMP | Long
    BINARY | byte[]
    BYTE | Integer

    circle-info

    In LIST and MAP types, the object should only belong to one of the data types supported by Pinot.

    hashtag
    Protocol Buffers

    The reader requires a descriptor file to deserialize the data present in the files. You can generate the descriptor file (.desc) from the .proto file using the command -

    recordReaderSpec:
      dataFormat: 'csv'
      className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
      configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
      configs:
        key1: 'value1'
        key2: 'value2'
    dataFormat: 'csv'
    className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
    configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
    configs:
      fileFormat: 'default' # should be one of default, rfc4180, excel, tdf, mysql
      header: 'columnName separated by delimiter'
      delimiter: ','
      multiValueDelimiter: '-'
    dataFormat: 'avro'
    className: 'org.apache.pinot.plugin.inputformat.avro.AvroRecordReader'
    configs:
      enableLogicalTypes: true
    dataFormat: 'json'
    className: 'org.apache.pinot.plugin.inputformat.json.JSONRecordReader'
    dataFormat: 'thrift'
    className: 'org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReader'
    configs:
      thriftClass: 'ParserClassName'
    dataFormat: 'parquet'
    className: 'org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader'
    dataFormat: 'parquet'
    className: 'org.apache.pinot.plugin.inputformat.parquet.ParquetNativeRecordReader'
    dataFormat: 'orc'
    className: 'org.apache.pinot.plugin.inputformat.orc.ORCRecordReader'
    dataFormat: 'proto'
    className: 'org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReader'
    configs:
      descriptorFile: 'file:///path/to/sample.desc'
    protoc --include_imports --descriptor_set_out=/absolute/path/to/output.desc /absolute/path/to/input.proto


    Stream ingestion with Upsert

    Upsert support in Apache Pinot.

    Pinot provides native support of upserts during real-time ingestion. There are scenarios where records need modifications, such as correcting a ride fare or updating a delivery status.

    Partial upserts are convenient as you only need to specify the columns where values change, and you ignore the rest.

    hashtag
    Overview of upserts in Pinot

    See an overview of how upserts work in Pinot 1.0.

    hashtag
    Enable upserts in Pinot

    To enable upserts on a Pinot table, do the following:

    hashtag
    Define the primary key in the schema

    To update a record, you need a primary key to uniquely identify the record. To define a primary key, add the field primaryKeyColumns to the schema definition. For example, the schema definition of UpsertMeetupRSVP in the quick start example has this definition.

    Note this field expects a list of columns, as the primary key can be a composite.
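    For illustration, a minimal schema fragment with a primary key could look like this (the column name event_id is only an example):

    {
      ...
      "primaryKeyColumns": ["event_id"]
    }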

    When two records of the same primary key are ingested, the record with the greater comparison value (timeColumn by default) is used. When records have the same primary key and event time, then the order is not determined. In most cases, the later ingested record will be used, but this may not be true in cases where the table has a column to sort by.

    circle-exclamation

    Partition the input stream by the primary key

    An important requirement for the Pinot upsert table is to partition the input stream by the primary key. For Kafka messages, this means the producer shall set the key in the API. If the original stream is not partitioned, then a streaming processing job (such as with Flink) is needed to shuffle and repartition the input stream into a partitioned one for Pinot's ingestion.

    Additionally if using

    hashtag
    Enable upsert in the table configurations

    To enable upsert, make the following configurations in the table configurations.

    hashtag
    Upsert modes

    Full upsert

    The upsert mode defaults to FULL. FULL upsert means that a new record will replace the older record completely if they have the same primary key. Example config:
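    A minimal sketch of the upsert section in the table config:

    {
      "upsertConfig": {
        "mode": "FULL"
      }
    }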

    Partial upserts

    Partial upsert lets you choose to update only specific columns and ignore the rest.

    To enable the partial upsert, set the mode to PARTIAL and specify partialUpsertStrategies for partial upsert columns. Since release-0.10.0, OVERWRITE is used as the default strategy for columns without a specified strategy. defaultPartialUpsertStrategy is also introduced to change the default strategy for all columns.

    circle-info

    Note that null handling must be enabled for partial upsert to work.

    For example:
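    An illustrative sketch; the column name rsvp_count and the INCREMENT strategy are examples only, so substitute the strategies that match your own columns:

    {
      "upsertConfig": {
        "mode": "PARTIAL",
        "defaultPartialUpsertStrategy": "OVERWRITE",
        "partialUpsertStrategies": {
          "rsvp_count": "INCREMENT"
        }
      }
    }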

    Pinot supports the following partial upsert strategies:

    Strategy
    Description
    circle-info

    With partial upsert, if the value is null in either the existing record or the incoming record, Pinot will ignore the upsert strategy and the null value:

    (null, newValue) -> newValue

    (oldValue, null) -> oldValue

    None upserts

    If the mode is set to NONE, upsert is disabled.

    hashtag
    Comparison column

    By default, Pinot uses the value in the time column (timeColumn in tableConfig) to determine the latest record. That means, for two records with the same primary key, the record with the larger value of the time column is picked as the latest update. However, there are cases when users need to use another column to determine the order. In such cases, you can use the option comparisonColumn to override the column used for comparison. For example,
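    A minimal sketch, where updatedAt is a placeholder for your own ordering column:

    {
      "upsertConfig": {
        "mode": "FULL",
        "comparisonColumn": "updatedAt"
      }
    }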

    For a partial upsert table, out-of-order events won't be consumed or indexed. For example, for two records with the same primary key, if the record with the smaller value of the comparison column arrives later than the other record, it will be skipped.

    hashtag
    Multiple comparison columns

    In some cases, especially where partial upsert might be employed, there may be multiple producers of data each writing to a mutually exclusive set of columns, sharing only the primary key. In such a case, it may be helpful to use one comparison column per producer group so that each group can manage its own specific versioning semantics without the need to coordinate versioning across other producer groups.

    Documents written to Pinot are expected to have exactly 1 non-null value out of the set of comparisonColumns; if more than 1 of the columns contains a value, the document will be rejected. When new documents are written, whichever comparison column is non-null will be compared against only that same comparison column seen in prior documents with the same primary key. Consider the following examples, where the documents are assumed to arrive in the order specified in the array.
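    To make the walkthrough below concrete, here is a hypothetical sequence of documents; the primary key column event_id and all values are invented for illustration, and the upsert config is assumed to list "comparisonColumns": ["secondsSinceEpoch", "otherComparisonColumn"]:

    [
      {"event_id": "aa", "orderReceived": 1, "description": "first",  "secondsSinceEpoch": 1567205394},
      {"event_id": "aa", "orderReceived": 2, "description": "update", "secondsSinceEpoch": 1567205397},
      {"event_id": "aa", "orderReceived": 3, "description": "update", "secondsSinceEpoch": 1567205396},
      {"event_id": "aa", "orderReceived": 4, "description": "update", "otherComparisonColumn": 1567205395},
      {"event_id": "aa", "orderReceived": 5, "description": "update", "otherComparisonColumn": 1567205393},
      {"event_id": "aa", "orderReceived": 6, "description": "update", "otherComparisonColumn": 1567205396}
    ]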

    The following would occur:

    1. orderReceived: 1

    • Result: persisted

    • Reason: first doc seen for primary key "aa"

    2. orderReceived: 2

    • Result: persisted (replacing orderReceived: 1)

    • Reason: comparison column (secondsSinceEpoch) larger than that previously seen

    3. orderReceived: 3

    • Result: rejected

    • Reason: comparison column (secondsSinceEpoch) smaller than that previously seen

    4. orderReceived: 4

    • Result: persisted (replacing orderReceived: 2)

    • Reason: comparison column (otherComparisonColumn) larger than previously seen (never seen previously), despite the value being smaller than that seen for secondsSinceEpoch

    5. orderReceived: 5

    • Result: rejected

    • Reason: comparison column (otherComparisonColumn) smaller than that previously seen

    6. orderReceived: 6

    • Result: persist (replacing orderReceived: 4)

    • Reason: comparison column (otherComparisonColumn) larger than that previously seen

    hashtag
    Metadata time-to-live (TTL)

    In Pinot, the metadata map is stored in heap memory. To decrease in-memory data and improve performance, minimize the time primary key entries are stored in the metadata map (metadata time-to-live (TTL)). Limiting the TTL is especially useful for primary keys with high cardinality and frequent updates.

    Since the metadata TTL is applied on the first comparison column, the time unit of upsert TTL is the same as the first comparison column.

    hashtag
    Configure how long primary keys are stored in metadata

    To configure how long primary keys are stored in metadata, specify the length of time in upsertTTL. For example:
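    A rough sketch, using the field name from the text above; the value of 86400 (1 day) assumes the first comparison column is in seconds since epoch, so adjust it to the unit of your comparison column and verify the exact field name for your Pinot version:

    {
      "upsertConfig": {
        "mode": "FULL",
        "upsertTTL": 86400
      }
    }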

    In this example, Pinot will retain primary keys in metadata for 1 day.

    Note that enabling the upsert snapshot is required for metadata TTL, so that the in-memory validDocIds can be recovered.

    hashtag
    Delete column

    An upsert Pinot table can support soft-deletes of primary keys. This requires the incoming record to contain a dedicated single-value boolean column that serves as a delete marker for a primary key. Once the real-time engine encounters a record with the delete column set to true, the primary key will no longer be part of the queryable set of documents. This means the primary key will not be visible in queries, unless explicitly requested via the query option skipUpsert=true.

    Note that the delete column has to be a single-value boolean column.
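    A minimal sketch, where is_deleted is a placeholder for your own boolean delete-marker column:

    {
      "upsertConfig": {
        "mode": "FULL",
        "deleteRecordColumn": "is_deleted"
      }
    }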

    circle-info

    Note that when deleteRecordColumn is added to an existing table, it will require a server restart to actually pick up the upsert config changes.

    A deleted primary key can be revived by ingesting a record with the same primary key, but with higher comparison column value(s).

    Note that when reviving a primary key in a partial upsert table, the revived record will be treated as the source of truth for all columns. This means any previous updates to the columns will be ignored and overwritten with the new record's values.

    hashtag
    Deleted Keys time-to-live (TTL)

    The above config deleteRecordColumn only soft-deletes the primary key. To decrease in-memory data and improve performance, minimize the time deleted-primary-key entries are stored in the metadata map (deletedKeys time-to-live (TTL)). Limiting the TTL is especially useful for deleted-primary-keys where there are no future updates foreseen.

    hashtag
    Configure how long deleted-primary-keys are stored in metadata

    To configure how long deleted primary keys are stored in metadata, specify the length of time in deletedKeysTTL. For example:
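    A rough sketch, where is_deleted is a placeholder column and the value of 86400 (1 day) assumes the comparison column is in seconds since epoch:

    {
      "upsertConfig": {
        "mode": "FULL",
        "deleteRecordColumn": "is_deleted",
        "deletedKeysTTL": 86400
      }
    }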

    In this example, Pinot will retain the deleted-primary-keys in metadata for 1 day.

    circle-info

    Note that the value of deletedKeysTTL should use the same unit as the comparison column. If your comparison column holds values in seconds, this config should also be specified in seconds (see the example above).

    hashtag
    Use strictReplicaGroup for routing

    The upsert Pinot table can use only the low-level consumer for the input streams. As a result, it implicitly uses the partitioned replica-group assignment for the segments. Moreover, upsert poses the additional requirement that all segments of the same partition must be served from the same server to ensure data consistency across the segments. Accordingly, it requires strictReplicaGroup as the routing strategy. To use that, configure instanceSelectorType in Routing as follows:
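    For example, the routing section of the table config would look like this:

    "routing": {
      "instanceSelectorType": "strictReplicaGroup"
    }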

    circle-exclamation

    Using the implicit partitioned replica-group assignment from the low-level consumer won't persist the instance assignment (the mapping from partition to servers) to ZooKeeper, and newly added servers will be automatically included without explicitly reassigning instances (usually through a rebalance). This can cause new segments of the same partition to be assigned to a different server and break the requirement of upsert.

    To prevent this, we recommend using explicit instance assignment to ensure the instance assignment is persisted. Note that numInstancesPerPartition should always be 1 in replicaGroupPartitionConfig.

    hashtag
    Enable validDocIds snapshots for upsert metadata recovery

    Upsert snapshot support was added in release-0.12.0. To enable the snapshot, set enableSnapshot to true. For example:
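    A minimal sketch of the upsert config with snapshots enabled:

    {
      "upsertConfig": {
        "mode": "FULL",
        "enableSnapshot": true
      }
    }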

    Upsert maintains metadata in memory containing which docIds are valid in a particular segment (ValidDocIndexes). This metadata gets lost during server restarts and needs to be recreated again. ValidDocIndexes cannot be recovered easily after out-of-TTL primary keys get removed. Enabling snapshots addresses this problem by adding functions to store and recover the validDocIds snapshot for immutable segments.

    The snapshots are taken on every segment commit to ensure that they are consistent with the persisted data in case of abrupt shutdown. We recommend that you enable this feature so as to speed up server boot times during restarts.

    circle-info

    The lifecycle for validDocIds snapshots is as follows:

    1. If the snapshot is enabled, load validDocIds from the snapshot when adding segments.

    2. If the snapshot is not enabled, delete any existing validDocIds snapshots when adding segments.

    hashtag
    Enable preload for faster server restarts

    The upsert preload feature can make it faster to restore the upsert state when a server restarts. To enable preloading, set enablePreload to true; enableSnapshot: true must also be set in the table config. For example:
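    A minimal sketch combining both flags:

    {
      "upsertConfig": {
        "mode": "FULL",
        "enableSnapshot": true,
        "enablePreload": true
      }
    }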

    Under the hood, it uses the validDocIds snapshots to identify the valid docs and restore their upsert metadata quickly instead of performing a whole upsert comparison flow. The flow is triggered before the server is marked as ready, after which the server starts to load the remaining segments without snapshots (hence the name preload).

    The feature also requires you to specify pinot.server.instance.max.segment.preload.threads: N in the server config where N should be replaced with the number of threads that should be used for preload. It's 0 by default to disable the preloading feature.

    hashtag
    Handle out-of-order events

    There are two configs related to handling out-of-order events.

    hashtag
    dropOutOfOrderRecord

    To enable dropping of out-of-order records, set dropOutOfOrderRecord to true. For example:
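    A minimal sketch:

    {
      "upsertConfig": {
        "mode": "FULL",
        "dropOutOfOrderRecord": true
      }
    }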

    This feature doesn't persist any out-of-order event to the consuming segment. If not specified, the default value is false.

    • When false, the out-of-order record gets persisted to the consuming segment, but the MetadataManager mapping is not updated thus this record is not referenced in query or in any future updates. You can still see the records when using skipUpsert query option.

    • When true, the out-of-order record doesn't get persisted at all and the MetadataManager mapping is not updated so this record is not referenced in query or in any future updates. You cannot see the records when using skipUpsert query option.

    hashtag
    outOfOrderRecordColumn

    This config identifies out-of-order events programmatically. To enable it, add a boolean field to your table schema, say isOutOfOrder, and reference it via this config. For example:
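    A minimal sketch, reusing the isOutOfOrder column mentioned above:

    {
      "upsertConfig": {
        "mode": "FULL",
        "outOfOrderRecordColumn": "isOutOfOrder"
      }
    }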

    This feature persists a true / false value to the isOutOfOrder field based on whether the event arrived in order. You can then filter out out-of-order events while using skipUpsert to avoid any confusion. For example:

    hashtag
    Use custom metadata manager

    Pinot supports custom PartitionUpsertMetadataManager implementations that handle record and segment updates.

    hashtag
    Adding custom upsert managers

    You can add custom PartitionUpsertMetadataManager as follows:

    • Create a new Java project. Make sure you keep the package name as org.apache.pinot.segment.local.upsert.xxx

    • In your Java project, include the dependency:

    • Add your custom partition manager that implements the PartitionUpsertMetadataManager interface.

    • Add your custom TableUpsertMetadataManager that extends the BaseTableUpsertMetadataManager class.

    • Place the compiled JAR in Pinot's /plugins directory. You will need to restart all Pinot instances if they are already running.

    • Now, you can use the custom upsert manager in table configs as follows:

    ⚠️ The upsert manager class name is case-insensitive as well.

    hashtag
    Upsert table limitations

    There are some limitations for upsert Pinot tables.

    • The upsert feature is supported for Real-time tables only, and not for Hybrid or Offline tables.

    • The high-level consumer is not allowed for the input stream ingestion, which means stream.[consumerName].consumer.type must always be lowLevel.

    • The star-tree index cannot be used for indexing, as the star-tree index performs pre-aggregation during ingestion.

    • Unlike append-only tables, out-of-order events (with a comparison value in the incoming record less than the latest available value) won't be consumed and indexed by a Pinot partial upsert table; these late events will be skipped.

    hashtag
    Best practices

    Unlike other real-time tables, an upsert table takes up more memory because it needs to bookkeep the record locations in memory. As a result, it's important to plan the capacity beforehand and to monitor resource usage. Here are some recommended practices for using upsert tables.

    hashtag
    Create the topic/stream with more partitions.

    The number of partitions in the input stream determines the number of partitions of the Pinot table. The more partitions the input topic/stream has, the more Pinot servers you can distribute the Pinot table across, and therefore the more you can scale the table horizontally. Note that you cannot increase the number of partitions later for upsert-enabled tables, so start with enough partitions (at least 2-3x the number of Pinot servers).
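
    For example, with Kafka you might pre-create the topic with a generous partition count before pointing Pinot at it (the topic name, partition count, and broker address below are placeholders to adapt to your environment):

    bin/kafka-topics.sh --create \
      --topic upsertMeetupRSVPEvents \
      --partitions 12 \
      --replication-factor 1 \
      --bootstrap-server localhost:19092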

    hashtag
    Memory usage

    An upsert table maintains an in-memory map from the primary key to the record location, so it's recommended to use a simple primary key type and avoid composite primary keys to reduce the memory cost. In addition, consider the hashFunction setting in the upsert config, which can be MD5 or MURMUR3, to store the 128-bit hash of the primary key instead. This is useful when the primary key takes up more space, but keep in mind that hashing may introduce collisions, though the chance is very low.
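
    For instance, a minimal upsert config that stores MURMUR3 hashes of the primary key instead of the raw key values could look like this (a sketch, not a full table config):

    {
      "upsertConfig": {
        "mode": "FULL",
        "hashFunction": "MURMUR3"
      }
    }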

    hashtag
    Monitoring

    Set up a dashboard over the metric pinot.server.upsertPrimaryKeysCount.tableName to watch the number of primary keys in a table partition. It's useful for tracking growth, which is proportional to the growth in memory usage. The total memory used by upsert is roughly (primaryKeysCount * (sizeOfKeyInBytes + 24)).

    hashtag
    Capacity planning

    It's useful to plan the capacity beforehand to ensure you will not run into resource constraints later. A simple way is to measure the rate of primary keys in the input stream per partition and extrapolate the data to a specific time period (based on table retention) to approximate the memory usage. A heap dump is also useful to check the memory usage so far on an upsert table instance.
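
    As a rough, illustrative calculation using the estimate above (the key count and key size are assumptions, not measurements): with 10 million primary keys in a partition and 16-byte hashed keys, the upsert metadata needs about 10,000,000 * (16 + 24) bytes, or roughly 400 MB of heap for that partition's primary-key map on each server that hosts it.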

    hashtag
    Example

    Putting these together, the table configurations of the quick start examples are as follows:

    circle-info

    Pinot server maintains a primary key to record location map across all the segments served in an upsert-enabled table. As a result, when updating the config for an existing upsert table (e.g. change the columns in the primary key, change the comparison column), servers need to be restarted in order to apply the changes and rebuild the map.

    hashtag
    Quick Start

    To illustrate how full upsert works, the Pinot binary comes with a quick start example. Use the following command to create a real-time upsert table, meetupRSVP.

    You can also run the partial upsert demo with the following command:

    As soon as data flows into the stream, the Pinot table will consume it and it will be ready for querying. Head over to the Query Console to check out the real-time data.

    For partial upsert, you will see only the values of the configured columns change, based on the specified partial upsert strategies.

    An example for partial upsert is shown below: each event_id remains unique during ingestion, while the value of rsvp_count is incremented.

    To see the difference from a non-upsert table, use the query option skipUpsert to skip the upsert effect in the query result.
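
    For example (assuming the quick start table name above and the same option syntax used elsewhere on this page):

    select * from meetupRsvp option(skipUpsert=true)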

    hashtag
    FAQ

    Can I change the primary key columns in an existing upsert table?

    Yes, you can add or delete primary key columns as long as the input stream is partitioned on one of the primary key columns. However, you need to restart all Pinot servers so that they can rebuild the primary-key-to-record-location map with the new columns.

    If you use segmentPartitionConfig to leverage broker segment pruning, it's important to ensure that the partition function matches on both the Kafka producer side and the Pinot side. In Kafka, the default for the Java client is the 32-bit murmur2 hash, while other language clients such as Python typically default to CRC32 (Cyclic Redundancy Check 32-bit).
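
    As a sketch, if the producer uses Kafka's default Java (murmur2) partitioner, the Pinot-side segmentPartitionConfig can use the matching Murmur partition function (the column name and partition count below are placeholders):

    {
      "tableIndexConfig": {
        "segmentPartitionConfig": {
          "columnPartitionMap": {
            "event_id": {
              "functionName": "Murmur",
              "numPartitions": 2
            }
          }
        }
      }
    }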

    ) -> oldValue

    (null, null) -> null

  • OVERWRITE: Overwrite the column of the last record.

  • INCREMENT: Add the new value to the existing values.

  • APPEND: Add the new item to the Pinot unordered set.

  • UNION: Add the new item to the Pinot unordered set if it does not exist.

  • IGNORE: Ignore the new value and keep the existing value (v0.10.0+).

  • MAX: Keep the maximum value between the existing value and the new value (v0.12.0+).

  • MIN: Keep the minimum value between the existing value and the new value (v0.12.0+).

    upsert_meetupRsvp_schema.json
    {
        "primaryKeyColumns": ["event_id"]
    }
    {
      "upsertConfig": {
        "mode": "FULL"
      }
    }
    release-0.8.0
    {
      "upsertConfig": {
        "mode": "PARTIAL",
        "partialUpsertStrategies":{
          "rsvp_count": "INCREMENT",
          "group_name": "IGNORE",
          "venue_name": "OVERWRITE"
        }
      },
      "tableIndexConfig": {
        "nullHandlingEnabled": true
      }
    }
    release-0.10.0
    {
      "upsertConfig": {
        "mode": "PARTIAL",
        "defaultPartialUpsertStrategy": "OVERWRITE",
        "partialUpsertStrategies":{
          "rsvp_count": "INCREMENT",
          "group_name": "IGNORE"
        }
      },
      "tableIndexConfig": {
        "nullHandlingEnabled": true
      }
    }
    {
      "upsertConfig": {
        "mode": "FULL",
        "comparisonColumn": "anotherTimeColumn"
      }
    }
    {
      "upsertConfig": {
        "mode": "PARTIAL",
        "defaultPartialUpsertStrategy": "OVERWRITE",
        "partialUpsertStrategies":{},
        "comparisonColumns": ["secondsSinceEpoch", "otherComparisonColumn"]
      }
    }
    [
      {
        "event_id": "aa",
        "orderReceived": 1,
        "description" : "first",
        "secondsSinceEpoch": 1567205394
      },
      {
        "event_id": "aa",
        "orderReceived": 2,
        "description" : "update",
        "secondsSinceEpoch": 1567205397
      },
      {
        "event_id": "aa",
        "orderReceived": 3,
        "description" : "update",
        "secondsSinceEpoch": 1567205396
      },
      {
        "event_id": "aa",
        "orderReceived": 4,
        "description" : "first arrival, other column",
        "otherComparisonColumn": 1567205395
      },
      {
        "event_id": "aa",
        "orderReceived": 5,
        "description" : "late arrival, other column",
        "otherComparisonColumn": 1567205392
      },
      {
        "event_id": "aa",
        "orderReceived": 6,
        "description" : "update, other column",
        "otherComparisonColumn": 1567205398
      }
    ]
      "upsertConfig": {
        "mode": "FULL",
        "enableSnapshot": true,
        "enablePreload": true,
        "metadataTTL": 86400
      }
    }
    { 
        "upsertConfig": {  
            ... 
            "deleteRecordColumn": <column_name>
        } 
    }
    // In the Schema
    {
        ...
        {
          "name": "<delete_column_name>",
          "dataType": "BOOLEAN"
        },
        ...
    }
      "upsertConfig": {
        "mode": "FULL",
        "deleteRecordColumn": <column_name>,
        "deletedKeysTTL": 86400
      }
    }
    {
      "routing": {
        "instanceSelectorType": "strictReplicaGroup"
      }
    }
    {
      "upsertConfig": {
        "mode": "FULL",
        "enableSnapshot": true
      }
    }
    {
      "upsertConfig": {
        "mode": "FULL",
        "enableSnapshot": true,
        "enablePreload": true
      }
    }
    {
      "upsertConfig": {
        ...,
        "dropOutOfOrderRecord": true
      }
    }
    {
      "upsertConfig": {
        ...,
        "outOfOrderRecordColumn": "isOutOfOrder"
      }
    }
    select key, val from tbl1 where isOutOfOrder = false option(skipUpsert=true)
    {
      "upsertConfig": {
        "metadataManagerClass": "org.apache.pinot.segment.local.upsert.CustomPartitionUpsertMetadataManager"
      }
    }
    <dependency>
      <groupId>org.apache.pinot</groupId>
      <artifactId>pinot-segment-local</artifactId>
      <version>1.0.0</version>
    </dependency>
    implementation 'org.apache.pinot:pinot-segment-local:1.0.0'
    //Example custom partition manager
    
    class CustomPartitionUpsertMetadataManager implements PartitionUpsertMetadataManager {}
    //Example custom table upsert metadata manager
    
    public class CustomTableUpsertMetadataManager extends BaseTableUpsertMetadataManager {}
    {
      "upsertConfig": {
        "metadataManagerClass": "org.apache.pinot.segment.local.upsert.CustomPartitionUpsertMetadataManager"
      }
    }
    {
      "tableName": "upsertMeetupRsvp",
      "tableType": "REALTIME",
      "tenants": {},
      "segmentsConfig": {
        "timeColumnName": "mtime",
        "retentionTimeUnit": "DAYS",
        "retentionTimeValue": "1",
        "replication": "1"
      },
      "tableIndexConfig": {
        "segmentPartitionConfig": {
          "columnPartitionMap": {
            "event_id": {
              "functionName": "Hashcode",
              "numPartitions": 2
            }
          }
        }
      },
      "instanceAssignmentConfigMap": {
        "CONSUMING": {
          "tagPoolConfig": {
            "tag": "DefaultTenant_REALTIME"
          },
          "replicaGroupPartitionConfig": {
            "replicaGroupBased": true,
            "numReplicaGroups": 1,
            "partitionColumn": "event_id",
            "numPartitions": 2,
            "numInstancesPerPartition": 1
          }
        }
      },
      "routing": {
        "segmentPrunerTypes": [
          "partition"
        ],
        "instanceSelectorType": "strictReplicaGroup"
      },
      "ingestionConfig": {
        "streamIngestionConfig": {
          "streamConfigMaps": [
            {
              "streamType": "kafka",
              "stream.kafka.topic.name": "upsertMeetupRSVPEvents",
              "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
              "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
              "stream.kafka.zk.broker.url": "localhost:2191/kafka",
              "stream.kafka.broker.list": "localhost:19092"
            }
          ]
        }
      },
      "upsertConfig": {
        "mode": "FULL",
        "enableSnapshot": true,
        "enablePreload": true
      },
      "fieldConfigList": [
        {
          "name": "location",
          "encodingType": "RAW",
          "indexType": "H3",
          "properties": {
            "resolutions": "5"
          }
        }
      ],
      "metadata": {
        "customConfigs": {}
      }
    }
    {
      "tableName": "upsertPartialMeetupRsvp",
      "tableType": "REALTIME",
      "tenants": {},
      "segmentsConfig": {
        "timeColumnName": "mtime",
        "retentionTimeUnit": "DAYS",
        "retentionTimeValue": "1",
        "replication": "1"
      },
      "tableIndexConfig": {
        "segmentPartitionConfig": {
          "columnPartitionMap": {
            "event_id": {
              "functionName": "Hashcode",
              "numPartitions": 2
            }
          }
        },
        "nullHandlingEnabled": true
      },
      "instanceAssignmentConfigMap": {
        "CONSUMING": {
          "tagPoolConfig": {
            "tag": "DefaultTenant_REALTIME"
          },
          "replicaGroupPartitionConfig": {
            "replicaGroupBased": true,
            "numReplicaGroups": 1,
            "partitionColumn": "event_id",
            "numPartitions": 2,
            "numInstancesPerPartition": 1
          }
        }
      },
      "routing": {
        "segmentPrunerTypes": [
          "partition"
        ],
        "instanceSelectorType": "strictReplicaGroup"
      },
      "ingestionConfig": {
        "streamIngestionConfig": {
          "streamConfigMaps": [
            {
              "streamType": "kafka",
              "stream.kafka.topic.name": "upsertPartialMeetupRSVPEvents",
              "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
              "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
              "stream.kafka.zk.broker.url": "localhost:2191/kafka",
              "stream.kafka.broker.list": "localhost:19092"
            }
          ]
        }
      },
      "upsertConfig": {
        "mode": "PARTIAL",
        "partialUpsertStrategies": {
          "rsvp_count": "INCREMENT",
          "group_name": "UNION",
          "venue_name": "APPEND"
        }
      },
      "fieldConfigList": [
        {
          "name": "location",
          "encodingType": "RAW",
          "indexType": "H3",
          "properties": {
            "resolutions": "5"
          }
        }
      ],
      "metadata": {
        "customConfigs": {}
      }
    }
    # stop previous quick start cluster, if any
    bin/quick-start-upsert-streaming.sh
    # stop previous quick start cluster, if any
    bin/quick-start-partial-upsert-streaming.sh
    Querying the events table
    Apache Pinot 1.0 Upserts overview