From eda948e35a77bec3eff72ab0ffe0055bc627fb82 Mon Sep 17 00:00:00 2001 From: Jessica Black Date: Tue, 10 Dec 2024 16:59:54 -0800 Subject: [PATCH] We extractin' files --- lib/Cargo.toml | 11 ++ lib/src/lib.rs | 158 +++++++++++++++++++++--- lib/src/registry.rs | 258 +++++++++++++++++++++++++++++++++++---- lib/src/transform.rs | 54 ++++++++ lib/tests/it/registry.rs | 44 ++++++- 5 files changed, 477 insertions(+), 48 deletions(-) create mode 100644 lib/src/transform.rs diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 049afd2..1462f67 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -13,19 +13,30 @@ keywords = ["cli", "dependencies", "containers", "images", "oci"] categories = ["command-line-utilities", "development-tools"] [dependencies] +async-compression = { version = "0.4.18", features = ["tokio", "gzip", "zstd"] } +async-tempfile = "0.6.0" bon = "3.3.0" +bytes = "1.9.0" color-eyre = "0.6.3" derive_more = { version = "1.0.0", features = ["full"] } +enum-assoc = "1.2.4" +enum_delegate = "0.2.0" +futures-lite = "2.5.0" hex = "0.4.3" hex-magic = "0.0.2" itertools = "0.13.0" oci-client = "0.14.0" static_assertions = "1.1.0" +strum = { version = "0.26.3", features = ["derive"] } tap = "1.0.1" +tokio = "1.42.0" +tokio-tar = "0.3.1" +tokio-util = { version = "0.7.13", features = ["io"] } tracing = "0.1.41" [dev-dependencies] pretty_assertions = "1.4.1" proptest = "1.5.0" simple_test_case = "1.2.0" +test-log = { version = "0.2.16", features = ["trace"] } tokio = { version = "1.42.0", features = ["full"] } diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 288e53e..eb152d1 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -6,11 +6,14 @@ use color_eyre::{ Result, Section, SectionExt, }; use derive_more::derive::Display; +use itertools::Itertools; use std::str::FromStr; +use strum::{AsRefStr, EnumIter, IntoEnumIterator}; use tap::Pipe; mod ext; pub mod registry; +pub mod transform; /// Platform represents the platform a container image is built for. /// This follows the OCI Image Spec's platform definition while also supporting @@ -455,44 +458,161 @@ impl std::fmt::Display for Reference { } } -/// A reference to a specific layer within an OCI container image. +/// A descriptor for a specific layer within an OCI container image. /// This follows the OCI Image Spec's layer descriptor format. #[derive(Debug, Clone, PartialEq, Eq, Builder)] -pub struct LayerReference { +pub struct LayerDescriptor { /// The content-addressable digest of the layer #[builder(into)] pub digest: Digest, -} -impl FromStr for LayerReference { - type Err = color_eyre::Report; + /// The size of the layer in bytes + pub size: i64, - fn from_str(s: &str) -> Result { - let digest = Digest::from_str(s).context("parse digest")?; - Ok(Self { digest }) - } + /// The media type of the layer + pub media_type: LayerMediaType, } -impl From<&LayerReference> for LayerReference { - fn from(layer: &LayerReference) -> Self { +impl From<&LayerDescriptor> for LayerDescriptor { + fn from(layer: &LayerDescriptor) -> Self { layer.clone() } } -impl From<&Digest> for LayerReference { - fn from(digest: &Digest) -> Self { - digest.clone().into() +impl std::fmt::Display for LayerDescriptor { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.digest) + } +} + +/// Media types for OCI container image layers. +/// +/// Each entry in this enum is a unique media type "base"; some of them then can have flags applied. +/// For example, even though `Foreign` is a valid [`LayerMediaTypeFlag`], [`LayerMediaType::DockerForeign`] +/// is distinct from [`LayerMediaType::Docker`] because it is an entirely different media type. +/// +/// Spec reference: https://github.com/opencontainers/image-spec/blob/main/media-types.md +#[derive(Debug, Clone, PartialEq, Eq, AsRefStr, EnumIter)] +pub enum LayerMediaType { + /// A standard Docker container layer in gzipped tar format. + /// + /// These layers contain filesystem changes that make up the container image. + /// Each layer represents a Dockerfile instruction or equivalent build step. + #[strum(serialize = "application/vnd.docker.image.rootfs.diff.tar.gzip")] + Docker, + + /// A Docker container layer that was built for a different architecture or operating system. + /// + /// Foreign layers are used in multi-platform images where the same image can contain + /// layers for different platforms (e.g. linux/amd64 vs linux/arm64). + #[strum(serialize = "application/vnd.docker.image.rootfs.foreign.diff.tar.gzip")] + DockerForeign, + + /// A standard OCI container layer. + #[strum(serialize = "application/vnd.oci.image.layer.v1.tar")] + Oci(Vec), + + /// An OCI container layer that has restrictions on distribution. + /// + /// Non-distributable layers typically contain licensed content, proprietary code, + /// or other material that cannot be freely redistributed. + /// Registry operators are not required to push or pull these layers. + /// Instead, the layer data might need to be obtained through other means + /// (e.g. direct download from a vendor). + /// + /// These are officially marked deprecated in the OCI spec, along with the directive + /// that clients should download the layers as usual: + /// https://github.com/opencontainers/image-spec/blob/main/layer.md#non-distributable-layers + #[strum(serialize = "application/vnd.oci.image.layer.nondistributable.v1.tar")] + OciNonDistributable(Vec), +} + +impl LayerMediaType { + /// Overwrite the flags for the media type. + fn replace_flags(self, flags: Vec) -> Self { + match self { + LayerMediaType::Oci(_) => LayerMediaType::Oci(flags), + LayerMediaType::OciNonDistributable(_) => LayerMediaType::OciNonDistributable(flags), + LayerMediaType::Docker | LayerMediaType::DockerForeign => self, + } } } -impl From for LayerReference { - fn from(digest: Digest) -> Self { - Self { digest } +impl FromStr for LayerMediaType { + type Err = eyre::Error; + + fn from_str(s: &str) -> Result { + let (base, flags) = s.split_once('+').unwrap_or((s, "")); + for media_type in LayerMediaType::iter() { + if base == media_type.as_ref() { + return match media_type { + // Docker layers don't have flags. + LayerMediaType::Docker | LayerMediaType::DockerForeign => Ok(media_type), + + // OCI layers have flags; handle both bases the same way. + mt @ LayerMediaType::Oci(_) | mt @ LayerMediaType::OciNonDistributable(_) => { + flags + .split('+') + .map(LayerMediaTypeFlag::from_str) + .try_collect() + .map(|flags| mt.replace_flags(flags)) + } + }; + } + + // It's always possible for a future media type to be added that has a plus sign; + // this is a fallback to catch that case. + if s == media_type.as_ref() { + return Ok(media_type); + } + } + bail!("unknown media type: {s}"); } } -impl std::fmt::Display for LayerReference { +impl std::fmt::Display for LayerMediaType { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.digest) + write!(f, "{}", self.as_ref()) + } +} + +/// Flags for layer media types. +/// +/// Some flags indicate the underlying media should be transformed, while some are informational. +/// This library interprets all flags as "transforming", and informational flags are simply identity transformations. +/// +/// When multiple flags apply to a media type, this library applies transforms right-to-left. +/// For example, the hypothetical media type `application/vnd.oci.image.layer.v1.tar+foreign+zstd+gzip` +/// would be read with the following steps: +/// 1. Decompress the layer with gzip. +/// 2. Decompress the layer with zstd. +/// 3. Apply the foreign flag (this is an informational flag, so its transformation is a no-op). +/// 4. The underlying media type is now in effect `application/vnd.oci.image.layer.v1.tar`. +/// +/// Note that this library is currently focused on _reading_ images; if you choose to use these +/// flags to _create_ media types make sure you consult the OCI spec for valid combinations. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, AsRefStr, EnumIter)] +pub enum LayerMediaTypeFlag { + /// Foreign layers are used in multi-platform images where the same image can contain + /// layers for different platforms (e.g. linux/amd64 vs linux/arm64). + #[strum(serialize = "foreign")] + Foreign, + + /// The layer is compressed with zstd. + #[strum(serialize = "zstd")] + Zstd, + + /// The layer is compressed with gzip. + #[strum(serialize = "gzip")] + Gzip, +} + +impl FromStr for LayerMediaTypeFlag { + type Err = eyre::Error; + + fn from_str(s: &str) -> Result { + Self::iter() + .find(|flag| flag.as_ref() == s) + .ok_or_else(|| eyre!("unknown flag: {s}")) } } diff --git a/lib/src/registry.rs b/lib/src/registry.rs index b62a094..9c76728 100644 --- a/lib/src/registry.rs +++ b/lib/src/registry.rs @@ -1,36 +1,213 @@ //! Interacts with remote OCI registries. -use std::str::FromStr; +use std::{path::Path, str::FromStr}; +use bytes::Bytes; use color_eyre::eyre::{Context, Result}; +use derive_more::Debug; +use futures_lite::{Stream, StreamExt}; use oci_client::{ - client::ClientConfig, manifest::ImageIndexEntry, secrets::RegistryAuth, Client, - Reference as OciReference, + client::ClientConfig, + manifest::{ImageIndexEntry, OciDescriptor}, + secrets::RegistryAuth, + Client, Reference as OciReference, RegistryOperation, }; +use tokio_tar::Archive; +use tokio_util::io::StreamReader; +use tracing::info; -use crate::{ext::PriorityFind, LayerReference, Platform, Reference, Version}; - -/// Enumerate layers for a container reference in the remote registry. -/// Layers are returned in order from the base image to the application. -#[tracing::instrument] -pub async fn layers( - platform: Option<&Platform>, - reference: &Reference, -) -> Result> { - let client = client(platform.cloned()); - let auth = RegistryAuth::Anonymous; - - let oci_ref = OciReference::from(reference); - let (manifest, _) = client - .pull_image_manifest(&oci_ref, &auth) - .await - .context("pull image manifest: {oci_ref}")?; - - manifest - .layers - .into_iter() - .map(|layer| LayerReference::from_str(&layer.digest)) - .collect() +use crate::{ + ext::PriorityFind, + transform::{self, Chunk}, + Digest, LayerDescriptor, LayerMediaType, LayerMediaTypeFlag, Platform, Reference, Version, +}; + +/// Each instance is a unique view of remote registry for a specific [`Platform`] and [`Reference`]. +/// The intention here is to better support chained methods like "pull list of layers" and then "apply each layer to disk". +// Note: internal fields aren't public because we don't want the caller to be able to mutate the internal state between method calls. +#[derive(Debug, Clone)] +pub struct Registry { + /// The OCI reference, used by the underlying client. + reference: OciReference, + + /// Authentication information for the registry. + auth: RegistryAuth, + + /// The client used to interact with the registry. + #[debug(skip)] + client: Client, +} + +#[bon::bon] +impl Registry { + /// Create a new registry for a specific platform and reference. + #[builder] + pub async fn new(platform: Option, reference: Reference) -> Result { + let client = client(platform.clone()); + let reference = OciReference::from(&reference); + + // Future improvement: support authentication. + let auth = RegistryAuth::Anonymous; + + client + .auth(&reference, &auth, RegistryOperation::Pull) + .await + .context("authenticate to registry")?; + + Ok(Self { + auth, + client, + reference, + }) + } +} + +impl Registry { + /// Enumerate layers for a container reference in the remote registry. + /// Layers are returned in order from the base image to the application. + #[tracing::instrument] + pub async fn layers(&self) -> Result> { + let (manifest, _) = self + .client + .pull_image_manifest(&self.reference, &self.auth) + .await + .context("pull image manifest")?; + manifest + .layers + .into_iter() + .map(LayerDescriptor::try_from) + .collect() + } + + /// Pull the bytes of a layer from the registry in a stream. + /// The `media_type` field of the [`LayerDescriptor`] can be used to determine how best to handle the content. + /// + /// ## Layers explanation + /// + /// You can think of a layer as a "diff" (you can envision this similarly to a git diff) + /// from the previous layer; the first layer is a "diff" from an empty layer. + /// + /// Each diff contains zero or more changes; each change is one of the below: + /// - A file is added. + /// - A file is removed. + /// - A file is modified. + pub async fn pull_layer( + &self, + layer: &LayerDescriptor, + ) -> Result>> { + self.pull_layer_internal(layer) + .await + .map(|stream| stream.map(|chunk| chunk.context("read chunk"))) + } + + async fn pull_layer_internal( + &self, + layer: &LayerDescriptor, + ) -> Result> { + let oci_layer = OciDescriptor::from(layer); + self.client + .pull_blob_stream(&self.reference, &oci_layer) + .await + .context("initiate stream") + .map(|layer| layer.stream) + } + + /// Apply a layer to a location on disk. + /// + /// The intention of this method is that when it is run for each layer in an image in order it is equivalent + /// to the functionality you'd get by running `docker pull`, `docker save`, and then recursively extracting the + /// layers to the same directory. + /// + /// As such the following edge cases are handled as follows: + /// - Foreign layers are skipped, as they would if you ran `docker pull`. + /// - Non-distributable layers are attempted to be applied, but are skipped if they fail. + /// - Standard layers are applied as normal; if they fail they are skipped. + /// + /// If you wish to customize the behavior, use [`Registry::pull_layer`] directly instead. + /// + /// ## Application order + /// + /// This method performs the following steps: + /// 1. Downloads the specified layer from the registry. + /// 2. Applies the layer diff to the specified path on disk. + /// + /// When applying multiple layers, it's important to apply them in order, + /// and to apply them to a consistent location on disk. + /// + /// It is safe to apply each layer to a fresh directory if a separate directory per layer is desired: + /// the only sticking point for this case is removed files, + /// and this function simply skips removing files that don't exist. + /// + /// ## Layers explanation + /// + /// You can think of a layer as a "diff" (you can envision this similarly to a git diff) + /// from the previous layer; the first layer is a "diff" from an empty layer. + /// + /// Each diff contains zero or more changes; each change is one of the below: + /// - A file is added. + /// - A file is removed. + /// - A file is modified. + // + // A future improvement would be to support downloading layers concurrently, + // then still applying them serially. Since network transfer is the slowest part of this process, + // this would speed up the overall process. + #[tracing::instrument] + pub async fn apply_layer(&self, layer: &LayerDescriptor, _path: &Path) -> Result<()> { + let stream = self.pull_layer_internal(layer).await?; + + /// Unwrap a value, logging an error and continuing the loop if it fails. + macro_rules! unwrap_warn { + ($expr:expr) => { + unwrap_warn!($expr,); + }; + ($expr:expr, $($msg:tt)*) => { + match $expr { + Ok(value) => value, + Err(e) => { + tracing::warn!(error = ?e, $($msg)*); + continue; + } + } + }; + } + + // Applying the layer requires interpreting the layer's media type. + match &layer.media_type { + // Docker layers are applied slightly differently than OCI layers. + LayerMediaType::Docker => todo!(), + + // Standard OCI layers are applied as normal. + // Reminder that per the OCI spec, clients should attempt to download and apply "non-distributable" layers. + LayerMediaType::Oci(flags) | LayerMediaType::OciNonDistributable(flags) => { + // Foreign layers are skipped, as they would if you ran `docker pull`. + // This causes an extra iteration over the flags for layers that aren't foreign, + // but the flag count is small and this saves us the complexity of setting up layer transforms + // and then discarding them if this flag is encountered. + if flags.contains(&LayerMediaTypeFlag::Foreign) { + return Ok(()); + } + + // Future improvement: specialize the stream based on the flags; the current implementation + // forces everything through dynamic dispatch to support arbitrary flags, + // but the most common case is a single flag + // (according to the OCI spec that's all we'll ever actually see). + let stream = transform::sequence(stream, flags); + let reader = StreamReader::new(stream); + let mut archive = Archive::new(reader); + let mut entries = archive.entries().context("read entries from tar")?; + while let Some(entry) = entries.next().await { + let entry = unwrap_warn!(entry, "read entry"); + let path = unwrap_warn!(entry.path(), "read path"); + info!(?path, "read entry"); + } + + Ok(()) + } + + // Foreign docker layers are skipped, as they would if you ran `docker pull`. + LayerMediaType::DockerForeign => return Ok(()), + } + } } impl From<&Reference> for OciReference { @@ -50,6 +227,35 @@ impl From<&Reference> for OciReference { } } +impl From for OciDescriptor { + fn from(layer: LayerDescriptor) -> Self { + Self { + digest: layer.digest.to_string(), + media_type: layer.media_type.to_string(), + size: layer.size, + ..Default::default() + } + } +} + +impl From<&LayerDescriptor> for OciDescriptor { + fn from(layer: &LayerDescriptor) -> Self { + layer.clone().into() + } +} + +impl TryFrom for LayerDescriptor { + type Error = color_eyre::Report; + + fn try_from(value: OciDescriptor) -> Result { + Ok(Self { + digest: Digest::from_str(&value.digest).context("parse digest")?, + media_type: LayerMediaType::from_str(&value.media_type).context("parse media type")?, + size: value.size, + }) + } +} + fn client(platform: Option) -> Client { let mut config = ClientConfig::default(); config.platform_resolver = match platform { diff --git a/lib/src/transform.rs b/lib/src/transform.rs new file mode 100644 index 0000000..3941cb8 --- /dev/null +++ b/lib/src/transform.rs @@ -0,0 +1,54 @@ +//! Primitives for stream transformations. + +use std::pin::Pin; + +use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder}; +use bytes::Bytes; +use color_eyre::Result; +use futures_lite::Stream; +use tokio_util::io::{ReaderStream, StreamReader}; + +use crate::LayerMediaTypeFlag; + +/// Convenience alias for a chunk of bytes in a stream. +pub type Chunk = Result; + +/// Identity transformer. +pub fn identity(stream: impl Stream) -> impl Stream { + stream +} + +/// Decompress the stream using gzip. +pub fn gzip(stream: impl Stream) -> impl Stream { + let reader = StreamReader::new(stream); + let inner = GzipDecoder::new(reader); + ReaderStream::new(inner) +} + +/// Decompress the stream using zstd. +pub fn zstd(stream: impl Stream) -> impl Stream { + let reader = StreamReader::new(stream); + let inner = ZstdDecoder::new(reader); + ReaderStream::new(inner) +} + +/// Apply a sequence of transformations to the stream based on the media type flags. +pub fn sequence( + stream: impl Stream + 'static, + flags: &[LayerMediaTypeFlag], +) -> Pin>> { + // Left hand side type annotation is required to coerce to dynamic dispatching. + let mut stream: Pin>> = Box::pin(stream); + + // Each flag in order consumes the prior stream, replacing it with a new transformed stream. + for flag in flags { + match flag { + LayerMediaTypeFlag::Zstd => stream = Box::pin(zstd(stream)), + LayerMediaTypeFlag::Gzip => stream = Box::pin(gzip(stream)), + _ => (), + } + } + + // The final stream is therefore the sequenced version of the stream. + stream +} diff --git a/lib/tests/it/registry.rs b/lib/tests/it/registry.rs index 2bbd392..6db9723 100644 --- a/lib/tests/it/registry.rs +++ b/lib/tests/it/registry.rs @@ -1,4 +1,5 @@ -use circe::{Platform, Reference}; +use async_tempfile::TempDir; +use circe::{registry::Registry, Platform, Reference}; use color_eyre::Result; use simple_test_case::test_case; @@ -7,7 +8,14 @@ use simple_test_case::test_case; #[tokio::test] async fn single_platform_layers(image: &str, platform: Option) -> Result<()> { let reference = image.parse::()?; - let layers = circe::registry::layers(platform.as_ref(), &reference).await?; + let layers = Registry::builder() + .maybe_platform(platform) + .reference(reference) + .build() + .await? + .layers() + .await?; + assert!(!layers.is_empty(), "image should have at least one layer"); Ok(()) } @@ -17,7 +25,37 @@ async fn single_platform_layers(image: &str, platform: Option) -> Resu #[tokio::test] async fn multi_platform_layers(image: &str, platform: Platform) -> Result<()> { let reference = image.parse::()?; - let layers = circe::registry::layers(Some(&platform), &reference).await?; + let layers = Registry::builder() + .platform(platform) + .reference(reference) + .build() + .await? + .layers() + .await?; + + assert!(!layers.is_empty(), "image should have at least one layer"); + Ok(()) +} + +#[test_case("docker.io/library/golang:latest", Platform::linux_amd64(); "docker.io/library/golang:latest.linux_amd64")] +#[test_log::test(tokio::test)] +async fn pull_layer(image: &str, platform: Platform) -> Result<()> { + let reference = image.parse::()?; + let registry = Registry::builder() + .platform(platform) + .reference(reference) + .build() + .await?; + + let layers = registry.layers().await?; assert!(!layers.is_empty(), "image should have at least one layer"); + + let tmp = TempDir::new().await?; + + for layer in layers { + let path = tmp.dir_path().join(layer.digest.as_hex()); + registry.apply_layer(&layer, &path).await?; + } + Ok(()) }