Skip to content

Commit

Permalink
first draft
Browse files Browse the repository at this point in the history
  • Loading branch information
Adrian Chan committed Aug 19, 2024
1 parent 1a612ed commit c47bbbd
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 44 deletions.
69 changes: 56 additions & 13 deletions R/write.R
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#' @param .df A data frame to write.
#' @param path Path where transport file will be written. File name sans will be
#' used as `xpt` name.
#' @param max_size_gb Maximum size in gb of the exported files
#' @param label `r lifecycle::badge("deprecated")` Previously used to to set the Dataset label.
#' Use the `metadata` argument to set the dataset label.
#' @param strict_checks If TRUE, xpt validation will report errors and not write
Expand Down Expand Up @@ -47,6 +48,7 @@
#'
xportr_write <- function(.df,
path,
max_size_gb = NULL,
metadata = NULL,
domain = NULL,
strict_checks = FALSE,
Expand Down Expand Up @@ -107,22 +109,11 @@ xportr_write <- function(.df,
tryCatch(
{
# If data is not split, data is just written out
if (is.null(attr(data, "_xportr.split_by_"))) {
if (is.null(max_size_gb)) {
write_xpt(data, path = path, version = 5, name = name)
check_xpt_size(path)
} else {
# If data is split, perform the split and get an index for the for loop
split_data <- split(data, data[[attr(data, "_xportr.split_by_")]])
split_index <- unique(data[[attr(data, "_xportr.split_by_")]])
paths <- get_split_path(path, seq_along(split_index))
# Iterate on the unique values of the split
for (i in seq_along(split_index)) {
# Write out the split data, `get_split_path` will determine file name
write_xpt(split_data[[i]],
path = paths[i], version = 5, name = name
)
check_xpt_size(paths[i])
}
export_to_xpt(data, path = path, max_size_gb = max_size_gb, file_prefix = name)
}
},
error = function(err) {
Expand Down Expand Up @@ -156,3 +147,55 @@ get_split_path <- function(path, ind) {
tools::file_ext(path)
)
}

#' Function to export data frame to xpt files ensuring each file does not exceed the maximum specified size in GB
#'
#' @param df A data frame to write.
#' @param max_size_gb Maximum size in gb of the exported files
#' @param file_prefix Name of each exported file
#'
#' @noRd

export_to_xpt <- function(.df, path, max_size_gb, file_prefix) {
# Convert GB to bytes
max_size_bytes <- max_size_gb * 1000^3

total_rows <- nrow(.df)
chunk_counter <- 1
row_start <- 1

while (row_start <= total_rows) {
# Binary search to find the maximum number of rows that fit within the size limit
low <- row_start
high <- total_rows
best_fit <- row_start

while (low <= high) {
mid <- floor((low + high) / 2)
temp_file <- tempfile()
write_xpt(.df[row_start:mid, ], temp_file)
file_size <- file.info(temp_file)$size

if (file_size <= max_size_bytes) {
best_fit <- mid
low <- mid + 1
} else {
high <- mid - 1
}

unlink(temp_file) # Clean up the temporary file
}

# Write the best fitting chunk to the final file
file_name <- sprintf("%s%d.xpt", file_prefix, chunk_counter)
path <- sub("\\/[^/]*$", "/", path)

write_xpt(.df[row_start:best_fit, ], path = paste0(path, file_name), version = 5, name = file_prefix)

# Update counters
row_start <- best_fit + 1
chunk_counter <- chunk_counter + 1
}

cat("Data frame exported to", chunk_counter - 1, "xpt files.\n")
}
60 changes: 29 additions & 31 deletions tests/testthat/test-write.R
Original file line number Diff line number Diff line change
Expand Up @@ -212,47 +212,45 @@ test_that("write Test 13: Capture errors by haven and report them as such", {
)
})

## Test 14: xportr_write: `split_by` attribute is used to split the data ----
test_that("write Test 14: `split_by` attribute is used to split the data", {
tmpdir <- tempdir()
tmp <- file.path(tmpdir, "xyz.xpt")
## Test 14: xportr_write: `max_size_gb` is used to split data frame into specified maximum file size ----
test_that("write Test 14: `max_size_gb` is used to split data frame into specified maximum file size", {
adlb <- pharmaverseadam::adlb

on.exit(unlink(tmpdir))
tmpdir <- tempdir()

dts <- data_to_save()
dts %>%
xportr_split(split_by = "X") %>%
xportr_write(path = tmp)
#20 mb
max_size_gb <- 20 / 1000

xportr_write(adlb,
path = paste0(tmpdir, "/adlb.xpt"),
domain = "adlb",
max_size_gb = max_size_gb,
strict_checks = FALSE
)

expect_true(
file.exists(file.path(tmpdir, "xyz1.xpt"))
file.exists(file.path(tmpdir, "adlb1.xpt")),
file.info(file.path(tmpdir, "adlb1.xpt"))$size <= as.numeric(format(max_size_gb * 10^9, scientific = F))
)

expect_true(
file.exists(file.path(tmpdir, "xyz2.xpt"))
file.exists(file.path(tmpdir, "adlb2.xpt")),
file.info(file.path(tmpdir, "adlb2.xpt"))$size <= as.numeric(format(max_size_gb * 10^9, scientific = F))
)

expect_true(
file.exists(file.path(tmpdir, "xyz3.xpt"))
file.exists(file.path(tmpdir, "adlb3.xpt")),
file.info(file.path(tmpdir, "adlb3.xpt"))$size <= as.numeric(format(max_size_gb * 10^9, scientific = F))
)
expect_equal(
read_xpt(file.path(tmpdir, "xyz1.xpt")) %>%
extract2("X") %>%
unique() %>%
length(),
1
)
expect_equal(
read_xpt(file.path(tmpdir, "xyz2.xpt")) %>%
extract2("X") %>%
unique() %>%
length(),
1

expect_true(
file.exists(file.path(tmpdir, "adlb4.xpt")),
file.info(file.path(tmpdir, "adlb4.xpt"))$size <= as.numeric(format(max_size_gb * 10^9, scientific = F))
)
expect_equal(
read_xpt(file.path(tmpdir, "xyz3.xpt")) %>%
extract2("X") %>%
unique() %>%
length(),
1

expect_true(
file.exists(file.path(tmpdir, "adlb5.xpt")),
file.info(file.path(tmpdir, "adlb5.xpt"))$size <= as.numeric(format(max_size_gb * 10^9, scientific = F))
)
})

Expand Down

0 comments on commit c47bbbd

Please sign in to comment.