From f19ea7b63f14988be92113f42a8a309d1f887dd8 Mon Sep 17 00:00:00 2001 From: cardosofede Date: Sat, 28 Mar 2026 19:09:12 -0300 Subject: [PATCH 1/4] (feat) add ws for executors --- deps.py | 15 +- main.py | 10 + routers/websocket.py | 150 +++++++++++ services/executor_ws_manager.py | 442 ++++++++++++++++++++++++++++++++ 4 files changed, 613 insertions(+), 4 deletions(-) create mode 100644 routers/websocket.py create mode 100644 services/executor_ws_manager.py diff --git a/deps.py b/deps.py index 99e0f776..440bf778 100644 --- a/deps.py +++ b/deps.py @@ -1,14 +1,16 @@ from fastapi import Request -from services.bots_orchestrator import BotsOrchestrator + +from database import AsyncDatabaseManager from services.accounts_service import AccountsService +from services.bots_orchestrator import BotsOrchestrator from services.docker_service import DockerService +from services.executor_service import ExecutorService +from services.executor_ws_manager import ExecutorWebSocketManager from services.gateway_service import GatewayService -from services.unified_connector_service import UnifiedConnectorService from services.market_data_service import MarketDataService from services.trading_service import TradingService -from services.executor_service import ExecutorService +from services.unified_connector_service import UnifiedConnectorService from utils.bot_archiver import BotArchiver -from database import AsyncDatabaseManager def get_bots_orchestrator(request: Request) -> BotsOrchestrator: @@ -59,3 +61,8 @@ def get_bot_archiver(request: Request) -> BotArchiver: def get_database_manager(request: Request) -> AsyncDatabaseManager: """Get AsyncDatabaseManager from app state.""" return request.app.state.db_manager + + +def get_executor_ws_manager(request: Request) -> ExecutorWebSocketManager: + """Get ExecutorWebSocketManager from app state.""" + return request.app.state.executor_ws_manager diff --git a/main.py b/main.py index 0f2d996b..91df4816 100644 --- a/main.py +++ b/main.py @@ -58,11 +58,13 @@ def patched_save_to_yml(yml_path, cm): rate_oracle, scripts, trading, + websocket, ) from services.accounts_service import AccountsService # noqa: E402 from services.bots_orchestrator import BotsOrchestrator # noqa: E402 from services.docker_service import DockerService # noqa: E402 from services.executor_service import ExecutorService # noqa: E402 +from services.executor_ws_manager import ExecutorWebSocketManager # noqa: E402 from services.gateway_service import GatewayService # noqa: E402 from services.market_data_service import MarketDataService # noqa: E402 from services.trading_service import TradingService # noqa: E402 @@ -271,6 +273,10 @@ async def lifespan(app: FastAPI): app.state.gateway_service = gateway_service app.state.bot_archiver = bot_archiver + # WebSocket manager for executor streaming + executor_ws_manager = ExecutorWebSocketManager(executor_service, market_data_service) + app.state.executor_ws_manager = executor_ws_manager + logging.info("All services started successfully") yield @@ -281,6 +287,7 @@ async def lifespan(app: FastAPI): logging.info("Shutting down services...") + await executor_ws_manager.shutdown() bots_orchestrator.stop() await accounts_service.stop() await executor_service.stop() @@ -381,6 +388,9 @@ def auth_user( app.include_router(executors.router, dependencies=[Depends(auth_user)]) app.include_router(gateway_proxy.router, dependencies=[Depends(auth_user)]) +# WebSocket router (handles its own auth) +app.include_router(websocket.router) + @app.get("/") async def root(): diff --git a/routers/websocket.py b/routers/websocket.py new file mode 100644 index 00000000..41141d13 --- /dev/null +++ b/routers/websocket.py @@ -0,0 +1,150 @@ +""" +WebSocket router for real-time executor data streaming. +""" +import asyncio +import base64 +import logging +import secrets +import time +import uuid + +from fastapi import APIRouter, WebSocket, WebSocketDisconnect + +from config import settings + +logger = logging.getLogger(__name__) + +router = APIRouter(tags=["WebSocket"]) + +HEARTBEAT_INTERVAL = 30 # seconds + + +def _authenticate_websocket(websocket: WebSocket) -> bool: + """ + Authenticate a WebSocket connection using Basic Auth from headers or query params. + + Returns True if authenticated (or debug mode), False otherwise. + """ + if settings.security.debug_mode: + return True + + # Try Authorization header first + auth_header = websocket.headers.get("authorization", "") + if auth_header.startswith("Basic "): + try: + decoded = base64.b64decode(auth_header[6:]).decode("utf-8") + ws_user, ws_pass = decoded.split(":", 1) + except Exception: + return False + else: + # Fallback to query parameters + ws_user = websocket.query_params.get("username", "") + ws_pass = websocket.query_params.get("password", "") + + correct_user = secrets.compare_digest( + ws_user.encode(), settings.security.username.encode() + ) + correct_pass = secrets.compare_digest( + ws_pass.encode(), settings.security.password.encode() + ) + return correct_user and correct_pass + + +async def _heartbeat_loop(websocket: WebSocket) -> None: + """Send periodic heartbeat pings.""" + try: + while True: + await asyncio.sleep(HEARTBEAT_INTERVAL) + await websocket.send_json({ + "type": "heartbeat", + "timestamp": time.time(), + }) + except asyncio.CancelledError: + pass + except Exception: + pass + + +@router.websocket("/ws/executors") +async def executors_websocket(websocket: WebSocket) -> None: + """ + WebSocket endpoint for streaming executor data. + + Authentication: Basic Auth via Authorization header or query params + (?username=...&password=...). + + Subscribe/unsubscribe protocol: + -> {"action": "subscribe", "type": "executor_summary", "update_interval": 2.0} + <- {"type": "subscribed", "subscription_id": "executor_summary", ...} + <- {"type": "executor_summary", "subscription_id": "executor_summary", "data": {...}, ...} + -> {"action": "unsubscribe", "subscription_id": "executor_summary"} + <- {"type": "unsubscribed", "subscription_id": "executor_summary"} + + Subscription types: + - executors: filtered list of executors + - executor_detail: single executor detail + - executor_summary: aggregate summary of active executors + - performance: performance report (optionally per controller) + - positions: held positions with unrealized PnL + - executor_logs: streaming log entries for an executor + """ + await websocket.accept() + + # Authenticate + if not _authenticate_websocket(websocket): + await websocket.send_json({ + "type": "error", + "message": "Authentication failed", + }) + await websocket.close(code=4001, reason="Authentication failed") + return + + # Get manager from app state + manager = websocket.app.state.executor_ws_manager + conn_id = str(uuid.uuid4())[:12] + + await websocket.send_json({ + "type": "connected", + "connection_id": conn_id, + "timestamp": time.time(), + }) + logger.info(f"[WS-Exec] Client connected: {conn_id}") + + heartbeat_task = asyncio.create_task( + _heartbeat_loop(websocket), name=f"ws-exec-hb-{conn_id}" + ) + + try: + while True: + raw = await websocket.receive_json() + action = raw.get("action") + + if action == "subscribe": + await manager.handle_subscribe(conn_id, websocket, raw) + elif action == "unsubscribe": + sub_id = raw.get("subscription_id") + if sub_id: + await manager.handle_unsubscribe(conn_id, websocket, sub_id) + else: + await websocket.send_json({ + "type": "error", + "message": "unsubscribe requires 'subscription_id'", + }) + elif action == "ping": + await websocket.send_json({ + "type": "pong", + "timestamp": time.time(), + }) + else: + await websocket.send_json({ + "type": "error", + "message": f"Unknown action: {action}. " + f"Valid actions: subscribe, unsubscribe, ping", + }) + except WebSocketDisconnect: + logger.info(f"[WS-Exec] Client disconnected: {conn_id}") + except Exception as e: + logger.error(f"[WS-Exec] Error for {conn_id}: {e}", exc_info=True) + finally: + heartbeat_task.cancel() + manager.remove_connection(conn_id) diff --git a/services/executor_ws_manager.py b/services/executor_ws_manager.py new file mode 100644 index 00000000..1b65c1ba --- /dev/null +++ b/services/executor_ws_manager.py @@ -0,0 +1,442 @@ +""" +WebSocket manager for executor/controller data streaming. + +Provides real-time push updates for executor status, performance, +positions, summary, and logs via WebSocket subscriptions. +""" +import asyncio +import hashlib +import json +import logging +import time +from dataclasses import dataclass, field +from typing import Any, Dict, Optional + +from fastapi import WebSocket + +from services.executor_service import ExecutorService +from services.market_data_service import MarketDataService + +logger = logging.getLogger(__name__) + +# Update interval bounds (seconds) +MIN_UPDATE_INTERVAL = 0.5 +MAX_UPDATE_INTERVAL = 60.0 +DEFAULT_UPDATE_INTERVAL = 2.0 + +SUBSCRIPTION_TYPES = { + "executors", + "executor_detail", + "executor_summary", + "performance", + "positions", + "executor_logs", +} + + +@dataclass +class ExecutorSubscription: + """Tracks a single WebSocket subscription.""" + sub_id: str + sub_type: str + update_interval: float + task: Optional[asyncio.Task] = None + + # For executors subscription + filters: Dict[str, Any] = field(default_factory=dict) + + # For executor_detail / executor_logs + executor_id: Optional[str] = None + + # For performance / positions + controller_id: Optional[str] = None + + # For executor_logs + log_level: Optional[str] = None + log_limit: int = 100 + + # Change detection + last_sent_hash: Optional[str] = None + # For logs: track count to send only new entries + last_log_count: int = 0 + + +def _compute_hash(data: Any) -> str: + """MD5 hash of JSON-serialized data for change detection.""" + raw = json.dumps(data, sort_keys=True, default=str) + return hashlib.md5(raw.encode()).hexdigest() + + +def _clamp_interval(interval: Optional[float]) -> float: + """Clamp update interval to allowed range.""" + if interval is None: + return DEFAULT_UPDATE_INTERVAL + return max(MIN_UPDATE_INTERVAL, min(MAX_UPDATE_INTERVAL, interval)) + + +class ExecutorWebSocketManager: + """ + Manages WebSocket subscriptions for executor data. + + Each subscription spawns an asyncio task that polls the relevant + ExecutorService method, computes a hash for change detection, + and pushes updates only when data changes. + """ + + def __init__( + self, + executor_service: ExecutorService, + market_data_service: MarketDataService, + ): + self._executor_service = executor_service + self._market_data_service = market_data_service + # conn_id -> {sub_id -> ExecutorSubscription} + self._subscriptions: Dict[str, Dict[str, ExecutorSubscription]] = {} + + async def handle_subscribe( + self, conn_id: str, websocket: WebSocket, msg: Dict[str, Any] + ) -> None: + """Handle a subscribe message from the client.""" + sub_type = msg.get("type") + if sub_type not in SUBSCRIPTION_TYPES: + await self._send_error( + websocket, + f"Unknown subscription type: {sub_type}. " + f"Valid types: {sorted(SUBSCRIPTION_TYPES)}", + ) + return + + interval = _clamp_interval(msg.get("update_interval")) + + # Build subscription + sub = ExecutorSubscription( + sub_id="", # set below + sub_type=sub_type, + update_interval=interval, + ) + + if sub_type == "executors": + filters = msg.get("filters", {}) + sub.filters = filters + # Hash the filters for a stable sub ID + fh = _compute_hash(filters)[:8] + sub.sub_id = f"executors_{fh}" + + elif sub_type == "executor_detail": + executor_id = msg.get("executor_id") + if not executor_id: + await self._send_error(websocket, "executor_detail requires 'executor_id'") + return + sub.executor_id = executor_id + sub.sub_id = f"executor_detail_{executor_id}" + + elif sub_type == "executor_summary": + sub.sub_id = "executor_summary" + + elif sub_type == "performance": + sub.controller_id = msg.get("controller_id") + cid = sub.controller_id or "all" + sub.sub_id = f"performance_{cid}" + + elif sub_type == "positions": + sub.controller_id = msg.get("controller_id") + cid = sub.controller_id or "all" + sub.sub_id = f"positions_{cid}" + + elif sub_type == "executor_logs": + executor_id = msg.get("executor_id") + if not executor_id: + await self._send_error(websocket, "executor_logs requires 'executor_id'") + return + sub.executor_id = executor_id + sub.log_level = msg.get("level") + sub.log_limit = msg.get("limit", 100) + sub.sub_id = f"executor_logs_{executor_id}" + + # Cancel existing subscription with same ID for this connection + conn_subs = self._subscriptions.setdefault(conn_id, {}) + if sub.sub_id in conn_subs: + old = conn_subs[sub.sub_id] + if old.task and not old.task.done(): + old.task.cancel() + + # Spawn push loop + push_fn = self._get_push_fn(sub_type) + sub.task = asyncio.create_task( + push_fn(conn_id, websocket, sub), + name=f"ws-executor-{conn_id}-{sub.sub_id}", + ) + conn_subs[sub.sub_id] = sub + + await websocket.send_json({ + "type": "subscribed", + "subscription_id": sub.sub_id, + "subscription_type": sub_type, + "update_interval": interval, + }) + logger.info(f"[WS-Exec] {conn_id} subscribed to {sub.sub_id}") + + async def handle_unsubscribe( + self, conn_id: str, websocket: WebSocket, sub_id: str + ) -> None: + """Handle an unsubscribe message from the client.""" + conn_subs = self._subscriptions.get(conn_id, {}) + sub = conn_subs.pop(sub_id, None) + if sub: + if sub.task and not sub.task.done(): + sub.task.cancel() + await websocket.send_json({ + "type": "unsubscribed", + "subscription_id": sub_id, + }) + logger.info(f"[WS-Exec] {conn_id} unsubscribed from {sub_id}") + else: + await self._send_error(websocket, f"No subscription found: {sub_id}") + + def remove_connection(self, conn_id: str) -> None: + """Clean up all subscriptions for a disconnected client.""" + conn_subs = self._subscriptions.pop(conn_id, {}) + for sub in conn_subs.values(): + if sub.task and not sub.task.done(): + sub.task.cancel() + if conn_subs: + logger.info( + f"[WS-Exec] Cleaned up {len(conn_subs)} subscriptions for {conn_id}" + ) + + async def shutdown(self) -> None: + """Cancel all subscription tasks across all connections.""" + for conn_id in list(self._subscriptions.keys()): + self.remove_connection(conn_id) + logger.info("[WS-Exec] Shutdown complete") + + # ------------------------------------------------------------------ + # Push loop dispatch + # ------------------------------------------------------------------ + + def _get_push_fn(self, sub_type: str): + return { + "executors": self._executors_push_loop, + "executor_detail": self._executor_detail_push_loop, + "executor_summary": self._summary_push_loop, + "performance": self._performance_push_loop, + "positions": self._positions_push_loop, + "executor_logs": self._logs_push_loop, + }[sub_type] + + # ------------------------------------------------------------------ + # Push loops + # ------------------------------------------------------------------ + + async def _executors_push_loop( + self, conn_id: str, websocket: WebSocket, sub: ExecutorSubscription + ) -> None: + """Poll get_executors() with filters and push on change.""" + try: + while True: + try: + filters = sub.filters + executors = await self._executor_service.get_executors( + account_name=filters.get("account_name"), + connector_name=filters.get("connector_name"), + trading_pair=filters.get("trading_pair"), + executor_type=filters.get("executor_type"), + status=filters.get("status"), + controller_id=filters.get("controller_id"), + ) + h = _compute_hash(executors) + if h != sub.last_sent_hash: + sub.last_sent_hash = h + await websocket.send_json({ + "type": "executors", + "subscription_id": sub.sub_id, + "data": executors, + "total_count": len(executors), + "timestamp": time.time(), + }) + except Exception as e: + logger.error(f"[WS-Exec] executors push error: {e}", exc_info=True) + await asyncio.sleep(sub.update_interval) + except asyncio.CancelledError: + pass + + async def _executor_detail_push_loop( + self, conn_id: str, websocket: WebSocket, sub: ExecutorSubscription + ) -> None: + """Poll get_executor() for a single executor and push on change.""" + try: + while True: + try: + data = await self._executor_service.get_executor(sub.executor_id) + h = _compute_hash(data) + if h != sub.last_sent_hash: + sub.last_sent_hash = h + await websocket.send_json({ + "type": "executor_detail", + "subscription_id": sub.sub_id, + "data": data, + "timestamp": time.time(), + }) + except Exception as e: + logger.error(f"[WS-Exec] executor_detail push error: {e}", exc_info=True) + await asyncio.sleep(sub.update_interval) + except asyncio.CancelledError: + pass + + async def _summary_push_loop( + self, conn_id: str, websocket: WebSocket, sub: ExecutorSubscription + ) -> None: + """Poll get_summary() and push on change.""" + try: + while True: + try: + data = self._executor_service.get_summary() + h = _compute_hash(data) + if h != sub.last_sent_hash: + sub.last_sent_hash = h + await websocket.send_json({ + "type": "executor_summary", + "subscription_id": sub.sub_id, + "data": data, + "timestamp": time.time(), + }) + except Exception as e: + logger.error(f"[WS-Exec] summary push error: {e}", exc_info=True) + await asyncio.sleep(sub.update_interval) + except asyncio.CancelledError: + pass + + async def _performance_push_loop( + self, conn_id: str, websocket: WebSocket, sub: ExecutorSubscription + ) -> None: + """Poll get_performance_report() and push on change.""" + try: + while True: + try: + data = await self._executor_service.get_performance_report( + controller_id=sub.controller_id, + market_data_service=self._market_data_service, + ) + h = _compute_hash(data) + if h != sub.last_sent_hash: + sub.last_sent_hash = h + await websocket.send_json({ + "type": "performance", + "subscription_id": sub.sub_id, + "data": data, + "timestamp": time.time(), + }) + except Exception as e: + logger.error(f"[WS-Exec] performance push error: {e}", exc_info=True) + await asyncio.sleep(sub.update_interval) + except asyncio.CancelledError: + pass + + async def _positions_push_loop( + self, conn_id: str, websocket: WebSocket, sub: ExecutorSubscription + ) -> None: + """Poll get_positions_held() with unrealized PnL and push on change.""" + try: + while True: + try: + positions = self._executor_service.get_positions_held( + controller_id=sub.controller_id, + ) + # Build response dicts with unrealized PnL + position_dicts = [] + total_realized = 0.0 + total_unrealized = None + + for p in positions: + unrealized_pnl = None + parts = p.trading_pair.split("-") + if len(parts) == 2: + base, quote = parts + rate = self._market_data_service.get_rate(base, quote) + if rate is not None: + unrealized_pnl = float(p.get_unrealized_pnl(rate)) + if total_unrealized is None: + total_unrealized = 0.0 + total_unrealized += unrealized_pnl + + total_realized += float(p.realized_pnl_quote) + position_dicts.append({ + "trading_pair": p.trading_pair, + "connector_name": p.connector_name, + "account_name": p.account_name, + "controller_id": p.controller_id, + "buy_amount_base": float(p.buy_amount_base), + "buy_amount_quote": float(p.buy_amount_quote), + "sell_amount_base": float(p.sell_amount_base), + "sell_amount_quote": float(p.sell_amount_quote), + "net_amount_base": float(p.net_amount_base), + "buy_breakeven_price": float(p.buy_breakeven_price) if p.buy_breakeven_price else None, + "sell_breakeven_price": float(p.sell_breakeven_price) if p.sell_breakeven_price else None, + "matched_amount_base": float(p.matched_amount_base), + "unmatched_amount_base": float(p.unmatched_amount_base), + "position_side": p.position_side, + "realized_pnl_quote": float(p.realized_pnl_quote), + "unrealized_pnl_quote": unrealized_pnl, + "executor_count": len(p.executor_ids), + "executor_ids": p.executor_ids, + "last_updated": p.last_updated.isoformat() if p.last_updated else None, + }) + + payload = { + "total_positions": len(positions), + "total_realized_pnl": total_realized, + "total_unrealized_pnl": total_unrealized, + "positions": position_dicts, + } + + h = _compute_hash(payload) + if h != sub.last_sent_hash: + sub.last_sent_hash = h + await websocket.send_json({ + "type": "positions", + "subscription_id": sub.sub_id, + "data": payload, + "timestamp": time.time(), + }) + except Exception as e: + logger.error(f"[WS-Exec] positions push error: {e}", exc_info=True) + await asyncio.sleep(sub.update_interval) + except asyncio.CancelledError: + pass + + async def _logs_push_loop( + self, conn_id: str, websocket: WebSocket, sub: ExecutorSubscription + ) -> None: + """Poll get_executor_logs() and push only new entries.""" + try: + while True: + try: + all_logs = self._executor_service.get_executor_logs( + sub.executor_id, + level=sub.log_level, + limit=sub.log_limit, + ) + current_count = len(all_logs) + if current_count > sub.last_log_count: + new_logs = all_logs[sub.last_log_count:] + sub.last_log_count = current_count + await websocket.send_json({ + "type": "executor_logs", + "subscription_id": sub.sub_id, + "data": new_logs, + "total_count": current_count, + "timestamp": time.time(), + }) + except Exception as e: + logger.error(f"[WS-Exec] logs push error: {e}", exc_info=True) + await asyncio.sleep(sub.update_interval) + except asyncio.CancelledError: + pass + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + + @staticmethod + async def _send_error(websocket: WebSocket, message: str) -> None: + await websocket.send_json({"type": "error", "message": message}) From 8660f63dea2075e425c544dc1c7da56b43383cfd Mon Sep 17 00:00:00 2001 From: cardosofede Date: Sat, 28 Mar 2026 19:47:04 -0300 Subject: [PATCH 2/4] (feat) unif ws manager --- services/websocket_manager.py | 308 ++++++++++++++++++++++++++++++++++ 1 file changed, 308 insertions(+) create mode 100644 services/websocket_manager.py diff --git a/services/websocket_manager.py b/services/websocket_manager.py new file mode 100644 index 00000000..1246e185 --- /dev/null +++ b/services/websocket_manager.py @@ -0,0 +1,308 @@ +import asyncio +import logging +import time +import uuid +from dataclasses import dataclass, field +from typing import Dict, List, Optional + +from hummingbot.core.event.event_forwarder import SourceInfoEventForwarder +from hummingbot.core.event.events import OrderBookEvent, OrderBookTradeEvent +from hummingbot.data_feed.candles_feed.data_types import CandlesConfig +from starlette.websockets import WebSocket + +from config import settings +from services.market_data_service import MarketDataService + +logger = logging.getLogger(__name__) + + +@dataclass +class Subscription: + subscription_id: str + sub_type: str # "candles", "order_book", or "trades" + connector: str + trading_pair: str + update_interval: float + # Candles-specific + interval: Optional[str] = None + max_records: int = 100 + # Order book-specific + depth: int = 10 + # State tracking + last_sent_candle_ts: Optional[float] = None + last_sent_ob_uid: Optional[int] = None + task: Optional[asyncio.Task] = field(default=None, repr=False) + # Trades-specific + event_forwarder: Optional[SourceInfoEventForwarder] = field(default=None, repr=False) + trade_buffer: List = field(default_factory=list) + + +class WebSocketManager: + def __init__(self, market_data_service: MarketDataService): + self._market_data_service = market_data_service + self._connections: Dict[str, Dict[str, Subscription]] = {} + + def _clamp_interval(self, interval: float) -> float: + mn = settings.market_data.ws_min_update_interval + mx = settings.market_data.ws_max_update_interval + return max(mn, min(mx, interval)) + + async def handle_subscribe(self, conn_id: str, websocket: WebSocket, msg: dict): + sub_type = msg.get("type") + connector = msg.get("connector") + trading_pair = msg.get("trading_pair") + update_interval = self._clamp_interval(msg.get("update_interval", 1.0)) + + if sub_type not in ("candles", "order_book", "trades"): + await self._send_error(websocket, f"Invalid subscription type: {sub_type}") + return + if not connector or not trading_pair: + await self._send_error(websocket, "connector and trading_pair are required") + return + + if sub_type == "candles": + interval = msg.get("interval", "1m") + max_records = msg.get("max_records", 100) + sub_id = f"candles_{connector}_{trading_pair}_{interval}" + sub = Subscription( + subscription_id=sub_id, + sub_type="candles", + connector=connector, + trading_pair=trading_pair, + update_interval=update_interval, + interval=interval, + max_records=max_records, + ) + elif sub_type == "order_book": + depth = msg.get("depth", 10) + sub_id = f"order_book_{connector}_{trading_pair}" + sub = Subscription( + subscription_id=sub_id, + sub_type="order_book", + connector=connector, + trading_pair=trading_pair, + update_interval=update_interval, + depth=depth, + ) + else: # trades + sub_id = f"trades_{connector}_{trading_pair}" + sub = Subscription( + subscription_id=sub_id, + sub_type="trades", + connector=connector, + trading_pair=trading_pair, + update_interval=update_interval, + ) + + subs = self._connections.setdefault(conn_id, {}) + + # Cancel existing subscription with same id + if sub_id in subs: + self._cleanup_subscription(subs.pop(sub_id)) + + # Start the feed / ensure it exists + try: + if sub_type == "candles": + config = CandlesConfig( + connector=connector, + trading_pair=trading_pair, + interval=sub.interval, + max_records=sub.max_records, + ) + self._market_data_service.get_candles_feed(config) + else: + # Both order_book and trades need the order book initialized + await self._market_data_service.initialize_order_book(connector, trading_pair) + except Exception as e: + await self._send_error(websocket, f"Failed to start feed: {e}") + return + + # Spawn push loop / listener + if sub_type == "candles": + sub.task = asyncio.create_task(self._candles_push_loop(websocket, sub)) + elif sub_type == "order_book": + sub.task = asyncio.create_task(self._order_book_push_loop(websocket, sub)) + else: # trades + self._attach_trade_listener(sub) + sub.task = asyncio.create_task(self._trades_push_loop(websocket, sub)) + + subs[sub_id] = sub + + await self._send_json(websocket, { + "type": "subscribed", + "subscription_id": sub_id, + }) + logger.info(f"[{conn_id}] Subscribed: {sub_id} (interval={update_interval}s)") + + async def handle_unsubscribe(self, conn_id: str, websocket: WebSocket, sub_id: str): + subs = self._connections.get(conn_id, {}) + sub = subs.pop(sub_id, None) + if sub: + self._cleanup_subscription(sub) + await self._send_json(websocket, { + "type": "unsubscribed", + "subscription_id": sub_id, + }) + logger.info(f"[{conn_id}] Unsubscribed: {sub_id}") + else: + await self._send_error(websocket, f"Subscription not found: {sub_id}") + + def remove_connection(self, conn_id: str): + subs = self._connections.pop(conn_id, {}) + for sub in subs.values(): + self._cleanup_subscription(sub) + if subs: + logger.info(f"[{conn_id}] Removed connection, cancelled {len(subs)} subscriptions") + + def _cleanup_subscription(self, sub: Subscription): + if sub.task and not sub.task.done(): + sub.task.cancel() + if sub.event_forwarder: + self._detach_trade_listener(sub) + + def shutdown(self): + for conn_id in list(self._connections.keys()): + self.remove_connection(conn_id) + logger.info("WebSocketManager shut down") + + # ==================== Push Loops ==================== + + async def _candles_push_loop(self, websocket: WebSocket, sub: Subscription): + try: + config = CandlesConfig( + connector=sub.connector, + trading_pair=sub.trading_pair, + interval=sub.interval, + max_records=sub.max_records, + ) + while True: + await asyncio.sleep(sub.update_interval) + try: + feed = self._market_data_service.get_candles_feed(config) + if not feed.ready: + continue + df = feed.candles_df + if df is None or df.empty: + continue + latest_ts = float(df["timestamp"].iloc[-1]) + new_candle = sub.last_sent_candle_ts is None or latest_ts != sub.last_sent_candle_ts + if new_candle: + # New candle row appeared — send full history + sub.last_sent_candle_ts = latest_ts + records = df.tail(sub.max_records).to_dict(orient="records") + await self._send_json(websocket, { + "type": "candles", + "subscription_id": sub.subscription_id, + "data": records, + "timestamp": time.time(), + }) + else: + # Live candle update — send only the last candle + last_record = df.iloc[-1].to_dict() + await self._send_json(websocket, { + "type": "candle_update", + "subscription_id": sub.subscription_id, + "data": last_record, + "timestamp": time.time(), + }) + except Exception as e: + logger.error(f"Candles push error [{sub.subscription_id}]: {e}") + except asyncio.CancelledError: + pass + + async def _order_book_push_loop(self, websocket: WebSocket, sub: Subscription): + try: + while True: + await asyncio.sleep(sub.update_interval) + try: + ob = self._market_data_service.get_order_book(sub.connector, sub.trading_pair) + if ob is None: + continue + # Change detection via last_diff_uid (updates on every incremental diff) + uid = getattr(ob, "last_diff_uid", None) or getattr(ob, "snapshot_uid", None) + if uid is not None and sub.last_sent_ob_uid is not None and uid == sub.last_sent_ob_uid: + continue + if uid is not None: + sub.last_sent_ob_uid = uid + + snapshot = ob.snapshot + bids = snapshot[0].head(sub.depth)[["price", "amount"]].values.tolist() + asks = snapshot[1].head(sub.depth)[["price", "amount"]].values.tolist() + + await self._send_json(websocket, { + "type": "order_book", + "subscription_id": sub.subscription_id, + "data": {"bids": bids, "asks": asks}, + "timestamp": time.time(), + }) + except Exception as e: + logger.error(f"Order book push error [{sub.subscription_id}]: {e}") + except asyncio.CancelledError: + pass + + # ==================== Trades ==================== + + def _attach_trade_listener(self, sub: Subscription): + ob = self._market_data_service.get_order_book(sub.connector, sub.trading_pair) + if ob is None: + logger.warning(f"No order book to attach trade listener for {sub.connector}/{sub.trading_pair}") + return + + def on_trade(event_tag: int, order_book, event: OrderBookTradeEvent): + if event.trading_pair == sub.trading_pair: + sub.trade_buffer.append({ + "timestamp": event.timestamp, + "price": float(event.price), + "amount": float(event.amount), + "side": event.type.name.lower(), + }) + + sub.event_forwarder = SourceInfoEventForwarder(on_trade) + ob.add_listener(OrderBookEvent.TradeEvent, sub.event_forwarder) + logger.info(f"Attached trade listener for {sub.connector}/{sub.trading_pair}") + + def _detach_trade_listener(self, sub: Subscription): + if not sub.event_forwarder: + return + try: + ob = self._market_data_service.get_order_book(sub.connector, sub.trading_pair) + if ob: + ob.remove_listener(OrderBookEvent.TradeEvent, sub.event_forwarder) + except Exception as e: + logger.error(f"Error detaching trade listener: {e}") + sub.event_forwarder = None + + async def _trades_push_loop(self, websocket: WebSocket, sub: Subscription): + try: + while True: + await asyncio.sleep(sub.update_interval) + if not sub.trade_buffer: + continue + try: + # Drain the buffer + trades = sub.trade_buffer[:] + sub.trade_buffer.clear() + await self._send_json(websocket, { + "type": "trades", + "subscription_id": sub.subscription_id, + "data": trades, + "timestamp": time.time(), + }) + except Exception as e: + logger.error(f"Trades push error [{sub.subscription_id}]: {e}") + except asyncio.CancelledError: + pass + + # ==================== Helpers ==================== + + @staticmethod + async def _send_json(websocket: WebSocket, data: dict): + await websocket.send_json(data) + + @staticmethod + async def _send_error(websocket: WebSocket, message: str): + await websocket.send_json({"type": "error", "message": message}) + + @staticmethod + def generate_connection_id() -> str: + return str(uuid.uuid4())[:8] From e4454b80a34af7846b62f0a71b4409fad8fe9ece Mon Sep 17 00:00:00 2001 From: cardosofede Date: Sat, 28 Mar 2026 19:47:16 -0300 Subject: [PATCH 3/4] (feat) merge market data --- routers/websocket.py | 103 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 97 insertions(+), 6 deletions(-) diff --git a/routers/websocket.py b/routers/websocket.py index 41141d13..01bfc397 100644 --- a/routers/websocket.py +++ b/routers/websocket.py @@ -1,5 +1,5 @@ """ -WebSocket router for real-time executor data streaming. +WebSocket router for real-time market data and executor data streaming. """ import asyncio import base64 @@ -11,6 +11,7 @@ from fastapi import APIRouter, WebSocket, WebSocketDisconnect from config import settings +from services.websocket_manager import WebSocketManager logger = logging.getLogger(__name__) @@ -37,9 +38,18 @@ def _authenticate_websocket(websocket: WebSocket) -> bool: except Exception: return False else: - # Fallback to query parameters - ws_user = websocket.query_params.get("username", "") - ws_pass = websocket.query_params.get("password", "") + # Fallback: ?token=base64(user:pass) query param + token = websocket.query_params.get("token") + if token: + try: + decoded = base64.b64decode(token).decode("utf-8") + ws_user, ws_pass = decoded.split(":", 1) + except Exception: + return False + else: + # Fallback to query parameters + ws_user = websocket.query_params.get("username", "") + ws_pass = websocket.query_params.get("password", "") correct_user = secrets.compare_digest( ws_user.encode(), settings.security.username.encode() @@ -65,13 +75,94 @@ async def _heartbeat_loop(websocket: WebSocket) -> None: pass +@router.websocket("/ws/market-data") +async def market_data_websocket(websocket: WebSocket) -> None: + """ + WebSocket endpoint for streaming market data. + + Authentication: Basic Auth via Authorization header, ?token=base64(user:pass), + or query params (?username=...&password=...). + + Subscribe/unsubscribe protocol: + -> {"action": "subscribe", "type": "candles", "connector": "binance", + "trading_pair": "BTC-USDT", "interval": "1m", "update_interval": 1.0} + <- {"type": "subscribed", "subscription_id": "candles_binance_BTC-USDT_1m"} + <- {"type": "candles", "subscription_id": "...", "data": [...], ...} + -> {"action": "unsubscribe", "subscription_id": "candles_binance_BTC-USDT_1m"} + <- {"type": "unsubscribed", "subscription_id": "..."} + + Subscription types: + - candles: streaming candle data for a trading pair + - order_book: order book snapshots with configurable depth + - trades: real-time trade events + """ + await websocket.accept() + + if not _authenticate_websocket(websocket): + await websocket.send_json({ + "type": "error", + "message": "Authentication failed", + }) + await websocket.close(code=4001, reason="Authentication failed") + return + + manager: WebSocketManager = websocket.app.state.websocket_manager + conn_id = manager.generate_connection_id() + + await websocket.send_json({ + "type": "connected", + "connection_id": conn_id, + "timestamp": time.time(), + }) + logger.info(f"[WS-MD] Client connected: {conn_id}") + + heartbeat_task = asyncio.create_task( + _heartbeat_loop(websocket), name=f"ws-md-hb-{conn_id}" + ) + + try: + while True: + msg = await websocket.receive_json() + action = msg.get("action") + + if action == "subscribe": + await manager.handle_subscribe(conn_id, websocket, msg) + elif action == "unsubscribe": + sub_id = msg.get("subscription_id") + if sub_id: + await manager.handle_unsubscribe(conn_id, websocket, sub_id) + else: + await websocket.send_json({ + "type": "error", + "message": "unsubscribe requires 'subscription_id'", + }) + elif action == "ping": + await websocket.send_json({ + "type": "pong", + "timestamp": time.time(), + }) + else: + await websocket.send_json({ + "type": "error", + "message": f"Unknown action: {action}. " + f"Valid actions: subscribe, unsubscribe, ping", + }) + except WebSocketDisconnect: + logger.info(f"[WS-MD] Client disconnected: {conn_id}") + except Exception as e: + logger.error(f"[WS-MD] Error for {conn_id}: {e}", exc_info=True) + finally: + heartbeat_task.cancel() + manager.remove_connection(conn_id) + + @router.websocket("/ws/executors") async def executors_websocket(websocket: WebSocket) -> None: """ WebSocket endpoint for streaming executor data. - Authentication: Basic Auth via Authorization header or query params - (?username=...&password=...). + Authentication: Basic Auth via Authorization header, ?token=base64(user:pass), + or query params (?username=...&password=...). Subscribe/unsubscribe protocol: -> {"action": "subscribe", "type": "executor_summary", "update_interval": 2.0} From a9b7d6ce607238b97e5544dd2e6077b3f471e7d1 Mon Sep 17 00:00:00 2001 From: cardosofede Date: Sun, 29 Mar 2026 20:29:43 -0300 Subject: [PATCH 4/4] (feat) add ws manager --- config.py | 29 +++++++++++++++++++++-------- deps.py | 6 ++++++ main.py | 13 +++++-------- 3 files changed, 32 insertions(+), 16 deletions(-) diff --git a/config.py b/config.py index 5c2bc63b..d6550923 100644 --- a/config.py +++ b/config.py @@ -1,11 +1,12 @@ from typing import List + from pydantic import Field from pydantic_settings import BaseSettings, SettingsConfigDict class BrokerSettings(BaseSettings): """MQTT Broker configuration for bot communication.""" - + host: str = Field(default="localhost", description="MQTT broker host") port: int = Field(default=1883, description="MQTT broker port") username: str = Field(default="admin", description="MQTT broker username") @@ -16,7 +17,7 @@ class BrokerSettings(BaseSettings): class DatabaseSettings(BaseSettings): """Database configuration.""" - + url: str = Field( default="postgresql+asyncpg://hbot:hummingbot-api@localhost:5432/hummingbot_api", description="Database connection URL" @@ -27,7 +28,7 @@ class DatabaseSettings(BaseSettings): class MarketDataSettings(BaseSettings): """Market data feed manager configuration.""" - + cleanup_interval: int = Field( default=300, description="How often to run feed cleanup in seconds" @@ -40,13 +41,25 @@ class MarketDataSettings(BaseSettings): default=30, description="How long to wait for a candle feed to become ready in seconds" ) + ws_heartbeat_interval: int = Field( + default=30, + description="WebSocket heartbeat interval in seconds" + ) + ws_min_update_interval: float = Field( + default=0.25, + description="Minimum allowed WebSocket subscription update interval in seconds" + ) + ws_max_update_interval: float = Field( + default=60.0, + description="Maximum allowed WebSocket subscription update interval in seconds" + ) model_config = SettingsConfigDict(env_prefix="MARKET_DATA_", extra="ignore") class SecuritySettings(BaseSettings): """Security and authentication configuration.""" - + username: str = Field(default="admin", description="API basic auth username") password: str = Field(default="admin", description="API basic auth password") debug_mode: bool = Field(default=False, description="Enable debug mode (disables auth)") @@ -81,18 +94,18 @@ class GatewaySettings(BaseSettings): class AppSettings(BaseSettings): """Main application settings.""" - + # Static paths controllers_path: str = "bots/conf/controllers" controllers_module: str = "bots.controllers" password_verification_path: str = "credentials/master_account/.password_verification" - + # Environment-configurable settings logfire_environment: str = Field( default="dev", description="Logfire environment name" ) - + # Account state update interval account_update_interval: int = Field( default=5, @@ -117,7 +130,7 @@ class Settings(BaseSettings): aws: AWSSettings = Field(default_factory=AWSSettings) gateway: GatewaySettings = Field(default_factory=GatewaySettings) app: AppSettings = Field(default_factory=AppSettings) - + # Direct banned_tokens field to handle env parsing banned_tokens: List[str] = Field( default=["NAV", "ARS", "ETHW", "ETHF"], diff --git a/deps.py b/deps.py index 440bf778..dd44be38 100644 --- a/deps.py +++ b/deps.py @@ -10,6 +10,7 @@ from services.market_data_service import MarketDataService from services.trading_service import TradingService from services.unified_connector_service import UnifiedConnectorService +from services.websocket_manager import WebSocketManager from utils.bot_archiver import BotArchiver @@ -66,3 +67,8 @@ def get_database_manager(request: Request) -> AsyncDatabaseManager: def get_executor_ws_manager(request: Request) -> ExecutorWebSocketManager: """Get ExecutorWebSocketManager from app state.""" return request.app.state.executor_ws_manager + + +def get_websocket_manager(request: Request) -> WebSocketManager: + """Get WebSocketManager from app state.""" + return request.app.state.websocket_manager diff --git a/main.py b/main.py index 91df4816..1cb4ec99 100644 --- a/main.py +++ b/main.py @@ -69,6 +69,7 @@ def patched_save_to_yml(yml_path, cm): from services.market_data_service import MarketDataService # noqa: E402 from services.trading_service import TradingService # noqa: E402 from services.unified_connector_service import UnifiedConnectorService # noqa: E402 +from services.websocket_manager import WebSocketManager # noqa: E402 from utils.bot_archiver import BotArchiver # noqa: E402 from utils.security import BackendAPISecurity # noqa: E402 @@ -213,14 +214,6 @@ async def lifespan(app: FastAPI): max_retries=10 ) logging.info("ExecutorService initialized") - # Ensure lp_executor is in the registry (workspace hummingbot may load after class definition) - try: - from hummingbot.strategy_v2.executors.lp_executor.data_types import LPExecutorConfig - from hummingbot.strategy_v2.executors.lp_executor.lp_executor import LPExecutor - ExecutorService.EXECUTOR_REGISTRY["lp_executor"] = (LPExecutor, LPExecutorConfig) - logging.debug("lp_executor registered in ExecutorService") - except Exception as e: - logging.warning(f"Failed to register lp_executor: {e}") # ========================================================================= # 5. Other Services @@ -268,6 +261,9 @@ async def lifespan(app: FastAPI): app.state.trading_service = trading_service app.state.accounts_service = accounts_service app.state.executor_service = executor_service + websocket_manager = WebSocketManager(market_data_service) + app.state.websocket_manager = websocket_manager + app.state.bots_orchestrator = bots_orchestrator app.state.docker_service = docker_service app.state.gateway_service = gateway_service @@ -287,6 +283,7 @@ async def lifespan(app: FastAPI): logging.info("Shutting down services...") + websocket_manager.shutdown() await executor_ws_manager.shutdown() bots_orchestrator.stop() await accounts_service.stop()