mirror of
https://github.com/laoxong/nofx.git
synced 2026-06-04 09:58:22 +08:00
feat: add streaming support for x402 payment flow to bypass Cloudflare timeout
- Extract ParseSSEStream as a shared function from CallWithRequestStream
- Add DoX402RequestStream and X402CallStream for streaming x402 payments
- Switch Claw402Client.Call to use streaming (X402CallStream)
- TeeReader fallback: SSE parsing with JSON fallback for non-SSE responses
- Idle timeout watchdog (90s) protects against stalled streams
This commit is contained in:
+17
-6
@@ -687,15 +687,27 @@ func (client *Client) CallWithRequestStream(req *Request, onChunk func(string))
|
||||
return "", fmt.Errorf("API error (status %d): %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var accumulated strings.Builder
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
|
||||
for scanner.Scan() {
|
||||
// Ping the watchdog: we received a line, reset the idle timer.
|
||||
return ParseSSEStream(resp.Body, onChunk, func() {
|
||||
select {
|
||||
case resetCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// ParseSSEStream reads an SSE response body, accumulates text deltas,
|
||||
// and calls onChunk with the full accumulated text after each chunk.
|
||||
// If onLine is non-nil, it is called after each raw SSE line is scanned
|
||||
// (useful for resetting idle-timeout watchdogs).
|
||||
// Returns the complete accumulated text.
|
||||
func ParseSSEStream(body io.Reader, onChunk func(string), onLine func()) (string, error) {
|
||||
var accumulated strings.Builder
|
||||
scanner := bufio.NewScanner(body)
|
||||
|
||||
for scanner.Scan() {
|
||||
if onLine != nil {
|
||||
onLine()
|
||||
}
|
||||
|
||||
line := scanner.Text()
|
||||
if !strings.HasPrefix(line, "data: ") {
|
||||
@@ -706,7 +718,6 @@ func (client *Client) CallWithRequestStream(req *Request, onChunk func(string))
|
||||
break
|
||||
}
|
||||
|
||||
// Parse the SSE JSON chunk
|
||||
var chunk struct {
|
||||
Choices []struct {
|
||||
Delta struct {
|
||||
|
||||
@@ -125,7 +125,7 @@ func (c *Claw402Client) resolveEndpoint() string {
|
||||
// SetAuthHeader hands the outgoing header set to X402SetAuthHeader; the
// client itself contributes nothing to the headers here.
func (c *Claw402Client) SetAuthHeader(h http.Header) {
	X402SetAuthHeader(h)
}
|
||||
|
||||
func (c *Claw402Client) Call(systemPrompt, userPrompt string) (string, error) {
|
||||
return X402Call(c.Client, c.signPayment, "Claw402", systemPrompt, userPrompt)
|
||||
return X402CallStream(c.Client, c.signPayment, "Claw402", systemPrompt, userPrompt, nil)
|
||||
}
|
||||
|
||||
func (c *Claw402Client) CallWithRequestFull(req *mcp.Request) (*mcp.LLMResponse, error) {
|
||||
|
||||
@@ -2,6 +2,7 @@ package payment
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
@@ -229,6 +230,239 @@ func DoX402Request(
|
||||
return body, nil
|
||||
}
|
||||
|
||||
// DoX402RequestStream executes an HTTP request with x402 v2 payment flow and
|
||||
// returns the open *http.Response for streaming. The caller is responsible for
|
||||
// reading and closing the response body.
|
||||
// The provided ctx is attached to the final successful HTTP request so that
|
||||
// cancelling ctx will immediately close the underlying connection and unblock
|
||||
// any pending body reads.
|
||||
func DoX402RequestStream(
|
||||
ctx context.Context,
|
||||
httpClient *http.Client,
|
||||
buildReqFn func() (*http.Request, error),
|
||||
signFn X402SignFunc,
|
||||
providerTag string,
|
||||
logger mcp.Logger,
|
||||
) (*http.Response, error) {
|
||||
// Initial request — use background context (no idle timeout yet).
|
||||
req, err := buildReqFn()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
|
||||
resp, err := httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to send request: %w", err)
|
||||
}
|
||||
|
||||
// Non-402 initial response
|
||||
if resp.StatusCode != http.StatusPaymentRequired {
|
||||
if resp.StatusCode == http.StatusOK {
|
||||
return resp, nil
|
||||
}
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
return nil, fmt.Errorf("%s API error (status %d): %s", providerTag, resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
// 402 — extract payment header and sign
|
||||
paymentHeader := resp.Header.Get("Payment-Required")
|
||||
if paymentHeader == "" {
|
||||
paymentHeader = resp.Header.Get("X-Payment-Required")
|
||||
}
|
||||
if paymentHeader == "" {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
return nil, fmt.Errorf("received 402 but no Payment-Required header found. Body: %s", string(body))
|
||||
}
|
||||
_, _ = io.Copy(io.Discard, resp.Body)
|
||||
resp.Body.Close()
|
||||
|
||||
paymentSig, err := signFn(paymentHeader)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to sign x402 payment: %w", err)
|
||||
}
|
||||
|
||||
// Retry loop for the payment-signed request.
|
||||
// Attach ctx to these requests so the caller can cancel body reads.
|
||||
var lastStatus int
|
||||
var lastBody []byte
|
||||
for attempt := 1; attempt <= X402MaxPaymentRetries; attempt++ {
|
||||
req2, err := buildReqFn()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to build retry request: %w", err)
|
||||
}
|
||||
req2 = req2.WithContext(ctx)
|
||||
req2.Header.Set("X-Payment", paymentSig)
|
||||
req2.Header.Set("Payment-Signature", paymentSig)
|
||||
|
||||
resp2, err := httpClient.Do(req2)
|
||||
if err != nil {
|
||||
if attempt < X402MaxPaymentRetries {
|
||||
wait := X402RetryBaseWait * time.Duration(attempt)
|
||||
logger.Warnf("⚠️ [%s] Payment request failed: %v, retrying in %v (%d/%d)...",
|
||||
providerTag, err, wait, attempt+1, X402MaxPaymentRetries)
|
||||
time.Sleep(wait)
|
||||
continue
|
||||
}
|
||||
return nil, fmt.Errorf("failed to send payment retry: %w", err)
|
||||
}
|
||||
|
||||
if resp2.StatusCode == http.StatusOK {
|
||||
if txHash := resp2.Header.Get("Payment-Response"); txHash != "" {
|
||||
logger.Infof("💰 [%s] Payment tx: %s", providerTag, txHash)
|
||||
}
|
||||
if attempt > 1 {
|
||||
logger.Infof("✅ [%s] Payment retry succeeded on attempt %d", providerTag, attempt)
|
||||
}
|
||||
return resp2, nil // caller reads and closes body
|
||||
}
|
||||
|
||||
// Non-200: read body for error handling / re-sign
|
||||
body2, readErr := io.ReadAll(resp2.Body)
|
||||
resp2.Body.Close()
|
||||
if readErr != nil {
|
||||
return nil, fmt.Errorf("failed to read payment retry response: %w", readErr)
|
||||
}
|
||||
|
||||
lastBody = body2
|
||||
lastStatus = resp2.StatusCode
|
||||
|
||||
retryable := resp2.StatusCode >= 500 || resp2.StatusCode == http.StatusPaymentRequired
|
||||
|
||||
if retryable && attempt < X402MaxPaymentRetries {
|
||||
wait := X402RetryBaseWait * time.Duration(attempt)
|
||||
|
||||
if resp2.StatusCode == http.StatusPaymentRequired {
|
||||
newHeader := resp2.Header.Get("Payment-Required")
|
||||
if newHeader == "" {
|
||||
newHeader = resp2.Header.Get("X-Payment-Required")
|
||||
}
|
||||
if newHeader != "" {
|
||||
newSig, signErr := signFn(newHeader)
|
||||
if signErr == nil {
|
||||
paymentSig = newSig
|
||||
logger.Warnf("⚠️ [%s] Payment expired (402), re-signed and retrying in %v (%d/%d)...",
|
||||
providerTag, wait, attempt+1, X402MaxPaymentRetries)
|
||||
} else {
|
||||
logger.Warnf("⚠️ [%s] Payment expired (402), re-sign failed: %v, retrying in %v (%d/%d)...",
|
||||
providerTag, signErr, wait, attempt+1, X402MaxPaymentRetries)
|
||||
}
|
||||
} else {
|
||||
logger.Warnf("⚠️ [%s] Got 402 but no new Payment-Required header, retrying in %v (%d/%d)...",
|
||||
providerTag, wait, attempt+1, X402MaxPaymentRetries)
|
||||
}
|
||||
} else {
|
||||
logger.Warnf("⚠️ [%s] Server error (status %d), retrying in %v (%d/%d)...",
|
||||
providerTag, resp2.StatusCode, wait, attempt+1, X402MaxPaymentRetries)
|
||||
}
|
||||
|
||||
time.Sleep(wait)
|
||||
continue
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("%s payment retry failed (status %d): %s", providerTag, lastStatus, string(lastBody))
|
||||
}
|
||||
|
||||
// x402StreamIdleTimeout is the idle timeout for SSE streaming through x402.
|
||||
// If no SSE line arrives for this duration, the stream is considered stalled.
|
||||
const x402StreamIdleTimeout = 90 * time.Second
|
||||
|
||||
// X402CallStream handles the x402 payment flow with streaming for the simple Call path.
|
||||
// It adds "stream": true to the request body and uses ParseSSEStream to read chunks.
|
||||
//
|
||||
// Robustness: uses TeeReader so the raw body is captured while parsing SSE.
|
||||
// If SSE parsing yields no text (e.g. server returned plain JSON despite stream:true),
|
||||
// falls back to ParseMCPResponse on the buffered body.
|
||||
func X402CallStream(c *mcp.Client, signFn X402SignFunc, tag string, systemPrompt, userPrompt string, onChunk func(string)) (string, error) {
|
||||
c.Log.Infof("📡 [%s] Request AI Server (stream): %s", tag, c.BaseURL)
|
||||
|
||||
requestBody := c.Hooks.BuildMCPRequestBody(systemPrompt, userPrompt)
|
||||
requestBody["stream"] = true
|
||||
jsonData, err := c.Hooks.MarshalRequestBody(requestBody)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Idle-timeout context: cancel() closes the underlying TCP connection,
|
||||
// which immediately unblocks any pending resp.Body.Read().
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
resp, err := DoX402RequestStream(ctx, c.HTTPClient, func() (*http.Request, error) {
|
||||
return c.Hooks.BuildRequest(c.Hooks.BuildUrl(), jsonData)
|
||||
}, signFn, tag, c.Log)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
ct := resp.Header.Get("Content-Type")
|
||||
c.Log.Infof("📡 [%s] Response Content-Type: %s", tag, ct)
|
||||
|
||||
// Start idle-timeout watchdog AFTER the 402 dance is done.
|
||||
resetCh := make(chan struct{}, 1)
|
||||
go func() {
|
||||
t := time.NewTimer(x402StreamIdleTimeout)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
c.Log.Warnf("⚠️ [%s] SSE idle timeout (%v), cancelling stream", tag, x402StreamIdleTimeout)
|
||||
cancel() // closes the TCP connection → body.Read() returns error
|
||||
return
|
||||
case <-resetCh:
|
||||
if !t.Stop() {
|
||||
select {
|
||||
case <-t.C:
|
||||
default:
|
||||
}
|
||||
}
|
||||
t.Reset(x402StreamIdleTimeout)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
onLine := func() {
|
||||
select {
|
||||
case resetCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// TeeReader: body is streamed through SSE parser AND captured in bodyBuf.
|
||||
// If SSE yields nothing (server returned JSON), we can still parse bodyBuf.
|
||||
var bodyBuf bytes.Buffer
|
||||
tee := io.TeeReader(resp.Body, &bodyBuf)
|
||||
|
||||
text, sseErr := mcp.ParseSSEStream(tee, onChunk, onLine)
|
||||
|
||||
if text != "" {
|
||||
c.Log.Infof("📡 [%s] SSE stream complete, got %d chars", tag, len(text))
|
||||
return text, nil
|
||||
}
|
||||
|
||||
// SSE yielded nothing — try JSON fallback on the buffered body.
|
||||
if bodyBuf.Len() > 0 {
|
||||
c.Log.Infof("📡 [%s] SSE empty, trying JSON fallback on %d bytes", tag, bodyBuf.Len())
|
||||
jsonText, jsonErr := c.Hooks.ParseMCPResponse(bodyBuf.Bytes())
|
||||
if jsonErr == nil && jsonText != "" {
|
||||
return jsonText, nil
|
||||
}
|
||||
c.Log.Warnf("⚠️ [%s] JSON fallback also failed: %v", tag, jsonErr)
|
||||
}
|
||||
|
||||
if sseErr != nil {
|
||||
return "", fmt.Errorf("[%s] stream failed: %w", tag, sseErr)
|
||||
}
|
||||
return "", fmt.Errorf("[%s] no content received (SSE empty, body %d bytes)", tag, bodyBuf.Len())
|
||||
}
|
||||
|
||||
// X402BuildRequest creates a POST request with Content-Type but no auth header.
|
||||
func X402BuildRequest(url string, jsonData []byte) (*http.Request, error) {
|
||||
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
|
||||
|
||||
Reference in New Issue
Block a user