-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathborsh_reader.rs
More file actions
146 lines (121 loc) · 4.15 KB
/
borsh_reader.rs
File metadata and controls
146 lines (121 loc) · 4.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
// This is free and unencumbered software released into the public domain.
extern crate alloc;
use alloc::{boxed::Box, collections::BTreeMap, vec::Vec};
use borsh::{
io::{Read, Result},
BorshDeserialize,
};
use core::error::Error;
use lz4_flex::frame::FrameDecoder;
use rdf_model::{
Countable, Enumerable, HeapQuad, MaybeDurable, MaybeIndexed, MaybeMutable, Source, Statement,
};
use rdf_reader::{Format, Reader};
use crate::{parse_header, BorshQuad, BorshTerm, BorshTermId};
/// Streaming reader for the Borsh-based RDF encoding.
///
/// Holds an LZ4 frame decoder over the underlying source together with the term dictionary
/// that decoded quads reference by 16-bit term ID.
pub struct BorshReader<R>
where
    R: Read,
{
    /// LZ4 frame decoder wrapping the underlying source; quads are pulled from it on demand.
    decompressor: FrameDecoder<R>,
    /// Term dictionary, keyed by the 16-bit term IDs that quads refer to.
    term_dict: BTreeMap<BorshTermId<u16>, BorshTerm>,
    /// Total number of quads in the stream.
    quad_count: usize,
    /// Number of quads consumed from the stream so far.
    read_count: usize,
}
impl<R: Read> BorshReader<R> {
pub fn new(mut source: R) -> Result<Self> {
let quad_count = {
let mut buf = [0u8; 10];
source.read_exact(&mut buf)?;
parse_header(&mut &buf[..]).map_err(|_| borsh::io::ErrorKind::InvalidData)?
};
let mut decompressor = FrameDecoder::new(source);
let term_dict = {
Vec::<BorshTerm>::deserialize_reader(&mut decompressor)?
.into_iter()
.fold(BTreeMap::new(), |mut acc, term| {
let id = BorshTermId::from(acc.len() as u16 + 1);
acc.insert(id, term);
acc
})
};
let _quad_count = {
let mut buf = [0u8; 4];
decompressor.read_exact(&mut buf)?;
u32::from_le_bytes(buf)
};
// rest of `source` is the quads section block
Ok(Self {
decompressor,
quad_count: quad_count as _,
term_dict,
read_count: 0usize,
})
}
}
/// `Reader` integration for [`BorshReader`].
///
/// NOTE(review): `format` is not implemented yet; calling it panics via `todo!()`.
impl<R: Read> Reader for BorshReader<R> {
    fn format(&self) -> Format {
        todo!() // TODO: not implemented; any call panics
    }
}
// `Source` has no items to implement here; the empty impl relies on its defaults.
impl<R: Read> Source for BorshReader<R> {}
/// `Enumerable` integration for [`BorshReader`].
///
/// NOTE(review): `grep` is not implemented yet; calling it panics via `todo!()`, and
/// `pattern` is currently unused.
impl<R: Read> Enumerable for BorshReader<R> {
    fn grep(&self, pattern: &impl PartialEq<Box<dyn Statement>>) -> Self
    where
        Self: Sized,
    {
        todo!()
    }
}
// Durability, indexing and mutability: empty impls, relying entirely on the traits' provided items.
impl<R> MaybeDurable for BorshReader<R> where R: Read {}
impl<R> MaybeIndexed for BorshReader<R> where R: Read {}
impl<R> MaybeMutable for BorshReader<R> where R: Read {}

impl<R> Countable for BorshReader<R>
where
    R: Read,
{
    /// Returns the total number of quads recorded when the reader was constructed.
    ///
    /// This is the full stream size and does not shrink as quads are iterated; the
    /// remaining count is available from `ExactSizeIterator::len`.
    fn count(&self) -> usize {
        let Self { quad_count, .. } = self;
        *quad_count
    }
}
impl<R: Read> Iterator for BorshReader<R> {
type Item = core::result::Result<Box<dyn Statement>, Box<dyn Error>>;
fn next(&mut self) -> Option<Self::Item> {
if self.len() == 0 {
return None;
}
let quad = match BorshQuad::<u16>::deserialize_reader(&mut self.decompressor) {
Ok(q) => q,
Err(e) => return Some(Err(Box::new(e))),
};
let Some(s) = self.term_dict.get(&quad.subject) else {
return Some(Err(Box::new(borsh::io::Error::new(
borsh::io::ErrorKind::InvalidData,
"subject has unknown term ID",
))));
};
let Some(p) = self.term_dict.get(&quad.predicate) else {
return Some(Err(Box::new(borsh::io::Error::new(
borsh::io::ErrorKind::InvalidData,
"predicate has unknown term ID",
))));
};
let Some(o) = self.term_dict.get(&quad.object) else {
return Some(Err(Box::new(borsh::io::Error::new(
borsh::io::ErrorKind::InvalidData,
"object has unknown term ID",
))));
};
let stmt = if quad.context.is_zero() {
HeapQuad::from((s.0.clone(), p.0.clone(), o.0.clone()))
} else {
let Some(g) = self.term_dict.get(&quad.context) else {
return Some(Err(Box::new(borsh::io::Error::new(
borsh::io::ErrorKind::InvalidData,
"context has unknown term ID",
))));
};
HeapQuad::from((s.0.clone(), p.0.clone(), o.0.clone(), g.0.clone()))
};
self.read_count += 1;
Some(Ok(Box::new(stmt)))
}
fn size_hint(&self) -> (usize, Option<usize>) {
let rem = self.quad_count - self.read_count;
(rem, Some(rem))
}
}
impl<R: Read> ExactSizeIterator for BorshReader<R> {}