diff --git a/docs/architecture/X402_STREAMING_PAYMENT.md b/docs/architecture/X402_STREAMING_PAYMENT.md new file mode 100644 index 00000000..3c74f578 --- /dev/null +++ b/docs/architecture/X402_STREAMING_PAYMENT.md @@ -0,0 +1,282 @@ +# x402 Streaming Payment Architecture + +## Overview + +NOFX calls AI models (DeepSeek, GPT, Claude, etc.) through the claw402 gateway, using the [x402 protocol](https://github.com/coinbase/x402) to pay per request with USDC on Base L2. + +This document describes the full implementation of the SSE streaming call mode, including client, server, and billing logic. + +## Why Streaming Is Needed + +``` +NOFX (client) ──→ Cloudflare (100s idle limit) ──→ claw402 (gateway) ──→ AI upstream +``` + +- DeepSeek inference takes 60–180 seconds (up to 5 minutes) +- Cloudflare enforces a **100-second hard limit** on idle connections, returning 520/EOF on timeout +- Non-streaming mode: the client receives no data until inference completes — Cloudflare disconnects after 100 seconds +- Streaming mode: the first byte arrives within seconds, subsequent chunks flow continuously, keeping Cloudflare alive + +## End-to-End Request Flow + +``` + NOFX Client claw402 Gateway AI Upstream + │ │ │ + ── Phase 1: Payment ────────────────────────────────────────────────────────────────────────────── + │ │ │ + 1. POST /api/v1/ai/... │ ─── body + stream:true ──────────→ │ │ + (no payment header) │ │ │ + │ ←── 402 + Payment-Required ────── │ │ + │ (base64 JSON: price/chain/asset) │ + │ │ │ + 2. EIP-712 signing │ │ │ + (USDC TransferWithAuth)│ │ │ + │ │ │ + 3. POST + X-Payment hdr │ ─── body + signature ────────────→ │ │ + │ │ ── verify signature → Facilitator + │ │ ←── OK ──────────── Facilitator + │ │ ── settle USDC ───→ Facilitator + │ │ ←── tx hash ─────── Facilitator + │ │ │ + ── Phase 2: Streaming Response ─────────────────────────────────────────────────────────────────── + │ │ │ + │ ←── 200 OK ────────────────────── │ ─── POST stream:true ────────→ │ + │ ←── data: {"choices":[...]} ───── │ ←── SSE chunk ──────────────── │ + │ ←── data: {"choices":[...]} ───── │ ←── SSE chunk ──────────────── │ + │ ←── ... (continuous) ──────────── │ ←── ... ─────────────────────── │ + │ ←── data: [DONE] ──────────────── │ ←── data: [DONE] ────────────── │ +``` + +## Client Implementation (NOFX) + +### File Structure + +| File | Responsibility | +|------|----------------| +| `mcp/payment/claw402.go` | Claw402Client — model routing, wallet management | +| `mcp/payment/x402.go` | x402 payment flow core — DoX402RequestStream, X402CallStream | +| `mcp/client.go` | ParseSSEStream — shared SSE parsing function | + +### Call Chain + +``` +Claw402Client.Call() + └→ X402CallStream() // x402.go:380 + ├→ Build request body + inject stream:true + ├→ DoX402RequestStream() // x402.go:239 + │ ├→ Send initial request (no payment header) + │ ├→ Receive 402 → parse Payment-Required header + │ ├→ signFn() → EIP-712 signature + │ └→ Send retry request with X-Payment header → return open *http.Response + │ + ├→ Start idle timeout watchdog (90s with no data → disconnect) + ├→ TeeReader: simultaneous SSE parsing + raw byte buffering + ├→ ParseSSEStream() // client.go:703 + │ ├→ bufio.Scanner line-by-line read + │ ├→ Parse "data: {...}" → OpenAI chunk format + │ └→ Accumulate text + call onChunk callback + │ + └→ Fallback: if SSE yields nothing, try JSON parsing on buffered bodyBuf +``` + +### Request Identification + +Every request carries an `X-Client-ID: nofx` header (`x402.go:473`), allowing claw402 to identify the request source for logging and monitoring. + +### Model Routing + +`claw402ModelEndpoints` maps user-friendly model names to API paths: + +```go +"deepseek" → "/api/v1/ai/deepseek/chat" +"gpt-5.4" → "/api/v1/ai/openai/chat/5.4" +"claude-opus" → "/api/v1/ai/anthropic/messages/opus" +"qwen-max" → "/api/v1/ai/qwen/chat/max" +// ... more +``` + +Anthropic endpoints (containing `/anthropic/`) automatically switch to the Messages API wire format. + +## Server Implementation (claw402) + +### Core Problem: ginmw Is Incompatible with SSE + +Coinbase's standard Gin middleware `ginmw.PaymentMiddlewareFromConfig` internally works as follows: + +``` +1. Wrap c.Writer with responseCapture (all writes go to buffer) +2. c.Next() — handler runs, SSE chunks all go into buffer +3. Settle payment after handler completes +4. Write buffered content to client only after successful settlement +``` + +Problems: +- SSE chunks are buffered — the client receives no data for minutes +- Cloudflare disconnects after 100 seconds → 520 error +- Handler runs too long (5 min), settlement context expires + +### Solution: streamAwareX402Middleware + +Dual-path design (`internal/gateway/x402.go`): + +```go +func streamAwareX402Middleware(streamServer, standardMW) { + return func(c *gin.Context) { + if !isStreamingBody(c) { + standardMW(c) // Non-streaming → standard ginmw (battle-tested) + return + } + // Streaming → custom path + } +} +``` + +#### Non-Streaming Path + +Delegates entirely to `ginmw.PaymentMiddlewareFromConfig` with no custom logic. + +#### Streaming Path (Pre-Settlement) + +``` +1. isStreamingBody(c) — read body to check for {"stream": true}, restore body +2. streamServer.RequiresPayment(reqCtx) — does this route require payment? +3. streamServer.ProcessHTTPRequest() — verify X-Payment signature +4. handleStreamingPayment(): + a. ProcessSettlement() — settle USDC on-chain (collect payment first) + b. c.Next() — pass to HandleAPIKeyStream + c. SSE chunks write directly to c.Writer (no responseCapture buffer) +``` + +Key differences: + +| | Standard ginmw (non-streaming) | Custom path (streaming) | +|---|---|---| +| Settlement timing | **After** handler completes | **Before** handler starts | +| Response buffer | `responseCapture` buffers everything | No buffer, writes directly to client | +| Timeout risk | Slow handler causes context expiry | Settlement uses `context.Background()` | +| SSE compatible | No | Yes | + +## Billing Logic + +### x402 Protocol Flow + +x402 is an HTTP 402 payment protocol proposed by Coinbase. Core roles: + +- **Resource Server** (claw402) — provides paid APIs +- **Client** (NOFX) — consumer, holds an EVM wallet +- **Facilitator** (Coinbase CDP) — verifies signatures, executes on-chain settlement + +### Payment Signing (EIP-712) + +Client signature type: USDC `TransferWithAuthorization` + +``` +1. Receive Payment-Required header from 402 response (base64 JSON) +2. Decode to get: + - scheme: "exact" + - network: "eip155:8453" (Base L2) + - amount: USDC amount (e.g., "3000" = $0.003) + - asset: USDC contract address + - payTo: claw402 recipient address +3. Sign with wallet private key using EIP-712, authorizing USDC transfer from user wallet to payTo +4. Place signature in X-Payment + Payment-Signature headers +``` + +### Pricing Models + +Each AI model route has its own price configured in claw402: + +| Mode | Description | Example | +|------|-------------|---------| +| Fixed price | Specified directly via `user_price` field | `$0.003` per request | +| Token-based dynamic pricing | Calculated from request token count | `$0.001` per 1K tokens | +| Dispatch fallback | Default price for SDK-compatible routes | `$0.01` per request | + +```go +// Fixed price +price := fmt.Sprintf("$%s", route.UserPrice) + +// Dynamic pricing +price = DynamicPriceFunc(func(ctx, reqCtx) (Price, error) { + return resolveDynamicPrice(ctx, reqCtx, rule) +}) +``` + +### Retry Logic and Double-Charge Prevention + +```go +const X402MaxPaymentRetries = 5 +const X402RetryBaseWait = 3 * time.Second +``` + +- **5xx errors** → exponential backoff retry (3s, 6s, 9s...), no re-signing (same payment authorization) +- **Another 402** → previous signature expired, re-sign and retry (on-chain authorization auto-invalidates, **no double charge**) +- **4xx (non-402)** → non-retryable, fail immediately +- Outer retry is set to 1 (`WithMaxRetries(1)`) to prevent outer retries from causing duplicate payments + +### Settlement Timing: Streaming vs Non-Streaming + +| | Non-Streaming | Streaming | +|---|---|---| +| Settlement timing | After receiving full response | Before streaming begins | +| Risk | Low (content confirmed before charge) | Slightly higher (charge before seeing content) | +| Necessity | Standard mode | Must charge first, otherwise SSE is buffered | + +## Timeout Configuration + +| Location | Timeout | Purpose | +|----------|---------|---------| +| NOFX `X402Timeout` | 5 min | HTTP client overall timeout | +| NOFX `x402StreamIdleTimeout` | 90s | SSE idle disconnect (prevent hangs) | +| NOFX `CallWithRequestStream` idle | 60s | Idle timeout for non-x402 streaming | +| claw402 `ResponseHeaderTimeout` | 120s | Wait for first byte from AI upstream | +| claw402 `streamingHTTP.Timeout` | 0 (unlimited) | SSE stream can last indefinitely | +| claw402 `standardMW WithTimeout` | 10 min | Non-streaming ginmw overall timeout | +| claw402 `x402PaymentTimeout` | 30s | Payment verification/settlement timeout | + +## SSE Fault Tolerance + +### TeeReader Dual Parsing + +```go +var bodyBuf bytes.Buffer +tee := io.TeeReader(resp.Body, &bodyBuf) +text, sseErr := ParseSSEStream(tee, onChunk, onLine) + +if text != "" { + return text, nil // SSE succeeded +} +// SSE yielded nothing → try JSON parsing on bodyBuf (server may have returned non-streaming JSON) +jsonText, _ := ParseMCPResponse(bodyBuf.Bytes()) +``` + +### Idle Timeout Watchdog + +```go +go func() { + t := time.NewTimer(90s) + for { + select { + case <-t.C: + cancel() // timeout → cancel context → close TCP → body.Read() returns error + case <-resetCh: + t.Reset(90s) // received SSE line → reset timer + } + } +}() +``` + +Every incoming SSE line resets the timer. If no data arrives for 90 seconds, the context is cancelled and the TCP connection is closed, preventing indefinite blocking. + +## Related Files + +### NOFX (Client) +- `mcp/payment/claw402.go` — Claw402Client entry point +- `mcp/payment/x402.go` — x402 payment flow (DoX402Request, DoX402RequestStream, X402CallStream) +- `mcp/payment/x402_sign.go` — EIP-712 signing implementation +- `mcp/client.go` — ParseSSEStream, CallWithRequestStream + +### claw402 (Server) +- `internal/gateway/x402.go` — x402 middleware (streamAwareX402Middleware) +- `internal/gateway/proxy/stream.go` — SSE proxy (HandleAPIKeyStream) +- `internal/config/` — Route configuration (pricing, model mapping)