Optimizing joins

Tips and tricks that can be used to optimize joins

Read the join operator page for a detailed explanation of how joins are implemented.

The order of input relations matters

Apache Pinot does not rely on table statistics to optimize the join order. Instead, it prioritizes the input relations from right to left (based on the order of the tables in the SQL query). The right relation is fully consumed to create an in-memory hash table and may be broadcast to all workers. Joining a large table with a small one is less expensive than the other way around, so it is important to specify the smaller relation as the right input.

Here, left means the first relation in the explain plan and right means the second one. In SQL, when two tables are joined, the left relation is the first one specified and the right relation is the second. This gets more complicated when three or more tables are joined, so it is strongly recommended to use the explain plan to confirm which input is left and which is right.

For example, this query:

select largeTable.col1, smallTable.col2
from largeTable 
cross join smallTable

is more efficient than:

select largeTable.col1, smallTable.col2
from smallTable 
cross join largeTable
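
The cost difference can be illustrated with a small conceptual model. The sketch below is not Pinot's implementation: `hash_join` and the sample rows are hypothetical names made up for illustration. It only assumes what the text above states, namely that the right input is fully materialized into a hash table while the left input is streamed.

```python
from collections import defaultdict


def hash_join(left_rows, right_rows, key):
    """Conceptual hash join: build on the right input, probe with the left."""
    # Build phase: the whole right input is held in memory.
    table = defaultdict(list)
    for row in right_rows:
        table[row[key]].append(row)
    rows_in_memory = sum(len(bucket) for bucket in table.values())

    # Probe phase: the left input is streamed one row at a time.
    joined = []
    for row in left_rows:
        for match in table.get(row[key], []):
            joined.append({**row, **match})
    return joined, rows_in_memory


# Hypothetical data: a 1000-row "large" input and a 3-row "small" input.
large = [{"id": i % 3, "col1": i} for i in range(1000)]
small = [{"id": i, "col2": f"v{i}"} for i in range(3)]

rows_a, built_with_small_right = hash_join(large, small, "id")
rows_b, built_with_large_right = hash_join(small, large, "id")

# Same number of joined rows, very different build-side memory.
print(len(rows_a), built_with_small_right)  # 1000 3
print(len(rows_b), built_with_large_right)  # 1000 1000
```

Both calls produce the same number of joined rows, but the second one keeps the entire large input in memory, which is the situation the recommendation above is meant to avoid.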

Predicate push-down

Usually it is faster to filter data before joining it. Pinot automatically pushes down predicates to the individual tables before joining them when it can prove the change doesn't break semantics.

For example, consider the following query:

SELECT customer.c_address, orders.o_shippriority
FROM customer
JOIN orders
    ON customer.c_custkey = orders.o_custkey
WHERE customer.c_nationkey = 1

Pinot automatically transforms it into:

SELECT customer.c_address, orders.o_shippriority
FROM (SELECT * FROM customer WHERE c_nationkey = 1) AS customer
JOIN orders
    ON customer.c_custkey = orders.o_custkey

This optimization not only reduces the amount of data that needs to be shuffled and joined but also opens the possibility of using indexes to speed up the query.

Remember that sometimes the predicate push-down is not possible. One example is when one of the inputs is a subquery with a limit like:

SELECT customer.c_address, orders.o_shippriority
FROM (select * from customer LIMIT 10) as customer
JOIN orders
    ON customer.c_custkey = orders.o_custkey
WHERE customer.c_nationkey = 1

In this case, although Pinot will push the predicate down so that it is applied to the output of the subquery, it cannot push it into the table scan inside the subquery, because that would change which rows the original limit selects.

Therefore, the final query will be:

SELECT customer.c_address, orders.o_shippriority
FROM (select * from 
        (select * from customer LIMIT 10) as temp WHERE temp.c_nationkey = 1
     ) as customer
JOIN orders
    ON customer.c_custkey = orders.o_custkey

This new query is equivalent to the original one and reduces the amount of data that needs to be shuffled and joined, but it cannot use indexes to speed up the query. If you want the filter to be applied before the limit, rewrite the query as:

SELECT customer.c_address, orders.o_shippriority
FROM (select * from customer WHERE c_nationkey = 1 LIMIT 10) as customer
JOIN orders
    ON customer.c_custkey = orders.o_custkey

This optimization is easy to see in the explain plan, where the filter operator appears under one of the sides of the join.
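
To see why a filter cannot be moved below a limit, the following sketch runs both forms against a tiny in-memory table. It uses Python's built-in SQLite as a stand-in for generic SQL semantics, not Pinot itself; the sample rows and the `ORDER BY` inside the subqueries (added only to make the limit deterministic) are assumptions for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customer (c_custkey INTEGER, c_nationkey INTEGER)")
conn.executemany(
    "INSERT INTO customer VALUES (?, ?)",
    [(1, 2), (2, 2), (3, 1), (4, 1), (5, 1)],
)

# Filter applied to the output of the limited subquery.
# This is the semantics that predicate push-down has to preserve.
filter_after_limit = conn.execute(
    """
    SELECT c_custkey
    FROM (SELECT * FROM customer ORDER BY c_custkey LIMIT 3) AS temp
    WHERE temp.c_nationkey = 1
    """
).fetchall()

# Filter applied before the limit: a different query with a different result.
filter_before_limit = conn.execute(
    """
    SELECT c_custkey
    FROM (SELECT * FROM customer
          WHERE c_nationkey = 1
          ORDER BY c_custkey LIMIT 3) AS temp
    ORDER BY c_custkey
    """
).fetchall()

print(filter_after_limit)   # [(3,)]
print(filter_before_limit)  # [(3,), (4,), (5,)]
```

The two results differ, so an optimizer can only move the filter below the limit if the query author asks for it explicitly.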

Optimizing semi-join to use indexes

A semi-join is a special case of join whose result is not the combined rows of both tables but only the rows of the first table that have a match in the second table.

Queries using semi-joins are usually not written as such but as a query with a subquery in the WHERE clause like:

SELECT customer.c_address, customer.c_nationkey
FROM customer
WHERE EXISTS (SELECT 1 FROM orders WHERE customer.c_custkey = orders.o_custkey)

Or

SELECT customer.c_address, customer.c_nationkey
FROM customer
WHERE c_custkey IN (SELECT o_custkey FROM orders)

To use indexes, Pinot needs to know the actual values returned by the subquery at optimization time. Therefore, Pinot internally executes the subquery first and then replaces the subquery in the main query with the values it returned.

For example, if the subquery in the previous example returns the values 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, the query is transformed into:

SELECT customer.c_address, customer.c_nationkey
FROM customer
WHERE customer.c_custkey IN (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

This query can then be optimized using indexes.

Currently, this optimization cannot be seen in the Pinot explain plan.
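
The two-step idea described above (run the subquery, then inline its values) can be mimicked by hand. The sketch below uses Python's built-in SQLite purely as a stand-in; it does not show Pinot internals, and the sample data and variable names are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE customer (c_custkey INTEGER, c_address TEXT);
    CREATE TABLE orders (o_custkey INTEGER);
    INSERT INTO customer VALUES (1, 'a'), (2, 'b'), (3, 'c');
    INSERT INTO orders VALUES (1), (3), (3);
    """
)

# Step 1: run the subquery on its own to obtain the concrete values.
keys = [
    row[0]
    for row in conn.execute(
        "SELECT DISTINCT o_custkey FROM orders ORDER BY o_custkey"
    )
]

# Step 2: substitute those values into the main query as a literal IN list.
placeholders = ", ".join("?" for _ in keys)
rewritten_sql = (
    "SELECT c_custkey, c_address FROM customer "
    f"WHERE c_custkey IN ({placeholders}) ORDER BY c_custkey"
)
rewritten = conn.execute(rewritten_sql, keys).fetchall()

# The original form, for comparison.
original = conn.execute(
    "SELECT c_custkey, c_address FROM customer "
    "WHERE c_custkey IN (SELECT o_custkey FROM orders) ORDER BY c_custkey"
).fetchall()

print(keys)       # [1, 3]
print(rewritten)  # [(1, 'a'), (3, 'c')]
```

Both forms return the same rows; the rewritten one contains only literal values, which is the shape a filter needs to have before an index lookup can serve it.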

Rewriting joins as semi-joins

Sometimes, inner or left joins can be converted into semi-joins. Specifically, the requirements are:

  1. Only columns from the left side are projected.

  2. The join condition must be an equality.

  3. The rows on the right side must be unique on the join columns.

For example, the following two queries are equivalent:

SELECT customer.c_address, customer.c_nationkey
FROM customer
JOIN (SELECT DISTINCT o_custkey FROM orders) AS distinct_orders
    ON customer.c_custkey = distinct_orders.o_custkey

SELECT customer.c_address, customer.c_nationkey
FROM customer
WHERE c_custkey IN (SELECT o_custkey FROM orders)

They would not be equivalent if orders were used directly instead of distinct_orders. This is because a join repeats rows from the left side whenever the right side contains repeated matches, while a semi-join never repeats rows.

Pinot applies this optimization automatically if the three conditions explained above are fulfilled. Given that columns in Pinot cannot be marked as unique, the only way to indicate to Pinot that the right-hand side is unique is to apply a SQL expression that guarantees it, for example DISTINCT or GROUP BY on the columns used in the join condition. Sometimes it is easier to rewrite the original SQL and substitute the JOIN with a WHERE EXISTS.
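
The difference between a plain join and a semi-join is easy to check on a tiny data set. The sketch below uses Python's built-in SQLite as a stand-in for generic SQL semantics rather than Pinot; the table contents and the `run` helper are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE customer (c_custkey INTEGER, c_address TEXT);
    CREATE TABLE orders (o_orderkey INTEGER, o_custkey INTEGER);
    INSERT INTO customer VALUES (1, 'addr1'), (2, 'addr2'), (3, 'addr3');
    -- Customer 1 has two orders, customer 3 has none.
    INSERT INTO orders VALUES (10, 1), (11, 1), (12, 2);
    """
)


def run(sql):
    """Execute a query and return all rows (illustrative helper)."""
    return conn.execute(sql).fetchall()


plain_join = run(
    """
    SELECT customer.c_custkey FROM customer
    JOIN orders ON customer.c_custkey = orders.o_custkey
    ORDER BY customer.c_custkey
    """
)
distinct_join = run(
    """
    SELECT customer.c_custkey FROM customer
    JOIN (SELECT DISTINCT o_custkey FROM orders) AS distinct_orders
        ON customer.c_custkey = distinct_orders.o_custkey
    ORDER BY customer.c_custkey
    """
)
semi_join = run(
    """
    SELECT c_custkey FROM customer
    WHERE c_custkey IN (SELECT o_custkey FROM orders)
    ORDER BY c_custkey
    """
)

print(plain_join)     # [(1,), (1,), (2,)]
print(distinct_join)  # [(1,), (2,)]
print(semi_join)      # [(1,), (2,)]
```

Customer 1 appears twice in the plain join because it has two orders, while the DISTINCT join and the IN form return it once, which is why uniqueness of the right side is required for the rewrite.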

Reduce data shuffle

Pinot supports different types of join strategies. It is important to understand them and, when possible, to use the one that shuffles the least data, because data shuffle is expensive and can become a bottleneck for query performance. Remember to use stageStats (especially those of the mailbox send and mailbox receive operators) and the different explain plan modes to understand how your data is being shuffled.