Skip to content

Commit c33a41e

Browse files
committed
added fmap2
1 parent 3508186 commit c33a41e

8 files changed

Lines changed: 203 additions & 9 deletions

File tree

NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ export(filearray_create)
2121
export(filearray_load)
2222
export(filearray_threads)
2323
export(fmap)
24+
export(fmap2)
2425
export(fmap_element_wise)
2526
export(fwhich)
2627
export(mapreduce)

R/RcppExports.R

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ FARR_buffer_map <- function(input_filebases, output_filebase, map, buffer_nelems
8181
.Call(`_filearray_FARR_buffer_map`, input_filebases, output_filebase, map, buffer_nelems, result_nelems)
8282
}
8383

84+
FARR_buffer_map2 <- function(input_filebases, map, buffer_nelems) {
85+
.Call(`_filearray_FARR_buffer_map2`, input_filebases, map, buffer_nelems)
86+
}
87+
8488
FARR_buffer_mapreduce <- function(filebase, map, reduce, buffer_nelems) {
8589
.Call(`_filearray_FARR_buffer_mapreduce`, filebase, map, reduce, buffer_nelems)
8690
}

R/map.R

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#' @title Map multiple file arrays and save results
2-
#' @description Advanced mapping function for multiple file arrays. This
2+
#' @description Advanced mapping function for multiple file arrays. \code{fmap}
3+
#' runs the mapping functions and stores the results in file arrays.
4+
#' \code{fmap2} stores results in memory. This
35
#' feature is experimental. There are several constraints to the input.
46
#' Failure to meet these constraints may result in undefined results, or
57
#' even crashes. Please read Section 'Details' carefully before using
@@ -10,6 +12,8 @@
1012
#' @param .y a file array object, used to save results
1113
#' @param .input_size number of elements to read from each array of \code{x}
1214
#' @param .output_size \code{fun} output vector length
15+
#' @param .simplify whether to apply \code{\link[base]{simplify2array}} to
16+
#' the result
1317
#' @param ... other arguments passing to \code{fun}
1418
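#' @return \code{fmap} returns the file array instance \code{.y};
#' \code{fmap2} returns a list with one result per processed chunk, or the
#' value of \code{simplify2array} on that list when \code{.simplify} is true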
1519
#' @details
@@ -190,6 +194,56 @@ fmap <- function(x, fun, .y, .input_size = NA, .output_size = NA, ...){
190194
.y
191195
}
192196

197+
#' @rdname fmap
#' @export
fmap2 <- function(x, fun, .input_size = NA, .simplify = TRUE, ...){
    # A single file array is shorthand for a list of length one
    if(inherits(x, "FileArray")){
        x <- list(x)
    }
    if(!length(x)){
        stop("`x` must be a list of file arrays")
    }

    # Resolve the chunk size. Coerce to integer *before* the range check:
    # a fractional value such as 0.5 is positive but truncates to 0, and a
    # zero chunk size must never reach the native mapper.
    if(length(.input_size) != 1L){
        stop("`.input_size` must be a single number")
    }
    if(is.na(.input_size)){
        .input_size <- get_buffer_size() / 8L
    }
    # Non-numeric or out-of-range values become NA here and are rejected
    # by the check below instead of surfacing as a coercion warning
    .input_size <- suppressWarnings(as.integer(.input_size))
    if(is.na(.input_size) || .input_size <= 0L){
        stop("`.input_size` must be a positive integer")
    }

    # Verify the element types first so that `dim()` and the `$` accessors
    # below are only ever applied to file arrays
    for(el in x){
        if( !is_filearray(el) ){
            stop("Input `x` must only contain file arrays")
        }
    }

    # Compare every dimension vector with the first one; comparing lengths
    # first keeps arrays with a different number of margins from being
    # recycled (or from breaking matrix indexing)
    dims <- lapply(x, dim)
    ref_dim <- dims[[1L]]
    same_dim <- vapply(dims, function(d){
        length(d) == length(ref_dim) && all(d == ref_dim)
    }, FUN.VALUE = logical(1L))
    if(!all(same_dim)){
        stop("Input `x` array dimensions must match")
    }

    # Initialize the partitions and collect the file bases handed to the
    # native code
    fbases <- vapply(x, function(el){
        el$initialize_partition()
        el$.filebase
    }, FUN.VALUE = character(1L))

    # `map` receives the list of input buffers for one chunk and forwards
    # it, together with `...`, to `fun`
    args <- list(quote(input), ...)
    map <- function(input){
        do.call(fun, args)
    }

    # One list element per chunk of `.input_size` elements
    res <- FARR_buffer_map2(
        input_filebases = fbases,
        map = map,
        buffer_nelems = .input_size
    )
    if(.simplify){
        res <- simplify2array(res)
    }
    res
}
246+
193247
is_filearray <- function(object){
194248
if(!isS4(object)){ return(FALSE) }
195249
cls <- class(object)

man/fmap.Rd

Lines changed: 9 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/RcppExports.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,19 @@ BEGIN_RCPP
274274
return rcpp_result_gen;
275275
END_RCPP
276276
}
277+
// FARR_buffer_map2
278+
SEXP FARR_buffer_map2(std::vector<std::string>& input_filebases, const Function& map, const int& buffer_nelems);
279+
RcppExport SEXP _filearray_FARR_buffer_map2(SEXP input_filebasesSEXP, SEXP mapSEXP, SEXP buffer_nelemsSEXP) {
280+
BEGIN_RCPP
281+
Rcpp::RObject rcpp_result_gen;
282+
Rcpp::RNGScope rcpp_rngScope_gen;
283+
Rcpp::traits::input_parameter< std::vector<std::string>& >::type input_filebases(input_filebasesSEXP);
284+
Rcpp::traits::input_parameter< const Function& >::type map(mapSEXP);
285+
Rcpp::traits::input_parameter< const int& >::type buffer_nelems(buffer_nelemsSEXP);
286+
rcpp_result_gen = Rcpp::wrap(FARR_buffer_map2(input_filebases, map, buffer_nelems));
287+
return rcpp_result_gen;
288+
END_RCPP
289+
}
277290
// FARR_buffer_mapreduce
278291
SEXP FARR_buffer_mapreduce(const std::string& filebase, const Function map, const Nullable<Function> reduce, const int& buffer_nelems);
279292
RcppExport SEXP _filearray_FARR_buffer_mapreduce(SEXP filebaseSEXP, SEXP mapSEXP, SEXP reduceSEXP, SEXP buffer_nelemsSEXP) {
@@ -411,6 +424,7 @@ static const R_CallMethodDef CallEntries[] = {
411424
{"_filearray_FARR_subset_sequential", (DL_FUNC) &_filearray_FARR_subset_sequential, 7},
412425
{"_filearray_FARR_subset2", (DL_FUNC) &_filearray_FARR_subset2, 8},
413426
{"_filearray_FARR_buffer_map", (DL_FUNC) &_filearray_FARR_buffer_map, 5},
427+
{"_filearray_FARR_buffer_map2", (DL_FUNC) &_filearray_FARR_buffer_map2, 3},
414428
{"_filearray_FARR_buffer_mapreduce", (DL_FUNC) &_filearray_FARR_buffer_mapreduce, 4},
415429
{"_filearray_getThreads", (DL_FUNC) &_filearray_getThreads, 1},
416430
{"_filearray_setThreads", (DL_FUNC) &_filearray_setThreads, 2},

src/map.cpp

Lines changed: 110 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,106 @@ SEXP FARR_buffer_map(
164164
return(R_NilValue);
165165
}
166166

167+
// [[Rcpp::export]]
168+
/**
 * Read several file arrays chunk by chunk, apply an R function to each
 * chunk, and collect the per-chunk results in an R list.
 *
 * @param input_filebases file bases of the input arrays; each entry is
 *        overwritten in place with the value of `correct_filebase()`
 * @param map R function called once per chunk with a single argument: a
 *        list holding one buffer of `buffer_nelems` elements per input array
 * @param buffer_nelems number of elements loaded from each array per chunk;
 *        must be positive
 * @return a list (VECSXP) with one element per chunk, in chunk order
 *
 * Raises an R error when `buffer_nelems` is not positive or when no input
 * dimension can be obtained (e.g. no input arrays). Exceptions thrown while
 * calling `map` are not intercepted here; they propagate to the caller,
 * which is expected to be the Rcpp-generated wrapper.
 */
SEXP FARR_buffer_map2(
        std::vector<std::string>& input_filebases,
        const Function& map,
        const int& buffer_nelems
){
    // A non-positive chunk size would divide by zero below and the chunk
    // loop could never advance
    if( buffer_nelems <= 0 ){
        stop("`buffer_nelems` must be positive");
    }

    // prepare inputs
    int narrays = input_filebases.size();
    std::vector<List> metas(narrays);
    std::vector<SEXPTYPE> arr_types(narrays);
    std::vector<SEXPTYPE> file_buffer_types(narrays);
    std::vector<SEXPTYPE> memory_buffer_types(narrays);

    std::vector<SEXP> cumparts(narrays);

    SEXP in_dim = R_NilValue;

    for(int ii = 0; ii < narrays; ii++){
        std::string fbase = correct_filebase(input_filebases[ii]);
        input_filebases[ii] = fbase;
        List meta = FARR_meta(fbase);
        // keeping `meta` in `metas` keeps the SEXPs extracted from it alive
        metas[ii] = meta;
        arr_types[ii] = meta["sexp_type"];
        file_buffer_types[ii] = file_buffer_sxptype(arr_types[ii]);
        memory_buffer_types[ii] = array_memory_sxptype(arr_types[ii]);
        cumparts[ii] = realToInt64_inplace(meta["cumsum_part_sizes"]);
        // the dimension of the first array is used for all inputs
        if( in_dim == R_NilValue ){
            in_dim = meta["dimension"];
            realToInt64_inplace(in_dim);
        }
    }

    if( in_dim == R_NilValue ){
        stop("Cannot obtain input dimensions");
    }

    // product of all margins but the last, then the total element count
    R_xlen_t in_ndims = Rf_length(in_dim);
    int64_t* in_dimptr = INTEGER64(in_dim);
    int64_t in_unit_partlen = 1;
    for(R_xlen_t jj = 0; jj < in_ndims - 1; jj++, in_dimptr++){
        in_unit_partlen *= *in_dimptr;
    }
    int64_t in_array_length = in_unit_partlen * *(INTEGER64(in_dim) + (in_ndims - 1));

    // allocate buffers; the guard releases the protection on every exit
    // path, including exceptions, so no manual UNPROTECT counting is needed
    Shield<SEXP> argbuffers_guard(Rf_allocVector(VECSXP, narrays));
    SEXP argbuffers = argbuffers_guard;
    for(int ii = 0; ii < narrays; ii++){
        // stored in the protected list immediately after allocation
        SET_VECTOR_ELT(argbuffers, ii, Rf_allocVector(memory_buffer_types[ii], buffer_nelems));
    }

    int ncores = getThreads();
    if( ncores > narrays ){
        ncores = narrays;
    }

    // number of chunks, rounding up for a trailing partial chunk
    R_xlen_t niters = in_array_length / buffer_nelems;
    if( niters * buffer_nelems < in_array_length ){
        niters++;
    }
    Shield<SEXP> ret_guard(Rf_allocVector(VECSXP, niters));
    SEXP ret = ret_guard;

    R_xlen_t iter = 0;
    for(int64_t current_pos = 0; current_pos < in_array_length;
        current_pos += buffer_nelems, iter++ ){

        // load the current chunk of every input array into its buffer
#pragma omp parallel num_threads(ncores)
{
#pragma omp for schedule(static, 1) nowait
        for(int ii = 0; ii < narrays; ii++){
            FARR_subset_sequential(
                input_filebases[ii],
                in_unit_partlen,
                cumparts[ii],
                arr_types[ii],
                VECTOR_ELT(argbuffers, ii),
                current_pos, buffer_nelems
            );
        }
}

        // No try/catch here: a catch-all would relabel every failure that
        // is not a std::exception as "Unknown error."; letting the
        // exception propagate preserves the original condition while the
        // guards clean up during stack unwinding.
        Shield<SEXP> chunk_result(map(argbuffers));
        SET_VECTOR_ELT(ret, iter, chunk_result);
    }

    return(ret);
}
167267

168268
/*** R
169269
# devtools::load_all()
@@ -190,6 +290,15 @@ FARR_buffer_map(
190290
3L,
191291
1L
192292
)
193-
293+
# Scratch check for FARR_buffer_map2(input_filebases, map, buffer_nelems):
# map the inputs in chunks of three elements, printing each chunk of the
# first array together with its sum and keeping the sum as the chunk result.
res <- FARR_buffer_map2(
  fbases,
  function(x){
    print(c(x[[1]], sum(x[[1]])))
    sum(x[[1]])
  },
  3L
)
302+
# y[] - simplify2array(res)
194303
195304
*/

tests/testthat/test-map.R

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ test_that("map arrays", {
5151

5252
expect_equal(output[], b)
5353

54+
55+
d <- fmap2(x, f, .input_size = 842800, .simplify = TRUE)
56+
expect_equal(d, b)
57+
5458
x$delete()
5559
output$delete()
5660

vignettes/performance.Rmd

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ In the current version, converting from `double` to `float` introduces overhead
138138

139139
Instead of writing one slice at a time along each margin, we write `100x100x100x5` (10 slices) each time.
140140

141-
1. partition margin
141+
1. Write blocks of data along the partition margin
142142

143143
```{r}
144144
microbenchmark::microbenchmark(
@@ -162,7 +162,7 @@ microbenchmark::microbenchmark(
162162
#> float 0.626 0.662 0.783 0.698 0.861 1.02 3
163163
```
164164

165-
2. Write along fast margin
165+
2. Write blocks of data along the fast margin
166166

167167
```{r}
168168
microbenchmark::microbenchmark(
@@ -186,7 +186,7 @@ microbenchmark::microbenchmark(
186186
#> float 0.625 0.652 0.732 0.679 0.786 0.893 3
187187
```
188188

189-
3. Writing along slow margin
189+
3. Write blocks of data along the slow margin
190190

191191
```{r}
192192
microbenchmark::microbenchmark(
@@ -209,9 +209,6 @@ microbenchmark::microbenchmark(
209209
#> float 2.64 2.70 2.73 2.77 2.78 2.79 3
210210
```
211211

212-
Note: always avoid writing along the first margin (like `x[i,,,] <- ...`). It's slow.
213-
214-
215212

216213
## Read
217214

@@ -292,6 +289,8 @@ Collapse calculates the margin sum/mean. Collapse function in `filearray` uses s
292289

293290
```{r}
294291
keep <- c(2, 4)
292+
output <- filearray_create(tempfile(), dim(x_dbl)[keep])
293+
output$initialize_partition()
295294
microbenchmark::microbenchmark(
296295
farr_double = { x_dbl$collapse(keep = keep, method = "sum") },
297296
farr_float = { x_flt$collapse(keep = keep, method = "sum") },
@@ -310,3 +309,4 @@ microbenchmark::microbenchmark(
310309

311310
The `dipsaus` package uses multiple threads to collapse arrays in memory. It is `7~8x` as fast as base R. File array is `1~2x` as fast as base R. Both `dipsaus` and `filearray` have little memory overhead.
312311

312+

0 commit comments

Comments
 (0)