Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions th_cli/commands/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@
is_flag=True,
help=colorize_help("Disable colored output for test execution status."),
)
@click.option(
"--no-streaming",
is_flag=True,
help=colorize_help("Disable real-time log streaming via web browser (enabled by default)."),
)
@async_cmd
@click.pass_context
async def run_tests(
Expand All @@ -107,6 +112,7 @@ async def run_tests(
pics_config_folder: str | None = None,
project_id: int | None = None,
no_color: bool = False,
no_streaming: bool = False,
) -> None:
"""Execute a CLI test run from selected test cases.

Expand Down Expand Up @@ -147,8 +153,9 @@ async def run_tests(
async_apis = AsyncApis(client)
test_collections_api = async_apis.test_collections_api

# Configure new log output for test.
log_path = test_logging.configure_logger_for_run(title=title)
# Configure new log output for test with real-time streaming (enabled by default)
enable_streaming = not no_streaming
log_path = test_logging.configure_logger_for_run(title=title, enable_log_streaming=enable_streaming)

# Retrieve CLI project
cli_project = await _get_cli_project(async_apis, project_id)
Expand Down Expand Up @@ -195,6 +202,20 @@ async def run_tests(

click.echo(colorize_key_value("Selected tests", json.dumps(selected_tests_dict, indent=JSON_INDENT)))

# Display log streaming URL if available
log_stream_url = test_logging.get_log_stream_url()
if log_stream_url:
border = click.style("═" * 60, fg="cyan", bold=True)
click.echo("")
click.echo(border)
click.echo(click.style(" πŸ“‹ Real-Time Log Viewer Available", fg="cyan", bold=True))
click.echo(border)
click.echo(click.style(" View logs in real-time at:", fg="bright_white", bold=True))
click.echo(" " + click.style(f"{log_stream_url}", fg="cyan", bold=True, underline=True))
click.echo(click.style(" Logs will stream automatically as tests execute", fg="bright_white"))
click.echo(border)
click.echo("")

new_test_run = await _create_new_test_run_cli(
async_apis,
selected_tests=selected_tests_dict,
Expand All @@ -220,6 +241,9 @@ async def run_tests(
except Exception as e:
raise CLIError(f"Unexpected error during test execution: {e}")
finally:
# Stop log streaming
test_logging.stop_log_streaming()

if client:
await client.aclose()
if _webrtc_handler:
Expand Down
160 changes: 160 additions & 0 deletions th_cli/test_run/log_stream_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
#
# Copyright (c) 2026 Project CHIP Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import datetime
import queue
import socket
from typing import Optional

from loguru import logger

from .logs_http_server import LogsHTTPServer


class LogStreamHandler:
"""Main coordinator for real-time log streaming functionality."""

def __init__(self, port: int = 8998):
"""Initialize the log stream handler.

Args:
port: Port number for the HTTP server (default: 8998)
"""
self.port = port
self.http_server = LogsHTTPServer(port=port)
self.log_queue: queue.Queue = queue.Queue(maxsize=1000)
self.is_running = False
self.log_file_path: Optional[str] = None

def start(self, test_run_title: str = "Test Execution", log_file_path: Optional[str] = None) -> str:
"""Start the log streaming HTTP server.

Args:
test_run_title: Title of the test run for display
log_file_path: Path to the log file for download functionality

Returns:
URL where logs can be viewed
"""
if self.is_running:
logger.warning("Log stream handler already running")
return self._get_log_viewer_url()

try:
# Store log file path for download functionality
self.log_file_path = log_file_path

# Get local IP address
local_ip = self._get_local_ip()

# Start HTTP server
self.http_server.start(
log_queue=self.log_queue,
test_run_title=test_run_title,
local_ip=local_ip,
log_file_path=log_file_path,
)

self.is_running = True

viewer_url = f"http://{local_ip}:{self.port}"
logger.info(f"Log stream viewer started: {viewer_url}")

return viewer_url

except Exception as e:
logger.error(f"Failed to start log stream handler: {e}")
raise

def stop(self):
"""Stop the log streaming HTTP server."""
if not self.is_running:
return

try:
# Signal end of stream
if not self.log_queue.full():
try:
self.log_queue.put_nowait(None)
except queue.Full:
pass

# Stop HTTP server
self.http_server.stop()

self.is_running = False
logger.info("Log stream handler stopped")

except Exception as e:
logger.error(f"Error stopping log stream handler: {e}")

def add_log_entry(
self,
message: str,
level: str = "INFO",
timestamp: Optional[str] = None
):
"""Add a log entry to the stream.

Args:
message: Log message text
level: Log level (INFO, WARNING, ERROR, DEBUG, etc.)
timestamp: ISO format timestamp (auto-generated if not provided)
"""
if not self.is_running:
return

if timestamp is None:
timestamp = datetime.datetime.now().isoformat()

log_entry = {
"message": message,
"level": level.upper(),
"timestamp": timestamp,
}

try:
# Try to add to queue without blocking
self.log_queue.put_nowait(log_entry)
except queue.Full:
# Queue is full, skip this entry silently to avoid blocking
# This is acceptable for real-time streaming when no browser is connected
pass

def _get_local_ip(self) -> str:
"""Get the local IP address of the machine.

Returns:
Local IP address as string, or 'localhost' if unable to determine
"""
try:
# Create a socket connection to determine local IP
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Connect to an external host (doesn't actually send data)
s.connect(("8.8.8.8", 80))
local_ip = s.getsockname()[0]
s.close()
return local_ip
except Exception:
return "localhost"

def _get_log_viewer_url(self) -> str:
"""Get the URL for the log viewer.

Returns:
Log viewer URL
"""
local_ip = self._get_local_ip()
return f"http://{local_ip}:{self.port}"
Loading