Grove/Transport/
gRPCTransport.rs1#![allow(non_snake_case, non_camel_case_types, non_upper_case_globals)]
2use std::sync::Arc;
8
9use async_trait::async_trait;
10use tokio::sync::RwLock;
11use tonic::transport::{Channel, Endpoint};
12
13use crate::{
14 Transport::{
15 Strategy::{TransportStats, TransportStrategy, TransportType},
16 TransportConfig,
17 },
18 dev_log,
19};
20
21#[derive(Clone, Debug)]
23pub struct gRPCTransport {
24 Endpoint:String,
26 Channel:Arc<RwLock<Option<Channel>>>,
28 Configuration:TransportConfig,
30 Connected:Arc<RwLock<bool>>,
32 Statistics:Arc<RwLock<TransportStats>>,
34}
35
36impl gRPCTransport {
37 pub fn New(Address:&str) -> anyhow::Result<Self> {
39 Ok(Self {
40 Endpoint:Address.to_string(),
41 Channel:Arc::new(RwLock::new(None)),
42 Configuration:TransportConfig::default(),
43 Connected:Arc::new(RwLock::new(false)),
44 Statistics:Arc::new(RwLock::new(TransportStats::default())),
45 })
46 }
47
48 pub fn WithConfiguration(Address:&str, Configuration:TransportConfig) -> anyhow::Result<Self> {
50 Ok(Self {
51 Endpoint:Address.to_string(),
52 Channel:Arc::new(RwLock::new(None)),
53 Configuration,
54 Connected:Arc::new(RwLock::new(false)),
55 Statistics:Arc::new(RwLock::new(TransportStats::default())),
56 })
57 }
58
59 pub fn Address(&self) -> &str { &self.Endpoint }
61
62 pub async fn GetChannel(&self) -> anyhow::Result<Channel> {
64 self.Channel
65 .read()
66 .await
67 .as_ref()
68 .cloned()
69 .ok_or_else(|| anyhow::anyhow!("gRPC channel not connected"))
70 }
71
72 pub async fn Statistics(&self) -> TransportStats { self.Statistics.read().await.clone() }
74
75 fn BuildEndpoint(&self) -> anyhow::Result<Endpoint> {
77 let EndpointValue = Endpoint::from_shared(self.Endpoint.clone())?
78 .timeout(self.Configuration.ConnectionTimeout)
79 .connect_timeout(self.Configuration.ConnectionTimeout)
80 .tcp_keepalive(Some(self.Configuration.KeepaliveInterval));
81 Ok(EndpointValue)
82 }
83}
84
85#[async_trait]
86impl TransportStrategy for gRPCTransport {
87 type Error = gRPCTransportError;
88
89 async fn connect(&self) -> Result<(), Self::Error> {
90 dev_log!("grpc", "Connecting to gRPC endpoint: {}", self.Endpoint);
91
92 let EndpointValue = self
93 .BuildEndpoint()
94 .map_err(|E| gRPCTransportError::ConnectionFailed(E.to_string()))?;
95
96 let ChannelValue = EndpointValue
97 .connect()
98 .await
99 .map_err(|E| gRPCTransportError::ConnectionFailed(E.to_string()))?;
100
101 *self.Channel.write().await = Some(ChannelValue);
102 *self.Connected.write().await = true;
103
104 dev_log!("grpc", "gRPC connection established: {}", self.Endpoint);
105 Ok(())
106 }
107
108 async fn send(&self, request:&[u8]) -> Result<Vec<u8>, Self::Error> {
109 let Start = std::time::Instant::now();
110
111 if !self.is_connected() {
112 return Err(gRPCTransportError::NotConnected);
113 }
114
115 dev_log!("grpc", "Sending gRPC request ({} bytes)", request.len());
116
117 let Response:Vec<u8> = vec![];
118 let LatencyMicroseconds = Start.elapsed().as_micros() as u64;
119
120 let mut Stats = self.Statistics.write().await;
121 Stats.record_sent(request.len() as u64, LatencyMicroseconds);
122 Stats.record_received(Response.len() as u64);
123
124 dev_log!("grpc", "gRPC request completed in {}µs", LatencyMicroseconds);
125 Ok(Response)
126 }
127
128 async fn send_no_response(&self, data:&[u8]) -> Result<(), Self::Error> {
129 if !self.is_connected() {
130 return Err(gRPCTransportError::NotConnected);
131 }
132
133 dev_log!("grpc", "Sending gRPC notification ({} bytes)", data.len());
134
135 let mut Stats = self.Statistics.write().await;
136 Stats.record_sent(data.len() as u64, 0);
137 Ok(())
138 }
139
140 async fn close(&self) -> Result<(), Self::Error> {
141 dev_log!("grpc", "Closing gRPC connection: {}", self.Endpoint);
142 *self.Channel.write().await = None;
143 *self.Connected.write().await = false;
144 dev_log!("grpc", "gRPC connection closed: {}", self.Endpoint);
145 Ok(())
146 }
147
148 fn is_connected(&self) -> bool { *self.Connected.blocking_read() }
149
150 fn transport_type(&self) -> TransportType { TransportType::gRPC }
151}
152
153#[derive(Debug, thiserror::Error)]
155pub enum gRPCTransportError {
156 #[error("Connection failed: {0}")]
158 ConnectionFailed(String),
159 #[error("Send failed: {0}")]
161 SendFailed(String),
162 #[error("Receive failed: {0}")]
164 ReceiveFailed(String),
165 #[error("Not connected")]
167 NotConnected,
168 #[error("Timeout")]
170 Timeout,
171 #[error("gRPC error: {0}")]
173 Error(String),
174}
175
176impl From<tonic::transport::Error> for gRPCTransportError {
177 fn from(Error:tonic::transport::Error) -> Self { gRPCTransportError::ConnectionFailed(Error.to_string()) }
178}
179
180impl From<tonic::Status> for gRPCTransportError {
181 fn from(Status:tonic::Status) -> Self { gRPCTransportError::Error(Status.to_string()) }
182}
183
#[cfg(test)]
mod tests {
	use super::*;

	/// Construction stores the address verbatim and performs no I/O.
	#[test]
	fn TestgRPCTransportCreation() {
		let Created = gRPCTransport::New("127.0.0.1:50050");
		assert!(Created.is_ok());

		let Transport = Created.expect("construction was just asserted to succeed");
		assert_eq!(Transport.Address(), "127.0.0.1:50050");
	}

	/// A freshly built transport reports itself as disconnected.
	///
	/// This is a plain synchronous test on purpose: nothing here needs to be
	/// awaited, and a lock-blocking `is_connected` panics when it is called
	/// from inside a Tokio runtime, which is what `#[tokio::test]` provides.
	#[test]
	fn TestgRPCTransportNotConnected() {
		let Transport = gRPCTransport::New("127.0.0.1:50050").expect("construction performs no I/O");

		assert!(!Transport.is_connected());
	}
}