{
"OFFLINE": {
"tableName": "dimBaseballTeams_OFFLINE",
"tableType": "OFFLINE",
"segmentsConfig": {
"schemaName": "dimBaseballTeams",
"segmentPushType": "REFRESH"
},
"metadata": {},
"quota": {
"storage": "200M"
},
"isDimTable": true
}.
"dimensionTableConfig": {
"disablePreload": true
}
}
}
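At query time, a dimension table like this is typically joined against a fact table with the LOOKUP UDF. A minimal sketch using the Pinot Java client is shown below; the fact table name baseballStats, the Zookeeper address, and the selected columns are illustrative assumptions, not part of the configs on this page.

// Illustrative sketch: joining an assumed fact table against dimBaseballTeams via the LOOKUP UDF.
// The fact table name "baseballStats" and the Zookeeper address are placeholders.
import org.apache.pinot.client.Connection;
import org.apache.pinot.client.ConnectionFactory;
import org.apache.pinot.client.ResultSetGroup;

public class DimTableLookupExample {
  public static void main(String[] args) {
    Connection connection = ConnectionFactory.fromZookeeper("localhost:2181/pinot-cluster");
    // LOOKUP(dimTableName, dimColumnToFetch, dimJoinKey, factJoinKeyValue)
    ResultSetGroup result = connection.execute(
        "SELECT teamID, LOOKUP('dimBaseballTeams', 'teamName', 'teamID', teamID) AS teamName "
            + "FROM baseballStats LIMIT 10");
    System.out.println(result);
  }
}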
...
{
"dimensionFieldSpecs": [
{
"dataType": "STRING",
"name": "teamID"
},
{
"dataType": "STRING",
"name": "teamName"
}
],
"schemaName": "dimBaseballTeams",
"primaryKeyColumns": ["teamID"]
}

// some environmental setup
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Row> srcRows = execEnv.addSource(new FlinkKafkaConsumer<Row>(...));
RowTypeInfo typeInfo = new RowTypeInfo(
new TypeInformation[]{Types.FLOAT, Types.FLOAT, Types.STRING, Types.STRING},
new String[]{"lon", "lat", "address", "name"});
// add processing logic for the data stream for example:
DataStream<Row> processedRows = srcRows.keyBy(r -> r.getField(0));
...
// configurations for PinotSinkFunction
Schema pinotSchema = ...
TableConfig pinotTableConfig = ...
processedRows.addSink(new PinotSinkFunction<>(
new FlinkRowGenericRowConverter(typeInfo),
pinotTableConfig,
    pinotSchema));
// execute the program
execEnv.execute();
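The schema and table config passed to PinotSinkFunction are elided above. One possible way to supply them is to load both from local JSON files; a minimal sketch under that assumption follows (the file paths are placeholders, and Schema.fromFile / JsonUtils.fileToObject come from pinot-spi):

// Illustrative sketch only: load the Pinot schema and table config from local JSON files.
// The paths below are placeholders, not part of the original example.
import java.io.File;

import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.JsonUtils;

Schema pinotSchema = Schema.fromFile(new File("/path/to/pinot-schema.json"));
TableConfig pinotTableConfig =
    JsonUtils.fileToObject(new File("/path/to/pinot-table-config.json"), TableConfig.class);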
"tableName" : "tbl_OFFLINE",
"tableType" : "OFFLINE",
"segmentsConfig" : {
// ...
},
"tenants" : {
// ...
},
"tableIndexConfig" : {
// ....
},
"ingestionConfig": {
"batchIngestionConfig": {
"segmentIngestionType": "APPEND",
"segmentIngestionFrequency": "HOURLY",
"batchConfigMaps": [
{
"outputDirURI": "file://path/to/flink/segmentwriter/output/dir",
"overwriteOutput": "false",
"push.controllerUri": "https://target.pinot.cluster.controller.url"
}
]
}
}
}
# executionFrameworkSpec: defines the execution framework used to run the ingestion job.
executionFrameworkSpec:
# name: execution framework name
name: 'hadoop'
# segmentGenerationJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunner'
# segmentTarPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentTarPushJobRunner'
# segmentUriPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentUriPushJobRunner'
# segmentMetadataPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentMetadataPushJobRunner'
# extraConfigs: extra configs for execution framework.
extraConfigs:
  # stagingDir: staging directory on the distributed filesystem that hosts all generated segments before the entire directory is moved to the output directory.
  stagingDir: your/local/dir/staging

export PINOT_VERSION=0.10.0
export PINOT_DISTRIBUTION_DIR=${PINOT_ROOT_DIR}/build/
export HADOOP_CLIENT_OPTS="-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml"
hadoop jar \
  ${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \
  org.apache.pinot.tools.admin.PinotAdministrator \
  LaunchDataIngestionJob \
  -jobSpecFile ${PINOT_DISTRIBUTION_DIR}/examples/batch/airlineStats/hadoopIngestionJobSpec.yaml

enable.preprocessing = true
preprocess.path.to.output = <output_path>

{
"OFFLINE": {
"metadata": {
"customConfigs": {
"preprocessing.operations": "resize, partition, sort", // To enable the following preprocessing operations
"preprocessing.max.num.records.per.file": "100", // To enable resizing
"preprocessing.num.reducers": "3" // To enable resizing
}
},
...
"tableIndexConfig": {
"aggregateMetrics": false,
"autoGeneratedInvertedIndex": false,
"bloomFilterColumns": [],
"createInvertedIndexDuringSegmentGeneration": false,
"invertedIndexColumns": [],
"loadMode": "MMAP",
"nullHandlingEnabled": false,
"segmentPartitionConfig": { // To enable partitioning
"columnPartitionMap": {
"item": {
"functionName": "murmur",
"numPartitions": 4
}
}
},
"sortedColumn": [ // To enable sorting
"actorId"
],
"streamConfigs": {}
},
"tableName": "tableName_OFFLINE",
"tableType": "OFFLINE",
"tenants": {
...
}
}
}

# executionFrameworkSpec: defines the execution framework used to run the ingestion job.
executionFrameworkSpec:
# name: execution framework name
name: 'spark'
# segmentGenerationJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner'
# segmentTarPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner'
# segmentUriPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner'
# segmentMetadataPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentMetadataPushJobRunner'
# extraConfigs: extra configs for execution framework.
extraConfigs:
  # stagingDir: staging directory on the distributed filesystem that hosts all generated segments before the entire directory is moved to the output directory.
  stagingDir: your/local/dir/staging

spark.driver.extraClassPath =>
pinot-batch-ingestion-spark-${PINOT_VERSION}-shaded.jar:pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar

spark.driver.extraJavaOptions =>
-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins

export PINOT_VERSION=0.10.0
export PINOT_DISTRIBUTION_DIR=/path/to/apache-pinot-${PINOT_VERSION}-bin
spark-submit \
  --class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
  --master local --deploy-mode client \
  --conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins" \
  --conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
  --conf "spark.executor.extraClassPath=${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
  local://${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar -jobSpecFile /path/to/spark_job_spec.yaml

spark-submit \
  --class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
  --master yarn --deploy-mode cluster \
  --conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins" \
  --conf "spark.driver.extraClassPath=pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar:pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
  --conf "spark.executor.extraClassPath=pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar:pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
  --jars "${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar,${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
  --files s3://path/to/spark_job_spec.yaml \
  local://pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar -jobSpecFile spark_job_spec.yaml

Batch ingestion of data into Apache Pinot. The example below ingests the following sample CSV data into an offline table named transcript.
studentID,firstName,lastName,gender,subject,score,timestampInEpoch
200,Lucy,Smith,Female,Maths,3.8,1570863600000
200,Lucy,Smith,Female,English,3.5,1571036400000
201,Bob,King,Male,Maths,3.2,1571900400000
202,Nick,Young,Male,Physics,3.6,1572418800000

{
"schemaName": "transcript",
"dimensionFieldSpecs": [
{
"name": "studentID",
"dataType": "INT"
},
{
"name": "firstName",
"dataType": "STRING"
},
{
"name": "lastName",
"dataType": "STRING"
},
{
"name": "gender",
"dataType": "STRING"
},
{
"name": "subject",
"dataType": "STRING"
}
],
"metricFieldSpecs": [
{
"name": "score",
"dataType": "FLOAT"
}
],
"dateTimeFieldSpecs": [{
"name": "timestampInEpoch",
"dataType": "LONG",
"format" : "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}]
}

{
"tableName": "transcript",
"tableType": "OFFLINE",
"segmentsConfig": {
"replication": 1,
"timeColumnName": "timestampInEpoch",
"timeType": "MILLISECONDS",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": 365
},
"tenants": {
"broker":"DefaultTenant",
"server":"DefaultTenant"
},
"tableIndexConfig": {
"loadMode": "MMAP"
},
"ingestionConfig": {
"batchIngestionConfig": {
"segmentIngestionType": "APPEND",
"segmentIngestionFrequency": "DAILY"
},
"continueOnError": true,
"rowTimeValueCheck": true,
"segmentTimeValueCheck": false
},
"metadata": {}
}

bin/pinot-admin.sh AddTable \
  -tableConfigFile /path/to/table-config.json \
  -schemaFile /path/to/table-schema.json -exec

curl -X POST -F file=@data.json \
-H "Content-Type: multipart/form-data" \
"http://localhost:9000/ingestFromFile?tableNameWithType=foo_OFFLINE&
batchConfigMapStr={"inputFormat":"json"}"curl -X POST -F [email protected] \
-H "Content-Type: multipart/form-data" \
"http://localhost:9000/ingestFromFile?tableNameWithType=foo_OFFLINE&
batchConfigMapStr={
"inputFormat":"csv",
"recordReader.prop.delimiter":"|"
}"curl -X POST "http://localhost:9000/ingestFromURI?tableNameWithType=foo_OFFLINE
&batchConfigMapStr={
"inputFormat":"json",
"input.fs.className":"org.apache.pinot.plugin.filesystem.S3PinotFS",
"input.fs.prop.region":"us-central",
"input.fs.prop.accessKey":"foo",
"input.fs.prop.secretKey":"bar"
}
&sourceURIStr=s3://test.bucket/path/to/json/data/data.json"

executionFrameworkSpec:
name: 'standalone'
segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner'
# Recommended to set jobType to SegmentCreationAndMetadataPush for production environment where Pinot Deep Store is configured
jobType: SegmentCreationAndTarPush
inputDirURI: '/tmp/pinot-quick-start/rawdata/'
includeFileNamePattern: 'glob:**/*.csv'
outputDirURI: '/tmp/pinot-quick-start/segments/'
overwriteOutput: true
pinotFSSpecs:
- scheme: file
className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
dataFormat: 'csv'
className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
tableName: 'transcript'
pinotClusterSpecs:
- controllerURI: 'http://localhost:9000'
pushJobSpec:
pushAttempts: 2
  pushRetryIntervalMillis: 1000

bin/pinot-admin.sh LaunchDataIngestionJob \
  -jobSpecFile /tmp/pinot-quick-start/batch-job-spec.yaml

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "s3:ListAllMyBuckets",
"Resource": "*"
},
{
"Effect": "Allow",
"Action": "s3:*",
"Resource": [
"arn:aws:s3:::metadata-push-staging",
"arn:aws:s3:::metadata-push-staging/*"
]
}
]
}

...
jobType: SegmentCreationAndMetadataPush
...
outputDirURI: 's3://metadata-push-staging/stagingDir/'
...
pushJobSpec:
copyToDeepStoreForMetadataPush: true
...