Skip to content

Commit 327bdfe

Browse files
authored
Merge pull request #313 from clue-labs/queued-cancellation
Fix awaiting queued handlers when cancelling a queued handler
2 parents 4fbeee0 + bab4715 commit 327bdfe

2 files changed

Lines changed: 92 additions & 5 deletions

File tree

src/Middleware/LimitConcurrentRequestsMiddleware.php

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -143,21 +143,39 @@ public function __invoke(ServerRequestInterface $request, $next)
143143
$queue[$id] = $deferred;
144144

145145
$pending = &$this->pending;
146-
return $this->await($deferred->promise()->then(function () use ($request, $next, $body, &$pending) {
146+
$that = $this;
147+
return $deferred->promise()->then(function () use ($request, $next, $body, &$pending, $that) {
147148
// invoke next request handler
148149
++$pending;
149-
$ret = $next($request);
150+
151+
try {
152+
$response = $next($request);
153+
} catch (\Exception $e) {
154+
$that->processQueue();
155+
throw $e;
156+
} catch (\Throwable $e) { // @codeCoverageIgnoreStart
157+
// handle Errors just like Exceptions (PHP 7+ only)
158+
$that->processQueue();
159+
throw $e; // @codeCoverageIgnoreEnd
160+
}
150161

151162
// resume readable stream and replay buffered events
152163
if ($body instanceof PauseBufferStream) {
153164
$body->resumeImplicit();
154165
}
155166

156-
return $ret;
157-
}));
167+
// if the next handler returns a pending promise, we have to
168+
// await its resolution before invoking next queued request
169+
return $that->await(Promise\resolve($response));
170+
});
158171
}
159172

160-
private function await(PromiseInterface $promise)
173+
/**
174+
* @internal
175+
* @param PromiseInterface $promise
176+
* @return PromiseInterface
177+
*/
178+
public function await(PromiseInterface $promise)
161179
{
162180
$that = $this;
163181

tests/Middleware/LimitConcurrentRequestsMiddlewareTest.php

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,31 @@ public function testReceivesNextRequestAfterPreviousHandlerIsSettled()
296296
$middleware($request, $this->expectCallableOnceWith($request));
297297
}
298298

299+
public function testReceivesNextRequestWhichThrowsAfterPreviousHandlerIsSettled()
300+
{
301+
$request = new ServerRequest(
302+
'POST',
303+
'http://example.com/',
304+
array(),
305+
'hello'
306+
);
307+
308+
$deferred = new Deferred();
309+
$middleware = new LimitConcurrentRequestsMiddleware(1);
310+
$middleware($request, function () use ($deferred) {
311+
return $deferred->promise();
312+
});
313+
314+
$second = $middleware($request, function () {
315+
throw new \RuntimeException();
316+
});
317+
318+
$this->assertTrue($second instanceof PromiseInterface);
319+
$second->then(null, $this->expectCallableOnce());
320+
321+
$deferred->reject(new \RuntimeException());
322+
}
323+
299324
public function testPendingRequestCanBeCancelledAndForwardsCancellationToInnerPromise()
300325
{
301326
$request = new ServerRequest(
@@ -365,6 +390,50 @@ public function testReceivesNextRequestAfterPreviousHandlerIsCancelled()
365390
$middleware($request, $this->expectCallableOnceWith($request));
366391
}
367392

393+
public function testRejectsWhenQueuedPromiseIsCancelled()
394+
{
395+
$request = new ServerRequest(
396+
'POST',
397+
'http://example.com/',
398+
array(),
399+
'hello'
400+
);
401+
402+
$deferred = new Deferred();
403+
$middleware = new LimitConcurrentRequestsMiddleware(1);
404+
$first = $middleware($request, function () use ($deferred) {
405+
return $deferred->promise();
406+
});
407+
408+
$second = $middleware($request, $this->expectCallableNever());
409+
410+
$this->assertTrue($second instanceof PromiseInterface);
411+
$second->cancel();
412+
$second->then(null, $this->expectCallableOnce());
413+
}
414+
415+
public function testDoesNotInvokeNextHandlersWhenQueuedPromiseIsCancelled()
416+
{
417+
$request = new ServerRequest(
418+
'POST',
419+
'http://example.com/',
420+
array(),
421+
'hello'
422+
);
423+
424+
$deferred = new Deferred();
425+
$middleware = new LimitConcurrentRequestsMiddleware(1);
426+
$first = $middleware($request, function () use ($deferred) {
427+
return $deferred->promise();
428+
});
429+
430+
$second = $middleware($request, $this->expectCallableNever());
431+
/* $third = */ $middleware($request, $this->expectCallableNever());
432+
433+
$this->assertTrue($second instanceof PromiseInterface);
434+
$second->cancel();
435+
}
436+
368437
public function testReceivesStreamingBodyChangesInstanceWithCustomBodyButSameDataWhenDequeued()
369438
{
370439
$stream = new ThroughStream();

0 commit comments

Comments
 (0)