diff --git a/packages/agent/src/netdriver_agent/client/session.py b/packages/agent/src/netdriver_agent/client/session.py index c205658..1904e46 100755 --- a/packages/agent/src/netdriver_agent/client/session.py +++ b/packages/agent/src/netdriver_agent/client/session.py @@ -497,7 +497,23 @@ async def exec_cmd_in_vsys_and_mode(self, command: str, vsys: str = None, mode: line = lines[i].strip() output += await self.exec_cmd(line) i += 1 - return output + return output + + async def switch_vsys_by_mode(self, command: str, mode: Mode = None) -> str: + """ + Switch VSYS by mode and output + """ + output = "" + if mode and self._mode != mode: + output += await self.switch_mode(mode) + lines = command.splitlines() + line_size = len(lines) + i = 0 + while i < line_size: + line = lines[i].strip() + output += await self.exec_cmd(line) + i += 1 + return output async def send_cmd(self, command: str, vsys: str = None, mode: Mode = None, timeout: float = 10, catch_error: bool = True, detail_output: bool = True) -> CmdTaskResult: diff --git a/packages/agent/src/netdriver_agent/plugins/array/array_ag.py b/packages/agent/src/netdriver_agent/plugins/array/array_ag.py index bba5d7b..7fe0203 100644 --- a/packages/agent/src/netdriver_agent/plugins/array/array_ag.py +++ b/packages/agent/src/netdriver_agent/plugins/array/array_ag.py @@ -37,18 +37,15 @@ async def switch_vsys(self, vsys: str) -> str: self._logger.info(f"Switching vsys: {self._vsys} -> {vsys}") output = "" - # Already in the target vsys - if vsys == self._vsys: - return output ret : str if vsys == ArrayBase._DEFAULT_VSYS: # Switch to default vsys - ret = await self.exec_cmd_in_vsys_and_mode("exit", mode=Mode.ENABLE) + ret = await self.switch_vsys_by_mode("exit", mode=Mode.ENABLE) output += ret else: # Switch to target vsys - ret = await self.exec_cmd_in_vsys_and_mode(f"switch {vsys}", mode=Mode.ENABLE) + ret = await self.switch_vsys_by_mode(f"switch {vsys}", mode=Mode.ENABLE) output += ret # Check if there is any error diff --git a/packages/agent/src/netdriver_agent/plugins/fortinet/fortinet_fortigate.py b/packages/agent/src/netdriver_agent/plugins/fortinet/fortinet_fortigate.py index dc9c3f0..88e8608 100644 --- a/packages/agent/src/netdriver_agent/plugins/fortinet/fortinet_fortigate.py +++ b/packages/agent/src/netdriver_agent/plugins/fortinet/fortinet_fortigate.py @@ -44,22 +44,19 @@ async def switch_vsys(self, vsys: str) -> str: self._logger.info(f"Switching vsys: {self._vsys} -> {vsys}") output = "" - # Already in the target vsys - if vsys == self._vsys: - return output ret: str if vsys == self._DEFAULT_VSYS: # vsys -> default - ret = await self.exec_cmd_in_vsys_and_mode("end", mode=Mode.ENABLE) + ret = await self.switch_vsys_by_mode("end", mode=Mode.ENABLE) output += ret elif self._vsys == self._DEFAULT_VSYS: # default -> vsys - ret = await self.exec_cmd_in_vsys_and_mode(f"config vdom\nedit {vsys}", mode=Mode.ENABLE) + ret = await self.switch_vsys_by_mode(f"config vdom\nedit {vsys}", mode=Mode.ENABLE) output += ret else: # vsys1 -> vsys2 - ret = await self.exec_cmd_in_vsys_and_mode(f"next\nedit {vsys}", mode=Mode.ENABLE) + ret = await self.switch_vsys_by_mode(f"next\nedit {vsys}", mode=Mode.ENABLE) output += ret # check errors diff --git a/packages/agent/src/netdriver_agent/plugins/huawei/huawei_usg.py b/packages/agent/src/netdriver_agent/plugins/huawei/huawei_usg.py index 2b3c12a..240e8d0 100644 --- a/packages/agent/src/netdriver_agent/plugins/huawei/huawei_usg.py +++ b/packages/agent/src/netdriver_agent/plugins/huawei/huawei_usg.py @@ -38,21 +38,10 @@ def decide_current_vsys(self, prompt: str): async def switch_vsys(self, vsys: str) -> str: self._logger.info(f"Switching vsys: {self._vsys} -> {vsys}") - output = "" - # Already in the target vsys - if vsys == self._vsys: - return output - - ret: str - if vsys == self._DEFAULT_VSYS: - ret = await self.exec_cmd_in_vsys_and_mode("quit", mode=Mode.ENABLE) - output += ret - else: - ret = await self.exec_cmd_in_vsys_and_mode(f"switch vsys {vsys}", mode=Mode.CONFIG) - output += ret + output = await self.switch_vsys_by_mode(f"switch vsys {vsys}", mode=Mode.CONFIG) # check errors - err = utils.regex.catch_error_of_output(ret, + err = utils.regex.catch_error_of_output(output, self.get_error_patterns(), self.get_ignore_error_patterns()) if err: @@ -60,5 +49,6 @@ async def switch_vsys(self, vsys: str) -> str: raise SwitchVsysFailed(err, output=output) self._vsys = vsys + self._mode = Mode.ENABLE self._logger.info(f"Switched vsys to: {self._vsys}") return output diff --git a/packages/agent/src/netdriver_agent/plugins/maipu/maipu.py b/packages/agent/src/netdriver_agent/plugins/maipu/maipu.py index c851f17..7474a12 100644 --- a/packages/agent/src/netdriver_agent/plugins/maipu/maipu.py +++ b/packages/agent/src/netdriver_agent/plugins/maipu/maipu.py @@ -28,7 +28,7 @@ def get_error_patterns(self) -> list[re.Pattern]: def get_ignore_error_patterns(self) -> list[re.Pattern]: return MaiPuBase.PatternHelper.get_ignore_error_patterns() - def get_enable_password_pattern(self) -> re.Pattern: + def get_enable_password_prompt_pattern(self) -> re.Pattern: return MaiPuBase.PatternHelper.get_enable_password_prompt_pattern() def get_mode_prompt_patterns(self) -> dict[Mode, re.Pattern]: diff --git a/packages/agent/src/netdriver_agent/plugins/venustech/venustech.py b/packages/agent/src/netdriver_agent/plugins/venustech/venustech.py index 45face4..7b8ac17 100644 --- a/packages/agent/src/netdriver_agent/plugins/venustech/venustech.py +++ b/packages/agent/src/netdriver_agent/plugins/venustech/venustech.py @@ -27,7 +27,7 @@ def get_error_patterns(self) -> list[re.Pattern]: def get_ignore_error_patterns(self) -> list[re.Pattern]: return VenustechBase.PatternHelper.get_ignore_error_patterns() - def get_enable_password_pattern(self) -> re.Pattern: + def get_enable_password_prompt_pattern(self) -> re.Pattern: return VenustechBase.PatternHelper.get_enable_password_prompt_pattern() def get_more_pattern(self) -> tuple[re.Pattern, str]: diff --git a/packages/simunet/src/netdriver_simunet/server/handlers/fortinet/fortinet_fortigate.py b/packages/simunet/src/netdriver_simunet/server/handlers/fortinet/fortinet_fortigate.py index 3de80e8..ed7781b 100644 --- a/packages/simunet/src/netdriver_simunet/server/handlers/fortinet/fortinet_fortigate.py +++ b/packages/simunet/src/netdriver_simunet/server/handlers/fortinet/fortinet_fortigate.py @@ -32,17 +32,71 @@ def __init__(self, process: SSHServerProcess, conf_path: str = None): conf_path = f"{cwd_path}/fortinet_fortigate.yml" self.conf_path = conf_path super().__init__(process) + self.level = [] + self.level_type = [] + + @property + def prompt(self) -> str: + prompt = self.config.hostname + if self.level: + prompt = f'{prompt} ({self.level[-1]})' + return prompt + self.config.modes[self._mode].prompt + + def switch_level(self, command: str) -> bool: + if command.startswith('config'): + commands = command.split(' ') + self.level_type.append(commands[0]) + self.level.append(commands[-1]) + elif command.startswith('edit'): + if self.level_type and self.level_type[-1] == 'config': + commands = command.split(' ') + self.level_type.append(commands[0]) + self.level.append(commands[-1]) + else: + return False + elif command == 'end': + if self.level_type: + is_edit = False + if self.level_type[-1] == 'edit': + is_edit = True + self.level.pop() + self.level_type.pop() + if is_edit: + self.level.pop() + self.level_type.pop() + else: + return False + elif command == 'next': + if self.level_type and self.level_type[-1] == 'edit': + self.level.pop() + self.level_type.pop() + else: + return False + elif command == 'exit' and self.level: + return False + return True + + def exec_cmd_in_mode(self, command: str) -> str: + """ Execute command in current mode """ + self._logger.info(f"Exec [{command} in {self._mode}]") + if command in self.config.modes[self._mode].cmd_map: + if not self.switch_level(command): + return self.config.invalid_cmd_error + return self.config.modes[self._mode].cmd_map[command] + elif command in self.config.common_cmd_map: + if not self.switch_level(command): + return self.config.invalid_cmd_error + return self.config.common_cmd_map[command] + else: + return self.config.invalid_cmd_error async def switch_vsys(self, command: str) -> bool: return False async def switch_mode(self, command: str) -> bool: - if command not in self.config.modes[self._mode].switch_mode_cmds: - return False - match self._mode: case Mode.ENABLE: - if command == "exit": + if command == "exit" and not self.level: # logout raise ClientExit case _: diff --git a/packages/simunet/src/netdriver_simunet/server/handlers/fortinet/fortinet_fortigate.yml b/packages/simunet/src/netdriver_simunet/server/handlers/fortinet/fortinet_fortigate.yml index 01e1134..4078dab 100644 --- a/packages/simunet/src/netdriver_simunet/server/handlers/fortinet/fortinet_fortigate.yml +++ b/packages/simunet/src/netdriver_simunet/server/handlers/fortinet/fortinet_fortigate.yml @@ -5,15 +5,28 @@ line_feed: "\r\n" modes: enable: prompt: " # " - switch_mode_cmds: - - "exit" + switch_mode_cmds: [] cmds: + - cmd: "config global" + output: "" - cmd: "config system console" output: "" - cmd: "set output standard" output: "" - cmd: "end" output: "" + - cmd: "config vdom" + output: "" + - cmd: "edit root" + output: "" + - cmd: "next" + output: "" + - cmd: "config firewall policy" + output: "" + - cmd: "edit 1" + output: "" + - cmd: "config firewall address" + output: "" common: - cmd: "show full-configuration" output: | diff --git a/tests/integration/test_fortinet_fortigate.py b/tests/integration/test_fortinet_fortigate.py index 4287d6e..f4ac844 100755 --- a/tests/integration/test_fortinet_fortigate.py +++ b/tests/integration/test_fortinet_fortigate.py @@ -38,3 +38,66 @@ async def test_pull_config(test_client: TestClient, fortinet_fortigate_dev: dict assert response.headers.get("x-correlation-id") == trace_id assert not response.json().get("err_msg") assert len(response.json().get("result")) == 1 + + +@pytest.mark.integration +@pytest.mark.asyncio +async def test_switch_vsys(test_client: TestClient, fortinet_fortigate_dev: dict): + trace_id = uuid4().hex + response = test_client.post("/api/v1/cmd", headers={"x-correlation-id": trace_id}, json={ + "protocol": fortinet_fortigate_dev.get("protocol"), + "ip": fortinet_fortigate_dev.get("ip"), + "port": fortinet_fortigate_dev.get("port"), + "username": fortinet_fortigate_dev.get("username"), + "password": fortinet_fortigate_dev.get("password"), + "enable_password": fortinet_fortigate_dev.get("enable_password"), + "vendor": "fortinet", + "model": "fortigate", + "version": "7.2", + "encode": "utf-8", + "vsys": "default", + "commands": [ + { + "type": "raw", + "mode": "enable", + "command": "show full-configuration", + "template": "" + } + ], + "timeout": 10 + }) + + assert response.status_code == 200 + assert response.json().get("code") == "OK" + assert response.headers.get("x-correlation-id") == trace_id + assert not response.json().get("err_msg") + assert len(response.json().get("result")) == 1 + + response = test_client.post("/api/v1/cmd", headers={"x-correlation-id": trace_id}, json={ + "protocol": fortinet_fortigate_dev.get("protocol"), + "ip": fortinet_fortigate_dev.get("ip"), + "port": fortinet_fortigate_dev.get("port"), + "username": fortinet_fortigate_dev.get("username"), + "password": fortinet_fortigate_dev.get("password"), + "enable_password": fortinet_fortigate_dev.get("enable_password"), + "vendor": "fortinet", + "model": "fortigate", + "version": "7.2", + "encode": "utf-8", + "vsys": "root", + "commands": [ + { + "type": "raw", + "mode": "enable", + "command": "show full-configuration", + "template": "" + } + ], + "timeout": 10 + }) + + assert response.status_code == 200 + assert response.json().get("code") == "OK" + assert response.headers.get("x-correlation-id") == trace_id + assert not response.json().get("err_msg") + assert len(response.json().get("result")) == 1 \ No newline at end of file