REST API pull
The most common connector shape: poll an external REST API for what changed, upsert the results into Nuntiq. This page walks through a production-ready version — pagination, auth headers, retries with backoff, and incremental sync via the delta-state service.
The scenario
You're integrating Acme's supplier API into Nuntiq. Their API:
- Lives at `https://api.acme-erp.com/v2/`.
- Auth via `Authorization: Bearer <token>`.
- Paginates with `?page=N&per_page=200`. The response shape is `{ "data": [...], "next_page": N+1 | null }`.
- Supports `?updated_after=<ISO-8601>` for incremental queries.
- Returns transient 502/503 errors occasionally, and 429 when you call it too fast.
You want to pull suppliers updated since the last successful run, every 15 minutes.
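Each page request is a GET with the bearer header plus three query parameters. The sketch below builds the URL for one page using only the standard library, so you can see what goes on the wire. `build_page_url` is a hypothetical helper and not part of the connector. `requests` should produce an equivalent query string from a `params` dict. Note that the colons in the timestamp get percent-encoded.

```python
from urllib.parse import urlencode

API_URL = "https://api.acme-erp.com/v2"  # base URL from the scenario above


def build_page_url(api_url, since_iso, page, per_page=200):
    """Return the URL for one page of suppliers updated after since_iso.

    Hypothetical helper mirroring Acme's documented query parameters
    (updated_after, page, per_page). A trailing slash on api_url is tolerated.
    """
    query = urlencode({
        "updated_after": since_iso,  # ':' is encoded as %3A
        "page": page,
        "per_page": per_page,
    })
    return f"{api_url.rstrip('/')}/suppliers?{query}"


print(build_page_url(API_URL, "2024-01-01T00:00:00Z", page=1))
# https://api.acme-erp.com/v2/suppliers?updated_after=2024-01-01T00%3A00%3A00Z&page=1&per_page=200
```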
Settings
```json
{
  "customer_number": "10000001",
  "secrets": {
    "acme_token": "test-bearer-token-here"
  },
  "config": {
    "connector_name": "acme_eu",
    "api_url": "https://api.acme-erp.com/v2",
    "page_size": 200
  },
  "job_params": {
    "lookback_days": 7
  }
}
```
connector_name is what gets stored on the delta-state row, so distinct
Acme instances (EU vs US) just change this string in their settings.
The connector
```python
"""Pull suppliers from Acme ERP into Nuntiq (delta sync)."""
from datetime import datetime, timedelta, timezone

import requests
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

from lib.delta import delta_run
from lib.objects.supplier import SupplierLoad

STREAM_NAME = 'suppliers'
# Bump on schema changes so you can spot which version emitted a row.
SCHEMA_VERSION = 1


# ── Retry policy ──────────────────────────────────────────────────────────
# Retry transient HTTP errors with exponential backoff. Authentication
# errors (401/403) and client errors (4xx) NOT in the transient set must
# not retry; they won't resolve themselves.
class TransientHttpError(Exception):
    """Raised for status codes we want tenacity to retry."""


def _is_transient(resp):
    return resp.status_code in (429, 500, 502, 503, 504)


@retry(
    retry=retry_if_exception_type((TransientHttpError, requests.ConnectionError, requests.Timeout)),
    wait=wait_exponential(multiplier=1, min=2, max=30),
    stop=stop_after_attempt(5),
    reraise=True,
)
def http_get(url, headers, params, logger):
    """GET with retries. Raises for non-transient errors immediately."""
    resp = requests.get(url, headers=headers, params=params, timeout=30)
    if _is_transient(resp):
        logger.warn(
            f"Transient {resp.status_code} from Acme; will retry",
            step='http_get',
            detail={'url': resp.url, 'attempt': http_get.retry.statistics.get('attempt_number')},
        )
        raise TransientHttpError(f"{resp.status_code} {resp.text[:200]}")
    resp.raise_for_status()
    return resp.json()


# ── Pagination walker ─────────────────────────────────────────────────────
def fetch_suppliers_since(api_url, api_token, since_iso, page_size, logger):
    """Walk every page from Acme. Yields supplier dicts one at a time."""
    page = 1
    headers = {'Authorization': f'Bearer {api_token}', 'Accept': 'application/json'}
    max_updated_at = since_iso  # high-water-mark we'll commit at the end
    while True:
        params = {
            'updated_after': since_iso,
            'page': page,
            'per_page': page_size,
        }
        body = http_get(f"{api_url}/suppliers", headers=headers, params=params, logger=logger)
        suppliers = body.get('data', [])
        logger.info(
            f"Fetched page {page} ({len(suppliers)} rows)",
            step='fetch',
            detail={'page': page, 'count': len(suppliers)},
        )
        for s in suppliers:
            # Track the highest updated_at seen. It is only published (below)
            # after the last page, so a failure mid-walk never exposes a
            # partial watermark. `or ''` guards against null timestamps.
            updated_at = s.get('updated_at') or ''
            if updated_at > max_updated_at:
                max_updated_at = updated_at
            yield s
        next_page = body.get('next_page')
        if not next_page:
            break
        page = next_page
    # Return the watermark via a function attribute: callers exhaust the
    # generator and then read this attribute. (See run() below for the pattern.)
    fetch_suppliers_since.watermark = max_updated_at


# ── `since` resolution ────────────────────────────────────────────────────
def resolve_since(context, lower_bound_from_delta):
    """Override > delta cursor > bootstrap window."""
    override = context.get_param('since_override')
    if override:
        context.logger.info(f"since_override set → {override}", step='since')
        return override
    if lower_bound_from_delta:
        return lower_bound_from_delta
    lookback_days = int(context.get_param('lookback_days', default=7))
    bootstrap = (datetime.now(timezone.utc) - timedelta(days=lookback_days)).strftime(
        '%Y-%m-%dT%H:%M:%SZ',
    )
    context.logger.info(
        f"First-run bootstrap → {bootstrap} (lookback_days={lookback_days})",
        step='since',
    )
    return bootstrap


# ── Mapping: external → Nuntiq supplier ───────────────────────────────────
def map_supplier(ext, load):
    s = load.new()
    s.supplier_number = ext['number']
    s.supplier_name = ext['name']
    s.is_active = ext.get('active', True)
    # NOTE: VAT numbers live on Organization (as OrganizationIdentifier),
    # not on Supplier. Ignore the source's vat_number field here unless
    # you're also syncing organizations.

    # One MAIN location with the headquarters address
    if ext.get('address'):
        loc = s.new_location()
        loc.location_code = 'MAIN'
        loc.location_name = 'Headquarters'
        loc.is_active = True
        addr = loc.new_address()
        addr.street = ext['address'].get('street', '')
        addr.city = ext['address'].get('city', '')
        addr.country_code = ext['address'].get('country', 'XX')
    return s


# ── Entry point ───────────────────────────────────────────────────────────
def run(context):
    connector_name = context.get_config('connector_name')
    api_url = context.get_config('api_url')
    api_token = context.get_secret('acme_token')
    page_size = int(context.get_config('page_size', default=200))
    if not all([connector_name, api_url, api_token]):
        raise RuntimeError("config.connector_name, config.api_url, secrets.acme_token are required")

    load = SupplierLoad(context)
    with delta_run(context, connector_name, STREAM_NAME, cursor_type='timestamp_utc') as r:
        since_iso = resolve_since(context, r.lower_bound_cursor)
        context.logger.info(f"Pulling suppliers updated_after={since_iso}", step='start')
        count = 0
        for ext in fetch_suppliers_since(api_url, api_token, since_iso, page_size, context.logger):
            map_supplier(ext, load)
            count += 1
            # Flush every 500 to avoid building a huge in-memory queue.
            if count % 500 == 0:
                load.save_all()
                context.logger.info(f"Flushed {count} so far", step='save')
        # Final flush
        load.save_all()
        watermark = fetch_suppliers_since.watermark
        context.logger.info(
            f"Done: {count} suppliers, new cursor = {watermark}",
            step='done',
            detail={'count': count, 'cursor': watermark},
        )
        r.set_new_cursor(watermark)
        r.set_records_processed(count)
        r.set_metadata({
            'schema_version': SCHEMA_VERSION,
            'page_size': page_size,
            'lower_bound_used': since_iso,
        })
    return {
        'connector_name': connector_name,
        'fetched': count,
        'cursor_committed': watermark,
    }
```
Walking through what's important
Why tenacity and not a while loop
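Transient 502s and 429s happen. The naive pattern is:
```python
import time
import requests

def get_with_retries(url):
    for attempt in range(5):
        try:
            return requests.get(url, timeout=30)
        except (requests.ConnectionError, requests.Timeout):
            if attempt == 4:
                raise  # out of attempts: surface the last error
            time.sleep(2 ** attempt)
```
…but that's a loop, a try/except and a sleep to rewrite every time you want to
retry, and it still doesn't retry 502s or 429s, because requests.get() doesn't
raise on HTTP error statuses. tenacity is already in the connector base image
(Dockerfile line 27), so use it. The decorator captures the policy in one
place; the function stays linear.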
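To see what `wait_exponential(multiplier=1, min=2, max=30)` combined with `stop_after_attempt(5)` actually does, here is a sketch that recomputes the sleep schedule. It re-derives the formula tenacity uses for `wait_exponential`: `multiplier * 2 ** (attempt - 1)`, clamped to `[min, max]`. It is a plain function, not tenacity's code, so check it against the version you have installed.

```python
def backoff_schedule(multiplier=1, min_wait=2, max_wait=30, attempts=5):
    """Seconds slept before each retry, for a policy that stops after `attempts` tries.

    Re-derivation of tenacity's wait_exponential: the wait after attempt n is
    multiplier * 2 ** (n - 1), clamped to [min_wait, max_wait]. There is no
    sleep after the final attempt, so there are attempts - 1 waits.
    """
    waits = []
    for attempt in range(1, attempts):
        raw = multiplier * 2 ** (attempt - 1)
        waits.append(max(min_wait, min(raw, max_wait)))
    return waits


schedule = backoff_schedule()
print(schedule)       # [2, 2, 4, 8]
print(sum(schedule))  # 16
```

With this policy, a request that fails five times spends about 16 seconds sleeping in total, on top of the time the five attempts themselves take. After that, `reraise=True` surfaces the last error.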
Why a separate transient-error class
raise_for_status() raises HTTPError for everything 4xx/5xx. You only
want to retry the transient ones. The TransientHttpError wrapper makes the
retry condition explicit and keeps client errors (400, 401, 404) from
silently looping until you blow your timeout.
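The policy above backs off on 429 using its own schedule. Many APIs also send a `Retry-After` header with 429 and 503 responses, and RFC 9110 allows it to be either a number of seconds or an HTTP date. Whether Acme sends it is an assumption you would need to check. Below is a minimal, stdlib-only sketch that turns the header into a delay. `retry_after_seconds` is a hypothetical helper.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def retry_after_seconds(value, default=2.0, now=None):
    """Turn a Retry-After header value into a non-negative delay in seconds.

    Accepts delta-seconds ("120") or an HTTP date
    ("Wed, 21 Oct 2015 07:28:00 GMT"). Falls back to `default` when the
    header is missing or unparseable.
    """
    if not value:
        return default
    value = value.strip()
    if value.isdecimal():
        return float(value)
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):  # older Pythons raise TypeError on garbage
        return default
    if when.tzinfo is None:  # no zone in the header: treat it as UTC
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())


print(retry_after_seconds("120"))  # 120.0
```

Wiring this into tenacity is deliberately left out. To do it, you would carry the response headers on `TransientHttpError` and pass a custom `wait=` callable; tenacity accepts any callable that takes the retry state and returns a number of seconds.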
Why flush every 500
save_all() chunks at 1000 per request automatically. Flushing every 500
gives you:
- A bounded in-memory queue (matters when the source has 100k+ rows).
- Faster failure recovery: if the connector crashes while handling rows 1001-1500, the first 1000 are already durable in Nuntiq. The delta cursor stays at the old watermark, so the retry re-fetches everything from there; re-saving the rows that already landed is harmless because save_all() upserts.
- Progress visibility: the `Flushed 500 so far` log lines tell whoever reads the log that work is happening.
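The flush-every-N pattern is easy to check in isolation. The sketch below uses a stand-in `flush` callback in place of `load.save_all()`. The `crash_after` parameter is hypothetical and only simulates a failure mid-run. It shows what is durable when a run dies between flushes.

```python
def process_in_batches(rows, flush, every=500, crash_after=None):
    """Queue rows and call flush(batch) every `every` rows and once at the end.

    `flush` stands in for load.save_all(); `crash_after` simulates the
    connector dying after that many rows have been queued.
    """
    pending = []
    for count, row in enumerate(rows, start=1):
        pending.append(row)
        if crash_after is not None and count == crash_after:
            raise RuntimeError(f"crashed after {count} rows")
        if count % every == 0:
            flush(pending)
            pending = []
    if pending:  # final partial batch
        flush(pending)


saved = []
try:
    process_in_batches(range(2000), saved.extend, crash_after=1400)
except RuntimeError:
    pass
print(len(saved))  # 1000: rows 1001-1400 were queued but never flushed
```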
Why track the watermark inside the generator
We update max_updated_at as we yield each row. If the external API returned
rows sorted newest-first you could simply take the first row's updated_at,
but most APIs don't, so accumulate.
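There is one caveat with the plain string comparison on `updated_at`. Comparing ISO-8601 strings works only while every timestamp uses the same offset and precision; for example, the bootstrap cursor is written with a `Z` suffix. If the source mixes `Z`, `+02:00` or fractional seconds, compare parsed datetimes instead. The sketch below does that, assuming that timestamps without an offset are UTC. `parse_utc` and `to_cursor` are hypothetical helpers.

```python
from datetime import datetime, timezone


def parse_utc(ts):
    """Parse an ISO-8601 timestamp into an aware UTC datetime.

    'Z' is rewritten to '+00:00' because datetime.fromisoformat only accepts
    'Z' from Python 3.11 on. Naive timestamps are assumed to be UTC.
    """
    dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)


def to_cursor(dt):
    """Format a datetime the same way the bootstrap cursor is written."""
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


a, b = "2024-05-01T10:00:00+02:00", "2024-05-01T09:30:00Z"
print(a > b)                                       # True: string order says a is newer
print(to_cursor(max(parse_utc(a), parse_utc(b))))  # 2024-05-01T09:30:00Z
```

`to_cursor` truncates to whole seconds. At worst this moves the cursor slightly backwards, so a boundary row may be fetched twice, which the upsert tolerates.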
The function attaches the watermark to itself (fetch_suppliers_since.watermark)
which is a small hack — alternatives are returning a tuple from a non-generator
or using a list/box object. Pick what you like; just commit the watermark
after the last save_all succeeds.
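If you'd rather not hang state off the function object, a small iterator class does the same job and keeps the watermark with the run that produced it. `WatermarkingIterator` is a hypothetical name. It accepts any iterable of dicts, so in the connector you would wrap the rows coming out of the page walker. It uses the same string comparison as the connector.

```python
class WatermarkingIterator:
    """Yield rows unchanged while tracking the largest `updated_at` seen.

    `watermark` starts at the lower bound, so a run that fetches nothing
    commits the same cursor it started from.
    """

    def __init__(self, rows, since_iso):
        self._rows = rows
        self.watermark = since_iso

    def __iter__(self):
        for row in self._rows:
            updated_at = row.get("updated_at") or ""
            if updated_at > self.watermark:
                self.watermark = updated_at
            yield row


rows = [{"updated_at": "2024-05-02T00:00:00Z"}, {"updated_at": "2024-05-03T00:00:00Z"}, {}]
it = WatermarkingIterator(rows, "2024-05-01T00:00:00Z")
count = sum(1 for _ in it)
print(count, it.watermark)  # 3 2024-05-03T00:00:00Z
```

As with the function attribute, read `it.watermark` only after the loop has finished and the last `save_all()` has succeeded.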
Why delta_run wraps the whole thing
If fetch_suppliers_since raises on page 3, delta_run's __exit__ catches
the exception, calls delta_fail, and re-raises. The cursor is not advanced.
The next scheduled run starts from the same lower bound and re-fetches pages
1–2 (idempotent — save_all upserts) plus whatever the source has now.
If everything succeeds, delta_run calls delta_complete with the watermark
passed to r.set_new_cursor(), advancing the cursor.
See Delta state for the lifecycle in detail.
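The lifecycle follows the standard context-manager contract. On an exception, `__exit__` leaves the cursor untouched (this is where the real one calls `delta_fail`) and returns `False`, so the exception propagates. Below is a self-contained sketch of that contract, with an in-memory dict standing in for the delta-state service. `FakeDeltaRun` and its store are hypothetical. Rows are keyed by `(connector_name, stream)`, which is why two Acme instances with different `connector_name` values keep independent watermarks.

```python
class FakeDeltaRun:
    """In-memory stand-in for delta_run(context, connector_name, stream, ...)."""

    def __init__(self, store, connector_name, stream):
        self.store = store
        self.key = (connector_name, stream)
        self.lower_bound_cursor = None
        self.new_cursor = None

    def __enter__(self):
        self.lower_bound_cursor = self.store.get(self.key)
        return self

    def set_new_cursor(self, cursor):
        self.new_cursor = cursor

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None and self.new_cursor is not None:
            # Success path (delta_complete): commit the new cursor.
            self.store[self.key] = self.new_cursor
        # Failure path (delta_fail): cursor untouched. Returning False re-raises.
        return False


store = {("acme_eu", "suppliers"): "2024-05-01T00:00:00Z"}

try:
    with FakeDeltaRun(store, "acme_eu", "suppliers") as r:
        r.set_new_cursor("2024-05-02T00:00:00Z")
        raise ConnectionError("page 3 failed")
except ConnectionError:
    pass
print(store[("acme_eu", "suppliers")])  # 2024-05-01T00:00:00Z (unchanged)

with FakeDeltaRun(store, "acme_eu", "suppliers") as r:
    r.set_new_cursor("2024-05-02T00:00:00Z")
print(store[("acme_eu", "suppliers")])  # 2024-05-02T00:00:00Z
```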
Testing it locally
The toolkit's 02_pull_suppliers.py is the same pattern in stub form — copy
this connector to connectors/acme_pull_suppliers.py in your toolkit, then:
- Add a `fixtures/_external/acme_suppliers.json` and adapt the HTTP call to read from it when `api_url.startswith('file://')`, OR
- Use `responses` (a `requests` mocking library: `pip install responses`) to mock the HTTP layer in a test runner alongside the toolkit.
Either way, the Nuntiq-side calls (SupplierLoad, delta_run) work
unchanged against the fake API.
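For the fixture route, the fake only has to honour the same contract as the real API: filter on `updated_after` and return `{ "data": [...], "next_page": N+1 | null }`. The sketch below assumes three things: the fixture file holds a flat JSON array of supplier objects, `file://` URLs point at that file, and `updated_after` is strict (exclusive). `load_fixture` and `fake_page` are hypothetical helpers that you would call from `http_get` in place of `requests.get`.

```python
import json
from urllib.parse import urlparse


def load_fixture(api_url):
    """Read the supplier array from a file:// URL, e.g. file:///abs/path/acme_suppliers.json."""
    with open(urlparse(api_url).path, encoding="utf-8") as fh:
        return json.load(fh)


def fake_page(rows, updated_after, page, per_page):
    """Return one page in Acme's response shape.

    Keeps rows strictly newer than `updated_after` (plain string comparison,
    as in the connector), then slices out page `page` (1-based).
    """
    matching = [r for r in rows if (r.get("updated_at") or "") > updated_after]
    start = (page - 1) * per_page
    has_more = start + per_page < len(matching)
    return {
        "data": matching[start:start + per_page],
        "next_page": page + 1 if has_more else None,
    }


rows = [{"number": str(n), "updated_at": f"2024-05-{n:02d}T00:00:00Z"} for n in range(1, 6)]
first = fake_page(rows, "2024-05-01T00:00:00Z", page=1, per_page=2)
print([r["number"] for r in first["data"]], first["next_page"])  # ['2', '3'] 2
```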
Deployment notes
- Schedule. Register the entrypoint key (`acme_pull_suppliers`) against the connector instance, then schedule it in the admin portal at your desired cadence (every 15 minutes, hourly, whatever).
- Multiple instances. Two Acme tenants → two settings rows with different `connector_name` values. Same `.py` file, different delta-state rows, fully independent watermarks.
- Backfill. Run the same entrypoint one-off with `job_params.since_override = "2024-01-01T00:00:00Z"`. The `resolve_since()` helper picks it up, and the run pulls the full historical window without poisoning the live cursor. A failure during the backfill doesn't move the cursor. A success does move it: `r.set_new_cursor()` sets it to the latest `updated_at` fetched, which should be at least as recent as the live cursor, so the next scheduled run skips nothing.
What's next
- SFTP pull — same shape, but files instead of JSON.
- OAuth flow — when the external API isn't a static bearer.
- Logging — when to log what.