diff --git a/packages/runtime/src/artifact.ts b/packages/runtime/src/artifact.ts index 30ad171a3..29788a74f 100644 --- a/packages/runtime/src/artifact.ts +++ b/packages/runtime/src/artifact.ts @@ -58,7 +58,10 @@ export namespace Artifact { return value; }; - export let extract = async (blob: tg.Blob): Promise => { + export let extract = async (blob: tg.Blob | tg.File): Promise => { + if (blob instanceof tg.File) { + blob = await blob.contents(); + } let value = await tg.build({ args: ["extract", blob], env: undefined, diff --git a/packages/runtime/src/blob.ts b/packages/runtime/src/blob.ts index 5b8871993..06ea36257 100644 --- a/packages/runtime/src/blob.ts +++ b/packages/runtime/src/blob.ts @@ -70,9 +70,12 @@ export namespace Blob { }; export let compress = async ( - blob: Blob, + blob: Blob | tg.File, format: CompressionFormat, ): Promise => { + if (blob instanceof tg.File) { + blob = await blob.contents(); + } let value = await tg.build({ args: ["compress", blob, format], env: undefined, @@ -82,7 +85,10 @@ export namespace Blob { return value; }; - export let decompress = async (blob: Blob): Promise => { + export let decompress = async (blob: Blob | tg.File): Promise => { + if (blob instanceof tg.File) { + blob = await blob.contents(); + } let value = await tg.build({ args: ["decompress", blob], env: undefined, diff --git a/packages/runtime/tangram.d.ts b/packages/runtime/tangram.d.ts index 5bd678f84..66b5c84ec 100644 --- a/packages/runtime/tangram.d.ts +++ b/packages/runtime/tangram.d.ts @@ -92,13 +92,13 @@ declare namespace tg { /** Compress a blob. **/ export let compress: ( - blob: tg.Blob, + blob: tg.Blob | tg.File, format: tg.Blob.CompressionFormat, ) => Promise; /** Decompress a blob. **/ export let decompress: ( - blob: tg.Blob, + blob: tg.Blob | tg.File, ) => Promise; /** Download the contents of a URL. */ @@ -125,13 +125,13 @@ declare namespace tg { /** Compress a blob. **/ export let compress: ( - blob: tg.Blob, + blob: tg.Blob | tg.File, format: tg.Blob.CompressionFormat, ) => Promise; /** Decompress a blob. **/ export let decompress: ( - blob: tg.Blob, + blob: tg.Blob | tg.File, ) => Promise; /** Download a blob. **/ @@ -238,7 +238,7 @@ declare namespace tg { /** Extract an artifact from an archive. **/ export let extract: ( - blob: tg.Blob, + blob: tg.Blob | tg.File, ) => Promise; /** Bundle an artifact. **/ @@ -271,7 +271,7 @@ declare namespace tg { /** Extract an artifact from an archive. **/ export let extract: ( - blob: tg.Blob, + blob: tg.Blob | tg.File, ) => Promise; /** Bundle an artifact. **/ diff --git a/packages/server/src/blob/read.rs b/packages/server/src/blob/read.rs index c1c279d16..4545f4c9b 100644 --- a/packages/server/src/blob/read.rs +++ b/packages/server/src/blob/read.rs @@ -178,9 +178,9 @@ impl Reader { .map_err(|source| tg::error!(!source, "failed to execute the statemtent"))?; let reader = if let Some(row) = row { let blob_path = server.blobs_path().join(row.blob.to_string()); - let file = tokio::fs::File::open(blob_path) - .await - .map_err(|source| tg::error!(!source, "failed to open the file"))?; + let file = tokio::fs::File::open(&blob_path).await.map_err( + |source| tg::error!(!source, %path = blob_path.display(), "failed to open the file"), + )?; let reader = File::new(file, row.position, row.length).await?; Self::File(reader) } else { diff --git a/packages/server/src/runtime/builtin/extract.rs b/packages/server/src/runtime/builtin/extract.rs index a7e00cb96..4bcc285e8 100644 --- a/packages/server/src/runtime/builtin/extract.rs +++ b/packages/server/src/runtime/builtin/extract.rs @@ -1,16 +1,53 @@ use super::Runtime; -use crate::Server; +use crate::{Server, blob::create::Destination, temp::Temp}; use futures::{AsyncReadExt as _, StreamExt as _}; +use indoc::indoc; +use num::ToPrimitive as _; use std::{ + collections::BTreeMap, path::{Path, PathBuf}, pin::Pin, + sync::Arc, time::Duration, }; -use tangram_client as tg; +use tangram_client::{self as tg}; +use tangram_database::{Connection as _, Database as _, Transaction as _}; +use tangram_either::Either; use tangram_futures::read::shared_position_reader::SharedPositionReader; -use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, AsyncSeek}; +use time::format_description::well_known::Rfc3339; +use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt as _, AsyncSeek}; use tokio_util::compat::{FuturesAsyncReadCompatExt as _, TokioAsyncReadCompatExt as _}; +#[derive(Debug, Clone)] +enum Artifact { + Directory { + id: Option, + data: Option, + entries: BTreeMap, + depth: Option, + count: Option, + size: Option, + weight: Option, + }, + File { + id: tg::file::Id, + data: tg::file::Data, + contents: crate::blob::create::Blob, + count: u64, + depth: Option, + size: u64, + weight: u64, + }, + Symlink { + id: tg::symlink::Id, + data: tg::symlink::Data, + count: u64, + depth: Option, + size: u64, + weight: u64, + }, +} + impl Runtime { pub async fn extract(&self, process: &tg::Process) -> tg::Result { let server = &self.server; @@ -126,8 +163,11 @@ where // Create the reader. let mut reader = tokio_tar::Archive::new(reader); - // Get the entries. - let mut entries: Vec<(PathBuf, tg::Artifact)> = Vec::new(); + // Create the root. + let mut root = Artifact::empty_directory().with_depth(0); + let root_path = PathBuf::from("."); + + // Add the entries. let mut iter = reader .entries() .map_err(|source| tg::error!(!source, "failed to get the entries from the archive"))?; @@ -141,20 +181,20 @@ where .map_err(|source| tg::error!(!source, "failed to get the entry path"))? .as_ref(), ); - match header.entry_type() { + match &header.entry_type() { tokio_tar::EntryType::Directory => { - let directory = tg::Directory::with_entries([].into()); - let artifact = tg::Artifact::Directory(directory); - entries.push((path, artifact)); + if path == root_path { + continue; + } + root.add_entry(path, Artifact::empty_directory())?; }, tokio_tar::EntryType::Symlink => { let target = header .link_name() .map_err(|source| tg::error!(!source, "failed to read the symlink target"))? .ok_or_else(|| tg::error!("no symlink target stored in the archive"))?; - let symlink = tg::Symlink::with_target(target.as_ref().into()); - let artifact = tg::Artifact::Symlink(symlink); - entries.push((path, artifact)); + let symlink = Artifact::symlink(target)?; + root.add_entry(path, symlink)?; }, tokio_tar::EntryType::Link => { let target = header @@ -164,8 +204,11 @@ where })? .ok_or_else(|| tg::error!("no hard link target path stored in the archive"))?; let target = Path::new(target.as_ref()); - if let Some((_, artifact)) = entries.iter().find(|(path, _)| path == target) { - entries.push((path, artifact.clone())); + if let Some(artifact) = root + .find(target) + .map_err(|source| tg::error!(!source, "could not search artifact"))? + { + root.add_entry(path, artifact.clone())?; } else { return Err(tg::error!( "could not find the hard link target in the archive" @@ -181,27 +224,20 @@ where .mode() .map_err(|source| tg::error!(!source, "failed to read the entry mode"))?; let executable = mode & 0o111 != 0; - let blob = tg::Blob::with_reader(server, entry).await?; - let file = tg::File::builder(blob).executable(executable).build(); - let artifact = tg::Artifact::File(file); - entries.push((path, artifact)); + let file = Artifact::file(server, entry, executable).await?; + root.add_entry(path, file)?; }, } } - // Build the directory. - let mut builder = tg::directory::Builder::default(); - for (path, artifact) in entries { - if !path - .components() - .all(|component| matches!(component, std::path::Component::CurDir)) - { - builder = builder.add(server, &path, artifact).await?; - } - } - let directory = builder.build(); + // Insert and store the result. + root.persist(server) + .await + .map_err(|source| tg::error!(!source, "failed to persist archive contents"))?; - Ok(directory.into()) + // Return the result. + let artifact = root.as_artifact()?; + Ok(artifact) } async fn zip(server: &Server, reader: R) -> tg::Result @@ -213,7 +249,9 @@ where .await .map_err(|source| tg::error!(!source, "failed to create the zip reader"))?; - let mut entries: Vec<(PathBuf, tg::Artifact)> = Vec::new(); + // Create the root. + let mut root = Artifact::empty_directory().with_depth(0); + for index in 0..reader.file().entries().len() { // Get the reader. let mut reader = reader @@ -250,9 +288,7 @@ where // Create the artifacts. if is_dir { - let directory = tg::Directory::with_entries([].into()); - let artifact = tg::Artifact::Directory(directory); - entries.push((path, artifact)); + root.add_entry(path, Artifact::empty_directory())?; } else if is_symlink { let mut buffer = Vec::new(); reader @@ -261,31 +297,22 @@ where .map_err(|source| tg::error!(!source, "failed to read symlink target"))?; let target = core::str::from_utf8(&buffer) .map_err(|source| tg::error!(!source, "symlink target not valid UTF-8"))?; - let symlink = tg::Symlink::with_target(target.into()); - let artifact = tg::Artifact::Symlink(symlink); - entries.push((path, artifact)); + let symlink = Artifact::symlink(target)?; + root.add_entry(path, symlink)?; } else { - let output = server.create_blob(reader.compat()).await?; - let blob = tg::Blob::with_id(output.blob); - let file = tg::File::builder(blob).executable(is_executable).build(); - let artifact = tg::Artifact::File(file); - entries.push((path, artifact)); + let file = Artifact::file(server, reader.compat(), is_executable).await?; + root.add_entry(path, file)?; } } - // Build the directory. - let mut builder = tg::directory::Builder::default(); - for (path, artifact) in entries { - if !path - .components() - .all(|component| matches!(component, std::path::Component::CurDir)) - { - builder = builder.add(server, &path, artifact).await?; - } - } - let directory = builder.build(); + // Insert and store the result. + root.persist(server) + .await + .map_err(|source| tg::error!(!source, "failed to persist archive contents"))?; - Ok(directory.into()) + // Return the result. + let artifact = root.as_artifact()?; + Ok(artifact) } async fn detect_archive_format( @@ -416,3 +443,522 @@ fn parse_octal_checksum(bytes: &[u8]) -> tg::Result { Err(_) => Err(tg::error!("Invalid tar checksum format")), } } + +impl Artifact { + fn empty_directory() -> Self { + Self::Directory { + id: None, + data: None, + entries: BTreeMap::default(), + count: None, + depth: None, + size: None, + weight: None, + } + } + + async fn file(server: &Server, reader: impl AsyncRead, executable: bool) -> tg::Result { + // Create the blob, storing to a temp. + let temp = Temp::new(server); + let temp_path = temp.path().to_path_buf(); + let destination = Destination::Temp(temp); + let contents = server.create_blob_inner(reader, Some(&destination)).await?; + let blob_path = server.blobs_path().join(contents.id.to_string()); + tokio::fs::rename(temp_path, blob_path) + .await + .map_err(|source| { + tg::error!(!source, "failed to rename the file to the blobs directory") + })?; + + let data = tg::file::Data::Normal { + contents: contents.id.clone(), + dependencies: BTreeMap::new(), + executable, + }; + let bytes = data.serialize()?; + let id = tg::file::Id::new(&bytes); + let count = contents.count + 1; + let depth = None; + let size = bytes.len().to_u64().unwrap(); + let weight = contents.weight + size; + let file = Artifact::File { + id, + data, + contents, + count, + size, + weight, + depth, + }; + Ok(file) + } + + fn symlink(target: impl AsRef) -> tg::Result { + let data = tg::symlink::Data::Target { + target: target.as_ref().to_path_buf(), + }; + let bytes = data.serialize()?; + let id = tg::symlink::Id::new(&bytes); + let count = 1; + let depth = None; + let size = bytes.len().to_u64().unwrap(); + let weight = size; + let symlink = Artifact::Symlink { + id, + data, + count, + size, + weight, + depth, + }; + Ok(symlink) + } + + fn add_entry(&mut self, path: impl AsRef, artifact: Artifact) -> tg::Result<()> { + let path = path.as_ref(); + + // Ensure we're a directory. + let Artifact::Directory { depth, entries, .. } = self else { + return Err( + tg::error!(?self, %path = path.display(), "can only add entries to directories"), + ); + }; + let Some(depth) = depth else { + return Err(tg::error!(?self, "depth not set")); + }; + let depth = *depth; + + let mut components = path.components(); + let name = loop { + match components.next() { + Some(std::path::Component::Normal(name)) => { + break name; + }, + Some(std::path::Component::CurDir) => (), + _ => { + return Err( + tg::error!(%path = path.display(), "expected a normal path component"), + ); + }, + } + }; + + let name = name + .to_str() + .ok_or_else(|| tg::error!(?name, "failed to convert name to utf8 string"))? + .to_string(); + + let mut remaining_components = components.peekable(); + if remaining_components.peek().is_some() { + // Find the parent directory. + let parent = entries + .entry(name) + .or_insert_with(|| Artifact::empty_directory().with_depth(depth + 1)); + if !matches!(parent, Artifact::Directory { .. }) { + return Err(tg::error!(?parent, "parent path was not a directory")); + } + + // Insert the artifact. + let remaining_path: PathBuf = remaining_components.collect(); + parent.add_entry(remaining_path, artifact)?; + } else { + // Set the correct depth on the artifact and insert. + let artifact = artifact.with_depth(depth + 1); + entries.insert(name, artifact); + } + + Ok(()) + } + + fn as_artifact(&self) -> tg::Result { + let id = self.id()?; + let artifact = tg::Artifact::with_id(id); + Ok(artifact) + } + + fn compute_metadata(&mut self) -> tg::Result<()> { + if let Artifact::Directory { + id, + data, + entries, + depth, + count, + size, + weight, + } = self + { + // Compute data, recursively computing children as needed. + let entries_ = entries + .iter_mut() + .map(|(name, artifact)| { + if let Ok(id) = artifact.id() { + Ok((name.clone(), id)) + } else { + artifact.compute_metadata()?; + artifact.id().map(|id| (name.clone(), id)) + } + }) + .collect::, _>>()?; + let data_ = tg::directory::data::Directory::Normal { entries: entries_ }; + + // Compute size. + let bytes = data_.serialize()?; + let size_ = bytes.len().to_u64().unwrap(); + + // Compute ID. + let id_ = tg::directory::Id::new(&bytes); + + // Compute metadata. + let (count_, depth_, weight_) = entries.values().try_fold( + (1, 1, size_), + |(acc_count, acc_depth, acc_weight), artifact| { + let count = artifact.count()?; + let depth = artifact.depth()?; + let weight = artifact.weight()?; + let output = (acc_count + count, acc_depth.max(depth), acc_weight + weight); + Ok::<_, tg::Error>(output) + }, + )?; + + // Store values. + *data = Some(data_); + *id = Some(id_); + *count = Some(count_); + *size = Some(size_); + *depth = Some(depth_); + *weight = Some(weight_); + } + Ok(()) + } + + fn count(&self) -> tg::Result { + let count = match self { + Artifact::Symlink { count, .. } | Artifact::File { count, .. } => *count, + Artifact::Directory { count, .. } => { + let Some(count) = count else { + return Err(tg::error!(?self, "directory count not yet computed")); + }; + *count + }, + }; + Ok(count) + } + + fn data(&self) -> tg::Result { + let data = match self { + Artifact::Symlink { data, .. } => data.clone().into(), + Artifact::File { data, .. } => data.clone().into(), + Artifact::Directory { data, .. } => { + let Some(data) = data else { + return Err(tg::error!(?self, "directory data not yet computed")); + }; + data.clone().into() + }, + }; + Ok(data) + } + + fn depth(&self) -> tg::Result { + let depth = match &self { + Artifact::Symlink { depth, .. } + | Artifact::File { depth, .. } + | Artifact::Directory { depth, .. } => { + let Some(depth) = depth else { + return Err(tg::error!(?self, "missing depth")); + }; + *depth + }, + }; + Ok(depth) + } + + fn id(&self) -> tg::Result { + let id = match self { + Artifact::Symlink { id, .. } => id.clone().into(), + Artifact::File { id, .. } => id.clone().into(), + Artifact::Directory { id, .. } => { + let Some(id) = id else { + return Err(tg::error!(?self, "directory id not yet computed")); + }; + id.clone().into() + }, + }; + Ok(id) + } + + fn weight(&self) -> tg::Result { + let weight = match &self { + Artifact::Symlink { weight, .. } | Artifact::File { weight, .. } => *weight, + Artifact::Directory { weight, .. } => { + let Some(weight) = weight else { + return Err(tg::error!(?self, "directory weight not yet computed")); + }; + *weight + }, + }; + Ok(weight) + } + + fn find(&self, path: impl AsRef) -> tg::Result> { + let path = path.as_ref(); + + let Artifact::Directory { entries, .. } = self else { + return Ok(None); + }; + + let mut components = path.components(); + let name = loop { + match components.next() { + Some(std::path::Component::Normal(name)) => { + break name; + }, + Some(std::path::Component::CurDir) => (), + None => { + return Ok(Some(self)); + }, + _ => { + return Ok(None); + }, + } + }; + + let name = name + .to_str() + .ok_or_else(|| tg::error!(?name, "failed to convert name to utf8 string"))?; + + let entry = entries + .get(name) + .ok_or_else(|| tg::error!(%path = path.display(), "path not found"))?; + + let mut remaining_components = components.peekable(); + if remaining_components.peek().is_some() { + let remaining_path: PathBuf = remaining_components.collect(); + entry.find(remaining_path) + } else { + Ok(Some(entry)) + } + } + + fn with_depth(self, depth: u64) -> Self { + let depth = Some(depth); + match self { + Self::Directory { + id, + data, + entries, + count, + size, + weight, + .. + } => Self::Directory { + id, + data, + entries, + count, + depth, + size, + weight, + }, + Self::File { + id, + data, + contents, + count, + size, + weight, + .. + } => Self::File { + id, + data, + contents, + count, + depth, + size, + weight, + }, + Self::Symlink { + id, + data, + count, + size, + weight, + .. + } => Self::Symlink { + id, + data, + count, + depth, + size, + weight, + }, + } + } + + async fn persist(&mut self, server: &Server) -> tg::Result<()> { + self.compute_metadata()?; + + let artifact = Arc::new(self.clone()); + let touched_at = time::OffsetDateTime::now_utc().unix_timestamp(); + let result = futures::try_join!( + artifact.database_insert(server, touched_at), + artifact.store_put(server, touched_at) + ); + if let Err(error) = result { + return Err(tg::error!(!error, "failed to join futures")); + } + Ok(()) + } + + async fn database_insert(self: &Arc, server: &Server, touched_at: i64) -> tg::Result<()> { + // Get a database connection. + let mut connection = server + .database + .write_connection() + .await + .map_err(|source| tg::error!(!source, "failed to get a database connection"))?; + + // Begin a transaction. + let mut transaction = connection + .transaction() + .await + .map_err(|source| tg::error!(!source, "failed to create a transaction"))?; + + match &mut transaction { + Either::Left(transaction) => { + transaction + .with({ + let artifact = self.clone(); + move |transaction| { + let mut stack = vec![&*artifact]; + while let Some(artifact) = stack.pop() { + match artifact { + Artifact::File { contents, .. } => { + // Write the contents. + Server::insert_blob_sqlite(contents, transaction)?; + Server::insert_blob_objects_sqlite( + contents, + transaction, + touched_at, + )?; + }, + Artifact::Directory { entries, .. } => { + // Collect the children. + stack.extend(entries.values()); + }, + Artifact::Symlink { .. } => {}, + } + // Write the object. + write_object_sqlite(artifact, transaction)?; + } + + Ok::<_, tg::Error>(()) + } + }) + .await?; + }, + Either::Right(_) => { + return Err(tg::error!("unimplemented")); + }, + } + + // Commit the transaction. + transaction + .commit() + .await + .map_err(|source| tg::error!(!source, "failed to commit the transaction"))?; + + Ok(()) + } + + async fn store_put(&self, server: &Server, touched_at: i64) -> tg::Result<()> { + let mut batch = Vec::new(); + + let mut stack = vec![self]; + while let Some(artifact) = stack.pop() { + let id = artifact.id()?; + let id: tg::object::Id = id.into(); + let data = artifact.data()?; + let bytes = data.serialize()?; + + // Add any objects found in the children to the stack. + match artifact { + Artifact::File { contents, .. } => { + let mut contents_batch = Vec::new(); + let mut stack = vec![contents]; + while let Some(blob) = stack.pop() { + if let Some(data) = &blob.data { + let id: tg::object::Id = blob.id.clone().into(); + let bytes = data.serialize()?; + contents_batch.push((id, bytes)); + } + stack.extend(&blob.children); + } + + batch.extend_from_slice(&contents_batch); + }, + Artifact::Directory { entries, .. } => { + stack.extend(entries.values()); + }, + Artifact::Symlink { .. } => {}, + } + batch.push((id, bytes)); + } + + let arg = crate::store::PutBatchArg { + objects: batch, + touched_at, + }; + + server + .store + .put_batch(arg) + .await + .map_err(|source| tg::error!(!source, "failed to store the objects"))?; + Ok(()) + } +} + +fn write_object_sqlite( + artifact: &Artifact, + transaction: &mut rusqlite::Transaction<'_>, +) -> tg::Result<()> { + let statement = indoc!( + " + insert into objects (id, complete, count, depth, incomplete_children, size, touched_at, weight) + values (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) + on conflict (id) do update set touched_at = ?7; + " + ); + let mut statement = transaction + .prepare_cached(statement) + .map_err(|source| tg::error!(!source, "failed to prepare statement"))?; + let id = artifact.id()?; + let data = artifact.data()?; + let bytes = data.serialize()?; + let size = bytes.len().to_u64().unwrap(); + let count = artifact.count()?; + let depth = artifact.depth()?; + let weight = artifact.weight()?; + let now = time::OffsetDateTime::now_utc().format(&Rfc3339).unwrap(); + let params = rusqlite::params![id.to_string(), false, count, depth, 0, size, now, weight,]; + statement + .execute(params) + .map_err(|source| tg::error!(!source, "failed to execute the statement"))?; + let statement = indoc!( + " + insert into object_children (object, child) + values (?1, ?2) + on conflict (object, child) do nothing; + " + ); + let mut statement = transaction + .prepare_cached(statement) + .map_err(|source| tg::error!(!source, "failed to prepare statement"))?; + for child in data.children() { + let params = rusqlite::params![id.to_string(), child.to_string()]; + statement + .execute(params) + .map_err(|source| tg::error!(!source, "failed to execute the statement"))?; + } + Ok(()) +}