CDC / Upsert Pipeline

End-to-end guide for keeping Pinot in sync with a transactional database using CDC and upserts.

This playbook covers the pattern of capturing row-level changes from a transactional database (PostgreSQL, MySQL, MongoDB, etc.) via Change Data Capture (CDC), streaming them through Kafka, and ingesting them into Pinot with upsert enabled so that Pinot always reflects the latest state of each row.

When to use this pattern

Use this playbook when:

  • You need Pinot to mirror the current state of rows in an OLTP database (orders, user profiles, inventory, tickets).

  • Rows are frequently updated or soft-deleted after initial creation, and queries must see only the latest version.

  • You want real-time analytics on top of transactional data without adding read load to your primary database.

  • Your CDC tool (Debezium, Maxwell, AWS DMS) already produces events to Kafka.

If your data is append-only and never updated, the simpler Real-Time Product Analytics pattern is a better fit.

Architecture sketch

OLTP DB ──▶ Debezium ──▶ Kafka topic ──▶ Pinot REALTIME table (upsert)
  (WAL)      (CDC)        (per-table)          │
                                      ┌────────┴────────┐
                                      │ Servers with    │
                                      │ primary key map │
                                      └─────────────────┘

Key components:

  • Debezium (or equivalent) reads the database WAL and produces a Kafka event for every INSERT, UPDATE, and DELETE.

  • Kafka topic is partitioned by the primary key so all mutations for the same row land on the same partition.

  • Pinot real-time table with upsert maintains an in-memory map from primary key to the latest segment/doc-id so queries skip stale versions.

Schema

Use the source table's primary key as Pinot's primary key. Include a comparison column (typically the event timestamp or database transaction sequence number) so Pinot can determine which version is newer.
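
As a concrete sketch, assume a hypothetical orders source table keyed by orderId, with status, amount, an isDeleted flag, and an updatedAt epoch-millis timestamp (all names here are placeholders for your own columns):

```json
{
  "schemaName": "orders",
  "dimensionFieldSpecs": [
    { "name": "orderId", "dataType": "STRING" },
    { "name": "status", "dataType": "STRING" },
    { "name": "isDeleted", "dataType": "BOOLEAN" }
  ],
  "metricFieldSpecs": [
    { "name": "amount", "dataType": "DOUBLE" }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "updatedAt",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ],
  "primaryKeyColumns": ["orderId"]
}
```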

primaryKeyColumns is required for upsert. Pinot uses this to look up existing rows in the primary key map. See Schema and Table Shape.

Table configuration
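
One possible shape for the real-time table config, assuming a hypothetical orders table keyed by orderId with updatedAt as the comparison column (topic name, broker list, and replication are placeholders for your environment):

```json
{
  "tableName": "orders",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "updatedAt",
    "replication": "2"
  },
  "routing": {
    "instanceSelectorType": "strictReplicaGroup"
  },
  "upsertConfig": {
    "mode": "FULL",
    "comparisonColumns": ["updatedAt"],
    "hashFunction": "MURMUR3",
    "enableSnapshot": true,
    "enablePreload": true
  },
  "tableIndexConfig": {
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "orders-cdc",
      "stream.kafka.broker.list": "kafka:9092",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
    }
  }
}
```

Note that upsert tables must use the low-level (partition-based) Kafka consumer so that each Pinot server owns a fixed set of partitions.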

Configuration highlights

| Setting | Why |
| --- | --- |
| upsertConfig.mode: FULL | Every update replaces the entire row. Use PARTIAL if CDC events contain only changed columns. See Upsert Modes. |
| comparisonColumns: ["updatedAt"] | Pinot uses this column to resolve out-of-order events; the row with the higher updatedAt wins. |
| enableSnapshot: true | Persists the primary key map to disk so server restarts do not require replaying the full Kafka topic. |
| enablePreload: true | Loads the snapshot into memory at startup for faster recovery. |
| hashFunction: MURMUR3 | Stores a hash of each primary key instead of the raw value, reducing per-key memory for long keys. |
| replication | Upsert tables support replication greater than 1, but every replica of a partition must serve a consistent view; pair replication with strictReplicaGroup routing. |
| routing.instanceSelectorType: strictReplicaGroup | Routes all queries for a partition to the same replica, required for correct upsert semantics. |

Handling deletes

If your CDC stream includes tombstone events for deleted rows, configure a deleteRecordColumn:
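
A sketch of the relevant upsertConfig fragment, assuming the flag column is named isDeleted:

```json
{
  "upsertConfig": {
    "mode": "FULL",
    "deleteRecordColumn": "isDeleted"
  }
}
```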

Add isDeleted as a boolean dimension in your schema. Set it to true in your Debezium SMT (Single Message Transform) for delete events. Pinot will mark the row as deleted and stop returning it in queries.

Partial upsert

When your CDC events contain only the changed columns (e.g., only status changed on an order), use partial upsert to merge the update into the existing row:
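
A sketch of the upsertConfig for this case, assuming only status and updatedAt arrive in partial events (column names are placeholders):

```json
{
  "upsertConfig": {
    "mode": "PARTIAL",
    "comparisonColumns": ["updatedAt"],
    "defaultPartialUpsertStrategy": "IGNORE",
    "partialUpsertStrategies": {
      "status": "OVERWRITE",
      "updatedAt": "OVERWRITE"
    }
  }
}
```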

With IGNORE as the default, any column not present in the incoming event retains its previous value. Columns listed with OVERWRITE are replaced when present.

Debezium configuration tips

A minimal Debezium connector config for PostgreSQL:
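
A sketch assuming Debezium 2.x capturing public.orders from a database named shop (hostnames, credentials, and topic prefix are placeholders):

```json
{
  "name": "orders-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "********",
    "database.dbname": "shop",
    "topic.prefix": "shop",
    "table.include.list": "public.orders",
    "plugin.name": "pgoutput",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.delete.handling.mode": "rewrite"
  }
}
```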

The ExtractNewRecordState transform flattens the Debezium envelope so Pinot receives a simple JSON object with the row's columns. The delete.handling.mode: rewrite adds a __deleted field that you can map to your isDeleted column via an ingestion transformation.

Query patterns

Current state aggregation
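
A sketch, assuming the hypothetical orders table with status and amount columns:

```sql
SELECT status,
       COUNT(*) AS orderCount,
       SUM(amount) AS totalAmount
FROM orders
GROUP BY status
```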

Because upsert is enabled, this query automatically sees only the latest version of each order.

Point lookup
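
With upsert enabled, a primary key lookup returns only the latest live version of the row (the key value here is illustrative):

```sql
SELECT orderId, status, amount, updatedAt
FROM orders
WHERE orderId = 'ORD-1042'
```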

Time-range analysis over mutable data
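
A sketch using Pinot's built-in DATETRUNC and ago functions; because rows are upserted, each order is counted under its current values even if it was created earlier:

```sql
SELECT DATETRUNC('day', updatedAt) AS day,
       COUNT(*) AS orders,
       SUM(amount) AS revenue
FROM orders
WHERE updatedAt > ago('P7D')
GROUP BY DATETRUNC('day', updatedAt)
ORDER BY DATETRUNC('day', updatedAt)
```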

Segment compaction

Over time, upsert tables accumulate stale row versions inside completed segments. These invalid records waste storage and slow full-table scans. Schedule the Upsert Compaction Task via Minion to rewrite segments and physically remove stale rows:
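
A sketch of the table-level task config (the cron schedule and thresholds are illustrative; this task relies on the upsert snapshot being enabled):

```json
{
  "task": {
    "taskTypeConfigsMap": {
      "UpsertCompactionTask": {
        "schedule": "0 0 */6 * * ?",
        "bufferTimePeriod": "2d",
        "invalidRecordsThresholdPercent": "30"
      }
    }
  }
}
```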

See Segment Compaction on Upserts for tuning guidance.

Operational checklist

Before go-live

  • Confirm the Kafka topic is partitioned by the source primary key, not round-robin, so all mutations for a row land on one partition.

  • Load a consistent snapshot of the source table (for example, via Debezium's initial snapshot) before pointing queries at Pinot.

  • Replay out-of-order and delete events in staging to verify that comparisonColumns and delete handling behave as expected.

Monitoring

  • Primary key map size: Monitor pinot.server.upsertPrimaryKeysCount. If this exceeds available memory, consider TTL-based eviction or larger servers.

  • Invalid record ratio: Track the percentage of invalidated (stale) records per segment. Schedule compaction when this exceeds 30%.

  • Kafka consumer lag: Same as real-time tables — growing lag means Pinot queries show stale data.

  • Debezium connector status: Monitor via Kafka Connect REST API. A stopped connector means no updates flow to Pinot.

Common pitfalls

| Pitfall | Fix |
| --- | --- |
| Out-of-order events cause old values to overwrite new ones | Use comparisonColumns with a monotonically increasing column (database sequence, event timestamp). |
| Memory pressure from large primary key map | Enable metadataTTL to evict keys for rows not updated within a time window, or use hashFunction: MURMUR3 to reduce per-key overhead. |
| Queries return deleted rows | Ensure deleteRecordColumn is configured and the CDC transform sets it correctly for delete events. |
| Server restart takes minutes | Enable enableSnapshot and enablePreload; without them, Pinot replays the Kafka topic from the last committed offset. |
