---
title: "R for HPC and big data"
output:
html_notebook: default
html_document: default
---
This lab is part of [R Summer School](http://rsummer.data-analysis.at) and accompanies the talk R for HPC and big data.
It is partly based on http://spark.rstudio.com, https://blog.cloudera.com/blog/2016/09/introducing-sparklyr-an-r-interface-for-apache-spark and http://www.win-vector.com/blog/2017/07/working-with-r-and-big-data-use-replyr/.
You can run the lab on your laptop. If you prefer a cloud environment, https://community.cloud.databricks.com/ provides 0.6 CPU and 6 GB RAM for free, but requires some additional R packages if you want to use it from a local RStudio. If you want to work entirely in the cloud, https://databricks.com/blog/2017/05/25/using-sparklyr-databricks.html is a good starting point.
This guide covers everything necessary to run the lab locally. If you instead want to connect a local RStudio installation to the cloud environment, the file `cloud.Rmd` partially covers how to set that up.
## step 0 - prerequisites
Install the following software:
- RStudio https://www.rstudio.com/products/RStudio/
- R https://cran.r-project.org
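Spark itself additionally needs a Java runtime (Java 8 works for Spark 2.x). The optional sketch below is not part of the original setup, but it is a quick way to check your environment from R:
```{r, eval=FALSE, include=TRUE}
# Optional sanity check: print the R version and verify that a Java runtime
# is available on the PATH, since a local Spark installation needs one.
R.version.string
system("java -version")
```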
## step 1 - installation
We will install a couple of packages. If you want the latest features, feel free to install directly from GitHub instead of CRAN:
```{r, eval=FALSE, include=TRUE}
install.packages(c("devtools", "tidyverse", "replyr"))
# devtools::install_github("hadley/tidyverse") # not mandatory to have the latest features here.
devtools::install_github("rstudio/sparklyr")
# some data to analyze later on
install.packages(c("nycflights13", "Lahman"))
```
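To confirm the installation succeeded, you can optionally print the installed package versions, roughly like this:
```{r, eval=FALSE, include=TRUE}
# Optional: confirm that the key packages are installed and loadable.
packageVersion("sparklyr")
packageVersion("dplyr")
packageVersion("replyr")
```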
## step 2 - start a (local) spark cluster
Either start or connect to the Spark master from the RStudio GUI
![use the GUI](img/startingSpark.png) or execute the following code:
```{r, echo=TRUE}
library(sparklyr)
suppressPackageStartupMessages(library("dplyr"))
library(nycflights13)
library(Lahman)
```
On the first run you also need to download Spark itself. First, check the available versions:
```{r, eval=FALSE, include=TRUE}
spark_available_versions()
```
Then install the latest version; at the time of writing this is `2.2.0`:
```{r, eval=FALSE, include=TRUE}
spark_install(version = "2.2.0", hadoop_version = "2.7")
```
Then connect:
```{r}
# local spark
spark <- spark_connect(master = "local")
# cloud spark https://databricks.com/blog/2017/05/25/using-sparklyr-databricks.html
# spark <- spark_connect(method = "databricks") # that also requires sparkR to be installed
```
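To double-check the connection, the optional sketch below asks sparklyr whether the connection is open and which Spark version it is running:
```{r, eval=FALSE, include=TRUE}
# Optional connection check; both helpers are provided by sparklyr.
sparklyr::connection_is_open(spark)
sparklyr::spark_version(spark)
```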
To view Spark's web interface:
```{r, eval=FALSE, include=TRUE}
sparklyr::spark_web(spark)
```
## lab
### exercise 1 - dplyr interface to Spark SQL
- copy data from R to Spark; a dummy data set like the iris data set (`iris`) will do
- create a second Spark data frame from the flights data in nycflights13 (`nycflights13::flights`), registered as `flights`
- create a third data frame from the batting data set (`Lahman::Batting`). This time, however, we want to read it as a CSV: write the CSV to a local file first, then read it into Spark.
```{r}
iris_tbl <- copy_to(spark, iris)
flights_tbl <- copy_to(spark, nycflights13::flights, "flights")
# use write.csv (not write.csv2) so the file is comma-delimited, matching delimiter = "," below
write.csv(Lahman::Batting, file = "batting.csv", row.names = FALSE)
batting_tbl <- spark_read_csv(spark, "batting", "batting.csv", header = TRUE,
                              infer_schema = TRUE, delimiter = ",", quote = "\"", escape = "\\",
                              charset = "UTF-8", null_value = "NA", options = list(),
                              repartition = 2, memory = TRUE, overwrite = TRUE)
# batting_tbl <- copy_to(spark, Lahman::Batting, "batting")
```
Show all Spark data frames, i.e. list all tables created so far:
```{r}
src_tbls(spark)
```
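Besides `src_tbls()`, you can obtain a `dplyr` reference to any registered table by name via `tbl()`; a small optional sketch:
```{r, eval=FALSE, include=TRUE}
# Reference an existing Spark table by its registered name instead of the
# handle returned by copy_to() / spark_read_csv().
batting_ref <- tbl(spark, "batting")
head(batting_ref)
```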
Filter the flights table for a departure delay of two minutes (`dep_delay == 2`):
```{r}
flights_tbl %>% filter(dep_delay == 2)
```
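Note that this pipeline is lazy: `dplyr` translates it to Spark SQL and only executes it when results are needed (e.g. when printing or calling `collect()`). If you are curious, `show_query()` reveals the generated SQL; an optional sketch:
```{r, eval=FALSE, include=TRUE}
# Inspect the SQL that dplyr generates for the Spark backend; nothing is
# computed here, the query is only translated and printed.
flights_tbl %>%
  filter(dep_delay == 2) %>%
  show_query()
```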
### exercise 2 - dplyr in action
- group the flights by `tailnum`
- aggregate with `count(*)`, the mean distance as `dist` and the mean arrival delay as `delay`
- keep only rows where `count > 20`, `dist < 2000` and `delay` is not null
```{r}
delay <- flights_tbl %>%
  group_by(tailnum) %>%
  summarise(
    count = n(),
    dist = mean(distance),
    delay = mean(arr_delay)
  ) %>%
  filter(count > 20, dist < 2000, !is.na(delay)) %>%
  collect()
head(delay)
```
Plot the results:
- create a scatterplot of `dist` versus `delay`
- include a regression line
```{r}
library(ggplot2)
ggplot(delay, aes(dist, delay)) +
  geom_point(aes(size = count), alpha = 1/2) +
  geom_smooth() +
  scale_size_area(max_size = 2)
```
### exercise 3 - use plain SQL
You can also query the Spark tables with plain SQL. Use `DBI`, which `dplyr` also uses as its database backend.
```{r}
library(DBI)
iris_preview <- dbGetQuery(spark, "SELECT * FROM iris LIMIT 10")
iris_preview
```
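As a second, slightly larger example (a sketch, assuming the `iris` table registered in exercise 1, where `copy_to()` replaced the `.` in column names with `_`), here is a plain-SQL aggregation that Spark executes remotely:
```{r, eval=FALSE, include=TRUE}
# Group-wise aggregation expressed directly in SQL; the result comes back
# as an ordinary R data frame.
dbGetQuery(spark, "
  SELECT Species, COUNT(*) AS n, AVG(Petal_Length) AS mean_petal_length
  FROM iris
  GROUP BY Species")
```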
### exercise 4 - simple machine learning with a linear model (LM)
- some of Spark's ML (MLlib) functionality is accessible through `sparklyr`
- additionally, H2O.ai's `rsparkling` provides ML methods on top of `sparklyr`
- load the mtcars data set into Spark
- create a train / test split with 50% of the data in each partition
- create a binary feature which is `TRUE` if `cyl == 8`
- fit a simple LM with `mpg` as response and `wt` and `cyl` as features
- print the summary of the linear model
```{r}
# copy mtcars into spark
mtcars_tbl <- copy_to(spark, mtcars)
# transform our data set, and then partition into 'training', 'test'
partitions <- mtcars_tbl %>%
  filter(hp >= 100) %>%
  mutate(cyl8 = cyl == 8) %>%
  sdf_partition(training = 0.5, test = 0.5, seed = 1099)
# fit a linear model to the training dataset
fit <- partitions$training %>%
  ml_linear_regression(response = "mpg", features = c("wt", "cyl"))
summary(fit)
```
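Optionally, you can also score the held-out test partition. Depending on your sparklyr version the function is `ml_predict()` (newer releases) or `sdf_predict()` (older releases); a hedged sketch:
```{r, eval=FALSE, include=TRUE}
# Score the test partition with the fitted model; adjust the function name
# to your sparklyr version if needed.
pred <- ml_predict(fit, partitions$test)   # older sparklyr: sdf_predict(fit, partitions$test)
pred %>% select(mpg, prediction) %>% head()
```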
Stop the Spark session for now; the remaining exercises need a connection again (exercise 6 shows how to reconnect with a custom configuration):
```{r}
spark_disconnect(spark)
```
### exercise 5 - UDF
Define a user-defined function (closure) to execute custom R code via Spark. This assumes a live Spark connection with the `iris_tbl` from exercise 1; reconnect and re-run `copy_to()` if you disconnected above.
```{r}
iris_tbl %>% spark_apply(function(e) sapply(e[,1:4], jitter))
```
And add the computed values as an additional column in the result:
```{r}
iris_tbl %>%
  spark_apply(
    function(e) data.frame(2 * e$Sepal_Length, e),
    names = c("2xS_len", colnames(iris_tbl))
  )
```
### exercise 6 - grouped UDF (UDAF) and Spark configuration
Define a custom R function that is executed per group of a data frame, and learn how to start Spark with custom settings, e.g. more memory.
Start Spark with a custom configuration that allows for more memory:
```{r}
# more memory as suggested in https://github.com/rstudio/sparklyr/issues/783
config <- spark_config()
config[["sparklyr.shell.driver-memory"]] <- "2G"
spark <- sparklyr::spark_connect(version = "2.2.0",
                                 hadoop_version = "2.7",
                                 master = "local",
                                 config = config)
```
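To verify that the setting actually reached the driver, you can optionally read it back from the running SparkContext via sparklyr's `invoke()` API; a sketch (the exact output format may vary by Spark version):
```{r, eval=FALSE, include=TRUE}
# Read the effective driver memory back from the SparkContext configuration.
spark_context(spark) %>%
  invoke("getConf") %>%
  invoke("get", "spark.driver.memory")
```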
Assume you have a big data set and want to compute an aggregation for each group. For standard SQL aggregations (mean, sum, count) this is rather easy via sparklyr or the SQL API.
However, when you want to fit a model, i.e. a more complex, non-standard aggregation, it gets a bit more complicated.
If you were using native (Java, Scala) Spark you could write a custom UDAF. This is not possible from R.
SparkR, however, offers `gapply` to handle working with groups.
Recently, sparklyr introduced `spark_apply()`, which can also operate on grouped data sets and thus makes it easy to scale out, i.e. parallelize, a computation per group.
Fit a linear model on the iris data set for each species:
```{r}
iris_tbl <- copy_to(spark,iris)
spark_apply(
iris_tbl,
function(e) broom::tidy(lm(Petal_Width ~ Petal_Length, e)),
names = c("term", "estimate", "std.error", "statistic", "p.value"),
group_by = "Species"
)
```
> Note: as outlined at https://spark.rstudio.com/articles/guides-distributed-r.html you need to supply the column names of your function's tabular output in order to properly *view* the result. `broom` from the tidyverse is a good tool for turning model objects into such tidy data frames.
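For intuition, the sketch below shows a purely local, non-distributed equivalent of the grouped computation above; `spark_apply()` with `group_by` essentially runs the same closure on the executors once per group (note that the local `iris` still uses dots in its column names):
```{r, eval=FALSE, include=TRUE}
# Local counterpart of the grouped spark_apply() call above, for comparison only.
local_fits <- lapply(split(iris, iris$Species), function(d) {
  broom::tidy(lm(Petal.Width ~ Petal.Length, data = d))
})
local_fits
```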
To conclude the lab, stop Spark:
```{r}
spark_disconnect(spark)
```
For further inspiration visit https://spark.rstudio.com, which outlines how to use e.g. window functions and much more functionality.