# P2P Transfer Strategies and Load Balancing
This document explains how P2P transfers work in Global Store mode. It is code-derived and focuses on control flow, data flow, memory and VRAM movement, and the thread model that drives transfers.
Related docs:
- docs/architecture/artifact-views-and-retrieval.md
- docs/architecture/view-replicas-and-assembly.md
## Scope and terminology
- P2P transfers happen directly between Store Daemons; Global Store only coordinates them.
- Control plane: gRPC to Global Store and key mapping.
- Data plane: today, `communicator::engine::Communicator` over RDMA or MTCP (multi-TCP). Phase 2 adds a routing wrapper (`communicator::routing::RoutingContext`) that will sit above the engine once the P2P path is integrated.
- Remote access uses memory keys registered for coalesced chunk ranges.
- Topology modeling (`core/communicator/topology`) now exists to describe Pool/Endpoint/Link reachability (including switch endpoints) without hardware discovery; routing integration is a follow-up.
## End-to-end lifecycle (control + data)
```mermaid
sequenceDiagram
    participant Source as Source Store Daemon
    participant Target as Target Store Daemon
    participant GS as Global Store
    participant Comm as Communicator
    Note over Source: registration or publish path
    Source->>Comm: register_tensor_ex (GPU or CPU chunk ranges)
    Source->>GS: RegisterReplica(memory) with remote_memory_keys
    Note over Target: materialize AUTO
    Target->>GS: RequestReplicaTransport(artifact_id, target_device)
    GS-->>Target: TransportSession(remote_memory_info)
    Target->>Comm: read_tensor(key, addr, bytes, ...)
    Comm-->>Target: READ_RESPONSE_EX (RDMA/MTCP segments)
    Comm-->>Target: data via RDMA or MTCP
    Target->>GS: CompleteReplicaTransport(transport_id)
```
## Source-side memory export and registration
- `RegistrationBackend::commit` optionally calls `Replica::enable_remote_memory_access`, which uses `MemoryExportRegistry::export_chunks` to coalesce chunk ranges and register them with the communicator (`core/store/replica/memory_export_registry.cc`).
- CPU exports register tensors without an MR (`register_mr=false`) and hold a UMA keepalive plus stable leases; GPU exports register an MR when RDMA is enabled and set `direct_rdma_enabled` when staging is not required.
- When `enable_p2p` is true, registration publishes a memory replica via `GlobalStoreClient::register_memory_replica`, including `remote_memory_keys`, `buffer_sizes`, and optional `verification_json` (`core/store/runtime/metadata/registration_backend.cc`, `core/store/runtime/metadata/metadata_gateway.cc`).
- `WorkerLifecycleManager` toggles local export on availability changes via `enable_remote_replica_access` and `disable_remote_replica_access`, and includes existing `remote_memory_keys`/`buffer_sizes` in HA state sync when available; it does not mint new keys on its own (`daemon/ha/worker_lifecycle_manager.cc`).
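Coalescing before registration keeps the number of memory keys small. The sketch below is a minimal Python illustration of that idea, assuming chunks are described as hypothetical `(offset, length)` byte ranges within one buffer and that only touching or overlapping ranges merge; the real `export_chunks` rules (for example, allocation or device boundaries) are not modeled.

```python
from typing import List, Tuple

Range = Tuple[int, int]  # hypothetical (offset, length) in bytes


def coalesce_ranges(chunks: List[Range]) -> List[Range]:
    """Merge overlapping or adjacent byte ranges so each merged range can be
    registered once (one memory key per coalesced range, by assumption)."""
    merged: List[Range] = []
    for offset, length in sorted(chunks):
        if length <= 0:
            continue  # skip empty chunks
        end = offset + length
        if merged and offset <= merged[-1][0] + merged[-1][1]:
            # Touches or overlaps the previous range: extend it.
            prev_off, prev_len = merged[-1]
            merged[-1] = (prev_off, max(prev_off + prev_len, end) - prev_off)
        else:
            merged.append((offset, length))
    return merged


# Two contiguous chunks collapse into one range; the distant one stays separate.
print(coalesce_ranges([(4096, 4096), (0, 4096), (16384, 4096)]))  # [(0, 8192), (16384, 4096)]
```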
## Transport request and replica selection (Global Store)
Global Store selects a source replica with an atomic claim in `ReplicaRepository.find_available_for_transport`:
```sql
WITH candidate AS (
    SELECT r.replica_id
    FROM artifact_replicas r
    LEFT JOIN replica_counters rc ON rc.replica_id = r.replica_id
    LEFT JOIN workers w ON r.worker_id = w.worker_id
    WHERE r.artifact_id = ?
      AND COALESCE(rc.current_requests, 0) < r.max_concurrency
      AND r.is_available = TRUE
      AND w.accepting_new_requests = TRUE
      AND w.inactive_at IS NULL
      AND EXTRACT(epoch FROM w.last_heartbeat) > ?
    ORDER BY
        CASE
            WHEN r.memory_type = 'GPU' THEN 0
            WHEN r.memory_type = 'RAM' THEN 1
            WHEN r.memory_type = 'DISK' THEN 2
            ELSE 3
        END,
        r.max_concurrency ASC,
        (COALESCE(rc.current_requests, 0) * 1.0 / GREATEST(r.max_concurrency, 1)),
        r.updated_at ASC
    LIMIT 1
)
UPDATE replica_counters
SET current_requests = current_requests + 1,
    last_assigned_at = now()
WHERE replica_id = (SELECT replica_id FROM candidate)
RETURNING replica_id
```
Notes:
- Selection prefers GPU over RAM over DISK, then a smaller `max_concurrency`, then a lower load ratio (`current_requests / max_concurrency`), then the oldest `updated_at`.
- The increment is atomic; if no replica qualifies, `TransportService.request_transport` retries until its timeout.
- `request_view_transport` currently falls back to canonical routing in `GlobalStoreClient` (view-aware routing is not implemented yet).
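To reason about which replica wins, the ordering can be reproduced in memory. This Python sketch mirrors the capacity filter and the `ORDER BY` keys of the query; the `Replica` record and the collapsed `worker_ok` flag (standing in for the worker availability and heartbeat predicates) are hypothetical, and the database-side atomic increment is not modeled.

```python
from dataclasses import dataclass
from typing import List, Optional

# Tier order from the CASE expression; unknown memory types sort last (3).
MEMORY_TIER = {"GPU": 0, "RAM": 1, "DISK": 2}


@dataclass
class Replica:  # hypothetical in-memory view of one artifact_replicas row
    replica_id: str
    memory_type: str
    max_concurrency: int
    current_requests: int
    updated_at: float
    is_available: bool = True
    worker_ok: bool = True  # stands in for the worker/heartbeat predicates


def pick_replica(replicas: List[Replica]) -> Optional[Replica]:
    candidates = [
        r for r in replicas
        if r.is_available and r.worker_ok and r.current_requests < r.max_concurrency
    ]
    if not candidates:
        return None  # the caller retries until its timeout
    return min(
        candidates,
        key=lambda r: (
            MEMORY_TIER.get(r.memory_type, 3),
            r.max_concurrency,
            r.current_requests / max(r.max_concurrency, 1),
            r.updated_at,
        ),
    )


replicas = [
    Replica("ram-1", "RAM", 4, 0, 10.0),
    Replica("gpu-busy", "GPU", 2, 2, 5.0),  # at capacity, filtered out
    Replica("gpu-a", "GPU", 8, 1, 20.0),
    Replica("gpu-b", "GPU", 4, 3, 30.0),
]
print(pick_replica(replicas).replica_id)  # gpu-b
```

Because `max_concurrency` is compared before the load ratio, `gpu-b` (3 of 4 slots busy) wins over the idler `gpu-a` (1 of 8).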
## Target-side orchestration and pipeline
### Materialization control flow
- `MaterializationService` tries, in order: reuse an existing replica, a local CPU-to-GPU copy, then P2P in AUTO mode (`core/store/runtime/ingestion/materialization_service.cc`).
- `MaterializeOrchestrator::run` requests a transport, rejects stale local routes, builds a `P2PSource`, and falls back to disk when the daemon resolves a disk source binding (managed shared-disk or local import) (`core/store/materialization/control/materialize_orchestrator.cc`).
- The orchestrator always calls `complete_replica_transport` on success or failure; once a disk source is resolved, disk fallback does not require additional Global Store calls.
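A minimal Python sketch of this control flow, assuming each strategy is a hypothetical callable that returns a handle or `None`, and that P2P failures raise a hypothetical `TransferError`. The `try`/`finally` carries the key guarantee: `complete_transport` runs whether the P2P load succeeds, falls back to disk, or fails outright.

```python
from typing import Callable, List, Optional


class TransferError(Exception):
    """Hypothetical error raised when a P2P load fails."""


def run_orchestrator(
    request_transport: Callable[[], str],
    load_p2p: Callable[[str], str],
    complete_transport: Callable[[str], None],
    load_disk: Optional[Callable[[], str]] = None,
) -> str:
    transport_id = request_transport()  # claims a source slot in Global Store
    try:
        try:
            return load_p2p(transport_id)
        except TransferError:
            if load_disk is None:
                raise  # no daemon-resolved disk source: surface the failure
            return load_disk()  # disk fallback needs no further Global Store calls
    finally:
        complete_transport(transport_id)  # runs on success, fallback, and failure


def materialize_auto(steps: List[Callable[[], Optional[str]]]) -> Optional[str]:
    """Try strategies in order (reuse, local CPU->GPU copy, P2P); first hit wins."""
    for step in steps:
        handle = step()
        if handle is not None:
            return handle
    return None


completed: List[str] = []


def failing_p2p(transport_id: str) -> str:
    raise TransferError(f"remote read failed for {transport_id}")


handle = run_orchestrator(lambda: "t-1", failing_p2p, completed.append, lambda: "disk-replica")
print(handle, completed)  # disk-replica ['t-1']
print(materialize_auto([lambda: None, lambda: None, lambda: "p2p-replica"]))  # p2p-replica
```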
### Entry points used by the daemon
- `ResolveKeyMapping` remains the control-path lookup for `key -> artifact_id` when callers start from keys.
- `MaterializeReplica` consumes `ArtifactSelection` and calls `materialize_replica` with AUTO or LOAD_ONLY depending on preference and inputs.
### Ingestion pipeline stages
`IngestionPipeline::ingest_from_p2p` drives the runtime pipeline:
- `P2PSourceAdapter::prepare` attaches the shared communicator engine and sets `fallback_disk_dir` unless the preference is `kPreferP2P`.
- `MetadataStage` plans view transforms if needed; for view loads, it fetches the canonical index from Global Store.
- `AllocationStage` creates the replica and blocks on `ReplicaLoadController::load_async_from_source`. GPU loads retry after eviction on `ResourceExhausted`.
- `VerificationStage` validates `verification_json` key points and computes optional view hashes.
- `HandleStage` builds the `ReplicaHandle`.
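`AllocationStage` is the stage with a retry loop. A minimal sketch of that loop, assuming hypothetical `load` and `evict` callables and a bounded attempt count; the real retry limit and eviction policy are not described in this document.

```python
from typing import Callable


class ResourceExhausted(Exception):
    """Hypothetical stand-in for a ResourceExhausted status from the GPU allocator."""


def load_with_eviction_retry(
    load: Callable[[], str],
    evict: Callable[[], bool],
    max_attempts: int = 3,
) -> str:
    """Block on a load; on ResourceExhausted, evict and try again.

    `evict` returns False when nothing could be freed, which stops retrying early.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return load()
        except ResourceExhausted:
            if attempt == max_attempts or not evict():
                raise
    raise AssertionError("unreachable: the loop always returns or raises")


attempts = {"n": 0}


def flaky_load() -> str:
    attempts["n"] += 1
    if attempts["n"] < 2:
        raise ResourceExhausted("GPU pool full")
    return "gpu-replica"


print(load_with_eviction_retry(flaky_load, evict=lambda: True), attempts["n"])  # gpu-replica 2
```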
### Publish and registration after load
- `IngestionRuntime` calls `ingest_from_p2p` with `publish_to_global_store=true`; `MaterializationFacade` mints a `publish_context_id` and emits started and completed events.
- `MetadataGateway` consumes completion events and calls `register_replica` (presence only).
- `MaterializeOrchestrator` also calls `register_replica_with_global_store` after successful P2P or disk loads; the cached `publish_context_id` dedupes duplicate publishes.
- Memory-replica publication with remote keys is done only by the registration backend (see source-side registration above).
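Two paths can publish the same load: the `MetadataGateway` event handler and `MaterializeOrchestrator`. A minimal sketch of deduplicating them on `publish_context_id`, assuming a hypothetical in-process set of seen ids; where the real cache lives and how long ids are retained are not covered here.

```python
import threading
from typing import Callable, Set


class PublishDeduper:
    """Hypothetical guard: the first publish per publish_context_id wins."""

    def __init__(self, register: Callable[[str], None]) -> None:
        self._register = register
        self._seen: Set[str] = set()
        self._lock = threading.Lock()

    def publish(self, publish_context_id: str, artifact_id: str) -> bool:
        with self._lock:
            if publish_context_id in self._seen:
                return False  # duplicate from the second publish path
            # Marked before registering, so a failed register is not retried here.
            self._seen.add(publish_context_id)
        self._register(artifact_id)
        return True


calls = []
deduper = PublishDeduper(calls.append)
print(deduper.publish("ctx-1", "artifact-a"))  # True  (event path)
print(deduper.publish("ctx-1", "artifact-a"))  # False (orchestrator repeat)
print(calls)  # ['artifact-a']
```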
## Data plane: loaders and transfer pipeline
### P2PLoader and sources
- `P2PLoader` requires `memory_keys`, `buf_sizes`, `size_bytes`, and a communicator engine. It builds a `RemoteKeySource`, which maps global offsets to per-key offsets (`core/store/materialization/dataplane/loaders/p2p_loader.cc`, `core/store/materialization/dataplane/sources/remote_key_source.cc`).
- `RemoteKeySource::read_at` issues `Communicator::read_tensor` calls and blocks on the returned future; this is why the pipeline runs on the blocking executor.
- The routing wrapper (`core/communicator/routing`) is not yet wired into `P2PLoader`; Phase 4 will route `read_tensor` through the wrapper to select direct links/NVLink where applicable.
- If `fallback_disk_dir` is set, `P2PLoader` wraps the remote source in `MuxSeekableSource`, which falls back on short reads or remote errors (`core/store/materialization/dataplane/sources/mux_seekable_source.cc`).
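The global-to-per-key offset translation amounts to a prefix-sum lookup. A minimal Python sketch, assuming (hypothetically) that the buffers behind `memory_keys` are concatenated in key order with lengths `buf_sizes`; under that assumption a read that crosses a buffer boundary needs one remote read per piece. `RemoteKeyMap` is an illustrative name, not the C++ class.

```python
import bisect
from itertools import accumulate
from typing import List, Tuple


class RemoteKeyMap:
    """Map a global artifact offset onto (key index, offset within that key)."""

    def __init__(self, buf_sizes: List[int]) -> None:
        self.sizes = list(buf_sizes)
        # starts[i] is the global offset where key i begins.
        self.starts = list(accumulate([0] + self.sizes))[:-1]
        self.total = sum(self.sizes)

    def locate(self, global_offset: int) -> Tuple[int, int]:
        if not 0 <= global_offset < self.total:
            raise ValueError("offset out of range")
        # bisect_right skips zero-length keys that share a start offset.
        key = bisect.bisect_right(self.starts, global_offset) - 1
        return key, global_offset - self.starts[key]

    def split(self, global_offset: int, length: int) -> List[Tuple[int, int, int]]:
        """Split a read into per-key (key, key_offset, nbytes) pieces."""
        if global_offset < 0 or length < 0 or global_offset + length > self.total:
            raise ValueError("read outside artifact")
        pieces = []
        while length > 0:
            key, key_off = self.locate(global_offset)
            take = min(length, self.sizes[key] - key_off)
            pieces.append((key, key_off, take))
            global_offset += take
            length -= take
        return pieces


m = RemoteKeyMap([4096, 8192, 4096])
print(m.locate(5000))       # (1, 904)
print(m.split(4000, 5000))  # [(0, 4000, 96), (1, 0, 4904)]
```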
### TransferService and pump_ranges
- `ReplicaLoadController::load_async_from_source` delegates to `TransferService::execute` on the blocking executor and uses a per-GPU gate to serialize GPU sessions (`core/store/replica/replica_load_controller.cc`).
- `TransferService` creates a per-session `StreamingPinnedBuffer` backed by the shared `PinnedBufferPool`. The pool slice size must divide `artifact_chunk_bytes` to avoid cross-chunk slices.
- `pump_ranges` drives a producer/consumer pipeline:
  - Producers run on the blocking executor and call `SeekableSource::read_at` into pinned slots.
  - The consumer writes to a `PositionedSink`. For GPU sinks, `AsyncPositionedSink::write_at_async` schedules H2D copies via `AsyncCopyManager` and returns slots only after completion.
  - Direct-write fast path: when the source supports direct write (RDMA enabled) and the sink implements `DirectWriteCapable` (CPU sink), `pump_ranges` requests `DirectWriteGrant` windows from UMA and calls `RemoteKeySource::read_into` to RDMA-read directly into CPU VA ranges. Failures fall back to staged reads.
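The producer/consumer shape of `pump_ranges` can be sketched in Python with a bounded pool of slot tokens. Assumptions: `read_at` and `write_at` are hypothetical callables standing in for `SeekableSource::read_at` and `PositionedSink`; slot tokens stand in for pinned buffers (the sketch passes `bytes` objects instead of filling a slot); the direct-write fast path and async H2D completion are not modeled. It illustrates the rule that a slot returns to the pool only after the consumer's write finishes, which caps in-flight reads at `num_slots`.

```python
import queue
import threading
from typing import Callable, List, Optional, Tuple


def check_slice_size(slice_bytes: int, artifact_chunk_bytes: int) -> None:
    """A slot must never straddle two chunks, so the slice size has to divide the chunk size."""
    if slice_bytes <= 0 or artifact_chunk_bytes % slice_bytes != 0:
        raise ValueError("slice size must evenly divide artifact_chunk_bytes")


def pump_ranges(
    ranges: List[Tuple[int, int]],
    read_at: Callable[[int, int], bytes],
    write_at: Callable[[int, bytes], None],
    num_slots: int = 4,
    num_producers: int = 2,
) -> None:
    if num_slots < 1 or num_producers < 1:
        raise ValueError("need at least one slot and one producer")
    free_slots: queue.Queue = queue.Queue()  # tokens standing in for pinned slots
    for slot in range(num_slots):
        free_slots.put(slot)
    work: queue.Queue = queue.Queue()
    for r in ranges:
        work.put(r)
    filled: queue.Queue = queue.Queue()  # (offset, data, slot, error)

    def producer() -> None:
        while True:
            try:
                offset, length = work.get_nowait()
            except queue.Empty:
                return
            slot = free_slots.get()  # blocks while every slot is in flight
            try:
                filled.put((offset, read_at(offset, length), slot, None))
            except Exception as exc:
                free_slots.put(slot)
                filled.put((offset, b"", None, exc))

    threads = [threading.Thread(target=producer) for _ in range(num_producers)]
    for t in threads:
        t.start()

    first_error: Optional[BaseException] = None
    for _ in range(len(ranges)):  # the consumer runs on the caller thread
        offset, data, slot, error = filled.get()
        if error is not None:
            if first_error is None:
                first_error = error
            continue
        try:
            write_at(offset, data)
        except Exception as exc:
            if first_error is None:
                first_error = exc
        finally:
            free_slots.put(slot)  # the slot returns only after the write finishes
    for t in threads:
        t.join()
    if first_error is not None:
        raise first_error


src = bytes(range(256)) * 64  # 16 KiB source
dst = bytearray(len(src))
ranges = [(off, 4096) for off in range(0, len(src), 4096)]


def write_at(off: int, data: bytes) -> None:
    dst[off:off + len(data)] = data


pump_ranges(ranges, lambda off, n: src[off:off + n], write_at)
print(dst == src)  # True
```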
### CPU vs GPU sinks
- CPU targets use `CpuVaSink` to write into UMA CPU VA and expose `plan_direct_write`.
- GPU targets use `GpuMemorySink` with per-GPU in-flight limits (bytes and copy counts) and H2D submission via `AsyncCopyManager`. The sink does not implement direct write.
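The per-GPU in-flight limits can be pictured as a two-dimensional admission gate. A minimal sketch, assuming hypothetical `max_bytes`/`max_copies` values and a rule (also an assumption) that an oversized copy may proceed when nothing else is in flight, so a single large range cannot stall forever. `InflightLimiter` is an illustrative name.

```python
import threading
from typing import Optional


class InflightLimiter:
    """Hypothetical per-GPU gate on in-flight bytes and in-flight copy count."""

    def __init__(self, max_bytes: int, max_copies: int) -> None:
        self.max_bytes = max_bytes
        self.max_copies = max_copies
        self.bytes = 0
        self.copies = 0
        self._cv = threading.Condition()

    def _fits(self, nbytes: int) -> bool:
        if self.copies >= self.max_copies:
            return False
        # Let an oversized copy through only when nothing else is in flight.
        return self.copies == 0 or self.bytes + nbytes <= self.max_bytes

    def acquire(self, nbytes: int, timeout: Optional[float] = None) -> bool:
        with self._cv:
            if not self._cv.wait_for(lambda: self._fits(nbytes), timeout):
                return False
            self.bytes += nbytes
            self.copies += 1
            return True

    def release(self, nbytes: int) -> None:
        """Called when an H2D copy completes."""
        with self._cv:
            self.bytes -= nbytes
            self.copies -= 1
            self._cv.notify_all()


gate = InflightLimiter(max_bytes=8 << 20, max_copies=2)
print(gate.acquire(6 << 20))                # True
print(gate.acquire(4 << 20, timeout=0.01))  # False: would exceed 8 MiB
print(gate.acquire(1 << 20))                # True
```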
## Network transport internals
### RDMA (pull model)
- Control channel: TCP `ENGINE_OP_READ_REQUEST` and `ENGINE_OP_READ_RESPONSE_EX`.
- The server stages or exposes memory and replies with RDMA segments (addr, rkey, bytes, window_seq).
- The client posts RDMA READs via `RdmaTransport::read_multi`. On completion it sends window ACKs (`ENGINE_OP_RDMA_READ_DONE_EX`) so the server can release staging credit.
- Zero-copy RDMA: when GPU memory was registered with `direct_rdma_enabled`, the server uses the original MR as the stage window (no staging buffer).
- Staged RDMA uses a `MemoryStager`:
  - `HostPinnedGpuStager` performs D2H copies into pinned buffers.
  - `GpuVramRdmaStager` performs D2D copies into a per-GPU VRAM pool when `rdma.staging_backend=GPU_VRAM`.
  - `HostPinnedCpuStager` copies into pinned host buffers.
### MTCP (push model)
- MTCP is used when `enable_rdma=false` or when a request explicitly selects MTCP.
- The server sends `READ_RESPONSE_EX(transport=MTCP)` and enqueues a staging task on `mtcp_staging_thread_`.
- `MTcpTransport` manages multiple sockets (`tcp_conn_count`) plus a staged-send thread. Stage windows are sliced across lanes, and each segment completion releases staging credit.
- GPU receives: MTCP reads into pinned staging buffers (`StreamingPinnedBuffer`) and schedules H2D copies via `AsyncCopyManager`. Each sub-chunk returns its slot on copy completion.
- CPU receives: MTCP reads directly into the CPU destination buffer, with no extra copy.
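With `tcp_conn_count` sockets, one stage window is spread across several lanes so they can send in parallel. A minimal sketch of one possible slicing policy, assuming fixed-size segments dealt round-robin; the segment size and assignment order are hypothetical, not taken from `MTcpTransport`.

```python
from typing import Dict, List, Tuple

Segment = Tuple[int, int]  # (offset, length)


def slice_across_lanes(
    window_offset: int, window_len: int, lanes: int, segment_bytes: int
) -> Dict[int, List[Segment]]:
    """Cut a stage window into fixed-size segments and deal them round-robin
    across `lanes` sockets; each completed segment would return its credit."""
    if lanes <= 0 or segment_bytes <= 0:
        raise ValueError("lanes and segment_bytes must be positive")
    plan: Dict[int, List[Segment]] = {lane: [] for lane in range(lanes)}
    offset, end, i = window_offset, window_offset + window_len, 0
    while offset < end:
        size = min(segment_bytes, end - offset)
        plan[i % lanes].append((offset, size))
        offset += size
        i += 1
    return plan


print(slice_across_lanes(0, 10, lanes=2, segment_bytes=4))  # {0: [(0, 4), (8, 2)], 1: [(4, 4)]}
```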
### Staging flow control and credit
- `FlowCreditLedger` is per-channel and sized by `stager.buffers_per_flow`. It grants window credit for both RDMA and MTCP.
- `StagingWindow` slices each request into credit-bounded windows; `stager.max_window_segments` caps window size.
- `StageLeaseRegistry` tracks active staged segments. RDMA ACKs, MTCP send completions, and the GC reaper reclaim leases and return credit.
- `Communicator` enforces a GPU MTCP channel limit based on `stager.expected_gpu_channels` and pinned pool capacity to prevent unbounded staging.
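The credit loop can be sketched in Python as a counting gate plus a window slicer. Assumptions: one credit per staged segment, each window acquires all of its credit up front, and `max_window_segments` does not exceed `buffers_per_flow` (otherwise a window could never be granted). The names mirror the prose but are hypothetical stand-ins, not the C++ API. A `False` from `acquire` corresponds to the MTCP staging path failing with `ResourceExhausted` once `staging_wait_timeout` expires.

```python
import threading
from typing import List, Optional, Tuple


class FlowCreditLedger:
    """Hypothetical per-channel credit pool sized like stager.buffers_per_flow."""

    def __init__(self, buffers_per_flow: int) -> None:
        self._available = buffers_per_flow
        self._cv = threading.Condition()

    def acquire(self, credits: int, timeout: Optional[float]) -> bool:
        """Wait up to `timeout` seconds for credit; False means the deadline passed."""
        with self._cv:
            granted = self._cv.wait_for(lambda: self._available >= credits, timeout)
            if granted:
                self._available -= credits
            return granted

    def release(self, credits: int) -> None:
        """Called on an RDMA ACK, an MTCP send completion, or a GC reap."""
        with self._cv:
            self._available += credits
            self._cv.notify_all()


def slice_into_windows(num_segments: int, max_window_segments: int) -> List[Tuple[int, int]]:
    """Split a request's segments into (first_segment, count) windows."""
    return [
        (start, min(max_window_segments, num_segments - start))
        for start in range(0, num_segments, max_window_segments)
    ]


ledger = FlowCreditLedger(buffers_per_flow=4)
print(slice_into_windows(num_segments=10, max_window_segments=4))  # [(0, 4), (4, 4), (8, 2)]
print(ledger.acquire(4, timeout=0.01))  # True: first window granted
print(ledger.acquire(4, timeout=0.01))  # False: no credit until an ACK
ledger.release(4)                       # ACK for the first window
print(ledger.acquire(4, timeout=0.01))  # True
```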
## Replica memory state flow (target)
```mermaid
stateDiagram-v2
    [*] --> UNALLOCATED
    UNALLOCATED --> ALLOCATED : allocate_memory
    ALLOCATED --> LOADING : load_async_from_source / copy_data_async
    LOADING --> LOADED : commit + set_state
    LOADING --> FAILED : load error
    FAILED --> UNALLOCATED : release_memory + reset
```
Notes:
- P2P loads call `ensure_loaded_async` and block on the load future in `AllocationStage`.
- GPU loads may retry after eviction when `ResourceExhausted` is returned.
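The state diagram doubles as a transition table. A minimal Python sketch that rejects edges the diagram does not show; treating unlisted edges as illegal, and giving `LOADED` no outgoing edges, are assumptions of this sketch, since eviction and release paths from `LOADED` are outside the diagram.

```python
from enum import Enum, auto


class ReplicaState(Enum):
    UNALLOCATED = auto()
    ALLOCATED = auto()
    LOADING = auto()
    LOADED = auto()
    FAILED = auto()


# Allowed edges, taken from the state diagram.
TRANSITIONS = {
    ReplicaState.UNALLOCATED: {ReplicaState.ALLOCATED},
    ReplicaState.ALLOCATED: {ReplicaState.LOADING},
    ReplicaState.LOADING: {ReplicaState.LOADED, ReplicaState.FAILED},
    ReplicaState.FAILED: {ReplicaState.UNALLOCATED},
    ReplicaState.LOADED: set(),  # exits from LOADED are not shown in the diagram
}


def transition(current: ReplicaState, target: ReplicaState) -> ReplicaState:
    if target not in TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current.name} -> {target.name}")
    return target


# A failed load followed by a reset and a fresh allocation.
s = ReplicaState.UNALLOCATED
for nxt in (ReplicaState.ALLOCATED, ReplicaState.LOADING, ReplicaState.FAILED,
            ReplicaState.UNALLOCATED, ReplicaState.ALLOCATED):
    s = transition(s, nxt)
print(s.name)  # ALLOCATED
```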
## Thread model (hot paths)
- `Communicator` threads: request loop (`do_read_request_loop`), channel GC (`do_channel_gc_loop`), handshake retry, and the MTCP staging loop.
- RDMA IO: per-device `RdmaThread` with send, poll, and recv threads.
- MTCP IO: `MTcpTransport` has server/client loops plus send, recv, and staged-send threads. Each connection uses `MTcpTransportTask` send/recv threads.
- P2P ingestion: `pump_ranges` producers run on the blocking executor; the consumer runs on the caller thread. GPU sessions are serialized by `GpuSchedHandle`.
## Failure handling and fallbacks
- `MaterializeOrchestrator` always calls `complete_replica_transport`. On P2P failure, it falls back to disk when a daemon-resolved disk source is available.
- `P2PLoader` fallback is per-read: `MuxSeekableSource` completes the remaining bytes from disk on short reads or remote errors.
- RDMA handshake failures surface as transport errors; no automatic MTCP fallback is performed.
- MTCP staging waits for credit up to `staging_wait_timeout` and fails with `ResourceExhausted` when the deadline is exceeded.
- `do_channel_gc_loop` reaps stale `StageLease` entries when ACKs are missing (`ack_ttl_ms`).
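Per-read fallback means a single `read_at` can be served partly by the remote source and partly by disk. A minimal sketch, assuming hypothetical `remote_read`/`disk_read` callables over byte-identical copies and a hypothetical `RemoteReadError`; whether the real `MuxSeekableSource` keeps preferring disk for later reads after an error is not specified in this document.

```python
from typing import Callable


class RemoteReadError(Exception):
    """Hypothetical error raised by the remote source."""


def mux_read_at(
    remote_read: Callable[[int, int], bytes],
    disk_read: Callable[[int, int], bytes],
    offset: int,
    length: int,
) -> bytes:
    """Read from the remote source, completing any shortfall from disk."""
    try:
        data = remote_read(offset, length)
    except RemoteReadError:
        data = b""  # remote error: the whole range comes from disk
    if len(data) < length:
        # Short read: fetch only the missing tail from the disk copy.
        data += disk_read(offset + len(data), length - len(data))
    return data


blob = bytes(range(100))
remote = lambda off, n: blob[off:off + min(n, 30)]  # returns at most 30 bytes
disk = lambda off, n: blob[off:off + n]
print(mux_read_at(remote, disk, 10, 50) == blob[10:60])  # True
```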
## View-aware routing and verification
- `request_view_transport` is invoked when `view_id` is present. The client falls back to canonical routing when the server is view-unaware.
- `MetadataStage` fetches the canonical index from Global Store when a view needs planning.
- `VerificationStage` validates P2P transfers using `verification_json` (key-point verification) and computes optional view hashes.
## View registration telemetry
- View registrations are handled by `RegistrationController` and `RegistrationBackend`, which build a bidirectional view plan and stream view writes into the target replica (`core/store/runtime/metadata/registration_backend.cc`).
- On commit, the backend computes the view hash and optional leaf digests (when a canonical `ByteRangeMap` is available) and publishes a `ViewStateUpdate` via `RegistrationPublisher::update_view_state`.
- `GlobalStoreRegistrationPublisher` calls `GlobalStoreClient::update_artifact_view_state` to persist view metadata; failures are logged and treated as best-effort when the server does not support the RPC.
- For view materialization via P2P, `MetadataGateway::register_replica` also calls `record_view_residency` (currently `Unimplemented` in the client), so view residency updates are best-effort until the server RPC lands.
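Both telemetry calls tolerate an unsupported RPC without failing the load. A minimal sketch of that best-effort pattern, assuming a hypothetical `UnimplementedError` in place of the client's `Unimplemented` status; letting other errors propagate is this sketch's choice, since only the unsupported-RPC case is described here.

```python
import logging
from typing import Callable

log = logging.getLogger("view_state")


class UnimplementedError(Exception):
    """Hypothetical stand-in for an UNIMPLEMENTED RPC status."""


def best_effort(rpc_name: str, call: Callable[[], None]) -> bool:
    """Run an optional RPC; an unsupported RPC is logged and skipped.

    Any other exception propagates to the caller (an assumption of this sketch).
    """
    try:
        call()
        return True
    except UnimplementedError:
        log.warning("%s unsupported; continuing without it", rpc_name)
        return False


def record_view_residency() -> None:
    raise UnimplementedError("record_view_residency")


print(best_effort("record_view_residency", record_view_residency))  # False
print(best_effort("update_artifact_view_state", lambda: None))      # True
```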
## Observability
- Global Store metrics: `inc_transport_request`, `observe_transport_wait`, `inc_active_transports`, `dec_active_transports`.
- Store runtime metrics: `tc_p2p_bytes_total`, `tc_tx_duration_ms`, `tc_tx_inflight_copies_gauge`, `tc_tx_inflight_bytes_gauge`, `tc_tx_direct_window_*`.
- P2P staging logs: `[staging_credit]` lines identify window grants, outstanding credit, and ACK release paths.
## Benchmark quickstart
- Exercise the RDMA window flow with logs enabled:
- Stress the MTCP path and observe staged completions:
- Validate unified flow control across transports: