Skip to content

Commit 5e64de2

Browse files
committed
trickle: Add a /<channel>/next endpoint.
1 parent 49a9428 commit 5e64de2

2 files changed

Lines changed: 34 additions & 2 deletions

File tree

trickle/README.md

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ Publishers POST to /channel-name/seq
2222

2323
Subscribers GET to /channel-name/seq
2424

25+
Clients can query the next segment seq with GET to /channel-name/next
26+
2527
The `channel-name` is any valid HTTP path part.
2628

2729
The `seq` is an integer sequence number indicating the segment, and must increase sequentially without gaps.
@@ -48,12 +50,22 @@ Servers may offer some grace with leading sequence numbers to avoid data races,
4850

4951
Publishers are responsible for segmenting content (if necessary) and subscribers are responsible for re-assembling content (if necessary)
5052

51-
Publishers can POST with `seq=-1` to write to the current server write head. Successful publisher POST responses include `Lp-Trickle-Seq` metadata (HTTP header) with the effective segment index written by the server, which is especially useful when `-1` was used.
53+
Successful publisher POST responses include `Lp-Trickle-Seq` metadata (HTTP header) with the effective segment index written by the server.
54+
55+
If a publisher sends `Lp-Trickle-Reset`, the server treats it as a restart signal for any `seq` value.
56+
The server closes prior segments to unblock waiting subscribers while still allowing preconnected/ahead segments.
5257

5358
Subscribers can initiate a subscribe with a `seq` of -1 to retrieve the most recent publish. With preconnects, the subscriber may be waiting for the *next* publish. For video this allows clients to eg, start streaming at the live edge of the next GOP.
5459

5560
Subscribers can retrieve the current `seq` with the `Lp-Trickle-Seq` metadata (HTTP header). This is useful in case `-1` was used to initiate the subscription; the subscribing client can then pre-connect to `Lp-Trickle-Seq + 1`
5661

62+
GET `/channel-name/next` returns the next segment seq as plain text in the response body and in the `Lp-Trickle-Latest` response header.
63+
If the channel does not exist, the server returns 404.
64+
If the channel is closed, the server also includes `Lp-Trickle-Closed: terminated`.
65+
66+
`Lp-Trickle-Seq` from a publisher write using `seq=-1` is not sufficient by itself to drive real-time ordering if writes overlap.
67+
It is still useful for post-facto mapping/observability, reconciliation after segment close, and debugging dropped/misordered assumptions.
68+
5769
Subscribers can initiate a subscribe with a `seq` of -N to get the Nth-from-last segment. (TODO)
5870

5971
The server should send subscribers `Lp-Trickle-Size` metadata to indicate the size of the content up until now. This allows clients to know where the live edge is, eg video implementations can decode-and-discard frames up until the edge to achieve immediate playback without waiting for the next segment. (TODO)

trickle/trickle_server.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ func ConfigureServer(config TrickleServerConfig) *Server {
117117
)
118118

119119
mux.HandleFunc("POST "+basePath+"{streamName}", streamManager.handleCreate)
120+
mux.HandleFunc("GET "+basePath+"{streamName}/next", streamManager.handleNext)
120121
mux.HandleFunc("GET "+basePath+"{streamName}/{idx}", streamManager.handleGet)
121122
mux.HandleFunc("POST "+basePath+"{streamName}/{idx}", streamManager.handlePost)
122123
mux.HandleFunc("DELETE "+basePath+"{streamName}/{idx}", streamManager.closeSeq)
@@ -373,7 +374,7 @@ func (tr *timeoutReader) Close() error {
373374
func (s *Stream) handlePost(w http.ResponseWriter, r *http.Request, idx int) {
374375
segment, _ := s.getForWrite(idx)
375376

376-
if idx == -1 && r.Header.Get("Lp-Trickle-Reset") != "" {
377+
if r.Header.Get("Lp-Trickle-Reset") != "" {
377378
// Usually means the publisher had to restart for some reason.
378379
// Close prior segments to unblock subscribers for any hanging writes
379380
// but allow for preconnected segments (sometimes they come out-of-order)
@@ -522,6 +523,25 @@ func (sm *Server) handleGet(w http.ResponseWriter, r *http.Request) {
522523
stream.handleGet(w, r, idx)
523524
}
524525

526+
func (sm *Server) handleNext(w http.ResponseWriter, r *http.Request) {
527+
stream, exists := sm.getStream(r.PathValue("streamName"))
528+
if !exists {
529+
http.Error(w, "Stream not found", http.StatusNotFound)
530+
return
531+
}
532+
stream.mutex.RLock()
533+
nextWrite := stream.nextWrite
534+
closed := stream.closed
535+
stream.mutex.RUnlock()
536+
537+
next := strconv.Itoa(nextWrite)
538+
w.Header().Set("Lp-Trickle-Latest", next)
539+
if closed {
540+
w.Header().Set("Lp-Trickle-Closed", "terminated")
541+
}
542+
w.Write([]byte(next))
543+
}
544+
525545
func (s *Stream) handleGet(w http.ResponseWriter, r *http.Request, idx int) {
526546
segment, latestSeq, exists, closed := s.getForRead(idx)
527547
if !exists {

0 commit comments

Comments
 (0)