Skip to content

Commit 6e37903

Browse files
committed
updates
1 parent 54e955c commit 6e37903

5 files changed

Lines changed: 402 additions & 378 deletions

File tree

apps/testapp/kv/bench/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -323,9 +323,9 @@ func printResults(elapsed time.Duration, workers, batchSize, total, success, fai
323323
fmt.Printf(rowFmt+"\n", goalLabel, "REACHED")
324324
fmt.Println(sep)
325325
fmt.Println()
326-
fmt.Println(" ====================================================")
327-
fmt.Printf(" S U C C E S S ! %s T X / S R E A C H E D !\n", formatNum(targetRPS))
328-
fmt.Println(" ====================================================")
326+
fmt.Println("====================================================")
327+
fmt.Printf(" S U C C E S S ! %s T X / S R E A C H E D !\n", formatNum(targetRPS))
328+
fmt.Println("====================================================")
329329
} else {
330330
fmt.Printf(rowFmt+"\n", goalLabel, "NOT REACHED")
331331
fmt.Println(sep)

apps/testapp/kv/http_server.go

Lines changed: 87 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,17 @@ import (
1212
"time"
1313

1414
ds "github.com/ipfs/go-datastore"
15+
"github.com/ipfs/go-datastore/query"
1516
)
1617

17-
var acceptedResp = []byte("Transaction accepted")
18-
18+
// HTTPServer wraps a KVExecutor and provides an HTTP interface for it
1919
type HTTPServer struct {
2020
executor *KVExecutor
2121
server *http.Server
2222
injectedTxs atomic.Uint64
2323
}
2424

25+
// NewHTTPServer creates a new HTTP server for the KVExecutor
2526
func NewHTTPServer(executor *KVExecutor, listenAddr string) *HTTPServer {
2627
hs := &HTTPServer{
2728
executor: executor,
@@ -38,13 +39,14 @@ func NewHTTPServer(executor *KVExecutor, listenAddr string) *HTTPServer {
3839
Addr: listenAddr,
3940
Handler: mux,
4041
ReadHeaderTimeout: 10 * time.Second,
41-
MaxHeaderBytes: 4096,
4242
}
4343

4444
return hs
4545
}
4646

47+
// Start begins listening for HTTP requests
4748
func (hs *HTTPServer) Start(ctx context.Context) error {
49+
// Start the server in a goroutine
4850
errCh := make(chan error, 1)
4951
go func() {
5052
fmt.Printf("KV Executor HTTP server starting on %s\n", hs.server.Addr)
@@ -53,8 +55,10 @@ func (hs *HTTPServer) Start(ctx context.Context) error {
5355
}
5456
}()
5557

58+
// Monitor for context cancellation
5659
go func() {
5760
<-ctx.Done()
61+
// Create a timeout context for shutdown
5862
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
5963
defer cancel()
6064

@@ -64,30 +68,42 @@ func (hs *HTTPServer) Start(ctx context.Context) error {
6468
}
6569
}()
6670

71+
// Check if the server started successfully
6772
select {
6873
case err := <-errCh:
6974
return err
70-
case <-time.After(100 * time.Millisecond):
75+
case <-time.After(100 * time.Millisecond): // Give it a moment to start
76+
// Server started successfully
7177
return nil
7278
}
7379
}
7480

81+
// Stop shuts down the HTTP server
7582
func (hs *HTTPServer) Stop() error {
7683
return hs.server.Close()
7784
}
7885

86+
// handleTx handles transaction submissions
87+
// POST /tx with raw binary data or text in request body
88+
// It is recommended to use transactions in the format "key=value" to be consistent
89+
// with the KVExecutor implementation that parses transactions in this format.
90+
// Example: "mykey=myvalue"
7991
func (hs *HTTPServer) handleTx(w http.ResponseWriter, r *http.Request) {
8092
if r.Method != http.MethodPost {
8193
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
8294
return
8395
}
8496

85-
body, err := io.ReadAll(io.LimitReader(r.Body, 4096))
86-
r.Body.Close()
97+
body, err := io.ReadAll(r.Body)
8798
if err != nil {
8899
http.Error(w, "Failed to read request body", http.StatusBadRequest)
89100
return
90101
}
102+
defer func() {
103+
if err := r.Body.Close(); err != nil {
104+
fmt.Printf("Error closing request body: %v\n", err)
105+
}
106+
}()
91107

92108
if len(body) == 0 {
93109
http.Error(w, "Empty transaction", http.StatusBadRequest)
@@ -97,7 +113,10 @@ func (hs *HTTPServer) handleTx(w http.ResponseWriter, r *http.Request) {
97113
hs.executor.InjectTx(body)
98114
hs.injectedTxs.Add(1)
99115
w.WriteHeader(http.StatusAccepted)
100-
w.Write(acceptedResp)
116+
_, err = w.Write([]byte("Transaction accepted"))
117+
if err != nil {
118+
fmt.Printf("Error writing response: %v\n", err)
119+
}
101120
}
102121

103122
func (hs *HTTPServer) handleTxBatch(w http.ResponseWriter, r *http.Request) {
@@ -160,39 +179,77 @@ func (r *bytesReaderImpl) Read(p []byte) (int, error) {
160179
return n, nil
161180
}
162181

182+
// handleKV handles direct key-value operations (GET/POST) against the database
183+
// GET /kv?key=somekey - retrieve a value
184+
// POST /kv with JSON {"key": "somekey", "value": "somevalue"} - set a value
163185
func (hs *HTTPServer) handleKV(w http.ResponseWriter, r *http.Request) {
164-
if r.Method != http.MethodGet {
165-
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
166-
return
167-
}
186+
switch r.Method {
187+
case http.MethodGet:
188+
key := r.URL.Query().Get("key")
189+
if key == "" {
190+
http.Error(w, "Missing key parameter", http.StatusBadRequest)
191+
return
192+
}
168193

169-
key := r.URL.Query().Get("key")
170-
if key == "" {
171-
http.Error(w, "Missing key parameter", http.StatusBadRequest)
172-
return
173-
}
194+
// Use r.Context() when calling the executor method
195+
value, exists := hs.executor.GetStoreValue(r.Context(), key)
196+
if !exists {
197+
// GetStoreValue now returns false on error too, check logs for details
198+
// Check if the key truly doesn't exist vs a DB error occurred.
199+
// For simplicity here, we treat both as Not Found for the client.
200+
// A more robust implementation might check the error type.
201+
_, err := hs.executor.db.Get(r.Context(), ds.NewKey(key))
202+
if errors.Is(err, ds.ErrNotFound) {
203+
http.Error(w, "Key not found", http.StatusNotFound)
204+
} else {
205+
// Some other DB error occurred
206+
http.Error(w, "Failed to retrieve key", http.StatusInternalServerError)
207+
fmt.Printf("Error retrieving key '%s' from DB: %v\n", key, err)
208+
}
209+
return
210+
}
174211

175-
value, exists := hs.executor.GetStoreValue(r.Context(), key)
176-
if !exists {
177-
if _, err := hs.executor.db.Get(r.Context(), ds.NewKey(key)); errors.Is(err, ds.ErrNotFound) {
178-
http.Error(w, "Key not found", http.StatusNotFound)
179-
} else {
180-
http.Error(w, "Failed to retrieve key", http.StatusInternalServerError)
181-
fmt.Printf("Error retrieving key '%s' from DB: %v\n", key, err)
212+
_, err := w.Write([]byte(value))
213+
if err != nil {
214+
fmt.Printf("Error writing response: %v\n", err)
182215
}
183-
return
184-
}
185216

186-
w.Write([]byte(value))
217+
default:
218+
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
219+
}
187220
}
188221

222+
// handleStore returns all non-reserved key-value pairs in the store by querying the database
223+
// GET /store
189224
func (hs *HTTPServer) handleStore(w http.ResponseWriter, r *http.Request) {
190225
if r.Method != http.MethodGet {
191226
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
192227
return
193228
}
194229

195-
store := hs.executor.GetAllEntries()
230+
store := make(map[string]string)
231+
q := query.Query{} // Query all entries
232+
results, err := hs.executor.db.Query(r.Context(), q)
233+
if err != nil {
234+
http.Error(w, "Failed to query store", http.StatusInternalServerError)
235+
fmt.Printf("Error querying datastore: %v\n", err)
236+
return
237+
}
238+
defer results.Close()
239+
240+
for result := range results.Next() {
241+
if result.Error != nil {
242+
http.Error(w, "Failed during store iteration", http.StatusInternalServerError)
243+
fmt.Printf("Error iterating datastore results: %v\n", result.Error)
244+
return
245+
}
246+
// Exclude reserved genesis keys from the output
247+
dsKey := ds.NewKey(result.Key)
248+
if dsKey.Equal(genesisInitializedKey) || dsKey.Equal(genesisStateRootKey) {
249+
continue
250+
}
251+
store[result.Key] = string(result.Value)
252+
}
196253

197254
w.Header().Set("Content-Type", "application/json")
198255
if err := json.NewEncoder(w).Encode(store); err != nil {
@@ -218,5 +275,7 @@ func (hs *HTTPServer) handleStats(w http.ResponseWriter, r *http.Request) {
218275
}
219276

220277
w.Header().Set("Content-Type", "application/json")
221-
json.NewEncoder(w).Encode(stats)
278+
if err := json.NewEncoder(w).Encode(stats); err != nil {
279+
fmt.Printf("Error encoding stats response: %v\n", err)
280+
}
222281
}

apps/testapp/kv/http_server_test.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ func TestHandleTx(t *testing.T) {
6161
t.Errorf("expected body %q, got %q", tt.expectedBody, rr.Body.String())
6262
}
6363

64+
// Verify the transaction was added to the channel if it was a valid POST
6465
if tt.method == http.MethodPost && tt.expectedStatus == http.StatusAccepted {
66+
// Allow a moment for the channel send to potentially complete
6567
time.Sleep(10 * time.Millisecond)
6668
ctx := context.Background()
6769
retrievedTxs, err := exec.GetTxs(ctx)
@@ -74,6 +76,7 @@ func TestHandleTx(t *testing.T) {
7476
t.Errorf("expected channel to contain %q, got %q", tt.body, string(retrievedTxs[0]))
7577
}
7678
} else if tt.method == http.MethodPost {
79+
// If it was a POST but not accepted, ensure nothing ended up in the channel
7780
ctx := context.Background()
7881
retrievedTxs, err := exec.GetTxs(ctx)
7982
if err != nil {
@@ -127,7 +130,9 @@ func TestHandleKV_Get(t *testing.T) {
127130
exec := NewKVExecutor()
128131
server := NewHTTPServer(exec, ":0")
129132

133+
// Set up initial data if needed
130134
if tt.key != "" && tt.value != "" {
135+
// Create and execute the transaction directly
131136
tx := fmt.Appendf(nil, "%s=%s", tt.key, tt.value)
132137
ctx := context.Background()
133138
_, err := exec.ExecuteTxs(ctx, [][]byte{tx}, 1, time.Now(), []byte(""))
@@ -158,12 +163,15 @@ func TestHandleKV_Get(t *testing.T) {
158163
}
159164

160165
func TestHTTPServerStartStop(t *testing.T) {
166+
// Create a test server that listens on a random port
161167
exec := NewKVExecutor()
162168
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
169+
// This is just a placeholder handler
163170
w.WriteHeader(http.StatusOK)
164171
}))
165172
defer server.Close()
166173

174+
// Test the NewHTTPServer function
167175
httpServer := NewHTTPServer(exec, server.URL)
168176
if httpServer == nil {
169177
t.Fatal("NewHTTPServer returned nil")
@@ -173,23 +181,33 @@ func TestHTTPServerStartStop(t *testing.T) {
173181
t.Error("HTTPServer.executor not set correctly")
174182
}
175183

184+
// Note: We don't test Start() and Stop() methods directly
185+
// as they actually bind to ports, which can be problematic in unit tests.
186+
// In a real test environment, you might want to use integration tests for these.
187+
188+
// Test with context (minimal test just to verify it compiles)
176189
_, cancel := context.WithCancel(context.Background())
177190
defer cancel()
178191

192+
// Just create a mock test to ensure the context parameter is accepted
193+
// Don't actually start the server in the test
179194
testServer := &HTTPServer{
180195
server: &http.Server{
181-
Addr: ":0",
182-
ReadHeaderTimeout: 10 * time.Second,
196+
Addr: ":0", // Use a random port
197+
ReadHeaderTimeout: 10 * time.Second, // Add timeout to prevent Slowloris attacks
183198
},
184199
executor: exec,
185200
}
186201

202+
// Just verify the method signature works
187203
_ = testServer.Start
188204
}
189205

206+
// TestHTTPServerContextCancellation tests that the server shuts down properly when the context is cancelled
190207
func TestHTTPServerContextCancellation(t *testing.T) {
191208
exec := NewKVExecutor()
192209

210+
// Use a random available port
193211
listener, err := net.Listen("tcp", "127.0.0.1:0")
194212
if err != nil {
195213
t.Fatalf("Failed to find available port: %v", err)
@@ -202,15 +220,19 @@ func TestHTTPServerContextCancellation(t *testing.T) {
202220
serverAddr := fmt.Sprintf("127.0.0.1:%d", port)
203221
server := NewHTTPServer(exec, serverAddr)
204222

223+
// Create a context with cancel function
205224
ctx, cancel := context.WithCancel(context.Background())
206225

226+
// Start the server
207227
errCh := make(chan error, 1)
208228
go func() {
209229
errCh <- server.Start(ctx)
210230
}()
211231

232+
// Give it time to start
212233
time.Sleep(100 * time.Millisecond)
213234

235+
// Send a request to confirm it's running
214236
client := &http.Client{Timeout: 1 * time.Second}
215237
resp, err := client.Get(fmt.Sprintf("http://%s/store", serverAddr))
216238
if err != nil {
@@ -224,8 +246,10 @@ func TestHTTPServerContextCancellation(t *testing.T) {
224246
t.Fatalf("Expected status 200, got %d", resp.StatusCode)
225247
}
226248

249+
// Cancel the context to shut down the server
227250
cancel()
228251

252+
// Wait for shutdown to complete with timeout
229253
select {
230254
case err := <-errCh:
231255
if err != nil && errors.Is(err, http.ErrServerClosed) {
@@ -235,6 +259,7 @@ func TestHTTPServerContextCancellation(t *testing.T) {
235259
t.Fatal("Server shutdown timed out")
236260
}
237261

262+
// Verify server is actually shutdown by attempting a new connection
238263
_, err = client.Get(fmt.Sprintf("http://%s/store", serverAddr))
239264
if err == nil {
240265
t.Fatal("Expected connection error after shutdown, but got none")
@@ -245,6 +270,7 @@ func TestHTTPIntegration_GetKVWithMultipleHeights(t *testing.T) {
245270
exec := NewKVExecutor()
246271
ctx := context.Background()
247272

273+
// Execute transactions at different heights for the same key
248274
txsHeight1 := [][]byte{[]byte("testkey=original_value")}
249275
_, err := exec.ExecuteTxs(ctx, txsHeight1, 1, time.Now(), []byte(""))
250276
if err != nil {
@@ -259,6 +285,7 @@ func TestHTTPIntegration_GetKVWithMultipleHeights(t *testing.T) {
259285

260286
server := NewHTTPServer(exec, ":0")
261287

288+
// Test GET request - should return the latest value
262289
req := httptest.NewRequest(http.MethodGet, "/kv?key=testkey", nil)
263290
rr := httptest.NewRecorder()
264291

0 commit comments

Comments
 (0)