Stream ingestion

Apache Pinot lets users consume data from streams and push it directly into the Pinot database. This process is known as stream ingestion. Stream ingestion makes the data available for querying within seconds of publication.

Stream ingestion provides support for checkpoints out of the box to prevent data loss.

Stream ingestion requires the following steps:

  1. Create schema configuration

  2. Create table configuration

  3. Upload table and schema spec

Let's take a look at each of these steps in a bit more detail. Assume the data to be ingested is in the following format:

{"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"Maths","score":3.8,"timestamp":1571900400000}
{"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"History","score":3.5,"timestamp":1571900400000}
{"studentID":207,"firstName":"Bob","lastName":"Lewis","gender":"Male","subject":"Maths","score":3.2,"timestamp":1571900400000}
{"studentID":207,"firstName":"Bob","lastName":"Lewis","gender":"Male","subject":"Chemistry","score":3.6,"timestamp":1572418800000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Geography","score":3.8,"timestamp":1572505200000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"English","score":3.5,"timestamp":1572505200000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Maths","score":3.2,"timestamp":1572678000000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Physics","score":3.6,"timestamp":1572678000000}
{"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"Maths","score":3.8,"timestamp":1572678000000}
{"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"English","score":3.5,"timestamp":1572678000000}
{"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"History","score":3.2,"timestamp":1572854400000}
{"studentID":212,"firstName":"Nick","lastName":"Young","gender":"Male","subject":"History","score":3.6,"timestamp":1572854400000}

Create Schema Configuration

A schema defines the fields available in the datasource along with their data types. The schema also defines which fields serve as dimensions, metrics, and timestamp, respectively. See creating a schema for more details on schema configuration. For our sample data, the schema configuration looks as follows:

/tmp/pinot-quick-start/transcript-schema.json
{
  "schemaName": "transcript",
  "dimensionFieldSpecs": [
    {
      "name": "studentID",
      "dataType": "INT"
    },
    {
      "name": "firstName",
      "dataType": "STRING"
    },
    {
      "name": "lastName",
      "dataType": "STRING"
    },
    {
      "name": "gender",
      "dataType": "STRING"
    },
    {
      "name": "subject",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "score",
      "dataType": "FLOAT"
    }
  ],
  "dateTimeFieldSpecs": [{
    "name": "timestamp",
    "dataType": "LONG",
    "format" : "1:MILLISECONDS:EPOCH",
    "granularity": "1:MILLISECONDS"
  }]
}

Create Table Configuration

The next step is to create a table where all the ingested data will flow and can be queried. Unlike batch ingestion, the table configuration for real-time ingestion also triggers the data ingestion job. For a more detailed overview of tables, check out the table reference.

The real-time table configuration consists of the following fields:

  • tableName - The name of the table where the data should flow.

  • tableType - The internal type for the table. Should always be set to REALTIME for real-time ingestion.

  • segmentsConfig - Segment-level configs such as the time column name and the number of replicas per partition (see the example below).

  • tableIndexConfig - Defines which columns to use for indexing, along with the type of index. Refer to [Indexing Configs] for the full configuration. It consists of the following required fields:

    • loadMode - Specifies how the segments should be loaded. Should be one of heap or mmap. Here's the difference between the two:

      heap: Segments are loaded on direct memory. Note, 'heap' here is a legacy misnomer, and it does not imply JVM heap. This mode should only be used when we want faster performance than memory-mapped files, and are also sure that we will never run into OOM.
      mmap: Segments are loaded on memory-mapped files. This is the default mode.

    • streamConfigs - Specifies the datasource along with the configs necessary to start consuming the real-time data. The streamConfigs section can be thought of as the equivalent of the job spec in batch ingestion. The following options are supported in this config:

      • streamType - The streaming platform from which to consume the data. Supported values: kafka

      • stream.[streamType].consumer.type - Whether to use a per-partition low-level consumer or a high-level stream consumer. Supported values: lowLevel or highLevel

      • stream.[streamType].topic.name - The datasource (e.g. topic, data stream) from which to consume the data. Supported values: String

      • stream.[streamType].decoder.class.name - Name of the class to be used for parsing the data. The class should implement the org.apache.pinot.spi.stream.StreamMessageDecoder interface. Supported values: String

      • stream.[streamType].consumer.factory.class.name - Name of the factory class to be used to provide the appropriate implementation of the low-level and high-level consumer, as well as the metadata. Supported values: String

      • stream.[streamType].consumer.prop.auto.offset.reset - Determines the offset from which to start the ingestion. Supported values: smallest, largest, or a timestamp in milliseconds

      • realtime.segment.flush.threshold.time - Time threshold for which the realtime segment will be kept open before we complete the segment.

      • realtime.segment.flush.threshold.size - Row count flush threshold for realtime segments. This behaves in a similar way for HLC and LLC. For HLC, since there is only one consumer per server, this size is used as the size of the consumption buffer and determines after how many rows we flush to disk. For example, if this threshold is set to two million rows, then a high-level consumer would have a buffer size of two million. If this value is set to 0, then the consumers adjust the number of rows consumed by a partition such that the size of the completed segment is the desired size (unless threshold.time is reached first).

      • realtime.segment.flush.desired.size - The desired size of a completed realtime segment. This config is used only if threshold.size is set to 0.

      For example, with realtime.segment.flush.threshold.time set to 3600000 (one hour) and realtime.segment.flush.threshold.size set to 50000, as in the config below, a consuming segment is completed after 50,000 rows or one hour of consumption, whichever comes first.

You can also specify additional configs for the consumer by prefixing the key with stream.[streamType], where streamType is the name of the streaming platform. For our sample data and schema, the table config will look like:

{
  "tableName": "transcript",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "timestamp",
    "timeType": "MILLISECONDS",
    "schemaName": "transcript",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.topic.name": "transcript-topic",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.broker.list": "localhost:9876",
      "realtime.segment.flush.threshold.time": "3600000",
      "realtime.segment.flush.threshold.size": "50000",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}

Upload schema and table config

Now that we have our table and schema configurations, let's upload them to the Pinot cluster. As soon as the configs are uploaded, Pinot will start ingesting the available records from the topic.

Using Docker:

docker run \
    --network=pinot-demo \
    -v /tmp/pinot-quick-start:/tmp/pinot-quick-start \
    --name pinot-streaming-table-creation \
    apachepinot/pinot:latest AddTable \
    -schemaFile /tmp/pinot-quick-start/transcript-schema.json \
    -tableConfigFile /tmp/pinot-quick-start/transcript-table-realtime.json \
    -controllerHost pinot-quickstart \
    -controllerPort 9000 \
    -exec

Or, using the launcher scripts:

bin/pinot-admin.sh AddTable \
    -schemaFile /path/to/transcript-schema.json \
    -tableConfigFile /path/to/transcript-table-realtime.json \
    -exec
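
Alternatively, the same specs can be pushed through the controller REST API. A minimal sketch, assuming the controller is reachable at localhost:9000 and exposes the standard /schemas and /tables endpoints (paths are the ones used in this guide):

# Upload the schema, then create the real-time table, via the controller REST API
curl -F schemaName=@/tmp/pinot-quick-start/transcript-schema.json localhost:9000/schemas

curl -X POST -H "Content-Type: application/json" \
    -d @/tmp/pinot-quick-start/transcript-table-realtime.json \
    localhost:9000/tables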

Custom Ingestion Support

We are working on adding more integrations, such as Kinesis, out of the box. You can easily write your own ingestion plugin in case your stream is not supported out of the box. See Stream Ingestion Plugin for a walkthrough.

Amazon Kinesis

Note: This is not tested in production. You may hit some snags while trying to use this.

To ingest events from an Amazon Kinesis stream into Pinot, set the following configs in the table config:

{
  "tableName": "kinesisTable",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "timestamp",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kinesis",
      "stream.kinesis.topic.name": "<your kinesis stream name>",
      "region": "<your region>",
      "accessKey": "<your access key>",
      "secretKey": "<your secret key>",
      "shardIteratorType": "AFTER_SEQUENCE_NUMBER",
      "stream.kinesis.consumer.type": "lowlevel",
      "stream.kinesis.fetch.timeout.millis": "30000",
      "stream.kinesis.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kinesis.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
      "realtime.segment.flush.threshold.size": "1000000",
      "realtime.segment.flush.threshold.time": "6h"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}

where the Kinesis-specific properties are:

  • streamType - This should be set to "kinesis"

  • stream.kinesis.topic.name - The Kinesis stream name

  • region - The Kinesis region, e.g. us-west-1

  • accessKey - Kinesis access key

  • secretKey - Kinesis secret key

  • shardIteratorType - Set to "LATEST" for the largest offset (default), or "TRIM_HORIZON" for the earliest offset

  • maxRecordsToFetch - ... Default is 20.

Kinesis supports authentication using the DefaultCredentialsProviderChain. The credential provider looks for the credentials in the following order:

  • Environment variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (recommended since they are recognized by all the AWS SDKs and the CLI except for .NET), or AWS_ACCESS_KEY and AWS_SECRET_KEY (only recognized by the Java SDK)

  • Java system properties - aws.accessKeyId and aws.secretKey

  • Web Identity Token credentials from the environment or container

  • Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI

  • Credentials delivered through the Amazon EC2 container service if the AWS_CONTAINER_CREDENTIALS_RELATIVE_URI environment variable is set and the security manager has permission to access the variable

  • Instance profile credentials delivered through the Amazon EC2 metadata service

You can also specify the accessKey and secretKey using the properties. However, this method is not secure and should be used only for POC setups. You can also specify other AWS fields, such as AWS_SESSION_TOKEN, as environment variables and config, and it will work.
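
For example, when relying on environment variables, you might export the credentials in the environment of the Pinot servers before they start consuming; a minimal sketch (values are placeholders):

# Provide AWS credentials via environment variables so the
# DefaultCredentialsProviderChain can pick them up
export AWS_ACCESS_KEY_ID="<your access key>"
export AWS_SECRET_ACCESS_KEY="<your secret key>"
# Optional, for temporary credentials:
export AWS_SESSION_TOKEN="<your session token>"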

Limitations

  1. ShardID is of the format "shardId-000000000001". We use the numeric part as the partitionId. Our partitionId variable is an integer; if shardIds grow beyond Integer.MAX_VALUE, we will overflow.

  2. Segment-size-based thresholds for segment completion will not work. They assume that partition "0" always exists; however, once shard 0 is split or merged, we will no longer have partition 0.

Apache Kafka

This guide shows you how to ingest a stream of records from an Apache Kafka topic into a Pinot table.

Introduction

In this guide, you'll learn how to import data into Pinot using Apache Kafka for real-time stream ingestion. Pinot has out-of-the-box real-time ingestion support for Kafka.

Let's set up a demo Kafka cluster locally and create a sample topic transcript-topic.

Start Kafka

Start the Kafka cluster on port 9876, using the same Zookeeper as the quick-start examples. Using the launcher scripts:

bin/pinot-admin.sh StartKafka -zkAddress=localhost:2123/kafka -port 9876

Or, using Docker:
    docker run \
        --network pinot-demo --name=kafka \
        -e KAFKA_ZOOKEEPER_CONNECT=pinot-quickstart:2123/kafka \
        -e KAFKA_BROKER_ID=0 \
        -e KAFKA_ADVERTISED_HOST_NAME=kafka \
        -d wurstmeister/kafka:latest

Create a Kafka topic

Create the transcript-topic Kafka topic. Using Docker:

docker exec \
      -t kafka \
      /opt/kafka/bin/kafka-topics.sh \
      --zookeeper pinot-quickstart:2123/kafka \
      --partitions=1 --replication-factor=1 \
      --create --topic transcript-topic

Or, using a local Kafka installation: download the latest Kafka release and create the topic.

bin/kafka-topics.sh --create --bootstrap-server localhost:9876 --replication-factor 1 --partitions 1 --topic transcript-topic
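
Optionally, verify that the topic was created; a quick check, assuming the non-Docker setup with the broker listening on port 9876:

bin/kafka-topics.sh --list --bootstrap-server localhost:9876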

Creating Schema Configuration

We will publish the data in the same format as mentioned in the Stream ingestion docs, so you can use the same schema described under Create Schema Configuration.

Creating a table configuration

Next, we create the real-time table configuration for the transcript table described by the schema from the previous step.

For Kafka, we use streamType as kafka. Currently only the JSON format is supported, but you can easily write your own decoder by extending the StreamMessageDecoder interface. You can then make your decoder class available by putting the jar file in the plugins directory.

The lowLevel consumer reads data per partition, whereas the highLevel consumer uses the Kafka high-level consumer to read data from the whole stream; it does not have control over which partition it reads at a particular moment.

For Kafka versions below 2.X, use org.apache.pinot.plugin.stream.kafka09.KafkaConsumerFactory.

For Kafka version 2.X and above, use org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory.

You can set the offset to:

  • smallest to start the consumer from the earliest offset

  • largest to start the consumer from the latest offset

  • a timestamp in milliseconds to start the consumer from the offset after that timestamp

The resulting configuration should look as follows:

/tmp/pinot-quick-start/transcript-table-realtime.json
     {
      "tableName": "transcript",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "timeColumnName": "timestamp",
        "timeType": "MILLISECONDS",
        "schemaName": "transcript",
        "replicasPerPartition": "1"
      },
      "tenants": {},
      "tableIndexConfig": {
        "loadMode": "MMAP",
        "streamConfigs": {
          "streamType": "kafka",
          "stream.kafka.consumer.type": "lowlevel",
          "stream.kafka.topic.name": "transcript-topic",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
          "stream.kafka.broker.list": "localhost:9876",
          "realtime.segment.flush.threshold.time": "3600000",
          "realtime.segment.flush.threshold.size": "50000",
          "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
        }
      },
      "metadata": {
        "customConfigs": {}
      }
    }

Upgrade from Kafka 0.9 connector to Kafka 2.x connector

  • Update the table config for both the high-level and low-level consumer: change stream.kafka.consumer.factory.class.name from org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory to org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory.

  • If using the Stream (High) level consumer, also add the config stream.kafka.hlc.bootstrap.server into tableIndexConfig.streamConfigs. This config should be the URI of the Kafka broker list, e.g. localhost:9092 (see the sketch below).
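
A minimal sketch of the affected streamConfigs keys after the upgrade (only the changed keys are shown; the broker URI is illustrative, and stream.kafka.hlc.bootstrap.server is only needed for the high-level consumer):

{
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory",
  "stream.kafka.hlc.bootstrap.server": "localhost:9092"
}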

How to consume from higher Kafka version?

This connector is also suitable for Kafka lib versions higher than 2.0.0. In the Kafka 2.0 connector pom.xml, changing kafka.lib.version from 2.0.0 to 2.1.1 will make this connector work with Kafka 2.1.1.

Upload schema and table

Now that we have our table and schema configurations, let's upload them to the Pinot cluster. As soon as the real-time table is created, it will begin ingesting the available records from the Kafka topic.

Using Docker:

docker run \
        --network=pinot-demo \
        -v /tmp/pinot-quick-start:/tmp/pinot-quick-start \
        --name pinot-streaming-table-creation \
        apachepinot/pinot:latest AddTable \
        -schemaFile /tmp/pinot-quick-start/transcript-schema.json \
        -tableConfigFile /tmp/pinot-quick-start/transcript-table-realtime.json \
        -controllerHost pinot-quickstart \
        -controllerPort 9000 \
        -exec

Or, using the launcher scripts:

bin/pinot-admin.sh AddTable \
        -schemaFile /tmp/pinot-quick-start/transcript-schema.json \
        -tableConfigFile /tmp/pinot-quick-start/transcript-table-realtime.json \
        -exec

Add sample data to the Kafka topic

We will publish data in the following format to Kafka. Let's save the data in a file named transcript.json:

transcript.json
    {"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"Maths","score":3.8,"timestamp":1571900400000}
    {"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"History","score":3.5,"timestamp":1571900400000}
    {"studentID":207,"firstName":"Bob","lastName":"Lewis","gender":"Male","subject":"Maths","score":3.2,"timestamp":1571900400000}
    {"studentID":207,"firstName":"Bob","lastName":"Lewis","gender":"Male","subject":"Chemistry","score":3.6,"timestamp":1572418800000}
    {"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Geography","score":3.8,"timestamp":1572505200000}
    {"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"English","score":3.5,"timestamp":1572505200000}
    {"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Maths","score":3.2,"timestamp":1572678000000}
    {"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Physics","score":3.6,"timestamp":1572678000000}
    {"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"Maths","score":3.8,"timestamp":1572678000000}
    {"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"English","score":3.5,"timestamp":1572678000000}
    {"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"History","score":3.2,"timestamp":1572854400000}
    {"studentID":212,"firstName":"Nick","lastName":"Young","gender":"Male","subject":"History","score":3.6,"timestamp":1572854400000}

Push the sample JSON into the transcript-topic Kafka topic using the Kafka console producer. This will add the 12 records described in the transcript.json file to the topic.

bin/kafka-console-producer.sh \
        --broker-list localhost:9876 \
        --topic transcript-topic < transcript.json

Ingesting streaming data

As soon as data flows into the stream, the Pinot table will consume it and it will be ready for querying. Head over to the Query Console to check out the real-time data:

SELECT * FROM transcript

Some more Kafka ingestion configs

Use Kafka Partition (Low) Level Consumer with SSL

Here is an example config which uses SSL-based authentication to talk to both Kafka and the schema registry. Notice there are two sets of SSL options: the ones starting with ssl. are for the Kafka consumer, and the ones starting with stream.kafka.decoder.prop.schema.registry. are for the SchemaRegistryClient used by KafkaConfluentSchemaRegistryAvroMessageDecoder.

{
        "tableName": "transcript",
        "tableType": "REALTIME",
        "segmentsConfig": {
        "timeColumnName": "timestamp",
        "timeType": "MILLISECONDS",
        "schemaName": "transcript",
        "replicasPerPartition": "1"
        },
        "tenants": {},
        "tableIndexConfig": {
          "loadMode": "MMAP",
          "streamConfigs": {
            "streamType": "kafka",
            "stream.kafka.consumer.type": "LowLevel",
            "stream.kafka.topic.name": "transcript-topic",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
            "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
            "stream.kafka.zk.broker.url": "localhost:2191/kafka",
            "stream.kafka.broker.list": "localhost:9876",
            "schema.registry.url": "",
            "security.protocol": "SSL",
            "ssl.truststore.location": "",
            "ssl.keystore.location": "",
            "ssl.truststore.password": "",
            "ssl.keystore.password": "",
            "ssl.key.password": "",
            "stream.kafka.decoder.prop.schema.registry.rest.url": "",
            "stream.kafka.decoder.prop.schema.registry.ssl.truststore.location": "",
            "stream.kafka.decoder.prop.schema.registry.ssl.keystore.location": "",
            "stream.kafka.decoder.prop.schema.registry.ssl.truststore.password": "",
            "stream.kafka.decoder.prop.schema.registry.ssl.keystore.password": "",
            "stream.kafka.decoder.prop.schema.registry.ssl.keystore.type": "",
            "stream.kafka.decoder.prop.schema.registry.ssl.truststore.type": "",
            "stream.kafka.decoder.prop.schema.registry.ssl.key.password": "",
            "stream.kafka.decoder.prop.schema.registry.ssl.protocol": "",
          }
        },
        "metadata": {
          "customConfigs": {}
        }
      }

Ingest transactionally committed messages only from Kafka

With Kafka consumer 2.0, you can ingest only transactionally committed messages by setting kafka.isolation.level to read_committed. Note that the default value of this config is read_uncommitted, which reads all messages, and that this config is supported for the low-level consumer only. For example:

{
        "tableName": "transcript",
        "tableType": "REALTIME",
        "segmentsConfig": {
        "timeColumnName": "timestamp",
        "timeType": "MILLISECONDS",
        "schemaName": "transcript",
        "replicasPerPartition": "1"
        },
        "tenants": {},
        "tableIndexConfig": {
          "loadMode": "MMAP",
          "streamConfigs": {
            "streamType": "kafka",
            "stream.kafka.consumer.type": "LowLevel",
            "stream.kafka.topic.name": "transcript-topic",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
            "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
            "stream.kafka.zk.broker.url": "localhost:2191/kafka",
            "stream.kafka.broker.list": "localhost:9876",
            "stream.kafka.isolation.level": "read_committed"
          }
        },
        "metadata": {
          "customConfigs": {}
        }
      }