From 52ef3592df24a0b397e6702bf858fd56d8a67af0 Mon Sep 17 00:00:00 2001 From: bobby Date: Thu, 9 Apr 2026 14:04:15 +0800 Subject: [PATCH 1/4] feat: Add multi-worker support and auto CPU detection for SimuNet - Implement multi-process architecture with automatic worker allocation - Auto-detect CPU cores and set workers to (cores - 2), minimum 1 - Support NUM_WORKERS environment variable and --workers CLI argument - Smart worker count adjustment: limit to device count if needed - Separate log files for each worker to avoid conflicts - Replace all print() statements with log.info() for better logging - Convert all code comments to English for consistency - Update documentation with multi-worker usage examples - Enhance integration test fixtures to support multi-worker mode Performance improvements: - Distribute SSH device servers across multiple worker processes - Reduce resource contention with per-worker device allocation - Maintain single-worker mode with auto-reload for development Documentation updates: - Add multi-process mode usage in quick-start guide - Update CLAUDE.md with worker configuration details - Add feature highlight in README.md Made-with: Cursor --- CLAUDE.md | 49 ++- README.md | 1 + docs/quick-start-simunet.md | 106 ++++++- .../simunet/src/netdriver_simunet/main.py | 284 +++++++++++++++++- .../src/netdriver_simunet/server/device.py | 38 ++- .../server/handlers/command_handler.py | 10 + tests/integration/conftest.py | 24 +- 7 files changed, 482 insertions(+), 30 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index a4990ae..a0fee7b 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -73,7 +73,19 @@ uv sync uv run agent # Start simulation network service (SSH servers on configured ports) +# Default: auto-detects CPU cores and uses (cores - 2) workers, minimum 1 uv run simunet + +# Specify custom number of workers +NUM_WORKERS=4 uv run simunet +# or +uv run simunet --workers 4 + +# Force single worker mode (enables auto-reload for development) +NUM_WORKERS=1 uv run simunet + +# Auto-cleanup occupied ports before starting +uv run simunet --force ``` ### Testing @@ -97,9 +109,42 @@ uv run pytest tests/bases/netdriver/agent/test_cisco_nexus.py Configuration files in `config/`: - `config/agent/agent.yml` - Agent service settings (logging, session timeouts, SSH parameters, profiles) - - Logs are written to `logs/netdriver_agent.log` + - Logs are written to `logs/netdriver_agent.log` - `config/simunet/simunet.yml` - Simulated device definitions and logging settings - - Logs are written to `logs/netdriver_simunet.log` + - Logs are written to `logs/netdriver_simunet.log` + - In multi-worker mode: `logs/netdriver_simunet_worker_0.log`, `logs/netdriver_simunet_worker_1.log`, etc. + +### Multi-Process Mode + +Simunet **automatically** uses multi-process mode for improved performance: + +**Default Behavior:** +- Automatically detects CPU cores: `workers = max(1, cpu_cores - 2)` +- Example: 8-core system → 6 workers, 4-core → 2 workers, 2-core → 1 worker +- Reserves 2 cores for system and other processes +- **Smart adjustment**: If worker count exceeds device count, automatically adjusts to match device count + +**Override Default:** +```bash +# Specify custom number of workers +NUM_WORKERS=4 uv run simunet +# or +uv run simunet --workers 4 + +# Force single worker (enables auto-reload) +NUM_WORKERS=1 uv run simunet +``` + +**Performance Guidelines:** +- Small scale (< 10 devices): 1-2 workers +- Medium scale (10-30 devices): 2-4 workers +- Large scale (> 30 devices): 4-8 workers + +**Notes:** +- Multi-worker mode (workers > 1) does not support auto-reload +- Single worker mode supports auto-reload for development +- Each worker handles a portion of devices automatically +- Separate log files are created for each worker: `logs/simunet_worker_N.log` ## Development Guidelines diff --git a/README.md b/README.md index 27beb24..9c9f633 100755 --- a/README.md +++ b/README.md @@ -51,6 +51,7 @@ Features: - 📋 **Command Queue** : Ensures sequential command execution on devices, preventing configuration errors and failures caused by concurrent modifications - ⚡ **AsyncSSH Foundation** : Superior concurrency capabilities through asynchronous SSH implementation - 🔌 **Plugin Architecture** : Simplified and accelerated development of new vendor support +- 🚀 **Multi-Process Support** : Automatic worker detection based on CPU cores for optimal performance (SimuNet) ## Comparison diff --git a/docs/quick-start-simunet.md b/docs/quick-start-simunet.md index 4bba80b..25f40de 100644 --- a/docs/quick-start-simunet.md +++ b/docs/quick-start-simunet.md @@ -118,10 +118,27 @@ python -c "import netdriver.simunet; print('NetDriver Agent installed successful #### Run +**Basic Usage**: + +```bash +# Start SimuNet service +simunet + +# Start without auto-reload (production) +simunet --no-reload + +# Force cleanup occupied ports before starting +simunet --force +``` + +**Custom Configuration**: + ```bash -uvicorn netdriver.agent.main:app --host 0.0.0.0 --port 8001 +simunet --config /path/to/simunet.yml --port 8001 ``` +**Note**: When ports are already occupied, use the `--force` flag to automatically clean up and restart, avoiding manual intervention. + ### Option 2: Install via Docker #### Prerequisites @@ -181,6 +198,93 @@ ssh admin@localhost -p 18020 After successful connection, you can execute commands as you would on a real device: +## Advanced Configuration + +### Multi-Process Mode + +SimuNet automatically uses multi-process mode based on your CPU cores to improve performance. Each worker process handles a portion of the devices. + +#### Default Behavior + +**Automatic Worker Detection:** +- SimuNet automatically detects CPU cores and uses `CPU cores - 2` workers (minimum 1) +- Example: 8-core CPU → 6 workers, 4-core CPU → 2 workers, 2-core CPU → 1 worker +- This leaves resources for the system and other processes + +**Override Default:** + +```bash +# Use specific number of workers via environment variable +NUM_WORKERS=4 simunet + +# Use specific number of workers via command line +simunet --workers 4 + +# Force single worker mode +NUM_WORKERS=1 simunet +# or +simunet --workers 1 +``` + +#### Performance Recommendations + +- **Small Scale (< 10 devices)**: 1-2 workers +- **Medium Scale (10-30 devices)**: 2-4 workers +- **Large Scale (> 30 devices)**: 4-8 workers + +Each worker should ideally manage 5-10 devices for optimal performance. + +**Note**: +- Multi-worker mode (workers > 1) does not support auto-reload +- Single worker mode supports auto-reload for development + +#### Docker Deployment with Multi-Process + +```bash +# Default (auto-detect workers) +docker run -d \ + --name netdriver-simunet \ + --network host \ + -v $(pwd)/config:/app/config \ + -v $(pwd)/logs:/app/logs \ + ghcr.io/opensecflow/netdriver/netdriver-simunet:latest + +# Specify worker count +docker run -d \ + --name netdriver-simunet \ + --network host \ + -e NUM_WORKERS=4 \ + -v $(pwd)/config:/app/config \ + -v $(pwd)/logs:/app/logs \ + ghcr.io/opensecflow/netdriver/netdriver-simunet:latest +``` + +#### How Device Distribution Works + +When using multi-process mode, devices are automatically distributed among workers: + +- Worker 0 handles devices 0 to N/W-1 +- Worker 1 handles devices N/W to 2N/W-1 +- ... +- Last worker handles remaining devices + +**Example 1: 21 devices, 3 workers** +- Worker 0: devices 0-6 (7 devices, ports 18020-18025) +- Worker 1: devices 7-13 (7 devices, ports 18026-18031) +- Worker 2: devices 14-20 (7 devices, ports 18032-18040) + +**Example 2: 4 devices, 8 workers (auto-adjusted)** +- System: 10 CPU cores → 8 workers by default +- SimuNet automatically adjusts to 4 workers (matches device count) +- Worker 0: device 0 (1 device) +- Worker 1: device 1 (1 device) +- Worker 2: device 2 (1 device) +- Worker 3: device 3 (1 device) + +**Important:** If worker count exceeds device count, it's automatically adjusted to match device count to avoid empty workers. + +Each worker creates a separate log file: `logs/simunet_worker_0.log`, `logs/simunet_worker_1.log`, etc. + ## Next Steps Now that you have SimuNet running, you can set up [NetDriver Agent](./quick-start-agent.md) to connect to SimuNet diff --git a/packages/simunet/src/netdriver_simunet/main.py b/packages/simunet/src/netdriver_simunet/main.py index 2c916c1..2ab3210 100755 --- a/packages/simunet/src/netdriver_simunet/main.py +++ b/packages/simunet/src/netdriver_simunet/main.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3.10.6 # -*- coding: utf-8 -*- import os +import multiprocessing import argparse import asyncio from typing import AsyncGenerator @@ -35,10 +36,71 @@ async def start_servers(config: dict) -> AsyncGenerator[MockSSHDevice, None]: async def on_startup() -> None: - """ put all post up logic here """ + """ + Start the application, allocate device port ranges based on WORKER_ID and NUM_WORKERS environment variables + Support multi-worker mode to avoid port conflicts + """ log.info("Starting up the application...") + + # Get worker configuration from environment variables + worker_id = int(os.getenv("WORKER_ID", "0")) + num_workers = int(os.getenv("NUM_WORKERS", "1")) + + # Get all device configurations + all_devices = container.config()["devices"] + total_devices = len(all_devices) + + # Single worker mode: start all devices + if num_workers == 1: + log.info(f"Single worker mode: starting all {total_devices} devices") + app.state.servers = [] + async for server in start_servers(container.config()): + app.state.servers.append(server) + asyncio.create_task(server.start()) + return + + # Multi-worker mode: allocate devices based on worker_id + devices_per_worker = total_devices // num_workers + + # Calculate device range for current worker + start_idx = worker_id * devices_per_worker + end_idx = start_idx + devices_per_worker + + # Last worker handles all remaining devices + if worker_id == num_workers - 1: + end_idx = total_devices + + # Get device list for current worker + my_devices = all_devices[start_idx:end_idx] + + # Log port range + if my_devices: + port_range = f"{my_devices[0]['port']}-{my_devices[-1]['port']}" + log.info(f"Worker {worker_id}/{num_workers} (PID: {os.getpid()}) " + f"handling {len(my_devices)} devices, ports: {port_range}") + else: + log.warning(f"Worker {worker_id}/{num_workers} has no devices to handle") + return + + # Start devices assigned to current worker app.state.servers = [] - async for server in start_servers(container.config()): + for dev in my_devices: + host = dev.get("host", None) + port = dev["port"] + vendor = dev["vendor"] + model = dev["model"] + version = dev["version"] + + log.info(f"Worker {worker_id}: Starting SSH server {vendor}-{model}-{version} on " + f"{host if host else '0.0.0.0'}:{port}") + + server = MockSSHDevice.create_device( + vendor=vendor, + model=model, + version=version, + host=host, + port=port + ) app.state.servers.append(server) asyncio.create_task(server.start()) @@ -46,8 +108,13 @@ async def on_startup() -> None: async def on_shutdown() -> None: """ put all clean logic here """ log.info("Shutting down the application...") - for server in app.state.servers: - server.stop() + if hasattr(app.state, 'servers'): + for server in app.state.servers: + try: + if hasattr(server, '_server'): + server.stop() + except Exception as e: + log.error(f"Error stopping server: {e}") # Register event handlers on simunet_app instance @@ -55,6 +122,107 @@ async def on_shutdown() -> None: app.add_event_handler("shutdown", on_shutdown) +async def _start_devices_for_worker(worker_id: int, num_workers: int): + """ + Start allocated devices in worker process + + :param worker_id: Worker ID (0-based) + :param num_workers: Total number of workers + """ + # Get all device configurations + all_devices = container.config()["devices"] + total_devices = len(all_devices) + + # Calculate device allocation + devices_per_worker = total_devices // num_workers + start_idx = worker_id * devices_per_worker + end_idx = start_idx + devices_per_worker + + # Last worker handles all remaining devices + if worker_id == num_workers - 1: + end_idx = total_devices + + # Get device list for current worker + my_devices = all_devices[start_idx:end_idx] + + if not my_devices: + log.warning(f"Worker {worker_id}/{num_workers} has no devices to handle") + return + + port_range = f"{my_devices[0]['port']}-{my_devices[-1]['port']}" + log.info(f"Worker {worker_id}/{num_workers} (PID: {os.getpid()}) " + f"handling {len(my_devices)} devices, ports: {port_range}") + + # Start devices assigned to current worker + servers = [] + for dev in my_devices: + host = dev.get("host", None) + port = dev["port"] + vendor = dev["vendor"] + model = dev["model"] + version = dev["version"] + + log.info(f"Worker {worker_id}: Starting SSH server {vendor}-{model}-{version} on " + f"{host if host else '0.0.0.0'}:{port}") + + server = MockSSHDevice.create_device( + vendor=vendor, + model=model, + version=version, + host=host, + port=port + ) + servers.append(server) + await server.start() + + # Keep process running + log.info(f"Worker {worker_id} started {len(servers)} devices successfully") + try: + while True: + await asyncio.sleep(3600) + except asyncio.CancelledError: + log.info(f"Worker {worker_id} shutting down...") + for server in servers: + server.stop() + + +def _start_worker(worker_id: int, num_workers: int, config_file: str = None): + """ + Start single worker process (process entry point) + + :param worker_id: Worker ID (0-based) + :param num_workers: Total number of workers + :param config_file: Configuration file path + """ + # Set environment variables for current process + os.environ["WORKER_ID"] = str(worker_id) + os.environ["NUM_WORKERS"] = str(num_workers) + + # Reload configuration if config file is specified + if config_file: + os.environ["NETDRIVER_SIMUNET_CONFIG"] = config_file + container.config.from_yaml(config_file, required=True) + + # Reconfigure logging for subprocess (avoid log conflicts) + log_file = container.config.logging.log_file() + # Use separate log file for each worker + if log_file and num_workers > 1: + log_file_base = log_file.rsplit('.', 1) + if len(log_file_base) == 2: + log_file = f"{log_file_base[0]}_worker_{worker_id}.{log_file_base[1]}" + else: + log_file = f"{log_file}_worker_{worker_id}" + + logman.configure_logman( + level=container.config.logging.level(), + intercept_loggers=container.config.logging.intercept_loggers(), + log_file=log_file + ) + + # Run async device startup logic + asyncio.run(_start_devices_for_worker(worker_id, num_workers)) + + @app.get("/") async def root() -> dict: """ root endpoint """ @@ -73,7 +241,25 @@ async def health() -> dict: def start(): - """Start the simunet server with optional configuration file parameter.""" + """ + Start simunet service + + Usage: + + 1. Default mode (auto-detect workers based on CPU cores): + simunet # Auto uses (CPU cores - 2) workers (minimum 1) + + 2. Specify worker count (using environment variable): + NUM_WORKERS=4 simunet + + 3. Specify worker count (using command-line argument): + simunet --workers 4 + + Note: + - Default auto-detect: max(1, cpu_count - 2) + - Multi-worker mode does not support reload + - Single-worker mode supports reload + """ parser = argparse.ArgumentParser(description="NetDriver SimuNet Server") parser.add_argument( "-c", "--config", @@ -104,6 +290,12 @@ def start(): action="store_true", help="Disable auto-reload" ) + parser.add_argument( + "-w", "--workers", + type=int, + default=None, + help="Number of worker processes (default: auto-detect based on CPU cores, or NUM_WORKERS env var)" + ) args = parser.parse_args() @@ -119,12 +311,76 @@ def start(): log_file=container.config.logging.log_file() ) - # Handle reload flag - reload = args.reload and not args.no_reload - - uvicorn.run( - "netdriver_simunet.main:app", - host=args.host, - port=args.port, - reload=reload - ) + # Determine worker count: command-line arg > env var > auto-detect (CPU cores - 2, min 1) + num_workers = args.workers + if num_workers is None: + num_workers_env = os.getenv("NUM_WORKERS") + if num_workers_env: + num_workers = int(num_workers_env) + else: + # Auto-detect: CPU cores - 2, minimum 1 + cpu_count = os.cpu_count() or 1 + num_workers = max(1, cpu_count - 2) + log.info(f"Auto-detected {cpu_count} CPU cores, using {num_workers} workers") + + # Get total devices, limit worker count to not exceed device count + all_devices = container.config()["devices"] + total_devices = len(all_devices) + if num_workers > total_devices: + log.warning(f"Worker count ({num_workers}) exceeds device count ({total_devices}), " + f"adjusting to {total_devices} workers") + num_workers = total_devices + + if num_workers > 1: + # Multi-worker mode: auto-start multiple processes + log.info("=" * 60) + log.info(f"Starting Simunet with {num_workers} workers (multi-process mode)") + log.info(f"Note: Auto-reload is disabled in multi-worker mode") + log.info("=" * 60) + + # Create and start multiple processes + processes = [] + for worker_id in range(num_workers): + p = multiprocessing.Process( + target=_start_worker, + args=(worker_id, num_workers, args.config), + name=f"simunet-worker-{worker_id}" + ) + p.start() + log.info(f"✓ Worker {worker_id}/{num_workers} started (PID: {p.pid})") + processes.append(p) + + log.info("=" * 60) + log.info("All workers started successfully!") + log.info(f"Press Ctrl+C to stop all workers") + log.info("=" * 60) + + # Wait for all processes + try: + for p in processes: + p.join() + except KeyboardInterrupt: + log.info("\n" + "=" * 60) + log.info("Stopping all workers...") + log.info("=" * 60) + for p in processes: + p.terminate() + for p in processes: + p.join() + log.info("✓ All workers stopped") + else: + # Single-worker mode: start all devices + log.info("Starting in single-worker mode (all devices)") + + # Handle reload flag + reload = args.reload and not args.no_reload + + if reload: + log.info("Auto-reload is enabled for development") + + uvicorn.run( + "netdriver_simunet.main:app", + host=args.host, + port=args.port, + reload=reload + ) diff --git a/packages/simunet/src/netdriver_simunet/server/device.py b/packages/simunet/src/netdriver_simunet/server/device.py index 767793b..0ed8938 100755 --- a/packages/simunet/src/netdriver_simunet/server/device.py +++ b/packages/simunet/src/netdriver_simunet/server/device.py @@ -24,6 +24,11 @@ class MockSSHDevice(asyncssh.SSHServer): _server: asyncssh.SSHAcceptor _logger = log _handlers = List[CommandHandler] + _conn: Optional[asyncssh.SSHServerConnection] = None + _client_ip: Optional[str] = None + _client_port: Optional[int] = None + _server_ip: Optional[str] = None + _server_port: Optional[int] = None vendor: str model: str @@ -70,24 +75,43 @@ def __init__(self, host: str = None, port: int = 8022, def connection_made(self, conn: asyncssh.SSHServerConnection): """ Hook after connection established """ + self._conn = conn peer_name = conn.get_extra_info('peername') - client_ip, client_port = peer_name[0], peer_name[1] - self._logger.info(f"SSH connection received from {client_ip}:{client_port}") + sock_name = conn.get_extra_info('sockname') + self._client_ip, self._client_port = peer_name[0], peer_name[1] + self._server_ip, self._server_port = sock_name[0], sock_name[1] + self._logger.info(f"SSH connection received from {self._client_ip}:{self._client_port} on port {self._server_port}") + def connection_lost(self, exc: Optional[Exception]): """ Hook after connection lost """ + port_info = f" on port {self._server_port}" if self._server_port else "" + if exc: - self._logger.error(f"SSH connection error: {exc}") + self._logger.error(f"SSH connection error{port_info}: {exc}, type: {type(exc).__name__}") else: - self._logger.info('SSH connection closed') + self._logger.info(f"SSH connection closed{port_info}") + def password_auth_supported(self) -> bool: """ Configure to use password authentication """ + self._logger.info(f"password_auth_supported() called on port {self._server_port}") return True def begin_auth(self, username: str) -> bool: """ Begin user authentication """ + self._logger.info(f"begin_auth() called for user: {username} on port {self._server_port}") return True + + def public_key_auth_supported(self) -> bool: + """ Indicate if public key authentication is supported """ + self._logger.info(f"public_key_auth_supported() called on port {self._server_port}") + return False # We only use password auth + + def kbdint_auth_supported(self) -> bool: + """ Indicate if keyboard-interactive authentication is supported """ + self._logger.info(f"kbdint_auth_supported() called on port {self._server_port}") + return False # We only use password auth async def validate_password(self, username: str, password: str) -> bool: """ Validate user password """ @@ -97,8 +121,10 @@ async def validate_password(self, username: str, password: str) -> bool: async def handle_process(self, process: asyncssh.SSHServerProcess): """ Handle process created by SSH client """ width, height, pixwidth, pixheight = process.term_size - self._logger.info(f"Process started with size [{width}x{height}] pixels \ - [{pixwidth}x{pixheight}]") + if self._client_ip and self._client_port and self._server_ip and self._server_port: + self._logger.info(f"Process started with size [{width}x{height}] pixels [{pixwidth}x{pixheight}] from {self._client_ip}:{self._client_port} -> {self._server_ip}:{self._server_port}") + else: + self._logger.info(f"Process started with size [{width}x{height}] pixels [{pixwidth}x{pixheight}]") try: _handler = CommandHandlerFactory.create_handler(process, self.vendor, self.model, diff --git a/packages/simunet/src/netdriver_simunet/server/handlers/command_handler.py b/packages/simunet/src/netdriver_simunet/server/handlers/command_handler.py index 77bdeda..b07f943 100755 --- a/packages/simunet/src/netdriver_simunet/server/handlers/command_handler.py +++ b/packages/simunet/src/netdriver_simunet/server/handlers/command_handler.py @@ -19,6 +19,7 @@ class CommandHandler(abc.ABC): _mode = Mode # current mode _mode_cmd_map: Dict[str, str] # mode command dictionary _common_cmd_map: Dict[str, str] # common command dictionary + _config_cache: Dict[str, DeviceConfig] = {} # config cache (class variable, shared by all instances) info: DeviceBaseInfo conf_path: str config: DeviceConfig @@ -44,9 +45,18 @@ def __init__(self, process: SSHServerProcess): def _load_config(self): """ Load config from file """ + # check cache + if self.conf_path in self._config_cache: + self.config = self._config_cache[self.conf_path] + self._logger.info(f"Config loaded from cache: {self.conf_path}") + return + try: conf_dict = yaml.safe_load(Path(self.conf_path).read_text(encoding='utf-8')) self.config = DeviceConfig(**conf_dict) + # cache config + self._config_cache[self.conf_path] = self.config + self._logger.info(f"Config loaded and cached: {self.conf_path}") except Exception as e: self._logger.error(f"Config load failed: {e}") diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 929b402..5a6ff98 100755 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -56,18 +56,28 @@ def simunet_process(request: pytest.FixtureRequest): yield None return - # Start simunet process using uvicorn directly - logman.logger.info("Starting simunet process for integration tests...") + # Start simunet process with --no-reload to avoid conflicts + # Support NUM_WORKERS environment variable for multi-process testing + import os + num_workers = os.getenv("NUM_WORKERS", "1") + logman.logger.info(f"Starting simunet process for integration tests (NUM_WORKERS={num_workers})...") + + cmd = ["uv", "run", "simunet", "--no-reload"] + if int(num_workers) > 1: + cmd.extend(["--workers", num_workers]) + process = subprocess.Popen( - ["uv", "run", "simunet"], + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, - text=True + text=True, + env={**os.environ, "NUM_WORKERS": num_workers} ) - # Wait for simunet to start up (give it some time to initialize all SSH servers) - logman.logger.info("Waiting for simunet to start up...") - time.sleep(5) + # Wait for simunet to start up (give it more time for multi-worker mode) + wait_time = 10 if int(num_workers) > 1 else 5 + logman.logger.info(f"Waiting {wait_time}s for simunet to start up...") + time.sleep(wait_time) # Check if process is still running if process.poll() is not None: From 0264a2ac8829e95c530cb7a5cd66d128e79d9e98 Mon Sep 17 00:00:00 2001 From: bobby Date: Thu, 9 Apr 2026 14:33:13 +0800 Subject: [PATCH 2/4] docs: Remove --force flag from documentation - Remove --force parameter from quick-start guide - Update CLAUDE.md to remove port cleanup references - Add multi-worker usage examples - Update command examples with worker configuration Made-with: Cursor --- CLAUDE.md | 3 --- docs/quick-start-simunet.md | 12 +++++++----- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index a0fee7b..1c594b9 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -83,9 +83,6 @@ uv run simunet --workers 4 # Force single worker mode (enables auto-reload for development) NUM_WORKERS=1 uv run simunet - -# Auto-cleanup occupied ports before starting -uv run simunet --force ``` ### Testing diff --git a/docs/quick-start-simunet.md b/docs/quick-start-simunet.md index 25f40de..5d64fd0 100644 --- a/docs/quick-start-simunet.md +++ b/docs/quick-start-simunet.md @@ -121,23 +121,25 @@ python -c "import netdriver.simunet; print('NetDriver Agent installed successful **Basic Usage**: ```bash -# Start SimuNet service +# Start SimuNet service (auto-detects CPU cores and sets workers) simunet # Start without auto-reload (production) simunet --no-reload -# Force cleanup occupied ports before starting -simunet --force +# Start with specific number of workers +simunet --workers 4 --no-reload ``` **Custom Configuration**: ```bash +# Use custom config file simunet --config /path/to/simunet.yml --port 8001 -``` -**Note**: When ports are already occupied, use the `--force` flag to automatically clean up and restart, avoiding manual intervention. +# Set workers via environment variable +NUM_WORKERS=4 simunet --no-reload +``` ### Option 2: Install via Docker From 9e8df0cff4cac5324f72d9aed891bde376944286 Mon Sep 17 00:00:00 2001 From: bobby Date: Thu, 9 Apr 2026 14:41:03 +0800 Subject: [PATCH 3/4] fix: Add console output for startup messages - Print startup messages to both console and log file - Show worker count auto-detection in terminal - Display worker startup progress in real-time - Print shutdown messages during graceful stop - Fix issue where service appeared to hang with no output This ensures users can see the startup progress in terminal instead of only writing to log files. Made-with: Cursor --- .../simunet/src/netdriver_simunet/main.py | 65 ++++++++++++++----- 1 file changed, 47 insertions(+), 18 deletions(-) diff --git a/packages/simunet/src/netdriver_simunet/main.py b/packages/simunet/src/netdriver_simunet/main.py index 2ab3210..6e4452c 100755 --- a/packages/simunet/src/netdriver_simunet/main.py +++ b/packages/simunet/src/netdriver_simunet/main.py @@ -321,22 +321,31 @@ def start(): # Auto-detect: CPU cores - 2, minimum 1 cpu_count = os.cpu_count() or 1 num_workers = max(1, cpu_count - 2) - log.info(f"Auto-detected {cpu_count} CPU cores, using {num_workers} workers") + auto_msg = f"Auto-detected {cpu_count} CPU cores, using {num_workers} workers" + print(auto_msg) + log.info(auto_msg) # Get total devices, limit worker count to not exceed device count all_devices = container.config()["devices"] total_devices = len(all_devices) if num_workers > total_devices: - log.warning(f"Worker count ({num_workers}) exceeds device count ({total_devices}), " - f"adjusting to {total_devices} workers") + adjust_msg = (f"Worker count ({num_workers}) exceeds device count ({total_devices}), " + f"adjusting to {total_devices} workers") + print(adjust_msg) + log.warning(adjust_msg) num_workers = total_devices if num_workers > 1: # Multi-worker mode: auto-start multiple processes - log.info("=" * 60) - log.info(f"Starting Simunet with {num_workers} workers (multi-process mode)") - log.info(f"Note: Auto-reload is disabled in multi-worker mode") - log.info("=" * 60) + startup_msg = [ + "=" * 60, + f"Starting Simunet with {num_workers} workers (multi-process mode)", + f"Note: Auto-reload is disabled in multi-worker mode", + "=" * 60 + ] + for msg in startup_msg: + print(msg) + log.info(msg) # Create and start multiple processes processes = [] @@ -347,36 +356,56 @@ def start(): name=f"simunet-worker-{worker_id}" ) p.start() - log.info(f"✓ Worker {worker_id}/{num_workers} started (PID: {p.pid})") + worker_msg = f"✓ Worker {worker_id}/{num_workers} started (PID: {p.pid})" + print(worker_msg) + log.info(worker_msg) processes.append(p) - log.info("=" * 60) - log.info("All workers started successfully!") - log.info(f"Press Ctrl+C to stop all workers") - log.info("=" * 60) + completion_msg = [ + "=" * 60, + "All workers started successfully!", + f"Press Ctrl+C to stop all workers", + "=" * 60 + ] + for msg in completion_msg: + print(msg) + log.info(msg) # Wait for all processes try: for p in processes: p.join() except KeyboardInterrupt: - log.info("\n" + "=" * 60) - log.info("Stopping all workers...") - log.info("=" * 60) + shutdown_msg = [ + "\n" + "=" * 60, + "Stopping all workers...", + "=" * 60 + ] + for msg in shutdown_msg: + print(msg) + log.info(msg) + for p in processes: p.terminate() for p in processes: p.join() - log.info("✓ All workers stopped") + + stop_msg = "✓ All workers stopped" + print(stop_msg) + log.info(stop_msg) else: # Single-worker mode: start all devices - log.info("Starting in single-worker mode (all devices)") + startup_msg = f"Starting in single-worker mode (all {total_devices} devices)" + print(startup_msg) + log.info(startup_msg) # Handle reload flag reload = args.reload and not args.no_reload if reload: - log.info("Auto-reload is enabled for development") + reload_msg = "Auto-reload is enabled for development" + print(reload_msg) + log.info(reload_msg) uvicorn.run( "netdriver_simunet.main:app", From c163d9b03eda176f2ad5683cb1e2cd3b5248711e Mon Sep 17 00:00:00 2001 From: bobby Date: Thu, 9 Apr 2026 14:57:35 +0800 Subject: [PATCH 4/4] fix: Ensure single worker mode in CI tests - Always pass --workers parameter to simunet command - Fix issue where NUM_WORKERS env var was ignored when = 1 - This caused CI tests to use auto-detected CPU cores instead of 1 - Remove redundant NUM_WORKERS from process environment This fixes the intermittent test failures in GitHub Actions where multi-worker mode was unexpectedly enabled, causing devices to be distributed across workers and making some devices unreachable. Fixes: #test_huawei_ce failures (R0010 errors) Made-with: Cursor --- tests/integration/conftest.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 5a6ff98..8077db4 100755 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -62,20 +62,17 @@ def simunet_process(request: pytest.FixtureRequest): num_workers = os.getenv("NUM_WORKERS", "1") logman.logger.info(f"Starting simunet process for integration tests (NUM_WORKERS={num_workers})...") - cmd = ["uv", "run", "simunet", "--no-reload"] - if int(num_workers) > 1: - cmd.extend(["--workers", num_workers]) + cmd = ["uv", "run", "simunet", "--no-reload", "--workers", num_workers] process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, - text=True, - env={**os.environ, "NUM_WORKERS": num_workers} + text=True ) # Wait for simunet to start up (give it more time for multi-worker mode) - wait_time = 10 if int(num_workers) > 1 else 5 + wait_time = 15 if int(num_workers) > 1 else 5 logman.logger.info(f"Waiting {wait_time}s for simunet to start up...") time.sleep(wait_time)