Skip to content

Commit 21bc208

Browse files
committed
Fixing #4
1 parent c99c122 commit 21bc208

4 files changed

Lines changed: 97 additions & 34 deletions

File tree

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Package: filearray
22
Type: Package
33
Title: File-Backed Array for Out-of-Memory Computation
4-
Version: 0.1.4
4+
Version: 0.1.4.9000
55
Language: en-US
66
Encoding: UTF-8
77
License: LGPL-3

src/load.cpp

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -59,61 +59,94 @@ SEXP FARR_subset_sequential(
5959
std::string fbase = correct_filebase(filebase);
6060
R_len_t nparts = Rf_length(cum_partsizes);
6161

62-
// calculate the first partition
62+
// calculate the first partition, should start from 1
6363
int64_t slice_idx1 = 0;
64+
65+
// partition number cannot go beyond nparts + 1 (can equal)
6466
int64_t slice_idx2 = 0;
6567
int64_t tmp = 0;
68+
69+
// Rcout << "From: " << from << " - len: " << len << " nparts: " << nparts << "\n";
6670
for(; tmp <= from; tmp+= unit_partlen, slice_idx1++){}
67-
for(slice_idx2 = slice_idx1; tmp < from + len; tmp+= unit_partlen, slice_idx2++){}
68-
// Rcout << slice_idx1 << " - " << slice_idx2 << "\n";
6971

72+
for(slice_idx2 = slice_idx1; tmp < from + len && slice_idx2 < nparts; tmp+= unit_partlen, slice_idx2++){}
73+
74+
// Rcout << "Starting from partition: " << slice_idx1 << " - ends before: " << slice_idx2 << "\n";
75+
76+
// Which partition to start: min = 0
7077
int part_start = 0;
7178
int part_end = 0;
7279
int64_t skip_start = 0;
73-
int64_t skip_end = 0;
80+
// int64_t skip_end = 0;
7481

7582
int64_t* cum_part = INTEGER64(cum_partsizes);
76-
for(; slice_idx1 > *cum_part; cum_part++, part_start++){}
83+
for(; *cum_part < slice_idx1; cum_part++, part_start++){}
7784
if( part_start == 0 ){
7885
skip_start = from;
7986
} else {
8087
skip_start = from - (*(cum_part - 1)) * unit_partlen;
8188
}
82-
for(part_end = part_start; slice_idx2 > *cum_part; cum_part++, part_end++){}
83-
skip_end = (*cum_part) * unit_partlen - (from + len);
89+
for(part_end = part_start; *cum_part < slice_idx2; cum_part++, part_end++){}
90+
// if(part_end == 0) {
91+
// skip_end = unit_partlen - (from + len);
92+
// // Rcout << part_start << " " << part_end << " " << *cum_part << std::endl;
93+
// } else {
94+
// skip_end = (*(cum_part - 1)) * unit_partlen - (from + len);
95+
// // Rcout << part_start << " " << part_end << " " << *(cum_part-1) << std::endl;
96+
// }
97+
// // This happens when buffer size is longer than array length
98+
// if(skip_end < 0) {
99+
// skip_end = 0;
100+
// }
101+
102+
// Rcpp::print(cum_partsizes);
103+
84104

85105
// Rcout << part_start << " - " << part_end << "\n";
86106
// Rcout << skip_start << " - " << skip_end << "\n";
87107

88108
int64_t read_start = 0;
89109
int64_t read_len = 0;
90110
int64_t part_nelem = 0;
91-
int64_t last_part_nelem = 0;
92-
cum_part = INTEGER64(cum_partsizes);
111+
cum_part = INTEGER64(cum_partsizes) + part_start;
93112

94113
int64_t nread = 0;
95114

96115
const boost::interprocess::mode_t mode = boost::interprocess::read_only;
97116

117+
98118
for(int part = part_start; part <= part_end; part++, cum_part++, nread += read_len){
99119
if( part >= nparts ){
100120
continue;
101121
}
102122
// get partition n_elems
103-
part_nelem = (*cum_part) * unit_partlen - last_part_nelem;
104-
last_part_nelem = (*cum_part) * unit_partlen;
123+
if(part == 0) {
124+
part_nelem = (*cum_part) * unit_partlen;
125+
} else {
126+
part_nelem = (*cum_part - *(cum_part - 1)) * unit_partlen;
127+
}
128+
// Rcout << "Starting with " << *cum_part << ", current partition contains elements: " << part_nelem << "\n";
129+
105130

106131
// skip read_start elements
107-
read_start = 0;
108132
if( part == part_start ) {
109133
read_start = skip_start;
134+
} else {
135+
read_start = 0;
110136
}
111-
// Rcout << part_nelem << "--\n";
112-
// then read read_len elements
113137
read_len = part_nelem - read_start;
114-
if( part == part_end ){
115-
read_len -= skip_end;
138+
if( read_len > len - nread ) {
139+
read_len = len - nread;
116140
}
141+
// Rcout << "n read: " << nread << ", plan to read more: " << read_len << " from " << read_start << "\n";
142+
143+
// Rcout << part_nelem << "--\n";
144+
// then read read_len elements
145+
// if( part == part_end ){
146+
// read_len -= skip_end;
147+
// }
148+
149+
// Rcout << "reading from partition: " << part << "\n";
117150

118151
std::string part_file = fbase + std::to_string(part) + ".farr";
119152

src/map.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ SEXP FARR_buffer_map(
139139
}
140140
}
141141
convert_as2(tmp, tmp_val, out_array_type);
142+
UNPROTECT(1); // tmp
142143

143144
FARR_subset_assign_sequential_bare(
144145
out_fbase, out_unit_partlen,
@@ -147,7 +148,6 @@ SEXP FARR_buffer_map(
147148
);
148149
current_pos_save += expected_res_nelem;
149150

150-
UNPROTECT(1);
151151
} catch(std::exception &ex){
152152
UNPROTECT(2 + narrays);
153153
forward_exception_to_r(ex);
@@ -237,12 +237,12 @@ SEXP FARR_buffer_map2(
237237
#pragma omp for schedule(static, 1) nowait
238238
for(int ii = 0; ii < narrays; ii++){
239239
FARR_subset_sequential(
240-
input_filebases[ii],
241-
in_unit_partlen,
242-
cumparts[ii],
243-
arr_types[ii],
244-
VECTOR_ELT(argbuffers, ii),
245-
current_pos, buffer_nelems
240+
input_filebases[ii],
241+
in_unit_partlen,
242+
cumparts[ii],
243+
arr_types[ii],
244+
VECTOR_ELT(argbuffers, ii),
245+
current_pos, buffer_nelems
246246
);
247247
}
248248
}

src/save.cpp

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,16 @@ SEXP FARR_subset_assign_sequential_bare(
3232
int64_t slice_idx2 = 0;
3333
int64_t tmp = 0;
3434
for(; tmp <= from; tmp+= unit_partlen, slice_idx1++){}
35-
for(slice_idx2 = slice_idx1; tmp < from + len; tmp+= unit_partlen, slice_idx2++){}
35+
36+
for(slice_idx2 = slice_idx1; tmp < from + len && slice_idx2 < nparts; tmp+= unit_partlen, slice_idx2++){}
37+
// for(slice_idx2 = slice_idx1; tmp < from + len; tmp+= unit_partlen, slice_idx2++){}
38+
39+
// Rcout << "Starting from partition: " << slice_idx1 << " - ends before: " << slice_idx2 << "\n";
3640

3741
int part_start = 0;
3842
int part_end = 0;
3943
int64_t skip_start = 0;
40-
int64_t skip_end = 0;
44+
// --- // int64_t skip_end = 0;
4145

4246
int64_t* cum_part = INTEGER64(cum_partsizes);
4347
for(; slice_idx1 > *cum_part; cum_part++, part_start++){}
@@ -47,13 +51,26 @@ SEXP FARR_subset_assign_sequential_bare(
4751
skip_start = from - (*(cum_part - 1)) * unit_partlen;
4852
}
4953
for(part_end = part_start; slice_idx2 > *cum_part; cum_part++, part_end++){}
50-
skip_end = (*cum_part) * unit_partlen - (from + len);
54+
55+
/*
56+
// skip_end = (*cum_part) * unit_partlen - (from + len);
57+
if(part_end == 0) {
58+
skip_end = unit_partlen - (from + len);
59+
// Rcout << part_start << " " << part_end << " " << *cum_part << std::endl;
60+
} else {
61+
skip_end = (*(cum_part - 1)) * unit_partlen - (from + len);
62+
// Rcout << part_start << " " << part_end << " " << *(cum_part-1) << std::endl;
63+
}
64+
// This happens when buffer size is longer than array length
65+
if(skip_end < 0) {
66+
skip_end = 0;
67+
}
68+
*/
5169

5270
int64_t read_start = 0;
5371
int64_t write_len = 0;
5472
int64_t part_nelem = 0;
55-
int64_t last_part_nelem = 0;
56-
cum_part = INTEGER64(cum_partsizes);
73+
cum_part = INTEGER64(cum_partsizes) + part_start;
5774

5875
int64_t nwrite = 0;
5976

@@ -65,20 +82,33 @@ SEXP FARR_subset_assign_sequential_bare(
6582
continue;
6683
}
6784
// get partition n_elems
68-
part_nelem = (*cum_part) * unit_partlen - last_part_nelem;
69-
last_part_nelem = (*cum_part) * unit_partlen;
85+
// part_nelem = (*cum_part) * unit_partlen - last_part_nelem;
86+
// last_part_nelem = (*cum_part) * unit_partlen;
7087

7188
// skip read_start elements
72-
read_start = 0;
89+
if(part == 0) {
90+
part_nelem = (*cum_part) * unit_partlen;
91+
} else {
92+
part_nelem = (*cum_part - *(cum_part - 1)) * unit_partlen;
93+
}
7394
if( part == part_start ) {
7495
read_start = skip_start;
96+
} else {
97+
read_start = 0;
7598
}
7699
// Rcout << part_nelem << "--\n";
77100
// then read read_len elements
78101
write_len = part_nelem - read_start;
79-
if( part == part_end ){
80-
write_len -= skip_end;
102+
103+
if(nwrite + write_len > len) {
104+
write_len = len - nwrite;
105+
}
106+
if(write_len <= 0) {
107+
break;
81108
}
109+
// if( part == part_end ){
110+
// write_len -= skip_end;
111+
// }
82112

83113
std::string part_file = fbase + std::to_string(part) + ".farr";
84114

0 commit comments

Comments
 (0)