1.4.0

Release Notes for 1.4.0

This release delivers significant improvements to the Multistage Engine, Pauseless Consumption, the Time Series Engine, Logical Table support, Upsert and Deduplication enhancements, Minion jobs (including smallSegmentMerger), and rebalancing capabilities. It also includes numerous smaller features and general bug fixes.

Multistage Engine Lite Mode (Beta) | Runbook

This release adds an all-new query mode for running Multistage Engine queries against Pinot, heavily inspired by Uber's Presto-over-Pinot query architecture.

MSE Lite Mode runs queries following a scatter-gather paradigm, the same as Pinot's V1 query engine. In addition, it adds a configurable limit on the number of records returned by each instance of the leaf stage, set to 100k records by default.

With MSE Lite Mode, you can enable MSE access for all users without worrying about them breaking their production workloads. Moreover, MSE Lite Mode can scale to thousands of QPS with minimal hardware, so users can run complex multi-stage queries that leverage features such as sub-queries and window functions at high QPS and low latency, with minimal reliability risk.

You can enable this by setting all of the following query options:

SET useMultistageEngine=true; 
SET usePhysicalOptimizer=true; 
SET useLiteMode=true;

Multistage Engine Physical Optimizer (Beta) | Runbook

We have added a new query optimizer for the Multistage Engine that can automatically eliminate or simplify redundant Exchanges. We aim to make this query optimizer the default in future versions.

Uber adopted this optimizer for one of its workloads that requires colocated join support, and it proved to be 5-7x faster while consuming 50% less CPU. See issue #15871.

To enable this, set the query options:

SET useMultistageEngine=true;
SET usePhysicalOptimizer=true;

Features

  • Capable of simplifying Exchanges for arbitrarily complicated queries, with no query hints required.

  • Supports group-by, joins, union-all, etc.

  • Can evaluate constant queries within the Broker itself.

  • Can simplify Exchange even if the numbers of partitions of the two join inputs differ. For example, if the left table is partitioned into 8 partitions over 4 servers and the right table into 16 partitions over 4 servers, the Physical Optimizer automatically switches to "Identity Exchanges".

  • Can simplify Exchange even if the servers selected for the two sides of a join are different (see the sketch after this list).
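
For illustration, here is a minimal sketch of a join run with the Physical Optimizer enabled; the table and column names (orders, customers, customer_id) are hypothetical:

SET useMultistageEngine=true;
SET usePhysicalOptimizer=true;
SELECT o.customer_id, COUNT(*)
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
GROUP BY o.customer_id;

If both tables are partitioned on customer_id, the optimizer can avoid a full shuffle even when the partition counts or the selected servers of the two inputs differ, as described above.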

Unsupported but Coming Soon

  • Support for customizing query parallelism via SET stageParallelism=x.

  • Support for dynamic filters for Semi-Join queries.

  • Support for SubPlan based execution for eligible queries.

  • Support for "Lookup Join" optimization.

  • Support for Spools.

Multistage Engine Enhancements

Multiple Window Functions in MSE #16109

The multi-stage engine now supports multiple WINDOW functions in a single query plan, enabling more expressive and efficient analytical queries with improved stage fusion and execution planning.

ASOF JOIN Support #15630

Introduced support for ASOF JOIN, allowing time-aligned joins commonly used in time-series analytics. This unlocks use cases where approximate matches based on time proximity are required.

Colocated Join with Different Partitions #15764

MSE now supports colocated joins between tables with different partitioning schemes, improving join flexibility and compatibility with real-world data layouts.

Local Replicated Join & Local Exchange Parallelism #14893

Optimized join strategies by enabling local replicated joins and local exchanges. This reduces cross-node shuffles and improves performance for high-selectivity joins and co-partitioned data.

Distribution Type Hint for Broadcast Join #14797

Introduced a planner hint for specifying distribution type (e.g., BROADCAST) to force broadcast joins when appropriate. This gives users more control over join strategy and execution plans.

Dynamic Rule Toggling in Optimizer #15999

Users can now dynamically enable or disable optimization rules in the query planner (optProgram), allowing fine-grained control and easier tuning for query behavior and debugging.

Parser Enhancements for Type Aliases #15615

Added support for SQL type aliases like LONG being interpreted as BIGINT, improving compatibility and developer ergonomics.
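
For example, LONG can now be used where BIGINT is expected, such as in a cast (the table and column names below are hypothetical):

SELECT CAST(clicks AS LONG) AS clicks_long
FROM website_metrics
LIMIT 10;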

Task Throttling Based on Heap Usage #16271

Throttling logic has been introduced for single-stage engine (SSE) and multi-stage engine (MSE) tasks. Tasks are throttled when server heap usage exceeds a configurable threshold, safeguarding system stability under load.

Query Cancellation for MSQE with Client-Provided ID #14823

Extended support for query cancellation in the Multi-Stage Query Engine (MSQE), including cancellation via client-specified query identifiers. This enables better integration with external systems and more robust control over long-running queries.

Pauseless Consumption (Design)

Pauseless consumption is introduced in Pinot 1.4.0. It enhances real-time analytics by minimizing ingestion delays and improving data freshness in Apache Pinot.

Without pauseless consumption, real-time data ingestion pauses during the build and upload phases of the previous segment. These phases can sometimes take a few minutes to complete, causing delays in data availability. As a result, users face a gap in accessing the most recent data, impacting real-time analytics capabilities.

Pauseless consumption resolves this issue by allowing Pinot to continue ingesting data while completing the build and upload phases of the previous segment. This enhancement ensures more up-to-date data availability, significantly reducing latency between ingestion and query.

Logical Table Support (Design)

A logical table is a collection of physical tables (REALTIME and OFFLINE tables). A SQL query that uses a logical table internally scans ALL of the physical tables. Conceptually, a logical table is similar to a VIEW in relational databases.
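
For example, assuming a logical table named orders_logical backed by one or more OFFLINE tables and a REALTIME table (names are hypothetical), a query against it transparently scans all of the underlying physical tables:

SELECT status, COUNT(*)
FROM orders_logical
GROUP BY status;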

Logical tables are designed to simplify and unify a wide range of use cases by abstracting the complexity of managing multiple physical tables. They enable ZK node scalability by allowing large tables to be split into smaller OFFLINE tables and a REALTIME table while presenting a single logical table to users, making operations on IdealState, ExternalView, and segments transparent. Logical tables also support ALTER TABLE workflows, such as Kafka topic reconfiguration, schema changes, and table renames, by allowing replacement of the underlying physical table list. For data layout management, such as re-streaming and time-based partitioning, logical tables help ensure that ingestion changes remain invisible to users.

Time Series Engine is Now in Beta

Pinot 1.3.0 introduced a generic Time Series Query Engine, enabling native support for various time-series query languages (e.g., PromQL, M3QL) through a pluggable framework. Multiple enhancements and bug fixes have been added in 1.4.0.

Timeseries Query Execution UI in Pinot Controller #16305

Added a new UI in the Pinot Controller for visualizing timeseries query execution plans. This feature helps developers and operators better understand query breakdowns, execution stages, and time-series–specific optimizations, making troubleshooting and tuning more intuitive.

Adding controller endpoint to access timeseries API #16286

Introduced a Prometheus-compatible /query_range endpoint to support time series queries in Pinot. This change refactors broker request handling to generalize support for both GET and POST methods, simplifies header extraction, and improves error handling and logging, along with minor code cleanups for maintainability.

Enhancements

Upsert and Dedup

Ensure consistent creation time across replicas to prevent upsert data inconsistency #16034

This enhancement addresses inconsistent segment creation times across replicas, which resulted in non-deterministic upsert behavior when uploading UploadedRealtimeSegment and led to data inconsistency. The solution adds zkCreationTime to SegmentMetadataImpl and uses it in the comparison tie-breaking logic. ZK time is now set during all segment loading flows, and the upsert logic introduces getAuthoritativeCreationTime() to prefer ZK time, ensuring consistent upsert decisions across all replicas while maintaining backward compatibility by falling back to local time if ZK time is unavailable.

Bug fixes

  • Introduce an Enablement enum with values ENABLE, DISABLE, and DEFAULT to control whether a feature is enabled. With DEFAULT, the default config from the upper level (e.g. instance level) is used.

  • Introduce snapshot and preload fields of type Enablement in UpsertConfig and DedupConfig so that the values can be properly overridden. Previously there was no way to disable these at the table level when they were enabled at the instance level.

  • Always read properties from UpsertContext and DedupContext to avoid inconsistencies between server-level overrides and config changes.

Cleanups

  • Simplify the constructor for upsert/dedup related configs

  • Re-order some fields/methods for readability

  • Unify the metadata manager creation logic for upsert/dedup

  • Move some constants to CommonConstants

Incompatibility

  • enableSnapshot and enablePreload are deprecated and replaced with snapshot and preload (see the sketch below).
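
A minimal sketch of the new table-level fields, assuming the JSON field names match the config names above and accept the Enablement values ENABLE, DISABLE, and DEFAULT:

"upsertConfig": {
  "mode": "FULL",
  "snapshot": "ENABLE",
  "preload": "DEFAULT"
}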

Minion Improvements

Small Segment Merger Task Enhancement #16086

This enhancement addresses data inconsistency issues in UpsertCompactMerge tasks caused by segment replica creation time mismatches. Instead of using the creation time from the server, the system now uses the creation time from ZK metadata, which aligns with the upsert tie-breaking logic. The task generator passes the maximum creation time of the merging segments as task input, ensuring deterministic segment metadata across replicas without additional server calls.

Added config to skip dedup metadata updates for non-default tiers #15576

For dedup-enabled tables, segments moved to the cold tier are usually beyond the metadata TTL, so updating the dedup metadata for them can be skipped to reduce the overhead of metadata construction.

Newly added configs (see the sketch after this list):

  • Table level, under dedupConfig: ignoreNonDefaultTiers with values ENABLE, DISABLE, or DEFAULT (the default, which uses the instance-level config)

  • Instance level: pinot.server.instance.dedup.default.ignore.non.default.tiers
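
A minimal sketch of the table-level setting, assuming the field name and values listed above (the instance-level property sets the cluster-wide default):

"dedupConfig": {
  "dedupEnabled": true,
  "ignoreNonDefaultTiers": "ENABLE"
}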

Notable Improvements and Bug Fixes

Ingestion and Indexing

Add Multi-column Text index #16103

Introduced the ability to create a single text index across multiple columns. This reduces indexing overhead for multi-field text search and enables faster search queries where text relevance spans multiple fields.

Apart from saving space on shared intra-column tokens within Lucene, the new index uses a single document id mapping. Example configuration (within table config):
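
(The column names below are hypothetical, and the multiColumnTextIndexConfig key under tableIndexConfig is an assumption; the properties and perColumnProperties keys come from the lists that follow.)

"tableIndexConfig": {
  "multiColumnTextIndexConfig": {
    "columns": ["title", "description"],
    "properties": {
      "luceneUseCompoundFile": "true",
      "luceneMaxBufferSizeMB": "64"
    },
    "perColumnProperties": {
      "description": {
        "caseSensitive": "false",
        "luceneAnalyzerClass": "org.apache.lucene.analysis.standard.StandardAnalyzer"
      }
    }
  }
}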

As shown in the example above, the index configuration allows for both:

  • setting shared index properties that apply to all columns via "properties". Allowed keys are: enableQueryCacheForTextIndex, luceneUseCompoundFile, luceneMaxBufferSizeMB, reuseMutableIndex, plus all keys allowed in perColumnProperties.

  • setting column-specific properties (overriding shared ones) with perColumnProperties. Allowed keys: useANDForMultiTermTextIndexQueries, enablePrefixSuffixMatchingInPhraseQueries, stopWordInclude, stopWordExclude, caseSensitive, luceneAnalyzerClass, luceneAnalyzerClassArgs, luceneAnalyzerClassArgTypes, luceneQueryParserClass.

Max JSON Index Heap Usage Configuration #15685

Introduced a maxBytesSize configuration for mutable JSON indexes to cap memory usage during ingestion. This prevents excessive heap consumption when processing large JSON documents.
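
A minimal sketch of where such a cap could be set, assuming the per-column jsonIndexConfigs section of tableIndexConfig, a hypothetical column name, and an illustrative value:

"tableIndexConfig": {
  "jsonIndexConfigs": {
    "payload": {
      "maxBytesSize": 104857600
    }
  }
}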

Logical Type Support in Avro Enabled by Default #15654

The pinot-avro ingestion plugin now automatically enables support for Avro logical types such as timestamps and decimals. This improves schema accuracy and reduces the need for manual configuration.

Fix for Real-Time Segment Download #15316

Resolved an issue that caused failures when downloading real-time table segments during ingestion. This fix improves data availability and reduces ingestion errors.

JSON Confluent Schema Registry Decoder #15273

Added the KafkaConfluentSchemaRegistryJsonMessageDecoder, enabling seamless ingestion of JSON messages registered in Confluent Schema Registry. This broadens compatibility with Kafka-based pipelines.

Canonicalize BigDecimal Values During Ingestion #14958

Standardized BigDecimal ingestion by converting values into a canonical form. This ensures consistent deduplication, accurate comparisons, and stable upsert behavior.

New Scalar Functions Support

JSON_MATCH Function Extension Points #15508

Added extension points for the JSON_MATCH function, allowing developers to plug in custom matching logic during JSON query evaluation.

JsonKeyValueArrayToMap Function #15352

Introduced a function that converts a JSON key-value array into a map, simplifying certain ETL and query transformations.

H3 Geospatial Functions: gridDisk and gridDistance #15349, #15259

Added new geospatial functions for H3 indexing (see the sketch after this list):

  • gridDisk — returns all H3 cells within a given radius.

  • gridDistance — computes the distance between two H3 cells.
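
For illustration, a minimal sketch using hypothetical table and column names that hold H3 cell IDs:

-- number of grid cells between two H3 cells
SELECT gridDistance(origin_h3, destination_h3) AS cells_apart
FROM trips
LIMIT 10;

-- all H3 cells within 1 ring of the origin cell
SELECT gridDisk(origin_h3, 1) AS nearby_cells
FROM trips
LIMIT 10;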

Plugin & API Enhancements

ArrowResponseEncoder Implementation #15410

Added a new ArrowResponseEncoder to support Apache Arrow format responses, enabling faster and more efficient data transfer to compatible clients.

S3 Plugin Checksum Support #15304

The S3 plugin now supports request and response checksum validation via configuration. This improves data integrity verification when reading from or writing to S3.

Security

Row-Level Security (RLS) Support #16043

Implemented row-level security policies, allowing fine-grained data access control where different users or groups see only rows they are authorized to view. This is particularly useful for multi-tenant environments.

Groovy Script Static Analysis #14844

Added static analysis checks for Groovy scripts to detect unsafe patterns before execution, improving the security posture of custom UDFs and transforms.

Notable Features and Updates

Support orderedPreferredReplicas query option for customizable routing strategy #15203

Introduced the orderedPreferredPools query option, allowing users to provide a prioritized list of server pools as a routing hint. The broker attempts to route queries to these pools in order, falling back gracefully, which enables precise traffic control for canary deployments.

Enforce Schema for All Tables #15333

Pinot now enforces that all tables have an associated schema, ensuring data integrity and consistency across ingestion and query execution.

Default Load Mode Changed to MMAP #15089

Updated the default segment load mode to MMAP for better memory efficiency, especially for large datasets.

Workload Configurations for Query Resource Isolation #15109

Introduced workload-based query resource isolation. Administrators can now define workload profiles with specific resource allocations, improving multi-tenant fairness.

Server-Level Segment Batching for Rebalance #15617

Added the ability to batch segment assignments at the server level during rebalance operations. This reduces the number of rebalance steps and minimizes disruption.

ClusterConfigChangeHandler and Segment Reindex Throttle #14894

Introduced a ClusterConfigChangeHandler on servers and added throttling for segment reindexing operations. This prevents excessive load during cluster configuration changes.

Misc. Improvements

Bug Fixes
