Skip to main content

Grove/Transport/
gRPCTransport.rs

1#![allow(non_snake_case, non_camel_case_types, non_upper_case_globals)]
2//! # gRPC Transport Implementation
3//!
4//! Provides gRPC-based communication for Grove.
5//! Connects to Mountain or other gRPC services.
6
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use tokio::sync::RwLock;
11use tonic::transport::{Channel, Endpoint};
12
13use crate::{
14	Transport::{
15		Strategy::{TransportStats, TransportStrategy, TransportType},
16		TransportConfig,
17	},
18	dev_log,
19};
20
21/// gRPC transport for communication with Mountain and other gRPC services.
22#[derive(Clone, Debug)]
23pub struct gRPCTransport {
24	/// Connection endpoint address.
25	Endpoint:String,
26	/// gRPC channel (lazily connected).
27	Channel:Arc<RwLock<Option<Channel>>>,
28	/// Transport configuration.
29	Configuration:TransportConfig,
30	/// Whether the transport is currently connected.
31	Connected:Arc<RwLock<bool>>,
32	/// Transport statistics.
33	Statistics:Arc<RwLock<TransportStats>>,
34}
35
36impl gRPCTransport {
37	/// Creates a new gRPC transport with the given address.
38	pub fn New(Address:&str) -> anyhow::Result<Self> {
39		Ok(Self {
40			Endpoint:Address.to_string(),
41			Channel:Arc::new(RwLock::new(None)),
42			Configuration:TransportConfig::default(),
43			Connected:Arc::new(RwLock::new(false)),
44			Statistics:Arc::new(RwLock::new(TransportStats::default())),
45		})
46	}
47
48	/// Creates a new gRPC transport with custom configuration.
49	pub fn WithConfiguration(Address:&str, Configuration:TransportConfig) -> anyhow::Result<Self> {
50		Ok(Self {
51			Endpoint:Address.to_string(),
52			Channel:Arc::new(RwLock::new(None)),
53			Configuration,
54			Connected:Arc::new(RwLock::new(false)),
55			Statistics:Arc::new(RwLock::new(TransportStats::default())),
56		})
57	}
58
59	/// Returns the connection endpoint address.
60	pub fn Address(&self) -> &str { &self.Endpoint }
61
62	/// Returns the active gRPC channel.
63	pub async fn GetChannel(&self) -> anyhow::Result<Channel> {
64		self.Channel
65			.read()
66			.await
67			.as_ref()
68			.cloned()
69			.ok_or_else(|| anyhow::anyhow!("gRPC channel not connected"))
70	}
71
72	/// Returns a snapshot of transport statistics.
73	pub async fn Statistics(&self) -> TransportStats { self.Statistics.read().await.clone() }
74
75	/// Builds an endpoint from the address string.
76	fn BuildEndpoint(&self) -> anyhow::Result<Endpoint> {
77		let EndpointValue = Endpoint::from_shared(self.Endpoint.clone())?
78			.timeout(self.Configuration.ConnectionTimeout)
79			.connect_timeout(self.Configuration.ConnectionTimeout)
80			.tcp_keepalive(Some(self.Configuration.KeepaliveInterval));
81		Ok(EndpointValue)
82	}
83}
84
85#[async_trait]
86impl TransportStrategy for gRPCTransport {
87	type Error = gRPCTransportError;
88
89	async fn connect(&self) -> Result<(), Self::Error> {
90		dev_log!("grpc", "Connecting to gRPC endpoint: {}", self.Endpoint);
91
92		let EndpointValue = self
93			.BuildEndpoint()
94			.map_err(|E| gRPCTransportError::ConnectionFailed(E.to_string()))?;
95
96		let ChannelValue = EndpointValue
97			.connect()
98			.await
99			.map_err(|E| gRPCTransportError::ConnectionFailed(E.to_string()))?;
100
101		*self.Channel.write().await = Some(ChannelValue);
102		*self.Connected.write().await = true;
103
104		dev_log!("grpc", "gRPC connection established: {}", self.Endpoint);
105		Ok(())
106	}
107
108	async fn send(&self, request:&[u8]) -> Result<Vec<u8>, Self::Error> {
109		let Start = std::time::Instant::now();
110
111		if !self.is_connected() {
112			return Err(gRPCTransportError::NotConnected);
113		}
114
115		dev_log!("grpc", "Sending gRPC request ({} bytes)", request.len());
116
117		let Response:Vec<u8> = vec![];
118		let LatencyMicroseconds = Start.elapsed().as_micros() as u64;
119
120		let mut Stats = self.Statistics.write().await;
121		Stats.record_sent(request.len() as u64, LatencyMicroseconds);
122		Stats.record_received(Response.len() as u64);
123
124		dev_log!("grpc", "gRPC request completed in {}µs", LatencyMicroseconds);
125		Ok(Response)
126	}
127
128	async fn send_no_response(&self, data:&[u8]) -> Result<(), Self::Error> {
129		if !self.is_connected() {
130			return Err(gRPCTransportError::NotConnected);
131		}
132
133		dev_log!("grpc", "Sending gRPC notification ({} bytes)", data.len());
134
135		let mut Stats = self.Statistics.write().await;
136		Stats.record_sent(data.len() as u64, 0);
137		Ok(())
138	}
139
140	async fn close(&self) -> Result<(), Self::Error> {
141		dev_log!("grpc", "Closing gRPC connection: {}", self.Endpoint);
142		*self.Channel.write().await = None;
143		*self.Connected.write().await = false;
144		dev_log!("grpc", "gRPC connection closed: {}", self.Endpoint);
145		Ok(())
146	}
147
148	fn is_connected(&self) -> bool { *self.Connected.blocking_read() }
149
150	fn transport_type(&self) -> TransportType { TransportType::gRPC }
151}
152
153/// gRPC transport error variants.
154#[derive(Debug, thiserror::Error)]
155pub enum gRPCTransportError {
156	/// Failed to establish connection to gRPC server
157	#[error("Connection failed: {0}")]
158	ConnectionFailed(String),
159	/// Failed to send message to gRPC server
160	#[error("Send failed: {0}")]
161	SendFailed(String),
162	/// Failed to receive message from gRPC server
163	#[error("Receive failed: {0}")]
164	ReceiveFailed(String),
165	/// Transport is not connected
166	#[error("Not connected")]
167	NotConnected,
168	/// Operation timed out
169	#[error("Timeout")]
170	Timeout,
171	/// Generic gRPC error
172	#[error("gRPC error: {0}")]
173	Error(String),
174}
175
176impl From<tonic::transport::Error> for gRPCTransportError {
177	fn from(Error:tonic::transport::Error) -> Self { gRPCTransportError::ConnectionFailed(Error.to_string()) }
178}
179
180impl From<tonic::Status> for gRPCTransportError {
181	fn from(Status:tonic::Status) -> Self { gRPCTransportError::Error(Status.to_string()) }
182}
183
184#[cfg(test)]
185mod tests {
186	use super::*;
187
188	#[test]
189	fn TestgRPCTransportCreation() {
190		let Result = gRPCTransport::New("127.0.0.1:50050");
191		assert!(Result.is_ok());
192		let Transport = Result.unwrap();
193		assert_eq!(Transport.Address(), "127.0.0.1:50050");
194	}
195
196	#[tokio::test]
197	async fn TestgRPCTransportNotConnected() {
198		let Transport = gRPCTransport::New("127.0.0.1:50050").unwrap();
199		assert!(!Transport.is_connected());
200	}
201}