Join

Describes the hash join relation operator in the multi-stage query engine.


The hash join operator joins two relations using a hash join algorithm. It is a binary operator that takes two inputs, the left and right relations, and produces a single output relation.

This is the only join operator in the multi-stage query engine: it is created whenever a query contains a JOIN clause, but it can also be created by other SQL constructs, such as a semi-join expressed with an IN subquery.

There are different types of joins that can be performed using the hash join operator. Apache Pinot supports the following (a short sketch follows the list):

  • Inner join, where only the rows that have a match in both relations are returned.

  • Left join, where all the rows from the left relation are returned. Rows that have a match in the right relation are returned together with the matching columns from the right relation, while rows without a match are returned with null values for the columns from the right relation.

  • Right join, like the left join but returning all the rows from the right relation, with the columns from the left relation filled with null values for the rows that do not have a match.

  • Full outer join, where all the rows from both relations are returned. If a row from any relation does not have a match in the other relation, the columns from the other relation are filled with null values.

  • Semi-join, where only the rows from the left relation that have a match in the right relation are returned. This is useful to filter the rows from the left relation based on the existence of a match in the right relation.

  • Anti-join, where only the rows from the left relation that do not have a match in the right relation are returned.
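As a minimal sketch, assuming two hypothetical tables orders (left) and customers (right) joined on customerId, a left join and a semi-join could look like this:

-- Left join: every order is returned; customer columns are null when there is no match
SELECT o.orderId, c.customerName
FROM orders AS o
LEFT JOIN customers AS c
ON o.customerId = c.customerId

-- Semi-join: orders are only filtered; no customer columns are emitted
SELECT o.orderId
FROM orders AS o
WHERE o.customerId IN (
  SELECT customerId FROM customers
)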

Implementation details

The hash join operator is one of the new operators introduced in the multi-stage query engine. The current implementation assumes that the right input relation is the smaller one, so it consumes that input first, building a hash table that is then probed with rows from the left input relation.

Future optimizations may include heuristics to decide which input relation to consume first, but in the current implementation it is important to specify the smaller relation as the right input.
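For example, under this assumption, a query joining a hypothetical large fact table with a small dimension table should put the smaller table on the right side of the join, so the hash table is built from it:

-- smallDimTable is the right input, so the hash table is built from its rows
-- and probed with rows from largeFactTable
SELECT f.col1, d.col2
FROM largeFactTable AS f
JOIN smallDimTable AS d
ON f.joinKey = d.joinKey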

Although the multi-stage query engine is designed to process data in memory, it uses its ability to execute each stage on different workers (explained in Understanding Stages) to process data that may not fit in the memory of a single node. Specifically, each worker processes a subset of the data: inputs are by default partitioned by the join keys, and each worker processes one partition of the data.

This means that data usually needs to be shuffled between workers, which the engine does through a mailbox system. The engine tries to minimize the amount of data that needs to be shuffled by partitioning the data, and techniques such as co-located joins can be used to reduce the shuffle even further.

Blocking nature

The hash join operator is a blocking operator. It needs to consume all the input data (from both inputs) before emitting the result.

Maximum number of rows

Even with partitioning, the amount of data that needs to be stored in memory can be high, so the engine tries to protect itself from running out of memory by limiting the number of rows that can be emitted from a join operation.

The join_overflow_mode hint can be used to control the behavior of the engine when the number of rows exceeds the limit, and the limit itself can be defined using the max_rows_in_join hint. By default, the limit is slightly above 1 million rows and the join overflow mode is THROW, which means that the query fails if the number of rows exceeds the limit.

Hints

join_overflow_mode

Type: String

Default: THROW

Defines the behavior of the engine when the number of rows in a join operation exceeds the limit defined by the max_rows_in_join hint. The possible values are:

  • THROW: The query will fail if the number of rows exceeds the limit.

  • BREAK: The engine will stop processing the join and return the results that have been computed so far. In this case, the stat maxRowsInJoinReached will be true.

Example:

SELECT /*+ joinOptions(join_overflow_mode='BREAK') */
  table1.col2
FROM table1 JOIN table2
ON table1.col1 = table2.col1

max_rows_in_join

Type: Integer

Default: 1,048,576

The maximum number of rows that can be processed in a join operation. Note that this limit is applied both to the hash table built from the join's right input and to the number of joined rows emitted after matching with the join's left input. What happens when this limit is reached is defined by the join_overflow_mode hint.

Take care when increasing this limit. If the number of rows is too high, the amount of memory used by the engine can be very high, which can lead to very long GC pauses and even out-of-memory errors.

Example:

SELECT /*+ joinOptions(max_rows_in_join='1000') */
  table1.col2
FROM table1 JOIN table2
ON table1.col1 = table2.col1

join_strategy

Type: String

Default: empty

Used to change the default join strategy. The alternative value is dynamic_broadcast, which sends the whole right relation to the servers where the left relation is stored. This is useful when co-located joins cannot be used and the right relation is significantly smaller than the left one.

Example:

SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */
  col1, SUM(col2)
FROM largeTable
WHERE
  col3 IN (
    SELECT col1 FROM smallTable
  )
GROUP BY
  col1

Stats

executionTimeMs

Type: Long

The sum of the time spent by all threads executing the operator. This means that the wall-clock time spent in the operation may be smaller than this value when the parallelism is larger than 1; for example, four threads that each spend 100 ms contribute 400 ms to this stat while only about 100 ms of wall-clock time elapses.

emittedRows

Type: Long

The number of rows emitted by the operator. Joins can emit more rows than their input relations, so this value can be higher than the number of rows in either input; for example, if three rows on the left and four rows on the right share the same join key, those rows alone produce twelve output rows. Remember that the number of rows is limited by the max_rows_in_join hint, and a large number of rows can lead to high memory usage and long GC pauses, which can affect the performance of the whole system.

maxRowsInJoinReached

Type: Boolean

This stat is set to true when the number of rows exceeds the limit defined by the max_rows_in_join hint.

Notice that, by default, the engine throws an exception when this happens, in which case no stats are emitted. Therefore, this stat is only emitted when the join_overflow_mode hint is set to BREAK.
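For example, assuming multiple options can be combined in a single joinOptions hint as in the examples above, the following query raises the row limit and switches to BREAK mode, so that partial results are returned and this stat is set instead of the query failing:

SELECT /*+ joinOptions(max_rows_in_join='2000000', join_overflow_mode='BREAK') */
  table1.col2
FROM table1 JOIN table2
ON table1.col1 = table2.col1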

timeBuildingHashTableMs

Type: Long

The time spent building the hash table used to probe the join keys, in milliseconds.

A large value here can indicate that the right relation is too large or that it is taking too long to be processed.

Explain attributes

The hash join operator is represented in the explain plan as a LogicalJoin explain node.

condition

Type: Expression

The condition that is being applied to the rows to join the relations. The expression may use indexed columns ($0, $1, etc.), functions, and literals. The indexed columns are always 0-based.

For example, the following explain plan:

LogicalJoin(condition=[=($0, $1)], joinType=[inner])
  PinotLogicalExchange(distribution=[hash[0]])
    LogicalProject(userUUID=[$6])
      LogicalTableScan(table=[[default, userAttributes]])
  PinotLogicalExchange(distribution=[hash[0]])
    LogicalProject(userUUID=[$4])
      LogicalTableScan(table=[[default, userGroups]])

This says that the join condition requires the column with index 0 to be equal to the column with index 1 (the right relation's columns are indexed after the left relation's). Given the rest of the explain plan, we can see that the column with index 0 is the userUUID column in the userAttributes table and the column with index 1 is the userUUID column in the userGroups table.
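For reference, a query of roughly this shape (the table and column names are taken from the plan above, and the projection is illustrative) could produce a plan containing such a join node:

SELECT userAttributes.userUUID
FROM userAttributes
JOIN userGroups
ON userAttributes.userUUID = userGroups.userUUID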

joinType

Type: String

The type of join that is being performed. The possible values are inner, left, right, full, semi, and anti, as explained in the list of join types above.

Tips and tricks

Given that there are different optimization techniques related to joins, there is a specific page, Optimizing joins, explaining the tips and tricks that can be applied to joins.