Stream Ingestion Connectors
This document lists the configuration options for each supported stream ingestion connector.
Configuration | Description |
---|---|
stream.<stream_type>.consumer.type | Type of the consumer
Allowed values: lowlevel, highlevel
Case-insensitive |
stream.<stream_type>.consumer.factory.class.name | Factory class to be used for the stream consumer |
stream.<stream_type>.consumer.prop.auto.offset.reset | Offset or position in the source stream from which to start consuming data
Valid values:
smallest - Start consuming from the earliest data in the stream
largest - Start consuming from the latest data in the stream
timestamp - Start consuming from the offset after a timestamp, specified in the format yyyy-MM-dd'T'HH:mm:ss.SSSZ
datetime - Start consuming from the offset after the specified period or duration from the current time, e.g. 2d
Default Value: largest |
stream.<stream_type>.topic.name | Name of the source stream to consume |
stream.<stream_type>.fetch.timeout.millis | Indicates the timeout (in milliseconds) to use for each fetch call to the consumer. If the timeout expires before data becomes available, the consumer will return an empty batch.
Default Value: 5_000 |
stream.<stream_type>.connection.timeout.millis | Indicates the timeout (in milliseconds) used to create the connection to the upstream source. Currently used only by the Kafka 0.9 partition-level consumer.
Default Value: 30_000 |
stream.<stream_type>.idle.timeout.millis | If the stream remains idle (i.e., without any data) for the specified time, the client connection is reset and a new consumer instance is created.
Default Value: 180_000 |
stream.<stream_type>.decoder.class.name | Name of the decoder class used to decode the stream payload |
stream.<stream_type>.decoder.prop | Prefix used for any decoder specific property |
topic.consumption.rate.limit | Indicates the upper bound on the message rate for the entire topic. Use -1 to ignore this config.
Default Value: -1
See here for more details. |
stream.<stream_type>.metadata.populate | When set to true, a supporting consumer may extract the key, user headers, and record metadata from the incoming payload.
Currently, this is supported only by the Kafka connector. |
realtime.segment.flush.threshold.time | Time-based flush threshold for realtime segments. Used to decide when a realtime segment is ready to be committed / closed / flushed to disk. |
realtime.segment.flush.threshold.size | The desired size of a completed realtime segment.
ℹ Used only when realtime.segment.flush.threshold.rows is set to 0. |
realtime.segment.flush.threshold.rows | Row count based flush threshold for realtime segments. This behaves in a similar way for HLC and LLC. For HLC, since there is only one consumer per server, this size is used as the size of the consumption buffer and determines after how many rows we flush to disk. For example, if this threshold is set to two million rows, then a high level consumer would have a buffer size of two million. If this value is set to 0, then the consumers adjust the number of rows consumed by a partition such that the size of the completed segment is the desired size (unless threshold.time is reached first) |
realtime.segment.flush.autotune.initialRows | Initial number of rows to use for SegmentSizeBasedFlushThresholdUpdater. The controller uses this threshold updater to compute the new segment's flush threshold based on the previous segment's size.
⚠ Used only when realtime.segment.flush.threshold.rows is set to <= 0. Otherwise, the DefaultFlushThresholdUpdater is used. |
realtime.segment.commit.timeoutSeconds | Time (in seconds) that the controller waits for the server to build the segment |
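
The `<stream_type>` placeholder in the table above is replaced by the connector name (for example `kafka` or `kinesis`) to form the concrete property names. The following is a minimal Python sketch of that expansion, not part of any connector API. It assumes that all values are passed as strings and that the underscores in the documented defaults (such as `5_000`) are only digit separators. The helper name `expand_stream_configs` and the topic name `events` are hypothetical.

```python
import json

# Defaults copied from the table above. The underscores in the documented
# values (e.g. 5_000) are treated as digit separators and dropped (assumption).
PREFIXED_DEFAULTS = {
    "consumer.prop.auto.offset.reset": "largest",
    "fetch.timeout.millis": "5000",
    "connection.timeout.millis": "30000",
    "idle.timeout.millis": "180000",
}


def expand_stream_configs(stream_type, topic_name, overrides=None):
    """Build concrete property names for one stream type (hypothetical helper).

    Only keys that carry the stream.<stream_type> prefix are expanded.
    topic.consumption.rate.limit and the realtime.segment.* keys are not prefixed.
    """
    prefix = f"stream.{stream_type}"
    configs = {f"{prefix}.{key}": value for key, value in PREFIXED_DEFAULTS.items()}
    configs[f"{prefix}.topic.name"] = topic_name
    configs["topic.consumption.rate.limit"] = "-1"  # -1 means the limit is ignored
    configs.update(overrides or {})
    return configs


example = expand_stream_configs(
    "kafka",
    "events",  # hypothetical topic name
    # 0 rows switches to size-based flushing, as described in the table above
    {"realtime.segment.flush.threshold.rows": "0"},
)
print(json.dumps(example, indent=2, sort_keys=True))
```

Keeping the prefixed and unprefixed keys separate makes it harder to accidentally write `stream.kafka.topic.consumption.rate.limit`, which is not a key listed in the table.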
Config | Description |
---|---|
stream.kafka.consumer.type | Allowed Value: lowlevel |
stream.kafka.consumer.factory.class.name | Allowed Value: org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory |
stream.kafka.topic.name | (Required) Name of the kafka topic to be ingested |
stream.kafka.broker.list | (Required) Connection string for the kafka broker |
stream.kafka.buffer.size | Default Value: 512000 |
stream.kafka.socket.timeout | Default Value: 10000 |
stream.kafka.fetcher.size | Default Value: 100000 |
stream.kafka.isolation.level | Allowed Values: read_committed, read_uncommitted
Default: read_uncommitted
Note: This must be set to read_committed when using transactions in Kafka. |
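
As an illustration of the Kafka table above, the sketch below assembles a set of Kafka properties and checks the two required keys and the isolation level before the configuration is submitted. The checker `check_kafka_configs`, the topic name `events`, and the broker address `localhost:9092` are hypothetical. Passing all values as strings is an assumption.

```python
REQUIRED_KEYS = ("stream.kafka.topic.name", "stream.kafka.broker.list")
ISOLATION_LEVELS = {"read_committed", "read_uncommitted"}


def check_kafka_configs(configs):
    """Return a list of human-readable problems (empty if none were found)."""
    problems = [f"missing required key: {key}" for key in REQUIRED_KEYS if not configs.get(key)]
    # The documented default applies when the key is absent.
    level = configs.get("stream.kafka.isolation.level", "read_uncommitted")
    if level not in ISOLATION_LEVELS:
        problems.append(f"unsupported isolation level: {level}")
    return problems


kafka_configs = {
    "stream.kafka.consumer.type": "lowlevel",
    "stream.kafka.consumer.factory.class.name":
        "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
    "stream.kafka.topic.name": "events",            # hypothetical topic
    "stream.kafka.broker.list": "localhost:9092",   # hypothetical broker address
    "stream.kafka.isolation.level": "read_committed",  # needed with Kafka transactions
}

print(check_kafka_configs(kafka_configs))  # prints []
```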
Config | Description |
---|---|
stream.kinesis.consumer.type | Allowed Value: lowlevel |
stream.kinesis.consumer.factory.class.name | Allowed Value: org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory |
stream.kinesis.topic.name | (Required) Name of the Kinesis data stream to consume |
region | (Required) The AWS region where the configured Kinesis data stream resides |
maxRecordsToFetch | Maximum number of records to fetch in a single GetRecords request
Default: 20 |
shardIteratorType | Similar to Kafka's offset reset property - indicates the point in the AWS Kinesis data stream from where the consumption should begin
Allowed Values: TRIM_HORIZON, LATEST |
Config | Description |
---|---|
accessKey | (Required) AWS Access key used to access the AWS Kinesis Data stream |
secretKey | (Required) AWS Secret key used to access the AWS Kinesis Data stream |
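
The Kinesis stream properties and the key-based credentials above might be combined as in the following sketch. It is an illustration only: the helper `kinesis_stream_configs`, the stream name `clickstream`, and the region `us-east-1` are hypothetical, the credentials are placeholders, and passing all values as strings is an assumption.

```python
SHARD_ITERATOR_TYPES = {"TRIM_HORIZON", "LATEST"}


def kinesis_stream_configs(stream_name, region, access_key, secret_key,
                           shard_iterator_type, max_records_to_fetch=20):
    """Assemble Kinesis properties with key-based credentials (hypothetical helper)."""
    if shard_iterator_type not in SHARD_ITERATOR_TYPES:
        raise ValueError(
            f"shardIteratorType must be one of {sorted(SHARD_ITERATOR_TYPES)}"
        )
    required = (("stream.kinesis.topic.name", stream_name), ("region", region),
                ("accessKey", access_key), ("secretKey", secret_key))
    for name, value in required:
        if not value:
            raise ValueError(f"{name} is required")
    return {
        "stream.kinesis.consumer.type": "lowlevel",
        "stream.kinesis.consumer.factory.class.name":
            "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
        "stream.kinesis.topic.name": stream_name,
        "region": region,
        "maxRecordsToFetch": str(max_records_to_fetch),  # documented default: 20
        "shardIteratorType": shard_iterator_type,
        "accessKey": access_key,
        "secretKey": secret_key,
    }


kinesis_configs = kinesis_stream_configs(
    stream_name="clickstream",      # hypothetical stream name
    region="us-east-1",             # hypothetical region
    access_key="<access-key>",      # placeholder, not a real credential
    secret_key="<secret-key>",      # placeholder, not a real credential
    shard_iterator_type="TRIM_HORIZON",
)
```

In practice the credentials would come from a secret store or environment variables rather than being written inline.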
Config | Description |
---|---|
iamRoleBasedAccessEnabled | Set to true when using IAM role-based authentication for connecting to the AWS Kinesis Data Stream
Default: false |
roleArn | (Required) ARN of cross-account IAM role |
roleSessionName | Unique identifier for a session when the client assumes the IAM role
Default: pinot-kinesis-<UUID> |
externalId | Unique identifier used to manage trust between AWS accounts and prevent the confused deputy problem. More details here |
sessionDurationSeconds | Duration of the role session in seconds
Default: 900 |
asyncSessionUpdateEnabled | Flag that determines whether asynchronous session updates are enabled
Default: true |
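
A rough sketch of the role-based properties above, using the documented defaults. The helper `iam_role_configs` and the role ARN shown are hypothetical, and encoding booleans and numbers as strings is an assumption. The session name follows the documented `pinot-kinesis-<UUID>` pattern when none is supplied.

```python
import uuid


def iam_role_configs(role_arn, external_id=None, role_session_name=None,
                     session_duration_seconds=900, async_session_update=True):
    """Assemble IAM role-based access properties (hypothetical helper)."""
    if not role_arn:
        raise ValueError("roleArn is required when IAM role-based access is enabled")
    configs = {
        "iamRoleBasedAccessEnabled": "true",
        "roleArn": role_arn,
        # Mirrors the documented default pattern pinot-kinesis-<UUID>.
        "roleSessionName": role_session_name or f"pinot-kinesis-{uuid.uuid4()}",
        "sessionDurationSeconds": str(session_duration_seconds),
        "asyncSessionUpdateEnabled": str(async_session_update).lower(),
    }
    # externalId is only added when the caller provides one.
    if external_id is not None:
        configs["externalId"] = external_id
    return configs


role_configs = iam_role_configs(
    "arn:aws:iam::123456789012:role/example-pinot-reader"  # hypothetical ARN
)
```

These properties would be merged into the same map as the other Kinesis properties, in place of `accessKey` and `secretKey`.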