Sample data (students.csv):

timestampInEpoch,id,name,age,score
1597044264380,1,david,15,98
1597044264381,2,henry,16,97
1597044264382,3,katie,14,99
1597044264383,4,catelyn,15,96
1597044264384,5,emma,13,93
1597044264390,6,john,15,100
1597044264396,7,isabella,13,89
1597044264399,8,linda,17,91
1597044264502,9,mark,16,67
1597044264670,10,tom,14,78

Convert the CSV to Parquet with Spark, then upload the files to S3:

scala> val df = spark.read.format("csv").option("header", true).load("path/to/students.csv")
scala> df.write.option("compression", "none").mode("overwrite").parquet("/path/to/batch_input/")

aws s3 cp /path/to/batch_input s3://my-bucket/batch-input/ --recursive
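If you want to sanity-check the conversion before uploading, reading the Parquet output back in the same shell is enough (an optional check, not required by the flow):

scala> spark.read.parquet("/path/to/batch_input/").show(5)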
"schemaName": "students",
"dimensionFieldSpecs": [
{
"name": "id",
"dataType": "INT"
},
{
"name": "name",
"dataType": "STRING"
},
{
"name": "age",
"dataType": "INT"
}
],
"metricFieldSpecs": [
{
"name": "score",
"dataType": "INT"
}
],
"dateTimeFieldSpecs": [
{
"name": "timestampInEpoch",
"dataType": "LONG",
"format": "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}
]
}{
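The schema can also be registered on its own, ahead of the table (the AddTable command below uploads it together with the table config); one way, assuming the same controller:

pinot-admin.sh AddSchema -schemaFile /path/to/student_schema.json -controllerHost localhost -controllerPort 9000 -exec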
"tableName": "students",
"segmentsConfig": {
"timeColumnName": "timestampInEpoch",
"timeType": "MILLISECONDS",
"replication": "1",
"schemaName": "students"
},
"tableIndexConfig": {
"invertedIndexColumns": [],
"loadMode": "MMAP"
},
"tenants": {
"broker": "DefaultTenant",
"server": "DefaultTenant"
},
"tableType": "OFFLINE",
"metadata": {}
}pinot-admin.sh AddTable -tableConfigFile /path/to/student_table.json -schemaFile /path/to/student_schema.json -controllerHost localhost -controllerPort 9000 -execexecutionFrameworkSpec:
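To confirm the table was created, one quick check is to fetch its config back from the controller's REST API (assuming the controller on localhost:9000):

curl http://localhost:9000/tables/students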
Spark ingestion job spec (spark_job_spec.yaml). Note that inputDirURI must match the S3 prefix the Parquet files were uploaded to:

executionFrameworkSpec:
  name: 'spark'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner'
  segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentMetadataPushJobRunner'
  extraConfigs:
    stagingDir: s3://my-bucket/spark/staging/
# jobType: Pinot ingestion job type.
# Supported job types are:
#   'SegmentCreation'
#   'SegmentTarPush'
#   'SegmentUriPush'
#   'SegmentCreationAndTarPush'
#   'SegmentCreationAndUriPush'
#   'SegmentCreationAndMetadataPush'
jobType: SegmentCreationAndMetadataPush
inputDirURI: 's3://my-bucket/batch-input/'
outputDirURI: 's3://my-bucket/batch-output/'
overwriteOutput: true
pinotFSSpecs:
  - scheme: s3
    className: org.apache.pinot.plugin.filesystem.S3PinotFS
    configs:
      region: 'us-west-2'
recordReaderSpec:
  dataFormat: 'parquet'
  className: 'org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader'
tableSpec:
  tableName: 'students'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'
pushJobSpec:
  pushParallelism: 2
  pushAttempts: 2
  pushRetryIntervalMillis: 1000

Launch the ingestion job with spark-submit, pointing it at your Pinot distribution:

export PINOT_VERSION=0.10.0
export PINOT_DISTRIBUTION_DIR=/path/to/apache-pinot-${PINOT_VERSION}-bin
spark-submit \
  --class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
  --master local --deploy-mode client \
  --conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins -Dplugins.include=pinot-s3,pinot-parquet -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml" \
  --conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark/pinot-batch-ingestion-spark-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar:${PINOT_DISTRIBUTION_DIR}/plugins/pinot-file-system/pinot-s3/pinot-s3-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/plugins/pinot-input-format/pinot-parquet/pinot-parquet-${PINOT_VERSION}-shaded.jar" \
  --conf "spark.executor.extraClassPath=${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark/pinot-batch-ingestion-spark-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar:${PINOT_DISTRIBUTION_DIR}/plugins/pinot-file-system/pinot-s3/pinot-s3-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/plugins/pinot-input-format/pinot-parquet/pinot-parquet-${PINOT_VERSION}-shaded.jar" \
  local://${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar -jobSpecFile /path/to/spark_job_spec.yaml
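Before querying, one way to verify that the segment landed is to list the table's segments from the controller (an optional check, assuming the same controller as above):

curl http://localhost:9000/segments/students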
"resultTable": {
"dataSchema": {
"columnNames": [
"age",
"id",
"name",
"score",
"timestampInEpoch"
],
"columnDataTypes": [
"INT",
"INT",
"STRING",
"INT",
"LONG"
]
},
"rows": [
[
15,
1,
"david",
98,
1597044264380
],
[
16,
2,
"henry",
97,
1597044264381
],
[
14,
3,
"katie",
99,
1597044264382
],
[
15,
4,
"catelyn",
96,
1597044264383
],
[
13,
5,
"emma",
93,
1597044264384
],
[
15,
6,
"john",
100,
1597044264390
],
[
13,
7,
"isabella",
89,
1597044264396
],
[
17,
8,
"linda",
91,
1597044264399
],
[
16,
9,
"mark",
67,
1597044264502
],
[
14,
10,
"tom",
78,
1597044264670
]
]
},
"exceptions": [],
"numServersQueried": 1,
"numServersResponded": 1,
"numSegmentsQueried": 1,
"numSegmentsProcessed": 1,
"numSegmentsMatched": 1,
"numConsumingSegmentsQueried": 0,
"numDocsScanned": 10,
"numEntriesScannedInFilter": 0,
"numEntriesScannedPostFilter": 50,
"numGroupsLimitReached": false,
"totalDocs": 10,
"timeUsedMs": 11,
"segmentStatistics": [],
"traceInfo": {},
"minConsumingFreshnessTimeMs": 0
}
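If you prefer plain HTTP over the admin script, the broker exposes the same SQL query endpoint directly (here assuming the broker from the PostQuery command above, on localhost:8000):

curl -X POST -H "Content-Type: application/json" -d '{"sql": "SELECT * FROM students LIMIT 10"}' http://localhost:8000/query/sql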