Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ venv.bak/

.vscode
.idea
.cursor

# custom
*.pkl
Expand Down
63 changes: 63 additions & 0 deletions ms_agent/a2a/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""A2A (Agent-to-Agent) protocol support for ms-agent.

This package provides:

- **Server**: ``MSAgentA2AExecutor`` bridges A2A requests to ms-agent's
agent runtime, allowing ms-agent to be called by remote A2A clients.
- **Client**: ``A2AClientManager`` sends messages to remote A2A agents
over HTTP, enabling ms-agent to delegate work to external agents.
- **Agent Card**: ``build_agent_card`` / ``generate_agent_card_json``
produce the A2A discovery document from ms-agent config.

All SDK-dependent imports are lazy so the package can be imported even
when ``a2a-sdk`` is not installed (the tools and CLI will gracefully
degrade).
"""

from .client import A2AClientManager
from .errors import (A2AServerError, AgentLoadError, ConfigError, LLMError,
MaxTasksError, RateLimitError, TaskNotFoundError,
wrap_a2a_error)
from .session_store import A2AAgentStore, A2ATaskEntry
from .translator import (a2a_message_to_ms_messages, collect_full_response,
extract_text_from_a2a_message, ms_messages_to_text)


def __getattr__(name):
"""Lazy-load SDK-dependent symbols on first access."""
if name == 'MSAgentA2AExecutor':
from .executor import MSAgentA2AExecutor
return MSAgentA2AExecutor
if name == 'configure_a2a_logging':
from .executor import configure_a2a_logging
return configure_a2a_logging
if name == 'build_agent_card':
from .agent_card import build_agent_card
return build_agent_card
if name == 'generate_agent_card_json':
from .agent_card import generate_agent_card_json
return generate_agent_card_json
raise AttributeError(f'module {__name__!r} has no attribute {name!r}')


__all__ = [
'A2AAgentStore',
'A2AClientManager',
'A2AServerError',
'A2ATaskEntry',
'AgentLoadError',
'ConfigError',
'LLMError',
'MSAgentA2AExecutor',
'MaxTasksError',
'RateLimitError',
'TaskNotFoundError',
'a2a_message_to_ms_messages',
'build_agent_card',
'collect_full_response',
'configure_a2a_logging',
'extract_text_from_a2a_message',
'generate_agent_card_json',
'ms_messages_to_text',
'wrap_a2a_error',
]
114 changes: 114 additions & 0 deletions ms_agent/a2a/agent_card.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import os
from typing import Any, Dict, List

import json
from ms_agent.utils.logger import get_logger

logger = get_logger()

_DEFAULT_VERSION = '0.1.0'


def build_agent_card(
config_path: str | None = None,
host: str = '0.0.0.0',
port: int = 5000,
version: str = _DEFAULT_VERSION,
title: str = 'MS-Agent',
description: str = ('Lightweight framework for empowering agents '
'with autonomous exploration'),
skills: list[dict] | None = None,
) -> dict:
"""Build an A2A ``AgentCard`` dict from ms-agent config.

The returned dict matches the A2A AgentCard schema and can be passed
directly to ``a2a.types.AgentCard(**card_dict)`` or serialised to JSON.
"""
from a2a.types import (
AgentCard,
AgentCapabilities,
AgentSkill,
)

resolved_host = host if host != '0.0.0.0' else 'localhost'
url = f'http://{resolved_host}:{port}/'

if config_path and os.path.exists(config_path):
try:
from ms_agent.config.config import Config
config = Config.from_task(config_path)
cfg_desc = getattr(config, 'description', None)
if cfg_desc:
description = str(cfg_desc)
cfg_name = getattr(config, 'name', None)
if cfg_name:
title = str(cfg_name)
except Exception:
logger.debug(
'Could not load config for agent card metadata', exc_info=True)

skill_list: list[AgentSkill] = []
if skills:
for s in skills:
skill_list.append(
AgentSkill(
id=s.get('id', 'general'),
name=s.get('name', title),
description=s.get('description', description),
tags=s.get('tags', []),
examples=s.get('examples', []),
))
else:
skill_list.append(
AgentSkill(
id='general',
name=title,
description=description,
tags=['general', 'agent'],
examples=['Help me research a topic'],
))

card = AgentCard(
name=title.lower().replace(' ', '-'),
description=description,
url=url,
version=version,
capabilities=AgentCapabilities(streaming=True),
skills=skill_list,
defaultInputModes=['text'],
defaultOutputModes=['text'],
)
return card


def generate_agent_card_json(
config_path: str | None = None,
output_path: str = 'agent-card.json',
host: str = '0.0.0.0',
port: int = 5000,
version: str = _DEFAULT_VERSION,
title: str = 'MS-Agent',
description: str = ('Lightweight framework for empowering agents '
'with autonomous exploration'),
skills: list[dict] | None = None,
) -> dict:
"""Build an agent card and optionally write it to disk as JSON."""
card = build_agent_card(
config_path=config_path,
host=host,
port=port,
version=version,
title=title,
description=description,
skills=skills,
)

card_dict = card.model_dump(by_alias=True, exclude_none=True)

if output_path:
abs_path = os.path.abspath(output_path)
with open(abs_path, 'w') as f:
json.dump(card_dict, f, indent=2)
logger.info('A2A Agent Card written to %s', abs_path)

return card_dict
129 changes: 129 additions & 0 deletions ms_agent/a2a/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import os
from typing import Any, Dict, List, Optional

import httpx
from ms_agent.utils.logger import get_logger

logger = get_logger()


class A2AClientManager:
"""Lifecycle manager for remote A2A agent connections.

Each configured agent (from ``a2a_agents`` in the YAML config) is
represented by its URL. Connections use HTTP via ``httpx`` and the
A2A SDK's ``ClientFactory``.
"""

def __init__(self, a2a_agents_config: dict | None = None):
self._config: Dict[str, dict] = a2a_agents_config or {}
self._http_client: Optional[httpx.AsyncClient] = None

def _get_http_client(self) -> httpx.AsyncClient:
if self._http_client is None or self._http_client.is_closed:
self._http_client = httpx.AsyncClient(timeout=300.0)
return self._http_client

async def call_agent(
self,
agent_name: str,
query: str,
) -> str:
"""Send a message to a remote A2A agent and return the text response.

Discovers the agent via its Agent Card, then sends a message using
the A2A SDK client. Supports both streaming and non-streaming
responses.
"""
cfg = self._config.get(agent_name)
if cfg is None:
return f'Error: A2A agent "{agent_name}" not configured'

url = cfg.get('url', '')
if not url:
return f'Error: A2A agent "{agent_name}" has no URL configured'

try:
from a2a.client import (
A2ACardResolver,
ClientConfig,
ClientFactory,
)
from a2a.client.helpers import create_text_message_object

http_client = self._get_http_client()

auth_headers = self._build_auth_headers(cfg)
if auth_headers:
http_client = httpx.AsyncClient(
timeout=300.0, headers=auth_headers)
Comment on lines +58 to +59
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

A new httpx.AsyncClient is created here but never closed, which will lead to resource leaks (file descriptors and sockets). Additionally, creating a new client for every request is inefficient. Consider using a context manager or reusing the existing self._http_client by passing headers to the request methods if the SDK allows it.


resolver = A2ACardResolver(httpx_client=http_client, base_url=url)
card = await resolver.get_agent_card()

factory = ClientFactory(
config=ClientConfig(httpx_client=http_client))
client = factory.create(card)

message = create_text_message_object(content=query)
result_parts: list[str] = []

async for event in client.send_message(message):
if hasattr(event, 'parts'):
for part in event.parts:
part_obj = part
if hasattr(part, 'root'):
part_obj = part.root
if hasattr(part_obj, 'text'):
result_parts.append(part_obj.text)
elif isinstance(event, tuple) and len(event) == 2:
task, update = event
if update and hasattr(update, 'status'):
status = update.status
msg = getattr(status, 'message', None)
if msg and hasattr(msg, 'parts'):
for part in msg.parts:
part_obj = part
if hasattr(part, 'root'):
part_obj = part.root
if hasattr(part_obj, 'text'):
result_parts.append(part_obj.text)
if task and hasattr(task, 'artifacts'):
for artifact in (task.artifacts or []):
for part in (artifact.parts or []):
part_obj = part
if hasattr(part, 'root'):
part_obj = part.root
if hasattr(part_obj, 'text'):
result_parts.append(part_obj.text)

return '\n'.join(result_parts) if result_parts else '(no output)'

except Exception as e:
logger.error(
'A2A call to %s failed: %s', agent_name, e, exc_info=True)
return f'Error calling A2A agent "{agent_name}": {e}'

@staticmethod
def _build_auth_headers(cfg: dict) -> dict[str, str]:
"""Build authentication headers from agent config."""
auth = cfg.get('auth')
if not auth:
return {}

auth_type = auth.get('type', '').lower()
if auth_type == 'bearer':
token_env = auth.get('token_env', '')
token = auth.get('token', '') or os.environ.get(token_env, '')
if token:
return {'Authorization': f'Bearer {token}'}

return {}

def list_agents(self) -> List[str]:
return list(self._config.keys())

async def close_all(self) -> None:
if self._http_client and not self._http_client.is_closed:
await self._http_client.aclose()
self._http_client = None
80 changes: 80 additions & 0 deletions ms_agent/a2a/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from ms_agent.utils.logger import get_logger

logger = get_logger()


class A2AServerError(Exception):
"""Base exception for A2A server-side errors in ms-agent."""

def __init__(self, code: int, message: str, data: dict | None = None):
self.code = code
self.message = message
self.data = data or {}
super().__init__(message)


class TaskNotFoundError(A2AServerError):

def __init__(self, task_id: str):
super().__init__(-32001, 'Task not found', {'taskId': task_id})


class AgentLoadError(A2AServerError):

def __init__(self, detail: str):
super().__init__(-32002, 'Failed to load agent', {'detail': detail})


class LLMError(A2AServerError):

def __init__(self, detail: str):
super().__init__(-32003, 'LLM generation failed', {'detail': detail})


class RateLimitError(A2AServerError):

def __init__(self, detail: str = ''):
super().__init__(-32004, 'Rate limit exceeded', {'detail': detail})


class ConfigError(A2AServerError):

def __init__(self, detail: str):
super().__init__(-32005, 'Invalid configuration', {'detail': detail})


class MaxTasksError(A2AServerError):

def __init__(self, max_tasks: int):
super().__init__(-32006, 'Maximum concurrent tasks reached',
{'max': max_tasks})


_EXCEPTION_MAP: list[tuple[type, int, str]] = [
(FileNotFoundError, -32002, 'Resource not found'),
(PermissionError, -32000, 'Permission denied'),
(TimeoutError, -32004, 'Request timed out'),
(ValueError, -32602, 'Invalid params'),
]


def wrap_a2a_error(exc: Exception) -> dict:
"""Convert an ms-agent exception into a JSON-RPC-style error dict.

Returns a dict with ``code``, ``message``, and ``data`` keys suitable
for logging or constructing an A2A ``ServerError``.
"""
if isinstance(exc, A2AServerError):
return {'code': exc.code, 'message': exc.message, 'data': exc.data}

for exc_type, code, msg in _EXCEPTION_MAP:
if isinstance(exc, exc_type):
return {'code': code, 'message': msg, 'data': {'detail': str(exc)}}

return {
'code': -32603,
'message': 'Internal error',
'data': {
'detail': str(exc)
}
}
Loading
Loading