
1.3.0

Release Notes for 1.3.0

This release brings significant improvements, including enhancements to the multistage query engine and the introduction of an experimental time series query engine for efficient time-series analysis. Key features include database query quotas, cursor-based pagination for large result sets, multi-stream ingestion, and new function support for URL processing and GeoJSON. Security vulnerabilities have been addressed, along with several bug fixes and performance enhancements, making the platform more robust and versatile.

Multistage Engine Improvements

Reuse common expressions in a query (spool) #14507, Design Doc

Refines query plan reuse in Apache Pinot by allowing reuse across stages instead of subtrees. Stages are natural boundaries in the query plan, each executed as a tree of pull-based operators and connected to other stages by MailboxSendOperator and MailboxReceiveOperator. This change allows a MailboxSendOperator to send data to multiple stages, transforming the stage connections into a Directed Acyclic Graph (DAG) for greater efficiency and flexibility.
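To make the shape of the optimization concrete, here is a minimal sketch: the WITH expression below is referenced twice, so with spools enabled the stage that computes it executes once and its MailboxSendOperator feeds both consumers. The table and column names are illustrative, and the useSpools option name is taken from the PR discussion, so verify it against your version.

```java
public class SpoolExample {
  public static void main(String[] args) {
    // With spools enabled, the stage computing `recent_orders` is planned
    // once and mailed to both consuming stages instead of being duplicated.
    // NOTE: `useSpools` is an assumed option name; verify against your
    // Pinot version. Table and column names are illustrative.
    String sql = """
        SET useSpools = true;
        WITH recent_orders AS (
          SELECT customerId, COUNT(*) AS cnt
          FROM orders
          WHERE ts > 1700000000000
          GROUP BY customerId
        )
        SELECT a.customerId, a.cnt, b.cnt
        FROM recent_orders AS a
        JOIN recent_orders AS b ON a.customerId = b.customerId
        """;
    System.out.println(sql);
  }
}
```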

Segment Plan for MultiStage Queries #13733, #14212

This change focuses on providing comprehensive execution plans, including physical operator details. The new explain mode aligns with Calcite terminology and uses a broker-server communication flow to analyze and transform query plans into explained physical plans without executing them. A new ExplainedPlanNode enriches query execution plans with physical details, giving users better transparency and debugging capabilities.
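For reference, a plan can be inspected without executing the query via EXPLAIN PLAN FOR; under the new mode described above, the broker asks servers to plan their segments, so the output can carry the physical operator details. A minimal sketch, with an illustrative table:

```java
public class ExplainExample {
  public static void main(String[] args) {
    // EXPLAIN PLAN FOR returns the multistage plan without running the query.
    // Depending on configuration, the output may be enriched with the
    // server-side physical details (ExplainedPlanNode) described above.
    String explain = """
        EXPLAIN PLAN FOR
        SELECT customerId, COUNT(*)
        FROM orders
        GROUP BY customerId
        """;
    System.out.println(explain);
  }
}
```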

DataBlock Serde Performance Improvements #13303, #13304

Improve the performance of DataBlock building, serialization, and deserialization by reducing memory allocation and copies without altering the binary format. Benchmarks show 1x to 3x throughput gains, with significant reductions in memory allocation, minimizing GC-related latency issues in production. The improvement is achieved by changes to the buffers and the addition of a couple of stream classes.

Notable Improvements and Bug Fixes

Timeseries Engine Support in Pinot Design Doc

Introduction of a Generic Time Series Query Engine in Apache Pinot, enabling native support for various time-series query languages (e.g., PromQL, M3QL) through a pluggable framework. This enhancement addresses limitations in Pinot’s current SQL-based query engines for time-series analysis, providing optimized performance and usability for observability use cases, especially those requiring high-cardinality metrics.

NOTE: Timeseries Engine support in Pinot is currently in an Experimental state.

Key Features

Pluggable Time-Series Query Language:

  • Pinot will support multiple time-series query languages, such as PromQL and Uber’s M3QL, via plugins like pinot-m3ql.

  • Example queries:

    • Plot hourly order counts for specific merchants.

    • Perform week-over-week analysis of order counts.

  • These plugins will leverage a new SPI module to enable seamless integration of custom query languages.

Pluggable Time-Series Operators:

  • Custom operators specific to each query language (e.g., nonNegativeDerivative or holt_winters) can be implemented within language-specific plugins without modifying Pinot’s core code.

  • Extensible operator abstractions will allow stakeholders to define unique time-series analysis functions.

Advantages of the New Engine:

  • Optimized for Time-Series Data: Processes data in series rather than rows, improving performance and simplifying the addition of complex analysis functions.

  • Reduced Complexity in Pinot Core: The engine reuses existing components such as the Multi-Stage Engine (MSE) Query Scheduler, Query Dispatcher, and Mailbox, while language parsers and planners remain modular in plugins.

  • Improved Usability: Users can run concise and powerful time-series queries in their preferred language, avoiding the verbosity and limitations of SQL.

Impact on Observability Use Cases:

This new engine significantly enhances Pinot’s ability to handle complex time-series analyses efficiently, making it an ideal database for high-cardinality metrics and observability workloads.

The improvement is a step forward in transforming Pinot into a robust and versatile platform for time-series analytics, enabling seamless integration of diverse query languages and custom operators.
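To make the operator plugin surface concrete, here is a hedged Java sketch of a series-at-a-time operator in the spirit of M3QL's nonNegativeDerivative. All type and method names below are hypothetical stand-ins, not the actual pinot-timeseries-spi contracts; the point is that the operator transforms whole series rather than rows and can live entirely inside a plugin such as pinot-m3ql.

```java
import java.util.List;

// Hypothetical SPI sketch: TimeSeries, TimeSeriesBlock and TimeSeriesOperator
// are illustrative stand-ins, not Pinot's actual time-series SPI classes.
interface TimeSeriesOperator {
  TimeSeriesBlock nextBlock();
}

record TimeSeries(String id, long[] timestamps, double[] values) {}
record TimeSeriesBlock(List<TimeSeries> series) {}

// A language-specific operator can ship in a plugin without touching Pinot
// core: it processes data in series, not rows.
class NonNegativeDerivativeOperator implements TimeSeriesOperator {
  private final TimeSeriesOperator input;

  NonNegativeDerivativeOperator(TimeSeriesOperator input) {
    this.input = input;
  }

  @Override
  public TimeSeriesBlock nextBlock() {
    List<TimeSeries> out = input.nextBlock().series().stream().map(s -> {
      double[] d = new double[s.values().length];
      for (int i = 1; i < d.length; i++) {
        double delta = s.values()[i] - s.values()[i - 1];
        d[i] = Math.max(delta, 0); // clamp counter resets to zero
      }
      return new TimeSeries(s.id(), s.timestamps(), d);
    }).toList();
    return new TimeSeriesBlock(out);
  }
}
```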


Database Query Quota #13544

Introduces the ability to impose query rate limits at the database level, covering all queries made to tables within a database. A database-level rate limiter is implemented, and a new method, acquireDatabase(databaseName), is added to the QueryQuotaManager interface to check database query quotas.

Database Query Quota Configuration

  • Query and storage quotas are now provisioned similarly to table quotas but managed separately in a DatabaseConfig znode.

  • Details about the DatabaseConfig znode:

    • It does not represent a logical database entity.

    • Its absence does not prevent table creation under a database.

    • Deletion does not remove tables within the database.

Default and Override Quotas

  • A default query quota (databaseMaxQueriesPerSecond: 1000) is provided in ClusterConfig.

  • Overrides for specific databases can be configured via znodes (e.g., PROPERTYSTORE/CONFIGS/DATABASE/).

APIs for Configuration

| Method | Path | Description |
| --- | --- | --- |
| POST | /databases/{databaseName}/quotas?maxQueriesPerSecond= | Sets the database query quota |
| GET | /databases/{databaseName}/quotas | Gets the database query quota |
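A minimal sketch of driving these endpoints from Java, assuming a controller at localhost:9000 and a database named myDb (both illustrative):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class DatabaseQuotaExample {
  public static void main(String[] args) throws Exception {
    String controller = "http://localhost:9000"; // illustrative address
    HttpClient client = HttpClient.newHttpClient();

    // Set the query quota for database `myDb` to 500 queries per second.
    HttpRequest set = HttpRequest.newBuilder()
        .uri(URI.create(controller + "/databases/myDb/quotas?maxQueriesPerSecond=500"))
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();
    System.out.println(client.send(set, HttpResponse.BodyHandlers.ofString()).body());

    // Read the quota back.
    HttpRequest get = HttpRequest.newBuilder()
        .uri(URI.create(controller + "/databases/myDb/quotas"))
        .GET()
        .build();
    System.out.println(client.send(get, HttpResponse.BodyHandlers.ofString()).body());
  }
}
```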

Dynamic Quota Updates

  • Quotas are determined by a combination of default cluster-level quotas and database-specific overrides.

  • Per-broker quotas are adjusted dynamically based on the number of live brokers.

  • Updates are handled via:

    • A custom DatabaseConfigRefreshMessage sent to brokers upon database config changes.

    • A ClusterConfigChangeListener in ClusterChangeMediator that processes updates in cluster configs.

    • Adjustments to per-broker quotas upon broker resource changes.

    • Creation of database rate limiters during the OFFLINE -> ONLINE state transition of tables in BrokerResourceOnlineOfflineStateModel.

This feature provides fine-grained control over query rate limits, ensuring scalability and efficient resource management for databases within Pinot.

Binary Workload Scheduler for Constrained Execution #13847

Introduction of the BinaryWorkloadScheduler, which categorizes queries into two distinct workloads to ensure cluster stability and prioritize critical operations:

Workload Categories:

1. Primary Workload:

  • Default category for all production traffic.

  • Queries are executed using an unbounded FCFS (First-Come, First-Served) scheduler.

  • Designed for high-priority, critical queries to maintain consistent availability and performance.

2. Secondary Workload:

  • Reserved for ad-hoc queries, debugging tools, dashboards/notebooks, development environments, and one-off tests.

  • Imposes several constraints to minimize impact on the primary workload:

    • Limited concurrent queries: Caps the number of in-progress queries, with excess queries queued.

    • Thread restrictions: Limits the number of worker threads per query and across all queries in the secondary workload.

    • Queue pruning: Queries stuck in the queue too long are pruned based on time or queue length.

Key Benefits:

  • Prioritization: Guarantees the primary workload remains unaffected by resource-intensive or long-running secondary queries.

  • Stability: Protects cluster availability by preventing incidents caused by poorly optimized or excessive ad-hoc queries.

  • Scalability: Efficiently manages traffic in multi-tenant clusters, maintaining service reliability across workloads.
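The admission policy can be illustrated with a short conceptual sketch. This is not Pinot's BinaryWorkloadScheduler code, only the idea described above: primary traffic runs on an unbounded FCFS pool, while secondary traffic is capped in flight and pruned when saturated.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

// Conceptual sketch only; limits are illustrative.
public class BinaryWorkloadSketch {
  // Primary workload: unbounded FCFS, never blocked by secondary traffic.
  private final ExecutorService primary = Executors.newCachedThreadPool();
  // Secondary workload: few worker threads, bounded in-progress queries.
  private final ExecutorService secondary = Executors.newFixedThreadPool(2);
  private final Semaphore secondaryInProgress = new Semaphore(4);

  public void submit(Runnable query, boolean isPrimary) {
    if (isPrimary) {
      primary.execute(query); // critical traffic runs immediately
    } else if (secondaryInProgress.tryAcquire()) {
      secondary.execute(() -> {
        try {
          query.run();
        } finally {
          secondaryInProgress.release();
        }
      });
    } else {
      // Saturated: reject, mirroring time/length-based queue pruning.
      throw new RejectedExecutionException("secondary workload saturated");
    }
  }
}
```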

Cursor Support

Cursor support allows Pinot clients to consume query results in smaller chunks. This lets clients work with fewer resources, especially memory, and simplifies application logic: for example, an app UI can paginate through results in a table or a graph. Cursor support is exposed through the broker APIs listed below.

API

| Method | Path | Description |
| --- | --- | --- |
| POST | /query/sql | A new broker API parameter triggers pagination. |
| GET | /resultStore/{requestId}/results | Iterates over the result set of a query submitted using the above API. |
| GET | /resultStore/{requestId}/ | Returns the BrokerResponse metadata of the query. |
| GET | /resultStore | Lists the requestIds of all the query results available in the response store. |
| DELETE | /resultStore/{requestId}/ | Deletes the results of a query. |

SPI

The feature provides two SPIs to extend the feature to support other implementations:

  • ResponseSerde: Serializes/deserializes the response.

  • ResponseStore: Stores responses in a storage system.

Both SPIs use the Java SPI mechanism with the default ServiceLoader to discover implementations. All implementations should be annotated with AutoService to generate the files needed for discovery.
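A hedged sketch of the consumption loop using the endpoints above. The notes do not name the pagination parameters on /query/sql, so getCursor and numRows below are placeholders to replace with the documented names; the broker address and query are illustrative.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CursorExample {
  public static void main(String[] args) throws Exception {
    String broker = "http://localhost:8099"; // illustrative broker address
    HttpClient client = HttpClient.newHttpClient();

    // Submit a query with pagination enabled. `getCursor` and `numRows`
    // are placeholder parameter names, not confirmed by these notes.
    HttpRequest submit = HttpRequest.newBuilder()
        .uri(URI.create(broker + "/query/sql?getCursor=true&numRows=100"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString("{\"sql\": \"SELECT * FROM orders\"}"))
        .build();
    System.out.println(client.send(submit, HttpResponse.BodyHandlers.ofString()).body());

    String requestId = "..."; // parse the requestId from the response above

    // Page through the stored result set, then clean up.
    HttpRequest page = HttpRequest.newBuilder()
        .uri(URI.create(broker + "/resultStore/" + requestId + "/results"))
        .GET().build();
    System.out.println(client.send(page, HttpResponse.BodyHandlers.ofString()).body());

    HttpRequest delete = HttpRequest.newBuilder()
        .uri(URI.create(broker + "/resultStore/" + requestId + "/"))
        .DELETE().build();
    client.send(delete, HttpResponse.BodyHandlers.ofString());
  }
}
```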

URL Functions Support #14646

Implemented various URL functions to handle multiple aspects of URL processing, including extraction, encoding/decoding, and manipulation, making them useful for tasks involving URL parsing and modification.

URL Extraction Methods

  • urlProtocol(String url): Extracts the protocol (scheme) from the URL.

  • urlDomain(String url): Extracts the domain from the URL.

  • urlDomainWithoutWWW(String url): Extracts the domain without the leading "www." if present.

  • urlTopLevelDomain(String url): Extracts the top-level domain (TLD) from the URL.

  • urlFirstSignificantSubdomain(String url): Extracts the first significant subdomain from the URL.

  • cutToFirstSignificantSubdomain(String url): Extracts the first significant subdomain and the top-level domain from the URL.

  • cutToFirstSignificantSubdomainWithWWW(String url): Returns the part of the domain that includes top-level subdomains up to the "first significant subdomain", without stripping "www.".

  • urlPort(String url): Extracts the port from the URL.

  • urlPath(String url): Extracts the path from the URL without the query string.

  • urlPathWithQuery(String url): Extracts the path from the URL with the query string.

  • urlQuery(String url): Extracts the query string without the initial question mark (?) and excludes the fragment (#) and everything after it.

  • urlFragment(String url): Extracts the fragment identifier (without the hash symbol) from the URL.

  • urlQueryStringAndFragment(String url): Extracts the query string and fragment identifier from the URL.

  • extractURLParameter(String url, String name): Extracts the value of a specific query parameter from the URL.

  • extractURLParameters(String url): Extracts all query parameters from the URL as an array of name=value pairs.

  • extractURLParameterNames(String url): Extracts all parameter names from the URL query string.

  • urlHierarchy(String url): Generates a hierarchy of URLs truncated at path and query separators.

  • urlPathHierarchy(String url): Generates a hierarchy of path elements from the URL, excluding the protocol and host.

URL Manipulation Methods

  • urlEncode(String url): Encodes a string into a URL-safe format.

  • urlDecode(String url): Decodes a URL-encoded string.

  • urlEncodeFormComponent(String url): Encodes the URL string following RFC-1866 standards, with spaces encoded as +.

  • urlDecodeFormComponent(String url): Decodes the URL string following RFC-1866 standards, with + decoded as a space.

  • urlNetloc(String url): Extracts the network locality (username:password@host:port) from the URL.

  • cutWWW(String url): Removes the leading "www." from a URL’s domain.

  • cutQueryString(String url): Removes the query string, including the question mark.

  • cutFragment(String url): Removes the fragment identifier, including the number sign.

  • cutQueryStringAndFragment(String url): Removes both the query string and fragment identifier.

  • cutURLParameter(String url, String name): Removes a specific query parameter from a URL.

  • cutURLParameters(String url, String[] names): Removes multiple specific query parameters from a URL.
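A few of these functions in an illustrative query, with expected values (following the descriptions above) shown as comments for pageUrl = 'https://www.example.com/shop/item?id=42#reviews'. The events table and pageUrl column are hypothetical.

```java
public class UrlFunctionsExample {
  public static void main(String[] args) {
    // Expected values follow the function descriptions above, e.g.
    // urlDomain keeps the leading "www." while urlDomainWithoutWWW strips it.
    String sql = """
        SELECT
          urlProtocol(pageUrl),               -- 'https'
          urlDomain(pageUrl),                 -- 'www.example.com'
          urlDomainWithoutWWW(pageUrl),       -- 'example.com'
          urlPath(pageUrl),                   -- '/shop/item'
          extractURLParameter(pageUrl, 'id'), -- '42'
          cutQueryStringAndFragment(pageUrl)  -- 'https://www.example.com/shop/item'
        FROM events
        """;
    System.out.println(sql);
  }
}
```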

Multi Stream Ingestion Support #13790, Design Doc

  • Add support to ingest from multiple sources into a single table

  • Use existing interface (TableConfig) to define multiple streams

  • Separate the partition id definition of the stream from that of the Pinot segment

  • Compatible with existing stream partition auto-expansion logic

The feature does not change any existing interfaces: users can define the table config in the same way and combine it with any other transform functions or instance assignment strategies. A config sketch follows below.
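A hedged sketch of what a two-topic table config could look like, reusing the existing streamIngestionConfig.streamConfigMaps list in the ingestion config. Topic names, broker addresses, and the elided properties are illustrative; check the design doc for the authoritative shape.

```java
public class MultiStreamTableConfig {
  // Hedged sketch: two Kafka topics feeding one table. Values are
  // illustrative and the elided ("...") properties must be filled in.
  static final String INGESTION_CONFIG = """
      {
        "ingestionConfig": {
          "streamIngestionConfig": {
            "streamConfigMaps": [
              {
                "streamType": "kafka",
                "stream.kafka.topic.name": "orders_us",
                "stream.kafka.broker.list": "kafka-us:9092",
                ...
              },
              {
                "streamType": "kafka",
                "stream.kafka.topic.name": "orders_eu",
                "stream.kafka.broker.list": "kafka-eu:9092",
                ...
              }
            ]
          }
        }
      }
      """;
}
```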

New Scalar Functions Support #14671

  • intDiv and intDivOrZero: Perform integer division, with intDivOrZero returning zero for division by zero or when dividing a minimal negative number by minus one.

  • isFinite, isInfinite, and isNaN: Check if a double value is finite, infinite, or NaN, respectively.

  • ifNotFinite: Returns a default value if the given value is not finite.

  • moduloOrZero and positiveModulo: Variants of the modulo operation, with moduloOrZero returning zero for division by zero or when dividing a minimal negative number by minus one.

  • negate: Returns the negation of a double value.

  • gcd and lcm: Calculate the greatest common divisor and least common multiple of two long values, respectively.

  • hypot: Computes the hypotenuse of a right-angled triangle given the lengths of the other two sides.

  • byteswapInt and byteswapLong: Perform byte swapping on integer and long values.
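An illustrative query exercising several of the new functions, with expected results as comments (the table name is hypothetical; arithmetic checked by hand):

```java
public class ScalarFunctionsExample {
  public static void main(String[] args) {
    // Expected results follow the function descriptions above.
    String sql = """
        SELECT
          intDiv(7, 2),       -- 3
          intDivOrZero(7, 0), -- 0 instead of a division-by-zero error
          moduloOrZero(7, 0), -- 0
          gcd(12, 18),        -- 6
          lcm(4, 6),          -- 12
          hypot(3.0, 4.0)     -- 5.0
        FROM mytable
        """;
    System.out.println(sql);
  }
}
```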

GeoJSON Support #14405

Add support for GeoJSON scalar functions:

Supported data types:

  • Point

  • LineString

  • Polygon

  • MultiPoint

  • MultiLineString

  • MultiPolygon

  • GeometryCollection

  • Feature

  • FeatureCollection

Improved Implementation of Distinct Operators #14701

Main optimizations:

  • Add per data type DistinctTable and utilize primitive type if possible

  • Specialize single-column case to reduce overhead

  • Allow processing null values with dictionary based operators

  • Specialize unlimited LIMIT case

  • Do not create priority queue before collecting LIMIT values

  • Add support for null ordering

Upsert Improvements

Features and Improvements

Track New Segments for Upsert Tables #13992

  • Addresses a race condition where newly uploaded segments may be processed by the server before brokers add them to the routing table, potentially causing queries to miss valid documents.

  • Introduce a configurable newSegmentTrackingTimeMs (default 10s) to track new segments on the server side, allowing them to be accessed as optional segments until brokers update their routing tables.

Ensure Upsert Deletion Consistency with Compaction Flow Enabled #13347

This enhancement addresses inconsistencies in upsert-compaction by introducing a mechanism to track the distinct segment count for primary keys. By ensuring a record exists in only one segment before compacting deleted records, it prevents older non-deleted records from being incorrectly revived during server restarts, ensuring a consistent table state.

Consistent Segments Tracking for Consistent Upsert View #13677

This improves consistent upsert view handling by addressing segment tracking and query inconsistencies. Key changes include:

  • Complete and Consistent Segment Tracking: Introduced a new Set to track segments before registration to the table manager, ensuring synchronized segment membership and validDocIds access.

  • Improved Segment Replacement: Added DuoSegmentDataManager to register both mutable and immutable segments during replacement, allowing queries to access a complete data view without blocking ingestion.

  • Query Handling Enhancements: Queries now acquire the latest consuming segments to avoid missing newly ingested data if the broker's routing table isn't updated.

  • Misc Fixes: Addressed edge cases, such as updating _numDocsIndexed before metadata updates, returning empty bitmaps instead of null, and preventing bitmap re-acquisition outside locking logic.

These changes are gated by the new feature flag upsertConfig.consistencyMode and have been tested with unit and stress tests in a staging environment to ensure reliability. A config sketch follows below.
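For reference, the flag lives in the table's upsertConfig. The notes only name the flag itself, so the SYNC value below is an assumption to verify against the upsert documentation.

```java
public class ConsistencyModeConfig {
  // Hedged sketch: enabling the consistent-view feature via upsertConfig.
  // "SYNC" is an assumed value; check the upsert docs for accepted modes.
  static final String UPSERT_CONFIG = """
      {
        "upsertConfig": {
          "mode": "FULL",
          "consistencyMode": "SYNC"
        }
      }
      """;
}
```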

Other Notable Improvements and Bug Fixes

Lucene and Text Search Improvements

  • Store index metadata file for Lucene text indexes. #13948

  • Runtime configurability for Lucene analyzers and query parsers, enabling dynamic text tokenization and advanced log search capabilities like case-sensitive/insensitive searches. #13003


Miscellaneous Improvements

  • Allow setting ForwardIndexConfig default settings via cluster config. #14773

  • Extend Merge Rollup Capabilities for Datasketches. #14625

  • Skip task validation during table creation with schema. #14683

  • Add capability to configure sketch precision/accuracy for different rollup buckets; helpful for saving space where historical data does not require high accuracy. #14373

  • Add support for application-level query quota. #14226

  • Enhanced the mutable index class to be pluggable. #14609

  • Improvement to allow configurable initial capacity for IndexedTable. #14620

  • Add a new segment reload API for flexible control, allowing specific segments to be reloaded on designated servers and enabling workload management through batch processing and replica group targeting. #14544

  • Add a server API to list segments that need to be refreshed for a table. #14544

  • Introduced the ability to erase dimension values before rollup in merged segments, reducing cardinality and optimizing space for less critical historical data. #14355

  • Add support for immutable CLPForwardIndex creator and related classes. #14288

  • Add a Minion task to support automatic segment refresh. #14300

  • Add support for S3A Connector. #14474

  • Add support for hex decimal to long scalar functions. #14435

  • Remove emitting null value fields during data transformation for SchemaConformingTransformer. #14351

  • Improved CSV record reader to skip unparseable lines. #14396

  • Add the ability to specify a target instance for segment reloading and improve API response messages when segments are not found on the target instances. #14393

  • Add support for JSON Path Exists function. #14376

  • Improvement for MSQ explain and stageStats when dealing with empty tables. #14374

  • Improvement for dynamically adjusting GroupByResultHolder's initial capacity based on filter predicates to optimize resource allocation and improve performance for filtered group-by queries. #14001

  • Add support for the isEqualSet function. #14313

  • Improvement to ensure consistent index configuration by constructing IndexLoadingConfig and SegmentGeneratorConfig from table config and schema, fixing inconsistencies and honouring FieldConfig.EncodingType. #14258

  • Use CLPMutableForwardIndexV2 by default to improve ingestion performance and efficiency. #14241

  • Add null handling support for aggregations grouped by MV columns. #14071

  • Add support to specify zstd and lz4 segment compression via config. #14008

  • Add support for map data type on UI. #14245

  • Add support for ComplexType in SchemaInfo to render complex column count in UI. #14254

  • Introduced raw forward index version V5 with implicit number-of-docs length, improving space efficiency. #14105

  • Improvement for colocated joins without hints. #13943

  • Enhanced optimizeDictionary to optionally optimize var-width type columns. #13994

  • Add support for BETWEEN in NumericalFilterOptimizer. #14163

  • Add support for NULLIF scalar function. #14203

  • Improvement to allow usage of the star-tree index with null handling enabled when segment columns contain no null values. #14177

  • Improvement to avoid using setters in IndexLoadingConfig for consuming segments. #14190

  • Implement consistent data push for Spark3 segment generation and metadata push jobs. #14139

  • Improvement addressing ingestion delays in real-time tables with many partitions by mitigating simultaneous segment commits across consumers. #14170

  • Improve query options validation and error handling. #14158

  • Add support for an arbitrary number of WHEN THEN clauses in the scalar CASE function. #14125

  • Add support for configuring Theta and Tuple aggregation functions. #14167

  • Add support for Map type in complex schema. #13906

  • Add TTL watermark storage/loading for the dedup feature to prevent stale metadata from being added to the store when loading segments. #14137

  • Polymorphic scalar function implementation for BETWEEN. #14113

  • Polymorphic binary arithmetic scalar functions. #14089

  • Improvement for Adaptive Server Selection to penalize servers returning server-side exceptions. #14029

  • Add a server-level configuration for segment upload to the deep store. #14093

  • Add support to upload segments in batch mode with METADATA upload type. #13646

  • Remove recreateDeletedConsumingSegment flag from RealtimeSegmentValidationManager. #14024

  • Kafka3 support for realtime ingestion. #13891

  • Allow building an index on the preserved field in SchemaConformingTransformer. #13993

  • Add support to differentiate null and empty lists for multi-value columns in the Avro decoder. #13572

  • Broker config to set default query null handling behavior. #13977

  • Moved the untarring method to BaseTaskExecutor to enable downloading and untarring from a peer server when deep-store untarring fails, and allow DownloadFromServer to be enabled. #13964

  • Optimize Adaptive Server Selection. #13952

  • New SPI to support custom executor services, providing default implementations for cached and fixed thread pools. #13921

  • Introduction of a shared IdealStateUpdaterLock for PinotLLCRealtimeSegmentManager to prevent race conditions and timeouts during large segment updates. #13947

  • Support for configuring aggregation function parameters in the star-tree index. #13835

  • Write support for creating Pinot segments in the Pinot Spark connector. #13748

  • Array flattening support in SchemaConformingTransformer. #13890

  • Allow table names in TableConfigs with or without the database name when database context is passed. #13934

  • Improvement in null handling performance for nullable single-input aggregation functions. #13791

  • Improvement in column-based null handling by refining method naming, adding documentation, and updating validation and constructor logic to support column-specific null strategies. #13839

  • UI load time improvements. #13296

  • Enhanced the noRawDataForTextIndex config to skip writing raw data when re-using the mutable index is enabled, fixing a global disable issue and improving ingestion performance. #13776

  • Improvements to polymorphic scalar comparison functions for better backward compatibility. #13870

  • Add TablePauseStatus to track pause details. #13803

  • Check stale dedup metadata when adding new records/segments. #13848

  • Improve error messages for star-tree index creation. #13818

  • Adds support for ZStandard and LZ4 compression in tar archives, enhancing efficiency and reducing CPU bottlenecks for large-scale data operations. #13782

  • Support for IPv6 in NetUtils. #13805

  • Optimize NullableSingleInputAggregationFunction when the entire block is null, based on the null bitmap's cardinality. #13758

  • Support extra request headers to carry the database context for routing requests. #13417

  • Adds routing policy details to query error messages for unavailable segments, providing context to ease confusion and expedite issue triage. #13706

  • Refactoring and cleanup for permissions and access. #13696, #13633

  • Prevent 500 error for non-existent taskType in the /tasks/{taskType}/tasks API. #13537

  • Changed STREAM_DATA_LOSS from a Meter to a Gauge to accurately reflect data loss detection and ensure proper cleanup. #13712

