-
Notifications
You must be signed in to change notification settings - Fork 0
/
1_get_tracks.R
119 lines (105 loc) · 2.64 KB
/
1_get_tracks.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
library(bigrquery)
library(dplyr)
# Authenticate with BigQuery before running any query.
bigrquery::bq_auth()
# NOTE(review): this bare `2` looks like a pasted answer to an interactive
# account-selection prompt -- confirm. When the script is sourced it is just
# a constant expression with no effect.
2

# Billing project and the scratch dataset that receives the result tables.
project_id <- "world-fishing-827"
dataset <- "scratch_andrea_ttl100"

# david combining dates 3 dates per ssvid:
# every encounter date plus the day after and the day before, de-duplicated.
dates_query <- "
SELECT DISTINCT ssvid, date
FROM (
SELECT ssvid, date
FROM `scratch_david.bird_encounters`
UNION ALL
SELECT ssvid,
DATE_ADD(date, interval 1 day)
FROM
`scratch_david.bird_encounters`
UNION ALL
SELECT ssvid,
DATE_SUB(date, interval 1 day)
FROM `scratch_david.bird_encounters`)
"

# Run the query and pull the (ssvid, date) pairs into a local data frame.
dates_q <- bq_project_query(project_id, dates_query)
dates <- bq_table_download(dates_q)
dates <- dates %>% arrange(ssvid, date)

# Number of distinct days to process (302 according to the original note).
count(distinct(dates, date)) #302

# NOTE(review): `track_proj` and `encounters` are not defined in this script,
# so the two checks below stop a sourced / Rscript run with
# "object not found" before any table is created. They are kept here,
# commented out, for reference; re-enable them once those objects exist.
# count(distinct(track_proj, date))
# count(distinct(encounters, date))

# Sorted vector of unique days; one BigQuery job is launched per element.
DATES <- sort(unique(dates$date))
head(DATES)

# Vessels ranked by how many days they contribute.
dates %>%
  count(ssvid) %>%
  arrange(desc(n))
# SQL template that extracts the tracks for one day.
#
# The `{.x}` placeholder is filled in by glue::glue() inside create_table(),
# so the template must be rendered where a variable named `.x` is in scope.
#
# Query structure:
#   today()        - temp function returning the target day as a TIMESTAMP.
#   encounters     - distinct (ssvid, date) rows from
#                    `scratch_david.bird_encounters`, expanded with the day
#                    after and the day before, then restricted to the
#                    target day.
#   messages       - rows of `pipe_ais_v3_published.messages` whose timestamp
#                    falls on the target day and for which `clean_segs` holds.
#   messages_daily - inner join of the two on (ssvid, date).
#
# NOTE(review): glue evaluates every `{...}` in this string as R code; keep
# literal braces out of the SQL (or double them) when editing it.
query_tracks <- "
CREATE TEMP FUNCTION today() AS (timestamp('{.x}'));
WITH
encounters AS (
SELECT DISTINCT
*
FROM
(SELECT ssvid, date FROM `scratch_david.bird_encounters`
UNION ALL
SELECT ssvid,
DATE_ADD(date, interval 1 day)
FROM `scratch_david.bird_encounters`
UNION ALL
SELECT ssvid,
DATE_SUB(date, interval 1 day)
FROM `scratch_david.bird_encounters`)
WHERE date = DATE(today())),
messages AS (
SELECT
ssvid,
lat,
lon,
timestamp,
speed_knots,
heading,
course,
nnet_score,
DATE(timestamp) AS date
FROM
`pipe_ais_v3_published.messages`
WHERE DATE(timestamp) = DATE(today()) AND
clean_segs),
messages_daily AS(
SELECT *
FROM encounters
INNER JOIN messages
USING (ssvid, date)
)
SELECT *
FROM messages_daily
ORDER BY ssvid, date, timestamp
"
# Run the single-day track query for one date and write the result into the
# matching daily partition of `<project_id>.<dataset>.orben_tracks_results`.
#
# Args:
#   .x: a single, non-missing day (a Date, or a value as.Date() can convert).
#       The argument name is fixed: the `{.x}` placeholder in `query_tracks`
#       is resolved by glue::glue() against this function's environment.
#
# Returns:
#   Whatever bq_project_query() returns for the submitted job.
#
# Relies on the script-level objects `query_tracks`, `project_id` and
# `dataset`.
create_table <- function(.x) {
  # one call fills exactly one partition, so reject vectors, NULL and NA
  # before anything is sent to BigQuery
  stopifnot(length(.x) == 1L, !is.na(.x))

  # create the query: glue substitutes `{.x}` from the local environment
  query <- glue::glue(query_tracks)

  # name the table: the `$YYYYMMDD` suffix addresses a single daily partition.
  # Coerce to Date and dispatch through the format() generic instead of
  # calling the format.Date() method directly.
  partition_id <- format(as.Date(.x), "%Y%m%d")
  table_name <- paste0(
    project_id, ".", dataset, ".orben_tracks_results$", partition_id
  )

  # do the query
  bq_project_query(
    project_id,
    query = query,
    destination_table = table_name,
    configuration = list(
      query = list(
        time_partitioning = list(field = "date", type = "DAY")
      )
    )
  )
}
# execute the query in parallel: one BigQuery job per day in DATES
library(furrr)

# Show how many cores this machine reports.
available_cores <- parallel::detectCores()
available_cores

# Leave two cores free, but never ask for fewer than one worker:
# detectCores() can return NA or a value <= 2, and both would make
# `workers` invalid.
n_workers <- max(1L, available_cores - 2L, na.rm = TRUE)
plan(multisession, workers = n_workers)

# Always fall back to the sequential plan, even when one of the daily jobs
# fails, so the background R sessions are not left running.
tryCatch(
  furrr::future_map(DATES, ~create_table(.x)),
  finally = plan("sequential")
)
# Fetch the complete results table, keep a local CSV copy, and print
# per-vessel and per-day row counts.
orben_tracks <- bq_table_download(
  paste(project_id, dataset, "orben_tracks_results", sep = ".")
)
readr::write_csv(orben_tracks, "orben_results.csv")
orben_tracks %>% count(ssvid)
orben_tracks %>% count(date)