Table

Top-level fields

Second-level fields

The following properties can be nested inside the top-level configs.

Quota
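
For example, the following quota block (matching the sample offline config later on this page) caps the table's storage at 140G and limits it to 300 queries per second:

"quota": {
  "maxQueriesPerSecond": 300,
  "storage": "140G"
}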

Routing
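
For example, the following routing block (as used in the sample offline config later on this page) enables partition-based segment pruning and replica-group instance selection:

"routing": {
  "segmentPrunerTypes": ["partition"],
  "instanceSelectorType": "replicaGroup"
}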

Query
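
As an illustration, a query block that overrides the query timeout for this table might look like the following sketch (assuming your Pinot version supports the timeoutMs field):

"query": {
  "timeoutMs": 60000
}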

Segments Config

Table Index Config

Field Config List

Specify the columns and the type of indices to be created on those columns. Currently, only text search columns can be specified using this property. We will be migrating the rest of the indices to this field in future releases.
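
For example, a fieldConfigList entry that creates a text index on a raw-encoded string column might look like the following sketch, where the column name myTextCol is a placeholder:

"fieldConfigList": [
  {
    "name": "myTextCol",
    "encodingType": "RAW",
    "indexType": "TEXT"
  }
]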

Realtime Table Config

We will now discuss the sections that are only applicable to realtime tables.

segmentsConfig

Indexing config

Below is the list of fields in the streamConfigs section.

IndexingConfig -> streamConfig has been deprecated starting with release 0.7.0 (commit 9eaea9). Use IngestionConfig -> StreamIngestionConfig -> streamConfigMaps instead.
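
As a sketch of the new location, the same stream properties would be nested under ingestionConfig like this (the topic name and consumer type values are taken from the example later on this page):

"ingestionConfig": {
  "streamIngestionConfig": {
    "streamConfigMaps": [
      {
        "streamType": "kafka",
        "stream.kafka.topic.name": "ClickStream",
        "stream.kafka.consumer.type": "LowLevel"
      }
    ]
  }
}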

When specifying realtime.segment.flush.threshold.rows, the actual number of rows per segment is computed using the following formula:

`realtime.segment.flush.threshold.rows / partitionsConsumedByServer`

This means that if we set realtime.segment.flush.threshold.rows=1000 and each server consumes 10 partitions, the rows per segment will be: 1000 / 10 = 100

Any additional properties set here will be directly available to the stream consumers. For example, in the case of a Kafka stream, you can include any of the configs described on the Kafka configuration page, and they will be automatically passed to the KafkaConsumer.
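
For example, any property prefixed with stream.kafka.consumer.prop. is forwarded to the KafkaConsumer; in the sketch below, the group.id value is a placeholder:

"stream.kafka.consumer.prop.auto.offset.reset": "largest",
"stream.kafka.consumer.prop.group.id": "myConsumerGroup"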

Some of the properties you might want to set:

Example

Here is a minimal example of what the streamConfigs section may look like:

0.6.0 onwards:

"streamConfigs" : {
  "realtime.segment.flush.threshold.rows": "0",
  "realtime.segment.flush.threshold.time": "24h",
  "realtime.segment.flush.threshold.segment.size": "150M",
  "streamType": "kafka",
  "stream.kafka.consumer.type": "LowLevel",
  "stream.kafka.topic.name": "ClickStream",
  "stream.kafka.consumer.prop.auto.offset.reset" : "largest"
}

0.5.0 and prior:

"streamConfigs" : {
  "realtime.segment.flush.threshold.size": "0",
  "realtime.segment.flush.threshold.time": "24h",
  "realtime.segment.flush.desired.size": "150M",
  "streamType": "kafka",
  "stream.kafka.consumer.type": "LowLevel",
  "stream.kafka.topic.name": "ClickStream",
  "stream.kafka.consumer.prop.auto.offset.reset" : "largest"
}

Tenants

Example

  "broker": "brokerTenantName",
  "server": "serverTenantName",
  "tagOverrideConfig" : {
    "realtimeConsuming" : "serverTenantName_REALTIME"
    "realtimeCompleted" : "serverTenantName_OFFLINE"
  }
}

Environment Variables Override

Pinot allows users to define environment variables in the format ${ENV_NAME} or ${ENV_NAME:DEFAULT_VALUE} as field values in the table config.

The Pinot instance will override them at runtime.

Brackets are required when defining the environment variable; "$ENV_NAME" is not supported.
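
For example, using the ${ENV_NAME:DEFAULT_VALUE} form, a field can fall back to a hard-coded value when the environment variable is unset. In this sketch, AWS_REGION is a hypothetical environment variable name, applied to the region field from the example further below:

"input.fs.prop.region": "${AWS_REGION:us-west-2}"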

Environment variables used without a default value in the table config have to be available to all Pinot components - Controller, Broker, Server, and Minion. Otherwise, querying or consumption will be affected, depending on the service to which these variables are not available.

Below is an example of setting AWS credentials as part of the table config using environment variables.

Example:

{
...
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY",
      "batchConfigMaps": [
        {
          "inputDirURI": "s3://<my-bucket>/baseballStats/rawdata",
          "includeFileNamePattern": "glob:**/*.csv",
          "excludeFileNamePattern": "glob:**/*.tmp",
          "inputFormat": "csv",
          "outputDirURI": "s3://<my-bucket>/baseballStats/segments",
          "input.fs.className": "org.apache.pinot.plugin.filesystem.S3PinotFS",
          "input.fs.prop.region": "us-west-2",
          "input.fs.prop.accessKey": "${AWS_ACCESS_KEY}",
          "input.fs.prop.secretKey": "${AWS_SECRET_KEY}",
          "push.mode": "tar"
        }
      ],
      "segmentNameSpec": {},
      "pushSpec": {}
    }
  },
...
}

Sample Configurations

Offline Table

pinot-table-offline.json
"OFFLINE": {
    "tableName": "pinotTable",
    "tableType": "OFFLINE",
    "quota": {
      "maxQueriesPerSecond": 300,
      "storage": "140G"
    },
    "routing": {
      "segmentPrunerTypes": ["partition"],
      "instanceSelectorType": "replicaGroup"
    },
    "segmentsConfig": {
      "schemaName": "pinotTable",
      "timeColumnName": "daysSinceEpoch",
      "timeType": "DAYS",
      "allowNullTimeValue": false,
      "replication": "3",
      "retentionTimeUnit": "DAYS",
      "retentionTimeValue": "365",
      "segmentPushFrequency": "DAILY",
      "segmentPushType": "APPEND"
    },
    "tableIndexConfig": {
      "invertedIndexColumns": ["foo", "bar", "moo"],
      "createInvertedIndexDuringSegmentGeneration": false,
      "sortedColumn": ["pk"],
      "bloomFilterColumns": [],
      "starTreeIndexConfigs": [],
      "noDictionaryColumns": [],
      "rangeIndexColumns": [],
      "onHeapDictionaryColumns": [],
      "varLengthDictionaryColumns": [],
      "segmentPartitionConfig": {
        "pk": {
          "functionName": "Murmur",
          "numPartitions": 32
        }
      },
      "loadMode": "MMAP",
      "columnMinMaxValueGeneratorMode": null,
      "nullHandlingEnabled": false
    },
    "tenants": {
      "broker": "myBrokerTenant",
      "server": "myServerTenant"
    },
    "ingestionConfig": {
      "filterConfig": {
        "filterFunction": "Groovy({foo == \"VALUE1\"}, foo)"
      },
      "transformConfigs": [{
        "columnName": "bar",
        "transformFunction": "lower(moo)"
      },
      {
        "columnName": "hoursSinceEpoch",
        "transformFunction": "toEpochHours(millis)"
      }]
    },
    "metadata": {
      "customConfigs": {
        "key": "value",
        "key": "value"
      }
    }
  }
}

Realtime Table

Here's an example table config for a realtime table. All the fields from the offline table config are valid for the realtime table. Additionally, realtime tables use some extra fields.

pinot-table-realtime.json
"REALTIME": {
    "tableName": "pinotTable",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "schemaName": "pinotTable",
      "timeColumnName": "daysSinceEpoch",
      "timeType": "DAYS",
      "allowNullTimeValue": true,
      "replicasPerPartition": "3",
      "retentionTimeUnit": "DAYS",
      "retentionTimeValue": "5",
      "segmentPushType": "APPEND",
      "completionConfig": {
        "completionMode": "DOWNLOAD"
      }
    },
    "tableIndexConfig": {
      "invertedIndexColumns": ["foo", "bar", "moo"],
      "sortedColumn": ["column1"],
      "noDictionaryColumns": ["metric1", "metric2"],
      "loadMode": "MMAP",
      "nullHandlingEnabled": false,
      "streamConfigs": {
        "realtime.segment.flush.threshold.rows": "0",
        "realtime.segment.flush.threshold.time": "24h",
        "realtime.segment.flush.threshold.segment.size": "150M",
        "stream.kafka.broker.list": "XXXX",
        "stream.kafka.consumer.factory.class.name": "XXXX",
        "stream.kafka.consumer.prop.auto.offset.reset": "largest",
        "stream.kafka.consumer.type": "XXXX",
        "stream.kafka.decoder.class.name": "XXXX",
        "stream.kafka.decoder.prop.schema.registry.rest.url": "XXXX",
        "stream.kafka.decoder.prop.schema.registry.schema.name": "XXXX",
        "stream.kafka.hlc.zk.connect.string": "XXXX",
        "stream.kafka.topic.name": "XXXX",
        "stream.kafka.zk.broker.url": "XXXX",
        "streamType": "kafka"
      }
    },
    "tenants": {
      "broker": "myBrokerTenant",
      "server": "myServerTenant",
      "tagOverrideConfig": {}
    },
    "metadata": {
    }
  }
}
