-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstorage.go
More file actions
264 lines (215 loc) · 6.76 KB
/
storage.go
File metadata and controls
264 lines (215 loc) · 6.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
package storage
import (
"context"
"fmt"
"strings"
"sync"
"github.com/tarantool/go-option"
"github.com/tarantool/go-storage/driver"
"github.com/tarantool/go-storage/kv"
"github.com/tarantool/go-storage/operation"
"github.com/tarantool/go-storage/predicate"
txPkg "github.com/tarantool/go-storage/tx"
"github.com/tarantool/go-storage/watch"
)
// rangeOptions holds the settings collected from RangeOption functions
// before a range query is executed.
type rangeOptions struct {
Prefix string // Key prefix used to select the keys a range query covers.
Limit int // Maximum number of results to return.
}
// RangeOption is a functional option for range operations: a function that
// mutates the rangeOptions value it is handed.
type RangeOption func(*rangeOptions)
// WithPrefix returns a RangeOption that records prefix as the key prefix a
// range operation should be restricted to.
func WithPrefix(prefix string) RangeOption {
	setPrefix := func(target *rangeOptions) {
		target.Prefix = prefix
	}

	return setPrefix
}
// WithLimit returns a RangeOption that records limit as the maximum number
// of results a range operation should return.
func WithLimit(limit int) RangeOption {
	setLimit := func(target *rangeOptions) {
		target.Limit = limit
	}

	return setLimit
}
// Storage is the main interface for key-value storage operations.
// It provides methods for watching changes, creating transactions, and
// running range queries.
type Storage interface {
// Watch streams changes for a specific key or prefix as events on the
// returned receive-only channel.
// Options are watch.Option values. This package's WithPrefix returns a
// RangeOption and therefore cannot be passed here.
// NOTE(review): presumably package watch offers its own prefix option — confirm.
Watch(ctx context.Context, key []byte, opts ...watch.Option) <-chan watch.Event
// Tx creates a new transaction.
// The context manages timeouts and cancellation for the transaction.
Tx(ctx context.Context) txPkg.Tx
// Range queries a range of keys with optional filtering and returns the
// matching key-value pairs.
// Options:
// - WithPrefix: filter keys by prefix
// - WithLimit: limit the number of results returned
Range(ctx context.Context, opts ...RangeOption) ([]kv.KeyValue, error)
}
// storageOptions contains configuration options for storage instances.
// It currently declares no fields, so there is nothing for an Option to set.
type storageOptions struct{}
// Option is a functional option for storage construction: a function that
// receives the storageOptions to configure. NewStorage accepts a variadic
// list of Option values.
type Option func(*storageOptions)
// WithTimeout is intended to configure a default timeout for storage
// operations. It is a placeholder for demonstration purposes: the returned
// Option leaves the options it receives untouched.
func WithTimeout() Option {
	noop := func(_ *storageOptions) {
		// Dummy implementation.
	}

	return noop
}
// WithRetry is intended to configure retry behavior for failed operations.
// It is a placeholder for demonstration purposes: the returned Option leaves
// the options it receives untouched.
func WithRetry() Option {
	noop := func(_ *storageOptions) {
		// Dummy implementation.
	}

	return noop
}
// storage is the concrete implementation of the Storage interface.
// Its methods forward their work to the wrapped driver.
type storage struct {
driver driver.Driver // Underlying storage driver that executes watches and operations.
}
// Watch implements the Storage interface for watching key changes.
//
// The call is delegated to the driver. If the driver reports an error, an
// already-closed channel is returned. If the driver supplies no cleanup
// function, its event channel is handed back directly. Otherwise events are
// relayed through a wrapper channel that is closed once ctx is done or the
// driver channel is closed, and cleanup is invoked once after either happens.
func (s storage) Watch(ctx context.Context, key []byte, opts ...watch.Option) <-chan watch.Event {
	source, cleanup, err := s.driver.Watch(ctx, key, opts...)
	if err != nil {
		// The signature has no error return, so failure is signalled with a
		// channel that is closed from the start.
		closed := make(chan watch.Event)
		close(closed)

		return closed
	}

	// Nothing to release: expose the driver's channel as-is.
	if cleanup == nil {
		return source
	}

	var cleanupOnce sync.Once

	relay := make(chan watch.Event, cap(source))
	finished := make(chan struct{})

	// Forwarder: copies events from the driver channel into relay until the
	// context ends or the driver closes its channel.
	go func() {
		// Deferred calls run last-in-first-out, so finished is closed
		// before relay.
		defer close(relay)
		defer close(finished)

		for {
			select {
			case <-ctx.Done():
				return
			case ev, open := <-source:
				if !open {
					return
				}

				// Do not block forever on a consumer that stopped reading.
				select {
				case <-ctx.Done():
					return
				case relay <- ev:
				}
			}
		}
	}()

	// Releaser: runs cleanup once the context ends or the forwarder exits.
	go func() {
		select {
		case <-ctx.Done():
		case <-finished:
		}

		cleanupOnce.Do(cleanup)
	}()

	return relay
}
// Tx implements the Storage interface for transaction creation.
// The returned transaction is built by newTx from ctx and the storage's driver.
func (s storage) Tx(ctx context.Context) txPkg.Tx {
	transaction := newTx(ctx, s.driver)

	return transaction
}
// Range implements the Storage interface for range queries.
//
// The options are applied in order onto a zero-valued rangeOptions. When no
// prefix is configured, Range returns (nil, nil) without contacting the
// driver. Otherwise the prefix is normalized to end in "/" and issued as a
// single Get operation; the values of every result are concatenated in
// result order.
//
// A positive limit (see WithLimit) caps the number of returned key-value
// pairs; a zero or negative limit means "no limit". The cap is applied to
// the collected results after the driver call, because the Get operation
// built here carries no limit of its own.
//
// Returns the collected key-value pairs, or an error wrapping the driver
// failure.
func (s storage) Range(ctx context.Context, opts ...RangeOption) ([]kv.KeyValue, error) {
	rangeOpts := &rangeOptions{Prefix: "", Limit: 0}
	for _, opt := range opts {
		opt(rangeOpts)
	}

	if rangeOpts.Prefix == "" {
		return nil, nil
	}

	// Create a Get operation with the prefix.
	// NOTE(review): presumably the driver treats a key ending in "/" as a
	// prefix lookup — confirm against the driver implementation.
	key := rangeOpts.Prefix
	if !strings.HasSuffix(key, "/") {
		key += "/"
	}

	ops := []operation.Operation{operation.Get([]byte(key))}

	response, err := s.driver.Execute(ctx, nil, ops, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to execute ops: %w", err)
	}

	var kvs []kv.KeyValue
	for _, r := range response.Results {
		kvs = append(kvs, r.Values...)
	}

	// Honor WithLimit. The full slice expression also caps the capacity so
	// callers appending to the result cannot reach the truncated tail.
	if limit := rangeOpts.Limit; limit > 0 && len(kvs) > limit {
		kvs = kvs[:limit:limit]
	}

	return kvs, nil
}
// NewStorage creates a new Storage instance backed by the specified driver.
// Optional Option values are accepted for forward compatibility but are
// currently not applied.
func NewStorage(driver driver.Driver, _ ...Option) Storage {
	impl := new(storage)
	impl.driver = driver

	return impl
}
// tx is the internal implementation of the Tx interface.
// It records the predicates and the then/else operations supplied through
// If, Then and Else, and hands them to the driver on Commit.
type tx struct {
// driver executes the transaction on Commit.
driver driver.Driver
ctx context.Context //nolint:containedctx // Context is stored for transaction execution
// predicates is set by If; it stays None until If is called.
predicates option.Generic[[]predicate.Predicate]
// thenOps is set by Then; it stays None until Then is called.
thenOps option.Generic[[]operation.Operation]
// elseOps is set by Else; it stays None until Else is called.
elseOps option.Generic[[]operation.Operation]
}
// newTx creates a new transaction builder bound to the given context and
// driver. Predicates, then-operations and else-operations all start unset.
func newTx(ctx context.Context, driver driver.Driver) txPkg.Tx {
	builder := new(tx)

	builder.driver = driver
	builder.ctx = ctx
	builder.predicates = option.None[[]predicate.Predicate]()
	builder.thenOps = option.None[[]operation.Operation]()
	builder.elseOps = option.None[[]operation.Operation]()

	return builder
}
// If sets the predicates that form the transaction condition.
// An empty predicate list means always true (unconditional execution).
// If must be called before Then/Else and at most once; it panics otherwise.
func (tb *tx) If(predicates ...predicate.Predicate) txPkg.Tx {
	switch {
	case tb.predicates.IsSome():
		panic("predicates are already set")
	case tb.thenOps.IsSome() || tb.elseOps.IsSome():
		panic("If can only be called before Then/Else")
	}

	tb.predicates = option.Some(predicates)

	return tb
}
// Then sets the operations to execute if the predicates evaluate to true.
// At least one Then call is required.
// Then must be called before Else and at most once; it panics otherwise.
func (tb *tx) Then(operations ...operation.Operation) txPkg.Tx {
	switch {
	case tb.thenOps.IsSome():
		panic("then operations are already set")
	case tb.elseOps.IsSome():
		panic("Then can only be called before Else")
	}

	tb.thenOps = option.Some(operations)

	return tb
}
// Else sets the operations to execute if the predicates evaluate to false.
// Calling it is optional, but it may be called at most once (it panics on a
// second call) and only before Commit.
func (tb *tx) Else(operations ...operation.Operation) txPkg.Tx {
	if alreadySet := tb.elseOps.IsSome(); alreadySet {
		panic("else operations are already set")
	}

	tb.elseOps = option.Some(operations)

	return tb
}
// Commit executes the transaction by delegating to the driver with the stored
// context. Parts that were never set (predicates, then-operations,
// else-operations) are passed as nil. On driver failure it returns an empty
// response and an error wrapping the driver's error.
func (tb *tx) Commit() (txPkg.Response, error) {
	var (
		conditions     = tb.predicates.UnwrapOr(nil)
		thenOperations = tb.thenOps.UnwrapOr(nil)
		elseOperations = tb.elseOps.UnwrapOr(nil)
	)

	result, err := tb.driver.Execute(tb.ctx, conditions, thenOperations, elseOperations)
	if err == nil {
		return result, nil
	}

	return txPkg.Response{}, fmt.Errorf("tx execute failed: %w", err)
}