REST API pull

The most common connector shape: poll an external REST API for what changed, then upsert the results into Nuntiq. This page walks through a production-ready version with pagination, auth headers, retries with backoff, and incremental sync via the delta-state service.

The scenario

You're integrating Acme's supplier API into Nuntiq. Their API:

  • Lives at https://api.acme-erp.com/v2/.
  • Auth via Authorization: Bearer <token>.
  • Paginates with ?page=N&per_page=200. The response shape is { "data": [...], "next_page": N+1 | null }.
  • Supports ?updated_after=<ISO-8601> for incremental queries.
  • Occasionally returns transient 502/503 errors, and 429 when you call it too fast.

You want to pull suppliers updated since the last successful run, every 15 minutes.

Settings

settings.json
{
  "customer_number": "10000001",
  "secrets": {
    "acme_token": "test-bearer-token-here"
  },
  "config": {
    "connector_name": "acme_eu",
    "api_url": "https://api.acme-erp.com/v2",
    "page_size": 200
  },
  "job_params": {
    "lookback_days": 7
  }
}

connector_name is what gets stored on the delta-state row, so distinct Acme instances (EU vs US) just change this string in their settings.

The connector

connectors/acme_pull_suppliers.py
"""Pull suppliers from Acme ERP into Nuntiq (delta sync)."""
from datetime import datetime, timedelta, timezone

import requests
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

from lib.delta import delta_run
from lib.objects.supplier import SupplierLoad

STREAM_NAME = 'suppliers'

# Bump on schema changes so you can spot which version emitted a row.
SCHEMA_VERSION = 1


# ── Retry policy ──────────────────────────────────────────────────────────
# Retry transient HTTP errors with exponential backoff. Authentication
# errors (401/403) and client errors (4xx) NOT in the transient set must
# not retry — they won't resolve themselves.

class TransientHttpError(Exception):
    """Raised for status codes we want tenacity to retry."""


def _is_transient(resp):
    return resp.status_code in (429, 500, 502, 503, 504)


@retry(
    retry=retry_if_exception_type((TransientHttpError, requests.ConnectionError, requests.Timeout)),
    wait=wait_exponential(multiplier=1, min=2, max=30),
    stop=stop_after_attempt(5),
    reraise=True,
)
def http_get(url, headers, params, logger):
    """GET with retries. Raises for non-transient errors immediately."""
    resp = requests.get(url, headers=headers, params=params, timeout=30)
    if _is_transient(resp):
        logger.warn(
            f"Transient {resp.status_code} from Acme; will retry",
            step='http_get',
            detail={'url': resp.url, 'attempt': http_get.retry.statistics.get('attempt_number')},
        )
        raise TransientHttpError(f"{resp.status_code} {resp.text[:200]}")
    resp.raise_for_status()
    return resp.json()


# ── Pagination walker ─────────────────────────────────────────────────────

def fetch_suppliers_since(api_url, api_token, since_iso, page_size, logger):
    """Walk every page from Acme. Yields supplier dicts one at a time."""
    page = 1
    headers = {'Authorization': f'Bearer {api_token}', 'Accept': 'application/json'}
    max_updated_at = since_iso  # high-water-mark we'll commit at the end

    while True:
        params = {
            'updated_after': since_iso,
            'page': page,
            'per_page': page_size,
        }
        body = http_get(f"{api_url}/suppliers", headers=headers, params=params, logger=logger)

        suppliers = body.get('data', [])
        logger.info(
            f"Fetched page {page} ({len(suppliers)} rows)",
            step='fetch',
            detail={'page': page, 'count': len(suppliers)},
        )

        for s in suppliers:
            # Track the high-water mark as we go. It is only committed by
            # run() after every page has been fetched and saved.
            if s.get('updated_at', '') > max_updated_at:
                max_updated_at = s['updated_at']
            yield s

        next_page = body.get('next_page')
        if not next_page:
            break
        page = next_page

    # Expose the watermark as a function attribute. This line only runs once
    # the generator is exhausted, so callers must finish iterating before
    # reading it. (See run() below for the pattern.)
    fetch_suppliers_since.watermark = max_updated_at


# ── `since` resolution ────────────────────────────────────────────────────

def resolve_since(context, lower_bound_from_delta):
    """Override > delta cursor > bootstrap window."""
    override = context.get_param('since_override')
    if override:
        context.logger.info(f"since_override set → {override}", step='since')
        return override
    if lower_bound_from_delta:
        return lower_bound_from_delta

    lookback_days = int(context.get_param('lookback_days', default=7))
    bootstrap = (datetime.now(timezone.utc) - timedelta(days=lookback_days)).strftime(
        '%Y-%m-%dT%H:%M:%SZ',
    )
    context.logger.info(
        f"First-run bootstrap → {bootstrap} (lookback_days={lookback_days})",
        step='since',
    )
    return bootstrap


# ── Mapping: external → Nuntiq supplier ───────────────────────────────────

def map_supplier(ext, load):
    s = load.new()
    s.supplier_number = ext['number']
    s.supplier_name = ext['name']
    s.is_active = ext.get('active', True)

    # NOTE: VAT numbers live on Organization (as OrganizationIdentifier),
    # not on Supplier. Ignore the source's vat_number field here unless
    # you're also syncing organizations.

    # One MAIN location with the headquarters address
    if ext.get('address'):
        loc = s.new_location()
        loc.location_code = 'MAIN'
        loc.location_name = 'Headquarters'
        loc.is_active = True

        addr = loc.new_address()
        addr.street = ext['address'].get('street', '')
        addr.city = ext['address'].get('city', '')
        addr.country_code = ext['address'].get('country', 'XX')

    return s


# ── Entry point ───────────────────────────────────────────────────────────

def run(context):
    connector_name = context.get_config('connector_name')
    api_url = context.get_config('api_url')
    api_token = context.get_secret('acme_token')
    page_size = int(context.get_config('page_size', default=200))

    if not all([connector_name, api_url, api_token]):
        raise RuntimeError("config.connector_name, config.api_url, secrets.acme_token are required")

    load = SupplierLoad(context)

    with delta_run(context, connector_name, STREAM_NAME, cursor_type='timestamp_utc') as r:
        since_iso = resolve_since(context, r.lower_bound_cursor)
        context.logger.info(f"Pulling suppliers updated_after={since_iso}", step='start')

        count = 0
        for ext in fetch_suppliers_since(api_url, api_token, since_iso, page_size, context.logger):
            map_supplier(ext, load)
            count += 1

            # Flush every 500 to avoid building a huge in-memory queue.
            if count % 500 == 0:
                load.save_all()
                context.logger.info(f"Flushed {count} so far", step='save')

        # Final flush
        load.save_all()

        watermark = fetch_suppliers_since.watermark
        context.logger.info(
            f"Done: {count} suppliers, new cursor = {watermark}",
            step='done',
            detail={'count': count, 'cursor': watermark},
        )

        r.set_new_cursor(watermark)
        r.set_records_processed(count)
        r.set_metadata({
            'schema_version': SCHEMA_VERSION,
            'page_size': page_size,
            'lower_bound_used': since_iso,
        })

    return {
        'connector_name': connector_name,
        'fetched': count,
        'cursor_committed': watermark,
    }

Walking through what's important

Why tenacity and not a while loop

Transient 502s and 429s happen. The naive pattern is:

for attempt in range(5):
    try:
        return requests.get(...)
    except SomeError:
        time.sleep(2 ** attempt)
raise RuntimeError("retries exhausted")

…but that's six lines of cruft every time you want to retry. tenacity is already in the connector base image (Dockerfile line 27), so use it. The decorator captures the policy in one place; the function stays linear.
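To see what the decorator's policy costs in wall-clock time, the sketch below computes the delays for a policy shaped like wait_exponential(multiplier=1, min=2, max=30) with five attempts. It assumes each delay is multiplier * 2 ** (attempt - 1) clamped to the min/max range, which is how recent tenacity releases appear to behave; check the documentation for the version in your image. The helper name backoff_delays is made up for this illustration.

```python
def backoff_delays(attempts=5, multiplier=1, min_wait=2, max_wait=30):
    """Seconds slept after each failed attempt (none after the last one)."""
    delays = []
    for attempt in range(1, attempts):
        # Assumed formula: 1, 2, 4, 8, ... then clamped to [min_wait, max_wait].
        raw = multiplier * 2 ** (attempt - 1)
        delays.append(max(min_wait, min(raw, max_wait)))
    return delays


delays = backoff_delays()
print(delays, sum(delays))  # [2, 2, 4, 8] 16
```

Under that assumption a page that fails all five attempts spends about 16 seconds sleeping, on top of up to 30 seconds of request timeout per attempt.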

Why a separate transient-error class

raise_for_status() raises HTTPError for everything 4xx/5xx. You only want to retry the transient ones. The TransientHttpError wrapper makes the retry condition explicit and keeps client errors (400, 401, 404) from silently looping until you blow your timeout.
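The split can be demonstrated without any HTTP at all. The following is a minimal sketch using only the standard library: a hand-rolled retry loop stands in for tenacity, fake status codes stand in for responses, and PermanentHttpError is a hypothetical stand-in for the HTTPError that raise_for_status() would raise.

```python
TRANSIENT = frozenset({429, 500, 502, 503, 504})


class TransientHttpError(Exception):
    """Worth retrying."""


class PermanentHttpError(Exception):
    """Hypothetical stand-in for requests.HTTPError; never retried."""


def check(status):
    if status in TRANSIENT:
        raise TransientHttpError(status)
    if status >= 400:
        raise PermanentHttpError(status)
    return status


def call_with_retry(statuses, max_attempts=5):
    """Consume fake status codes; return (final_status, attempts_used)."""
    responses = iter(statuses)
    for attempt in range(1, max_attempts + 1):
        try:
            return check(next(responses)), attempt
        except TransientHttpError:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last transient error


print(call_with_retry([503, 429, 200]))  # (200, 3)
```

A 401 as the first response raises PermanentHttpError on attempt 1 and is never retried, which is the behavior the connector gets from letting raise_for_status() propagate.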

Why flush every 500

save_all() chunks at 1000 per request automatically. Flushing every 500 gives you:

  • A bounded in-memory queue (matters when the source has 100k+ rows).
  • Faster failure recovery: if the connector crashes at row 1,200, the first 1,000 are already durable in Nuntiq and the delta cursor stays at the old watermark. The retry re-fetches everything, but because save_all upserts, re-saving the rows that were already stored is harmless.
  • Progress visibility — the Flushed 500 so far log lines tell whoever reads the log that work is happening.
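The effect of periodic flushing on a crash can be simulated with a toy loader. This is a sketch only: FakeLoad is a hypothetical stand-in for SupplierLoad that counts rows instead of calling Nuntiq, and crash_at is a test hook, not a real parameter.

```python
class FakeLoad:
    """Hypothetical stand-in for SupplierLoad: queue rows, persist on save_all()."""

    def __init__(self):
        self.queue = []
        self.durable = 0

    def add(self, row):
        self.queue.append(row)

    def save_all(self):
        self.durable += len(self.queue)
        self.queue.clear()


def pull(rows, load, flush_every=500, crash_at=None):
    count = 0
    for row in rows:
        if crash_at is not None and count == crash_at:
            raise RuntimeError(f"simulated crash after {count} rows")
        load.add(row)
        count += 1
        if count % flush_every == 0:
            load.save_all()
    load.save_all()  # final flush
    return count


load = FakeLoad()
try:
    pull(range(5000), load, crash_at=1200)
except RuntimeError:
    pass
print(load.durable, len(load.queue))  # 1000 200
```

After the simulated crash, 1,000 rows have been persisted and only the 200 still in the queue are lost, so the in-memory queue never grows beyond flush_every rows.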

Why track the watermark inside the generator

We update max_updated_at as we yield each row. If the external API returns sorted-newest-first you could simplify to "first row's updated_at", but most don't, so accumulate.

The function attaches the watermark to itself (fetch_suppliers_since.watermark) which is a small hack — alternatives are returning a tuple from a non-generator or using a list/box object. Pick what you like; just commit the watermark after the last save_all succeeds.
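One of the alternatives mentioned above, the box object, might look like the sketch below. It uses in-memory pages instead of HTTP, and the names Watermark and walk are illustrative rather than part of the connector.

```python
from dataclasses import dataclass


@dataclass
class Watermark:
    """Mutable box the caller owns; the generator updates it in place."""
    value: str


def walk(pages, since_iso, mark):
    mark.value = since_iso
    for page in pages:
        for row in page:
            # ISO-8601 UTC strings in one fixed format compare correctly as text.
            if row.get('updated_at', '') > mark.value:
                mark.value = row['updated_at']
            yield row


pages = [
    [{'number': 'S1', 'updated_at': '2024-01-02T00:00:00Z'}],
    [{'number': 'S2', 'updated_at': '2024-01-01T00:00:00Z'}],
]
mark = Watermark('')
rows = list(walk(pages, '2023-12-31T00:00:00Z', mark))
print(len(rows), mark.value)  # 2 2024-01-02T00:00:00Z
```

Because each caller passes its own box, two runs in the same process cannot overwrite each other's watermark the way a shared function attribute could.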

Why delta_run wraps the whole thing

If fetch_suppliers_since raises on page 3, delta_run's __exit__ catches the exception, calls delta_fail, and re-raises. The cursor is not advanced. The next scheduled run starts from the same lower bound and re-fetches pages 1–2 (idempotent — save_all upserts) plus whatever the source has now.

If everything succeeds, delta_run calls delta_complete with watermark, advancing the cursor.

See Delta state for the lifecycle in detail.
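The commit-on-success, hold-on-failure behavior can be pictured with a toy context manager. This is not the real lib.delta implementation, whose internals are not shown on this page; fake_delta_run, FakeRun, and the dict-backed store are all hypothetical and only mirror the semantics described above.

```python
from contextlib import contextmanager


class FakeRun:
    def __init__(self, lower_bound_cursor):
        self.lower_bound_cursor = lower_bound_cursor
        self.new_cursor = None

    def set_new_cursor(self, value):
        self.new_cursor = value


@contextmanager
def fake_delta_run(store, key):
    run = FakeRun(store.get(key))
    try:
        yield run
    except Exception:
        # Failure path: leave the stored cursor untouched and re-raise.
        raise
    else:
        # Success path: advance the cursor to what the body asked for.
        if run.new_cursor is not None:
            store[key] = run.new_cursor


store = {'acme_eu/suppliers': '2024-01-01T00:00:00Z'}

try:
    with fake_delta_run(store, 'acme_eu/suppliers') as r:
        r.set_new_cursor('2024-02-01T00:00:00Z')
        raise ValueError("page 3 failed")
except ValueError:
    pass
after_failure = store['acme_eu/suppliers']

with fake_delta_run(store, 'acme_eu/suppliers') as r:
    r.set_new_cursor('2024-02-01T00:00:00Z')
after_success = store['acme_eu/suppliers']

print(after_failure, after_success)
```

Calling set_new_cursor() before the failure does not leak into the store, which is why the connector can safely set the cursor inside the with block.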

Testing it locally

The toolkit's 02_pull_suppliers.py is the same pattern in stub form — copy this connector to connectors/acme_pull_suppliers.py in your toolkit, then:

  1. Add a fixtures/_external/acme_suppliers.json and adapt the HTTP call to read from it when api_url.startswith('file://'), OR
  2. Use responses (a requests mocking library — pip install responses) to mock the HTTP layer in a test runner alongside the toolkit.

Either way, the Nuntiq-side calls (SupplierLoad, delta_run) work unchanged against the fake API.
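For option 1, a fixture reader that mimics Acme's response shape could look like the sketch below. It assumes the fixture is a JSON array of supplier dicts, that api_url has the form file:// followed by a local path, and that updated_after is a strict "greater than" filter; the function name load_fixture_page is made up for this example.

```python
import json
import tempfile
from pathlib import Path


def load_fixture_page(api_url, params):
    """Serve one page from a local JSON fixture in Acme's response shape."""
    rows = json.loads(Path(api_url[len('file://'):]).read_text())
    rows = [r for r in rows if r.get('updated_at', '') > params['updated_after']]
    start = (params['page'] - 1) * params['per_page']
    chunk = rows[start:start + params['per_page']]
    has_more = start + params['per_page'] < len(rows)
    return {'data': chunk, 'next_page': params['page'] + 1 if has_more else None}


with tempfile.TemporaryDirectory() as tmp:
    fixture = Path(tmp) / 'acme_suppliers.json'
    fixture.write_text(json.dumps([
        {'number': 'S1', 'updated_at': '2024-01-01T00:00:00Z'},
        {'number': 'S2', 'updated_at': '2024-01-02T00:00:00Z'},
        {'number': 'S3', 'updated_at': '2024-01-03T00:00:00Z'},
    ]))
    url = f'file://{fixture}'
    base = {'updated_after': '2023-12-31T00:00:00Z', 'per_page': 2}
    page1 = load_fixture_page(url, {**base, 'page': 1})
    page2 = load_fixture_page(url, {**base, 'page': 2})

print(len(page1['data']), page1['next_page'])  # 2 2
print(len(page2['data']), page2['next_page'])  # 1 None
```

Inside http_get you would branch on api_url.startswith('file://') and return the result of this helper instead of calling requests.get, leaving the pagination walker unchanged.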

Deployment notes

  • Schedule. Register the entrypoint key (acme_pull_suppliers) against the connector instance, then schedule it in the admin portal at your desired cadence (every 15 minutes, hourly, whatever).
  • Multiple instances. Two Acme tenants → two settings rows with different connector_name values. Same .py file, different delta-state rows, fully independent watermarks.
  • Backfill. Run the same entrypoint one-off with job_params.since_override = "2024-01-01T00:00:00Z". The resolve_since() helper picks it up and the run pulls the full historical window without poisoning the live cursor: a failed backfill leaves the cursor where it was, and a successful one calls r.set_new_cursor() with the latest updated_at among the fetched rows, which matches what a regular run would have committed.
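The precedence that a backfill relies on (override, then delta cursor, then bootstrap window) can be checked in isolation. The sketch below restates the logic of resolve_since() as a pure function; pick_since and its now parameter are illustrative additions so the result is deterministic.

```python
from datetime import datetime, timedelta, timezone


def pick_since(override, delta_cursor, lookback_days=7, now=None):
    """Override > delta cursor > bootstrap window."""
    if override:
        return override
    if delta_cursor:
        return delta_cursor
    now = now or datetime.now(timezone.utc)
    return (now - timedelta(days=lookback_days)).strftime('%Y-%m-%dT%H:%M:%SZ')


now = datetime(2024, 6, 15, 12, 0, 0, tzinfo=timezone.utc)
print(pick_since('2024-01-01T00:00:00Z', '2024-06-15T11:45:00Z', now=now))  # backfill
print(pick_since(None, '2024-06-15T11:45:00Z', now=now))                    # normal run
print(pick_since(None, None, now=now))                                      # first run
```

The three calls print 2024-01-01T00:00:00Z, 2024-06-15T11:45:00Z, and 2024-06-08T12:00:00Z respectively.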

What's next

  • SFTP pull — same shape, but files instead of JSON.
  • OAuth flow — when the external API isn't a static bearer.
  • Logging — when to log what.