This guide shows you how to ingest a stream of records from an Apache Kafka topic into a Pinot table.
docker pull wurstmeister/kafka:latest
tar -xzf kafka_2.13-3.7.0.tgz
cd kafka_2.13-3.7.0
docker run --network pinot-demo --name=kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181/kafka -e KAFKA_BROKER_ID=0 -e KAFKA_ADVERTISED_HOST_NAME=kafka wurstmeister/kafka:latest
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
import datetime
import uuid
import random
import json
while True:
    ts = int(datetime.datetime.now().timestamp()* 1000)
    id = str(uuid.uuid4())
    count = random.randint(0, 1000)
    print(
        json.dumps({"ts": ts, "uuid": id, "count": count})
    )
{"ts": 1644586485807, "uuid": "93633f7c01d54453a144", "count": 807}
{"ts": 1644586485836, "uuid": "87ebf97feead4e848a2e", "count": 41}
{"ts": 1644586485866, "uuid": "960d4ffa201a4425bb18", "count": 146}
python datagen.py | docker exec -i kafka /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic events;
python datagen.py | bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic events;
docker exec -i kafka kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic events
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic events
events:0:11940
docker exec -i kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic events
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic events
...
{"ts": 1644586485807, "uuid": "93633f7c01d54453a144", "count": 807}
{"ts": 1644586485836, "uuid": "87ebf97feead4e848a2e", "count": 41}
{"ts": 1644586485866, "uuid": "960d4ffa201a4425bb18", "count": 146}
...
{
"schemaName": "events",
"dimensionFieldSpecs": [
{
"name": "uuid",
"dataType": "STRING"
}
],
"metricFieldSpecs": [
{
"name": "count",
"dataType": "INT"
}
],
"dateTimeFieldSpecs": [{
"name": "ts",
"dataType": "TIMESTAMP",
"format" : "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}]
}
{
"tableName": "events",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "ts",
"schemaName": "events",
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.topic.name": "events",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.broker.list": "kafka:9092",
"realtime.segment.flush.threshold.rows": "0",
"realtime.segment.flush.threshold.time": "24h",
"realtime.segment.flush.threshold.segment.size": "50M",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest"
}
},
"metadata": {
"customConfigs": {}
}
}
docker run --rm -ti --network=pinot-demo -v /tmp/pinot:/tmp/pinot apachepinot/pinot:1.0.0 AddTable -schemaFile /tmp/pinot/schema-stream.json -tableConfigFile /tmp/pinot/table-config-stream.json -controllerHost pinot-controller -controllerPort 9000 -exec
bin/pinot-admin.sh AddTable -schemaFile /tmp/pinot/schema-stream.json -tableConfigFile /tmp/pinot/table-config-stream.json
{
"tableName": "transcript",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "timestamp",
"timeType": "MILLISECONDS",
"schemaName": "transcript",
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "LowLevel",
"stream.kafka.topic.name": "transcript-topic",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.zk.broker.url": "pinot-zookeeper:2191/kafka",
"stream.kafka.broker.list": "localhost:9092",
"schema.registry.url": "",
"security.protocol": "SSL",
"ssl.truststore.location": "",
"ssl.keystore.location": "",
"ssl.truststore.password": "",
"ssl.keystore.password": "",
"ssl.key.password": "",
"stream.kafka.decoder.prop.schema.registry.rest.url": "",
"stream.kafka.decoder.prop.schema.registry.ssl.truststore.location": "",
"stream.kafka.decoder.prop.schema.registry.ssl.keystore.location": "",
"stream.kafka.decoder.prop.schema.registry.ssl.truststore.password": "",
"stream.kafka.decoder.prop.schema.registry.ssl.keystore.password": "",
"stream.kafka.decoder.prop.schema.registry.ssl.keystore.type": "",
"stream.kafka.decoder.prop.schema.registry.ssl.truststore.type": "",
"stream.kafka.decoder.prop.schema.registry.ssl.key.password": "",
"stream.kafka.decoder.prop.schema.registry.ssl.protocol": ""
}
},
"metadata": {
"customConfigs": {}
}
}
{
"tableName": "transcript",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "timestamp",
"timeType": "MILLISECONDS",
"schemaName": "transcript",
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "LowLevel",
"stream.kafka.topic.name": "transcript-topic",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.zk.broker.url": "pinot-zookeeper:2191/kafka",
"stream.kafka.broker.list": "kafka:9092",
"stream.kafka.isolation.level": "read_committed"
}
},
"metadata": {
"customConfigs": {}
}
}
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.topic.name": "mytopic",
"stream.kafka.consumer.prop.auto.offset.reset": "largest",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.broker.list": "kafka:9092",
"stream.kafka.schema.registry.url": "https://xxx",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
"stream.kafka.decoder.prop.schema.registry.rest.url": "https://xxx",
"stream.kafka.decoder.prop.basic.auth.credentials.source": "USER_INFO",
"stream.kafka.decoder.prop.schema.registry.basic.auth.user.info": "schema_registry_username:schema_registry_password",
"sasl.mechanism": "PLAIN" ,
"security.protocol": "SASL_SSL" ,
"sasl.jaas.config":"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"kafkausername\" password=\"kafkapassword\";",
"realtime.segment.flush.threshold.rows": "0",
"realtime.segment.flush.threshold.time": "24h",
"realtime.segment.flush.autotune.initialRows": "3000000",
"realtime.segment.flush.threshold.segment.size": "500M"
},
"dimensionFieldSpecs": [
{
"name": "__key",
"dataType": "STRING"
},
{
"name": "__metadata$offset",
"dataType": "STRING"
},
{
"name": "__metadata$partition",
"dataType": "STRING"
},
...
],
...
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.topic.name": "",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.SimpleAvroMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.broker.list": "",
"stream.kafka.consumer.prop.auto.offset.reset": "largest"
...
}