LogoLogo
latest
latest
  • Introduction
  • Basics
    • Concepts
      • Pinot storage model
      • Architecture
      • Components
        • Cluster
          • Tenant
          • Server
          • Controller
          • Broker
          • Minion
        • Table
          • Segment
            • Deep Store
            • Segment threshold
            • Segment retention
          • Schema
          • Time boundary
        • Pinot Data Explorer
    • Getting Started
      • Running Pinot locally
      • Running Pinot in Docker
      • Quick Start Examples
      • Running in Kubernetes
      • Running on public clouds
        • Running on Azure
        • Running on GCP
        • Running on AWS
      • Create and update a table configuration
      • Batch import example
      • Stream ingestion example
      • HDFS as Deep Storage
      • Troubleshooting Pinot
      • Frequently Asked Questions (FAQs)
        • General
        • Pinot On Kubernetes FAQ
        • Ingestion FAQ
        • Query FAQ
        • Operations FAQ
    • Indexing
      • Bloom filter
      • Dictionary index
      • Forward index
      • FST index
      • Geospatial
      • Inverted index
      • JSON index
      • Native text index
      • Range index
      • Star-tree index
      • Text search support
      • Timestamp index
      • Vector index
    • Release notes
      • 1.3.0
      • 1.2.0
      • 1.1.0
      • 1.0.0
      • 0.12.1
      • 0.12.0
      • 0.11.0
      • 0.10.0
      • 0.9.3
      • 0.9.2
      • 0.9.1
      • 0.9.0
      • 0.8.0
      • 0.7.1
      • 0.6.0
      • 0.5.0
      • 0.4.0
      • 0.3.0
      • 0.2.0
      • 0.1.0
    • Recipes
      • Connect to Streamlit
      • Connect to Dash
      • Visualize data with Redash
      • GitHub Events Stream
  • For Users
    • Query
      • Querying Pinot
      • Query Syntax
        • Explain Plan (Single-Stage)
        • Filtering with IdSet
        • GapFill Function For Time-Series Dataset
        • Grouping Algorithm
        • JOINs
        • Lookup UDF Join
      • Query Options
      • Query Quotas
      • Query using Cursors
      • Multi-stage query
        • Understanding Stages
        • Stats
        • Optimizing joins
        • Join strategies
          • Random + broadcast join strategy
          • Query time partition join strategy
          • Colocated join strategy
          • Lookup join strategy
        • Hints
        • Operator Types
          • Aggregate
          • Filter
          • Join
          • Intersect
          • Leaf
          • Literal
          • Mailbox receive
          • Mailbox send
          • Minus
          • Sort or limit
          • Transform
          • Union
          • Window
        • Stage-Level Spooling
      • Explain plan
    • APIs
      • Broker Query API
        • Query Response Format
      • Broker GRPC API
      • Controller Admin API
      • Controller API Reference
    • External Clients
      • JDBC
      • Java
      • Python
      • Golang
    • Tutorials
      • Use OSS as Deep Storage for Pinot
      • Ingest Parquet Files from S3 Using Spark
      • Creating Pinot Segments
      • Use S3 as Deep Storage for Pinot
      • Use S3 and Pinot in Docker
      • Batch Data Ingestion In Practice
      • Schema Evolution
  • For Developers
    • Basics
      • Extending Pinot
        • Writing Custom Aggregation Function
        • Segment Fetchers
      • Contribution Guidelines
      • Code Setup
      • Code Modules and Organization
      • Dependency Management
      • Update documentation
    • Advanced
      • Data Ingestion Overview
      • Ingestion Aggregations
      • Ingestion Transformations
      • Null value support
      • Use the multi-stage query engine (v2)
      • Advanced Pinot Setup
    • Plugins
      • Write Custom Plugins
        • Input Format Plugin
        • Filesystem Plugin
        • Batch Segment Fetcher Plugin
        • Stream Ingestion Plugin
    • Design Documents
      • Segment Writer API
  • For Operators
    • Deployment and Monitoring
      • Set up cluster
      • Server Startup Status Checkers
      • Set up table
      • Set up ingestion
      • Decoupling Controller from the Data Path
      • Segment Assignment
      • Instance Assignment
      • Rebalance
        • Rebalance Servers
          • Examples and Scenarios
        • Rebalance Brokers
        • Rebalance Tenant
      • Separating data storage by age
        • Using multiple tenants
        • Using multiple directories
      • Pinot managed Offline flows
      • Minion merge rollup task
      • Consistent Push and Rollback
      • Access Control
      • Monitoring
      • Tuning
        • Tuning Default MMAP Advice
        • Real-time
        • Routing
        • Query Routing using Adaptive Server Selection
        • Query Scheduling
      • Upgrading Pinot with confidence
      • Managing Logs
      • OOM Protection Using Automatic Query Killing
      • Pause ingestion based on resource utilization
    • Command-Line Interface (CLI)
    • Configuration Recommendation Engine
    • Tutorials
      • Authentication
        • Basic auth access control
        • ZkBasicAuthAccessControl
      • Configuring TLS/SSL
      • Build Docker Images
      • Running Pinot in Production
      • Kubernetes Deployment
      • Amazon EKS (Kafka)
      • Amazon MSK (Kafka)
      • Monitor Pinot using Prometheus and Grafana
      • Performance Optimization Configurations
      • Segment Operations Throttling
      • Reload a table segment
  • Configuration Reference
    • Cluster
    • Controller
    • Broker
    • Server
    • Table
    • Ingestion
    • Schema
    • Database
    • Ingestion Job Spec
    • Monitoring Metrics
    • Plugin Reference
      • Stream Ingestion Connectors
      • VAR_POP
      • VAR_SAMP
      • STDDEV_POP
      • STDDEV_SAMP
    • Dynamic Environment
  • Manage Data
    • Import Data
      • SQL Insert Into From Files
      • Upload Pinot segment Using CommandLine
      • Batch Ingestion
        • Spark
        • Flink
        • Hadoop
        • Backfill Data
        • Dimension table
      • Stream Ingestion
        • Ingest streaming data from Apache Kafka
        • Ingest streaming data from Amazon Kinesis
        • Ingest streaming data from Apache Pulsar
        • Configure indexes
        • Stream ingestion with CLP
      • Upsert and Dedup
        • Stream ingestion with Upsert
        • Segment compaction on upserts
        • Stream ingestion with Dedup
      • Supported Data Formats
      • File Systems
        • Amazon S3
        • Azure Data Lake Storage
        • HDFS
        • Google Cloud Storage
      • Complex Type (Array, Map) Handling
        • Complex Type Examples (Unnest)
      • Ingest records with dynamic schemas
  • Functions
    • Aggregation Functions
    • Transformation Functions
    • Array Functions
    • Funnel Analysis Functions
    • Hash Functions
    • JSON Functions
    • User-Defined Functions (UDFs)
    • URL Functions
    • Unique Count and cardinality Estimation Functions
  • Window Functions
  • (Deprecating) Function List
    • ABS
    • ADD
    • ago
    • EXPR_MIN / EXPR_MAX
    • ARRAY_AGG
    • arrayConcatDouble
    • arrayConcatFloat
    • arrayConcatInt
    • arrayConcatLong
    • arrayConcatString
    • arrayContainsInt
    • arrayContainsString
    • arrayDistinctInt
    • arrayDistinctString
    • arrayIndexOfInt
    • arrayIndexOfString
    • ARRAYLENGTH
    • arrayRemoveInt
    • arrayRemoveString
    • arrayReverseInt
    • arrayReverseString
    • arraySliceInt
    • arraySliceString
    • arraySortInt
    • arraySortString
    • arrayUnionInt
    • arrayUnionString
    • AVGMV
    • Base64
    • caseWhen
    • ceil
    • CHR
    • codepoint
    • concat
    • count
    • COUNTMV
    • COVAR_POP
    • COVAR_SAMP
    • day
    • dayOfWeek
    • dayOfYear
    • DISTINCT
    • DISTINCTCOUNT
    • DISTINCTCOUNTMV
    • DISTINCT_COUNT_OFF_HEAP
    • SEGMENTPARTITIONEDDISTINCTCOUNT
    • DISTINCTCOUNTBITMAP
    • DISTINCTCOUNTBITMAPMV
    • DISTINCTCOUNTHLL
    • DISTINCTCOUNTHLLMV
    • DISTINCTCOUNTRAWHLL
    • DISTINCTCOUNTRAWHLLMV
    • DISTINCTCOUNTSMARTHLL
    • DISTINCTCOUNTHLLPLUS
    • DISTINCTCOUNTULL
    • DISTINCTCOUNTTHETASKETCH
    • DISTINCTCOUNTRAWTHETASKETCH
    • DISTINCTSUM
    • DISTINCTSUMMV
    • DISTINCTAVG
    • DISTINCTAVGMV
    • DIV
    • DATETIMECONVERT
    • DATETRUNC
    • exp
    • FIRSTWITHTIME
    • FLOOR
    • FrequentLongsSketch
    • FrequentStringsSketch
    • FromDateTime
    • FromEpoch
    • FromEpochBucket
    • FUNNELCOUNT
    • FunnelCompleteCount
    • FunnelMaxStep
    • FunnelMatchStep
    • GridDistance
    • Histogram
    • hour
    • isSubnetOf
    • JSONFORMAT
    • JSONPATH
    • JSONPATHARRAY
    • JSONPATHARRAYDEFAULTEMPTY
    • JSONPATHDOUBLE
    • JSONPATHLONG
    • JSONPATHSTRING
    • jsonextractkey
    • jsonextractscalar
    • LAG
    • LASTWITHTIME
    • LEAD
    • length
    • ln
    • lower
    • lpad
    • ltrim
    • max
    • MAXMV
    • MD5
    • millisecond
    • min
    • minmaxrange
    • MINMAXRANGEMV
    • MINMV
    • minute
    • MOD
    • mode
    • month
    • mult
    • now
    • percentile
    • percentileest
    • percentileestmv
    • percentilemv
    • percentiletdigest
    • percentiletdigestmv
    • percentilekll
    • percentilerawkll
    • percentilekllmv
    • percentilerawkllmv
    • quarter
    • regexpExtract
    • regexpReplace
    • remove
    • replace
    • reverse
    • round
    • roundDecimal
    • ROW_NUMBER
    • rpad
    • rtrim
    • second
    • sha
    • sha256
    • sha512
    • sqrt
    • startswith
    • ST_AsBinary
    • ST_AsText
    • ST_Contains
    • ST_Distance
    • ST_GeogFromText
    • ST_GeogFromWKB
    • ST_GeometryType
    • ST_GeomFromText
    • ST_GeomFromWKB
    • STPOINT
    • ST_Polygon
    • strpos
    • ST_Union
    • SUB
    • substr
    • sum
    • summv
    • TIMECONVERT
    • timezoneHour
    • timezoneMinute
    • ToDateTime
    • ToEpoch
    • ToEpochBucket
    • ToEpochRounded
    • TOJSONMAPSTR
    • toGeometry
    • toSphericalGeography
    • trim
    • upper
    • Url
    • UTF8
    • VALUEIN
    • week
    • year
    • Extract
    • yearOfWeek
    • FIRST_VALUE
    • LAST_VALUE
    • ST_GeomFromGeoJSON
    • ST_GeogFromGeoJSON
    • ST_AsGeoJSON
  • Reference
    • Single-stage query engine (v1)
    • Multi-stage query engine (v2)
    • Troubleshooting
      • Troubleshoot issues with the multi-stage query engine (v2)
      • Troubleshoot issues with ZooKeeper znodes
      • Realtime Ingestion Stopped
  • RESOURCES
    • Community
    • Team
    • Blogs
    • Presentations
    • Videos
  • Integrations
    • Tableau
    • Trino
    • ThirdEye
    • Superset
    • Presto
    • Spark-Pinot Connector
  • Contributing
    • Contribute Pinot documentation
    • Style guide
Powered by GitBook
On this page
  • Overview of segment compaction
  • Compact segments on upserts in a real-time table
  • Example

Was this helpful?

Edit on GitHub
Export as PDF
  1. Manage Data
  2. Import Data
  3. Upsert and Dedup

Segment compaction on upserts

Use segment compaction on upsert-enabled real-time tables.

PreviousStream ingestion with UpsertNextStream ingestion with Dedup

Last updated 1 year ago

Was this helpful?

Overview of segment compaction

Compacting a segment replaces the completed segment with a compacted segment that only contains the latest version of records. For more information about how to use upserts on a real-time table in Pinot, see .

The Pinot upsert feature stores all versions of the record ingested into immutable segments on disk. Even though the previous versions are not queried, they continue to add to the storage overhead. To remove older records (no longer used in query results) and reclaim storage space, we need to compact Pinot segments periodically. Segment compaction is done via a new minion task. To schedule Pinot tasks periodically, see the .

Compact segments on upserts in a real-time table

To compact segments on upserts, complete the following steps:

  1. Ensure task scheduling is enabled and a minion is available.

  2. Add the following to your table configuration. These configurations (except schedule)determine which segments to compact.

"task": {
  "taskTypeConfigsMap": {
    "UpsertCompactionTask": {
      "schedule": "0 */5 * ? * *",
      "bufferTimePeriod": "7d",
      "invalidRecordsThresholdPercent": "30",
      "invalidRecordsThresholdCount": "100000",
      "tableMaxNumTasks": "100",
      "validDocIdsType": "SNAPSHOT"
    }
  }
}
  • bufferTimePeriod: To compact segments once they are complete, set to “0d”. To delay compaction (as the configuration above shows by 7 days ("7d")), specify the number of days to delay compaction after a segment completes.

  • invalidRecordsThresholdPercent (Optional) Limits the older records allowed in the completed segment as a percentage of the total number of records in the segment. In the example above, the completed segment may be selected for compaction when 30% of the records in the segment are old.

  • invalidRecordsThresholdCount (Optional) Limits the older records allowed in the completed segment by record count. In the example above, if the segment contains more than 100K records, it may be selected for compaction.

  • tableMaxNumTasks (Optional) Limits the number of tasks allowed to be scheduled.

  • validDocIdsType (Optional) Specifies the source of validDocIds to fetch when running the data compaction. The valid types are SNAPSHOT, IN_MEMORY, IN_MEMORY_WITH_DELETE

    • SNAPSHOT: Default validDocIds type. This indicates that the validDocIds bitmap is loaded from the snapshot from the Pinot segment. UpsertConfig's enableSnapshot must be enabled for this type.

    • IN_MEMORY: This indicates that the validDocIds bitmap is loaded from the real-time server's in-memory.

    • IN_MEMORY_WITH_DELETE: This indicates that the validDocIds bitmap is read from the real-time server's in-memory. The valid document ids here does take account into the deleted records. UpsertConfig's deleteRecordColumn must be provided for this type.

WARNING Using in-memory based validDocids type (IN_MEMORY, IN_MEMORY_WITH_DELETE) is dangerous as it will not guarantee us the consistency in some edge cases (e.g. fetching validDocIds bitmap while the server is restarting & updating validDocIds).

Because segment compaction is an expensive operation, we do not recommend setting invalidRecordsThresholdPercent and invalidRecordsThresholdCount too low (close to 1). By default, all configurations above are 0, so no thresholds are applied.

Example

The following example includes a dataset with 24M records and 240K unique keys that have each been duplicated 100 times. After ingesting the data, there are 6 segments (5 completed segments and 1 consuming segment) with a total estimated size of 22.8MB.

Submitting the query “set skipUpsert=true; select count(*) from transcript_upsert” before compaction produces 24,000,000 results:

Segment compactions generates a task for each segment to compact. Five tasks were generated in this case because 90% of the records (3.6–4.5M records) are considered ready for compaction in the completed segments, exceeding the configured thresholds.

If a completed segment only contains old records, Pinot immediately deletes the segment (rather than creating a task to compact it).

Submitting the query again shows the count matches the set of 240K unique keys.

Once segment compaction has completed, the total number of segments remain the same and the total estimated size drops to 2.77MB.

To further improve query latency, merge small segments into larger one.

After the compaction tasks are complete, the reports the following.

Stream Ingestion with Upsert
Minion documentation
Minion Task Manager UI
Example dataset
Results before segment compaction
Minion compaction task completed
Results after segment compaction