Cardinality Estimation

Cardinality estimation is a classic problem. Pinot solves it with multiple ways each of which has a trade-off between accuracy and latency.

Exact Results

Functions:

  • DistinctCount(x) -> LONG

Returns accurate count for all unique values in a column.

The underlying implementation is using a IntOpenHashSet in library: it.unimi.dsi:fastutil:8.2.3 to hold all the unique values.

Approximate Results

It usually takes a lot of resources and time to compute exact results for unique counting on large datasets. In some circumstances, we can tolerate a certain error rate, in which case we can use approximation functions to tackle this problem.

HyperLogLog

HyperLogLog is an approximation algorithm for unique counting. It uses fixed number of bits to estimate the cardinality of given data set.

Pinot leverages HyperLogLog Class in library com.clearspring.analytics:stream:2.7.0as the data structure to hold intermediate results.

Functions:

  • DistinctCountHLL(x)_ -> LONG_

For column type INT/LONG/FLOAT/DOUBLE/STRING , Pinot treats each value as an individual entry to add into HyperLogLog Object, then compute the approximation by calling method cardinality().

For column type BYTES, Pinot treats each value as a serialized HyperLogLog Object with pre-aggregated values inside. The bytes value is generated by org.apache.pinot.core.common.ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.serialize(hyperLogLog).

All deserialized HyperLogLog object will be merged into one then calling method **cardinality() **to get the approximated unique count.

Theta Sketches

The Theta Sketch framework enables set operations over a stream of data, and can also be used for cardinality estimation. Pinot leverages the Sketch Class and its extensions from the library org.apache.datasketches:datasketches-java:4.2.0 to perform distinct counting as well as evaluating set operations.

Functions:

  • DistinctCountThetaSketch(<thetaSketchColumn>, <thetaSketchParams>, predicate1, predicate2..., postAggregationExpressionToEvaluate**) **-> LONG

    • thetaSketchColumn (required): Name of the column to aggregate on.

    • thetaSketchParams (required): Parameters for constructing the intermediate theta-sketches. Currently, the only supported parameter is nominalEntries.

    • predicates (optional)_: _ These are individual predicates of form lhs <op> rhs which are applied on rows selected by the where clause. During intermediate sketch aggregation, sketches from the thetaSketchColumn that satisfies these predicates are unionized individually. For example, all filtered rows that match country=USA are unionized into a single sketch. Complex predicates that are created by combining (AND/OR) of individual predicates is supported.

    • postAggregationExpressionToEvaluate (required): The set operation to perform on the individual intermediate sketches for each of the predicates. Currently supported operations are SET_DIFF, SET_UNION, SET_INTERSECT , where DIFF requires two arguments and the UNION/INTERSECT allow more than two arguments.

In the example query below, the where clause is responsible for identifying the matching rows. Note, the where clause can be completely independent of the postAggregationExpression. Once matching rows are identified, each server unionizes all the sketches that match the individual predicates, i.e. country='USA' , device='mobile' in this case. Once the broker receives the intermediate sketches for each of these individual predicates from all servers, it performs the final aggregation by evaluating the postAggregationExpression and returns the final cardinality of the resulting sketch.

select distinctCountThetaSketch(
  sketchCol, 
  'nominalEntries=1024', 
  'country'=''USA'' AND 'state'=''CA'', 'device'=''mobile'', 'SET_INTERSECT($1, $2)'
) 
from table 
where country = 'USA' or device = 'mobile...' 
  • DistinctCountRawThetaSketch(<thetaSketchColumn>, <thetaSketchParams>, predicate1, predicate2..., postAggregationExpressionToEvaluate**)** -> HexEncoded Serialized Sketch Bytes

This is the same as the previous function, except it returns the byte serialized sketch instead of the cardinality sketch. Since Pinot returns responses as JSON strings, bytes are returned as hex encoded strings. The hex encoded string can be deserialized into sketch by using the library org.apache.commons.codec.binaryas Hex.decodeHex(stringValue.toCharArray()).

Tuple Sketches

The Tuple Sketch is an extension of the Theta Sketch. Tuple sketches store an additional summary value with each retained entry which makes the sketch ideal for summarizing attributes such as impressions or clicks. Tuple sketches are interoperable with the Theta Sketch and enable set operations over a stream of data, and can also be used for cardinality estimation.

Functions:

  • avgValueIntegerSumTupleSketch(<tupleSketchColumn>, <tupleSketchLgK>**) -> Long

    • tupleSketchColumn (required): Name of the column to aggregate on.

    • tupleSketchLgK (optional): lgK which is the the log2 of K, which controls both the size and accuracy of the sketch.

This function can be used to combine the summary values from the random sample stored within the Tuple sketch and formulate an estimate for an average that applies to the entire dataset. The average should be interpreted as applying to each key tracked by the sketch and is rounded to the nearest whole number.

  • distinctCountTupleSketch(<tupleSketchColumn>, <tupleSketchLgK>**) -> LONG

    • tupleSketchColumn (required): Name of the column to aggregate on.

    • tupleSketchLgK (optional): lgK which is the the log2 of K, which controls both the size and accuracy of the sketch.

This returns the cardinality estimate for a column where the values are already encoded as Tuple sketches, stored as BYTES.

  • distinctCountRawIntegerSumTupleSketch(<tupleSketchColumn>, <tupleSketchLgK>**) -> HexEncoded Serialized Sketch Bytes

This is the same as the previous function, except it returns the byte serialized sketch instead of the cardinality sketch. Since Pinot returns responses as JSON strings, bytes are returned as hex encoded strings. The hex encoded string can be deserialized into sketch by using the library org.apache.commons.codec.binaryas Hex.decodeHex(stringValue.toCharArray()).

  • sumValueIntegerSumTupleSketch(<tupleSketchColumn>, <tupleSketchLgK>**) -> Long

    • tupleSketchColumn (required): Name of the column to aggregate on.

    • tupleSketchLgK (optional): lgK which is the the log2 of K, which controls both the size and accuracy of the sketch.

This function can be used to combine the summary values (using sum) from the random sample stored within the Tuple sketch and formulate an estimate that applies to the entire dataset. See avgValueIntegerSumTupleSketch for extracting an average for integer summaries. If other merging options are required, it is best to extract the raw sketches directly or to implement a new Pinot aggregation function to support these.

Compressed Probability Counting (CPC) Sketches

The Compressed Probability Counting(CPC) Sketch enables extremely space-efficient cardinality estimation. The stored CPC sketch can consume about 40% less space than an HLL sketch of comparable accuracy. Pinot can aggregate multiple existing CPC sketches together to get a total distinct count or estimated directly from raw values.

Functions:

  • distinctCountCpcSketch(<cpcSketchColumn>, <cpcSketchLgK>**) -> Long

    • cpcSketchColumn (required): Name of the column to aggregate on.

    • cpcSketchLgK (optional): lgK which is the the log2 of K, which controls both the size and accuracy of the sketch.

This returns the cardinality estimate for a column.

  • distinctCountRawCpcSketch(<cpcSketchColumn>, <cpcSketchLgK>**) -> HexEncoded Serialized Sketch Bytes

    • cpcSketchColumn (required): Name of the column to aggregate on.

    • cpcSketchLgK (optional): lgK which is the the log2 of K, which controls both the size and accuracy of the sketch.

This is the same as the previous function, except it returns the byte serialized sketch instead of the cardinality sketch. Since Pinot returns responses as JSON strings, bytes are returned as hex encoded strings. The hex encoded string can be deserialized into sketch by using the library org.apache.commons.codec.binaryas Hex.decodeHex(stringValue.toCharArray()).

UltraLogLog (ULL) Sketches

The UltraLogLog Sketch from Dynatrace is a variant of HyperLogLog and is used for approximate distinct counts. The UltraLogLog sketch shares many of the same properties of a typical HyperLogLog sketch but requires less space and also provides a simpler and faster estimator.

Pinot uses an production-ready Java implementation available in Hash4j available under the Apache license.

Functions:

  • distinctCountULL(<ullSketchColumn>, <ullSketchPrecision>**) -> Long

    • ullSketchColumn (required): Name of the column to aggregate on.

    • ullSketchPrecision (optional): p which is the precision parameter, which controls both the size and accuracy of the sketch.

This returns the cardinality estimate for a column.

  • distinctCountRawULL(<cpcSketchColumn>, <ullSketchPrecision>**) -> HexEncoded Serialized Sketch Bytes

    • ullSketchColumn (required): Name of the column to aggregate on.

    • ullSketchPrecision (optional): p which is the precision parameter, which controls both the size and accuracy of the sketch.

This is the same as the previous function, except it returns the byte serialized sketch instead of the cardinality sketch. Since Pinot returns responses as JSON strings, bytes are returned as hex encoded strings. The hex encoded string can be deserialized into sketch by using the library org.apache.commons.codec.binaryas Hex.decodeHex(stringValue.toCharArray()).