Skip to content

Commit

Permalink
communications collectives
Browse files Browse the repository at this point in the history
  • Loading branch information
pnavaro committed Sep 24, 2024
1 parent 6d3a005 commit af19b95
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 67 deletions.
Binary file added allgather.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
16 changes: 16 additions & 0 deletions bcast.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
library(Rmpi)

id <- mpi.comm.rank(comm=0)

# Diffusion du vecteur v sur les processeurs autre que 0
if (id == 0) {
v <- c(1, 2, 3, 4)
mpi.bcast.Robj(obj = v, rank = 0, comm = 0)
} else {
v <- mpi.bcast.Robj(rank = 0, comm = 0)
}

cat("vector on ", id, " = ", v, "\n" )

invisible(mpi.barrier(comm=0))
invisible(mpi.finalize())
Binary file added broadcastvsscatter.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
17 changes: 17 additions & 0 deletions demo_pbdmpi.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
## Not run:
### Save code in a file "demo.r" and run with 2 processors by
### SHELL> mpiexec -np 2 Rscript demo.r

### Initial.
suppressMessages(library(pbdMPI, quietly = TRUE))
init()

### Examples.
x <- matrix(1:5, nrow = 1)
y <- bcast(x)
comm.print(x)
comm.print(y)

### Finish.
finalize()

Binary file added gather.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
186 changes: 119 additions & 67 deletions index.qmd
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ Supports pour apprendre à utiliser la bibliothèque MPI avec R

# Bibliographie

- [Rmpi](https://fisher.stats.uwo.ca/faculty/yu/Rmpi/)
- [Supports de cours MPI](http://www.idris.fr/formations/mpi/) Dimitri Lecas - Rémi Lacroix - Serge Van Criekingen - Myriam Peyrounette CNRS — IDRIS
- [Site officiel de Rmpi](https://fisher.stats.uwo.ca/faculty/yu/Rmpi/)
- [Supports de cours MPI de l'IDRIS](http://www.idris.fr/formations/mpi/)
- [How to run R programs on University of Maryland HPC facility](https://hpcf.umbc.edu/other-packages/how-to-run-r-programs-on-maya/)
- [Documentation de GRICAD](https://gricad-doc.univ-grenoble-alpes.fr/hpc/softenv/nix/#r-packages)
- [MPI Tutorial by Wes Kendall](https://mpitutorial.com)
- [CRAN Task View: High-Performance and Parallel Computing with R](https://cran.r-project.org/web/views/HighPerformanceComputing.html)

# Introduction

Expand Down Expand Up @@ -305,105 +307,115 @@ communicateur indiqué.
Pour chacun des processus, l’appel se termine lorsque la participation de celui-ci
à l’opération collective est achevée, au sens des communications point-à-point
(donc quand la zone mémoire concernée peut être modifiée).

La gestion des étiquettes dans ces communications est transparente et à la
charge du système. Elles ne sont donc jamais définies explicitement lors de
l’appel à ces sous-programmes. Cela a entre autres pour avantage que les
communications collectives n’interfèrent jamais avec les communications point à
point.


## Types de communications collectives

Il y a trois types de sous-programmes :

1. celui qui assure les synchronisations globales :MPI_Barrier().
1. celui qui assure les synchronisations globales : `mpi.barrier(comm=0)`.
2. ceux qui ne font que transférer des données :

- diffusion globale de données :MPI_Bcast();
- diffusion sélective de données :MPI_Scatter();
- collecte de données réparties :MPI_Gather();
- collecte par tous les processus de données réparties :MPI_Allgather();
- collecte et diffusion sélective, par tous les processus, de données réparties :

3. ceux qui, en plus de la gestion des communications, effectuent des opérations sur

les données transférées :
- diffusion globale de données : `mpi.bcast`
- diffusion sélective de données : `mpi.scatter`
- collecte de données réparties : `mpi.gather`
- collecte par tous les processus de données réparties : `mpi.allgather`

• opérations de réduction (somme, produit, maximum, minimum, etc.), qu’elles soient
3. ceux qui, en plus de la gestion des communications, effectuent des opérations sur les données transférées :

d’un type prédéfini ou d’un type personnel :MPI_Reduce();
- opérations de réduction (somme, produit, maximum, minimum, etc.), qu’elles soient d’un type prédéfini ou d’un type personnel : `mpi.reduce`
- opérations de réduction avec diffusion du résultat `mpi.reduce` suivi d’un `mpi.bcast`

• opérations de réduction avec diffusion du résultat (équivalent à unMPI_Reduce()

suivi d’unMPI_Bcast()) :MPI_Allreduce().
## Diffusion générale `mpi.bcast`

1. Envoi, à partir de l’adresse message, d’un message constitué de longueur élément de type `type_message`, par le processus `rank`, à tous les autres processus du communicateur `comm`.

# Synchronisation globale :MPI_Barrier()
2. Réception de ce message à l’adresse message pour les processus autre que `rank`.

# Diffusion générale :MPI_Bcast()
```R
library(Rmpi)

1. Envoi, à partir de l’adresse message, d’un message constitué de longueur élément de type type_message, par le processus rang_source, à tous les autres processus du communicateur comm.
id <- mpi.comm.rank(comm=0)

2. Réception de ce message à l’adresse message pour les processus autre que rang_source.
# Diffusion du vecteur v sur les processeurs autre que 0
if (id == 0) {
v <- c(1, 2, 3, 4)
mpi.bcast.Robj(obj = v, rank = 0, comm = 0)
} else {
v <- mpi.bcast.Robj(rank = 0, comm = 0)
}

cat("vector on ", id, " = ", v, "\n" )

# Diffusion sélective :MPI_Scatter()
invisible(mpi.barrier(comm=0))
invisible(mpi.finalize())
```

1. Distribution, par le processus rang_source, à partir de l’adresse
message_a_repartir, d’un message de taille longueur_message_emis, de type
type_message_emis, à tous les processus du communicateur comm ;
```bash
$ mpirun -np 4 Rscript bcast.R
vector on 3 = 1 2 3 4
vector on 2 = 1 2 3 4
vector on 1 = 1 2 3 4
vector on 0 = 1 2 3 4
```

2. réception du message à l’adresse message_recu, de longueur
longueur_message_recu et de type type_message_recu par tous les processus
du communicateur comm.

Remarques :
# Diffusion sélective `mpi.scatter`

Les couples (longueur_message_emis, type_message_emis) et (longueur_message_recu, type_message_recu) doivent
être tels que les quantités de données envoyées et reçues soient égales.
Les données sont distribuées en tranches égales, une tranche étant constituée de longueur_message_emis éléments du
type type_message_emis.
La ième tranche est envoyée au ième processus.

![](broadcastvsscatter.png)

# Collecte :MPI_Gather()
```R
library(Rmpi)

1. Envoi de chacun des processus du communicateur comm, d’un message message_emis, de taille longueur_message_emis et de type type_message_emis.
id <- mpi.comm.rank(comm=0)
np <- mpi.comm.size(comm=0)

2. Collecte de chacun de ces messages, par le processus rang_dest, à partir l’adresse message_recu, sur une longueur longueur_message_recu et avec le type type_message_recu.
if (id == 0) {
data = matrix(1:24,ncol=3)
splitmatrix = function(x, ncl) lapply(.splitIndices(nrow(x), ncl), function(i) x[i,])
chunk = splitmatrix(data, np)
}

Remarques :
chunk <- mpi.scatter.Robj(obj = chunk, root = 0, comm = 0)

Les couples (longueur_message_emis, type_message_emis) et (longueur_message_recu, type_message_recu) doivent être tels que les quantités de données envoyées et reçues soient égales.
cat("data on ", id, ":", chunk, "\n")

Les données sont collectées dans l’ordre des rangs des processus.

# Collecte générale :MPI_Allgather()
invisible(mpi.barrier(comm=0))
invisible(mpi.finalize())
```

Correspond à unMPI_Gather()suivi d’unMPI_Bcast():
```bash
$ mpirun -np 4 Rscript scatter.R
data on 0 : 1 2 9 10 17 18
data on 3 : 7 8 15 16 23 24
data on 2 : 5 6 13 14 21 22
data on 1 : 3 4 11 12 19 20
```

1. Envoi de chacun des processus du communicateur comm, d’un message message_emis, de taille longueur_message_emis et de type type_message_emis.

2. Collecte de chacun de ces messages, par tous les processus, à partir l’adresse message_recu, sur une longueur longueur_message_recu et avec le type type_message_recu.
# Collecte `mpi.gather`

Remarques :
![](gather.png)

Les couples (longueur_message_emis, type_message_emis) et (longueur_message_recu, type_message_recu) doivent être tels que les quantités de données envoyées et reçues soient égales.
1. Envoi d'un message de chacun des processus du communicateur `comm`
2. Collecte de chacun de ces messages, par le processus `root`

Les données sont collectées dans l’ordre des rangs des processus.

# Collecte générale `mpi.allgather`

# Collectes et diffusions sélectives :MPI_Alltoall()

Ici, le ième processus envoie la jème tranche au jème processus qui le place à
l’emplacement de la ième tranche.

Remarque :

Les couples (longueur_message_emis, type_message_emis) et
(longueur_message_recu, type_message_recu) doivent être tels que les quantités
de données envoyées et reçues soient égales.
Correspond à un `mpi.gather` suivi d’un `mpi.bcast`

![](allgather.png)

# Réductions réparties

Expand All @@ -418,20 +430,60 @@ qui est en fait équivalent à un MPI_Reduce()suivi d’unMPI_Bcast()).
Si plusieurs éléments sont concernés par processus, la fonction de réduction est
appliquée à chacun d’entre eux (par exemple à tous les éléments d’un vecteur).

# Communications collectives `MPI_Reduce()`
# Communications collectives `mpi.reduce`

Opérations pour réductions réparties

- MPI_SUM Somme des éléments
- MPI_PROD Produit des éléments
- MPI_MAX Recherche du maximum
- MPI_MIN Recherche du minimum
- MPI_MAXLOC Recherche de l’indice du maximum
- MPI_MINLOC Recherche de l’indice du minimum
- MPI_LAND ET logique
- MPI_LOR OU logique
- MPI_LXOR OU exclusif logique
- "sum" Somme des éléments
- "prod" Produit des éléments
- "max" Recherche du maximum
- "min" Recherche du minimum
- "maxloc" Recherche de l’indice du maximum
- "minloc" Recherche de l’indice du minimum

```R
library(Rmpi)

id <- mpi.comm.rank(comm=0)
np <- mpi.comm.size(comm=0)

if (id == 0) {
data = matrix(1:24,ncol=3)
splitmatrix = function(x, ncl) lapply(.splitIndices(nrow(x), ncl), function(i) x[i,])
chunk = splitmatrix(data, np)
}

chunk <- mpi.scatter.Robj(obj = chunk, root = 0, comm = 0)

cat("data on ", id, ":", chunk, "\n")

res <- mpi.reduce(chunk, type=1, op="sum", dest = 0, comm = 0)

if ( id == 0 ) {
cat("\n", res, "\n")
}

invisible(mpi.barrier(comm=0))
invisible(mpi.finalize())
```


```bash
$ mpirun -np 4 Rscript reduce.R
data on 2 : 5 6 13 14 21 22
data on 3 : 7 8 15 16 23 24
data on 0 : 1 2 9 10 17 18
data on 1 : 3 4 11 12 19 20

16 20 48 52 80 84
```

# Communications collectives `MPI_Allreduce`
# Recommendations pour utiliser MPI

Réductions réparties avec diffusion du résultat :MPI_Allreduce()
- Le modèle **SPMD** fonctionne mieux sur les clusters de calcul que le modèle **SPMD**
- Limiter au maximum les messages avec un grand volumes de données
- Limiter l'empreinte mémoire en divisant les calculs mais aussi en divisant la mémoire.
- Il est parfois plus intéressant de faire le même calcul sur tous les processus que de le faire sur un seul et ensuite faire une diffusion
- Eviter de lire des données en parallèle. Lire le fichier sur le processeur 0 puis faire un `bcast` ou mieux un `scatter`
- Essayer d'équilibrer la charge sur vos processus
- Jeter un oeil à [pdbMPI](https://github.com/RBigData/pbdMPI)
Binary file added mpi_reduce_1.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added mpi_reduce_2.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
23 changes: 23 additions & 0 deletions reduce.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
library(Rmpi)

id <- mpi.comm.rank(comm=0)
np <- mpi.comm.size(comm=0)

if (id == 0) {
data = matrix(1:24,ncol=3)
splitmatrix = function(x, ncl) lapply(.splitIndices(nrow(x), ncl), function(i) x[i,])
chunk = splitmatrix(data, np)
}

chunk <- mpi.scatter.Robj(obj = chunk, root = 0, comm = 0)

cat("data on ", id, ":", chunk, "\n")

res <- mpi.reduce(chunk, type=1, op="sum", dest = 0, comm = 0)

if ( id == 0 ) {
cat("\n", res, "\n")
}

invisible(mpi.barrier(comm=0))
invisible(mpi.finalize())
18 changes: 18 additions & 0 deletions scatter.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
library(Rmpi)

id <- mpi.comm.rank(comm=0)
np <- mpi.comm.size(comm=0)

if (id == 0) {
data = matrix(1:24,ncol=3)
splitmatrix = function(x, ncl) lapply(.splitIndices(nrow(x), ncl), function(i) x[i,])
chunk = splitmatrix(data, np)
}

chunk <- mpi.scatter.Robj(obj = chunk, root = 0, comm = 0)

cat("data on ", id, ":", chunk, "\n")


invisible(mpi.barrier(comm=0))
invisible(mpi.finalize())

0 comments on commit af19b95

Please sign in to comment.