diff --git a/Cargo.lock b/Cargo.lock index f2feb4e8c..9760b9cf7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4770,6 +4770,7 @@ dependencies = [ "serde_with", "tangram_client", "tangram_server", + "time", "tokio", "tokio-util", "tracing", @@ -4905,6 +4906,7 @@ dependencies = [ "indoc", "itertools", "libc", + "lru", "lsp-types", "mime", "notify", diff --git a/Cargo.toml b/Cargo.toml index 8f5378750..65cb144c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,6 +68,7 @@ indicatif = "0.17" indoc = "2" itertools = "0.12" libc = "0.2" +lru = "0.12" lsp-types = { version = "0.95" } mime = "0.3" notify = "6" diff --git a/packages/cli/Cargo.toml b/packages/cli/Cargo.toml index 7d9e26ae4..e03abdbc7 100644 --- a/packages/cli/Cargo.toml +++ b/packages/cli/Cargo.toml @@ -38,6 +38,7 @@ serde_json = { workspace = true } serde_with = { workspace = true } tangram_client = { workspace = true } tangram_server = { workspace = true } +time = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } tracing = { workspace = true } diff --git a/packages/cli/src/commands.rs b/packages/cli/src/commands.rs index 84e127e55..0d67efa9d 100644 --- a/packages/cli/src/commands.rs +++ b/packages/cli/src/commands.rs @@ -23,3 +23,4 @@ pub mod run; pub mod server; pub mod tree; pub mod upgrade; +pub mod watch; diff --git a/packages/cli/src/commands/checkin.rs b/packages/cli/src/commands/checkin.rs index d20998db9..13618d383 100644 --- a/packages/cli/src/commands/checkin.rs +++ b/packages/cli/src/commands/checkin.rs @@ -7,6 +7,10 @@ use tangram_client as tg; pub struct Args { /// The path to check in. pub path: Option, + + /// Toggle whether the server will add a file watcher for this path. + #[arg(long)] + pub watch: Option, } impl Cli { @@ -19,9 +23,10 @@ impl Cli { if let Some(path_arg) = &args.path { path.push(path_arg); } + let watch = args.watch.unwrap_or(true); // Perform the checkin. 
- let artifact = tg::Artifact::check_in(client, &path.try_into()?).await?; + let artifact = tg::Artifact::check_in(client, &path.try_into()?, watch).await?; // Print the ID. let id = artifact.id(client).await?; diff --git a/packages/cli/src/commands/server.rs b/packages/cli/src/commands/server.rs index ae73e4d0e..742d3afa5 100644 --- a/packages/cli/src/commands/server.rs +++ b/packages/cli/src/commands/server.rs @@ -140,6 +140,17 @@ impl Cli { }, ); + // Create the file system monitor options + let file_system_monitor = config + .as_ref() + .and_then(|config| config.file_system_monitor.as_ref()) + .map_or( + tangram_server::options::FileSystemMonitor { enable: true }, + |file_system_monitor| tangram_server::options::FileSystemMonitor { + enable: file_system_monitor.enable, + }, + ); + // Create the messenger options. let messenger = config .and_then(|config| config.messenger.as_ref()) @@ -248,6 +259,7 @@ impl Cli { advanced, build, database, + file_system_monitor, messenger, oauth, path, diff --git a/packages/cli/src/commands/watch.rs b/packages/cli/src/commands/watch.rs new file mode 100644 index 000000000..ffe2b9c3d --- /dev/null +++ b/packages/cli/src/commands/watch.rs @@ -0,0 +1,79 @@ +use std::path::PathBuf; + +use tangram_client as tg; + +use crate::Cli; +use tg::Handle; + +/// Manage watches. 
+#[derive(Debug, clap::Args)] +pub struct Args { + #[clap(subcommand)] + pub command: Command, +} + +#[derive(Debug, clap::Subcommand)] +pub enum Command { + Add(AddArgs), + Remove(RemoveArgs), + List(ListArgs), +} + +#[derive(Debug, clap::Args)] +pub struct AddArgs { + pub path: PathBuf, +} + +#[derive(Debug, clap::Args)] +pub struct RemoveArgs { + pub path: PathBuf, +} + +#[derive(Debug, clap::Args)] + +pub struct ListArgs; + +impl Cli { + pub async fn command_watch(&self, args: Args) -> tg::Result<()> { + match args.command { + Command::Add(args) => self.command_watch_add(args).await, + Command::List(args) => self.command_watch_list(args).await, + Command::Remove(args) => self.command_watch_remove(args).await, + } + } + + async fn command_watch_add(&self, args: AddArgs) -> tg::Result<()> { + let client = &self.client().await?; + let path = args + .path + .canonicalize() + .map_err(|source| tg::error!(!source, "failed to canonicalize the path"))?; + let path = path.try_into()?; + tg::Artifact::check_in(client, &path, true) + .await + .map_err(|source| tg::error!(!source, %path, "failed to add watch"))?; + Ok(()) + } + + async fn command_watch_list(&self, _args: ListArgs) -> tg::Result<()> { + let client = self.client().await?; + let paths = client + .list_watches() + .await + .map_err(|source| tg::error!(!source, "failed to get watches"))?; + for path in paths { + eprintln!("{path}"); + } + Ok(()) + } + + async fn command_watch_remove(&self, args: RemoveArgs) -> tg::Result<()> { + let client = self.client().await?; + let path = args.path.try_into()?; + client + .remove_watch(&path) + .await + .map_err(|source| tg::error!(!source, %path, "failed to remove watch"))?; + Ok(()) + } +} diff --git a/packages/cli/src/config.rs b/packages/cli/src/config.rs index 1e26aeee6..2c1619742 100644 --- a/packages/cli/src/config.rs +++ b/packages/cli/src/config.rs @@ -1,3 +1,11 @@ +//! # Configuring the tangram CLI and server +//! +//! 
Tangram can be configured by a global config file located at $HOME/.config/config.json or by passing the `--config <path>` option to the `tg` command line before any subcommand, for example +//! +//! ```sh +//! # Run the server using a config file. +//! tg --config config.json server run +//! ``` use serde_with::serde_as; use std::path::PathBuf; use tangram_client as tg; @@ -6,57 +14,75 @@ use url::Url; #[serde_as] #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] pub struct Config { + /// Advanced configuration options. #[serde(default, skip_serializing_if = "Option::is_none")] pub advanced: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub autoenv: Option, + /// Configure the server's build options. #[serde(default, skip_serializing_if = "Option::is_none")] pub build: Option, + /// Configure the server's database options. #[serde(default, skip_serializing_if = "Option::is_none")] pub database: Option, + /// Configure the server's file system monitoring options. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub file_system_monitor: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] pub messenger: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub oauth: Option, + /// Configure the server's path. Default = `$HOME/.tangram`. #[serde(default, skip_serializing_if = "Option::is_none")] pub path: Option, + /// A list of remote servers that this server can push and pull objects/builds from. #[serde(default, skip_serializing_if = "Option::is_none")] pub remotes: Option>, + /// Server and CLI tracing options. #[serde(default, skip_serializing_if = "Option::is_none")] pub tracing: Option, + /// The URL of the server, if serving over tcp. #[serde(default, skip_serializing_if = "Option::is_none")] pub url: Option, + /// Configure the virtual file system.
#[serde(default, skip_serializing_if = "Option::is_none")] pub vfs: Option, } #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] pub struct Advanced { + /// Configure how errors are displayed in the CLI. #[serde(default, skip_serializing_if = "Option::is_none")] pub error_trace_options: Option, + /// Configure the number of file descriptors available in the client. #[serde(default, skip_serializing_if = "Option::is_none")] pub file_descriptor_limit: Option, + /// Configure the number of file descriptors available in the server. #[serde(default, skip_serializing_if = "Option::is_none")] pub file_descriptor_semaphore_size: Option, + /// Toggle whether temp directories are preserved or deleted after builds. Default = false. #[serde(default, skip_serializing_if = "Option::is_none")] pub preserve_temp_directories: Option, + /// Toggle whether tokio-console support is enabled. Default = false. #[serde(default, skip_serializing_if = "std::ops::Not::not")] pub tokio_console: bool, + /// Toggle whether log messages are printed to the server's stderr. Default = false. #[serde(default, skip_serializing_if = "Option::is_none")] pub write_build_logs_to_stderr: Option, } @@ -69,7 +95,7 @@ pub struct Autoenv { #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] pub struct Build { - /// Enable builds. + /// Toggle whether builds are enabled on this server. #[serde(default, skip_serializing_if = "Option::is_none")] pub enable: Option, @@ -85,6 +111,12 @@ pub enum Database { Postgres(PostgresDatabase), } +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct FileSystemMonitor { + /// Toggle whether the file system monitor is enabled. Default = true. + pub enable: bool, +} + #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct SqliteDatabase { /// The maximum number of connections. 
@@ -149,8 +181,11 @@ pub struct RemoteBuild { #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] pub struct Tracing { + /// The filter applied to tracing messages. #[serde(default, skip_serializing_if = "String::is_empty")] pub filter: String, + + /// The display format of tracing messages. #[serde(default, skip_serializing_if = "Option::is_none")] pub format: Option, } @@ -190,6 +225,7 @@ impl std::str::FromStr for TracingFormat { #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] pub struct Vfs { + /// Toggle whether the VFS is enabled. When the VFS is disabled, checkouts will be made onto local disk. Default = true. #[serde(default, skip_serializing_if = "Option::is_none")] pub enable: Option, } diff --git a/packages/cli/src/main.rs b/packages/cli/src/main.rs index 271576594..d7780e110 100644 --- a/packages/cli/src/main.rs +++ b/packages/cli/src/main.rs @@ -76,6 +76,7 @@ pub enum Command { Tree(self::commands::tree::Args), Update(self::commands::package::UpdateArgs), Upgrade(self::commands::upgrade::Args), + Watch(self::commands::watch::Args), } fn default_path() -> PathBuf { @@ -212,6 +213,7 @@ impl Cli { Command::Tree(args) => self.command_tree(args).boxed(), Command::Update(args) => self.command_package_update(args).boxed(), Command::Upgrade(args) => self.command_upgrade(args).boxed(), + Command::Watch(args) => self.command_watch(args).boxed(), } .await } diff --git a/packages/client/src/artifact.rs b/packages/client/src/artifact.rs index e310b3195..789f04fdd 100644 --- a/packages/client/src/artifact.rs +++ b/packages/client/src/artifact.rs @@ -75,6 +75,7 @@ pub enum Data { #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub struct CheckInArg { pub path: crate::Path, + pub watch: bool, } #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] @@ -123,8 +124,11 @@ impl Artifact { } impl Artifact { - pub async fn check_in(tg: &impl Handle, path: &crate::Path) -> tg::Result { - let arg = CheckInArg { 
path: path.clone() }; + pub async fn check_in(tg: &impl Handle, path: &crate::Path, watch: bool) -> tg::Result { + let arg = CheckInArg { + path: path.clone(), + watch, + }; let output = tg.check_in_artifact(arg).await?; let artifact = Self::with_id(output.id); Ok(artifact) diff --git a/packages/client/src/blob.rs b/packages/client/src/blob.rs index e5937558d..0d6776e36 100644 --- a/packages/client/src/blob.rs +++ b/packages/client/src/blob.rs @@ -261,7 +261,7 @@ impl Blob { // Check in the extracted artifact. let path = path.try_into()?; - let artifact = Artifact::check_in(tg, &path) + let artifact = Artifact::check_in(tg, &path, false) .await .map_err(|source| error!(!source, "failed to check in the extracted archive"))?; diff --git a/packages/client/src/handle.rs b/packages/client/src/handle.rs index 03dff1ad8..172c1bcff 100644 --- a/packages/client/src/handle.rs +++ b/packages/client/src/handle.rs @@ -308,4 +308,8 @@ pub trait Handle: Clone + Unpin + Send + Sync + 'static { &self, token: &str, ) -> impl Future>> + Send; + + fn list_watches(&self) -> impl Future>> + Send; + + fn remove_watch(&self, path: &tg::Path) -> impl Future> + Send; } diff --git a/packages/client/src/lib.rs b/packages/client/src/lib.rs index 4c0e6e5a9..1d1273bed 100644 --- a/packages/client/src/lib.rs +++ b/packages/client/src/lib.rs @@ -74,6 +74,7 @@ pub mod template; pub mod user; mod util; pub mod value; +pub mod watch; #[derive(Debug, Clone)] pub struct Client { @@ -781,4 +782,12 @@ impl Handle for Client { async fn get_user_for_token(&self, token: &str) -> tg::Result> { self.get_user_for_token(token).await } + + async fn list_watches(&self) -> tg::Result> { + self.get_watches().await + } + + async fn remove_watch(&self, path: &tg::Path) -> tg::Result<()> { + self.remove_watch(path).await + } } diff --git a/packages/client/src/path.rs b/packages/client/src/path.rs index 34eb6c631..3b81adcd4 100644 --- a/packages/client/src/path.rs +++ b/packages/client/src/path.rs @@ -1,4 +1,4 @@ -use 
crate::{self as tg, error}; +use crate as tg; use derive_more::{TryUnwrap, Unwrap}; use std::{ffi::OsStr, path::PathBuf}; @@ -256,7 +256,7 @@ impl TryFrom for Path { value .as_os_str() .to_str() - .ok_or_else(|| error!("the path must be valid UTF-8"))? + .ok_or_else(|| tg::error!("the path must be valid UTF-8"))? .parse() } } @@ -270,7 +270,7 @@ impl<'a> TryFrom<&'a std::path::Path> for Path { .to_str() .ok_or_else(|| { let path = value.display(); - error!(%path, "the path must be valid UTF-8") + tg::error!(%path, "the path must be valid UTF-8") })? .parse() } diff --git a/packages/client/src/watch.rs b/packages/client/src/watch.rs new file mode 100644 index 000000000..610bdfaef --- /dev/null +++ b/packages/client/src/watch.rs @@ -0,0 +1,56 @@ +use crate::{self as tg, util::http::empty}; +use http_body_util::BodyExt; + +impl tg::Client { + pub(crate) async fn get_watches(&self) -> tg::Result> { + let method = http::Method::GET; + let uri = "/watches"; + let request = http::request::Builder::default().method(method).uri(uri); + let body = empty(); + let request = request + .body(body) + .map_err(|source| tg::error!(!source, "failed to create the request"))?; + let response = self.send(request).await?; + if !response.status().is_success() { + let bytes = response + .collect() + .await + .map_err(|source| tg::error!(!source, "failed to collect the response body"))? + .to_bytes(); + let error = serde_json::from_slice(&bytes) + .unwrap_or_else(|_| tg::error!("failed to deserialize the error")); + return Err(error); + } + let bytes = response + .collect() + .await + .map_err(|source| tg::error!(!source, "failed to collect the response body"))? 
+ .to_bytes(); + let paths = serde_json::from_slice(&bytes) + .map_err(|source| tg::error!(!source, "failed to deserialize the response body"))?; + Ok(paths) + } + + pub(crate) async fn remove_watch(&self, path: &tg::Path) -> tg::Result<()> { + let method = http::Method::DELETE; + let path = urlencoding::encode(path.as_str()); + let uri = format!("/watches/{path}"); + let request = http::request::Builder::default().method(method).uri(uri); + let body = empty(); + let request = request + .body(body) + .map_err(|source| tg::error!(!source, "failed to create the request"))?; + let response = self.send(request).await?; + if !response.status().is_success() { + let bytes = response + .collect() + .await + .map_err(|source| tg::error!(!source, "failed to collect the response body"))? + .to_bytes(); + let error = serde_json::from_slice(&bytes) + .unwrap_or_else(|_| tg::error!("failed to deserialize the error")); + return Err(error); + } + Ok(()) + } +} diff --git a/packages/server/Cargo.toml b/packages/server/Cargo.toml index f7207f790..61d6087fb 100644 --- a/packages/server/Cargo.toml +++ b/packages/server/Cargo.toml @@ -48,6 +48,7 @@ indexmap = { workspace = true } indoc = { workspace = true } itertools = { workspace = true } libc = { workspace = true } +lru = { workspace = true } lsp-types = { workspace = true } mime = { workspace = true } notify = { workspace = true } diff --git a/packages/server/src/artifact.rs b/packages/server/src/artifact.rs index 76a27bb94..6d9fc07c4 100644 --- a/packages/server/src/artifact.rs +++ b/packages/server/src/artifact.rs @@ -33,7 +33,7 @@ impl Server { // Check in the artifact. let id = self - .check_in_artifact_inner(&arg.path, &transaction) + .check_in_artifact_inner(&arg.path, arg.watch, &transaction) .await?; // Commit the transaction. 
@@ -54,39 +54,64 @@ impl Server { async fn check_in_artifact_inner( &self, path: &tg::Path, + watch: bool, transaction: &Transaction<'_>, ) -> tg::Result { + // Check if we've already checked this artifact in. + if let Some(artifact) = self + .try_get_artifact_at_path(path, transaction) + .await + .map_err(|source| tg::error!(!source, %path, "failed to get artifact for path"))? + { + // Update the path for this artifact. + self.add_artifact_at_path(path, artifact.clone(), watch, transaction) + .await + .map_err( + |source| tg::error!(!source, %path, %artifact, "failed to set the artifact path"), + )?; + return Ok(artifact); + } + // Get the metadata for the file system object at the path. let metadata = tokio::fs::symlink_metadata(&path).await.map_err( |source| tg::error!(!source, %path, "failed to get the metadata for the path"), )?; // Call the appropriate function for the file system object at the path. - if metadata.is_dir() { - self.check_in_directory(path, &metadata, transaction) + let artifact = if metadata.is_dir() { + self.check_in_directory(path, watch, &metadata, transaction) .await - .map_err(|source| tg::error!(!source, %path, "failed to check in the directory")) + .map_err(|source| tg::error!(!source, %path, "failed to check in the directory"))? } else if metadata.is_file() { - self.check_in_file(path, &metadata, transaction) + self.check_in_file(path, watch, &metadata, transaction) .await - .map_err(|source| tg::error!(!source, %path, "failed to check in the file")) + .map_err(|source| tg::error!(!source, %path, "failed to check in the file"))? } else if metadata.is_symlink() { - self.check_in_symlink(path, &metadata, transaction) + self.check_in_symlink(path, watch, &metadata, transaction) .await - .map_err(|source| tg::error!(!source, %path, "failed to check in the symlink")) + .map_err(|source| tg::error!(!source, %path, "failed to check in the symlink"))?
} else { let file_type = metadata.file_type(); - Err(tg::error!( + return Err(tg::error!( %path, ?file_type, "invalid file type" - )) - } + )); + }; + + // Update the path for this artifact. + self.add_artifact_at_path(path, artifact.clone(), watch, transaction) + .await + .map_err( + |source| tg::error!(!source, %path, %artifact, "failed to set the artifact path"), + )?; + Ok(artifact) } async fn check_in_directory( &self, path: &tg::Path, + watch: bool, _metadata: &std::fs::Metadata, transaction: &Transaction<'_>, ) -> tg::Result { @@ -119,7 +144,9 @@ impl Server { .into_iter() .map(|name| async { let path = path.clone().join(&name); - let id = self.check_in_artifact_inner(&path, transaction).await?; + let id = self + .check_in_artifact_inner(&path, watch, transaction) + .await?; Ok::<_, tg::Error>((name, id)) }) .collect::>() @@ -138,6 +165,7 @@ impl Server { async fn check_in_file( &self, path: &tg::Path, + _watch: bool, metadata: &std::fs::Metadata, transaction: &Transaction<'_>, ) -> tg::Result { @@ -184,6 +212,7 @@ impl Server { async fn check_in_symlink( &self, path: &tg::Path, + _watch: bool, _metadata: &std::fs::Metadata, transaction: &Transaction<'_>, ) -> tg::Result { @@ -278,7 +307,10 @@ impl Server { // Check in an existing artifact at the path.
let existing_artifact = if exists { - let arg = tg::artifact::CheckInArg { path: path.clone() }; + let arg = tg::artifact::CheckInArg { + path: path.clone(), + watch: false, + }; let output = self.check_in_artifact(arg).await?; Some(tg::Artifact::with_id(output.id)) } else { diff --git a/packages/server/src/fsm.rs b/packages/server/src/fsm.rs new file mode 100644 index 000000000..4a793f49e --- /dev/null +++ b/packages/server/src/fsm.rs @@ -0,0 +1,487 @@ +use crate::{ + database::Transaction, + util::http::{empty, full, Incoming, Outgoing}, + Http, Server, +}; +use indoc::formatdoc; +use notify::Watcher; +use std::sync::Arc; +use tangram_client as tg; +use tangram_database::{self as db, prelude::*}; + +#[derive(Clone)] +pub struct FileSystemMonitor { + inner: Arc, +} + +struct Inner { + paths: std::sync::RwLock>, + watcher: std::sync::Mutex>, +} + +#[derive(Debug)] +struct ArtifactPath { + artifact: tg::artifact::Id, + id: u64, + mtime: Option<[u8; 16]>, +} + +impl FileSystemMonitor { + /// Start the file system watcher. + pub fn start(server: &Server) -> tg::Result { + // Create the paths. + let paths = lru::LruCache::unbounded_with_hasher(fnv::FnvBuildHasher::default()); + + // Create the server. 
+ let file_system_monitor = Self { + inner: Arc::new(Inner { + paths: std::sync::RwLock::new(paths), + watcher: std::sync::Mutex::new(None), + }), + }; + + let main_runtime_handle = tokio::runtime::Handle::current(); + let watcher = notify::recommended_watcher({ + let file_system_monitor = file_system_monitor.clone(); + let server = server.clone(); + move |event: Result| { + let Ok(event) = + event.inspect_err(|error| tracing::error!(%error, "failed to get event")) + else { + return; + }; + if !event.kind.is_modify() { + return; + } + main_runtime_handle.spawn({ + let file_system_monitor = file_system_monitor.clone(); + async move { + let Ok(mut connection) = + server.inner.database.connection().await.inspect_err( + |error| tracing::error!(%error, "failed to get database connection"), + ) + else { + return; + }; + let Ok(transaction) = connection.transaction().await.inspect_err( + |error| tracing::error!(%error, "failed to create database transaction"), + ) else { + return; + }; + for path in event.paths { + let Ok::(path) = path.try_into() else { + continue; + }; + + // Update the cache. + file_system_monitor + .inner + .paths + .write() + .unwrap() + .put(path.clone(), ()); + + // Update the database. + server + .remove_artifact_at_path(&path, &transaction) + .await + .inspect_err(|error| { + tracing::error!( %path, %error, "failed to remove artifact path"); + }) + .ok(); + } + transaction + .commit() + .await + .inspect_err( + |error| tracing::error!(%error, "failed to commit the transaction"), + ) + .ok(); + } + }); + } + }) + .map_err(|source| tg::error!(!source, "failed to create watcher"))?; + + file_system_monitor + .inner + .watcher + .lock() + .unwrap() + .replace(watcher); + + Ok(file_system_monitor) + } + + /// Stop the file watcher. + pub fn stop(&self) { + self.inner.watcher.lock().unwrap().take(); + self.inner.paths.write().unwrap().clear(); + } + + /// Get the current list of paths that are being watched. 
+ pub fn paths(&self) -> Vec { + self.inner + .paths + .read() + .unwrap() + .iter() + .map(|(k, ())| k.clone()) + .collect() + } + + /// Add a path to the watcher. + pub fn add(&self, path: tg::Path) -> tg::Result<()> { + let path = path.normalize(); + if self.inner.paths.read().unwrap().contains(&path) { + return Ok(()); + } + + loop { + // Get the watcher, bailing out if it's been removed. + let mut watcher_ = self.inner.watcher.lock().unwrap(); + let Some(watcher) = watcher_.as_mut() else { + return Ok(()); + }; + + // Try and add the watch. + let Err(source) = watcher.watch(path.as_ref(), notify::RecursiveMode::Recursive) else { + break; + }; + + // If we've hit the limit of files to watch, pop from the cache and try again. + if let notify::ErrorKind::MaxFilesWatch = &source.kind { + if self.inner.paths.write().unwrap().pop_lru().is_some() { + continue; + } + } + + // Something else went wrong. + return Err(tg::error!(!source, %path, "failed to add path to file watcher")); + } + + // Add the path and the current time. + self.inner.paths.write().unwrap().push(path, ()); + Ok(()) + } + + /// Remove a path from the file system watcher. 
+ pub fn remove(&self, path: &tg::Path) -> tg::Result<()> { + let path = path.clone().normalize(); + if !self.inner.paths.read().unwrap().contains(&path) { + return Ok(()); + } + let mut watcher = self.inner.watcher.lock().unwrap(); + let Some(watcher) = watcher.as_mut() else { + return Ok(()); + }; + watcher + .unwatch(path.as_ref()) + .map_err(|source| tg::error!(!source, %path, "failed to remove path from watcher"))?; + self.inner.paths.write().unwrap().pop(&path); + Ok(()) + } +} + +impl Server { + pub(crate) fn get_watches(&self) -> Vec { + let paths = self + .inner + .file_system_monitor + .lock() + .unwrap() + .as_ref() + .map(crate::fsm::FileSystemMonitor::paths) + .unwrap_or_default(); + paths + } + + pub(crate) fn remove_watch(&self, path: &tg::Path) -> tg::Result<()> { + self.inner + .file_system_monitor + .lock() + .unwrap() + .as_ref() + .map_or(Ok(()), |f| f.remove(path)) + } + + pub(crate) async fn add_artifact_at_path( + &self, + path: &tg::Path, + artifact: tg::artifact::Id, + watch: bool, + transaction: &Transaction<'_>, + ) -> tg::Result<()> { + // Add to the file watcher if appropriate. + if watch { + self.inner + .file_system_monitor + .lock() + .unwrap() + .as_ref() + .map(|f| f.add(path.clone())); + } + + // Update the database. + self.add_artifact_path(path, artifact, transaction) + .await + .map_err(|source| tg::error!(!source, %path, "failed to add the artifact path"))?; + + Ok(()) + } + + pub(crate) async fn remove_artifact_at_path( + &self, + path: &tg::Path, + transaction: &Transaction<'_>, + ) -> tg::Result<()> { + let Some(artifact_path) = self + .try_get_artifact_path(path, transaction) + .await + .map_err(|source| tg::error!(!source, "failed to get the artifact at path"))? 
+ else { + return Ok(()); + }; + self.remove_artifact_path(&artifact_path, transaction) + .await + .map_err(|source| tg::error!(!source, "failed to remove the artifact"))?; + Ok(()) + } + + pub(crate) async fn try_get_artifact_at_path( + &self, + path: &tg::Path, + transaction: &Transaction<'_>, + ) -> tg::Result> { + // Try to get the artifact path. + let Some(artifact_path) = self + .try_get_artifact_path(path, transaction) + .await + .map_err(|source| tg::error!(!source, %path, "failed to get artifact path"))? + else { + return Ok(None); + }; + + // If the mtimes don't match, remove the artifact path. + if !matches!(artifact_path.artifact, tg::artifact::Id::Directory(_)) { + let mtime = get_mtime(path).await?; + if Some(mtime) != artifact_path.mtime { + self.remove_artifact_path(&artifact_path, transaction) + .await + .map_err( + |source| tg::error!(!source, %path, "failed to remove the artifact path"), + )?; + return Ok(None); + } + } + Ok(Some(artifact_path.artifact)) + } + + async fn add_artifact_path( + &self, + path: &tg::Path, + artifact: tg::artifact::Id, + transaction: &Transaction<'_>, + ) -> tg::Result<()> { + let path = path.clone().normalize(); + let mut components = path + .clone() + .normalize() + .into_components() + .into_iter() + .skip(1) + .map(tg::path::Component::unwrap_normal) + .peekable(); + + let mtime = if matches!(artifact, tg::artifact::Id::Directory(_)) { + None + } else { + Some(get_mtime(&path).await?) + }; + + let mut parent = 0; + while let Some(name) = components.next() { + // If this is the last component of the path, get the artifact and mtime. + let (mtime, artifact) = if components.peek().is_none() { + (mtime, Some(artifact.clone())) + } else { + (None, None) + }; + // If parent is not null we can upsert the row. 
+ let p = self.inner.database.p(); + let statement = formatdoc!( + " + insert into artifact_paths (parent, name, mtime, artifact) + values ({p}1, {p}2, {p}3, {p}4) + on conflict (parent, name) + do update set + mtime = {p}3, + artifact = {p}4 + returning id; + " + ); + let params = db::params![parent, name, mtime, artifact]; + parent = transaction + .query_one_value_into(statement, params) + .await + .map_err(|source| tg::error!(!source, "failed to perform the query"))?; + } + + Ok(()) + } + + async fn remove_artifact_path( + &self, + artifact_path: &ArtifactPath, + transaction: &Transaction<'_>, + ) -> tg::Result<()> { + // Invalidate the parents. + let mut id = artifact_path.id; + while id != 0 { + let p = self.inner.database.p(); + let statement = formatdoc!( + " + update artifact_paths + set + artifact = null, + mtime = null + where id = {p}1 + returning parent; + " + ); + let params = db::params![id]; + id = transaction + .query_one_value_into(statement, params) + .await + .map_err(|source| tg::error!(!source, "failed to execute the statement"))?; + } + + Ok(()) + } + + async fn try_get_artifact_path( + &self, + path: &tg::Path, + transaction: &Transaction<'_>, + ) -> tg::Result> { + let path = path.clone().normalize(); + let mut components = path + .clone() + .normalize() + .into_components() + .into_iter() + .skip(1) + .map(tg::path::Component::unwrap_normal) + .peekable(); + + let mut parent = 0; + while let Some(name) = components.next() { + #[derive(serde::Deserialize)] + struct Row { + id: u64, + mtime: Option<[u8; 16]>, + artifact: Option, + } + let p = self.inner.database.p(); + let statement = formatdoc!( + " + select id, mtime, artifact + from artifact_paths + where name = {p}1 and parent = {p}2; + " + ); + let params = db::params![name, parent]; + let row = transaction + .query_optional_into::(statement, params) + .await + .map_err(|source| tg::error!(!source, "failed to perform the query"))?; + let Some(row) = row else { + return Ok(None); + }; + + 
// If this is not the last component, continue. + if components.peek().is_some() { + parent = row.id; + continue; + } + + // Otherwise update the artifact/mtime fields. + let Some(artifact) = row.artifact else { + return Ok(None); + }; + let mtime = row.mtime; + return Ok(Some(ArtifactPath { + id: row.id, + mtime, + artifact, + })); + } + + Ok(None) + } +} + +async fn get_mtime(path: &tg::Path) -> tg::Result<[u8; 16]> { + let metadata = tokio::fs::symlink_metadata(path) + .await + .map_err(|source| tg::error!(!source, %path, "failed to get file metadata"))?; + + let mtime = metadata + .modified() + .map_err(|source| tg::error!(!source, "failed to get mtime"))? + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_micros() + .to_le_bytes(); + + Ok(mtime) +} + +impl Http +where + H: tg::Handle, +{ + pub async fn handle_get_watches_request( + &self, + _request: http::Request, + ) -> tg::Result> { + // Get the watches. + let watches = self.inner.tg.list_watches().await?; + + // Create the body. + let body = serde_json::to_vec(&watches) + .map_err(|source| tg::error!(!source, "failed to serialize the body"))?; + let body = full(body); + + // Create the response. + let response = http::Response::builder().body(body).unwrap(); + + Ok(response) + } + + pub async fn handle_remove_watch_request( + &self, + request: http::Request, + ) -> tg::Result> { + // Get the path params. + let path_components: Vec<&str> = request.uri().path().split('/').skip(1).collect(); + let ["watches", path] = path_components.as_slice() else { + let path = request.uri().path(); + return Err(tg::error!(%path, "unexpected path")); + }; + let path = serde_urlencoded::from_str(path) + .map_err(|source| tg::error!(!source, "failed to serialize path"))?; + + // Remove the watch. 
+ self.inner + .tg + .remove_watch(&path) + .await + .map_err(|source| tg::error!(!source, %path, "failed to remove the path"))?; + + // Create the response + let response = http::Response::builder().body(empty()).unwrap(); + + Ok(response) + } +} diff --git a/packages/server/src/lib.rs b/packages/server/src/lib.rs index 84eae15c8..3864df794 100644 --- a/packages/server/src/lib.rs +++ b/packages/server/src/lib.rs @@ -11,6 +11,7 @@ use self::{ use async_nats as nats; use bytes::Bytes; use either::Either; +use fsm::FileSystemMonitor; use futures::{future, stream::FuturesUnordered, FutureExt, Stream, TryFutureExt, TryStreamExt}; use http_body_util::BodyExt; use hyper_util::rt::{TokioExecutor, TokioIo}; @@ -37,6 +38,7 @@ mod blob; mod build; mod clean; mod database; +mod fsm; mod language; mod messenger; mod migrations; @@ -62,6 +64,7 @@ struct Inner { build_state: std::sync::RwLock, fnv::FnvBuildHasher>>, database: Database, file_descriptor_semaphore: tokio::sync::Semaphore, + file_system_monitor: std::sync::Mutex>, http: std::sync::Mutex>>, local_pool_handle: tokio_util::task::LocalPoolHandle, lockfile: std::sync::Mutex>, @@ -216,6 +219,9 @@ impl Server { let file_descriptor_semaphore = tokio::sync::Semaphore::new(options.advanced.file_descriptor_semaphore_size); + // Create the file system monitor. + let file_system_monitor = std::sync::Mutex::new(None); + // Create the http server. let http = std::sync::Mutex::new(None); @@ -277,6 +283,7 @@ impl Server { build_state, database, file_descriptor_semaphore, + file_system_monitor, http, local_pool_handle, lockfile, @@ -314,6 +321,18 @@ impl Server { .await?; } + // Start the file system monitor. 
+ if server.inner.options.file_system_monitor.enable { + let file_system_monitor = FileSystemMonitor::start(&server) + .map_err(|source| tg::error!(!source, "failed to start the file system monitor"))?; + server + .inner + .file_system_monitor + .lock() + .unwrap() + .replace(file_system_monitor); + } + // Start the VFS if necessary and set up the checkouts directory. let artifacts_path = server.artifacts_path(); self::vfs::unmount(&artifacts_path).await.ok(); @@ -480,6 +499,12 @@ impl Server { // Clear the build state. server.inner.build_state.write().unwrap().clear(); + // Stop the file system monitor. + let file_system_monitor = server.inner.file_system_monitor.lock().unwrap().clone(); + if let Some(file_system_monitor) = file_system_monitor { + file_system_monitor.stop(); + } + // Join the vfs server. let vfs = server.inner.vfs.lock().unwrap().clone(); if let Some(vfs) = vfs { @@ -861,6 +886,14 @@ where .map(Some) .boxed(), + // Watch. + (http::Method::GET, ["watches"]) => { + self.handle_get_watches_request(request).map(Some).boxed() + }, + (http::Method::DELETE, ["watches", _]) => { + self.handle_remove_watch_request(request).map(Some).boxed() + }, + (_, _) => future::ready(None).boxed(), } .await; @@ -1159,6 +1192,14 @@ impl tg::Handle for Server { async fn get_user_for_token(&self, token: &str) -> tg::Result> { self.get_user_for_token(token).await } + + async fn list_watches(&self) -> tg::Result> { + Ok(self.get_watches()) + } + + async fn remove_watch(&self, path: &tg::Path) -> tg::Result<()> { + self.remove_watch(path) + } } impl AsRef for Tmp { diff --git a/packages/server/src/migrations.rs b/packages/server/src/migrations.rs index c33c8f0c5..58b38bb1f 100644 --- a/packages/server/src/migrations.rs +++ b/packages/server/src/migrations.rs @@ -71,12 +71,14 @@ async fn migration_0000(path: &Path) -> tg::Result<()> { " create table artifact_paths ( id integer primary key, - parent integer, + parent integer not null, name text not null, mtime integer, - 
artifact text not null
+				artifact text
 			);
 
+			create index artifact_paths_parent_name_index on artifact_paths (parent, name);
+
 			create table builds (
 				id text primary key,
 				complete integer not null,
diff --git a/packages/server/src/options.rs b/packages/server/src/options.rs
index bb575793c..11ef7f48a 100644
--- a/packages/server/src/options.rs
+++ b/packages/server/src/options.rs
@@ -7,6 +7,7 @@ pub struct Options {
 	pub advanced: Advanced,
 	pub build: Build,
 	pub database: Database,
+	pub file_system_monitor: FileSystemMonitor,
 	pub messenger: Messenger,
 	pub oauth: Oauth,
 	pub path: PathBuf,
@@ -36,6 +37,11 @@ pub enum Database {
 	Postgres(PostgresDatabase),
 }
 
+#[derive(Clone, Debug)]
+pub struct FileSystemMonitor {
+	pub enable: bool,
+}
+
 #[derive(Clone, Debug)]
 pub struct SqliteDatabase {
 	pub max_connections: usize,
diff --git a/packages/server/src/package.rs b/packages/server/src/package.rs
index 0625c9057..d931e3fcf 100644
--- a/packages/server/src/package.rs
+++ b/packages/server/src/package.rs
@@ -237,7 +237,8 @@ impl Server {
 
 		// Add the module to the package directory.
 		let artifact =
-			tg::Artifact::check_in(self, &module_absolute_path.clone().try_into()?).await?;
+			tg::Artifact::check_in(self, &module_absolute_path.clone().try_into()?, true)
+				.await?;
 		package = package.add(self, &module_path, artifact).await?;
 
 		// Get the module's text.
@@ -264,7 +265,7 @@ impl Server {
 
 		// Check in the artifact at the included path.
 		let included_artifact =
-			tg::Artifact::check_in(self, &included_artifact_absolute_path).await?;
+			tg::Artifact::check_in(self, &included_artifact_absolute_path, false).await?;
 
 		// Add the included artifact to the directory.
package = package diff --git a/packages/server/src/runtime/darwin.rs b/packages/server/src/runtime/darwin.rs index 831069e37..9f4587fe5 100644 --- a/packages/server/src/runtime/darwin.rs +++ b/packages/server/src/runtime/darwin.rs @@ -458,7 +458,7 @@ impl Runtime { .map_err(|source| tg::error!(!source, "failed to determine if the path exists"))? { // Check in the output. - let artifact = tg::Artifact::check_in(server, &output_path.clone().try_into()?) + let artifact = tg::Artifact::check_in(server, &output_path.clone().try_into()?, false) .await .map_err(|source| tg::error!(!source, "failed to check in the output"))?; diff --git a/packages/server/src/runtime/linux.rs b/packages/server/src/runtime/linux.rs index d5e6f1ba0..5e699c928 100644 --- a/packages/server/src/runtime/linux.rs +++ b/packages/server/src/runtime/linux.rs @@ -768,9 +768,10 @@ impl Runtime { .map_err(|source| tg::error!(!source, "failed to determine in the path exists"))? { // Check in the output. - let artifact = tg::Artifact::check_in(server, &output_host_path.clone().try_into()?) - .await - .map_err(|source| tg::error!(!source, "failed to check in the output"))?; + let artifact = + tg::Artifact::check_in(server, &output_host_path.clone().try_into()?, false) + .await + .map_err(|source| tg::error!(!source, "failed to check in the output"))?; // Verify the checksum if one was provided. if let Some(expected) = target.checksum(server).await?.clone() { diff --git a/packages/server/src/runtime/proxy.rs b/packages/server/src/runtime/proxy.rs index 629202c99..10e26f696 100644 --- a/packages/server/src/runtime/proxy.rs +++ b/packages/server/src/runtime/proxy.rs @@ -398,4 +398,12 @@ impl tg::Handle for Server { async fn get_user_for_token(&self, _token: &str) -> tg::Result> { Err(tg::error!("forbidden")) } + + async fn list_watches(&self) -> tg::Result> { + Err(tg::error!("forbidden")) + } + + async fn remove_watch(&self, _path: &tg::Path) -> tg::Result<()> { + Err(tg::error!("forbidden")) + } }