diff --git a/th_cli/commands/run_tests.py b/th_cli/commands/run_tests.py index 1ba7405..6b7532a 100644 --- a/th_cli/commands/run_tests.py +++ b/th_cli/commands/run_tests.py @@ -97,6 +97,11 @@ is_flag=True, help=colorize_help("Disable colored output for test execution status."), ) +@click.option( + "--no-streaming", + is_flag=True, + help=colorize_help("Disable real-time log streaming via web browser (enabled by default)."), +) @async_cmd @click.pass_context async def run_tests( @@ -107,6 +112,7 @@ async def run_tests( pics_config_folder: str | None = None, project_id: int | None = None, no_color: bool = False, + no_streaming: bool = False, ) -> None: """Execute a CLI test run from selected test cases. @@ -147,8 +153,9 @@ async def run_tests( async_apis = AsyncApis(client) test_collections_api = async_apis.test_collections_api - # Configure new log output for test. - log_path = test_logging.configure_logger_for_run(title=title) + # Configure new log output for test with real-time streaming (enabled by default) + enable_streaming = not no_streaming + log_path = test_logging.configure_logger_for_run(title=title, enable_log_streaming=enable_streaming) # Retrieve CLI project cli_project = await _get_cli_project(async_apis, project_id) @@ -195,6 +202,20 @@ async def run_tests( click.echo(colorize_key_value("Selected tests", json.dumps(selected_tests_dict, indent=JSON_INDENT))) + # Display log streaming URL if available + log_stream_url = test_logging.get_log_stream_url() + if log_stream_url: + border = click.style("═" * 60, fg="cyan", bold=True) + click.echo("") + click.echo(border) + click.echo(click.style(" 📋 Real-Time Log Viewer Available", fg="cyan", bold=True)) + click.echo(border) + click.echo(click.style(" View logs in real-time at:", fg="bright_white", bold=True)) + click.echo(" " + click.style(f"{log_stream_url}", fg="cyan", bold=True, underline=True)) + click.echo(click.style(" Logs will stream automatically as tests execute", fg="bright_white")) + click.echo(border) + click.echo("") + new_test_run = await _create_new_test_run_cli( async_apis, selected_tests=selected_tests_dict, @@ -220,6 +241,9 @@ async def run_tests( except Exception as e: raise CLIError(f"Unexpected error during test execution: {e}") finally: + # Stop log streaming + test_logging.stop_log_streaming() + if client: await client.aclose() if _webrtc_handler: diff --git a/th_cli/test_run/log_stream_handler.py b/th_cli/test_run/log_stream_handler.py new file mode 100644 index 0000000..0c56aca --- /dev/null +++ b/th_cli/test_run/log_stream_handler.py @@ -0,0 +1,160 @@ +# +# Copyright (c) 2026 Project CHIP Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import datetime +import queue +import socket +from typing import Optional + +from loguru import logger + +from .logs_http_server import LogsHTTPServer + + +class LogStreamHandler: + """Main coordinator for real-time log streaming functionality.""" + + def __init__(self, port: int = 8998): + """Initialize the log stream handler. + + Args: + port: Port number for the HTTP server (default: 8998) + """ + self.port = port + self.http_server = LogsHTTPServer(port=port) + self.log_queue: queue.Queue = queue.Queue(maxsize=1000) + self.is_running = False + self.log_file_path: Optional[str] = None + + def start(self, test_run_title: str = "Test Execution", log_file_path: Optional[str] = None) -> str: + """Start the log streaming HTTP server. + + Args: + test_run_title: Title of the test run for display + log_file_path: Path to the log file for download functionality + + Returns: + URL where logs can be viewed + """ + if self.is_running: + logger.warning("Log stream handler already running") + return self._get_log_viewer_url() + + try: + # Store log file path for download functionality + self.log_file_path = log_file_path + + # Get local IP address + local_ip = self._get_local_ip() + + # Start HTTP server + self.http_server.start( + log_queue=self.log_queue, + test_run_title=test_run_title, + local_ip=local_ip, + log_file_path=log_file_path, + ) + + self.is_running = True + + viewer_url = f"http://{local_ip}:{self.port}" + logger.info(f"Log stream viewer started: {viewer_url}") + + return viewer_url + + except Exception as e: + logger.error(f"Failed to start log stream handler: {e}") + raise + + def stop(self): + """Stop the log streaming HTTP server.""" + if not self.is_running: + return + + try: + # Signal end of stream + if not self.log_queue.full(): + try: + self.log_queue.put_nowait(None) + except queue.Full: + pass + + # Stop HTTP server + self.http_server.stop() + + self.is_running = False + logger.info("Log stream handler stopped") + + except Exception as e: + logger.error(f"Error stopping log stream handler: {e}") + + def add_log_entry( + self, + message: str, + level: str = "INFO", + timestamp: Optional[str] = None + ): + """Add a log entry to the stream. + + Args: + message: Log message text + level: Log level (INFO, WARNING, ERROR, DEBUG, etc.) + timestamp: ISO format timestamp (auto-generated if not provided) + """ + if not self.is_running: + return + + if timestamp is None: + timestamp = datetime.datetime.now().isoformat() + + log_entry = { + "message": message, + "level": level.upper(), + "timestamp": timestamp, + } + + try: + # Try to add to queue without blocking + self.log_queue.put_nowait(log_entry) + except queue.Full: + # Queue is full, skip this entry silently to avoid blocking + # This is acceptable for real-time streaming when no browser is connected + pass + + def _get_local_ip(self) -> str: + """Get the local IP address of the machine. + + Returns: + Local IP address as string, or 'localhost' if unable to determine + """ + try: + # Create a socket connection to determine local IP + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + # Connect to an external host (doesn't actually send data) + s.connect(("8.8.8.8", 80)) + local_ip = s.getsockname()[0] + s.close() + return local_ip + except Exception: + return "localhost" + + def _get_log_viewer_url(self) -> str: + """Get the URL for the log viewer. + + Returns: + Log viewer URL + """ + local_ip = self._get_local_ip() + return f"http://{local_ip}:{self.port}" diff --git a/th_cli/test_run/log_viewer.html b/th_cli/test_run/log_viewer.html new file mode 100644 index 0000000..4e38ca6 --- /dev/null +++ b/th_cli/test_run/log_viewer.html @@ -0,0 +1,827 @@ + + +
+ +Template error: {html.escape(str(e))}
+Test Run: {html.escape(test_run_title)}
+ + + """ + + self.send_response(200) + self.send_header("Content-Type", "text/html; charset=utf-8") + # Prevent caching + self.send_header("Cache-Control", "no-store, no-cache, must-revalidate, max-age=0") + self.send_header("Pragma", "no-cache") + self.send_header("Expires", "0") + self.send_header("ETag", f'"{int(time.time())}"') + self.end_headers() + self.wfile.write(html_content.encode("utf-8")) + + def log_message(self, format, *args): + """Suppress HTTP access logs to avoid clutter.""" + pass + + +class LogsHTTPServer: + """Manages HTTP server for real-time log streaming.""" + + def __init__(self, port: int = 8998): + """Initialize the logs HTTP server. + + Args: + port: Port number for the HTTP server (default: 8998) + """ + self.port = port + self.server: Optional[ThreadingHTTPServer] = None + self.server_thread: Optional[threading.Thread] = None + + def start( + self, + log_queue: queue.Queue, + test_run_title: str = "Test Execution", + local_ip: Optional[str] = None, + log_file_path: Optional[str] = None, + ): + """Start HTTP server for log streaming. + + Args: + log_queue: Queue containing log entries to stream + test_run_title: Title of the test run for display + local_ip: Local IP address for display (defaults to localhost) + log_file_path: Path to log file for download functionality + """ + try: + # Use ThreadingHTTPServer for better concurrency + self.server = ThreadingHTTPServer(("0.0.0.0", self.port), LogStreamingHandler) + self.server.allow_reuse_address = True + + # Set required attributes on the server + self.server.log_queue = log_queue + self.server.test_run_title = test_run_title + self.server.local_ip = local_ip or "localhost" + self.server.log_file_path = log_file_path + + logger.info(f"Logs HTTP server configured for test run: {test_run_title}") + + def run_server(): + logger.info(f"Starting logs HTTP server on port {self.port}") + try: + self.server.serve_forever() + except Exception as e: + logger.error(f"Logs HTTP server error: {e}") + + self.server_thread = threading.Thread(target=run_server, daemon=True) + self.server_thread.start() + logger.info(f"Logs HTTP server thread started on port {self.port}") + + except Exception as e: + logger.error(f"Failed to start logs HTTP server: {e}") + raise + + def stop(self): + """Stop HTTP server.""" + if self.server: + try: + self.server.shutdown() + logger.info("Logs HTTP server stopped") + except Exception as e: + logger.debug(f"Error stopping logs HTTP server: {e}") + finally: + self.server = None + self.server_thread = None