Coverage for mlos_bench/mlos_bench/environments/remote/remote_env.py: 88%
64 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-07 01:52 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-07 01:52 +0000
1#
2# Copyright (c) Microsoft Corporation.
3# Licensed under the MIT License.
4#
5"""
6Remotely executed benchmark/script environment.
8e.g. Application Environment
9"""
11import logging
12from datetime import datetime
13from typing import Dict, Iterable, Optional, Tuple
15from pytz import UTC
17from mlos_bench.environments.script_env import ScriptEnv
18from mlos_bench.environments.status import Status
19from mlos_bench.services.base_service import Service
20from mlos_bench.services.types.host_ops_type import SupportsHostOps
21from mlos_bench.services.types.remote_exec_type import SupportsRemoteExec
22from mlos_bench.tunables.tunable import TunableValue
23from mlos_bench.tunables.tunable_groups import TunableGroups
# Module-level logger, named after this module via `__name__`.
_LOG = logging.getLogger(__name__)
class RemoteEnv(ScriptEnv):
    """
    Environment to run benchmarks and scripts on a remote host OS.

    e.g. Application Environment

    The setup, run, and teardown scripts are submitted through a service that
    supports remote execution. If the "wait_boot" config option is set, the
    host is started (and waited for) before the setup script is executed.
    """

    def __init__(  # pylint: disable=too-many-arguments
        self,
        *,
        name: str,
        config: dict,
        global_config: Optional[dict] = None,
        tunables: Optional[TunableGroups] = None,
        service: Optional[Service] = None,
    ):
        """
        Create a new environment for remote execution.

        Parameters
        ----------
        name: str
            Human-readable name of the environment.
        config : dict
            Free-format dictionary that contains the benchmark environment
            configuration. Each config must have at least the "tunable_params"
            and the "const_args" sections.
            `RemoteEnv` must also have at least some of the following parameters:
            {setup, run, teardown, wait_boot}
        global_config : dict
            Free-format dictionary of global parameters (e.g., security credentials)
            to be mixed in into the "const_args" section of the local config.
        tunables : TunableGroups
            A collection of tunable parameters for *all* environments.
        service: Service
            An optional service object (e.g., providing methods to
            deploy or reboot a Host, VM, OS, etc.).

        Raises
        ------
        AssertionError
            If the service does not support remote execution, or if
            "wait_boot" is enabled and the service does not support
            host operations.
        """
        super().__init__(
            name=name,
            config=config,
            global_config=global_config,
            tunables=tunables,
            service=service,
        )

        self._wait_boot = self.config.get("wait_boot", False)

        # Validate with explicit checks instead of `assert` statements, so that
        # the validation is not stripped when Python runs with `-O`.
        # `AssertionError` is kept as the exception type for backward
        # compatibility. `isinstance` is False for `None`, so a missing service
        # is rejected here as well.
        if not isinstance(self._service, SupportsRemoteExec):
            raise AssertionError(
                "RemoteEnv requires a service that supports remote execution operations"
            )
        self._remote_exec_service: SupportsRemoteExec = self._service

        # Host operations are only required (and `_host_service` only defined)
        # when we have to wait for the host to boot.
        if self._wait_boot:
            if not isinstance(self._service, SupportsHostOps):
                raise AssertionError(
                    "RemoteEnv requires a service that supports host operations"
                )
            self._host_service: SupportsHostOps = self._service

    def setup(self, tunables: TunableGroups, global_config: Optional[dict] = None) -> bool:
        """
        Check if the environment is ready and set up the application and benchmarks on a
        remote host.

        Parameters
        ----------
        tunables : TunableGroups
            A collection of tunable OS and application parameters along with their
            values. Setting these parameters should not require an OS reboot.
        global_config : dict
            Free-format dictionary of global parameters of the environment
            that are not used in the optimization process.

        Returns
        -------
        is_success : bool
            True if operation is successful, false otherwise.
            False is returned if the base class setup fails, if starting the
            host does not succeed (when "wait_boot" is enabled), or if the
            remote setup script does not succeed.
        """
        if not super().setup(tunables, global_config):
            return False

        if self._wait_boot:
            _LOG.info("Wait for the remote environment to start: %s", self)
            (status, params) = self._host_service.start_host(self._params)
            # The start request may complete asynchronously; wait for it if so.
            if status.is_pending():
                (status, _) = self._host_service.wait_host_operation(params)
            if not status.is_succeeded():
                return False

        if self._script_setup:
            _LOG.info("Set up the remote environment: %s", self)
            (status, _timestamp, _output) = self._remote_exec(self._script_setup)
            _LOG.info("Remote set up complete: %s :: %s", self, status)
            self._is_ready = status.is_succeeded()
        else:
            # No setup script to run: the environment is ready as is.
            self._is_ready = True

        return self._is_ready

    def run(self) -> Tuple[Status, datetime, Optional[Dict[str, TunableValue]]]:
        """
        Runs the run script on the remote environment.

        This can be used to, for instance, submit a new experiment to the
        remote application environment by (re)configuring an application and
        launching the benchmark, or run a script that collects the results.

        Returns
        -------
        (status, timestamp, output) : (Status, datetime, dict)
            3-tuple of (Status, timestamp, output) values, where `output` is a dict
            with the results or None if the status is not COMPLETED.
            If run script is a benchmark, then the score is usually expected to
            be in the `score` field.
        """
        _LOG.info("Run script remotely on: %s", self)
        (status, timestamp, _) = result = super().run()
        # Return the base class result as is if the environment is not ready
        # or there is no run script to execute.
        if not (status.is_ready() and self._script_run):
            return result

        (status, timestamp, output) = self._remote_exec(self._script_run)
        if status.is_succeeded() and output is not None:
            # Parse the results out of the script's standard output.
            output = self._extract_stdout_results(output.get("stdout", ""))
        _LOG.info("Remote run complete: %s :: %s = %s", self, status, output)
        return (status, timestamp, output)

    def teardown(self) -> None:
        """
        Clean up and shut down the remote environment.

        Runs the remote teardown script, if any. The base class teardown is
        always invoked afterwards, even if the remote script raises, so that
        the base class cleanup is not skipped; the exception still propagates.
        """
        try:
            if self._script_teardown:
                _LOG.info("Remote teardown: %s", self)
                (status, _timestamp, _output) = self._remote_exec(self._script_teardown)
                _LOG.info("Remote teardown complete: %s :: %s", self, status)
        finally:
            super().teardown()

    def _remote_exec(self, script: Iterable[str]) -> Tuple[Status, datetime, Optional[dict]]:
        """
        Run a script on the remote host.

        Parameters
        ----------
        script : [str]
            List of commands to be executed on the remote host.

        Returns
        -------
        result : (Status, datetime, dict)
            3-tuple of Status, timestamp, and dict with the benchmark/script results.
            Status is one of {PENDING, SUCCEEDED, FAILED, TIMED_OUT}
            The timestamp is the local time (in UTC) at which the results
            were obtained, not a time reported by the remote host.
        """
        env_params = self._get_env_params()
        _LOG.debug("Submit script: %s with %s", self, env_params)
        (status, output) = self._remote_exec_service.remote_exec(
            script,
            config=self._params,
            env_params=env_params,
        )
        _LOG.debug("Script submitted: %s %s :: %s", self, status, output)
        # Only fetch the results if the submission did not fail outright.
        if status in {Status.PENDING, Status.SUCCEEDED}:
            (status, output) = self._remote_exec_service.get_remote_exec_results(output)
        _LOG.debug("Status: %s :: %s", status, output)
        # FIXME: get the timestamp from the remote environment!
        timestamp = datetime.now(UTC)
        return (status, timestamp, output)