Rebalance Servers
The rebalance operation recomputes the assignment of brokers or servers in the cluster. It is not a single command, but rather a series of steps that need to be taken.
In the case of servers, the rebalance operation balances the distribution of segments amongst the servers used by a Pinot table. This is typically done after capacity changes, config changes such as replication or segment assignment strategy, or migration of the table to a different tenant.
The following changes need to be followed by a rebalance:
- Capacity changes
- Increasing/decreasing replication for a table
- Changing segment assignment for a table
- Moving a table from one tenant to a different tenant
Capacity changes are typically done when downsizing/uplifting a cluster or replacing nodes of a cluster.
Every server added to the Pinot cluster has tags associated with it. A group of servers with the same tag forms a server tenant. By default, a server in the cluster gets added to the DefaultTenant, i.e. it gets tagged as `DefaultTenant_OFFLINE` and `DefaultTenant_REALTIME`.
This can be seen in the instance znode, for example using ZooInspector.
0.6.0 onwards
In order to change the server tags, use the following API.
PUT /instances/{instanceName}/updateTags?tags=<comma separated tags>
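For example, to retag a server (a sketch; the controller address, instance name, and tag values are illustrative assumptions):

```bash
curl -X PUT \
  "http://localhost:9000/instances/Server_10.1.10.51_7000/updateTags?tags=NewTenant_OFFLINE,NewTenant_REALTIME" \
  -H "accept: application/json"
```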
0.5.0 and prior
The updateTags API is not available in 0.5.0 and prior. Instead, use the following API to update the instance.
PUT /instances/{instanceName}
For example,
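the request might look like the sketch below (the controller address, instance name, host, port, and tag values are illustrative assumptions):

```bash
curl -X PUT "http://localhost:9000/instances/Server_10.1.10.51_7000" \
  -H "Content-Type: application/json" \
  -d '{
        "host": "10.1.10.51",
        "port": "7000",
        "type": "SERVER",
        "tags": ["NewTenant_OFFLINE", "NewTenant_REALTIME"]
      }'
```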
NOTE
The output of GET and the input of PUT don't match for this API. Make sure to use the right payload, as shown in the example above. In particular, notice that the instance name "Server_host_port" gets split into separate host and port fields in this PUT API.
When upsizing/downsizing a cluster, you will need to make sure that the host names of servers are consistent. You can do this by setting the following config parameter:
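The property commonly used for this in Pinot instance configs is shown below (verify the exact key against your Pinot version):

```
pinot.set.instance.id.to.hostname=true
```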
In order to change the replication factor of a table, update the table config as follows:
- OFFLINE table: update the `replication` field
- REALTIME table: update the `replicasPerPartition` field
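As a hedged sketch, both fields live in the segmentsConfig section of the table config in current Pinot versions (values are illustrative). For an OFFLINE table:

```json
"segmentsConfig": {
  "replication": "3"
}
```

For a REALTIME table:

```json
"segmentsConfig": {
  "replicasPerPartition": "3"
}
```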
The most common segment assignment change is moving from the default segment assignment to replica-group segment assignment. Discussing the details of segment assignment is beyond the scope of this page; see the segment assignment documentation for more details.
If you need to move a table across tenants, e.g. the table was earlier assigned to one Pinot tenant and you now want to move it to a different one, you need to call the rebalance API with `reassignInstances` set to true.
A Pinot table config has a tenants section that defines the tenant to be used by the table; the table will use all the servers that belong to the tenant described in this config (see the Tenants documentation for more details). To move a table to a different tenant, modify the tenants config in both the REALTIME and OFFLINE table configs:
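A hedged sketch of that section (the tenant name is illustrative):

```json
"tenants": {
  "broker": "NewTenant",
  "server": "NewTenant"
}
```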
Currently, two rebalance algorithms are supported: the default algorithm and the minimal data movement algorithm.
The default algorithm is used for most cases. When the `reassignInstances` parameter is set to true, the final instance assignment lists are re-computed, and the list of instances is sorted per partition per replica group. Whenever the table rebalance is run, segment assignment respects the sequence in the sorted list and picks up the relevant instances.
The minimal data movement algorithm focuses on minimizing data movement during the table rebalance. When the `reassignInstances` parameter is set to true and this algorithm is enabled, the positions of instances that are still alive remain the same, and vacant seats are filled with newly added instances or the last instances among the existing alive candidates. As a result, only the instances that change position are involved in data movement.
To switch to this rebalance algorithm, set the following config in the table config before triggering the table rebalance:
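A hedged sketch of where the flag sits (the surrounding instance assignment fields are elided, and the exact nesting should be verified against your Pinot version):

```json
"instanceAssignmentConfigMap": {
  "OFFLINE": {
    "replicaGroupPartitionConfig": {
      "minimizeDataMovement": true
    }
  }
}
```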
When `instanceAssignmentConfigMap` is not explicitly configured, the `minimizeDataMovement` flag can also be set in the `segmentsConfig`:
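A hedged sketch (other segmentsConfig fields elided):

```json
"segmentsConfig": {
  "minimizeDataMovement": true
}
```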
After any of the above changes are made, a rebalance is needed for them to take effect.
To run a rebalance, use the following API.
POST /tables/{tableName}/rebalance?type=<OFFLINE/REALTIME>
This API has a lot of parameters to control its behavior. Make sure to go over them and change the defaults as needed.
Note
Typically, the flags that need to be changed from their default values are:
- `includeConsuming=true` for REALTIME tables
- `downtime=true` if you have only 1 replica, or if you prefer a faster rebalance at the cost of a momentary downtime
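An illustrative invocation (the controller address and table name are assumptions; the query parameters used here are documented in the table below):

```bash
curl -X POST \
  "http://localhost:9000/tables/myTable/rebalance?type=REALTIME&includeConsuming=true&downtime=false&minAvailableReplicas=1" \
  -H "accept: application/json"
```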
| Parameter | Default | Description |
| --- | --- | --- |
| dryRun | false | If set to true, the rebalance is run as a dry run so that you can see the expected changes to the ideal state and instance partition assignment. |
| preChecks | false | If set to true, some pre-checks are performed and their status is returned. This can only be used with dryRun=true. See the section below for more details. |
| includeConsuming | true | Applicable to REALTIME tables. CONSUMING segments are rebalanced only if this is set to true. Moving a CONSUMING segment involves dropping the data consumed so far on the old server and re-consuming on the new server. If an application is sensitive to increased memory utilization due to re-consumption, or to momentary data staleness, it may choose not to include consuming segments in the rebalance. Whenever a CONSUMING segment completes, the completed segment will be assigned to the right instances, and the new CONSUMING segment will also be started on the correct instances. If you choose includeConsuming=false and let the segments move later on, any downsized nodes need to remain untagged in the cluster until segment completion happens. |
| downtime | false | Controls whether Pinot allows downtime while rebalancing. If downtime=true, all replicas of a segment can be moved in one go, which could result in momentary downtime for that segment (the time gap between the ideal state being updated to the new servers and the new servers downloading the segments). If downtime=false, Pinot will make sure to keep a certain number of replicas (see the next row) always up. The rebalance is done in multiple iterations under the hood in order to fulfill this constraint. Note: if you have only 1 replica for your table, rebalance with downtime=false is not possible. |
| minAvailableReplicas | -1 | Applicable for rebalance with downtime=false. The minimum number of replicas that are expected to stay alive through the rebalance. |
| lowDiskMode | false | Applicable for rebalance with downtime=false. When enabled, segments will first be offloaded from servers, then added to servers after the offload is done. This may increase the total time of the rebalance, but can be useful when servers are low on disk space and you want to scale up the cluster and rebalance the table to more servers. |
| bestEfforts | false | Applicable for rebalance with downtime=false. If a no-downtime rebalance cannot be performed successfully, this flag controls whether to fail the rebalance or do a best-effort rebalance. Warning: setting this flag to true can cause downtime in two scenarios: 1) any segments get into ERROR state, and 2) EV-IS convergence times out. |
| reassignInstances | true | Applicable to tables where the instance assignment has been persisted to ZooKeeper. Setting this to true makes the rebalance first update the instance assignment and then rebalance the segments. This option should be set to true if the instance assignment will change (e.g. increasing replication or instances per replica for replicaGroup based assignment). |
| minimizeDataMovement | ENABLE | Whether to ENABLE minimizeDataMovement, DISABLE it, or DEFAULT to the value in the TableConfig. If enabled, it reduces the number of segments that will be moved by trying to minimize the changes to the instance assignment. For tables using implicit instance assignment (no INSTANCE_PARTITIONS), this is a no-op. |
| bootstrap | false | Rebalances all segments again, as if adding segments to an empty table. If this is false, the rebalance will try to minimize segment movements. Warning: only use this option if a reshuffle of all segments is desirable. |
| externalViewCheckIntervalInMs | 1000 | How often to check whether the external view converges with the ideal state. |
| externalViewStabilizationTimeoutInMs | 3600000 | How long to wait for the external view to converge with the ideal state. For large tables it is recommended to increase this timeout. |
| heartbeatIntervalInMs | 300000 | How often to make a status update (i.e. heartbeat). |
| heartbeatTimeoutInMs | 3600000 | How long to wait for the next status update (i.e. heartbeat) before the job is considered failed. |
| maxAttempts | 3 | Max number of attempts to rebalance. |
| retryInitialDelayInMs | 300000 | Initial delay for exponential-backoff retry. |
| updateTargetTier | false | Whether to update segment target tiers as part of the rebalance. Only relevant for tiered-storage-enabled tables. |
To check the progress of a rebalance job, use the rebalance status API, which takes the jobId of the rebalance job. To list the jobIds of rebalance jobs for a given table, use the controller jobs API, which takes the table name and a jobType of TABLE_REBALANCE.
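Hedged sketches of both calls (endpoint paths may vary by Pinot version; the controller address, table name, and jobId are illustrative):

```bash
# Check the progress of a specific rebalance job
curl -X GET "http://localhost:9000/rebalanceStatus/<jobId>" \
  -H "accept: application/json"

# List rebalance jobIds for a table
curl -X GET "http://localhost:9000/table/myTable_REALTIME/jobs?jobTypes=TABLE_REBALANCE" \
  -H "accept: application/json"
```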
Note that the rebalanceStatus API is only available in more recent Pinot releases, and its result has since changed to add two new sections, `rebalanceProgressStatsOverall` and `rebalanceProgressStatsCurrentStep`, to the existing stats; the goal is to eventually remove the existing stats in favor of these new ones.
Of the new stats, `rebalanceProgressStatsOverall` tracks the overall progress of the rebalance job and is the main section to monitor. The `rebalanceProgressStatsCurrentStep` stats are used to calculate the overall stats, but do not need to be monitored to obtain the overall rebalance status, since the overall stats are updated regularly; they can be used for debugging if needed.
With the options dryRun=true&preChecks=true, some pre-checks relevant to the rebalance are performed and their status is returned.
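An illustrative dry-run invocation (controller address and table name are assumptions):

```bash
curl -X POST "http://localhost:9000/tables/myTable/rebalance?type=OFFLINE&dryRun=true&preChecks=true" \
  -H "accept: application/json"
```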
For each check, the response includes a preCheckStatus that is one of PASS | WARN | ERROR, along with a message explaining what the status means (older releases returned only true | false | error with no further explanation). The following pre-checks are performed:

| Pre-check | Description | Status |
| --- | --- | --- |
| isMinimizeDataMovement | Checks whether the rebalance will run with minimizeDataMovement=true. This is an important flag for instance assignment strategies such as replicaGroups, since it controls how much data movement may occur. | PASS if enabled, or if the flag is irrelevant. WARN if it is not enabled. |
| diskUtilizationDuringRebalance | Checks whether disk utilization could become a problem during the rebalance, based on a threshold defined by the controller config controller.rebalance.disk.utilization.threshold (default 0.9). Note that this pre-check can have false negatives: it may pass while a server still runs into disk utilization problems, since other sources also increase disk usage, especially during a long-running rebalance job. | PASS if the disk utilization of all servers in the rebalance stays within the threshold when all assigned segments are added. ERROR otherwise, listing the problematic servers. |
| diskUtilizationAfterRebalance | Similar to diskUtilizationDuringRebalance, but checks the size after the rebalance. This check can pass while diskUtilizationDuringRebalance fails, for example when a server gets new segments but also deletes some, and the net size change falls within the threshold. | PASS if the disk utilization of all servers in the rebalance stays within the threshold when all assigned segments are added and unassigned segments are removed. ERROR otherwise, listing the problematic servers. |
| needsReloadStatus | Checks whether any of the servers needs a reload (i.e. whether the segments on those servers need to be updated based on the latest TableConfig and Schema). | PASS if none of the servers assigned to the table needs a reload. WARN if any server needs a reload. ERROR if any server fails to answer the needReload status. |
| rebalanceConfigOptions | Flags any rebalance parameters that need a double check, as they might cause a performance impact. | PASS if no rebalance parameter needs a double check. WARN if any flagged rebalance parameter is set, followed by a description. |
The ERROR status for needsReloadStatus above may be due to errors returned by a subset of the servers hosting the segments. In such cases, it is recommended to try again, or to check manually via the needReload API. In recent releases, a fix was made to fetch this status from the servers currently assigned in the IdealState, rather than relying on the tagged instances, since the tagged instances may change as part of the rebalance; prior to that fix, an ERROR status could be thrown even for scenarios such as a tenant move.
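A hedged sketch of such a manual check (the endpoint path should be verified against your controller's API docs; the table name is illustrative):

```bash
curl -X GET "http://localhost:9000/segments/myTable_OFFLINE/needReload" \
  -H "accept: application/json"
```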
Rebalance (with or without dryRun=true) returns a summary of the changes that will occur during the rebalance, along with the usual instance and segment assignments. Currently, the summary has three sections:
- Server level: captures information about changes occurring at the server level, including per-server details of the changes taking place.
- Segment level: captures information about changes happening at the segment level.
- Tag level: aggregated information about segment changes, grouped by server tags.
Fields such as `status` and `description` can be used to identify whether the rebalance will result in any change (`status=NO-OP` indicates that the table is already balanced), and serve as a quick check before digging into the summary.
See Examples and Scenarios for how the rebalance summary looks under different scenarios.
| Field | Description |
| --- | --- |
| serverInfo.numServersGettingNewSegments | The number of servers that will get new segment replicas in this rebalance. |
| serverInfo.numServers | The number of servers assigned to this table, including values before and after the rebalance. |
| serverInfo.serversAdded | A list of servers newly added to the assignment of this table in this rebalance. |
| serverInfo.serversRemoved | A list of servers removed from the assignment of this table in this rebalance. |
| serverInfo.serversUnchanged | A list of servers remaining in the assignment of this table in this rebalance. |
| serverInfo.serversGettingNewSegments | A list of servers that will get new segment replicas in this rebalance. |
| serverInfo.serverSegmentChangeInfo | A detailed breakdown of the segment count changes per server. |
| segmentInfo.totalSegmentsToBeMoved | The number of segment replicas that will be added to a server. This is essentially equivalent to how many segments servers need to download. |
| segmentInfo.totalSegmentsToBeDeleted | The number of segment replicas that will be removed from a server. |
| segmentInfo.maxSegmentsAddedToASingleServer | The maximum number of segment replicas added to a single server, across all servers. |
| segmentInfo.estimatedAverageSegmentSizeInBytes | The average size of a segment in one replica of this table. |
| segmentInfo.totalEstimatedDataToBeMovedInBytes | Calculated as segmentInfo.totalSegmentsToBeMoved * segmentInfo.estimatedAverageSegmentSizeInBytes. |
| segmentInfo.replicationFactor | The replication factor, including values before and after the rebalance. |
| segmentInfo.numSegmentsInSingleReplica | The number of segments in a single replica, including values before and after the rebalance. |
| segmentInfo.numSegmentsAcrossAllReplicas | The total number of segment replicas, including values before and after the rebalance. Equivalent to segmentInfo.replicationFactor * segmentInfo.numSegmentsInSingleReplica. |
| segmentInfo.consumingSegmentToBeMovedSummary.numConsumingSegmentsToBeMoved | For REALTIME tables, the number of CONSUMING segment replicas that will be added to a server. A segment replica is considered CONSUMING if none of the segment's replicas is ONLINE and at least one of them is CONSUMING. OFFLINE tables do not have this field. |
| segmentInfo.consumingSegmentToBeMovedSummary.numServersGettingConsumingSegmentsAdded | For REALTIME tables, the number of servers that will get new CONSUMING segment replicas. OFFLINE tables do not have this field. |
| segmentInfo.consumingSegmentToBeMovedSummary.consumingSegmentsToBeMovedWithMostOffsetsToCatchUp | For REALTIME tables, the top 10 largest offset differences between the start offset and the latest offset, across all consuming segments being added to a server. This indicates how many offsets a server needs to re-consume from the stream after the CONSUMING segment replica is added. The actual overhead of re-consuming depends on the stream, as it varies across different streams. Note that only ingestion from Kafka has this information available today. If the information cannot be fetched, this field is not included in the summary. OFFLINE tables do not have this field. |
| segmentInfo.consumingSegmentToBeMovedSummary.consumingSegmentsToBeMovedWithOldestAgeInMinutes | For REALTIME tables, the top 10 oldest ages across all consuming segments being added to a server. This approximates the oldest data that will need to be re-consumed. Note that this is the segment's age, not the data age: the oldest event covered by the consuming segment might be older than the segment itself. If the information cannot be fetched, this field is not included in the summary. OFFLINE tables do not have this field. |
| segmentInfo.consumingSegmentToBeMovedSummary.serverConsumingSegmentSummary | For REALTIME tables, a map from server name to detailed information about the consuming segments added to that server. Each entry has two fields: numConsumingSegmentsToBeAdded and totalOffsetsToCatchUpAcrossAllConsumingSegments. If the offset information cannot be fetched, the latter is set to -1. OFFLINE tables do not have this field. |
| tagsInfo | A list of aggregated segment and server statistics grouped by server tag. All tags present in the table config appear here. A server may have multiple tags from this list; in that case, its statistics are accounted for under each of those tags. If an assigned server does not have any tag present in the table config (which can happen when a table has instance partitions and is rebalanced with reassignInstances=false), it is categorized under the special tag OUTDATED_SERVERS. |