arrow-left

All pages
gitbookPowered by GitBook
1 of 11

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Components

Learn about the different components and logical abstractions

This section is a reference for the definition of major components and logical abstractions used in Pinot. Please visit the Basic Concepts section to get a general overview that ties together all of the reference material in this section.

hashtag
Operator reference

Clusterchevron-rightControllerchevron-rightBrokerchevron-rightServerchevron-rightMinionchevron-rightTenantchevron-right

hashtag
Developer reference

hashtag

Tablechevron-right
Schemachevron-right
Segmentchevron-right

Broker

Brokers handle Pinot queries. They accept queries from clients and forward them to the right servers. They collect results back from the servers and consolidate them into a single response, to send back to the client.

Broker interaction with other components

Pinot Brokers are modeled as Helix Spectators. They need to know the location of each segment of a table (and each replica of the segments) and route requests to the appropriate server that hosts the segments of the table being queried.

The broker ensures that all the rows of the table are queried exactly once so as to return correct, consistent results for a query. The brokers may optimize to prune some of the segments as long as accuracy is not sacrificed.

Helix provides the framework by which spectators can learn the location in which each partition of a resource (i.e. participant) resides. The brokers use this mechanism to learn the servers that host specific segments of a table.

In the case of hybrid tables, the brokers ensure that the overlap between real-time and offline segment data is queried exactly once, by performing offline and real-time federation.

Let's take this example, we have real-time data for 5 days - March 23 to March 27, and offline data has been pushed until Mar 25, which is 2 days behind real-time. The brokers maintain this time boundary.

Suppose, we get a query to this table : select sum(metric) from table. The broker will split the query into 2 queries based on this time boundary - one for offline and one for realtime. This query becomes - select sum(metric) from table_REALTIME where date >= Mar 25 and select sum(metric) from table_OFFLINE where date < Mar 25

The broker merges results from both these queries before returning the result to the client.

hashtag
Starting a Broker

Make sure you've . If you're using docker, make sure to . To start a broker

Controller

The Pinot Controller is responsible for the following:

  • Maintaining global metadata (e.g. configs and schemas) of the system with the help of Zookeeper which is used as the persistent metadata store.

  • Hosting the Helix Controller and managing other Pinot components (brokers, servers, minions)

  • Maintaining the mapping of which servers are responsible for which segments. This mapping is used by the servers to download the portion of the segments that they are responsible for. This mapping is also used by the broker to decide which servers to route the queries to.

  • Serving admin endpoints for viewing, creating, updating, and deleting configs, which are used to manage and operate the cluster.

  • Serving endpoints for segment uploads, which are used in offline data pushes. They are responsible for initializing real-time consumption and coordination of persisting real-time segments into the segment store periodically.

  • Undertaking other management activities such as managing retention of segments, validations.

For redundancy, there can be multiple instances of Pinot controllers. Pinot expects that all controllers are configured with the same back-end storage system so that they have a common view of the segments (e.g. NFS). Pinot can use other storage systems such as HDFS or .

hashtag
Starting a Controller

Make sure you've . If you're using docker, make sure to . To start a controller

docker run \
    --network=pinot-demo \
    --name pinot-controller \
    -p 9000:9000 \
    -d ${PINOT_IMAGE} StartController \
    -zkAddress pinot-zookeeper:2181
bin/pinot-admin.sh StartController \
  -zkAddress localhost:2181 \
  -clusterName PinotCluster \
  -controllerPort 9000
ADLSarrow-up-right
setup Zookeeper
pull the pinot docker image
setup Zookeeper
pull the pinot docker image
docker run \
    --network=pinot-demo \
    --name pinot-broker \
    -d ${PINOT_IMAGE} StartBroker \
    -zkAddress pinot-zookeeper:2181
bin/pinot-admin.sh StartBroker \
  -zkAddress localhost:2181 \
  -clusterName PinotCluster \
  -brokerPort 7000

Cluster

Cluster is a set of nodes comprising of servers, brokers, controllers and minions.

Pinot cluster components

Pinot leverages Apache Helixarrow-up-right for cluster management. Helix is a cluster management framework to manage replicated, partitioned resources in a distributed system. Helix uses Zookeeper to store cluster state and metadata.

hashtag
Cluster components

Briefly, Helix divides nodes into three logical components based on their responsibilities

hashtag
Participant

The nodes that host distributed, partitioned resources

hashtag
Spectator

The nodes that observe the current state of each Participant and use that information to access the resources. Spectators are notified of state changes in the cluster (state of a participant, or that of a partition in a participant).

hashtag
Controller

The node that observes and controls the Participant nodes. It is responsible for coordinating all transitions in the cluster and ensuring that state constraints are satisfied while maintaining cluster stability.

Pinot Servers are modeled as Participants, more details about server nodes can be found in . Pinot Brokers are modeled as Spectators, more details about broker nodes can be found in . Pinot Controllers are modeled as Controllers, more details about controller nodes can be found in .

hashtag
Logical view

Another way to visualize the cluster is a logical view, wherein a cluster contains , tenants contain , and tables contain .

hashtag
Setup a Pinot Cluster

Typically, there is only one cluster per environment/data center. There is no needed to create multiple Pinot clusters since Pinot supports the concept of . At LinkedIn, the largest Pinot cluster consists of 1000+ nodes.

To setup a Pinot cluster, we need to first start Zookeeper.

hashtag
0. Create a Network

Create an isolated bridge network in docker

hashtag
1. Start Zookeeper

Once we've started Zookeeper, we can start other components to join this cluster. If you're using docker, pull the latest apachepinot/pinot image.

hashtag
Pull pinot docker image

You can try out pre-built Pinot all-in-one docker image.

(Optional) You can also follow the instructions to build your own images.

To start other components to join the cluster

Explore your cluster via

Server

Servers host the data segments and serve queries off the data they host. There are two types of servers:

Offline Offline servers are responsible for downloading segments from the segment store, to host and serve queries off. When a new segment is uploaded to the controller, the controller decides the servers (as many as replication) that will host the new segment and notifies them to download the segment from the segment store. On receiving this notification, the servers download the segment file and load the segment onto the server, to server queries off them.

Real-time Real-time servers directly ingest from a real-time stream (such as Kafka, EventHubs). Periodically, they make segments of the in-memory ingested data, based on certain thresholds. This segment is then persisted onto the segment store.

Pinot Servers are modeled as Helix Participants, hosting Pinot tables (referred to as resources in Helix terminology). Segments of a table are modeled as Helix partitions (of a resource). Thus, a Pinot server hosts one or more helix partitions of one or more helix resources (i.e. one or more segments of one or more tables).

hashtag
Starting a Server

Make sure you've . If you're using docker, make sure to . To start a server

USAGE

Start Zookeeper in daemon.

hashtag
2. Start Zookeeper UI

Start ZKUIarrow-up-right to browse Zookeeper data at http://localhost:9090arrow-up-right.

Download Pinot Distribution using instructions in Downloadarrow-up-right

hashtag
1. Start Zookeeper

hashtag
2. Start Zooinspector

Install to view the data in Zookeeper, and connect to localhost:2181

Server
Broker
Controller
tenants
tables
segments
tenants
here
Start Controller
Start Broker
Start Server
Pinot Data Explorer
docker run \
    --network=pinot-demo \
    --name pinot-server \
    -d ${PINOT_IMAGE} StartServer \
    -zkAddress pinot-zookeeper:2181
setup Zookeeper
pull the pinot docker image
bin/pinot-admin.sh StartZookeeper -zkPort 2181
docker network create -d bridge pinot-demo
export PINOT_VERSION=latest
export PINOT_IMAGE=apachepinot/pinot:${PINOT_VERSION}
docker pull ${PINOT_IMAGE}
docker run \
    --network=pinot-demo \
    --name pinot-zookeeper \
    --restart always \
    -p 2181:2181 \
    -d zookeeper:3.5.6
docker run \
    --network pinot-demo --name=zkui \
    -p 9090:9090 \
    -e ZK_SERVER=pinot-zookeeper:2181 \
    -d qnib/plain-zkui:latest
Usage: StartServer
	-serverHost               <String>                      : Host name for controller. (required=false)
	-serverPort               <int>                         : Port number to start the server at. (required=false)
	-serverAdminPort          <int>                         : Port number to serve the server admin API at. (required=false)
	-dataDir                  <string>                      : Path to directory containing data. (required=false)
	-segmentDir               <string>                      : Path to directory containing segments. (required=false)
	-zkAddress                <http>                        : Http address of Zookeeper. (required=false)
	-clusterName              <String>                      : Pinot cluster name. (required=false)
	-configFileName           <Config File Name>            : Broker Starter Config file. (required=false)
	-help                                                   : Print this message. (required=false)
bin/pinot-admin.sh StartServer \
    -zkAddress localhost:2181
Usage: StartServer
	-serverHost               <String>                      : Host name for controller. (required=false)
	-serverPort               <int>                         : Port number to start the server at. (required=false)
	-serverAdminPort          <int>                         : Port number to serve the server admin API at. (required=false)
	-dataDir                  <string>                      : Path to directory containing data. (required=false)
	-segmentDir               <string>                      : Path to directory containing segments. (required=false)
	-zkAddress                <http>                        : Http address of Zookeeper. (required=false)
	-clusterName              <String>                      : Pinot cluster name. (required=false)
	-configFileName           <Config File Name>            : Server Starter Config file. (required=false)
	-help                                                   : Print this message. (required=false)
zooinspectorarrow-up-right

Tenant

A tenant is a logical component defined as a group of server/broker nodes with the same Helix tag.

In order to support multi-tenancy, Pinot has first-class support for tenants. Every table is associated with a server tenant and a broker tenant. This controls the nodes that will be used by this table as servers and brokers. This allows all tables belonging to a particular use case to be grouped under a single tenant name.

The concept of tenants is very important when the multiple use cases are using Pinot and there is a need to provide quotas or some sort of isolation across tenants. For example, consider we have two tables Table A and Table B in the same Pinot cluster.

Defining tenants for tables

We can configure Table A with server tenant Tenant A and Table B with server tenant Tenant B. We can tag some of the server nodes for Tenant A and some for Tenant B. This will ensure that segments of Table A only reside on servers tagged with Tenant A, and segment of Table B only reside on servers tagged with Tenant B. The same isolation can be achieved at the broker level, by configuring broker tenants to the tables.

No need to create separate clusters for every table or use case!

hashtag
Tenant Config

This tenant is defined in the section of the table config.

This section contains 2 main fields broker and server , which decide the tenants used for the broker and server components of this table.

In the above example:

  • The table will be served by brokers that have been tagged as brokerTenantName_BROKER in Helix.

  • If this were an offline table, the offline segments for the table will be hosted in Pinot servers tagged in Helix as serverTenantName_OFFLINE

  • If this were a real-time table, the real-time segments (both consuming as well as completed ones) will be hosted in pinot servers tagged in Helix as

hashtag
Creating a tenant

hashtag
Broker tenant

Here's a sample broker tenant config. This will create a broker tenant sampleBrokerTenant by tagging 3 untagged broker nodes as sampleBrokerTenant_BROKER.

To create this tenant use the following command. The creation will fail if number of untagged broker nodes is less than numberOfInstances.

Follow instructions in to get Pinot locally, and then

Check out the table config in the to make sure it was successfully uploaded.

hashtag
Server tenant

Here's a sample server tenant config. This will create a server tenant sampleServerTenant by tagging 1 untagged server node as sampleServerTenant_OFFLINE and 1 untagged server node as sampleServerTenant_REALTIME.

To create this tenant use the following command. The creation will fail if number of untagged server nodes is less than offlineInstances + realtimeInstances.

Follow instructions in to get Pinot locally, and then

Check out the table config in the to make sure it was successfully uploaded.

serverTenantName_REALTIME
.
tenants
Getting Pinot
Rest APIarrow-up-right
Getting Pinot
Rest APIarrow-up-right
Table isolation using tenants

Pinot Data Explorer

Explore the data on our Pinot cluster

Once you have set up the Cluster, you can start exploring the data and the APIs. Pinot 0.5.0 comes bundled with a completely new UI.

Navigate to http://localhost:9000arrow-up-right in your browser to open the controller UI.

Let's take a look at the following two features on the UI

hashtag
Query Console

Let us run some queries on the data in the Pinot cluster. Head over to to see the querying interface.

We can see our baseballStats table listed on the left (you will see meetupRSVP or airlineStats if you used the streaming or the hybrid quick start). Click on the table name to display all the names along with the data types of the columns of the table. You can also execute a sample query select * from baseballStats limit 10 by typing it in the text box and clicking the Run Query button.

Cmd + Enter can also be used to run the query when focused on the console.

You can also try out the following queries:

Pinot supports a subset of standard SQL. For more information, see .

hashtag
Rest API

The contains all the APIs that you will need to operate and manage your cluster. It provides a set of APIs for Pinot cluster management including health check, instances management, schema and table management, data segments management.

Let's check out the tables in this cluster by going to and click on Try it out!. We can see thebaseballStats table listed here. We can also see the exactcurl call made to the controller API.

You can look at the configuration of this table by going to , type in baseballStats in the table name, and click Try it out!

Let's check out the schemas in the cluster by going to and click Try it out!. We can see a schema called baseballStats in this list.

Take a look at the schema by going to , type baseballStats in the schema name, and click Try it out!.

Finally, let's checkout the data segments in the cluster by going to , type in baseballStats in the table name, and click Try it out!. There's 1 segment for this table, called baseballStats_OFFLINE_0.

To learn how to uploaded your own data and schema, head over to or .

"tenants": {
  "broker": "brokerTenantName",
  "server": "serverTenantName"
}
sample-broker-tenant.json
{
     "tenantRole" : "BROKER",
     "tenantName" : "sampleBrokerTenant",
     "numberOfInstances" : 3
}
bin/pinot-admin.sh AddTenant \
    -name sampleBrokerTenant 
    -role BROKER 
    -instanceCount 3 -exec
curl -i -X POST -H 'Content-Type: application/json' -d @sample-broker-tenant.json localhost:9000/tenants
sample-server-tenant.json
{
     "tenantRole" : "SERVER",
     "tenantName" : "sampleServerTenant",
     "offlineInstances" : 1,
     "realtimeInstances" : 1
}
bin/pinot-admin.sh AddTenant \
    -name sampleServerTenant \
    -role SERVER \
    -offlineInstanceCount 1 \
    -realtimeInstanceCount 1 -exec
curl -i -X POST -H 'Content-Type: application/json' -d @sample-server-tenant.json localhost:9000/tenants
Query Consolearrow-up-right
Pinot Query Language
Pinot Admin UIarrow-up-right
Table -> List all tables in clusterarrow-up-right
Tables -> Get/Enable/Disable/Drop a tablearrow-up-right
Schema -> List all schemas in the clusterarrow-up-right
Schema -> Get a schemaarrow-up-right
Segment -> List all segmentsarrow-up-right
Batch Ingestion
Stream ingestion
List all tables in cluster
List all schemas in the cluster
select playerName, max(hits) 
from baseballStats 
group by playerName 
order by max(hits) desc
select sum(hits), sum(homeRuns), sum(numberOfGames) 
from baseballStats 
where yearID > 2010
select * 
from baseballStats 
order by league
{
  "schemaName": "baseballStats",
  "dimensionFieldSpecs": [
    {
      "name": "playerID",
      "dataType": "STRING"
    },
    {
      "name": "yearID",
      "dataType": "INT"
    },
    {
      "name": "teamID",
      "dataType": "STRING"
    },
    {
      "name": "league",
      "dataType": "STRING"
    },
    {
      "name": "playerName",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "playerStint",
      "dataType": "INT"
    },
    {
      "name": "numberOfGames",
      "dataType": "INT"
    },
    {
      "name": "numberOfGamesAsBatter",
      "dataType": "INT"
    },
    {
      "name": "AtBatting",
      "dataType": "INT"
    },
    {
      "name": "runs",
      "dataType": "INT"
    },
    {
      "name": "hits",
      "dataType": "INT"
    },
    {
      "name": "doules",
      "dataType": "INT"
    },
    {
      "name": "tripples",
      "dataType": "INT"
    },
    {
      "name": "homeRuns",
      "dataType": "INT"
    },
    {
      "name": "runsBattedIn",
      "dataType": "INT"
    },
    {
      "name": "stolenBases",
      "dataType": "INT"
    },
    {
      "name": "caughtStealing",
      "dataType": "INT"
    },
    {
      "name": "baseOnBalls",
      "dataType": "INT"
    },
    {
      "name": "strikeouts",
      "dataType": "INT"
    },
    {
      "name": "intentionalWalks",
      "dataType": "INT"
    },
    {
      "name": "hitsByPitch",
      "dataType": "INT"
    },
    {
      "name": "sacrificeHits",
      "dataType": "INT"
    },
    {
      "name": "sacrificeFlies",
      "dataType": "INT"
    },
    {
      "name": "groundedIntoDoublePlays",
      "dataType": "INT"
    },
    {
      "name": "G_old",
      "dataType": "INT"
    }
  ]
}

Segment

Pinot has the concept of a table, which is a logical abstraction to refer to a collection of related data. Pinot has a distributed architecture and scales horizontally. Pinot expects the size of a table to grow infinitely over time. In order to achieve this, the entire data needs to be distributed across multiple nodes.

Pinot achieves this by breaking the data into smaller chunks known as segments (similar to shards/partitions in relational databases). Segments can be seen as time-based partitions.

Thus, a segment is a horizontal shard representing a chunk of table data with some number of rows. The segment stores data for all columns of the table. Each segment packs the data in a columnar fashion, along with the dictionaries and indices for the columns. The segment is laid out in a columnar format so that it can be directly mapped into memory for serving queries.

Columns can be single or multi-valued and the following types are supported: STRING, INT, LONG, FLOAT, DOUBLE or BYTES.

Columns may be declared to be metric or dimension (or specifically as a time dimension) in the schema. Columns can have default null values. For example, the default null value of a integer column can be 0. The default value for bytes columns must be hex-encoded before it's added to the schema.

Pinot uses dictionary encoding to store values as a dictionary ID. Columns may be configured to be “no-dictionary” column in which case raw values are stored. Dictionary IDs are encoded using minimum number of bits for efficient storage (e.g. a column with a cardinality of 3 will use only 2 bits for each dictionary ID).

A forward index is built for each column and compressed for efficient memory use. In addition, you can optionally configure inverted indices for any set of columns. Inverted indices take up more storage, but improve query performance. Specialized indexes like Star-Tree index are also supported. For more details, see .

hashtag
Creating a segment

Once the table is configured, we can load some data. Loading data involves generating pinot segments from raw data and pushing them to the pinot cluster. Data can be loaded in batch mode or streaming mode. For more details, see the page.

hashtag
Load Data in Batch

hashtag
Prerequisites

Below are instructions to generate and push segments to Pinot via standalone scripts. For a production setup, you should use frameworks such as Hadoop or Spark. For more details on setting up data ingestion jobs, see

hashtag
Job Spec YAML

To generate a segment, we need to first create a job spec YAML file. This file contains all the information regarding data format, input data location, and pinot cluster coordinates. Note that this assumes that the controller is RUNNING to fetch the table config and schema. If not, you will have to configure the spec to point at their location. For full configurations, see .

hashtag
Create and push segment

To create and push the segment in one go, use

Sample Console Output

Alternately, you can separately create and then push, by changing the jobType to SegmentCreation or SegmenTarPush.

hashtag
Templating Ingestion Job Spec

The Ingestion job spec supports templating with Groovy Syntax.

This is convenient if you want to generate one ingestion job template file and schedule it on a daily basis with extra parameters updated daily.

e.g. you could set inputDirURI with parameters to indicate the date, so that the ingestion job only processes the data for a particular date. Below is an example that templates the date for input and output directories.

You can pass in arguments containing values for ${year}, ${month}, ${day} when kicking off the ingestion job: -values $param=value1 $param2=value2...

This ingestion job only generates segments for date 2014-01-03

hashtag
Load Data in Streaming

Prerequisites

Below is an example of how to publish sample data to your stream. As soon as data is available to the realtime stream, it starts getting consumed by the realtime servers

hashtag
Kafka

Run below command to stream JSON data into Kafka topic: flights-realtime

Run below command to stream JSON data into Kafka topic: flights-realtime

Indexing
ingestion overview
Setup a cluster
Create broker and server tenants
Create an offline table
Import Data.arrow-up-right
Ingestion Job Spec
Setup a cluster
Create broker and server tenants
Create a realtime table and setup a realtime stream
job-spec.yml
executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'

jobType: SegmentCreationAndTarPush
inputDirURI: 'examples/batch/baseballStats/rawdata'
includeFileNamePattern: 'glob:**/*.csv'
excludeFileNamePattern: 'glob:**/*.tmp'
outputDirURI: 'examples/batch/baseballStats/segments'
overwriteOutput: true

pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS

recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
  configs:

tableSpec:
  tableName: 'baseballStats'
  schemaURI: 'http://localhost:9000/tables/baseballStats/schema'
  tableConfigURI: 'http://localhost:9000/tables/baseballStats'
  
segmentNameGeneratorSpec:

pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'

pushJobSpec:
  pushParallelism: 2
  pushAttempts: 2
  pushRetryIntervalMillis: 1000
docker run \
    --network=pinot-demo \
    --name pinot-data-ingestion-job \
    ${PINOT_IMAGE} LaunchDataIngestionJob \
    -jobSpecFile examples/docker/ingestion-job-specs/airlineStats.yaml
SegmentGenerationJobSpec:
!!org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec
excludeFileNamePattern: null
executionFrameworkSpec: {extraConfigs: null, name: standalone, segmentGenerationJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner,
  segmentTarPushJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner,
  segmentUriPushJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner}
includeFileNamePattern: glob:**/*.avro
inputDirURI: examples/batch/airlineStats/rawdata
jobType: SegmentCreationAndTarPush
outputDirURI: examples/batch/airlineStats/segments
overwriteOutput: true
pinotClusterSpecs:
- {controllerURI: 'http://pinot-controller:9000'}
pinotFSSpecs:
- {className: org.apache.pinot.spi.filesystem.LocalPinotFS, configs: null, scheme: file}
pushJobSpec: {pushAttempts: 2, pushParallelism: 1, pushRetryIntervalMillis: 1000,
  segmentUriPrefix: null, segmentUriSuffix: null}
recordReaderSpec: {className: org.apache.pinot.plugin.inputformat.avro.AvroRecordReader,
  configClassName: null, configs: null, dataFormat: avro}
segmentNameGeneratorSpec: null
tableSpec: {schemaURI: 'http://pinot-controller:9000/tables/airlineStats/schema',
  tableConfigURI: 'http://pinot-controller:9000/tables/airlineStats', tableName: airlineStats}

Trying to create instance for class org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner
Initializing PinotFS for scheme file, classname org.apache.pinot.spi.filesystem.LocalPinotFS
Finished building StatsCollector!
Collected stats for 403 documents
Created dictionary for INT column: FlightNum with cardinality: 386, range: 14 to 7389
Using fixed bytes value dictionary for column: Origin, size: 294
Created dictionary for STRING column: Origin with cardinality: 98, max length in bytes: 3, range: ABQ to VPS
Created dictionary for INT column: Quarter with cardinality: 1, range: 1 to 1
Created dictionary for INT column: LateAircraftDelay with cardinality: 50, range: -2147483648 to 303
......
......
Pushing segment: airlineStats_OFFLINE_16085_16085_29 to location: http://pinot-controller:9000 for table airlineStats
Sending request: http://pinot-controller:9000/v2/segments?tableName=airlineStats to controller: a413b0013806, version: Unknown
Response for pushing table airlineStats segment airlineStats_OFFLINE_16085_16085_29 to location http://pinot-controller:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16085_16085_29 of table: airlineStats"}
Pushing segment: airlineStats_OFFLINE_16084_16084_30 to location: http://pinot-controller:9000 for table airlineStats
Sending request: http://pinot-controller:9000/v2/segments?tableName=airlineStats to controller: a413b0013806, version: Unknown
Response for pushing table airlineStats segment airlineStats_OFFLINE_16084_16084_30 to location http://pinot-controller:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16084_16084_30 of table: airlineStats"}
bin/pinot-admin.sh LaunchDataIngestionJob \
    -jobSpecFile examples/batch/airlineStats/ingestionJobSpec.yaml
inputDirURI: 'examples/batch/airlineStats/rawdata/${year}/${month}/${day}'
outputDirURI: 'examples/batch/airlineStats/segments/${year}/${month}/${day}'
docker run \
    --network=pinot-demo \
    --name pinot-data-ingestion-job \
    ${PINOT_IMAGE} LaunchDataIngestionJob \
    -jobSpecFile examples/docker/ingestion-job-specs/airlineStats.yaml
    -values year=2014 month=01 day=03
docker run \
  --network pinot-demo \
  --name=loading-airlineStats-data-to-kafka \
  ${PINOT_IMAGE} StreamAvroIntoKafka \
  -avroFile examples/stream/airlineStats/sample_data/airlineStats_data.avro \
  -kafkaTopic flights-realtime -kafkaBrokerList kafka:9092 -zkAddress pinot-zookeeper:2181/kafka
bin/pinot-admin.sh StreamAvroIntoKafka \
  -avroFile examples/stream/airlineStats/sample_data/airlineStats_data.avro \
  -kafkaTopic flights-realtime -kafkaBrokerList localhost:19092 -zkAddress localhost:2191/kafka

Schema

Each table in Pinot is associated with a Schema. A schema defines what fields are present in the table along with the data types.

The schema is stored in the Zookeeper, along with the table configuration.

hashtag
Categories

A schema also defines what category a column belongs to. Columns in a Pinot table can be categorized into three categories:

hashtag
Data Types

Data types determine the operations that can be performed on a column. Pinot supports the following data types:

circle-exclamation

BOOLEAN, TIMESTAMP, JSON are added after release 0.7.1. In release 0.7.1 and older releases, BOOLEAN is equivalent to STRING.

Pinot also supports columns that contain lists or arrays of items, but there isn't an explicit data type to represent these lists or arrays. Instead, you can indicate that a dimension column accepts multiple values. For more information, see in the Schema configuration reference.

hashtag
Date Time Fields

Since Pinot doesn't have a dedicated DATETIME datatype support, you need to input time in either STRING, LONG, or INT format. However, Pinot needs to convert the date into an understandable format such as epoch timestamp to do operations.

To achieve this conversion, you will need to provide the format of the date along with the data type in the schema. The format is described using the following syntax: timeSize:timeUnit:timeFormat:pattern .

  • time size - the size of the time unit. This size is multiplied to the value present in the time column to get an actual timestamp. e.g. if timesize is 5 and value in time column is 4996308 minutes. The value that will be converted to epoch timestamp will be 4996308 * 5 * 60 * 1000 = 1498892400000 milliseconds. If your date is not in EPOCH format, this value is not used and can be set to 1 or any other integer.

  • time unit - one of enum values. e.g. HOURS , MINUTES etc. If your date is not in EPOCH

Here are some sample date-time formats you can use in the schema:

  • 1:MILLISECONDS:EPOCH - used when timestamp is in the epoch milliseconds and stored in LONG format

  • 1:HOURS:EPOCH - used when timestamp is in the epoch hours and stored in LONG or INT format

hashtag
Built-in Virtual Columns

There are several built-in virtual columns inside the schema the can be used for debugging purposes:

These virtual columns can be used in queries in a similar way to regular columns.

hashtag
Creating a Schema

First, Make sure your and running.

Let's create a schema and put it in a JSON file. For this example, we have created a schema for flight data.

circle-info

For more details on constructing a schema file, see the .

Then, we can upload the sample schema provided above using either a Bash command or REST API call.

Check out the schema in the to make sure it was successfully uploaded

0.0

BOOLEAN

0 (false)

N/A

TIMESTAMP

0 (1970-01-01 00:00:00 UTC)

N/A

STRING

"null"

N/A

JSON

"null"

N/A

BYTES

byte array of length 0

byte array of length 0

format, this value is not used and can be set to
MILLISECONDS
or any other unit.
  • timeFormat - can be either EPOCH or SIMPLE_DATE_FORMAT. If it is SIMPLE_DATE_FORMAT, the pattern string is also specified.

  • pattern - This is optional and is only specified when the date is in SIMPLE_DATE_FORMAT . The pattern should be specified using the java SimpleDateFormatarrow-up-right representation. e.g. 2020-08-21 can be represented as yyyy-MM-dd.

  • 1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd - when the date is in STRING format and has the pattern year-month-date. e.g. 2020-08-21

  • 1:HOURS:SIMPLE_DATE_FORMAT:EEE MMM dd HH:mm:ss ZZZ yyyy - when date is in STRING format. e.g. Mon Aug 24 12:36:50 America/Los_Angeles 2019

  • INT

    Document id of the record within the segment

    Category

    Description

    Dimension

    Dimension columns are typically used in slice and dice operations for answering business queries. Some operations for which dimension columns are used:

    • GROUP BY - group by one or more dimension columns along with aggregations on one or more metric columns

    • Filter clauses such as WHERE

    Metric

    These columns represent the quantitative data of the table. Such columns are used for aggregation. In data warehouse terminology, these can also be referred to as fact or measure columns.

    Some operation for which metric columns are used:

    • Aggregation - SUM, MIN, MAX, COUNT, AVG etc

    • Filter clause such as WHERE

    DateTime

    This column represents time columns in the data. There can be multiple time columns in a table, but only one of them can be treated as primary. The primary time column is the one that is present in the segment config. The primary time column is used by Pinot to maintain the time boundary between offline and real-time data in a hybrid table and for retention management. A primary time column is mandatory if the table's push type is APPEND and optional if the push type is REFRESH .

    Common operations that can be done on time column:

    • GROUP BY

    • Filter clauses such as WHERE

    Data Type

    Default Dimension Value

    Default Metric Value

    INT

    Integer.MIN_VALUEarrow-up-right

    0

    LONG

    Long.MIN_VALUEarrow-up-right

    0

    FLOAT

    Float.NEGATIVE_INFINITYarrow-up-right

    0.0

    DOUBLE

    Column Name

    Column Type

    Data Type

    Description

    $hostName

    Dimension

    STRING

    Name of the server hosting the data

    $segmentName

    Dimension

    STRING

    Name of the segment containing the record

    $docId

    DimensionFieldSpecarrow-up-right
    TimeUnitarrow-up-right
    cluster is up
    Schema configuration referencearrow-up-right
    Rest API arrow-up-right

    Dimension

    Minion

    A Minion is a standby component that leverages the Helix Task Frameworkarrow-up-right to offload computationally intensive tasks from other components.

    It can be attached to an existing Pinot cluster and then execute tasks as provided by the controller. Custom tasks can be plugged via annotations into the cluster. Some typical minion tasks are:

    • Segment creation

    • Segment purge

    • Segment merge

    hashtag
    Starting a Minion

    Make sure you've . If you're using docker, make sure to . To start a minion

    hashtag
    Interfaces

    hashtag
    PinotTaskGenerator

    PinotTaskGenerator interface defines the APIs for the controller to generate tasks for minions to execute.

    hashtag
    PinotTaskExecutorFactory

    Factory for PinotTaskExecutor which defines the APIs for Minion to execute the tasks.

    hashtag
    MinionEventObserverFactory

    Factory for MinionEventObserver which defines the APIs for task event callbacks on minion.

    hashtag
    Built-in Tasks

    hashtag
    SegmentGenerationAndPushTask

    To be added

    hashtag
    RealtimeToOfflineSegmentsTask

    See for details.

    hashtag
    MergeRollupTask

    See for details.

    hashtag
    ConvertToRawIndexTask

    To be added

    hashtag
    Enable Tasks

    Tasks are enabled on a per-table basis. To enable a certain task type (e.g. myTask) on a table, update the table config to include the task type:

    Under each enable task type, custom properties can be configured for the task type.

    hashtag
    Schedule Tasks

    hashtag
    Auto-Schedule

    Tasks can be scheduled periodically for all task types on all enabled tables. Enable auto task scheduling by configuring the schedule frequency in the controller config with the key controller.task.frequencyInSeconds.

    Tasks can also be scheduled based on cron expressions. The cron expression is set in the schedule config for each task type separately. Thie optioncontroller.task.scheduler.enabled should be set to true to enable cron scheduling.

    As shown below, the RealtimeToOfflineSegmentsTask will be scheduled at the first second of every minute (following the syntax ).

    hashtag
    Manual Schedule

    Tasks can be manually scheduled using the following controller rest APIs:

    Rest API
    Description

    hashtag
    Plug-in Custom Tasks

    To plug in a custom task, implement PinotTaskGenerator, PinotTaskExecutorFactory and MinionEventObserverFactory (optional) for the task type (all of them should return the same string for getTaskType()), and annotate them with the following annotations:

    Implementation
    Annotation

    After annotating the classes, put them under the package of name org.apache.pinot.*.plugin.minion.tasks.*, then they will be auto-registered by the controller and minion.

    hashtag
    Example

    See where the TestTask is plugged-in.

    hashtag
    Task-related metrics

    There is a controller job that runs every 5 minutes by default and emits metrics about Minion tasks scheduled in Pinot. The following metrics are emitted for each task type:

    • NumMinionTasksInProgress: Number of running tasks

    • NumMinionSubtasksRunning: Number of running sub-tasks

    • NumMinionSubtasksWaiting: Number of waiting sub-tasks (unassigned to a minion as yet)

    For each task, the Minion will emit these metrics:

    • TASK_QUEUEING: Task queueing time (task_dequeue_time - task_inqueue_time), assuming the time drift between helix controller and pinot minion is minor, otherwise the value may be negative

    • TASK_EXECUTION: Task execution time, which is the time spent on executing the task

    • NUMBER_OF_TASKS: number of tasks in progress on that minion. Whenever a Minion starts a task, increase the Gauge by 1, whenever a Minion completes (either succeeded or failed) a task, decrease it by 1

    flights-schema.json
    {
      "schemaName": "flights",
      "dimensionFieldSpecs": [
        {
          "name": "flightNumber",
          "dataType": "LONG"
        },
        {
          "name": "tags",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": "null"
        }
      ],
      "metricFieldSpecs": [
        {
          "name": "price",
          "dataType": "DOUBLE",
          "defaultNullValue": 0
        }
      ],
      "dateTimeFieldSpecs": [
        {
          "name": "millisSinceEpoch",
          "dataType": "LONG",
          "format": "1:MILLISECONDS:EPOCH",
          "granularity": "15:MINUTES"
        },
        {
          "name": "hoursSinceEpoch",
          "dataType": "INT",
          "format": "1:HOURS:EPOCH",
          "granularity": "1:HOURS"
        },
        {
          "name": "dateString",
          "dataType": "STRING",
          "format": "1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd",
          "granularity": "1:DAYS"
        }
      ]
    }
    bin/pinot-admin.sh AddSchema -schemaFile flights-schema.json -exec
    
    OR
    
    bin/pinot-admin.sh AddTable -schemaFile flights-schema.json -tableFile flights-table.json -exec
    curl -F [email protected]  localhost:9000/schemas
    Double.NEGATIVE_INFINITYarrow-up-right

    NumMinionSubtasksError: Number of error sub-tasks (completed with an error/exception)

  • PercentMinionSubtasksInQueue: Percent of sub-tasks in waiting or running states

  • PercentMinionSubtasksInError: Percent of sub-tasks in error

  • POST /tasks/schedule

    Schedule tasks for all task types on all enabled tables

    POST /tasks/schedule?taskType=myTask

    Schedule tasks for the given task type on all enabled tables

    POST /tasks/schedule?tableName=myTable_OFFLINE

    Schedule tasks for all task types on the given table

    POST /tasks/schedule?taskType=myTask&tableName=myTable_OFFLINE

    Schedule tasks for the given task type on the given table

    PinotTaskGenerator

    @TaskGenerator

    PinotTaskExecutorFactory

    @TaskExecutorFactory

    MinionEventObserverFactory

    @EventObserverFactory

    docker run \
        --network=pinot-demo \
        --name pinot-minion \
        -d ${PINOT_IMAGE} StartMinion \
        -zkAddress pinot-zookeeper:2181
    bin/pinot-admin.sh StartMinion \
        -zkAddress localhost:2181
    setup Zookeeper
    pull the pinot docker image
    Pinot managed Offline flows
    Minion merge rollup task
    defined herearrow-up-right
    SimpleMinionClusterIntegrationTestarrow-up-right

    Table

    A table is a logical abstraction that represents a collection of related data. It is composed of columns and rows (known as documents in Pinot). The columns, data types, and other metadata related to the table are defined using a schema.

    Pinot breaks a table into multiple segments and stores these segments in a deep-store such as HDFS as well as Pinot servers.

    In the Pinot cluster, a table is modeled as a Helix resourcearrow-up-right and each segment of a table is modeled as a Helix Partitionarrow-up-right.

    Pinot supports the following types of table:

    Type
    Description
    circle-info

    The user querying the database does not need to know the type of the table. They only need to specify the table name in the query.

    e.g. regardless of whether we have an offline table myTable_OFFLINE, a real-time table myTable_REALTIME, or a hybrid table containing both of these, the query will be:

    is used to define the table properties, such as name, type, indexing, routing, retention etc. It is written in JSON format and is stored in Zookeeper, along with the table schema.

    You can use the following properties to make your tables faster or leaner:

    • Segment

    • Indexing

    • Tenants

    hashtag
    Segments

    A table is comprised of small chunks of data. These chunks are known as Segments. To learn more about how Pinot creates and manages segments see

    For offline tables, Segments are built outside of pinot and uploaded using a distributed executor such as Spark or Hadoop. For more details, see .

    For real-time tables, segments are built in a specific interval inside Pinot. You can tune the following for the real-time segments:

    hashtag
    Flush

    The Pinot real-time consumer ingests the data, creates the segment, and then flushes the in-memory segment to disk. Pinot allows you to configure when to flush the segment in the following ways:

    • Number of consumed rows - After consuming X no. of rows from the stream, Pinot will persist the segment to disk

    • Number of desired rows per segment - Pinot learns and then estimates the number of rows that need to be consumed so that the persisted segment is approximately the size. The learning phase starts by setting the number of rows to 100,000 (this value can be changed) and adjusts it to reach the desired segment size. The segment size may go significantly over the desired size during the learning phase. Pinot corrects the estimation as it goes along, so it is not guaranteed that the resulting completed segments are of the exact size as configured. You should set this value to optimize the performance of queries.

    Replicas A segment can have multiple replicas to provide higher availability. You can configure the number of replicas for a table segment using

    Completion Mode By default, if the in-memory segment in the is equivalent to the committed segment, then the non-winner server builds and replaces the segment. If the available segment is not equivalent to the committed segment, the server simply downloads the committed segment from the controller.

    However, in certain scenarios, the segment build can get very memory intensive. It might be desirable to enforce the non-committer servers to just download the segment from the controller, instead of building it again. You can do this by setting completionMode: "DOWNLOAD" in the table configuration

    For more details on why this is needed, see

    Download Scheme

    A Pinot server may fail to download segments from the deep store such as HDFS after its completion. However, you can configure servers to download these segments from peer servers instead of the deep store. Currently, only HTTP and HTTPS download schemes are supported. More methods such as gRPC/Thrift can be added in the future.

    For more details about peer segment download during real-time ingestion, please refer to this design doc on

    hashtag
    Indexing

    You can create multiple indices on a table to increase the performance of the queries. The following types of indices are supported:

      • Dictionary-encoded forward index with bit compression

      • Raw value forward index

    For more details on each indexing mechanism and corresponding configurations, see .

    You can also set up on columns to make queries faster. Further, you can also keep segments in off-heap instead of on-heap memory for faster queries.

    hashtag
    Pre-aggregation

    You can aggregate the real-time stream data as it is consumed to reduce segment sizes. We sum the metric column values of all rows that have the same dimensions and create a single row in the segment. This feature is only available on REALTIME tables.

    The only supported aggregation is SUM. The columns on which pre-aggregation is to be done need to satisfy the following requirements:

    • All metrics should be listed in noDictionaryColumns .

    • There should not be any multi-value dimensions.

    • All dimension columns are treated to have a dictionary, even if they appear as noDictionaryColumns in the config.

    The following table config snippet shows an example of enabling pre-aggregation during real-time ingestion.

    hashtag
    Tenants

    Each table is associated with a tenant. A segment resides on the server, which has the same tenant as itself. For more details on how tenants work, see .

    You can also override if a table should move to a server with different tenant based on segment status.

    A tagOverrideConfig can be added under the tenants section for realtime tables, to override tags for consuming and completed segments. For example:

    In the above example, the consuming segments will still be assigned to serverTenantName_REALTIME hosts, but once they are completed, the segments will be moved to serverTeantnName_OFFLINE. It is possible to specify the full name of any tag in this section (so, for example, you could decide that completed segments for this table should be in pinot servers tagged as allTables_COMPLETED). To learn more about this config, see the section.

    hashtag
    Hybrid Table

    A hybrid table is a table composed of 2 tables, one offline and one real-time that share the same name. In such a table, offline segments may be pushed periodically. The retention on the offline table can be set to a high value since segments are coming in on a periodic basis, whereas the retention on the real-time part can be small.

    Once an offline segment is pushed to cover a recent time period, the brokers automatically switch to using the offline table for segments for that time period and use the real-time table only for data not available in the offline table.

    To understand how time boundary works in the case of a hybrid table, see .

    A typical scenario is pushing a deduped cleaned up data into an offline table every day while consuming real-time data as and when it arrives. The data can be kept in offline tables for even a few years while the real-time data would be cleaned every few days.

    hashtag
    Examples

    Create a table config for your data, or see for all possible batch/streaming tables.

    Prerequisites

    hashtag
    Offline Table Creation

    Sample Console Output

    Check out the table config in the to make sure it was successfully uploaded.

    hashtag
    Streaming Table Creation

    Start Kafka

    Create a Kafka Topic

    Create a Streaming table

    Sample output

    Start Kafka-Zookeeper

    Start Kafka

    Check out the table config in the to make sure it was successfully uploaded.

    hashtag
    Hybrid Table creation

    circle-exclamation

    Note that creating a hybrid table has to be done in 2 separate steps of creating an offline and real-time table individually.

    Usage: StartMinion
        -help                                                   : Print this message. (required=false)
        -minionHost               <String>                      : Host name for minion. (required=false)
        -minionPort               <int>                         : Port number to start the minion at. (required=false)
        -zkAddress                <http>                        : HTTP address of Zookeeper. (required=false)
        -clusterName              <String>                      : Pinot cluster name. (required=false)
        -configFileName           <Config File Name>            : Minion Starter Config file. (required=false)
    public interface PinotTaskGenerator {
    
      /**
       * Initializes the task generator.
       */
      void init(ClusterInfoAccessor clusterInfoAccessor);
    
      /**
       * Returns the task type of the generator.
       */
      String getTaskType();
    
      /**
       * Generates a list of tasks to schedule based on the given table configs.
       */
      List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs);
    
      /**
       * Returns the timeout in milliseconds for each task, 3600000 (1 hour) by default.
       */
      default long getTaskTimeoutMs() {
        return JobConfig.DEFAULT_TIMEOUT_PER_TASK;
      }
    
      /**
       * Returns the maximum number of concurrent tasks allowed per instance, 1 by default.
       */
      default int getNumConcurrentTasksPerInstance() {
        return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
      }
    
      /**
       * Performs necessary cleanups (e.g. remove metrics) when the controller leadership changes.
       */
      default void nonLeaderCleanUp() {
      }
    }
    public interface PinotTaskExecutorFactory {
    
      /**
       * Initializes the task executor factory.
       */
      void init(MinionTaskZkMetadataManager zkMetadataManager);
    
      /**
       * Returns the task type of the executor.
       */
      String getTaskType();
    
      /**
       * Creates a new task executor.
       */
      PinotTaskExecutor create();
    }
    public interface PinotTaskExecutor {
    
      /**
       * Executes the task based on the given task config and returns the execution result.
       */
      Object executeTask(PinotTaskConfig pinotTaskConfig)
          throws Exception;
    
      /**
       * Tries to cancel the task.
       */
      void cancel();
    }
    public interface MinionEventObserverFactory {
    
      /**
       * Initializes the task executor factory.
       */
      void init(MinionTaskZkMetadataManager zkMetadataManager);
    
      /**
       * Returns the task type of the event observer.
       */
      String getTaskType();
    
      /**
       * Creates a new task event observer.
       */
      MinionEventObserver create();
    }
    public interface MinionEventObserver {
    
      /**
       * Invoked when a minion task starts.
       *
       * @param pinotTaskConfig Pinot task config
       */
      void notifyTaskStart(PinotTaskConfig pinotTaskConfig);
    
      /**
       * Invoked when a minion task succeeds.
       *
       * @param pinotTaskConfig Pinot task config
       * @param executionResult Execution result
       */
      void notifyTaskSuccess(PinotTaskConfig pinotTaskConfig, @Nullable Object executionResult);
    
      /**
       * Invoked when a minion task gets cancelled.
       *
       * @param pinotTaskConfig Pinot task config
       */
      void notifyTaskCancelled(PinotTaskConfig pinotTaskConfig);
    
      /**
       * Invoked when a minion task encounters exception.
       *
       * @param pinotTaskConfig Pinot task config
       * @param exception Exception encountered during execution
       */
      void notifyTaskError(PinotTaskConfig pinotTaskConfig, Exception exception);
    }
    {
      ...
      "task": {
        "taskTypeConfigsMap": {
          "myTask": {
            "myProperty1": "value1",
            "myProperty2": "value2"
          }
        }
      }
    }
      "task": {
        "taskTypeConfigsMap": {
          "RealtimeToOfflineSegmentsTask": {
            "bucketTimePeriod": "1h",
            "bufferTimePeriod": "1h",
            "schedule": "0 * * * * ?"
          }
        }
      },
    Max time duration to wait - Pinot consumers wait for the configured time duration after which segments are persisted to the disk.

    Sorted forward index with run-length encoding

  • Inverted Index

    • Bitmap inverted index

    • Sorted inverted index

  • Star-tree Index

  • Range Index

  • Text Index

  • Geospatial

  • Create stream table

    Offline

    Offline tables ingest pre-built pinot-segments from external data stores. This is generally used for batch ingestion.

    Realtime

    Realtime tables ingest data from streams (such as Kafka) and build segments from the consumed data.

    Hybrid

    A hybrid Pinot table has both realtime as well as offline tables under the hood. By default, all tables in Pinot are Hybrid in nature.

    Table Configuration
    the official documentation
    Batch Ingestion
    non-winner server
    Completion Config
    bypass deep store for segment completion.arrow-up-right
    Forward Index
    Indexing
    Bloomfilters
    Tenant
    Moving Completed Segments
    Brokerarrow-up-right
    examplesarrow-up-right
    Setup the cluster
    Create broker and server tenants
    Rest APIarrow-up-right
    Rest APIarrow-up-right
    select count(*)
    from myTable
    pinot-table-realtime.json
        "tableIndexConfig": { 
          "noDictionaryColumns": ["metric1", "metric2"],
          "aggregateMetrics": true,
          ...
        }
      "broker": "brokerTenantName",
      "server": "serverTenantName",
      "tagOverrideConfig" : {
        "realtimeConsuming" : "serverTenantName_REALTIME"
        "realtimeCompleted" : "serverTenantName_OFFLINE"
      }
    }
    docker run \
        --network=pinot-demo \
        --name pinot-batch-table-creation \
        ${PINOT_IMAGE} AddTable \
        -schemaFile examples/batch/airlineStats/airlineStats_schema.json \
        -tableConfigFile examples/batch/airlineStats/airlineStats_offline_table_config.json \
        -controllerHost pinot-controller \
        -controllerPort 9000 \
        -exec
    Executing command: AddTable -tableConfigFile examples/batch/airlineStats/airlineStats_offline_table_config.json -schemaFile examples/batch/airlineStats/airlineStats_schema.json -controllerHost pinot-controller -controllerPort 9000 -exec
    Sending request: http://pinot-controller:9000/schemas to controller: a413b0013806, version: Unknown
    {"status":"Table airlineStats_OFFLINE succesfully added"}
    bin/pinot-admin.sh AddTable \
        -schemaFile examples/batch/airlineStats/airlineStats_schema.json \
        -tableConfigFile examples/batch/airlineStats/airlineStats_offline_table_config.json \
        -exec
    # add schema
    curl -F schemaName=@airlineStats_schema.json  localhost:9000/schemas
    
    # add table
    curl -i -X POST -H 'Content-Type: application/json' \
        -d @airlineStats_offline_table_config.json localhost:9000/tables
    docker run \
        --network pinot-demo --name=kafka \
        -e KAFKA_ZOOKEEPER_CONNECT=pinot-zookeeper:2181/kafka \
        -e KAFKA_BROKER_ID=0 \
        -e KAFKA_ADVERTISED_HOST_NAME=kafka \
        -d wurstmeister/kafka:latest
    docker exec \
      -t kafka \
      /opt/kafka/bin/kafka-topics.sh \
      --zookeeper pinot-zookeeper:2181/kafka \
      --partitions=1 --replication-factor=1 \
      --create --topic flights-realtime
    docker run \
        --network=pinot-demo \
        --name pinot-streaming-table-creation \
        ${PINOT_IMAGE} AddTable \
        -schemaFile examples/stream/airlineStats/airlineStats_schema.json \
        -tableConfigFile examples/docker/table-configs/airlineStats_realtime_table_config.json \
        -controllerHost pinot-controller \
        -controllerPort 9000 \
        -exec
    Executing command: AddTable -tableConfigFile examples/docker/table-configs/airlineStats_realtime_table_config.json -schemaFile examples/stream/airlineStats/airlineStats_schema.json -controllerHost pinot-controller -controllerPort 9000 -exec
    Sending request: http://pinot-controller:9000/schemas to controller: 8fbe601012f3, version: Unknown
    {"status":"Table airlineStats_REALTIME succesfully added"}
    bin/pinot-admin.sh StartZookeeper -zkPort 2191
    bin/pinot-admin.sh  StartKafka -zkAddress=localhost:2191/kafka -port 19092
    "OFFLINE": {
        "tableName": "pinotTable", 
        "tableType": "OFFLINE", 
        "segmentsConfig": {
          ... 
        }, 
        "tableIndexConfig": { 
          ... 
        },  
        "tenants": {
          "broker": "myBrokerTenant", 
          "server": "myServerTenant"
        },
        "metadata": {
          ...
        }
      },
      "REALTIME": { 
        "tableName": "pinotTable", 
        "tableType": "REALTIME", 
        "segmentsConfig": {
          ...
        }, 
        "tableIndexConfig": { 
          ... 
          "streamConfigs": {
            ...
          },  
        },  
        "tenants": {
          "broker": "myBrokerTenant", 
          "server": "myServerTenant"
        },
        "metadata": {
        ...
        }
      }
    }
    bin/pinot-admin.sh AddTable \
        -schemaFile examples/stream/airlineStats/airlineStats_schema.json \
        -tableConfigFile examples/stream/airlineStats/airlineStats_realtime_table_config.json \
        -exec