delta_run() — incremental sync
A context manager that wraps the delta-state service so an entrypoint
that does incremental sync (poll an external system every 5 minutes
for what changed) gets clean begin → work → complete (or fail)
lifecycle handling without writing it every time.
The underlying service stores one row per (connector_name, stream_name)
with a high-water-mark cursor. Every begin overwrites the run-state fields
in place — there is no run history. The API ships zero concurrency policy:
the consumer reads state and decides whether to skip, start fresh, or override.
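To make the storage model concrete, here is a minimal in-memory sketch of the behaviour described above. It is not the service's implementation: DeltaRow, its field names, the 'idle' status, and the begin() function are hypothetical names chosen for illustration, and the real row likely carries more fields.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, Optional, Tuple


@dataclass
class DeltaRow:
    """Hypothetical shape of one delta-state row (illustration only)."""
    cursor: Optional[str] = None          # high-water mark from the last successful run
    status: str = 'idle'                  # run-state field, overwritten by every begin
    started_at_utc: Optional[str] = None  # run-state field, overwritten by every begin


# One row per (connector_name, stream_name); there is no per-run history.
rows: Dict[Tuple[str, str], DeltaRow] = {}


def begin(connector_name: str, stream_name: str) -> DeltaRow:
    """Start a run: reuse the existing row and overwrite its run-state in place."""
    row = rows.setdefault((connector_name, stream_name), DeltaRow())
    row.status = 'running'
    row.started_at_utc = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
    return row  # no check for an already-running run, mirroring the API


begin('sap_eu', 'suppliers')
begin('sap_eu', 'suppliers')  # second begin overwrites the same row
begin('sap_eu', 'invoices')   # different stream, separate row
print(len(rows))
```

The second begin for the same pair leaves a single row behind, which is why any "is something already running?" decision has to be made by the caller before calling begin.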
Connector name vs stream name
- connector_name is the integration instance identity. It comes from context.get_config('connector_name') in your settings, set once when the connector is registered. Two SAP systems → two connector_names (e.g. sap_eu, sap_us).
- stream_name is what kind of data this entrypoint tracks. Hardcode it as a constant at the top of the file. A single connector_name can have multiple streams: pulling suppliers from SAP uses 'suppliers', pulling invoices from the same SAP uses 'invoices'.
One delta-state row per (integration instance, kind of data) pair:
| connector_name | stream_name | What this row tracks |
|---|---|---|
| sap_eu | suppliers | Supplier sync watermark for the EU SAP instance |
| sap_eu | invoices | Invoice sync watermark for the EU SAP instance |
| sap_us | suppliers | Supplier sync watermark for the US SAP instance |
| acme_lifecycle | lifecycle | Outbound lifecycle-poll watermark |
Minimal usage
from lib.delta import delta_run

STREAM_NAME = 'suppliers'  # constant: one stream per entrypoint file


def run(context):
    connector_name = context.get_config('connector_name')
    with delta_run(context, connector_name, STREAM_NAME) as r:
        # r.lower_bound_cursor is what the previous successful run committed
        # (or None on the very first run).
        records, watermark = fetch_from_external_since(r.lower_bound_cursor)
        process(records)
        r.set_new_cursor(watermark)  # REQUIRED before clean exit
        r.set_records_processed(len(records))  # optional, observability
        r.set_metadata({'lower_used': r.lower_bound_cursor})  # optional jsonb
    # Auto-completes here on clean exit.
    # Auto-fails (cursor untouched) if the with-block raises.
    return {'records': len(records)}
If you exit the with block without calling set_new_cursor(), the helper
raises RuntimeError. This is deliberate — silently not-advancing the
cursor is exactly the bug class the helper exists to prevent.
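The lifecycle described above can be sketched as a small context manager. This is an illustration of the pattern, not the helper's actual source. The names delta_run_sketch, _Run, and FakeApi are hypothetical, and the sketch assumes delta_begin returns a dict with a lower_bound_cursor key. Whether the real helper also marks the row failed when the cursor is missing is not stated here, so the sketch only raises.

```python
from contextlib import contextmanager


class _Run:
    """Hypothetical handle yielded to the with-block."""

    def __init__(self, lower_bound_cursor):
        self.lower_bound_cursor = lower_bound_cursor
        self.new_cursor = None
        self.cursor_was_set = False

    def set_new_cursor(self, value):
        self.new_cursor = value
        self.cursor_was_set = True


@contextmanager
def delta_run_sketch(api, connector_name, stream_name):
    # Assumption: delta_begin returns a dict carrying the last committed cursor.
    begun = api.delta_begin(connector_name, stream_name)
    run = _Run(begun.get('lower_bound_cursor'))
    try:
        yield run
    except Exception as exc:
        # Body raised: record the failure, leave the cursor alone, re-raise.
        api.delta_fail(connector_name, stream_name, error_summary=str(exc))
        raise
    if not run.cursor_was_set:
        # Clean exit without a cursor: refuse to complete silently.
        raise RuntimeError('set_new_cursor() was not called before exit')
    api.delta_complete(connector_name, stream_name, run.new_cursor)


class FakeApi:
    """In-memory stand-in that records which lifecycle calls were made."""

    def __init__(self):
        self.calls = []

    def delta_begin(self, connector_name, stream_name):
        self.calls.append('begin')
        return {'lower_bound_cursor': None}

    def delta_fail(self, connector_name, stream_name, error_summary):
        self.calls.append('fail')

    def delta_complete(self, connector_name, stream_name, new_cursor_value):
        self.calls.append('complete')


api = FakeApi()
with delta_run_sketch(api, 'sap_eu', 'suppliers') as r:
    r.set_new_cursor('2024-01-01T00:00:00Z')
print(api.calls)
```

Tracking "was a cursor set" with a separate flag, rather than checking for None, keeps the check valid even if a stream's cursor value could itself be empty.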
The since precedence pattern
Every polling entrypoint should resolve since in this order. The example
connectors (02_pull_suppliers.py, 04_poll_lifecycle_messages.py) all share
this helper shape:
from datetime import datetime, timedelta, timezone


def resolve_since(context, lower_bound_from_delta):
    # 1. job_params override: for backfill / re-running a window.
    override = context.get_param('since_override')
    if override:
        return override
    # 2. delta-state cursor: the watermark from the last successful run.
    if lower_bound_from_delta:
        return lower_bound_from_delta
    # 3. Bootstrap window: first-ever run for this stream.
    lookback_days = int(context.get_param('lookback_days', default=7))
    return (datetime.now(timezone.utc) - timedelta(days=lookback_days)).strftime(
        '%Y-%m-%dT%H:%M:%SZ',
    )
The since_override slot is what makes scheduled-task job_params useful: an
operator can schedule a one-off backfill task with since_override set,
without touching the normal recurring schedule and without committing the
backfill window over the regular cursor — see "Error handling" below.
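To see the precedence in action, the sketch below exercises the three branches against a stand-in context. FakeContext is hypothetical and exposes only get_param(); the resolve_since body repeats the logic shown earlier so the block runs on its own.

```python
from datetime import datetime, timedelta, timezone


def resolve_since(context, lower_bound_from_delta):
    override = context.get_param('since_override')
    if override:
        return override  # 1. operator-supplied backfill window
    if lower_bound_from_delta:
        return lower_bound_from_delta  # 2. last committed watermark
    lookback_days = int(context.get_param('lookback_days', default=7))
    start = datetime.now(timezone.utc) - timedelta(days=lookback_days)
    return start.strftime('%Y-%m-%dT%H:%M:%SZ')  # 3. bootstrap window


class FakeContext:
    """Hypothetical stand-in exposing only get_param()."""

    def __init__(self, params=None):
        self._params = params or {}

    def get_param(self, name, default=None):
        return self._params.get(name, default)


backfill = FakeContext({'since_override': '2024-01-01T00:00:00Z'})
normal = FakeContext()

print(resolve_since(backfill, '2024-06-01T00:00:00Z'))  # override wins
print(resolve_since(normal, '2024-06-01T00:00:00Z'))    # delta cursor is used
print(resolve_since(normal, None))                      # now minus 7 days
```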
Cursor types
Pass cursor_type= on the call (default 'timestamp_utc'):
| Type | Use when | Server enforces |
|---|---|---|
| timestamp_utc | source has a sortable updated_at you can query against | new value >= previous (422 CURSOR_REGRESSION otherwise) |
| numeric_id | source returns monotonic numeric ids | new value >= previous |
| opaque_token | source uses cursor / continuation tokens you can't compare | any non-equal value is treated as advance |
The cursor type is per-stream, not per-connector. The same connector_name
can have one stream using timestamp_utc and another using numeric_id.
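The rules in the table can be sketched as a single comparison function. The real check runs server-side and its implementation is not shown in this document, so classify_cursor_update and its return labels are hypothetical. The sketch also assumes timestamps arrive in the Z-suffixed ISO format used elsewhere on this page.

```python
from datetime import datetime


def _parse_utc(value):
    """Parse a 'YYYY-MM-DDTHH:MM:SSZ' string into an aware datetime."""
    return datetime.fromisoformat(value.replace('Z', '+00:00'))


def classify_cursor_update(cursor_type, previous, new):
    """Return 'advance', 'unchanged' or 'regression' for a proposed cursor."""
    if previous is None:
        return 'advance'  # first commit for this stream: nothing to compare
    if cursor_type == 'opaque_token':
        # Tokens cannot be ordered, so only equality is meaningful.
        return 'unchanged' if new == previous else 'advance'
    if cursor_type == 'timestamp_utc':
        prev_key, new_key = _parse_utc(previous), _parse_utc(new)
    elif cursor_type == 'numeric_id':
        prev_key, new_key = int(previous), int(new)
    else:
        raise ValueError(f'unknown cursor_type: {cursor_type}')
    if new_key < prev_key:
        return 'regression'  # for timestamp_utc the server answers 422 CURSOR_REGRESSION
    return 'unchanged' if new_key == prev_key else 'advance'


print(classify_cursor_update('timestamp_utc', '2024-01-02T00:00:00Z', '2024-01-01T00:00:00Z'))
print(classify_cursor_update('numeric_id', '41', '42'))
print(classify_cursor_update('opaque_token', 'abc', 'xyz'))
```

Running the same comparison client-side before completing a run is optional, but it can turn a 422 at the end of a long run into an early, local error.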
Concurrency: read-then-decide
The API does not refuse a begin while another run is running.
That's intentional — the consumer (your code, or the scheduler that
invokes you) decides what to do.
from datetime import datetime, timezone


def run(context):
    connector_name = context.get_config('connector_name')
    state = context.api.delta_get_state(connector_name, STREAM_NAME)
    if state['status'] == 'running':
        started = datetime.fromisoformat(state['started_at_utc'].replace('Z', '+00:00'))
        age_min = (datetime.now(timezone.utc) - started).total_seconds() / 60
        if age_min < 60:
            return {'skipped': True}  # earlier run still legitimately running
        # else: previous run is stuck, so fall through and override
    with delta_run(context, connector_name, STREAM_NAME) as r:
        ...
Common policies:
- "Skip if running" — short-circuit return at the top.
- "Override if running > X minutes" — assume stuck.
- "Always override" — only correct if you're absolutely sure runs can't legitimately overlap (rare — the scheduler usually doesn't guarantee that).
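The three policies above can be folded into one pure decision function, which keeps the entrypoint short and makes the policy easy to unit-test. This is only a sketch: should_start and the policy names are hypothetical, and it assumes the state dict carries status and started_at_utc as in the earlier snippet. It also treats a missing state (None) as "nothing running", which is an assumption about what delta_get_state returns before the first run.

```python
from datetime import datetime, timezone


def should_start(state, policy, max_age_min=60, now=None):
    """Decide whether to begin a new run given the current delta state.

    policy: 'skip_if_running', 'override_if_stale' or 'always_override'.
    """
    if state is None or state.get('status') != 'running':
        return True  # nothing in flight, safe to start
    if policy == 'always_override':
        return True
    if policy == 'skip_if_running':
        return False
    if policy == 'override_if_stale':
        now = now or datetime.now(timezone.utc)
        started = datetime.fromisoformat(state['started_at_utc'].replace('Z', '+00:00'))
        age_min = (now - started).total_seconds() / 60
        return age_min >= max_age_min  # older than the threshold: assume stuck
    raise ValueError(f'unknown policy: {policy}')


running = {'status': 'running', 'started_at_utc': '2024-01-01T10:00:00Z'}
noon = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
print(should_start(running, 'skip_if_running'))
print(should_start(running, 'override_if_stale', max_age_min=60, now=noon))
```

An entrypoint would call this with the result of delta_get_state and return early when it yields False, before opening the delta_run block.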
Error handling
Any exception inside the with block:
- Triggers delta_fail(error_summary=str(exc)): the row's status becomes failed, but the cursor is not advanced.
- Re-raises so your top-level handler sees it.
The next begin after a failed run hands out the same lower bound (the last
successful commit). So failed runs naturally re-fetch the same window on
retry — no special code required.
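The retry behaviour can be illustrated with a tiny in-memory stand-in for a single delta-state row. FakeDeltaState and the 'completed' status name are hypothetical; only the rule that a failure leaves the cursor untouched comes from the description above.

```python
class FakeDeltaState:
    """In-memory stand-in for one (connector_name, stream_name) row."""

    def __init__(self):
        self.cursor = None  # last successfully committed watermark
        self.status = None

    def begin(self):
        self.status = 'running'
        return self.cursor  # lower bound handed to the run

    def complete(self, new_cursor):
        self.status = 'completed'  # hypothetical status name
        self.cursor = new_cursor   # only a clean completion moves the watermark

    def fail(self, error_summary):
        self.status = 'failed'     # cursor deliberately left untouched


state = FakeDeltaState()
state.begin()
state.complete('2024-01-01T00:00:00Z')  # one successful run commits a watermark

first_attempt = state.begin()
state.fail('timeout talking to source')  # this run dies part-way through
retry = state.begin()

print(first_attempt == retry)  # both runs start from the same lower bound
```

Because the retry re-fetches the same window, any records the failed run had already handled will be seen again, so it helps if downstream processing tolerates repeats.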
What's audited
- complete_run writes an audit entry on every cursor advance. Cursor advances are security-relevant: moving the watermark forward is the only operation that affects what data the next run sees.
- fail_run is not audited. Failures are routine and would dwarf the interesting events.
- reset_cursor (admin-only) writes an audit entry with the caller's reason. Entrypoint code can't normally call this; it requires an admin-scoped credential.
Direct API
If you need finer control than the context manager:
context.api.delta_get_state(connector_name, stream_name)
context.api.delta_begin(connector_name, stream_name, cursor_type='timestamp_utc')
context.api.delta_complete(connector_name, stream_name, new_cursor_value,
                           records_processed=None, metadata=None)
context.api.delta_fail(connector_name, stream_name, error_summary, metadata=None)
# delta_reset requires admin-scoped credentials — entrypoints can't call this.
Use delta_run() for almost all cases. Drop to the direct API only when you
genuinely need to interleave delta operations with your own transaction
boundaries.
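For comparison, a hand-rolled lifecycle using the direct calls might look roughly like the sketch below. The function pull_with_direct_api and the fetch callable are hypothetical, and the return shape of delta_begin (a dict with a lower_bound_cursor key) is an assumption; check the actual response before relying on it.

```python
def pull_with_direct_api(api, connector_name, stream_name, fetch):
    """Manual begin -> work -> complete/fail using the direct calls."""
    # Assumption: delta_begin returns a dict with the last committed cursor.
    begun = api.delta_begin(connector_name, stream_name, cursor_type='timestamp_utc')
    lower_bound = begun.get('lower_bound_cursor')
    try:
        # fetch is a caller-supplied function returning (records, watermark).
        records, watermark = fetch(lower_bound)
        # ... your own transaction boundaries would go here ...
    except Exception as exc:
        # Mirror the context manager: record the failure, then re-raise.
        api.delta_fail(connector_name, stream_name, str(exc))
        raise
    api.delta_complete(connector_name, stream_name, watermark,
                       records_processed=len(records))
    return len(records)
```

In an entrypoint this would be called with context.api as the first argument. Everything the context manager does for you (failing on exceptions, never completing without a cursor) becomes your responsibility here, which is the main reason to prefer delta_run().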
What's next
- Logging — surfacing what the delta run actually did.
- REST API pull pattern — full example of a delta-driven pull connector.