OAuth 2.0 flow
Many modern APIs (Xero, NetSuite, SAP Concur, QuickBooks Online…) use OAuth 2.0 instead of a static bearer token. The connector needs to:
- Exchange a long-lived refresh token (or client credentials) for a short-lived access token.
- Use the access token until it expires (usually ~1 hour).
- Refresh when it does.
authlib is in the connector base image (Dockerfile line 26).
How the framework treats OAuth state
The framework has no built-in secret-rotation endpoint today. Treat the access token as ephemeral, in-memory only, scoped to the current run. The connector fetches a fresh access token at the start of each run (or before each external call) and discards it when the run ends.
What's stored long-term in context.secrets:
| Flow | What you store | What rotates |
|---|---|---|
| Client credentials | client_id, client_secret | Nothing — you mint a fresh access token every run from the same credentials |
| Authorization code | client_id, client_secret, a non-rotating refresh token | Nothing on the connector side |
If the provider you're integrating rotates the refresh token on every refresh (Xero, QuickBooks Online), there's no supported way to write the new value back to Nuntiq's secret store today — the connector framework doesn't expose a self-service secret-write endpoint. Either:
- Use a provider configuration that doesn't rotate (Microsoft Graph and most client-credentials APIs).
- Have an operator re-do the auth-code dance from the admin portal when the refresh token dies.
- Pick a different auth mechanism if the provider supports one (PAT, API key).
Client credentials
The clean case. No persisted state on the connector side — every run mints a fresh access token from the stored client_id/client_secret.
"""Pull data from a client-credentials OAuth API."""
import time
import requests
from authlib.integrations.requests_client import OAuth2Session
from lib.delta import delta_run
from lib.objects.organization import OrganizationLoad
STREAM_NAME = 'orgs'
def get_access_token(token_url, client_id, client_secret, scope, logger):
"""Mint a fresh access token. Called once per run."""
logger.info("Minting OAuth access token", step='oauth')
sess = OAuth2Session(client_id, client_secret, scope=scope)
token = sess.fetch_token(
token_url,
grant_type='client_credentials',
timeout=15,
)
# token = {'access_token': '...', 'expires_in': 3600, 'token_type': 'Bearer'}
return token['access_token'], int(token.get('expires_in', 3600))
def run(context):
connector_name = context.get_config('connector_name')
client_id = context.get_secret('oauth_client_id')
client_secret = context.get_secret('oauth_client_secret')
token_url = context.get_config('oauth_token_url')
scope = context.get_config('oauth_scope', default='read:orgs')
api_url = context.get_config('api_url')
access_token, _expires_in = get_access_token(
token_url, client_id, client_secret, scope, context.logger,
)
headers = {'Authorization': f'Bearer {access_token}',
'Accept': 'application/json'}
org_load = OrganizationLoad(context)
with delta_run(context, connector_name, STREAM_NAME) as r:
resp = requests.get(f"{api_url}/organizations", headers=headers, timeout=30)
resp.raise_for_status()
for o in resp.json()['organizations']:
org = org_load.new()
org.code = o['code']
org.name = o['name']
org.organization_type = 2
org_load.save_all()
r.set_new_cursor(...)
return {...}
authlib's OAuth2Session handles the encoding (form-urlencoded vs JSON
bodies — they vary across providers), the Basic auth header for the
client credentials, and the response parsing.
Refresh-on-demand within a single run
A run that lasts longer than the access token's lifetime needs to refresh mid-run. Don't track expiry yourself — let a 401 response trigger the refresh, then retry the call once.
class TokenHolder:
"""Caches an access token for the duration of the run and refreshes on 401.
Lives only in memory. Nothing about this state outlives the run.
"""
def __init__(self, fetch_fn):
self._fetch = fetch_fn
self._token = None
self._minted_at = 0
self._lifetime = 0
def get(self):
# Refresh proactively a minute before expiry to absorb clock skew.
if self._token is None or (time.monotonic() - self._minted_at) > (self._lifetime - 60):
self._token, self._lifetime = self._fetch()
self._minted_at = time.monotonic()
return self._token
def invalidate(self):
self._token = None
def call_with_retry(method, url, holder, **kwargs):
"""One retry on 401 — the access token just expired."""
headers = dict(kwargs.pop('headers', {}))
headers['Authorization'] = f'Bearer {holder.get()}'
resp = requests.request(method, url, headers=headers, **kwargs)
if resp.status_code == 401:
holder.invalidate()
headers['Authorization'] = f'Bearer {holder.get()}'
resp = requests.request(method, url, headers=headers, **kwargs)
return resp
That's enough for almost every real-world connector — refresh once before the first call, refresh again if anything 401s mid-run.
Authorization-code refresh (non-rotating)
If the provider issues a refresh token once and keeps it stable across
exchanges (Microsoft Graph in default mode, some self-hosted setups), you can
store the refresh token in secrets and exchange it for an access token at the
start of each run:
from authlib.integrations.requests_client import OAuth2Session
def refresh_access_token(token_url, client_id, client_secret, refresh_token, logger):
"""Exchange a non-rotating refresh token for an access token."""
logger.info("Refreshing OAuth access token", step='oauth_refresh')
sess = OAuth2Session(client_id, client_secret)
token = sess.refresh_token(token_url, refresh_token=refresh_token, timeout=15)
return token['access_token'], int(token.get('expires_in', 3600))
def run(context):
client_id = context.get_secret('client_id')
client_secret = context.get_secret('client_secret')
refresh_token = context.get_secret('refresh_token')
if not refresh_token:
raise RuntimeError(
"secrets.refresh_token is missing. "
"Run the OAuth connect flow from the admin portal first."
)
access_token, expires_in = refresh_access_token(
TOKEN_URL, client_id, client_secret, refresh_token, context.logger,
)
headers = {'Authorization': f'Bearer {access_token}', 'Accept': 'application/json'}
# ... use headers for the rest of the run ...
If the provider rotates the refresh token in the response (i.e. the
response contains a different refresh_token than the one you sent), you
cannot use this pattern as-is — see the "How the framework treats OAuth
state" section above for what to do instead.
Walking through what's important
Why fetch-on-demand and not store
Storing access tokens means persisting expiry timestamps, handling clock skew across the platform and the provider, racing concurrent runs, and dealing with secret-write failures. Fetching on demand collapses all of that into "ask the provider for a token, use it, drop it." A new client-credentials exchange takes ~200ms — cheaper than the bug surface the alternative creates.
Why a tenant id outside the token
OAuth-token-per-user APIs often serve multiple tenants under one user's
access. Xero's Xero-Tenant-Id header tells the API which organization a
call applies to. Store it in config (not secrets) — it's not sensitive,
and an operator may need to change it when the customer re-permissions the
integration.
How the initial setup works (out of scope here)
The first refresh token (for non-rotating providers) comes from an interactive OAuth flow that runs outside the connector — typically the operator runs the auth-code dance once in their browser, gets the refresh token, and pastes it into the connector instance's secrets in the admin portal. The connector then exchanges it for access tokens on every run.
Testing locally
The toolkit doesn't run a real OAuth flow. Options:
- Real credentials. Put real
client_id/client_secret(andrefresh_tokenfor auth-code flows) insettings.jsonand let the connector hit the real provider. The connector code is unchanged. - Mock at the token-fetch boundary. Make
get_access_token()/refresh_access_token()return a hardcoded(token, 3600)tuple whenclient_id == 'fake'. Toolkit-only path; production never sees it.
Common pitfalls
| Symptom | Likely cause |
|---|---|
| Works for an hour, fails after | You're caching the access token across the expiry boundary without refreshing. Use the TokenHolder pattern (or just re-fetch at the top of every run). |
invalid_grant on every run | Refresh token was rotated by the provider (so the value in secrets is stale) or it was revoked. Re-do the auth-code dance and update the secret. |
invalid_client | Client secret typo, or the OAuth app was disabled by the provider. |
| 403 with valid token | Scope mismatch — request the right scope at consent time. |
Deployment notes
- Scope minimum. Request only the scopes you need. Operators reading the consent screen rightly bail on requests for "read everything".
- Time skew.
expires_inis server clock. If the container's clock is off, you may use a stale token. Refresh proactively a minute before expiry rather than waiting for the 401 (seeTokenHolderabove).
What's next
- REST API pull — once you have the bearer token, the rest is the standard pull pattern.
- Logging — log refresh events but never the token values themselves.
- JobContext — how
context.secretsprovides the credentials each run.