@@ -87,6 +87,12 @@ final class StreamingServer extends EventEmitter
8787 /** @var Clock */
8888 private $ clock ;
8989
90+ /** @var LoopInterface */
91+ private $ loop ;
92+
93+ /** @var int */
94+ private $ idleConnectionTimeout ;
95+
9096 /**
9197 * Creates an HTTP server that invokes the given callback for each incoming HTTP request
9298 *
@@ -95,19 +101,21 @@ final class StreamingServer extends EventEmitter
95101 * connections in order to then parse incoming data as HTTP.
96102 * See also [listen()](#listen) for more details.
97103 *
98- * @param LoopInterface $loop
99104 * @param callable $requestHandler
105+ * @param int $idleConnectionTimeout
100106 * @see self::listen()
101107 */
102- public function __construct (LoopInterface $ loop , $ requestHandler )
108+ public function __construct (LoopInterface $ loop , $ requestHandler, $ idleConnectionTimeout )
103109 {
104110 if (!\is_callable ($ requestHandler )) {
105111 throw new \InvalidArgumentException ('Invalid request handler given ' );
106112 }
107113
114+ $ this ->loop = $ loop ;
108115 $ this ->callback = $ requestHandler ;
109116 $ this ->clock = new Clock ($ loop );
110117 $ this ->parser = new RequestHeaderParser ($ this ->clock );
118+ $ this ->idleConnectionTimeout = $ idleConnectionTimeout ;
111119
112120 $ that = $ this ;
113121 $ this ->parser ->on ('headers ' , function (ServerRequestInterface $ request , ConnectionInterface $ conn ) use ($ that ) {
@@ -134,7 +142,7 @@ public function __construct(LoopInterface $loop, $requestHandler)
134142 */
135143 public function listen (ServerInterface $ socket )
136144 {
137- $ socket ->on ('connection ' , array ($ this -> parser , 'handle ' ));
145+ $ socket ->on ('connection ' , array ($ this , 'parseRequest ' ));
138146 }
139147
140148 /** @internal */
@@ -359,7 +367,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
359367
360368 // either wait for next request over persistent connection or end connection
361369 if ($ persist ) {
362- $ this ->parser -> handle ($ connection );
370+ $ this ->parseRequest ($ connection );
363371 } else {
364372 $ connection ->end ();
365373 }
@@ -380,13 +388,34 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
380388 // write streaming body and then wait for next request over persistent connection
381389 if ($ persist ) {
382390 $ body ->pipe ($ connection , array ('end ' => false ));
383- $ parser = $ this -> parser ;
384- $ body ->on ('end ' , function () use ($ connection , $ parser , $ body ) {
391+ $ that = $ this ;
392+ $ body ->on ('end ' , function () use ($ connection , $ body , & $ that ) {
385393 $ connection ->removeListener ('close ' , array ($ body , 'close ' ));
386- $ parser -> handle ($ connection );
394+ $ that -> parseRequest ($ connection );
387395 });
388396 } else {
389397 $ body ->pipe ($ connection );
390398 }
391399 }
400+
401+ /**
402+ * @internal
403+ */
404+ public function parseRequest (ConnectionInterface $ connection )
405+ {
406+ $ idleConnectionTimeout = $ this ->idleConnectionTimeout ;
407+ $ loop = $ this ->loop ;
408+ $ idleConnectionTimeoutHandler = function () use ($ connection ) {
409+ $ connection ->close ();
410+ };
411+ $ timer = $ this ->loop ->addTimer ($ idleConnectionTimeout , $ idleConnectionTimeoutHandler );
412+ $ connection ->once ('close ' , function () use ($ loop , &$ timer ) {
413+ $ loop ->cancelTimer ($ timer );
414+ });
415+ $ connection ->once ('data ' , function () use ($ loop , &$ timer ) {
416+ $ loop ->cancelTimer ($ timer );
417+ });
418+
419+ $ this ->parser ->handle ($ connection );
420+ }
392421}
0 commit comments