Cardinality estimation is a classic problem. Pinot offers several ways to solve it, each with a different trade-off between accuracy and latency.

Exact Results

Functions:

  • DistinctCount(x) -> LONG

Returns the exact count of unique values in a column.

The underlying implementation uses an IntOpenHashSet from the library it.unimi.dsi:fastutil:8.2.3 to hold all the unique values.
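As a rough illustration of why exact counting is costly, the sketch below keeps every unique value in a hash set, so memory grows with the number of distinct values. A Python set stands in for the IntOpenHashSet; the function name and sample data are hypothetical and not part of Pinot.

```python
def exact_distinct_count(values):
    """Count unique values exactly by keeping every one of them in a hash set."""
    seen = set()  # stand-in for the IntOpenHashSet mentioned above (assumption)
    for v in values:
        seen.add(v)
    # The set holds one entry per distinct value, so memory is O(cardinality).
    return len(seen)


user_ids = [7, 3, 7, 9, 3, 3]  # made-up sample column values
print(exact_distinct_count(user_ids))  # 3
```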

Approximate Results

It usually takes a lot of resources and time to compute exact results for unique counting on large datasets. In some circumstances, we can tolerate a certain error rate, in which case we can use approximation functions to tackle this problem.

HyperLogLog

HyperLogLog is an approximation algorithm for unique counting. It uses a fixed number of bits to estimate the cardinality of a given data set.

Pinot leverages the HyperLogLog class in the library com.clearspring.analytics:stream:2.7.0 as the data structure to hold intermediate results.

Functions:

  • DistinctCountHLL(x) -> LONG

For column type INT/LONG/FLOAT/DOUBLE/STRING, Pinot treats each value as an individual entry to add to a HyperLogLog object, and then computes the approximation by calling the method cardinality().

For column type BYTES, Pinot treats each value as a serialized HyperLogLog object with pre-aggregated values inside. The bytes value is generated by org.apache.pinot.core.common.ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.serialize(hyperLogLog).

All deserialized HyperLogLog objects are merged into one, and then the method cardinality() is called to get the approximate unique count.
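The add, merge, and cardinality steps described above can be sketched with a small, self-contained HyperLogLog. This is a textbook-style illustration only, not the stream-lib class Pinot uses: the class name TinyHLL, the SHA-1 based hash, and the precision p=12 are all assumptions chosen for the example.

```python
import hashlib
import math


class TinyHLL:
    """Minimal HyperLogLog (illustrative only, not Pinot's implementation)."""

    def __init__(self, p=12):
        self.p = p                      # number of index bits (assumed value)
        self.m = 1 << p                 # number of registers
        self.registers = [0] * self.m

    def add(self, value):
        # 64-bit hash derived from SHA-1 (an assumption made for this sketch).
        h = int.from_bytes(hashlib.sha1(str(value).encode()).digest()[:8], "big")
        bits = 64 - self.p
        idx = h >> bits                  # first p bits select a register
        rest = h & ((1 << bits) - 1)     # remaining bits
        rank = bits - rest.bit_length() + 1  # leading zeros in `rest`, plus one
        self.registers[idx] = max(self.registers[idx], rank)

    def merge(self, other):
        # Merging two sketches is a register-wise maximum.
        assert self.p == other.p
        self.registers = [max(a, b) for a, b in zip(self.registers, other.registers)]

    def cardinality(self):
        m = self.m
        alpha = 0.7213 / (1 + 1.079 / m)  # bias constant, valid for m >= 128
        estimate = alpha * m * m / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if estimate <= 2.5 * m and zeros:
            estimate = m * math.log(m / zeros)  # small-range (linear counting) correction
        return round(estimate)


# Two partial sketches, as if built on different servers, merged into one.
left, right = TinyHLL(), TinyHLL()
for i in range(10_000):
    left.add(i)
for i in range(10_000, 20_000):
    right.add(i)
left.merge(right)
print(left.cardinality())  # an estimate close to 20000
```

Because merging is a register-wise maximum, combining partial sketches gives the same registers as adding every value to a single sketch, which is what makes pre-aggregated BYTES columns mergeable.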

HyperLogLogPlusPlus

  • A 64-bit hash function is used instead of the 32-bit hash used in the original paper. This reduces hash collisions for large cardinalities, which allows the large range correction to be removed.

  • Some bias is found for small cardinalities when switching from linear counting to HLL counting. An empirical bias correction is proposed to mitigate the problem.

  • A sparse representation of the registers is implemented to reduce memory requirements for small cardinalities. It can later be transformed into a dense representation if the cardinality grows.

Functions:

  • DistinctCountHLLPlus(<HllPlusColumn>) -> LONG

  • DistinctCountHLLPlus(<HllPlusColumn>, <p>) -> LONG

  • DistinctCountHLLPlus(<HllPlusColumn>, <p>, <sp>) -> LONG

For column type INT/LONG/FLOAT/DOUBLE/STRING, Pinot treats each value as an individual entry to add to a HyperLogLogPlus object, and then computes the approximation by calling the method cardinality().

For column type BYTES, Pinot treats each value as a serialized HyperLogLogPlus object with pre-aggregated values inside. The bytes value is generated by org.apache.pinot.core.common.ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(hyperLogLogPlus).

All deserialized HyperLogLogPlus objects are merged into one, and then the method cardinality() is called to get the approximate unique count.
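To get a feel for the precision argument, the sketch below applies the usual HyperLogLog rule of thumb: a precision of p bits gives 2^p registers and a relative standard error of roughly 1.04 / sqrt(2^p). It assumes that <p> is the precision of the dense representation in this standard sense; the helper names are hypothetical, and the figures are approximations rather than guarantees from Pinot.

```python
import math


def hll_register_count(p):
    """Number of registers for precision p (assumed standard HLL meaning of p)."""
    return 1 << p


def hll_standard_error(p):
    """Approximate relative standard error of a HyperLogLog with 2**p registers."""
    return 1.04 / math.sqrt(hll_register_count(p))


# Higher precision means more registers (more memory) and a smaller error.
for p in (10, 12, 14):
    print(p, hll_register_count(p), f"{hll_standard_error(p):.4f}")
```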

Theta Sketches

Functions:

  • DistinctCountThetaSketch(<thetaSketchColumn>, <thetaSketchParams>, predicate1, predicate2..., postAggregationExpressionToEvaluate) -> LONG

    • thetaSketchColumn (required): Name of the column to aggregate on.

    • thetaSketchParams (required): Parameters for constructing the intermediate theta-sketches. Currently, the only supported parameter is nominalEntries.

    • predicates (optional): Individual predicates of the form lhs <op> rhs, which are applied to the rows selected by the where clause. During intermediate sketch aggregation, sketches from the thetaSketchColumn that satisfy these predicates are unionized individually. For example, all filtered rows that match country=USA are unionized into a single sketch. Complex predicates created by combining individual predicates with AND/OR are supported.

    • postAggregationExpressionToEvaluate (required): The set operation to perform on the individual intermediate sketches for each of the predicates. Currently supported operations are SET_DIFF, SET_UNION, and SET_INTERSECT, where SET_DIFF requires two arguments and SET_UNION/SET_INTERSECT allow more than two arguments.

In the example query below, the where clause is responsible for identifying the matching rows. Note that the where clause can be completely independent of the postAggregationExpression. Once matching rows are identified, each server unionizes all the sketches that match the individual predicates, i.e. country='USA' and device='mobile' in this case. Once the broker receives the intermediate sketches for each of these individual predicates from all servers, it performs the final aggregation by evaluating the postAggregationExpression and returns the final cardinality of the resulting sketch.

  • DistinctCountRawThetaSketch(<thetaSketchColumn>, <thetaSketchParams>, predicate1, predicate2..., postAggregationExpressionToEvaluate) -> HexEncoded Serialized Sketch Bytes

This is the same as the previous function, except that it returns the serialized sketch bytes instead of the cardinality estimate. Since Pinot returns responses as JSON strings, bytes are returned as hex-encoded strings. The hex-encoded string can be deserialized into a sketch by using the library org.apache.commons.codec.binary, as Hex.decodeHex(stringValue.toCharArray()).
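The server and broker steps described for DistinctCountThetaSketch can be mimicked with exact Python sets standing in for theta sketches. This is only a model of the evaluation order: real sketches give estimates, and the rows, the two-server split, and all function names below are made up for illustration.

```python
# Each row: (country, state, device, user_id). A set of user ids stands in
# for a theta sketch, so results here are exact rather than estimated.
rows = [
    ("USA", "CA", "mobile", "u1"),
    ("USA", "CA", "desktop", "u2"),
    ("USA", "NY", "mobile", "u3"),
    ("CAN", "ON", "mobile", "u1"),
    ("CAN", "ON", "desktop", "u4"),
]


def where_clause(row):
    """WHERE country = 'USA' OR device = 'mobile' (selects the candidate rows)."""
    return row[0] == "USA" or row[2] == "mobile"


predicates = [
    lambda r: r[0] == "USA" and r[1] == "CA",  # $1
    lambda r: r[2] == "mobile",                # $2
]


def server_side(server_rows):
    """Union matching rows into one intermediate 'sketch' per predicate."""
    sketches = [set() for _ in predicates]
    for row in filter(where_clause, server_rows):
        for sketch, predicate in zip(sketches, predicates):
            if predicate(row):
                sketch.add(row[3])
    return sketches


def broker_side(per_server_sketches):
    """Merge per-predicate sketches from all servers, then apply the set operation."""
    merged = [set() for _ in predicates]
    for sketches in per_server_sketches:
        for target, sketch in zip(merged, sketches):
            target |= sketch
    return len(merged[0] & merged[1])  # SET_INTERSECT($1, $2)


# Pretend the table is spread over two servers.
print(broker_side([server_side(rows[:3]), server_side(rows[3:])]))  # 1
```

Only u1 is both a USA/CA user and a mobile user, so the intersection has one member. The last row never reaches a predicate because the where clause filters it out first.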

Tuple Sketches

Functions:

  • avgValueIntegerSumTupleSketch(<tupleSketchColumn>, <tupleSketchLgK>) -> LONG

    • tupleSketchColumn (required): Name of the column to aggregate on.

    • tupleSketchLgK (optional): lgK, the log2 of K, which controls both the size and accuracy of the sketch.

This function can be used to combine the summary values from the random sample stored within the Tuple sketch and formulate an estimate for an average that applies to the entire dataset. The average should be interpreted as applying to each key tracked by the sketch and is rounded to the nearest whole number.

  • distinctCountTupleSketch(<tupleSketchColumn>, <tupleSketchLgK>) -> LONG

    • tupleSketchColumn (required): Name of the column to aggregate on.

    • tupleSketchLgK (optional): lgK, the log2 of K, which controls both the size and accuracy of the sketch.

This returns the cardinality estimate for a column where the values are already encoded as Tuple sketches, stored as BYTES.

  • distinctCountRawIntegerSumTupleSketch(<tupleSketchColumn>, <tupleSketchLgK>) -> HexEncoded Serialized Sketch Bytes

This is the same as the previous function, except that it returns the serialized sketch bytes instead of the cardinality estimate. Since Pinot returns responses as JSON strings, bytes are returned as hex-encoded strings. The hex-encoded string can be deserialized into a sketch by using the library org.apache.commons.codec.binary, as Hex.decodeHex(stringValue.toCharArray()).

  • sumValuesIntegerSumTupleSketch(<tupleSketchColumn>, <tupleSketchLgK>) -> LONG

    • tupleSketchColumn (required): Name of the column to aggregate on.

    • tupleSketchLgK (optional): lgK, the log2 of K, which controls both the size and accuracy of the sketch.

This function can be used to combine the summary values (using sum) from the random sample stored within the Tuple sketch and formulate an estimate that applies to the entire dataset. See avgValueIntegerSumTupleSketch for extracting an average for integer summaries. If other merging options are required, it is best to extract the raw sketches directly or to implement a new Pinot aggregation function to support these.
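The idea behind the sum and average estimates can be illustrated with a deliberately simplified model: keep a key only if its hash falls below a fixed threshold theta, sum the integer values of retained keys, and scale the totals by 1/theta. The real Tuple sketch adapts its threshold to keep about K entries, so the class below, its fixed theta, the SHA-1 hash, and the sample data are all assumptions for illustration and not the DataSketches implementation.

```python
import hashlib


def unit_hash(key):
    """Map a key to a pseudo-random float in [0, 1) (SHA-1 based, an assumption)."""
    h = int.from_bytes(hashlib.sha1(str(key).encode()).digest()[:8], "big")
    return h / 2**64


class TinyIntegerSumTupleSketch:
    """Fixed-threshold sample of keys, each carrying an integer sum summary."""

    def __init__(self, theta=0.25):
        self.theta = theta     # fraction of the hash space that is retained
        self.summaries = {}    # retained key -> running integer sum

    def update(self, key, value):
        if unit_hash(key) < self.theta:
            self.summaries[key] = self.summaries.get(key, 0) + value

    def distinct_count_estimate(self):
        return len(self.summaries) / self.theta

    def sum_estimate(self):
        # Scale the sampled total up to the whole dataset.
        return sum(self.summaries.values()) / self.theta

    def avg_estimate(self):
        # Average summary value per key, rounded to the nearest whole number.
        if not self.summaries:
            return 0
        return round(sum(self.summaries.values()) / len(self.summaries))


sketch = TinyIntegerSumTupleSketch()
for uid in range(20_000):
    sketch.update(f"user-{uid}", (uid % 5) + 1)  # 1 to 5 impressions per user

print(sketch.distinct_count_estimate())  # close to 20000
print(sketch.sum_estimate())             # close to 60000
print(sketch.avg_estimate())             # about 3 impressions per user
```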

Compressed Probability Counting (CPC) Sketches

Functions:

  • distinctCountCpcSketch(<cpcSketchColumn>, <cpcSketchLgK>) -> LONG

    • cpcSketchColumn (required): Name of the column to aggregate on.

    • cpcSketchLgK (optional): lgK, the log2 of K, which controls both the size and accuracy of the sketch.

This returns the cardinality estimate for a column.

  • distinctCountRawCpcSketch(<cpcSketchColumn>, <cpcSketchLgK>) -> HexEncoded Serialized Sketch Bytes

    • cpcSketchColumn (required): Name of the column to aggregate on.

    • cpcSketchLgK (optional): lgK, the log2 of K, which controls both the size and accuracy of the sketch.

This is the same as the previous function, except that it returns the serialized sketch bytes instead of the cardinality estimate. Since Pinot returns responses as JSON strings, bytes are returned as hex-encoded strings. The hex-encoded string can be deserialized into a sketch by using the library org.apache.commons.codec.binary, as Hex.decodeHex(stringValue.toCharArray()).
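For clients that are not written in Java, the first step of handling a raw sketch result is the same hex decoding. The snippet below is a minimal Python counterpart of Hex.decodeHex using only the standard library. The hex value is a made-up placeholder, not a real sketch, and turning the bytes into a sketch object would still require a DataSketches-compatible library, which is not shown.

```python
def decode_hex_sketch(hex_string):
    """Turn the hex-encoded string from a query response back into raw bytes."""
    return bytes.fromhex(hex_string)


raw = decode_hex_sketch("0a1bff")  # placeholder payload, not a real sketch
print(len(raw), raw[0], raw[-1])   # 3 10 255
```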

UltraLogLog (ULL) Sketches

Functions:

  • distinctCountULL(<ullSketchColumn>, <ullSketchPrecision>) -> LONG

    • ullSketchColumn (required): Name of the column to aggregate on.

    • ullSketchPrecision (optional): p, the precision parameter, which controls both the size and accuracy of the sketch.

This returns the cardinality estimate for a column.

  • distinctCountRawULL(<ullSketchColumn>, <ullSketchPrecision>) -> HexEncoded Serialized Sketch Bytes

    • ullSketchColumn (required): Name of the column to aggregate on.

    • ullSketchPrecision (optional): p, the precision parameter, which controls both the size and accuracy of the sketch.

This is the same as the previous function, except that it returns the serialized sketch bytes instead of the cardinality estimate. Since Pinot returns responses as JSON strings, bytes are returned as hex-encoded strings. The hex-encoded string can be deserialized into a sketch by using the library org.apache.commons.codec.binary, as Hex.decodeHex(stringValue.toCharArray()).

The HyperLogLog++ algorithm proposes several improvements to the HyperLogLog algorithm to reduce memory requirements and increase accuracy in some ranges of cardinalities.

Pinot leverages the HyperLogLogPlus class in the library com.clearspring.analytics:stream:2.7.0 as the data structure to hold intermediate results.

The Theta Sketch framework enables set operations over a stream of data, and can also be used for cardinality estimation. Pinot leverages the Sketch class and its extensions from the library org.apache.datasketches:datasketches-java:4.2.0 to perform distinct counting as well as to evaluate set operations.

The Tuple Sketch is an extension of the Theta Sketch. Tuple sketches store an additional summary value with each retained entry, which makes the sketch ideal for summarizing attributes such as impressions or clicks. Tuple sketches are interoperable with the Theta Sketch and enable set operations over a stream of data, and can also be used for cardinality estimation.

The Compressed Probability Counting (CPC) Sketch enables extremely space-efficient cardinality estimation. The stored CPC sketch can consume about 40% less space than an HLL sketch of comparable accuracy. Pinot can aggregate multiple existing CPC sketches together to get a total distinct count, or estimate the count directly from raw values.

The UltraLogLog Sketch from Dynatrace is a variant of HyperLogLog and is used for approximate distinct counts. The UltraLogLog sketch shares many of the properties of a typical HyperLogLog sketch but requires less space and also provides a simpler and faster estimator.

Pinot uses a production-ready Java implementation available in Hash4j, which is released under the Apache license.

select distinctCountThetaSketch(
  sketchCol,
  'nominalEntries=1024',
  'country = ''USA'' AND state = ''CA''',
  'device = ''mobile''',
  'SET_INTERSECT($1, $2)'
)
from table
where country = 'USA' or device = 'mobile...'
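When such a query is built programmatically, the quoting is easy to get wrong. The helper below is a small sketch that assumes the predicate and post-aggregation arguments are passed as SQL string literals, so single quotes inside them are doubled in the usual SQL way. The function name and the way the call is assembled are hypothetical.

```python
def sql_string_literal(text):
    """Wrap text in single quotes, doubling any embedded single quotes."""
    return "'" + text.replace("'", "''") + "'"


predicates = ["country = 'USA' AND state = 'CA'", "device = 'mobile'"]

args = ["sketchCol", sql_string_literal("nominalEntries=1024")]
args += [sql_string_literal(p) for p in predicates]
args.append(sql_string_literal("SET_INTERSECT($1, $2)"))

# Assemble the aggregation call; the column name sketchCol comes from the query above.
call = "distinctCountThetaSketch(" + ", ".join(args) + ")"
print(call)
```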
Cardinality Estimation
