# Ephemera — Task runner (async pipeline) on Cloudflare (Worker + Queues + D1)

> Self-executing Markdown. The **Cloudflare binding** of the *task-runner* intent — same intent as
> [`task-runner.aws.md`](./task-runner.aws.md), different binding. The cloud is the source of truth for
> state; this file is intent + write-back ledger + audit.

---

## 🤖 Director prompt

Same contract as `task-runner.aws.md`: observe before acting; verify each step; stop at 🔴/💥 for a human
go; write realized values back into Live State. Cloudflare deploys take seconds and reverse with one
command: **no IAM, no ARNs to thread, no async waits**. Resources are wired by *bindings* in
`wrangler.jsonc` (this plan's copy is `task-runner.jsonc`, §4), and apart from 💥 teardown the only gate is
the outward-facing deploy (§5, a light 🟡 gate).

```
Legend  🟢 create · 🟡 config · 🔴 GATE (human go) · 💥 destructive (human go) · ⏳ wait · ✔ verify
```

## Intent

A snappy async task API: `POST /task` creates a task (writes a `pending` row to D1, enqueues a message on a
Cloudflare Queue, returns `taskId`); a queue-triggered consumer processes it (`processing` → `completed`);
`GET /task/:id` returns the status. **Identical intent to the AWS binding**; only the realization differs.

**Shared acceptance contract** (the same test both bindings must pass):
1. `POST /task` → `200` + `.taskId`
2. poll `GET /task/:id` → `.status` reaches `completed` (via `pending` → `processing`)
3. a poison message lands in the DLQ after `max_retries` (not retried forever)

## Provisioning Inputs

| # | Question | Options (closed enum) | Default | Sets | Gates |
|---|----------|-----------------------|---------|------|-------|
| 1 | Status store | `d1` / `kv` / `durable-object` | `d1` | `STATE_STORE` | §1 store + the Worker's read/write calls |
| 2 | Processing engine | `queue` / `workflow` | `queue` | `ENGINE` | §2 (Queue) vs a Workflow binding (durable multi-step) |
| 3 | Environment | `dev` / `stg` / `uat` / `prod` | `dev` | `ENV` | every resource name (`*-${ENV}`) |

**Why `d1` is the default:** the contract is *write `pending`, then poll for `processing`→`completed`*,
which needs **read-after-write consistency**. D1 provides it, and its SQL `tasks` table is the closest analog
to the AWS binding's DynamoDB table. `kv` is simpler and has **native `expirationTtl`** (the DynamoDB-TTL
twin), but it is eventually consistent: a poll right after an update can briefly return a stale status.
`durable-object` is strongly consistent per task but needs the most code. **`ENGINE=workflow`** swaps Queues
for **Cloudflare Workflows** (durable, multi-step, built-in retries and sleeps), the richer engine when a
"task" is really a multi-step saga; `queue` is the direct analog of the AWS SQS+Lambda pipeline.

```yaml
# → written into Live State once resolved
resolved_inputs:
  state_store: d1            # d1 | kv | durable-object
  engine:      queue         # queue | workflow
  env:         dev
  resolved_by: <human who confirmed>
  resolved_at: <timestamp>
```
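
The closed enums in the table can be enforced before anything is created. A minimal sketch, assuming the answers arrive as a plain object shaped like `resolved_inputs` above; `INPUT_ENUMS`, `INPUT_DEFAULTS` and `validateInputs` are hypothetical names, not part of Wrangler or any other tool.

```javascript
// Hypothetical validator for the Provisioning Inputs table: each answer must be one of the
// closed-enum options; unanswered questions fall back to the table's defaults.
const INPUT_ENUMS = {
  state_store: ['d1', 'kv', 'durable-object'],
  engine: ['queue', 'workflow'],
  env: ['dev', 'stg', 'uat', 'prod'],
};
const INPUT_DEFAULTS = { state_store: 'd1', engine: 'queue', env: 'dev' };

function validateInputs(answers) {
  const resolved = { ...INPUT_DEFAULTS, ...answers };
  for (const [key, allowed] of Object.entries(INPUT_ENUMS)) {
    if (!allowed.includes(resolved[key])) {
      throw new Error(`${key}=${resolved[key]} is not one of ${allowed.join(' | ')}`);
    }
  }
  return resolved;
}

// Usage: an omitted answer takes its default; an out-of-enum answer throws.
console.log(validateInputs({ env: 'stg' })); // prints { state_store: 'd1', engine: 'queue', env: 'stg' }
```

Rejecting a bad answer here keeps a typo such as `engine: lambda` from silently producing `*-${ENV}` resources that match no branch of the plan.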

## Live State

```yaml
status:        not-created      # published template - run it to realize state
last_action:   authored — task-runner Cloudflare binding (Worker + Queues + D1)
last_verified: —
```

| key            | value (filled on apply) |
|----------------|-------------------------|
| WORKER_NAME    | `task-runner-${ENV}` |
| URL            | `—` (`*.workers.dev`) |
| D1_NAME / ID   | `task-runner-db-${ENV}` / `—` |
| QUEUE          | `task-runner-queue-${ENV}` |
| DLQ            | `task-runner-dlq-${ENV}` |

| ✔ check                         | expected                              | observed | result |
|---------------------------------|---------------------------------------|----------|--------|
| D1 schema applied               | `tasks` table exists                  | —        | — |
| queue + DLQ exist               | both listed; consumer → DLQ wired      | —        | — |
| `POST /task` → taskId           | `200` + `.taskId`                     | —        | — |
| poll `GET /task/:id` → status   | reaches `completed`                   | —        | — |
| poison message → DLQ            | lands in DLQ after `max_retries`       | —        | — |

## 0. Variables

```bash
export ENV="dev" STATE_STORE="d1" ENGINE="queue"
export CONFIG="<project>/task-runner.jsonc"   # Wrangler config file (§4); its directory also holds src/
export WORKER="task-runner-${ENV}"
export D1_NAME="task-runner-db-${ENV}" QUEUE="task-runner-queue-${ENV}" DLQ="task-runner-dlq-${ENV}"
# NOTE: this machine wraps `wrangler` in a shell function; call `command wrangler` so non-interactive
# runs bypass the wrapper and use the ambient CLOUDFLARE_API_TOKEN.
```

## Dependency frontier

```
d1 (status table) ─┐
queue + dlq ───────┼─> wrangler.jsonc bindings (DB, TASK_QUEUE producer, consumer→DLQ) ─> 🟡 deploy ─> ✔ acceptance
worker code ───────┘
```
Non-negotiable edges: **the consumer config needs the DLQ to exist** (DLQ before the main queue's
consumer block); **the Worker's bindings need the D1 `database_id` + queue names**; **deploy needs all
three (D1, queue, code) declared**. No IAM, no ARNs — the binding *names* are the wiring. Teardown reverses.
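
Because the frontier is a small DAG, a topological sort gives a valid apply order and reversing it gives a valid teardown order. A minimal sketch, assuming the edges drawn above are the complete set; the node labels are illustrative, not Wrangler identifiers.

```javascript
// Frontier edges "a must exist before b". Labels are illustrative: `deploy` stands for the
// deployed Worker, `bindings` for the Wrangler config, `code` for src/task-runner.js.
const EDGES = [
  ['d1', 'bindings'], ['dlq', 'bindings'], ['queue', 'bindings'], ['code', 'bindings'],
  ['bindings', 'deploy'], ['deploy', 'acceptance'],
];
// Only these nodes are cloud resources that teardown has to delete.
const CLOUD_RESOURCES = new Set(['d1', 'dlq', 'queue', 'deploy']);

// Kahn's algorithm: emit a node once every one of its prerequisites has been emitted.
function applyOrder(edges) {
  const nodes = [...new Set(edges.flat())];
  const indegree = new Map(nodes.map((n) => [n, 0]));
  for (const [, to] of edges) indegree.set(to, indegree.get(to) + 1);
  const ready = nodes.filter((n) => indegree.get(n) === 0);
  const order = [];
  while (ready.length > 0) {
    const n = ready.shift();
    order.push(n);
    for (const [from, to] of edges) {
      if (from !== n) continue;
      indegree.set(to, indegree.get(to) - 1);
      if (indegree.get(to) === 0) ready.push(to);
    }
  }
  if (order.length !== nodes.length) throw new Error('cycle in the dependency frontier');
  return order;
}

const apply = applyOrder(EDGES);
const teardown = apply.filter((n) => CLOUD_RESOURCES.has(n)).reverse();
console.log(apply.join(' → '));    // prints: d1 → dlq → queue → code → bindings → deploy → acceptance
console.log(teardown.join(' → ')); // prints: deploy → queue → dlq → d1
```

With ties broken by listing order, the result matches this file's section order (§1 D1, §2 DLQ then queue, §3 code, §4 bindings, §5 deploy) and the Teardown section's Worker → queue → DLQ → D1 sequence.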

## 1. Status store — D1  🟢  *(`STATE_STORE=d1`)*

```bash
# create the database; capture the database_id for wrangler.jsonc
command wrangler d1 create "$D1_NAME"        # prints database_id → put it in task-runner.jsonc d1_databases[].database_id
cat > /tmp/task-runner-schema.sql <<'SQL'
CREATE TABLE IF NOT EXISTS tasks (
  taskId     TEXT PRIMARY KEY,
  status     TEXT NOT NULL,                 -- pending | processing | completed | failed
  payload    TEXT,
  created_at TEXT DEFAULT CURRENT_TIMESTAMP,
  expires_at INTEGER                        -- TTL analog (epoch secs); swept by an optional cron trigger
);
CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status);
SQL
command wrangler d1 execute "$D1_NAME" --remote --file=/tmp/task-runner-schema.sql
```
```bash
# ✔ verify
command wrangler d1 execute "$D1_NAME" --remote --command "SELECT name FROM sqlite_master WHERE type='table' AND name='tasks'"
```
> → Live State: D1 database_id, schema applied.
> **`kv` branch:** `wrangler kv namespace create task-runner-${ENV}` → bind `KV`; store `taskId → {status,payload}`
> with `{ expirationTtl: 86400 }` (native TTL). **`durable-object` branch:** declare a `TASK_DO` class binding +
> `migrations` in `wrangler.jsonc`; one DO per task holds status transactionally.
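
For the `kv` branch, the status reads and writes reduce to `put` with `expirationTtl` and `get` with the `'json'` type. A minimal sketch, assuming a KV namespace bound as `KV` (per the note above); `putStatus`, `getStatus` and `fakeKV` are hypothetical helpers, and `fakeKV` exists only so the sketch runs outside Workers.

```javascript
const TTL_SECONDS = 86400; // same 24 h window as the D1 branch's expires_at

// Hypothetical helpers for STATE_STORE=kv: one key per task, value = {status, payload}.
async function putStatus(env, taskId, status, payload) {
  // expirationTtl lets KV drop the key on its own (the DynamoDB-TTL twin)
  await env.KV.put(taskId, JSON.stringify({ status, payload }), { expirationTtl: TTL_SECONDS });
}

async function getStatus(env, taskId) {
  // KV is eventually consistent: a read right after an update may still return the old value
  const record = await env.KV.get(taskId, 'json');
  return record ? record.status : null;
}

// In-memory stand-in for a KV binding (ignores TTL); only for running this sketch under Node.
function fakeKV() {
  const data = new Map();
  return {
    async put(key, value, _options) { data.set(key, value); },
    async get(key, type) {
      const value = data.get(key);
      if (value === undefined) return null;
      return type === 'json' ? JSON.parse(value) : value;
    },
  };
}
```

Each status transition rewrites the whole record and refreshes the TTL, so an active task never expires mid-flight; the trade-off is the stale-read window described under Provisioning Inputs.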

## 2. Queue + dead-letter queue  🟢  *(`ENGINE=queue`)*

```bash
command wrangler queues create "$DLQ"        # DLQ first (the consumer references it)
command wrangler queues create "$QUEUE"
```
```bash
# ✔ verify
command wrangler queues list 2>&1 | grep -E "$QUEUE|$DLQ"
```
> → Live State: QUEUE, DLQ.
> **`ENGINE=workflow` branch:** skip the queue; declare a `workflows` binding (`TASK_WORKFLOW`; `[[workflows]]`
> in TOML) and a `WorkflowEntrypoint` class; `POST /task` calls `env.TASK_WORKFLOW.create({ id: taskId, params })`
> instead of `env.TASK_QUEUE.send`.
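
With `ENGINE=workflow`, the consumer's status writes become durable `step.do` calls that the Workflows runtime checkpoints and retries. A sketch only: `runTaskSteps` is a hypothetical helper and `TaskWorkflow` an assumed class name; in a Worker the class would extend `WorkflowEntrypoint` from `cloudflare:workers`, which is why that part appears as a comment.

```javascript
// In the Worker (not runnable under Node), the entrypoint would delegate to this helper:
//   import { WorkflowEntrypoint } from 'cloudflare:workers';
//   export class TaskWorkflow extends WorkflowEntrypoint {
//     async run(event, step) { return runTaskSteps(event, step, this.env); }
//   }
// event.payload carries the `params` passed to env.TASK_WORKFLOW.create({ id, params }).
async function runTaskSteps(event, step, env) {
  const { taskId } = event.payload;
  const setStatus = (status) =>
    env.DB.prepare('UPDATE tasks SET status = ? WHERE taskId = ?').bind(status, taskId).run();

  // Each named step is checkpointed: a retry after a crash resumes at the first unfinished step.
  await step.do('mark processing', async () => { await setStatus('processing'); });
  await step.do('do the work', async () => { /* ...actual work here... */ });
  await step.do('mark completed', async () => { await setStatus('completed'); });
}
```

Splitting the work into named steps is what the queue consumer cannot express: a failure in "do the work" is retried without repeating the earlier status write.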

## 3. Worker code — one Worker, two handlers  🟢

> A single Worker carries **both** the API (`fetch`) and the consumer (`queue`). No separate API gateway,
> no second function, no IAM role between them — the queue binding *is* the wiring.

```bash
mkdir -p "$(dirname "$CONFIG")/src"
cat > "$(dirname "$CONFIG")/src/task-runner.js" <<'JS'
export default {
  // ---- API (producer): POST /task creates a task, GET /task/:id reads its status ----
  async fetch(request, env) {
    const url = new URL(request.url);
    if (request.method === 'POST' && url.pathname === '/task') {
      const taskId = crypto.randomUUID();
      const payload = (await request.text()) || '{}';
      const expires = Math.floor(Date.now() / 1000) + 86400;         // TTL analog: now + 24 h (epoch secs)
      // write `pending` before enqueueing, so the consumer and an immediate GET both find the row
      await env.DB.prepare('INSERT INTO tasks (taskId, status, payload, expires_at) VALUES (?, ?, ?, ?)')
        .bind(taskId, 'pending', payload, expires).run();
      await env.TASK_QUEUE.send({ taskId, payload });
      return Response.json({ taskId, status: 'pending' });
    }
    const m = url.pathname.match(/^\/task\/([^/]+)$/);                // exactly one path segment = taskId
    if (request.method === 'GET' && m) {
      const row = await env.DB.prepare('SELECT status FROM tasks WHERE taskId = ?').bind(m[1]).first();
      return row ? Response.json({ taskId: m[1], status: row.status })
                 : Response.json({ error: 'not found' }, { status: 404 });
    }
    // known route with the wrong method → 405; any other path → 404
    return (url.pathname === '/task' || m)
      ? Response.json({ error: 'method not allowed' }, { status: 405 })
      : Response.json({ error: 'not found' }, { status: 404 });
  },
  // ---- consumer (queue-triggered) ----
  async queue(batch, env) {
    for (const msg of batch.messages) {
      try {
        const { taskId } = msg.body ?? {};
        if (!taskId) throw new Error('poison message: no taskId');     // retried, then dead-lettered
        await env.DB.prepare('UPDATE tasks SET status = ? WHERE taskId = ?').bind('processing', taskId).run();
        // ...actual work here...
        await env.DB.prepare('UPDATE tasks SET status = ? WHERE taskId = ?').bind('completed', taskId).run();
        msg.ack();
      } catch (e) {
        msg.retry();          // after max_retries (§4) the message is moved to the DLQ
      }
    }
  }
};
JS
```
```bash
# ✔ verify code is present
test -f "$(dirname "$CONFIG")/src/task-runner.js" && echo "worker code written"
```

## 4. Bindings — `task-runner.jsonc`  🟡

> The config *is* the wiring: D1 + the queue producer + the consumer (pointing at the DLQ with `max_retries`).

```jsonc
{
  // save as "$CONFIG"; every "-dev" suffix below must match $ENV (§0)
  "name": "task-runner-dev",
  "main": "src/task-runner.js",
  "compatibility_date": "2025-04-01",
  "d1_databases": [
    { "binding": "DB", "database_name": "task-runner-db-dev", "database_id": "<from §1>" }
  ],
  "queues": {
    "producers": [ { "queue": "task-runner-queue-dev", "binding": "TASK_QUEUE" } ],
    "consumers": [ {
      "queue": "task-runner-queue-dev",
      "max_batch_size": 10,
      "max_retries": 5,
      "dead_letter_queue": "task-runner-dlq-dev"
    } ]
  }
}
```
```bash
# ✔ config valid
command wrangler deploy --config "$CONFIG" --dry-run 2>&1 | tail -5
```
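
How `max_retries` turns a poison message into a DLQ entry can be modelled in a few lines. A toy sketch, not the Queues runtime: it assumes `max_retries` counts redeliveries after the first attempt (as the name suggests), so `max_retries: 5` allows six deliveries. `deliver` and `handler` are hypothetical, and the handler is synchronous for brevity.

```javascript
// Toy model: 1 initial delivery + up to maxRetries retries, then the message goes to the DLQ.
// Models explicit ack()/retry() calls only.
function deliver(handler, body, maxRetries) {
  for (let attempts = 1; attempts <= maxRetries + 1; attempts++) {
    let acked = false;
    const msg = { body, attempts, ack() { acked = true; }, retry() { acked = false; } };
    handler(msg);
    if (acked) return { outcome: 'acked', attempts };
  }
  return { outcome: 'dead-lettered', attempts: maxRetries + 1 };
}

// Handler in the style of §3's consumer (ack on success, retry on any error); the taskId
// check is a hypothetical stand-in for a task that always fails.
function handler(msg) {
  try {
    if (!msg.body.taskId) throw new Error('poison message: no taskId');
    msg.ack();
  } catch (e) {
    msg.retry();
  }
}

console.log(deliver(handler, { taskId: 't1' }, 5)); // prints { outcome: 'acked', attempts: 1 }
console.log(deliver(handler, { junk: true }, 5));   // prints { outcome: 'dead-lettered', attempts: 6 }
```

This is the behaviour acceptance item 3 checks: a message that always fails is bounded by `max_retries` instead of cycling forever.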

## 5. Deploy  🟡 (outward-facing — light gate)

> 🟡 Publishes a public `*.workers.dev` URL and binds the consumer. Free tier, seconds, reverses with one
> `wrangler delete`. Show the command, then deploy.

```bash
command wrangler deploy --config "$CONFIG"
# → write Live State: URL; status: live
```

## 6. Acceptance verify  ✔  *(mirrors the AWS binding's contract)*

```bash
URL="https://${WORKER}.<subdomain>.workers.dev"   # realized in §5
TASK_ID="$(curl -s -X POST "$URL/task" -H 'Content-Type: application/json' -d '{"work":"demo"}' | python3 -c 'import sys,json;print(json.load(sys.stdin)["taskId"])')"
for i in $(seq 1 10); do
  STATUS="$(curl -s "$URL/task/$TASK_ID" | python3 -c 'import sys,json;print(json.load(sys.stdin)["status"])')"
  echo "poll $i: $STATUS"; [ "$STATUS" = "completed" ] && break; sleep 1
done
[ "$STATUS" = "completed" ] || echo "✗ task $TASK_ID not completed after 10 polls"
# acceptance check 3 (poison message → DLQ) is not scripted here
```
> → write Live State: status: live; fill verify rows.
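
A poll loop only samples the status, so it can jump straight from `pending` to `completed`. A checker for acceptance item 2 should therefore accept any in-order subsequence of `pending → processing → completed` that ends in `completed`. A minimal sketch; `checkPolls` is a hypothetical helper that would be fed the statuses the loop above echoes.

```javascript
const ORDER = ['pending', 'processing', 'completed'];

// True when the observed statuses never move backwards and the last one is `completed`.
// Repeats are fine (several polls can see the same state); unknown values such as `failed` fail.
function checkPolls(observed) {
  let last = 0;
  for (const status of observed) {
    const rank = ORDER.indexOf(status);
    if (rank < last) return false; // also rejects rank === -1 (unknown status)
    last = rank;
  }
  return observed.length > 0 && observed[observed.length - 1] === 'completed';
}

console.log(checkPolls(['pending', 'pending', 'completed']));    // prints true
console.log(checkPolls(['processing', 'pending', 'completed'])); // prints false
```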

## Update (idempotent reconcile)

- New code → re-run §5 `wrangler deploy` (atomic; instant rollback via `wrangler rollback`).
- Schema change → add a migration: `wrangler d1 migrations create … && wrangler d1 migrations apply "$D1_NAME" --remote`.
- Tune throughput/retries → edit the `consumers` block (`max_batch_size`, `max_retries`, `retry_delay`), redeploy.

## Teardown  💥 (delete the Worker, then the queues, then D1)

> 💥 Human go. No disable→wait state machine; everything deletes in seconds. No IAM to unwind.

```bash
command wrangler delete --config "$CONFIG"          # removes the Worker (API + consumer)
command wrangler queues delete "$QUEUE"             # main queue
command wrangler queues delete "$DLQ"               # dead-letter queue
command wrangler d1 delete "$D1_NAME"               # status store  (kv: `wrangler kv namespace delete`)
```
```bash
# ✔ verify teardown
command wrangler queues list 2>&1 | grep -E "$QUEUE|$DLQ" || echo "queues gone"
command wrangler d1 list 2>&1 | grep "$D1_NAME" || echo "d1 gone"
```
> → write Live State: status: gone.

---

## Portability ledger — same intent, two bindings

| | AWS (`task-runner.aws.md`) | Cloudflare (`task-runner.cloudflare.md`) |
|---|---|---|
| Components to reach the intent | 6 (API Gateway, 2 Lambdas, SQS, DLQ, DynamoDB) + IAM role | 3 (1 Worker, Queue+DLQ, D1) — no IAM |
| API surface | API Gateway REST (method/integration/deploy) | the Worker's `fetch` handler — no gateway |
| Producer / consumer split | two Lambdas + an event-source mapping | one Worker, `fetch` + `queue` handlers in one script |
| Queue + DLQ | SQS + a redrive policy (`maxReceiveCount`) | Queues + `dead_letter_queue` + `max_retries` (config, not policy) |
| Status store | DynamoDB (native TTL) | D1 (SQL, read-after-write) / KV (native TTL) / DO (strong) |
| Wiring | IAM roles + resource policies + ARNs threaded between steps | **bindings by name** in `wrangler.jsonc` |
| Auth API→worker | Lambda **resource policy** (`add-permission`, source-ARN) | none — same Worker; the binding is the boundary |
| Time to go live | IAM propagation + API-GW eventual consistency retries | seconds, deterministic |
| Teardown | versioned cleanup, role + policies, ~ordered | four deletes (`wrangler delete`, `queues delete` ×2, `d1 delete`) |
| Richer engine | Step Functions (separate service) | Workflows (`ENGINE=workflow`) — durable multi-step in-binding |

## Deliberately not included

- **A separate DLQ consumer Worker**: the DLQ catches poison messages (verifiable); draining or replaying
  it is a follow-on (`wrangler queues consumer add` on the DLQ).
- **TTL sweeper**: D1 has no native row TTL; the analog is `expires_at` plus a **cron trigger** running
  `DELETE FROM tasks WHERE expires_at < ?` bound to the current epoch seconds (KV's `expirationTtl` makes
  this automatic). Omitted to keep the frontier legible.
- **Auth / rate-limiting on `POST /task`**: add Turnstile or a WAF rate-limit rule (see the denial-of-wallet
  note in `web.cloudflare.md`); Cloudflare's rate limiting is priced per plan, whereas AWS WAF bills per request.
- **Workflows engine fully wired**: `ENGINE=workflow` is only sketched in §2; the full binding is a sibling plan.
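
Should the sweeper be wanted later, it is small. A sketch, assuming a cron trigger is added to the config (for example `"triggers": { "crons": ["0 * * * *"] }`) and a `scheduled` handler is merged into §3's default export; `sweepExpired` is a hypothetical name, and the handler wiring appears as a comment because it only runs inside Workers.

```javascript
// Hypothetical TTL sweeper for the D1 branch: deletes rows whose expires_at (epoch secs) has passed.
async function sweepExpired(db, nowSeconds = Math.floor(Date.now() / 1000)) {
  const result = await db.prepare('DELETE FROM tasks WHERE expires_at < ?').bind(nowSeconds).run();
  return result.meta.changes; // D1 reports the number of affected rows in meta.changes
}

// In the Worker, next to fetch() and queue() in the default export:
//   async scheduled(controller, env, ctx) { ctx.waitUntil(sweepExpired(env.DB)); }
```

Binding the current time as a parameter, instead of computing it in SQL, keeps the cutoff in the same epoch-seconds unit the `POST /task` handler writes into `expires_at`.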
