Skip to content

Resilience

Production trading clients can't tolerate a 5-second network blip killing the bot. The SDKs ship three primitives — modeled on the sweetspot-maker-client patterns — that turn long-lived gRPC streams into something a maker bot can rely on:

  • ResilientStream — auto-reconnects with exponential backoff (500 ms → 30 s, capped at 6 attempts), fans out to multiple subscribers, and exposes a ConnectionState watch.
  • ServerState — holds the latest blockhash, recommended CU price, and current slot. Each field is a watch; consumers can wait* until the first value arrives.
  • ConfigCache + verifyProgramId — caches the server's MarketDataService.ListPairs + TxService.GetSponsoredPayers responses, and cross-checks the advertised program id against an independent Solana RPC.

The shapes are the same in all three SDKs.

ResilientStream

rust
use std::sync::Arc;
use superis::{ConnectionState, ResilientStream};
use superis::proto::tx_service_client::TxServiceClient;
use superis::proto::SubscribeBlockhashRequest;

let mut tx_client = TxServiceClient::with_interceptor(channel.clone(), auth.interceptor());

let factory: superis::resilience::StreamFactory<superis::proto::BlockhashEvent> = {
    let mut tx_client = tx_client.clone();
    Arc::new(move || {
        let mut tx_client = tx_client.clone();
        Box::pin(async move {
            let stream = tx_client
                .subscribe_blockhash(SubscribeBlockhashRequest {})
                .await?
                .into_inner();
            Ok(Box::pin(stream) as superis::resilience::BoxStream<_>)
        })
    })
};
let blockhash_stream = ResilientStream::new(factory, None, 64);
let mut state_rx = blockhash_stream.state();

tokio::spawn(async move {
    while state_rx.changed().await.is_ok() {
        match *state_rx.borrow() {
            ConnectionState::Disconnected => tracing::warn!("blockhash feed down — pausing submissions"),
            ConnectionState::Connected => tracing::info!("blockhash feed back"),
            ConnectionState::Connecting => {}
        }
    }
});
let mut bh_rx = blockhash_stream.subscribe().await;
go
import (
    "context"
    "github.com/superis/sweetspot-maker-client/go/superis"
    pb "github.com/superis/sweetspot-maker-client/go/sweetspot/api/v1"
)

factory := func(ctx context.Context) (<-chan *pb.BlockhashEvent, <-chan error, error) {
    stream, err := txClient.SubscribeBlockhash(ctx, &pb.SubscribeBlockhashRequest{})
    if err != nil {
        return nil, nil, err
    }
    items := make(chan *pb.BlockhashEvent, 64)
    errs := make(chan error, 1)
    go func() {
        defer close(items)
        defer close(errs)
        for {
            ev, err := stream.Recv()
            if err != nil {
                errs <- err
                return
            }
            items <- ev
        }
    }()
    return items, errs, nil
}

rs := superis.NewResilientStream(factory, nil, 64)
rs.Start(ctx)
defer rs.Close()

go func() {
    for s := range rs.State() {
        if s == superis.ConnectionDisconnected {
            log.Warn("blockhash feed down")
        }
    }
}()

for ev := range rs.Subscribe() {
    bh := ev.GetBlockhash()
    _ = bh
}
ts
import { ResilientStream } from "@superis/sweetspot-client";

const stream = new ResilientStream({
  factory: async (signal) => {
    return txClient.subscribeBlockhash({}, { signal });
  },
  capacity: 64,
});

stream.onState((s) => {
  if (s === "disconnected") console.warn("blockhash feed down");
});
const unsub = stream.subscribe((ev) => {
  console.log(ev.blockhash, ev.recommendedCuPrice);
});
stream.start();

The Rust and TS variants pass the abort signal / drop the stream when their owning supervisor cancels; Go uses ctx cancellation.

ServerState

Hooks the resilient stream up to a typed holder:

rust
use std::sync::Arc;
use superis::ServerState;

let state = Arc::new(ServerState::new());
let _bh_task = state.run_blockhash(blockhash_stream.clone());
let _slot_task = state.run_slots(slot_stream.clone());

let blockhash = state.wait_blockhash().await;
let slot = state.wait_current_slot().await;
go
state := superis.NewServerState()

go func() {
    for ev := range blockhashStream.Subscribe() {
        if bh := ev.GetBlockhash(); bh != nil {
            state.SetBlockhash(base58.Encode(bh.Key), ev.GetRecommendedCuPrice())
        }
    }
}()

bh, err := state.WaitBlockhash(ctx)
slot, _ := state.WaitCurrentSlot(ctx)
ts
import { ServerState } from "@superis/sweetspot-client";

const state = new ServerState();
blockhashStream.subscribe((ev) => {
  if (ev.blockhash) state.setBlockhash(toBase58(ev.blockhash.key), ev.recommendedCuPrice);
});
slotStream.subscribe((ev) => state.setCurrentSlot(ev.slot));

const blockhash = await state.waitBlockhash();
const slot = await state.waitCurrentSlot();

Program-id verification

Defense in depth. A compromised server could swap in an attacker program id — the SDK validates against any public Solana JSON-RPC endpoint the caller controls.

rust
use superis::config::{refresh, verify_program_id, ConfigCache};

let cache = ConfigCache::new();
let cfg = refresh(&cache, &mut market_client, &mut tx_client, server_program_id).await?;
verify_program_id("https://api.mainnet-beta.solana.com", &cfg.program_id).await?;
go
err := superis.VerifyProgramID(
    ctx,
    "https://api.mainnet-beta.solana.com",
    cfg.ProgramID,
)
ts
import { verifyProgramId } from "@superis/sweetspot-client";

await verifyProgramId({
  rpcUrl: "https://api.mainnet-beta.solana.com",
  programId: cfg.programId,
});

Backoff

Identical schedule across SDKs:

AttemptWait
0500 ms
11 s
22 s
34 s
48 s
516 s
6+30 s (capped)

Resets to attempt 0 on every successful reconnect.

Apache 2.0