Resilience
Production trading clients can't tolerate a 5-second network blip killing the bot. The SDKs ship three primitives — modeled on the sweetspot-maker-client patterns — that turn long-lived gRPC streams into something a maker bot can rely on:
- `ResilientStream` — auto-reconnects with exponential backoff (500 ms, doubling up to a 30 s cap), fans out to multiple subscribers, and exposes a `ConnectionState` watch.
- `ServerState` — holds the latest blockhash, recommended CU price, and current slot. Each field is a watch; consumers can `wait*` until the first value arrives.
- `ConfigCache` + `verifyProgramId` — caches the server's `MarketDataService.ListPairs` and `TxService.GetSponsoredPayers` responses, and cross-checks the advertised program id against an independent Solana RPC.
The shapes are the same in all three SDKs.
ResilientStream
```rust
use std::sync::Arc;

use superis::{ConnectionState, ResilientStream};
use superis::proto::tx_service_client::TxServiceClient;
use superis::proto::SubscribeBlockhashRequest;

let tx_client = TxServiceClient::with_interceptor(channel.clone(), auth.interceptor());

// Every reconnect invokes the factory to open a fresh server stream.
let factory: superis::resilience::StreamFactory<superis::proto::BlockhashEvent> = {
    let tx_client = tx_client.clone();
    Arc::new(move || {
        let mut tx_client = tx_client.clone();
        Box::pin(async move {
            let stream = tx_client
                .subscribe_blockhash(SubscribeBlockhashRequest {})
                .await?
                .into_inner();
            Ok(Box::pin(stream) as superis::resilience::BoxStream<_>)
        })
    })
};

let blockhash_stream = ResilientStream::new(factory, None, 64);

// Watch the connection state so the bot can pause submissions during outages.
let mut state_rx = blockhash_stream.state();
tokio::spawn(async move {
    while state_rx.changed().await.is_ok() {
        match *state_rx.borrow() {
            ConnectionState::Disconnected => tracing::warn!("blockhash feed down — pausing submissions"),
            ConnectionState::Connected => tracing::info!("blockhash feed back"),
            ConnectionState::Connecting => {}
        }
    }
});

let mut bh_rx = blockhash_stream.subscribe().await;
```

```go
import (
	"context"
	"log"

	"github.com/superis/sweetspot-maker-client/go/superis"
	pb "github.com/superis/sweetspot-maker-client/go/sweetspot/api/v1"
)

// The factory opens a fresh server stream and pumps it into channels.
factory := func(ctx context.Context) (<-chan *pb.BlockhashEvent, <-chan error, error) {
	stream, err := txClient.SubscribeBlockhash(ctx, &pb.SubscribeBlockhashRequest{})
	if err != nil {
		return nil, nil, err
	}
	items := make(chan *pb.BlockhashEvent, 64)
	errs := make(chan error, 1)
	go func() {
		defer close(items)
		defer close(errs)
		for {
			ev, err := stream.Recv()
			if err != nil {
				errs <- err
				return
			}
			items <- ev
		}
	}()
	return items, errs, nil
}

rs := superis.NewResilientStream(factory, nil, 64)
rs.Start(ctx)
defer rs.Close()

go func() {
	for s := range rs.State() {
		if s == superis.ConnectionDisconnected {
			log.Println("blockhash feed down")
		}
	}
}()

for ev := range rs.Subscribe() {
	bh := ev.GetBlockhash()
	_ = bh
}
```

```ts
import { ResilientStream } from "@superis/sweetspot-client";

const stream = new ResilientStream({
  // Every reconnect calls the factory with a fresh AbortSignal.
  factory: async (signal) => {
    return txClient.subscribeBlockhash({}, { signal });
  },
  capacity: 64,
});

stream.onState((s) => {
  if (s === "disconnected") console.warn("blockhash feed down");
});

const unsub = stream.subscribe((ev) => {
  console.log(ev.blockhash, ev.recommendedCuPrice);
});

stream.start();
```

On cancellation: the Rust variant drops the stream and the TS variant aborts the signal when the owning supervisor shuts down; the Go variant relies on `ctx` cancellation.
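Internally, the fan-out side of the wrapper can be pictured as one upstream channel broadcast into per-subscriber buffers. The sketch below is illustrative only; `fanOut` and its drop-on-full policy are assumptions about the pattern, not the SDK's actual implementation:

```go
package main

import "fmt"

// fanOut broadcasts one upstream channel to n subscribers, each with its
// own buffer of the given capacity so a slow consumer cannot stall the
// others. Hypothetical helper, not part of the SDK surface.
func fanOut(upstream <-chan string, capacity, n int) []<-chan string {
	subs := make([]chan string, n)
	out := make([]<-chan string, n)
	for i := range subs {
		subs[i] = make(chan string, capacity)
		out[i] = subs[i]
	}
	go func() {
		defer func() {
			for _, s := range subs {
				close(s) // upstream ended: release all subscribers
			}
		}()
		for ev := range upstream {
			for _, s := range subs {
				select {
				case s <- ev: // delivered
				default: // buffer full: drop for this subscriber only
				}
			}
		}
	}()
	return out
}

func main() {
	up := make(chan string, 4)
	subs := fanOut(up, 64, 2)
	up <- "blockhash-1"
	close(up)
	for _, s := range subs {
		for ev := range s {
			fmt.Println(ev) // each subscriber sees the event once
		}
	}
}
```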
ServerState
Hooks the resilient stream up to a typed holder:
```rust
use std::sync::Arc;

use superis::ServerState;

let state = Arc::new(ServerState::new());

// Spawned tasks keep the holder updated from the resilient streams.
let _bh_task = state.run_blockhash(blockhash_stream.clone());
let _slot_task = state.run_slots(slot_stream.clone());

// Block until the first value from each feed has arrived.
let blockhash = state.wait_blockhash().await;
let slot = state.wait_current_slot().await;
```

```go
state := superis.NewServerState()

go func() {
	for ev := range blockhashStream.Subscribe() {
		if bh := ev.GetBlockhash(); bh != nil {
			state.SetBlockhash(base58.Encode(bh.Key), ev.GetRecommendedCuPrice())
		}
	}
}()

bh, err := state.WaitBlockhash(ctx)
slot, _ := state.WaitCurrentSlot(ctx)
```

```ts
import { ServerState } from "@superis/sweetspot-client";

const state = new ServerState();

blockhashStream.subscribe((ev) => {
  if (ev.blockhash) state.setBlockhash(toBase58(ev.blockhash.key), ev.recommendedCuPrice);
});
slotStream.subscribe((ev) => state.setCurrentSlot(ev.slot));

const blockhash = await state.waitBlockhash();
const slot = await state.waitCurrentSlot();
```

Program-id verification
Defense in depth: a compromised server could advertise an attacker-controlled program id, so the SDK cross-checks it against any public Solana JSON-RPC endpoint of the caller's choosing.
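The check amounts to fetching the program account from the RPC node and confirming it exists and is executable. A sketch against the public `getAccountInfo` response shape (the helper name is hypothetical, and the owner constant assumes an upgradeable-loader deployment; the SDK's real logic may differ):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Subset of the Solana getAccountInfo JSON-RPC response we care about.
type accountInfoResp struct {
	Result struct {
		Value *struct {
			Executable bool   `json:"executable"`
			Owner      string `json:"owner"`
		} `json:"value"`
	} `json:"result"`
}

// Loader that owns programs deployed with the upgradeable BPF loader.
const bpfLoaderUpgradeable = "BPFLoaderUpgradeab1e11111111111111111111111111"

// checkProgramAccount validates a raw getAccountInfo response body.
func checkProgramAccount(raw []byte) error {
	var r accountInfoResp
	if err := json.Unmarshal(raw, &r); err != nil {
		return err
	}
	v := r.Result.Value
	switch {
	case v == nil:
		return fmt.Errorf("program account not found on chain")
	case !v.Executable:
		return fmt.Errorf("account exists but is not executable")
	case v.Owner != bpfLoaderUpgradeable:
		return fmt.Errorf("unexpected owner %s", v.Owner)
	}
	return nil
}

func main() {
	sample := []byte(`{"result":{"value":{"executable":true,` +
		`"owner":"BPFLoaderUpgradeab1e11111111111111111111111111"}}}`)
	fmt.Println(checkProgramAccount(sample)) // nil for a valid program account
}
```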
```rust
use superis::config::{refresh, verify_program_id, ConfigCache};

let cache = ConfigCache::new();
let cfg = refresh(&cache, &mut market_client, &mut tx_client, server_program_id).await?;

// Cross-check against an independent RPC endpoint the caller controls.
verify_program_id("https://api.mainnet-beta.solana.com", &cfg.program_id).await?;
```

```go
err := superis.VerifyProgramID(
	ctx,
	"https://api.mainnet-beta.solana.com",
	cfg.ProgramID,
)
```

```ts
import { verifyProgramId } from "@superis/sweetspot-client";

await verifyProgramId({
  rpcUrl: "https://api.mainnet-beta.solana.com",
  programId: cfg.programId,
});
```

Backoff
Identical schedule across SDKs:
| Attempt | Wait |
|---|---|
| 0 | 500 ms |
| 1 | 1 s |
| 2 | 2 s |
| 3 | 4 s |
| 4 | 8 s |
| 5 | 16 s |
| 6+ | 30 s (capped) |
The attempt counter resets to 0 after every successful reconnect.