Stream ingestion

This guide shows you how to ingest a stream of records into a Pinot table.

Apache Pinot lets users consume data from streams and push it directly into the database. This process is called stream ingestion. Stream ingestion makes it possible to query data within seconds of publication.

Stream ingestion supports checkpointing to prevent data loss.

To set up stream ingestion, perform the following steps, which are described in more detail on this page:

  1. Create schema configuration

  2. Create table configuration

  3. Create ingestion configuration

  4. Upload table and schema spec

Here's an example where we assume the data to be ingested is in the following format:

{"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"Maths","score":3.8,"timestamp":1571900400000}
{"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"History","score":3.5,"timestamp":1571900400000}
{"studentID":207,"firstName":"Bob","lastName":"Lewis","gender":"Male","subject":"Maths","score":3.2,"timestamp":1571900400000}
{"studentID":207,"firstName":"Bob","lastName":"Lewis","gender":"Male","subject":"Chemistry","score":3.6,"timestamp":1572418800000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Geography","score":3.8,"timestamp":1572505200000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"English","score":3.5,"timestamp":1572505200000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Maths","score":3.2,"timestamp":1572678000000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Physics","score":3.6,"timestamp":1572678000000}
{"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"Maths","score":3.8,"timestamp":1572678000000}
{"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"English","score":3.5,"timestamp":1572678000000}
{"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"History","score":3.2,"timestamp":1572854400000}
{"studentID":212,"firstName":"Nick","lastName":"Young","gender":"Male","subject":"History","score":3.6,"timestamp":1572854400000}

Create schema configuration

The schema defines the fields along with their data types. The schema also defines whether fields serve as dimensions, metrics, or timestamps. For more details on schema configuration, see creating a schema.

For our sample data, the schema configuration looks like this:

/tmp/pinot-quick-start/transcript-schema.json

{
  "schemaName": "transcript",
  "dimensionFieldSpecs": [
    {
      "name": "studentID",
      "dataType": "INT"
    },
    {
      "name": "firstName",
      "dataType": "STRING"
    },
    {
      "name": "lastName",
      "dataType": "STRING"
    },
    {
      "name": "gender",
      "dataType": "STRING"
    },
    {
      "name": "subject",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "score",
      "dataType": "FLOAT"
    }
  ],
  "dateTimeFieldSpecs": [{
    "name": "timestamp",
    "dataType": "LONG",
    "format" : "1:MILLISECONDS:EPOCH",
    "granularity": "1:MILLISECONDS"
  }]
}

Create table configuration

The next step is to create a table where all the ingested data will flow and can be queried. For details about each table component, see the table reference.

Create ingestion configuration

The ingestion configuration (ingestionConfig) specifies how to ingest streaming data into Pinot. First, include a subsection for streamConfigMaps. Next, decide whether to skip rows that fail to index with continueOnError and whether to validate time values with rowTimeValueCheck and segmentTimeValueCheck. See details about these ingestionConfig options in the streamConfigMaps and Additional ingestion configurations tables below:

Information about streamConfigMaps

streamType
The streaming platform to ingest data from. Supported values: kafka.

stream.[streamType].consumer.type
Whether to use a per-partition low-level consumer or a high-level stream consumer. Supported values: lowLevel (consume data from each partition with offset management) or highLevel (consume data without control over the partitions).

stream.[streamType].topic.name
Topic or data source to ingest data from. Supported values: String.

stream.[streamType].broker.list
List of brokers. Supported values: String.

stream.[streamType].decoder.class.name
Name of the class to parse the data. The class should implement the org.apache.pinot.spi.stream.StreamMessageDecoder interface. Supported values: String. Available options: org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder, org.apache.pinot.plugin.inputformat.avro.KafkaAvroMessageDecoder, org.apache.pinot.plugin.inputformat.avro.SimpleAvroMessageDecoder, org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder, org.apache.pinot.plugin.inputformat.csv.CSVMessageDecoder, org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder.

stream.[streamType].consumer.factory.class.name
Name of the factory class that provides the appropriate implementation of the low-level and high-level consumer, as well as the metadata. Supported values: String. Available options: org.apache.pinot.plugin.stream.kafka09.KafkaConsumerFactory, org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory, org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory, org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory.

stream.[streamType].consumer.prop.auto.offset.reset
Determines the offset from which to start the ingestion. Supported values: smallest, largest, or a timestamp in milliseconds.

stream.[streamType].decoder.prop.format
Specifies the data format to ingest via a stream. The value of this property should match the format of the data in the stream. Supported values: JSON.

realtime.segment.flush.threshold.time
Maximum elapsed time after which a consuming segment is persisted. Note that this time should be smaller than the Kafka retention period configured for the corresponding topic. Supported values: String, such as 1d or 4h30m. Default is 6h (six hours).

realtime.segment.flush.threshold.rows
The maximum number of rows to consume before persisting the consuming segment. If this value is set to 0, the configuration looks to realtime.segment.flush.threshold.segment.size below. Default is 5,000,000.

realtime.segment.flush.threshold.segment.size
Desired size of the completed segments. This value is used when realtime.segment.flush.threshold.rows is set to 0. Supported values: String, such as 150M or 1.1G. Default is 200M (200 megabytes).

You can also specify additional configurations for the consumer directly in streamConfigMaps. For example, for Kafka streams, add any of the configs described in the Kafka configuration page to pass them directly to the Kafka consumer.

Additional ingestion configurations

continueOnError
Set to true to skip any row indexing error and move on to the next row. Otherwise, an error evaluating a transform or filter function may block ingestion (real-time or offline), and result in data loss or corruption. Consider your use case to determine if it's preferable to set this option to false and fail the ingestion if an error occurs, to maintain data integrity.

rowTimeValueCheck
Set to true to validate the time column values ingested during segment upload. Validates that each row of data in a segment matches the specified time format and falls within a valid time range (1971-2071). If the value doesn't meet both criteria, Pinot replaces the value with null. This option ensures that the time values are strictly increasing and that there are no duplicates or gaps in the data.

segmentTimeValueCheck
Set to true to validate that the time range of the segment falls between 1971 and 2071. This option ensures data segments stored in the system are correct and consistent.

Example table config with ingestionConfig

For our sample data and schema, the table config will look like this:

{
  "tableName": "transcript",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "timestamp",
    "timeType": "MILLISECONDS",
    "schemaName": "transcript",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP"
  },
  "metadata": {
    "customConfigs": {}
  },
  "ingestionConfig": {
    "streamIngestionConfig": {
      "streamConfigMaps": [
        {
          "realtime.segment.flush.threshold.rows": "0",
          "stream.kafka.decoder.prop.format": "JSON",
          "key.serializer": "org.apache.kafka.common.serialization.ByteArraySerializer",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
          "streamType": "kafka",
          "value.serializer": "org.apache.kafka.common.serialization.ByteArraySerializer",
          "stream.kafka.consumer.type": "LOWLEVEL",
          "realtime.segment.flush.threshold.segment.rows": "50000",
          "stream.kafka.broker.list": "localhost:9876",
          "realtime.segment.flush.threshold.time": "3600000",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
          "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
          "stream.kafka.topic.name": "transcript-topic"
        }
      ]
    },
    "transformConfigs": [],
    "continueOnError": true,
    "rowTimeValueCheck": true,
    "segmentTimeValueCheck": false
  },
  "isDimTable": false
}

Upload schema and table config

Now that we have our table and schema configurations, let's upload them to the Pinot cluster. As soon as the configs are uploaded, Pinot will start ingesting available records from the topic.

Using Docker:

docker run \
    --network=pinot-demo \
    -v /tmp/pinot-quick-start:/tmp/pinot-quick-start \
    --name pinot-streaming-table-creation \
    apachepinot/pinot:latest AddTable \
    -schemaFile /tmp/pinot-quick-start/transcript-schema.json \
    -tableConfigFile /tmp/pinot-quick-start/transcript-table-realtime.json \
    -controllerHost pinot-quickstart \
    -controllerPort 9000 \
    -exec

Using the launcher scripts:

bin/pinot-admin.sh AddTable \
    -schemaFile /path/to/transcript-schema.json \
    -tableConfigFile /path/to/transcript-table-realtime.json \
    -exec

Tune the stream config

Throttle stream consumption

There are some scenarios where the message rate in the input stream can come in bursts which can lead to long GC pauses on the Pinot servers or affect the ingestion rate of other real-time tables on the same server. If this happens to you, throttle the consumption rate during stream ingestion to better manage overall performance.

Stream consumption throttling can be tuned using the stream config topic.consumption.rate.limit which indicates the upper bound on the message rate for the entire topic.

Here is a sample configuration showing how to configure consumption throttling:

{
  "tableName": "transcript",
  "tableType": "REALTIME",
  ...
  "ingestionConfig": {
    "streamIngestionConfig": {
      "streamConfigMaps": [
        {
          "streamType": "kafka",
          "stream.kafka.consumer.type": "lowlevel",
          "stream.kafka.topic.name": "transcript-topic",
          ...
          "topic.consumption.rate.limit": 1000
        }
      ]
    }
  },
  ...
}

Some things to keep in mind while tuning this config are:

  • Since this configuration is applied to the entire topic, the rate is internally divided by the number of partitions in the topic and applied to each partition's consumer (see the example after this list).

  • In case of multi-tenant deployment (where you have more than 1 table in the same server instance), you need to make sure that the rate limit on one table doesn't step on/starve the rate limiting of another table. So, when there is more than 1 table on the same server (which is most likely to happen), you may need to re-tune the throttling threshold for all the streaming tables.
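For example, if topic.consumption.rate.limit is set to 1000 for a topic with 4 partitions, each partition consumer is throttled to roughly 1000 / 4 = 250 messages per second.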

Once throttling is enabled for a table, you can verify it by searching for a log that looks similar to:

A consumption rate limiter is set up for topic <topic_name> in table <tableName> with rate limit: <rate_limit> (topic rate limit: <topic_rate_limit>, partition count: <partition_count>)

In addition, you can monitor the consumption rate utilization with the metric CONSUMPTION_QUOTA_UTILIZATION.

Note that any configuration change for topic.consumption.rate.limit in the stream config will NOT take effect immediately. The new configuration will be picked up from the next consuming segment. In order to enforce the new configuration, you need to trigger a forceCommit. Refer to Pause Stream Ingestion for more details.

$ curl -X POST {controllerHost}/tables/{tableName}/forceCommit

Custom ingestion support

You can also write an ingestion plugin if the platform you are using is not supported out of the box. For a walkthrough, see Stream Ingestion Plugin.

Pause stream ingestion

There are some scenarios in which you may want to pause real-time ingestion while your table remains available for queries. For example, if there is a problem with the stream ingestion, you may want queries to keep running on the already ingested data while you troubleshoot the issue. For these scenarios, you can first issue a Pause request to a Controller host. After you have finished troubleshooting the stream, you can issue another request to the Controller to resume consumption.

When a Pause request is issued, the controller instructs the real-time servers hosting your table to commit their consuming segments immediately. However, the commit process may take some time to complete. Note that Pause and Resume requests are async. An OK response means that instructions for pausing or resuming have been successfully sent to the real-time server. If you want to know whether consumption has actually stopped or resumed, issue a pause status request.

$ curl -X POST {controllerHost}/tables/{tableName}/pauseConsumption
$ curl -X POST {controllerHost}/tables/{tableName}/resumeConsumption
$ curl -X POST {controllerHost}/tables/{tableName}/pauseStatus

It's worth noting that consuming segments on real-time servers are stored in volatile memory, and their resources are allocated when the consuming segments are first created. These resources cannot be altered if consumption parameters are changed midway through consumption. It may take hours before these changes take effect. Furthermore, if the parameters are changed in an incompatible way (for example, changing the underlying stream with a completely new set of offsets, or changing the stream endpoint from which to consume messages), it will result in the table getting into an error state.

The pause and resume feature is helpful in these instances. When a pause request is issued by the operator, consuming segments are committed without starting new mutable segments. Instead, new mutable segments are started only when the resume request is issued. This mechanism provides the operators as well as developers with more flexibility. It also enables Pinot to be more resilient to the operational and functional constraints imposed by underlying streams.

There is another feature called Force Commit which utilizes the primitives of the pause and resume feature. When the operator issues a force commit request, the current mutable segments will be committed and new ones started right away. Operators can now use this feature for all compatible table config parameter changes to take effect immediately.

(v 0.12.0+) Once submitted, the forceCommit API returns a jobId that can be used to get the current progress of the forceCommit operation. A sample response and status API call:

$ curl -X POST {controllerHost}/tables/{tableName}/forceCommit
{
  "forceCommitJobId": "6757284f-b75b-45ce-91d8-a277bdbc06ae",
  "forceCommitStatus": "SUCCESS",
  "jobMetaZKWriteStatus": "SUCCESS"
}

$ curl -X GET {controllerHost}/tables/forceCommitStatus/6757284f-b75b-45ce-91d8-a277bdbc06ae
{
  "jobId": "6757284f-b75b-45ce-91d8-a277bdbc06ae",
  "segmentsForceCommitted": "[\"airlineStats__0__0__20230119T0700Z\",\"airlineStats__1__0__20230119T0700Z\",\"airlineStats__2__0__20230119T0700Z\"]",
  "submissionTimeMs": "1674111682977",
  "numberOfSegmentsYetToBeCommitted": 0,
  "jobType": "FORCE_COMMIT",
  "segmentsYetToBeCommitted": [],
  "tableName": "airlineStats_REALTIME"
}


The forceCommit request just triggers a regular commit before the consuming segments reach their end criteria, so it follows the same mechanism as a regular commit. It is a one-shot request and is not retried automatically upon failure, but it is idempotent, so you may keep issuing it until it succeeds if needed.

This API is async, as it doesn't wait for the segment commit to complete. However, a status entry is put in ZK to track when the request was issued and which consuming segments it included. The consuming segments tracked in the status entry are compared with the latest IdealState to indicate the progress of the forceCommit. This status is not updated or deleted upon commit success or failure, so it can become stale. Currently, the most recent 100 status entries are kept in ZK, and the oldest ones are deleted only when the total number is about to exceed 100.

For incompatible parameter changes, an option is added to the resume request to handle the case of a completely new set of offsets. Operators can follow a three-step process: first, issue a pause request; second, change the consumption parameters; finally, issue the resume request with the appropriate option. These steps will preserve the old data and allow the new data to be consumed immediately. Throughout the operation, queries will continue to be served.

$ curl -X POST {controllerHost}/tables/{tableName}/resumeConsumption?resumeFrom=smallest
$ curl -X POST {controllerHost}/tables/{tableName}/resumeConsumption?resumeFrom=largest

Handle partition changes in streams

If a Pinot table is configured to consume using a Low Level (partition-based) stream type, then it is possible that the partitions of the stream change over time. In Kafka, for example, the number of partitions may increase. In Kinesis, the number of partitions may increase or decrease -- some partitions could be merged to create a new one, or existing partitions split to create new ones.

Pinot runs a periodic task called RealtimeSegmentValidationManager that monitors such changes and starts consumption on new partitions (or stops consumption from old ones) as necessary. Since this is a periodic task run on the controller, it may take some time for Pinot to recognize new partitions and start consuming from them. This may delay the data in new partitions appearing in the results that Pinot returns.

If you want to recognize the new partitions sooner, manually trigger the RealtimeSegmentValidationManager periodic task so that the new partitions are picked up immediately, as sketched below.
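As a sketch, assuming your controller exposes the periodic task endpoint (available in recent Pinot versions; the exact path and parameter names may vary across versions), the task can be triggered with a request such as:

curl -X GET "{controllerHost}/periodictask/run?taskname=RealtimeSegmentValidationManager&tableName=transcript&type=REALTIME"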

Infer ingestion status of real-time tables

Often, it is important to understand the rate of ingestion of data into your real-time table. This is commonly done by looking at the consumption lag of the consumer. The lag itself can be observed in many dimensions. Pinot supports observing consumption lag along the offset dimension and time dimension, whenever applicable (as it depends on the specifics of the connector).

The ingestion status of a connector can be observed by querying either the /consumingSegmentsInfo API or the table's /debug API, as shown below:

# GET /tables/{tableName}/consumingSegmentsInfo
curl -X GET "http://<controller_url:controller_admin_port>/tables/meetupRsvp/consumingSegmentsInfo" -H "accept: application/json"

# GET /debug/tables/{tableName}
curl -X GET "http://localhost:9000/debug/tables/meetupRsvp?type=REALTIME&verbosity=1" -H "accept: application/json"

A sample response from a Kafka-based real-time table is shown below. The ingestion status is displayed for each of the CONSUMING segments in the table.

{
  "_segmentToConsumingInfoMap": {
    "meetupRsvp__0__0__20221019T0639Z": [
      {
        "serverName": "Server_192.168.0.103_7000",
        "consumerState": "CONSUMING",
        "lastConsumedTimestamp": 1666161593904,
        "partitionToOffsetMap": { // <<-- Deprecated. See currentOffsetsMap for same info
          "0": "6"
        },
        "partitionOffsetInfo": {
          "currentOffsetsMap": {
            "0": "6" // <-- Current consumer position
          },
          "latestUpstreamOffsetMap": {
            "0": "6"  // <-- Upstream latest position
          },
          "recordsLagMap": {
            "0": "0"  // <-- Lag, in terms of #records behind latest
          },
          "recordsAvailabilityLagMap": {
            "0": "2"  // <-- Lag, in terms of time
          }
        }
      }
    ],
    ...
  }
}

The following terms are used in the response:

currentOffsetsMap
Current consuming offset position per partition.

latestUpstreamOffsetMap
(Wherever applicable) Latest offset found in the upstream topic partition.

recordsLagMap
(Whenever applicable) Defines how far behind the current record's offset/pointer is from the upstream latest record. This is calculated as the difference between the latestUpstreamOffset and currentOffset for the partition when the lag computation request is made.

recordsAvailabilityLagMap
(Whenever applicable) Defines how soon after record ingestion the record was consumed by Pinot. This is calculated as the difference between the time the record was consumed and the time at which the record was ingested upstream.

Monitor real-time ingestion

Real-time ingestion includes 3 stages of message processing: Decode, Transform, and Index.

In each of these stages, a failure can happen which may or may not result in an ingestion failure. The following metrics are available to investigate ingestion issues:

  1. Decode stage -> an error here is recorded as INVALID_REALTIME_ROWS_DROPPED

  2. Transform stage -> possible errors here are:

    1. When a message gets dropped due to the FILTER transform, it is recorded as REALTIME_ROWS_FILTERED

    2. When the transform pipeline sets the $INCOMPLETE_RECORD_KEY$ key in the message, it is recorded as INCOMPLETE_REALTIME_ROWS_CONSUMED, but only when the continueOnError configuration is enabled. If continueOnError is not enabled, the ingestion fails.

  3. Index stage -> when there is a failure at this stage, the ingestion typically stops and marks the partition as ERROR.

There is another metric called ROWS_WITH_ERROR which is the sum of all error counts in the 3 stages above.

Furthermore, the metric REALTIME_CONSUMPTION_EXCEPTIONS gets incremented whenever there is a transient/permanent stream exception seen during consumption.

These metrics can be used to understand why ingestion failed for a particular table partition before diving into the server logs.


    Amazon Kinesis

    This guide shows you how to ingest a stream of records from an Amazon Kinesis topic into a Pinot table.

To ingest events from an Amazon Kinesis stream into Pinot, set the following configs into the table config:

{
  "tableName": "kinesisTable",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "timestamp",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kinesis",
      "stream.kinesis.topic.name": "<your kinesis stream name>",
      "region": "<your region>",
      "accessKey": "<your access key>",
      "secretKey": "<your secret key>",
      "shardIteratorType": "AFTER_SEQUENCE_NUMBER",
      "stream.kinesis.consumer.type": "lowlevel",
      "stream.kinesis.fetch.timeout.millis": "30000",
      "stream.kinesis.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kinesis.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
      "realtime.segment.flush.threshold.rows": "1000000",
      "realtime.segment.flush.threshold.time": "6h"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}

where the Kinesis specific properties are:

streamType
This should be set to "kinesis".

stream.kinesis.topic.name
Kinesis stream name.

region
Kinesis region, e.g. us-west-1.

accessKey
Kinesis access key.

secretKey
Kinesis secret key.

shardIteratorType
Set to LATEST to consume only new records, TRIM_HORIZON for the earliest sequence number, or AT_SEQUENCE_NUMBER and AFTER_SEQUENCE_NUMBER to start consumption from a particular sequence number.

maxRecordsToFetch
... Default is 20.

Kinesis supports authentication using the DefaultCredentialsProviderChain. The credential provider looks for the credentials in the following order -

  • Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (RECOMMENDED since they are recognized by all the AWS SDKs and CLI except for .NET), or AWS_ACCESS_KEY and AWS_SECRET_KEY (only recognized by Java SDK)

  • Java System Properties - aws.accessKeyId and aws.secretKey

  • Web Identity Token credentials from the environment or container

  • Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI

  • Credentials delivered through the Amazon EC2 container service, if the AWS_CONTAINER_CREDENTIALS_RELATIVE_URI environment variable is set and the security manager has permission to access the variable

  • Instance profile credentials delivered through the Amazon EC2 metadata service

Although you can also specify the accessKey and secretKey in the properties above, we don't recommend this insecure method. Use it only for non-production proof-of-concept (POC) setups. You can also specify other AWS fields, such as AWS_SESSION_TOKEN, as environment variables or in the config, and they will work, as shown below.
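As a minimal sketch (the values are placeholders), the environment variables from the list above can be exported in the shell that starts the Pinot server processes:

export AWS_ACCESS_KEY_ID=<your-access-key-id>
export AWS_SECRET_ACCESS_KEY=<your-secret-access-key>
# optional, only needed if you use temporary credentials
export AWS_SESSION_TOKEN=<your-session-token>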

    Limitations

  1. ShardID is of the format "shardId-000000000001". We use the numeric part as the partitionId. Our partitionId variable is an integer. If shardIds grow beyond Integer.MAX_VALUE, we will overflow into the partitionId space.

  2. Segment size based thresholds for segment completion will not work. The implementation assumes that partition "0" always exists; however, once shard 0 is split or merged, partition 0 will no longer exist.
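For example, under the mapping described in item 1, the shard "shardId-000000000042" maps to partitionId 42.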


    Apache Kafka

    This guide shows you how to ingest a stream of records from an Apache Kafka topic into a Pinot table.

On this page, you'll learn how to import data into Pinot using Apache Kafka for real-time stream ingestion. Pinot has out-of-the-box real-time ingestion support for Kafka.

Let's set up a demo Kafka cluster locally and create a sample topic, transcript-topic.

Using Docker:

Start Kafka

docker run \
    --network pinot-demo --name=kafka \
    -e KAFKA_ZOOKEEPER_CONNECT=pinot-zookeeper:2181/kafka \
    -e KAFKA_BROKER_ID=0 \
    -e KAFKA_ADVERTISED_HOST_NAME=kafka \
    -p 2181:2181 \
    -d wurstmeister/kafka:latest

Create a Kafka topic

docker exec \
    -t kafka \
    /opt/kafka/bin/kafka-topics.sh \
    --zookeeper pinot-zookeeper:2181/kafka \
    --partitions=1 --replication-factor=1 \
    --create --topic transcript-topic

Using the launcher scripts:

Start Kafka

Start the Kafka cluster on port 9092 using the same Zookeeper from the quick-start examples.

bin/pinot-admin.sh StartKafka -zkAddress=localhost:2181/kafka -port 9092

Create a Kafka topic

Download the latest Kafka. Create a topic.

bin/kafka-topics.sh --create --bootstrap-server kafka:9092 --replication-factor 1 --partitions 1 --topic transcript-topic

    Create schema configuration

We will publish the data in the same format as mentioned in the Stream ingestion docs, so you can use the same schema described under Create schema configuration there.

    Create table configuration

Next, create the real-time table configuration for the transcript table described by the schema from the previous step.

For Kafka, we use streamType as kafka. See the streamConfigMaps table in the Stream ingestion guide for available decoder class options. You can also write your own decoder by extending the StreamMessageDecoder interface and putting the jar file in the plugins directory.

The lowLevel consumer reads data per partition, whereas the highLevel consumer uses the Kafka high-level consumer to read data from the whole stream, without control over which partition is read at a particular moment.

    For Kafka versions below 2.X, use org.apache.pinot.plugin.stream.kafka09.KafkaConsumerFactory

    For Kafka version 2.X and above, use org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory

    You can set the offset to -

    • smallest to start consumer from the earliest offset

    • largest to start consumer from the latest offset

  • timestamp in format yyyy-MM-dd'T'HH:mm:ss.SSSZ to start the consumer from the offset after the timestamp

  • datetime duration or period to start the consumer from the offset after the period, e.g., '2d'

The resulting configuration should look as follows -

/tmp/pinot-quick-start/transcript-table-realtime.json

{
  "tableName": "transcript",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "timestamp",
    "timeType": "MILLISECONDS",
    "schemaName": "transcript",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.topic.name": "transcript-topic",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.broker.list": "kafka:9092",
      "realtime.segment.flush.threshold.time": "3600000",
      "realtime.segment.flush.threshold.rows": "50000",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}

    Upload schema and table

Now that we have our table and schema configurations, let's upload them to the Pinot cluster. As soon as the real-time table is created, it will begin ingesting available records from the Kafka topic.

Using Docker:

docker run \
    --network=pinot-demo \
    -v /tmp/pinot-quick-start:/tmp/pinot-quick-start \
    --name pinot-streaming-table-creation \
    apachepinot/pinot:latest AddTable \
    -schemaFile /tmp/pinot-quick-start/transcript-schema.json \
    -tableConfigFile /tmp/pinot-quick-start/transcript-table-realtime.json \
    -controllerHost pinot-quickstart \
    -controllerPort 9000 \
    -exec

Using the launcher scripts:

bin/pinot-admin.sh AddTable \
    -schemaFile /tmp/pinot-quick-start/transcript-schema.json \
    -tableConfigFile /tmp/pinot-quick-start/transcript-table-realtime.json \
    -exec

    Add sample data to the Kafka topic

We will publish data in the following format to Kafka. Let us save the data in a file named transcript.json.

transcript.json

{"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"Maths","score":3.8,"timestamp":1571900400000}
{"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"History","score":3.5,"timestamp":1571900400000}
{"studentID":207,"firstName":"Bob","lastName":"Lewis","gender":"Male","subject":"Maths","score":3.2,"timestamp":1571900400000}
{"studentID":207,"firstName":"Bob","lastName":"Lewis","gender":"Male","subject":"Chemistry","score":3.6,"timestamp":1572418800000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Geography","score":3.8,"timestamp":1572505200000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"English","score":3.5,"timestamp":1572505200000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Maths","score":3.2,"timestamp":1572678000000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Physics","score":3.6,"timestamp":1572678000000}
{"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"Maths","score":3.8,"timestamp":1572678000000}
{"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"English","score":3.5,"timestamp":1572678000000}
{"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"History","score":3.2,"timestamp":1572854400000}
{"studentID":212,"firstName":"Nick","lastName":"Young","gender":"Male","subject":"History","score":3.6,"timestamp":1572854400000}

Push the sample JSON into the transcript-topic Kafka topic using the Kafka console producer. This will add the 12 records from transcript.json to the topic.

Check into the Kafka docker container:

docker exec -ti kafka bash

Publish messages to the target topic:

bin/kafka-console-producer.sh \
    --broker-list localhost:9092 \
    --topic transcript-topic < transcript.json

    Query the table

As soon as data flows into the stream, the Pinot table will consume it and it will be ready for querying. Head over to the Query Console to check out the real-time data.

SELECT * FROM transcript

    Kafka ingestion guidelines

    Kafka versions in Pinot

Pinot supports two major generations of the Kafka library - kafka-0.9 and kafka-2.x - for both high-level and low-level consumers.


    Post release 0.10.0, we have started shading kafka packages inside Pinot. If you are using our latest tagged docker images or master build, you should replace org.apache.kafka with shaded.org.apache.kafka in your table config.
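As an illustrative sketch of that renaming (key.serializer is just one example of a config value from this guide that embeds an org.apache.kafka class name), an entry such as

"key.serializer": "org.apache.kafka.common.serialization.ByteArraySerializer"

becomes

"key.serializer": "shaded.org.apache.kafka.common.serialization.ByteArraySerializer"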

    Upgrade from Kafka 0.9 connector to Kafka 2.x connector

  • Update the table config for both high-level and low-level consumers: change stream.kafka.consumer.factory.class.name from org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory to org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory.

  • If using the Stream (High) level consumer, also add the config stream.kafka.hlc.bootstrap.server into tableIndexConfig.streamConfigs. This config should be the URI of the Kafka broker list, e.g. localhost:9092. Both changes are sketched after this list.
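A minimal sketch of the relevant streamConfigs entries after the upgrade (other required stream configs are omitted, and the broker URI is only an illustration):

"streamConfigs": {
  ...
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory",
  "stream.kafka.hlc.bootstrap.server": "localhost:9092",
  ...
}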

    How to consume from a Kafka version > 2.0.0

This connector is also suitable for Kafka lib versions higher than 2.0.0. In the Kafka 2.0 connector pom.xml, changing kafka.lib.version from 2.0.0 to 2.1.1 will make this connector work with Kafka 2.1.1.

    Kafka configurations in Pinot

    Use Kafka partition (low) level consumer with SSL

Here is an example config which uses SSL based authentication to talk with Kafka and schema-registry. Notice there are two sets of SSL options: the ones starting with ssl. are for the Kafka consumer, and the ones starting with stream.kafka.decoder.prop.schema.registry. are for the SchemaRegistryClient used by KafkaConfluentSchemaRegistryAvroMessageDecoder.

{
  "tableName": "transcript",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "timestamp",
    "timeType": "MILLISECONDS",
    "schemaName": "transcript",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "LowLevel",
      "stream.kafka.topic.name": "transcript-topic",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.zk.broker.url": "pinot-zookeeper:2191/kafka",
      "stream.kafka.broker.list": "localhost:9092",
      "schema.registry.url": "",
      "security.protocol": "SSL",
      "ssl.truststore.location": "",
      "ssl.keystore.location": "",
      "ssl.truststore.password": "",
      "ssl.keystore.password": "",
      "ssl.key.password": "",
      "stream.kafka.decoder.prop.schema.registry.rest.url": "",
      "stream.kafka.decoder.prop.schema.registry.ssl.truststore.location": "",
      "stream.kafka.decoder.prop.schema.registry.ssl.keystore.location": "",
      "stream.kafka.decoder.prop.schema.registry.ssl.truststore.password": "",
      "stream.kafka.decoder.prop.schema.registry.ssl.keystore.password": "",
      "stream.kafka.decoder.prop.schema.registry.ssl.keystore.type": "",
      "stream.kafka.decoder.prop.schema.registry.ssl.truststore.type": "",
      "stream.kafka.decoder.prop.schema.registry.ssl.key.password": "",
      "stream.kafka.decoder.prop.schema.registry.ssl.protocol": ""
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}

    Consume transactionally-committed messages

The connector with Kafka library 2.0+ supports Kafka transactions. The transaction support is controlled by the config kafka.isolation.level in the Kafka stream config, which can be read_committed or read_uncommitted (default). Setting it to read_committed will ingest only transactionally committed messages from the Kafka stream.

For example,

{
  "tableName": "transcript",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "timestamp",
    "timeType": "MILLISECONDS",
    "schemaName": "transcript",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "LowLevel",
      "stream.kafka.topic.name": "transcript-topic",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.zk.broker.url": "pinot-zookeeper:2191/kafka",
      "stream.kafka.broker.list": "kafka:9092",
      "stream.kafka.isolation.level": "read_committed"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}

Note that the default value of this config is read_uncommitted, which reads all messages. Also, this config is supported for the low-level consumer only.

    Use Kafka partition (low) level consumer with SASL_SSL

Here is an example config which uses SASL_SSL based authentication to talk with Kafka and schema-registry. Notice there are two sets of SSL options: some are for the Kafka consumer, and the ones starting with stream.kafka.decoder.prop.schema.registry. are for the SchemaRegistryClient used by KafkaConfluentSchemaRegistryAvroMessageDecoder.

"streamConfigs": {
  "streamType": "kafka",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.topic.name": "mytopic",
  "stream.kafka.consumer.prop.auto.offset.reset": "largest",
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
  "stream.kafka.broker.list": "kafka:9092",
  "stream.kafka.schema.registry.url": "https://xxx",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
  "stream.kafka.decoder.prop.schema.registry.rest.url": "https://xxx",
  "stream.kafka.decoder.prop.basic.auth.credentials.source": "USER_INFO",
  "stream.kafka.decoder.prop.schema.registry.basic.auth.user.info": "schema_registry_username:schema_registry_password",
  "sasl.mechanism": "PLAIN",
  "security.protocol": "SASL_SSL",
  "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"kafkausername\" password=\"kafkapassword\";",
  "realtime.segment.flush.threshold.rows": "0",
  "realtime.segment.flush.threshold.time": "24h",
  "realtime.segment.flush.autotune.initialRows": "3000000",
  "realtime.segment.flush.threshold.segment.size": "500M"
},

    Extract record headers as Pinot table columns

Pinot's Kafka connector supports automatically extracting record headers and metadata into the Pinot table columns. The following table shows the mapping from record header/metadata to Pinot table column names:

Record key: any type <K>
Pinot table column: __key : String
Description: For simplicity of design, we assume that the record key is always a UTF-8 encoded String.

Record headers: Map<String, String>
Pinot table column: each header key is listed as a separate column __header$HeaderKeyName : String
Description: For simplicity of design, we directly map the string headers from the Kafka record to Pinot table columns.

Record metadata - offset : long
Pinot table column: __metadata$offset : String

Record metadata - recordTimestamp : long
Pinot table column: __metadata$recordTimestamp : String

    In order to enable the metadata extraction in a Kafka table, you can set the stream config metadata.populate to true.

    In addition to this, if you want to use any of these columns in your table, you have to list them explicitly in your table's schema.

For example, if you want to add only the offset and key as dimension columns in your Pinot table, they can be listed in the schema as follows:

  "dimensionFieldSpecs": [
    {
      "name": "__key",
      "dataType": "STRING"
    },
    {
      "name": "__metadata$offset",
      "dataType": "STRING"
    },
    ...
  ],

Once the schema is updated, these columns are just like any other Pinot column. You can apply ingestion transforms and/or define indexes on them.


Remember to follow the schema evolution guidelines when updating the schema of an existing table!

    Tell Pinot where to find an Avro schema

There is a standalone utility to generate the schema from an Avro file. See infer the Pinot schema from the Avro schema and JSON data (https://docs.pinot.apache.org/basics/data-import/complex-type#infer-the-pinot-schema-from-the-avro-schema-and-json-data) for details.

To avoid errors like The Avro schema must be provided, designate the location of the schema in your streamConfigs section. For example, if your current section contains the following:

...
"streamConfigs": {
  "streamType": "kafka",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.topic.name": "",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.SimpleAvroMessageDecoder",
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
  "stream.kafka.broker.list": "",
  "stream.kafka.consumer.prop.auto.offset.reset": "largest"
  ...
}

Then add this key: "stream.kafka.decoder.prop.schema", followed by a value that denotes the location of your schema, as sketched below.
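A minimal sketch of the added entry (the value is a placeholder for wherever your Avro schema lives, not a value taken from this guide):

"streamConfigs": {
  ...
  "stream.kafka.decoder.prop.schema": "<location of your Avro schema>",
  ...
}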


    Apache Pulsar

    This guide shows you how to ingest a stream of records from an Apache Pulsar topic into a Pinot table.

Pinot supports consuming data from Apache Pulsar via the pinot-pulsar plugin. You need to enable this plugin so that the Pulsar specific libraries are present in the classpath.

    Enable the Pulsar plugin with the following config at the time of Pinot setup: -Dplugins.include=pinot-pulsar
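As a sketch of one way to pass that flag (this assumes your deployment forwards JVM options to the Pinot components via the JAVA_OPTS environment variable; adjust to however you launch Pinot):

JAVA_OPTS="-Dplugins.include=pinot-pulsar" bin/pinot-admin.sh StartServer -zkAddress localhost:2181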


The pinot-pulsar plugin is not part of the official 0.10.0 binary. You can download the plugin from our external repository and add it to the libs or plugins directory in Pinot.

    Set up Pulsar table

Here is a sample Pulsar stream config. You can use the streamConfigs section from this sample and make changes for your corresponding table.

{
  "tableName": "pulsarTable",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "timestamp",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "pulsar",
      "stream.pulsar.topic.name": "<your pulsar topic name>",
      "stream.pulsar.bootstrap.servers": "pulsar://localhost:6650,pulsar://localhost:6651",
      "stream.pulsar.consumer.prop.auto.offset.reset" : "smallest",
      "stream.pulsar.consumer.type": "lowlevel",
      "stream.pulsar.fetch.timeout.millis": "30000",
      "stream.pulsar.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
      "stream.pulsar.consumer.factory.class.name": "org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
      "realtime.segment.flush.threshold.rows": "1000000",
      "realtime.segment.flush.threshold.time": "6h"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}

    Pulsar configuration options

You can change the following Pulsar specific configurations for your tables:

streamType
This should be set to "pulsar".

stream.pulsar.topic.name
Your Pulsar topic name.

stream.pulsar.bootstrap.servers
Comma-separated broker list for Apache Pulsar.

stream.pulsar.metadata.populate
Set to true to populate metadata.

stream.pulsar.metadata.fields
Set to a comma-separated list of metadata fields to populate.

    Authentication

The Pinot-Pulsar connector supports authentication using security tokens. You can generate a token by following the official Pulsar documentation. Once generated, add the following property to streamConfigs to include the auth token in each request:

"stream.pulsar.authenticationToken":"your-auth-token"

    TLS support

The Pinot-Pulsar connector also supports TLS for encrypted connections. You can follow the official Pulsar documentation to enable TLS on your Pulsar cluster. Once done, enable TLS in the Pulsar connector by providing the trust certificate file location generated in the previous step:

"stream.pulsar.tlsTrustCertsFilePath": "/path/to/ca.cert.pem"

Also, make sure to change the broker URL from pulsar://localhost:6650 to pulsar+ssl://localhost:6650 so that secure connections are used, for example:
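A minimal sketch of the resulting entry (the host and port are just the example values used above):

"stream.pulsar.bootstrap.servers": "pulsar+ssl://localhost:6650"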

For other table and stream configurations, you can head over to the Table configuration reference.

    Supported Pulsar versions

Pinot currently relies on Pulsar client version 2.7.2. Make sure the Pulsar broker is compatible with this client version.

    Extract record headers as Pinot table columns

Pinot's Pulsar connector supports automatically extracting record headers and metadata into the Pinot table columns. Pulsar supports a large amount of per-record metadata; please reference the official Pulsar documentation for the meaning of the metadata fields.

The following table shows the mapping from record header/metadata to Pinot table column names, along with whether the field is available by default:

key : String
Pinot table column: __key : String
Available by default: Yes

properties : Map<String, String>
Pinot table column: each header key is listed as a separate column __header$HeaderKeyName : String
Available by default: Yes

publishTime : Long
Pinot table column: __metadata$publishTime : String
Comments: publish time as determined by the producer
Available by default: Yes

brokerPublishTime : Optional
Pinot table column: __metadata$brokerPublishTime : String
Comments: publish time as determined by the broker
Available by default: Yes

eventTime : Long
Pinot table column: __metadata$eventTime : String
Available by default: Yes

messageId : MessageId -> String
Pinot table column: __metadata$messageId : String
Comments: String representation of the MessageId field. The format is ledgerId:entryId:partitionIndex

messageId : MessageId -> bytes
Pinot table column: __metadata$messageBytes : String
Comments: Base64 encoded version of the bytes returned from calling MessageId.toByteArray()

producerName : String
Pinot table column: __metadata$producerName : String

schemaVersion : byte[]
Pinot table column: __metadata$schemaVersion : String
Comments: Base64 encoded value

sequenceId : Long
Pinot table column: __metadata$sequenceId : String

orderingKey : byte[]
Pinot table column: __metadata$orderingKey : String
Comments: Base64 encoded value

size : Integer
Pinot table column: __metadata$size : String

topicName : String
Pinot table column: __metadata$topicName : String

index : String
Pinot table column: __metadata$index : String

redeliveryCount : Integer
Pinot table column: __metadata$redeliveryCount : String

In order to enable the metadata extraction in a Pulsar table, set the stream config metadata.populate to true. The fields eventTime, publishTime, brokerPublishTime, and key are populated by default. If you would like to extract additional fields from the Pulsar message, populate the metadataFields config with a comma-separated list of fields to populate. The fields are referenced by the field name in the Pulsar message. For example, setting:

"streamConfigs": {
  ...
  "stream.pulsar.metadata.populate": "true",
  "stream.pulsar.metadata.fields": "messageId,messageIdBytes,eventTime,topicName",
  ...
}

will make the __metadata$messageId, __metadata$messageBytes, __metadata$eventTime, and __metadata$topicName fields available for mapping to columns in the Pinot schema.

    In addition to this, if you want to use any of these columns in your table, you have to list them explicitly in your table's schema.

For example, if you want to add only the key and messageId as dimension columns in your Pinot table, they can be listed in the schema as follows:

  "dimensionFieldSpecs": [
    {
      "name": "__key",
      "dataType": "STRING"
    },
    {
      "name": "__metadata$messageId",
      "dataType": "STRING"
    },
    ...
  ],

Once the schema is updated, these columns are just like any other Pinot column. You can apply ingestion transforms and/or define indexes on them.


Remember to follow the schema evolution guidelines when updating the schema of an existing table!
