Data Ingestion Overview

Ingesting Offline data

Segments for offline tables are constructed outside of Pinot, typically in Hadoop via MapReduce jobs, and ingested into Pinot via the REST API provided by the Controller. Pinot provides libraries to create Pinot segments out of input files in AVRO, JSON, or CSV formats in a Hadoop job, and to push the constructed segments to the controllers via REST APIs.

When an Offline segment is ingested, the controller looks up the table’s configuration and assigns the segment to the servers that host the table. It may assign multiple servers for each segment depending on the number of replicas configured for that table.

Pinot supports different segment assignment strategies that are optimized for various use cases.

Once segments are assigned, Pinot servers get notified via Helix to “host” the segment. The segments are downloaded from the remote segment store to the local storage, untarred, and memory-mapped.

Once the server has loaded (memory-mapped) the segment, Helix notifies brokers of the availability of these segments. The brokers start to include the new segments for queries. Brokers support different routing strategies depending on the type of table, the segment assignment strategy, and the use case.

Data in offline segments is immutable (rows cannot be added, deleted, or modified). However, segments may be replaced with modified data.

Ingesting Realtime Data

Segments for realtime tables are constructed by Pinot servers with rows ingested from data streams such as Kafka. Rows ingested from streams are made available for query processing as soon as they are ingested, thus enabling applications such as those that need real-time charts on analytics.

In large scale installations, data in streams is typically split across multiple stream partitions. The underlying stream may provide consumer implementations that allow applications to consume data from any subset of partitions, including all partitions (or, just from one partition).

A Pinot table can be configured to consume from streams in one of two modes:

  • LowLevel: This is the preferred mode of consumption. Pinot creates independent partition-level consumers for each partition. Depending on the configured number of replicas, multiple consumers may be created for each partition, taking care that no two replicas exist on the same server host. Therefore you need to provision at least as many hosts as the number of replicas configured.

  • HighLevel: Pinot creates one stream-level consumer that consumes from all partitions. Each message consumed could be from any of the partitions of the stream. Depending on the configured number of replicas, multiple stream-level consumers are created, taking care that no two replicas exist on the same server host. Therefore you need to provision exactly as many hosts as the number of replicas configured.
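The host requirement in both modes follows from placing each replica on a different server host. The sketch below illustrates that constraint in Python. The function name, host names, and round-robin placement are assumptions made for illustration; Pinot's actual assignment strategy is not described here.

```python
from typing import Dict, List


def assign_partition_consumers(
    partitions: int, replicas: int, hosts: List[str]
) -> Dict[int, List[str]]:
    """Place `replicas` consumers per partition on distinct hosts (illustrative only)."""
    if replicas > len(hosts):
        raise ValueError(
            f"{replicas} replicas need at least {replicas} hosts, got {len(hosts)}"
        )
    assignment: Dict[int, List[str]] = {}
    for partition in range(partitions):
        # Offsetting by the partition number spreads consumers across hosts.
        # Because replicas <= len(hosts), the chosen hosts are always distinct.
        assignment[partition] = [
            hosts[(partition + replica) % len(hosts)] for replica in range(replicas)
        ]
    return assignment


placement = assign_partition_consumers(
    partitions=4, replicas=2, hosts=["server-1", "server-2", "server-3"]
)
for partition, servers in placement.items():
    print(partition, servers)
```

Asking for more replicas than hosts raises an error, which is the provisioning rule stated in the bullets.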

Of course, the underlying stream should support either mode of consumption in order for a Pinot table to use that mode. Kafka has support for both of these modes. See Stream ingestion for more information on the support of other data streams in Pinot.

In either mode, Pinot servers store the ingested rows in volatile memory until either one of the following conditions is met:

  1. A certain number of rows are consumed

  2. The consumption has gone on for a certain length of time

(See the StreamConfigs Section for how to set these values, or have Pinot compute them for you.)

Upon reaching either one of these limits, the servers do the following:

  • Pause consumption

  • Persist the rows consumed so far into non-volatile storage

  • Continue consuming new rows into volatile memory again.

The persisted rows form what we call a completed segment (as opposed to a consuming segment that resides in volatile memory).
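The consume, persist, and resume cycle can be sketched as a small loop. This is an illustration in Python, not Pinot's implementation: the function name, the in-memory lists standing in for volatile and non-volatile storage, and the injectable clock are all assumptions.

```python
import time
from typing import Callable, Iterable, List, Tuple


def consume(
    rows: Iterable[object],
    max_rows: int,
    max_seconds: float,
    clock: Callable[[], float] = time.monotonic,
) -> Tuple[List[List[object]], List[object]]:
    """Return (completed segments, rows still in the consuming segment)."""
    completed: List[List[object]] = []  # stands in for non-volatile storage
    consuming: List[object] = []        # stands in for volatile memory
    started = clock()
    for row in rows:
        consuming.append(row)
        # Either limit (row count or elapsed time) completes the segment.
        if len(consuming) >= max_rows or clock() - started >= max_seconds:
            completed.append(consuming)  # persist the rows consumed so far
            consuming = []               # continue with a fresh consuming segment
            started = clock()
    return completed, consuming


segments, in_memory = consume(range(7), max_rows=3, max_seconds=3600)
print(segments)   # [[0, 1, 2], [3, 4, 5]]
print(in_memory)  # [6]
```

The final row stays in the consuming segment because neither limit has been reached for it yet.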

In LowLevel mode, the completed segments are persisted into the local non-volatile store of the Pinot server as well as the segment store of the Pinot cluster (see Pinot Architecture Overview). This allows for easy and automated mechanisms for replacing Pinot servers, expanding capacity, etc. Pinot has special mechanisms that ensure that the completed segment is equivalent across all replicas.

During segment completion, one winner is chosen by the controller from all the replicas as the committer server. The committer server builds the segment and uploads it to the controller. All the other non-committer servers follow one of these two paths:

  1. If the in-memory segment is equivalent to the committed segment, the non-committer server also builds the segment locally and replaces the in-memory segment

  2. If the in-memory segment is not equivalent to the committed segment, the non-committer server downloads the segment from the controller.
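The two paths for non-committer servers can be sketched as follows. This is a simplified Python illustration: the `Replica` class is hypothetical, the committer is passed in rather than elected, and "equivalent" is modeled as having consumed up to the same stream offset, which is an assumption made for the example.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Replica:
    name: str
    end_offset: int  # assumption: stream offset this replica has consumed up to


def complete_segment(replicas: List[Replica], committer_name: str) -> Dict[str, str]:
    """Decide what each replica does once the controller has picked a committer."""
    committer = next(r for r in replicas if r.name == committer_name)
    actions = {committer.name: "build segment and upload to controller"}
    for replica in replicas:
        if replica.name == committer.name:
            continue
        if replica.end_offset == committer.end_offset:
            # In-memory segment is equivalent: build locally, no download needed.
            actions[replica.name] = "build segment locally"
        else:
            # Not equivalent: discard local rows and fetch the committed segment.
            actions[replica.name] = "download segment from controller"
    return actions


actions = complete_segment(
    [Replica("server-1", 1000), Replica("server-2", 1000), Replica("server-3", 990)],
    committer_name="server-1",
)
for server, action in actions.items():
    print(server, "->", action)
```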

For more details on this protocol, please refer to this doc.

In HighLevel mode, the servers persist the consumed rows into local store (and not the segment store). Since consumption of rows can be from any partition, it is not possible to guarantee equivalence of segments across replicas.

See Consuming and Indexing rows in Realtime for details.

Stream ingestion
StreamConfigs Section
Pinot Architecture Overview
special mechanisms
this doc
Consuming and Indexing rows in Realtime

Advanced

Ingestion Transformations

Raw source data often needs to undergo some transformations before it is pushed to Pinot.

Transformations include extracting records from nested objects, applying simple transform functions on certain columns, filtering out unwanted columns, as well as more advanced operations like joining between datasets.

A preprocessing job is usually needed to perform these operations. In streaming data sources you might write a Samza job and create an intermediate topic to store the transformed data.

For simple transformations, this can result in inconsistencies in the batch/stream data source and increase maintenance and operator overhead.

To make things easier, Pinot supports transformations that can be applied via the table config.

Transformation Functions

Pinot supports the following functions:

  1. Groovy functions

  2. Inbuilt functions


A transformation function cannot mix Groovy and inbuilt functions - you can only use one type of function at a time.

Groovy functions

Groovy functions can be defined using the syntax:

Any valid Groovy expression can be used.

⚠️ Disabling Groovy

Allowing executable Groovy in ingestion transformation can be a security vulnerability. If you would like to disable Groovy for ingestion, you can set the following controller config:

controller.disable.ingestion.groovy=true

If not set, Groovy for ingestion transformation is enabled by default.

Inbuilt Pinot functions

There are also several inbuilt functions that can be used directly as ingestion transform functions.

DateTime functions

These functions enable time transformations.

toEpochXXX

Converts from epoch milliseconds to a higher granularity.

Function name
Description

toEpochXXXRounded

Converts from epoch milliseconds to another granularity, rounding to the rounding bucket. For example, 1588469352000 (2020-05-03 01:29:12 UTC) is 26474489 minutesSinceEpoch. With a 10-minute bucket, toEpochMinutesRounded(1588469352000, 10) = 26474480 (2020-05-03 01:20:00 UTC), that is, the value is rounded down to the start of its bucket.
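The arithmetic behind the epoch functions can be checked with a few lines of Python. The helper names below are illustrative stand-ins, not Pinot code, and the rounded variant assumes rounding down to the start of the bucket, which is what the example value implies.

```python
from datetime import datetime, timezone

MILLIS_PER_MINUTE = 60_000


def to_epoch_minutes(millis: int) -> int:
    """Stand-in for toEpochMinutes: whole minutes since the epoch."""
    return millis // MILLIS_PER_MINUTE


def to_epoch_minutes_rounded(millis: int, bucket: int) -> int:
    """Stand-in for toEpochMinutesRounded: floor to a multiple of `bucket` minutes."""
    return (millis // (MILLIS_PER_MINUTE * bucket)) * bucket


def from_epoch_minutes(minutes: int) -> int:
    """Stand-in for fromEpochMinutes: minutes since the epoch back to milliseconds."""
    return minutes * MILLIS_PER_MINUTE


millis = 1588469352000
print(to_epoch_minutes(millis))  # 26474489
rounded = to_epoch_minutes_rounded(millis, 10)
print(rounded)                   # 26474480
# Convert back to a UTC timestamp to see the bucket boundary.
print(datetime.fromtimestamp(from_epoch_minutes(rounded) / 1000, tz=timezone.utc))
# 2020-05-03 01:20:00+00:00
```

The same pattern applies to seconds, hours, and days by changing the number of milliseconds per unit.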

Function Name
Description

fromEpochXXX

Converts from an epoch granularity to milliseconds.

Function Name
Description

Simple date format

Converts simple date format strings to milliseconds and vice versa, as per the provided pattern string.

Function name
Description

Note

Letters that are not part of the Simple Date Time legend (https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html) need to be escaped. For example:

"transformFunction": "fromDateTime(dateTimeStr, 'yyyy-MM-dd''T''HH:mm:ss')"

JSON functions

Function name
Description

Types of transformation

Filtering

Records can be filtered as they are being ingested. A filter function can be specified in the filterConfig in the ingestionConfig of the table config.

If the expression evaluates to true, the record will be filtered out. The expressions can use any of the transform functions described in the previous section.
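Note the direction of the predicate: a record is dropped when the function returns true, not kept. The Python sketch below mimics that behavior with two predicates that mirror the filter examples described in this section. The function names and sample records are made up for illustration, and the code runs outside Pinot.

```python
from typing import Callable, Dict, List

Record = Dict[str, object]


def older_than_cutoff(record: Record) -> bool:
    """Mirrors Groovy({timestamp < 1589007600000}, timestamp)."""
    return record["timestamp"] < 1589007600000


def cheap_x_or_y(record: Record) -> bool:
    """Mirrors the campaign/prices filter: campaign is X or Y and sum(prices) < 100."""
    return record["campaign"] in ("X", "Y") and sum(record["prices"]) < 100


def ingest(records: List[Record], filter_function: Callable[[Record], bool]) -> List[Record]:
    # A record is filtered OUT when the filter function evaluates to true.
    return [r for r in records if not filter_function(r)]


records = [
    {"timestamp": 1589007599999, "campaign": "X", "prices": [10.0, 20.0]},
    {"timestamp": 1589007600000, "campaign": "X", "prices": [60.0, 50.0]},
    {"timestamp": 1589007600001, "campaign": "Z", "prices": [1.0]},
]
print(len(ingest(records, older_than_cutoff)))  # 2
print(len(ingest(records, cheap_x_or_y)))       # 2
```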

Consider a table that has a column timestamp. If you want to filter out records that are older than timestamp 1589007600000, you could apply the following function:

Consider a table that has a string column campaign and a multi-value double column prices. If you want to filter out records where campaign = 'X' or 'Y' and the sum of all elements in prices is less than 100, you could apply the following function:

Column Transformation

Transform functions can be defined on columns in the ingestion config of the table config.

For example, imagine that our source data contains the prices and timestamp fields. We want to extract the maximum price and store that in the maxPrices field and convert the timestamp into the number of hours since the epoch and store it in the hoursSinceEpoch field. You can do this by applying the following transformation:

Below are some examples of commonly used functions.

String concatenation

Concat firstName and lastName to get fullName

Find an element in an array

Find max value in array bids

Time transformation

Convert timestamp from MILLISECONDS to HOURS

Column name change

Change name of the column from user_id to userId

Extract value from a column containing space

Pinot doesn't support columns that have spaces, so if a source data column has a space, we'll need to store that value in a column with a supported name. To extract the value from first Name into the column firstName, run the following:

Ternary operation

If eventType is IMPRESSION, set impressions to 1. Similarly, set clicks to 1 for CLICK.

AVRO Map

Store an AVRO Map in Pinot as two multi-value columns. Sort the keys, to maintain the mapping. 1) The keys of the map as map_keys 2) The values of the map as map_values
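Sorting matters because the two columns are stored independently: the value at position i must belong to the key at position i. A minimal Python sketch of the same idea follows (the function name is illustrative, and this is not the Groovy that Pinot executes).

```python
from typing import Dict, List, Tuple


def split_map(source: Dict[str, int]) -> Tuple[List[str], List[int]]:
    """Split a map into parallel key and value lists, ordered by key."""
    items = sorted(source.items())  # sort once so both lists share the same order
    keys = [key for key, _ in items]
    values = [value for _, value in items]
    return keys, values


map_keys, map_values = split_map({"b": 2, "c": 3, "a": 1})
print(map_keys)    # ['a', 'b', 'c']
print(map_values)  # [1, 2, 3]
```

Because both lists come from the same sorted sequence, map_values[i] is always the value for map_keys[i].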

Chaining transformations

Transformations can be chained. This means that you can use a field created by a transformation in another transformation function.

For example, we might have the following JSON document in the data field of our source data:

We can apply one transformation to extract the userId and then another one to pull out the numerical part of the identifier:

Flattening

There are 2 kinds of flattening:

One record into many

This is not natively supported as of yet. You can write a custom Decoder/RecordReader if you want to use this. Once the Decoder generates the multiple GenericRows from the provided input record, a List<GenericRow> should be set into the destination GenericRow, with the key $MULTIPLE_RECORDS_KEY$. The segment generation drivers will treat this as a special case and handle the multiple records case.

Extract attributes from complex objects

Feature TBD

toEpochSeconds

Converts epoch millis to epoch seconds.

Usage: "toEpochSeconds(millis)"

toEpochMinutes

Converts epoch millis to epoch minutes

Usage: "toEpochMinutes(millis)"

toEpochHours

Converts epoch millis to epoch hours

Usage: "toEpochHours(millis)"

toEpochDays

Converts epoch millis to epoch days

Usage: "toEpochDays(millis)"

toEpochSecondsRounded

Converts epoch millis to epoch seconds, rounding to nearest rounding bucket

Usage: "toEpochSecondsRounded(millis, 30)"

toEpochMinutesRounded

Converts epoch millis to epoch minutes, rounding to nearest rounding bucket

Usage: "toEpochMinutesRounded(millis, 10)"

toEpochHoursRounded

Converts epoch millis to epoch hours, rounding to nearest rounding bucket

Usage: "toEpochHoursRounded(millis, 6)"

toEpochDaysRounded

Converts epoch millis to epoch days, rounding to nearest rounding bucket

Usage: "toEpochDaysRounded(millis, 7)"

fromEpochSeconds

Converts from epoch seconds to milliseconds

"fromEpochSeconds(secondsSinceEpoch)"

fromEpochMinutes

Converts from epoch minutes to milliseconds

"fromEpochMinutes(minutesSinceEpoch)"

fromEpochHours

Converts from epoch hours to milliseconds

"fromEpochHours(hoursSinceEpoch)"

fromEpochDays

Converts from epoch days to milliseconds

"fromEpochDays(daysSinceEpoch)"

ToDateTime

Converts from milliseconds to a formatted date time string, as per the provided pattern

"toDateTime(millis, 'yyyy-MM-dd')"

FromDateTime

Converts a formatted date time string to milliseconds, as per the provided pattern

"fromDateTime(dateTimeStr, 'EEE MMM dd HH:mm:ss ZZZ yyyy')"

json_format

Converts a JSON/AVRO complex object to a string. This JSON map can then be queried using the jsonExtractScalar function.

"json_format(jsonMapField)"

https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html
Groovy({groovy script}, argument1, argument2...argumentN)
"tableConfig": {
    "tableName": ...,
    "tableType": ...,
    "ingestionConfig": {
        "filterConfig": {
            "filterFunction": "<expression>"
        }
    }
}
"ingestionConfig": {
    "filterConfig": {
        "filterFunction": "Groovy({timestamp < 1589007600000}, timestamp)"
    }
}
"ingestionConfig": {
    "filterConfig": {
        "filterFunction": "Groovy({(campaign == \"X\" || campaign == \"Y\") && prices.sum() < 100}, prices, campaign)"
    }
}
{ "tableConfig": {
    "tableName": ...,
    "tableType": ...,
    "ingestionConfig": {
        "transformConfigs": [{
          "columnName": "fieldName",
          "transformFunction": "<expression>"
        }]
    }
} }
pinot-table-offline.json
{
"tableName": "myTable",
...
"ingestionConfig": {
    "transformConfigs": [{
      "columnName": "maxPrice",
      "transformFunction": "Groovy({prices.max()}, prices)" // groovy function
    },
    {
      "columnName": "hoursSinceEpoch",
      "transformFunction": "toEpochHours(timestamp)" // inbuilt function
    }]
  }
}
"ingestionConfig": {
    "transformConfigs": [{
      "columnName": "fullName",
      "transformFunction": "Groovy({firstName+' '+lastName}, firstName, lastName)" 
    }]
}
"ingestionConfig": {
    "transformConfigs": [{
      "columnName": "maxBid",
      "transformFunction": "Groovy({bids.max{ it.toBigDecimal() }}, bids)" 
    }]
}
"ingestionConfig": {
    "transformConfigs": [{
      "columnName": "hoursSinceEpoch",
      "transformFunction": "Groovy({timestamp/(1000*60*60)}, timestamp)" 
    }]
}
"ingestionConfig": {
    "transformConfigs": [{
      "columnName": "userId",
      "transformFunction": "Groovy({user_id}, user_id)" 
    }]
}
"ingestionConfig": {
    "transformConfigs": [{
      "columnName": "firstName",
      "transformFunction": "\"first Name\""
    }]
}
"ingestionConfig": {
    "transformConfigs": [{
      "columnName": "impressions",
      "transformFunction": "Groovy({eventType == 'IMPRESSION' ? 1: 0}, eventType)" 
    },
    {
      "columnName": "clicks",
      "transformFunction": "Groovy({eventType == 'CLICK' ? 1: 0}, eventType)" 
    }]
}
"ingestionConfig": {
    "transformConfigs": [{
      "columnName": "map2_keys",
      "transformFunction": "Groovy({map2.sort()*.key}, map2)" 
    },
    {
      "columnName": "map2_values",
      "transformFunction": "Groovy({map2.sort()*.value}, map2)" 
    }]
}
{
  "userId": "12345678__foo__othertext"
}
"ingestionConfig": {
    "transformConfigs": [
      {
        "columnName": "userOid",
        "transformFunction": "jsonPathString(data, '$.userId')"
      },
      {
        "columnName": "userId",
        "transformFunction": "Groovy({Long.valueOf(userOid.substring(0, 8))}, userOid)"
      }
   ]
}
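The effect of the chained transformation can be traced by hand. The Python sketch below reproduces the two steps on the sample document; it only mirrors the logic of jsonPathString followed by the Groovy substring call and does not run inside Pinot. The helper name is an assumption.

```python
import json

# Source record: the `data` field holds the JSON document as a string.
record = {"data": json.dumps({"userId": "12345678__foo__othertext"})}


def extract_user_id(data: str) -> str:
    """Stand-in for jsonPathString(data, '$.userId')."""
    return json.loads(data)["userId"]


# Step 1: the first transformation creates the intermediate field userOid.
record["userOid"] = extract_user_id(record["data"])

# Step 2: the second transformation reads userOid, the field created in step 1,
# and keeps the leading 8 characters as a number.
record["userId"] = int(record["userOid"][:8])

print(record["userOid"])  # 12345678__foo__othertext
print(record["userId"])   # 12345678
```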

Null Value Support

Need for special NULL value handling

By default, Pinot transforms null values coming from the data source to a default value determined by the type of the corresponding column (or as specified in the schema). For example, for an INT column the default is 0, and for a STRING column the default is "null". This transformation is necessary to ensure all the indices can be built correctly during segment creation. However, it means Pinot can no longer keep track of the null values in the table and hence cannot support queries such as:

There is a workaround by matching with default values in the filter predicate. However, this is error prone since oftentimes it's difficult to distinguish valid values from the default null values. Therefore, we added first class NULL value support in Pinot for overcoming this limitation. As of today, the latest version supports NULL filter predicates only. Generic support for NULL handling in query execution is in progress (eg: within aggregation functions such as count or sum).
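The weakness of the default-value workaround is easy to demonstrate. In the Python sketch below, the defaults for INT and STRING come from the text above; the function name and sample values are assumptions.

```python
from typing import List, Optional

DEFAULT_NULL_VALUES = {"INT": 0, "STRING": "null"}


def to_stored_value(value: Optional[object], column_type: str) -> object:
    """Replace a source null with the column type's default value."""
    return DEFAULT_NULL_VALUES[column_type] if value is None else value


source_clicks: List[Optional[int]] = [5, None, 0]
stored = [to_stored_value(v, "INT") for v in source_clicks]
print(stored)  # [5, 0, 0]

# Workaround: treat "equals the default" as "was null".
looks_null = [doc_id for doc_id, v in enumerate(stored) if v == DEFAULT_NULL_VALUES["INT"]]
print(looks_null)  # [1, 2]
```

Document 2 held a genuine 0, yet the workaround reports it as null alongside document 1.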

High Level Architecture

To turn on NULL handling, enable the boolean flag nullHandlingEnabled in the table index config (please see the tableIndexConfig section). Note that this will cause Pinot to use additional memory and disk space per segment. The details are as follows:

Ingestion Phase

During data ingestion (either realtime or offline), each GenericRow object derived from the original data source record keeps track of all the column names containing null values. This is done as part of the NullValueTransformer. For each such column, the segment creation logic updates a NULL value vector (implemented by a roaring bitmap) with the corresponding document ID. Effectively, at the end of the segment creation process we get a per-column NULL value vector which gives us the set of document IDs containing null values for that column. This per-column vector is then exposed through the DataSource interface for use in query execution.

Query Phase

During query execution, if the query includes an IS NULL or IS NOT NULL predicate as shown above, we fetch the NULL value vector for the corresponding column within FilterPlanNode and retrieve the corresponding bitmap, which represents all document IDs containing NULL values for that column. This bitmap is then used to create a BitmapBasedFilterOperator, which does the actual filtering operation.
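The ingestion and query phases can be pictured with a toy model. The Python sketch below uses a plain set of document IDs per column in place of a roaring bitmap; the function names and sample rows are assumptions, and the code does not reflect Pinot's internal classes.

```python
from typing import Dict, List, Optional, Set

Row = Dict[str, Optional[object]]


def build_null_vectors(rows: List[Row], columns: List[str]) -> Dict[str, Set[int]]:
    """Ingestion phase: record, per column, the document IDs whose value is null."""
    vectors: Dict[str, Set[int]] = {column: set() for column in columns}
    for doc_id, row in enumerate(rows):
        for column in columns:
            if row.get(column) is None:
                vectors[column].add(doc_id)
    return vectors


def filter_is_null(vectors: Dict[str, Set[int]], column: str) -> List[int]:
    """Query phase: IS NULL returns exactly the IDs in the column's null vector."""
    return sorted(vectors[column])


def filter_is_not_null(vectors: Dict[str, Set[int]], column: str, num_docs: int) -> List[int]:
    """Query phase: IS NOT NULL returns every document ID outside the null vector."""
    return [doc_id for doc_id in range(num_docs) if doc_id not in vectors[column]]


rows = [
    {"name": "a", "age": 30},
    {"name": None, "age": 25},
    {"name": "c", "age": None},
    {"name": None, "age": None},
]
vectors = build_null_vectors(rows, ["name", "age"])
print(filter_is_null(vectors, "name"))                      # [1, 3]
print(len(filter_is_not_null(vectors, "name", len(rows))))  # 2
```

The last line corresponds to a count(*) with an IS NOT NULL predicate on the name column.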

select count(*) from my_table where column IS NOT NULL
tableIndexConfig section

Advanced Pinot Setup

Start Pinot components (scripts or docker images)

Set up Pinot by starting each component individually

Start Pinot Components using docker

Prerequisites

If running locally, please ensure your docker cluster has enough resources. See Sample docker resources for a sample config.

Pull docker image

You can try out the pre-built Pinot all-in-one docker image.

(Optional) You can also follow the instructions here to build your own images.

0. Create a Network

Create an isolated bridge network in docker.

1. Start Zookeeper

Start Zookeeper in daemon mode.

Start ZKUI to browse Zookeeper data at http://localhost:9090.

2. Start Pinot Controller

Start Pinot Controller in daemon mode and connect to Zookeeper.

3. Start Pinot Broker

Start Pinot Broker in daemon mode and connect to Zookeeper.

4. Start Pinot Server

Start Pinot Server in daemon mode and connect to Zookeeper.

Now all Pinot related components are started as an empty cluster.

You can run the command below to check container status.

Sample Console Output

Download Pinot Distribution from http://pinot.apache.org/download/

Start Pinot components via launcher scripts

Start Pinot Using Config Files

Often we need to customize the setup of Pinot components. To do so, users can write a config file and use it to start each Pinot component.

Below are example config files and sample commands to start Pinot.

Pinot Controller

Below is a sample pinot-controller.conf used in HelmChart setup.

In order to run Pinot Controller, the command is:

Configure Controller

Below are some configurations you can set in Pinot Controller. You can head over to the Controller page for the complete list of available configs.

Config Name
Description
Default Value

Pinot Broker

Below is a sample pinot-broker.conf used in HelmChart setup.

In order to run Pinot Broker, the command is:

Configure Broker

Below are some configurations you can set in Pinot Broker. You can head over to the Broker page for the complete list of available configs.

Config Name
Description
Default Value

Pinot Server

Below is a sample pinot-server.conf used in HelmChart setup.

In order to run Pinot Server, the command is:

Configure Server

Below are some configurations you can set in Pinot Server. You can head over to the Server page for the complete list of available configs.

Config Name
Description
Default Value

Create and Configure table

A TABLE in regular database world is represented as <TABLE>_OFFLINE and/or <TABLE>_REALTIME in Pinot depending on the ingestion mode (batch, real-time, hybrid)
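The naming convention can be expressed as a small helper. This Python sketch is only an illustration of the suffix rule stated above; the function name and the mode strings are assumptions.

```python
from typing import List


def physical_table_names(table: str, mode: str) -> List[str]:
    """Map a logical table name and ingestion mode to Pinot's typed table names."""
    suffixes = {
        "batch": ["OFFLINE"],
        "real-time": ["REALTIME"],
        "hybrid": ["OFFLINE", "REALTIME"],  # a hybrid table has both parts
    }
    if mode not in suffixes:
        raise ValueError(f"unknown ingestion mode: {mode}")
    return [f"{table}_{suffix}" for suffix in suffixes[mode]]


print(physical_table_names("airlineStats", "batch"))   # ['airlineStats_OFFLINE']
print(physical_table_names("airlineStats", "hybrid"))  # ['airlineStats_OFFLINE', 'airlineStats_REALTIME']
```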

See examples for all possible batch/streaming tables.

Batch Table Creation

Please see Batch Tables for table configuration details and how to customize it.

Sample Console Output

Streaming Table Creation

Please see Streaming Tables for table configuration details and how to customize it.

Start Kafka

Create a Kafka Topic

Create a Streaming table

Sample output

Start Kafka-Zookeeper

Start Kafka

Load Data

Now that the table is configured, let's load some data. Data can be loaded in batch mode or streaming mode. See the ingestion overview page for details. Loading data involves generating Pinot segments from raw data and pushing them to the Pinot cluster.

Load Data in Batch

Users can generate and push segments to Pinot via standalone scripts or using frameworks such as Hadoop or Spark. See this page for more details on setting up Data Ingestion Jobs.

The example below uses standalone mode.

Sample Console Output

The JobSpec yaml file has all the information regarding data format, input data location, and Pinot cluster coordinates. Note that this assumes that the controller is RUNNING to fetch the table config and schema. If not, you will have to configure the spec to point at their location. See Pinot Ingestion Job for more details.

Load Data in Streaming

Kafka

Run the command below to stream JSON data into the Kafka topic flights-realtime:

Start Zookeeper

Start Pinot Controller

See controller page for more details.

Start Pinot Broker

Start Pinot Server

Pinot Controller configs

| Config Name | Description | Default Value |
| --- | --- | --- |
| controller.helix.cluster.name | Pinot Cluster name | PinotCluster |
| controller.host | Pinot Controller Host | Required if config pinot.set.instance.id.to.hostname is false. |
| pinot.set.instance.id.to.hostname | When enabled, use server hostname to infer controller.host | false |
| controller.port | Pinot Controller Port | 9000 |
| controller.vip.host | The VIP hostname used to set the download URL for segments | ${controller.host} |
| controller.vip.port | The VIP port used to set the download URL for segments | ${controller.port} |
| controller.data.dir | Directory to host segment data | ${java.io.tmpdir}/PinotController |
| controller.zk.str | Zookeeper URL | localhost:2181 |
| cluster.tenant.isolation.enable | Enable Tenant Isolation, default is single tenant cluster | true |

Pinot Broker configs

| Config Name | Description | Default Value |
| --- | --- | --- |
| instanceId | Unique id to register Pinot Broker in the cluster. | BROKER_${BROKER_HOST}_${pinot.broker.client.queryPort} |
| pinot.set.instance.id.to.hostname | When enabled, use server hostname to set ${BROKER_HOST} in above config, else use IP address. | false |
| pinot.broker.client.queryPort | Port to query Pinot Broker | 8099 |
| pinot.broker.timeoutMs | Timeout for Broker Query in Milliseconds | 10000 |
| pinot.broker.enable.query.limit.override | Configuration to enable Query LIMIT Override to protect Pinot Broker and Server from fetching too many records back. | false |
| pinot.broker.query.response.limit | When config pinot.broker.enable.query.limit.override is enabled, reset limit for selection query if it exceeds this value. | 2147483647 |
| pinot.broker.startup.minResourcePercent | Configuration to consider the broker ServiceStatus as being STARTED if the percent of resources (tables) that are ONLINE for this broker has crossed the threshold percentage of the total number of tables that it is expected to serve | 100.0 |

Pinot Server configs

| Config Name | Description | Default Value |
| --- | --- | --- |
| instanceId | Unique id to register Pinot Server in the cluster. | Server_${SERVER_HOST}_${pinot.server.netty.port} |
| pinot.set.instance.id.to.hostname | When enabled, use server hostname to set ${SERVER_HOST} in above config, else use IP address. | false |
| pinot.server.netty.port | Port to query Pinot Server | 8098 |
| pinot.server.adminapi.port | Port for Pinot Server Admin UI | 8097 |
| pinot.server.instance.dataDir | Directory to hold all the data | ${java.io.tmpDir}/PinotServer/index |
| pinot.server.instance.segmentTarDir | Directory to hold temporary segments downloaded from Controller or Deep Store | ${java.io.tmpDir}/PinotServer/segmentTar |
| pinot.server.query.executor.timeout | Timeout for Server to process Query in Milliseconds | 15000 |

Create stream table

here
ZKUI
http://localhost:9090
http://pinot.apache.org/download/
Controller
Broker
Server
examples
Batch Tables
Streaming Tables
ingestion overview
page
Pinot Ingestion Job
Sample docker resources

export PINOT_VERSION=0.10.0
export PINOT_IMAGE=apachepinot/pinot:${PINOT_VERSION}
docker pull ${PINOT_IMAGE}
docker network create -d bridge pinot-demo
docker run \
    --network=pinot-demo \
    --name  pinot-zookeeper \
    --restart always \
    -p 2181:2181 \
    -d zookeeper:3.5.6
docker run \
    --network pinot-demo --name=zkui \
    -p 9090:9090 \
    -e ZK_SERVER=pinot-zookeeper:2181 \
    -d qnib/plain-zkui:latest
docker run \
    --network=pinot-demo \
    --name pinot-controller \
    -p 9000:9000 \
    -d ${PINOT_IMAGE} StartController \
    -zkAddress pinot-zookeeper:2181
docker run \
    --network=pinot-demo \
    --name pinot-broker \
    -d ${PINOT_IMAGE} StartBroker \
    -zkAddress pinot-zookeeper:2181
docker run \
    --network=pinot-demo \
    --name pinot-server \
    -d ${PINOT_IMAGE} StartServer \
    -zkAddress pinot-zookeeper:2181
docker container ls -a
CONTAINER ID        IMAGE                              COMMAND                  CREATED              STATUS                PORTS                                                  NAMES
9e80c3fcd29b        apachepinot/pinot:0.3.0-SNAPSHOT   "./bin/pinot-admin.s…"   18 seconds ago       Up 17 seconds         8096-8099/tcp, 9000/tcp                                pinot-server
f4c42a5865c7        apachepinot/pinot:0.3.0-SNAPSHOT   "./bin/pinot-admin.s…"   21 seconds ago       Up 21 seconds         8096-8099/tcp, 9000/tcp                                pinot-broker
a413b0013806        apachepinot/pinot:0.3.0-SNAPSHOT   "./bin/pinot-admin.s…"   26 seconds ago       Up 25 seconds         8096-8099/tcp, 0.0.0.0:9000->9000/tcp                  pinot-controller
9d3b9c4d454b        zookeeper:3.5.6                    "/docker-entrypoint.…"   About a minute ago   Up About a minute     2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   pinot-zookeeper
$ export PINOT_VERSION=0.10.0
$ tar -xvf apache-pinot-${PINOT_VERSION}-bin.tar.gz

$ cd apache-pinot-${PINOT_VERSION}-bin
$ ls
DISCLAIMER    LICENSE        NOTICE        bin        conf        lib        licenses    query_console    sample_data

$ PINOT_INSTALL_DIR=`pwd`
controller.helix.cluster.name=pinot-quickstart
controller.port=9000
controller.vip.host=pinot-controller
controller.vip.port=9000
controller.data.dir=/var/pinot/controller/data
controller.zk.str=pinot-zookeeper:2181
pinot.set.instance.id.to.hostname=true
bin/pinot-admin.sh StartController -configFileName config/pinot-controller.conf
pinot.broker.client.queryPort=8099
pinot.broker.routing.table.builder.class=random
pinot.set.instance.id.to.hostname=true
bin/pinot-admin.sh StartBroker -clusterName pinot-quickstart -zkAddress pinot-zookeeper:2181 -configFileName config/pinot-broker.conf
pinot.server.netty.port=8098
pinot.server.adminapi.port=8097
pinot.server.instance.dataDir=/var/pinot/server/data/index
pinot.server.instance.segmentTarDir=/var/pinot/server/data/segment
pinot.set.instance.id.to.hostname=true
bin/pinot-admin.sh StartServer -clusterName pinot-quickstart -zkAddress pinot-zookeeper:2181 -configFileName config/pinot-server.conf
docker run \
    --network=pinot-demo \
    --name pinot-batch-table-creation \
    ${PINOT_IMAGE} AddTable \
    -schemaFile examples/batch/airlineStats/airlineStats_schema.json \
    -tableConfigFile examples/batch/airlineStats/airlineStats_offline_table_config.json \
    -controllerHost pinot-controller \
    -controllerPort 9000 \
    -exec
Executing command: AddTable -tableConfigFile examples/batch/airlineStats/airlineStats_offline_table_config.json -schemaFile examples/batch/airlineStats/airlineStats_schema.json -controllerHost pinot-controller -controllerPort 9000 -exec
Sending request: http://pinot-controller:9000/schemas to controller: a413b0013806, version: Unknown
{"status":"Table airlineStats_OFFLINE succesfully added"}
bin/pinot-admin.sh AddTable \
    -schemaFile examples/batch/airlineStats/airlineStats_schema.json \
    -tableConfigFile examples/batch/airlineStats/airlineStats_offline_table_config.json \
    -exec
docker run \
    --network pinot-demo --name=kafka \
    -e KAFKA_ZOOKEEPER_CONNECT=pinot-zookeeper:2181/kafka \
    -e KAFKA_BROKER_ID=0 \
    -e KAFKA_ADVERTISED_HOST_NAME=kafka \
    -d wurstmeister/kafka:latest
docker exec \
  -t kafka \
  /opt/kafka/bin/kafka-topics.sh \
  --zookeeper pinot-zookeeper:2181/kafka \
  --partitions=1 --replication-factor=1 \
  --create --topic flights-realtime
docker run \
    --network=pinot-demo \
    --name pinot-streaming-table-creation \
    ${PINOT_IMAGE} AddTable \
    -schemaFile examples/stream/airlineStats/airlineStats_schema.json \
    -tableConfigFile examples/docker/table-configs/airlineStats_realtime_table_config.json \
    -controllerHost pinot-controller \
    -controllerPort 9000 \
    -exec
Executing command: AddTable -tableConfigFile examples/docker/table-configs/airlineStats_realtime_table_config.json -schemaFile examples/stream/airlineStats/airlineStats_schema.json -controllerHost pinot-controller -controllerPort 9000 -exec
Sending request: http://pinot-controller:9000/schemas to controller: 8fbe601012f3, version: Unknown
{"status":"Table airlineStats_REALTIME succesfully added"}
bin/pinot-admin.sh StartZookeeper -zkPort 2191
bin/pinot-admin.sh  StartKafka -zkAddress=localhost:2191/kafka -port 19092
docker run \
    --network=pinot-demo \
    --name pinot-data-ingestion-job \
    ${PINOT_IMAGE} LaunchDataIngestionJob \
    -jobSpecFile examples/docker/ingestion-job-specs/airlineStats.yaml
SegmentGenerationJobSpec:
!!org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec
excludeFileNamePattern: null
executionFrameworkSpec: {extraConfigs: null, name: standalone, segmentGenerationJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner,
  segmentTarPushJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner,
  segmentUriPushJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner}
includeFileNamePattern: glob:**/*.avro
inputDirURI: examples/batch/airlineStats/rawdata
jobType: SegmentCreationAndTarPush
outputDirURI: examples/batch/airlineStats/segments
overwriteOutput: true
pinotClusterSpecs:
- {controllerURI: 'http://pinot-controller:9000'}
pinotFSSpecs:
- {className: org.apache.pinot.spi.filesystem.LocalPinotFS, configs: null, scheme: file}
pushJobSpec: {pushAttempts: 2, pushParallelism: 1, pushRetryIntervalMillis: 1000,
  segmentUriPrefix: null, segmentUriSuffix: null}
recordReaderSpec: {className: org.apache.pinot.plugin.inputformat.avro.AvroRecordReader,
  configClassName: null, configs: null, dataFormat: avro}
segmentNameGeneratorSpec: null
tableSpec: {schemaURI: 'http://pinot-controller:9000/tables/airlineStats/schema',
  tableConfigURI: 'http://pinot-controller:9000/tables/airlineStats', tableName: airlineStats}

Trying to create instance for class org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner
Initializing PinotFS for scheme file, classname org.apache.pinot.spi.filesystem.LocalPinotFS
Finished building StatsCollector!
Collected stats for 403 documents
Created dictionary for INT column: FlightNum with cardinality: 386, range: 14 to 7389
Using fixed bytes value dictionary for column: Origin, size: 294
Created dictionary for STRING column: Origin with cardinality: 98, max length in bytes: 3, range: ABQ to VPS
Created dictionary for INT column: Quarter with cardinality: 1, range: 1 to 1
Created dictionary for INT column: LateAircraftDelay with cardinality: 50, range: -2147483648 to 303
......
......
Pushing segment: airlineStats_OFFLINE_16085_16085_29 to location: http://pinot-controller:9000 for table airlineStats
Sending request: http://pinot-controller:9000/v2/segments?tableName=airlineStats to controller: a413b0013806, version: Unknown
Response for pushing table airlineStats segment airlineStats_OFFLINE_16085_16085_29 to location http://pinot-controller:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16085_16085_29 of table: airlineStats"}
Pushing segment: airlineStats_OFFLINE_16084_16084_30 to location: http://pinot-controller:9000 for table airlineStats
Sending request: http://pinot-controller:9000/v2/segments?tableName=airlineStats to controller: a413b0013806, version: Unknown
Response for pushing table airlineStats segment airlineStats_OFFLINE_16084_16084_30 to location http://pinot-controller:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16084_16084_30 of table: airlineStats"}
bin/pinot-admin.sh LaunchDataIngestionJob \
    -jobSpecFile examples/batch/airlineStats/ingestionJobSpec.yaml
docker run \
  --network pinot-demo \
  --name=loading-airlineStats-data-to-kafka \
  ${PINOT_IMAGE} StreamAvroIntoKafka \
  -avroFile examples/stream/airlineStats/sample_data/airlineStats_data.avro \
  -kafkaTopic flights-realtime -kafkaBrokerList kafka:9092 -zkAddress pinot-zookeeper:2181/kafka
bin/pinot-admin.sh StreamAvroIntoKafka \
  -avroFile examples/stream/airlineStats/sample_data/airlineStats_data.avro \
  -kafkaTopic flights-realtime -kafkaBrokerList localhost:19092 -zkAddress localhost:2191/kafka
cd apache-pinot-${PINOT_VERSION}-bin
bin/pinot-admin.sh StartZookeeper
bin/pinot-admin.sh StartController \
    -zkAddress localhost:2181
bin/pinot-admin.sh StartBroker \
    -zkAddress localhost:2181
bin/pinot-admin.sh StartServer \
    -zkAddress localhost:2181
bin/pinot-admin.sh AddTable \
    -schemaFile examples/stream/airlineStats/airlineStats_schema.json \
    -tableConfigFile examples/stream/airlineStats/airlineStats_realtime_table_config.json \
    -exec