# Stream Ingestion Connectors

## Applicable to all Stream Connectors

<table><thead><tr><th width="281">Configuration</th><th>Description</th></tr></thead><tbody><tr><td>stream.&#x3C;stream_type>.consumer.factory.class.name</td><td>Factory class to be used for the stream consumer</td></tr><tr><td>stream.&#x3C;stream_type>.consumer.prop.auto.offset.reset</td><td>Offset or position in the source stream from which to start consuming data<br><strong>Valid values:</strong><br><strong><code>smallest</code></strong> - Start consuming from the earliest data in the stream<br><strong><code>largest</code></strong> - Start consuming from the latest data in the stream<br><strong><code>timestamp</code></strong> - Start consuming from the offset after a timestamp, which is specified in the format <code>yyyy-MM-dd'T'HH:mm:ss.SSSZ</code><br><strong><code>datetime</code></strong> - Start consuming from the offset corresponding to the specified period or duration before the current time, e.g. <code>2d</code><br><strong>Default Value:</strong> <code>largest</code></td></tr><tr><td>stream.&#x3C;stream_type>.topic.name</td><td>Name of the source stream to consume</td></tr><tr><td>stream.&#x3C;stream_type>.fetch.timeout.millis</td><td>Timeout (in milliseconds) to use for each fetch call to the consumer. If the timeout expires before data becomes available, the consumer returns an empty batch.<br><strong>Default Value:</strong> <code>5000</code></td></tr><tr><td>stream.&#x3C;stream_type>.connection.timeout.millis</td><td>Timeout (in milliseconds) used to create the connection to the upstream source (currently used only by the Kafka 0.9 partition-level consumer)<br><strong>Default Value:</strong> <code>30000</code></td></tr><tr><td>stream.&#x3C;stream_type>.idle.timeout.millis</td><td>If the stream remains idle (i.e. without any data) for the specified time, the client connection is reset and a new consumer instance is created.<br><strong>Default Value:</strong> <code>180000</code></td></tr><tr><td>stream.&#x3C;stream_type>.decoder.class.name</td><td>Name of the decoder class that should be used to decode the stream payload</td></tr><tr><td>stream.&#x3C;stream_type>.decoder.prop</td><td>Prefix used for any decoder-specific property</td></tr><tr><td>topic.consumption.rate.limit</td><td>Upper bound on the message rate for the entire topic. Use <code>-1</code> to ignore this config.<br><strong>Default Value:</strong> <code>-1</code><br>See <a href="https://docs.pinot.apache.org/basics/data-import/pinot-stream-ingestion#throttling-stream-consumption">here</a> for more details.</td></tr><tr><td>stream.&#x3C;stream_type>.metadata.populate</td><td>When set to <code>true</code>, the supported consumer may extract the key, user headers and record metadata from the incoming payload.<br>Currently, this is supported only in the Kafka connector.</td></tr><tr><td>realtime.segment.flush.threshold.time</td><td>Time-based flush threshold for realtime segments. Used to decide when a realtime segment is ready to be committed / closed / flushed to disk.<br><br><span data-gb-custom-inline data-tag="emoji" data-code="26a0">⚠️</span> This time should be smaller than the retention period configured for the corresponding topic</td></tr><tr><td>realtime.segment.flush.threshold.size</td><td>Desired size of a completed realtime segment.<br><br><span data-gb-custom-inline data-tag="emoji" data-code="2139">ℹ️</span> This config is used only if <code>realtime.segment.flush.threshold.rows</code> is set to 0.</td></tr><tr><td>realtime.segment.flush.threshold.rows</td><td><p>Row-count-based flush threshold for realtime segments.</p><p>If this value is set to 0, the consumers adjust the number of rows consumed per partition so that the completed segment reaches the desired size (unless <code>realtime.segment.flush.threshold.time</code> is reached first).</p></td></tr><tr><td>realtime.segment.flush.autotune.initialRows</td><td>Initial number of rows to use for <code>SegmentSizeBasedFlushThresholdUpdater</code>. This threshold updater is used by the controller to compute the new segment's flush threshold based on the previous segment's size.<br><span data-gb-custom-inline data-tag="emoji" data-code="26a0">⚠️</span> This flush threshold updater is used only when <code>realtime.segment.flush.threshold.rows</code> is set to <code>&#x3C;=0</code>. Otherwise, the <code>DefaultFlushThresholdUpdater</code> is used.</td></tr><tr><td>realtime.segment.commit.timeoutSeconds</td><td>Time threshold for which the controller will wait for the segment to be built by the server</td></tr></tbody></table>
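These generic properties are set inside the `streamConfigs` block of a realtime table config. The sketch below illustrates how they might be combined, using Kafka as the stream type; the topic name, decoder class, and threshold values are placeholders for illustration, not recommendations:

```json
{
  "tableIndexConfig": {
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "transcript-topic",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "realtime.segment.flush.threshold.rows": "0",
      "realtime.segment.flush.threshold.time": "24h",
      "realtime.segment.flush.threshold.size": "150M"
    }
  }
}
```

With `threshold.rows` set to `0` as shown, segment size is governed by `threshold.size` (here 150&nbsp;MB), with `threshold.time` acting as an upper bound on how long a consuming segment stays open.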

## Kafka Partition-Level Connector

### Version 2.0

<table><thead><tr><th width="285">Config</th><th>Description</th></tr></thead><tbody><tr><td>stream.kafka.consumer.factory.class.name</td><td><strong>Allowed Value:</strong> org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory</td></tr><tr><td>stream.kafka.topic.name</td><td>(Required) Name of the Kafka topic to be ingested</td></tr><tr><td>stream.kafka.broker.list</td><td>(Required) Connection string for the Kafka broker</td></tr><tr><td>stream.kafka.buffer.size</td><td><strong>Default Value:</strong> <code>512000</code></td></tr><tr><td>stream.kafka.socket.timeout</td><td><strong>Default Value:</strong> <code>10000</code></td></tr><tr><td>stream.kafka.fetcher.size</td><td><strong>Default Value:</strong> <code>100000</code></td></tr><tr><td>stream.kafka.isolation.level</td><td><strong>Allowed Values:</strong> <code>read_committed</code>, <code>read_uncommitted</code><br><strong>Default:</strong> <code>read_uncommitted</code><br><br><strong>Note:</strong> This must be set to <code>read_committed</code> when using transactions in Kafka.</td></tr></tbody></table>
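Putting the Kafka-specific properties together, a `streamConfigs` fragment for the 2.0 connector might look like the sketch below; the broker address and topic name are placeholders:

```json
"streamConfigs": {
  "streamType": "kafka",
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
  "stream.kafka.topic.name": "transcript-topic",
  "stream.kafka.broker.list": "localhost:9092",
  "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
  "stream.kafka.isolation.level": "read_committed"
}
```

`read_committed` is shown only to illustrate the transactional case; if the topic's producers do not use Kafka transactions, the default `read_uncommitted` can be left in place.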

## Kinesis Partition-Level Connector

<table><thead><tr><th width="293">Config</th><th>Description</th></tr></thead><tbody><tr><td>stream.kinesis.consumer.factory.class.name</td><td><strong>Allowed Value:</strong> org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory</td></tr><tr><td>stream.kinesis.topic.name</td><td>(Required) Name of the Kinesis data stream to consume</td></tr><tr><td>region</td><td>(Required) The AWS region where the configured Kinesis data stream resides</td></tr><tr><td>maxRecordsToFetch</td><td>Maximum number of records to fetch during a single GetRecords request<br><strong>Default:</strong> <code>20</code></td></tr><tr><td>shardIteratorType</td><td>Similar to Kafka's offset reset property - indicates the point in the AWS Kinesis data stream from which consumption should begin<br><strong>Allowed Values:</strong> <code>TRIM_HORIZON</code>, <code>LATEST</code></td></tr></tbody></table>
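For reference, the Kinesis-specific properties combine with the generic stream properties as in the sketch below; the stream name and region are placeholders:

```json
"streamConfigs": {
  "streamType": "kinesis",
  "stream.kinesis.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
  "stream.kinesis.topic.name": "my-kinesis-stream",
  "region": "us-west-2",
  "maxRecordsToFetch": "100",
  "shardIteratorType": "TRIM_HORIZON"
}
```

`TRIM_HORIZON` starts consumption from the oldest available record in each shard (analogous to Kafka's `smallest`), while `LATEST` starts from new data only.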

### Key-based Authentication Properties

<table><thead><tr><th width="301">Config</th><th>Description</th></tr></thead><tbody><tr><td>accessKey</td><td>(Required) AWS Access key used to access the AWS Kinesis Data stream</td></tr><tr><td>secretKey</td><td>(Required) AWS Secret key used to access the AWS Kinesis Data stream</td></tr></tbody></table>
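When using key-based authentication, the credentials are added alongside the other Kinesis properties in `streamConfigs`. The values below are placeholders; in practice, prefer IAM role-based access or a secrets mechanism over inlining long-lived keys in a table config:

```json
"streamConfigs": {
  "accessKey": "<aws-access-key-id>",
  "secretKey": "<aws-secret-access-key>"
}
```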

### IAM Role-based Authentication Properties

<table><thead><tr><th width="301">Config</th><th>Description</th></tr></thead><tbody><tr><td>iamRoleBasedAccessEnabled</td><td>Set to <code>true</code> when using IAM role-based authentication for connecting to the AWS Kinesis Data Stream<br><strong>Default:</strong> <code>false</code></td></tr><tr><td>roleArn</td><td><strong>(Required)</strong> ARN of the cross-account IAM role</td></tr><tr><td>roleSessionName</td><td>Unique identifier for a session when the client assumes the IAM role<br><strong>Default:</strong> <code>pinot-kinesis-&#x3C;UUID></code></td></tr><tr><td>externalId</td><td>Unique identifier used to manage trust between AWS accounts and prevent the confused deputy problem. More details <a href="https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html">here</a></td></tr><tr><td>sessionDurationSeconds</td><td>Duration of the role session in seconds<br><strong>Default:</strong> <code>900</code></td></tr><tr><td>asyncSessionUpdateEnabled</td><td>Flag to determine whether the session update should be enabled<br><strong>Default:</strong> <code>true</code></td></tr></tbody></table>
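With IAM role-based access, the static key properties are replaced by role settings. A minimal sketch; the role ARN, session name, and external ID below are placeholders:

```json
"streamConfigs": {
  "iamRoleBasedAccessEnabled": "true",
  "roleArn": "arn:aws:iam::123456789012:role/pinot-kinesis-consumer",
  "roleSessionName": "pinot-ingestion",
  "externalId": "<external-id>",
  "sessionDurationSeconds": "900"
}
```

If `roleSessionName` is omitted, a name of the form `pinot-kinesis-<UUID>` is generated, per the table above.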
