Skip to main content

Grove/Transport/
Strategy.rs

1//! Transport Strategy Module
2//!
3//! Defines the transport strategy trait and types for different
4//! communication methods (gRPC, IPC, WASM).
5
6use std::fmt;
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use serde::{Deserialize, Serialize};
11
12use crate::Transport::{
13	gRPCTransport::gRPCTransport,
14	IPCTransport::IPCTransport,
15	WASMTransport::WASMTransportImpl,
16};
17
18/// Transport strategy trait
19///
20/// All transport implementations must implement this trait to provide
21/// a common interface for connecting, sending, and closing connections.
22#[async_trait]
23pub trait TransportStrategy: Send + Sync {
24	/// Error type for this transport
25	type Error: std::error::Error + Send + Sync + 'static;
26
27	/// Connect to the transport endpoint
28	async fn connect(&self) -> Result<(), Self::Error>;
29
30	/// Send a request and receive a response
31	async fn send(&self, request:&[u8]) -> Result<Vec<u8>, Self::Error>;
32
33	/// Send data without expecting a response (fire and forget)
34	async fn send_no_response(&self, data:&[u8]) -> Result<(), Self::Error>;
35
36	/// Close the transport connection
37	async fn close(&self) -> Result<(), Self::Error>;
38
39	/// Check if the transport is connected
40	fn is_connected(&self) -> bool;
41
42	/// Get the transport type identifier
43	fn transport_type(&self) -> TransportType;
44}
45
46/// Transport type enumeration
47#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
48pub enum TransportType {
49	/// gRPC transport
50	gRPC,
51	/// Inter-process communication
52	IPC,
53	/// Direct WASM module communication
54	WASM,
55	/// Unknown/unspecified transport
56	Unknown,
57}
58
59impl fmt::Display for TransportType {
60	fn fmt(&self, f:&mut fmt::Formatter<'_>) -> fmt::Result {
61		match self {
62			Self::gRPC => write!(f, "grpc"),
63			Self::IPC => write!(f, "ipc"),
64			Self::WASM => write!(f, "wasm"),
65			Self::Unknown => write!(f, "unknown"),
66		}
67	}
68}
69
70impl std::str::FromStr for TransportType {
71	type Err = anyhow::Error;
72
73	fn from_str(s:&str) -> Result<Self, Self::Err> {
74		match s.to_lowercase().as_str() {
75			"grpc" => Ok(Self::gRPC),
76			"ipc" => Ok(Self::IPC),
77			"wasm" => Ok(Self::WASM),
78			_ => Err(anyhow::anyhow!("Unknown transport type: {}", s)),
79		}
80	}
81}
82
83/// Transport enumeration.
84///
85/// Union type wrapping all supported transport implementations.
86#[derive(Debug)]
87pub enum Transport {
88	/// gRPC-based transport (Mountain/Air communication).
89	gRPC(gRPCTransport),
90	/// IPC transport (same-machine process communication).
91	IPC(IPCTransport),
92	/// Direct WASM module transport (browser).
93	WASM(WASMTransportImpl),
94}
95
96impl Transport {
97	/// Get the transport type
98	pub fn transport_type(&self) -> TransportType {
99		match self {
100			Self::gRPC(_) => TransportType::gRPC,
101			Self::IPC(_) => TransportType::IPC,
102			Self::WASM(_) => TransportType::WASM,
103		}
104	}
105
106	/// Connect to the transport
107	pub async fn connect(&self) -> anyhow::Result<()> {
108		match self {
109			Self::gRPC(transport) => {
110				transport
111					.connect()
112					.await
113					.map_err(|e| anyhow::anyhow!("gRPC connect error: {}", e))
114			},
115			Self::IPC(transport) => {
116				transport
117					.connect()
118					.await
119					.map_err(|e| anyhow::anyhow!("IPC connect error: {}", e))
120			},
121			Self::WASM(transport) => {
122				transport
123					.connect()
124					.await
125					.map_err(|e| anyhow::anyhow!("WASM connect error: {}", e))
126			},
127		}
128	}
129
130	/// Send a request and receive a response
131	pub async fn send(&self, request:&[u8]) -> anyhow::Result<Vec<u8>> {
132		match self {
133			Self::gRPC(transport) => {
134				transport
135					.send(request)
136					.await
137					.map_err(|e| anyhow::anyhow!("gRPC send error: {}", e))
138			},
139			Self::IPC(transport) => {
140				transport
141					.send(request)
142					.await
143					.map_err(|e| anyhow::anyhow!("IPC send error: {}", e))
144			},
145			Self::WASM(transport) => {
146				transport
147					.send(request)
148					.await
149					.map_err(|e| anyhow::anyhow!("WASM send error: {}", e))
150			},
151		}
152	}
153
154	/// Send data without expecting a response
155	pub async fn send_no_response(&self, data:&[u8]) -> anyhow::Result<()> {
156		match self {
157			Self::gRPC(transport) => {
158				transport
159					.send_no_response(data)
160					.await
161					.map_err(|e| anyhow::anyhow!("gRPC send error: {}", e))
162			},
163			Self::IPC(transport) => {
164				transport
165					.send_no_response(data)
166					.await
167					.map_err(|e| anyhow::anyhow!("IPC send error: {}", e))
168			},
169			Self::WASM(transport) => {
170				transport
171					.send_no_response(data)
172					.await
173					.map_err(|e| anyhow::anyhow!("WASM send error: {}", e))
174			},
175		}
176	}
177
178	/// Close the transport
179	pub async fn close(&self) -> anyhow::Result<()> {
180		match self {
181			Self::gRPC(transport) => transport.close().await.map_err(|e| anyhow::anyhow!("gRPC close error: {}", e)),
182			Self::IPC(transport) => transport.close().await.map_err(|e| anyhow::anyhow!("IPC close error: {}", e)),
183			Self::WASM(transport) => transport.close().await.map_err(|e| anyhow::anyhow!("WASM close error: {}", e)),
184		}
185	}
186
187	/// Check if the transport is connected
188	pub fn is_connected(&self) -> bool {
189		match self {
190			Self::gRPC(transport) => transport.is_connected(),
191			Self::IPC(transport) => transport.is_connected(),
192			Self::WASM(transport) => transport.is_connected(),
193		}
194	}
195
196	/// Get gRPC transport reference (if applicable)
197	pub fn AsgRPC(&self) -> Option<&gRPCTransport> {
198		match self {
199			Self::gRPC(Transport) => Some(Transport),
200			_ => None,
201		}
202	}
203
204	/// Returns the IPC transport reference if this is an IPC transport.
205	pub fn AsIPC(&self) -> Option<&IPCTransport> {
206		match self {
207			Self::IPC(Transport) => Some(Transport),
208			_ => None,
209		}
210	}
211
212	/// Get WASM transport reference (if applicable)
213	pub fn as_wasm(&self) -> Option<&WASMTransportImpl> {
214		match self {
215			Self::WASM(transport) => Some(transport),
216			_ => None,
217		}
218	}
219}
220
221impl Default for Transport {
222	fn default() -> Self {
223		Self::gRPC(
224			gRPCTransport::New("127.0.0.1:50050").unwrap_or_else(|_| {
225				gRPCTransport::New("0.0.0.0:50050")
226					.expect("Failed to create default gRPC transport")
227			}),
228		)
229	}
230}
231
232impl fmt::Display for Transport {
233	fn fmt(&self, f:&mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Transport({})", self.transport_type()) }
234}
235
236/// Transport message wrapper
237#[derive(Debug, Clone, Serialize, Deserialize)]
238pub struct TransportMessage {
239	/// Message type identifier
240	pub message_type:String,
241	/// Message ID for correlation
242	pub message_id:String,
243	/// Timestamp (Unix epoch)
244	pub timestamp:u64,
245	/// Message payload
246	pub payload:Bytes,
247	/// Optional metadata
248	pub metadata:Option<serde_json::Value>,
249}
250
251impl TransportMessage {
252	/// Create a new transport message
253	pub fn new(message_type:impl Into<String>, payload:Bytes) -> Self {
254		Self {
255			message_type:message_type.into(),
256			message_id:uuid::Uuid::new_v4().to_string(),
257			timestamp:std::time::SystemTime::now()
258				.duration_since(std::time::UNIX_EPOCH)
259				.map(|d| d.as_secs())
260				.unwrap_or(0),
261			payload,
262			metadata:None,
263		}
264	}
265
266	/// Set metadata for the message
267	pub fn with_metadata(mut self, metadata:serde_json::Value) -> Self {
268		self.metadata = Some(metadata);
269		self
270	}
271
272	/// Serialize the message to bytes
273	pub fn to_bytes(&self) -> anyhow::Result<Bytes> {
274		serde_json::to_vec(self).map(Bytes::from).map_err(|e| anyhow::anyhow!("{}", e))
275	}
276
277	/// Deserialize message from bytes
278	pub fn from_bytes(bytes:&[u8]) -> anyhow::Result<Self> {
279		serde_json::from_slice(bytes).map_err(|e| anyhow::anyhow!("{}", e))
280	}
281}
282
283/// Transport statistics
284#[derive(Debug, Clone, Default, Serialize, Deserialize)]
285pub struct TransportStats {
286	/// Number of messages sent
287	pub messages_sent:u64,
288	/// Number of messages received
289	pub messages_received:u64,
290	/// Number of errors encountered
291	pub errors:u64,
292	/// Total bytes sent
293	pub bytes_sent:u64,
294	/// Total bytes received
295	pub bytes_received:u64,
296	/// Average latency in microseconds
297	pub avg_latency_us:u64,
298	/// Connection uptime in seconds
299	pub uptime_seconds:u64,
300}
301
302impl TransportStats {
303	/// Update statistics with a sent message
304	pub fn record_sent(&mut self, bytes:u64, latency_us:u64) {
305		self.messages_sent += 1;
306		self.bytes_sent += bytes;
307
308		// Update average latency
309		if self.messages_sent > 0 {
310			self.avg_latency_us = (self.avg_latency_us * (self.messages_sent - 1) + latency_us) / self.messages_sent;
311		}
312	}
313
314	/// Update statistics with a received message
315	pub fn record_received(&mut self, bytes:u64) {
316		self.messages_received += 1;
317		self.bytes_received += bytes;
318	}
319
320	/// Record an error
321	pub fn record_error(&mut self) { self.errors += 1; }
322}
323
324#[cfg(test)]
325mod tests {
326	use super::*;
327
328	#[test]
329	fn test_transport_type_to_string() {
330		assert_eq!(TransportType::gRPC.to_string(), "grpc");
331		assert_eq!(TransportType::IPC.to_string(), "ipc");
332		assert_eq!(TransportType::WASM.to_string(), "wasm");
333	}
334
335	#[test]
336	fn test_transport_type_from_str() {
337		assert_eq!("grpc".parse::<TransportType>().unwrap(), TransportType::gRPC);
338		assert_eq!("ipc".parse::<TransportType>().unwrap(), TransportType::IPC);
339		assert_eq!("wasm".parse::<TransportType>().unwrap(), TransportType::WASM);
340		assert!("unknown".parse::<TransportType>().is_err());
341	}
342
343	#[test]
344	fn test_transport_display() {
345		// Create a dummy transport to test Display implementation
346		// In real tests, we'd use an actual transport
347		let transport = Transport::default();
348		let display = format!("{}", transport);
349		assert!(display.contains("Transport"));
350	}
351
352	#[test]
353	fn test_transport_message_creation() {
354		let message = TransportMessage::new("test_type", Bytes::from("hello"));
355		assert_eq!(message.message_type, "test_type");
356		assert_eq!(message.payload, Bytes::from("hello"));
357		assert!(!message.message_id.is_empty());
358	}
359
360	#[test]
361	fn test_transport_message_serialization() {
362		let message = TransportMessage::new("test", Bytes::from("data"));
363		let bytes = message.to_bytes().unwrap();
364		let deserialized = TransportMessage::from_bytes(&bytes).unwrap();
365		assert_eq!(deserialized.message_type, message.message_type);
366		assert_eq!(deserialized.payload, message.payload);
367	}
368
369	#[test]
370	fn test_transport_stats() {
371		let mut stats = TransportStats::default();
372		stats.record_sent(100, 1000);
373		stats.record_received(50);
374		stats.record_error();
375
376		assert_eq!(stats.messages_sent, 1);
377		assert_eq!(stats.messages_received, 1);
378		assert_eq!(stats.errors, 1);
379		assert_eq!(stats.bytes_sent, 100);
380		assert_eq!(stats.bytes_received, 50);
381		assert_eq!(stats.avg_latency_us, 1000);
382	}
383}