|
| 1 | +use std::{ |
| 2 | + ops::Deref, |
| 3 | + os::{fd::OwnedFd, unix::fs::DirBuilderExt}, |
| 4 | + path::{Path, PathBuf}, |
| 5 | +}; |
| 6 | + |
| 7 | +use bitflags::bitflags; |
| 8 | +use nix::{ |
| 9 | + errno::Errno, |
| 10 | + fcntl::{Flock, FlockArg, OFlag, open}, |
| 11 | + sys::stat::Mode, |
| 12 | + unistd::{close, mkfifo, read, write}, |
| 13 | +}; |
| 14 | + |
| 15 | +use crate::Error; |
| 16 | + |
| 17 | +bitflags! { |
| 18 | + /// Mode of operation of the Hopper pipe. |
| 19 | + /// |
| 20 | + /// **Must** include either [`PipeMode::IN`] or [`PipeMode::OUT`], |
| 21 | + /// but not both. |
| 22 | + #[derive(Debug, Clone)] |
| 23 | + pub struct PipeMode: u32 { |
| 24 | + /// Pipe should be opened as a server-side input |
| 25 | + const IN = (1 << 0); |
| 26 | + /// Pipe should be opened as a server-side output |
| 27 | + const OUT = (1 << 1); |
| 28 | + /// Pipe should be opened in non-blocking mode |
| 29 | + const NONBLOCK = (1 << 2); |
| 30 | + } |
| 31 | +} |
| 32 | + |
| 33 | +/// A pipe for interacting with the Hopper IPC server |
| 34 | +pub struct Pipe { |
| 35 | + mode: PipeMode, |
| 36 | + name: String, |
| 37 | + endpoint: String, |
| 38 | + hopper: PathBuf, |
| 39 | + lock: Option<Flock<OwnedFd>>, |
| 40 | +} |
| 41 | + |
| 42 | +impl Pipe { |
| 43 | + fn get_open_flags(&self) -> OFlag { |
| 44 | + let mut flags = OFlag::empty(); |
| 45 | + |
| 46 | + if self.mode.contains(PipeMode::IN) { |
| 47 | + flags |= OFlag::O_WRONLY; |
| 48 | + } |
| 49 | + if self.mode.contains(PipeMode::OUT) { |
| 50 | + flags |= OFlag::O_RDONLY; |
| 51 | + } |
| 52 | + if self.mode.contains(PipeMode::NONBLOCK) { |
| 53 | + flags |= OFlag::O_NONBLOCK; |
| 54 | + } |
| 55 | + |
| 56 | + flags |
| 57 | + } |
| 58 | + |
| 59 | + fn get_endpoint_path(&self) -> PathBuf { |
| 60 | + self.hopper.join(&self.endpoint) |
| 61 | + } |
| 62 | + |
| 63 | + fn get_pipe_path(&self) -> PathBuf { |
| 64 | + self.get_endpoint_path().join(format!( |
| 65 | + "{}.{}", |
| 66 | + &self.name, |
| 67 | + if self.mode.contains(PipeMode::IN) { |
| 68 | + "in" |
| 69 | + } else if self.mode.contains(PipeMode::OUT) { |
| 70 | + "out" |
| 71 | + } else { |
| 72 | + "" // how the hell did we get here? |
| 73 | + } |
| 74 | + )) |
| 75 | + } |
| 76 | + |
| 77 | + /// Create a new [`Pipe`] with a given mode, name, and endpoint. |
| 78 | + /// |
| 79 | + /// A path to the local Hopper instance can be optionally specified. If |
| 80 | + /// not specified it will be read from the `HOPPER_PATH` environment variable |
| 81 | + /// at runtime, returning [`Error::HopperNotFound`] if it cannot be identified. |
| 82 | + pub fn new<S, P>(mode: PipeMode, name: S, endpoint: S, hopper: Option<P>) -> Result<Self, Error> |
| 83 | + where |
| 84 | + S: AsRef<str>, |
| 85 | + P: AsRef<Path>, |
| 86 | + { |
| 87 | + if !(mode.contains(PipeMode::IN) ^ mode.contains(PipeMode::OUT)) { |
| 88 | + return Err(Error::InvalidMode); |
| 89 | + } |
| 90 | + |
| 91 | + let hopper = if let Some(p) = hopper { |
| 92 | + p.as_ref().to_path_buf() |
| 93 | + } else { |
| 94 | + PathBuf::from( |
| 95 | + std::env::var("HOPPER_PATH") |
| 96 | + .ok() |
| 97 | + .ok_or(Error::HopperNotFound)?, |
| 98 | + ) |
| 99 | + }; |
| 100 | + |
| 101 | + Ok(Self { |
| 102 | + mode, |
| 103 | + name: name.as_ref().to_string(), |
| 104 | + endpoint: endpoint.as_ref().to_string(), |
| 105 | + hopper, |
| 106 | + lock: None, |
| 107 | + }) |
| 108 | + } |
| 109 | + |
| 110 | + /// Open the pipe described by this object. |
| 111 | + /// |
| 112 | + /// If the pipe is already open, close it first. |
| 113 | + /// |
| 114 | + /// A exclusive or shared lock is acquired on the pipe when opening |
| 115 | + /// depending on which mode is set. If the endpoint directiry does not |
| 116 | + /// exist, it will be created. |
| 117 | + pub fn open(&mut self) -> Result<(), Error> { |
| 118 | + if self.is_open() { |
| 119 | + self.close()?; |
| 120 | + } |
| 121 | + |
| 122 | + let endpoint = self.get_endpoint_path(); |
| 123 | + std::fs::DirBuilder::new() |
| 124 | + .recursive(true) |
| 125 | + .mode(0o755) |
| 126 | + .create(&endpoint) |
| 127 | + .map_err(Error::Io)?; |
| 128 | + |
| 129 | + let pipe = self.get_pipe_path(); |
| 130 | + match mkfifo(&pipe, Mode::from_bits_truncate(0o660)) { |
| 131 | + Ok(_) => {} |
| 132 | + Err(Errno::EEXIST) => {} |
| 133 | + Err(e) => return Err(Error::Other(e)), |
| 134 | + } |
| 135 | + |
| 136 | + let flags = self.get_open_flags(); |
| 137 | + let fd = open(&pipe, flags, Mode::empty()).map_err(Error::Other)?; |
| 138 | + |
| 139 | + let lock = Flock::lock( |
| 140 | + fd, |
| 141 | + if self.mode.contains(PipeMode::IN) { |
| 142 | + FlockArg::LockExclusiveNonblock |
| 143 | + } else { |
| 144 | + FlockArg::LockSharedNonblock |
| 145 | + }, |
| 146 | + ); |
| 147 | + |
| 148 | + let lock = match lock { |
| 149 | + Ok(l) => l, |
| 150 | + Err((fd, e)) => { |
| 151 | + let _ = close(fd); |
| 152 | + |
| 153 | + return Err(Error::Other(if e == Errno::EWOULDBLOCK { |
| 154 | + Errno::EBUSY // EBUSY makes more sense for clients |
| 155 | + } else { |
| 156 | + e |
| 157 | + })); |
| 158 | + } |
| 159 | + }; |
| 160 | + |
| 161 | + self.lock = Some(lock); |
| 162 | + |
| 163 | + Ok(()) |
| 164 | + } |
| 165 | + |
| 166 | + /// Read a buffer from the pipe |
| 167 | + pub fn read(&self, buf: &mut [u8]) -> Result<usize, Error> { |
| 168 | + let lock = self.lock.as_ref().ok_or(Error::NotOpen)?; |
| 169 | + let res = match read(lock.deref(), buf) { |
| 170 | + Ok(s) => s, |
| 171 | + Err(Errno::EWOULDBLOCK) => 0, // this isn't an error for non-block pipes |
| 172 | + Err(e) => return Err(Error::Other(e)), |
| 173 | + }; |
| 174 | + |
| 175 | + Ok(res) |
| 176 | + } |
| 177 | + |
| 178 | + /// Write a buffer to the pipe |
| 179 | + pub fn write(&self, buf: &[u8]) -> Result<usize, Error> { |
| 180 | + let lock = self.lock.as_ref().ok_or(Error::NotOpen)?; |
| 181 | + let res = match write(lock.deref(), buf) { |
| 182 | + Ok(s) => s, |
| 183 | + Err(Errno::EWOULDBLOCK) => 0, // this isn't an error for non-block pipes |
| 184 | + Err(e) => return Err(Error::Other(e)), |
| 185 | + }; |
| 186 | + |
| 187 | + Ok(res) |
| 188 | + } |
| 189 | + |
| 190 | + /// Close the pipe, freeing all locks. |
| 191 | + pub fn close(&mut self) -> Result<(), Error> { |
| 192 | + let lock = self.lock.take(); |
| 193 | + |
| 194 | + if let Some(lock) = lock { |
| 195 | + match lock.unlock() { |
| 196 | + Ok(fd) => close(fd).map_err(Error::Other)?, |
| 197 | + Err((lock, e)) => { |
| 198 | + self.lock = Some(lock); |
| 199 | + return Err(Error::Other(e)); |
| 200 | + } |
| 201 | + } |
| 202 | + } |
| 203 | + |
| 204 | + Ok(()) |
| 205 | + } |
| 206 | + |
| 207 | + /// Check if the pipe is currently open |
| 208 | + pub fn is_open(&self) -> bool { |
| 209 | + self.lock.is_some() |
| 210 | + } |
| 211 | +} |
| 212 | + |
| 213 | +impl Drop for Pipe { |
| 214 | + fn drop(&mut self) { |
| 215 | + let _ = self.close(); |
| 216 | + } |
| 217 | +} |
0 commit comments