Pinot supports atomic updates at the segment level. When data consisting of multiple segments is pushed to a table, the segments are replaced one at a time, so queries to the broker during the upload phase may produce inconsistent results due to the interleaving of old and new data.
Furthermore, Pinot currently does not support data rollback features. In case of a bad data push, the table owner needs to re-run the flow with the previous data and re-ingest data to Pinot. This end-to-end process can take hours and the Pinot table can potentially be in a bad state during this long period.
The consistent push and rollback protocol allows a user to atomically switch between data snapshots and to roll back to the previous data in the case of a bad data push. For complete motivation and reasoning, please refer to the design doc above. Currently, only OFFLINE table REFRESH use cases are supported.
A segment lineage data structure has been introduced in ZooKeeper (under the path <cluster_name>/PROPERTYSTORE/SEGMENT_LINEAGE/<table_name>) to keep track of which segments have been replaced by which new set of segments, along with the corresponding state and timestamp.
When the broker answers user queries, it goes through the lineage entries and routes only to the segments in segmentsFrom for entries in the "IN_PROGRESS" or "REVERTED" state, and only to the segments in segmentsTo for entries in the "COMPLETED" state, thereby preserving data snapshot atomicity.
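The routing rule above can be sketched as a small filter. This is a minimal illustration, not the broker's actual code: the `LineageEntry` class, the `routable_segments` function, and the segment names are hypothetical, and only the three states and the segmentsFrom/segmentsTo fields come from the description above.

```python
from dataclasses import dataclass
from typing import Dict, List, Set


@dataclass
class LineageEntry:
    """Hypothetical in-memory view of one segment lineage entry."""
    segments_from: List[str]
    segments_to: List[str]
    state: str  # "IN_PROGRESS", "COMPLETED", or "REVERTED"
    timestamp_ms: int


def routable_segments(all_segments: Set[str],
                      lineage: Dict[str, LineageEntry]) -> Set[str]:
    """Return the segments a query should see, given the lineage entries."""
    hidden: Set[str] = set()
    for entry in lineage.values():
        if entry.state == "COMPLETED":
            # The swap is done: the replaced segments must not be queried.
            hidden.update(entry.segments_from)
        else:
            # IN_PROGRESS or REVERTED: the new segments must not be queried.
            hidden.update(entry.segments_to)
    return all_segments - hidden


# A push is half-way through: only one of the two new segments is uploaded.
lineage = {
    "entry-1": LineageEntry(["s1_v1", "s2_v1"], ["s1_v2", "s2_v2"],
                            "IN_PROGRESS", 0),
}
online = {"s1_v1", "s2_v1", "s1_v2"}
print(sorted(routable_segments(online, lineage)))  # ['s1_v1', 's2_v1']
```

Because the partially uploaded `s1_v2` is hidden until the entry is marked "COMPLETED", queries keep seeing the old snapshot as a whole.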
Below are the APIs available on the controller to invoke the segment replacement protocol.
startReplaceSegments: Signals to the controller that a replacement protocol is about to atomically replace segmentsFrom, a source list of segments, with segmentsTo, a target list of segments. The controller then persists a segment lineage entry in the "IN_PROGRESS" state to ZooKeeper and returns its ID.
endReplaceSegments: Ends the replacement protocol associated with the segment lineage entry ID passed in as a parameter by changing the state to "COMPLETED".
revertReplaceSegments: Reverts the replacement protocol associated with the segment lineage entry ID passed in as a parameter by changing the state to "REVERTED".
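To make the state changes concrete, here is a minimal in-memory model of the three calls. It is a sketch under stated assumptions: `LineageStore` is a hypothetical stand-in for the lineage kept in ZooKeeper, the UUID entry ID format is an assumption, and no validation of state transitions is modeled.

```python
import time
import uuid
from typing import Dict, Iterable


class LineageStore:
    """Hypothetical in-memory stand-in for the segment lineage in ZooKeeper."""

    def __init__(self) -> None:
        self.entries: Dict[str, dict] = {}

    def start_replace_segments(self, segments_from: Iterable[str],
                               segments_to: Iterable[str]) -> str:
        """Persist a new IN_PROGRESS entry and return its ID."""
        entry_id = str(uuid.uuid4())  # assumed ID format, for illustration
        self.entries[entry_id] = {
            "segmentsFrom": list(segments_from),
            "segmentsTo": list(segments_to),
            "state": "IN_PROGRESS",
            "timestamp": int(time.time() * 1000),
        }
        return entry_id

    def end_replace_segments(self, entry_id: str) -> None:
        """Mark the swap as finished."""
        self.entries[entry_id]["state"] = "COMPLETED"

    def revert_replace_segments(self, entry_id: str) -> None:
        """Mark the swap as rolled back."""
        self.entries[entry_id]["state"] = "REVERTED"


store = LineageStore()
entry_id = store.start_replace_segments(["seg_old"], ["seg_new"])
# ... upload the segments listed in segmentsTo here ...
store.end_replace_segments(entry_id)
print(store.entries[entry_id]["state"])  # COMPLETED
```

A client would upload the new segments between the start and end calls, so that they only become visible to queries once the entry reaches "COMPLETED".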
However, we don't typically expect users to invoke these APIs directly.
Instead, consistent push is built into batch ingestion jobs (currently only supported for the standalone execution framework).
Step 0: Adjust the table storage quota to 2x the original amount. See #implications-of-enabling-consistent-push for more details.
Step 1: Set up the config for your OFFLINE, REFRESH table. Enable consistentDataPush under IngestionConfig -> BatchIngestionConfig.
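As an illustration of where the flag sits, the snippet below builds a table config fragment and prints it as JSON. Treat it as a sketch: only consistentDataPush and its nesting under the ingestion and batch ingestion configs come from the step above. The camelCase key names, the other fields, and the table name `myTable` are assumptions that should be checked against the table config reference.

```python
import json

# Fragment of an OFFLINE table config; fields other than
# consistentDataPush are assumptions made for illustration.
table_config_fragment = {
    "tableName": "myTable",  # hypothetical table name
    "tableType": "OFFLINE",
    "ingestionConfig": {
        "batchIngestionConfig": {
            "segmentIngestionType": "REFRESH",     # assumed key name
            "segmentIngestionFrequency": "DAILY",  # assumed key and value
            "consistentDataPush": True,
        }
    },
}

print(json.dumps(table_config_fragment, indent=2))
```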
Step 2: Execute the job by following the instructions for #executing-the-job.
Step 0: Identify the segment lineage entry ID corresponding to the segment swap that you would like to roll back by using the /lineage REST API to list the segment lineage.
Step 1: Use the revertReplaceSegments REST API to roll back the data.
Step 2: As a sanity check, use the /lineage REST API again to ensure that the corresponding lineage entry is in the "REVERTED" state.
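Steps 0 and 2 amount to inspecting the lineage listing. The helpers below sketch that inspection on an already-fetched listing. The shape of the listing (a mapping from entry ID to a record with `state` and `timestamp` fields) is an assumption, the function names are hypothetical, and picking the most recent "COMPLETED" entry is only one heuristic for finding the push to undo.

```python
from typing import Dict, Optional


def latest_completed_entry_id(lineage: Dict[str, dict]) -> Optional[str]:
    """Step 0 helper: ID of the most recent COMPLETED entry, if any."""
    completed = [
        (entry["timestamp"], entry_id)
        for entry_id, entry in lineage.items()
        if entry["state"] == "COMPLETED"
    ]
    return max(completed)[1] if completed else None


def is_reverted(lineage: Dict[str, dict], entry_id: str) -> bool:
    """Step 2 helper: check that the entry ended up in REVERTED state."""
    entry = lineage.get(entry_id)
    return entry is not None and entry["state"] == "REVERTED"


# Hypothetical listing, as it might look before the rollback.
listing = {
    "id-1": {"state": "COMPLETED", "timestamp": 1000},
    "id-2": {"state": "COMPLETED", "timestamp": 2000},
}
target = latest_completed_entry_id(listing)
print(target)  # id-2
```

After calling revertReplaceSegments with the chosen ID, fetching the listing again and passing it to `is_reverted` covers the sanity check in Step 2.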
The retention manager handles the cleanup of segments as well as segment lineage data.
At a high level, the cleanup logic is as follows:
Step 1: Clean up unused segments. For entries in the "COMPLETED" state, we remove the segments in segmentsFrom. For entries in the "REVERTED" or "IN_PROGRESS" state whose timestamp is more than 24 hours old, we remove the segments in segmentsTo.
Step 2: Once all segments in step 1 are cleaned up, we remove the lineage entry.
The cleanup is usually handled in 2 cycles.
Cleanup regarding the startReplaceSegments API:
When the client pushes a third snapshot, we proactively remove the first snapshot so that disk usage does not exceed 2x.
If the previous push failed midway (IN_PROGRESS/REVERTED state), we also clean up its segmentsTo.
Enabling consistent push can lead to up to 2x storage usage (assuming the data size is roughly the same across snapshots), since at any time we are potentially keeping both the replacing and the replaced segments.
Typically, for the REFRESH use case, users directly replace segments by uploading segments of the same name. With consistent push, however, a timestamp is injected as the segment name postfix in order to differentiate between the replacing and the to-be-replaced segments.
Currently, there is no way to disable consistent push for a table with consistent push enabled, due to the unique segment postfix issue mentioned above. Users will need to create a new table until support for disabling consistent push in-place is implemented.
If the push job fails for any reason, the job rolls back all the uploaded segments (revertReplaceSegments) so that the data remains the same as it was before the push.