
SFTP push

The outbound counterpart of SFTP pull. Your customer's ERP doesn't take API calls — it picks up CSV files from a drop folder. You:

  1. Claim N invoices pending integration from Nuntiq.
  2. Render them as CSV (one file per invoice, or one batch file — both shown).
  3. SFTP-push the file(s) to the customer's drop folder.
  4. Acknowledge + report integration result for each invoice.

This is the canonical "push to ERP" connector shape. Same claim flow as REST pushes, just with a file-write step in the middle.

The scenario

Customer's ERP wants:

  • One CSV per invoice, dropped into /inbound/invoices/.
  • Filename: <invoice_number>-<YYYYMMDD>.csv.
  • After-pickup folder (their cleanup): /inbound/invoices/sent/. You don't process anything in it, but if a filename you're about to write already exists there, write yours with a suffix.
  • Tab-separated, UTF-8 with BOM, fields: invoice_number\tdate\tcurrency\ttotal\tsupplier_code\tline_count.
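
As a rough illustration of that spec, the sketch below builds one such file in memory and derives its name. The invoice values are made up, and build_example_file is a hypothetical helper, not part of the connector.

```python
import csv
import io
from datetime import date

HEADER = ['invoice_number', 'date', 'currency', 'total',
          'supplier_code', 'line_count']


def build_example_file(row, on_date):
    """Return (filename, file_bytes) for one invoice row. Illustrative only."""
    buf = io.StringIO()
    writer = csv.writer(buf, delimiter='\t', lineterminator='\n')
    writer.writerow(HEADER)
    writer.writerow(row)
    # 'utf-8-sig' prepends the BOM bytes EF BB BF to the encoded text.
    body = buf.getvalue().encode('utf-8-sig')
    filename = f"{row[0]}-{on_date:%Y%m%d}.csv"
    return filename, body


# Made-up sample values.
name, body = build_example_file(
    ['INV-1001', '2024-05-31', 'EUR', '119.00', 'SUP-42', 3],
    date(2024, 5, 31),
)
```

Encoding with 'utf-8-sig' yields the same bytes as writing '\ufeff' first and then encoding as plain 'utf-8', which is the approach the connector takes.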

Settings

settings.json
{
  "customer_number": "10000001",
  "secrets": {
    "sftp_password": "rotated-quarterly"
  },
  "config": {
    "connector_name": "acme_erp_push",
    "sftp_host": "sftp.acme-erp.com",
    "sftp_port": 22,
    "sftp_user": "nuntiq_push",
    "remote_outbox": "/inbound/invoices",
    "remote_sent": "/inbound/invoices/sent",
    "batch_size": 20
  }
}

The connector

connectors/acme_erp_sftp_push.py
"""Push pending invoices to Acme ERP as CSV via SFTP."""
import csv
import io
import os
from datetime import datetime, timezone

import paramiko

from lib.delta import delta_run
from lib.objects.invoice import InvoiceLoad

STREAM_NAME = 'pushed_invoices'

# BOM so Excel renders UTF-8 correctly when finance opens the file.
UTF8_BOM = '\ufeff' # zero-width no-break space, makes Excel honor UTF-8


def open_sftp(host, port, user, password, logger):
    logger.info(f"Connecting to SFTP {user}@{host}:{port}", step='sftp_connect')
    transport = paramiko.Transport((host, port))
    transport.connect(username=user, password=password)
    return paramiko.SFTPClient.from_transport(transport), transport


def render_invoice_csv(invoice):
    """Render one invoice as a TSV string the ERP expects.

    Field keys (`invoiceNumber`, `grossAmount`, …) come from the customer's
    invoice template config; they're typically camelCase. If your customer
    uses different keys, swap these strings to match what their template
    actually emits.
    """
    inv_number = invoice.get_field('invoiceNumber') or 'UNKNOWN'
    inv_date = invoice.get_field('invoiceDate') or ''
    currency = invoice.get_field('currencyCode') or ''
    total = invoice.get_field('grossAmount') or ''

    vendor = invoice.get_address('vendor')
    supplier_code = invoice.get_field('supplierCode') or (vendor.address_name if vendor else '')

    buf = io.StringIO()
    buf.write(UTF8_BOM)
    writer = csv.writer(buf, delimiter='\t', lineterminator='\n')
    writer.writerow(['invoice_number', 'date', 'currency', 'total',
                     'supplier_code', 'line_count'])
    writer.writerow([
        inv_number, inv_date, currency, total,
        supplier_code, len(invoice.lines),
    ])
    return buf.getvalue(), inv_number


def safe_filename(invoice_number):
    """Drop characters the SFTP server (or Windows) might choke on."""
    safe = ''.join(c if c.isalnum() or c in ('-', '_') else '_' for c in invoice_number)
    return f"{safe}-{datetime.now(timezone.utc):%Y%m%d}.csv"


def upload_with_rename_on_collision(sftp, remote_dir, name, body_str, logger):
    """Write to a .tmp path, then rename to final so the file appears whole.

    If the final name is already taken, a timestamp suffix is added instead.
    """
    final = f"{remote_dir}/{name}"
    tmp = f"{final}.tmp"

    # Step 1: write tmp. Leaving the with-block closes the handle, which
    # flushes paramiko's write buffer to the server. os.fsync() only works
    # on local file descriptors, so there is nothing to fsync here.
    with sftp.open(tmp, 'wb') as f:
        f.write(body_str.encode('utf-8'))

    # Step 2: pick the target name. posix_rename replaces an existing
    # destination without raising, so check for a collision explicitly.
    target = final
    try:
        sftp.stat(final)
    except IOError:
        pass  # final name is free
    else:
        ts = datetime.now(timezone.utc).strftime('%H%M%S')
        target = f"{remote_dir}/{name.rsplit('.', 1)[0]}-{ts}.csv"
        logger.warn(
            f"{final} exists; renaming to {target}",
            step='upload',
            detail={'original': final, 'final': target},
        )

    # Step 3: rename. Prefer posix_rename (atomic); fall back to plain
    # rename if the server or paramiko version lacks it.
    try:
        sftp.posix_rename(tmp, target)
    except (AttributeError, IOError):
        sftp.rename(tmp, target)
    return target


def run(context):
    connector_name = context.get_config('connector_name')
    host = context.get_config('sftp_host')
    port = int(context.get_config('sftp_port', default=22))
    user = context.get_config('sftp_user')
    password = context.get_secret('sftp_password')
    remote_outbox = context.get_config('remote_outbox')
    batch_size = int(context.get_config('batch_size', default=20))

    required = {
        'connector_name': connector_name, 'sftp_host': host,
        'sftp_user': user, 'sftp_password': password,
        'remote_outbox': remote_outbox,
    }
    missing = [k for k, v in required.items() if not v]
    if missing:
        raise RuntimeError(f"Missing required config/secrets: {missing}")

    inv_load = InvoiceLoad(context)

    # Claim BEFORE opening SFTP: if there's nothing to do, we don't even
    # need to log into the customer's SFTP server.
    leases = inv_load.claim(
        state='PendingIntegration',
        limit=batch_size,
        lease_ttl_seconds=600,
    )
    if not leases:
        context.logger.info("Nothing to push: no PendingIntegration invoices.")
        return {'claimed': 0, 'pushed': 0, 'failed': 0}

    context.logger.info(f"Claimed {len(leases)} invoice(s)", step='claim')

    sftp, transport = open_sftp(host, port, user, password, context.logger)
    pushed = 0
    failed = 0

    try:
        with delta_run(context, connector_name, STREAM_NAME, cursor_type='timestamp_utc') as r:
            for lease in leases:
                invoice = inv_load.get_by_token(lease.invoice_token)
                csv_body, inv_number = render_invoice_csv(invoice)
                filename = safe_filename(inv_number)

                try:
                    final_remote = upload_with_rename_on_collision(
                        sftp, remote_outbox, filename, csv_body, context.logger,
                    )

                    # Acknowledge AFTER the file lands on SFTP, BEFORE we
                    # report the final result. If anything below crashes,
                    # the next run finds the invoice at PendingResult (93),
                    # not PendingIntegration, so we don't re-push it.
                    inv_load.acknowledge(lease.invoice_token)
                    inv_load.integration_result(
                        lease.invoice_token,
                        success=True,
                        external_id_1=final_remote,
                        external_message_1='SFTP_DELIVERED',
                    )
                    pushed += 1
                    context.logger.info(
                        f"Pushed {inv_number} → {final_remote}",
                        step='pushed',
                        detail={'invoice_number': inv_number, 'remote_path': final_remote},
                    )

                except Exception as e:
                    # Don't re-raise: record the failure and keep going.
                    # The invoice will land in IntegrationFailed (97); ops
                    # can re-trigger or hand-fix.
                    context.logger.warn(
                        f"Failed to push {inv_number}: {e}",
                        step='push',
                        detail={'invoice_number': inv_number, 'error': str(e)},
                    )
                    inv_load.integration_result(
                        lease.invoice_token,
                        success=False,
                        failure_code='SFTP_UPLOAD_FAILED',
                        failure_message=str(e)[:500],
                    )
                    failed += 1

            now_iso = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
            r.set_new_cursor(now_iso)
            r.set_records_processed(pushed + failed)
            r.set_metadata({
                'pushed': pushed,
                'failed': failed,
                'remote_outbox': remote_outbox,
            })

    finally:
        sftp.close()
        transport.close()

    return {
        'claimed': len(leases),
        'pushed': pushed,
        'failed': failed,
    }

Walking through what's important

Why claim before opening SFTP

InvoiceLoad.claim() is cheap and fast. Opening an SFTP connection isn't — it's a TCP handshake, an SSH negotiation, a password challenge. If there's nothing to push, we want the entrypoint to return in under a second, not after 3 seconds of SSH overhead.

Why write .tmp and rename

The customer's ERP is reading the drop folder on a polling loop. If it sees a half-written .csv, it may try to parse it, fail, and either abandon the file or repeatedly re-read it. Writing to .tmp and renaming makes the file appear atomically at its final name — there's never a partial-file state visible.

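posix_rename is preferred when the server supports it because it's explicitly atomic. It follows POSIX semantics, though, so it replaces an existing destination instead of failing, while plain rename typically fails if the destination exists (Windows-based SFTP servers included). Collision handling therefore can't rely on posix_rename raising; the timestamp suffix exists to keep an already-present file from being clobbered.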
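
The scenario also asks that a name already present in the sent folder be avoided, and the connector above never reads remote_sent. One possible way to cover that, sketched under assumptions, is to choose the name before uploading. choose_remote_name is a hypothetical helper; in the connector its taken argument could come from sftp.listdir(remote_outbox) plus sftp.listdir(remote_sent).

```python
def choose_remote_name(name, taken, suffix):
    """Return `name`, or `name` with `-suffix` before the extension if taken.

    `taken` is any collection of filenames already present remotely.
    """
    taken = set(taken)
    if name not in taken:
        return name
    stem, dot, ext = name.rpartition('.')
    if not dot:  # no extension at all
        stem, ext = name, ''
    tail = f".{ext}" if dot else ''
    candidate = f"{stem}-{suffix}{tail}"
    # If even the suffixed name is taken, add a counter until it is free.
    n = 2
    while candidate in taken:
        candidate = f"{stem}-{suffix}-{n}{tail}"
        n += 1
    return candidate
```

Listing and then renaming is not atomic, so two workers could still pick the same name in a narrow window; this only narrows the gap, it doesn't close it.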

Why acknowledge before reporting result

acknowledge advances the invoice from 92 to 93 (PendingResult). At that point Nuntiq knows you've taken responsibility for it. If the integration_result call fails (network blip, Nuntiq deploy in progress), the invoice sits at 93 — next run can re-call integration_result for that token (idempotent on success). If you'd skipped acknowledge, a crash between SFTP upload and result reporting would put the invoice back into PendingIntegration on the next run, and the next claim would re-push it, producing a duplicate file in the ERP.

The order is fixed by the API state machine, not by the connector's choice:

claim → upload file → acknowledge → integration_result
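
To make the crash window concrete, here is a toy model of the states named on this page. It is not Nuntiq's implementation: the name CLAIMED for state 92 and the rule that only an invoice still at 92 gets claimed again are assumptions drawn from the description above.

```python
from enum import IntEnum


class State(IntEnum):
    CLAIMED = 92             # assumed name for "leased after claim()"
    PENDING_RESULT = 93      # after acknowledge()
    INTEGRATION_FAILED = 97
    PROCESSED = 100


STEPS = ['claim', 'upload', 'acknowledge', 'integration_result']


def simulate(crash_after):
    """Run the four steps in order, stopping after `crash_after`.

    Returns (state, file_uploaded).
    """
    state, uploaded = None, False
    for step in STEPS:
        if step == 'claim':
            state = State.CLAIMED
        elif step == 'upload':
            uploaded = True
        elif step == 'acknowledge':
            state = State.PENDING_RESULT
        elif step == 'integration_result':
            state = State.PROCESSED
        if step == crash_after:
            break
    return state, uploaded


def next_run_would_repush(state):
    """Assumption: only an invoice still at CLAIMED is claimed again later."""
    return state == State.CLAIMED
```

A crash right after 'upload' leaves a delivered file with the invoice still at 92, which is the duplicate-file case; a crash right after 'acknowledge' leaves it at 93, where only the result report is outstanding.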

Why we report failures instead of raising

A single bad invoice (missing required field, ERP rejected the filename format, server hiccup) shouldn't abort the whole batch. We log, call integration_result(success=False, ...), and continue. The failed invoice lands at IntegrationFailed (97); ops can either retrigger from the admin portal or fix the data and resubmit.

If we re-raised, the delta_run would catch and call delta_fail — but the invoices already pushed would still be Processed in Nuntiq (you can't un-do integration_result(success=True)), so the cursor + delta state would be out of sync with reality. Reporting per-invoice gives a clean mapping: every claimed invoice ends up at either Processed (100) or IntegrationFailed (97), nothing left in 92 or 93.
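
Stripped of the SFTP and Nuntiq specifics, the pattern looks roughly like this. push_one is a hypothetical stand-in for the upload, acknowledge and result calls, and flaky_push exists only to force one failure.

```python
def push_batch(items, push_one):
    """Try every item; record failures instead of aborting the batch."""
    outcomes = {}
    for item in items:
        try:
            push_one(item)
            outcomes[item] = 'pushed'
        except Exception as exc:  # one bad item must not stop the rest
            outcomes[item] = f'failed: {exc}'
    pushed = sum(1 for v in outcomes.values() if v == 'pushed')
    return {'pushed': pushed, 'failed': len(outcomes) - pushed,
            'outcomes': outcomes}


def flaky_push(item):
    """Made-up push function that fails for exactly one item."""
    if item == 'INV-2':
        raise IOError('disk full')


result = push_batch(['INV-1', 'INV-2', 'INV-3'], flaky_push)
```

Every item ends with exactly one recorded outcome, which mirrors the goal of leaving no claimed invoice without a reported result.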

Why timestamp the cursor here

Pushes don't have a natural "high-water mark" — we drained a batch, we'll drain another batch next time. The cursor on pushed_invoices is for observability ("when did we last successfully run a push?"), not correctness. The claim/lease model owns correctness.

datetime.now(timezone.utc) is fine.

Variants

One batch file instead of per-invoice files

Some ERPs want a single file with N rows. Replace the inner loop with:

# HEADER and render_row are assumed helpers: the header list and the
# per-invoice row list, factored out of render_invoice_csv.
buf = io.StringIO()
buf.write(UTF8_BOM)
writer = csv.writer(buf, delimiter='\t', lineterminator='\n')
writer.writerow(HEADER)

successes = []
for lease in leases:
    invoice = inv_load.get_by_token(lease.invoice_token)
    writer.writerow(render_row(invoice))
    successes.append(lease)

# Single upload
final = upload_with_rename_on_collision(
    sftp, remote_outbox,
    f"invoices-{datetime.now(timezone.utc):%Y%m%dT%H%M%SZ}.csv",
    buf.getvalue(), context.logger,
)

# Then acknowledge+result for all successes
for lease in successes:
    inv_load.acknowledge(lease.invoice_token)
    inv_load.integration_result(lease.invoice_token, success=True,
                                external_id_1=final)

Trade-off: a single bad row in the batch is harder to isolate — you'd have to either validate everything before writing, or accept that the whole batch fails together.
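
One way to take the first option is to split rows into writable and rejected before the file is built, so a bad row is reported on its own and never reaches the batch. A minimal sketch: REQUIRED and split_valid are hypothetical, and which fields the ERP actually requires is an assumption.

```python
# Assumed set of fields the ERP cannot do without.
REQUIRED = ('invoice_number', 'date', 'currency', 'total')


def split_valid(rows):
    """Partition dict rows into (valid, rejected) by required fields.

    `rejected` is a list of (row, missing_field_names) pairs.
    """
    valid, rejected = [], []
    for row in rows:
        missing = [k for k in REQUIRED if not row.get(k)]
        if missing:
            rejected.append((row, missing))
        else:
            valid.append(row)
    return valid, rejected
```

Rejected rows could then be reported with integration_result(success=False, ...) individually, while only the valid rows go into the batch file.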

Pushing attachments too

for att in invoice.attachments:
    local = att.save(att.file_name)  # writes to context.work_dir
    remote_att_path = f"{remote_outbox}/{filename.rsplit('.', 1)[0]}-{att.file_name}"
    sftp.put(local, remote_att_path)

Order: upload the CSV last, so the ERP's polling loop only sees the CSV when all its supporting files are already there.

Deployment notes

  • Network policy. If the ERP allowlists inbound IPs, ask Nuntiq for the platform's outbound IP for connector traffic and give it to them.
  • Concurrency. If multiple workers run this entrypoint simultaneously, the claim/lease model handles invoice-level concurrency. SFTP-side, a collision in filename is handled by the timestamp-suffix fallback.
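  • Failure mode: SFTP down. open_sftp will raise. That happens before delta_run is entered, so the exception propagates straight out of run() without a delta_fail. The batch has already been claimed at that point (claim runs first), so those invoices stay leased until lease_ttl_seconds (600) elapses; assuming expired leases return them to PendingIntegration, a later run picks them up again.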
  • Failure mode: SFTP up, half the batch crashes mid-write. upload_with_rename_on_collision raises → that one invoice gets success=False. Others continue. The .tmp files left behind will be visible to the ERP only if their poller picks up *.tmp (it shouldn't). Worst case: stale .tmp files accumulate. Add a cleanup pass at the start of run() if it matters.
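
A cleanup pass could be sketched as below. stale_tmp_names is a hypothetical helper working on plain (filename, mtime) pairs; in the connector those could come from sftp.listdir_attr(remote_outbox), whose entries carry filename and st_mtime, and the returned names could be passed to sftp.remove. The one-hour threshold is an arbitrary assumption.

```python
import time


def stale_tmp_names(entries, now=None, max_age_seconds=3600):
    """Return names of .tmp files older than max_age_seconds.

    `entries` is an iterable of (filename, mtime_epoch_seconds) pairs.
    Fresh .tmp files are skipped: another worker may still be writing them.
    """
    now = time.time() if now is None else now
    return [
        name for name, mtime in entries
        if name.endswith('.tmp') and now - mtime > max_age_seconds
    ]
```

The age check matters when several workers share the drop folder, since deleting every .tmp on sight could remove a file that is mid-upload.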
