# Stream Ingestion Connectors

## Applicable to all Stream Connectors

| Configuration                                          | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
| ------------------------------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| stream.\<stream\_type>.consumer.factory.class.name     | Factory class to be used for the stream consumer                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
| stream.\<stream\_type>.consumer.prop.auto.offset.reset | Offset or position in the source stream from which to start consuming data. **Valid values:** **`smallest`**: start consuming from the earliest data in the stream. **`largest`**: start consuming from the latest data in the stream. **`timestamp`**: start consuming from the offset after a timestamp specified in the format `yyyy-MM-dd'T'HH:mm:ss.SSSZ`. **`datetime`**: start consuming from the offset after the specified period or duration, measured back from the current time, for example `2d`. **Default Value:** `largest` |
| stream.\<stream\_type>.topic.name                      | Name of the source stream to consume                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
| stream.\<stream\_type>.fetch.timeout.millis            | Indicates the timeout (in milliseconds) to use for each fetch call to the consumer. If the timeout expires before data becomes available, the consumer will return an empty batch. **Default Value:** `5_000`                                                                                                                                                                                                                                                                                                              |
| stream.\<stream\_type>.connection.timeout.millis       | Indicates the timeout (in milliseconds) used to create the connection to the upstream (Timeout for the initial connection to the upstream) **Default Value:** `30_000`                                                                                                                                                                                                                                                                                                                                                     |
| stream.\<stream\_type>.idle.timeout.millis             | If the stream remains idle (ie. without any data) for the specified time, the client connection is reset and a new consumer instance is created. **Default Value:** `180_000`                                                                                                                                                                                                                                                                                                                                              |
| stream.\<stream\_type>.decoder.class.name              | Indicates the name of the decoder class that should be used to decoder the stream payload                                                                                                                                                                                                                                                                                                                                                                                                                                  |
| stream.\<stream\_type>.decoder.prop                    | Prefix used for any decoder specific property                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| topic.consumption.rate.limit                           | Indicates the upper bound on the message rate for the entire topic. Use `-1` to ignore this config. **Default Value:** `-1` See [here](/build-with-pinot/ingestion/stream-ingestion/stream-ingestion.md#throttle-stream-consumption) for more details.                                                                                                                                                                                                                                                                     |
| stream.\<stream\_type>.metadata.populate               | When set to `true`, the supported consumer may extract the key, user headers and record metadata from the incoming payload. Currently, this is supported in Kafka connector only.                                                                                                                                                                                                                                                                                                                                          |
| realtime.segment.flush.threshold.time                  | Time based flush threshold for realtime segments. Used to decides when a realtime segment is ready to be committed / closed / flushed to disk. ⚠ This time should be smaller than the retention period configured for the corresponding topic                                                                                                                                                                                                                                                                              |
| realtime.segment.flush.threshold.size                  | The size a completed realtime segment should be. ℹThis config is used only if `realtime.segment.flush.threshold.rows` is set to 0.                                                                                                                                                                                                                                                                                                                                                                                         |
| realtime.segment.flush.threshold.rows                  | Row count based flush threshold for realtime segments. If this value is set to 0, then the consumers adjust the number of rows consumed by a partition so the completed segment is the correct size (unless threshold.time is reached first)                                                                                                                                                                                                                                                                               |
| realtime.segment.flush.autotune.initialRows            | Initial number of rows to use for `SegmentSizeBasedFlushThresholdUpdater` . This threshold updater is used by the controller to compute the new segment's flush threshold based on the previous segment's size. ⚠ This flush threshold updater is used only when `realtime.segment.flush.threshold.rows` is set to `<=0` . Otherwise, the `DefaultFlushThresholdUpdater` is used.                                                                                                                                          |
| realtime.segment.commit.timeoutSeconds                 | Time threshold that controller will wait for the segment to be built by the server                                                                                                                                                                                                                                                                                                                                                                                                                                         |
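The `auto.offset.reset` values above each resolve to a start position in the stream. The Python sketch below illustrates that mapping under stated assumptions: `resolve_offset_reset` is a hypothetical helper, not a Pinot API; it assumes the config value itself carries the timestamp or period (as the `2d` example suggests); and it simplifies the period grammar to one integer followed by `d`, `h`, `m`, or `s`, which may be narrower than what Pinot accepts.

```python
import re
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

# Simplified period grammar (assumption): one integer followed by d, h, m, or s.
_PERIOD = re.compile(r"(\d+)([dhms])")
_UNIT_MILLIS = {"d": 86_400_000, "h": 3_600_000, "m": 60_000, "s": 1_000}
# Python equivalent of the documented layout yyyy-MM-dd'T'HH:mm:ss.SSSZ
_TS_FORMAT = "%Y-%m-%dT%H:%M:%S.%f%z"
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def resolve_offset_reset(value: str, now_millis: int) -> Tuple[str, Optional[int]]:
    """Hypothetical helper: map an auto.offset.reset value to a start position.

    Returns ("smallest", None), ("largest", None), or ("timestamp", millis),
    where millis is the epoch time after which consumption should start.
    """
    if value in ("smallest", "largest"):
        return value, None
    period = _PERIOD.fullmatch(value)
    if period:
        amount, unit = int(period.group(1)), period.group(2)
        # A period such as "2d" is measured back from the current time.
        return "timestamp", now_millis - amount * _UNIT_MILLIS[unit]
    try:
        ts = datetime.strptime(value, _TS_FORMAT)
    except ValueError:
        raise ValueError(f"unsupported auto.offset.reset value: {value!r}") from None
    # Integer division of timedeltas keeps the millisecond count exact.
    return "timestamp", (ts - _EPOCH) // timedelta(milliseconds=1)
```

Separating the classification from the clock (`now_millis` is passed in) keeps the sketch deterministic and easy to test.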

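The interaction between `realtime.segment.flush.threshold.rows`, `realtime.segment.flush.threshold.size`, and `realtime.segment.flush.autotune.initialRows` can be sketched in Python as follows. This is a simplified model, not Pinot's implementation: `next_flush_rows` is hypothetical, sizes are plain byte counts (an assumption), the proportional scaling step stands in for the controller's actual autotuning logic, and `realtime.segment.flush.threshold.time` is not modeled.

```python
from typing import Mapping, Optional, Tuple

ROWS = "realtime.segment.flush.threshold.rows"
SIZE = "realtime.segment.flush.threshold.size"  # byte count here (assumption)
INITIAL_ROWS = "realtime.segment.flush.autotune.initialRows"


def next_flush_rows(
    config: Mapping[str, int],
    prev_rows: Optional[int] = None,
    prev_size_bytes: Optional[int] = None,
) -> Tuple[str, int]:
    """Hypothetical sketch: choose a flush threshold updater and a row threshold."""
    rows = config[ROWS]
    if rows > 0:
        # Fixed row count: each segment flushes after `rows` rows.
        return "DefaultFlushThresholdUpdater", rows
    if prev_rows is None or not prev_size_bytes:
        # No completed segment to learn from yet: start from the initial row count.
        return "SegmentSizeBasedFlushThresholdUpdater", config[INITIAL_ROWS]
    # Simplified autotuning: scale the previous row count toward the target size.
    scaled = prev_rows * config[SIZE] // prev_size_bytes
    return "SegmentSizeBasedFlushThresholdUpdater", max(1, scaled)
```

For example, with a 200,000,000-byte target and a previous segment of 100,000 rows that came out at 50,000,000 bytes, this sketch suggests 400,000 rows for the next segment.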
## Kafka Partition-Level Connector

### Kafka 3.x / 4.x

Pinot ships two Kafka connector modules: `pinot-kafka-3.0` (Kafka client 3.9.2, default) and `pinot-kafka-4.0` (Kafka client 4.1.1, for KRaft-mode clusters). The legacy `kafka-0.9` and `kafka-2.x` modules have been removed.

| Config                                   | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
| ---------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| stream.kafka.consumer.factory.class.name | **Allowed Values:** - `org.apache.pinot.plugin.stream.kafka30.KafkaConsumerFactory` (Kafka 3.x, default) - `org.apache.pinot.plugin.stream.kafka40.KafkaConsumerFactory` (Kafka 4.x)                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
| stream.kafka.topic.name                  | (Required) Name of the kafka topic to be ingested                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
| stream.kafka.broker.list                 | (Required) Connection string for the kafka broker                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
| stream.kafka.partition.ids               | Optional comma-separated string of Kafka partition IDs and/or inclusive ranges to consume (for example, `"0,2,5"`, `"0-3"`, or `"0-3,6,8-9"`). When set, only the resolved partitions are consumed by this table. When absent or blank, all topic partitions are consumed (the default behavior). Partition IDs must be non-negative integers, ranges are inclusive, duplicates are silently removed, and the resolved set is capped at 10,000 unique partition IDs. Pinot validates the resolved IDs against the topic metadata before it starts consuming. See [Subset Partition Ingestion](/build-with-pinot/ingestion/stream-ingestion/import-from-apache-kafka.md#subset-partition-ingestion) for details and examples. |
| stream.kafka.buffer.size                 | **Default Value:** `512000`                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
| stream.kafka.socket.timeout              | **Default Value:** `10000`                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
| stream.kafka.fetcher.size                | **Default Value:** `100000`                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
| stream.kafka.isolation.level             | **Allowed Value:** `read_committed` **,** `read_uncommitted` **Default:** `read_uncommitted` **Note:** This must be set to `read_committed` when using transactions in Kafka.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
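The `stream.kafka.partition.ids` rules in the table (single IDs and inclusive ranges, blank meaning all partitions, de-duplication, a 10,000-ID cap, and validation against topic metadata) can be modeled with a small parser. This Python sketch is illustrative only: `parse_partition_ids` and `check_against_topic` are hypothetical names, tolerating whitespace around tokens is an assumption, and the topic's partition count is passed in rather than fetched from Kafka.

```python
from typing import Optional, Set

MAX_PARTITION_IDS = 10_000  # cap on unique IDs, per the table above


def _to_id(text: str) -> int:
    text = text.strip()  # assumption: whitespace around tokens is tolerated
    if not text.isdecimal():
        raise ValueError(f"partition ID must be a non-negative integer, got {text!r}")
    return int(text)


def parse_partition_ids(spec: Optional[str]) -> Optional[Set[int]]:
    """Hypothetical parser for stream.kafka.partition.ids.

    Returns None when the value is absent or blank (consume every partition),
    otherwise the de-duplicated set of partition IDs.
    """
    if spec is None or not spec.strip():
        return None
    ids: Set[int] = set()
    for token in spec.split(","):
        start_text, dash, end_text = token.partition("-")
        start = _to_id(start_text)
        end = _to_id(end_text) if dash else start  # ranges are inclusive
        if start > end:
            raise ValueError(f"invalid range {token.strip()!r}: start is greater than end")
        for pid in range(start, end + 1):
            ids.add(pid)  # a set silently drops duplicates
            if len(ids) > MAX_PARTITION_IDS:
                raise ValueError(f"more than {MAX_PARTITION_IDS} unique partition IDs")
    return ids


def check_against_topic(ids: Set[int], partition_count: int) -> None:
    """Reject IDs the topic does not have (partitions are 0..partition_count-1)."""
    unknown = sorted(pid for pid in ids if pid >= partition_count)
    if unknown:
        raise ValueError(f"partitions {unknown} not in topic with {partition_count} partitions")
```

Returning `None` rather than an empty set keeps "consume everything" distinct from an explicit selection, and checking the cap while adding IDs stops an oversized range such as `0-999999999` early.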

### Supported Decoder Classes

| Decoder Class                                                                                       | Description                                                                                                                                                         |
| --------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder`                                       | Decodes plain JSON messages without a schema registry.                                                                                                              |
| `org.apache.pinot.plugin.inputformat.avro.SimpleAvroMessageDecoder`                                 | Decodes Avro messages using a schema provided via `stream.kafka.decoder.prop.schema`.                                                                               |
| `org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder` | Decodes Avro messages whose schemas are registered in Confluent Schema Registry. Requires `stream.kafka.decoder.prop.schema.registry.rest.url`.                     |
| `org.apache.pinot.plugin.inputformat.json.confluent.KafkaConfluentSchemaRegistryJsonMessageDecoder` | Decodes JSON messages whose schemas are registered in Confluent Schema Registry. Requires `stream.kafka.decoder.prop.schema.registry.rest.url`. Added in Pinot 1.4. |
| `org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder`                               | Decodes Protocol Buffer messages.                                                                                                                                   |
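Decoder-specific settings travel through the `stream.<stream_type>.decoder.prop` prefix described in the first table. The Python sketch below shows a Kafka `streamConfigs` map for the Confluent Schema Registry Avro decoder and a hypothetical helper that collects decoder properties by stripping that prefix. The topic name, broker address, and registry URL are placeholders, and prefix stripping is an assumption about how the properties reach the decoder.

```python
from typing import Dict, Mapping

STREAM_CONFIGS = {
    "streamType": "kafka",
    "stream.kafka.topic.name": "events",           # placeholder
    "stream.kafka.broker.list": "localhost:9092",  # placeholder
    "stream.kafka.consumer.factory.class.name":
        "org.apache.pinot.plugin.stream.kafka30.KafkaConsumerFactory",
    "stream.kafka.decoder.class.name":
        "org.apache.pinot.plugin.inputformat.avro.confluent."
        "KafkaConfluentSchemaRegistryAvroMessageDecoder",
    # placeholder URL for the Confluent Schema Registry
    "stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081",
}


def decoder_props(configs: Mapping[str, str], stream_type: str = "kafka") -> Dict[str, str]:
    """Hypothetical helper: collect decoder properties with the prefix removed."""
    prefix = f"stream.{stream_type}.decoder.prop."
    return {k[len(prefix):]: v for k, v in configs.items() if k.startswith(prefix)}


def check_decoder(configs: Mapping[str, str], stream_type: str = "kafka") -> None:
    """Fail fast if a Confluent decoder is configured without a registry URL."""
    decoder = configs.get(f"stream.{stream_type}.decoder.class.name", "")
    # Both Confluent decoders in the table require schema.registry.rest.url.
    if ".confluent." in decoder and "schema.registry.rest.url" not in decoder_props(configs, stream_type):
        raise ValueError(f"{decoder} requires stream.{stream_type}.decoder.prop.schema.registry.rest.url")
```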

## Kinesis Partition-Level Connector

| Config                                     | Description                                                                                                                                                                                                                                                                                                                 |
| ------------------------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| stream.kinesis.consumer.factory.class.name | **Allowed Value:** org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory                                                                                                                                                                                                                                            |
| stream.kinesis.topic.name                  | (Required) Name of the Kinesis data stream to consume                                                                                                                                                                                                                                                                       |
| region                                     | (Required) The AWS region where the configured Kinesis data stream resides                                                                                                                                                                                                                                                  |
| maxRecordsToFetch                          | Maximum records to fetch during a single `GetRecords` request **Default:** `10000`                                                                                                                                                                                                                                          |
| requests\_per\_second\_limit               | Maximum Kinesis read requests per second Pinot will attempt per shard. Accepts fractional values such as `0.25` so several consumers can share one shard budget. Pinot applies this limit to both `GetRecords` and `GetShardIterator` reads and backs off/retries on throttling until the fetch timeout. **Default:** `1.0` |
| shardIteratorType                          | AWS shard iterator type Pinot uses when it opens a shard iterator. Pinot passes this value through to the Kinesis client. **Default:** `LATEST`                                                                                                                                                                             |
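Because `requests_per_second_limit` is a per-shard rate, a limit of `0.25` means at most one read request every 4 seconds. The Python sketch below paces requests `1 / limit` seconds apart and retries with exponential backoff on throttling until a timeout, loosely mirroring the behavior described in the table. `RequestPacer`, `call_with_backoff`, and `ThrottledError` are hypothetical; the backoff schedule and returning `None` on timeout are assumptions, not Pinot's exact behavior.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class ThrottledError(Exception):
    """Hypothetical stand-in for a Kinesis throughput-exceeded error."""


class RequestPacer:
    """Keeps read requests for one shard at least 1 / limit seconds apart."""

    def __init__(self, requests_per_second: float,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep) -> None:
        if requests_per_second <= 0:
            raise ValueError("requests_per_second must be positive")
        self.interval = 1.0 / requests_per_second  # e.g. 0.25 -> 4.0 seconds
        self.clock = clock
        self.sleep = sleep
        self._next_allowed = clock()

    def acquire(self) -> None:
        """Block until the next request is allowed, then reserve the following slot."""
        now = self.clock()
        if now < self._next_allowed:
            self.sleep(self._next_allowed - now)
            now = self._next_allowed
        self._next_allowed = now + self.interval


def call_with_backoff(pacer: RequestPacer, call: Callable[[], T],
                      timeout_s: float, base_backoff_s: float = 0.1) -> Optional[T]:
    """Pace each attempt and double the backoff after every throttled attempt.

    Returns None if the next backoff would pass the deadline (assumed to be
    handled by the caller like an empty fetch).
    """
    deadline = pacer.clock() + timeout_s
    backoff = base_backoff_s
    while True:
        pacer.acquire()
        try:
            return call()
        except ThrottledError:
            if pacer.clock() + backoff >= deadline:
                return None
            pacer.sleep(backoff)
            backoff *= 2
```

Injecting `clock` and `sleep` lets the pacing be tested with a fake clock instead of real waits.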

### Key-based Authentication Properties

| Config    | Description                                                          |
| --------- | -------------------------------------------------------------------- |
| accessKey | (Required) AWS Access key used to access the AWS Kinesis Data stream |
| secretKey | (Required) AWS Secret key used to access the AWS Kinesis Data stream |

### IAM Role-based Authentication Properties

| Config                    | Description                                                                                                                                                                                                         |
| ------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| iamRoleBasedAccessEnabled | Set to `true` when using IAM role-based authentication for connecting to the AWS Kinesis Data Stream **Default:** `false`                                                                                           |
| roleArn                   | **(Required)** ARN of the cross-account IAM role                                                                                                                                                                    |
| roleSessionName           | Unique identifier for a session when the client assumes the IAM role **Default:** `pinot-kinesis-<UUID>`                                                                                                            |
| externalId                | Unique identifier used to manage trust between AWS accounts and prevent the confused deputy problem. More details [here](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html) |
| sessionDurationSeconds    | Duration of the role session in seconds **Default:** `900`                                                                                                                                                          |
| asyncSessionUpdateEnabled | Flag that determines whether the session update is performed asynchronously **Default:** `true`                                                                                                                     |
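The two authentication modes amount to a validation step over the stream configs. The Python sketch below applies the required keys and defaults listed in the two authentication tables; `resolve_kinesis_auth` is a hypothetical helper, config values are strings as in a `streamConfigs` map, the role ARN in the usage is a placeholder, and nothing here calls AWS.

```python
import uuid
from typing import Dict, Mapping


def resolve_kinesis_auth(configs: Mapping[str, str]) -> Dict[str, object]:
    """Hypothetical helper: apply the documented auth defaults and required keys."""
    if configs.get("iamRoleBasedAccessEnabled", "false").lower() == "true":
        if not configs.get("roleArn"):
            raise ValueError("roleArn is required when iamRoleBasedAccessEnabled is true")
        return {
            "mode": "iam-role",
            "roleArn": configs["roleArn"],
            # Documented default: pinot-kinesis-<UUID>
            "roleSessionName": configs.get("roleSessionName", f"pinot-kinesis-{uuid.uuid4()}"),
            "externalId": configs.get("externalId"),  # optional
            "sessionDurationSeconds": int(configs.get("sessionDurationSeconds", "900")),
            "asyncSessionUpdateEnabled":
                configs.get("asyncSessionUpdateEnabled", "true").lower() == "true",
        }
    missing = [k for k in ("accessKey", "secretKey") if not configs.get(k)]
    if missing:
        raise ValueError(f"key-based authentication requires {', '.join(missing)}")
    # The secret key is deliberately not echoed back.
    return {"mode": "access-key", "accessKey": configs["accessKey"]}
```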


