Set up Pinot by starting each component individually
You can try out the pre-built Pinot all-in-one Docker image.
(Optional) You can also follow the instructions here to build your own images.
Create an isolated bridge network in Docker
Start ZooKeeper in daemon mode.
Start ZKUI to browse ZooKeeper data at http://localhost:9090.
Start the Pinot Controller in daemon mode and connect it to ZooKeeper.
Start the Pinot Broker in daemon mode and connect it to ZooKeeper.
Start the Pinot Server in daemon mode and connect it to ZooKeeper.
All Pinot components are now running as an empty cluster.
Run the command below to check the container status.
Sample Console Output
Download Pinot Distribution from http://pinot.apache.org/download/
See the controller page for more details.
You will often need to customize the setup of Pinot components. To do so, write a config file and use it to start each component.
Below are example config files and sample commands to start Pinot.
Below is a sample pinot-controller.conf used in the Helm chart setup.
In order to run Pinot Controller, the command is:
Below are some notable configurations you can set in the Pinot Controller:
| Config Name | Description | Default Value |
| --- | --- | --- |
| controller.helix.cluster.name | Pinot Cluster name | PinotCluster |
| controller.host | Pinot Controller Host | Required if config pinot.set.instance.id.to.hostname is false. |
| pinot.set.instance.id.to.hostname | When enabled, use server hostname to infer controller.host | false |
| controller.port | Pinot Controller Port | 9000 |
| controller.vip.host | The VIP hostname used to set the download URL for segments | ${controller.host} |
| controller.vip.port | The VIP port used to set the download URL for segments | ${controller.port} |
| controller.data.dir | Directory to host segment data | ${java.io.tmpdir}/PinotController |
| controller.zk.str | Zookeeper URL | localhost:2181 |
| cluster.tenant.isolation.enable | Enable Tenant Isolation, default is single tenant cluster | true |
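As a rough illustration of how these settings fit together, the sketch below renders a few of the keys from the table above as `key=value` lines, which is the file format assumed here for pinot-controller.conf. The host and data directory values are placeholders chosen for the example, and `render_properties` is a hypothetical helper, not part of Pinot.

```python
# Keys come from the controller configuration table; values marked
# "assumed" are placeholders for this example, not recommended settings.
controller_config = {
    "controller.helix.cluster.name": "PinotCluster",
    "controller.host": "localhost",                 # assumed placeholder
    "controller.port": "9000",
    "controller.data.dir": "/tmp/PinotController",  # assumed placeholder path
    "controller.zk.str": "localhost:2181",
    "cluster.tenant.isolation.enable": "true",
}


def render_properties(config):
    """Render a dict as key=value lines, one per entry, in insertion order."""
    return "\n".join(f"{key}={value}" for key, value in config.items()) + "\n"


if __name__ == "__main__":
    # Print the rendered file content so it can be redirected to a file.
    print(render_properties(controller_config), end="")
```

Keeping the settings in one dictionary makes it easy to override a single value (for example, the ZooKeeper address) before writing the file.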
Below is a sample pinot-broker.conf used in the Helm chart setup.
In order to run Pinot Broker, the command is:
Below are some notable configurations you can set in the Pinot Broker:
| Config Name | Description | Default Value |
| --- | --- | --- |
| instanceId | Unique id to register Pinot Broker in the cluster. | BROKER_${BROKER_HOST}_${pinot.broker.client.queryPort} |
| pinot.set.instance.id.to.hostname | When enabled, use server hostname to set ${BROKER_HOST} in above config, else use IP address. | false |
| pinot.broker.client.queryPort | Port to query Pinot Broker | 8099 |
| pinot.broker.timeoutMs | Timeout for Broker Query in Milliseconds | 10000 |
| pinot.broker.enable.query.limit.override | Enables the query LIMIT override, which protects the Pinot Broker and Server from fetching too many records. | false |
| pinot.broker.query.response.limit | When pinot.broker.enable.query.limit.override is enabled, resets the limit of a selection query if it exceeds this value. | 2147483647 |
| pinot.broker.startup.minResourcePercent | Treats the broker ServiceStatus as STARTED once the percentage of resources (tables) that are ONLINE for this broker crosses this threshold percentage of the total number of tables it is expected to serve. | 100.0 |
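The interaction between pinot.broker.enable.query.limit.override and pinot.broker.query.response.limit can be sketched as below. This is a minimal model of the behavior described in the table, not the broker's actual code; `effective_limit` and its parameter names are hypothetical.

```python
# Default of pinot.broker.query.response.limit, as listed in the table.
DEFAULT_RESPONSE_LIMIT = 2147483647


def effective_limit(requested_limit, override_enabled=False,
                    response_limit=DEFAULT_RESPONSE_LIMIT):
    """Return the LIMIT that would apply to a selection query.

    The requested limit is reset to response_limit only when the override
    is enabled and the requested limit exceeds response_limit.
    """
    if override_enabled and requested_limit > response_limit:
        return response_limit
    return requested_limit
```

For example, with the override enabled and a response limit of 1000, a query asking for 5000 rows would be capped at 1000, while a query asking for 10 rows would be left unchanged.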
Below is a sample pinot-server.conf used in the Helm chart setup.
In order to run Pinot Server, the command is:
Below are some notable configurations you can set in the Pinot Server:
| Config Name | Description | Default Value |
| --- | --- | --- |
| instanceId | Unique id to register Pinot Server in the cluster. | Server_${SERVER_HOST}_${pinot.server.netty.port} |
| pinot.set.instance.id.to.hostname | When enabled, use server hostname to set ${SERVER_HOST} in above config, else use IP address. | false |
| pinot.server.netty.port | Port to query Pinot Server | 8098 |
| pinot.server.adminapi.port | Port for Pinot Server Admin UI | 8097 |
| pinot.server.instance.dataDir | Directory to hold all the data | ${java.io.tmpdir}/PinotServer/index |
| pinot.server.instance.segmentTarDir | Directory to hold temporary segments downloaded from Controller or Deep Store | ${java.io.tmpdir}/PinotServer/segmentTar |
| pinot.server.query.executor.timeout | Timeout for Server to process Query in Milliseconds | 15000 |
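The default instanceId patterns listed for the broker and the server can be illustrated with a short sketch. It only mirrors the string templates and default ports from the tables; the function names are hypothetical, and how Pinot itself resolves the hostname or IP address is not modeled here.

```python
def pick_host(hostname, ip_address, use_hostname=False):
    """Mirror pinot.set.instance.id.to.hostname: hostname if enabled, else IP."""
    return hostname if use_hostname else ip_address


def broker_instance_id(host, query_port=8099):
    """Default broker id: BROKER_${BROKER_HOST}_${pinot.broker.client.queryPort}."""
    return f"BROKER_{host}_{query_port}"


def server_instance_id(host, netty_port=8098):
    """Default server id: Server_${SERVER_HOST}_${pinot.server.netty.port}."""
    return f"Server_{host}_{netty_port}"


if __name__ == "__main__":
    # Example host values are made up for illustration.
    host = pick_host("pinot-server-0", "10.0.0.5", use_hostname=False)
    print(broker_instance_id(host))
    print(server_instance_id(host))
```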
A TABLE in the regular database world is represented in Pinot as <TABLE>_OFFLINE and/or <TABLE>_REALTIME, depending on the ingestion mode (batch, real-time, or hybrid).
See examples for all possible batch/streaming tables.
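The naming rule above can be written down as a small sketch. The function name, the mode strings, and the table name `myTable` are assumptions made for the example; only the `_OFFLINE` and `_REALTIME` suffixes come from the text.

```python
# Which suffixes each ingestion mode produces, per the description above.
SUFFIXES_BY_MODE = {
    "batch": ["OFFLINE"],
    "real-time": ["REALTIME"],
    "hybrid": ["OFFLINE", "REALTIME"],
}


def physical_table_names(table, mode):
    """Return the Pinot table name(s) backing a logical table."""
    if mode not in SUFFIXES_BY_MODE:
        raise ValueError(f"unknown ingestion mode: {mode!r}")
    return [f"{table}_{suffix}" for suffix in SUFFIXES_BY_MODE[mode]]


if __name__ == "__main__":
    for mode in SUFFIXES_BY_MODE:
        print(mode, physical_table_names("myTable", mode))
```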
Please see Batch Tables for table configuration details and how to customize it.
Sample Console Output
Please see Streaming Tables for table configuration details and how to customize it.
Start Kafka
Create a Kafka Topic
Create a Streaming table
Sample output
Start Kafka-Zookeeper
Start Kafka
Create stream table
Now that the table is configured, let's load some data. Data can be loaded in batch mode or streaming mode. See the ingestion overview page for details. Loading data involves generating Pinot segments from raw data and pushing them to the Pinot cluster.
You can generate and push segments to Pinot via standalone scripts or using frameworks such as Hadoop or Spark. See this page for more details on setting up Data Ingestion Jobs.
The example below uses standalone mode.
Sample Console Output
The JobSpec YAML file contains all the information about the data format, the input data location, and the Pinot cluster coordinates. Note that this assumes the controller is running, so that the table config and schema can be fetched from it. If it is not, you will have to configure the spec to point at their location. See Pinot Ingestion Job for more details.
Run the command below to stream JSON data into the Kafka topic flights-realtime:
Segments for offline tables are constructed outside of Pinot, typically in Hadoop via MapReduce jobs, and ingested into Pinot via the REST API provided by the Controller. Pinot provides libraries to create Pinot segments out of input files in AVRO, JSON, or CSV format in a Hadoop job, and to push the constructed segments to the controllers via REST APIs.
When an Offline segment is ingested, the controller looks up the table’s configuration and assigns the segment to the servers that host the table. It may assign multiple servers for each segment depending on the number of replicas configured for that table.
Pinot supports different segment assignment strategies that are optimized for various use cases.
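To make the replica idea concrete, here is a minimal sketch of assigning one segment to several servers. The least-loaded rule is an assumption chosen for illustration and is not claimed to be one of Pinot's actual assignment strategies; `pick_servers` and the server names are hypothetical.

```python
def pick_servers(server_loads, replicas):
    """Choose `replicas` distinct servers holding the fewest segments.

    server_loads maps a server name to the number of segments it hosts.
    The chosen servers' counts are incremented to record the assignment.
    Ties are broken by server name so the result is deterministic.
    """
    if replicas > len(server_loads):
        raise ValueError("not enough servers for the configured replicas")
    ranked = sorted(server_loads, key=lambda name: (server_loads[name], name))
    chosen = ranked[:replicas]
    for name in chosen:
        server_loads[name] += 1
    return chosen


if __name__ == "__main__":
    loads = {"server1": 2, "server2": 0, "server3": 1}
    print(pick_servers(loads, replicas=2))
    print(loads)
```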
Once segments are assigned, Pinot servers get notified via Helix to “host” the segment. The servers download the segments (as a cached local copy to serve queries) and load them into local memory. All segment data is maintained in memory as long as the server hosts that segment.
Once the server has loaded the segment, Helix notifies brokers of the availability of these segments. The brokers then start including the new segments in queries. Brokers support different routing strategies depending on the type of table, the segment assignment strategy, and the use case.
Data in offline segments is immutable (rows cannot be added, deleted, or modified). However, segments may be replaced with modified data.
Segments for realtime tables are constructed by Pinot servers with rows ingested from data streams such as Kafka. Rows ingested from streams are made available for query processing as soon as they are ingested, thus enabling applications such as those that need real-time charts on analytics.
In large-scale installations, data in streams is typically split across multiple stream partitions. The underlying stream may provide consumer implementations that allow applications to consume data from any subset of partitions, from a single partition up to all of them.
A Pinot table can be configured to consume from streams in one of two modes:
LowLevel: This is the preferred mode of consumption. Pinot creates independent partition-level consumers for each partition. Depending on the configured number of replicas, multiple consumers may be created for each partition, taking care that no two replicas exist on the same server host. Therefore you need to provision at least as many hosts as the number of replicas configured.
HighLevel: Pinot creates one stream-level consumer that consumes from all partitions. Each message consumed could be from any of the partitions of the stream. Depending on the configured number of replicas, multiple stream-level consumers are created, taking care that no two replicas exist on the same server host. Therefore you need to provision exactly as many hosts as the number of replicas configured.
Of course, the underlying stream should support either mode of consumption in order for a Pinot table to use that mode. Kafka has support for both of these modes. See Pluggable Streams for more information on support of other data streams in Pinot.
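The host-provisioning constraint for partition-level (LowLevel) consumption can be sketched as follows. The round-robin placement is an assumption made for the example; the only property taken from the text is that replicas of the same partition never share a host, which requires at least as many hosts as replicas. `place_partition_consumers` is a hypothetical name.

```python
def place_partition_consumers(num_partitions, hosts, replicas):
    """Map each partition to `replicas` distinct hosts, round-robin.

    Raises ValueError when there are fewer hosts than replicas, because two
    replicas of the same partition would then have to share a host.
    """
    if replicas > len(hosts):
        raise ValueError("need at least as many hosts as replicas")
    placement = {}
    for partition in range(num_partitions):
        # Consecutive offsets modulo len(hosts) are distinct while
        # replicas <= len(hosts), so no host repeats within a partition.
        placement[partition] = [
            hosts[(partition + offset) % len(hosts)]
            for offset in range(replicas)
        ]
    return placement


if __name__ == "__main__":
    print(place_partition_consumers(3, ["host1", "host2", "host3"], replicas=2))
```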
In either mode, Pinot servers store the ingested rows in volatile memory until one of the following conditions is met:
A certain number of rows are consumed
The consumption has gone on for a certain length of time
(See the StreamConfigs section for how to set these values, or to have Pinot compute them for you.)
Upon reaching either one of these limits, the servers do the following:
Pause consumption
Persist the rows consumed so far into non-volatile storage
Continue consuming new rows into volatile memory again.
The persisted rows form what we call a completed segment (as opposed to a consuming segment that resides in volatile memory).
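The consume-and-persist cycle described above can be modeled with a short sketch. It is a simplification under stated assumptions: rows arrive as `(arrival_time_seconds, row)` pairs, the time limit is only checked when a row arrives, and "persisting" is represented by moving rows into a list. The function and parameter names are hypothetical.

```python
def consume(rows, max_rows, max_seconds):
    """Split a stream of (arrival_time, row) pairs into segments.

    Returns (completed, consuming): the list of completed segments and the
    rows still held in the in-memory consuming segment.
    """
    completed = []
    consuming = []
    segment_start = None
    for arrival_time, row in rows:
        if segment_start is None:
            segment_start = arrival_time  # first row of a new consuming segment
        consuming.append(row)
        reached_rows = len(consuming) >= max_rows
        reached_time = arrival_time - segment_start >= max_seconds
        if reached_rows or reached_time:
            completed.append(consuming)  # stands in for persisting to storage
            consuming = []               # start a fresh consuming segment
            segment_start = None
    return completed, consuming


if __name__ == "__main__":
    stream = [(0, "a"), (1, "b"), (2, "c"), (10, "d"), (11, "e")]
    print(consume(stream, max_rows=3, max_seconds=5))
```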
In LowLevel mode, the completed segments are persisted into the local non-volatile store of the Pinot server as well as the segment store of the Pinot cluster (see Pinot Architecture Overview). This allows for easy and automated mechanisms for replacing Pinot servers, expanding capacity, and so on. Pinot has special mechanisms that ensure that the completed segment is equivalent across all replicas.
During segment completion, the controller chooses one winner from all the replicas as the committer server. The committer server builds the segment and uploads it to the controller. All the other non-committer servers follow one of these two paths:
If the in-memory segment is equivalent to the committed segment, the non-committer server also builds the segment locally and replaces the in-memory segment.
If the in-memory segment is not equivalent to the committed segment, the non-committer server downloads the segment from the controller.
For more details on this protocol, please refer to this doc.
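The two paths for non-committer servers can be summarized in a small sketch. It is a toy model, not the actual protocol: equivalence is approximated by comparing the in-memory rows directly, the committer is passed in rather than chosen by a controller, and all names are hypothetical.

```python
def complete_segment(replica_rows, committer):
    """Return the action each replica takes during segment completion.

    replica_rows maps a server name to the rows it holds in memory for the
    segment; committer is the server picked as the winner.
    """
    committed = replica_rows[committer]
    actions = {}
    for server, rows in replica_rows.items():
        if server == committer:
            actions[server] = "build and upload to controller"
        elif rows == committed:
            # Equivalent in-memory segment: build locally, no download needed.
            actions[server] = "build locally"
        else:
            # Diverged from the committed segment: fetch the committed copy.
            actions[server] = "download from controller"
    return actions


if __name__ == "__main__":
    replicas = {
        "server1": ["r1", "r2", "r3"],
        "server2": ["r1", "r2", "r3"],
        "server3": ["r1", "r2"],
    }
    print(complete_segment(replicas, committer="server1"))
```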
In HighLevel mode, the servers persist the consumed rows into the local store (and not the segment store). Since consumption of rows can be from any partition, it is not possible to guarantee equivalence of segments across replicas.
See Consuming and Indexing rows in Realtime for details.