Grove/Transport/
IPCTransport.rs1#![allow(non_snake_case, non_camel_case_types, non_upper_case_globals)]
2use std::{
8 path::{Path, PathBuf},
9 sync::Arc,
10};
11
12use async_trait::async_trait;
13use tokio::sync::RwLock;
14
15use crate::{
16 Transport::{
17 Strategy::{TransportStats, TransportStrategy, TransportType},
18 TransportConfig,
19 },
20 dev_log,
21};
22
23#[derive(Clone, Debug)]
25pub struct IPCTransport {
26 SocketPath:Option<PathBuf>,
28 #[allow(dead_code)]
30 PipeName:Option<String>,
31 #[allow(dead_code)]
33 Configuration:TransportConfig,
34 Connected:Arc<RwLock<bool>>,
36 Statistics:Arc<RwLock<TransportStats>>,
38}
39
40impl IPCTransport {
41 pub fn New() -> anyhow::Result<Self> {
43 #[cfg(unix)]
44 {
45 let SocketPath = Self::DefaultSocketPath();
46 Ok(Self {
47 SocketPath:Some(SocketPath),
48 PipeName:None,
49 Configuration:TransportConfig::default(),
50 Connected:Arc::new(RwLock::new(false)),
51 Statistics:Arc::new(RwLock::new(TransportStats::default())),
52 })
53 }
54
55 #[cfg(windows)]
56 {
57 Ok(Self {
58 SocketPath:None,
59 PipeName:Some(r"\\.\pipe\grove-ipc".to_string()),
60 Configuration:TransportConfig::default(),
61 Connected:Arc::new(RwLock::new(false)),
62 Statistics:Arc::new(RwLock::new(TransportStats::default())),
63 })
64 }
65
66 #[cfg(not(any(unix, windows)))]
67 {
68 Err(anyhow::anyhow!("IPC transport not supported on this platform"))
69 }
70 }
71
72 pub fn WithSocketPath<P:AsRef<Path>>(SocketPath:P) -> anyhow::Result<Self> {
74 #[cfg(unix)]
75 {
76 Ok(Self {
77 SocketPath:Some(SocketPath.as_ref().to_path_buf()),
78 PipeName:None,
79 Configuration:TransportConfig::default(),
80 Connected:Arc::new(RwLock::new(false)),
81 Statistics:Arc::new(RwLock::new(TransportStats::default())),
82 })
83 }
84
85 #[cfg(not(unix))]
86 {
87 Err(anyhow::anyhow!("Unix sockets not supported on this platform"))
88 }
89 }
90
91 #[cfg(unix)]
93 fn DefaultSocketPath() -> PathBuf {
94 let mut Path = std::env::temp_dir();
95 Path.push("grove-ipc.sock");
96 Path
97 }
98
99 #[cfg(unix)]
101 pub fn GetSocketPath(&self) -> Option<&Path> { self.SocketPath.as_deref() }
102
103 pub async fn GetStatistics(&self) -> TransportStats { self.Statistics.read().await.clone() }
105
106 #[cfg(unix)]
108 async fn CleanupSocket(&self) -> anyhow::Result<()> {
109 if let Some(SocketPath) = &self.SocketPath {
110 if SocketPath.exists() {
111 tokio::fs::remove_file(SocketPath)
112 .await
113 .map_err(|E| anyhow::anyhow!("Failed to remove socket: {}", E))?;
114 dev_log!("transport", "Removed existing socket: {:?}", SocketPath);
115 }
116 }
117 Ok(())
118 }
119}
120
121#[async_trait]
122impl TransportStrategy for IPCTransport {
123 type Error = IPCTransportError;
124
125 async fn connect(&self) -> Result<(), Self::Error> {
126 dev_log!("transport", "Connecting to IPC transport");
127
128 #[cfg(unix)]
129 {
130 self.CleanupSocket()
131 .await
132 .map_err(|E| IPCTransportError::ConnectionFailed(E.to_string()))?;
133 *self.Connected.write().await = true;
134 dev_log!("transport", "IPC connection established: {:?}", self.SocketPath);
135 }
136
137 #[cfg(windows)]
138 {
139 *self.Connected.write().await = true;
140 dev_log!("transport", "IPC connection established via named pipe");
141 }
142
143 #[cfg(not(any(unix, windows)))]
144 {
145 return Err(IPCTransportError::NotSupported);
146 }
147
148 Ok(())
149 }
150
151 async fn send(&self, request:&[u8]) -> Result<Vec<u8>, Self::Error> {
152 if !self.is_connected() {
153 return Err(IPCTransportError::NotConnected);
154 }
155
156 dev_log!("transport", "Sending IPC request ({} bytes)", request.len());
157
158 let Response:Vec<u8> = vec![];
159
160 let mut Stats = self.Statistics.write().await;
161 Stats.record_sent(request.len() as u64, 0);
162 Stats.record_received(Response.len() as u64);
163
164 Ok(Response)
165 }
166
167 async fn send_no_response(&self, data:&[u8]) -> Result<(), Self::Error> {
168 if !self.is_connected() {
169 return Err(IPCTransportError::NotConnected);
170 }
171
172 dev_log!("transport", "Sending IPC notification ({} bytes)", data.len());
173
174 let mut Stats = self.Statistics.write().await;
175 Stats.record_sent(data.len() as u64, 0);
176 Ok(())
177 }
178
179 async fn close(&self) -> Result<(), Self::Error> {
180 dev_log!("transport", "Closing IPC connection");
181 *self.Connected.write().await = false;
182
183 #[cfg(unix)]
184 {
185 if let Some(SocketPath) = &self.SocketPath {
186 if SocketPath.exists() {
187 tokio::fs::remove_file(SocketPath).await.map_err(|E| {
188 dev_log!("transport", "warn: failed to remove socket: {}", E);
189 IPCTransportError::CleanupFailed(E.to_string())
190 })?;
191 }
192 }
193 }
194
195 dev_log!("transport", "IPC connection closed");
196 Ok(())
197 }
198
199 fn is_connected(&self) -> bool { *self.Connected.blocking_read() }
200
201 fn transport_type(&self) -> TransportType { TransportType::IPC }
202}
203
204#[derive(Debug, thiserror::Error)]
206pub enum IPCTransportError {
207 #[error("Connection failed: {0}")]
209 ConnectionFailed(String),
210 #[error("Send failed: {0}")]
212 SendFailed(String),
213 #[error("Receive failed: {0}")]
215 ReceiveFailed(String),
216 #[error("Not connected")]
218 NotConnected,
219 #[error("IPC not supported on this platform")]
221 NotSupported,
222 #[error("Cleanup failed: {0}")]
224 CleanupFailed(String),
225 #[error("Socket error: {0}")]
227 SocketError(String),
228 #[error("Timeout")]
230 Timeout,
231}
232
233#[cfg(test)]
234mod tests {
235 use super::*;
236
237 #[test]
238 fn TestIPCTransportCreation() {
239 #[cfg(any(unix, windows))]
240 {
241 let Result = IPCTransport::New();
242 assert!(Result.is_ok());
243 }
244 }
245
246 #[cfg(unix)]
247 #[test]
248 fn TestIPCTransportWithSocketPath() {
249 let Result = IPCTransport::WithSocketPath(Path::new("/tmp/test.sock"));
250 assert!(Result.is_ok());
251 let Transport = Result.unwrap();
252 assert_eq!(Transport.GetSocketPath(), Some(Path::new("/tmp/test.sock")));
253 }
254
255 #[tokio::test]
256 async fn TestIPCTransportNotConnected() {
257 #[cfg(any(unix, windows))]
258 {
259 let Transport = IPCTransport::New().unwrap();
260 assert!(!Transport.is_connected());
261 }
262 }
263}