Rebalance Servers


The rebalance operation is used to recompute the assignment of brokers or servers in the cluster. It is not a single command, but rather a series of steps.

For servers, the rebalance operation balances the distribution of segments across the servers used by a Pinot table. It is typically run after capacity changes, or after config changes such as replication, segment assignment strategy, or migration of the table to a different tenant.

Changes that require a rebalance

Below are the changes that must be followed by a rebalance.

  1. Capacity changes

  2. Increasing/decreasing replication for a table

  3. Changing segment assignment for a table

  4. Moving table from one tenant to a different tenant

Capacity changes

These are typically done when resizing a cluster (scaling up or down) or replacing its nodes.

Tenants and tags

Every server added to the Pinot cluster has tags associated with it. A group of servers with the same tag forms a server tenant.

By default, a server in the cluster gets added to the DefaultTenant, i.e. it is tagged as DefaultTenant_OFFLINE and DefaultTenant_REALTIME.

A Pinot table config has a tenants section that defines the tenant to be used by the table. The Pinot table will use all the servers that belong to the tenant described in this config. For more details, see Tenants.

Below is an example of how this looks in the table config znode, as seen in ZooInspector.

{
  "tableName": "myTable_OFFLINE",
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  }
}

Updating tags

0.6.0 onwards

In order to change the server tags, use the following API.

PUT /instances/{instanceName}/updateTags?tags=<comma separated tags>
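
For example, a call like the one below retags a server (a minimal sketch; the controller address, instance name, and tag values are assumptions):

curl -X PUT "http://localhost:9000/instances/Server_10.1.10.51_7000/updateTags?tags=newTenant_OFFLINE,DefaultTenant_REALTIME" \
    -H "accept: application/json"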

0.5.0 and prior

The UpdateTags API is not available in 0.5.0 and prior. Instead, use the following API to update the instance.

PUT /instances/{instanceName}

For example,

curl -X PUT "http://localhost:9000/instances/Server_10.1.10.51_7000" \
    -H "accept: application/json" \
    -H "Content-Type: application/json" \
    -d "{ \"host\": \"10.1.10.51\", \"port\": \"7000\", \"type\": \"SERVER\", \"tags\": [ \"newName_OFFLINE\", \"DefaultTenant_REALTIME\" ]}"

NOTE

The output of GET and the input of PUT don't match for this API. Make sure to use the payload format shown in the example above. In particular, notice that the instance name "Server_host_port" is split into separate fields in this PUT API.

When upsizing/downsizing a cluster, you will need to make sure that the host names of servers are consistent. You can do this by setting the following config parameter:

pinot.set.instance.id.to.hostname=true

Replication changes

In order to change the replication factor of a table, update the table config as follows:

OFFLINE table - update the replication field

REALTIME table - update the replicasPerPartition field
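
For example, a minimal sketch of the relevant parts of the table configs (the replication values are assumptions):

"OFFLINE": {
  "segmentsConfig": {
    "replication": "3"
  }
}
"REALTIME": {
  "segmentsConfig": {
    "replicasPerPartition": "3"
  }
}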

Segment Assignment changes

The most common segment assignment change is moving from the default segment assignment to the replica group segment assignment. Discussing the details of segment assignment is beyond the scope of this page; more details can be found in Routing and in this FAQ question.

Table Migration to a different tenant

If you need to move a table across tenants, for example because the table was earlier assigned to one Pinot tenant and you now want to move it to a different one, you need to call the rebalance API with reassignInstances set to true.

To move a table to another tenant, modify the following configs in both the realtime and offline table configs:

"REALTIME": {
  ...
  "tenants": {
    ...
    "server": "<tenant_name>",
    ...
  },
  ...
  "instanceAssignmentConfigMap": {
    ...
    "CONSUMING": {
      ...
      "tagPoolConfig": {
        ...
        "tag": "<tenant_name>_REALTIME",
        ...
      },
      ...
    },
    ...
    "COMPLETED": {
      ...
      "tagPoolConfig": {
        ...
        "tag": "<tenant_name>_REALTIME",
        ...
      },
      ...
    },
    ...
  },
  ...
}
"OFFLINE": {
  ...
  "tenants": {
    ...
    "server": "<tenant_name>",
    ...
  },
  ...
  "instanceAssignmentConfigMap": {
    ...
    "OFFLINE": {
      ...
      "tagPoolConfig": {
        ...
        "tag": "<tenant_name>_OFFLINE",
        ...
      },
      ...
    },
    ...
  },
  ...
}
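
After updating the configs, trigger the rebalance with instance reassignment enabled, for example (a sketch; the table name and controller address are assumptions):

curl -X POST "http://localhost:9000/tables/myTable/rebalance?type=OFFLINE&reassignInstances=true" \
    -H "accept: application/json"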

Rebalance Algorithms

Currently, two rebalance algorithms are supported: the default algorithm and the minimal data movement algorithm.

The Default Algorithm

This algorithm is used in most cases. When the reassignInstances parameter is set to true, the final instance assignment is recomputed, and the list of instances is sorted per partition per replica group. Whenever a table rebalance runs, segment assignment respects the sequence in the sorted list and picks the relevant instances.

Minimal Data Movement Algorithm

This algorithm focuses on minimizing data movement during table rebalance. When the reassignInstances parameter is set to true and this algorithm is enabled, instances that are still alive keep their position, and vacant positions are filled with newly added instances or with the last instances among the existing alive candidates. Only the instances that change position are involved in data movement.

To switch to this rebalance algorithm, set the following config in the table config before triggering the rebalance:

"instanceAssignmentConfigMap": {
  ...
  "OFFLINE": {
    ...
    "replicaGroupPartitionConfig": {
      ...
      "minimizeDataMovement": true,
      ...
    },
    ...
  },
  ...
}

When instanceAssignmentConfigMap is not explicitly configured, the minimizeDataMovement flag can instead be set in segmentsConfig:

"segmentsConfig": {
    ...
    "minimizeDataMovement": true,
    ...
}

Running a Rebalance

After any of the changes described above are made, a rebalance is needed for them to take effect.

To run a rebalance, use the following API.

POST /tables/{tableName}/rebalance?type=<OFFLINE/REALTIME>

This API has a lot of parameters to control its behavior. Make sure to go over them and change the defaults as needed.

Note

Typically, the flags that need to be changed from the default values are

includeConsuming=true for REALTIME

downtime=true if you have only 1 replica, or prefer a faster rebalance at the cost of a momentary downtime
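
For example, a typical invocation for a REALTIME table might look like this (a sketch; the table name and controller address are assumptions):

curl -X POST "http://localhost:9000/tables/myTable/rebalance?type=REALTIME&includeConsuming=true" \
    -H "accept: application/json"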

Rebalance Parameters

Query param
Default value
Description

dryRun

false

If set to true, rebalance is run as a dry-run so that you can see the expected changes to the ideal state and instance partition assignment.

preChecks

false

If set to true, some pre-checks are performed and their status is returned. This can only be used with dryRun=true. See the section below for more details.

includeConsuming

true

Applicable for REALTIME tables.

CONSUMING segments are rebalanced only if this is set to true. Moving a CONSUMING segment involves dropping the data consumed so far on the old server and re-consuming it on the new server. An application that is sensitive to the increased memory utilization from re-consumption, or to momentary data staleness, may choose not to include consuming segments in the rebalance. Whenever a CONSUMING segment completes, the completed segment is assigned to the right instances, and the new CONSUMING segment is also started on the correct instances. If you choose includeConsuming=false and let the segments move later on, any downsized nodes need to remain untagged in the cluster until segment completion happens.

downtime

false

This controls whether Pinot allows downtime while rebalancing. If downtime=true, all replicas of a segment can be moved in one go, which could result in momentary downtime for that segment (the time gap between the ideal state being updated to the new servers and the new servers downloading the segments). If downtime=false, Pinot makes sure to keep a certain number of replicas (configured in the next row) always up. The rebalance is done in multiple iterations under the hood in order to fulfill this constraint.

Note: If you have only 1 replica for your table, rebalance with downtime=false is not possible.

minAvailableReplicas

-1

Applicable for rebalance with downtime=false.

This is the minimum number of replicas that are expected to stay alive through the rebalance. A negative value is interpreted relative to the number of replicas, e.g. -1 allows at most one replica to be unavailable at a time.

lowDiskMode

false

Applicable for rebalance with downtime=false. When enabled, segments will first be offloaded from servers, then added to servers after offload is done. It may increase the total time of the rebalance, but can be useful when servers are low on disk space, and we want to scale up the cluster and rebalance the table to more servers.

bestEfforts

false

Applicable for rebalance with downtime=false.

If a no-downtime rebalance cannot be performed successfully, this flag controls whether to fail the rebalance or do a best-effort rebalance. Warning: setting this flag to true can cause downtime in two scenarios: 1) any segments get into ERROR state, or 2) EV-IS convergence times out.

reassignInstances

true

Applicable to tables where the instance assignment has been persisted to zookeeper. Setting this to true will make the rebalance first update the instance assignment, and then rebalance the segments. This option should be set to true if the instance assignment will be changed (e.g. increasing replication or instances per replica for replicaGroup based assignment)

minimizeDataMovement

ENABLE

Whether to ENABLE minimizeDataMovement, DISABLE it, or DEFAULT to the value in the TableConfig. If enabled, it reduces the segments that will be moved by trying to minimize the changes to the instance assignment. For tables using implicit instance assignment (no INSTANCE_PARTITIONS) this is a no-op.

batchSizePerServer

-1

The maximum number of segment adds per server to apply to the IdealState in each rebalance step. For non-strict replica group based assignment, the adds are capped at the batchSizePerServer value per server per rebalance step (some servers may get fewer segments). For strict replica group based assignment, this is a per-server best-effort value, since each partition of a replica group must be moved as a whole and at least one partition in a replica group should be moved per step. A value of -1 disables batching (select as many segments as possible per incremental rebalance step such that minAvailableReplicas is honored). Support for batching is available from this commit onwards.

bootstrap

false

Rebalances all segments again, as if adding segments to an empty table. If this is false, then the rebalance will try to minimize segment movements. Warning: Only use this option if a reshuffle of all segments is desirable.

externalViewCheckIntervalInMs

1000

How often (in milliseconds) to check whether the external view has converged with the ideal state

externalViewStabilizationTimeoutInMs

3600000

Maximum time (in milliseconds) to wait for external view to converge with ideal states. It automatically extends the time if progress has been made

heartbeatIntervalInMs

300000

How often to make a status update (i.e. heartbeat)

heartbeatTimeoutInMs

3600000

How long to wait for next status update (i.e. heartbeat) before the job is considered failed

maxAttempts

3

Max number of attempts to rebalance

retryInitialDelayInMs

300000

Initial delay for exponential backoff between retries

updateTargetTier

false

Whether to update segment target tier as part of the rebalance. Only relevant for tiered storage enabled tables.

Checking status

The following API is used to check the progress of a rebalance job; it takes the jobId of the rebalance job. The API to list the jobIds of rebalance jobs for a table is shown further below. Note that the rebalanceStatus API is available from this commit onwards.

curl -X GET "https://localhost:9000/rebalanceStatus/ffb38717-81cf-40a3-8f29-9f35892b01f9" -H "accept: application/json"
{"tableRebalanceProgressStats": {
    "startTimeMs": 1679073157779,
    "status": "DONE", // IN_PROGRESS/DONE/FAILED    
    "timeToFinishInSeconds": 0, // Time it took for the rebalance job after it completes/fails 
    "completionStatusMsg": "Finished rebalancing table: airlineStats_OFFLINE with minAvailableReplicas: 1, enableStrictReplicaGroup: false, bestEfforts: false in 44 ms."
     
     // The total amount of work required for rebalance 
    "initialToTargetStateConvergence": {
      "_segmentsMissing": 0, // Number of segments missing in the current state but present in the target state
      "_segmentsToRebalance": 31, // Number of segments that needs to be assigned to hosts so that the current state can get to the target state.
      "_percentSegmentsToRebalance": 100, // Total number of replicas that needs to be assigned to hosts so that the current state can get to the target state.
      "_replicasToRebalance": 279 // Remaining work to be done in %
    },
    
    // The pending work for rebalance
    "externalViewToIdealStateConvergence": {
      "_segmentsMissing": 0,
      "_segmentsToRebalance": 0,
      "_percentSegmentsToRebalance": 0,
      "_replicasToRebalance": 0
    },
    
    // Additional work to catch up with the new ideal state, when the ideal 
    // state shifts since rebalance started. 
    "currentToTargetConvergence": {
      "_segmentsMissing": 0,
      "_segmentsToRebalance": 0,
      "_percentSegmentsToRebalance": 0,
      "_replicasToRebalance": 0
    },
  },
  "timeElapsedSinceStartInSeconds": 28 // If rebalance is IN_PROGRESS, this gives the time elapsed since it started
  }

Below is the API to get the jobIds of rebalance jobs for a given table. It takes the table name and the jobType, which is TABLE_REBALANCE.

curl -X GET "https://localhost:9000/table/airlineStats_OFFLINE/jobs?type=OFFLINE&jobTypes=TABLE_REBALANCE" -H "accept: application/json"
{
  "ffb38717-81cf-40a3-8f29-9f35892b01f9": {
    "jobId": "ffb38717-81cf-40a3-8f29-9f35892b01f9",
    "submissionTimeMs": "1679073157804",
    "jobType": "TABLE_REBALANCE",
    "REBALANCE_PROGRESS_STATS": "{\"initialToTargetStateConvergence\":{\"_segmentsMissing\":0,\"_segmentsToRebalance\":31,\"_percentSegmentsToRebalance\":100.0,\"_replicasToRebalance\":279},\"externalViewToIdealStateConvergence\":{\"_segmentsMissing\":0,\"_segmentsToRebalance\":0,\"_percentSegmentsToRebalance\":0.0,\"_replicasToRebalance\":0},\"currentToTargetConvergence\":{\"_segmentsMissing\":0,\"_segmentsToRebalance\":0,\"_percentSegmentsToRebalance\":0.0,\"_replicasToRebalance\":0},\"startTimeMs\":1679073157779,\"status\":\"DONE\",\"timeToFinishInSeconds\":0,\"completionStatusMsg\":\"Finished rebalancing table: airlineStats_OFFLINE with minAvailableReplicas: 1, enableStrictReplicaGroup: false, bestEfforts: false in 44 ms.\"}",
    "tableName": "airlineStats_OFFLINE"
  }
}

Note that the rebalanceStatus API's result has changed from this commit onwards: two new sections were added to the existing stats, with the goal of eventually removing the existing stats in favor of the new ones. From that commit onwards, the stats include the following newly added sections in addition to the original stats posted above:

  "rebalanceProgressStatsOverall": { // Meant to be used to track overall progress of the rebalance job
    "totalSegmentsToBeAdded": 60, // Segments to be added overall as part of this rebalance job
    "totalSegmentsToBeDeleted": 60, // Segments to be deleted overall as part of this rebalance job
    "totalRemainingSegmentsToBeAdded": 21, // Segments that are yet to be added
    "totalRemainingSegmentsToBeDeleted": 15, // Segments that are yet to be deleted
    "totalRemainingSegmentsToConverge": 0, // Segments that belong to the correct instance but who's EV state doesn't match the expected IS state
    "totalCarryOverSegmentsToBeAdded": 0, // Segments adds carried over from the previous rebalance step
    "totalCarryOverSegmentsToBeDeleted": 0, // Segment deletes carried over from the previous rebalance step
    "totalUniqueNewUntrackedSegmentsDuringRebalance": 0, // Newly added segments detected but which are not yet monitored by the rebalance job, some of these may be monitored later
    "percentageRemainingSegmentsToBeAdded": 35, // Percentage segments yet to be added (including carry-over segments)
    "percentageRemainingSegmentsToBeDeleted": 25, // Percentage segments yet to be deleted (including carry-over segments)
    "estimatedTimeToCompleteAddsInSeconds": 10476.487, // Estimated time to complete segment adds in seconds based on historical time taken so far
    "estimatedTimeToCompleteDeletesInSeconds": 6485.444333333333, // Estimated time to complete segment deletes in seconds based on historical time taken so far
    "averageSegmentSizeInBytes": 5448028669, // Average segment size in bytes
    "totalEstimatedDataToBeMovedInBytes": 326881720140, // Total estimated data to be moved (total segments to be added * average segment size)
    "startTimeMs": 1744393492152 // Start time of the rebalance job
  },
  "rebalanceProgressStatsCurrentStep": { // Captures the stats of the current rebalance step being performed
    "totalSegmentsToBeAdded": 45, // Segments to be added as part of this rebalance step
    "totalSegmentsToBeDeleted": 45, // Segments to be deleted as part of this rebalance step
    "totalRemainingSegmentsToBeAdded": 6, // Segments that are yet to be added in this rebalance step
    "totalRemainingSegmentsToBeDeleted": 0, // Segments that are yet to be deleted in this rebalance step
    "totalRemainingSegmentsToConverge": 0, // Segments that belong to the correct instance but who's EV state doesn't match the expected IS state
    "totalCarryOverSegmentsToBeAdded": 0, // Segments adds carried over from the previous rebalance step
    "totalCarryOverSegmentsToBeDeleted": 0, // Segments deletes carried over from the previous rebalance step
    "totalUniqueNewUntrackedSegmentsDuringRebalance": 0, // Newly added segments detected but which are not yet monitored by the rebalance job, some of these may be monitored later
    "percentageRemainingSegmentsToBeAdded": 13.333333333333334, // Percentage segments yet to be added (including carry-over segments due to which it may show > 100%)
    "percentageRemainingSegmentsToBeDeleted": 0, // Percentage segments yet to be deleted (including carry-over segments due to which it may show > 100%)
    "estimatedTimeToCompleteAddsInSeconds": 2993.278923076923, // Estimated time to complete segment adds in seconds for the current step
    "estimatedTimeToCompleteDeletesInSeconds": 0, // Estimated time to complete segment deletes in seconds for the current step
    "averageSegmentSizeInBytes": 5448028669, // Average segment size in bytes
    "totalEstimatedDataToBeMovedInBytes": 245161290105, // Total estimated data to be moved (total segments to be added in this step * average segment size)
    "startTimeMs": 1744393492172 // Start time of the current rebalance step
  },

In the new stats above, rebalanceProgressStatsOverall tracks the overall progress of the rebalance job and is the main stats section to monitor. The rebalanceProgressStatsCurrentStep stats are used to calculate the overall stats, but they don't need to be monitored to obtain the overall rebalance status, since the overall stats are updated regularly. They can be used for debugging if needed.
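
For instance, a small shell loop can poll the overall status until the job finishes (a minimal sketch; the jobId and controller address are assumptions, and jq must be installed):

JOB_ID="ffb38717-81cf-40a3-8f29-9f35892b01f9"
while true; do
  # Extract the job status (IN_PROGRESS/DONE/FAILED) from the response
  STATUS=$(curl -s "http://localhost:9000/rebalanceStatus/${JOB_ID}" \
    -H "accept: application/json" \
    | jq -r '.tableRebalanceProgressStats.status')
  echo "Rebalance status: ${STATUS}"
  [ "${STATUS}" != "IN_PROGRESS" ] && break
  sleep 30
done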

Canceling Rebalance Jobs

The need may arise to cancel a rebalance job. An API exists for this, which cancels all IN_PROGRESS jobs for the given table. A few caveats about cancellation to keep in mind:

  • Any updates to the IdealState that was already made as part of the rebalance job will continue to be processed. Cancellation prevents the TableRebalancer from making future updates to the IdealState as part of the rebalance job.

  • Cancellation does not roll back the state of the cluster to what it was prior to triggering the rebalance, thus leaving the cluster in an inconsistent state.

  • To fix the inconsistent state caused by a cancel, fix up the cluster components due to which the rebalance was canceled (if needed) and re-trigger a new rebalance job. The new job should help get the cluster into the final state.

Rebalance jobs for a given table can be canceled via the following API:

curl -X 'DELETE' 'http://localhost:9000/tables/airlineStats/rebalance?type=OFFLINE' -H 'accept: application/json'

The above API will return a list of jobIds that were canceled.

[
  "ffb38717-81cf-40a3-8f29-9f35892b01f9"
]

Rebalance Pre-Checks

With the options dryRun=true and preChecks=true, some pre-checks relevant to the rebalance are performed. For each check, the result includes a preCheckStatus, which is one of PASS, WARN, or ERROR, and a message explaining what the status means (available from this OSS PR onwards: https://github.com/apache/pinot/pull/15233; prior to that, the checks returned just true/false/error with no further explanation):

Pre-check item name
Description
Result

isMinimizeDataMovement

Check if the rebalance will run with minimizeDataMovement=true. This is an important flag for instance assignment strategies such as replicaGroups, since it controls how much data movement may occur.

PASS if enabled, or when this flag is irrelevant. WARN if it's not enabled.

diskUtilizationDuringRebalance

Check if disk utilization could become a problem during the rebalance, based on a threshold defined by the config controller.rebalance.disk.utilization.threshold (default 0.9). Note that this pre-check can have false negatives: it may pass while a server still runs into disk utilization problems, since other sources also increase disk usage, especially during a long-running rebalance job.

PASS if the disk utilization of all servers in the rebalance stays within controller.rebalance.disk.utilization.threshold once all assigned segments are added. ERROR otherwise, listing the problematic servers.

diskUtilizationAfterRebalance

Similar to diskUtilizationDuringRebalance, but checks the size after the rebalance. This check can pass while diskUtilizationDuringRebalance fails, for example when a server receives segments but will also delete some, so that the net size change falls within the threshold.

PASS if the disk utilization of all servers in the rebalance stays within controller.rebalance.disk.utilization.threshold once all assigned segments are added and unassigned segments removed. ERROR otherwise, listing the problematic servers.

needsReloadStatus

Check if any of the servers needs to be reloaded (i.e. whether the segments on these servers need to be updated based on the latest TableConfig and Schema).

PASS if no server assigned to the table needs a reload. WARN if any of the servers needs a reload. ERROR if any of the servers fails to answer the needReload status.

rebalanceConfigOptions

Marks any rebalance parameters that need a double check, as they might cause a performance impact.

PASS if no rebalance parameter needs a double check. WARN if any flagged rebalance parameter is set, followed by a description.

replicaGroupsInfo

If replicaGroups are enabled, provides information about the number of replica groups and the number of instances per replica group; otherwise, provides information about the replication factor. Information is provided for OFFLINE, CONSUMING, COMPLETED and tier, depending on what is enabled.

PASS if replicaGroups are disabled, or if they are enabled and the rebalance config has reassignInstances=true. WARN if replicaGroups are enabled but the rebalance config has reassignInstances=false.

Example
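
A dry-run with pre-checks can be triggered like this (a sketch; the table name and controller address are assumptions):

curl -X POST "http://localhost:9000/tables/airlineStats/rebalance?type=OFFLINE&dryRun=true&preChecks=true" \
    -H "accept: application/json"

The response then contains a preChecksResult section like the following: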

  "preChecksResult": {
    "isMinimizeDataMovement": {
      "preCheckStatus": "PASS",
      "message": "minimizeDataMovement is enabled"
    },
    "diskUtilizationDuringRebalance" : {
      "preCheckStatus" : "PASS",
      "message" : "Within threshold (<90%)"
    },
    "diskUtilizationAfterRebalance" : {
      "preCheckStatus" : "PASS",
      "message" : "Within threshold (<90%)"
    },
    "replicaGroupsInfo": {
      "preCheckStatus": "PASS",
      "message": "COMPLETED segments - Replica Groups are not enabled, replication: 1\nCONSUMING segments - Replica Groups are not enabled, replication: 1"
    },
    "needsReloadStatus": {
      "preCheckStatus": "ERROR",
      "message": "Could not determine needReload status, run needReload API manually"
    },
    "rebalanceConfigOptions": {
      "preCheckStatus": "PASS",
      "message": "All rebalance parameters look good"
    }
  },

The ERROR status for needsReloadStatus above may be due to errors returned by a subset of the servers hosting the segments. In such cases, it is recommended to try again or to run the needReload API manually. As part of PR #15360, a fix was made to fetch the status from the servers currently assigned in the IdealState rather than relying on the tagged instances, since the tagged instances may change as part of the rebalance; prior to this PR, an ERROR status would be thrown even for scenarios such as a tenant move.

Rebalance Summary

Rebalance (with or without dryRun=true) returns a summary of the changes that will occur during the rebalance, along with the usual instance and segment assignments. Right now, the summary has three different sections:

  • Server level - captures information about changes occurring at the server level and also dumps per server information about changes taking place.

  • Segment level - captures information about changes happening at the segment level

  • Tag level - aggregate information about segment changes, grouped by server tags

Fields such as the status and description can be used to identify whether the rebalance will result in any change or not (status=NO-OP indicates that the table is already balanced), and can be a quick check prior to checking the summary.

See Examples and Scenarios for how the rebalance summary looks under different scenarios.

Server Level (serverInfo)

Field
Description

numServersGettingNewSegments

The number of servers that will get new segment replicas added in this rebalance.

numServers

The number of servers assigned to this table, including values before and after the rebalance.

serversAdded

A list of servers to be newly added to the assignment of this table in this rebalance.

serversRemoved

A list of servers to be removed from the assignment of this table in this rebalance.

serversUnchanged

A list of servers remaining unchanged in the assignment of this table in this rebalance.

serversGettingNewSegments

A list of servers that will get new segment replicas added in this rebalance.

serverSegmentChangeInfo

A detailed breakdown of the segment change information per server. See Examples and Scenarios.

Segment Level (segmentInfo)

Field
Description

totalSegmentsToBeMoved

The number of segment replicas that will be added to a server. This is essentially equivalent to the number of segments servers need to download.

totalSegmentsToBeDeleted

The number of segment replicas that will be removed from a server.

maxSegmentsAddedToASingleServer

The maximum number of segment replicas added to a single server, across all servers.

estimatedAverageSegmentSizeInBytes

The average size of a segment in one replica of this table.

totalEstimatedDataToBeMovedInBytes

Calculated by

segmentInfo.totalSegmentsToBeMoved * segmentInfo.estimatedAverageSegmentSizeInBytes

replicationFactor

The number of replications, including values before and after the rebalance.

numSegmentsInSingleReplica

The number of segments in a single replica, including values before and after the rebalance.

numSegmentsAcrossAllReplicas

The total number of segment replicas, including values before and after the rebalance. Equivalent to segmentInfo.replicationFactor * segmentInfo.numSegmentsInSingleReplica

consumingSegmentToBeMovedSummary.numConsumingSegmentsToBeMoved

For REALTIME tables, the number of CONSUMING segment replicas that will be added to a server. A segment replica counts as a CONSUMING segment replica if none of the segment's replicas is ONLINE and any of them is CONSUMING. OFFLINE tables do not have this field.

consumingSegmentToBeMovedSummary.numServersGettingConsumingSegmentsAdded

For REALTIME tables, the number of servers that will get new CONSUMING segment replicas. OFFLINE tables do not have this field.

consumingSegmentToBeMovedSummary.consumingSegmentsToBeMovedWithMostOffsetsToCatchUp

For REALTIME tables, the top 10 CONSUMING segments with the largest offset difference between the start offset and the latest offset, across all consuming segments being added to servers. This determines how many offsets a server needs to re-consume from the stream after adding the CONSUMING segment replica. The actual overhead (memory / CPU utilization) of re-consuming depends on the data in the stream, as the overhead varies across streams.

This is meant as an indicator of the amount of work (and thus additional resource utilization) that must be redone when the CONSUMING segment moves, and can be used to decide whether a forceCommit is desirable prior to the rebalance. Note that only ingestion from Kafka has this information available as of now. If the information cannot be fetched, this field is not included in the summary. OFFLINE tables do not have this field.

consumingSegmentToBeMovedSummary.consumingSegmentsToBeMovedWithOldestAgeInMinutes

For REALTIME tables, the top 10 oldest CONSUMING segments based on their segment creation time in the SegmentZkMetadata, across all consuming segments being added to servers. This approximates the oldest data that will need to be re-consumed when the segment is moved and gives a high-level estimate of staleness. Note that this is captured as the segment's creation time instead of the actual data age; the oldest event covered by the consuming segment might be older or newer than the segment's creation time, depending on the stream's data. If the information cannot be fetched, this field is not included in the summary.

This is meant as an indicator of the data staleness expected while running queries when the CONSUMING segment moves, and can be used to decide whether a forceCommit is desirable prior to the rebalance.

OFFLINE tables do not have this field.

consumingSegmentToBeMovedSummary.serverConsumingSegmentSummary

For REALTIME tables, a map from server name to detailed information about the consuming segments added to that server. Each entry has two fields: numConsumingSegmentsToBeAdded and totalOffsetsToCatchUpAcrossAllConsumingSegments. If the offset information could not be fetched, the latter is set to -1. OFFLINE tables do not have this field.

Tag Level (tagsInfo)

A list of aggregated segment and server related statistics, grouped by tag. See Examples and Scenarios. All the tags present in the table config will appear here. It is possible for a server to have multiple tags from this list; in that case, its statistics are counted towards all relevant tags. If an assigned server does not carry any tag present in the table config (which can happen when a table has instance partitions and is rebalanced with reassignInstances=false), it is categorized under a special tag, OUTDATED_SERVERS.

The fields of each entry will consist of:

Field
Description

tagName

Name of the tag

numSegmentsToDownload

Number of segments that will be downloaded on servers tagged with this tagName

numSegmentsUnchanged

Number of segments that will remain on the servers tagged with this tagName

numServerParticipants

Total number of servers tagged with tagName

Example Output
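
The summary below is the kind of payload a dry-run rebalance returns, for example (a sketch; the table name and controller address are assumptions):

curl -X POST "http://localhost:9000/tables/airlineStats/rebalance?type=REALTIME&dryRun=true" \
    -H "accept: application/json"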

"rebalanceSummaryResult": {
    "serverInfo": {
      "numServersGettingNewSegments": 1,
      "numServers": {
        "valueBeforeRebalance": 2,
        "expectedValueAfterRebalance": 1
      },
      "serversAdded": [],
      "serversRemoved": [
        "Server_7051"
      ],
      "serversUnchanged": [
        "Server_7050"
      ],
      "serversGettingNewSegments": [
        "Server_7050"
      ],
      "serverSegmentChangeInfo": {
        "Server_7051": {
          "serverStatus": "REMOVED",
          "totalSegmentsAfterRebalance": 0,
          "totalSegmentsBeforeRebalance": 5,
          "segmentsAdded": 0,
          "segmentsDeleted": 5,
          "segmentsUnchanged": 0,
          "tagList": [
            "DefaultTenant_OFFLINE"
          ]
        },
        "Server_7050": {
          "serverStatus": "UNCHANGED",
          "totalSegmentsAfterRebalance": 10,
          "totalSegmentsBeforeRebalance": 5,
          "segmentsAdded": 5,
          "segmentsDeleted": 0,
          "segmentsUnchanged": 5,
          "tagList": [
            "DefaultTenant_OFFLINE",
            "DefaultTenant_REALTIME"
          ]
        }
      }
    },
    "segmentInfo": {
      "totalSegmentsToBeMoved": 5,
      "totalSegmentsToBeDeleted": 5,
      "maxSegmentsAddedToASingleServer": 5,
      "estimatedAverageSegmentSizeInBytes": 0,
      "totalEstimatedDataToBeMovedInBytes": 0,
      "replicationFactor": {
        "valueBeforeRebalance": 1,
        "expectedValueAfterRebalance": 1
      },
      "numSegmentsInSingleReplica": {
        "valueBeforeRebalance": 10,
        "expectedValueAfterRebalance": 10
      },
      "numSegmentsAcrossAllReplicas": {
        "valueBeforeRebalance": 10,
        "expectedValueAfterRebalance": 10
      },
      "consumingSegmentToBeMovedSummary": {
        "numConsumingSegmentsToBeMoved": 5,
        "numServersGettingConsumingSegmentsAdded": 1,
        "consumingSegmentsToBeMovedWithMostOffsetsToCatchUp": {
          "airlineStats__6__0__20250414T2046Z": 289,
          "airlineStats__4__0__20250414T2046Z": 287,
          "airlineStats__8__0__20250414T2046Z": 283,
          "airlineStats__2__0__20250414T2046Z": 270,
          "airlineStats__0__0__20250414T2046Z": 265
        },
        "consumingSegmentsToBeMovedWithOldestAgeInMinutes": {
          "airlineStats__8__0__20250414T2046Z": 45,
          "airlineStats__4__0__20250414T2046Z": 45,
          "airlineStats__2__0__20250414T2046Z": 45,
          "airlineStats__6__0__20250414T2046Z": 45,
          "airlineStats__0__0__20250414T2046Z": 45
        },
        "serverConsumingSegmentSummary": {
          "Server_7050": {
            "numConsumingSegmentsToBeAdded": 5,
            "totalOffsetsToCatchUpAcrossAllConsumingSegments": 1394
          }
        }
      }
    },
    "tagsInfo": [
      {
        "tagName": "DefaultTenant_REALTIME",
        "numSegmentsToDownload": 5,
        "numSegmentsUnchanged": 5,
        "numServerParticipants": 1
      }
    ]
  },
