
delta_run() — incremental sync

A context manager that wraps the delta-state service so that an entrypoint doing incremental sync (for example, polling an external system every 5 minutes for what changed) gets clean begin → work → complete (or fail) lifecycle handling without re-implementing it each time.

The underlying service stores one row per (connector_name, stream_name) with a high-water-mark cursor. Every begin overwrites the run-state fields in place; there is no run history. The API enforces no concurrency policy: the consumer reads the state and decides whether to skip, start fresh, or override.
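The storage model is easier to see in code. Below is a minimal in-memory sketch of one delta-state row per (connector_name, stream_name). It is a hypothetical test fake, not the service's implementation: `DeltaRow`, its fields, the `'completed'` status value and `InMemoryDeltaStore` with its method names are all assumptions. It shows three behaviours described above. A begin overwrites the run-state fields in place and keeps no history. A begin while another run is `running` is not refused. The lower bound handed out is always the last committed cursor.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, Optional, Tuple


@dataclass
class DeltaRow:
    # Hypothetical field names; the real service's columns may differ.
    cursor_type: str = "timestamp_utc"
    committed_cursor: Optional[str] = None  # high-water mark from the last successful run
    status: Optional[str] = None            # 'running' | 'completed' | 'failed'
    started_at_utc: Optional[str] = None
    error_summary: Optional[str] = None


class InMemoryDeltaStore:
    """Test fake: exactly one row per (connector_name, stream_name), no run history."""

    def __init__(self) -> None:
        self.rows: Dict[Tuple[str, str], DeltaRow] = {}

    def begin(self, connector_name: str, stream_name: str,
              cursor_type: str = "timestamp_utc") -> Optional[str]:
        key = (connector_name, stream_name)
        row = self.rows.setdefault(key, DeltaRow(cursor_type=cursor_type))
        # No concurrency check: a begin while status == 'running' is not refused;
        # it simply overwrites the run-state fields of the same row.
        row.status = "running"
        row.started_at_utc = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        row.error_summary = None
        # The lower bound is always the last *committed* cursor.
        return row.committed_cursor

    def complete(self, connector_name: str, stream_name: str, new_cursor_value: str) -> None:
        row = self.rows[(connector_name, stream_name)]
        row.committed_cursor = new_cursor_value  # the only place the watermark moves
        row.status = "completed"

    def fail(self, connector_name: str, stream_name: str, error_summary: str) -> None:
        row = self.rows[(connector_name, stream_name)]
        row.status = "failed"
        row.error_summary = error_summary  # committed_cursor deliberately untouched
```

`fail` never touches `committed_cursor`, so a retry after a failure gets the same lower bound back. This matches the behaviour described under "Error handling".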

Connector name vs stream name

  • connector_name is the integration instance identity. It comes from context.get_config('connector_name') in your settings, set once when the connector is registered. Two SAP systems → two connector_names (e.g. sap_eu, sap_us).
  • stream_name is what kind of data this entrypoint tracks. Hardcode it as a constant at the top of the file. A single connector_name can have multiple streams: pulling suppliers from SAP uses 'suppliers', while pulling invoices from the same SAP instance uses 'invoices'.

One delta-state row per (integration instance, kind of data) pair:

| connector_name | stream_name | What this row tracks |
|---|---|---|
| sap_eu | suppliers | Supplier sync watermark for the EU SAP instance |
| sap_eu | invoices | Invoice sync watermark for the EU SAP instance |
| sap_us | suppliers | Supplier sync watermark for the US SAP instance |
| acme_lifecycle | lifecycle | Outbound lifecycle-poll watermark |

Minimal usage

```python
from lib.delta import delta_run

STREAM_NAME = 'suppliers'  # constant: one stream per entrypoint file


def run(context):
    connector_name = context.get_config('connector_name')

    with delta_run(context, connector_name, STREAM_NAME) as r:
        # r.lower_bound_cursor is what the previous successful run committed
        # (or None on the very first run).
        # fetch_from_external_since() and process() stand in for your own code.
        records, watermark = fetch_from_external_since(r.lower_bound_cursor)
        process(records)

        r.set_new_cursor(watermark)                           # REQUIRED before clean exit
        r.set_records_processed(len(records))                 # optional, observability
        r.set_metadata({'lower_used': r.lower_bound_cursor})  # optional jsonb
    # Auto-completes here on clean exit.
    # Auto-fails (cursor untouched) if the with-block raises.
    return {'records': len(records)}
```

If you exit the with block without calling set_new_cursor(), the helper raises RuntimeError. This is deliberate: silently not advancing the cursor is exactly the bug class the helper exists to prevent.
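One way a helper like `delta_run()` could implement this lifecycle is with `contextlib.contextmanager`. The sketch below is not the `lib.delta` source, and `delta_run_sketch`, `_RunHandle` and `RecordingApi` are hypothetical names. It also makes a few assumptions. It takes the API object directly instead of `context`. It assumes `delta_begin()` returns the previous committed cursor. In the missing-cursor case, it assumes the run is marked failed before raising, so the row is not left `running`.

```python
from contextlib import contextmanager
from typing import Any, Optional


class _RunHandle:
    """The object bound by `with ... as r` (hypothetical shape)."""

    def __init__(self, lower_bound_cursor: Optional[str]) -> None:
        self.lower_bound_cursor = lower_bound_cursor
        self.new_cursor: Optional[str] = None
        self.cursor_set = False
        self.records_processed: Optional[int] = None
        self.metadata: Optional[dict] = None

    def set_new_cursor(self, value: str) -> None:
        self.new_cursor = value
        self.cursor_set = True

    def set_records_processed(self, n: int) -> None:
        self.records_processed = n

    def set_metadata(self, metadata: dict) -> None:
        self.metadata = metadata


@contextmanager
def delta_run_sketch(api: Any, connector_name: str, stream_name: str,
                     cursor_type: str = "timestamp_utc"):
    # ASSUMPTION: delta_begin() returns the last committed cursor, or None.
    lower = api.delta_begin(connector_name, stream_name, cursor_type=cursor_type)
    handle = _RunHandle(lower)
    try:
        yield handle
    except Exception as exc:
        # Exception in the with-block: mark the run failed (cursor untouched), re-raise.
        api.delta_fail(connector_name, stream_name, error_summary=str(exc))
        raise
    if not handle.cursor_set:
        # ASSUMPTION: also fail the run so the row is not left 'running'.
        api.delta_fail(connector_name, stream_name,
                       error_summary="exited without set_new_cursor()")
        raise RuntimeError("delta_run exited without set_new_cursor(); cursor not advanced")
    api.delta_complete(connector_name, stream_name, handle.new_cursor,
                       records_processed=handle.records_processed,
                       metadata=handle.metadata)


class RecordingApi:
    """Stand-in for context.api that records calls, for exercising the sketch."""

    def __init__(self, committed: Optional[str] = None) -> None:
        self.committed = committed
        self.calls: list = []

    def delta_begin(self, connector_name, stream_name, cursor_type="timestamp_utc"):
        self.calls.append(("begin", stream_name))
        return self.committed

    def delta_complete(self, connector_name, stream_name, new_cursor_value,
                       records_processed=None, metadata=None):
        self.calls.append(("complete", new_cursor_value))
        self.committed = new_cursor_value

    def delta_fail(self, connector_name, stream_name, error_summary, metadata=None):
        self.calls.append(("fail", error_summary))
```

`RecordingApi` exists only so the sketch can be exercised without the real service. Note that the RuntimeError is raised outside the `try`, so it does not trigger a second `delta_fail` call.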

The since precedence pattern

Every polling entrypoint should resolve since in this order. The example connectors (02_pull_suppliers.py, 04_poll_lifecycle_messages.py) all share this helper shape:

```python
from datetime import datetime, timedelta, timezone


def resolve_since(context, lower_bound_from_delta):
    # 1. job_params override: for backfill / re-running a window.
    override = context.get_param('since_override')
    if override:
        return override

    # 2. delta-state cursor: the watermark from the last successful run.
    if lower_bound_from_delta:
        return lower_bound_from_delta

    # 3. Bootstrap window: first-ever run for this stream.
    lookback_days = int(context.get_param('lookback_days', default=7))
    return (datetime.now(timezone.utc) - timedelta(days=lookback_days)).strftime(
        '%Y-%m-%dT%H:%M:%SZ',
    )
```

The since_override slot is what makes scheduled-task job_params useful: an operator can schedule a one-off backfill task with since_override set, without touching the normal recurring schedule and without committing the backfill window over the regular cursor — see "Error handling" below.
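Because the bootstrap branch reads the wall clock, `resolve_since` is awkward to unit-test. A small variation that takes `now` as an argument keeps the same precedence and makes each branch deterministic. `resolve_since_at` and `StubContext` are hypothetical names. The stub models only `get_param(name, default=None)`, mirroring how the examples above call it.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

ISO_Z = "%Y-%m-%dT%H:%M:%SZ"


def resolve_since_at(context, lower_bound_from_delta: Optional[str], now: datetime) -> str:
    """Same precedence as resolve_since(), with the clock injected for testing."""
    override = context.get_param("since_override")
    if override:
        return override                      # 1. operator-supplied backfill window
    if lower_bound_from_delta:
        return lower_bound_from_delta        # 2. last committed watermark
    lookback_days = int(context.get_param("lookback_days", default=7))
    return (now - timedelta(days=lookback_days)).strftime(ISO_Z)  # 3. bootstrap


class StubContext:
    """Hypothetical stand-in for the real context; only get_param is modelled."""

    def __init__(self, **params) -> None:
        self.params = params

    def get_param(self, name, default=None):
        return self.params.get(name, default)
```

For example, with `now` fixed at 2024-06-10 12:00 UTC and no override or cursor, the default seven-day lookback yields `'2024-06-03T12:00:00Z'`.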

Cursor types

Pass cursor_type= on the call (default 'timestamp_utc'):

| Type | Use when | Server enforces |
|---|---|---|
| timestamp_utc | source has a sortable updated_at you can query against | new value >= previous (422 CURSOR_REGRESSION otherwise) |
| numeric_id | source returns monotonic numeric ids | new value >= previous |
| opaque_token | source uses cursor / continuation tokens you can't compare | any non-equal value is treated as an advance |

The cursor type is per-stream, not per-connector. The same connector_name can have one stream using timestamp_utc and another using numeric_id.
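The rules in the table can be mirrored client-side, for example to log a clear message before calling `delta_complete()`. This is a hypothetical helper (`classify_cursor` is not part of the API), and the server remains authoritative. It makes two assumptions. First, `timestamp_utc` values use the `%Y-%m-%dT%H:%M:%SZ` format shown earlier. Second, an `opaque_token` equal to the previous one is reported as `'unchanged'`, because the table only says what counts as an advance.

```python
from datetime import datetime
from typing import Optional


def _parse_ts(value: str) -> datetime:
    # fromisoformat() only accepts a trailing 'Z' from Python 3.11, so normalise it.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def classify_cursor(cursor_type: str, previous: Optional[str], new: str) -> str:
    """Return 'advance', 'unchanged' or 'regression' for a proposed new cursor."""
    if previous is None:
        return "advance"  # first commit for this stream
    if cursor_type == "opaque_token":
        # Tokens can't be ordered: any different value counts as an advance.
        return "advance" if new != previous else "unchanged"
    if cursor_type == "timestamp_utc":
        prev_v, new_v = _parse_ts(previous), _parse_ts(new)
    elif cursor_type == "numeric_id":
        prev_v, new_v = int(previous), int(new)
    else:
        raise ValueError(f"unknown cursor_type: {cursor_type!r}")
    if new_v > prev_v:
        return "advance"
    if new_v == prev_v:
        return "unchanged"  # still allowed: new value >= previous
    return "regression"     # server rejects (422 CURSOR_REGRESSION for timestamp_utc)
```

Parsing timestamps and ids before comparing them avoids lexicographic surprises such as `'10' < '9'` for numeric ids.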

Concurrency: read-then-decide

The API does not refuse a begin while another run is running. That's intentional — the consumer (your code, or the scheduler that invokes you) decides what to do.

```python
from datetime import datetime, timezone

from lib.delta import delta_run

STREAM_NAME = 'suppliers'


def run(context):
    connector_name = context.get_config('connector_name')

    state = context.api.delta_get_state(connector_name, STREAM_NAME)
    # Guard in case no state exists yet for this stream (assumption).
    if state and state.get('status') == 'running':
        started = datetime.fromisoformat(state['started_at_utc'].replace('Z', '+00:00'))
        age_min = (datetime.now(timezone.utc) - started).total_seconds() / 60
        if age_min < 60:
            return {'skipped': True}  # earlier run still legitimately running
        # else: previous run is stuck; fall through and override

    with delta_run(context, connector_name, STREAM_NAME) as r:
        ...
```

Common policies:

  • "Skip if running" — short-circuit return at the top.
  • "Override if running > X minutes" — assume stuck.
  • "Always override" — only correct if you're absolutely sure runs can't legitimately overlap (rare — the scheduler usually doesn't guarantee that).
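The three policies can be expressed as one decision function over the row returned by `delta_get_state()`. This is a sketch: `should_start` and the policy names are hypothetical. It assumes the state is a dict with `status` and `started_at_utc` keys, as in the example above, or is empty or `None` before the first run.

```python
from datetime import datetime, timezone
from typing import Optional


def parse_utc(value: str) -> datetime:
    # Normalise a trailing 'Z' so fromisoformat() works before Python 3.11.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def should_start(state: Optional[dict], policy: str, now: datetime,
                 max_age_min: float = 60) -> bool:
    """policy: 'skip_if_running' | 'override_if_stale' | 'always_override'."""
    if not state or state.get("status") != "running":
        return True  # nothing in flight (or first-ever run)
    if policy == "always_override":
        return True
    if policy == "skip_if_running":
        return False
    if policy == "override_if_stale":
        age_min = (now - parse_utc(state["started_at_utc"])).total_seconds() / 60
        return age_min >= max_age_min  # past the threshold: assume the run is stuck
    raise ValueError(f"unknown policy: {policy!r}")
```

Passing `now` in, rather than reading the clock inside, keeps the threshold logic testable.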

Error handling

Any exception inside the with block:

  1. Triggers delta_fail(error_summary=str(exc)) — the row's status becomes failed, but the cursor is not advanced.
  2. Re-raises so your top-level handler sees it.

The next begin after a failed run hands out the same lower bound (the last successful commit). So failed runs naturally re-fetch the same window on retry — no special code required.

What's audited

  • complete_run writes an audit entry on every cursor advance. Cursor advances are security-relevant — moving the watermark forward is the only operation that affects what data the next run sees.
  • fail_run is not audited. Failures are routine and would dwarf the interesting events.
  • reset_cursor (admin-only) writes an audit entry with the caller's reason. Entrypoint code can't normally call this — it requires an admin-scoped credential.

Direct API

If you need finer control than the context manager:

```python
context.api.delta_get_state(connector_name, stream_name)
context.api.delta_begin(connector_name, stream_name, cursor_type='timestamp_utc')
context.api.delta_complete(connector_name, stream_name, new_cursor_value,
                           records_processed=None, metadata=None)
context.api.delta_fail(connector_name, stream_name, error_summary, metadata=None)
# delta_reset requires admin-scoped credentials; entrypoints can't call this.
```

Use delta_run() for almost all cases. Drop to the direct API only when you genuinely need to interleave delta operations with your own transaction boundaries.
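Here is a sketch of the direct API interleaved with your own transaction, using the standard-library `sqlite3` module as the local store. `sync_with_own_transaction`, the `suppliers` table, the `fetch` callable and `RecordingApi` are hypothetical. The sketch also assumes `delta_begin()` returns the previous committed cursor. The ordering is the point: the local transaction commits before `delta_complete()`, so the watermark never moves past data that was not persisted.

```python
import sqlite3
from typing import Callable, List, Optional, Tuple

Row = Tuple[int, str]


def sync_with_own_transaction(api, db: sqlite3.Connection, connector_name: str,
                              stream_name: str,
                              fetch: Callable[[Optional[str]], Tuple[List[Row], str]]) -> int:
    # ASSUMPTION: delta_begin() returns the last committed cursor, or None.
    lower = api.delta_begin(connector_name, stream_name, cursor_type="timestamp_utc")
    try:
        rows, watermark = fetch(lower)
        with db:  # sqlite3: commit on success, roll back if the block raises
            db.executemany(
                "INSERT OR REPLACE INTO suppliers(id, name) VALUES (?, ?)", rows)
    except Exception as exc:
        api.delta_fail(connector_name, stream_name, error_summary=str(exc))
        raise
    # Advance the watermark only after the local commit succeeded.
    api.delta_complete(connector_name, stream_name, watermark,
                       records_processed=len(rows))
    return len(rows)


class RecordingApi:
    """Stand-in for context.api that records which calls were made."""

    def __init__(self, committed: Optional[str] = None) -> None:
        self.committed = committed
        self.calls: List[str] = []

    def delta_begin(self, connector_name, stream_name, cursor_type="timestamp_utc"):
        self.calls.append("begin")
        return self.committed

    def delta_complete(self, connector_name, stream_name, new_cursor_value,
                       records_processed=None, metadata=None):
        self.calls.append("complete")
        self.committed = new_cursor_value

    def delta_fail(self, connector_name, stream_name, error_summary, metadata=None):
        self.calls.append("fail")
```

If `delta_complete()` itself fails after the local commit, the next run re-fetches the same window. `INSERT OR REPLACE` keeps that replay idempotent.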

What's next