# Kafka Connector Versions

Apache Pinot provides multiple Kafka connector versions to match different Kafka broker deployments. Choose the connector that matches your Kafka cluster version.

## Available Connectors

| Connector Plugin  | Kafka Client Version | Notes                                                                                                           |
| ----------------- | -------------------- | --------------------------------------------------------------------------------------------------------------- |
| `pinot-kafka-3.0` | 3.9.x                | Recommended for Kafka 3.x clusters. Built with Scala 2.13 (default); use `-Pscala-2.12` profile for Scala 2.12. |
| `pinot-kafka-4.0` | 4.1.x                | Recommended for Kafka 4.x clusters (KRaft mode). Pure Java — no Scala dependency.                               |

{% hint style="warning" %}
The `pinot-kafka-2.0` (kafka20) plugin has been removed. If your table config references `org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory`, you must migrate to either `kafka30` or `kafka40`.
{% endhint %}

## Kafka 4.0 Connector

The Kafka 4.0 connector (`pinot-kafka-4.0`) supports Apache Kafka 4.x brokers running in **KRaft mode** (ZooKeeper-free). It uses pure Java Kafka clients with no Scala dependency, resulting in a smaller deployment footprint.

### When to use Kafka 4.0

* Your Kafka cluster runs Kafka 4.0+ in KRaft mode
* You want to eliminate the Scala transitive dependency
* You are deploying new Pinot clusters against modern Kafka infrastructure

### Configuration

The Kafka 4.0 connector uses the same configuration properties as the Kafka 3.0 connector. The only difference is the value of `stream.kafka.consumer.factory.class.name`:

```json
{
  "streamConfigs": {
    "streamType": "kafka",
    "stream.kafka.topic.name": "your-topic",
    "stream.kafka.broker.list": "kafka:9092",
    "stream.kafka.consumer.type": "lowlevel",
    "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka40.KafkaConsumerFactory",
    "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
    "realtime.segment.flush.threshold.rows": "0",
    "realtime.segment.flush.threshold.time": "24h",
    "realtime.segment.flush.threshold.segment.size": "100M"
  }
}
```
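Because the two connectors differ only in the factory class name, a config generator can treat the connector version as a single parameter. The following is a minimal sketch under that assumption: the helper `build_stream_configs` and the `FACTORY_CLASSES` mapping are hypothetical names introduced for illustration, not part of Pinot. The property keys and class names are the ones shown on this page.

```python
import json

# Factory class names as listed on this page; the mapping itself is illustrative.
FACTORY_CLASSES = {
    "3.0": "org.apache.pinot.plugin.stream.kafka30.KafkaConsumerFactory",
    "4.0": "org.apache.pinot.plugin.stream.kafka40.KafkaConsumerFactory",
}


def build_stream_configs(connector: str, topic: str, brokers: str) -> dict:
    """Return a streamConfigs dict for connector version "3.0" or "4.0"."""
    if connector not in FACTORY_CLASSES:
        raise ValueError(f"unsupported connector version: {connector!r}")
    return {
        "streamType": "kafka",
        "stream.kafka.topic.name": topic,
        "stream.kafka.broker.list": brokers,
        "stream.kafka.consumer.type": "lowlevel",
        # The only value that changes between connector versions.
        "stream.kafka.consumer.factory.class.name": FACTORY_CLASSES[connector],
        "stream.kafka.decoder.class.name":
            "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
    }


if __name__ == "__main__":
    v3 = build_stream_configs("3.0", "your-topic", "kafka:9092")
    v4 = build_stream_configs("4.0", "your-topic", "kafka:9092")
    # List the keys whose values differ between the two generated configs.
    print([key for key in v3 if v3[key] != v4[key]])
    print(json.dumps({"streamConfigs": v4}, indent=2))
```

Comparing the two generated dictionaries is a quick way to confirm that switching connectors touches a single property.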

### Migration from Kafka 2.0 or 3.0

To migrate from an older Kafka connector to Kafka 3.0 or 4.0:

1. Update the consumer factory class name in your table configuration:

   | From                                                          | To                                                                                                                                                     |
   | ------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------ |
   | `org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory` | `org.apache.pinot.plugin.stream.kafka30.KafkaConsumerFactory` (Kafka 3.x) or `org.apache.pinot.plugin.stream.kafka40.KafkaConsumerFactory` (Kafka 4.x) |
   | `org.apache.pinot.plugin.stream.kafka30.KafkaConsumerFactory` | `org.apache.pinot.plugin.stream.kafka40.KafkaConsumerFactory`                                                                                          |

2. Ensure the plugin JAR for the target connector (`pinot-kafka-3.0` or `pinot-kafka-4.0`) is available in your Pinot plugin directory.
3. Leave all other `stream.kafka.*` configuration properties unchanged.
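The class-name change can be scripted when many table configs need updating. The sketch below is illustrative only: `migrate_stream_configs` is a hypothetical helper, it operates on an already-extracted `streamConfigs` dictionary (locating that dictionary inside a full table config and submitting the updated config to Pinot are left out), and it encodes only the migration paths listed in the table above.

```python
FACTORY_KEY = "stream.kafka.consumer.factory.class.name"

# Target class names, as listed in the migration table.
TARGETS = {
    "kafka30": "org.apache.pinot.plugin.stream.kafka30.KafkaConsumerFactory",
    "kafka40": "org.apache.pinot.plugin.stream.kafka40.KafkaConsumerFactory",
}

# Allowed migrations: current factory class -> permitted targets.
ALLOWED = {
    "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory": {"kafka30", "kafka40"},
    "org.apache.pinot.plugin.stream.kafka30.KafkaConsumerFactory": {"kafka40"},
}


def migrate_stream_configs(stream_configs: dict, target: str) -> dict:
    """Return a copy of stream_configs with the factory class set to the target.

    Only the factory class name changes; every other property is copied as-is.
    """
    if target not in TARGETS:
        raise ValueError(f"unknown target connector: {target!r}")
    current = stream_configs.get(FACTORY_KEY)
    if current == TARGETS[target]:
        return dict(stream_configs)  # already on the target connector
    if target not in ALLOWED.get(current, set()):
        raise ValueError(f"no migration path from {current!r} to {target!r}")
    migrated = dict(stream_configs)
    migrated[FACTORY_KEY] = TARGETS[target]
    return migrated


if __name__ == "__main__":
    old = {
        "streamType": "kafka",
        "stream.kafka.topic.name": "your-topic",
        FACTORY_KEY: "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
    }
    print(migrate_stream_configs(old, "kafka40")[FACTORY_KEY])
```

Returning a copy rather than mutating the input keeps the original config available for comparison or rollback.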

{% hint style="info" %}
The Kafka 4.0 connector is fully compatible with all existing Kafka consumer configuration properties including SSL/TLS, SASL authentication, isolation levels, and Schema Registry integration. See the [main Kafka ingestion guide](/build-with-pinot/ingestion/stream-ingestion/import-from-apache-kafka.md) for detailed configuration examples.
{% endhint %}

## Kafka 3.0 Connector

The Kafka 3.0 connector (`pinot-kafka-3.0`) supports Apache Kafka 3.x brokers. This is the most widely deployed connector version.

### Scala Version

The Kafka 3.0 connector is built with **Scala 2.13** by default. If you need Scala 2.12 compatibility, build Pinot with the `-Pscala-2.12` Maven profile:

```bash
mvn clean install -Pscala-2.12 -DskipTests
```

### Configuration

```json
{
  "streamConfigs": {
    "streamType": "kafka",
    "stream.kafka.topic.name": "your-topic",
    "stream.kafka.broker.list": "kafka:9092",
    "stream.kafka.consumer.type": "lowlevel",
    "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka30.KafkaConsumerFactory",
    "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
  }
}
```

## Common Configuration Properties

All Kafka connector versions share the same configuration properties. See [Ingest streaming data from Apache Kafka](/build-with-pinot/ingestion/stream-ingestion/import-from-apache-kafka.md) for the complete configuration reference, including:

* SSL/TLS setup
* SASL authentication
* Schema Registry integration (Avro, JSON Schema, Protobuf)
* Consumer tuning properties
* Isolation levels (`read_committed` / `read_uncommitted`)

## Passing Native Kafka Consumer Properties

You can pass any native Kafka consumer configuration property using the `stream.kafka.consumer.prop.` prefix:

```json
{
  "streamConfigs": {
    "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
    "stream.kafka.consumer.prop.max.poll.records": "500",
    "stream.kafka.consumer.prop.fetch.min.bytes": "100000",
    "stream.kafka.consumer.prop.session.timeout.ms": "30000"
  }
}
```
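To make the prefix convention concrete, the sketch below shows which native Kafka property names correspond to the prefixed keys in a `streamConfigs` map. It only illustrates the naming convention: `native_consumer_props` is a hypothetical helper, and this page does not describe how Pinot itself processes these keys internally.

```python
PREFIX = "stream.kafka.consumer.prop."


def native_consumer_props(stream_configs: dict) -> dict:
    """Collect the prefixed entries and strip the prefix from their keys."""
    return {
        key[len(PREFIX):]: value
        for key, value in stream_configs.items()
        if key.startswith(PREFIX)
    }


if __name__ == "__main__":
    stream_configs = {
        "stream.kafka.topic.name": "your-topic",  # not a native consumer property
        "stream.kafka.consumer.prop.max.poll.records": "500",
        "stream.kafka.consumer.prop.session.timeout.ms": "30000",
    }
    print(native_consumer_props(stream_configs))
    # {'max.poll.records': '500', 'session.timeout.ms': '30000'}
```

Keys without the prefix, such as `stream.kafka.topic.name`, are left out of the result.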


