Skip to content

Commit

Permalink
*: add UDS support
Browse files Browse the repository at this point in the history
A new module named `net` is added to unify TCP socket and UDS.
By default, sccache server is still listen on local TCP port,
user can choose to listen on UDS by setting environment variable
`SCCACHE_SERVER_UDS`.

Generic is used in server implementation for best performance,
trait object is used in client implementation for simplicity and
better readability.

Close #933.

Signed-off-by: Jay Lee <[email protected]>
  • Loading branch information
BusyJay committed Jun 16, 2024
1 parent 808e8fb commit d8b60f3
Show file tree
Hide file tree
Showing 9 changed files with 366 additions and 101 deletions.
1 change: 1 addition & 0 deletions .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@ jobs:

env:
SCCACHE_GHA_ENABLED: "on"
SCCACHE_SERVER_UDS: "/tmp/sccache.socket"

steps:
- uses: actions/checkout@v4
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ If you don't [specify otherwise](#storage-options), sccache will use a local dis

sccache works using a client-server model, where the server runs locally on the same machine as the client. The client-server model allows the server to be more efficient by keeping some state in memory. The sccache command will spawn a server process if one is not already running, or you can run `sccache --start-server` to start the background server process without performing any compilation.

By default sccache server will listen on `127.0.0.1:4226`, you can specify environment variable `SCCACHE_SERVER_PORT` to use a different port or `SCCACHE_SERVER_UDS` to listen on unix domain socket. Abstract unix socket is also supported as long as the path is escaped following the [format](https://doc.rust-lang.org/std/ascii/fn.escape_default.html).

You can run `sccache --stop-server` to terminate the server. It will also terminate after (by default) 10 minutes of inactivity.

Running `sccache --show-stats` will print a summary of cache statistics.
Expand Down
32 changes: 16 additions & 16 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,28 @@
// limitations under the License.

use crate::errors::*;
use crate::net::Connection;
use crate::protocol::{Request, Response};
use crate::util;
use byteorder::{BigEndian, ByteOrder};
use retry::{delay::Fixed, retry};
use std::io::{self, BufReader, BufWriter, Read};
use std::net::TcpStream;

/// A connection to an sccache server.
pub struct ServerConnection {
/// A reader for the socket connected to the server.
reader: BufReader<TcpStream>,
reader: BufReader<Box<dyn Connection>>,
/// A writer for the socket connected to the server.
writer: BufWriter<TcpStream>,
writer: BufWriter<Box<dyn Connection>>,
}

impl ServerConnection {
/// Create a new connection using `stream`.
pub fn new(stream: TcpStream) -> io::Result<ServerConnection> {
let writer = stream.try_clone()?;
pub fn new(conn: Box<dyn Connection>) -> io::Result<ServerConnection> {
let write_conn = conn.try_clone()?;
Ok(ServerConnection {
reader: BufReader::new(stream),
writer: BufWriter::new(writer),
reader: BufReader::new(conn),
writer: BufWriter::new(write_conn),
})
}

Expand Down Expand Up @@ -62,24 +62,24 @@ impl ServerConnection {
}
}

/// Establish a TCP connection to an sccache server listening on `port`.
pub fn connect_to_server(port: u16) -> io::Result<ServerConnection> {
trace!("connect_to_server({})", port);
let stream = TcpStream::connect(("127.0.0.1", port))?;
ServerConnection::new(stream)
/// Establish a TCP connection to an sccache server listening on `addr`.
pub fn connect_to_server(addr: &crate::net::SocketAddr) -> io::Result<ServerConnection> {
trace!("connect_to_server({addr})");
let conn = crate::net::connect(addr)?;
ServerConnection::new(conn)
}

/// Attempt to establish a TCP connection to an sccache server listening on `port`.
/// Attempt to establish a TCP connection to an sccache server listening on `addr`.
///
/// If the connection fails, retry a few times.
pub fn connect_with_retry(port: u16) -> io::Result<ServerConnection> {
trace!("connect_with_retry({})", port);
pub fn connect_with_retry(addr: &crate::net::SocketAddr) -> io::Result<ServerConnection> {
trace!("connect_with_retry({addr})");
// TODOs:
// * Pass the server Child in here, so we can stop retrying
// if the process exited.
// * Send a pipe handle to the server process so it can notify
// us once it starts the server instead of us polling.
match retry(Fixed::from_millis(500).take(10), || connect_to_server(port)) {
match retry(Fixed::from_millis(500).take(10), || connect_to_server(addr)) {
Ok(conn) => Ok(conn),
Err(e) => Err(io::Error::new(
io::ErrorKind::TimedOut,
Expand Down
51 changes: 26 additions & 25 deletions src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,16 @@ pub const DEFAULT_PORT: u16 = 4226;
const SERVER_STARTUP_TIMEOUT: Duration = Duration::from_millis(10000);

/// Get the port on which the server should listen.
fn get_port() -> u16 {
env::var("SCCACHE_SERVER_PORT")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(DEFAULT_PORT)
fn get_addr() -> crate::net::SocketAddr {
if let Ok(addr) = env::var("SCCACHE_SERVER_UDS") {
return crate::net::SocketAddr::parse_uds(&addr);
}
let port = env::var("SCCACHE_SERVER_PORT")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(DEFAULT_PORT);
crate::net::SocketAddr::Net(std::net::SocketAddr::new("127.0.0.1".parse().unwrap(), port))

}

/// Check if ignoring all response errors
Expand Down Expand Up @@ -295,13 +300,13 @@ fn run_server_process(startup_timeout: Option<Duration>) -> Result<ServerStartup
})
}

/// Attempt to connect to an sccache server listening on `port`, or start one if no server is running.
/// Attempt to connect to an sccache server listening on `addr`, or start one if no server is running.
fn connect_or_start_server(
port: u16,
addr: &crate::net::SocketAddr,
startup_timeout: Option<Duration>,
) -> Result<ServerConnection> {
trace!("connect_or_start_server({})", port);
match connect_to_server(port) {
trace!("connect_or_start_server({addr})");
match connect_to_server(addr) {
Ok(server) => Ok(server),
Err(ref e)
if e.kind() == io::ErrorKind::ConnectionRefused
Expand All @@ -310,13 +315,11 @@ fn connect_or_start_server(
// If the connection was refused we probably need to start
// the server.
match run_server_process(startup_timeout)? {
ServerStartup::Ok { port: actualport } => {
if port != actualport {
ServerStartup::Ok { addr: actual_addr } => {
if addr.to_string() != actual_addr {
// bail as the next connect_with_retry will fail
bail!(
"sccache: Listening on port {} instead of {}",
actualport,
port
"sccache: Listening on port {actual_addr} instead of {addr}"
);
}
}
Expand All @@ -326,7 +329,7 @@ fn connect_or_start_server(
ServerStartup::TimedOut => bail!("Timed out waiting for server startup. Maybe the remote service is unreachable?\nRun with SCCACHE_LOG=debug SCCACHE_NO_DAEMON=1 to get more information"),
ServerStartup::Err { reason } => bail!("Server startup failed: {}\nRun with SCCACHE_LOG=debug SCCACHE_NO_DAEMON=1 to get more information", reason),
}
let server = connect_with_retry(port)?;
let server = connect_with_retry(addr)?;
Ok(server)
}
Err(e) => Err(e.into()),
Expand Down Expand Up @@ -616,7 +619,7 @@ pub fn run_command(cmd: Command) -> Result<i32> {
match cmd {
Command::ShowStats(fmt, advanced) => {
trace!("Command::ShowStats({:?})", fmt);
let stats = match connect_to_server(get_port()) {
let stats = match connect_to_server(&get_addr()) {
Ok(srv) => request_stats(srv).context("failed to get stats from server")?,
// If there is no server, spawning a new server would start with zero stats
// anyways, so we can just return (mostly) empty stats directly.
Expand Down Expand Up @@ -660,18 +663,16 @@ pub fn run_command(cmd: Command) -> Result<i32> {
// We aren't asking for a log file
daemonize()?;
}
server::start_server(config, get_port())?;
server::start_server(config, &get_addr())?;
}
Command::StartServer => {
trace!("Command::StartServer");
println!("sccache: Starting the server...");
let startup =
run_server_process(startup_timeout).context("failed to start server process")?;
match startup {
ServerStartup::Ok { port } => {
if port != DEFAULT_PORT {
println!("sccache: Listening on port {}", port);
}
ServerStartup::Ok { addr } => {
println!("sccache: Listening on port {addr}");
}
ServerStartup::TimedOut => bail!("Timed out waiting for server startup"),
ServerStartup::AddrInUse => bail!("Server startup failed: Address in use"),
Expand All @@ -681,13 +682,13 @@ pub fn run_command(cmd: Command) -> Result<i32> {
Command::StopServer => {
trace!("Command::StopServer");
println!("Stopping sccache server...");
let server = connect_to_server(get_port()).context("couldn't connect to server")?;
let server = connect_to_server(&get_addr()).context("couldn't connect to server")?;
let stats = request_shutdown(server)?;
stats.print(false);
}
Command::ZeroStats => {
trace!("Command::ZeroStats");
let conn = connect_or_start_server(get_port(), startup_timeout)?;
let conn = connect_or_start_server(&get_addr(), startup_timeout)?;
request_zero_stats(conn).context("couldn't zero stats on server")?;
eprintln!("Statistics zeroed.");
}
Expand Down Expand Up @@ -749,7 +750,7 @@ pub fn run_command(cmd: Command) -> Result<i32> {
),
Command::DistStatus => {
trace!("Command::DistStatus");
let srv = connect_or_start_server(get_port(), startup_timeout)?;
let srv = connect_or_start_server(&get_addr(), startup_timeout)?;
let status =
request_dist_status(srv).context("failed to get dist-status from server")?;
serde_json::to_writer(&mut io::stdout(), &status)?;
Expand Down Expand Up @@ -787,7 +788,7 @@ pub fn run_command(cmd: Command) -> Result<i32> {
} => {
trace!("Command::Compile {{ {:?}, {:?}, {:?} }}", exe, cmdline, cwd);
let jobserver = unsafe { Client::new() };
let conn = connect_or_start_server(get_port(), startup_timeout)?;
let conn = connect_or_start_server(&get_addr(), startup_timeout)?;
let mut runtime = Runtime::new()?;
let res = do_compile(
ProcessCommandCreator::new(&jobserver),
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub mod dist;
mod jobserver;
pub mod lru_disk_cache;
mod mock_command;
mod net;
mod protocol;
pub mod server;
#[doc(hidden)]
Expand Down
133 changes: 133 additions & 0 deletions src/net.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
//! The module is used to provide abstraction over TCP socket and UDS.

use std::fmt;

use futures::{Future, TryFutureExt};
use tokio::io::{AsyncRead, AsyncWrite};

// A unify version of `std::net::SocketAddr` and Unix domain socket.
#[derive(Debug)]
pub enum SocketAddr {
Net(std::net::SocketAddr),
Unix(std::path::PathBuf),
#[cfg(any(target_os = "linux", target_os = "android"))]
UnixAbstract(Vec<u8>),
}

impl fmt::Display for SocketAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SocketAddr::Net(addr) => write!(f, "{}", addr),
SocketAddr::Unix(p) => write!(f, "{}", p.display()),
#[cfg(any(target_os = "linux", target_os = "android"))]
SocketAddr::UnixAbstract(p) => write!(f, "{}", p.escape_ascii()),
}
}
}

impl SocketAddr {
/// Parse a string as a unix domain socket.
///
/// The string should follow the format of `self.to_string()`.
pub fn parse_uds(s: &str) -> Self {
// Parse abstract socket address first as it can contain any chars.
#[cfg(any(target_os = "linux", target_os = "android"))]
{
if s.starts_with('\x00') {
let data = crate::util::ascii_unescape_default(s.as_bytes());
return SocketAddr::UnixAbstract(data);
}
}
let path = std::path::PathBuf::from(s);
SocketAddr::Unix(path)
}
}

// A helper trait to unify the behavior of TCP and UDS listener.
pub trait Acceptor {
type Socket: AsyncRead + AsyncWrite + Unpin + Send;

fn accept(&self) -> impl Future<Output=tokio::io::Result<Self::Socket>> + Send;
fn local_addr(&self) -> tokio::io::Result<SocketAddr>;
}

impl Acceptor for tokio::net::TcpListener {
type Socket = tokio::net::TcpStream;

#[inline]
fn accept(&self) -> impl Future<Output=tokio::io::Result<Self::Socket>> + Send {
tokio::net::TcpListener::accept(self).and_then(|(s, _)| futures::future::ok(s))
}

#[inline]
fn local_addr(&self) -> tokio::io::Result<SocketAddr> {
tokio::net::TcpListener::local_addr(&self).map(SocketAddr::Net)
}
}

// A helper trait to unify the behavior of TCP and UDS stream.
pub trait Connection: std::io::Read + std::io::Write {
fn try_clone(&self) -> std::io::Result<Box<dyn Connection>>;
}

impl Connection for std::net::TcpStream {
#[inline]
fn try_clone(&self) -> std::io::Result<Box<dyn Connection>> {
let stream = std::net::TcpStream::try_clone(self)?;
Ok(Box::new(stream))
}
}

// Helper function to create a stream. Uses dynamic dispatch to make code more
// readable.
pub fn connect(addr: &SocketAddr) -> std::io::Result<Box<dyn Connection>> {
match addr {
SocketAddr::Net(addr) => std::net::TcpStream::connect(addr).map(|s| Box::new(s) as Box<dyn Connection>),
#[cfg(unix)]
SocketAddr::Unix(p) => std::os::unix::net::UnixStream::connect(p).map(|s| Box::new(s) as Box<dyn Connection>),
#[cfg(any(target_os = "linux", target_os = "android"))]
SocketAddr::UnixAbstract(p) => {
let sock = std::os::unix::net::SocketAddr::from_abstract_name(p);
std::os::unix::net::UnixStream::connect_addr(sock).map(|s| Box::new(s) as Box<dyn Connection>)
}
}
}

#[cfg(unix)]
mod unix_imp {
use std::path::PathBuf;

use futures::TryFutureExt;

use super::*;

impl Acceptor for tokio::net::UnixListener {
type Socket = tokio::net::UnixStream;

#[inline]
fn accept(&self) -> impl Future<Output=tokio::io::Result<Self::Socket>> + Send {
tokio::net::UnixListener::accept(self).and_then(|(s, _)| futures::future::ok(s))
}

#[inline]
fn local_addr(&self) -> tokio::io::Result<SocketAddr> {
let addr = tokio::net::UnixListener::local_addr(self)?;
if let Some(p) = addr.as_pathname() {
return Ok(SocketAddr::Unix(p.to_path_buf()));
}
#[cfg(any(target_os = "linux", target_os = "android"))]
if let Some(p) = addr.as_abstract_name() {
return Ok(SocketAddr::UnixAbstract(p.to_vec()));
}
Ok(SocketAddr::Unix(PathBuf::new()))
}
}

impl Connection for std::os::unix::net::UnixStream {
#[inline]
fn try_clone(&self) -> std::io::Result<Box<dyn Connection>> {
let stream = std::os::unix::net::UnixStream::try_clone(self)?;
Ok(Box::new(stream))
}
}
}
Loading

0 comments on commit d8b60f3

Please sign in to comment.