Consistent Push and Rollback
Motivation
Data Consistency
Pinot supports atomic update on segment level, which means that when data consisting of multiple segments are pushed to a table, as segments are replaced one at a time, queries to the broker during this upload phase may produce inconsistent result due to interleaving of old and new data.
Data Rollback
Furthermore, Pinot currently does not support data rollback features. In case of a bad data push, the table owner needs to re-run the flow with the previous data and re-ingest data to Pinot. This end-to-end process can take hours and the Pinot table can potentially be in a bad state during this long period.
The consistent push and rollback protocol allows a user to atomically switch between data snapshots and rollback to the previous data in the case of a bad data push. For complete motivation and reasoning, please refer to the design doc above. Currently, we only support OFFLINE table REFRESH use cases.
How this works
Segment lineage data structure has been introduced in Zookeeper (under the path <cluster_name>/PROPERTYSTORE/SEGMENT_LINEAGE/<table_name>) for keeping track of which segments have been replaced by which new set of segments, as well as corresponding state and timestamp.
{
  "id": "<table_name>",
  "simpleFields": {},
  "mapFields": {},
  "listFields": {
    "<segment_lineage_entry_id>": [
      "<segmentsFrom_list>",
      "<segmentsTo_list>",
      "<state>",
      "<timestamp>"
    ]
  }
}When broker answers queries from the users, it will go through the lineage entries and only route to the segments in segmentsFrom for those in "IN_PROGRESS" or "REVERTED" state and the segments in segmentsTo for those in "COMPLETED" state, therefore preserving data snapshot atomicity.
Below are the APIs available on the controller to invoke the segment replacement protocol.
- startReplaceSegments: Signifies to the controller that a replacement protocol is about to atomically replace- segmentsFrom, a source list of segments, by- segmentsTo, a target list of segments, which then persists a segment lineage entry with "IN PROGRESS" state to Zookeeper and returns its ID.
- endReplaceSegments: Ends the replacement protocol associated with the segment lineage entry ID passed in as a parameter by changing the state to "COMPLETED".
- revertReplaceSegments: Reverts the replacement protocol associated with the segment lineage entry ID passed in as a parameter by changing the state to "REVERTED".
However, we don't typically expect users to invoke these APIs directly.
Instead, consistent push is built into batch ingestion jobs (currently only supported for the standalone execution framework).
How to set up Ingestion Job with Consistent Push
Step 0: Adjust the table storage quota to 2x that of the original amount. See #Implications of enabling Consistent Push for more details.
Step 1: Set up config for your OFFLINE, REFRESH table. Enable consistentDataPush under IngestionConfig -> BatchIngestionConfig.
"tableName": "myTable_OFFLINE",
"tableType": "OFFLINE",
...
...
"ingestionConfig": {
  "batchIngestionConfig": {
    "segmentIngestionType": "REFRESH",
     "segmentIngestionFrequency": "DAILY", // or HOURLY
     "consistentDataPush": true
  }
}Step 2: Execute the job by following instructions forBatch Data Ingestion In Practice #Executing the job.
How to trigger Data Rollback
Step 0: Identify the segment lineage entry ID corresponding to the segment swap that would like to be rolled back by using the /lineage REST API to list segment lineage.
Step 1: Use the revertReplaceSegments REST API to rollback data.
Step 2: As a sanity check, use the /lineage REST API again to ensure that the corresponding lineage entry is in "REVERTED" state.
Cleanup
Retention manager manages the cleanup of segments as well as segment lineage data.
On a high level, the cleanup logic is as follows:
- Cleanup unused segments: For entries in "COMPLETED" state, we remove segments in - segmentsFrom. For entries in "REVERTED" or "IN_PROGRESS" state whose timestamp is more than 24 hours old, we remove segments in- segmentsTo.
- Once all segments in step 1 are cleaned up, we remove the lineage entry. 
The cleanup is usually handled in 2 cycles.
Cleanup regarding startReplaceSegment API:
- We proactively remove the first snapshot if the client side is pushing the 3rd snapshot, so we are not exceeding the 2x disk space. 
- If the previous push fails in the middle (IN_PROGRESS/REVERTED state), we also clean up the - segmentsTo.
Implications of enabling Consistent Push
- Enabling consistent push can lead to up to 2x storage usage (assuming data size between snapshots are roughly equivalent) since at any time, we are potentially keeping both replacing and replaced segments. 
- Typically, for the REFRESH use case, users would directly replace segments by uploading segments of the same name. With consistent push, however, a timestamp is injected as the segment name postfix in order to differentiate between replacing and to be replaced segments. 
- Currently, there is no way to disable consistent push for a table with consistent push enabled, due to the unique segment postfix issue mentioned above. Users will need to create a new table until support for disabling consistent push in-place is implemented. 
- If the push job fails for any reason, the job will rollback all the uploaded segments ( - revertReplaceSegments) to maintain data equivalence prior to the push.
Last updated
Was this helpful?

