Ingestion

The ingestion configuration ('ingestionConfig') is a section of the table configuration that specifies how to ingest streaming data into Pinot.

ingestionConfig

streamConfigMaps

The number of rows per segment is computed using the following formula: realtime.segment.flush.threshold.rows / maxPartitionsConsumedByServer. For example, if you set realtime.segment.flush.threshold.rows = 1000 and each server consumes 10 partitions, the rows per segment is 1000 / 10 = 100.

Release 1.2.0 introduced realtime.segment.flush.threshold.segment.rows, which is used directly as the number of rows per segment, without dividing by the number of partitions.

Continuing the example above, if you set realtime.segment.flush.threshold.segment.rows = 1000 and each server consumes 10 partitions, the rows per segment is 1000.
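
The difference between the two settings can be sketched in a few lines of Python. This is only an illustration of the arithmetic described above, not Pinot code: the function names are hypothetical, and the use of integer division is an assumption, since the text does not say how a non-integer result is rounded.

```python
def rows_from_threshold_rows(threshold_rows: int,
                             max_partitions_consumed_by_server: int) -> int:
    """Rows per segment when realtime.segment.flush.threshold.rows is set.

    The threshold is shared across the partitions a server consumes.
    Integer division is an assumption made for this sketch.
    """
    if max_partitions_consumed_by_server <= 0:
        raise ValueError("a server must consume at least one partition")
    return threshold_rows // max_partitions_consumed_by_server


def rows_from_threshold_segment_rows(threshold_segment_rows: int,
                                     max_partitions_consumed_by_server: int) -> int:
    """Rows per segment when realtime.segment.flush.threshold.segment.rows is set.

    The value is used as-is; the partition count does not matter.
    """
    return threshold_segment_rows


# The two worked examples from the text: 1000 rows, 10 partitions per server.
print(rows_from_threshold_rows(1000, 10))          # 100
print(rows_from_threshold_segment_rows(1000, 10))  # 1000
```

With the older setting, the effective segment size changes whenever the number of partitions per server changes; with the newer one it stays fixed.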

Example table config with ingestionConfig

{
  "tableName": "transcript",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "timestamp",
    "timeType": "MILLISECONDS",
    "schemaName": "transcript",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP"
  },
  "metadata": {
    "customConfigs": {}
  },
  "ingestionConfig": {
    "streamIngestionConfig": {
      "streamConfigMaps": [
        {
          "stream.kafka.decoder.prop.format": "JSON",
          "key.serializer": "org.apache.kafka.common.serialization.ByteArraySerializer",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
          "streamType": "kafka",
          "value.serializer": "org.apache.kafka.common.serialization.ByteArraySerializer",
          "stream.kafka.consumer.type": "LOWLEVEL",
          "stream.kafka.broker.list": "localhost:9876",
          "realtime.segment.flush.threshold.segment.rows": "500000",
          "realtime.segment.flush.threshold.time": "3600000",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
          "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
          "stream.kafka.topic.name": "transcript-topic"
        }
      ]
    },
    "transformConfigs": [],
    "continueOnError": true,
    "rowTimeValueCheck": true,
    "segmentTimeValueCheck": false
  },
  "isDimTable": false
}
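
Before submitting a table config, it can help to check that the file parses as strict JSON and that the flush settings sit where you expect. The following is a minimal sketch using only the Python standard library. It embeds a reduced copy of the example above; the helper name flush_settings is hypothetical, and treating the numeric realtime.segment.flush.threshold.time value as milliseconds is an assumption.

```python
import json

# Reduced copy of the example table config, keeping only the keys read below.
TABLE_CONFIG_JSON = """
{
  "tableName": "transcript",
  "tableType": "REALTIME",
  "ingestionConfig": {
    "streamIngestionConfig": {
      "streamConfigMaps": [
        {
          "streamType": "kafka",
          "stream.kafka.topic.name": "transcript-topic",
          "realtime.segment.flush.threshold.segment.rows": "500000",
          "realtime.segment.flush.threshold.time": "3600000"
        }
      ]
    }
  }
}
"""


def flush_settings(table_config: dict) -> dict:
    """Extract the flush thresholds from the first entry of streamConfigMaps."""
    stream_maps = table_config["ingestionConfig"]["streamIngestionConfig"]["streamConfigMaps"]
    stream = stream_maps[0]
    return {
        "topic": stream["stream.kafka.topic.name"],
        # Stream config values are strings in the table config, so convert them.
        "segment_rows": int(stream["realtime.segment.flush.threshold.segment.rows"]),
        # Assumption: a bare number is a duration in milliseconds.
        "flush_time_ms": int(stream["realtime.segment.flush.threshold.time"]),
    }


# json.loads raises json.JSONDecodeError on trailing commas or unbalanced braces.
settings = flush_settings(json.loads(TABLE_CONFIG_JSON))
print(settings)
```

Because json.loads is strict, a stray trailing comma or an extra closing brace fails here rather than when the config is uploaded.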