High Availability — Developer Guide¶
Overview¶
This document explains High Availability (HA) as implemented in code: startup recovery, enhanced heartbeat, state synchronization, and daemon lifecycle behavior. It links directly to source files and proto definitions used in production.
TODO¶
- Add persistence-task durability/resume plan for distributed persistence (daemon-local journal + recovery flow). See docs/architecture/api/policy-persistence.md for the current model.
Key Components (sources)¶
- Global Store
  - Recovery/state sync service: tensorcast/global_store/services/recovery_service.py
  - gRPC surface (registration, heartbeat, reconcile, health): tensorcast/global_store/grpc_service.py
- Store Daemon (C++)
  - Lifecycle + heartbeat + sync + drift eviction: daemon/ha/worker_lifecycle_manager.cc
  - Global Store client + retries/backoff: core/store/components/global_store_client.{h,cc}
  - HA wiring + endpoint selection: daemon/app/server_main.cc
- Protocol buffers
  - Enhanced heartbeat and sync: proto/tensorcast/global_store/v1/global_store.proto
Materialization HA invariant: v2 descriptor streaming in the daemon uses UMA view plans when a view is requested and routes disk fallback through daemon-owned source bindings (managed shared-disk or local import). SDKs never read disk directly for retrieval, so selective tensor loads and disk-sourced replicas share the same daemon-managed buffer layout.
Registration & Identity¶
- The daemon must advertise a routable address (non-loopback, non-unspecified). server.advertise.host overrides server.listen.host; RegisterWorker rejects loopback/unspecified IPs.
- daemon_id is required and is the stable identity used for Global Store registration; when omitted in config it is auto-generated and persisted under the host-scoped runtime root. worker_id is an assigned row id and may change across re-registrations.
- Endpoint takeover: if a new registration reuses an existing node_address:grpc_port, Global Store reclaims the previous endpoint owner and accepts the new daemon registration as authoritative (no heartbeat-timeout wait required).
- HA requires server.p2p_listen.port to be non-zero; startup aborts otherwise.
- Recovery registration preserves worker_id when possible and always requests a state sync; stale replicas owned by the previous worker id are reassigned or marked unavailable.
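The routable-address rule can be illustrated with a short Python sketch. The helper names (is_routable_address, pick_advertise_host) are hypothetical and are not taken from the daemon or Global Store sources; the sketch covers only the advertise-then-listen preference and omits the further fallbacks that resolve_advertised_address applies.

```python
import ipaddress
from typing import Optional


def is_routable_address(host: str) -> bool:
    """Return True for an IP literal that is neither loopback nor unspecified."""
    try:
        ip = ipaddress.ip_address(host)
    except ValueError:
        # Not an IP literal (for example a hostname); this sketch does no DNS lookup.
        return False
    return not (ip.is_loopback or ip.is_unspecified)


def pick_advertise_host(advertise_host: Optional[str], listen_host: Optional[str]) -> str:
    """Hypothetical helper: advertise host overrides listen host.

    An explicitly configured advertise host must itself be routable;
    otherwise the listen host is used only if it is routable.
    """
    if advertise_host:
        if not is_routable_address(advertise_host):
            raise ValueError(f"advertise host {advertise_host!r} is not routable")
        return advertise_host
    if listen_host and is_routable_address(listen_host):
        return listen_host
    raise ValueError("no routable address to advertise")
```

With the example configuration later in this guide, pick_advertise_host("10.0.0.20", "0.0.0.0") selects 10.0.0.20, while a wildcard listen host with no advertise host is rejected.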
Startup Recovery (Global Store)¶
On startup the Global Store runs a guarded recovery pass; state stays marked stale until workers re-confirm via registration or heartbeat.
- RecoveryService.initiate_recovery short-circuits if already running so only one pass executes.
- The pass validates the DuckDB backing store, marks every worker and replica as stale, preserves persisted state versions/checksums, and records the recovery timestamp.
- Keeping the stale flag forces each daemon to re-register and re-sync before it can serve traffic, ensuring registry drift does not leak into routing.
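A minimal in-memory sketch of the guarded pass described above. The class RecoverySketch and its dictionary layout are assumptions made for illustration; the real RecoveryService works against DuckDB and also validates the backing store, which is omitted here.

```python
import threading
import time


class RecoverySketch:
    """Illustrative stand-in for a guarded recovery pass (not the real service)."""

    def __init__(self, workers: dict, replicas: dict):
        self._lock = threading.Lock()
        self._running = False
        self.workers = workers      # worker_id -> {"stale": bool, "state_version": int}
        self.replicas = replicas    # replica_id -> {"stale": bool}
        self.recovered_at = None

    def initiate_recovery(self) -> bool:
        # Short-circuit when a pass is already in flight so only one executes.
        with self._lock:
            if self._running:
                return False
            self._running = True
        try:
            # Mark everything stale; versions and checksums are left untouched.
            for worker in self.workers.values():
                worker["stale"] = True
            for replica in self.replicas.values():
                replica["stale"] = True
            self.recovered_at = time.time()
            return True
        finally:
            with self._lock:
                self._running = False
```

The stale flags would then be cleared only as individual workers re-register or heartbeat, which is outside the scope of this sketch.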
Enhanced Heartbeat¶
Heartbeats are always enhanced and must include a non-zero state_version so the Global Store can detect drift. Legacy heartbeats (state_version == 0) are rejected.
- Request payload: worker id, available memory, acceptance flag, current state_version, computed state_checksum, the set of registered artifact ids (publishable + resident only), last successful sync timestamp, daemon→Global Store connection status, and capability_flags (bitset) when capability directory publishing is enabled.
- Global Store handling (grpc_service._handle_enhanced_heartbeat):
  - Compares the sent version with the persisted state_version and compares the worker checksum with the cached global checksum (recomputed only when missing).
  - Marks state_sync_required when versions or checksums diverge, or when obsolete replicas are detected.
  - Returns the server timestamp, the expected version, and the list of obsolete replicas (diagnostic only; daemons use it to request a sync, not to unload directly).
  - Updates capability_flags only when the value changes to avoid per-heartbeat writes.
- Registration initializes each worker’s state_version to 1; the daemon seeds its local state_version from RegisterWorkerResponse.expected_state_version and treats missing/zero versions as an error.
- Daemon behavior (WorkerLifecycleManager + GlobalStoreClient):
  - Collects publishable, resident replicas, computes a checksum, and calls send_heartbeat_enhanced.
  - Heartbeat is lightweight and only queues sync work; state synchronization runs in a separate loop with a single in-flight sync, and the monitor requests cancel only when progress exceeds the configured RPC-timeout budget, restarting after the thread exits to avoid overlap.
  - If the RPC returns NOT_FOUND while the channel is healthy, the daemon re-registers with the preserved identity and triggers a reconcile bootstrap cycle before resuming heartbeats.
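The server-side comparison can be sketched as a pure function. Everything here is illustrative: evaluate_heartbeat and HeartbeatResult are hypothetical names, and the sketch assumes an "obsolete" replica is an artifact id the daemon reports that the Global Store no longer has registered for that worker, which the text above does not spell out.

```python
from dataclasses import dataclass, field
from typing import Iterable, List


@dataclass
class HeartbeatResult:
    state_sync_required: bool
    expected_state_version: int
    obsolete_artifact_ids: List[str] = field(default_factory=list)


def evaluate_heartbeat(
    sent_version: int,
    sent_checksum: int,
    sent_artifact_ids: Iterable[str],
    persisted_version: int,
    global_checksum: int,
    registered_artifact_ids: Iterable[str],
) -> HeartbeatResult:
    """Decide whether the daemon must reconcile (sketch, not the real handler)."""
    if sent_version == 0:
        # Legacy heartbeats carry no version and are rejected.
        raise ValueError("state_version must be non-zero")
    # Assumption: obsolete = reported by the daemon but unknown to the registry.
    obsolete = sorted(set(sent_artifact_ids) - set(registered_artifact_ids))
    diverged = sent_version != persisted_version or sent_checksum != global_checksum
    return HeartbeatResult(
        state_sync_required=diverged or bool(obsolete),
        expected_state_version=persisted_version,
        obsolete_artifact_ids=obsolete,
    )
```

The obsolete list is returned for diagnostics only; in line with the behavior described above, a daemon would use it to request a sync rather than to unload anything.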
State Reconcile (V2)¶
When state_sync_required is returned (or the daemon detects that its version or checksum has diverged), the daemon submits its inventory through ReconcileWorkerState.
- Request identity is (worker_id, daemon_id, generation, request_seq). generation identifies daemon incarnation; request_seq is monotonic within one generation.
- Global Store returns typed outcomes: APPLIED, NOOP, IGNORED_STALE, RETRY_LATER, REBASE_REQUIRED, FATAL.
  - APPLIED/NOOP update daemon-local state_version + checksum.
  - IGNORED_STALE is treated as success for stale/duplicate requests.
  - RETRY_LATER provides retry_after_ms; daemon backs off and retries reconcile.
  - REBASE_REQUIRED returns authoritative expected_replicas; daemon applies local rebase from that snapshot.
- StateChange removals are applied per replica key (artifact_id, memory_type, device_id) so GPU/CPU tiers are reconciled independently.
- The daemon’s inventory is publishable + resident only; non-resident replicas are never mapped to DISK.
- StateChange removals enqueue a safe-retire flow; unload happens only after ref counts, use leases, placement pins, and transport locks clear.
- StateChange updates propagate endpoint metadata drift (node address/port, memory size, and transport keys when reported) so routing stays accurate.
- Checksum parity: both sides compute a stable FNV-1a hash over a sorted inventory of (artifact_id, node_id, node_address, node_port, memory_type, device_id, availability). Order never changes the checksum, and availability or endpoint drift intentionally mutates it to trigger reconciliation.
- Authoritative empties: when request_kind=SNAPSHOT, empty inventory is authoritative and removals are applied (for drains/retirements). DELTA keeps conservative behavior for empty inventory.
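The checksum-parity rule lends itself to a small example. The sketch below uses the standard 64-bit FNV-1a constants; the hash width, the field separators, and the string serialization of each field are assumptions, since the text above names only the algorithm family and the tuple of fields.

```python
FNV64_OFFSET = 0xCBF29CE484222325  # standard FNV-1a 64-bit offset basis
FNV64_PRIME = 0x100000001B3        # standard FNV 64-bit prime

FIELDS = (
    "artifact_id", "node_id", "node_address", "node_port",
    "memory_type", "device_id", "availability",
)


def fnv1a_64(data: bytes) -> int:
    """FNV-1a: XOR each byte into the hash, then multiply by the prime."""
    h = FNV64_OFFSET
    for byte in data:
        h ^= byte
        h = (h * FNV64_PRIME) & 0xFFFFFFFFFFFFFFFF
    return h


def inventory_checksum(replicas) -> int:
    """Hash a sorted inventory so that input order never affects the result."""
    rows = sorted(tuple(str(r[k]) for k in FIELDS) for r in replicas)
    # Assumed serialization: unit separator between fields, newline between rows.
    payload = "\n".join("\x1f".join(row) for row in rows).encode("utf-8")
    return fnv1a_64(payload)
```

Because rows are sorted before hashing, two sides holding the same replicas in different orders agree, while changing any field such as node_port or availability yields a different value and so prompts reconciliation.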
Connection Management & Retries (Daemon)¶
All Global Store RPCs use bounded retries with exponential backoff and jitter (max_retries=3, initial backoff 100ms, doubled per attempt with ±50% jitter). Each call also sets deadlines so stalled channels fail fast.
- The retry helper is centralized in GlobalStoreClient::execute_rpc_with_retry, so heartbeat, registration, sync, and health probes share the same policy.
- HA config can override heartbeat/reconcile RPC timeouts and retry limits to bound recovery latency without affecting other calls.
- Endpoint selection: the daemon currently uses the first entry in high_availability.global_store_endpoints as global_store_address (single Global Store only; additional endpoints are ignored for now).
- Advertise constraints: RegisterWorker rejects loopback or unspecified addresses; resolve_advertised_address prefers server.advertise.host, then a routable server.listen.host, then the outbound route IP to the configured Global Store endpoint, and finally the default interface IP.
- Health check: GlobalStoreClient::initialize issues a HealthCheck before registration; failing the probe aborts HA startup early instead of hanging during registration. When a meta.cluster_token is configured, the daemon refuses to proceed unless the HealthCheck token matches, preventing accidental cross-cluster registration. Endpoint metadata (bind/advertise) is returned by GetServerInfo for operators and CLI tooling.
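The retry policy stated at the top of this section (three retries, 100 ms initial backoff, doubling, ±50% jitter) can be sketched in Python as follows. This is not a port of execute_rpc_with_retry: the function names are hypothetical, ConnectionError stands in for a retryable gRPC status, per-call deadlines are omitted, and the sketch assumes max_retries counts retries after the first attempt.

```python
import random
import time


def backoff_delays(max_retries=3, initial_s=0.1, jitter=0.5, rng=random.random):
    """Yield one sleep duration per retry: doubled each time, with +/- jitter."""
    delay = initial_s
    for _ in range(max_retries):
        # rng() in [0, 1) maps to a factor in [1 - jitter, 1 + jitter).
        factor = 1.0 + jitter * (2.0 * rng() - 1.0)
        yield delay * factor
        delay *= 2.0


def call_with_retry(rpc, max_retries=3, initial_s=0.1,
                    sleep=time.sleep, rng=random.random):
    """Call rpc(); on ConnectionError retry up to max_retries times, then re-raise."""
    delays = backoff_delays(max_retries, initial_s, rng=rng)
    last_error = None
    for attempt in range(max_retries + 1):
        try:
            return rpc()
        except ConnectionError as exc:
            last_error = exc
            if attempt == max_retries:
                break
            sleep(next(delays))
    raise last_error
```

Under these assumptions the nominal delays are 100 ms, 200 ms, and 400 ms, so a fully failed call spends roughly 0.35 s to 1.05 s sleeping in addition to the per-attempt deadlines.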
Responsibilities¶
- Global Store: persist registry; accept heartbeats and reconcile state; compute/apply changes; return expected snapshots on REBASE_REQUIRED; reject ambiguous loopback registrations; expose cluster token via HealthCheck and endpoints via GetServerInfo.
- Store Daemon: maintain authoritative publishable inventory; send enhanced heartbeats; run typed reconcile loop; enqueue safe retire for drift/removals; re-register on desync; prefetch adds returned by reconcile; never unload directly from heartbeat obsolete hints.
Failure Modes and Recovery Behavior¶
- Global Store outage/restart
  - Heartbeats log warnings and retry with backoff; is_connected() flips false when the gRPC channel is not READY, so P2P transports and registrations reject with FailedPrecondition unless a disk hint is provided.
  - Store Daemons continue serving already-materialized replicas locally; inventory remains authoritative on the node, but new replicas are not registered while the outage lasts.
  - In-flight transports keep streaming because the data plane is daemon-to-daemon; the follow-up complete_replica_transport call may fail and leak counters temporarily, but cleanup_expired_transports() on the Global Store cleans them up.
  - When the Global Store comes back, the next enhanced heartbeat can return NOT_FOUND; the daemon automatically re-registers with preserved identity and performs reconcile bootstrap to reconcile drift before resuming normal heartbeats.
- Store Daemon crash or shutdown
  - Heartbeats stop; Global Store marks the worker stale after heartbeat_timeout, sets it inactive, and removes it from transport selection. Existing replicas remain in the registry but are not assigned because find_available_for_transport() filters by heartbeat freshness and inactivity.
  - Stuck transports sourced from the crashed daemon are force-completed by the Global Store sweeper; P2P sessions targeting the crashed daemon fail at connection time and surface an error to callers.
  - On restart, the daemon re-registers, runs reconcile bootstrap, and prunes obsolete replicas locally so it rejoins routing without manual cleanup.
- Network partition between daemon and Global Store
  - RPCs exhaust retry budgets (execute_rpc_with_retry), heartbeats record failures, and the daemon eventually considers the channel disconnected. Local reads still succeed; any operation requiring coordination (registration, transport request, chunk sync) fails fast to clients with the underlying gRPC status.
  - When connectivity resumes, the daemon resumes heartbeats, re-registers if needed, and performs reconcile with typed outcomes depending on drift.
- P2P transport failure (network or source-side)
  - MaterializeOrchestrator always calls complete_replica_transport() even when ingest_from_p2p() fails, releasing source-side counters promptly.
  - If a disk hint exists, the orchestrator retries via ingest_from_disk(); otherwise the error propagates to the client SDK. GPU memory pressure triggers an eviction + single retry before failing.
  - Transport cleanup on the Global Store prevents long-lived counter leaks when either side dies mid-transfer.
- Client SDK visibility
  - Requests served from a healthy local replica succeed even during Global Store outages.
  - Missing replicas with no daemon-resolved disk source return FailedPrecondition when the Global Store is unreachable; P2P failures surface the transport error unless disk fallback succeeds.
  - After a daemon restart or reconnection, the first successful sync re-enables normal routing; clients see transient failures only during the gap.
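To make the crash case concrete, here is a hedged sketch of the kind of filter that keeps a crashed daemon out of transport selection. The function find_available_sources and the record layout are hypothetical; they only illustrate filtering by heartbeat freshness and activity, not the actual find_available_for_transport() query.

```python
def find_available_sources(workers, now_s, heartbeat_timeout_s=30.0):
    """Return ids of workers that are active and have a fresh heartbeat.

    Each worker is a dict with hypothetical keys:
    worker_id, active (bool), last_heartbeat_s (seconds on the same clock as now_s).
    """
    return [
        w["worker_id"]
        for w in workers
        if w["active"] and (now_s - w["last_heartbeat_s"]) <= heartbeat_timeout_s
    ]
```

A worker whose last heartbeat is older than the timeout is skipped even if its replicas are still listed in the registry, which matches the behavior described for a crashed daemon.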
HA Event Timeline (heartbeat, re-registration, sync)¶
sequenceDiagram
autonumber
participant SD as Store Daemon
participant GS as Global Store
participant CL as Client SDK
CL->>SD: Materialize request
SD->>GS: RegisterWorker
SD->>GS: ReconcileWorkerState (SNAPSHOT)
SD->>GS: Heartbeat (enhanced)
GS-->>SD: HB OK (no sync)
Note over SD,GS: GS restart / outage
SD->>GS: Heartbeat retries w/ backoff
GS-->>SD: NOT_FOUND (after restart)
SD->>GS: Re-register (preserve identity)
SD->>GS: ReconcileWorkerState (SNAPSHOT)
GS-->>SD: REBASE_REQUIRED + expected_replicas
SD->>SD: Apply rebase, resume traffic
CL->>SD: New request during outage
SD-->>CL: Serve local replica OR error (needs GS / no disk hint)
Transport Failure and Fallback Path¶
flowchart TD
RQ[Client materialize request] --> LCL{Local replica present?}
LCL -- Yes --> SERVE[Serve immediately]
LCL -- No --> GSOK{Global Store reachable?}
GSOK -- No --> DISKHINT{Disk hint provided?}
DISKHINT -- Yes --> DISK["ingest_from_disk()"]
DISKHINT -- No --> ERR1[Fail: FailedPrecondition]
GSOK -- Yes --> P2P["RequestReplicaTransport()"]
P2P -- Granted --> COPY["ingest_from_p2p()"]
COPY -- Success --> COMPLETE["complete_replica_transport()"]
COMPLETE --> REG["register_replica_with_global_store()"]
COPY -- Failure --> CLOSE["complete_replica_transport()"]
CLOSE --> DISKFALL{Disk hint available?}
DISKFALL -- Yes --> DISK
DISKFALL -- No --> ERR2[Fail with transport error]
DISK --> SERVE
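The flowchart can also be read as a decision function. The Python sketch below returns the ordered steps for each branch; plan_materialize and MaterializeError are hypothetical names, the step strings simply reuse the node labels from the diagram, and the case where the transport request is not granted is left out because the diagram does not show it.

```python
class MaterializeError(Exception):
    """Raised when no path can produce the replica (sketch only)."""


def plan_materialize(local_replica: bool, gs_reachable: bool,
                     disk_hint: bool, p2p_succeeds: bool) -> list:
    """Return the ordered steps the flowchart takes for the given conditions."""
    if local_replica:
        return ["serve"]
    if not gs_reachable:
        if disk_hint:
            return ["ingest_from_disk", "serve"]
        raise MaterializeError("FailedPrecondition")
    steps = ["request_replica_transport", "ingest_from_p2p"]
    # The transport is completed on both success and failure so that
    # source-side counters are released.
    steps.append("complete_replica_transport")
    if p2p_succeeds:
        steps.append("register_replica_with_global_store")
        return steps
    if disk_hint:
        return steps + ["ingest_from_disk", "serve"]
    raise MaterializeError("transport error")
```

Note that complete_replica_transport appears in both P2P branches, which is the property that keeps transport counters from leaking when a copy fails.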
Protocols (authoritative)¶
From proto/tensorcast/global_store/v1/global_store.proto:
- ReconcileWorkerStateRequest: worker_id, daemon_id, generation, request_seq, request_kind, and inventory (ReplicaInfo list).
- ReconcileWorkerStateResponse: result_kind, retry_after_ms, new_state_version, new_state_checksum, state_changes, and optional expected_replicas (for rebase).
- RegisterWorkerResponse.reconcile_generation: daemon-incarnation generation assigned by Global Store at registration.
End-to-End Flows¶
- Startup recovery: validate DB, mark stale, preserve persisted versions/checksums -> workers re-register/heartbeat -> server may request reconcile.
- Heartbeat loop: daemon sends enhanced heartbeat -> server compares version/checksum and obsolete set -> may request reconcile.
- Reconcile loop: daemon sends ReconcileWorkerState with monotonic generation/sequence -> server computes and applies the StateChange list and returns a typed result.
- Rebase path: server returns REBASE_REQUIRED with expected replicas -> daemon prunes and applies local state to match the authoritative snapshot.
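One way to picture the daemon side of the reconcile and rebase flows is a dispatch on the typed result. The enum values mirror the outcome names used in this guide, but the next_action function, its action labels, and the handling of FATAL (shown here as an abort) are assumptions; the guide does not describe what the daemon does on FATAL.

```python
from enum import Enum


class ResultKind(Enum):
    APPLIED = "APPLIED"
    NOOP = "NOOP"
    IGNORED_STALE = "IGNORED_STALE"
    RETRY_LATER = "RETRY_LATER"
    REBASE_REQUIRED = "REBASE_REQUIRED"
    FATAL = "FATAL"


def next_action(result_kind: ResultKind, retry_after_ms: int = 0):
    """Map a typed reconcile result to (action, delay_ms). Sketch only."""
    if result_kind in (ResultKind.APPLIED, ResultKind.NOOP):
        # Adopt the new state_version and checksum returned by the server.
        return ("update_local_state", 0)
    if result_kind is ResultKind.IGNORED_STALE:
        # Stale or duplicate request: treated as success, nothing to do.
        return ("done", 0)
    if result_kind is ResultKind.RETRY_LATER:
        return ("retry", retry_after_ms)
    if result_kind is ResultKind.REBASE_REQUIRED:
        # Rebase local state from the authoritative expected_replicas.
        return ("rebase", 0)
    # Assumption: FATAL stops the reconcile loop and surfaces an error.
    return ("abort", 0)
```

Keeping the mapping in one place makes it easy to see that only RETRY_LATER carries a delay and only REBASE_REQUIRED replaces local state wholesale.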
Reconciliation Invariants¶
- Authoritative inventory: daemon’s local_replicas is the source of truth for local presence.
- Addition over removal: removals are suppressed if a DELTA request provides no inventory (an empty SNAPSHOT is authoritative); adds and removes are based only on explicit differences, and state versions bump only when changes are applied.
- Idempotency: repeated adds resolve to the same replica logically; generation/request_seq ensure stale requests are ignored.
- Publishable-only visibility: only resident replicas marked publishable are advertised; non-resident replicas are never mapped to DISK.
- Safe retire: unload only after ref counts, use leases, placement pins, and transport locks clear; heartbeat obsolete hints never trigger unload directly.
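The idempotency invariant relies on ordering requests by generation and request_seq. The following sketch shows one possible stale-request check; ReconcileDeduper is a hypothetical class, and comparing (generation, request_seq) lexicographically is an assumption about how "stale" is decided.

```python
class ReconcileDeduper:
    """Track the newest (generation, request_seq) seen per worker (sketch)."""

    def __init__(self):
        self._last = {}  # worker_id -> (generation, request_seq)

    def accept(self, worker_id, generation: int, request_seq: int) -> bool:
        """Return True if the request is newer than anything seen so far.

        A False result corresponds to an IGNORED_STALE outcome.
        """
        key = (generation, request_seq)
        last = self._last.get(worker_id)
        # Tuple comparison: a newer generation wins regardless of request_seq.
        if last is not None and key <= last:
            return False
        self._last[worker_id] = key
        return True
```

A duplicate or out-of-order request from the same incarnation is dropped, and so is any request from an older incarnation once a newer generation has been seen.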
Configuration¶
Global Store (Unified Config)¶
Use tensorcast.config.v1.GlobalStoreConfig (YAML/JSON → proto with strict validation):
database:
  db_file: /var/lib/tensorcast/global_store.duckdb # persistent for HA
server:
  listen:
    host: 0.0.0.0
    port: 50051
  advertise:
    host: 10.0.0.5 # routable address for clients/GetServerInfo
  max_workers: 20
worker_policy:
  heartbeat_timeout: 30s
  cleanup_interval: 60s
  default_heartbeat_interval: 5s
memory_tiers:
  snapshot_retention: 10m
  snapshot_max_rows: 200
  publish_interval: 5s
meta:
  cluster_token: <optional-cluster-id> # propagated via HealthCheck
If server.advertise.host is set, it must be routable (non-loopback/unspecified) or Global Store startup fails. When unset, the server auto-detects a routable IPv4 address and logs it.
Store Daemon¶
high_availability:
  enabled: true
  global_store_endpoints:
    - host: 10.0.0.5 # first entry is used
      port: 50051
  heartbeat_interval: 5s # defaults to 5s if omitted
  periodic_sync_interval: 10s # drives chunk_sync_loop; 0 disables
  heartbeat_rpc_timeout: 2s # optional per-RPC overrides
  state_sync_rpc_timeout: 5s
server:
  listen:
    host: 0.0.0.0
    port: 50052
  advertise:
    host: 10.0.0.20 # must be routable (non-loopback)
  p2p_listen:
    host: 0.0.0.0
    port: 65090 # required for HA registration
Set meta.cluster_token in the daemon config to guard against cross-cluster connections: the daemon’s HealthCheck will fail fast when the Global Store advertises a different token.
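The HA-related startup checks on the daemon config can be sketched against an already-parsed dictionary. The function validate_ha_config is hypothetical and operates on plain Python dicts rather than the real proto-backed config; it covers only the rules stated in this guide (first endpoint wins, non-zero P2P port).

```python
def validate_ha_config(cfg: dict):
    """Return 'host:port' of the Global Store to use, or None if HA is disabled.

    Raises ValueError when HA is enabled but the config cannot support it.
    """
    ha = cfg.get("high_availability") or {}
    if not ha.get("enabled"):
        return None
    endpoints = ha.get("global_store_endpoints") or []
    if not endpoints:
        raise ValueError("HA enabled but no global_store_endpoints configured")
    p2p_port = ((cfg.get("server") or {}).get("p2p_listen") or {}).get("port", 0)
    if not p2p_port:
        # HA registration needs a non-zero P2P port; startup aborts otherwise.
        raise ValueError("server.p2p_listen.port must be non-zero for HA")
    first = endpoints[0]  # only the first endpoint is used for now
    return f"{first['host']}:{first['port']}"
```

Applied to the example daemon config above, this would select 10.0.0.5:50051 and ignore any further endpoints.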
Observability¶
- Global Store Prometheus metrics (see tensorcast/global_store/metrics.py):
  - tc_state_sync_total{result=success|error}, tc_state_sync_seconds
  - tc_active_workers, tc_replicas_total, tc_replicas_per_memtype
  - gRPC interceptors export tc_grpc_server_handled_total and latency histograms
- Daemon: OTEL spans around all RPCs plus HA counters exported through OTEL (tc_daemon_ha_heartbeat_success_total, tc_daemon_ha_heartbeat_failure_total, tc_daemon_ha_sync_success_total, tc_daemon_ha_sync_failure_total).
- Health check: tensorcast.global_store.v1.ClusterAdminService/HealthCheck returns cluster_token plus status; GetServerInfo returns bind (listen_*) and routable (advertise_*) endpoints, metrics_port, and version.
Troubleshooting¶
- Registration rejected: ensure server.advertise.host (or listen host) is routable and server.p2p_listen.port is non-zero.
- Heartbeat OK but frequent sync requests: verify checksum computation on daemon; check for rapidly changing inventories or missing replicas in local_replicas.
- Unexpected removals: ensure daemon includes full local_replicas; server suppresses removals if inventory is empty.
- RPC failures: inspect OTEL traces and Global Store metrics; adjust network and retry settings if the first endpoint is unreachable.
References¶
- docs/architecture/architecture-overview.md
- docs/architecture/p2p-transfer-strategies.md
- docs/deployment/global-store-deployment.md