Title: | A Multi-Process 'dplyr' Backend |
---|---|
Description: | Partition a data frame across multiple worker processes to provide simple multicore parallelism. |
Authors: | Hadley Wickham [aut, cre], Posit Software, PBC [cph, fnd] |
Maintainer: | Hadley Wickham <[email protected]> |
License: | MIT + file LICENSE |
Version: | 0.1.3.9000 |
Built: | 2024-12-26 04:44:02 UTC |
Source: | https://github.com/tidyverse/multidplyr |
'cluster_call()' executes the code on each worker and returns the results; 'cluster_send()' executes the code ignoring the result. Jobs are submitted to workers in parallel, and then we wait until they're complete.
    cluster_call(cluster, code, simplify = FALSE, ptype = NULL)

    cluster_send(cluster, code)
cluster | A cluster. |
code | An expression to execute on each worker. |
simplify | Should the results be simplified from a list? 'code' must return a vector of length one in order for simplification to succeed. |
  * 'TRUE': simplify or die trying.
  * 'NA': simplify if possible.
  * 'FALSE': never try to simplify, always leaving as a list.
ptype | If 'simplify' is 'TRUE', use 'ptype' to enforce the desired output type. |
A list of results with one element for each worker in 'cluster'.
    cl <- default_cluster()

    # Run code on each worker and retrieve results
    cluster_call(cl, Sys.getpid())
    cluster_call(cl, runif(1))

    # use simplify = TRUE to return a vector instead of a list
    cluster_call(cl, runif(1), simplify = TRUE)

    # use cluster_send() to ignore results
    cluster_send(cl, x <- runif(1))
    cluster_call(cl, x, simplify = TRUE)
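The three 'simplify' modes described above can be illustrated without a cluster. The following is a minimal base R sketch, assuming a plain list stands in for the per-worker results; 'simplify_results()' is a hypothetical helper written for illustration and is not part of multidplyr, whose own implementation may differ (for example in how 'ptype' is enforced).

```r
# Hypothetical helper: mimics the documented `simplify` modes on a plain list.
# It is NOT multidplyr's implementation.
simplify_results <- function(results, simplify = FALSE) {
  # FALSE: never simplify, always return the list unchanged
  if (isFALSE(simplify)) {
    return(results)
  }

  # Simplification needs every element to be an atomic vector of length one
  ok <- all(vapply(
    results,
    function(x) is.atomic(x) && length(x) == 1L,
    logical(1)
  ))

  if (ok) {
    return(unlist(results))
  }
  if (isTRUE(simplify)) {
    stop("Can't simplify: every result must be a vector of length one.")
  }

  # NA: simplification was not possible, so fall back to the list
  results
}

# Stand-in for the per-worker results of cluster_call(cl, runif(1))
results <- list(0.25, 0.75)

simplify_results(results, simplify = FALSE)    # list of two elements
simplify_results(results, simplify = TRUE)     # numeric vector of length two
simplify_results(list(1:2, 3), simplify = NA)  # stays a list
```

With 'simplify = TRUE', the second call in the last line would raise an error instead, because one element has length two.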
These functions provide useful helpers for performing common operations. 'cluster_assign()' assigns the same value on each worker; 'cluster_assign_each()' assigns different values on each worker; 'cluster_assign_partition()' partitions vectors so that each worker gets (approximately) the same number of pieces.
    cluster_assign(.cluster, ...)

    cluster_assign_each(.cluster, ...)

    cluster_assign_partition(.cluster, ...)

    cluster_copy(cluster, names, env = caller_env())

    cluster_rm(cluster, names)

    cluster_library(cluster, packages)
... | Name-value pairs. |
cluster, .cluster | Cluster to work on. |
names | Names of variables to copy. |
env | Environment in which to look for variables to copy. |
packages | Character vector of packages to load. |
Functions that modify the worker environment invisibly return 'cluster' so calls can be piped together. The other functions return lists with one element for each worker.
    cl <- default_cluster()
    cluster_assign(cl, a = runif(1))
    cluster_call(cl, a)

    # Assign different values on each worker
    cluster_assign_each(cl, b = c(1, 10))
    cluster_call(cl, b)

    # Partition a vector so that each worker gets approximately the
    # same amount of it
    cluster_assign_partition(cl, c = 1:11)
    cluster_call(cl, c)

    # If you want to compute different values on each
    # worker, use `cluster_call()` directly:
    cluster_call(cl, d <- runif(1))
    cluster_call(cl, d)

    # cluster_copy() is a useful shortcut
    e <- 10
    cluster_copy(cl, "e")

    cluster_call(cl, ls())
    cluster_rm(cl, letters[1:5])
    cluster_call(cl, ls())

    # Use cluster_library() to load packages
    cluster_call(cl, search())
    cluster_library(cl, "magrittr")
    cluster_call(cl, search())
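To make "approximately the same number of pieces" concrete, here is a small sketch of one way a vector of length 11 could be divided between two workers. It uses 'parallel::splitIndices()', which ships with R; whether 'cluster_assign_partition()' uses this exact scheme is an assumption, and 'n_workers' is an illustrative value rather than something read from a real cluster.

```r
library(parallel)  # ships with R; used here only for splitIndices()

x <- 1:11
n_workers <- 2  # assumed number of workers

# Split the positions of `x` into `n_workers` contiguous, near-equal chunks
idx <- splitIndices(length(x), n_workers)
pieces <- lapply(idx, function(i) x[i])

pieces           # one piece per worker
lengths(pieces)  # piece sizes differ by at most one
```

Because 11 does not divide evenly by 2, one worker receives one more element than the other.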
Clusters created with this function will automatically clean up after themselves.
    new_cluster(n)
n | Number of workers to create. Avoid setting this higher than the number of cores in your computer as it will degrade performance. |
A 'multidplyr_cluster' object.
    cluster <- new_cluster(2)
    cluster
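One way to follow the advice about not exceeding the number of cores is to derive 'n' from the machine itself. The sketch below uses 'parallel::detectCores()', which ships with R. Leaving one core free for the main session is an assumption made here for illustration, not a recommendation from the package.

```r
# detectCores() ships with R in the parallel package; it can return NA
cores <- parallel::detectCores()
if (is.na(cores)) cores <- 1L

# Assumption: leave one core for the main R session,
# but always ask for at least one worker
n_workers <- max(1L, cores - 1L)
n_workers

# With multidplyr attached, the cluster would then be created with:
# cluster <- new_cluster(n_workers)
```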
Partitioning ensures that all observations in a group end up on the same worker. To try and keep the observations on each worker balanced, 'partition()' uses a greedy algorithm that iteratively assigns each group to the worker that currently has the fewest rows.
    partition(data, cluster)
data | Dataset to partition, typically grouped. When grouped, all observations in a group will be assigned to the same worker. |
cluster | Cluster to use. |
A [party_df].
    library(dplyr)
    cl <- default_cluster()
    cluster_library(cl, "dplyr")

    mtcars2 <- partition(mtcars, cl)
    mtcars2 %>% mutate(cyl2 = 2 * cyl)
    mtcars2 %>% filter(vs == 1)
    mtcars2 %>% group_by(cyl) %>% summarise(n())
    mtcars2 %>% select(-cyl)
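The greedy balancing that 'partition()' is described as using can be sketched in base R. 'greedy_assign()' below is a hypothetical helper, not multidplyr's code: it keeps whole groups together and always hands the next group to the worker with the fewest rows so far. Visiting the largest groups first is an assumption of this sketch; the package may process groups in a different order.

```r
# Hypothetical helper, not multidplyr's code: assign each group to the worker
# that currently holds the fewest rows.
greedy_assign <- function(group_sizes, n_workers) {
  load <- numeric(n_workers)              # rows currently on each worker
  worker <- integer(length(group_sizes))  # chosen worker for each group

  # Assumption: visit the largest groups first, a common greedy heuristic
  for (g in order(group_sizes, decreasing = TRUE)) {
    w <- which.min(load)                  # least-loaded worker (ties: first)
    worker[g] <- w
    load[w] <- load[w] + group_sizes[[g]]
  }

  names(worker) <- names(group_sizes)
  list(worker = worker, load = load)
}

# Rows per group in mtcars when grouped by cyl
sizes <- vapply(split(mtcars, mtcars$cyl), nrow, integer(1))

res <- greedy_assign(sizes, n_workers = 2)
res$worker  # worker index for each cyl group
res$load    # rows per worker
```

Since groups are never split, the balance is only approximate: the three 'cyl' groups hold 11, 7, and 14 rows, so two workers cannot receive exactly 16 rows each.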
This S3 class represents a data frame partitioned across workers in a cluster. You can use this constructor if you have already spread data frames across a cluster. If not, start with [partition()] instead.
    party_df(cluster, name, auto_rm = FALSE)
cluster | A cluster. |
name | Name of data frame variable. Must exist on every worker, be a data frame, and have the same names. |
auto_rm | If 'TRUE', will automatically 'rm()' the data frame on the workers when this object is garbage collected. |
An S3 object with class 'multidplyr_party_df'.
    # In a real example, you might spread file names across the workers
    # and read them in using data.table::fread()/vroom::vroom()/qs::qread().
    cl <- default_cluster()
    cluster_send(cl[1], n <- 10)
    cluster_send(cl[2], n <- 15)
    cluster_send(cl, df <- data.frame(x = runif(n)))

    df <- party_df(cl, "df")
    df
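The requirements on 'name' (present on every worker, a data frame, and with the same names) can be pictured as a check over the per-worker objects. The sketch below is a hypothetical base R illustration, not multidplyr's code: 'check_shards()' and the 'shards' list are made-up stand-ins for the data frames that would live on each worker.

```r
# Hypothetical check, not multidplyr's code: `shards` stands in for the object
# named `name` as found on each worker.
check_shards <- function(shards) {
  if (!all(vapply(shards, is.data.frame, logical(1)))) {
    stop("Every worker must hold a data frame.")
  }

  first_names <- names(shards[[1]])
  same <- vapply(
    shards,
    function(df) identical(names(df), first_names),
    logical(1)
  )
  if (!all(same)) {
    stop("Every data frame must have the same column names.")
  }

  invisible(TRUE)
}

# Row counts may differ between workers; column names may not
shards <- list(
  data.frame(x = runif(10)),
  data.frame(x = runif(15))
)
check_shards(shards)
```

As in the example above, the shards have 10 and 15 rows but share the single column 'x', so the check passes.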