Title: | A Multi-Process 'dplyr' Backend |
---|---|
Description: | Partition a data frame across multiple worker processes to provide simple multicore parallelism. |
Authors: | Hadley Wickham [aut, cre], Posit Software, PBC [cph, fnd] |
Maintainer: | Hadley Wickham <[email protected]> |
License: | MIT + file LICENSE |
Version: | 0.1.3.9000 |
Built: | 2024-06-01 04:33:05 UTC |
Source: | https://github.com/tidyverse/multidplyr |
'cluster_call()' executes the code on each worker and returns the results; 'cluster_send()' executes the code ignoring the result. Jobs are submitted to workers in parallel, and then we wait until they're complete.
cluster_call(cluster, code, simplify = FALSE, ptype = NULL)
cluster_send(cluster, code)
cluster |
A cluster. |
code |
An expression to execute on each worker. |
simplify |
Should the results be simplified from a list? * 'TRUE': simplify or die trying. * 'NA': simplify if possible. * 'FALSE': never try to simplify, always leaving as a list. 'code' must return a vector of length one in order for simplification to succeed. |
ptype |
If 'simplify' is 'TRUE', use 'ptype' to enforce the desired output type. |
A list of results with one element for each worker in 'cluster'.
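The three 'simplify' modes can be illustrated with a small base R sketch. The helper 'simplify_results()' below is hypothetical and is not multidplyr's implementation: it only mimics the documented rule that every result must be a vector of length one for simplification to succeed, and it ignores 'ptype' entirely.

```r
# Hypothetical helper illustrating the documented `simplify` modes.
# Not part of multidplyr; `ptype` handling is deliberately left out.
simplify_results <- function(results, simplify = FALSE) {
  # FALSE: never simplify, always return the list unchanged
  if (identical(simplify, FALSE)) {
    return(results)
  }

  # Simplification needs every result to be a length-one vector
  ok <- all(vapply(
    results,
    function(r) is.atomic(r) && length(r) == 1L,
    logical(1)
  ))
  if (ok) {
    return(unlist(results))
  }

  # TRUE: simplify or fail; NA: fall back to the list
  if (isTRUE(simplify)) {
    stop("Can't simplify: every result must be a vector of length one.")
  }
  results
}

simplify_results(list(1, 2), simplify = TRUE)   # a double vector
simplify_results(list(1:2, 3), simplify = NA)   # still a list
```

With 'simplify = TRUE' the second call would raise an error instead of returning the list, which is the "simplify or die trying" behaviour described above.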
cl <- default_cluster()
# Run code on each cluster and retrieve results
cluster_call(cl, Sys.getpid())
cluster_call(cl, runif(1))
# use simplify = TRUE to collapse the results into a vector
cluster_call(cl, runif(1), simplify = TRUE)
# use cluster_send() to ignore results
cluster_send(cl, x <- runif(1))
cluster_call(cl, x, simplify = TRUE)
These functions provide useful helpers for performing common operations. 'cluster_assign()' assigns the same value on each worker; 'cluster_assign_each()' assigns different values on each worker; 'cluster_assign_partition()' partitions vectors so that each worker gets (approximately) the same number of pieces.
cluster_assign(.cluster, ...)
cluster_assign_each(.cluster, ...)
cluster_assign_partition(.cluster, ...)
cluster_copy(cluster, names, env = caller_env())
cluster_rm(cluster, names)
cluster_library(cluster, packages)
... |
Name-value pairs |
cluster, .cluster |
Cluster to work on. |
names |
Names of variables to copy. |
env |
Environment in which to look for variables to copy. |
packages |
Character vector of packages to load |
Functions that modify the worker environment invisibly return 'cluster' so calls can be piped together. The other functions return lists with one element for each worker.
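To make "approximately the same number of pieces" concrete, here is a minimal base R sketch of one way to cut a vector into contiguous, near-equal chunks. The helper 'split_evenly()' is hypothetical and exists only for illustration; the way 'cluster_assign_partition()' actually divides a vector may differ (for example, in which worker receives the extra element).

```r
# Hypothetical helper: split `x` into `n_workers` contiguous pieces whose
# lengths differ by at most one. Not multidplyr's implementation.
split_evenly <- function(x, n_workers) {
  # Map each position to a piece number between 1 and n_workers
  piece <- ceiling(seq_along(x) * n_workers / length(x))
  # Fixing the factor levels keeps empty pieces when x is short
  unname(split(x, factor(piece, levels = seq_len(n_workers))))
}

pieces <- split_evenly(1:11, 2)
lengths(pieces)
```

Each element of 'pieces' corresponds to the value one worker would receive.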
cl <- default_cluster()
cluster_assign(cl, a = runif(1))
cluster_call(cl, a)
# Assign different values on each cluster
cluster_assign_each(cl, b = c(1, 10))
cluster_call(cl, b)
# Partition a vector so that each worker gets approximately the
# same amount of it
cluster_assign_partition(cl, c = 1:11)
cluster_call(cl, c)
# If you want to compute different values on each
# worker, use `cluster_call()` directly:
cluster_call(cl, d <- runif(1))
cluster_call(cl, d)
# cluster_copy() is a useful shortcut
e <- 10
cluster_copy(cl, "e")
cluster_call(cl, ls())
cluster_rm(cl, letters[1:5])
cluster_call(cl, ls())
# Use cluster_library() to load packages
cluster_call(cl, search())
cluster_library(cl, "magrittr")
cluster_call(cl, search())
Clusters created with this function will automatically clean up after themselves.
new_cluster(n)
n |
Number of workers to create. Avoid setting this higher than the number of cores in your computer as it will degrade performance. |
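One way to follow this advice is to cap the requested worker count at the number of detected cores. The sketch below uses 'parallel::detectCores()' from base R's parallel package; the helper name 'choose_workers()' is hypothetical, and the call to 'new_cluster()' is left commented out so the snippet runs without multidplyr installed.

```r
# Hypothetical helper: never ask for more workers than there are cores.
choose_workers <- function(requested, cores = parallel::detectCores()) {
  # detectCores() can return NA when the count is unknown
  if (is.na(cores)) {
    cores <- 1L
  }
  max(1L, min(as.integer(requested), as.integer(cores)))
}

n <- choose_workers(4)
# cluster <- multidplyr::new_cluster(n)
```

Whether to leave a core free for the main R session is a judgment call that this sketch does not make.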
A 'multidplyr_cluster' object.
cluster <- new_cluster(2)
cluster
Partitioning ensures that all observations in a group end up on the same worker. To try and keep the observations on each worker balanced, 'partition()' uses a greedy algorithm that iteratively assigns each group to the worker that currently has the fewest rows.
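The greedy idea can be sketched in a few lines of base R. The helper 'greedy_assign()' is hypothetical and is not the package's code: it assumes groups are processed in the order given and that ties go to the lowest-numbered worker, neither of which is specified above.

```r
# Hypothetical sketch of greedy balancing: each group goes to the worker
# that currently holds the fewest rows. Not multidplyr's implementation.
greedy_assign <- function(group_sizes, n_workers) {
  load <- numeric(n_workers)               # rows held by each worker
  worker <- integer(length(group_sizes))   # chosen worker for each group

  for (i in seq_along(group_sizes)) {
    w <- which.min(load)                   # ties resolve to the first worker
    worker[i] <- w
    load[w] <- load[w] + group_sizes[i]
  }

  list(worker = worker, load = load)
}

# Four groups with 5, 3, 4 and 1 rows spread over two workers
res <- greedy_assign(c(5, 3, 4, 1), n_workers = 2)
res$worker
res$load
```

Because a group is never split, the resulting loads are only as balanced as the group sizes allow.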
partition(data, cluster)
data |
Dataset to partition, typically grouped. When grouped, all observations in a group will be assigned to the same worker. |
cluster |
Cluster to use. |
A [party_df].
library(dplyr)
cl <- default_cluster()
cluster_library(cl, "dplyr")
mtcars2 <- partition(mtcars, cl)
mtcars2 %>% mutate(cyl2 = 2 * cyl)
mtcars2 %>% filter(vs == 1)
mtcars2 %>% group_by(cyl) %>% summarise(n())
mtcars2 %>% select(-cyl)
This S3 class represents a data frame partitioned across workers in a cluster. You can use this constructor if you have already spread data frames across a cluster. If not, start with [partition()] instead.
party_df(cluster, name, auto_rm = FALSE)
cluster |
A cluster |
name |
Name of data frame variable. Must exist on every worker, be a data frame, and have the same names. |
auto_rm |
If 'TRUE', will automatically 'rm()' the data frame on the workers when this object is garbage collected. |
An S3 object with class 'multidplyr_party_df'.
# In a real example, you might spread file names across the workers
# and read in using data.table::fread()/vroom::vroom()/qs::qread().
cl <- default_cluster()
cluster_send(cl[1], n <- 10)
cluster_send(cl[2], n <- 15)
cluster_send(cl, df <- data.frame(x = runif(n)))
df <- party_df(cl, "df")
df