---
title: "Parallel Processesing in R"
output: pdf_document
---
```{r setup, include=FALSE}
knitr::opts_chunk$set(echo = TRUE)
```
## Parallel Processing in R
These are notes and code exercises from the DataCamp course Parallel Programming in R.
Packages:
- parallel
- foreach
- future.apply
How can a computational problem be split into parallel chunks?
- By tasks, where different tasks can be performed independently.
- By data, where the same task is performed on different chunks of data (e.g. computing the sum of each row of a matrix in parallel).
Exercise 1:
In the given text, find the most frequent words that start with each letter of the alphabet and are at least a given length long. You'll use the janeaustenr package with its 6 books by Jane Austen, which is available in your environment. Also loaded are the stringr package and the function janeausten_words(), which extracts words from the 6 books and converts them to lowercase. Here, the specific task is to find, for each letter of the alphabet, the most frequent word that is at least five characters long. Your job is to partition this task into independent pieces.
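The course provides janeausten_words() and max_frequency() preloaded in the workspace. The definitions below are only a rough sketch of what those helpers might look like (an assumption, not the course's exact code), included so the chunk that follows can run outside DataCamp:
```{r}
# Hypothetical reconstructions of the course-provided helpers (assumptions)
library(janeaustenr)
library(stringr)

janeausten_words <- function() {
  # Extract all words from the six books and convert them to lowercase
  text <- austen_books()$text
  words <- unlist(str_split(text, boundary("word")))
  tolower(words[words != ""])
}

max_frequency <- function(letter, words, min_length = 1) {
  # Keep words that start with 'letter' and have at least min_length characters
  candidates <- words[str_starts(words, letter) & str_length(words) >= min_length]
  # Return the count of the most frequent such word, named by the word
  sort(table(candidates), decreasing = TRUE)[1]
}
```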
```{r cars}
library(janeaustenr)
library(magrittr)   # for the %>% pipe
# Vector of words from all six books
words <- janeausten_words()
# Most frequent "a"-word that is at least 5 chars long
max_frequency(letter = "a", words = words, min_length = 5)
# Partitioning: one independent task per letter of the alphabet
result <- lapply(letters, max_frequency,
                 words = words, min_length = 5) %>% unlist()
# Barplot of result
barplot(result, las = 2)
```
## CPU and Memory
The most important criterion is the Central Processing Unit, or CPU.
- In order to run your code in parallel, you need at least two, and ideally more, processors or cores.
- Those can either live on a single computer, or in a cluster of single- or multi-core computers connected via a network.
The second most important aspect is memory.
- Take the concept of a multicore machine. Such systems have memory, or RAM, that all processes can access and communicate through; it is therefore called shared memory. An advantage of sharing memory is that processes can share variables and data, which often makes applications more time-efficient. This is handled by shared-memory software.
- A system with distributed memory, on the other hand, consists of multiple machines (think of a network of workstations), each of which has its own memory. Here, a process cannot access the memory of another process. Such systems are often called message-passing systems, because processes communicate by sending messages to one another.
- The advantage of message-passing software is that it can run on both kinds of systems, distributed as well as shared memory, as you will see throughout this course, which makes applications independent of the underlying hardware. This is not the case with shared-memory software.
## Programming Paradigms
1. Master-worker
- Well suited for embarrassingly parallel operations: all N calls to myfunc() can be run in parallel. In reality, however, N is often much larger than the number of available processors. The master-worker model deals with that problem: one process, the master, creates a set of other processes, the workers, and distributes tasks among them. Workers perform their tasks and return results to the master process, which in turn performs the final processing.
2. Map-reduce paradigm: for data distributed across multiple machines
- Hadoop, Spark
Exercise 2:
```{r}
# Complete the function definition
mean_of_rnorm <- function(n) {
  # Generate normally distributed random numbers
  random_numbers <- rnorm(n)
  # Calculate the mean of the random numbers
  return(mean(random_numbers))
}
# Try it out
mean_of_rnorm(10)

# Define a for loop, with variable iter counting from 1 to n_replicates.
# Inside the loop, set result[iter] to the result of mean_of_rnorm(),
# called with n_numbers_per_replicate.
n_numbers_per_replicate <- 10000
n_replicates <- 50
# Create a vector to store the results
result <- rep(NA, n_replicates)
# Set the random seed to 123
set.seed(123)
# Set up a for loop with iter from 1 to n_replicates
for (iter in seq_len(n_replicates)) {
  # Call mean_of_rnorm with n_numbers_per_replicate
  result[iter] <- mean_of_rnorm(n_numbers_per_replicate)
}
# View the result
hist(result)

# Create a vector of n_numbers_per_replicate, repeated n_replicates times.
# Call mean_of_rnorm() repeatedly using sapply()
n <- rep(n_numbers_per_replicate, n_replicates)
result <- sapply(
  # The vectorized argument to pass
  n,
  # The function to call
  mean_of_rnorm
)
# View the results
hist(result)
```
Exercise 3:
```{r}
ar1_one_value <- function(est, r) {
  est['mu'] + est['phi'] * (r - est['mu']) +
    rnorm(1, sd = est['sigma'])
}
ar1_one_trajectory <- function(est, rate0, len = 15) {
  trajectory <- rep(NA, len)
  rate <- rate0
  for (time in seq_len(len)) {
    trajectory[time] <- ar1_one_value(est, r = rate)
    rate <- trajectory[time]
  }
  trajectory
}
ar1_block_of_trajectories <- function(id, rate0 = 0.015, traj_len = 15, block_size = 10) {
  trajectories <- matrix(NA, nrow = block_size, ncol = traj_len)
  for (i in seq_len(block_size))
    trajectories[i, ] <- ar1_one_trajectory(unlist(ar1est[id, ]),
                                            rate0 = rate0, len = traj_len)
  trajectories
}
# Recall that ar1_block_of_trajectories() generates a block of trajectories for a
# given row (here, a given ID) of the ar1est dataset. Write a function
# ar1_multiple_blocks_of_trajectories() that generates such blocks of trajectories
# for a given vector of row identifiers, ids.
# - Inside the function, use lapply() to call ar1_block_of_trajectories() on each element of ids.
# - Bind the results by row using do.call().

# Function definition of ar1_multiple_blocks_of_trajectories()
ar1_multiple_blocks_of_trajectories <- function(ids, ...) {
  # Call ar1_block_of_trajectories() for each element of ids
  trajectories_by_block <- lapply(ids, ar1_block_of_trajectories, ...)
  # rbind results
  do.call(rbind, trajectories_by_block)
}

# Apply ar1_multiple_blocks_of_trajectories() to traj_ids with 0.015 as rate0,
# block size 10, and 15 time periods.
# Note: ar1est is a data frame of AR(1) parameter estimates provided by the
# course; it is a dataset, not a package, so there is no library() call for it.
library(ggplot2)   # needed by show_migration() below
# Create a sequence from 1 to number of blocks
traj_ids <- seq_len(nrow(ar1est))
# Generate trajectories for all rows of the estimation dataset
trajs <- ar1_multiple_blocks_of_trajectories(
  ids = traj_ids, rate0 = 0.015,
  block_size = 10, traj_len = 15
)
show_migration <- function(trajs) {
  df <- data.frame(time = seq(2020, by = 5, len = ncol(trajs)),
                   migration_rate = apply(trajs, 2, median),
                   lower = apply(trajs, 2, quantile, 0.1),
                   upper = apply(trajs, 2, quantile, 0.9))
  g <- ggplot(df, aes(x = time, y = migration_rate)) +
    geom_ribbon(aes(ymin = lower, ymax = upper), fill = "grey70") +
    geom_line()
  print(g)
}
# Show results
show_migration(trajs)
```
## Packages
Map-reduce:
- sparklyr, which offers an interface to the Apache Spark engine.
- iotools, presented in the DataCamp course Scalable Data Processing in R.
- pbdR, which stands for Programming with Big Data in R.
Master-worker model:
- foreach
- future.apply
- future, which is under active development. It provides an abstraction layer, or a unified API, for sequential and parallel processing. future.apply implements the apply-type functions on top of future.
These three packages may be a little outdated:
- snow: the snow part of the parallel package is mostly a reimplementation of snow; however, not all snow functionality has been ported over to parallel.
- snowFT: an extension of snow that adds important features, such as reproducibility and ease of use.
- snowfall: adds simplicity to snow.
parallel:
- detectCores() finds out how many cores your computer has. If you are interested only in the physical cores, set the argument logical to FALSE.
- makeCluster() creates a cluster of nodes, or a cluster of workers.
- Here we created as many workers as we have physical cores.
- The current R session serves as the master process, while each worker is a separate R process.
- clusterApply() is the workhorse of the parallel package.
- Its first argument is the cluster object, from makeCluster().
- The second argument, x, is a sequence whose length determines how many times the function fun, the third argument, is going to be evaluated.
- The evaluation is spread across the workers, and the elements of the x sequence are passed as the first argument of fun().
- stopCluster() closes a cluster when it is not needed anymore.
- In our example, where ncores is four and therefore x in clusterApply() is the sequence from four down to one, the first worker gets the instruction from the master to evaluate rnorm(4), the second worker rnorm(3), the third worker rnorm(2), and the fourth worker gets the number one as its first argument. The master then collects the results, which in this case are 10 random numbers. A minimal sketch of this pattern follows.
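A minimal sketch of the pattern just described, assuming a machine with several physical cores (a throwaway cluster named cl_demo is used so the exercise clusters below are not affected):
```{r}
# Each worker evaluates rnorm() on one element of the sequence ncores:1
library(parallel)
ncores <- detectCores(logical = FALSE)
cl_demo <- makeCluster(ncores)
clusterApply(cl_demo, ncores:1, rnorm)
stopCluster(cl_demo)
```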
Example:
We will use a similar example to the last lesson, where we evaluated rnorm() on the sequence c(4, 3, 2, 1) in parallel, except this time you will pass additional arguments to rnorm(). These can be added to the clusterApply() call.
For comparison, you'll create a sequential solution using lapply() in Step 2 of this exercise and a parallel solution in Step 3.
Even though computers often have more logical cores than physical cores, there is usually no speed advantage to running R processes on more than the number of physical cores.
```{r}
# From previous step
library(parallel)
ncores <- detectCores(logical = FALSE)
n <- ncores:1
# Use lapply to call rnorm for each n,
# setting mean to 10 and sd to 2
lapply(n, rnorm, mean = 10, sd = 2)

# Cluster approach
# Create a cluster
cl <- makeCluster(ncores)
# Use clusterApply to call rnorm for each n in parallel,
# again setting mean to 10 and sd to 2
clusterApply(cl, n, rnorm, mean = 10, sd = 2)
# Stop the cluster
stopCluster(cl)
```
```{r}
library(tictoc)
vals <- seq_len(10000) * seq_len(10000)
cl <- makeCluster(ncores)
```
```{r}
vals <- seq_len(10000000)^2
```
```{r}
# Took about 3 minutes for 1 million elements with just 2 cores, and about
# 50 minutes for 10 million: the per-element communication overhead dwarfs
# the trivial sqrt() computation.
tic()
b <- clusterApply(cl, vals, sqrt)
toc()
```
```{r}
# Sequential version with rlist::list.append(); took about 2 minutes for just
# 100,000 elements because the list is copied on every append.
library(rlist)
tic()
list1 <- list()
for (x in vals) {
  list1 <- list.append(list1, sqrt(x))
}
toc()
```
Exercise 2
In the first lesson, you learned how to split sum(1:100) into independent pieces. For two cores, you can do sum(1:50) + sum(51:100). Here, we'll implement this using clusterApply(). The parallel package is preloaded, as well as a cluster object cl with two workers.
```{r}
# Evaluate partial sums in parallel
part_sums <- clusterApply(cl, x = c(1, 51),
fun = function(x) sum(x:(x + 49)))
# Total sum
total <- sum(unlist(part_sums))
# Check for correctness
total == sum(1:100)
```
Example 3
You will now parallelize your simple embarrassingly parallel application from a previous exercise: repeatedly evaluating mean_of_rnorm(), which computes the mean of a set of random numbers, in parallel.
```{r}
# Create a cluster and set parameters
cl <- makeCluster(2)
n_replicates <- 50
n_numbers_per_replicate <- 10000
# Parallel evaluation on n_numbers_per_replicate, n_replicates times
means <- clusterApply(cl,
x = rep(n_numbers_per_replicate, n_replicates),
fun = mean_of_rnorm)
# View results as histogram
hist(unlist(means))
```
## Parallel Package
The parallel package consists of two parts, each of which is a reimplementation of a user-contributed package.
- One group of functions provides functionality originally implemented in the snow package.
- It uses message-passing methods and can therefore be used on systems with distributed as well as shared memory.
- The snow part of parallel is OS independent.
- The other group of functions comes from the multicore package.
- multicore, on the other hand, takes advantage of shared-memory methods and thus can be used only on single multi-core machines. It works on OS X and Linux, but not on Windows.
clusterApply():
- Green bars denote the time workers spend processing tasks and red lines denote messages sent between the master and the nodes.
- At the beginning, on the left, the master delivers the first batch of tasks to the workers and then waits for the results.
- After it collects results from all workers, it sends the second batch of tasks, then waits for results, and so on, until the end, which in this case takes about 0.7 seconds.
- There is an alternative to this way of processing that is more efficient.
Backends for makeCluster():
- Lower-level details are hidden in the various backends that the parallel package is built on.
Socket:
- The default, and probably the most used, type is socket communication.
- The type of backend is passed via the type argument.
- In this case we pass "PSOCK", which is the default and works on all OS platforms.
- In a socket cluster, all workers start with an empty environment, i.e. a new R process.
- The master has to take care of sending everything they need for their computing tasks.
Fork:
- The type argument is set to "FORK".
- In this case, the workers are complete copies of the master process; that is, they know about all global objects and functions, which can significantly decrease the communication needs.
- Not available on Windows.
MPI:
- A backend based on the MPI library is also available.
- The interface is provided by the Rmpi package.
- It is recommended to use the default socket backend, as it is OS independent.
- However, if your hardware has the MPI libraries available, the MPI backend might provide better efficiency for your applications.
Example 1:
In this exercise, you will explore the cluster object created by makeCluster(). In addition, you will use clusterCall(), which evaluates a given function on all workers. This can be useful, for example, when retrieving information from workers.
clusterCall() takes two arguments: the cluster object and the function to apply to each worker node. Just like lapply(), the function is passed without parentheses.
Here, we will use clusterCall() to determine the process ID of the workers, which is equivalent to finding process IDs of R sessions spawned by the master. Such info could be used for process management, including things outside of R.
```{r}
# Load the parallel package
library(parallel)
# Make a cluster with 4 nodes
cl <- makeCluster(4)
# Investigate the structure of cl
str(cl)
# What is the process ID of the workers?
clusterCall(cl,Sys.getpid)
# Stop the cluster
stopCluster(cl)
```
Example 2:
Now we will explore differences between the socket and the fork backends. In a fork cluster, each worker is a copy of the master process, whereas socket workers start with an empty environment. We define a global object and check if workers have an access to it under each of the backends. Your job is to use the function clusterCall() to look for the global object in the workers' environment.
The package parallel and the print_global_var() function, which calls print(a_global_var), are available in your workspace.
Make a socket cluster (of type "PSOCK") with 2 nodes.
Call print_global_var() on each cluster node. Can the function find the global variable?
```{r, error=TRUE}
print_global_var <- function() print(a_global_var)
# A global variable is defined
a_global_var <- "before"
# Create a socket cluster with 2 nodes
cl_sock <- makeCluster(2, type = "PSOCK")
# Evaluate the print function on each node
clusterCall(cl_sock, print_global_var)
# Stop the cluster
stopCluster(cl_sock)

# Make a fork cluster (of type "FORK") with 2 nodes.
# Call print_global_var on each cluster node. Can the function find the global variable now?
# A global variable is defined
a_global_var <- "before"
# Create a fork cluster with 2 nodes
cl_fork <- makeCluster(2, type = "FORK")
# Evaluate the print function on each node
clusterCall(cl_fork, print_global_var)
# Stop the cluster
stopCluster(cl_fork)

# Update a_global_var to contain the string "after".
# Call print_global_var on each cluster node. Can the function find the global variable now?
# A global variable is defined
a_global_var <- "before"
# Create a fork cluster with 2 nodes
cl_fork <- makeCluster(2, type = "FORK")
# Change the global var to "after"
a_global_var <- "after"
# Evaluate the print function on each node again
clusterCall(cl_fork, print_global_var)
# Stop the cluster
stopCluster(cl_fork)
```
There were two problems in the previous steps:
-The socket cluster failed to print a_global_var.
-The fork cluster did not update a_global_var after it was updated.
- Workers in a socket cluster start with an empty environment. Thus, a_global_var was not defined.
- Workers in a fork cluster start with a copy of the master environment. After forking, they do not see changes on the master. Thus, a_global_var was not updated.
## Core Functions
- clusterApply() does most of the work when a parallel application is processed.
- An alternative that ensures the work is distributed fairly among workers is clusterApplyLB(), where LB means load balanced.
There are a few wrapper functions available as well.
- For example, parApply(), parLapply() and parSapply() work analogously to their sequential counterparts apply(), lapply() and sapply().
- parRapply() and parCapply() are parallel row and column apply() functions for matrices.
- These five functions are wrappers around clusterApply().
- Similarly, the load-balanced alternatives parLapplyLB() and parSapplyLB() are wrappers around clusterApplyLB().
- In this course, we will mainly talk about clusterApply() and clusterApplyLB(), as they provide more flexibility. However, the wrapper functions are useful if your data falls nicely into the apply() framework, as they do some post-processing, like putting results into the required shape.
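A small sketch contrasting the wrappers with clusterApply(), assuming the parallel package is loaded (cl_demo is a throwaway cluster for illustration):
```{r}
cl_demo <- makeCluster(2)
x <- 1:10
# parSapply() post-processes the result into a vector, like sapply()
parSapply(cl_demo, x, sqrt)
# clusterApply() returns a list, like lapply(); unlist() to compare
unlist(clusterApply(cl_demo, x, sqrt))
# Load-balanced variant: tasks are handed out as workers become free
unlist(clusterApplyLB(cl_demo, x, sqrt))
stopCluster(cl_demo)
```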
## clusterApply(): Number of Tasks
- The number of tasks to be distributed to workers.
- The sequence in argument x is split so that each element is passed as the first argument to the given function, here myfunc(); for example, x = c(1, 51) corresponds to 2 tasks.
- In other words, the length of the argument x determines the number of tasks sent to the workers.
- Remember the picture of the master sending tasks to eight parallel workers? There, the length of the arg.sequence object determines the number of green bars. This is important to remember, because communication between master and workers is expensive in terms of processing time, so ideally we want to minimize the number of messages (the red lines) and maximize the time workers spend processing their tasks (the length of the green bars).
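To illustrate, here is a small sketch showing that the length of x sets the number of tasks (and thus the number of messages); the grouping into two chunks is an illustrative choice, not the course's code:
```{r}
cl_demo <- makeCluster(2)
# 100 tasks: one element of x per task, so many small messages
res_many <- clusterApply(cl_demo, 1:100, sqrt)
# 2 tasks: each worker receives one chunk of 50 numbers in a single message
res_few <- clusterApply(cl_demo, list(1:50, 51:100), sqrt)
stopCluster(cl_demo)
```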
## Parallel vs. Sequential
- Not all embarrassingly parallel applications are suited for parallel processing.
- There is an overhead that one needs to take into account when designing parallel applications.
- For example, starting and stopping a cluster takes time.
- The number of messages sent between the nodes and the master contributes to the overhead.
- The size of the messages can make a difference.
- Sending big datasets between the master and the workers is definitely not a good idea.
Therefore, when you design parallel applications, there are a few things to consider.
- How big is the single task that will be repeatedly evaluated, i.e. the green bar in the previous picture?
- How much data need to be sent back and forth?
- How much overall gain is there from running the application in parallel as opposed to running it sequentially? Here, benchmarking can help answer that question.
Exercise
Benchmarking setup
In this exercise, you will take the simple embarrassingly parallel application for computing mean of random numbers (mean_of_rnorm()) from the first chapter, and implement two functions: One that runs the application sequentially, mean_of_rnorm_sequentially(), and one that runs it in parallel, mean_of_rnorm_in_parallel(). We will then benchmark these two functions in the next exercise.
mean_of_rnorm() and a cluster object, cl, are defined.
Turn the script to call mean_of_rnorm() sequentially into a function named mean_of_rnorm_sequentially.
```{r}
# Wrap this code into a function
mean_of_rnorm_sequentially <- function(n_numbers_per_replicate, n_replicates) {
  n <- rep(n_numbers_per_replicate, n_replicates)
  lapply(n, mean_of_rnorm)
}
# Call it to try it
mean_of_rnorm_sequentially(1000, 5)

# Turn the script to call mean_of_rnorm() in parallel into a function
# named mean_of_rnorm_in_parallel.
# Wrap this code into a function
mean_of_rnorm_in_parallel <- function(n_numbers_per_replicate, n_replicates) {
  n <- rep(n_numbers_per_replicate, n_replicates)
  clusterApply(cl, n, mean_of_rnorm)
}
# Call it to try it
mean_of_rnorm_in_parallel(1000, 5)
```
Example 3:
Task size matters
Now you will benchmark the functions created in the previous exercise using the microbenchmark package. To see the impact of the parallel processing overhead, you will pass different numbers of replicates and sample sizes and explore the conditions under which parallel processing becomes inefficient.
mean_of_rnorm_sequentially(), mean_of_rnorm_in_parallel(), and a cluster cl with two nodes are available.
```{r}
library(microbenchmark)
# Set numbers per replicate to 5 million
n_numbers_per_replicate <- 5000000
# Set number of replicates to 4
n_replicates <- 4
# Run a microbenchmark
microbenchmark(
  # Call mean_of_rnorm_sequentially()
  mean_of_rnorm_sequentially(n_numbers_per_replicate, n_replicates),
  # Call mean_of_rnorm_in_parallel()
  mean_of_rnorm_in_parallel(n_numbers_per_replicate, n_replicates),
  times = 1,
  unit = "s"
)
```
```
Unit: seconds
                                                               expr       min        lq      mean    median        uq       max neval
 mean_of_rnorm_sequentially(n_numbers_per_replicate, n_replicates) 1.5490132 1.5490132 1.5490132 1.5490132 1.5490132 1.5490132     1
  mean_of_rnorm_in_parallel(n_numbers_per_replicate, n_replicates) 0.7479845 0.7479845 0.7479845 0.7479845 0.7479845 0.7479845     1
```
```{r}
# Change the numbers per replicate to 100
n_numbers_per_replicate <- 100
# Change number of replicates to 100
n_replicates <- 100
# Rerun the microbenchmark
microbenchmark(
mean_of_rnorm_sequentially(n_numbers_per_replicate, n_replicates),
mean_of_rnorm_in_parallel(n_numbers_per_replicate, n_replicates),
times = 1,
unit = "s"
)
```
```
Unit: seconds
                                                               expr         min          lq        mean      median          uq         max neval
 mean_of_rnorm_sequentially(n_numbers_per_replicate, n_replicates) 0.001235391 0.001235391 0.001235391 0.001235391 0.001235391 0.001235391     1
  mean_of_rnorm_in_parallel(n_numbers_per_replicate, n_replicates) 0.371440166 0.371440166 0.371440166 0.371440166 0.371440166 0.371440166     1
```
- We saw a speed-up when the independent tasks were large and the communication overhead was low.
- We saw a significant inefficiency when the sample size was small.
- Note that the DataCamp servers have either two or four physical cores, and their load depends on current utilization. You can run these tests on your own computer, hopefully with more cores, and you should see a larger speed-up.
## Why Initialize?
- We know that each cluster node usually starts with a clean, empty environment.
- This is, for example, the case when the socket backend is used.
- In this case, no libraries are loaded and no user objects are known to the process.
- Furthermore, we know that repeated communication between the master and the nodes is expensive. Example:
- We use clusterApply() to repeatedly call rnorm() to generate 1000 random numbers in each of the n repetitions. The standard deviations of the 1000 numbers range from 1 to 1000 and are the same in each of the n calls. Here, the master has to send the vector 1:1000 with all n tasks. Of course, n can be quite large, or instead of 1000 we could have a vector of a million values. In short, passing large arguments within clusterApply() can very quickly become a significant overhead. Therefore, a good practice is to have the master initialize the workers at the beginning of the process with everything that stays constant and/or is time consuming. This can be, for example, sending static data (datasets can be large, and this can save a lot of time), loading libraries on the nodes, or evaluating global functions.
Let's look at the functions in the parallel package that allow such initialization. There are three functions we will talk about here.
## clusterCall()
The first one is clusterCall(), which you've already seen in an exercise.
- This function calls the same function with the same arguments on all nodes.
- Here is an example. We create a cluster of size two, then use clusterCall() to call a function on both nodes that loads the janeaustenr library. Now both nodes are ready to use functions and data from the janeaustenr package. We can test it by using clusterCall() to call a function on both nodes that returns the 20th element of the emma dataset, which comes with the janeaustenr package. Both nodes know the emma dataset, and they both return the same row from the Emma book.
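A sketch of that clusterCall() initialization (janeaustenr must be installed; cl_demo is a throwaway cluster):
```{r}
cl_demo <- makeCluster(2)
# Load janeaustenr on both workers
clusterCall(cl_demo, function() library(janeaustenr))
# Both workers now know the emma dataset; ask each for its 20th element
clusterCall(cl_demo, function() emma[20])
stopCluster(cl_demo)
```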
## clusterEvalQ()
- The second function, maybe more convenient than clusterCall(), is clusterEvalQ(). This function evaluates a literal expression on all nodes; it is the parallel equivalent of evalq().
- Here is an example. After creating a cluster, we use clusterEvalQ() to evaluate an expression composed of three steps: first, loading the janeaustenr package; second, loading the stringr package; and third, defining a function called get_books() which returns a vector of the names of the books included in the package. After this call, both nodes are initialized with the two libraries, and both have the function get_books() in their global environment. To test it, we use clusterCall() to evaluate a function that returns the first three elements of the book vector, making use of get_books(). Indeed, both nodes were able to evaluate get_books() and returned the names of the first three books.
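A sketch of that clusterEvalQ() initialization; the exact body of get_books() is an assumption (the course defines it for you):
```{r}
cl_demo <- makeCluster(2)
clusterEvalQ(cl_demo, {
  library(janeaustenr)
  library(stringr)
  # Assumed definition: return the names of the books in the package
  get_books <- function() unique(as.character(austen_books()$book))
})
# Both workers can now evaluate get_books() from their global environment
clusterCall(cl_demo, function() get_books()[1:3])
stopCluster(cl_demo)
```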
## clusterExport()
And finally, the very useful clusterExport() function. It exports given objects from the master to the workers.
- The objects are given by their names, and they must exist in the master process.
- As an example, we define an object called books, which is the result of the get_books() function. We create a cluster and export the books object by passing its name. Then we test it with clusterCall(), where the books object is printed. Indeed, the books object is defined in the global environment of both workers.
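A sketch of that clusterExport() example, reusing the assumed get_books() definition on the master:
```{r}
library(janeaustenr)
get_books <- function() unique(as.character(austen_books()$book))
books <- get_books()
cl_demo <- makeCluster(2)
# Export the books object (by name) from the master to the workers
clusterExport(cl_demo, "books")
# Both workers now have books in their global environment
clusterCall(cl_demo, function() print(books))
stopCluster(cl_demo)
```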
Example 1:
In this example, you will run a simple application in parallel that requires the package extraDistr. You will see that if you load it only on the master, the code will fail. Then you will use the function clusterEvalQ() to load the package on all cluster nodes.
The parallel package and a 4-node cluster object cl with a socket backend are available in your workspace. You will use a pre-defined function myrdnorm() that takes n, mean, and sd as arguments and passes them to the rdnorm() function from extraDistr to generate n random numbers from the discrete normal distribution with the given mean and standard deviation.
```{r, error=TRUE}
# The exercise assumes a 4-node socket cluster cl in the workspace; create one here
cl <- makeCluster(4)
# Pre-defined myrdnorm
myrdnorm <- function(n, mean = 0, sd = 1)
  rdnorm(n, mean = mean, sd = sd)
# Parameters
n_numbers_per_replicate <- 1000
n_replicates <- 20
# Repeat n_numbers_per_replicate, n_replicates times
n <- rep(n_numbers_per_replicate, n_replicates)
# Load extraDistr on master
library(extraDistr)
# Run myrdnorm in parallel. This should fail!
res <- clusterApply(cl, n, myrdnorm)
# Result:
# Error: 20 nodes produced errors; first error: could not find function "rdnorm"

# Load extraDistr on master again.
# Use clusterEvalQ() to also load extraDistr on each node.
# clusterApply() to apply myrdnorm to n in parallel.
# Run the plotting code to see the result.

# From previous step
myrdnorm <- function(n, mean = 0, sd = 1)
  rdnorm(n, mean = mean, sd = sd)
n_numbers_per_replicate <- 1000
n_replicates <- 20
n <- rep(n_numbers_per_replicate, n_replicates)
# Load extraDistr on master
library(extraDistr)
# Load extraDistr on all workers
clusterEvalQ(cl, library(extraDistr))
# Run myrdnorm in parallel. It should work now!
res <- clusterApply(cl, n, myrdnorm)
# Plot the result
plot(table(unlist(res)))
```
Example 2:
Setting global variables
Here you will use a slight modification of the previous example, where instead of passing the mean and sd parameters as arguments, they will be defined in the worker's environment as global variables. You will use the clusterEvalQ() function again for the worker initialization.
As before, the parallel package and the cluster object cl are available in your workspace. A variant of myrdnorm() that uses global variables is shown in the script.
```{r}
# rdnorm(), but using global variables
myrdnorm <- function(n) {
rdnorm(n, mean = mean, sd = sd)
}
# Set mean to 10, globally
mean <- 10
# Set sd to 5, globally
sd <- 5
# Generate 1000 numbers with myrdnorm()
myrdnorm(1000)
```
Use clusterEvalQ() to initialize workers as follows:
Load the extraDistr package.
Set mean to 10 and sd to 5 to be worker's global variables.
Evaluate myrdnorm in parallel as in the previous exercise.
Plot results. Can you see the effect of the global variables on the distribution?
```{r}
# From previous step
myrdnorm <- function(n) {
rdnorm(n, mean = mean, sd = sd)
}
# Set number of numbers to generate
n <- rep(1000, 20)
# Run an expression on each worker
clusterEvalQ(
cl, {
# Load extraDistr
library(extraDistr)
# Set mean to 10
mean <- 10
# Set sd to 5
sd <- 5
})
# Run myrdnorm in parallel
res <- clusterApply(cl,n,myrdnorm)
# Plot the results
plot(table(unlist(res)))
```
Example 3:
Exporting global objects
Using clusterEvalQ() for setting global variables as in the last example assigns them only on the nodes and not on the master. If you want to share the same objects between master and the nodes, use the function clusterExport(). Here we use the same function as in the last exercise, myrdnorm() and initialize the global objects first on the master. Then you will use clusterExport() to export those objects to the workers.
clusterExport() takes two arguments: a cluster object and a character vector of variable names to copy from the master to the nodes.
The parallel package, a cluster object of size four, cl, the function myrdnorm(), and the number of numbers to generate, n, are available.
On the master, set global objects mean and sd to 20 and 10, respectively.
Load the extraDistr package on workers using clusterEvalQ().
Using clusterExport(), export mean and sd to the cluster nodes.
As in the previous exercise, evaluate myrdnorm() in parallel, assigning to res.
```{r}
# Set global objects on master: mean to 20, sd to 10
mean <- 20
sd <- 10
# Load extraDistr on workers
clusterEvalQ(cl,{library(extraDistr)})
# Export global objects to workers
clusterExport(cl,c("mean","sd"))
# Run myrdnorm in parallel
res <- clusterApply(cl,n,myrdnorm)
# Plot the results
plot(table(unlist(res)))
```
## Subsetting Data
Data chunks
- Each task is applied to different data, or a data chunk.
- A data chunk corresponding to a task can be made available to the worker in various ways.
- In one situation, the data are random numbers that are generated on the fly.
- In another situation, the data chunk is passed as an argument, and we will see an example of this in a moment.
- The application dataset is chunked into several blocks on the master side, then each block is passed to the corresponding worker via an argument.
- This behaviour is incorporated into some higher-level functions of the parallel package, like parApply() and its variants.
- Here is an example. After creating a cluster with four workers, we generate a matrix of random numbers with four columns and three rows.
- Now we'd like to compute the sum of the columns, analogously to the function colSums().
- The function parCapply() does exactly that. It splits the matrix by columns and sends each column to a worker, which applies the sum() function to the received data.
- If you wanted to do the same with clusterApply(), you would need to convert the matrix into a data frame and unlist the results.
- This way of chunking data may be convenient but can become communication-heavy as the matrix size increases.
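A sketch of the column-sum example just described (cl_demo is a throwaway cluster):
```{r}
cl_demo <- makeCluster(4)
# A 3 x 4 matrix of random numbers
m <- matrix(rnorm(12), nrow = 3, ncol = 4)
# parCapply() splits m by columns and applies sum() on the workers
parCapply(cl_demo, m, sum)                     # same result as colSums(m)
# The clusterApply() equivalent: convert to a data frame (a list of columns) first
unlist(clusterApply(cl_demo, as.data.frame(m), sum))
stopCluster(cl_demo)
```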
- Another option is to chunk the data on the workers' side.
- Often it would not be efficient to pass the data as an argument, especially if the data is big.
- Instead, workers can be pre-loaded with the whole dataset, and each task carries information about which part of the data to work on.
- So the chunking happens on the workers' end.
- Here is an example of a matrix multiplication, M times M. We create a matrix M of size 100 by 100 and export it to the workers using clusterExport(), so all workers see the whole dataset.
- Then we define a function mult_row() that computes the multiplication for one row of the matrix.
- The row is specified by the id argument.
- The apply() function splits the matrix into columns.
- The specified row is multiplied with each of the columns and summed.
- The function results in a vector of size 100 corresponding to the id-th row of M times M.
- We can then use clusterApply() to call mult_row() in parallel, passing 1:n as row ids.
- The result is then converted to a matrix using rbind().
- This is an example of a situation where we saved communication time by letting all workers see the same data and only passing information on which part of the data each worker should work on; here it was the identifier of the matrix row.
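A sketch of that worker-side chunking example (the helper name mult_row() follows the description above; cl_demo is a throwaway cluster):
```{r}
cl_demo <- makeCluster(4)
n <- 100
M <- matrix(rnorm(n * n), nrow = n)
# Send the whole matrix to every worker once
clusterExport(cl_demo, "M")
# Multiply row 'id' of M with every column of M and sum: one row of M %*% M
mult_row <- function(id) apply(M, 2, function(col) sum(M[id, ] * col))
# Each task carries only a row identifier, not the data itself
res <- clusterApply(cl_demo, seq_len(n), mult_row)
MM <- do.call(rbind, res)   # equals M %*% M up to numerical precision
stopCluster(cl_demo)
```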
Example 1:
Passing data as arguments
Here, we will explore subsetting data by passing it to workers as arguments. We will use the Jane Austen example introduced previously. The goal is to find unique words within the 6 books from the janeaustenr package, that start with the given letter, here "v", and have the given number of characters or more, here at least 10. You will evaluate the task in parallel on a cluster of size 2. You will also split the set of words into 2 subsets so that each worker gets one of them.
The parallel package, the set of words extracted from janeaustenr, and a cluster object cl with 2 workers are available in your workspace. The function with the following arguments has also been defined for you:
select_words(words, letter, min_length)
select_words() extracts all words that start with letter and are of length min_length or more. Run the function in the console to see how it works.
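select_words() is provided in the course workspace; a minimal sketch of what it might look like (an assumption), so the following chunks can run:
```{r}
library(stringr)
# Assumed definition of the course-provided helper
select_words <- function(words, letter, min_length = 1) {
  words[str_starts(words, letter) & str_length(words) >= min_length]
}
```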
```{r}
# Select words beginning with "v", at least 10 letters long
words_v10 <- select_words(words, "v", 10)
# Get the unique words
print(unique(words_v10))
```
Next, prepare the data to do this in parallel.
Use sample() to sample numbers up to 2, as many times as there are words, with replacement, to generate two random groups.
Use split() to split words by those groups.
```{r}
# Generate 2 random groups
groups <- sample(x = 2, size = length(words), replace = TRUE)
# groups looks something like 1, 2, 1, 2, 2, 2, 2, ...
# See those groups
head(groups, 20)
# Split words into groups
split_words <- split(words, groups)
```
Now select the words in parallel.
Use clusterApply() on split_words to select the words starting with "v" and at least 10 letters long.
Use unlist() to flatten the result.
Print the unique words.
```{r}
# From previous step
groups <- sample(2, length(words), replace = TRUE)
split_words <- split(words, groups)
# Apply select_words() to each element of split_words in parallel
res <- clusterApply(cl, split_words, select_words, letter = "v", min_length = 10)
# Flatten the result
words_v10 <- unlist(res)
# Get the unique words
unique(words_v10)
```
Example 2:
The migration application from the first chapter is an example of the third type of chunking, namely when the workers "see" the whole dataset and only get instructions which part of the data to work on. Here, you will parallelize the code. Earlier you defined a function ar1_multiple_blocks_of_trajectories() that takes a vector of row identifiers as an argument and generates migration trajectories using the corresponding rows of the parameter set ar1est.
ar1_multiple_blocks_of_trajectories() depends on ar1_block_of_trajectories() which in turn depends on ar1_one_trajectory(). These functions along with the cluster object cl of size 4, function show_migration(), the dataset ar1est (reduced to 200 rows) and packages parallel and ggplot2 are available in your workspace.
Use clusterApply() to run ar1_multiple_blocks_of_trajectories() in parallel where each parallel task processes one row of ar1est.
The x argument to clusterApply() should be a vector from 1 to the number of rows of ar1est.
Use default values for the starting rate, length of trajectories and block size.
```{r}
ar1_one_trajectory <- function(est, rate0, len = 15) {
  ar1_one_value <- function(est, r) {
    # Simulate one AR(1) value
    est['mu'] + est['phi'] * (r - est['mu']) +
      rnorm(1, sd = est['sigma'])
  }
  trajectory <- rep(NA, len)
  rate <- rate0
  for (time in seq_len(len)) {
    trajectory[time] <- ar1_one_value(est, r = rate)
    rate <- trajectory[time]
  }
  trajectory
}
ar1_multiple_blocks_of_trajectories <- function(ids, ...) {
  trajectories <- NULL
  for (id in ids)
    trajectories <- rbind(trajectories, ar1_block_of_trajectories(id, ...))
  trajectories
}
show_migration <- function(trajs) {
  df <- data.frame(time = seq(2020, by = 5, len = ncol(trajs)),
                   migration_rate = apply(trajs, 2, median),
                   lower = apply(trajs, 2, quantile, 0.1),
                   upper = apply(trajs, 2, quantile, 0.9))
  g <- ggplot(df, aes(x = time, y = migration_rate)) +
    geom_ribbon(aes(ymin = lower, ymax = upper), fill = "grey70") +
    geom_line()
  print(g)
}
# Export data and functions
clusterExport(cl, c("ar1est", "ar1_one_trajectory", "ar1_block_of_trajectories"))
# Process ar1_multiple_blocks_of_trajectories in parallel
res <- clusterApply(cl,
                    1:nrow(ar1est),
                    fun = ar1_multiple_blocks_of_trajectories)
# Combine results into a matrix and show results
trajs <- do.call(rbind, res)
show_migration(trajs)
```
Example 3:
Alternative chunking
We will now slightly modify the application of chunking in the previous exercise. Instead of a task corresponding to one row, each task will now process 40 rows, which corresponds to 5 tasks in total. You will then compare the shape of the returned object to the one you created in the last exercise. Recall that in both cases you are generating 10 trajectories per each of the 200 rows, which is 2000 trajectories in total, each 15 time points long.
The package parallel and cluster object cl with 4 workers along with all dependent functions and dataset have been exported to the workers. The object res returned by clusterApply() in the previous exercise is also in your workspace, now called res_prev.
Use splitIndices() to split indices of the rows of the ar1est dataset into 5 chunks.
Run ar1_multiple_blocks_of_trajectories() in parallel as in the previous exercise, but passing the split indices as the x argument.
Compare the shape of res and res_prev by printing the str()ucture of each. Why are they different?
```{r}
# splitIndices() comes from the parallel package; its source is pasted here for reference
splitIndices <- function(nx, ncl) {
  i <- seq_len(nx)
  if (ncl == 0L)
    list()
  else if (ncl == 1L || nx == 1L)
    list(i)
  else {
    fuzz <- min((nx - 1L) / 1000, 0.4 * nx / ncl)
    breaks <- seq(1 - fuzz, nx + fuzz, length.out = ncl + 1L)
    structure(split(i, cut(i, breaks)), names = NULL)
  }
}
# Split task into 5 chunks
ind <- splitIndices(nrow(ar1est), 5)
# Process ar1_multiple_blocks_of_trajectories in parallel
res <- clusterApply(cl, ind, ar1_multiple_blocks_of_trajectories)
# Compare the structure of the results
# (res_prev is the object returned by clusterApply() in the previous exercise)
str(res)
str(res_prev)
```
## foreach Package
1. What is foreach for?
- It provides a new looping construct for repeated execution of your code.
- This construct makes it possible for the repeated execution to be distributed, behind the scenes, over multiple processors, so it supports parallel processing.
- It provides a unified interface for sequential and parallel processing: you write your code the same way regardless of whether it is going to be processed sequentially or in parallel.
- Greatly suited for embarrassingly parallel applications.
2. The foreach looping construct
- It is a combination of the foreach() function and a special binary operator, written as %do%.
- Example: we generate five random numbers and repeat that three times, so we set an iteration variable n to a vector of three values of five. The %do% operator is followed by an expression to be evaluated repeatedly, with the iteration variable n passed as an argument. This instructs foreach to evaluate rnorm() three times, generating five random numbers in each of the three iterations. Unlike the standard for loop, foreach returns a value. Multiple iteration variables can be combined, for example a second iterator m for the mean:
foreach(n = rep(5, 3), m = 10^(0:2)) %do% rnorm(n, mean = m)
3. Combining results
- foreach(n = rep(5, 3), .combine = rbind) %do% rnorm(n)
- .combine = '+' sums across the results.
- You can also write your own combine function.
- List-comprehension style filtering: foreach() %:% when() %do% x
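A short sketch of these constructs (the filtering example at the end is illustrative, not from the course):
```{r}
library(foreach)
# Basic loop: returns a list of three vectors of five random numbers
foreach(n = rep(5, 3)) %do% rnorm(n)
# Combine results into a matrix, one row per iteration
foreach(n = rep(5, 3), .combine = rbind) %do% rnorm(n)
# Sum across iterations
foreach(n = rep(5, 3), .combine = '+') %do% rnorm(n)
# List-comprehension style: keep only even x, then square
foreach(x = 1:10, .combine = c) %:% when(x %% 2 == 0) %do% x^2
```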
Example 1:
Recall the first chapter where you found the most frequent words from the janeaustenr package that are of certain minimum length. It looked like this:
result <- lapply(letters, max_frequency,
                 words = words, min_length = 5) %>% unlist()
In this exercise, you will implement the foreach construct to solve the same problem.
The janeaustenr package, a vector of all words from the included books, words, and a function max_frequency() for finding the results based on a given starting letter are all available in your workspace.
```{r}
# Load the package
library(foreach)
# foreach() %do% construct
result <- foreach(let = letters, .combine= c) %do%
max_frequency(let, words, 5)
# Plot results
barplot(result, las = 2)
```
Example 2:
This exercise is a modification of the previous one. Now assume you want to vary the minimum word length across the letters of the alphabet. Specifically, your job is to modify the code so that the maximum frequency for the first half of the alphabet is obtained for words that are two and more characters long, while the frequency corresponding to the second half of the alphabet is derived from words that are six and more characters long. Note that we are using an alphabet of 26 characters.
Your namespace has all the objects from the previous exercise, plus the foreach package is loaded.
```{r}
# foreach()%do% construct with 2 iterators
result <- foreach(let = letters, n = c(rep(2,13),rep(6,13)), .combine = c) %do%
max_frequency(let, words, n)
# Plot results
barplot(result, las = 2)
```
## Popular Backends
- Several packages are available that provide parallel backends to the foreach package.
- Probably the most popular one is the doParallel backend, which uses the techniques and tools of the parallel package to implement the loop.
- First register the backend via the function registerDoParallel(), passing the cluster information.
- The quickest way to register is to just pass the number of desired nodes as the cores argument and the package does the rest: registerDoParallel(cores = 3).
- If you want more control over creating the cluster, you can create it outside of the registering function using makeCluster() and pass the resulting cl object to registerDoParallel(). In this case, the snow functionality of the parallel package will be used.
- If you only pass the number of cores and are using a Unix-like system, the parallel code will use the multicore part of the parallel package, creating the processes by forking. On Windows systems, it will use the snow part of parallel.
- Use %dopar% instead of %do%.
- A relatively new parallel backend is provided by the doFuture package, which is based on the future package.
- It has a more generic approach, as it aims to provide a universal adaptor to foreach, enabling the use of any parallel backend.
- The central idea is the notion of a plan, which determines the strategy for how foreach() should work behind the scenes.
- Among the available built-in plans are:
- the sequential plan for sequential processing,
- the cluster plan for parallelization on the local machine or across multiple machines,
- the multicore plan providing the multicore-like functionality of the parallel package for parallelization on the local computer,
- the multiprocess convenience plan that abstracts from the underlying hardware.
- Additional strategies are provided by the package future.batchtools, which allows processes to be run on high-performance computing clusters powered by systems like Torque, Slurm, SGE, etc.
- First, load the package and register the backend using registerDoFuture() with no arguments. Then:
- plan(cluster, workers = 3), followed by a regular foreach() loop, or
- plan(multicore), followed by a regular foreach() loop, etc.
- With doFuture you can easily switch between backends without having to use another package.
- Also worth mentioning is registerDoSEQ(), which allows switching between parallel and sequential processing without changing the loop.
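A minimal sketch of registering the two backends described above:
```{r}
library(doParallel)
registerDoParallel(cores = 2)
foreach(i = 1:3, .combine = c) %dopar% sqrt(i)

library(doFuture)
registerDoFuture()
plan(cluster, workers = 2)   # or plan(multicore), plan(sequential), ...
foreach(i = 1:3, .combine = c) %dopar% sqrt(i)
plan(sequential)             # reset to sequential processing
```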
Example 1:
Using doParallel
In one of the previous exercises, you parallelized the function myrdnorm() which uses the package extraDistr to generate discrete random numbers. Here you will implement a parallel foreach solution to the same problem, using the doParallel backend. doParallel provides an argument .packages which ensures that external packages are loaded on all nodes.
doParallel and the function myrdnorm() are loaded in your namespace.
Register a cluster of 3 nodes using registerDoParallel() and argument cores.
Write a parallel foreach loop that evaluates the myrdnorm() function 100 times generating 1000 random numbers in each iteration. Thus, the iterator r is a vector of length 100. Results should be combined into a matrix. Within foreach() use the .packages argument to load "extraDistr" on workers.
Assign the dimensions of res to dim_res.
```{r}
library(doParallel)
# Register doParallel with 3 cores
registerDoParallel(cores = 3)
# foreach() %dopar% loop
res <- foreach(r = rep(1000, 100), .combine = rbind,
               .packages = "extraDistr") %dopar% myrdnorm(r)
# Dimensions of res
dim_res <- dim(res)
```
Example 2:
So far you learned how to search for the most frequent word in a text sequentially using foreach(). In the course of the next two exercises, you will implement the same task using doParallel and doFuture for parallel processing and benchmark it against the sequential version. The sequential solution is implemented in function freq_seq() (type freq_seq in your console to see it). It iterates over a global character vector chars and calls the function max_frequency() which searches within a vector of words, while filtering for minimum word length. All these objects are preloaded, as is the doParallel package. Your job now is to write a function freq_doPar() that runs the same code in parallel via doParallel.
```{r}
# chars is preloaded in the course workspace; assume the full alphabet here
chars <- letters

freq_seq <- function(min_length = 5)
  foreach(let = chars, .combine = c) %do%
    max_frequency(let, words = words, min_length = min_length)

# Function for doParallel foreach
freq_doPar <- function(cores, min_length = 5) {
  # Register a cluster of size cores
  registerDoParallel(cores = cores)
  # foreach loop over the same chars vector as freq_seq()
  foreach(let = chars, .combine = c,
          .export = c("max_frequency", "select_words", "words"),  # export functions and data
          .packages = c("janeaustenr", "stringr")) %dopar%        # load packages on workers
    max_frequency(let, words = words, min_length = min_length)
}
# Run on 2 cores
freq_doPar(2)
```
Example 3:
Word frequency with doFuture and benchmarking
Now your job is to create a function freq_doFut() that accomplishes the same task as freq_doPar() but with the doFuture backend. Note that when using doFuture, arguments .packages and .export in foreach() are not necessary, as the package deals with the exports automatically.
You will then benchmark these two functions, together with the sequential freq_seq(). All the functions from the last exercise are available in your workspace. In addition, the packages doFuture and microbenchmark are also preloaded. To keep the computation time low, the global chars vector is set to the first six letters of the alphabet only.
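A minimal sketch of freq_doFut() and the benchmark described above (the exact DataCamp solution may differ):
```{r}
library(doFuture)
library(microbenchmark)
# Keep the task small, as in the exercise text
chars <- letters[1:6]

freq_doFut <- function(cores, min_length = 5) {
  # Register the doFuture backend with a cluster plan of the requested size
  registerDoFuture()
  plan(cluster, workers = cores)
  # No .export or .packages needed: doFuture picks up globals automatically
  foreach(let = chars, .combine = c) %dopar%
    max_frequency(let, words = words, min_length = min_length)
}

# Benchmark the sequential, doParallel and doFuture versions
microbenchmark(freq_seq(), freq_doPar(2), freq_doFut(2), times = 1)
```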