diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index 86d1392ebf61..536874fc9eef 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -55,13 +55,14 @@ ring = { version = "0.17", default-features = false, features = ["std"], optiona rustls-pemfile = { version = "2.0", default-features = false, features = ["std"], optional = true } tokio = { version = "1.29.0", features = ["sync", "macros", "rt", "time", "io-util"] } md-5 = { version = "0.10.6", default-features = false, optional = true } +httparse = { version = "1.8.0", default-features = false, features = ["std"], optional = true } [target.'cfg(target_family="unix")'.dev-dependencies] nix = { version = "0.29.0", features = ["fs"] } [features] cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"] -azure = ["cloud"] +azure = ["cloud", "httparse"] gcp = ["cloud", "rustls-pemfile"] aws = ["cloud", "md-5"] http = ["cloud"] @@ -75,6 +76,8 @@ hyper-util = "0.1" http-body-util = "0.1" rand = "0.8" tempfile = "3.1.0" +regex = "1.11.1" +http = "1.1.0" [[test]] name = "get_range_file" diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index e78f8db7a8c8..76dedd71aa50 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -31,13 +31,13 @@ use crate::{ PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, RetryConfig, TagSet, }; use async_trait::async_trait; -use base64::prelude::BASE64_STANDARD; +use base64::prelude::{BASE64_STANDARD, BASE64_STANDARD_NO_PAD}; use base64::Engine; use bytes::{Buf, Bytes}; use chrono::{DateTime, Utc}; use hyper::http::HeaderName; use reqwest::{ - header::{HeaderValue, CONTENT_LENGTH, IF_MATCH, IF_NONE_MATCH}, + header::{HeaderMap, HeaderValue, CONTENT_LENGTH, CONTENT_TYPE, IF_MATCH, IF_NONE_MATCH}, Client as ReqwestClient, Method, RequestBuilder, Response, }; use serde::{Deserialize, Serialize}; @@ -79,6 +79,34 @@ pub(crate) enum Error { path: String, }, + #[snafu(display("Error performing bulk delete request: {}", source))] + BulkDeleteRequest { source: crate::client::retry::Error }, + + #[snafu(display("Error receiving bulk delete request body: {}", source))] + BulkDeleteRequestBody { source: reqwest::Error }, + + #[snafu(display( + "Bulk delete request failed due to invalid input: {} (code: {})", + reason, + code + ))] + BulkDeleteRequestInvalidInput { code: String, reason: String }, + + #[snafu(display("Got invalid bulk delete response: {}", reason))] + InvalidBulkDeleteResponse { reason: String }, + + #[snafu(display( + "Bulk delete request failed for key {}: {} (code: {})", + path, + reason, + code + ))] + DeleteFailed { + path: String, + code: String, + reason: String, + }, + #[snafu(display("Error performing list request: {}", source))] ListRequest { source: crate::client::retry::Error }, @@ -247,6 +275,223 @@ impl<'a> PutRequest<'a> { } } +#[inline] +fn extend(dst: &mut Vec, data: &[u8]) { + dst.extend_from_slice(data); +} + +// Write header names as title case. The header name is assumed to be ASCII. +// We need it because Azure is not always treating headers as case insensitive. +fn title_case(dst: &mut Vec, name: &[u8]) { + dst.reserve(name.len()); + + // Ensure first character is uppercased + let mut prev = b'-'; + for &(mut c) in name { + if prev == b'-' { + c.make_ascii_uppercase(); + } + dst.push(c); + prev = c; + } +} + +fn write_headers(headers: &HeaderMap, dst: &mut Vec) { + for (name, value) in headers { + // We need special case handling here otherwise Azure returns 400 + // due to `Content-Id` instead of `Content-ID` + if name == "content-id" { + extend(dst, b"Content-ID"); + } else { + title_case(dst, name.as_str().as_bytes()); + } + extend(dst, b": "); + extend(dst, value.as_bytes()); + extend(dst, b"\r\n"); + } +} + +// https://docs.oasis-open.org/odata/odata/v4.0/errata02/os/complete/part1-protocol/odata-v4.0-errata02-os-part1-protocol-complete.html#_Toc406398359 +fn serialize_part_delete_request( + dst: &mut Vec, + boundary: &str, + idx: usize, + request: reqwest::Request, + relative_url: String, +) { + // Encode start marker for part + extend(dst, b"--"); + extend(dst, boundary.as_bytes()); + extend(dst, b"\r\n"); + + // Encode part headers + let mut part_headers = HeaderMap::new(); + part_headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/http")); + part_headers.insert( + "Content-Transfer-Encoding", + HeaderValue::from_static("binary"), + ); + // Azure returns 400 if we send `Content-Id` instead of `Content-ID` + part_headers.insert("Content-ID", HeaderValue::from(idx)); + write_headers(&part_headers, dst); + extend(dst, b"\r\n"); + + // Encode the subrequest request-line + extend(dst, b"DELETE "); + extend(dst, format!("/{} ", relative_url).as_bytes()); + extend(dst, b"HTTP/1.1"); + extend(dst, b"\r\n"); + + // Encode subrequest headers + write_headers(request.headers(), dst); + extend(dst, b"\r\n"); + extend(dst, b"\r\n"); +} + +fn parse_multipart_response_boundary(response: &Response) -> Result { + let invalid_response = |msg: &str| Error::InvalidBulkDeleteResponse { + reason: msg.to_string(), + }; + + let content_type = response + .headers() + .get(CONTENT_TYPE) + .ok_or_else(|| invalid_response("missing Content-Type"))?; + + let boundary = content_type + .as_ref() + .strip_prefix(b"multipart/mixed; boundary=") + .ok_or_else(|| invalid_response("invalid Content-Type value"))? + .to_vec(); + + let boundary = + String::from_utf8(boundary).map_err(|_| invalid_response("invalid multipart boundary"))?; + + Ok(boundary) +} + +fn invalid_response(msg: &str) -> Error { + Error::InvalidBulkDeleteResponse { + reason: msg.to_string(), + } +} + +#[derive(Debug)] +struct MultipartField { + headers: HeaderMap, + content: Bytes, +} + +fn parse_multipart_body_fields(body: Bytes, boundary: &[u8]) -> Result> { + let start_marker = [b"--", boundary, b"\r\n"].concat(); + let next_marker = &start_marker[..start_marker.len() - 2]; + let end_marker = [b"--", boundary, b"--\r\n"].concat(); + + // There should be at most 256 responses per batch + let mut fields = Vec::with_capacity(256); + let mut remaining: &[u8] = body.as_ref(); + loop { + remaining = remaining + .strip_prefix(start_marker.as_slice()) + .ok_or_else(|| invalid_response("missing start marker for field"))?; + + // The documentation only mentions two headers for fields, we leave some extra margin + let mut scratch = [httparse::EMPTY_HEADER; 10]; + let mut headers = HeaderMap::new(); + match httparse::parse_headers(remaining, &mut scratch) { + Ok(httparse::Status::Complete((pos, headers_slice))) => { + remaining = &remaining[pos..]; + for header in headers_slice { + headers.insert( + HeaderName::from_bytes(header.name.as_bytes()).expect("valid"), + HeaderValue::from_bytes(header.value).expect("valid"), + ); + } + } + _ => return Err(invalid_response("unable to parse field headers").into()), + }; + + let next_pos = remaining + .windows(next_marker.len()) + .position(|window| window == next_marker) + .ok_or_else(|| invalid_response("early EOF while seeking to next boundary"))?; + + fields.push(MultipartField { + headers, + content: body.slice_ref(&remaining[..next_pos]), + }); + + remaining = &remaining[next_pos..]; + + // Support missing final CRLF + if remaining == end_marker || remaining == &end_marker[..end_marker.len() - 2] { + break; + } + } + Ok(fields) +} + +async fn parse_blob_batch_delete_body( + batch_body: Bytes, + boundary: String, + paths: &[Path], +) -> Result>> { + let mut results: Vec> = paths.iter().cloned().map(Ok).collect(); + + for field in parse_multipart_body_fields(batch_body, boundary.as_bytes())? { + let id = field + .headers + .get("content-id") + .and_then(|v| std::str::from_utf8(v.as_bytes()).ok()) + .and_then(|v| v.parse::().ok()); + + // Parse part response headers + // Documentation mentions 5 headers and states that other standard HTTP headers + // may be provided, in order to not incurr in more complexity to support an arbitrary + // amount of headers we chose a conservative amount and error otherwise + // https://learn.microsoft.com/en-us/rest/api/storageservices/delete-blob?tabs=microsoft-entra-id#response-headers + let mut headers = [httparse::EMPTY_HEADER; 48]; + let mut part_response = httparse::Response::new(&mut headers); + match part_response.parse(&field.content) { + Ok(httparse::Status::Complete(_)) => {} + _ => return Err(invalid_response("unable to parse response").into()), + }; + + match (id, part_response.code) { + (Some(_id), Some(code)) if (200..300).contains(&code) => {} + (Some(id), Some(404)) => { + results[id] = Err(crate::Error::NotFound { + path: paths[id].as_ref().to_string(), + source: Error::DeleteFailed { + path: paths[id].as_ref().to_string(), + code: 404.to_string(), + reason: part_response.reason.unwrap_or_default().to_string(), + } + .into(), + }); + } + (Some(id), Some(code)) => { + results[id] = Err(Error::DeleteFailed { + path: paths[id].as_ref().to_string(), + code: code.to_string(), + reason: part_response.reason.unwrap_or_default().to_string(), + } + .into()); + } + (None, Some(code)) => { + return Err(Error::BulkDeleteRequestInvalidInput { + code: code.to_string(), + reason: part_response.reason.unwrap_or_default().to_string(), + } + .into()) + } + _ => return Err(invalid_response("missing part response status code").into()), + } + } + + Ok(results) +} + #[derive(Debug)] pub(crate) struct AzureClient { config: AzureConfig, @@ -380,6 +625,86 @@ impl AzureClient { Ok(()) } + fn build_bulk_delete_body( + &self, + boundary: &str, + paths: &[Path], + credential: &Option>, + ) -> Vec { + let mut body_bytes = Vec::with_capacity(paths.len() * 2048); + + for (idx, path) in paths.iter().enumerate() { + let url = self.config.path_url(path); + + // Build subrequest with proper authorization + let request = self + .client + .request(Method::DELETE, url) + .header(CONTENT_LENGTH, HeaderValue::from(0)) + // Each subrequest must be authorized individually [1] and we use + // the CredentialExt for this. + // [1]: https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id#request-body + .with_azure_authorization(credential, &self.config.account) + .build() + .unwrap(); + + // Url for part requests must be relative and without base + let relative_url = self.config.service.make_relative(request.url()).unwrap(); + + serialize_part_delete_request(&mut body_bytes, boundary, idx, request, relative_url) + } + + // Encode end marker + extend(&mut body_bytes, b"--"); + extend(&mut body_bytes, boundary.as_bytes()); + extend(&mut body_bytes, b"--"); + extend(&mut body_bytes, b"\r\n"); + body_bytes + } + + pub(crate) async fn bulk_delete_request(&self, paths: Vec) -> Result>> { + if paths.is_empty() { + return Ok(Vec::new()); + } + + let credential = self.get_credential().await?; + + // https://www.ietf.org/rfc/rfc2046 + let random_bytes = rand::random::<[u8; 16]>(); // 128 bits + let boundary = format!("batch_{}", BASE64_STANDARD_NO_PAD.encode(random_bytes)); + + let body_bytes = self.build_bulk_delete_body(&boundary, &paths, &credential); + + // Send multipart request + let url = self.config.path_url(&Path::from("/")); + let batch_response = self + .client + .request(Method::POST, url) + .query(&[("restype", "container"), ("comp", "batch")]) + .header( + CONTENT_TYPE, + HeaderValue::from_str(format!("multipart/mixed; boundary={}", boundary).as_str()) + .unwrap(), + ) + .header(CONTENT_LENGTH, HeaderValue::from(body_bytes.len())) + .body(body_bytes) + .with_azure_authorization(&credential, &self.config.account) + .send_retry(&self.config.retry_config) + .await + .context(BulkDeleteRequestSnafu {})?; + + let boundary = parse_multipart_response_boundary(&batch_response)?; + + let batch_body = batch_response + .bytes() + .await + .context(BulkDeleteRequestBodySnafu {})?; + + let results = parse_blob_batch_delete_body(batch_body, boundary, &paths).await?; + + Ok(results) + } + /// Make an Azure Copy request pub(crate) async fn copy_request(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> { let credential = self.get_credential().await?; @@ -814,8 +1139,10 @@ pub(crate) struct UserDelegationKey { #[cfg(test)] mod tests { use bytes::Bytes; + use regex::bytes::Regex; use super::*; + use crate::StaticCredentialProvider; #[test] fn deserde_azure() { @@ -1005,4 +1332,159 @@ mod tests { let _delegated_key_response_internal: UserDelegationKey = quick_xml::de::from_str(S).unwrap(); } + + #[tokio::test] + async fn test_build_bulk_delete_body() { + let credential_provider = Arc::new(StaticCredentialProvider::new( + AzureCredential::BearerToken("static-token".to_string()), + )); + + let config = AzureConfig { + account: "testaccount".to_string(), + container: "testcontainer".to_string(), + credentials: credential_provider, + service: "http://example.com".try_into().unwrap(), + retry_config: Default::default(), + is_emulator: false, + skip_signature: false, + disable_tagging: false, + client_options: Default::default(), + }; + + let client = AzureClient::new(config).unwrap(); + + let credential = client.get_credential().await.unwrap(); + let paths = &[Path::from("a"), Path::from("b"), Path::from("c")]; + + let boundary = "batch_statictestboundary".to_string(); + + let body_bytes = client.build_bulk_delete_body(&boundary, paths, &credential); + + // Replace Date header value with a static date + let re = Regex::new("Date:[^\r]+").unwrap(); + let body_bytes = re + .replace_all(&body_bytes, b"Date: Tue, 05 Nov 2024 15:01:15 GMT") + .to_vec(); + + let expected_body = b"--batch_statictestboundary\r +Content-Type: application/http\r +Content-Transfer-Encoding: binary\r +Content-ID: 0\r +\r +DELETE /testcontainer/a HTTP/1.1\r +Content-Length: 0\r +Date: Tue, 05 Nov 2024 15:01:15 GMT\r +X-Ms-Version: 2023-11-03\r +Authorization: Bearer static-token\r +\r +\r +--batch_statictestboundary\r +Content-Type: application/http\r +Content-Transfer-Encoding: binary\r +Content-ID: 1\r +\r +DELETE /testcontainer/b HTTP/1.1\r +Content-Length: 0\r +Date: Tue, 05 Nov 2024 15:01:15 GMT\r +X-Ms-Version: 2023-11-03\r +Authorization: Bearer static-token\r +\r +\r +--batch_statictestboundary\r +Content-Type: application/http\r +Content-Transfer-Encoding: binary\r +Content-ID: 2\r +\r +DELETE /testcontainer/c HTTP/1.1\r +Content-Length: 0\r +Date: Tue, 05 Nov 2024 15:01:15 GMT\r +X-Ms-Version: 2023-11-03\r +Authorization: Bearer static-token\r +\r +\r +--batch_statictestboundary--\r\n" + .to_vec(); + + assert_eq!(expected_body, body_bytes); + } + + #[tokio::test] + async fn test_parse_blob_batch_delete_body() { + let response_body = b"--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r +Content-Type: application/http\r +Content-ID: 0\r +\r +HTTP/1.1 202 Accepted\r +x-ms-delete-type-permanent: true\r +x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e284f\r +x-ms-version: 2018-11-09\r +\r +--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r +Content-Type: application/http\r +Content-ID: 1\r +\r +HTTP/1.1 202 Accepted\r +x-ms-delete-type-permanent: true\r +x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e2851\r +x-ms-version: 2018-11-09\r +\r +--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r +Content-Type: application/http\r +Content-ID: 2\r +\r +HTTP/1.1 404 The specified blob does not exist.\r +x-ms-error-code: BlobNotFound\r +x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e2852\r +x-ms-version: 2018-11-09\r +Content-Length: 216\r +Content-Type: application/xml\r +\r + +BlobNotFoundThe specified blob does not exist. +RequestId:778fdc83-801e-0000-62ff-0334671e2852 +Time:2018-06-14T16:46:54.6040685Z\r +--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed--\r\n"; + + let response: reqwest::Response = http::Response::builder() + .status(202) + .header("Transfer-Encoding", "chunked") + .header( + "Content-Type", + "multipart/mixed; boundary=batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed", + ) + .header("x-ms-request-id", "778fdc83-801e-0000-62ff-033467000000") + .header("x-ms-version", "2018-11-09") + .body(Bytes::from(response_body.as_slice())) + .unwrap() + .into(); + + let boundary = parse_multipart_response_boundary(&response).unwrap(); + let body = response.bytes().await.unwrap(); + + let paths = &[Path::from("a"), Path::from("b"), Path::from("c")]; + + let results = parse_blob_batch_delete_body(body, boundary, paths) + .await + .unwrap(); + + assert!(results[0].is_ok()); + assert_eq!(&paths[0], results[0].as_ref().unwrap()); + + assert!(results[1].is_ok()); + assert_eq!(&paths[1], results[1].as_ref().unwrap()); + + assert!(results[2].is_err()); + let err = results[2].as_ref().unwrap_err(); + let crate::Error::NotFound { source, .. } = err else { + unreachable!("must be not found") + }; + let Some(Error::DeleteFailed { path, code, reason }) = source.downcast_ref::() + else { + unreachable!("must be client error") + }; + + assert_eq!(paths[2].as_ref(), path); + assert_eq!("404", code); + assert_eq!("The specified blob does not exist.", reason); + } } diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index f89a184f9523..177bffb653ae 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -30,7 +30,7 @@ use crate::{ PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart, }; use async_trait::async_trait; -use futures::stream::BoxStream; +use futures::stream::{BoxStream, StreamExt, TryStreamExt}; use reqwest::Method; use std::fmt::Debug; use std::sync::Arc; @@ -119,6 +119,26 @@ impl ObjectStore for MicrosoftAzure { self.client.delete_request(location, &()).await } + fn delete_stream<'a>( + &'a self, + locations: BoxStream<'a, Result>, + ) -> BoxStream<'a, Result> { + locations + .try_chunks(256) + .map(move |locations| async { + // Early return the error. We ignore the paths that have already been + // collected into the chunk. + let locations = locations.map_err(|e| e.1)?; + self.client + .bulk_delete_request(locations) + .await + .map(futures::stream::iter) + }) + .buffered(20) + .try_flatten() + .boxed() + } + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { self.client.list(prefix) }