Ingestion Aggregations


Many data analytics use-cases only need aggregated data. For example, data used in charts can be aggregated down to one row per time bucket per dimension combination.

Doing this results in much less storage and better query performance. Configure aggregation for a table via the aggregation config in the table config.

Aggregation Config

The aggregation config controls the aggregations that happen during real-time data ingestion. Offline aggregations must be handled separately.

Below is a description of the config, which is defined in the ingestion config of the table config.

{
  "tableName": "...",
  "ingestionConfig": {
    "aggregationConfigs": [{
      "columnName": "aggregatedFieldName",
      "aggregationFunction": "<aggregationFunction>(<originalFieldName>)"
    }]
  }
}

Requirements

The following are required for ingestion aggregation to work:

  • The stream ingestion type must be lowLevel.

  • All metrics must have aggregation configs.

  • All metrics must be noDictionaryColumns.

  • aggregatedFieldName must exist in the Pinot schema, and originalFieldName must not exist in the Pinot schema.

Example Scenario

Here is an example of sales data, where only the daily sales aggregates per product are needed.

You can also find this example in the RealtimeQuickStart, which creates a table called dailySales.

Example Input Data

{"customerID":205,"product_name": "car","price":1500.00,"timestamp":1571900400000}
{"customerID":206,"product_name": "truck","price":2200.00,"timestamp":1571900400000}
{"customerID":207,"product_name": "car","price":1300.00,"timestamp":1571900400000}
{"customerID":208,"product_name": "truck","price":700.00,"timestamp":1572418800000}
{"customerID":209,"product_name": "car","price":1100.00,"timestamp":1572505200000}
{"customerID":210,"product_name": "car","price":2100.00,"timestamp":1572505200000}
{"customerID":211,"product_name": "truck","price":800.00,"timestamp":1572678000000}
{"customerID":212,"product_name": "car","price":800.00,"timestamp":1572678000000}
{"customerID":213,"product_name": "car","price":1900.00,"timestamp":1572678000000}
{"customerID":214,"product_name": "car","price":1000.00,"timestamp":1572678000000}

Schema

Note that the schema only reflects the final table structure.

{
  "schemaName": "dailySales",
  "dimensionFieldSpecs": [
    {
      "name": "product_name",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "sales_count",
      "dataType": "LONG"
    },
    {
      "name": "total_sales",
      "dataType": "DOUBLE"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "daysSinceEpoch",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}

Table Config

In the aggregation config below, note that price exists only in the input data, while total_sales exists only in the Pinot schema.

{
  "tableName": "dailySales",
  "ingestionConfig": {
    "transformConfigs": [
      {
        "columnName": "daysSinceEpoch",
        "transformFunction": "toEpochDays(\"timestamp\")"
      }
    ],
    "aggregationConfigs": [
      {
        "columnName": "total_sales",
        "aggregationFunction": "SUM(price)"
      },
      {
        "columnName": "sales_count",
        "aggregationFunction": "COUNT(*)"
      }
    ]
  },
  "tableIndexConfig": {
    "noDictionaryColumns": [
      "sales_count",
      "total_sales"
    ]
  }
}

Example Final Table

| product_name | sales_count | total_sales | daysSinceEpoch |
| ------------ | ----------- | ----------- | -------------- |
| car          | 2           | 2800.00     | 18193          |
| truck        | 1           | 2200.00     | 18193          |
| truck        | 1           | 700.00      | 18199          |
| car          | 2           | 3200.00     | 18200          |
| truck        | 1           | 800.00      | 18202          |
| car          | 3           | 3700.00     | 18202          |
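
Aggregation happens within each segment, and a given dimension/time combination can still span multiple segments, so queries should generally re-aggregate at query time. A minimal sketch of such a query against the example table (table and column names taken from the schema above):

SELECT daysSinceEpoch,
       product_name,
       -- re-aggregate across segments at query time
       SUM(sales_count) AS sales_count,
       SUM(total_sales) AS total_sales
FROM dailySales
GROUP BY daysSinceEpoch, product_name
ORDER BY daysSinceEpoch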

Allowed Aggregation Functions

| function name | notes |
| ------------- | ----- |
| MAX | |
| MIN | |
| SUM | |
| COUNT | Specify as COUNT(*). |
| DISTINCTCOUNTHLL | Specify as DISTINCTCOUNTHLL(field, log2m); the default log2m is 12. See the function reference for how to define log2m. It cannot be changed later; a new field must be used. The output field should be BYTES type in the schema. |
| DISTINCTCOUNTHLLPLUS | Specify as DISTINCTCOUNTHLLPLUS(field, s, p). See the function reference for how to define s and p; they cannot be changed later. The output field should be BYTES type in the schema. |
| SUMPRECISION | Specify as SUMPRECISION(field, precision); precision must be defined and is used to compute the maximum possible size of the field. It cannot be changed later; a new field must be used. The output field should be BIG_DECIMAL type in the schema. |
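
As an illustration, a sketch of an aggregation config using DISTINCTCOUNTHLL. The viewer_id input field and unique_viewers output column are hypothetical names used only for this example; the output column must be declared with BYTES type in the schema:

{
  "ingestionConfig": {
    "aggregationConfigs": [
      {
        "columnName": "unique_viewers",
        "aggregationFunction": "DISTINCTCOUNTHLL(viewer_id, 12)"
      }
    ]
  }
}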

Frequently Asked Questions

Why not use a Startree?

Startrees can only be added to real-time segments after the segment has been sealed, and creating startrees is CPU-intensive. Ingestion aggregation works on consuming segments and uses no additional CPU.

Startrees also take additional memory to store, while ingestion aggregation stores less data than the original dataset.

When to not use ingestion aggregation?

If the original rows are needed in non-aggregated form, then ingestion aggregation cannot be used.

I already use the aggregateMetrics setting?

The aggregateMetrics setting works the same as ingestion aggregation, but it only supports the SUM function.

Ingestion aggregation is backward compatible with aggregateMetrics, so there is no need to change your table config unless you need a different aggregation function.
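
For reference, a minimal sketch of the legacy setting, which is a flag in tableIndexConfig:

{
  "tableIndexConfig": {
    "aggregateMetrics": true
  }
}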

Does this config work for offline data?

Ingestion aggregation only works for real-time ingestion; there is no ingestion-time aggregation for offline tables. For offline data, generate the aggregates in the offline flow instead, for example with the Merge/Rollup Task or by pre-aggregating with a batch processing engine such as Spark or MapReduce.

Why do all metrics need to be aggregated?

If a metric isn't aggregated, ingestion will produce more than one row per unique set of dimensions, defeating the goal of one row per time bucket per dimension combination.

Why does no data show up when I enable AggregationConfigs?

  1. Check whether ingestion works without AggregationConfigs, to isolate the problem.

  2. Check the Pinot server log for warnings or errors, especially any related to the class MutableSegmentImpl and the method aggregateMetrics.

  3. For JSON data, make sure numbers are not double-quoted. Quoted numbers are parsed as strings internally, so value-based aggregations such as SUM cannot be applied. Using the example above, ingestion fails for the row {"customerID":205,"product_name": "car","price":"1500.00","timestamp":1571900400000} because price is double-quoted, so the row never shows up. Below is a sample stack trace:

2024/11/04 00:24:27.760 ERROR [RealtimeSegmentDataManager_dailySales__0__0__20241104T0824Z] [dailySales__0__0__20241104T0824Z] Caught exception while indexing the record at offset: 9 , row: {
  "fieldToValueMap" : {
    "price" : "1000.00",
    "daysSinceEpoch" : 18202,
    "sales_count" : 0,
    "total_sales" : 0.0,
    "product_name" : "car",
    "timestamp" : 1572678000000
  },
  "nullValueFields" : [ "sales_count", "total_sales" ]
}
java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Number (java.lang.String and java.lang.Number are in module java.base of loader 'bootstrap')
	at org.apache.pinot.segment.local.aggregator.SumValueAggregator.applyRawValue(SumValueAggregator.java:25) ~[classes/:?]
	at org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl.aggregateMetrics(MutableSegmentImpl.java:855) ~[classes/:?]
	at org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl.index(MutableSegmentImpl.java:577) ~[classes/:?]
	at org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.processStreamEvents(RealtimeSegmentDataManager.java:641) ~[classes/:?]
	at org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.consumeLoop(RealtimeSegmentDataManager.java:477) ~[classes/:?]
	at org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager$PartitionConsumer.run(RealtimeSegmentDataManager.java:734) ~[classes/:?]
	at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
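
Removing the quotes around price (matching the original example input) lets the row ingest and aggregate correctly:

{"customerID":205,"product_name": "car","price":1500.00,"timestamp":1571900400000}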
