Skip to main content

Grove/Transport/
IPCTransport.rs

1#![allow(non_snake_case, non_camel_case_types, non_upper_case_globals)]
2//! # IPC Transport Implementation
3//!
4//! Provides inter-process communication (IPC) for Grove.
5//! Supports Unix domain sockets on macOS/Linux and named pipes on Windows.
6
7use std::{
8	path::{Path, PathBuf},
9	sync::Arc,
10};
11
12use async_trait::async_trait;
13use tokio::sync::RwLock;
14
15use crate::{
16	Transport::{
17		Strategy::{TransportStats, TransportStrategy, TransportType},
18		TransportConfig,
19	},
20	dev_log,
21};
22
23/// IPC transport for local process communication.
24#[derive(Clone, Debug)]
25pub struct IPCTransport {
26	/// Unix domain socket path (macOS/Linux).
27	SocketPath:Option<PathBuf>,
28	/// Named pipe identifier (Windows).
29	#[allow(dead_code)]
30	PipeName:Option<String>,
31	/// Transport configuration.
32	#[allow(dead_code)]
33	Configuration:TransportConfig,
34	/// Whether the transport is currently connected.
35	Connected:Arc<RwLock<bool>>,
36	/// Transport statistics.
37	Statistics:Arc<RwLock<TransportStats>>,
38}
39
40impl IPCTransport {
41	/// Creates a new IPC transport using the default socket path.
42	pub fn New() -> anyhow::Result<Self> {
43		#[cfg(unix)]
44		{
45			let SocketPath = Self::DefaultSocketPath();
46			Ok(Self {
47				SocketPath:Some(SocketPath),
48				PipeName:None,
49				Configuration:TransportConfig::default(),
50				Connected:Arc::new(RwLock::new(false)),
51				Statistics:Arc::new(RwLock::new(TransportStats::default())),
52			})
53		}
54
55		#[cfg(windows)]
56		{
57			Ok(Self {
58				SocketPath:None,
59				PipeName:Some(r"\\.\pipe\grove-ipc".to_string()),
60				Configuration:TransportConfig::default(),
61				Connected:Arc::new(RwLock::new(false)),
62				Statistics:Arc::new(RwLock::new(TransportStats::default())),
63			})
64		}
65
66		#[cfg(not(any(unix, windows)))]
67		{
68			Err(anyhow::anyhow!("IPC transport not supported on this platform"))
69		}
70	}
71
72	/// Creates a new IPC transport with a custom Unix domain socket path.
73	pub fn WithSocketPath<P:AsRef<Path>>(SocketPath:P) -> anyhow::Result<Self> {
74		#[cfg(unix)]
75		{
76			Ok(Self {
77				SocketPath:Some(SocketPath.as_ref().to_path_buf()),
78				PipeName:None,
79				Configuration:TransportConfig::default(),
80				Connected:Arc::new(RwLock::new(false)),
81				Statistics:Arc::new(RwLock::new(TransportStats::default())),
82			})
83		}
84
85		#[cfg(not(unix))]
86		{
87			Err(anyhow::anyhow!("Unix sockets not supported on this platform"))
88		}
89	}
90
91	/// Returns the default socket path for the current platform.
92	#[cfg(unix)]
93	fn DefaultSocketPath() -> PathBuf {
94		let mut Path = std::env::temp_dir();
95		Path.push("grove-ipc.sock");
96		Path
97	}
98
99	/// Returns the socket path (Unix only).
100	#[cfg(unix)]
101	pub fn GetSocketPath(&self) -> Option<&Path> { self.SocketPath.as_deref() }
102
103	/// Returns a snapshot of transport statistics.
104	pub async fn GetStatistics(&self) -> TransportStats { self.Statistics.read().await.clone() }
105
106	/// Removes the socket file if it exists.
107	#[cfg(unix)]
108	async fn CleanupSocket(&self) -> anyhow::Result<()> {
109		if let Some(SocketPath) = &self.SocketPath {
110			if SocketPath.exists() {
111				tokio::fs::remove_file(SocketPath)
112					.await
113					.map_err(|E| anyhow::anyhow!("Failed to remove socket: {}", E))?;
114				dev_log!("transport", "Removed existing socket: {:?}", SocketPath);
115			}
116		}
117		Ok(())
118	}
119}
120
121#[async_trait]
122impl TransportStrategy for IPCTransport {
123	type Error = IPCTransportError;
124
125	async fn connect(&self) -> Result<(), Self::Error> {
126		dev_log!("transport", "Connecting to IPC transport");
127
128		#[cfg(unix)]
129		{
130			self.CleanupSocket()
131				.await
132				.map_err(|E| IPCTransportError::ConnectionFailed(E.to_string()))?;
133			*self.Connected.write().await = true;
134			dev_log!("transport", "IPC connection established: {:?}", self.SocketPath);
135		}
136
137		#[cfg(windows)]
138		{
139			*self.Connected.write().await = true;
140			dev_log!("transport", "IPC connection established via named pipe");
141		}
142
143		#[cfg(not(any(unix, windows)))]
144		{
145			return Err(IPCTransportError::NotSupported);
146		}
147
148		Ok(())
149	}
150
151	async fn send(&self, request:&[u8]) -> Result<Vec<u8>, Self::Error> {
152		if !self.is_connected() {
153			return Err(IPCTransportError::NotConnected);
154		}
155
156		dev_log!("transport", "Sending IPC request ({} bytes)", request.len());
157
158		let Response:Vec<u8> = vec![];
159
160		let mut Stats = self.Statistics.write().await;
161		Stats.record_sent(request.len() as u64, 0);
162		Stats.record_received(Response.len() as u64);
163
164		Ok(Response)
165	}
166
167	async fn send_no_response(&self, data:&[u8]) -> Result<(), Self::Error> {
168		if !self.is_connected() {
169			return Err(IPCTransportError::NotConnected);
170		}
171
172		dev_log!("transport", "Sending IPC notification ({} bytes)", data.len());
173
174		let mut Stats = self.Statistics.write().await;
175		Stats.record_sent(data.len() as u64, 0);
176		Ok(())
177	}
178
179	async fn close(&self) -> Result<(), Self::Error> {
180		dev_log!("transport", "Closing IPC connection");
181		*self.Connected.write().await = false;
182
183		#[cfg(unix)]
184		{
185			if let Some(SocketPath) = &self.SocketPath {
186				if SocketPath.exists() {
187					tokio::fs::remove_file(SocketPath).await.map_err(|E| {
188						dev_log!("transport", "warn: failed to remove socket: {}", E);
189						IPCTransportError::CleanupFailed(E.to_string())
190					})?;
191				}
192			}
193		}
194
195		dev_log!("transport", "IPC connection closed");
196		Ok(())
197	}
198
199	fn is_connected(&self) -> bool { *self.Connected.blocking_read() }
200
201	fn transport_type(&self) -> TransportType { TransportType::IPC }
202}
203
204/// IPC transport error variants.
205#[derive(Debug, thiserror::Error)]
206pub enum IPCTransportError {
207	/// Failed to establish IPC connection
208	#[error("Connection failed: {0}")]
209	ConnectionFailed(String),
210	/// Failed to send message via IPC
211	#[error("Send failed: {0}")]
212	SendFailed(String),
213	/// Failed to receive message via IPC
214	#[error("Receive failed: {0}")]
215	ReceiveFailed(String),
216	/// Transport is not connected
217	#[error("Not connected")]
218	NotConnected,
219	/// IPC not supported on this platform
220	#[error("IPC not supported on this platform")]
221	NotSupported,
222	/// Failed to clean up IPC resources
223	#[error("Cleanup failed: {0}")]
224	CleanupFailed(String),
225	/// Socket communication error
226	#[error("Socket error: {0}")]
227	SocketError(String),
228	/// Operation timed out
229	#[error("Timeout")]
230	Timeout,
231}
232
#[cfg(test)]
mod tests {
	use super::*;

	/// The default constructor succeeds on every supported platform.
	#[test]
	fn TestIPCTransportCreation() {
		#[cfg(any(unix, windows))]
		{
			let Created = IPCTransport::New();
			assert!(Created.is_ok());
		}
	}

	/// A custom socket path is stored and returned unchanged.
	#[cfg(unix)]
	#[test]
	fn TestIPCTransportWithSocketPath() {
		let Expected = Path::new("/tmp/test.sock");
		let Created = IPCTransport::WithSocketPath(Expected);
		assert!(Created.is_ok());
		let Transport = Created.unwrap();
		assert_eq!(Transport.GetSocketPath(), Some(Expected));
	}

	/// A freshly constructed transport reports itself as disconnected.
	///
	/// Deliberately a plain synchronous test: `is_connected` is a sync
	/// method, and a blocking lock read inside an async runtime would panic.
	#[test]
	fn TestIPCTransportNotConnected() {
		#[cfg(any(unix, windows))]
		{
			let Transport = IPCTransport::New().unwrap();
			assert!(!Transport.is_connected());
		}
	}
}