Ingestion Aggregations

Many data analytics use-cases only need aggregated data. For example, data used in charts can be aggregated down to one row per time bucket per dimension combination.

Aggregating during ingestion results in much less storage and better query performance. It is configured per table via the aggregation config in the table config.

Aggregation Config

The aggregation config controls the aggregations that happen during realtime data ingestion. Offline aggregations must be handled separately.

Below is a description of the config, which is defined in the ingestion config of the table config.

{
  "tableConfig": {
    "tableName": "...",
    "ingestionConfig": {
      "aggregationConfigs": [{
        "columeName": "aggregatedFieldName",
        "aggregationFunction": "<aggregationFunction>(<originalFieldName>)"
      }]
    }
  }
}

Example Scenario

Here is an example of sales data, where only the daily sales aggregates per product are needed.

Example Input Data

{"customerID":205,"product_name": "car","price":"1500.00","timestamp":1571900400000}
{"customerID":206,"product_name": "truck","price":"2200.00","timestamp":1571900400000}
{"customerID":207,"product_name": "car","price":"1300.00","timestamp":1571900400000}
{"customerID":208,"product_name": "truck","price":"700.00","timestamp":1572418800000}
{"customerID":209,"product_name": "car","price":"1100.00","timestamp":1572505200000}
{"customerID":210,"product_name": "car","price":"2100.00","timestamp":1572505200000}
{"customerID":211,"product_name": "truck","price":"800.00","timestamp":1572678000000}
{"customerID":212,"product_name": "car","price":"800.00","timestamp":1572678000000}
{"customerID":213,"product_name": "car","price":"1900.00","timestamp":1572678000000}
{"customerID":214,"product_name": "car","price":"1000.00","timestamp":1572678000000}

Schema

Note that the schema only reflects the final table structure.

{
  "schemaName": "daily_sales_schema",
  "dimensionFieldSpecs": [
    {
      "name": "product_name",
      "dataType": "STRING"
    }
  ],
  "metricSpecs": [
    {
      "name": "sales_count",
      "dataType": "LONG"
    },
    {
      "name": "total_sales",
      "dataType": "DOUBLE"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "daysSinceEpoch",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}

Table Config

{
  "tableName": "daily_sales",
  "ingestionConfig": {
    "transformConfigs": [
      {
        "columnName": "daysSinceEpoch",
        "transformFunction": "toEpochDays(timestamp)"
      }
    ],
    "aggregationConfigs": [
      {
        "columnName": "total_sales",
        "aggregationFunction": "SUM(price)"
      },
      {
        "columnName": "sales_count", 
        "aggregationFunction": "COUNT(*)"
      }
    ]
  }
}

Example Final Table

product_name    sales_count    total_sales    daysSinceEpoch
car             2              2800.00        18193
truck           1              2200.00        18193
truck           1              700.00         18199
car             2              3300.00        18200
truck           1              800.00         18202
car             3              3700.00        18202

Requirements

The following are required for ingestion aggregation to work:

  • The stream ingestion type must be lowLevel.

  • All metrics must have aggregation configs.

  • All metrics must be noDictionaryColumns (see the sketch below).
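
For example, a minimal sketch of the matching index config for the example table, assuming the metric column names from the schema above:

{
  "tableIndexConfig": {
    "noDictionaryColumns": ["sales_count", "total_sales"]
  }
}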

Allowed Aggregation Functions

function name       notes
MAX
MIN
SUM
COUNT               Specify as COUNT(*)
DISTINCTCOUNTHLL    Not available yet, but coming soon
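
As an illustration, several of these functions can be combined in one aggregationConfigs block. This sketch reuses the price column from the example above; the destination column names max_price and min_price are hypothetical:

{
  "aggregationConfigs": [
    {"columnName": "max_price", "aggregationFunction": "MAX(price)"},
    {"columnName": "min_price", "aggregationFunction": "MIN(price)"},
    {"columnName": "total_sales", "aggregationFunction": "SUM(price)"},
    {"columnName": "sales_count", "aggregationFunction": "COUNT(*)"}
  ]
}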

Frequently Asked Questions

Why not use a Startree?

Star-tree indexes can only be added to realtime segments after a segment has sealed, and building them is CPU-intensive. Ingestion aggregation works on consuming segments and requires no additional CPU.

Star-tree indexes also take additional memory to store, while ingestion aggregation stores less data than the original dataset.

When to not use ingestion aggregation?

If the original rows are needed in non-aggregated form, then ingestion aggregation cannot be used.

I already use the aggregateMetrics setting?

The aggregateMetrics setting works the same way as ingestion aggregation, but only supports the SUM function.

The aggregation config is backward compatible with it, so there is no need to change an existing table config unless a different aggregation function is needed.
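
For reference, a rough sketch of the legacy setting, which lives in the tableIndexConfig:

{
  "tableIndexConfig": {
    "aggregateMetrics": true
  }
}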

Does this config work for offline data?

Ingestion aggregation only works for realtime ingestion. For offline data, the offline process needs to generate the aggregates separately.

Why do all metrics need to be aggregated?

If a metric isn't aggregated, then a unique set of dimension values can produce more than one row, and the table is no longer fully aggregated. For example, if price had no aggregation config, the two car sales on day 18193 could not be collapsed into a single row.
