From c47bbbdd0bc77c8d9d5d301a337d26cb93be853a Mon Sep 17 00:00:00 2001 From: Adrian Chan Date: Mon, 19 Aug 2024 20:31:59 +0000 Subject: [PATCH] first draft --- R/write.R | 69 ++++++++++++++++++++++++++++++------- tests/testthat/test-write.R | 60 ++++++++++++++++---------------- 2 files changed, 85 insertions(+), 44 deletions(-) diff --git a/R/write.R b/R/write.R index 27a4884c..b714f356 100644 --- a/R/write.R +++ b/R/write.R @@ -7,6 +7,7 @@ #' @param .df A data frame to write. #' @param path Path where transport file will be written. File name sans will be #' used as `xpt` name. +#' @param max_size_gb Maximum size in gb of the exported files #' @param label `r lifecycle::badge("deprecated")` Previously used to to set the Dataset label. #' Use the `metadata` argument to set the dataset label. #' @param strict_checks If TRUE, xpt validation will report errors and not write @@ -47,6 +48,7 @@ #' xportr_write <- function(.df, path, + max_size_gb = NULL, metadata = NULL, domain = NULL, strict_checks = FALSE, @@ -107,22 +109,11 @@ xportr_write <- function(.df, tryCatch( { # If data is not split, data is just written out - if (is.null(attr(data, "_xportr.split_by_"))) { + if (is.null(max_size_gb)) { write_xpt(data, path = path, version = 5, name = name) check_xpt_size(path) } else { - # If data is split, perform the split and get an index for the for loop - split_data <- split(data, data[[attr(data, "_xportr.split_by_")]]) - split_index <- unique(data[[attr(data, "_xportr.split_by_")]]) - paths <- get_split_path(path, seq_along(split_index)) - # Iterate on the unique values of the split - for (i in seq_along(split_index)) { - # Write out the split data, `get_split_path` will determine file name - write_xpt(split_data[[i]], - path = paths[i], version = 5, name = name - ) - check_xpt_size(paths[i]) - } + export_to_xpt(data, path = path, max_size_gb = max_size_gb, file_prefix = name) } }, error = function(err) { @@ -156,3 +147,55 @@ get_split_path <- function(path, ind) { tools::file_ext(path) ) } + +#' Function to export data frame to xpt files ensuring each file does not exceed the maximum specified size in GB +#' +#' @param df A data frame to write. +#' @param max_size_gb Maximum size in gb of the exported files +#' @param file_prefix Name of each exported file +#' +#' @noRd + +export_to_xpt <- function(.df, path, max_size_gb, file_prefix) { + # Convert GB to bytes + max_size_bytes <- max_size_gb * 1000^3 + + total_rows <- nrow(.df) + chunk_counter <- 1 + row_start <- 1 + + while (row_start <= total_rows) { + # Binary search to find the maximum number of rows that fit within the size limit + low <- row_start + high <- total_rows + best_fit <- row_start + + while (low <= high) { + mid <- floor((low + high) / 2) + temp_file <- tempfile() + write_xpt(.df[row_start:mid, ], temp_file) + file_size <- file.info(temp_file)$size + + if (file_size <= max_size_bytes) { + best_fit <- mid + low <- mid + 1 + } else { + high <- mid - 1 + } + + unlink(temp_file) # Clean up the temporary file + } + + # Write the best fitting chunk to the final file + file_name <- sprintf("%s%d.xpt", file_prefix, chunk_counter) + path <- sub("\\/[^/]*$", "/", path) + + write_xpt(.df[row_start:best_fit, ], path = paste0(path, file_name), version = 5, name = file_prefix) + + # Update counters + row_start <- best_fit + 1 + chunk_counter <- chunk_counter + 1 + } + + cat("Data frame exported to", chunk_counter - 1, "xpt files.\n") +} diff --git a/tests/testthat/test-write.R b/tests/testthat/test-write.R index aa6c45f9..7be6c043 100644 --- a/tests/testthat/test-write.R +++ b/tests/testthat/test-write.R @@ -212,47 +212,45 @@ test_that("write Test 13: Capture errors by haven and report them as such", { ) }) -## Test 14: xportr_write: `split_by` attribute is used to split the data ---- -test_that("write Test 14: `split_by` attribute is used to split the data", { - tmpdir <- tempdir() - tmp <- file.path(tmpdir, "xyz.xpt") +## Test 14: xportr_write: `max_size_gb` is used to split data frame into specified maximum file size ---- +test_that("write Test 14: `max_size_gb` is used to split data frame into specified maximum file size", { +adlb <- pharmaverseadam::adlb - on.exit(unlink(tmpdir)) +tmpdir <- tempdir() - dts <- data_to_save() - dts %>% - xportr_split(split_by = "X") %>% - xportr_write(path = tmp) +#20 mb +max_size_gb <- 20 / 1000 + +xportr_write(adlb, + path = paste0(tmpdir, "/adlb.xpt"), + domain = "adlb", + max_size_gb = max_size_gb, + strict_checks = FALSE +) expect_true( - file.exists(file.path(tmpdir, "xyz1.xpt")) + file.exists(file.path(tmpdir, "adlb1.xpt")), + file.info(file.path(tmpdir, "adlb1.xpt"))$size <= as.numeric(format(max_size_gb * 10^9, scientific = F)) ) + expect_true( - file.exists(file.path(tmpdir, "xyz2.xpt")) + file.exists(file.path(tmpdir, "adlb2.xpt")), + file.info(file.path(tmpdir, "adlb2.xpt"))$size <= as.numeric(format(max_size_gb * 10^9, scientific = F)) ) + expect_true( - file.exists(file.path(tmpdir, "xyz3.xpt")) + file.exists(file.path(tmpdir, "adlb3.xpt")), + file.info(file.path(tmpdir, "adlb3.xpt"))$size <= as.numeric(format(max_size_gb * 10^9, scientific = F)) ) - expect_equal( - read_xpt(file.path(tmpdir, "xyz1.xpt")) %>% - extract2("X") %>% - unique() %>% - length(), - 1 - ) - expect_equal( - read_xpt(file.path(tmpdir, "xyz2.xpt")) %>% - extract2("X") %>% - unique() %>% - length(), - 1 + + expect_true( + file.exists(file.path(tmpdir, "adlb4.xpt")), + file.info(file.path(tmpdir, "adlb4.xpt"))$size <= as.numeric(format(max_size_gb * 10^9, scientific = F)) ) - expect_equal( - read_xpt(file.path(tmpdir, "xyz3.xpt")) %>% - extract2("X") %>% - unique() %>% - length(), - 1 + + expect_true( + file.exists(file.path(tmpdir, "adlb5.xpt")), + file.info(file.path(tmpdir, "adlb5.xpt"))$size <= as.numeric(format(max_size_gb * 10^9, scientific = F)) ) })