Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
How to turn on the water valve
.csv, .json or .avro), and are case insensitive. Note that the converter expects the .csv extension even if the data is delimited using tabs or spaces instead.# === Index segment creation job config ===
# path.to.input: Input directory containing Avro files
path.to.input=/user/pinot/input/data
# path.to.output: Output directory containing Pinot segments
path.to.output=/user/pinot/output
# path.to.schema: Schema file for the table, stored locally
path.to.schema=flights-schema.json
# segment.table.name: Name of the table for which to generate segments
segment.table.name=flights
# === Segment tar push job config ===
# push.to.hosts: Comma separated list of controllers host names to which to push
push.to.hosts=controller_host_0,controller_host_1
# push.to.port: The port on which the controller runs
push.to.port=8888mvn clean install -DskipTests -Pbuild-shaded-jar
hadoop jar pinot-hadoop-<version>-SNAPSHOT-shaded.jar SegmentCreation job.propertieshadoop jar pinot-hadoop-<version>-SNAPSHOT-shaded.jar SegmentTarPush job.propertiesbin/pinot-admin.sh CreateSegment -dataDir <input_data_dir> [-format [CSV/JSON/AVRO]] [-readerConfigFile <csv_config_file>] [-generatorConfigFile <generator_config_file>] -segmentName <segment_name> -schemaFile <input_schema_file> -tableName <table_name> -outDir <output_data_dir> [-overwrite]{
"fileFormat": "EXCEL",
"header": "col1,col2,col3,col4",
"delimiter": "\t",
"multiValueDelimiter": ","
}{
"schemaName": "flights",
"dimensionFieldSpecs": [
{
"name": "flightNumber",
"dataType": "LONG"
},
{
"name": "tags",
"dataType": "STRING",
"singleValueField": false
}
],
"metricFieldSpecs": [
{
"name": "price",
"dataType": "DOUBLE"
}
],
"timeFieldSpec": {
"incomingGranularitySpec": {
"name": "daysSinceEpoch",
"dataType": "INT",
"timeType": "DAYS"
}
}
}curl -X POST -F segment=@<segment-tar-file-path> http://controllerHost:controllerPort/segmentspinot-tools/target/pinot-tools-pkg/bin//pinot-admin.sh UploadSegment -controllerHost <hostname> -controllerPort <port> -segmentDir <segmentDirectoryPath>pinot.controller.segment.fetcher.hdfs.hadoop.kerberos.principle=`<your kerberos principal>
pinot.controller.segment.fetcher.hdfs.hadoop.kerberos.keytab=`<your kerberos keytab>pinot.server.segment.fetcher.hdfs.hadoop.kerberos.principle=`<your kerberos principal>
pinot.server.segment.fetcher.hdfs.hadoop.kerberos.keytab=`<your kerberos keytab>pinot.controller.segment.fetcher.`<protocol>`.class =`<class path to your implementation>pinot.server.segment.fetcher.`<protocol>`.class =`<class path to your implementation>curl -X POST -H "UPLOAD_TYPE:URI" -H "DOWNLOAD_URI:hdfs://nameservice1/hadoop/path/to/segment/file.{
"tableName": "flights",
"tableType": "REALTIME",
"segmentsConfig": {
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "7",
"segmentPushFrequency": "daily",
"segmentPushType": "APPEND",
"replication": "1",
"timeColumnName": "daysSinceEpoch",
"timeType": "DAYS",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy"
},
"tableIndexConfig": {
"invertedIndexColumns": [
"flightNumber",
"tags",
"daysSinceEpoch"
],
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "highLevel",
"stream.kafka.topic.name": "flights-realtime",
"stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.impl.kafka.KafkaJSONMessageDecoder",
"stream.kafka.zk.broker.url": "localhost:2181",
"stream.kafka.hlc.zk.connect.string": "localhost:2181"
}
},
"tenants": {
"broker": "brokerTenant",
"server": "serverTenant"
},
"metadata": {
}
}bin/pinot-admin.sh StartKafka &
bin/pinot-admin.sh StreamAvroIntoKafka -avroFile flights-2014.avro -kafkaTopic flights-realtime &bin/pinot-admin.sh AddTable -filePath flights-definition-realtime.jsonSELECT COUNT(*) FROM flightslocalhost:9092"streamType" : "foo",
"stream.foo.topic.name" : "SomeTopic",
"stream.foo.consumer.type": "LowLevel",
"stream.foo.consumer.factory.class.name": "fully.qualified.pkg.ConsumerFactoryClassName",
"stream.foo.consumer.prop.auto.offset.reset": "largest",
"stream.foo.decoder.class.name" : "fully.qualified.pkg.DecoderClassName",
"stream.foo.decoder.prop.a.decoder.property" : "decoderPropValue",
"stream.foo.connection.timeout.millis" : "10000", // default 30_000
"stream.foo.fetch.timeout.millis" : "10000" // default 5_000"stream.foo.some.buffer.size" : "24g""realtime.segment.flush.threshold.size" : "100000"
"realtime.segment.flush.threshold.time" : "6h"mvn clean package -DskipTests -Pbin-dist -Dkafka.version=2.0"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "highLevel",
"stream.kafka.topic.name": "meetupRSVPEvents",
"stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.impl.kafka.KafkaJSONMessageDecoder",
"stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory",
"stream.kafka.zk.broker.url": "localhost:2191/kafka",
"stream.kafka.hlc.bootstrap.server": "localhost:19092"
}{
"tableName": "meetupRsvp",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "mtime",
"timeType": "MILLISECONDS",
"segmentPushType": "APPEND",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"schemaName": "meetupRsvp",
"replication": "1",
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "LowLevel",
"stream.kafka.topic.name": "meetupRSVPEvents",
"stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.impl.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory",
"stream.kafka.zk.broker.url": "localhost:2191/kafka",
"stream.kafka.broker.list": "localhost:19092"
}
},
"metadata": {
"customConfigs": {}
}
}