
Segment retention

In this Apache Pinot concepts guide, we'll learn how segment retention works.

Segments in Pinot tables have a retention time, after which the segments are deleted. Typically, offline tables retain segments for a longer period of time than real-time tables.

The removal of segments is done by the retention manager. By default, the retention manager runs once every 6 hours.

The retention manager purges two types of segments:

  • Expired segments: Segments whose end time has exceeded the retention period.

  • Replaced segments: Segments that have been replaced as part of the merge rollup task.

There are a couple of scenarios where segments in offline tables won't be purged:

  • If the segment doesn't have an end time. This would happen if the segment doesn't contain a time column.

  • If the segment's table has a segmentIngestionType of REFRESH.

If the retention period isn't specified, segments aren't purged from tables.

The retention manager initially moves these segments into a Deleted Segments area, from where they will eventually be permanently removed.
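For illustration, the retention period is typically specified in the table config's segmentsConfig section. The snippet below is a minimal sketch (the values are placeholders to adjust for your table):

"segmentsConfig": {
  "retentionTimeUnit": "DAYS",
  "retentionTimeValue": "30",
  ...
}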


Segment threshold

Learn how segment thresholds work in Pinot.

The segment threshold determines when a segment is committed in real-time tables.

When data is first ingested from a streaming provider like Kafka, Pinot stores the data in a consuming segment.

This segment is on the disk of the server(s) processing a particular partition from the streaming provider.

However, it's not until a segment is committed that the segment is written to the deep store. The segment threshold decides when that should happen.

Why is the segment threshold important?

The segment threshold is important because it ensures segments are a reasonable size.

When queries are processed, smaller segments may increase query latency because of the extra per-segment overhead (number of threads spawned, metadata processing, and so on).

Larger segments may cause servers to run out of memory. When a server is restarted, the consuming segment must start consuming from the first row again, causing a lag between Pinot and the streaming provider.
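The threshold itself is configured on the real-time table's streamConfigs. The snippet below is a minimal sketch using commonly documented threshold keys (check the stream ingestion docs for your Pinot version); setting the row threshold to 0 lets Pinot size segments by the desired segment size instead:

"streamConfigs": {
  ...
  "realtime.segment.flush.threshold.rows": "0",
  "realtime.segment.flush.threshold.time": "6h",
  "realtime.segment.flush.threshold.segment.size": "150M"
}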


Deep Store

Leverage Apache Pinot's deep store component for efficient large-scale data storage and management, enabling impactful data processing and analysis.

The deep store (or deep storage) is the permanent store for segment files.

It is used for backup and restore operations. New server nodes in a cluster will pull down a copy of segment files from the deep store. If the local segment files on a server get damaged in some way (or accidentally deleted), a new copy will be pulled down from the deep store on server restart.

The deep store stores a compressed version of the segment files and it typically won't include any indexes. These compressed files can be stored on a local file system or on a variety of other file systems. For more details on supported file systems, see File Systems.

Note: The deep store by itself is not sufficient for restore operations. Pinot stores metadata such as the table config, schema, and segment metadata in Zookeeper. For restore operations, both the deep store and the Zookeeper metadata are required.

How do segments get into the deep store?

There are several different ways that segments are persisted in the deep store.

For offline tables, the batch ingestion job writes the segment directly into the deep store.

The ingestion job then sends a notification about the new segment to the controller, which in turn notifies the appropriate server to pull down that segment.

For real-time tables, by default, a segment is first built in memory by the server. It is then uploaded to the lead controller (as part of the Segment Completion Protocol sequence), which writes the segment into the deep store.

Having all segments go through the controller can become a system bottleneck under heavy load, in which case you can use the peer download policy described below.

When using this configuration, the server writes a completed segment directly into the deep store.

Configuring the deep store

For hands-on examples of how to configure the deep store, see the following tutorials:

  • Decoupling Controller from the Data Path

  • Use OSS as Deep Storage for Pinot

  • Use S3 as Deep Storage for Pinot
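As a rough sketch of what such a setup looks like, the controller can point its data directory at S3 with properties along the following lines in controller.conf (property names follow the S3 tutorial; the bucket name and region are placeholders):

# Store segment files in S3 instead of on the local disk
controller.data.dir=s3://my-pinot-bucket/pinot-data/controller-data
controller.local.temp.dir=/tmp/pinot-tmp-data
# Register the S3 filesystem plugin and segment fetcher
pinot.controller.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
pinot.controller.storage.factory.s3.region=us-west-2
pinot.controller.segment.fetcher.protocols=file,http,s3
pinot.controller.segment.fetcher.s3.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher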

Segment

Discover the segment component in Apache Pinot for efficient data storage and querying within Pinot clusters, enabling optimized data processing and analysis.

Pinot tables are stored in one or more independent shards called segments. A small table may be contained in a single segment, but Pinot lets tables grow to an unlimited number of segments. There are different processes for creating segments (see ingestion). Segments are time-based partitions of table data, and are stored on Pinot servers that scale horizontally as needed for both storage and computation.

Pinot achieves this scaling by breaking the data into smaller chunks known as segments (similar to shards/partitions in relational databases). Segments can be seen as time-based partitions.

A segment is a horizontal shard representing a chunk of table data with some number of rows. The segment stores data for all columns of the table. Each segment packs the data in a columnar fashion, along with the dictionaries and indices for the columns. The segment is laid out in a columnar format so that it can be directly mapped into memory for serving queries.

Columns can be single or multi-valued, and the following types are supported: STRING, BOOLEAN, INT, LONG, FLOAT, DOUBLE, TIMESTAMP, and BYTES. The BIG_DECIMAL data type is supported only for single-valued columns.

Columns may be declared to be metric or dimension (or specifically as a time dimension) in the schema. Columns can have default null values. For example, the default null value of an integer column can be 0. The default value for BYTES columns must be hex-encoded before it's added to the schema.

Pinot uses dictionary encoding to store values as dictionary IDs. Columns may be configured to be "no-dictionary" columns, in which case raw values are stored. Dictionary IDs are encoded using the minimum number of bits for efficient storage (e.g., a column with a cardinality of 3 will use only 2 bits for each dictionary ID).

A forward index is built for each column and compressed for efficient memory use. In addition, you can optionally configure inverted indices for any set of columns. Inverted indices take up more storage, but improve query performance. Specialized indexes like the star-tree index are also supported. For more details, see Indexing.

Creating a segment

Once the table is configured, we can load some data. Loading data involves generating Pinot segments from raw data and pushing them to the Pinot cluster. Data can be loaded in batch mode or streaming mode. For more details, see the ingestion overview page.

Load data in batch

Prerequisites

  • Set up a cluster

  • Create broker and server tenants

  • Create an offline table

Below are instructions to generate and push segments to Pinot via standalone scripts. For a production setup, you should use frameworks such as Hadoop or Spark. For more details on setting up data ingestion jobs, see Import Data.

Job Spec YAML

To generate a segment, we first need to create a job spec YAML file. This file contains all the information regarding data format, input data location, and Pinot cluster coordinates. Note that this assumes the controller is RUNNING so the table config and schema can be fetched. If not, you will have to configure the spec to point at their location. For full configurations, see Ingestion Job Spec.

Create and push segment

To create and push the segment in one go, use the following:

Sample Console Output

Alternatively, you can create and push separately by changing the jobType to SegmentCreation or SegmentTarPush.

Templating Ingestion Job Spec

The ingestion job spec supports templating with Groovy syntax.

This is convenient if you want to generate one ingestion job template file and schedule it on a daily basis with extra parameters updated daily.

For example, you could set inputDirURI with parameters to indicate the date, so that the ingestion job only processes the data for a particular date. Below is an example that templates the date for the input and output directories.

You can pass in arguments containing values for ${year}, ${month}, ${day} when kicking off the ingestion job: -values $param=value1 $param2=value2...

This ingestion job only generates segments for date 2014-01-03

Load data in streaming

Prerequisites

  • Set up a cluster

  • Create broker and server tenants

  • Create a real-time table and set up a real-time stream

Below is an example of how to publish sample data to your stream. As soon as data is available to the real-time stream, it starts getting consumed by the real-time servers.

Kafka

Run the command below to stream JSON data into the Kafka topic flights-realtime:

job-spec.yml
executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'

jobType: SegmentCreationAndTarPush
inputDirURI: 'examples/batch/baseballStats/rawdata'
includeFileNamePattern: 'glob:**/*.csv'
excludeFileNamePattern: 'glob:**/*.tmp'
outputDirURI: 'examples/batch/baseballStats/segments'
overwriteOutput: true

pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS

recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
  configs:

tableSpec:
  tableName: 'baseballStats'
  schemaURI: 'http://localhost:9000/tables/baseballStats/schema'
  tableConfigURI: 'http://localhost:9000/tables/baseballStats'
  
segmentNameGeneratorSpec:

pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'

pushJobSpec:
  pushParallelism: 2
  pushAttempts: 2
  pushRetryIntervalMillis: 1000
docker run \
    --network=pinot-demo \
    --name pinot-data-ingestion-job \
    ${PINOT_IMAGE} LaunchDataIngestionJob \
    -jobSpecFile examples/docker/ingestion-job-specs/airlineStats.yaml
SegmentGenerationJobSpec:
!!org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec
excludeFileNamePattern: null
executionFrameworkSpec: {extraConfigs: null, name: standalone, segmentGenerationJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner,
  segmentTarPushJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner,
  segmentUriPushJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner}
includeFileNamePattern: glob:**/*.avro
inputDirURI: examples/batch/airlineStats/rawdata
jobType: SegmentCreationAndTarPush
outputDirURI: examples/batch/airlineStats/segments
overwriteOutput: true
pinotClusterSpecs:
- {controllerURI: 'http://pinot-controller:9000'}
pinotFSSpecs:
- {className: org.apache.pinot.spi.filesystem.LocalPinotFS, configs: null, scheme: file}
pushJobSpec: {pushAttempts: 2, pushParallelism: 1, pushRetryIntervalMillis: 1000,
  segmentUriPrefix: null, segmentUriSuffix: null}
recordReaderSpec: {className: org.apache.pinot.plugin.inputformat.avro.AvroRecordReader,
  configClassName: null, configs: null, dataFormat: avro}
segmentNameGeneratorSpec: null
tableSpec: {schemaURI: 'http://pinot-controller:9000/tables/airlineStats/schema',
  tableConfigURI: 'http://pinot-controller:9000/tables/airlineStats', tableName: airlineStats}

Trying to create instance for class org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner
Initializing PinotFS for scheme file, classname org.apache.pinot.spi.filesystem.LocalPinotFS
Finished building StatsCollector!
Collected stats for 403 documents
Created dictionary for INT column: FlightNum with cardinality: 386, range: 14 to 7389
Using fixed bytes value dictionary for column: Origin, size: 294
Created dictionary for STRING column: Origin with cardinality: 98, max length in bytes: 3, range: ABQ to VPS
Created dictionary for INT column: Quarter with cardinality: 1, range: 1 to 1
Created dictionary for INT column: LateAircraftDelay with cardinality: 50, range: -2147483648 to 303
......
......
Pushing segment: airlineStats_OFFLINE_16085_16085_29 to location: http://pinot-controller:9000 for table airlineStats
Sending request: http://pinot-controller:9000/v2/segments?tableName=airlineStats to controller: a413b0013806, version: Unknown
Response for pushing table airlineStats segment airlineStats_OFFLINE_16085_16085_29 to location http://pinot-controller:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16085_16085_29 of table: airlineStats"}
Pushing segment: airlineStats_OFFLINE_16084_16084_30 to location: http://pinot-controller:9000 for table airlineStats
Sending request: http://pinot-controller:9000/v2/segments?tableName=airlineStats to controller: a413b0013806, version: Unknown
Response for pushing table airlineStats segment airlineStats_OFFLINE_16084_16084_30 to location http://pinot-controller:9000 - 200: {"status":"Successfully uploaded segment: airlineStats_OFFLINE_16084_16084_30 of table: airlineStats"}
bin/pinot-admin.sh LaunchDataIngestionJob \
    -jobSpecFile examples/batch/airlineStats/ingestionJobSpec.yaml
inputDirURI: 'examples/batch/airlineStats/rawdata/${year}/${month}/${day}'
outputDirURI: 'examples/batch/airlineStats/segments/${year}/${month}/${day}'
docker run \
    --network=pinot-demo \
    --name pinot-data-ingestion-job \
    ${PINOT_IMAGE} LaunchDataIngestionJob \
    -jobSpecFile examples/docker/ingestion-job-specs/airlineStats.yaml \
    -values year=2014 month=01 day=03
docker run \
  --network pinot-demo \
  --name=loading-airlineStats-data-to-kafka \
  ${PINOT_IMAGE} StreamAvroIntoKafka \
  -avroFile examples/stream/airlineStats/sample_data/airlineStats_data.avro \
  -kafkaTopic flights-realtime -kafkaBrokerList kafka:9092 -zkAddress pinot-zookeeper:2181/kafka
bin/pinot-admin.sh StreamAvroIntoKafka \
  -avroFile examples/stream/airlineStats/sample_data/airlineStats_data.avro \
  -kafkaTopic flights-realtime -kafkaBrokerList localhost:19092 -zkAddress localhost:2191/kafka

Schema

Explore the Schema component in Apache Pinot, vital for defining the structure and data types of Pinot tables, enabling efficient data processing and analysis.

Each table in Pinot is associated with a schema. A schema defines:

  • Fields in the table with their data types.

  • Whether the table uses column-based or table-based null handling. For more information, see Null value support.

The schema is stored in Zookeeper along with the table configuration.

Note: Schema naming in Pinot follows typical database table naming conventions, such as starting names with a letter, not ending with an underscore, and using only alphanumeric characters.

Categories

A schema also defines what category a column belongs to. Columns in a Pinot table can be categorized into three categories: dimension, metric, and datetime. These categories are described in the Dimension, Metric, and DateTime sections below.

Pinot does not enforce strict rules on which of these categories a column belongs to; rather, the categories can be thought of as hints that let Pinot do internal optimizations.

For example, metrics may be stored without a dictionary and can have a different default null value.

The categories are also relevant when doing segment merge and rollups. Pinot uses the dimension and time fields to identify records against which to apply merge/rollups.

Metrics aggregation is another example, where Pinot uses the dimension and time columns as the key and automatically aggregates values for the metric columns.

For configuration details, see the schema configuration reference.

Date and time fields

Since Pinot doesn't have dedicated DATETIME data type support, you need to input time in STRING, LONG, or INT format. However, Pinot needs to convert the date into an understandable format, such as an epoch timestamp, to do operations. Refer to the DateTime field spec configs for more details on supported formats.
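As a sketch of such a conversion at query time (assuming a hypothetical flights table built from the schema below, with a dateString column in yyyy-MM-dd format), the DATETIMECONVERT function can translate the string into epoch milliseconds:

-- Convert the string date to day-granularity epoch millis at query time
SELECT dateString,
       DATETIMECONVERT(dateString,
         '1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd',
         '1:MILLISECONDS:EPOCH',
         '1:DAYS') AS dayEpochMillis
FROM flights
LIMIT 10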

Creating a schema

First, make sure your cluster is up and running.

Let's create a schema and put it in a JSON file. For this example, we have created a schema for flight data.

Note: For more details on constructing a schema file, see the schema configuration reference.

Then, we can upload the sample schema provided above using either a Bash command or REST API call.

Check out the schema in the Rest API to make sure it was successfully uploaded.

Dimension

Dimension columns are typically used in slice and dice operations for answering business queries. Some operations for which dimension columns are used:

  • GROUP BY - group by one or more dimension columns along with aggregations on one or more metric columns

  • Filter clauses such as WHERE

Metric

These columns represent the quantitative data of the table. Such columns are used for aggregation. In data warehouse terminology, these can also be referred to as fact or measure columns.

Some operations for which metric columns are used:

  • Aggregation - SUM, MIN, MAX, COUNT, AVG, etc.

  • Filter clauses such as WHERE

DateTime

DateTime columns represent the time columns in the data. There can be multiple time columns in a table, but only one of them can be treated as primary. The primary time column is the one that is present in the segment config. The primary time column is used by Pinot to maintain the time boundary between offline and real-time data in a hybrid table and for retention management. A primary time column is mandatory if the table's push type is APPEND and optional if the push type is REFRESH.

Common operations that can be done on time column:

  • GROUP BY

  • Filter clauses such as WHERE

flights-schema.json
{
  "schemaName": "flights",
  "enableColumnBasedNullHandling": true,
  "dimensionFieldSpecs": [
    {
      "name": "flightNumber",
      "dataType": "LONG",
      "notNull": true
    },
    {
      "name": "tags",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": "null"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "price",
      "dataType": "DOUBLE",
      "notNull": true,
      "defaultNullValue": 0
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "millisSinceEpoch",
      "dataType": "LONG",
      "format": "EPOCH",
      "granularity": "15:MINUTES"
    },
    {
      "name": "hoursSinceEpoch",
      "dataType": "INT",
      "notNull": true,
      "format": "EPOCH|HOURS",
      "granularity": "1:HOURS"
    },
    {
      "name": "dateString",
      "dataType": "STRING",
      "format": "SIMPLE_DATE_FORMAT|yyyy-MM-dd",
      "granularity": "1:DAYS"
    }
  ]
}
bin/pinot-admin.sh AddSchema -schemaFile flights-schema.json -exec

OR

bin/pinot-admin.sh AddTable -schemaFile flights-schema.json -tableFile flights-table.json -exec
curl -F schemaName=@flights-schema.json localhost:9000/schemas

Time boundary

Learn about time boundaries in hybrid tables.

Hybrid tables are tables where an offline and a real-time table share the same name.

When querying these tables, the Pinot broker decides which records to read from the offline table and which to read from the real-time table. It does this using the time boundary.

How is the time boundary determined?

The time boundary is determined by looking at the maximum end time of the offline segments and the segment ingestion frequency specified for the offline table.

If it's set to hourly, then:

timeBoundary = Maximum end time of offline segments - 1 hour

Otherwise:

timeBoundary = Maximum end time of offline segments - 1 day

It is possible to force the hybrid table to use max(all offline segments' end time) by calling the following API (version 0.12.0+):

curl -X POST \
  "http://localhost:9000/tables/{tableName}/timeBoundary" \
  -H "accept: application/json"

Note that this will not automatically update the time boundary as more segments are added to the offline table; the API must be called each time a segment with a more recent end time is uploaded to the offline table. You can revert to using the derived time boundary by calling the following API:

curl -X DELETE \
  "http://localhost:9000/tables/{tableName}/timeBoundary" \
  -H "accept: application/json"

Querying

When a Pinot broker receives a query for a hybrid table, the broker sends a time boundary annotated version of the query to the offline and real-time tables.

For example, if we executed the following query:

SELECT count(*)
FROM events

The broker would send the following query to the offline table:

SELECT count(*)
FROM events_OFFLINE
WHERE timeColumn <= $timeBoundary

And the following query to the real-time table:

SELECT count(*)
FROM events_REALTIME
WHERE timeColumn > $timeBoundary

The results of the two queries are merged by the broker before being returned to the client.


Table

Explore the table component in Apache Pinot, a fundamental building block for organizing and managing data in Pinot clusters, enabling effective data processing and analysis.

Pinot stores data in tables. A Pinot table is conceptually identical to a relational database table with rows and columns. Each column has a name and a data type, defined in the table's schema.

Pinot schemas are defined in a JSON file. Because that schema definition is in its own file, multiple tables can share a single schema. Each table can have a unique name, indexing strategy, partitioning, data sources, and other metadata.

Pinot table types include:

  • real-time: Ingests data from a streaming source like Apache Kafka®

  • offline: Loads data from a batch source

  • hybrid: Loads data from both a batch source and a streaming source

Pinot breaks a table into multiple segments and stores these segments in a deep store such as Hadoop Distributed File System (HDFS) as well as on Pinot servers.

In the Pinot cluster, a table is modeled as a Helix resource and each segment of a table is modeled as a Helix partition.

Note: Table naming in Pinot follows typical naming conventions, such as starting names with a letter, not ending with an underscore, and using only alphanumeric characters.

Pinot supports the following types of tables:

Type        Description
Offline     Offline tables ingest pre-built Pinot segments from external data stores and are generally used for batch ingestion.
Real-time   Real-time tables ingest data from streams (such as Kafka) and build segments from the consumed data.
Hybrid      Hybrid Pinot tables have both real-time and offline tables under the hood. By default, all tables in Pinot are hybrid.

Note: The user querying the database does not need to know the type of the table. They only need to specify the table name in the query.

For example, regardless of whether we have an offline table myTable_OFFLINE, a real-time table myTable_REALTIME, or a hybrid table containing both of these, the query will be:

select count(*)
from myTable

The table configuration is used to define the table properties, such as name, type, indexing, routing, and retention. It is written in JSON format and is stored in Zookeeper, along with the table schema.

Use the following properties to make your tables faster or leaner:

  • Segment

  • Indexing

  • Tenants

Segments

A table is composed of small chunks of data known as segments. Learn more about how Pinot creates and manages segments in the Segment section above.

For offline tables, segments are built outside of Pinot and uploaded using a distributed executor such as Spark or Hadoop. For details, see Batch Ingestion.

For real-time tables, segments are built at specific intervals inside Pinot. You can tune the following for real-time segments.

Flush

The Pinot real-time consumer ingests the data, creates the segment, and then flushes the in-memory segment to disk. Pinot allows you to configure when to flush the segment in the following ways:

  • Number of consumed rows: After consuming the specified number of rows from the stream, Pinot will persist the segment to disk.

  • Number of rows per segment: Pinot learns and then estimates the number of rows that need to be consumed. The learning phase starts by setting the number of rows to 100,000 (this value can be changed) and adjusts it to reach the appropriate segment size. Because Pinot corrects the estimate as it goes along, the segment size might go significantly over the correct size during the learning phase. You should set this value to optimize the performance of queries.

  • Max time duration to wait: Pinot consumers wait for the configured time duration, after which segments are persisted to the disk.

Replicas

A segment can have multiple replicas to provide higher availability. You can configure the number of replicas for a table segment.
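For illustration, the replica count is set in the table config's segmentsConfig section (a minimal sketch; the value is a placeholder):

"segmentsConfig": {
  ...
  "replication": "3"
}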

Completion Mode

By default, if the in-memory segment on the non-winner server is equivalent to the committed segment, then the non-winner server builds and replaces the segment. If the available segment is not equivalent to the committed segment, the server just downloads the committed segment from the controller.

However, in certain scenarios, the segment build can get very memory-intensive. In these cases, you might want to force the non-committer servers to just download the segment from the controller instead of building it again. You can do this by setting completionMode: "DOWNLOAD" in the table configuration.

For details, see Completion Config.
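For illustration, the setting described above lives under completionConfig in the table config (a minimal sketch):

"completionConfig": {
  "completionMode": "DOWNLOAD"
}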

Download Scheme

A Pinot server might fail to download segments from the deep store, such as HDFS, after segment completion. However, you can configure servers to download these segments from peer servers instead of the deep store. Currently, only HTTP and HTTPS download schemes are supported. More methods, such as gRPC/Thrift, are planned to be added in the future.

For more details about peer segment download during real-time ingestion, refer to the design doc on bypassing the deep store for segment completion.

Indexing

You can create multiple indices on a table to increase the performance of the queries. The following types of indices are supported:

  • Forward Index

    • Dictionary-encoded forward index with bit compression

    • Raw value forward index

    • Sorted forward index with run-length encoding

  • Inverted Index

    • Bitmap inverted index

    • Sorted inverted index

  • Star-tree Index

  • Range Index

  • Text Index

  • Geospatial

For more details on each indexing mechanism and corresponding configurations, see Indexing.

Set up Bloom filters on columns to make queries faster. You can also keep segments in off-heap instead of on-heap memory for faster queries.
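As a brief illustration (the column names are placeholders), index columns are declared in the tableIndexConfig section of the table config:

"tableIndexConfig": {
  "invertedIndexColumns": ["browser", "country"],
  "bloomFilterColumns": ["playerID"],
  ...
}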

Pre-aggregation

Aggregate the real-time stream data as it is consumed to reduce segment sizes. Pinot adds the metric column values of all rows that have the same values for all dimension and time columns and creates a single row in the segment. This feature is only available on REALTIME tables.

The only supported aggregation is SUM. The columns to pre-aggregate need to satisfy the following requirements:

  • All metrics should be listed in noDictionaryColumns.

  • No multi-value dimensions

  • All dimension columns are treated to have a dictionary, even if they appear as noDictionaryColumns in the config.

The following table config snippet shows an example of enabling pre-aggregation during real-time ingestion:

pinot-table-realtime.json
    "tableIndexConfig": {
      "noDictionaryColumns": ["metric1", "metric2"],
      "aggregateMetrics": true,
      ...
    }

Tenants

Each table is associated with a tenant. A segment resides on a server that has the same tenant as the table. For details, see Tenant.

Optionally, override whether a table should move to a server with a different tenant based on segment status. The example below adds a tagOverrideConfig under the tenants section for real-time tables to override tags for consuming and completed segments:

  "tenants": {
    "broker": "brokerTenantName",
    "server": "serverTenantName",
    "tagOverrideConfig": {
      "realtimeConsuming": "serverTenantName_REALTIME",
      "realtimeCompleted": "serverTenantName_OFFLINE"
    }
  }

In the above example, the consuming segments will still be assigned to serverTenantName_REALTIME hosts, but once they are completed, the segments will be moved to serverTenantName_OFFLINE.

You can specify the full name of any tag in this section. For example, you could decide that completed segments for this table should be on Pinot servers tagged as allTables_COMPLETED. To learn more about moving completed segments, see the Moving Completed Segments section.

Hybrid table

A hybrid table is a table composed of two tables, one offline and one real-time, that share the same name. In a hybrid table, offline segments can be pushed periodically. The retention on the offline table can be set to a high value because segments are coming in on a periodic basis, whereas the retention on the real-time part can be small.

Once an offline segment is pushed to cover a recent time period, the brokers automatically switch to using the offline table for segments for that time period and use the real-time table only for data not available in the offline table.

To learn how time boundaries work for hybrid tables, see Broker.

A typical use case for hybrid tables is pushing deduplicated, cleaned-up data into an offline table every day while consuming real-time data as it arrives. Data can remain in offline tables for as long as a few years, while the real-time data would be cleaned every few days.
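For illustration, such a setup might pair a long retention on the offline table with a short retention on the real-time table (a sketch using the segmentsConfig retention fields; values are placeholders):

Offline table:

"segmentsConfig": {
  "retentionTimeUnit": "DAYS",
  "retentionTimeValue": "365",
  ...
}

Real-time table:

"segmentsConfig": {
  "retentionTimeUnit": "DAYS",
  "retentionTimeValue": "5",
  ...
}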

Examples

Create a table config for your data, or see the examples for all possible batch/streaming tables.

Prerequisites

  • Set up the cluster

  • Create broker and server tenants

Offline table creation

Sample console output

Check out the table config in the Rest API to make sure it was successfully uploaded.

Streaming table creation

Start Kafka

Create a Kafka topic

Create a streaming table

Sample output

Start Kafka-Zookeeper

Start Kafka

Check out the table config in the Rest API to make sure it was successfully uploaded.

Hybrid table creation

To create a hybrid table, you have to create the offline and real-time tables individually. You don't need to create a separate hybrid table.



    docker run \
        --network=pinot-demo \
        --name pinot-batch-table-creation \
        ${PINOT_IMAGE} AddTable \
        -schemaFile examples/batch/airlineStats/airlineStats_schema.json \
        -tableConfigFile examples/batch/airlineStats/airlineStats_offline_table_config.json \
        -controllerHost pinot-controller \
        -controllerPort 9000 \
        -exec
    Executing command: AddTable -tableConfigFile examples/batch/airlineStats/airlineStats_offline_table_config.json -schemaFile examples/batch/airlineStats/airlineStats_schema.json -controllerHost pinot-controller -controllerPort 9000 -exec
    Sending request: http://pinot-controller:9000/schemas to controller: a413b0013806, version: Unknown
    {"status":"Table airlineStats_OFFLINE succesfully added"}
    bin/pinot-admin.sh AddTable \
        -schemaFile examples/batch/airlineStats/airlineStats_schema.json \
        -tableConfigFile examples/batch/airlineStats/airlineStats_offline_table_config.json \
        -exec
    # add schema
    curl -F schemaName=@airlineStats_schema.json  localhost:9000/schemas
    
    # add table
    curl -i -X POST -H 'Content-Type: application/json' \
        -d @airlineStats_offline_table_config.json localhost:9000/tables
    docker run \
        --network pinot-demo --name=kafka \
        -e KAFKA_ZOOKEEPER_CONNECT=pinot-zookeeper:2181/kafka \
        -e KAFKA_BROKER_ID=0 \
        -e KAFKA_ADVERTISED_HOST_NAME=kafka \
        -d wurstmeister/kafka:latest
    docker exec \
      -t kafka \
      /opt/kafka/bin/kafka-topics.sh \
      --zookeeper pinot-zookeeper:2181/kafka \
      --partitions=1 --replication-factor=1 \
      --create --topic flights-realtime
    docker run \
        --network=pinot-demo \
        --name pinot-streaming-table-creation \
        ${PINOT_IMAGE} AddTable \
        -schemaFile examples/stream/airlineStats/airlineStats_schema.json \
        -tableConfigFile examples/docker/table-configs/airlineStats_realtime_table_config.json \
        -controllerHost pinot-controller \
        -controllerPort 9000 \
        -exec
    Executing command: AddTable -tableConfigFile examples/docker/table-configs/airlineStats_realtime_table_config.json -schemaFile examples/stream/airlineStats/airlineStats_schema.json -controllerHost pinot-controller -controllerPort 9000 -exec
    Sending request: http://pinot-controller:9000/schemas to controller: 8fbe601012f3, version: Unknown
    {"status":"Table airlineStats_REALTIME succesfully added"}
    bin/pinot-admin.sh StartZookeeper -zkPort 2191
    bin/pinot-admin.sh  StartKafka -zkAddress=localhost:2191/kafka -port 19092
    "OFFLINE": {
        "tableName": "pinotTable", 
        "tableType": "OFFLINE", 
        "segmentsConfig": {
          ... 
        }, 
        "tableIndexConfig": { 
          ... 
        },  
        "tenants": {
          "broker": "myBrokerTenant", 
          "server": "myServerTenant"
        },
        "metadata": {
          ...
        }
      },
      "REALTIME": { 
        "tableName": "pinotTable", 
        "tableType": "REALTIME", 
        "segmentsConfig": {
          ...
        }, 
        "tableIndexConfig": { 
          ... 
          "streamConfigs": {
            ...
          },  
        },  
        "tenants": {
          "broker": "myBrokerTenant", 
          "server": "myServerTenant"
        },
        "metadata": {
        ...
        }
      }
    }
    bin/pinot-admin.sh AddTable \
        -schemaFile examples/stream/airlineStats/airlineStats_schema.json \
        -tableConfigFile examples/stream/airlineStats/airlineStats_realtime_table_config.json \
        -exec