Table

Top-level fields

Property
Description

tableName

Specifies the name of the table. Should only contain alpha-numeric characters, hyphens (‘-‘), or underscores (‘_’). (Using a double-underscore (‘__’) is not allowed and reserved for other features within Pinot)

tableType

Defines the table type - OFFLINE for offline table, REALTIME for realtime table. A hybrid table is essentially 2 table configs one of each type, with the same table name.

isDimTable

quota

task

routing

query

segmentsConfig

tableIndexConfig

fieldConfigList

tenants

ingestionConfig

upsertConfig

dedupConfig

tierConfigs

metadata

This section is for keeping custom configs, which are expressed as key-value pairs.

Second level fields

The following properties can be nested inside the top-level configs.

Quota

Property
Description

storage

The maximum storage space the table is allowed to use, before replication. For example, in the above table, the storage is 140G and replication is 3. Therefore, the maximum storage the table is allowed to use is 140*3=420G. The space used by the table is calculated by adding up the sizes of all segments from every server hosting this table. Once this limit is reached, offline segment push throws a 403 exception with message, Quota check failed for segment: segment_0 of table: pinotTable.

maxQueriesPerSecond

The maximum queries per second allowed to execute on this table. If query volume exceeds this, a 429 exception with message Request 123 exceeds query quota for table:pinotTable, query:select count(*) from pinotTable will be sent, and a BrokerMetric QUERY_QUOTA_EXCEEDED will be recorded. The application should build an exponential backoff and retry mechanism to react to this exception.

Routing

Property
Description

segmentPrunerTypes

The list of segment pruners to be enabled.

instanceSelectorType

Query

Property
Description

timeoutMs

Query timeout in milliseconds

Segments Config

Property
Description

schemaName

Name of the schema associated with the table

timeColumnName

The name of the time column for this table. This must match with the time column name in the schema. This is mandatory for tables with push type APPEND, optional for REFRESH. timeColumnName along with timeColumnType is used to manage segment retention and time boundary for offline vs realtime.

replication

Number of replicas for the tables. A replication value of 1 means segments won't be replicated across servers.

retentionTimeUnit

Unit for the retention. e.g. HOURS, DAYS. This in combination with retentionTimeValue decides the duration for which to retain the segments e.g. 365 DAYS in the example means that segments containing data older than 365 days will be deleted periodically. This is done by the RetentionManager Controller periodic task. By default, no retention is set.

retentionTimeValue

A numeric value for the retention. This in combination with retentionTimeUnit decides the duration for which to retain the segments

segmentPushType

(Deprecated starting 0.7.0 or commit 9eaea9. Use IngestionConfig -> BatchIngestionConfig -> segmentPushType )

This can be either APPEND - new data segments pushed periodically, to append to the existing data eg. daily or hourly REFRESH - the entire data is replaced every time during a data push. Refresh tables have no retention.

segmentPushFrequency

(Deprecated starting 0.7.0 or commit 9eaea9. Use IngestionConfig -> BatchIngestionConfig -> segmentPushFrequency )

The cadence at which segments are pushed eg. HOURLY, DAILY

Table Index Config

Property
Description

invertedIndexColumns

The list of columns that inverted index should be created on. The name of columns should match the schema. e.g. in the table above, inverted index has been created on 3 columns foo, bar, moo

createInvertedIndexDuringSegmentGeneration

Boolean to indicate whether to create inverted indexes during the segment creation. By default, false i.e. inverted indexes are created when the segments are loaded on the server

sortedColumn

The column which is sorted in the data and hence will have a sorted index. This does not need to be specified for the offline table, as the segment generation job will automatically detect the sorted column in the data and create a sorted index for it.

bloomFilterColumns

bloomFilterConfigs

rangeIndexColumns

The list of columns that range index should be created on. Typically used for numeric columns and mostly on metrics. e.g. select count(*) from T where latency > 3000 will be faster if you enable range index for latency

rangeIndexVersion

Version of the range index, 2 (latest) by default.

starTreeIndexConfigs

enableDefaultStarTree

enableDynamicStarTreeCreation

Boolean to indicate whether to allow creating star-tree when server loads the segment. Star-tree creation could potentially consume a lot of system resources, so this config should be enabled when the servers have the free system resources to create the star-tree.

noDictionaryColumns

onHeapDictionaryColumns

The list of columns for which the dictionary should be created on heap

varLengthDictionaryColumns

The list of columns for which the variable length dictionary needs to be enabled in offline segments. This is only valid for string and bytes columns and has no impact for columns of other data types.

jsonIndexColumns

jsonIndexConfigs

segmentPartitionConfig

The map from column to partition function, which indicates how the segment is partitioned.

Currently 4 types of partition functions are supported:

Murmur - murmur2 hash function

Modulo - modulo on integer values

HashCode - java hashCode() function

ByteArray - java hashCode() on deserialized byte array

Example:

{ "foo": { "functionName": "Murmur", "numPartitions": 32 } }

loadMode

Indicates how the segments will be loaded onto the server heap - load data directly into direct memory mmap - load data segments to off-heap memory

columnMinMaxValueGeneratorMode

Generate min max values for columns. Supported values are NONE - do not generate for any columns ALL - generate for all columns TIME - generate for only time column NON_METRIC - generate for time and dimension columns

nullHandlingEnabled

Boolean to indicate whether to keep track of null values as part of the segment generation. This is required when using IS NULL or IS NOT NULL predicates in the query. Enabling this will lead to additional memory and storage usage per segment. By default, this is set to false.

aggregateMetrics

optimizeDictionaryForMetrics

Set to true if you want to disable dictionaries for single valued metric columns. Only applicable to single-valued metric columns. If a column is specified Default false

noDictionarySizeRatioThreshold

If optimizeDictionaryForMetrics enabled, dictionary is not created for the metric columns for which noDictionaryIndexSize/ indexWithDictionarySize is less than the noDictionarySizeRatioThreshold Default: 0.85

segmentNameGeneratorType

Type of segmentNameGenerator, default is SimpleSegmentNameGenerator.

Field Config List

Specify the columns and the type of indices to be created on those columns. Currently, only Text search columns can be specified using this property. We will be migrating the rest of the indices to this field in future releases.

Property

name

name of the column

encodingType

Should be one of RAW or DICTIONARY

indexType

index to create on this column. currently only TEXT is supported.

properties

JSON of key-value pairs containing additional properties associated with the index. The following properties are supported currently -

  • enableQueryCacheForTextIndex - set to true to enable caching for text index in Lucene

  • rawIndexWriterVersion

  • deriveNumDocsPerChunkForRawIndex

  • forwardIndexDisabled - set to true to disable the forward index, defaults to false

Warning:

If removing the forwardIndexDisabled property above to regenerate the forward index for multi-value (MV) columns note that the following invariants cannot be maintained after regenerating the forward index for a forward index disabled column:

  • Ordering guarantees of the MV values within a row

  • If entries within an MV row are duplicated, the duplicates will be lost. Please regenerate the segments via your offline jobs and re-push / refresh the data to get back the original MV data with duplicates.

We will work on removing the second invariant in the future.

Realtime Table Config

We will now discuss the sections that are only applicable to realtime tables.

segmentsConfig

Property
Description

replicasPerPartition

The number of replicas per partition for the stream

completionMode

determines if segment should be downloaded from other server or built in memory. can be DOWNLOAD or empty

peerSegmentDownloadScheme

protocol to use to download segments from server. can be on of http or https

Indexing config

Below is the list of fields in streamConfigs section.

IndexingConfig -> streamConfig has been deprecated starting 0.7.0 or commit 9eaea9. Use IngestionConfig -> StreamIngestionConfig -> streamConfigMaps instead.

Property
Description

streamType

only kafka is supported at the moment

stream.[streamType].consumer.type

stream.[streamType].topic.name

topic or equivalent datasource from which to consume data

stream[streamType].consumer.prop.auto.offset.reset

offset to start consuming data from. Should be one of smallest , largest, timestamp in format 'yyyy-MM-dd'T'HH:mm:ss.SSSZ' or Valid Datetime interval Eg., '2d', '1m' etc,.

(0.6.0 onwards) realtime.segment.flush.threshold.rows

(0.5.0 and prior) (deprecated) realtime.segment.flush.threshold.size

The maximum number of rows to consume before persisting the consuming segment. Default is 5,000,000

realtime.segment.flush.threshold.time

Maximum elapsed time after which a consuming segment should be persisted. The value can be set as a human readable string, such as 1d, 4h30m Default is 6 hours.

(0.6.0 onwards) realtime.segment.flush.threshold.segment.size

(0.5.0 and prior) (deprecated)

realtime.segment.flush.desired.size

Desired size of the completed segments. This value can be set as a human readable string such as 150M, or 1.1G, etc. This value is used when realtime.segment.flush.threshold.rows is set to 0. Default is 200M i.e. 200 MegaBytes

realtime.segment.flush.autotune.initialRows

Initial number of rows for learning.

This value is used only if realtime.segment.flush.threshold.rows is set o 0 and the consumer type is LowLevel.

Default is 100000 (ie 100K).

When specifying realtime.segment.flush.threshold.rows, the actual number of rows per segment is computed using the following formula: `` realtime.segment.flush.threshold.rows / partitionsConsumedByServer

This means that if we set realtime.segment.flush.threshold.rows=1000 and each server consumes 10 partitions, the rows per segment will be:1000/10 = 100

The desired segment size refers to the size of the segments that are loaded in Pinot Servers. Normally compressed version of the segments with tar.gz format are kept in the deep store which has smaller size than the specified parameter.

Any additional properties set here will be directly available to the stream consumers. For example, in case of Kafka stream, you could put any of the configs described in Kafka configuration page, and it will be automatically passed to the KafkaConsumer.

Some of the properties you might want to set:

Config
Description
Values

auto.offset.reset

If Kafka Consumer encounters an offset which is not in range (resulting in Kafka OffsetOutOfRange), the strategy to use to reset the offset Default value is latest, as a result of which, if the consumer seeks for an offset which has already expired, the consumer will reset to latest, resulting in data loss.

earliest - reset to earliest available offset latest - reset to latest available offset.

Example

Here is a minimal example of what the streamConfigs section may look like:

0.6.0 onwards:

"streamConfigs" : {
  "realtime.segment.flush.threshold.rows": "0",
  "realtime.segment.flush.threshold.time": "24h",
  "realtime.segment.flush.threshold.segment.size": "150M",
  "streamType": "kafka",
  "stream.kafka.consumer.type": "LowLevel",
  "stream.kafka.topic.name": "ClickStream",
  "stream.kafka.consumer.prop.auto.offset.reset" : "largest"
}

0.5.0 and prior:

"streamConfigs" : {
  "realtime.segment.flush.threshold.size": "0",
  "realtime.segment.flush.threshold.time": "24h",
  "realtime.segment.flush.desired.size": "150M",
  "streamType": "kafka",
  "stream.kafka.consumer.type": "LowLevel",
  "stream.kafka.topic.name": "ClickStream",
  "stream.kafka.consumer.prop.auto.offset.reset" : "largest"
}

Tenants

Property
Description

broker

Broker tenant in which the segment should reside

server

Server tenant in which the segment should reside

tagOverrideConfig

Override the tenant for segment if it fulfills certain conditions. Currently, only support override on realtimeConsuming or realtimeCompleted

Example

  "broker": "brokerTenantName",
  "server": "serverTenantName",
  "tagOverrideConfig" : {
    "realtimeConsuming" : "serverTenantName_REALTIME"
    "realtimeCompleted" : "serverTenantName_OFFLINE"
  }
}

Environment Variables Override

Pinot allows users to define environment variables in the format of ${ENV_NAME} or ${ENV_NAME:DEFAULT_VALUE}as field values in table config.

Pinot instance will override it during runtime.

Brackets are required when defining the environment variable."$ENV_NAME"is not supported.

Environment variables used without default value in table config have to be available to all Pinot components - Controller, Broker, Server, and Minion. Otherwise, querying/consumption will be affected depending on the service to which these variables are not available.

Below is an example of setting AWS credential as part of table config using environment variable.

Example:

{
...
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY",
      "batchConfigMaps": [
        {
          "inputDirURI": "s3://<my-bucket>/baseballStats/rawdata",
          "includeFileNamePattern": "glob:**/*.csv",
          "excludeFileNamePattern": "glob:**/*.tmp",
          "inputFormat": "csv",
          "outputDirURI": "s3://<my-bucket>/baseballStats/segments",
          "input.fs.className": "org.apache.pinot.plugin.filesystem.S3PinotFS",
          "input.fs.prop.region": "us-west-2",
          "input.fs.prop.accessKey": "${AWS_ACCESS_KEY}",
          "input.fs.prop.secretKey": "${AWS_SECRET_KEY}",
          "push.mode": "tar"
        }
      ],
      "segmentNameSpec": {},
      "pushSpec": {}
    }
  },
...
}

Sample Configurations

Offline Table

pinot-table-offline.json
"OFFLINE": {
    "tableName": "pinotTable",
    "tableType": "OFFLINE",
    "quota": {
      "maxQueriesPerSecond": 300,
      "storage": "140G"
    },
    "routing": {
      "segmentPrunerTypes": ["partition"],
      "instanceSelectorType": "replicaGroup"
    },
    "segmentsConfig": {
      "schemaName": "pinotTable",
      "timeColumnName": "daysSinceEpoch",
      "timeType": "DAYS",
      "allowNullTimeValue": false,
      "replication": "3",
      "retentionTimeUnit": "DAYS",
      "retentionTimeValue": "365",
      "segmentPushFrequency": "DAILY",
      "segmentPushType": "APPEND"
    },
    "tableIndexConfig": {
      "invertedIndexColumns": ["foo", "bar", "moo"],
      "createInvertedIndexDuringSegmentGeneration": false,
      "sortedColumn": ["pk"],
      "bloomFilterColumns": [],
      "starTreeIndexConfigs": [],
      "noDictionaryColumns": [],
      "rangeIndexColumns": [],
      "onHeapDictionaryColumns": [],
      "varLengthDictionaryColumns": [],
      "segmentPartitionConfig": {
        "pk": {
          "functionName": "Murmur",
          "numPartitions": 32
        }
      }
      "loadMode": "MMAP",
      "columnMinMaxValueGeneratorMode": null,
      "nullHandlingEnabled": false
    },
    "tenants": {
      "broker": "myBrokerTenant",
      "server": "myServerTenant"
    },
    "ingestionConfig": {
      "filterConfig": {
        "filterFunction": "Groovy({foo == \"VALUE1\"}, foo)"
      },
      "transformConfigs": [{
        "columnName": "bar",
        "transformFunction": "lower(moo)"
      },
      {
        "columnName": "hoursSinceEpoch",
        "transformFunction": "toEpochHours(millis)"
      }]
    }
    "metadata": {
      "customConfigs": {
        "key": "value",
        "key": "value"
      }
    }
  }
}

Realtime Table

Here's an example table config for a realtime table. All the fields from the offline table config are valid for the realtime table. Additionally, realtime tables use some extra fields.

pinot-table-realtime.json
"REALTIME": {
    "tableName": "pinotTable",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "schemaName": "pinotTable",
      "timeColumnName": "daysSinceEpoch",
      "timeType": "DAYS",
      "allowNullTimeValue": true,
      "replicasPerPartition": "3",
      "retentionTimeUnit": "DAYS",
      "retentionTimeValue": "5",
      "segmentPushType": "APPEND",
      "completionConfig": {
        "completionMode": "DOWNLOAD"
      }
    },
    "tableIndexConfig": {
      "invertedIndexColumns": ["foo", "bar", "moo"],
      "sortedColumn": ["column1"],
      "noDictionaryColumns": ["metric1", "metric2"],
      "loadMode": "MMAP",
      "nullHandlingEnabled": false,
      "streamConfigs": {
        "realtime.segment.flush.threshold.rows": "0",
        "realtime.segment.flush.threshold.time": "24h",
        "realtime.segment.flush.threshold.segment.size": "150M",
        "stream.kafka.broker.list": "XXXX",
        "stream.kafka.consumer.factory.class.name": "XXXX",
        "stream.kafka.consumer.prop.auto.offset.reset": "largest",
        "stream.kafka.consumer.type": "XXXX",
        "stream.kafka.decoder.class.name": "XXXX",
        "stream.kafka.decoder.prop.schema.registry.rest.url": "XXXX",
        "stream.kafka.decoder.prop.schema.registry.schema.name": "XXXX",
        "stream.kafka.hlc.zk.connect.string": "XXXX",
        "stream.kafka.topic.name": "XXXX",
        "stream.kafka.zk.broker.url": "XXXX",
        "streamType": "kafka"
      }
    },
    "tenants": {
      "broker": "myBrokerTenant",
      "server": "myServerTenant",
      "tagOverrideConfig": {}
    },
    "metadata": {
    }
}

Last updated