Batch ingestion of data into Apache Pinot using dimension tables.
isDimTable: Set to true.

SET taskName = 'myTask-s3';
SET input.fs.className = 'org.apache.pinot.plugin.filesystem.S3PinotFS';
SET input.fs.prop.accessKey = 'my-key';
SET input.fs.prop.secretKey = 'my-secret';
SET input.fs.prop.region = 'us-west-2';
INSERT INTO "baseballStats"
FROM FILE 's3://my-bucket/public_data_set/baseballStats/rawdata/'

// some environmental setup
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Row> srcRows = execEnv.addSource(new FlinkKafkaConsumer<Row>(...));
RowTypeInfo typeInfo = new RowTypeInfo(
new TypeInformation[]{Types.FLOAT, Types.FLOAT, Types.STRING, Types.STRING},
new String[]{"lon", "lat", "address", "name"});
// add processing logic for the data stream for example:
DataStream<Row> processedRows = srcRows.keyBy(r -> r.getField(0));
...
// configurations for PinotSinkFunction
Schema pinotSchema = ...
TableConfig pinotTableConfig = ...
processedRows.addSink(new PinotSinkFunction<>(
new FlinkRowGenericRowConverter(typeInfo),
pinotTableConfig,
pinotSchema));
// execute the program
execEnv.execute();

{
"tableName" : "tbl_OFFLINE",
"tableType" : "OFFLINE",
"segmentsConfig" : {
// ...
},
"tenants" : {
// ...
},
"tableIndexConfig" : {
// ....
},
"ingestionConfig": {
"batchIngestionConfig": {
"segmentIngestionType": "APPEND",
"segmentIngestionFrequency": "HOURLY",
"batchConfigMaps": [
{
"outputDirURI": "file://path/to/flink/segmentwriter/output/dir",
"overwriteOutput": "false",
"push.controllerUri": "https://target.pinot.cluster.controller.url"
}
]
}
}
}
{
"OFFLINE": {
"tableName": "dimBaseballTeams_OFFLINE",
"tableType": "OFFLINE",
"segmentsConfig": {
"schemaName": "dimBaseballTeams",
"segmentPushType": "REFRESH"
},
"metadata": {},
"quota": {
"storage": "200M"
},
"isDimTable": true
}.
"dimensionTableConfig": {
"disablePreload": true
}
}
}
...
{
"dimensionFieldSpecs": [
{
"dataType": "STRING",
"name": "teamID"
},
{
"dataType": "STRING",
"name": "teamName"
}
],
"schemaName": "dimBaseballTeams",
"primaryKeyColumns": ["teamID"]
}

This guide shows you how to import data from GCP (Google Cloud Platform).
This guide shows you how to import data from files stored in Azure Data Lake Storage Gen2 (ADLS Gen2)
POST /segments/{tableName}/{segmentName}/reload

{
"status": "200"
}

-Dplugins.dir=/opt/pinot/plugins -Dplugins.include=pinot-plugin-to-include-1,pinot-plugin-to-include-2

#CONTROLLER
pinot.controller.storage.factory.class.[scheme]=className of the pinot file system
pinot.controller.segment.fetcher.protocols=file,http,[scheme]
pinot.controller.segment.fetcher.[scheme].class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher

#SERVER
pinot.server.storage.factory.class.[scheme]=className of the Pinot file system
pinot.server.segment.fetcher.protocols=file,http,[scheme]
pinot.server.segment.fetcher.[scheme].class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher

pinotFSSpecs:
- scheme: file
className: org.apache.pinot.spi.filesystem.LocalPinotFS

-Dplugins.dir=/opt/pinot/plugins -Dplugins.include=pinot-gcs

pinot.controller.storage.factory.class.gs.projectId=test-project

executionFrameworkSpec:
name: 'standalone'
segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: 'gs://my-bucket/path/to/input/directory/'
outputDirURI: 'gs://my-bucket/path/to/output/directory/'
overwriteOutput: true
pinotFSSpecs:
- scheme: gs
className: org.apache.pinot.plugin.filesystem.GcsPinotFS
configs:
projectId: 'my-project'
gcpKey: 'path-to-gcp json key file'
recordReaderSpec:
dataFormat: 'csv'
className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
tableName: 'students'
pinotClusterSpecs:
- controllerURI: 'http://localhost:9000'

controller.data.dir=gs://path/to/data/directory/
controller.local.temp.dir=/path/to/local/temp/directory
controller.enable.split.commit=true
pinot.controller.storage.factory.class.gs=org.apache.pinot.plugin.filesystem.GcsPinotFS
pinot.controller.storage.factory.gs.projectId=my-project
pinot.controller.storage.factory.gs.gcpKey=path/to/gcp/key.json
pinot.controller.segment.fetcher.protocols=file,http,gs
pinot.controller.segment.fetcher.gs.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher

pinot.server.instance.enable.split.commit=true
pinot.server.storage.factory.class.gs=org.apache.pinot.plugin.filesystem.GcsPinotFS
pinot.server.storage.factory.gs.projectId=my-project
pinot.server.storage.factory.gs.gcpKey=path/to/gcp/key.json
pinot.server.segment.fetcher.protocols=file,http,gs
pinot.server.segment.fetcher.gs.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher

pinot.minion.storage.factory.class.gs=org.apache.pinot.plugin.filesystem.GcsPinotFS
pinot.minion.storage.factory.gs.projectId=my-project
pinot.minion.storage.factory.gs.gcpKey=path/to/gcp/key.json
pinot.minion.segment.fetcher.protocols=file,http,gs
pinot.minion.segment.fetcher.gs.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher

-Dplugins.dir=/opt/pinot/plugins -Dplugins.include=pinot-adls

pinot.controller.storage.factory.class.adl2.accountName=test-user

executionFrameworkSpec:
name: 'standalone'
segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: 'adl2://path/to/input/directory/'
outputDirURI: 'adl2://path/to/output/directory/'
overwriteOutput: true
pinotFSSpecs:
- scheme: adl2
className: org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS
configs:
accountName: 'my-account'
accessKey: 'foo-bar-1234'
fileSystemName: 'fs-name'
recordReaderSpec:
dataFormat: 'csv'
className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
tableName: 'students'
pinotClusterSpecs:
- controllerURI: 'http://localhost:9000'

controller.data.dir=adl2://path/to/data/directory/
controller.local.temp.dir=/path/to/local/temp/directory
controller.enable.split.commit=true
pinot.controller.storage.factory.class.adl2=org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS
pinot.controller.storage.factory.adl2.accountName=my-account
pinot.controller.storage.factory.adl2.accessKey=foo-bar-1234
pinot.controller.storage.factory.adl2.fileSystemName=fs-name
pinot.controller.segment.fetcher.protocols=file,http,adl2
pinot.controller.segment.fetcher.adl2.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher

pinot.server.instance.enable.split.commit=true
pinot.server.storage.factory.class.adl2=org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS
pinot.server.storage.factory.adl2.accountName=my-account
pinot.server.storage.factory.adl2.accessKey=foo-bar-1234
pinot.server.storage.factory.adl2.fileSystemName=fs-name
pinot.server.segment.fetcher.protocols=file,http,adl2
pinot.server.segment.fetcher.adl2.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher

storage.factory.class.adl2=org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS
storage.factory.adl2.accountName=my-account
storage.factory.adl2.fileSystemName=fs-name
storage.factory.adl2.accessKey=foo-bar-1234
segment.fetcher.protocols=file,http,adl2
segment.fetcher.adl2.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher

This guide shows you how to ingest a stream of records from an Amazon Kinesis topic into a Pinot table.

-Dplugins.dir=/opt/pinot/plugins -Dplugins.include=pinot-hdfs

{
"timestamp": 1687786535928,
"hostname": "host1",
"HOSTNAME": "host1",
"level": "INFO",
"message": "Started processing job1",
"tags": {
"platform": "data",
"service": "serializer",
"params": {
"queueLength": 5,
"timeout": 299,
"userData_noIndex": {
"nth": 99
}
}
}
}

Fields with the _noIndex suffix (for example, userData_noIndex above) are moved into the unindexableExtras field.

# executionFrameworkSpec: Defines ingestion jobs to be running.
executionFrameworkSpec:
# name: execution framework name
name: 'spark'
# segmentGenerationJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner'
# segmentTarPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner'
# segmentUriPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner'
#segmentMetadataPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface
segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentMetadataPushJobRunner'
# extraConfigs: extra configs for execution framework.
extraConfigs:
# stagingDir is used in distributed filesystem to host all the segments then move this directory entirely to output directory.
stagingDir: your/local/dir/staging

spark.driver.extraClassPath =>
pinot-batch-ingestion-spark-${PINOT_VERSION}-shaded.jar:pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar

spark.driver.extraJavaOptions =>
-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins

export PINOT_VERSION=0.10.0
export PINOT_DISTRIBUTION_DIR=/path/to/apache-pinot-${PINOT_VERSION}-bin
spark-submit \\
--class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \\
--master local --deploy-mode client \\
--conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins" \\
--conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \\
--conf "spark.executor.extraClassPath=${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \\
local://${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar -jobSpecFile /path/to/spark_job_spec.yaml

spark-submit \\
--class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \\
--master yarn --deploy-mode cluster \\
--conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins" \\
--conf "spark.driver.extraClassPath=pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar:pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \\
--conf "spark.executor.extraClassPath=pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar:pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \\
--jars "${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar,${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \\
--files s3://path/to/spark_job_spec.yaml \\
local://pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar -jobSpecFile spark_job_spec.yaml

# executionFrameworkSpec: Defines ingestion jobs to be running.
executionFrameworkSpec:
# name: execution framework name
name: 'hadoop'
# segmentGenerationJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunner'
# segmentTarPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentTarPushJobRunner'
# segmentUriPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentUriPushJobRunner'
# segmentMetadataPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentMetadataPushJobRunner'
# extraConfigs: extra configs for execution framework.
extraConfigs:
# stagingDir is used in distributed filesystem to host all the segments then move this directory entirely to output directory.
stagingDir: your/local/dir/staging

export PINOT_VERSION=0.10.0
export PINOT_DISTRIBUTION_DIR=${PINOT_ROOT_DIR}/build/
export HADOOP_CLIENT_OPTS="-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml"
hadoop jar \\
${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \\
org.apache.pinot.tools.admin.PinotAdministrator \\
LaunchDataIngestionJob \\
-jobSpecFile ${PINOT_DISTRIBUTION_DIR}/examples/batch/airlineStats/hadoopIngestionJobSpec.yaml

enable.preprocessing = true
preprocess.path.to.output = <output_path>

{
"OFFLINE": {
"metadata": {
"customConfigs": {
"preprocessing.operations": "resize, partition, sort", // To enable the following preprocessing operations
"preprocessing.max.num.records.per.file": "100", // To enable resizing
"preprocessing.num.reducers": "3" // To enable resizing
}
},
...
"tableIndexConfig": {
"aggregateMetrics": false,
"autoGeneratedInvertedIndex": false,
"bloomFilterColumns": [],
"createInvertedIndexDuringSegmentGeneration": false,
"invertedIndexColumns": [],
"loadMode": "MMAP",
"nullHandlingEnabled": false,
"segmentPartitionConfig": { // To enable partitioning
"columnPartitionMap": {
"item": {
"functionName": "murmur",
"numPartitions": 4
}
}
},
"sortedColumn": [ // To enable sorting
"actorId"
],
"streamConfigs": {}
},
"tableName": "tableName_OFFLINE",
"tableType": "OFFLINE",
"tenants": {
...
}
}
}

export HADOOP_HOME=/local/hadoop/
export HADOOP_VERSION=2.7.1
export HADOOP_GUAVA_VERSION=11.0.2
export HADOOP_GSON_VERSION=2.2.4
export CLASSPATH_PREFIX="${HADOOP_HOME}/share/hadoop/hdfs/hadoop-hdfs-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/common/lib/hadoop-annotations-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/common/lib/hadoop-auth-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/common/hadoop-common-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/common/lib/guava-${HADOOP_GUAVA_VERSION}.jar:${HADOOP_HOME}/share/hadoop/common/lib/gson-${HADOOP_GSON_VERSION}.jar"

curl -X POST -H "UPLOAD_TYPE:URI" -H "DOWNLOAD_URI:hdfs://nameservice1/hadoop/path/to/segment/file.

executionFrameworkSpec:
name: 'standalone'
segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: 'hdfs:///path/to/input/directory/'
outputDirURI: 'hdfs:///path/to/output/directory/'
includeFileNamePath: 'glob:**/*.csv'
overwriteOutput: true
pinotFSSpecs:
- scheme: hdfs
className: org.apache.pinot.plugin.filesystem.HadoopPinotFS
configs:
hadoop.conf.path: 'path/to/conf/directory/'
recordReaderSpec:
dataFormat: 'csv'
className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
tableName: 'students'
pinotClusterSpecs:
- controllerURI: 'http://localhost:9000'

executionFrameworkSpec:
name: 'hadoop'
segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunner'
segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentTarPushJobRunner'
segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentUriPushJobRunner'
extraConfigs:
stagingDir: 'hdfs:///path/to/staging/directory/'
jobType: SegmentCreationAndTarPush
inputDirURI: 'hdfs:///path/to/input/directory/'
outputDirURI: 'hdfs:///path/to/output/directory/'
includeFileNamePath: 'glob:**/*.csv'
overwriteOutput: true
pinotFSSpecs:
- scheme: hdfs
className: org.apache.pinot.plugin.filesystem.HadoopPinotFS
configs:
hadoop.conf.path: '/etc/hadoop/conf/'
recordReaderSpec:
dataFormat: 'csv'
className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
tableName: 'students'
pinotClusterSpecs:
- controllerURI: 'http://localhost:9000'

controller.data.dir=hdfs://path/to/data/directory/
controller.local.temp.dir=/path/to/local/temp/directory
controller.enable.split.commit=true
pinot.controller.storage.factory.class.hdfs=org.apache.pinot.plugin.filesystem.HadoopPinotFS
pinot.controller.storage.factory.hdfs.hadoop.conf.path=path/to/conf/directory/
pinot.controller.segment.fetcher.protocols=file,http,hdfs
pinot.controller.segment.fetcher.hdfs.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
pinot.controller.segment.fetcher.hdfs.hadoop.kerberos.principle=<your kerberos principal>
pinot.controller.segment.fetcher.hdfs.hadoop.kerberos.keytab=<your kerberos keytab>

pinot.server.instance.enable.split.commit=true
pinot.server.storage.factory.class.hdfs=org.apache.pinot.plugin.filesystem.HadoopPinotFS
pinot.server.storage.factory.hdfs.hadoop.conf.path=path/to/conf/directory/
pinot.server.segment.fetcher.protocols=file,http,hdfs
pinot.server.segment.fetcher.hdfs.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
pinot.server.segment.fetcher.hdfs.hadoop.kerberos.principle=<your kerberos principal>
pinot.server.segment.fetcher.hdfs.hadoop.kerberos.keytab=<your kerberos keytab>

storage.factory.class.hdfs=org.apache.pinot.plugin.filesystem.HadoopPinotFS
storage.factory.hdfs.hadoop.conf.path=path/to/conf/directory
segment.fetcher.protocols=file,http,hdfs
segment.fetcher.hdfs.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
segment.fetcher.hdfs.hadoop.kerberos.principle=<your kerberos principal>
segment.fetcher.hdfs.hadoop.kerberos.keytab=<your kerberos keytab>

{
"timestamp": 1687786535928,
"hostname": "host1",
"level": "INFO",
"message": "Started processing job1",
"tags.platform": "data",
"tags.service": "serializer",
"indexableExtras": {
"tags": {
"params": {
"queueLength": 5,
"timeout": 299
}
}
},
"unindexableExtras": {
"tags": {
"userData_noIndex": {
"nth": 99
}
}
}
}

{
"ingestionConfig": {
"schemaConformingTransformerConfig": {
"indexableExtrasField": "extras",
"unindexableExtrasField": "extrasNoIndex",
"unindexableFieldSuffix": "_no_index",
"fieldPathsToDrop": [
"HOSTNAME"
]
}
}
}

{
"tableName": "kinesisTable",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "timestamp",
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kinesis",
"stream.kinesis.topic.name": "<your kinesis stream name>",
"region": "<your region>",
"accessKey": "<your access key>",
"secretKey": "<your secret key>",
"shardIteratorType": "AFTER_SEQUENCE_NUMBER",
"stream.kinesis.consumer.type": "lowlevel",
"stream.kinesis.fetch.timeout.millis": "30000",
"stream.kinesis.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kinesis.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
"realtime.segment.flush.threshold.rows": "1000000",
"realtime.segment.flush.threshold.time": "6h"
}
},
"metadata": {
"customConfigs": {}
}
}"task": {
"taskTypeConfigsMap": {
"UpsertCompactionTask": {
"schedule": "0 */5 * ? * *",
"bufferTimePeriod": "7d",
"invalidRecordsThresholdPercent": "30",
"invalidRecordsThresholdCount": "100000",
"tableMaxNumTasks": "100",
"validDocIdsType": "SNAPSHOT"
}
}
}

-Dplugins.dir=/opt/pinot/plugins -Dplugins.include=pinot-s3

pinot.controller.storage.factory.s3.region=ap-southeast-1

executionFrameworkSpec:
name: 'standalone'
segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: 's3://pinot-bucket/pinot-ingestion/batch-input/'
outputDirURI: 's3://pinot-bucket/pinot-ingestion/batch-output/'
overwriteOutput: true
pinotFSSpecs:
- scheme: s3
className: org.apache.pinot.plugin.filesystem.S3PinotFS
configs:
region: 'ap-southeast-1'
recordReaderSpec:
dataFormat: 'csv'
className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
tableName: 'students'
pinotClusterSpecs:
- controllerURI: 'http://localhost:9000'

controller.data.dir=s3://path/to/data/directory/
controller.local.temp.dir=/path/to/local/temp/directory
controller.enable.split.commit=true
pinot.controller.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
pinot.controller.storage.factory.s3.region=ap-southeast-1
pinot.controller.segment.fetcher.protocols=file,http,s3
pinot.controller.segment.fetcher.s3.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher

pinot.server.instance.enable.split.commit=true
pinot.server.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
pinot.server.storage.factory.s3.region=ap-southeast-1
pinot.server.storage.factory.s3.httpclient.maxConnections=50
pinot.server.storage.factory.s3.httpclient.socketTimeout=30s
pinot.server.storage.factory.s3.httpclient.connectionTimeout=2s
pinot.server.storage.factory.s3.httpclient.connectionTimeToLive=0s
pinot.server.storage.factory.s3.httpclient.connectionAcquisitionTimeout=10s
pinot.server.segment.fetcher.protocols=file,http,s3
pinot.server.segment.fetcher.s3.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher

pinot.minion.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
pinot.minion.storage.factory.s3.region=ap-southeast-1
pinot.minion.segment.fetcher.protocols=file,http,s3
pinot.minion.segment.fetcher.s3.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher

{
"timestamp": 1672531200000,
"message": "INFO Task task_12 assigned to container: [ContainerID:container_15], operation took 0.335 seconds. 8 tasks remaining.",
"logPath": "/mnt/data/application_123/container_15/stdout"
}

{
"timestamp": 1672531200000,
"message_logtype": "INFO Task \\x12 assigned to container: [ContainerID:\\x12], operation took \\x13 seconds. \\x11 tasks remaining.",
"message_dictionaryVars": [
"task_12",
"container_15"
],
"message_encodedVars": [
1801439850948198735,
8
],
"logPath_logtype": "/mnt/data/\\x12/\\x12/stdout",
"logPath_dictionaryVars": [
"application_123",
"container_15"
],
"logPath_encodedVars": []
}

{
"tableIndexConfig": {
"streamConfigs": {
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.clplog.CLPLogMessageDecoder",
"stream.kafka.decoder.prop.fieldsForClpEncoding": "message,logPath"
},
"varLengthDictionaryColumns": [
"message_logtype",
"message_dictionaryVars",
"logPath_logtype",
"logPath_dictionaryVars"
]
}
}

{
"dimensionFieldSpecs": [
{
"name": "message_logtype",
"dataType": "STRING",
"maxLength": 2147483647
},
{
"name": "message_encodedVars",
"dataType": "LONG",
"singleValueField": false
},
{
"name": "message_dictionaryVars",
"dataType": "STRING",
"maxLength": 2147483647,
"singleValueField": false
},
{
"name": "message_logtype",
"dataType": "STRING",
"maxLength": 2147483647
},
{
"name": "message_encodedVars",
"dataType": "LONG",
"singleValueField": false
},
{
"name": "message_dictionaryVars",
"dataType": "STRING",
"maxLength": 2147483647,
"singleValueField": false
}
]
}

{
"primaryKeyColumns": ["id"]
}

{
"routing": {
"instanceSelectorType": "strictReplicaGroup"
}
}

{
...
"dedupConfig": {
"dedupEnabled": true,
"hashFunction": "NONE"
},
...
}
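As a quick illustration of the dedup behaviour configured above (the records and the "value" field are hypothetical), suppose these two records arrive on the stream with the same primary key:

{"id": "event-1", "value": 10}
{"id": "event-1", "value": 20}

Only the first record is ingested; the second is dropped at ingestion time because a record with primary key "event-1" already exists.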
This guide shows you how to ingest a stream of records from an Apache Pulsar topic into a Pinot table.

pinot-admin.sh AddTable \\
-tableConfigFile /path/to/table-config.json \\
-schemaFile /path/to/table-schema.json -exec

pinot-admin.sh UploadSegment \\
-controllerHost localhost \\
-controllerPort 9000 \\
-segmentDir /path/to/local/dir \\
-tableName myTable

pinot-admin.sh LaunchDataIngestionJob \\
-jobSpecFile /file/location/my-job-spec.yaml

pinot-admin.sh LaunchDataIngestionJob \\
-jobSpecFile /file/location/my-job-spec.yaml

Place the plugin JAR in the libs or plugins directory in Pinot.

group_topics under group is unnested to the top level, converting the output into a collection of two rows. Note the handling of the nested field within group_topics, and the resulting top-level field group.group_topics.urlkey. All collections to unnest must be included in the configuration field fieldsToUnnest.
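For illustration only (the field values here are hypothetical), an input record shaped like this:

{"group": {"group_id": 1, "group_topics": [{"urlkey": "topic-a", "topic_name": "Topic A"}, {"urlkey": "topic-b", "topic_name": "Topic B"}]}}

would be unnested into two rows along these lines:

{"group.group_id": 1, "group.group_topics.urlkey": "topic-a", "group.group_topics.topic_name": "Topic A"}
{"group.group_id": 1, "group.group_topics.urlkey": "topic-b", "group.group_topics.topic_name": "Topic B"}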
"ingestionConfig":{
"transformConfigs": [
{
"columnName": "group_json",
"transformFunction": "jsonFormat(\"group\")"
}
],
},
...
"tableIndexConfig": {
"loadMode": "MMAP",
"noDictionaryColumns": [
"group_json"
],
"jsonIndexColumns": [
"group_json"
]
},
}

{
{
"name": "group_json",
"dataType": "JSON",
}
...
}

{
"ingestionConfig": {
"complexTypeConfig": {
"delimiter": '.',
"fieldsToUnnest": ["group.group_topics"],
"collectionNotUnnestedToJson": "NON_PRIMITIVE"
}
}
}

SELECT "group.group_topics.urlkey",
"group.group_topics.topic_name",
"group.group_id"
FROM meetupRsvp
LIMIT 10

bin/pinot-admin.sh AvroSchemaToPinotSchema \
-timeColumnName fields.hoursSinceEpoch \
-avroSchemaFile /tmp/test.avsc \
-pinotSchemaName myTable \
-outputDir /tmp/test \
-fieldsToUnnest entries

bin/pinot-admin.sh JsonToPinotSchema \
-timeColumnName hoursSinceEpoch \
-jsonFile /tmp/test.json \
-pinotSchemaName myTable \
-outputDir /tmp/test \
-fieldsToUnnest payload.commits

{
"tableName": "pulsarTable",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "timestamp",
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "pulsar",
"stream.pulsar.topic.name": "<your pulsar topic name>",
"stream.pulsar.bootstrap.servers": "pulsar://localhost:6650,pulsar://localhost:6651",
"stream.pulsar.consumer.prop.auto.offset.reset" : "smallest",
"stream.pulsar.consumer.type": "lowlevel",
"stream.pulsar.fetch.timeout.millis": "30000",
"stream.pulsar.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
"stream.pulsar.consumer.factory.class.name": "org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
"realtime.segment.flush.threshold.rows": "1000000",
"realtime.segment.flush.threshold.time": "6h"
}
},
"metadata": {
"customConfigs": {}
}
}"stream.pulsar.authenticationToken":"your-auth-token""stream.pulsar.issuerUrl": "https://auth.streamnative.cloud"
"stream.pulsar.credsFilePath": "file:///path/to/private_creds_file
"stream.pulsar.audience": "urn:sn:pulsar:test:test-cluster""stream.pulsar.tlsTrustCertsFilePath": "/path/to/ca.cert.pem"
"streamConfigs": {
...
"stream.pulsar.metadata.populate": "true",
"stream.pulsar.metadata.fields": "messageId,messageIdBytes,eventTime,topicName",
...
} "dimensionFieldSpecs": [
{
"name": "__key",
"dataType": "STRING"
},
{
"name": "__metadata$messageId",
"dataType": "STRING"
},
...
],



Batch ingestion of data into Apache Pinot.
This guide shows you how to ingest a stream of records from an Apache Kafka topic into a Pinot table.
studentID,firstName,lastName,gender,subject,score,timestampInEpoch
200,Lucy,Smith,Female,Maths,3.8,1570863600000
200,Lucy,Smith,Female,English,3.5,1571036400000
201,Bob,King,Male,Maths,3.2,1571900400000
202,Nick,Young,Male,Physics,3.6,1572418800000

{
"schemaName": "transcript",
"dimensionFieldSpecs": [
{
"name": "studentID",
"dataType": "INT"
},
{
"name": "firstName",
"dataType": "STRING"
},
{
"name": "lastName",
"dataType": "STRING"
},
{
"name": "gender",
"dataType": "STRING"
},
{
"name": "subject",
"dataType": "STRING"
}
],
"metricFieldSpecs": [
{
"name": "score",
"dataType": "FLOAT"
}
],
"dateTimeFieldSpecs": [{
"name": "timestampInEpoch",
"dataType": "LONG",
"format" : "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}]
}

{
"tableName": "transcript",
"tableType": "OFFLINE",
"segmentsConfig": {
"replication": 1,
"timeColumnName": "timestampInEpoch",
"timeType": "MILLISECONDS",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": 365
},
"tenants": {
"broker":"DefaultTenant",
"server":"DefaultTenant"
},
"tableIndexConfig": {
"loadMode": "MMAP"
},
"ingestionConfig": {
"batchIngestionConfig": {
"segmentIngestionType": "APPEND",
"segmentIngestionFrequency": "DAILY"
},
"continueOnError": true,
"rowTimeValueCheck": true,
"segmentTimeValueCheck": false
},
"metadata": {}
}

bin/pinot-admin.sh AddTable \\
-tableConfigFile /path/to/table-config.json \\
-schemaFile /path/to/table-schema.json -exec

curl -X POST -F [email protected] \
-H "Content-Type: multipart/form-data" \
"http://localhost:9000/ingestFromFile?tableNameWithType=foo_OFFLINE&
batchConfigMapStr={"inputFormat":"json"}"curl -X POST -F [email protected] \
-H "Content-Type: multipart/form-data" \
"http://localhost:9000/ingestFromFile?tableNameWithType=foo_OFFLINE&
batchConfigMapStr={
"inputFormat":"csv",
"recordReader.prop.delimiter":"|"
}"curl -X POST "http://localhost:9000/ingestFromURI?tableNameWithType=foo_OFFLINE
&batchConfigMapStr={
"inputFormat":"json",
"input.fs.className":"org.apache.pinot.plugin.filesystem.S3PinotFS",
"input.fs.prop.region":"us-central",
"input.fs.prop.accessKey":"foo",
"input.fs.prop.secretKey":"bar"
}
&sourceURIStr=s3://test.bucket/path/to/json/data/data.json"executionFrameworkSpec:
name: 'standalone'
segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner'
# Recommended to set jobType to SegmentCreationAndMetadataPush for production environment where Pinot Deep Store is configured
jobType: SegmentCreationAndTarPush
inputDirURI: '/tmp/pinot-quick-start/rawdata/'
includeFileNamePattern: 'glob:**/*.csv'
outputDirURI: '/tmp/pinot-quick-start/segments/'
overwriteOutput: true
pinotFSSpecs:
- scheme: file
className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
dataFormat: 'csv'
className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
tableName: 'transcript'
pinotClusterSpecs:
- controllerURI: 'http://localhost:9000'
pushJobSpec:
pushAttempts: 2
pushRetryIntervalMillis: 1000

bin/pinot-admin.sh LaunchDataIngestionJob \\
-jobSpecFile /tmp/pinot-quick-start/batch-job-spec.yaml

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "s3:ListAllMyBuckets",
"Resource": "*"
},
{
"Effect": "Allow",
"Action": "s3:*",
"Resource": [
"arn:aws:s3:::metadata-push-staging",
"arn:aws:s3:::metadata-push-staging/*"
]
}
]
}

...
jobType: SegmentCreationAndMetadataPush
...
outputDirURI: 's3://metadata-push-staging/stagingDir/'
...
pushJobSpec:
copyToDeepStoreForMetadataPush: true
...

docker run \
--network pinot-demo --name=kafka \
-e KAFKA_ZOOKEEPER_CONNECT=pinot-zookeeper:2181/kafka \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ADVERTISED_HOST_NAME=kafka \
-p 2181:2181 \
-d wurstmeister/kafka:latest

docker exec \
-t kafka \
/opt/kafka/bin/kafka-topics.sh \
--zookeeper pinot-zookeeper:2181/kafka \
--partitions=1 --replication-factor=1 \
--create --topic transcript-topic

bin/pinot-admin.sh StartKafka -zkAddress=localhost:2181/kafka -port 9092

bin/kafka-topics.sh --create --bootstrap-server kafka:9092 --replication-factor 1 --partitions 1 --topic transcript-topic

This guide shows you how to ingest a stream of records into a Pinot table.
docker run \
--network=pinot-demo \
-v /tmp/pinot-quick-start:/tmp/pinot-quick-start \
--name pinot-streaming-table-creation \
apachepinot/pinot:latest AddTable \
-schemaFile /tmp/pinot-quick-start/transcript-schema.json \
-tableConfigFile /tmp/pinot-quick-start/transcript-table-realtime.json \
-controllerHost pinot-quickstart \
-controllerPort 9000 \
-execbin/pinot-admin.sh AddTable \
-schemaFile /tmp/pinot-quick-start/transcript-schema.json \
-tableConfigFile /tmp/pinot-quick-start/transcript-table-realtime.json \
-exec

{
"tableName": "transcript",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "timestamp",
"timeType": "MILLISECONDS",
"schemaName": "transcript",
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.topic.name": "transcript-topic",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.broker.list": "kafka:9092",
"realtime.segment.flush.threshold.time": "3600000",
"realtime.segment.flush.threshold.rows": "50000",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest"
}
},
"metadata": {
"customConfigs": {}
}
}{"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"Maths","score":3.8,"timestamp":1571900400000}
{"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"History","score":3.5,"timestamp":1571900400000}
{"studentID":207,"firstName":"Bob","lastName":"Lewis","gender":"Male","subject":"Maths","score":3.2,"timestamp":1571900400000}
{"studentID":207,"firstName":"Bob","lastName":"Lewis","gender":"Male","subject":"Chemistry","score":3.6,"timestamp":1572418800000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Geography","score":3.8,"timestamp":1572505200000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"English","score":3.5,"timestamp":1572505200000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Maths","score":3.2,"timestamp":1572678000000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Physics","score":3.6,"timestamp":1572678000000}
{"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"Maths","score":3.8,"timestamp":1572678000000}
{"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"English","score":3.5,"timestamp":1572678000000}
{"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"History","score":3.2,"timestamp":1572854400000}
{"studentID":212,"firstName":"Nick","lastName":"Young","gender":"Male","subject":"History","score":3.6,"timestamp":1572854400000}docker exec -ti kafka bashbin/kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic transcript-topic < transcript.json

SELECT * FROM transcript

{
"tableName": "transcript",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "timestamp",
"timeType": "MILLISECONDS",
"schemaName": "transcript",
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "LowLevel",
"stream.kafka.topic.name": "transcript-topic",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.zk.broker.url": "pinot-zookeeper:2191/kafka",
"stream.kafka.broker.list": "localhost:9092",
"schema.registry.url": "",
"security.protocol": "SSL",
"ssl.truststore.location": "",
"ssl.keystore.location": "",
"ssl.truststore.password": "",
"ssl.keystore.password": "",
"ssl.key.password": "",
"stream.kafka.decoder.prop.schema.registry.rest.url": "",
"stream.kafka.decoder.prop.schema.registry.ssl.truststore.location": "",
"stream.kafka.decoder.prop.schema.registry.ssl.keystore.location": "",
"stream.kafka.decoder.prop.schema.registry.ssl.truststore.password": "",
"stream.kafka.decoder.prop.schema.registry.ssl.keystore.password": "",
"stream.kafka.decoder.prop.schema.registry.ssl.keystore.type": "",
"stream.kafka.decoder.prop.schema.registry.ssl.truststore.type": "",
"stream.kafka.decoder.prop.schema.registry.ssl.key.password": "",
"stream.kafka.decoder.prop.schema.registry.ssl.protocol": ""
}
},
"metadata": {
"customConfigs": {}
}
}

{
"tableName": "transcript",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "timestamp",
"timeType": "MILLISECONDS",
"schemaName": "transcript",
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "LowLevel",
"stream.kafka.topic.name": "transcript-topic",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.zk.broker.url": "pinot-zookeeper:2191/kafka",
"stream.kafka.broker.list": "kafka:9092",
"stream.kafka.isolation.level": "read_committed"
}
},
"metadata": {
"customConfigs": {}
}
}"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.topic.name": "mytopic",
"stream.kafka.consumer.prop.auto.offset.reset": "largest",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.broker.list": "kafka:9092",
"stream.kafka.schema.registry.url": "https://xxx",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
"stream.kafka.decoder.prop.schema.registry.rest.url": "https://xxx",
"stream.kafka.decoder.prop.basic.auth.credentials.source": "USER_INFO",
"stream.kafka.decoder.prop.schema.registry.basic.auth.user.info": "schema_registry_username:schema_registry_password",
"sasl.mechanism": "PLAIN" ,
"security.protocol": "SASL_SSL" ,
"sasl.jaas.config":"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"kafkausername\" password=\"kafkapassword\";",
"realtime.segment.flush.threshold.rows": "0",
"realtime.segment.flush.threshold.time": "24h",
"realtime.segment.flush.autotune.initialRows": "3000000",
"realtime.segment.flush.threshold.segment.size": "500M"
}, "dimensionFieldSpecs": [
{
"name": "__key",
"dataType": "STRING"
},
{
"name": "__metadata$offset",
"dataType": "STRING"
},
...
],

...
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.topic.name": "",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.SimpleAvroMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.broker.list": "",
"stream.kafka.consumer.prop.auto.offset.reset": "largest"
...
}

recordReaderSpec:
dataFormat: 'csv'
className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
configs:
key1 : 'value1'
key2 : 'value2'

dataFormat: 'csv'
className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
configs:
fileFormat: 'default' #should be one of default, rfc4180, excel, tdf, mysql
header: 'columnName separated by delimiter'
delimiter: ','
multiValueDelimiter: '-'

dataFormat: 'avro'
className: 'org.apache.pinot.plugin.inputformat.avro.AvroRecordReader'
configs:
enableLogicalTypes: true

dataFormat: 'json'
className: 'org.apache.pinot.plugin.inputformat.json.JSONRecordReader'

dataFormat: 'thrift'
className: 'org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReader'
configs:
thriftClass: 'ParserClassName'

dataFormat: 'parquet'
className: 'org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader'

dataFormat: 'parquet'
className: 'org.apache.pinot.plugin.inputformat.parquet.ParquetNativeRecordReader'

dataFormat: 'orc'
className: 'org.apache.pinot.plugin.inputformat.orc.ORCRecordReader'

dataFormat: 'proto'
className: 'org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReader'
configs:
descriptorFile: 'file:///path/to/sample.desc'

protoc --include_imports --descriptor_set_out=/absolute/path/to/output.desc /absolute/path/to/input.proto

docker run \
--network=pinot-demo \
-v /tmp/pinot-quick-start:/tmp/pinot-quick-start \
--name pinot-streaming-table-creation \
apachepinot/pinot:latest AddTable \
-schemaFile /tmp/pinot-quick-start/transcript-schema.json \
-tableConfigFile /tmp/pinot-quick-start/transcript-table-realtime.json \
-controllerHost pinot-quickstart \
-controllerPort 9000 \
-exec

bin/pinot-admin.sh AddTable \
-schemaFile /path/to/transcript-schema.json \
-tableConfigFile /path/to/transcript-table-realtime.json \
-exec{"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"Maths","score":3.8,"timestamp":1571900400000}
{"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"History","score":3.5,"timestamp":1571900400000}
{"studentID":207,"firstName":"Bob","lastName":"Lewis","gender":"Male","subject":"Maths","score":3.2,"timestamp":1571900400000}
{"studentID":207,"firstName":"Bob","lastName":"Lewis","gender":"Male","subject":"Chemistry","score":3.6,"timestamp":1572418800000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Geography","score":3.8,"timestamp":1572505200000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"English","score":3.5,"timestamp":1572505200000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Maths","score":3.2,"timestamp":1572678000000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Physics","score":3.6,"timestamp":1572678000000}
{"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"Maths","score":3.8,"timestamp":1572678000000}
{"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"English","score":3.5,"timestamp":1572678000000}
{"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"History","score":3.2,"timestamp":1572854400000}
{"studentID":212,"firstName":"Nick","lastName":"Young","gender":"Male","subject":"History","score":3.6,"timestamp":1572854400000}{
"schemaName": "transcript",
"dimensionFieldSpecs": [
{
"name": "studentID",
"dataType": "INT"
},
{
"name": "firstName",
"dataType": "STRING"
},
{
"name": "lastName",
"dataType": "STRING"
},
{
"name": "gender",
"dataType": "STRING"
},
{
"name": "subject",
"dataType": "STRING"
}
],
"metricFieldSpecs": [
{
"name": "score",
"dataType": "FLOAT"
}
],
"dateTimeFieldSpecs": [{
"name": "timestamp",
"dataType": "LONG",
"format" : "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}]
}

{
"tableName": "transcript",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "timestamp",
"timeType": "MILLISECONDS",
"schemaName": "transcript",
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
},
"metadata": {
"customConfigs": {}
},
"ingestionConfig": {
"streamIngestionConfig": {
"streamConfigMaps": [
{
"realtime.segment.flush.threshold.rows": "0",
"stream.kafka.decoder.prop.format": "JSON",
"key.serializer": "org.apache.kafka.common.serialization.ByteArraySerializer",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"streamType": "kafka",
"value.serializer": "org.apache.kafka.common.serialization.ByteArraySerializer",
"stream.kafka.consumer.type": "LOWLEVEL",
"realtime.segment.flush.threshold.segment.rows": "50000",
"stream.kafka.broker.list": "localhost:9876",
"realtime.segment.flush.threshold.time": "3600000",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.topic.name": "transcript-topic"
}
]
},
"transformConfigs": [],
"continueOnError": true,
"rowTimeValueCheck": true,
"segmentTimeValueCheck": false
},
"isDimTable": false
}
{
"tableName": "transcript",
"tableType": "REALTIME",
...
"ingestionConfig": {
"streamIngestionConfig":,
"streamConfigMaps": {
"streamType": "kafka",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.topic.name": "transcript-topic",
...
"topic.consumption.rate.limit": 1000
}
},
...

A consumption rate limiter is set up for topic <topic_name> in table <tableName> with rate limit: <rate_limit> (topic rate limit: <topic_rate_limit>, partition count: <partition_count>)

$ curl -X POST {controllerHost}/tables/{tableName}/forceCommit

$ curl -X POST {controllerHost}/tables/{tableName}/pauseConsumption
$ curl -X POST {controllerHost}/tables/{tableName}/resumeConsumption

$ curl -X POST {controllerHost}/tables/{tableName}/pauseStatus

$ curl -X POST {controllerHost}/tables/{tableName}/forceCommit

$ curl -X POST {controllerHost}/tables/{tableName}/forceCommit
{
"forceCommitJobId": "6757284f-b75b-45ce-91d8-a277bdbc06ae",
"forceCommitStatus": "SUCCESS",
"jobMetaZKWriteStatus": "SUCCESS"
}
$ curl -X GET {controllerHost}/tables/forceCommitStatus/6757284f-b75b-45ce-91d8-a277bdbc06ae
{
"jobId": "6757284f-b75b-45ce-91d8-a277bdbc06ae",
"segmentsForceCommitted": "[\"airlineStats__0__0__20230119T0700Z\",\"airlineStats__1__0__20230119T0700Z\",\"airlineStats__2__0__20230119T0700Z\"]",
"submissionTimeMs": "1674111682977",
"numberOfSegmentsYetToBeCommitted": 0,
"jobType": "FORCE_COMMIT",
"segmentsYetToBeCommitted": [],
"tableName": "airlineStats_REALTIME"
}

$ curl -X POST {controllerHost}/tables/{tableName}/resumeConsumption?resumeFrom=smallest
$ curl -X POST {controllerHost}/tables/{tableName}/resumeConsumption?resumeFrom=largest

# GET /tables/{tableName}/consumingSegmentsInfo
curl -X GET "http://<controller_url:controller_admin_port>/tables/meetupRsvp/consumingSegmentsInfo" -H "accept: application/json"
# GET /debug/tables/{tableName}
curl -X GET "http://localhost:9000/debug/tables/meetupRsvp?type=REALTIME&verbosity=1" -H "accept: application/json"{
"_segmentToConsumingInfoMap": {
"meetupRsvp__0__0__20221019T0639Z": [
{
"serverName": "Server_192.168.0.103_7000",
"consumerState": "CONSUMING",
"lastConsumedTimestamp": 1666161593904,
"partitionToOffsetMap": { // <<-- Deprecated. See currentOffsetsMap for same info
"0": "6"
},
"partitionOffsetInfo": {
"currentOffsetsMap": {
"0": "6" // <-- Current consumer position
},
"latestUpstreamOffsetMap": {
"0": "6" // <-- Upstream latest position
},
"recordsLagMap": {
"0": "0" // <-- Lag, in terms of #records behind latest
},
"recordsAvailabilityLagMap": {
"0": "2" // <-- Lag, in terms of time
}
}
}
],

Upsert support in Apache Pinot.
{
"primaryKeyColumns": ["event_id"]
}
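As a quick illustration of full upsert with the primary key above (the records and values here are made up for this example), suppose the following two records are ingested for the same event_id:

{"event_id": "aa", "rsvp_count": 1, "mtime": 1567205394}
{"event_id": "aa", "rsvp_count": 2, "mtime": 1567205397}

With the FULL mode configured below, queries return only the latest record for event_id "aa" (the one with the larger time/comparison column value); earlier versions remain stored but are skipped at query time unless the skipUpsert query option is used.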
"upsertConfig": {
"mode": "FULL"
}
}

{
"upsertConfig": {
"mode": "PARTIAL",
"partialUpsertStrategies":{
"rsvp_count": "INCREMENT",
"group_name": "IGNORE",
"venue_name": "OVERWRITE"
}
},
"tableIndexConfig": {
"nullHandlingEnabled": true
}
}

{
"upsertConfig": {
"mode": "PARTIAL",
"defaultPartialUpsertStrategy": "OVERWRITE",
"partialUpsertStrategies":{
"rsvp_count": "INCREMENT",
"group_name": "IGNORE"
}
},
"tableIndexConfig": {
"nullHandlingEnabled": true
}
}

{
"upsertConfig": {
"mode": "FULL",
"comparisonColumn": "anotherTimeColumn"
}
}

{
"upsertConfig": {
"mode": "PARTIAL",
"defaultPartialUpsertStrategy": "OVERWRITE",
"partialUpsertStrategies":{},
"comparisonColumns": ["secondsSinceEpoch", "otherComparisonColumn"]
}
}

[
{
"event_id": "aa",
"orderReceived": 1,
"description" : "first",
"secondsSinceEpoch": 1567205394
},
{
"event_id": "aa",
"orderReceived": 2,
"description" : "update",
"secondsSinceEpoch": 1567205397
},
{
"event_id": "aa",
"orderReceived": 3,
"description" : "update",
"secondsSinceEpoch": 1567205396
},
{
"event_id": "aa",
"orderReceived": 4,
"description" : "first arrival, other column",
"otherComparisonColumn": 1567205395
},
{
"event_id": "aa",
"orderReceived": 5,
"description" : "late arrival, other column",
"otherComparisonColumn": 1567205392
},
{
"event_id": "aa",
"orderReceived": 6,
"description" : "update, other column",
"otherComparisonColumn": 1567205398
}
]

{
"upsertConfig": {
...
"deleteRecordColumn": <column_name>
}
}

// In the Schema
{
...
{
"name": "<delete_column_name>",
"dataType": "BOOLEAN"
},
...
} "upsertConfig": {
"mode": "FULL",
"deleteRecordColumn": <column_name>,
"deletedKeysTTL": 86400
}
}

{
"routing": {
"instanceSelectorType": "strictReplicaGroup"
}
}

{
"upsertConfig": {
"mode": "FULL",
"enableSnapshot": true
}
}

{
"upsertConfig": {
"mode": "FULL",
"enableSnapshot": true,
"enablePreload": true
}
} "upsertConfig": {
"mode": "FULL",
"enableSnapshot": true,
"enablePreload": true,
"upsertTTL": 3d
}
}

{
"upsertConfig": {
...,
"dropOutOfOrderRecord": true
}
}

{
"upsertConfig": {
...,
"outOfOrderRecordColumn": "isOutOfOrder"
}
}

select key, val from tbl1 where isOutOfOrder = false option(skipUpsert=false)

{
"tableName": "upsertMeetupRsvp",
"tableType": "REALTIME",
"tenants": {},
"segmentsConfig": {
"timeColumnName": "mtime",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "1",
"replication": "1"
},
"tableIndexConfig": {
"segmentPartitionConfig": {
"columnPartitionMap": {
"event_id": {
"functionName": "Hashcode",
"numPartitions": 2
}
}
}
},
"instanceAssignmentConfigMap": {
"CONSUMING": {
"tagPoolConfig": {
"tag": "DefaultTenant_REALTIME"
},
"replicaGroupPartitionConfig": {
"replicaGroupBased": true,
"numReplicaGroups": 1,
"partitionColumn": "event_id",
"numPartitions": 2,
"numInstancesPerPartition": 1
}
}
},
"routing": {
"segmentPrunerTypes": [
"partition"
],
"instanceSelectorType": "strictReplicaGroup"
},
"ingestionConfig": {
"streamIngestionConfig": {
"streamConfigMaps": [
{
"streamType": "kafka",
"stream.kafka.topic.name": "upsertMeetupRSVPEvents",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.zk.broker.url": "localhost:2191/kafka",
"stream.kafka.broker.list": "localhost:19092"
}
]
}
},
"upsertConfig": {
"mode": "FULL",
"enableSnapshot": true,
"enablePreload": true
},
"fieldConfigList": [
{
"name": "location",
"encodingType": "RAW",
"indexType": "H3",
"properties": {
"resolutions": "5"
}
}
],
"metadata": {
"customConfigs": {}
}
}

{
"tableName": "upsertPartialMeetupRsvp",
"tableType": "REALTIME",
"tenants": {},
"segmentsConfig": {
"timeColumnName": "mtime",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "1",
"replication": "1"
},
"tableIndexConfig": {
"segmentPartitionConfig": {
"columnPartitionMap": {
"event_id": {
"functionName": "Hashcode",
"numPartitions": 2
}
}
},
"nullHandlingEnabled": true
},
"instanceAssignmentConfigMap": {
"CONSUMING": {
"tagPoolConfig": {
"tag": "DefaultTenant_REALTIME"
},
"replicaGroupPartitionConfig": {
"replicaGroupBased": true,
"numReplicaGroups": 1,
"partitionColumn": "event_id",
"numPartitions": 2,
"numInstancesPerPartition": 1
}
}
},
"routing": {
"segmentPrunerTypes": [
"partition"
],
"instanceSelectorType": "strictReplicaGroup"
},
"ingestionConfig": {
"streamIngestionConfig": {
"streamConfigMaps": [
{
"streamType": "kafka",
"stream.kafka.topic.name": "upsertPartialMeetupRSVPEvents",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.zk.broker.url": "localhost:2191/kafka",
"stream.kafka.broker.list": "localhost:19092"
}
]
}
},
"upsertConfig": {
"mode": "PARTIAL",
"partialUpsertStrategies": {
"rsvp_count": "INCREMENT",
"group_name": "UNION",
"venue_name": "APPEND"
}
},
"fieldConfigList": [
{
"name": "location",
"encodingType": "RAW",
"indexType": "H3",
"properties": {
"resolutions": "5"
}
}
],
"metadata": {
"customConfigs": {}
}
}

# stop previous quick start cluster, if any
bin/quick-start-upsert-streaming.sh

# stop previous quick start cluster, if any
bin/quick-start-partial-upsert-streaming.sh


