
OOM Protection Using Automatic Query Killing

Pinot's built-in heap usage monitoring and OOM protection


Last updated 7 months ago


Pinot includes a mechanism that monitors total JVM heap usage and approximates per-query memory allocation on servers and brokers.

  • Support for single-stage queries

  • Support for multi-stage queries (available since 1.3.0)

The feature is OFF by default. When enabled, this mechanism helps protect servers and brokers from OOM errors caused by expensive queries (e.g., DISTINCTCOUNT with GROUP BY on high-cardinality columns). Upon an immediate risk of heap depletion, the mechanism kicks in and kills queries, starting with the most expensive ones.

The feature has two components on each broker and server:

  • A statistics framework that tracks resource usage for each query thread.

  • A query killing mechanism.

Usage

Enable Thread Statistics Collection

# Turn on resource usage tracking in the statistics framework.
# These settings must go in both the broker and server config files.
pinot.broker.instance.enableThreadAllocatedBytesMeasurement=true
pinot.server.instance.enableThreadAllocatedBytesMeasurement=true
pinot.query.scheduler.accounting.factory.name=org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory
pinot.query.scheduler.accounting.enable.thread.memory.sampling=true

Debug APIs

Once memory sampling is enabled, the following debug APIs can be used to check memory usage on a broker or server. Note that no API aggregates a query's usage across all servers and brokers.

/debug/query/resourceUsage

Returns resource usage aggregated by queryId

[
  {
    "cpuTimeNs": 0,
    "queryId": "Broker_192.168.0.107_8000_174733410000000095_O",
    "allocatedBytes": 3239944
  },
  {
    "cpuTimeNs": 0,
    "queryId": "Broker_192.168.0.107_8000_174733410000000094_O",
    "allocatedBytes": 3239944
  },
  {
    "cpuTimeNs": 0,
    "queryId": "Broker_192.168.0.107_8000_174733410000000093_O",
    "allocatedBytes": 3239944
  },
  {
    "cpuTimeNs": 0,
    "queryId": "Broker_192.168.0.107_8000_174733410000000082_O",
    "allocatedBytes": 4520488
  },
  {
    "cpuTimeNs": 0,
    "queryId": "Broker_192.168.0.107_8000_174733410000000092_O",
    "allocatedBytes": 2599672
  },
  {
    "cpuTimeNs": 0,
    "queryId": "Broker_192.168.0.107_8000_174733410000000087_O",
    "allocatedBytes": 3880216
  },
  {
    "cpuTimeNs": 0,
    "queryId": "Broker_192.168.0.107_8000_174733410000000088_O",
    "allocatedBytes": 2599672
  },
  {
    "cpuTimeNs": 0,
    "queryId": "Broker_192.168.0.107_8000_174733410000000096_O",
    "allocatedBytes": 1959400
  }
]

/debug/threads/resourceUsage

Returns per-thread resource usage, along with the taskId and queryId of the task running on each thread.

[
  {
    "taskId": -1,
    "queryId": "174733410000001465",
    "cputimeMS": 0,
    "allocatedBytes": 0
  },
  {
    "taskId": -1,
    "queryId": "174733410000001466",
    "cputimeMS": 0,
    "allocatedBytes": 0
  },
  {
    "taskId": -1,
    "queryId": "174733410000001467",
    "cputimeMS": 0,
    "allocatedBytes": 0
  },
  {
    "taskId": -1,
    "queryId": "Broker_192.168.0.107_8000_174733410000001466_O",
    "cputimeMS": 0,
    "allocatedBytes": 0
  },
  {
    "taskId": 0,
    "queryId": "Broker_192.168.0.107_8000_174733410000001466_O",
    "cputimeMS": 0,
    "allocatedBytes": 3239680
  }
]
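Since no endpoint aggregates a query's usage across hosts, a client can merge the per-host responses itself. A minimal sketch, assuming responses shaped like the examples above (the hosts and numbers here are illustrative, not real endpoint output):

```python
# Merge /debug/query/resourceUsage responses from several hosts by queryId.
from collections import defaultdict

def aggregate_usage(per_host_responses):
    """Sum cpuTimeNs and allocatedBytes for each queryId across hosts."""
    totals = defaultdict(lambda: {"cpuTimeNs": 0, "allocatedBytes": 0})
    for response in per_host_responses:
        for entry in response:
            q = totals[entry["queryId"]]
            q["cpuTimeNs"] += entry["cpuTimeNs"]
            q["allocatedBytes"] += entry["allocatedBytes"]
    return dict(totals)

# Two hypothetical hosts reporting overlapping queries:
broker = [{"queryId": "q1", "cpuTimeNs": 100, "allocatedBytes": 1000}]
server = [{"queryId": "q1", "cpuTimeNs": 50, "allocatedBytes": 2000},
          {"queryId": "q2", "cpuTimeNs": 10, "allocatedBytes": 500}]
merged = aggregate_usage([broker, server])
print(merged["q1"])  # {'cpuTimeNs': 150, 'allocatedBytes': 3000}
```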

Enable Query Killing Mechanism

The statistics framework also starts a watcher task, which makes the decisions about killing queries.

  • By default, the watcher task does not take any action.

  • The queries_killed meter tracks the number of queries killed.

The killing mechanism is enabled with the following config:

# Set in broker and server
pinot.query.scheduler.accounting.oom.enable.killing.query=true
pinot.query.scheduler.accounting.query.killed.metric.enabled=true

The watcher task can be in one of three modes, depending on the level of heap usage:

  • Normal

  • Critical

  • Panic

The thresholds for these levels are defined by the following configs:

pinot.query.scheduler.accounting.oom.critical.heap.usage.ratio=0.96f (default)
pinot.query.scheduler.accounting.oom.panic.heap.usage.ratio=0.99f (default)

The watcher task runs periodically. The frequency of the watcher task can be configured with:

pinot.query.scheduler.accounting.sleep.ms=30 (default)

However, under stress the task can run more frequently so that it reacts faster to increases in heap usage. This behavior is configured with:

  • a heap-usage threshold at which to switch to the higher frequency

  • the higher frequency, expressed as a divisor of the default sleep interval.

pinot.query.scheduler.accounting.oom.alarming.heap.usage.ratio=0.75f (default)
pinot.query.scheduler.accounting.sleep.time.denominator=3 (Run every 30/3=10ms)
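Putting the thresholds and intervals together, the watcher's level/frequency logic can be sketched as follows (illustrative names, defaults taken from the configs on this page; not Pinot's actual implementation):

```python
# Default thresholds and intervals from this page.
ALARMING = 0.75   # poll faster above this heap-usage ratio
CRITICAL = 0.96   # kill the most expensive query above this
PANIC = 0.99      # kill all queries above this
SLEEP_MS = 30
SLEEP_DENOMINATOR = 3

def watcher_state(heap_usage_ratio):
    """Return (level, sleep_ms) for a given heap-usage ratio."""
    if heap_usage_ratio >= PANIC:
        level = "panic"
    elif heap_usage_ratio >= CRITICAL:
        level = "critical"
    else:
        level = "normal"
    # Above the alarming level the task polls faster: 30 / 3 = 10 ms.
    sleep_ms = SLEEP_MS // SLEEP_DENOMINATOR if heap_usage_ratio >= ALARMING else SLEEP_MS
    return level, sleep_ms

print(watcher_state(0.50))  # ('normal', 30)
print(watcher_state(0.80))  # ('normal', 10)
print(watcher_state(0.97))  # ('critical', 10)
print(watcher_state(1.00))  # ('panic', 10)
```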

Configuration to control which queries are chosen as victims

In panic mode, all queries are killed.

In critical mode, queries below a certain threshold (expressed as a ratio of total heap memory) are not killed.

pinot.query.scheduler.accounting.min.memory.footprint.to.kill.ratio=0.025f

After the watcher task has killed several queries in a row, it explicitly triggers a GC to reclaim memory. The backoff count is configured with:

pinot.query.scheduler.accounting.gc.backoff.count=5
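Combining the victim-selection rules above, the selection might look like the following sketch (a hypothetical helper using the default minimum-footprint ratio; not Pinot's actual implementation):

```python
# Victim selection: in panic mode every query is killed; in critical mode
# only queries whose footprint is at least min.memory.footprint.to.kill.ratio
# of the heap are candidates, most expensive first.
MIN_FOOTPRINT_RATIO = 0.025

def choose_victims(queries, total_heap_bytes, panic):
    """queries: dict of queryId -> allocatedBytes."""
    if panic:
        return sorted(queries)  # kill everything
    floor = MIN_FOOTPRINT_RATIO * total_heap_bytes
    eligible = [(allocated, qid) for qid, allocated in queries.items()
                if allocated >= floor]
    # Kill from the most expensive query downward.
    return [qid for allocated, qid in sorted(eligible, reverse=True)]

heap = 8 * 1024**3  # 8 GiB heap
usage = {"q1": 500 * 1024**2, "q2": 10 * 1024**2, "q3": 900 * 1024**2}
# q2 falls below 0.025 * heap (~205 MiB), so it is spared in critical mode.
print(choose_victims(usage, heap, panic=False))  # ['q3', 'q1']
print(choose_victims(usage, heap, panic=True))   # ['q1', 'q2', 'q3']
```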

Configuration

The following configurations are commonly applied to servers and brokers:

| Config | Default | Description |
| --- | --- | --- |
| pinot.broker.instance.enableThreadAllocatedBytesMeasurement, pinot.server.instance.enableThreadAllocatedBytesMeasurement | false | Set to true to enable killing queries based on allocated bytes. |
| pinot.broker.instance.enableThreadCpuTimeMeasurement, pinot.server.instance.enableThreadCpuTimeMeasurement | false | Set to true to enable killing queries based on CPU time. |
| pinot.query.scheduler.accounting.factory.name | DefaultThreadResourceUsageAccountant, which only enforces timeouts and does no preemption | Set to org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory to enable this feature. |
| pinot.query.scheduler.accounting.enable.thread.memory.sampling | false | Accounts for the memory usage of a query's threads (works only on HotSpot JVMs). If enabled, killing decisions are based on allocated memory. |
| pinot.query.scheduler.accounting.enable.thread.cpu.sampling | false | Accounts for the CPU time of a query's threads. If memory sampling is disabled or unavailable, killing decisions are based on CPU time. If both are disabled, the framework cannot pick the most expensive query. |
| pinot.query.scheduler.accounting.oom.enable.killing.query | false | Whether the framework actually kills queries. If disabled, only an error message is logged. |
| pinot.query.scheduler.accounting.publishing.jvm.heap.usage | false | Whether the framework periodically publishes heap usage to Pinot metrics. |
| pinot.query.scheduler.accounting.oom.panic.heap.usage.ratio | 0.99 | When heap usage exceeds this ratio, the framework kills all queries. Set to a value > 1 to prevent panic-mode killing from ever happening. |
| pinot.query.scheduler.accounting.oom.critical.heap.usage.ratio | 0.96 | When heap usage exceeds this ratio, the framework kills the most expensive query. |
| pinot.query.scheduler.accounting.oom.alarming.heap.usage.ratio | 0.75 | When heap usage exceeds this ratio, the framework runs more frequently to gather statistics and prepare to kill queries in time. |
| pinot.query.scheduler.accounting.sleep.ms | 30 | Interval, in milliseconds, at which the watcher task wakes up. |
| pinot.query.scheduler.accounting.sleep.time.denominator | 3 (corresponding to a 10 ms sleep time at the alarming level) | When heap usage exceeds the alarming level, the sleep time becomes sleep.ms / denominator. |
| pinot.query.scheduler.accounting.min.memory.footprint.to.kill.ratio | 0.025 | A query that allocates less than this ratio of total heap size (Xmx) will not be killed. This prevents aggressive killing when heap memory is not mainly allocated by queries. |
| pinot.query.scheduler.accounting.gc.backoff.count | 5 | After consecutively killing this many expensive queries, the framework explicitly triggers a GC to reclaim memory. Consider using -XX:+ExplicitGCInvokesConcurrent to avoid stop-the-world pauses with some GC algorithms. |
| pinot.query.scheduler.accounting.oom.critical.heap.usage.ratio.delta.after.gc | 0.15 | If heap usage is still above the critical ratio minus this delta after a GC, kill the most expensive query. This prevents heap-size oscillation and repeatedly triggering GC. |

Related pull requests:

  • https://github.com/apache/pinot/pull/9727

  • https://github.com/apache/pinot/pull/13598