Skip to content

Commit 5c48afb

Browse files
authored
Update streamPrediction to do non-blocking send to errChan (#84)
* Non-blocking send to errChan * Create sendError helper method
1 parent 3716050 commit 5c48afb

1 file changed

Lines changed: 15 additions & 23 deletions

File tree

stream.go

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -97,13 +97,20 @@ func (e *SSEEvent) String() string {
9797
}
9898
}
9999

100+
func (r *Client) sendError(err error, errChan chan error) {
101+
select {
102+
case errChan <- err:
103+
default:
104+
}
105+
}
106+
100107
func (r *Client) Stream(ctx context.Context, identifier string, input PredictionInput, webhook *Webhook) (<-chan SSEEvent, <-chan error) {
101108
sseChan := make(chan SSEEvent, 64)
102109
errChan := make(chan error, 64)
103110

104111
id, err := ParseIdentifier(identifier)
105112
if err != nil {
106-
errChan <- err
113+
r.sendError(err, errChan)
107114
return sseChan, errChan
108115
}
109116

@@ -115,7 +122,7 @@ func (r *Client) Stream(ctx context.Context, identifier string, input Prediction
115122
}
116123

117124
if err != nil {
118-
errChan <- err
125+
r.sendError(err, errChan)
119126
return sseChan, errChan
120127
}
121128

@@ -136,16 +143,13 @@ func (r *Client) StreamPrediction(ctx context.Context, prediction *Prediction) (
136143
func (r *Client) streamPrediction(ctx context.Context, prediction *Prediction, lastEvent *SSEEvent, sseChan chan SSEEvent, errChan chan error) {
137144
url := prediction.URLs["stream"]
138145
if url == "" {
139-
errChan <- errors.New("streaming not supported or not enabled for this prediction")
146+
r.sendError(errors.New("streaming not supported or not enabled for this prediction"), errChan)
140147
return
141148
}
142149

143150
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
144151
if err != nil {
145-
select {
146-
case errChan <- fmt.Errorf("failed to create request: %w", err):
147-
default:
148-
}
152+
r.sendError(fmt.Errorf("failed to create request: %w", err), errChan)
149153
return
150154
}
151155
req.Header.Set("Accept", "text/event-stream")
@@ -163,18 +167,12 @@ func (r *Client) streamPrediction(ctx context.Context, prediction *Prediction, l
163167
} else {
164168
defer resp.Body.Close()
165169
}
166-
select {
167-
case errChan <- fmt.Errorf("failed to send request: %w", err):
168-
default:
169-
}
170+
r.sendError(fmt.Errorf("failed to send request: %w", err), errChan)
170171
return
171172
}
172173

173174
if resp.StatusCode != http.StatusOK {
174-
select {
175-
case errChan <- fmt.Errorf("received invalid status code: %d", resp.StatusCode):
176-
default:
177-
}
175+
r.sendError(fmt.Errorf("received invalid status code: %d", resp.StatusCode), errChan)
178176
return
179177
}
180178

@@ -229,10 +227,7 @@ func (r *Client) streamPrediction(ctx context.Context, prediction *Prediction, l
229227

230228
event, err := decodeSSEEvent(b)
231229
if err != nil {
232-
select {
233-
case errChan <- err:
234-
default:
235-
}
230+
r.sendError(err, errChan)
236231
continue
237232
}
238233

@@ -269,10 +264,7 @@ func (r *Client) streamPrediction(ctx context.Context, prediction *Prediction, l
269264
}
270265

271266
if !errors.Is(err, context.Canceled) {
272-
select {
273-
case errChan <- err:
274-
default:
275-
}
267+
r.sendError(err, errChan)
276268
}
277269
}
278270

0 commit comments

Comments
 (0)