Skip to content

Commit

Permalink
function to move rows to bigdata backend on seatable
Browse files Browse the repository at this point in the history
  • Loading branch information
alexanderbates committed Nov 7, 2024
1 parent 8af9bc0 commit e1a9ca6
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 6 deletions.
1 change: 1 addition & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Imports:
rgl,
bit64,
httr,
httr2,
jsonlite,
pbapply,
dplyr,
Expand Down
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ export(banc_xyz2id)
export(bancsee)
export(banctable_append_rows)
export(banctable_login)
export(banctable_move_to_bigdata)
export(banctable_query)
export(banctable_set_token)
export(banctable_update_rows)
Expand Down
99 changes: 97 additions & 2 deletions R/banc-table.R
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
#' @param append_allowed Logical. Whether rows without row identifiers can be appended.
#' @param chunksize To split large requests into smaller ones with max this many rows.
#' @param token_name The name of the token in your .Renviron file, should be \code{BANCTABLE_TOKEN}.
#' @param where Optional SQL-like where clause selecting the rows to move (default: "`region` = 'optic'"; use NULL to move all rows)
#' @param bigdata logical, if `TRUE` new rows are added to the bigdata archive rather than the 'normal' seatable.
#' @param invert whether to send the specified rows (`where`) to big data storage (`FALSE`) or from storage back to the 'normal' table (`TRUE`).
#' @param ... Additional arguments passed to pbsapply which might include cl=2 to specify a number of parallel jobs to run.
#'
#' @return a \code{data.frame} of results. There should be 0 rows if no rows
Expand Down Expand Up @@ -279,10 +282,95 @@ banctable_base_impl <- function (base_name = "banc_meta",
base
}

#' @export
#' @rdname banctable_query
banctable_move_to_bigdata <- function(table = "banc_meta",
                                      base = "banc_meta",
                                      url = "https://cloud.seatable.io/",
                                      workspace_id = "57832",
                                      token_name = "BANCTABLE_TOKEN",
                                      where = "`region` = 'optic'",
                                      invert = FALSE){

  # Authenticate (called for its login side effect) and resolve the seatable
  # base so we can obtain its UUID and per-base JWT token.
  banctable_login(token_name = token_name)
  base <- banctable_base_impl(table = table,
                              base_name = base,
                              url = url,
                              workspace_id = workspace_id)
  base_uuid <- base$dtable_uuid
  token <- base$jwt_token

  # Normalise the server host: strip any protocol prefix and trailing slash
  # so we can rebuild a clean https:// endpoint below.
  server <- gsub("^https?://", "", base$server_url)
  server <- gsub("/$", "", server)

  # "archive-view" moves matching rows into bigdata storage;
  # "unarchive" brings them back to the normal table.
  movement <- if (invert) "unarchive" else "archive-view"
  endpoint <- sprintf("https://%s/api-gateway/api/v2/dtables/%s/%s/",
                      server, base_uuid, movement)

  # Request body: table name plus an optional SQL-like row filter.
  body <- list(table_name = table)
  if (!is.null(where)) {
    body$where <- where
  }

  # Perform the POST. req_error(is_error = ...) disables httr2's automatic
  # error signalling so we can report the API's own message below.
  response <- httr2::request(endpoint) %>%
    httr2::req_headers(
      "Authorization" = sprintf("Bearer %s", token),
      "Accept" = "application/json",
      "Content-Type" = "application/json"
    ) %>%
    httr2::req_body_json(body) %>%
    httr2::req_error(is_error = function(resp) FALSE) %>%
    httr2::req_perform()

  # Check for a successful response
  if (httr2::resp_status(response) != 200) {
    # Extract a human-readable error message. stop() needs a character
    # scalar, so flatten a JSON payload rather than passing the parsed list
    # (stop(list) would itself error with "invalid first argument").
    error_msg <- tryCatch({
      if (identical(httr2::resp_content_type(response), "application/json")) {
        error_content <- httr2::resp_body_json(response)
        paste(unlist(error_content), collapse = "; ")
      } else {
        # If not JSON, fall back to the raw response text
        httr2::resp_body_string(response)
      }
    }, error = function(e) {
      "Could not parse error message"
    })
    stop(error_msg, call. = FALSE)
  }

  # Return the response invisibly (side-effect function; pipes cleanly).
  invisible(response)
}

# ## in python:
# url = "https://cloud.seatable.io/api-gateway/api/v2/dtables/397da290-5aec-44dc-8a05-e2f58254d84a/archive-view/"
# headers = {
# "accept": "application/json",
# "content-type": "application/json",
# "authorization": "Bearer MY_TOKEN"
# }
# body = {
# "table_name": "banc_meta",
# "where": "`cell_class` = 'glia'"
# }
# response = requests.post(url, headers=headers, json=body)
# print(response.text)

#' @export
#' @rdname banctable_query
banctable_append_rows <- function (df,
table,
bigdata = FALSE,
base = NULL,
chunksize = 1000L,
workspace_id = "57832",
Expand All @@ -306,7 +394,11 @@ banctable_append_rows <- function (df,
return(all(oks))
}
pyl = fafbseg:::df2appendpayload(df)
res = base$batch_append_rows(table_name = table, rows_data = pyl)
if(bigdata){
res = base$batch_append_rows(table_name = table, rows_data = pyl)
}else{
res = base$big_data_insert_rows(table_name = table, rows_data = pyl)
}
ok = isTRUE(all.equal(res[["inserted_row_count"]], nx))
return(ok)
}
Expand Down Expand Up @@ -438,7 +530,10 @@ banctable_updateids <- function(){
dplyr::select(-pt_root_id,-pt_position)

# Update root IDs directly where needed
bc.new <- banc_updateids(bc.new, root.column = "root_id", supervoxel.column = "supervoxel_id")
bc.new <- banc_updateids(bc.new,
root.column = "root_id",
supervoxel.column = "supervoxel_id",
position.column = "position")

# Make sure supervoxel and root position information that is missing, is filled in
bc.new <- bc.new %>%
Expand Down
12 changes: 8 additions & 4 deletions R/ids.R
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ banc_updateids <- function(x,
if(sum(no.sp)){
cat('determining missing supervoxel_ids ...\n')
x[no.sp,][[supervoxel.column]] <- unname(pbapply::pbsapply(x[no.sp,][[position.column]], function(row){
tryCatch(banc_xyz2id(row,rawcoords = TRUE, root = FALSE, ...), error = function(e) NA)
tryCatch(quiet_function(banc_xyz2id(row,rawcoords = TRUE, root = FALSE, ...)),
error = function(e) NA)
}))
}
}
Expand All @@ -214,20 +215,23 @@ banc_updateids <- function(x,
bad <- is.na(update)|update=="0"
update <- update[!bad]
if(length(update)) x[old,][[root.column]][!bad] <- update
old[old][!bad] <- TRUE
old[!bad] <- FALSE
}
old[is.na(old)] <- TRUE

# update based on position
if(any(c("position","pt_position")%in%colnames(x)) && sum(old)){
cat('updating root_ids with a position ...\n')
update <- unname(pbapply::pbsapply(x[old,][[position.column]], banc_xyz2id, rawcoords = TRUE, root = TRUE, ...))
update <- unname(pbapply::pbsapply(x[old,][[position.column]], function(row){
tryCatch(quiet_function(banc_xyz2id(row,rawcoords = TRUE, root = TRUE, ...)),
error = function(e) NA)
}))
bad <- is.na(update)|update=="0"
update <- update[!bad]
if(length(update)) x[old,][[root.column]][!bad] <- update
old[!bad] <- FALSE
}
old[is.na(old)] <- FALSE
old[is.na(old)] <- TRUE

# update based on root Ids
if(root.column%in%colnames(x) && sum(old)){
Expand Down
6 changes: 6 additions & 0 deletions man/banc_latestid.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions man/banctable_query.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit e1a9ca6

Please sign in to comment.