
SFTP pull

Many suppliers and ERPs still exchange invoices over SFTP — drop a CSV / XML / PDF onto a server, the other side picks it up. This page shows how to write a connector that:

  1. Logs into an SFTP server with a username + password (or key).
  2. Lists incoming/ for files that landed since the last successful run.
  3. Downloads each file into the run's work directory.
  4. Uploads it to Nuntiq as a source document.
  5. Moves processed files to incoming/processed/ on the SFTP server so they aren't picked up twice.

paramiko is already in the connector base image (Dockerfile line 35).

The scenario

The supplier "BigVendor" drops one PDF per invoice into /incoming/ on sftp.bigvendor.com. Their setup:

  • SFTP host: sftp.bigvendor.com, port 22
  • User: acme_pickup, password auth
  • Drop folder: /incoming/*.pdf
  • After-pickup folder: /incoming/processed/ (you move files there)

You want to poll every 30 minutes, ingest new PDFs as source documents, and watermark by SFTP mtime so a re-run never skips files.

Settings

settings.json
{
  "customer_number": "10000001",
  "secrets": {
    "sftp_password": "rotated-quarterly"
  },
  "config": {
    "connector_name": "bigvendor_sftp",
    "sftp_host": "sftp.bigvendor.com",
    "sftp_port": 22,
    "sftp_user": "acme_pickup",
    "remote_inbox": "/incoming",
    "remote_processed": "/incoming/processed",
    "receiving_inbox": "invoices@acme.apreceiving.com",
    "supplier_code": "BIGVENDOR"
  }
}

The receiving_inbox value is the Nuntiq receiving inbox that should receive these documents — it determines which template gets applied during processing.

The connector

connectors/bigvendor_sftp_pull.py
"""Poll BigVendor's SFTP drop folder; ingest new PDFs into Nuntiq."""
import os
import stat
import time
from datetime import datetime, timezone

import paramiko

from lib.delta import delta_run

STREAM_NAME = 'sftp_pickup'

# Polite delay between renames so we don't hammer the supplier's server.
MOVE_PAUSE_SEC = 0.1


def open_sftp(host, port, user, password, logger):
    """Return a connected (SFTPClient, Transport) pair. Caller closes both."""
    logger.info(f"Connecting to SFTP {user}@{host}:{port}", step='sftp_connect')
    transport = paramiko.Transport((host, port))
    try:
        transport.connect(username=user, password=password)
        return paramiko.SFTPClient.from_transport(transport), transport
    except Exception:
        # Don't leak the socket if auth or channel setup fails.
        transport.close()
        raise


def list_new_files(sftp, remote_dir, since_mtime, logger):
    """List regular files in `remote_dir` whose mtime > since_mtime.

    Returns a list of (name, mtime, size) tuples sorted by mtime ascending,
    so processing is deterministic.
    """
    out = []
    for entry in sftp.listdir_attr(remote_dir):
        if not stat.S_ISREG(entry.st_mode):
            continue  # skip subdirs, symlinks
        if not entry.filename.lower().endswith('.pdf'):
            continue  # only PDFs
        if entry.st_mtime <= since_mtime:
            continue  # already processed
        out.append((entry.filename, entry.st_mtime, entry.st_size))

    out.sort(key=lambda t: t[1])  # mtime ascending
    logger.info(
        f"{len(out)} new file(s) since mtime={since_mtime}",
        step='list',
        detail={'count': len(out)},
    )
    return out


def download_one(sftp, remote_dir, name, local_dir, logger):
    """Download a single file from SFTP. Returns the local absolute path."""
    remote_path = f"{remote_dir}/{name}"
    local_path = os.path.join(local_dir, name)
    logger.info(f"Downloading {name}", step='download', detail={'remote': remote_path})
    sftp.get(remote_path, local_path)
    return local_path


def move_to_processed(sftp, remote_dir, processed_dir, name, logger):
    """Move file from `incoming/` to `incoming/processed/` on the SFTP side.

    SFTP rename is atomic on POSIX servers, so this is the safest "I'm
    done with this file" marker. If the rename fails (perms, full disk),
    the file stays in the inbox and we'll see it next run, which is better
    than silently dropping it.
    """
    src = f"{remote_dir}/{name}"
    dst = f"{processed_dir}/{name}"
    try:
        sftp.rename(src, dst)
        logger.info(f"Moved → {dst}", step='move')
    except IOError as e:
        # Most likely the name already exists in the processed dir: retry
        # with a timestamp prefix so we don't loop forever. If this second
        # rename also fails, the error propagates and the file stays put.
        ts = datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')
        dst_unique = f"{processed_dir}/{ts}-{name}"
        sftp.rename(src, dst_unique)
        logger.warn(
            f"Collision in processed/; renamed to {dst_unique} (original error: {e})",
            step='move',
        )


def run(context):
    connector_name = context.get_config('connector_name')
    host = context.get_config('sftp_host')
    port = int(context.get_config('sftp_port', default=22))
    user = context.get_config('sftp_user')
    password = context.get_secret('sftp_password')
    remote_inbox = context.get_config('remote_inbox')
    remote_processed = context.get_config('remote_processed')
    receiving_inbox = context.get_config('receiving_inbox')
    supplier_code = context.get_config('supplier_code')

    required = {
        'connector_name': connector_name,
        'sftp_host': host,
        'sftp_user': user,
        'sftp_password': password,
        'remote_inbox': remote_inbox,
        'remote_processed': remote_processed,
        'receiving_inbox': receiving_inbox,
    }
    missing = [k for k, v in required.items() if not v]
    if missing:
        raise RuntimeError(f"Missing required config/secrets: {missing}")

    sftp, transport = open_sftp(host, port, user, password, context.logger)

    try:
        with delta_run(context, connector_name, STREAM_NAME, cursor_type='numeric_id') as r:
            # SFTP mtimes are unix epoch seconds (int). Use numeric_id cursor.
            since_mtime = int(r.lower_bound_cursor) if r.lower_bound_cursor else 0

            files = list_new_files(sftp, remote_inbox, since_mtime, context.logger)
            if not files:
                r.set_new_cursor(since_mtime)
                r.set_records_processed(0)
                return {'pulled': 0}

            uploaded = 0
            failed = 0
            highest_mtime = since_mtime

            for name, mtime, size in files:
                try:
                    local_path = download_one(sftp, remote_inbox, name,
                                              context.work_dir, context.logger)

                    # Build the multipart payload Nuntiq expects.
                    with open(local_path, 'rb') as f:
                        files_payload = [
                            ('files', (name, f.read(), 'application/pdf')),
                        ]
                    resp = context.api.upload_source_documents(
                        files_payload,
                        receiving_inbox=receiving_inbox,
                        supplier_code=supplier_code,
                    )
                    # POST /v1/source-document returns
                    # {success: true, data: {sourceDocumentId, filesUploaded, ...}}
                    source_document_id = resp['data']['sourceDocumentId']
                    context.logger.info(
                        f"Ingested {name} → source_document_id={source_document_id}",
                        step='upload',
                        detail={'size_bytes': size, 'source_document_id': source_document_id},
                    )

                    # Mark done on the SFTP side AFTER Nuntiq confirmed.
                    # If the move fails here, we just re-ingest next time;
                    # Nuntiq dedupes on content hash.
                    move_to_processed(sftp, remote_inbox, remote_processed,
                                      name, context.logger)
                    time.sleep(MOVE_PAUSE_SEC)

                    uploaded += 1
                    if mtime > highest_mtime:
                        highest_mtime = mtime

                except Exception as e:
                    # Log and count the failure, then stop this run. The
                    # cursor must NOT advance past this file, so we retry
                    # it next time.
                    failed += 1
                    context.logger.error(
                        f"Failed to ingest {name}: {e}",
                        step='ingest',
                        detail={'file': name, 'error': str(e)},
                    )
                    # mtimes have one-second resolution, so an earlier file
                    # may share this one's mtime. Keep the cursor strictly
                    # below it, or the `mtime <= since_mtime` filter would
                    # skip this file on the next run.
                    highest_mtime = min(highest_mtime, mtime - 1)
                    # Important: break here, not continue. Otherwise a
                    # newer file's mtime could advance the cursor past
                    # this failure, hiding it permanently.
                    break

            r.set_new_cursor(highest_mtime)
            r.set_records_processed(uploaded)
            r.set_metadata({
                'uploaded': uploaded,
                'failed': failed,
                'inbox': remote_inbox,
            })

            return {
                'pulled': uploaded,
                'failed': failed,
                'cursor_committed': highest_mtime,
            }

    finally:
        sftp.close()
        transport.close()

Walking through what's important

Why mtime as the cursor

SFTP has no stable file "id": files come and go. Modification time is the one attribute every SFTP server reports, and it's monotonic enough for our purposes (you'd have to deliberately backdate a file to break it). cursor_type='numeric_id' accepts integers, and epoch seconds fit.

If your SFTP supplier rewrites files in place (rare but happens), use a content hash cursor instead — cursor_type='opaque_token', value = the sorted set of (name, sha256) you've seen.
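
A minimal sketch of how such a content-hash cursor could be built. The helper names (sha256_of_file, decode_seen, encode_seen, is_new) and the JSON token layout are assumptions for illustration; how delta_run stores an opaque_token is not shown on this page, so treat the token here as a plain string you hand to r.set_new_cursor.

```python
import hashlib
import json


def sha256_of_file(path, chunk_size=65536):
    """Hash a downloaded file in chunks so large PDFs never sit fully in memory."""
    digest = hashlib.sha256()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()


def decode_seen(token):
    """Turn the stored cursor token back into a set of (name, sha256) pairs."""
    if not token:
        return set()
    return {tuple(pair) for pair in json.loads(token)}


def encode_seen(seen):
    """Serialize the seen-set as sorted JSON so the token is deterministic."""
    return json.dumps(sorted(seen))


def is_new(seen, name, sha256):
    """True when this exact (name, content) pair has not been ingested yet."""
    return (name, sha256) not in seen
```

With this shape, a file rewritten in place gets a new hash and counts as new again, whatever its mtime says. The trade-off is that every candidate file has to be downloaded before you can decide whether to skip it.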

Why move files to processed/ instead of deleting

Deleted files are gone. Moved files leave an audit trail. If a customer disputes "we sent you that invoice last Tuesday", you can prove pickup with a processed/ listing.

A move is also atomic on POSIX servers when both directories sit on the same filesystem: the file is either in incoming/ or in processed/, never both, never neither. Delete-then-upload-elsewhere is two operations and can desync.

Why break on the first failure, not continue

The mtime cursor is one number. If we process file A (mtime=100), file B fails (mtime=200), file C succeeds (mtime=300), and we advance the cursor to 300, file B is lost — next run starts from 301.

break on the first failure means the cursor only advances past files that were fully processed in mtime order. Next run picks B back up and tries again.
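
The difference can be checked with a small simulation. This is a standalone sketch, not connector code: advance_cursor is a hypothetical helper that mimics only the cursor bookkeeping of the loop in run(), using the A/B/C example above.

```python
def advance_cursor(files, since, fails, stop_on_failure):
    """Simulate one run and return the cursor it would commit.

    `files` is a list of (name, mtime); `fails` is the set of names
    whose ingest raises.
    """
    cursor = since
    for name, mtime in sorted(files, key=lambda f: f[1]):
        if mtime <= since:
            continue  # already behind the cursor
        if name in fails:
            if stop_on_failure:
                break  # nothing newer may move the cursor
            continue  # keep going: newer files can overtake the failure
        cursor = max(cursor, mtime)
    return cursor


files = [('A.pdf', 100), ('B.pdf', 200), ('C.pdf', 300)]
print(advance_cursor(files, 0, {'B.pdf'}, stop_on_failure=False))  # 300, B is never retried
print(advance_cursor(files, 0, {'B.pdf'}, stop_on_failure=True))   # 100, B is retried next run
```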

Why upload before moving

Order matters:

download → upload to Nuntiq → confirm Nuntiq accepted → move on SFTP

If we moved first and the upload then failed, the file would sit in processed/ with the cursor not advanced. Because the connector only lists incoming/, no later run would retry it, and we'd have to chase down the missing file manually. Upload-first means a failed move just leaves the file in incoming/ for the next run, and Nuntiq's hash dedupe drops the duplicate.

Why time.sleep(0.1) between moves

Some SFTP servers misbehave under rapid rename volleys. A polite 100 ms gap keeps you well clear of rate-limiting surprises. Drop it if your server can take it.

Testing locally

The toolkit doesn't simulate SFTP — there's no fake paramiko. Two options:

  1. Real test SFTP. Spin up atmoz/sftp in Docker on your dev machine and point settings.json at it. The connector code is unchanged.
  2. Mock at the paramiko boundary. Wrap the three SFTP-touching helpers (list_new_files, download_one, move_to_processed) so they call into a FakeSftp if host == 'fake'. Toolkit-only path; production never sees it.
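
For option 2, a FakeSftp could look roughly like the sketch below. It is a hypothetical test helper, not part of the toolkit: it covers only the paramiko.SFTPClient methods the three helpers call (listdir_attr, get, rename, close) and keeps files in a dict mapping remote path to (bytes, mtime). Unlike a real server it never lists subdirectories such as processed/, which the connector skips anyway.

```python
import stat
from types import SimpleNamespace


class FakeSftp:
    """In-memory stand-in for the SFTPClient methods the connector uses."""

    def __init__(self, files):
        # files: {remote_path: (content_bytes, mtime_epoch_seconds)}
        self.files = dict(files)

    def listdir_attr(self, remote_dir):
        prefix = remote_dir.rstrip('/') + '/'
        entries = []
        for path, (data, mtime) in self.files.items():
            rest = path[len(prefix):] if path.startswith(prefix) else None
            if not rest or '/' in rest:
                continue  # not a direct child of remote_dir
            entries.append(SimpleNamespace(
                filename=rest,
                st_mode=stat.S_IFREG | 0o644,  # report a regular file
                st_mtime=mtime,
                st_size=len(data),
            ))
        return entries

    def get(self, remote_path, local_path):
        with open(local_path, 'wb') as f:
            f.write(self.files[remote_path][0])

    def rename(self, src, dst):
        if dst in self.files:
            raise IOError(f"{dst} already exists")
        self.files[dst] = self.files.pop(src)

    def close(self):
        pass
```

Since list_new_files, download_one, and move_to_processed all take the client as their first argument, a test can hand them a FakeSftp instance directly instead of branching on the host.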

For the Nuntiq side, upload_source_documents in the toolkit's fake API returns a stub sourceDocumentId without actually ingesting bytes — that's fine for verifying the control flow but not for verifying the PDF parses correctly. Always smoke-test against dev before shipping.

Deployment notes

  • Credentials rotation. SFTP passwords rotate; have the connector read context.get_secret('sftp_password') on every run so an operator can update the secret in the admin portal without a code change.
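  • Key-based auth. Swap the password for a key loaded with paramiko.RSAKey.from_private_key, with the key body stored in secrets.sftp_private_key. That method reads from a file-like object, so wrap the key text in io.StringIO and pass the result to transport.connect as pkey. Same connector shape otherwise.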
  • Outbound IP. Some suppliers IP-allowlist their SFTP. Ask Nuntiq for the platform's outbound IP for connector traffic and hand it to the supplier when onboarding.
  • Connection failures. They happen. The framework re-runs the entrypoint on the next scheduled tick; the delta cursor means no work is lost. You don't need to build your own retry loop around the run — just make the run itself crash-safe.
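
The key-based variant mentioned above might look like the sketch below. connect_with_key is a hypothetical replacement for open_sftp; it assumes the secret holds an RSA private key in PEM text form (other key types need the matching paramiko key class, for example paramiko.Ed25519Key).

```python
import io


def connect_with_key(host, port, user, private_key_text, passphrase=None):
    """Open an SFTP session with a private key held in memory.

    Returns (sftp, transport); the caller closes both.
    """
    # Imported inside the function so this sketch still loads on a machine
    # without paramiko; in the connector image a top-level import is fine.
    import paramiko

    # from_private_key reads from a file-like object, so wrap the secret text.
    key = paramiko.RSAKey.from_private_key(
        io.StringIO(private_key_text), password=passphrase
    )
    transport = paramiko.Transport((host, port))
    try:
        transport.connect(username=user, pkey=key)
        return paramiko.SFTPClient.from_transport(transport), transport
    except Exception:
        transport.close()  # don't leak the socket on auth failure
        raise
```

In run(), the call would then read the key with context.get_secret('sftp_private_key') instead of the password; the rest of the flow stays the same.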

What's next