-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathutils.py
More file actions
248 lines (196 loc) · 7.27 KB
/
utils.py
File metadata and controls
248 lines (196 loc) · 7.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
from __future__ import annotations
from dataclasses import dataclass
from typing import Tuple
import os
import glob
from . import config, byte_codes
from . indexing import KeyFinderState
def file_info(db_name: str) -> Tuple[str, bool, str, bool]:
    """
    Return the file paths and existence flags for a database.

    Args:
    - `db_name`: The name of the database

    Returns:
    - A tuple `(json_path, json_exists, ddb_path, ddb_exists)` where the
      paths point to the database's JSON and DDB files inside the storage
      directory, and the booleans report whether each file exists.
    """
    base = f"{config.storage_directory}/{db_name}"
    json_path = f"{base}.json"
    ddb_path = f"{base}.ddb"
    return json_path, os.path.exists(json_path), ddb_path, os.path.exists(ddb_path)
def find_all(file_name: str) -> list[str]:
    """
    Return the names of all databases that match the given glob pattern.

    Args:
    - `file_name`: The glob pattern to search for (without file extension)

    Returns:
    - A list of database names, one per matching `.ddb` or `.json` file,
      with the storage directory prefix and the file extension removed.
    """
    prefix = f"{config.storage_directory}/"
    paths = glob.glob(f"{prefix}{file_name}.ddb")
    paths += glob.glob(f"{prefix}{file_name}.json")
    names = []
    for path in paths:
        # Strip only the leading storage directory and the trailing extension.
        # The previous str.replace() approach removed these substrings anywhere
        # in the path, mangling database names that contain ".json", ".ddb",
        # or the storage directory string in the middle.
        if path.startswith(prefix):
            path = path[len(prefix):]
        for ext in (".ddb", ".json"):
            if path.endswith(ext):
                path = path[:-len(ext)]
                break
        names.append(path)
    return names
def find_all_top_level_keys(json_bytes: bytes, state: KeyFinderState, batch_size: int) -> KeyFinderState:
    """
    Scan `json_bytes` (up to index `batch_size`) and record in `state.indices`
    each top level key together with the start and end indices of its value.

    The scan is resumable: `state` carries the cursor (`state.i`) and all
    parser flags across calls, so a later call with a larger `batch_size`
    continues where the previous one stopped.

    Args:
    - `json_bytes`: A bytes object containing valid JSON when decoded
    - `state`: Mutable scan state; updated in place
    - `batch_size`: Exclusive upper bound for `state.i` in this batch

    Returns:
    - The same `state` object, mutated in place. (Previously the function
      implicitly returned `None` despite its annotation.)
    """
    while state.i < batch_size:
        current = json_bytes[state.i]
        if state.skip_next:
            # The previous byte was a backslash, so this byte is escaped
            state.skip_next = False
        elif current == byte_codes.BACKSLASH:
            state.skip_next = True
        elif current == byte_codes.QUOTE:
            if state.dict_depth == 1 and state.list_depth == 0:
                if state.in_str:
                    # Closing quote of a top level key: skip the colon and any
                    # spaces to find where the value starts
                    state.key_end = state.i
                    state.i += 1
                    while json_bytes[state.i] in [byte_codes.SPACE, byte_codes.COLON]:
                        state.i += 1
                    state.value_start = state.i
                else:
                    state.key_start = state.i + 1
            state.in_str = not state.in_str
        elif state.in_str or current in [byte_codes.SPACE, byte_codes.TAB, byte_codes.NEWLINE]:
            pass
        elif current == byte_codes.OPEN_SQUARE:
            state.list_depth += 1
        elif current == byte_codes.CLOSE_SQUARE:
            state.list_depth -= 1
        elif current == byte_codes.OPEN_CURLY:
            state.dict_depth += 1
        elif current == byte_codes.CLOSE_CURLY:
            state.dict_depth -= 1
        elif state.list_depth == 0 and state.dict_depth == 1:
            # Any other byte at the top level is part of a primitive value.
            # NOTE(review): this branch fires for every byte of a multi-byte
            # primitive (e.g. each digit of `123`), appending one tuple per
            # byte — confirm callers deduplicate or only use the last entry.
            state.indices.append((json_bytes[state.key_start:state.key_end].decode(), state.value_start, state.i + 1))
        state.i += 1
    # Return the mutated state so the declared return annotation holds and
    # callers can reassign/chain the result.
    return state
def seek_index_through_value_bytes(json_bytes: bytes, index: int) -> int:
    """
    Finds the index of the next comma or closing bracket/brace after the value
    of a key-value pair in a bytes object containing valid JSON when decoded.
    Valid start indices are the index after the colon or the index after that.

    Example (positions 0-7 over the bytes `"2": {},`):
    valid start indices are 4 and 5; the function returns 7.

    Args:
    - `json_bytes`: A bytes object containing valid JSON when decoded
    - `index`: The start index in json_bytes

    Returns:
    - The end index of the first byte right after the value's bytes.

    Raises:
    - `TypeError`: If the end of `json_bytes` is reached without finding the
      end of the value (i.e. the JSON is invalid or truncated).
    """
    # See https://www.json.org/json-en.html for the JSON syntax
    in_str, list_depth, dict_depth, i, len_json_bytes = False, 0, 0, index, len(json_bytes)
    while i < len_json_bytes:
        current = json_bytes[i]
        # If backslash, skip the next character.
        # NOTE(review): the skip applies even outside strings; harmless for
        # valid JSON, where backslashes only occur inside strings.
        if current == byte_codes.BACKSLASH:
            i += 1
        # If quote, toggle in_str
        elif current == byte_codes.QUOTE:
            in_str = not in_str
            # Possible exit point where a string value ends at nesting zero
            if not in_str and list_depth == 0 and dict_depth == 0:
                return i + 1
        # If in string, skip
        elif in_str:
            pass
        # Invariant below: not in_str, not escaped
        # Handle opening brackets
        elif current == byte_codes.OPEN_SQUARE:
            list_depth += 1
        elif current == byte_codes.OPEN_CURLY:
            dict_depth += 1
        # Handle closing brackets
        elif current in [byte_codes.CLOSE_SQUARE, byte_codes.CLOSE_CURLY]:
            if current == byte_codes.CLOSE_SQUARE:
                list_depth -= 1
            if current == byte_codes.CLOSE_CURLY:
                dict_depth -= 1
            if list_depth == 0:
                if dict_depth == 0:
                    # The container value itself just closed
                    return i + 1
                if dict_depth == -1:
                    # The enclosing dict closed right after the value,
                    # e.g. the inner value in {"a": {}}
                    return i  # Case: {"a": {}}
        # Handle commas and newline as exit points for primitive values
        elif list_depth == 0 and ((dict_depth == 0 and current in [byte_codes.COMMA, byte_codes.NEWLINE]) or dict_depth == -1):
            return i
        i += 1
    raise TypeError("Invalid JSON")
def count_nesting_in_bytes(json_bytes: bytes, start: int, end: int) -> int:
    """
    Return the net curly-brace nesting change between `start` and `end`.

    Each opening brace counts +1 and each closing brace -1; braces inside
    strings or escaped with a backslash are ignored.

    Args:
    - `json_bytes`: A bytes object containing valid JSON when decoded
    - `start`: First index (inclusive) of the scanned range
    - `end`: Last index (exclusive) of the scanned range
    """
    depth = 0
    inside_string = False
    pos = start
    while pos < end:
        current = json_bytes[pos]
        if current == byte_codes.BACKSLASH:
            # Skip the escaped byte entirely
            pos += 1
        elif current == byte_codes.QUOTE:
            inside_string = not inside_string
        elif not inside_string:
            if current == byte_codes.OPEN_CURLY:
                depth += 1
            elif current == byte_codes.CLOSE_CURLY:
                depth -= 1
        pos += 1
    return depth
def find_outermost_key_in_json_bytes(json_bytes: bytes, key: str) -> Tuple[int, int]:
    """
    Find the occurrence of `key` at the outermost nesting level.

    If the key you are looking for is `some_key`, the function searches for
    the byte sequence `"some_key":` and returns the start and end index of
    the occurrence at the outermost nesting level.

    Args:
    - `json_bytes`: A bytes object containing valid JSON when decoded
    - `key`: The key of a key-value pair in `json_bytes` to search for

    Returns:
    - A tuple of the key start and end index, or `(-1, -1)` if the key is
      not found.
    """
    # Avoid shadowing the parameter: keep the encoded search pattern separate
    key_bytes = f"\"{key}\":".encode()
    if (curr_i := json_bytes.find(key_bytes, 0)) == -1:
        return -1, -1
    # Collect every occurrence with its nesting level relative to the
    # previous occurrence: (index, relative_nesting)
    key_nest = [(curr_i, 0)]
    while (next_i := json_bytes.find(key_bytes, curr_i + len(key_bytes))) != -1:
        nesting = count_nesting_in_bytes(json_bytes, curr_i + len(key_bytes), next_i)
        key_nest.append((next_i, nesting))
        curr_i = next_i
    # Early exit if there is only one occurrence
    if len(key_nest) == 1:
        return key_nest[0][0], key_nest[0][0] + len(key_bytes)
    # Convert relative nesting to nesting relative to the first occurrence
    # by accumulating, then pick the shallowest (outermost) occurrence
    for i in range(1, len(key_nest)):
        key_nest[i] = (key_nest[i][0], key_nest[i - 1][1] + key_nest[i][1])
    start_index = min(key_nest, key=lambda x: x[1])[0]
    return start_index, start_index + len(key_bytes)
def detect_indentation_in_json_bytes(json_bytes: bytes, index: int) -> Tuple[int, str]:
    """
    Determine the indentation level and whitespace style used before `index`.

    Walks backwards from `index` over spaces and tabs to measure the
    indentation, then derives the level from the configured indent setting.

    Args:
    - `json_bytes`: A bytes object containing valid JSON when decoded
    - `index`: The index behind which the indentation is to be determined

    Returns:
    - A tuple of the indentation level and the whitespace used
    """
    whitespace_count = 0
    has_tab = False
    pos = index - 1
    # Scan backwards while the byte is indentation whitespace
    while pos >= 0 and json_bytes[pos] in (byte_codes.SPACE, byte_codes.TAB):
        if json_bytes[pos] == byte_codes.TAB:
            has_tab = True
        whitespace_count += 1
        pos -= 1
    if has_tab:
        # Tabs present: one tab per level
        return whitespace_count, "\t"
    if isinstance(config.indent, int) and config.indent > 0:
        # Spaces with a configured width per level
        return whitespace_count // config.indent, " " * config.indent
    if isinstance(config.indent, str):
        # String-configured indent: assume two spaces per level
        return whitespace_count // 2, " "
    return 0, ""