Stream Ingestion Connectors

This document lists the configuration properties for each of the supported stream ingestion connectors.

Applicable to all Stream Connectors

Configuration
Description

stream.<stream_type>.consumer.type

Type of the consumer. Allowed values: lowlevel, highlevel (case-insensitive)

stream.<stream_type>.consumer.factory.class.name

Factory class to be used for the stream consumer

stream.<stream_type>.consumer.prop.auto.offset.reset

Offset or position in the source stream from which to start consuming data. Valid values:
smallest - Start consuming from the earliest data in the stream
largest - Start consuming from the latest data in the stream
timestamp - Start consuming from the offset after a timestamp, which is specified in the format yyyy-MM-dd'T'HH:mm:ss.SSSZ
datetime - Start consuming from the offset after the specified period or duration from current time, e.g. 2d
Default Value: largest

stream.<stream_type>.topic.name

Name of the source stream to consume

stream.<stream_type>.fetch.timeout.millis

Indicates the timeout (in milliseconds) to use for each fetch call to the consumer. If the timeout expires before data becomes available, the consumer will return an empty batch. Default Value: 5_000

stream.<stream_type>.connection.timeout.millis

Indicates the timeout (in milliseconds) used to create the connection to the upstream. Currently used only by the Kafka 0.9 partition-level consumer. Default Value: 30_000

stream.<stream_type>.idle.timeout.millis

If the stream remains idle (i.e., without any data) for the specified time, the client connection is reset and a new consumer instance is created. Default Value: 180_000

stream.<stream_type>.decoder.class.name

Indicates the name of the decoder class that should be used to decode the stream payload

stream.<stream_type>.decoder.prop

Prefix used for any decoder specific property

topic.consumption.rate.limit

Indicates the upper bound on the message rate for the entire topic. Use -1 to ignore this config. Default Value: -1. See here for more details.

stream.<stream_type>.metadata.populate

When set to true, the supported consumer may extract the key, user headers, and record metadata from the incoming payload. Currently, this is supported only by the Kafka connector.

realtime.segment.flush.threshold.time

Time-based flush threshold for realtime segments. Used to decide when a realtime segment is ready to be committed / closed / flushed to disk. ⚠️ This time should be smaller than the retention period configured for the corresponding topic.

realtime.segment.flush.threshold.size

The desired size of a completed realtime segment. ℹ️ This config is used only if realtime.segment.flush.threshold.rows is set to 0.

realtime.segment.flush.threshold.rows

Row-count-based flush threshold for realtime segments. This behaves in a similar way for HLC and LLC. For HLC, since there is only one consumer per server, this size is used as the size of the consumption buffer and determines after how many rows we flush to disk. For example, if this threshold is set to two million rows, then a high level consumer would have a buffer size of two million. If this value is set to 0, then the consumers adjust the number of rows consumed by a partition such that the size of the completed segment is the desired size (unless threshold.time is reached first).

realtime.segment.flush.autotune.initialRows

Initial number of rows to use for SegmentSizeBasedFlushThresholdUpdater. This threshold updater is used by the controller to compute the new segment's flush threshold based on the previous segment's size. ⚠️ This flush threshold updater is used only when realtime.segment.flush.threshold.rows is set to <=0. Otherwise, the DefaultFlushThresholdUpdater is used.

realtime.segment.commit.timeoutSeconds

Time (in seconds) that the controller will wait for the segment to be built by the server
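
As a rough illustration of how the common properties fit together, the following Python sketch assembles a property map for a placeholder stream type and reports which flush threshold would govern segment completion according to the descriptions above. The helper names (stream_key, flush_strategy), the topic name, and the example values 6h and 150M are assumptions made for illustration; they are not part of any connector API.

```python
def stream_key(stream_type: str, suffix: str) -> str:
    """Build a 'stream.<stream_type>.<suffix>' property name (hypothetical helper)."""
    return f"stream.{stream_type}.{suffix}"


def flush_strategy(configs: dict) -> str:
    """Report which threshold drives segment completion, per the descriptions above.

    rows > 0  -> the row-count threshold applies.
    rows <= 0 -> the desired segment size applies, with the row count autotuned.
    In both cases the time threshold can end the segment first.
    """
    rows = int(configs["realtime.segment.flush.threshold.rows"])
    return "rows" if rows > 0 else "size"


stream_type = "kafka"  # placeholder for <stream_type>
configs = {
    stream_key(stream_type, "consumer.type"): "lowlevel",
    stream_key(stream_type, "topic.name"): "my-topic",  # hypothetical topic name
    stream_key(stream_type, "consumer.prop.auto.offset.reset"): "smallest",
    stream_key(stream_type, "fetch.timeout.millis"): "5000",
    "realtime.segment.flush.threshold.time": "6h",    # assumed value format
    "realtime.segment.flush.threshold.rows": "0",     # 0 selects size-based flushing
    "realtime.segment.flush.threshold.size": "150M",  # assumed value format
}

print(flush_strategy(configs))
```

Keeping the row threshold at 0 is what makes the size property take effect; setting it to a positive number would switch the sketch's answer to the row-count threshold.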

Kafka Partition-Level Connector

version 2.0

Config
Description

stream.kafka.consumer.type

Allowed Value: lowlevel

stream.kafka.consumer.factory.class.name

Allowed Value: org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory

stream.kafka.topic.name

(Required) Name of the Kafka topic to be ingested

stream.kafka.broker.list

(Required) Connection string for the Kafka broker

stream.kafka.buffer.size

Default Value: 512000

stream.kafka.socket.timeout

Default Value: 10000

stream.kafka.fetcher.size

Default Value: 100000

stream.kafka.isolation.level

Allowed Values: read_committed, read_uncommitted. Default: read_uncommitted
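
The Kafka properties above could be assembled as in the following Python sketch. The function name kafka_stream_configs, the topic name, and the broker address are hypothetical; only the property keys, the factory class name, and the allowed isolation levels come from the table.

```python
ALLOWED_ISOLATION_LEVELS = {"read_committed", "read_uncommitted"}


def kafka_stream_configs(topic: str, brokers: str,
                         isolation_level: str = "read_uncommitted") -> dict:
    """Assemble the Kafka partition-level properties listed above (hypothetical helper)."""
    if isolation_level not in ALLOWED_ISOLATION_LEVELS:
        raise ValueError(f"unsupported isolation level: {isolation_level}")
    return {
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.consumer.factory.class.name":
            "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.topic.name": topic,        # required
        "stream.kafka.broker.list": brokers,     # required
        "stream.kafka.isolation.level": isolation_level,
    }


# Hypothetical topic and broker values, for illustration only.
kafka_configs = kafka_stream_configs("my-topic", "localhost:9092")
```

Rejecting unknown isolation levels up front is a design choice of this sketch; it surfaces typos before the configuration is submitted anywhere.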

Kinesis Partition-Level Connector

Config
Description

stream.kinesis.consumer.type

Allowed Value: lowlevel

stream.kinesis.consumer.factory.class.name

Allowed Value: org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory

stream.kinesis.topic.name

(Required) Name of the Kinesis data stream to consume

region

(Required) The AWS region where the configured Kinesis data stream resides

maxRecordsToFetch

Maximum number of records to fetch during a single GetRecords request. Default: 20

shardIteratorType

Similar to Kafka's offset reset property: indicates the point in the AWS Kinesis data stream from where the consumption should begin. Allowed Values: TRIM_HORIZON, LATEST
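
A minimal Python sketch of a Kinesis property map built from the table above. The function name kinesis_stream_configs, the stream name, and the region value are hypothetical; the property keys, the factory class name, the default of 20 records, and the allowed iterator types come from the table.

```python
ALLOWED_SHARD_ITERATOR_TYPES = {"TRIM_HORIZON", "LATEST"}


def kinesis_stream_configs(stream_name: str, region: str,
                           shard_iterator_type: str,
                           max_records_to_fetch: int = 20) -> dict:
    """Assemble the Kinesis partition-level properties listed above (hypothetical helper)."""
    if shard_iterator_type not in ALLOWED_SHARD_ITERATOR_TYPES:
        raise ValueError(f"unsupported shardIteratorType: {shard_iterator_type}")
    return {
        "stream.kinesis.consumer.type": "lowlevel",
        "stream.kinesis.consumer.factory.class.name":
            "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
        "stream.kinesis.topic.name": stream_name,  # required
        "region": region,                          # required
        "maxRecordsToFetch": str(max_records_to_fetch),
        "shardIteratorType": shard_iterator_type,
    }


# Hypothetical stream name and region, for illustration only.
kinesis_configs = kinesis_stream_configs("my-stream", "us-east-1", "TRIM_HORIZON")
```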

Key-based Authentication Properties

Config
Description

accessKey

(Required) AWS access key used to access the AWS Kinesis data stream

secretKey

(Required) AWS secret key used to access the AWS Kinesis data stream
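
One way to supply the two key-based properties without hardcoding credentials is to read them from the environment, as in this Python sketch. The function name kinesis_key_auth_props and the environment variable names KINESIS_ACCESS_KEY and KINESIS_SECRET_KEY are assumptions chosen for illustration; only accessKey and secretKey come from the table.

```python
import os


def kinesis_key_auth_props(env=os.environ) -> dict:
    """Build the key-based authentication properties from environment variables.

    The variable names are hypothetical; both properties are required.
    """
    names = ("KINESIS_ACCESS_KEY", "KINESIS_SECRET_KEY")
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise KeyError(f"missing required credentials: {', '.join(missing)}")
    return {
        "accessKey": env["KINESIS_ACCESS_KEY"],
        "secretKey": env["KINESIS_SECRET_KEY"],
    }


# Usage with an explicit mapping instead of the real environment (dummy values).
auth_props = kinesis_key_auth_props(
    {"KINESIS_ACCESS_KEY": "example-access", "KINESIS_SECRET_KEY": "example-secret"}
)
```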

IAM Role-based Authentication Properties

Config
Description

iamRoleBasedAccessEnabled

Set to true when using IAM role-based authentication for connecting to the AWS Kinesis data stream. Default: false

roleArn

(Required) ARN of cross-account IAM role

roleSessionName

Unique identifier for a session when the client assumes the IAM role. Default: pinot-kinesis-<UUID>

externalId

Unique identifier used to manage trust between AWS accounts and prevent the confused deputy problem. More details here

sessionDurationSeconds

Duration of the role session in seconds. Default: 900

asyncSessionUpdateEnabled

Flag to determine whether the asynchronous session update should be enabled. Default: true
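
The role-based properties and their documented defaults can be sketched in Python as follows. The function name kinesis_iam_role_props and the example ARN are hypothetical, and generating the session name with uuid4 is only this sketch's way of imitating the pinot-kinesis-<UUID> default; it is not a claim about how the connector produces it.

```python
import uuid
from typing import Optional


def kinesis_iam_role_props(role_arn: str,
                           external_id: Optional[str] = None,
                           session_duration_seconds: int = 900) -> dict:
    """Build the IAM role-based authentication properties (hypothetical helper)."""
    props = {
        "iamRoleBasedAccessEnabled": "true",  # documented default is false
        "roleArn": role_arn,                  # required
        "roleSessionName": f"pinot-kinesis-{uuid.uuid4()}",
        "sessionDurationSeconds": str(session_duration_seconds),
        "asyncSessionUpdateEnabled": "true",
    }
    if external_id is not None:
        # Only set when the trust policy of the role expects an external ID.
        props["externalId"] = external_id
    return props


# Hypothetical role ARN, for illustration only.
iam_props = kinesis_iam_role_props("arn:aws:iam::123456789012:role/example-role")
```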
