Stream Ingestion Plugin

This page describes how to write your own stream ingestion plugin for Pinot.

You can write custom stream ingestion plugins to add support for other streaming platforms, such as Pravega or Kinesis.

Stream ingestion plugins can be of two types:

  • High Level - Consume data without control over the partitions

  • Low Level - Consume data from each partition with offset management

Requirements to support Stream Level (High Level) consumers

The stream should provide the following guarantees (see the sketch after this list for an illustration):

  • Exactly once delivery (unless restarting from a checkpoint) for each consumer of the stream.

  • (Optionally) support a mechanism to split events (in some arbitrary fashion) so that each event in the stream is delivered to exactly one host out of a set of hosts.

  • Provide ways to save a checkpoint for the data consumed so far. If the stream is partitioned, then this checkpoint is a vector of checkpoints for events consumed from individual partitions.

  • The checkpoints should be recorded only when Pinot makes a call to do so.

  • The consumer should be able to start consumption from one of:

    • latest available data

    • earliest available data

    • last saved checkpoint
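
For illustration, here is a minimal, self-contained sketch of a client honoring these guarantees. Every name in it (FooHighLevelClient, StartPosition, readNext, saveCheckpoint) is hypothetical; it only mirrors the contract above and is not Pinot's actual StreamLevelConsumer interface.

import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical client illustrating the stream-level (high-level) contract above:
// start from LATEST, EARLIEST, or a previously saved checkpoint, and record a new
// checkpoint only when Pinot asks for one.
class FooHighLevelClient {
  enum StartPosition { LATEST, EARLIEST, CHECKPOINT }

  // For a partitioned stream, a checkpoint is a vector of per-partition positions.
  private Map<Integer, Long> lastSavedCheckpoint;

  void start(StartPosition position, Map<Integer, Long> savedCheckpoint) {
    // Seek the underlying stream to the requested position. With CHECKPOINT,
    // resume exactly after the saved positions so no event is delivered twice.
  }

  List<byte[]> readNext(long timeoutMillis) {
    // Return the next batch of raw events; when consumption is split across hosts,
    // each event must be delivered to exactly one of them.
    return Collections.emptyList();
  }

  Map<Integer, Long> saveCheckpoint() {
    // Persist the positions consumed so far (only when this method is called) and
    // remember them so a restart can resume from here.
    return lastSavedCheckpoint;
  }
}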

Requirements to support Partition Level (Low Level) consumers

While consuming rows at a partition level, the stream should support the following properties (see the sketch after this list for an illustration):

  • The stream should provide a mechanism to get the current number of partitions.

  • Each event in a partition should have a unique offset that is not more than 64 bits long.

  • Each partition should be identified by a number not exceeding 32 bits.

  • The stream should provide the following mechanisms to get an offset for a given partition of the stream:

    • get the offset of the oldest event available in the partition (assuming events are aged out periodically)

    • get the offset of the most recent event published in the partition

    • (optionally) get the offset of an event that was published at a specified time

  • The stream should provide a mechanism to consume a set of events from a partition starting from a specified offset.

  • Pinot assumes that the offsets of incoming events are monotonically increasing; i.e., if Pinot consumes an event at offset o1, then the offset o2 of the following event should be such that o2 > o1.

In addition, we have an operational requirement that the number of partitions should not be reduced over time.
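
For illustration, the following self-contained sketch shows the operations a partition-level plugin needs its stream to support. FooPartitionClient and its methods are hypothetical stand-ins, not the Pinot PartitionLevelConsumer interface; the consume loop simply demonstrates fetching from a partition by offset, relying on offsets being monotonically increasing.

import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the stream's native client, exposing exactly the
// capabilities required above; the names and stubbed return values are illustrative.
class FooPartitionClient {
  // A fetched batch carries the offset to resume from, because offsets must be
  // monotonically increasing but need not be contiguous.
  static class Batch {
    final List<byte[]> events;
    final long nextStartOffset;
    Batch(List<byte[]> events, long nextStartOffset) {
      this.events = events;
      this.nextStartOffset = nextStartOffset;
    }
  }

  int partitionCount() { return 4; }                  // current number of partitions (fits in 32 bits)
  long earliestOffset(int partition) { return 0L; }   // oldest event still retained in the partition
  long latestOffset(int partition) { return 100L; }   // most recently published event in the partition

  Batch fetch(int partition, long startOffset, long timeoutMillis) {
    // Return events at or after startOffset; stubbed empty here.
    return new Batch(Collections.emptyList(), startOffset);
  }

  // Consume one partition from its earliest retained offset towards the latest one,
  // always resuming from the offset the previous batch handed back (the offset o2 of
  // the next event is always greater than the last consumed offset o1).
  static void consume(FooPartitionClient client, int partition) {
    long offset = client.earliestOffset(partition);
    while (offset < client.latestOffset(partition)) {
      Batch batch = client.fetch(partition, offset, 5_000L);
      if (batch.events.isEmpty()) {
        break;   // nothing new yet; a real consumer would wait and retry
      }
      // decode and index batch.events here
      offset = batch.nextStartOffset;
    }
  }
}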

Stream plug-in implementation

In order to add a new type of stream (say, Foo), implement the following classes:

  • FooConsumerFactory extends StreamConsumerFactory

  • FooPartitionLevelConsumer implements PartitionLevelConsumer

  • FooStreamLevelConsumer implements StreamLevelConsumer

  • FooMetadataProvider implements StreamMetadataProvider

  • FooMessageDecoder implements StreamMessageDecoder

Depending on stream level or partition level, your implementation needs to include StreamLevelConsumer or PartitionLevelConsumer.
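
A bare-bones layout of those five classes for a hypothetical Foo stream might look like the sketch below. The empty *Spi interfaces are declared locally only so the sketch is self-contained; a real plugin implements (or extends) the corresponding types from the org.apache.pinot.spi.stream package, whose actual method signatures are richer than shown here.

// Local stand-ins so this sketch is self-contained; a real plugin implements or extends the
// StreamConsumerFactory, PartitionLevelConsumer, StreamLevelConsumer, StreamMetadataProvider
// and StreamMessageDecoder types from the org.apache.pinot.spi.stream package instead.
interface StreamConsumerFactorySpi {}
interface PartitionLevelConsumerSpi {}
interface StreamLevelConsumerSpi {}
interface StreamMetadataProviderSpi {}
interface StreamMessageDecoderSpi {}

// Entry point: Pinot instantiates this class from stream.foo.consumer.factory.class.name
// and asks it for the consumers, metadata provider and decoder below.
class FooConsumerFactory implements StreamConsumerFactorySpi {
  PartitionLevelConsumerSpi newPartitionLevelConsumer() { return new FooPartitionLevelConsumer(); }
  StreamLevelConsumerSpi newStreamLevelConsumer() { return new FooStreamLevelConsumer(); }
  StreamMetadataProviderSpi newMetadataProvider() { return new FooMetadataProvider(); }
  StreamMessageDecoderSpi newMessageDecoder() { return new FooMessageDecoder(); }
}

// Only one of the two consumers is required, depending on the configured consumer type.
class FooPartitionLevelConsumer implements PartitionLevelConsumerSpi {
  // fetch batches of messages from a given partition starting at a given offset
}

class FooStreamLevelConsumer implements StreamLevelConsumerSpi {
  // consume messages without per-partition offset management; checkpoint when Pinot asks
}

class FooMetadataProvider implements StreamMetadataProviderSpi {
  // report the partition count and per-partition earliest/latest offsets
}

class FooMessageDecoder implements StreamMessageDecoderSpi {
  // turn a raw Foo payload (byte[]) into a Pinot row
}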

The properties for the stream implementation are set in the table configuration, inside the streamConfigs section.

Use the streamType property to define the stream type. For example, for the implementation of stream foo, set the property "streamType" : "foo".

The rest of the configuration properties for your stream should be set with the prefix "stream.foo", using the standard suffixes for the following (see the examples below):

  • topic

  • consumer type

  • stream consumer factory

  • offset

  • decoder class name

  • decoder properties

  • connection timeout

  • fetch timeout

All values should be strings. For example:

"streamType" : "foo",
"stream.foo.topic.name" : "SomeTopic",
"stream.foo.consumer.type": "LowLevel",
"stream.foo.consumer.factory.class.name": "fully.qualified.pkg.ConsumerFactoryClassName",
"stream.foo.consumer.prop.auto.offset.reset": "largest",
"stream.foo.decoder.class.name" : "fully.qualified.pkg.DecoderClassName",
"stream.foo.decoder.prop.a.decoder.property" : "decoderPropValue",
"stream.foo.connection.timeout.millis" : "10000", // default 30_000
"stream.foo.fetch.timeout.millis" : "10000" // default 5_000

You can have additional properties that are specific to your stream. For example:

"stream.foo.some.buffer.size" : "24g"

In addition to these properties, you can define thresholds for the consuming segments:

  • rows threshold

  • time threshold

The properties for the thresholds are as follows:

"realtime.segment.flush.threshold.rows" : "100000"
"realtime.segment.flush.threshold.time" : "6h"

An example of this implementation can be found in the KafkaConsumerFactory, which is an implementation for the Apache Kafka stream.