diff --git a/crates/core/src/message.rs b/crates/core/src/message.rs index 8c92b58e0..b39df0ba9 100644 --- a/crates/core/src/message.rs +++ b/crates/core/src/message.rs @@ -10,7 +10,7 @@ use std::{ use crate::{ client_events::{ClientId, HostResult}, - node::PeerId, + node::{proximity_cache::ProximityCacheMessage, PeerId}, operations::{ connect::ConnectMsg, get::GetMsg, put::PutMsg, subscribe::SubscribeMsg, update::UpdateMsg, }, @@ -255,6 +255,10 @@ pub(crate) enum NetMessageV1 { }, Update(UpdateMsg), Aborted(Transaction), + ProximityCache { + from: PeerId, + message: ProximityCacheMessage, + }, } trait Versioned { @@ -279,6 +283,7 @@ impl Versioned for NetMessageV1 { NetMessageV1::Unsubscribed { .. } => semver::Version::new(1, 0, 0), NetMessageV1::Update(_) => semver::Version::new(1, 0, 0), NetMessageV1::Aborted(_) => semver::Version::new(1, 0, 0), + NetMessageV1::ProximityCache { .. } => semver::Version::new(1, 0, 0), } } } @@ -339,6 +344,11 @@ pub(crate) enum NodeEvent { target: PeerId, msg: Box, }, + #[allow(dead_code)] // Reserved for future proximity cache broadcasting + BroadcastProximityCache { + from: PeerId, + message: crate::node::proximity_cache::ProximityCacheMessage, + }, } #[derive(Debug, Clone)] @@ -418,6 +428,12 @@ impl Display for NodeEvent { NodeEvent::SendMessage { target, msg } => { write!(f, "SendMessage (to {target}, tx: {})", msg.id()) } + NodeEvent::BroadcastProximityCache { from, message } => { + write!( + f, + "BroadcastProximityCache (from {from}, message: {message:?})" + ) + } } } } @@ -452,6 +468,7 @@ impl MessageStats for NetMessageV1 { NetMessageV1::Update(op) => op.id(), NetMessageV1::Aborted(tx) => tx, NetMessageV1::Unsubscribed { transaction, .. } => transaction, + NetMessageV1::ProximityCache { .. } => Transaction::NULL, } } @@ -464,6 +481,7 @@ impl MessageStats for NetMessageV1 { NetMessageV1::Update(op) => op.target().as_ref().map(|b| b.borrow().clone()), NetMessageV1::Aborted(_) => None, NetMessageV1::Unsubscribed { .. } => None, + NetMessageV1::ProximityCache { .. } => None, } } @@ -476,6 +494,7 @@ impl MessageStats for NetMessageV1 { NetMessageV1::Update(op) => op.requested_location(), NetMessageV1::Aborted(_) => None, NetMessageV1::Unsubscribed { .. } => None, + NetMessageV1::ProximityCache { .. } => None, } } } @@ -495,6 +514,9 @@ impl Display for NetMessage { Unsubscribed { key, from, .. } => { write!(f, "Unsubscribed {{ key: {key}, from: {from} }}")?; } + ProximityCache { from, message } => { + write!(f, "ProximityCache {{ from: {from}, message: {message:?} }}")?; + } }, }; write!(f, "}}") diff --git a/crates/core/src/node/mod.rs b/crates/core/src/node/mod.rs index f3a4b165a..90bdd582f 100644 --- a/crates/core/src/node/mod.rs +++ b/crates/core/src/node/mod.rs @@ -71,6 +71,7 @@ mod message_processor; mod network_bridge; mod op_state_manager; mod p2p_impl; +pub(crate) mod proximity_cache; mod request_router; pub(crate) mod testing_impl; @@ -826,6 +827,26 @@ async fn process_message_v1( op_manager.ring.remove_subscriber(key, from); break; } + NetMessageV1::ProximityCache { from, ref message } => { + tracing::debug!(?from, "Processing proximity cache message"); + + // Handle the proximity cache message + if let Some(response) = op_manager + .proximity_cache + .handle_message(from.clone(), message.clone()) + .await + { + // Send response directly back to the sender + let response_msg = NetMessage::V1(NetMessageV1::ProximityCache { + from: op_manager.ring.connection_manager.get_peer_key().unwrap(), + message: response, + }); + if let Err(err) = conn_manager.send(&from, response_msg).await { + tracing::error!(%err, ?from, "Failed to send proximity cache response"); + } + } + break; + } _ => break, // Exit the loop if no applicable message type is found } } @@ -1047,6 +1068,26 @@ where op_manager.ring.remove_subscriber(key, from); break; } + NetMessageV1::ProximityCache { from, ref message } => { + tracing::debug!(?from, "Processing proximity cache message (pure network)"); + + // Handle the proximity cache message + if let Some(response) = op_manager + .proximity_cache + .handle_message(from.clone(), message.clone()) + .await + { + // Send response directly back to the sender + let response_msg = NetMessage::V1(NetMessageV1::ProximityCache { + from: op_manager.ring.connection_manager.get_peer_key().unwrap(), + message: response, + }); + if let Err(err) = conn_manager.send(&from, response_msg).await { + tracing::error!(%err, ?from, "Failed to send proximity cache response"); + } + } + break; + } _ => break, // Exit the loop if no applicable message type is found } } diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index 4a39bfc43..ce80a1fd1 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -293,6 +293,39 @@ impl P2pConnManager { peer = %ctx.bridge.op_manager.ring.connection_manager.get_peer_key().unwrap(), "Received inbound message from peer - processing" ); + + // Handle ProximityCache messages directly before normal processing + if let crate::message::NetMessage::V1( + crate::message::NetMessageV1::ProximityCache { from, message }, + ) = &msg + { + tracing::info!(?from, ?message, "Processing ProximityCache message directly in InboundMessage handler"); + + // Handle the proximity cache message + if let Some(response) = op_manager + .proximity_cache + .handle_message(from.clone(), message.clone()) + .await + { + // Send response directly back to the sender + let response_msg = crate::message::NetMessage::V1( + crate::message::NetMessageV1::ProximityCache { + from: op_manager + .ring + .connection_manager + .get_peer_key() + .unwrap(), + message: response, + }, + ); + if let Err(err) = ctx.bridge.send(from, response_msg).await { + tracing::error!(%err, ?from, "Failed to send ProximityCache response"); + } + } + // ProximityCache processed, skip normal message handling + continue; + } + ctx.handle_inbound_message( msg, &outbound_message, @@ -301,17 +334,16 @@ impl P2pConnManager { ) .await?; } - ConnEvent::OutboundMessage(NetMessage::V1(NetMessageV1::Aborted(tx))) => { + ConnEvent::OutboundMessage { + target, + msg: NetMessage::V1(NetMessageV1::Aborted(tx)), + } => { // TODO: handle aborted transaction as internal message - tracing::error!(%tx, "Aborted transaction"); + tracing::error!(%tx, target_peer = %target, "Aborted transaction"); } - ConnEvent::OutboundMessage(msg) => { - let Some(target_peer) = msg.target() else { - let id = *msg.id(); - tracing::error!(%id, %msg, "Target peer not set, must be set for connection outbound message"); - ctx.bridge.op_manager.completed(id); - continue; - }; + ConnEvent::OutboundMessage { target, msg } => { + // target is the PeerId from the event - this is the authoritative target + // msg.target() may be None (for ProximityCache) or Some (for other messages) // Check if message targets self - if so, process locally instead of sending over network let self_peer_id = ctx @@ -321,11 +353,11 @@ impl P2pConnManager { .connection_manager .get_peer_key() .unwrap(); - if target_peer.peer == self_peer_id { + if target == self_peer_id { tracing::error!( tx = %msg.id(), msg_type = %msg, - target_peer = %target_peer, + target_peer = %target, self_peer = %self_peer_id, "BUG: OutboundMessage targets self! This indicates a routing logic error - messages should not reach OutboundMessage handler if they target self" ); @@ -343,17 +375,18 @@ impl P2pConnManager { tracing::info!( tx = %msg.id(), msg_type = %msg, - target_peer = %target_peer, + target_peer = %target, + msg_target = ?msg.target(), "Sending outbound message to peer" ); // IMPORTANT: Use a single get() call to avoid TOCTOU race // between contains_key() and get(). The connection can be // removed by another task between those two calls. - let peer_connection = ctx.connections.get(&target_peer.peer); + let peer_connection = ctx.connections.get(&target); tracing::debug!( tx = %msg.id(), self_peer = %ctx.bridge.op_manager.ring.connection_manager.pub_key, - target = %target_peer.peer, + target = %target, conn_map_size = ctx.connections.len(), has_connection = peer_connection.is_some(), "[CONN_TRACK] LOOKUP: Checking for existing connection in HashMap" @@ -368,7 +401,7 @@ impl P2pConnManager { } else { tracing::info!( tx = %msg.id(), - target_peer = %target_peer, + target_peer = %target, "Message successfully sent to peer connection" ); } @@ -376,7 +409,7 @@ impl P2pConnManager { None => { tracing::warn!( id = %msg.id(), - target = %target_peer.peer, + target = %target, "No existing outbound connection, establishing connection first" ); @@ -388,7 +421,7 @@ impl P2pConnManager { ctx.bridge .ev_listener_tx .send(Right(NodeEvent::ConnectPeer { - peer: target_peer.peer.clone(), + peer: target.clone(), tx, callback, is_gw: false, @@ -401,11 +434,11 @@ impl P2pConnManager { // Connection established, try sending again // IMPORTANT: Use single get() call to avoid TOCTOU race let peer_connection_retry = - ctx.connections.get(&target_peer.peer); + ctx.connections.get(&target); tracing::debug!( tx = %msg.id(), self_peer = %ctx.bridge.op_manager.ring.connection_manager.pub_key, - target = %target_peer.peer, + target = %target, conn_map_size = ctx.connections.len(), has_connection = peer_connection_retry.is_some(), "[CONN_TRACK] LOOKUP: Retry after connection established - checking for connection in HashMap" @@ -419,7 +452,7 @@ impl P2pConnManager { } else { tracing::error!( tx = %tx, - target = %target_peer.peer, + target = %target, "Connection established successfully but not found in HashMap - possible race condition" ); } @@ -427,14 +460,14 @@ impl P2pConnManager { Ok(Some(Err(e))) => { tracing::error!( "Failed to establish connection to {}: {:?}", - target_peer.peer, + target, e ); } Ok(None) | Err(_) => { tracing::error!( "Timeout or error establishing connection to {}", - target_peer.peer + target ); } } @@ -815,6 +848,30 @@ impl P2pConnManager { Err(e) => tracing::error!("Failed to send local subscribe response to result router: {}", e), } } + NodeEvent::BroadcastProximityCache { from, message } => { + // Broadcast ProximityCache message to all connected peers + tracing::debug!( + %from, + ?message, + peer_count = ctx.connections.len(), + "Broadcasting ProximityCache message to connected peers" + ); + + use crate::message::{NetMessage, NetMessageV1}; + let msg = NetMessage::V1(NetMessageV1::ProximityCache { + from: from.clone(), + message: message.clone(), + }); + + for peer in ctx.connections.keys() { + if peer != &from { + tracing::debug!(%peer, "Sending ProximityCache to peer"); + if let Err(e) = ctx.bridge.send(peer, msg.clone()).await { + tracing::warn!(%peer, "Failed to send ProximityCache: {}", e); + } + } + } + } NodeEvent::Disconnect { cause } => { tracing::info!( "Disconnecting from network{}", @@ -1368,7 +1425,13 @@ impl P2pConnManager { target_peer = %target, "handle_notification_msg: Message has target peer, routing as OutboundMessage" ); - return EventResult::Event(ConnEvent::OutboundMessage(msg).into()); + return EventResult::Event( + ConnEvent::OutboundMessage { + target: target.peer, + msg, + } + .into(), + ); } } @@ -1408,8 +1471,8 @@ impl P2pConnManager { fn handle_bridge_msg(&self, msg: Option) -> EventResult { match msg { - Some(Left((_target, msg))) => { - EventResult::Event(ConnEvent::OutboundMessage(*msg).into()) + Some(Left((target, msg))) => { + EventResult::Event(ConnEvent::OutboundMessage { target, msg: *msg }.into()) } Some(Right(action)) => EventResult::Event(ConnEvent::NodeAction(action).into()), None => EventResult::Event(ConnEvent::ClosedChannel(ChannelCloseReason::Bridge).into()), @@ -1551,7 +1614,7 @@ enum EventResult { #[derive(Debug)] pub(super) enum ConnEvent { InboundMessage(NetMessage), - OutboundMessage(NetMessage), + OutboundMessage { target: PeerId, msg: NetMessage }, NodeAction(NodeEvent), ClosedChannel(ChannelCloseReason), } diff --git a/crates/core/src/node/op_state_manager.rs b/crates/core/src/node/op_state_manager.rs index 541a71c27..803b82f8c 100644 --- a/crates/core/src/node/op_state_manager.rs +++ b/crates/core/src/node/op_state_manager.rs @@ -32,7 +32,10 @@ use crate::{ ring::{ConnectionManager, LiveTransactionTracker, Ring}, }; -use super::{network_bridge::EventLoopNotificationsSender, NetEventRegister, NodeConfig}; +use super::{ + network_bridge::EventLoopNotificationsSender, proximity_cache::ProximityCacheManager, + NetEventRegister, NodeConfig, +}; #[cfg(debug_assertions)] macro_rules! check_id_op { @@ -77,6 +80,8 @@ pub(crate) struct OpManager { pub peer_ready: Arc, /// Whether this node is a gateway pub is_gateway: bool, + /// Proximity cache manager for tracking which neighbors have which contracts + pub proximity_cache: Arc, } impl OpManager { @@ -126,6 +131,8 @@ impl OpManager { tracing::debug!("Regular peer node: peer_ready will be set after first handshake"); } + let proximity_cache = Arc::new(ProximityCacheManager::new()); + Ok(Self { ring, ops, @@ -135,6 +142,7 @@ impl OpManager { result_router_tx, peer_ready, is_gateway, + proximity_cache, }) } diff --git a/crates/core/src/node/proximity_cache.rs b/crates/core/src/node/proximity_cache.rs new file mode 100644 index 000000000..ac5b5d4cb --- /dev/null +++ b/crates/core/src/node/proximity_cache.rs @@ -0,0 +1,599 @@ +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use dashmap::{DashMap, DashSet}; +use freenet_stdlib::prelude::{ContractInstanceId, ContractKey}; +use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; +use tracing::{debug, info, trace}; + +use super::PeerId; + +/// Batch announcement interval - how often to send batched removal announcements +const BATCH_ANNOUNCEMENT_INTERVAL: Duration = Duration::from_secs(30); + +/// Proximity cache manager - tracks what contracts this node and its neighbors are caching +pub struct ProximityCacheManager { + /// Contracts we are caching locally + my_cache: Arc>, + + /// What we know about our neighbors' caches + /// PeerId -> Set of contract IDs they're caching + neighbor_caches: Arc>, + + /// Statistics for monitoring + stats: Arc>, + + /// Last time we sent a batch announcement + #[allow(dead_code)] // Reserved for future batched announcement scheduling + last_batch_announce: Arc>, + + /// Pending removals to be sent in the next batch announcement + pending_removals: Arc>>, +} + +#[derive(Clone, Debug)] +struct NeighborCache { + /// Contract IDs this neighbor is caching + contracts: HashSet, + /// Last time we received an update from this neighbor + #[allow(dead_code)] // Reserved for future stale neighbor cleanup + last_update: Instant, +} + +#[derive(Clone, Debug, Default)] +pub struct ProximityStats { + pub cache_announces_sent: u64, + #[allow(dead_code)] // Reserved for future statistics tracking + pub cache_announces_received: u64, + pub updates_via_proximity: u64, + pub updates_via_subscription: u64, + pub false_positive_forwards: u64, +} + +/// Message types for proximity cache protocol +#[derive(Debug, Clone, Serialize, Deserialize)] +#[allow(clippy::enum_variant_names)] +pub enum ProximityCacheMessage { + /// Announce contracts we're caching (immediate for additions, batched for removals) + CacheAnnounce { + /// Contracts we're now caching + added: Vec, + /// Contracts we're no longer caching + removed: Vec, + }, + /// Request neighbor's cache state (for new connections) + CacheStateRequest, + /// Response with full cache state + CacheStateResponse { contracts: Vec }, +} + +impl Default for ProximityCacheManager { + fn default() -> Self { + Self { + my_cache: Arc::new(DashSet::new()), + neighbor_caches: Arc::new(DashMap::new()), + stats: Arc::new(RwLock::new(ProximityStats::default())), + last_batch_announce: Arc::new(RwLock::new(Instant::now())), + pending_removals: Arc::new(RwLock::new(HashSet::new())), + } + } +} + +impl ProximityCacheManager { + pub fn new() -> Self { + Self::default() + } + + /// Called when we cache a new contract (PUT or successful GET) + pub async fn on_contract_cached( + &self, + contract_key: &ContractKey, + ) -> Option { + let contract_id = *contract_key.id(); + + if self.my_cache.insert(contract_id) { + info!( + contract = %contract_key, + "PROXIMITY_PROPAGATION: Added contract to cache" + ); + + // Update statistics for immediate announcement + let mut stats = self.stats.write().await; + stats.cache_announces_sent += 1; + drop(stats); + + // Immediate announcement for new cache entries + Some(ProximityCacheMessage::CacheAnnounce { + added: vec![contract_id], + removed: vec![], + }) + } else { + trace!( + contract = %contract_key, + "PROXIMITY_PROPAGATION: Contract already in cache" + ); + None + } + } + + /// Called when we evict a contract from cache + #[allow(dead_code)] // TODO: This will be called when contract eviction is implemented + pub async fn on_contract_evicted(&self, contract_key: &ContractKey) { + let contract_id = *contract_key.id(); + + if self.my_cache.remove(&contract_id).is_some() { + debug!( + contract = %contract_key, + "PROXIMITY_PROPAGATION: Removed contract from cache, adding to pending removals" + ); + // Add to pending removals for batch processing + let mut pending = self.pending_removals.write().await; + pending.insert(contract_id); + } + } + + /// Process a proximity cache message from a neighbor + /// Returns an optional response message that should be sent back to the peer + pub async fn handle_message( + &self, + peer_id: PeerId, + message: ProximityCacheMessage, + ) -> Option { + match message { + ProximityCacheMessage::CacheAnnounce { added, removed } => { + let mut stats = self.stats.write().await; + stats.cache_announces_received += 1; + drop(stats); + + // Update our knowledge of this neighbor's cache + let now = Instant::now(); + self.neighbor_caches + .entry(peer_id.clone()) + .and_modify(|cache| { + for contract_id in &added { + cache.contracts.insert(*contract_id); + } + for contract_id in &removed { + cache.contracts.remove(contract_id); + } + cache.last_update = now; + }) + .or_insert_with(|| NeighborCache { + contracts: added.iter().cloned().collect(), + last_update: now, + }); + + info!( + peer = %peer_id, + added = added.len(), + removed = removed.len(), + total_contracts = self.neighbor_caches.get(&peer_id).map(|c| c.contracts.len()).unwrap_or(0), + "PROXIMITY_PROPAGATION: Updated neighbor cache knowledge" + ); + None + } + + ProximityCacheMessage::CacheStateRequest => { + // Send our full cache state + let response = ProximityCacheMessage::CacheStateResponse { + contracts: self.my_cache.iter().map(|r| *r.key()).collect(), + }; + + let cache_size = + if let ProximityCacheMessage::CacheStateResponse { contracts } = &response { + contracts.len() + } else { + 0 + }; + debug!( + peer = %peer_id, + cache_size = cache_size, + "PROXIMITY_PROPAGATION: Sending cache state to neighbor" + ); + + // Update statistics for cache state response + let mut stats = self.stats.write().await; + stats.cache_announces_sent += 1; + + Some(response) + } + + ProximityCacheMessage::CacheStateResponse { contracts } => { + // Update our knowledge of this neighbor's full cache + self.neighbor_caches.insert( + peer_id.clone(), + NeighborCache { + contracts: contracts.into_iter().collect(), + last_update: Instant::now(), + }, + ); + + info!( + peer = %peer_id, + contracts = self.neighbor_caches.get(&peer_id).map(|c| c.contracts.len()).unwrap_or(0), + "PROXIMITY_PROPAGATION: Received full cache state from neighbor" + ); + None + } + } + } + + /// Generate a cache state request for a new peer connection + /// This should be called when a new peer connection is established + #[allow(dead_code)] // Reserved for future peer synchronization + pub fn request_cache_state_from_peer(&self) -> ProximityCacheMessage { + debug!("PROXIMITY_PROPAGATION: Generating cache state request for new peer"); + ProximityCacheMessage::CacheStateRequest + } + + /// Check if any neighbors might have this contract cached (for update forwarding) + pub fn neighbors_with_contract(&self, contract_key: &ContractKey) -> Vec { + let contract_id = contract_key.id(); + + let total_neighbors = self.neighbor_caches.len(); + let mut neighbors = Vec::new(); + for entry in self.neighbor_caches.iter() { + if entry.value().contracts.contains(contract_id) { + neighbors.push(entry.key().clone()); + } + } + + info!( + contract = %contract_key, + neighbor_count = neighbors.len(), + total_neighbors = total_neighbors, + neighbors = ?neighbors.iter().map(|p| format!("{:.8}", p)).collect::>(), + "PROXIMITY_PROPAGATION: Query for neighbors with contract" + ); + + neighbors + } + + /// Generate a batch announcement for pending removals (called periodically) + #[allow(dead_code)] + pub async fn generate_batch_announcement(&self) -> Option { + let mut last_announce = self.last_batch_announce.write().await; + + // Only send batch announcements every 30 seconds + if last_announce.elapsed() < BATCH_ANNOUNCEMENT_INTERVAL { + return None; + } + + *last_announce = Instant::now(); + + // Get pending removals and clear the list + let mut pending = self.pending_removals.write().await; + if pending.is_empty() { + return None; + } + + let removals: Vec = pending.iter().cloned().collect(); + pending.clear(); + drop(pending); // Release lock early + drop(last_announce); // Release lock early + + info!( + removal_count = removals.len(), + "PROXIMITY_PROPAGATION: Generated batch announcement for removals" + ); + + // Update statistics + let mut stats = self.stats.write().await; + stats.cache_announces_sent += 1; + + Some(ProximityCacheMessage::CacheAnnounce { + added: vec![], + removed: removals, + }) + } + + /// Get current statistics + #[allow(dead_code)] + pub async fn get_stats(&self) -> ProximityStats { + self.stats.read().await.clone() + } + + /// Get introspection data for debugging + /// Returns (my_cache, neighbor_data) where neighbor_data maps peer IDs to + /// (cached contracts, time since last update) + #[allow(dead_code)] + pub async fn get_introspection_data( + &self, + ) -> ( + Vec, + HashMap, Duration)>, + ) { + let my_cache = self.my_cache.iter().map(|r| *r.key()).collect(); + + let mut neighbor_data = HashMap::new(); + for entry in self.neighbor_caches.iter() { + // Use elapsed time since last update (monotonic, not affected by system clock changes) + let time_since_update = entry.value().last_update.elapsed(); + neighbor_data.insert( + entry.key().to_string(), // Convert PeerId to String for introspection + ( + entry.value().contracts.iter().cloned().collect(), + time_since_update, + ), + ); + } + + (my_cache, neighbor_data) + } + + /// Record that an update was forwarded via proximity + #[allow(dead_code)] + pub async fn record_proximity_forward(&self) { + let mut stats = self.stats.write().await; + stats.updates_via_proximity += 1; + } + + /// Record that an update was forwarded via subscription + #[allow(dead_code)] + pub async fn record_subscription_forward(&self) { + let mut stats = self.stats.write().await; + stats.updates_via_subscription += 1; + } + + /// Record a false positive (forwarded to a peer that didn't actually have the contract) + #[allow(dead_code)] + pub async fn record_false_positive(&self) { + let mut stats = self.stats.write().await; + stats.false_positive_forwards += 1; + } + + /// Get list of all known neighbor peer IDs for sending batch announcements + #[allow(dead_code)] + pub fn get_neighbor_ids(&self) -> Vec { + self.neighbor_caches + .iter() + .map(|entry| entry.key().clone()) + .collect() + } + + /// Create a periodic task for batch announcements that sends through the event loop + /// This should be spawned as a background task when the node starts + #[allow(dead_code)] + pub fn spawn_periodic_batch_announcements( + self: Arc, + event_loop_notifier: crate::node::EventLoopNotificationsSender, + op_manager: std::sync::Weak, + ) { + use crate::config::GlobalExecutor; + + GlobalExecutor::spawn(async move { + let mut interval = tokio::time::interval(BATCH_ANNOUNCEMENT_INTERVAL); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + info!("PROXIMITY_PROPAGATION: Periodic batch announcement task started"); + + loop { + interval.tick().await; + + // Check if the op_manager is still alive + let op_manager = match op_manager.upgrade() { + Some(manager) => manager, + None => { + info!("PROXIMITY_PROPAGATION: OpManager dropped, stopping batch announcement task"); + break; + } + }; + + // Generate batch announcement if there are pending removals + if let Some(announcement) = self.generate_batch_announcement().await { + let neighbor_ids = self.get_neighbor_ids(); + + if neighbor_ids.is_empty() { + debug!("PROXIMITY_PROPAGATION: No neighbors to send batch announcement to"); + continue; + } + + // Get our own peer ID + let own_peer_id = match op_manager.ring.connection_manager.get_peer_key() { + Some(peer_id) => peer_id, + None => { + debug!("PROXIMITY_PROPAGATION: No peer key available, skipping batch announcement"); + continue; + } + }; + + info!( + neighbor_count = neighbor_ids.len(), + removal_count = match &announcement { + ProximityCacheMessage::CacheAnnounce { removed, .. } => removed.len(), + _ => 0, + }, + "PROXIMITY_PROPAGATION: Sending periodic batch announcement to neighbors" + ); + + // Send broadcast request to event loop + // The event loop will iterate through connected peers and send to each one + // This avoids the issue where ProximityCache messages don't have a target field + if let Err(err) = event_loop_notifier + .notifications_sender() + .send(either::Either::Right( + crate::message::NodeEvent::BroadcastProximityCache { + from: own_peer_id, + message: announcement, + }, + )) + .await + { + debug!( + error = ?err, + "PROXIMITY_PROPAGATION: Failed to send broadcast request to event loop" + ); + } + } + } + + info!("PROXIMITY_PROPAGATION: Periodic batch announcement task stopped"); + }); + } + + /// Handle peer disconnection by removing them from the neighbor cache + /// This prevents stale data from accumulating and avoids forwarding updates to disconnected peers + #[allow(dead_code)] + pub fn on_peer_disconnected(&self, peer_id: &PeerId) { + if let Some((_, removed_cache)) = self.neighbor_caches.remove(peer_id) { + debug!( + peer = %peer_id, + cached_contracts = removed_cache.contracts.len(), + "PROXIMITY_CACHE: Removed disconnected peer from neighbor cache" + ); + } + } + + /// Cleanup stale neighbor entries based on last_update timestamp + /// This provides an alternative to explicit disconnect notifications + #[allow(dead_code)] + pub async fn cleanup_stale_neighbors(&self, max_age: Duration) { + let now = Instant::now(); + let mut removed_count = 0; + + // Collect stale peer IDs to avoid holding references while removing + let stale_peers: Vec = self + .neighbor_caches + .iter() + .filter_map(|entry| { + let peer_id = entry.key().clone(); + let cache = entry.value(); + if now.duration_since(cache.last_update)> max_age { + Some(peer_id) + } else { + None + } + }) + .collect(); + + // Remove stale entries + for peer_id in stale_peers { + if let Some((_, removed_cache)) = self.neighbor_caches.remove(&peer_id) { + removed_count += 1; + debug!( + peer = %peer_id, + cached_contracts = removed_cache.contracts.len(), + age = ?now.duration_since(removed_cache.last_update), + "PROXIMITY_CACHE: Removed stale neighbor cache entry" + ); + } + } + + if removed_count> 0 { + info!( + removed_peers = removed_count, + max_age = ?max_age, + "PROXIMITY_CACHE: Cleaned up stale neighbor cache entries" + ); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use freenet_stdlib::prelude::ContractInstanceId; + use std::time::Duration; + + fn create_test_contract_key() -> ContractKey { + let contract_id = ContractInstanceId::new([1u8; 32]); + ContractKey::from(contract_id) + } + + #[tokio::test] + async fn test_contract_caching_and_eviction() { + let cache = ProximityCacheManager::new(); + let contract_key = create_test_contract_key(); + + // Test caching a contract generates immediate announcement + let announcement = cache.on_contract_cached(&contract_key).await; + assert!(announcement.is_some()); + + if let Some(ProximityCacheMessage::CacheAnnounce { added, removed }) = announcement { + assert_eq!(added.len(), 1); + assert!(removed.is_empty()); + } else { + panic!("Expected CacheAnnounce message"); + } + + // Test evicting a contract adds to pending removals but doesn't generate immediate announcement + cache.on_contract_evicted(&contract_key).await; + + // Check that the contract is in pending removals + let pending = cache.pending_removals.read().await; + assert_eq!(pending.len(), 1); + } + + #[tokio::test] + async fn test_batch_announcement_generation() { + let cache = ProximityCacheManager::new(); + let contract_key = create_test_contract_key(); + + // Add a contract to pending removals manually + let contract_id = *contract_key.id(); + { + let mut pending = cache.pending_removals.write().await; + pending.insert(contract_id); + } + + // Force time to pass for batch announcement + { + let mut last_announce = cache.last_batch_announce.write().await; + *last_announce = Instant::now() - Duration::from_secs(31); + } + + // Generate batch announcement + let announcement = cache.generate_batch_announcement().await; + assert!(announcement.is_some()); + + if let Some(ProximityCacheMessage::CacheAnnounce { added, removed }) = announcement { + assert!(added.is_empty()); + assert_eq!(removed.len(), 1); + assert_eq!(removed[0], contract_id); + } else { + panic!("Expected CacheAnnounce message"); + } + + // Check that pending removals are cleared + let pending = cache.pending_removals.read().await; + assert!(pending.is_empty()); + } + + #[tokio::test] + async fn test_no_batch_announcement_when_no_pending_removals() { + let cache = ProximityCacheManager::new(); + + // Force time to pass for batch announcement + { + let mut last_announce = cache.last_batch_announce.write().await; + *last_announce = Instant::now() - Duration::from_secs(31); + } + + // Generate batch announcement - should be None since no pending removals + let announcement = cache.generate_batch_announcement().await; + assert!(announcement.is_none()); + } + + #[tokio::test] + async fn test_batch_announcement_rate_limiting() { + let cache = ProximityCacheManager::new(); + let contract_key = create_test_contract_key(); + + // Add a contract to pending removals + let contract_id = *contract_key.id(); + { + let mut pending = cache.pending_removals.write().await; + pending.insert(contract_id); + } + + // Try to generate batch announcement too soon - should be rate limited + let announcement = cache.generate_batch_announcement().await; + assert!(announcement.is_none()); + + // Check that pending removals are still there + let pending = cache.pending_removals.read().await; + assert_eq!(pending.len(), 1); + } +} diff --git a/crates/core/src/node/testing_impl.rs b/crates/core/src/node/testing_impl.rs index cb3b30ce2..0f25d00f9 100644 --- a/crates/core/src/node/testing_impl.rs +++ b/crates/core/src/node/testing_impl.rs @@ -844,6 +844,7 @@ where msg = conn_manager.recv() => { msg.map(Either::Left) } msg = notification_channel.notifications_receiver.recv() => { if let Some(msg) = msg { + tracing::info!(?msg, "PROXIMITY_ANNOUNCEMENT: Received from notifications channel"); Ok(msg) } else { anyhow::bail!("notification channel shutdown, fatal error"); @@ -935,6 +936,45 @@ where NodeEvent::QueryNodeDiagnostics { .. } => { unimplemented!() } + NodeEvent::BroadcastProximityCache { from, message } => { + tracing::info!( + %from, + ?message, + "PROXIMITY_ANNOUNCEMENT: BroadcastProximityCache event received" + ); + + // Broadcast ProximityCache message to all connected peers (except sender) + use crate::message::{NetMessage, NetMessageV1}; + let connected_peers: Vec<_> = op_manager + .ring + .connection_manager + .connected_peers() + .filter(|peer| peer != &from) + .collect(); + + tracing::info!( + %from, + ?message, + peer_count = connected_peers.len(), + peers = ?connected_peers.iter().map(|p| format!("{:.8}", p)).collect::>(), + "PROXIMITY_ANNOUNCEMENT: Broadcasting ProximityCache to connected peers" + ); + + let msg = NetMessage::V1(NetMessageV1::ProximityCache { + from: from.clone(), + message: message.clone(), + }); + + for peer in &connected_peers { + tracing::info!(%peer, "PROXIMITY_ANNOUNCEMENT: Sending ProximityCache to peer"); + if let Err(e) = conn_manager.send(peer, msg.clone()).await { + tracing::warn!(%peer, "Failed to send ProximityCache: {}", e); + } else { + tracing::info!(%peer, "PROXIMITY_ANNOUNCEMENT: Successfully sent ProximityCache to peer"); + } + } + continue; + } NodeEvent::SendMessage { target, msg } => { tracing::debug!(tx = %msg.id(), %target, "SendMessage event in testing_impl"); conn_manager.send(&target, *msg).await?; diff --git a/crates/core/src/operations/get.rs b/crates/core/src/operations/get.rs index 963bc641b..bf48cb585 100644 --- a/crates/core/src/operations/get.rs +++ b/crates/core/src/operations/get.rs @@ -211,6 +211,64 @@ pub(crate) async fn request_get( Ok(()) } +async fn announce_proximity_cache( + op_manager: &OpManager, + key: &ContractKey, + own_peer: &PeerId, + tx: &Transaction, + context: &'static str, +) { + match op_manager.proximity_cache.on_contract_cached(key).await { + Some(announcement) => { + tracing::info!( + tx = %tx, + %key, + peer = %own_peer, + ?announcement, + %context, + "PROXIMITY_ANNOUNCEMENT: GET sending BroadcastProximityCache event" + ); + let event = crate::message::NodeEvent::BroadcastProximityCache { + from: own_peer.clone(), + message: announcement, + }; + match op_manager + .to_event_listener + .notifications_sender() + .send(either::Either::Right(event)) + .await + { + Ok(_) => { + tracing::info!( + tx = %tx, + %key, + %context, + "PROXIMITY_ANNOUNCEMENT: GET send() succeeded" + ); + } + Err(e) => { + tracing::error!( + tx = %tx, + %key, + %context, + error = %e, + "PROXIMITY_ANNOUNCEMENT: GET send() failed!" + ); + } + } + } + None => { + tracing::info!( + tx = %tx, + %key, + peer = %own_peer, + %context, + "PROXIMITY_ANNOUNCEMENT: GET on_contract_cached returned None (already in cache)" + ); + } + } +} + #[derive(Debug)] enum GetState { /// A new petition for a get op received from another peer. @@ -966,7 +1024,16 @@ impl Operation for GetOp { op_manager.ring.should_seed(&key) }; - // Put contract locally if needed + tracing::info!( + tx = %id, + %key, + is_original_requester, + subscribe_requested, + should_put, + peer = %op_manager.ring.connection_manager.get_peer_key().unwrap(), + "PROXIMITY_ANNOUNCEMENT: GET evaluating whether to cache contract" + ); + if should_put { // First check if the local state matches the incoming state // to avoid triggering validation errors in contracts that implement @@ -985,27 +1052,59 @@ impl Operation for GetOp { state: Some(local), .. }), .. - }) => { - // Compare the actual state bytes - local.as_ref() == value.as_ref() - } + }) => local.as_ref() == value.as_ref(), _ => false, // No local state or error - we should try to cache }; + let own_peer = op_manager.ring.connection_manager.get_peer_key().unwrap(); + if state_matches { tracing::debug!( tx = %id, %key, "Local state matches network state, skipping redundant cache" ); - // State already cached and identical, mark as seeded if needed - if !op_manager.ring.is_seeding_contract(&key) { - tracing::debug!(tx = %id, %key, "Marking contract as seeded"); + let is_subscribed_contract = op_manager.ring.is_seeding_contract(&key); + tracing::info!( + tx = %id, + %key, + peer = %own_peer, + is_subscribed_contract, + "PROXIMITY_ANNOUNCEMENT: GET checking if should announce (state match)" + ); + if !is_subscribed_contract { + tracing::debug!( + tx = %id, + %key, + peer = %own_peer, + "Contract not cached @ peer, caching (state match)" + ); op_manager.ring.seed_contract(key); + announce_proximity_cache( + op_manager, + &key, + &own_peer, + &id, + "state match", + ) + .await; super::start_subscription_request(op_manager, key).await; + } else { + tracing::info!( + tx = %id, + %key, + peer = %own_peer, + "PROXIMITY_ANNOUNCEMENT: GET skipping announcement - contract already subscribed (state match)" + ); } } else { - tracing::debug!(tx = %id, %key, %is_original_requester, %subscribe_requested, "Putting contract at executor - state differs from local cache"); + tracing::debug!( + tx = %id, + %key, + %is_original_requester, + %subscribe_requested, + "Putting contract at executor - state differs from local cache" + ); let res = op_manager .notify_contract_handler(ContractHandlerEvent::PutQuery { key, @@ -1020,12 +1119,34 @@ impl Operation for GetOp { tracing::debug!(tx = %id, %key, "Contract put at executor"); let is_subscribed_contract = op_manager.ring.is_seeding_contract(&key); + tracing::info!( + tx = %id, + %key, + peer = %own_peer, + is_subscribed_contract, + "PROXIMITY_ANNOUNCEMENT: GET checking if should announce" + ); - // Start subscription if not already seeding if !is_subscribed_contract { - tracing::debug!(tx = %id, %key, peer = %op_manager.ring.connection_manager.get_peer_key().unwrap(), "Contract not cached @ peer, caching"); + tracing::debug!( + tx = %id, + %key, + peer = %own_peer, + "Contract not cached @ peer, caching" + ); op_manager.ring.seed_contract(key); + announce_proximity_cache( + op_manager, &key, &own_peer, &id, "put", + ) + .await; super::start_subscription_request(op_manager, key).await; + } else { + tracing::info!( + tx = %id, + %key, + peer = %own_peer, + "PROXIMITY_ANNOUNCEMENT: GET skipping announcement - contract already subscribed" + ); } } ContractHandlerEvent::PutResponse { diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 09c36c629..9046914b3 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -230,6 +230,14 @@ impl Operation for PutOp { .await?; // Mark as seeded locally if not already + tracing::info!( + tx = %id, + %key, + peer = %sender.peer, + is_already_seeding = is_already_seeding, + "PROXIMITY_ANNOUNCEMENT: PUT checking if should announce" + ); + if !is_already_seeding { op_manager.ring.seed_contract(key); tracing::debug!( @@ -238,6 +246,69 @@ impl Operation for PutOp { peer = %sender.peer, "Marked contract as seeding locally" ); + + // Announce to proximity cache that we've cached this contract and broadcast to neighbors + match op_manager.proximity_cache.on_contract_cached(&key).await { + Some(announcement) => { + tracing::info!( + tx = %id, + %key, + peer = %sender.peer, + ?announcement, + "PROXIMITY_ANNOUNCEMENT: PUT sending BroadcastProximityCache event" + ); + let from = + op_manager.ring.connection_manager.get_peer_key().unwrap(); + tracing::info!( + tx = %id, + %key, + %from, + "PROXIMITY_ANNOUNCEMENT: PUT about to call send() on notifications channel" + ); + match op_manager + .to_event_listener + .notifications_sender() + .send(either::Either::Right( + crate::message::NodeEvent::BroadcastProximityCache { + from, + message: announcement, + }, + )) + .await + { + Ok(_) => { + tracing::info!( + tx = %id, + %key, + "PROXIMITY_ANNOUNCEMENT: PUT send() succeeded" + ); + } + Err(e) => { + tracing::error!( + tx = %id, + %key, + error = %e, + "PROXIMITY_ANNOUNCEMENT: PUT send() failed!" + ); + } + } + } + None => { + tracing::info!( + tx = %id, + %key, + peer = %sender.peer, + "PROXIMITY_ANNOUNCEMENT: PUT on_contract_cached returned None (already in cache)" + ); + } + } + } else { + tracing::info!( + tx = %id, + %key, + peer = %sender.peer, + "PROXIMITY_ANNOUNCEMENT: PUT skipping announcement - contract already seeded" + ); } tracing::debug!( diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 4b21ccc72..94139d162 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -2,6 +2,7 @@ use either::Either; use freenet_stdlib::client_api::{ErrorKind, HostResponse}; use freenet_stdlib::prelude::*; +use std::collections::HashSet; pub(crate) use self::messages::UpdateMsg; use super::{OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult}; @@ -645,30 +646,48 @@ impl OpManager { key: &ContractKey, sender: &PeerId, ) -> Vec { - let subscribers = self + // Collect subscribers into a set for deduplication + let mut unique_peers: HashSet = self .ring .subscribers_of(key) .map(|subs| { subs.value() .iter() .filter(|pk| &pk.peer != sender) - .cloned() - .collect::>() + .map(|pk| pk.peer.clone()) + .collect() }) .unwrap_or_default(); + // Merge in proximity-based neighbors that are caching the contract + unique_peers.extend( + self.proximity_cache + .neighbors_with_contract(key) + .into_iter() + .filter(|peer| peer != sender), + ); + + // Convert the unique peer list into PeerKeyLocation entries + let targets: Vec = unique_peers + .into_iter() + .map(|peer| PeerKeyLocation { + peer, + location: None, + }) + .collect(); + // Trace update propagation for debugging - if !subscribers.is_empty() { + if !targets.is_empty() { tracing::info!( "UPDATE_PROPAGATION: contract={:.8} from={} targets={} count={}", key, sender, - subscribers + targets .iter() .map(|s| format!("{:.8}", s.peer)) .collect::>() .join(","), - subscribers.len() + targets.len() ); } else { tracing::warn!( @@ -678,7 +697,7 @@ impl OpManager { ); } - subscribers + targets } } @@ -997,7 +1016,7 @@ pub(crate) async fn request_update( false, op_manager, broadcast_state, - (broadcast_to, sender.clone()), + (broadcast_to.into_iter().collect(), sender.clone()), key, updated_value, false, diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index 8db58fcbb..d41a84d60 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -394,7 +394,7 @@ impl ConnectionManager { total } - pub(super) fn connected_peers(&self) -> impl Iterator { + pub(crate) fn connected_peers(&self) -> impl Iterator { let read = self.location_for_peer.read(); read.keys().cloned().collect::>().into_iter() } diff --git a/crates/core/tests/operations.rs b/crates/core/tests/operations.rs index c2287426d..818ced211 100644 --- a/crates/core/tests/operations.rs +++ b/crates/core/tests/operations.rs @@ -2585,3 +2585,300 @@ async fn test_update_no_change_notification(ctx: &mut TestContext) -> TestResult Ok(()) } + +/// Test proximity-based update forwarding: +/// Verifies that updates are forwarded to neighbors who have cached the contract. +/// +/// Test scenario: +/// 1. Set up 3 nodes: Gateway + 2 peers (peer1, peer2) +/// 2. Peer1 PUTs a contract (caches it, announces to neighbors) +/// 3. Peer2 GETs the same contract (caches it, announces to neighbors) +/// 4. Peer1 sends an UPDATE +/// 5. Verify peer2's cached state is updated (proving proximity forwarding worked) +#[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 4))] +async fn test_proximity_based_update_forwarding() -> TestResult { + // Load test contract + const TEST_CONTRACT: &str = "test-contract-integration"; + let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?; + let contract_key = contract.key(); + + // Create initial state with empty todo list + let initial_state = test_utils::create_empty_todo_list(); + let initial_wrapped_state = WrappedState::from(initial_state); + + // Create network sockets for 3 nodes + let network_socket_gw = TcpListener::bind("127.0.0.1:0")?; + let ws_api_socket_gw = TcpListener::bind("127.0.0.1:0")?; + let ws_api_socket_peer1 = TcpListener::bind("127.0.0.1:0")?; + let ws_api_socket_peer2 = TcpListener::bind("127.0.0.1:0")?; + + // Configure gateway node + let (config_gw, preset_cfg_gw, config_gw_info) = { + let (cfg, preset) = base_node_test_config( + true, + vec![], + Some(network_socket_gw.local_addr()?.port()), + ws_api_socket_gw.local_addr()?.port(), + ) + .await?; + let public_port = cfg.network_api.public_port.unwrap(); + let path = preset.temp_dir.path().to_path_buf(); + (cfg, preset, gw_config(public_port, &path)?) + }; + + // Configure peer1 (will PUT and UPDATE) + let (config_peer1, preset_cfg_peer1) = base_node_test_config( + false, + vec![serde_json::to_string(&config_gw_info)?], + None, + ws_api_socket_peer1.local_addr()?.port(), + ) + .await?; + let ws_api_port_peer1 = config_peer1.ws_api.ws_api_port.unwrap(); + + // Configure peer2 (will GET and receive UPDATE via proximity cache) + let (config_peer2, preset_cfg_peer2) = base_node_test_config( + false, + vec![serde_json::to_string(&config_gw_info)?], + None, + ws_api_socket_peer2.local_addr()?.port(), + ) + .await?; + let ws_api_port_peer2 = config_peer2.ws_api.ws_api_port.unwrap(); + + // Log data directories for debugging + tracing::info!("Gateway data dir: {:?}", preset_cfg_gw.temp_dir.path()); + tracing::info!("Peer1 data dir: {:?}", preset_cfg_peer1.temp_dir.path()); + tracing::info!("Peer2 data dir: {:?}", preset_cfg_peer2.temp_dir.path()); + + // Free sockets before starting nodes + std::mem::drop(network_socket_gw); + std::mem::drop(ws_api_socket_gw); + std::mem::drop(ws_api_socket_peer1); + std::mem::drop(ws_api_socket_peer2); + + // Start gateway node + let node_gw = async { + let config = config_gw.build().await?; + let node = NodeConfig::new(config.clone()) + .await? + .build(serve_gateway(config.ws_api).await) + .await?; + tracing::info!("Gateway node running"); + node.run().await + } + .boxed_local(); + + // Start peer1 node + let node_peer1 = async { + let config = config_peer1.build().await?; + let node = NodeConfig::new(config.clone()) + .await? + .build(serve_gateway(config.ws_api).await) + .await?; + tracing::info!("Peer1 node running"); + node.run().await + } + .boxed_local(); + + // Start peer2 node + let node_peer2 = async { + let config = config_peer2.build().await?; + let node = NodeConfig::new(config.clone()) + .await? + .build(serve_gateway(config.ws_api).await) + .await?; + tracing::info!("Peer2 node running"); + node.run().await + } + .boxed_local(); + + let test = tokio::time::timeout(Duration::from_secs(240), async { + // Wait for nodes to start up and establish connections + tracing::info!("Waiting for nodes to start up..."); + tokio::time::sleep(Duration::from_secs(25)).await; + + // Connect to peer1 websocket API + let uri_peer1 = format!( + "ws://127.0.0.1:{ws_api_port_peer1}/v1/contract/command?encodingProtocol=native" + ); + let (stream_peer1, _) = connect_async(&uri_peer1).await?; + let mut client_api_peer1 = WebApi::start(stream_peer1); + + // Connect to peer2 websocket API + let uri_peer2 = format!( + "ws://127.0.0.1:{ws_api_port_peer2}/v1/contract/command?encodingProtocol=native" + ); + let (stream_peer2, _) = connect_async(&uri_peer2).await?; + let mut client_api_peer2 = WebApi::start(stream_peer2); + + // Step 1: Peer1 PUTs the contract with initial state + tracing::info!("Peer1: Putting contract with initial state"); + make_put( + &mut client_api_peer1, + initial_wrapped_state.clone(), + contract.clone(), + false, + ) + .await?; + + // Wait for PUT response from peer1 + let resp = tokio::time::timeout(Duration::from_secs(60), client_api_peer1.recv()).await; + match resp { + Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => { + tracing::info!("Peer1: PUT successful for contract: {}", key); + assert_eq!(key, contract_key, "Contract key mismatch in PUT response"); + } + Ok(Ok(other)) => { + bail!( + "Peer1: Unexpected response while waiting for PUT: {:?}", + other + ); + } + Ok(Err(e)) => { + bail!("Peer1: Error receiving PUT response: {}", e); + } + Err(_) => { + bail!("Peer1: Timeout waiting for PUT response"); + } + } + + // Allow time for cache announcement to propagate + tracing::info!("Waiting for cache announcement to propagate..."); + tokio::time::sleep(Duration::from_secs(2)).await; + + // Step 2: Peer2 GETs the contract (this will cache it at peer2) + tracing::info!("Peer2: Getting contract (will be cached)"); + let (response_contract, response_state) = + get_contract(&mut client_api_peer2, contract_key, &preset_cfg_gw.temp_dir).await?; + + assert_eq!( + response_contract.key(), + contract_key, + "Peer2: Contract key mismatch in GET response" + ); + assert_eq!( + response_contract, contract, + "Peer2: Contract content mismatch in GET response" + ); + + // Verify peer2 got the initial state + let peer2_initial_state: test_utils::TodoList = + serde_json::from_slice(response_state.as_ref()) + .expect("Peer2: Failed to deserialize initial state"); + tracing::info!("Peer2: Successfully cached contract with initial state"); + + // Allow time for peer2's cache announcement to propagate + tokio::time::sleep(Duration::from_secs(2)).await; + + // Step 3: Peer1 updates the contract + tracing::info!("Peer1: Creating updated state with a new task"); + let mut updated_todo_list = peer2_initial_state.clone(); + updated_todo_list.tasks.push(test_utils::Task { + id: 1, + title: "Test proximity forwarding".to_string(), + description: "Verify updates propagate via proximity cache".to_string(), + completed: false, + priority: 5, + }); + + let updated_state_bytes = serde_json::to_vec(&updated_todo_list)?; + let updated_state = WrappedState::from(updated_state_bytes); + let expected_version_after_update = updated_todo_list.version + 1; + + tracing::info!("Peer1: Sending UPDATE"); + make_update(&mut client_api_peer1, contract_key, updated_state.clone()).await?; + + // Wait for UPDATE response from peer1 + let resp = tokio::time::timeout(Duration::from_secs(30), client_api_peer1.recv()).await; + match resp { + Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateResponse { + key, + summary: _, + }))) => { + tracing::info!("Peer1: UPDATE successful for contract: {}", key); + assert_eq!( + key, contract_key, + "Peer1: Contract key mismatch in UPDATE response" + ); + } + Ok(Ok(other)) => { + bail!( + "Peer1: Unexpected response while waiting for UPDATE: {:?}", + other + ); + } + Ok(Err(e)) => { + bail!("Peer1: Error receiving UPDATE response: {}", e); + } + Err(_) => { + bail!("Peer1: Timeout waiting for UPDATE response"); + } + } + + // Allow time for update to propagate via proximity cache + tracing::info!("Waiting for update to propagate via proximity cache..."); + tokio::time::sleep(Duration::from_secs(3)).await; + + // Step 4: Verify peer2 received the update by GETting the contract again + tracing::info!("Peer2: Getting contract again to verify update was received"); + let (final_contract, final_state) = + get_contract(&mut client_api_peer2, contract_key, &preset_cfg_gw.temp_dir).await?; + + assert_eq!( + final_contract.key(), + contract_key, + "Peer2: Contract key mismatch in final GET" + ); + + // Verify the state was updated + let peer2_final_state: test_utils::TodoList = serde_json::from_slice(final_state.as_ref()) + .expect("Peer2: Failed to deserialize final state"); + + assert_eq!( + peer2_final_state.version, expected_version_after_update, + "Peer2: Version should be updated. Proximity forwarding may have failed!" + ); + + assert_eq!( + peer2_final_state.tasks.len(), + 1, + "Peer2: Should have received the new task via proximity forwarding" + ); + + assert_eq!( + peer2_final_state.tasks[0].title, "Test proximity forwarding", + "Peer2: Task title should match the update" + ); + + tracing::info!( + "SUCCESS: Peer2 received update via proximity cache! Version: {}", + peer2_final_state.version + ); + + Ok::<(), anyhow::Error>(()) + }); + + // Wait for test completion or node failures + select! { + gw = node_gw => { + let Err(gw) = gw; + return Err(anyhow!("Gateway failed: {}", gw).into()); + } + p1 = node_peer1 => { + let Err(p1) = p1; + return Err(anyhow!("Peer1 failed: {}", p1).into()); + } + p2 = node_peer2 => { + let Err(p2) = p2; + return Err(anyhow!("Peer2 failed: {}", p2).into()); + } + r = test => { + r??; + // Keep nodes alive for pending operations to complete + tokio::time::sleep(Duration::from_secs(3)).await; + } + } + + Ok(()) +}

AltStyle によって変換されたページ (->オリジナル) /