
Cluster

A cluster is a set of nodes comprising servers, brokers, controllers, and minions.

Pinot cluster components

Pinot leverages Apache Helix for cluster management. Helix is a cluster management framework that manages replicated, partitioned resources in a distributed system. Helix uses Zookeeper to store cluster state and metadata.

Cluster components

Briefly, Helix divides nodes into three logical components based on their responsibilities:

Participant

The nodes that host distributed, partitioned resources

Spectator

The nodes that observe the current state of each Participant and use that information to access the resources. Spectators are notified of state changes in the cluster (state of a participant, or that of a partition in a participant).

Controller

The node that observes and controls the Participant nodes. It is responsible for coordinating all transitions in the cluster and ensuring that state constraints are satisfied while maintaining cluster stability.

Pinot servers are modeled as Participants; more details about server nodes can be found in Server. Pinot brokers are modeled as Spectators; more details about broker nodes can be found in Broker. Pinot controllers are modeled as Controllers; more details about controller nodes can be found in Controller.

Logical view

Another way to visualize the cluster is a logical view, wherein a cluster contains tenants, tenants contain tables, and tables contain segments.

Setup a Pinot Cluster

Typically, there is only one cluster per environment/data center. There is no need to create multiple Pinot clusters, since Pinot supports the concept of tenants. At LinkedIn, the largest Pinot cluster consists of 1000+ nodes.

To set up a Pinot cluster, we first need to start Zookeeper.

0. Create a Network

Create an isolated bridge network in docker

1. Start Zookeeper

Once we've started Zookeeper, we can start other components to join this cluster. If you're using docker, pull the latest apachepinot/pinot image.

Pull pinot docker image

You can try out the pre-built Pinot all-in-one docker image.

(Optional) You can also follow the instructions here to build your own images.

To start other components to join the cluster, see Start Controller, Start Broker, and Start Server.

Explore your cluster via Pinot Data Explorer.

Start Zookeeper in daemon mode.

2. Start Zookeeper UI

Start ZKUI to browse Zookeeper data at http://localhost:9090.

Download the Pinot distribution using the instructions in Download.

1. Start Zookeeper

2. Start Zooinspector

Install zooinspector to view the data in Zookeeper, and connect to localhost:2181

bin/pinot-admin.sh StartZookeeper -zkPort 2181
docker network create -d bridge pinot-demo
export PINOT_VERSION=0.3.0-SNAPSHOT
export PINOT_IMAGE=apachepinot/pinot:${PINOT_VERSION}
docker pull ${PINOT_IMAGE}
docker run \
    --network=pinot-demo \
    --name pinot-zookeeper \
    --restart always \
    -p 2181:2181 \
    -d zookeeper:3.5.6
docker run \
    --network pinot-demo --name=zkui \
    -p 9090:9090 \
    -e ZK_SERVER=pinot-zookeeper:2181 \
    -d qnib/plain-zkui:latest

Minion

Pinot Minion is a new component which leverages the Helix Task Framework. It can be attached to an existing Pinot cluster and then execute tasks as provided by the controller. It's a generic and single place for running background jobs. Minions help offload computationally intensive tasks, such as adding indexes to segments and merging segments, from other components.

Starting Minion

// coming soon

Components

Learn about the different components and logical abstractions

This section is a reference for the definition of major components and logical abstractions used in Pinot. Please visit the Basic Concepts section to get a general overview that ties together all of the reference material in this section.

Operator reference

• Cluster
• Controller
• Broker
• Server
• Minion
• Tenant

Developer reference

• Table
• Schema
• Segment

Controller

The Pinot Controller is responsible for a number of things

  • Controllers maintain the global metadata (e.g. configs and schemas) of the system with the help of Zookeeper which is used as the persistent metadata store.

  • Controllers host the Helix Controller and are responsible for managing other Pinot components (brokers, servers, minions)

  • They maintain the mapping of which servers are responsible for which segments. This mapping is used by the servers to download the portion of the segments that they are responsible for. This mapping is also used by the broker to decide which servers to route the queries to.

  • Controller has admin endpoints for viewing, creating, updating and deleting configs which help us manage and operate the cluster.

  • Controllers also have endpoints for segment uploads which are used in offline data pushes. They are responsible for initializing realtime consumption and coordination of persisting the realtime segments into the segment store periodically.

  • They undertake other management activities such as managing retention of segments and validations.

  • There can be multiple instances of Pinot controller for redundancy. If there are multiple controllers, Pinot expects that all of them are configured with the same back-end storage system so that they have a common view of the segments (e.g. NFS). Pinot can use other storage systems such as HDFS or ADLS.

    Starting a Controller

    Make sure you've setup Zookeeper. If you're using docker, make sure to pull the pinot docker image. To start a controller

    docker run \
        --network=pinot-demo \
        --name pinot-controller \
        -p 9000:9000 \
        -d ${PINOT_IMAGE} StartController \
        -zkAddress pinot-zookeeper:2181
    bin/pinot-admin.sh StartController \
      -zkAddress localhost:2181 \
      -clusterName PinotCluster \
      -controllerPort 9000
    Broker

    Brokers are the components that handle Pinot queries. They accept queries from clients and forward them to the right servers. They collect results back from the servers and consolidate them into a single response to send back to the client.

    Pinot brokers are modeled as Spectators. They need to know the location of each segment of a table (and each replica of the segments) and route requests to the appropriate server that hosts the segments of the table being queried. The broker ensures that all the rows of the table are queried exactly once so as to return correct, consistent results for a query. The brokers may optimize to prune some of the segments as long as accuracy is not sacrificed. Helix provides the framework by which spectators can learn the location in which each partition of a resource (i.e. participant) resides. The brokers use this mechanism to learn the servers that host specific segments of a table.

    In the case of hybrid tables, the brokers ensure that the overlap between realtime and offline segment data is queried exactly once, by performing offline and realtime federation using a time boundary. Let's take an example: we have realtime data for 5 days, March 23 to March 27, and offline data has been pushed until March 25, which is 2 days behind realtime. The brokers maintain this time boundary.

    Suppose we get a query to this table: select sum(metric) from table. The broker will split the query into 2 queries based on this time boundary, one for offline and one for realtime. The query becomes select sum(metric) from table_REALTIME where date >= Mar 25 and select sum(metric) from table_OFFLINE where date < Mar 25. The broker then merges results from both these queries before returning the result to the client.

    Starting a Broker

    Make sure you've setup Zookeeper. If you're using docker, make sure to pull the pinot docker image. To start a broker

    Broker interaction with other components
    docker run \
        --network=pinot-demo \
        --name pinot-broker \
        -d ${PINOT_IMAGE} StartBroker \
        -zkAddress pinot-zookeeper:2181
    bin/pinot-admin.sh StartBroker \
      -zkAddress localhost:2181 \
      -clusterName PinotCluster \
      -brokerPort 7000

    Server

    Servers host the data segments and serve queries off the data they host. There are two types of servers:

    Offline: Offline servers are responsible for downloading segments from the segment store, to host and serve queries off. When a new segment is uploaded to the controller, the controller decides the servers (as many as the replication factor) that will host the new segment and notifies them to download the segment from the segment store. On receiving this notification, the servers download the segment file and load the segment onto the server, to serve queries off it.

    Realtime: Realtime servers directly ingest from a realtime stream (such as Kafka or EventHubs). Periodically, they make segments of the in-memory ingested data, based on certain thresholds. These segments are then persisted to the segment store.

    Pinot servers are modeled as Helix Participants, hosting Pinot tables (referred to as resources in Helix terminology). Segments of a table are modeled as Helix partitions (of a resource). Thus, a Pinot server hosts one or more Helix partitions of one or more Helix resources (i.e. one or more segments of one or more tables).

    Starting a Server

    Make sure you've setup Zookeeper. If you're using docker, make sure to pull the pinot docker image. To start a server


    Tenant

    A tenant is a logical component, defined as a group of server/broker nodes with the same Helix tag.

    In order to support multi-tenancy, Pinot has first class support for tenants. Every table is associated with a server tenant and a broker tenant. This controls the nodes that will be used by this table as servers and brokers. This allows all tables belonging to a particular use case to be grouped under a single tenant name.

    The concept of tenants is very important when multiple use cases are using Pinot and there is a need to provide quotas or some sort of isolation across tenants. For example, consider two tables, Table A and Table B, in the same Pinot cluster.

    We can configure Table A with server tenant Tenant A and Table B with server tenant Tenant B. We can tag some of the server nodes for Tenant A and some for Tenant B. This will ensure that segments of Table A only reside on servers tagged with Tenant A, and segments of Table B only reside on servers tagged with Tenant B (a configuration sketch follows below). The same isolation can be achieved at the broker level, by configuring broker tenants for the tables.

    Table isolation using tenants

    No need to create separate clusters for every table or use case!
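
    To make this concrete, here is a minimal sketch of the tenants section of the two table configs (the tenant names are illustrative and follow the tenants format shown later on this page):

    // Table A's table config (only the tenants section shown; names are hypothetical)
    "tenants": {
      "broker": "TenantA",
      "server": "TenantA"
    }

    // Table B's table config
    "tenants": {
      "broker": "TenantB",
      "server": "TenantB"
    }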

    Tenant Config

    This tenant is defined in the tenants section of the table config.

    This section contains 2 main fields, broker and server, which decide the tenants used for the broker and server components of this table.

    "tenants": {
      "broker": "brokerTenantName",
      "server": "serverTenantName"
    }

    In the above example,

    • The table will be served by brokers that have been tagged as brokerTenantName_BROKER in Helix.

    • If this were an offline table, the offline segments for the table will be hosted in pinot servers tagged in helix as serverTenantName_OFFLINE

    • If this were a realtime table, the realtime segments (both consuming as well as completed ones) will be hosted in pinot servers tagged in helix as serverTenantName_REALTIME.

    Creating a tenant

    Broker tenant

    Here's a sample broker tenant config. This will create a broker tenant sampleBrokerTenant by tagging 3 untagged broker nodes as sampleBrokerTenant_BROKER.

    To create this tenant use the following command. The creation will fail if the number of untagged broker nodes is less than numberOfInstances.

    Follow instructions in Getting Pinot to get Pinot locally, and then

    Check out the table config in the Rest API to make sure it was successfully uploaded.

    Server tenant

    Here's a sample server tenant config. This will create a server tenant sampleServerTenant by tagging 1 untagged server node as sampleServerTenant_OFFLINE and 1 untagged server node as sampleServerTenant_REALTIME.

    To create this tenant use the following command. The creation will fail if the number of untagged server nodes is less than offlineInstances + realtimeInstances.

    Follow instructions in Getting Pinot to get Pinot locally, and then

    Check out the table config in the Rest API to make sure it was successfully uploaded.

    Defining tenants for tables
    docker run \
        --network=pinot-demo \
        --name pinot-server \
        -d ${PINOT_IMAGE} StartServer \
        -zkAddress pinot-zookeeper:2181
    bin/pinot-admin.sh AddTenant \
        -name sampleBrokerTenant \
        -role BROKER \
        -instanceCount 3 -exec
    curl -i -X POST -H 'Content-Type: application/json' -d @sample-broker-tenant.json localhost:9000/tenants
    bin/pinot-admin.sh AddTenant \
        -name sampleServerTenant \
        -role SERVER \
        -offlineInstanceCount 1 \
        -realtimeInstanceCount 1 -exec
    curl -i -X POST -H 'Content-Type: application/json' -d @sample-server-tenant.json localhost:9000/tenants
    sample-broker-tenant.json
    {
         "tenantRole" : "BROKER",
         "tenantName" : "sampleBrokerTenant",
         "numberOfInstances" : 3
    }
    sample-server-tenant.json
    {
         "tenantRole" : "SERVER",
         "tenantName" : "sampleServerTenant",
         "offlineInstances" : 1,
         "realtimeInstances" : 1
    }
    bin/pinot-admin.sh StartServer \
        -zkAddress localhost:2181
    Usage: StartServer
        -serverHost               <String>                      : Host name for controller. (required=false)
        -serverPort               <int>                         : Port number to start the server at. (required=false)
        -serverAdminPort          <int>                         : Port number to serve the server admin API at. (required=false)
        -dataDir                  <string>                      : Path to directory containing data. (required=false)
        -segmentDir               <string>                      : Path to directory containing segments. (required=false)
        -zkAddress                <http>                        : Http address of Zookeeper. (required=false)
        -clusterName              <String>                      : Pinot cluster name. (required=false)
        -configFileName           <Config File Name>            : Server Starter Config file. (required=false)
        -help                                                   : Print this message. (required=false)

    Segment

    Pinot has the concept of a table, which is a logical abstraction referring to a collection of related data. Pinot has a distributed architecture and scales horizontally, and expects the size of a table to grow infinitely over time. In order to achieve this, the entire data needs to be distributed across multiple nodes. Pinot achieves this by breaking the data into smaller chunks known as segments (similar to shards/partitions in relational databases). Segments can also be seen as time-based partitions.

    Thus, a segment is a horizontal shard representing a chunk of table data with some number of rows. The segment stores data for all columns of the table. Each segment packs the data in a columnar fashion, along with the dictionaries and indices for the columns. The segment is laid out in a columnar format so that it can be directly mapped into memory for serving queries.

    Columns may be single or multi-valued. Column types may be STRING, INT, LONG, FLOAT, DOUBLE or BYTES. Columns may be declared to be metric or dimension (or specifically as a time dimension) in the schema. Columns can have a default null value. For example, the default null value of an integer column can be 0. Note: the default value of a bytes column has to be hex-encoded before adding it to the schema.

    Pinot uses dictionary encoding to store values as a dictionary ID. Columns may be configured to be “no-dictionary” column in which case raw values are stored. Dictionary IDs are encoded using minimum number of bits for efficient storage (e.g. a column with cardinality of 3 will use only 2 bits for each dictionary ID).

    There is a forward index built for each column and compressed appropriately for efficient memory use. In addition, optional inverted indices can be configured for any set of columns. Inverted indices, while taking up more storage, offer better query performance. Specialized indexes like the Star-Tree index are also supported. Check out Indexing for more details.

    Creating a segment

    Once the table is configured, we can load some data. Loading data involves generating Pinot segments from raw data and pushing them to the Pinot cluster. Data can be loaded in batch mode or streaming mode. See the ingestion overview page for details.

    Load Data in Batch

    Prerequisites

    • Setup a cluster
    • Create broker and server tenants
    • Create an offline table

    Below are instructions to generate and push segments to Pinot via standalone scripts. For a production setup, you should use frameworks such as Hadoop or Spark. See this page for more details on setting up Data Ingestion Jobs.

    Job Spec YAML

    To generate a segment, we first need to create a job spec YAML file. The job spec YAML file has all the information regarding the data format, input data location, and Pinot cluster coordinates. Note that this assumes that the controller is RUNNING to fetch the table config and schema. If not, you will have to configure the spec to point at their location.

    where,

    Top level fields

    • executionFrameworkSpec: Defines the ingestion jobs to be run. For more details, see executionFrameworkSpec below.

    • jobType: Pinot ingestion job type. Supported job types are: SegmentCreation (only create segments), SegmentTarPush (only upload segments), SegmentUriPush, SegmentCreationAndTarPush (create and upload segments), SegmentCreationAndUriPush.

    • inputDirURI: Root directory of input data, expected to have the scheme configured in PinotFS.

    • includeFileNamePattern: Include file name pattern, supports glob patterns. E.g. 'glob:*.avro' will include all avro files just under the inputDirURI, not sub directories; 'glob:**/*.avro' will include all the avro files under inputDirURI recursively.

    • excludeFileNamePattern: Exclude file name pattern, supports glob patterns. Similar usage as includeFileNamePattern.

    • outputDirURI: Root directory of output segments, expected to have the scheme configured in PinotFS.

    • overwriteOutput: Overwrite output segments if they exist.

    • pinotFSSpecs: Defines all related Pinot file systems. For more details, see pinotFSSpecs below.

    • recordReaderSpec: Defines all record reader configs. For more details, see recordReaderSpec below.

    • tableSpec: Defines the table name and where to fetch the corresponding table config and table schema. For more details, see tableSpec below.

    • segmentNameGeneratorSpec: Defines how the names of the segments will be generated. For more details, see segmentNameGeneratorSpec below.

    • pinotClusterSpecs: Defines the Pinot cluster access point. For more details, see pinotClusterSpecs below.

    • pushJobSpec: Defines segment push job related configuration. For more details, see pushJobSpec below.

    executionFrameworkSpec

    • name: execution framework name
    • segmentGenerationJobRunnerClassName: class name that implements the org.apache.pinot.spi.batch.ingestion.runner.SegmentGenerationJobRunner interface
    • segmentTarPushJobRunnerClassName: class name that implements the org.apache.pinot.spi.batch.ingestion.runner.SegmentTarPushJobRunner interface
    • segmentUriPushJobRunnerClassName: class name that implements the org.apache.pinot.spi.batch.ingestion.runner.SegmentUriPushJobRunner interface
    • extraConfigs: Map of extra configs for the execution framework

    pinotFSSpecs

    • scheme: used to identify a PinotFS. E.g. local, hdfs, dbfs, etc.
    • className: Class name used to create the PinotFS instance. E.g. org.apache.pinot.spi.filesystem.LocalPinotFS is used for the local filesystem, org.apache.pinot.plugin.filesystem.HadoopPinotFS is used for HDFS
    • configs: configs used to init the PinotFS instance

    recordReaderSpec

    • dataFormat: Record data format, e.g. 'avro', 'parquet', 'orc', 'csv', 'json', 'thrift' etc.
    • className: Corresponding RecordReader class name. E.g. org.apache.pinot.plugin.inputformat.avro.AvroRecordReader, org.apache.pinot.plugin.inputformat.csv.CSVRecordReader, org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader, org.apache.pinot.plugin.inputformat.json.JsonRecordReader, org.apache.pinot.plugin.inputformat.orc.OrcRecordReader, org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReader
    • configClassName: Corresponding RecordReaderConfig class name; mandatory for the CSV and Thrift file formats. E.g. org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig, org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReaderConfig
    • configs: Used to init the RecordReaderConfig class; required for the CSV and Thrift data formats.

    tableSpec

    • tableName: table name
    • schemaURI: defines where to read the table schema; supports PinotFS or HTTP. E.g. hdfs://path/to/table_schema.json, http://localhost:9000/tables/myTable/schema
    • tableConfigURI: defines where to read the table config; supports PinotFS or HTTP. E.g. hdfs://path/to/table_config.json, http://localhost:9000/tables/myTable

    segmentNameGeneratorSpec

    • type: supported types are simple and normalizedDate
    • configs: configs to init the SegmentNameGenerator

    pinotClusterSpecs

    • controllerURI: used to fetch table/schema information and for data push. E.g. http://localhost:9000

    pushJobSpec

    • pushAttempts: number of attempts for the push job; default is 1, which means no retry
    • pushRetryIntervalMillis: retry wait in milliseconds; default is 1 second
    • pushParallelism: push job parallelism; default is 1

    Create and push segment

    To create and push the segment in one go, use

    Sample Console Output

    Alternately, you can separately create and then push, by changing the jobType to SegmentCreation or SegmentTarPush.

    Templating Ingestion Job Spec

    The ingestion job spec supports templating with Groovy syntax.

    This is convenient for users who want to generate one ingestion job template file and schedule it on a daily basis with extra parameters updated daily.

    E.g. users can set inputDirURI with parameters to indicate the date, so that the ingestion job only processes the data for a particular date.

    Below is an example specifying date templating for the input and output paths.

    Then specify the values of ${year}, ${month}, ${day} when kicking off the ingestion job with arguments: -values $param=value1 $param2=value2...

    This ingestion job only generates segments for date 2014-01-03.

    Load Data in Streaming

    Prerequisites

    • Setup a cluster
    • Create broker and server tenants
    • Create a realtime table and setup a realtime stream

    Below is an example of how to publish sample data to your stream. As soon as data is available to the realtime stream, it starts getting consumed by the realtime servers.

    Kafka

    Run the command below to stream JSON data into the Kafka topic flights-realtime:



    job-spec.yml
    executionFrameworkSpec:
      name: 'standalone'
      segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
      segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
      segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
    
    jobType: SegmentCreationAndTarPush
    inputDirURI: 'examples/batch/baseballStats/rawdata'
    includeFileNamePattern: 'glob:**/*.csv'
    excludeFileNamePattern: 'glob:**/*.tmp'
    outputDirURI: 'examples/batch/baseballStats/segments'
    overwriteOutput: true
    
    pinotFSSpecs:
      - scheme: file
        className: org.apache.pinot.spi.filesystem.LocalPinotFS
    
    recordReaderSpec:
      dataFormat: 'csv'
      className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
      configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
      configs:
    
    tableSpec:
      tableName: 'baseballStats'
      schemaURI: 'http://localhost:9000/tables/baseballStats/schema'
      tableConfigURI: 'http://localhost:9000/tables/baseballStats'
    
    segmentNameGeneratorSpec:
    
    pinotClusterSpecs:
      - controllerURI: 'http://localhost:9000'
    
    pushJobSpec:
      pushAttempts: 2
      pushRetryIntervalMillis: 1000
    docker run \
        --network=pinot-demo \
        --name pinot-data-ingestion-job \
        ${PINOT_IMAGE} LaunchDataIngestionJob \
        -jobSpecFile examples/docker/ingestion-job-specs/airlineStats.yaml
    SegmentGenerationJobSpec:
    !!org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec
    excludeFileNamePattern: null
    executionFrameworkSpec: {extraConfigs: null, name: standalone, segmentGenerationJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner,
      segmentTarPushJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner,
      segmentUriPushJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner}
    includeFileNamePattern: glob:**/*.avro
    inputDirURI: examples/batch/airlineStats/rawdata
    jobType: SegmentCreationAndTarPush
    outputDirURI: examples/batch/airlineStats/segments
    overwriteOutput: true
    pinotClusterSpecs:
    - {controllerURI: 'http://pinot-controller:9000'}
    pinotFSSpecs:
    - {className: org.apache.pinot.spi.filesystem.LocalPinotFS, configs: null, scheme: file}
    pushJobSpec: {pushAttempts: 2, pushParallelism: 1, pushRetryIntervalMillis: 1000,
      segmentUriPrefix: null, segmentUriSuffix: null}
    recordReaderSpec: {className: org.apache.pinot.plugin.inputformat.avro.AvroRecordReader,
      configClassName: null, configs: null, dataFormat: avro}
    segmentNameGeneratorSpec: null
    tableSpec: {schemaURI: 'http://pinot-controller:9000/tables/airlineStats/schema',
      tableConfigURI: 'http://pinot-controller:9000/tables/airlineStats', tableName: airlineStats}
    
    Trying to create instance for class org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner
    Initializing PinotFS for scheme file, classname org.apache.pinot.spi.filesystem.LocalPinotFS
    Finished building StatsCollector!
    Collected stats for 403 documents
    Created dictionary for INT column: FlightNum with cardinality: 386, range: 14 to 7389
    Using fixed bytes value dictionary for column: Origin, size: 294
    Created dictionary for STRING column: Origin with cardinality: 98, max length in bytes: 3, range: ABQ to VPS
    Created dictionary for INT column: Quarter with cardinality: 1, range: 1 to 1
    Created dictionary for INT column: LateAircraftDelay with cardinality: 50, range: -2147483648 to 303
    ......
    ......
    Pushing segment: airlineStats_OFFLINE_16085_16085_29 to location: http://pinot-controller:9000 for table airlineStats
    Sending request: http://pinot-controller:9000/v2/segments?tableName=airlineStats to controller: a413b0013806, version: Unknown
    Response for pushing table airlineStats segment airlineStats_OFFLINE_16085_16085_29 to location http://pinot-controller:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16085_16085_29 of table: airlineStats"}
    Pushing segment: airlineStats_OFFLINE_16084_16084_30 to location: http://pinot-controller:9000 for table airlineStats
    Sending request: http://pinot-controller:9000/v2/segments?tableName=airlineStats to controller: a413b0013806, version: Unknown
    Response for pushing table airlineStats segment airlineStats_OFFLINE_16084_16084_30 to location http://pinot-controller:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16084_16084_30 of table: airlineStats"}
    bin/pinot-admin.sh LaunchDataIngestionJob \
        -jobSpecFile examples/batch/airlineStats/ingestionJobSpec.yaml
    inputDirURI: 'examples/batch/airlineStats/rawdata/${year}/${month}/${day}'
    outputDirURI: 'examples/batch/airlineStats/segments/${year}/${month}/${day}'
    docker run \
        --network=pinot-demo \
        --name pinot-data-ingestion-job \
        ${PINOT_IMAGE} LaunchDataIngestionJob \
        -jobSpecFile examples/docker/ingestion-job-specs/airlineStats.yaml \
        -values year=2014 month=01 day=03
    docker run \
      --network pinot-demo \
      --name=loading-airlineStats-data-to-kafka \
      ${PINOT_IMAGE} StreamAvroIntoKafka \
      -avroFile examples/stream/airlineStats/sample_data/airlineStats_data.avro \
      -kafkaTopic flights-realtime -kafkaBrokerList kafka:9092 -zkAddress pinot-zookeeper:2181/kafka
    bin/pinot-admin.sh StreamAvroIntoKafka \
      -avroFile examples/stream/airlineStats/sample_data/airlineStats_data.avro \
      -kafkaTopic flights-realtime -kafkaBrokerList localhost:19092 -zkAddress localhost:2191/kafka

    Schema

    A schema is used to define the names, data types, and other information for the columns of a Pinot table.

    Types of columns

    Columns in a Pinot table can be broadly categorized into three categories: Dimension, Metric, and DateTime (plus the deprecated Time type). Each category is described further down this page.

    Schema format

    A Pinot schema is written in JSON format. Here's an example which shows all the fields of a schema (see flights-schema.json further down this page).

    The Pinot schema is composed of schemaName, dimensionFieldSpecs, metricFieldSpecs, dateTimeFieldSpec and the (deprecated) timeFieldSpec, each described below.

    Below is a detailed description of each type of field spec.

    dimensionFieldSpecs

    A dimensionFieldSpec is defined for each dimension column. Here's a list of the fields in the dimensionFieldSpec

    Internal default null values for dimension

    metricFieldSpecs

    A metricFieldSpec is defined for each metric column. Here's a list of fields in the metricFieldSpec

    Internal default null values for metric

    dateTimeFieldSpec

    A dateTimeFieldSpec is used to define time columns of the table. Here's a list of the fields in a dateTimeFieldSpec
    timeFieldSpec

    This has been deprecated. Older schemas containing timeFieldSpec will be supported. But for new schemas, use DateTimeFieldSpec instead.

    A timeFieldSpec is defined for the time column. A timeFieldSpec is composed of an incomingGranularitySpec and an outgoingGranularitySpec. IncomingGranularitySpec in combination with outgoingGranularitySpec can be used to transform the time column from incoming format to the outgoing format. If both of them are specified, the segment creation process will convert the time column from the incoming format to the outgoing format. If no time column transformation is required, you can specify just the incomingGranularitySpec.

    The incoming and outgoing granularitySpec are defined as:
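
    For illustration, a minimal timeFieldSpec that converts an incoming millisecond timestamp into five-minute buckets might look like the sketch below (column names are hypothetical; the fields follow the granularitySpec reference further down this page, and the fiveMinutesSinceEpoch column is the sample referred to there):

    "timeFieldSpec": {
      "incomingGranularitySpec": {
        "name": "millisSinceEpoch",
        "dataType": "LONG",
        "timeType": "MILLISECONDS"
      },
      "outgoingGranularitySpec": {
        "name": "fiveMinutesSinceEpoch",
        "dataType": "INT",
        "timeUnitSize": 5,
        "timeType": "MINUTES"
      }
    }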

    Advanced fields

    Apart from these, there are some advanced fields that are common to all field specs.

    Ingestion Transform Functions

    Transform functions can be defined on columns in the schema. For example:

    Currently, we have support for 2 kinds of functions

    1. Groovy functions

    2. Inbuilt functions


    Note

    Currently, the arguments must be from the source data. They cannot be columns from the Pinot schema which have been created through transformations.

    Groovy functions

    Groovy functions can be defined using the syntax:

    Here are some examples of commonly needed functions. Any valid Groovy expression can be used.

    String concatenation

    Concat firstName and lastName to get fullName

    Find element in an array

    Find max value in array bids

    Time transformation

    Convert timestamp from MILLISECONDS to HOURS

    Column name change

    Simply change the name of the column from user_id to userId

    Ternary operation

    If eventType is IMPRESSION set impressions to 1, and similarly for CLICK.

    AVRO Map

    Store an AVRO Map in Pinot as two multi-value columns: 1) the keys of the map as map_keys, and 2) the values of the map as map_values. Sort the keys to maintain the mapping.

    Inbuilt Pinot functions

    We have several inbuilt functions that can be used directly as ingestion transform functions.
    DateTime functions

    These are functions which enable commonly needed time transformations.

    toEpochXXX

    Converts from epoch milliseconds to a higher granularity.

    toEpochXXXRounded

    Converts from epoch milliseconds to another granularity, rounding to the nearest rounding bucket. For example, 1588469352000 (2020-05-03 01:29:12 UTC) is 26474489 minutesSinceEpoch, and toEpochMinutesRounded(1588469352000, 10) = 26474480 (2020-05-03 01:20:00 UTC).

    fromEpochXXX

    Converts from an epoch granularity to milliseconds.

    Simple date format

    Converts simple date format strings to milliseconds and vice-a-versa, as per the provided pattern string.

    Json functions

    Creating a Schema

    Create a schema for your data, or see examples. Make sure you've setup the cluster.

    Note: a schema can also be created as part of table creation; refer to Creating a table.

    Check out the schema in the Rest API to make sure it was successfully uploaded.


    Dimension

    Dimension columns are typically used in slice and dice operations for answering business queries. Frequent operations done on dimension columns:

    • GROUP BY - group by one or more dimension columns along with aggregations on one or more metric columns

    • Filter processing

    Metric

    These columns represent quantitative data of the table. Such columns are frequently used in aggregation operations. In data warehouse terminology, these are also referred to as fact or measure columns.

    Frequent operations done on metric columns:

    • Aggregation - SUM, MIN, MAX, COUNT, AVG etc

    • Filter processing

    DateTime

    This column represents time columns in the data. There can be multiple time columns in a table, but only one of them is the primary time column. The primary time column is the one that is set in the segmentsConfig. This primary time column is used by Pinot for maintaining the time boundary between offline and realtime data in a hybrid table and for retention management. A primary time column is mandatory if the table's push type is APPEND and optional if the push type is REFRESH.

    Common operations done on time column:

    • GROUP BY

    • Filter processing

    Time

    This has been deprecated. Use DateTime column type for time columns.

    This column represents a timestamp. There can be at most one time column in a table. Common operations done on time column:

    • GROUP BY

    • Filter processing

    The time column is also used internally by Pinot, for maintaining the time boundary between offline and realtime data in a hybrid table and for retention management. A time column is mandatory if the table's push type is APPEND and optional if the push type is REFRESH .

    schema fields

    description

    schemaName

    Defines the name of the schema. This is usually the same as the table name. The offline and the realtime table of a hybrid table should use the same schema.

    dimensionFieldSpecs

    A dimensionFieldSpec is defined for each dimension column. For more details, scroll down to dimensionFieldSpec

    metricFieldSpecs

    A metricFieldSpec is defined for each metric column. For more details, scroll down to metricFieldSpec

    dateTimeFieldSpec

    A dateTimeFieldSpec is defined for the time columns. There can be multiple time columns. For more details, scroll down to dateTimeFieldSpec.

    timeFieldSpec

    Deprecated. Use dateTimeFieldSpec instead. A timeFieldSpec is defined for the time column. There can only be one time column. For more details, scroll down to timeFieldSpec

    field

    description

    name

    Name of the dimension column

    dataType

    Data type of the dimension column. Can be STRING, BOOLEAN, INT, LONG, DOUBLE, FLOAT, BYTES

    defaultNullValue

    Represents null values in the data, since Pinot doesn't support storing null column values natively (as part of its on-disk storage format). If not specified, an internal default null value is used as listed here

    singleValueField

    Boolean indicating if this is a single value or a multi value column. In the example above, the dimension tags is multi-valued. This means that it can have multiple values for a particular row, say tag1, tag2, tag3. For a multi-valued column, individual rows don’t necessarily need to have the same number of values. Typical use case for this would be a column such as skillSet for a person (one row in the table) that can have multiple values such as Real Estate, Mortgages.

    Data Type

    Internal Default Null Value

    INT

    Integer.MIN_VALUE

    LONG

    Long.MIN_VALUE

    FLOAT

    Float.NEGATIVE_INFINITY

    DOUBLE

    Double.NEGATIVE_INFINITY

    STRING

    "null"

    BYTES

    byte array of length 0

    field

    description

    name

    Name of the metric column

    dataType

    Data type of the column. Can be INT, LONG, DOUBLE, FLOAT, BYTES (for specialized representations such as HLL, TDigest, etc, where the column stores byte serialized version of the value)

    defaultNullValue

    Represents null values in the data. If not specified, an internal default null value is used, as listed here. The values are the same as those used for dimensionFieldSpec.

    Data Type

    Internal Default Null Value

    INT

    0

    LONG

    0

    FLOAT

    0.0

    DOUBLE

    0.0

    STRING

    "null"

    BYTES

    byte array of length 0

    field

    description

    name

    Name of the date time column

    dataType

    Data type of the date time column. Can be STRING, INT, LONG

    format

    The format of the time column. The syntax of the format is timeSize:timeUnit:timeFormat

    timeFormat can be either EPOCH or SIMPLE_DATE_FORMAT. If it is SIMPLE_DATE_FORMAT, the pattern string is also specified. For example:

    1:MILLISECONDS:EPOCH - epoch millis

    1:HOURS:EPOCH - epoch hours

    1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd - date specified like 20191018

    1:HOURS:SIMPLE_DATE_FORMAT:EEE MMM dd HH:mm:ss ZZZ yyyy - date specified like Mon Aug 24 12:36:50 America/Los_Angeles 2019

    granularity

    The granularity in which the column is bucketed. The syntax of granularity is bucket size:bucket unit For example, the format can be milliseconds 1:MILLISECONDS:EPOCH, but bucketed to 15 minutes i.e. we only have one value for every 15 minute interval, in which case granularity can be specified as 15:MINUTES

    defaultNullValue

    Represents null values in the data. If not specified, an internal default null value is used, as listed here. The values are the same as those used for dimensionFieldSpec.

    timeFieldSpec fields

    Description

    incomingGranularitySpec

    Details of the time column in the incoming data

    outgoingGranularitySpec

    Details of the format to which the time column should be converted for using in Pinot

    field

    description

    name

    Name of the time column. If incomingGranularitySpec, this is the name of the time column in the incoming data. If outgoingGranularitySpec, this is the name of the column you wish to transform it to and see in Pinot

    dataType

    Data type of the time column. Can be INT, LONG or STRING

    timeType

    Indicates the time unit. Can be one of DAYS, SECONDS, HOURS, MILLISECONDS, MICROSECONDS and NANOSECONDS

    timeUnitSize

    Indicates the bucket length. By default 1. E.g. in the sample above outgoing time is in fiveMinutesSinceEpoch i.e. rounded to 5 minutes buckets

    timeFormat

    EPOCH (millisSinceEpoch, hoursSinceEpoch etc) or SIMPLE_DATE_FORMAT (yyyyMMdd, yyyyMMdd:hhssmm etc)

    field name

    description

    maxLength

    Max length of this column

    transformFunction

    Transform function to generate this column. See section below.

    virtualColumnProvider

    Column value provider

    Function name

    Description

    toEpochSeconds

    Converts epoch millis to epoch seconds.

    Usage: "transformFunction": "toEpochSeconds(millis)"

    toEpochMinutes

    Converts epoch millis to epoch minutes

    Usage: "transformFunction": "toEpochMinutes(millis)"

    toEpochHours

    Converts epoch millis to epoch hours

    Usage: "transformFunction": "toEpochHours(millis)"

    toEpochDays

    Converts epoch millis to epoch days

    Usage: "transformFunction": "toEpochDays(millis)"

    Function Name

    Description

    toEpochSecondsRounded

    Converts epoch millis to epoch seconds, rounding to nearest rounding bucket

    "transformFunction": "toEpochSecondsRounded(millis, 30)"

    toEpochMinutesRounded

    Converts epoch millis to epoch minutes, rounding to the nearest rounding bucket

    "transformFunction": "toEpochMinutesRounded(millis, 10)"

    toEpochHoursRounded

    Converts epoch millis to epoch hours, rounding to the nearest rounding bucket

    "transformFunction": "toEpochHoursRounded(millis, 6)"

    toEpochDaysRounded

    Converts epoch millis to epoch days, rounding to the nearest rounding bucket

    "transformFunction": "toEpochDaysRounded(millis, 7)"

    "transformFunction": "toEpochDaysRounded(millis, 7)"

    Function Name

    Description

    fromEpochSeconds

    Converts from epoch seconds to milliseconds

    "transformFunction": "fromEpochSeconds(secondsSinceEpoch)"

    fromEpochMinutes

    Converts from epoch minutes to milliseconds

    "transformFunction": "fromEpochMinutes(minutesSinceEpoch)"

    fromEpochHours

    Converts from epoch hours to milliseconds

    "transformFunction": "fromEpochHours(hoursSinceEpoch)"

    fromEpochDays

    Converts from epoch days to milliseconds

    "transformFunction": "fromEpochDays(daysSinceEpoch)"

    Function name

    Description

    toDateTime

    Converts from milliseconds to a formatted date time string, as per the provided pattern

    "transformFunction": "toDateTime(millis, 'yyyy-MM-dd')"

    fromDateTime

    Converts a formatted date time string to milliseconds, as per the provided pattern

    "transformFunction": "fromDateTime(dateTimeStr, 'EEE MMM dd HH:mm:ss ZZZ yyyy')"

    Function name

    Description

    toJsonMapStr

    Converts a JSON/Avro map to a string. This JSON map can then be queried using the jsonExtractScalar function.

    "transformFunction": "toJsonMapStr(jsonMapField)"



    flights-schema.json
    {
      "schemaName": "flights",
      "dimensionFieldSpecs": [
        {
          "name": "flightNumber",
          "dataType": "LONG"
        },
        {
          "name": "tags",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": "null"
        }
      ],
      "metricFieldSpecs": [
        {
          "name": "price",
          "dataType": "DOUBLE",
          "defaultNullValue": 0
        }
      ],
      "dateTimeFieldSpecs": [
        {
          "name": "millisSinceEpoch",
          "dataType": "LONG",
          "format": "1:MILLISECONDS:EPOCH",
          "granularity": "15:MINUTES"
        },
        {
          "name": "hoursSinceEpoch",
          "dataType": "INT",
          "format": "1:HOURS:EPOCH",
          "granularity": "1:HOURS"
        },
        {
          "name": "date",
          "dataType": "STRING",
          "format": "1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd",
          "granularity": "1:DAYS"
        }
      ]
    }
    "metricFieldSpecs": [
        {
          "name": "maxPrice",
          "dataType": "DOUBLE",
          "transformFunction": "Groovy({prices.max()}, prices)" // groovy function
        }
      ],
      "dateTimeFieldSpecs": [
        {
          "name": "hoursSinceEpoch",
          "dataType": "INT",
          "format": "1:HOURS:EPOCH",
          "granularity": "1:HOURS",
          "transformFunction": "toEpochHours(timestamp)" // inbuilt function
        }
    Groovy({groovy script}, argument1, argument2...argumentN)
    {
          "name": "fullName",
          "dataType": "STRING",
          "transformFunction": "Groovy({firstName+' '+lastName}, firstName, lastName)"
    }
    {
          "name": "maxBid",
          "dataType": "INT",
          "transformFunction": "Groovy({bids.max{ it.toBigDecimal() }}, bids)"
    }
    "dateTimeFieldSpecs": [{
        "name": "hoursSinceEpoch",
        "dataType": "LONG",
        "format" : "1:HOURS:EPOCH",
        "granularity": "1:HOURS",
        "transformFunction": "Groovy({timestamp/(1000*60*60)}, timestamp)"
      }]
    {
          "name": "userId",
          "dataType": "LONG",
          "transformFunction": "Groovy({user_id}, user_id)"
    }
    {
        "name": "impressions",
        "dataType": "LONG",
        "transformFunction": "Groovy({eventType == 'IMPRESSION' ? 1: 0}, eventType)"
    },
    {
        "name": "clicks",
        "dataType": "LONG",
        "transformFunction": "Groovy({eventType == 'CLICK' ? 1: 0}, eventType)"
    }
    {
          "name": "map2_keys",
          "dataType": "STRING",
          "singleValueField": false,
          "transformFunction": "Groovy({map2.sort()*.key}, map2)"
    },
    {
          "name": "map2_values",
          "dataType": "INT",
          "singleValueField": false,
          "transformFunction": "Groovy({map2.sort()*.value}, map2)"
    }
    bin/pinot-admin.sh AddSchema -schemaFile transcript-schema.json -exec
    curl -F schemaFile=@transcript-schema.json localhost:9000/schemas

    Table

    A table is a logical abstraction to refer to a collection of related data. It consists of columns and rows (documents).

    Data in Pinot tables is sharded into segments. A Pinot table is modeled as a Helix resource. Each segment of a table is modeled as a Helix Partition.

    A table is typically associated with a schema, which is used to define the names, data types and other information of the columns of the table.

    There are 3 types of Pinot tables: offline, realtime, and hybrid (a hybrid table is the combination of an offline table and a realtime table that share the same name).

    Note that a query does not need to know whether a table is offline or realtime; it only specifies the table name. For example, regardless of whether we have an offline table myTable_OFFLINE, a realtime table myTable_REALTIME, or a hybrid table containing both of these, the query will simply use myTable, as in select count(*) from myTable.

    A table config file is used to define the table properties, such as name, type, indexing, routing, retention etc. It is written in JSON format, and stored in the property store in Zookeeper, along with the table schema.

    Offline Table Config

    Here's an example table config for an offline table
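
    As a rough sketch (table, column, and tenant names are illustrative, and only fields discussed on this page are included), an offline table config has the following shape:

    {
      "tableName": "myTable",
      "tableType": "OFFLINE",
      "segmentsConfig": {
        "timeColumnName": "daysSinceEpoch",
        "replication": "3",
        "retentionTimeUnit": "DAYS",
        "retentionTimeValue": "365",
        "segmentPushType": "APPEND",
        "segmentPushFrequency": "DAILY"
      },
      "tenants": {
        "broker": "brokerTenantName",
        "server": "serverTenantName"
      },
      "tableIndexConfig": {
        "loadMode": "MMAP",
        "invertedIndexColumns": ["foo"],
        "noDictionaryColumns": ["metric1"]
      },
      "metadata": {
        "customConfigs": {
          "key": "value"
        }
      }
    }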

    We will now discuss each section of the table config in detail.

    Top level fields

    Second level fields

    quota

    routing

    segmentsConfig

    tableIndexConfig

    Realtime Table Config

    Here's an example table config for a realtime table. All the fields from the offline table config are valid for the realtime table. Additionally, realtime tables use some extra fields.
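
    A rough sketch of the realtime counterpart (values are illustrative; the realtime-specific fields shown here are the ones discussed below, and the streamConfigs block is expanded in the streamConfigs section):

    {
      "tableName": "myTable",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "timeColumnName": "daysSinceEpoch",
        "replicasPerPartition": "3",
        "retentionTimeUnit": "DAYS",
        "retentionTimeValue": "5"
      },
      "tenants": {
        "broker": "brokerTenantName",
        "server": "serverTenantName"
      },
      "tableIndexConfig": {
        "loadMode": "MMAP",
        "sortedColumn": ["foo"],
        "aggregateMetrics": false,
        "streamConfigs": {}
      },
      "metadata": {}
    }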

    We will now discuss the sections that have some behavioral differences for realtime tables.

    segmentsConfig

    replicasPerPartition The number of replicas per partition for the realtime stream

    completionConfig Holds information related to realtime segment completion. There is just one field in this config as of now, which is the completionMode. The value of the completionMode decides how non-committer servers should replace the in-memory segment during realtime segment completion. By default, if the in-memory segment in the non-winner server is equivalent to the committed segment, then the non-committer server builds and replaces the segment, else it downloads the segment from the controller.

    Currently, the supported value for completionMode is

    • DOWNLOAD: In certain scenarios, segment build can get very memory intensive. It might become desirable to enforce the non-committer servers to just download the segment from the controller, instead of building it again. Setting this completionMode ensures that the non-committer servers always download the segment.

      For more details on why this is needed, check out

    tableIndexConfig

    sortedColumn Indicates the column which should be sorted when creating the realtime segment

    aggregateMetrics Aggregate the realtime stream data as it is consumed, where applicable, in order to reduce segment sizes. We sum the metric column values of all rows that have the same value for dimension columns and create one row in a realtime segment for all such rows. This feature is only available on REALTIME tables. Only supported aggregation right now is sum. Also note that for this to work, all metrics should be listed in noDictionaryColumns and there should not be any multi value dimensions.

    streamConfigs This section is where the bulk of settings specific to the realtime stream and consumption are found. This section is specific to tables of type REALTIME and is ignored if the table type is any other. See the ingestion documentation for an overview of how realtime ingestion works.

    Here is a minimal example of what the streamConfigs section may look like:
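
    The following is a hedged sketch of a minimal streamConfigs block for a Kafka stream, consistent with the discussion below (the topic name, broker list, and the consumer factory/decoder class names are illustrative assumptions):

    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "LowLevel",
      "stream.kafka.topic.name": "myTopic",
      "stream.kafka.broker.list": "localhost:9092",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.prop.auto.offset.reset": "largest",
      "realtime.segment.flush.threshold.size": "0",
      "realtime.segment.flush.threshold.time": "24h",
      "realtime.segment.flush.desired.size": "150M"
    }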

    The streamType field is mandatory. In this case, it is set to kafka. A streamType of kafka is supported natively in Pinot, and you can use the default decoder classes and consumer factory classes. Pinot allows you to use other stream types with their own consumer factory and decoder classes (or even other decoder and consumer factory classes for kafka, if your installation formats kafka messages differently). See the documentation on pluggable streams.

    There are some configurations that are generic to all stream types, and others that are specific to stream types.

    Configuration generic to all stream types

    • realtime.segment.flush.threshold.size: Maximum number of rows to consume before persisting the consuming segment.

      Note that in the example above, it is set to 0. In this case, Pinot automatically computes the row limit using the value of realtime.segment.flush.desired.size described below. If the consumer type is HighLevel, then this value will be the maximum per consuming segment. If the consumer type is LowLevel then this value will be divided across all consumers being hosted on any one pinot-server.

      Default is 5000000.

    • realtime.segment.flush.threshold.time: Maximum elapsed time after which a consuming segment should be persisted.

      The value can be set as a human readable string, such as "1d", "4h30m", etc. This value should be set such that it is not below the retention of messages in the underlying stream, but is not so long that it may cause the server to run out of memory.

      Default is "6h".

    • realtime.segment.flush.desired.size: Desired size of the completed segments.

      This setting is supported only if the consumer type is set to LowLevel. The value can be set as a human readable string such as "150M" or "1.1G", and is used when realtime.segment.flush.threshold.size is set to 0. Pinot learns and then estimates the number of rows that need to be consumed so that the persisted segment is approximately of this size. The learning phase starts by setting the number of rows to 100,000 (this can be changed with the setting realtime.segment.flush.autotune.initialRows) and increases it to reach the desired segment size. Segment size may go over the desired size significantly during the learning phase. Pinot corrects the estimation as it goes along, so it is not guaranteed that the resulting completed segments are of the exact size as configured. You should set this value to optimize the performance of queries (i.e. neither too small nor too large).

      Default is "200M".

    • realtime.segment.flush.autotune.initialRows: Initial number of rows for learning.

      This value is used only if realtime.segment.flush.threshold.size is set to 0 and the consumer type is LowLevel. See realtime.segment.flush.desired.size above.

      Default is "100K".

    Configuration specific to stream types

    All of these configuration items have the prefix stream.<streamType>. In the example above, the prefix is stream.kafka.

    Important ones to note here are:

    • stream.kafka.consumer.type: This should have a value of LowLevel (recommended) or HighLevel.

    • stream.kafka.topic.name: Name of the topic from which to consume.

    • stream.kafka.consumer.prop.auto.offset.reset: Indicates where to start consumption from in the stream. If the consumer type is LowLevel, this configuration is used only when the table is first provisioned. In the HighLevel consumer type, it will also be used when new servers are rolled in, or when existing servers are replaced with new ones. You can specify values such as smallest or largest, or even 3d if your stream supports it. If you specify largest, the consumption starts from the most recent events in the data stream; this is the recommended way to create a new table. If you specify smallest, the consumption starts from the earliest event available in the data stream.

    All the configurations that are prefixed with the streamType are expected to be used by the underlying stream. So, any of the configurations described in the Kafka documentation can be set using the prefix stream.kafka, and Kafka should pay attention to them.

    More options are explained in the section.

    tenants

    Similar to the offline table, this section defines the server and broker tenant used for this table. More details about tenants can be found in Tenant.

    tagOverrideConfig

    A tagOverrideConfig can be added under the tenants section for realtime tables, to override tags for consuming and completed segments. For example:
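
    A minimal sketch, assuming the realtimeConsuming and realtimeCompleted override keys and the tenant names used earlier on this page:

    "tenants": {
      "broker": "brokerTenantName",
      "server": "serverTenantName",
      "tagOverrideConfig": {
        "realtimeConsuming": "serverTenantName_REALTIME",
        "realtimeCompleted": "serverTenantName_OFFLINE"
      }
    }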

    In the above example, the consuming segments will still be assigned to serverTenantName_REALTIME hosts, but once they are completed, the segments will be moved to serverTenantName_OFFLINE. It is possible to specify the full name of any tag in this section (so, for example, you could decide that completed segments for this table should be in pinot servers tagged as allTables_COMPLETED). Refer to the Tenant documentation to learn more about this config.

    Hybrid Table Config

    A hybrid table is simply a table composed of 2 tables, 1 of type offline and 1 of type realtime, which share the same name. In such a table, offline segments may be pushed periodically (say, once a day). The retention on the offline table can be set to a high value (say, a few years) since segments are coming in on a periodic basis, whereas the retention on the realtime part can be small (say, a few days). Once an offline segment is pushed to cover a recent time period, the brokers automatically switch to using the offline table for segments in that time period, and use realtime table only to cover later segments for which offline data may not be available yet.

    Here's a sample table config for a hybrid table.
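
    As a sketch, a hybrid table is just the pair of configs below sharing one tableName (only the distinguishing fields are shown; the remaining fields follow the offline and realtime sketches above):

    {
      "tableName": "myTable",
      "tableType": "OFFLINE"
      // remaining fields as in the offline table config sketch above
    }
    {
      "tableName": "myTable",
      "tableType": "REALTIME"
      // remaining fields (including streamConfigs) as in the realtime table config sketch above
    }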

    Note that creating a hybrid table has to be done in 2 separate steps of creating an offline and realtime table individually.

    Creating a table

    Create a table config for your data, or see examples for all possible batch/streaming tables.

    Prerequisites

    1. Setup the cluster

    Offline Table Creation

    Sample Console Output

    Check out the table config in the Rest API to make sure it was successfully uploaded.

    Streaming Table Creation

    Start Kafka

    Create a Kafka Topic

    Create a Streaming table

    Sample output

    Start Kafka-Zookeeper

    Start Kafka

    Check out the table config in the Rest API to make sure it was successfully uploaded.

    This section helps configure indexing and dictionary encoding related information for the Pinot table. For more details head over to

    tenants

    Define the server and broker tenant used for this table. More details about tenant can be found in .

    metadata

    This section is for keeping custom configs, which are expressed as key value pairs.

    retentionTimeValue

    A numeric value for the retention. This, in combination with retentionTimeUnit, decides the duration for which to retain the segments.

    segmentPushType

    This can be either:

    • APPEND - new data segments are pushed periodically and appended to the existing data, e.g. daily or hourly.

    • REFRESH - the entire data is replaced every time during a data push. Refresh tables have no retention. (A REFRESH example is sketched after these field descriptions.)

    segmentPushFrequency

    The cadence at which segments are pushed, e.g. HOURLY, DAILY

    noDictionaryColumns

    The set of columns that should not be dictionary encoded. The column names should match the schema. NoDictionary dimension columns are compressed, while the metrics are not compressed.

    onHeapDictionaryColumns

    The list of columns for which the dictionary should be created on heap

    varLengthDictionaryColumns

    The list of columns for which the variable length dictionary needs to be enabled in offline segments. This is only valid for string and bytes columns and has no impact for columns of other data types.

    loadMode

    Indicates how the segments will be loaded onto the server:

    • heap - load data directly into direct memory

    • mmap - load data segments to off-heap memory

    columnMinMaxValueGeneratorMode

    Generate min/max values for columns. Supported values are:

    • NONE - do not generate for any columns

    • ALL - generate for all columns

    • TIME - generate only for the time column

    • NON_METRIC - generate for time and dimension columns
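
    As a rough sketch of some of the fields described above (the schema name and values are placeholders, and unrelated fields are omitted), a REFRESH table might carry a segmentsConfig without retention, alongside a tableIndexConfig that generates min/max values for time and dimension columns:

    "segmentsConfig": {
      "schemaName": "pinotTable",
      "replication": "3",
      "segmentPushType": "REFRESH"
    },
    "tableIndexConfig": {
      "loadMode": "MMAP",
      "columnMinMaxValueGeneratorMode": "NON_METRIC"
    }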

  • realtime.segment.flush.threshold.time: Maximum elapsed time after which a consuming segment should be persisted.

    The value can be set as a human readable string, such as "1d", "4h30m", etc. This value should be set so that it does not exceed the retention of messages in the underlying stream (otherwise data that has not yet been persisted could age out of the stream), but is not so long that it may cause the server to run out of memory.

    Default is "6h"

  • realtime.segment.flush.desired.size: Desired size of the completed segments.

    This setting is supported only if the consumer type is set to LowLevel. The value can be set as a human readable string such as "150M" or "1.1G", and is used when realtime.segment.flush.threshold.size is set to 0. Pinot learns and then estimates the number of rows that need to be consumed so that the persisted segment is approximately of this size. The learning phase starts by setting the number of rows to 100,000 (this can be changed with the setting realtime.segment.flush.autotune.initialRows) and increases it to reach the desired segment size. Segment size may go over the desired size significantly during the learning phase. Pinot corrects the estimation as it goes along, so it is not guaranteed that the resulting completed segments are of exactly the configured size. You should set this value to optimize the performance of queries (i.e. neither too small nor too large). See the sketch after this list for an example.

    Default is "200M"

  • realtime.segment.flush.autotune.initialRows: Initial number of rows for learning.

    This value is used only if realtime.segment.flush.threshold.size is set to 0 and the consumer type is LowLevel. See realtime.segment.flush.desired.size above.

    Default is "100K"

  • stream.kafka.consumer.prop.auto.offset.reset: Indicates where to start consumption from in the stream.

    If the consumer type is LowLevel, this configuration is used only when the table is first provisioned. With the HighLevel consumer type, it is also used when new servers are rolled in, or when existing servers are replaced with new ones. You can specify values such as smallest or largest, or even 3d if your stream supports it. If you specify largest, consumption starts from the most recent events in the data stream. This is the recommended way to create a new table. If you specify smallest, consumption starts from the earliest event available in the data stream.
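
    The size-based flush settings above can be combined as in the following sketch (the values are illustrative, and the broker list, decoder and other required stream settings are omitted for brevity):

    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "LowLevel",
      "stream.kafka.topic.name": "ClickStream",
      "realtime.segment.flush.threshold.size": "0",
      "realtime.segment.flush.desired.size": "200M",
      "realtime.segment.flush.autotune.initialRows": "500K",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    }

    With the row threshold set to 0, Pinot starts the first consuming segment at roughly 500K rows and then adjusts the row count until completed segments land around the desired 200M size.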

    Create stream table

    Offline

    Offline tables ingest pre-built pinot-segments from external data stores.

    Realtime

    Realtime tables ingest data from streams (such as Kafka) and build segments.

    Hybrid

    A hybrid Pinot table has both realtime as well as offline tables under the hood.

    Top level field

    Description

    tableName

    Specifies the name of the table. Should only contain alpha-numeric characters, hyphens ('-'), or underscores ('_'). (Using a double underscore ('__') is not allowed as it is reserved for other features within Pinot)

    tableType

    Defines the table type - OFFLINE for offline table, REALTIME for realtime table. A hybrid table is essentially 2 table configs one of each type, with the same table name.

    quota

    This section defines properties related to quotas, such as storage quota and query quota. For more details scroll down to quota

    routing

    This section defines the properties related to configuring how the broker selects the servers to route, and how segments can be pruned by the broker based on segment metadata. For more details, scroll down to routing

    segmentsConfig

    This section defines the properties related to the segments of the table, such as segment push frequency, type, retention, schema, time column etc. For more details scroll down to segmentsConfigarrow-up-right

    quota fields

    Description

    storage

    The maximum storage space the table is allowed to use, before replication. For example, in the above table, the storage is 140G and replication is 3. Therefore, the maximum storage the table is allowed to use is 140*3=420G. The space used by the table is calculated by adding up the sizes of all segments from every server hosting this table. Once this limit is reached, offline segment pushes throw a 403 exception with the message Quota check failed for segment: segment_0 of table: pinotTable.

    maxQueriesPerSecond

    The maximum queries per second allowed to execute on this table. If the query volume exceeds this, a 429 exception with the message Request 123 exceeds query quota for table: pinotTable, query: select count(*) from pinotTable will be sent, and a BrokerMetric QUERY_QUOTA_EXCEEDED will be recorded. The application should build an exponential backoff and retry mechanism to react to this exception.
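
    For example, a quota block matching the description above could look like the sketch below. With "140G" of storage and a replication of 3 (set in segmentsConfig), the effective cap is 140*3 = 420G, and query volume beyond 300 QPS receives the 429 response described above.

    "quota": {
      "storage": "140G",
      "maxQueriesPerSecond": 300
    }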

    routing fields

    description

    segmentPrunerType

    The segment pruner prunes the selected segments based on the query. The only supported value currently is partition - prunes segments based on the partition metadata stored in Zookeeper. By default, there is no pruner. For more details on how to configure this, check out Querying All Segments

    instanceSelectorType

    The instance selector selects server instances to serve the query based on the selected segments. Supported values are balanced - balances the number of segments served by each selected instance (this is the default), and replicaGroup - instance selector for the replica group routing strategy. For more details on how to configure this, check out Querying All Servers
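
    A routing block using both options might look like the sketch below; omitting segmentPrunerType leaves pruning disabled, and omitting instanceSelectorType falls back to balanced.

    "routing": {
      "segmentPrunerType": "partition",
      "instanceSelectorType": "replicaGroup"
    }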

    segmentsConfig field

    Description

    schemaName

    Name of the schema associated with the table

    timeColumnName

    The name of the time column for this table. This must match with the time column name in the schema. This is mandatory for tables with push type APPEND, optional for REFRESH

    timeColumnType

    The type of the time column, e.g. MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS

    replication

    Number of replicas

    retentionTimeUnit

    Unit for the retention, e.g. HOURS, DAYS. This, in combination with retentionTimeValue, decides the duration for which to retain the segments. For example, 365 DAYS in the example above means that segments containing data older than 365 days will be deleted periodically. This is done by the RetentionManager Controller periodic task. By default, no retention is set.

    tableIndexConfig fields

    Description

    invertedIndexColumns

    The set of columns on which inverted indexes should be created. The column names should match the schema. E.g. in the table above, inverted indexes have been created on 3 columns: foo, bar, moo

    createInvertedIndexDuringSegmentGeneration

    Boolean to indicate whether to create inverted indexes during segment creation. By default this is false, i.e. inverted indexes are created when the segments are loaded on the server

    sortedColumn

    The column which is sorted in the data and hence will have a sorted index. This does not need to be specified for the offline table, as the segment generation job will automatically detect the sorted column in the data and create a sorted index for it.

    bloomFilterColumns

    The list of columns to apply bloom filter on. The names of the columns should match the schema. For more details about using bloom filters refer to Bloom Filter for Dictionaryarrow-up-right.

    starTreeIndexConfigs

    The config for creating a star tree index. For more details on how to configure this, go to Star-treearrow-up-right
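
    Putting a few of these fields together, a tableIndexConfig might look like the sketch below; the column names reuse those from the examples above, and the choice of bloom filter column is illustrative.

    "tableIndexConfig": {
      "invertedIndexColumns": ["foo", "bar", "moo"],
      "createInvertedIndexDuringSegmentGeneration": false,
      "sortedColumn": ["column1"],
      "bloomFilterColumns": ["foo"],
      "loadMode": "MMAP"
    }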

    Completion Configarrow-up-right
    Ingesting Realtime Dataarrow-up-right
    Pluggable Streamsarrow-up-right
    Kafka configuration pagearrow-up-right
    Pluggable Streamsarrow-up-right
    Tenant
    Moving Completed Segmentsarrow-up-right
    examplesarrow-up-right
    Rest APIarrow-up-right
    Rest APIarrow-up-right

    pinot-table-offline.json
    "OFFLINE": {
        "tableName": "pinotTable", 
        "tableType": "OFFLINE",
        "quota": {
          "maxQueriesPerSecond": 300, 
          "storage": "140G"
        }, 
        "routing": {
          "segmentPrunerType": "partition", 
          "instanceSelectorType": "replicaGroup"
        }, 
        "segmentsConfig": {
          "schemaName": "pinotTable", 
          "timeColumnName": "daysSinceEpoch", 
          "timeType": "DAYS",
          "replication": "3", 
          "retentionTimeUnit": "DAYS", 
          "retentionTimeValue": "365", 
          "segmentPushFrequency": "DAILY", 
          "segmentPushType": "APPEND" 
        }, 
        "tableIndexConfig": { 
          "invertedIndexColumns": ["foo", "bar", "moo"], 
          "createInvertedIndexDuringSegmentGeneration": false, 
          "sortedColumn": [],
          "bloomFilterColumns": [],
          "starTreeIndexConfigs": [],
          "noDictionaryColumns": [],
          "onHeapDictionaryColumns": [], 
          "varLengthDictionaryColumns": [],
          "loadMode": "MMAP", 
          "columnMinMaxValueGeneratorMode": null
        },  
        "tenants": {
          "broker": "myBrokerTenant", 
          "server": "myServerTenant"
        },
        "metadata": {
          "customConfigs": {
            "key": "value", 
            "key": "value"
          }
        }
      }
    }
    pinot-table-realtime.json
    "REALTIME": { 
        "tableName": "pinotTable", 
        "tableType": "REALTIME", 
        "segmentsConfig": {
          "schemaName": "pinotTable", 
          "timeColumnName": "daysSinceEpoch", 
          "timeType": "DAYS",
          "replicasPerPartition": "3", 
          "retentionTimeUnit": "DAYS", 
          "retentionTimeValue": "5", 
          "completionConfig": {
            "completionMode": "DOWNLOAD"
          } 
        }, 
        "tableIndexConfig": { 
          "invertedIndexColumns": ["foo", "bar", "moo"], 
          "sortedColumn": ["column1"],
          "noDictionaryColumns": ["metric1", "metric2"],
          "loadMode": "MMAP", 
          "aggregateMetrics": true, 
          "streamConfigs": {
            "realtime.segment.flush.threshold.size": "0", 
            "realtime.segment.flush.threshold.time": "24h", 
            "stream.kafka.broker.list": "XXXX", 
            "stream.kafka.consumer.factory.class.name": "XXXX", 
            "stream.kafka.consumer.prop.auto.offset.reset": "largest", 
            "stream.kafka.consumer.type": "XXXX", 
            "stream.kafka.decoder.class.name": "XXXX", 
            "stream.kafka.decoder.prop.schema.registry.rest.url": "XXXX", 
            "stream.kafka.decoder.prop.schema.registry.schema.name": "XXXX", 
            "stream.kafka.hlc.zk.connect.string": "XXXX", 
            "stream.kafka.topic.name": "XXXX", 
            "stream.kafka.zk.broker.url": "XXXX", 
            "streamType": "kafka"
          }  
        },  
        "tenants": {
          "broker": "myBrokerTenant", 
          "server": "myServerTenant", 
          "tagOverrideConfig": {}
        },
        "metadata": {
        }
      }
    }
    "streamConfigs" : {
      "realtime.segment.flush.threshold.size": "0",
      "realtime.segment.flush.threshold.time": "24h",
      "realtime.segment.flush.desired.size": "150M",
      "streamType": "kafka",
      "stream.kafka.consumer.type": "LowLevel",
      "stream.kafka.topic.name": "ClickStream",
      "stream.kafka.consumer.prop.auto.offset.reset" : "largest"
    }
      "broker": "brokerTenantName",
      "server": "serverTenantName",
      "tagOverrideConfig" : {
        "realtimeConsuming" : "serverTenantName_REALTIME"
        "realtimeCompleted" : "serverTenantName_OFFLINE"
      }
    }
    pinot-table-hybrid.json
    "OFFLINE": {
        "tableName": "pinotTable", 
        "tableType": "OFFLINE", 
        "segmentsConfig": {
          ... 
        }, 
        "tableIndexConfig": { 
          ... 
        },  
        "tenants": {
          "broker": "myBrokerTenant", 
          "server": "myServerTenant"
        },
        "metadata": {
          ...
        }
      },
      "REALTIME": { 
        "tableName": "pinotTable", 
        "tableType": "REALTIME", 
        "segmentsConfig": {
          ...
        }, 
        "tableIndexConfig": { 
          ... 
          "streamConfigs": {
            ...
          },  
        },  
        "tenants": {
          "broker": "myBrokerTenant", 
          "server": "myServerTenant"
        },
        "metadata": {
        ...
        }
      }
    }
    docker run \
        --network=pinot-demo \
        --name pinot-batch-table-creation \
        ${PINOT_IMAGE} AddTable \
        -schemaFile examples/batch/airlineStats/airlineStats_schema.json \
        -tableConfigFile examples/batch/airlineStats/airlineStats_offline_table_config.json \
        -controllerHost pinot-controller \
        -controllerPort 9000 \
        -exec
    Executing command: AddTable -tableConfigFile examples/batch/airlineStats/airlineStats_offline_table_config.json -schemaFile examples/batch/airlineStats/airlineStats_schema.json -controllerHost pinot-controller -controllerPort 9000 -exec
    Sending request: http://pinot-controller:9000/schemas to controller: a413b0013806, version: Unknown
    {"status":"Table airlineStats_OFFLINE succesfully added"}
    bin/pinot-admin.sh AddTable \
        -schemaFile examples/batch/airlineStats/airlineStats_schema.json \
        -tableConfigFile examples/batch/airlineStats/airlineStats_offline_table_config.json \
        -exec
    # add schema
    curl -F schemaName=@airlineStats_schema.json  localhost:9000/schemas
    
    # add table
    curl -i -X POST -H 'Content-Type: application/json' \
        -d @airlineStats_offline_table_config.json localhost:9000/tables
    docker run \
        --network pinot-demo --name=kafka \
        -e KAFKA_ZOOKEEPER_CONNECT=pinot-zookeeper:2181/kafka \
        -e KAFKA_BROKER_ID=0 \
        -e KAFKA_ADVERTISED_HOST_NAME=kafka \
        -d wurstmeister/kafka:latest
    docker exec \
      -t kafka \
      /opt/kafka/bin/kafka-topics.sh \
      --zookeeper pinot-zookeeper:2181/kafka \
      --partitions=1 --replication-factor=1 \
      --create --topic flights-realtime
    docker run \
        --network=pinot-demo \
        --name pinot-streaming-table-creation \
        ${PINOT_IMAGE} AddTable \
        -schemaFile examples/stream/airlineStats/airlineStats_schema.json \
        -tableConfigFile examples/docker/table-configs/airlineStats_realtime_table_config.json \
        -controllerHost pinot-controller \
        -controllerPort 9000 \
        -exec
    Executing command: AddTable -tableConfigFile examples/docker/table-configs/airlineStats_realtime_table_config.json -schemaFile examples/stream/airlineStats/airlineStats_schema.json -controllerHost pinot-controller -controllerPort 9000 -exec
    Sending request: http://pinot-controller:9000/schemas to controller: 8fbe601012f3, version: Unknown
    {"status":"Table airlineStats_REALTIME succesfully added"}
    bin/pinot-admin.sh StartZookeeper -zkPort 2191
    bin/pinot-admin.sh  StartKafka -zkAddress=localhost:2191/kafka -port 19092
    bin/pinot-admin.sh AddTable \
        -schemaFile examples/stream/airlineStats/airlineStats_schema.json \
        -tableConfigFile examples/stream/airlineStats/airlineStats_realtime_table_config.json \
        -exec
    tableIndexConfigarrow-up-right
    Tenant
    Snappyarrow-up-right