# Stream Ingestion Connectors
This document lists the configuration properties for each supported stream ingestion connector.
## Applicable to all Stream Connectors
| Configuration | Description |
| --- | --- |
| `stream.<stream_type>.consumer.type` | Type of the consumer. Allowed values: `lowlevel`, `highlevel` |
| `stream.<stream_type>.consumer.factory.class.name` | Factory class to use for the stream consumer |
| `stream.<stream_type>.consumer.prop.auto.offset.reset` | Offset or position in the source stream from which to start consuming data. Valid values: |
| `stream.<stream_type>.topic.name` | Name of the source stream to consume |
| `stream.<stream_type>.fetch.timeout.millis` | Timeout (in milliseconds) for each fetch call to the consumer. If the timeout expires before data becomes available, the consumer returns an empty batch. Default value: |
| `stream.<stream_type>.connection.timeout.millis` | Timeout (in milliseconds) used when creating the connection to the upstream source (currently used only by the Kafka 0.9 partition-level consumer). Default value: |
| `stream.<stream_type>.idle.timeout.millis` | If the stream remains idle (i.e. without any data) for the specified time, the client connection is reset and a new consumer instance is created. Default value: |
| `stream.<stream_type>.decoder.class.name` | Name of the decoder class used to decode the stream payload |
| `stream.<stream_type>.decoder.prop` | Prefix used for any decoder-specific property |
| `topic.consumption.rate.limit` | Upper bound on the message rate for the entire topic |
| `stream.<stream_type>.metadata.populate` | When set to `true`, stream metadata is populated for each ingested record |
| `realtime.segment.flush.threshold.time` | Time-based flush threshold for realtime segments. Used to decide when a realtime segment is ready to be committed / closed / flushed to disk. ⚠️ This time should be smaller than the retention period configured for the corresponding topic |
| `realtime.segment.flush.threshold.size` | Desired size of a completed realtime segment. ℹ️ This config is used only if `realtime.segment.flush.threshold.rows` is set to 0 |
| `realtime.segment.flush.threshold.rows` | Row-count-based flush threshold for realtime segments. This behaves similarly for HLC and LLC. For HLC, since there is only one consumer per server, this value is used as the size of the consumption buffer and determines after how many rows the consumer flushes to disk. For example, with a threshold of two million rows, a high-level consumer would have a buffer size of two million. If this value is set to 0, the consumers adjust the number of rows consumed per partition so that the completed segment reaches the desired size (unless `realtime.segment.flush.threshold.time` is reached first) |
| `realtime.segment.flush.autotune.initialRows` | Initial number of rows to use for auto-tuning the size-based flush threshold |
| `realtime.segment.commit.timeoutSeconds` | Time threshold (in seconds) for which the controller will wait for the segment to be built by the server |
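For context, these properties are set in the `streamConfigs` map of a realtime table config. A minimal sketch for a Kafka-backed table is shown below; the table name, topic, decoder class, and threshold values are illustrative placeholders, not documented defaults:

```json
{
  "tableName": "transcript_REALTIME",
  "tableType": "REALTIME",
  "tableIndexConfig": {
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.topic.name": "transcript-topic",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "realtime.segment.flush.threshold.rows": "0",
      "realtime.segment.flush.threshold.time": "6h",
      "realtime.segment.flush.threshold.size": "200M"
    }
  }
}
```

With `realtime.segment.flush.threshold.rows` set to 0, the size-based threshold takes effect, as described in the table above.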
## Kafka Partition-Level Connector

Supports Kafka version 2.0 and above.
| Config | Description |
| --- | --- |
| `stream.kafka.consumer.type` | Allowed value: `lowlevel` |
| `stream.kafka.consumer.factory.class.name` | Allowed value: `org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory` |
| `stream.kafka.topic.name` | (Required) Name of the Kafka topic to be ingested |
| `stream.kafka.broker.list` | (Required) Connection string for the Kafka broker |
| `stream.kafka.buffer.size` | Default value: |
| `stream.kafka.socket.timeout` | Default value: |
| `stream.kafka.fetcher.size` | Default value: |
| `stream.kafka.isolation.level` | Allowed values: `read_committed`, `read_uncommitted` |
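A sketch of the corresponding `streamConfigs` block, with placeholder broker address and topic name (the `smallest` offset-reset value is an assumption based on the generic offset-reset property above):

```json
"streamConfigs": {
  "streamType": "kafka",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
  "stream.kafka.topic.name": "my-topic",
  "stream.kafka.broker.list": "localhost:9092",
  "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
  "stream.kafka.isolation.level": "read_committed"
}
```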
## Kinesis Partition-Level Connector
| Config | Description |
| --- | --- |
| `stream.kinesis.consumer.type` | Allowed value: `lowlevel` |
| `stream.kinesis.consumer.factory.class.name` | Allowed value: `org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory` |
| `stream.kinesis.topic.name` | (Required) Name of the Kinesis data stream to consume |
| `region` | (Required) AWS region in which the configured Kinesis data stream resides |
| `maxRecordsToFetch` | Maximum number of records to fetch in a single GetRecords request. Default: |
| `shardIteratorType` | Similar to Kafka's offset-reset property; indicates the point in the Kinesis data stream from which consumption should begin. Allowed values: |
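A sketch of a Kinesis `streamConfigs` block; the stream name and region are placeholders, and `LATEST` is one of the standard AWS shard iterator types:

```json
"streamConfigs": {
  "streamType": "kinesis",
  "stream.kinesis.consumer.type": "lowlevel",
  "stream.kinesis.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
  "stream.kinesis.topic.name": "my-kinesis-stream",
  "region": "us-east-1",
  "shardIteratorType": "LATEST"
}
```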
### Key-based Authentication Properties
| Config | Description |
| --- | --- |
| `accessKey` | (Required) AWS access key used to access the Kinesis data stream |
| `secretKey` | (Required) AWS secret key used to access the Kinesis data stream |
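For example, added to the Kinesis `streamConfigs` sketch above (the key values are placeholders; never commit real credentials):

```json
"streamConfigs": {
  "streamType": "kinesis",
  "stream.kinesis.topic.name": "my-kinesis-stream",
  "region": "us-east-1",
  "accessKey": "<AWS_ACCESS_KEY_ID>",
  "secretKey": "<AWS_SECRET_ACCESS_KEY>"
}
```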
### IAM Role-based Authentication Properties
| Config | Description |
| --- | --- |
| `iamRoleBasedAccessEnabled` | Set to `true` to enable IAM role-based access |
| `roleArn` | (Required) ARN of the cross-account IAM role to assume |
| `roleSessionName` | Unique identifier for the session when the client assumes the IAM role. Default: |
| `externalId` | Unique identifier used to manage trust between AWS accounts and prevent the confused-deputy problem (see the AWS IAM documentation for details) |
| `sessionDurationSeconds` | Duration of the role session, in seconds. Default: |
| `asyncSessionUpdateEnabled` | Flag that determines whether asynchronous session updates are enabled. Default: |
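A sketch of the role-based variant; the role ARN, session name, external ID, and session duration below are illustrative placeholders:

```json
"streamConfigs": {
  "streamType": "kinesis",
  "stream.kinesis.topic.name": "my-kinesis-stream",
  "region": "us-east-1",
  "iamRoleBasedAccessEnabled": "true",
  "roleArn": "arn:aws:iam::123456789012:role/pinot-kinesis-ingest",
  "roleSessionName": "pinot-ingestion-session",
  "externalId": "example-external-id",
  "sessionDurationSeconds": "900"
}
```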