Table

The tables below shows the properties available to set at the table level.

Top-level fields

PropertyDescription

tableName

Specifies the name of the table. Should only contain alpha-numeric characters, hyphens (‘-‘), or underscores (‘_’). (Two notes: While the hyphen is allowed in table names, it is also a reserved character in SQL, so if you use it you must remember to double quote the table name in your queries. Using a double-underscore (‘__’) is not allowed as it is reserved for other features within Pinot.)

tableType

Defines the table type: OFFLINE for offline tables or REALTIME for real-time tables. A hybrid table is essentially two table configurations: one of each type, with the same table name.

isDimTable

Boolean field to indicate whether the table is a dimension table

quota

Defines properties related to quotas, such as storage quota and query quota. For details, see the Quota table below.

task

Defines the enabled minion tasks for the table. See Minion for more details.

routing

Defines the properties that determine how the broker selects the servers to route, and how segments can be pruned by the broker based on segment metadata. For details, see the Routing table below.

query

Defines the properties related to query execution. For details, see the Query table below.

segmentsConfig

Defines the properties related to the segments of the table, such as segment push frequency, type, retention, schema, time column etc. For details, see the segmentsConfig table below.

tableIndexConfig

Defines the indexing related information for the Pinot table. For details, see Table indexing config below.

fieldConfigList

Specifies the columns and the type of indices to be created on those columns. See Field config list for sub-properties.

tenants

Defines the server and broker tenant used for this table. For details, see Tenant below.

ingestionConfig

Defines the configurations needed for ingestion level transformations. For details, see Ingestion Level Transformations and Ingestion Level Aggregations.

upsertConfig

Set upset configurations. For details, see Stream ingestion with upsert.

dedupConfig

Set deduplication configurations. For details, see Stream ingestion with Dedup.

tierConfigs

Defines configurations for tiered storage. For details, see Tiered Storage.

metadata

Contains other metadata of the table. There is a string to string map field "customConfigs" under it which is expressed as key-value pairs to hold the custom configurations.

Second-level fields

The following properties can be nested inside the top-level configurations.

Quota

PropertyDescription

storage

The maximum storage space the table is allowed to use before replication.

For example, in the above table, the storage is 140G and replication is 3, so the maximum storage the table is allowed to use is 140G x 3 = 420G. The space the table uses is calculated by adding up the sizes of all segments from every server hosting this table. Once this limit is reached, offline segment push throws a 403 exception with message, Quota check failed for segment: segment_0 of table: pinotTable.

maxQueriesPerSecond

The maximum queries per second allowed to execute on this table. If query volume exceeds this, a 429 exception with message Request 123 exceeds query quota for table:pinotTable, query:select count(*) from pinotTable will be sent, and a BrokerMetric QUERY_QUOTA_EXCEEDED will be recorded. The application should build an exponential backoff and retry mechanism to react to this exception.

Routing

Find details on configuring routing here.

PropertyDescription

segmentPrunerTypes

The list of segment pruners to be enabled.

The segment pruner prunes the selected segments based on the query.

Supported values:

  • partition: Prunes segments based on the partition metadata stored in zookeeper. By default, there is no pruner.

  • time: Prune segments for queries filtering on timeColumnName that do not contain data in the query's time range.

instanceSelectorType

The server instances to serve the query based on selected segments. Supported values:

  • balanced: Balances the number of segments served by each selected instance. Default.

  • replicaGroup: Instance selector for replica group routing strategy.

Query

PropertyDescription

timeoutMs

Query timeout in milliseconds

disableGroovy

Whether to disable groovy in query. This overrides the broker instance level config (pinot.broker.disable.query.groovy) if configured.

useApproximateFunction

Whether to automatically use approximate function for expensive aggregates, such as DISTINCT_COUNT and PERCENTILE. This overrides the broker instance level config (pinot.broker.use.approximate.function) if configured.

expressionOverrideMap

A map that configures the expressions to override in the query. This can be useful when users cannot control the queries sent to Pinot (e.g. queries auto-generated by some other tools), but want to override the expressions within the query (e.g. override a transform function to a derived column). Example: {"myFunc(a)": "b"}.

Segments config

PropertyDescription

schemaName

Name of the schema associated with the table

timeColumnName

The name of the time column for this table. This must match with the time column name in the schema. This is mandatory for tables with push type APPEND, optional for REFRESH. timeColumnName along with timeColumnType is used to manage segment retention and time boundary for offline vs real-time.

replication

Number of replicas for the tables. A replication value of 1 means segments won't be replicated across servers.

retentionTimeUnit

Unit for the retention, such as HOURS or DAYS. This, in combination with retentionTimeValue decides the duration for which to retain the segments.

For example, 365 DAYS means that segments containing data older than 365 days will be deleted periodically by the RetentionManager Controller periodic task. By default, there is no set retention.

retentionTimeValue

A numeric value for the retention. This, in combination with retentionTimeUnit, determines the duration for which to retain the segments

segmentPushType

(Deprecated starting 0.7.0 or commit 9eaea9. Use IngestionConfig -> BatchIngestionConfig -> segmentPushType )

Can be either:

  • APPEND: New data segments pushed periodically, to append to the existing data eg. daily or hourly

  • REFRESH: Entire data is replaced every time during a data push. Refresh tables have no retention.

segmentPushFrequency

(Deprecated starting 0.7.0 or commit 9eaea9. Use IngestionConfig -> BatchIngestionConfig -> segmentPushFrequency )

The cadence at which segments are pushed, such as HOURLY or DAILY

Table index config

PropertyDescription

invertedIndexColumns

The list of columns that inverted index should be created on. The name of columns should match the schema. e.g. in the table above, inverted index has been created on three columns foo, bar, moo

createInvertedIndexDuringSegmentGeneration

Boolean to indicate whether to create inverted indexes during the segment creation. By default, false i.e. inverted indexes are created when the segments are loaded on the server

sortedColumn

The column which is sorted in the data and hence will have a sorted index. This does not need to be specified for the offline table, as the segment generation job will automatically detect the sorted column in the data and create a sorted index for it.

bloomFilterColumns

The list of columns to apply bloom filter on. The names of the columns should match the schema. For more details about using bloom filters refer to Bloom Filter.

bloomFilterConfigs

The map from the column to the bloom filter config. The names of the columns should match the schema. For more details about using bloom filters refer to Bloom Filter.

rangeIndexColumns

The list of columns that range index should be created on. Typically used for numeric columns and mostly on metrics. e.g. select count(*) from T where latency > 3000 will be faster if you enable range index for latency

rangeIndexVersion

Version of the range index, 2 (latest) by default.

starTreeIndexConfigs

The list of StarTree indexing configs for creating StarTree indexes. For details on how to configure this, see StarTree Index.

enableDefaultStarTree

Boolean to indicate whether to create a default StarTree index for the segment. For details, see StarTree Index.

enableDynamicStarTreeCreation

Boolean to indicate whether to allow creating StarTree when server loads the segment. StarTree creation could potentially consume a lot of system resources, so this config should be enabled when the servers have the free system resources to create the StarTree.

noDictionaryColumns

The set of columns that should not be dictionary-encoded. The name of columns should match the schema. NoDictionary dimension columns are LZ4 compressed, while the metrics are not compressed.

onHeapDictionaryColumns

The list of columns for which the dictionary should be created on heap

varLengthDictionaryColumns

The list of columns for which the variable length dictionary needs to be enabled in offline segments. This is only valid for string and bytes columns and has no impact for columns of other data types.

jsonIndexColumns

The list of columns to create the JSON index. See JSON Index for more details.

jsonIndexConfigs

The map from column to JSON index config. See JSON Index for more details.

segmentPartitionConfig

Use segmentPartitionConfig.columnPartitionMap along with routing.segementPrunerTypes to enable partitioning. For each column, configure the following options:

  • functionName: Specify one of the supported functions:

    • Murmur:MurmurHash 2

    • Modulo: Modulo on integer values

    • HashCode: Java hashCode()

    • ByteArray: Java hashCode() on deserialized byte array

  • numPartitions: Number of partitions you want per segment. Controls how data is divided within each segment.

Example: {

"columnPartitionMap": { "column_memberID": { "functionName": "Murmur", "numPartitions": 32 } }

loadMode

Indicates how the segments will be loaded onto the server: heap - load data directly into direct memory mmap - load data segments to off-heap memory

columnMinMaxValueGeneratorMode

Generate min max values for columns. Supported values: NONE - do not generate for any columns ALL - generate for all columns TIME - generate for only time column NON_METRIC - generate for time and dimension columns

nullHandlingEnabled

Boolean to indicate whether to keep track of null values as part of the segment generation. This is required when using IS NULL or IS NOT NULL predicates in the query. Enabling this will lead to additional memory and storage usage per segment. By default, this is set to false.

aggregateMetrics

(deprecated, use Ingestion Aggregation) (only applicable for stream) set to true to pre-aggregate the metrics

optimizeDictionaryForMetrics

Set to true if you want to disable dictionaries for single valued metric columns. Only applicable to single-valued metric columns. If a column is specified Default false

noDictionarySizeRatioThreshold

If optimizeDictionaryForMetrics enabled, dictionary is not created for the metric columns for which noDictionaryIndexSize/ indexWithDictionarySize is less than the noDictionarySizeRatioThreshold Default: 0.85

segmentNameGeneratorType

Type of segmentNameGenerator, default is SimpleSegmentNameGenerator.

Field Config List

Specify the columns and the type of indices to be created on those columns. Currently, not all index types can use this property. The following indexes are supported:

Property

name

Name of the column

encodingType

Should be one of RAW or DICTIONARY

indexTypes

List of indexes to create on this column. Valid values are the ids of the index types (text, fst, h3, etc)

properties

JSON of key-value pairs containing additional properties associated with the index. The following properties are supported currently -

  • enableQueryCacheForTextIndex - set to true to enable caching for text index in Lucene

  • luceneMaxBufferSizeMB - Lucene IndexWriter buffer max size, defaults to 500

  • luceneUseCompoundFile - Lucene IndexWriter file format, defaults to true to use compound files

  • rawIndexWriterVersion

  • deriveNumDocsPerChunkForRawIndex

  • forwardIndexDisabled - set to true to disable the forward index, defaults to false

The property indexType (in singular, accepting a single index id as string) is also supported for compatibility reasons, but we recommend using the plural in order to be able to define several indexes for the same column.

Warning:

If removing the forwardIndexDisabled property above to regenerate the forward index for multi-value (MV) columns note that the following invariants cannot be maintained after regenerating the forward index for a forward index disabled column:

  • Ordering guarantees of the MV values within a row

  • If entries within an MV row are duplicated, the duplicates will be lost. Regenerate the segments via your offline jobs and re-push / refresh the data to get back the original MV data with duplicates.

We will work on removing the second invariant in the future.

Real-time table config

The sections below apply to real-time tables only.

segmentsConfig

PropertyDescription

replicasPerPartition

The number of replicas per partition for the stream

completionMode

determines if segment should be downloaded from other server or built in memory. can be DOWNLOAD or empty

peerSegmentDownloadScheme

protocol to use to download segments from server. can be on of http or https

Indexing config

The streamConfigs section has been deprecated as of release 0.7.0. See streamConfigMaps instead.

Tenants

PropertyDescription

broker

Broker tenant in which the segment should reside

server

Server tenant in which the segment should reside

tagOverrideConfig

Override the tenant for segment if it fulfills certain conditions. Currently, only support override on realtimeConsuming or realtimeCompleted

Example

  "broker": "brokerTenantName",
  "server": "serverTenantName",
  "tagOverrideConfig" : {
    "realtimeConsuming" : "serverTenantName_REALTIME"
    "realtimeCompleted" : "serverTenantName_OFFLINE"
  }
}

Environment variables override

Pinot allows users to define environment variables in the format of ${ENV_NAME} or ${ENV_NAME:DEFAULT_VALUE}as field values in table config.

Pinot instance will override it during runtime.

Brackets are required when defining the environment variable."$ENV_NAME"is not supported.

Environment variables used without default value in table config have to be available to all Pinot components - Controller, Broker, Server, and Minion. Otherwise, querying/consumption will be affected depending on the service to which these variables are not available.

Below is an example of setting AWS credential as part of table config using environment variable.

Example:

{
...
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY",
      "batchConfigMaps": [
        {
          "inputDirURI": "s3://<my-bucket>/baseballStats/rawdata",
          "includeFileNamePattern": "glob:**/*.csv",
          "excludeFileNamePattern": "glob:**/*.tmp",
          "inputFormat": "csv",
          "outputDirURI": "s3://<my-bucket>/baseballStats/segments",
          "input.fs.className": "org.apache.pinot.plugin.filesystem.S3PinotFS",
          "input.fs.prop.region": "us-west-2",
          "input.fs.prop.accessKey": "${AWS_ACCESS_KEY}",
          "input.fs.prop.secretKey": "${AWS_SECRET_KEY}",
          "push.mode": "tar"
        }
      ],
      "segmentNameSpec": {},
      "pushSpec": {}
    }
  },
...
}

Sample configurations

Offline table

pinot-table-offline.json
"OFFLINE": {
    "tableName": "pinotTable",
    "tableType": "OFFLINE",
    "quota": {
      "maxQueriesPerSecond": 300,
      "storage": "140G"
    },
    "routing": {
      "segmentPrunerTypes": ["partition"],
      "instanceSelectorType": "replicaGroup"
    },
    "segmentsConfig": {
      "schemaName": "pinotTable",
      "timeColumnName": "daysSinceEpoch",
      "timeType": "DAYS",
      "replication": "3",
      "retentionTimeUnit": "DAYS",
      "retentionTimeValue": "365",
      "segmentPushFrequency": "DAILY",
      "segmentPushType": "APPEND"
    },
    "tableIndexConfig": {
      "invertedIndexColumns": ["foo", "bar", "moo"],
      "createInvertedIndexDuringSegmentGeneration": false,
      "sortedColumn": ["pk"],
      "bloomFilterColumns": [],
      "starTreeIndexConfigs": [],
      "noDictionaryColumns": [],
      "rangeIndexColumns": [],
      "onHeapDictionaryColumns": [],
      "varLengthDictionaryColumns": [],
      "segmentPartitionConfig": {
        "columnPartitionMap": {
          "column_foo": {
          "functionName": "Murmur",
          "numPartitions": 32
        }
      }
      "loadMode": "MMAP",
      "columnMinMaxValueGeneratorMode": null,
      "nullHandlingEnabled": false
    },
    "tenants": {
      "broker": "myBrokerTenant",
      "server": "myServerTenant"
    },
    "ingestionConfig": {
      "filterConfig": {
        "filterFunction": "Groovy({foo == \"VALUE1\"}, foo)"
      },
      "transformConfigs": [{
        "columnName": "bar",
        "transformFunction": "lower(moo)"
      },
      {
        "columnName": "hoursSinceEpoch",
        "transformFunction": "toEpochHours(millis)"
      }]
    }
    "metadata": {
      "customConfigs": {
        "key": "value",
        "key": "value"
      }
    }
  }
}

Real-time table

Here's an example table config for a real-time table. All the fields from the offline table config are valid for the real-time table. Additionally, real-time tables use some extra fields.

pinot-table-realtime.json
"REALTIME": {
    "tableName": "pinotTable",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "schemaName": "pinotTable",
      "timeColumnName": "daysSinceEpoch",
      "timeType": "DAYS",
      "replicasPerPartition": "3",
      "retentionTimeUnit": "DAYS",
      "retentionTimeValue": "5",
      "segmentPushType": "APPEND",
      "completionConfig": {
        "completionMode": "DOWNLOAD"
      }
    },
    "tableIndexConfig": {
      "invertedIndexColumns": ["foo", "bar", "moo"],
      "sortedColumn": ["column1"],
      "noDictionaryColumns": ["metric1", "metric2"],
      "loadMode": "MMAP",
      "nullHandlingEnabled": false,
    },
    "ingestionConfig:" {
      "streamIngestionConfig": {
       "streamConfigMaps":[
        { "realtime.segment.flush.threshold.rows": "0",
        "realtime.segment.flush.threshold.time": "24h",
        "realtime.segment.flush.threshold.segment.size": "150M",
        "stream.kafka.broker.list": "XXXX",
        "stream.kafka.consumer.factory.class.name": "XXXX",
        "stream.kafka.consumer.prop.auto.offset.reset": "largest",
        "stream.kafka.consumer.type": "XXXX",
        "stream.kafka.decoder.class.name": "XXXX",
        "stream.kafka.decoder.prop.schema.registry.rest.url": "XXXX",
        "stream.kafka.decoder.prop.schema.registry.schema.name": "XXXX",
        "stream.kafka.hlc.zk.connect.string": "XXXX",
        "stream.kafka.topic.name": "XXXX",
        "stream.kafka.zk.broker.url": "XXXX",
        "streamType": "kafka"
      }
    ]
    },
    "tenants":{
      "broker": "myBrokerTenant",
      "server": "myServerTenant",
      "tagOverrideConfig": {}
    },
    "metadata": {}
}

Last updated