Coverage for mlos_bench/mlos_bench/services/remote/azure/azure_vm_services.py: 98%
101 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-07 01:52 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-07 01:52 +0000
1#
2# Copyright (c) Microsoft Corporation.
3# Licensed under the MIT License.
4#
5"""A collection of Service functions for managing VMs on Azure."""
7import json
8import logging
9from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
11import requests
13from mlos_bench.environments.status import Status
14from mlos_bench.services.base_service import Service
15from mlos_bench.services.remote.azure.azure_deployment_services import (
16 AzureDeploymentService,
17)
18from mlos_bench.services.types.host_ops_type import SupportsHostOps
19from mlos_bench.services.types.host_provisioner_type import SupportsHostProvisioning
20from mlos_bench.services.types.os_ops_type import SupportsOSOps
21from mlos_bench.services.types.remote_exec_type import SupportsRemoteExec
22from mlos_bench.util import merge_parameters
24_LOG = logging.getLogger(__name__)
class AzureVMService(
    AzureDeploymentService,
    SupportsHostProvisioning,
    SupportsHostOps,
    SupportsOSOps,
    SupportsRemoteExec,
):
    """Helper methods to manage VMs on Azure."""

    # pylint: disable=too-many-ancestors

    # Azure Compute REST API calls as described in
    # https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/start
    _URL_START = (
        "https://management.azure.com"
        "/subscriptions/{subscription}"
        "/resourceGroups/{resource_group}"
        "/providers/Microsoft.Compute"
        "/virtualMachines/{vm_name}"
        "/start"
        "?api-version=2022-03-01"
    )

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/power-off
    _URL_STOP = (
        "https://management.azure.com"
        "/subscriptions/{subscription}"
        "/resourceGroups/{resource_group}"
        "/providers/Microsoft.Compute"
        "/virtualMachines/{vm_name}"
        "/powerOff"
        "?api-version=2022-03-01"
    )

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/deallocate
    _URL_DEALLOCATE = (
        "https://management.azure.com"
        "/subscriptions/{subscription}"
        "/resourceGroups/{resource_group}"
        "/providers/Microsoft.Compute"
        "/virtualMachines/{vm_name}"
        "/deallocate"
        "?api-version=2022-03-01"
    )

    # TODO: This is probably the more correct URL to use for the deprovision operation.
    # However, previous code used the deallocate URL above, so for now, we keep
    # that and handle that change later.
    # See Also: #498
    _URL_DEPROVISION = _URL_DEALLOCATE

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/delete
    # _URL_DEPROVISION = (
    #    "https://management.azure.com"
    #    "/subscriptions/{subscription}"
    #    "/resourceGroups/{resource_group}"
    #    "/providers/Microsoft.Compute"
    #    "/virtualMachines/{vm_name}"
    #    "/delete"
    #    "?api-version=2022-03-01"
    # )

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/restart
    _URL_REBOOT = (
        "https://management.azure.com"
        "/subscriptions/{subscription}"
        "/resourceGroups/{resource_group}"
        "/providers/Microsoft.Compute"
        "/virtualMachines/{vm_name}"
        "/restart"
        "?api-version=2022-03-01"
    )

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/run-command
    _URL_REXEC_RUN = (
        "https://management.azure.com"
        "/subscriptions/{subscription}"
        "/resourceGroups/{resource_group}"
        "/providers/Microsoft.Compute"
        "/virtualMachines/{vm_name}"
        "/runCommand"
        "?api-version=2022-03-01"
    )

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        global_config: Optional[Dict[str, Any]] = None,
        parent: Optional[Service] = None,
        methods: Union[Dict[str, Callable], List[Callable], None] = None,
    ):
        """
        Create a new instance of Azure VM services proxy.

        Parameters
        ----------
        config : dict
            Free-format dictionary that contains the benchmark environment
            configuration.
        global_config : dict
            Free-format dictionary of global parameters.
        parent : Service
            Parent service that can provide mixin functions.
        methods : Union[Dict[str, Callable], List[Callable], None]
            New methods to register with the service.

        Raises
        ------
        ValueError
            If both `customDataFile` (in the config) and `customData`
            (in the deployment parameters) are specified.
        """
        super().__init__(
            config,
            global_config,
            parent,
            self.merge_methods(
                methods,
                [
                    # SupportsHostProvisioning
                    self.provision_host,
                    self.deprovision_host,
                    self.deallocate_host,
                    self.wait_host_deployment,
                    # SupportsHostOps
                    self.start_host,
                    self.stop_host,
                    self.restart_host,
                    self.wait_host_operation,
                    # SupportsOSOps
                    self.shutdown,
                    self.reboot,
                    self.wait_os_operation,
                    # SupportsRemoteExec
                    self.remote_exec,
                    self.get_remote_exec_results,
                ],
            ),
        )

        # As a convenience, allow reading customData out of a file, rather than
        # embedding it in a json config file.
        # Note: ARM templates expect this data to be base64 encoded, but that
        # can be done using the `base64()` string function inside the ARM template.
        self._custom_data_file = self.config.get("customDataFile", None)
        if self._custom_data_file:
            if self._deploy_params.get("customData", None):
                raise ValueError("Both customDataFile and customData are specified.")
            self._custom_data_file = self.config_loader_service.resolve_path(
                self._custom_data_file
            )
            with open(self._custom_data_file, "r", encoding="utf-8") as custom_data_fh:
                self._deploy_params["customData"] = custom_data_fh.read()

    def _set_default_params(self, params: dict) -> dict:  # pylint: disable=no-self-use
        """
        Fill in a default `deploymentName` derived from `vmName`, if needed.

        Note: `params` is updated in place and the same dict is returned.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of parameters.

        Returns
        -------
        params : dict
            The (possibly updated) input dictionary.
        """
        # Try and provide a semi sane default for the deploymentName if not provided
        # since this is a common way to set the deploymentName and can save some
        # config work for the caller.
        if "vmName" in params and "deploymentName" not in params:
            params["deploymentName"] = f"{params['vmName']}-deployment"
            _LOG.info(
                "deploymentName missing from params. Defaulting to '%s'.",
                params["deploymentName"],
            )
        return params

    def _merge_vm_config(self, params: dict, *extra_required_keys: str) -> dict:
        """
        Apply the default parameters and merge `params` into a copy of the service
        config.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of parameters.
            Updated in place by `_set_default_params`.
        extra_required_keys : str
            Additional keys to pass to `merge_parameters` as required, on top of
            "subscription", "resourceGroup", and "vmName".

        Returns
        -------
        config : dict
            The result of `merge_parameters` on a copy of `self.config` and `params`.
        """
        params = self._set_default_params(params)
        return merge_parameters(
            dest=self.config.copy(),
            source=params,
            required_keys=["subscription", "resourceGroup", *extra_required_keys, "vmName"],
        )

    @staticmethod
    def _vm_url(url_template: str, config: dict) -> str:
        """
        Fill in one of the `_URL_*` templates of this class from the merged config.

        Parameters
        ----------
        url_template : str
            URL template with `{subscription}`, `{resource_group}`, and `{vm_name}`
            placeholders.
        config : dict
            Merged configuration with "subscription", "resourceGroup", and "vmName".

        Returns
        -------
        url : str
            The formatted REST API URL.
        """
        return url_template.format(
            subscription=config["subscription"],
            resource_group=config["resourceGroup"],
            vm_name=config["vmName"],
        )

    def wait_host_deployment(self, params: dict, *, is_setup: bool) -> Tuple[Status, dict]:
        """
        Waits for a pending operation on an Azure VM to resolve to SUCCEEDED or FAILED.
        Return TIMED_OUT when timing out.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
        is_setup : bool
            If True, wait for VM being deployed; otherwise, wait for successful deprovisioning.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED, TIMED_OUT}
            Result is info on the operation runtime if SUCCEEDED, otherwise {}.
        """
        return self._wait_deployment(params, is_setup=is_setup)

    def wait_host_operation(self, params: dict) -> Tuple[Status, dict]:
        """
        Waits for a pending operation on an Azure VM to resolve to SUCCEEDED or FAILED.
        Return TIMED_OUT when timing out.

        Parameters
        ----------
        params: dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            Must have the "asyncResultsUrl" key to get the results.
            If the key is not present, return Status.PENDING.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED, TIMED_OUT}
            Result is info on the operation runtime if SUCCEEDED, otherwise {}.
        """
        _LOG.info("Wait for operation on VM %s", params.get("vmName"))
        # Try and provide a semi sane default for the deploymentName.
        # (A bare `params.setdefault(<name>)` would add a key named after the
        # default value, mapped to None, instead of setting "deploymentName".)
        params = self._set_default_params(params)
        return self._wait_while(self._check_operation_status, Status.RUNNING, params)

    def wait_os_operation(self, params: dict) -> Tuple[Status, dict]:
        """
        Waits for a pending OS operation; delegates to `wait_host_operation`.

        Parameters
        ----------
        params: dict
            Flat dictionary of (key, value) pairs of tunable parameters.

        Returns
        -------
        result : (Status, dict)
            The result of `wait_host_operation(params)`.
        """
        return self.wait_host_operation(params)

    def provision_host(self, params: dict) -> Tuple[Status, dict]:
        """
        Check if Azure VM is ready. Deploy a new VM, if necessary.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            HostEnv tunables are variable parameters that, together with the
            HostEnv configuration, are sufficient to provision a VM.

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result. The result is the input `params` plus the
            parameters extracted from the response JSON, or {} if the status is FAILED.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        return self._provision_resource(params)

    def deprovision_host(self, params: dict) -> Tuple[Status, dict]:
        """
        Deprovisions the VM on Azure.

        Note: `_URL_DEPROVISION` is currently an alias of `_URL_DEALLOCATE`, so this
        issues a deallocate request rather than deleting the VM
        (see the TODO next to `_URL_DEPROVISION`).

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result. The result is always {}.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        config = self._merge_vm_config(params, "deploymentName")
        _LOG.info("Deprovision VM: %s", config["vmName"])
        _LOG.info("Deprovision deployment: %s", config["deploymentName"])
        # TODO: Properly deprovision *all* resources specified in the ARM template.
        return self._azure_rest_api_post_helper(
            config,
            self._vm_url(self._URL_DEPROVISION, config),
        )

    def deallocate_host(self, params: dict) -> Tuple[Status, dict]:
        """
        Deallocates the VM on Azure by shutting it down then releasing the compute
        resources.

        Note: This can cause the VM to arrive on a new host node when its
        restarted, which may have different performance characteristics.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result. The result is always {}.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        config = self._merge_vm_config(params)
        _LOG.info("Deallocate VM: %s", config["vmName"])
        return self._azure_rest_api_post_helper(
            config,
            self._vm_url(self._URL_DEALLOCATE, config),
        )

    def start_host(self, params: dict) -> Tuple[Status, dict]:
        """
        Start the VM on Azure.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result. The result is always {}.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        config = self._merge_vm_config(params)
        _LOG.info("Start VM: %s :: %s", config["vmName"], params)
        return self._azure_rest_api_post_helper(
            config,
            self._vm_url(self._URL_START, config),
        )

    def stop_host(self, params: dict, force: bool = False) -> Tuple[Status, dict]:
        """
        Stops the VM on Azure by initiating a graceful shutdown.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
        force : bool
            If True, force stop the Host/VM.
            Note: this flag is not used by the current implementation;
            the same powerOff request is issued either way.

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result. The result is always {}.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        config = self._merge_vm_config(params)
        _LOG.info("Stop VM: %s", config["vmName"])
        return self._azure_rest_api_post_helper(
            config,
            self._vm_url(self._URL_STOP, config),
        )

    def shutdown(self, params: dict, force: bool = False) -> Tuple[Status, dict]:
        """
        Shut down the host OS; delegates to `stop_host`.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
        force : bool
            Passed through to `stop_host`.

        Returns
        -------
        result : (Status, dict={})
            The result of `stop_host(params, force)`.
        """
        return self.stop_host(params, force)

    def restart_host(self, params: dict, force: bool = False) -> Tuple[Status, dict]:
        """
        Reboot the VM on Azure by initiating a graceful shutdown.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
        force : bool
            If True, force restart the Host/VM.
            Note: this flag is not used by the current implementation;
            the same restart request is issued either way.

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result. The result is always {}.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        config = self._merge_vm_config(params)
        _LOG.info("Reboot VM: %s", config["vmName"])
        return self._azure_rest_api_post_helper(
            config,
            self._vm_url(self._URL_REBOOT, config),
        )

    def reboot(self, params: dict, force: bool = False) -> Tuple[Status, dict]:
        """
        Reboot the host OS; delegates to `restart_host`.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
        force : bool
            Passed through to `restart_host`.

        Returns
        -------
        result : (Status, dict={})
            The result of `restart_host(params, force)`.
        """
        return self.restart_host(params, force)

    def remote_exec(
        self,
        script: Iterable[str],
        config: dict,
        env_params: dict,
    ) -> Tuple[Status, dict]:
        """
        Run a command on Azure VM.

        Parameters
        ----------
        script : Iterable[str]
            A list of lines to execute as a script on a remote VM.
        config : dict
            Flat dictionary of (key, value) pairs of the Environment parameters.
            They usually come from `const_args` and `tunable_params`
            properties of the Environment.
        env_params : dict
            Parameters to pass as *shell* environment variables into the script.
            This is usually a subset of `config` with some possible conversions.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        config = self._merge_vm_config(config)

        # Materialize the script exactly once: `script` may be a one-shot iterable
        # (e.g., a generator), and logging it must not exhaust it before it is sent.
        script_lines = list(script)

        if _LOG.isEnabledFor(logging.INFO):
            _LOG.info(
                "Run a script on VM: %s\n %s",
                config["vmName"],
                "\n ".join(script_lines),
            )

        json_req = {
            "commandId": "RunShellScript",
            "script": script_lines,
            "parameters": [{"name": key, "value": val} for (key, val) in env_params.items()],
        }

        url = self._vm_url(self._URL_REXEC_RUN, config)

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug("Request: POST %s\n%s", url, json.dumps(json_req, indent=2))

        response = requests.post(
            url,
            json=json_req,
            headers=self._get_headers(),
            timeout=self._request_timeout,
        )

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug(
                "Response: %s\n%s",
                response,
                json.dumps(response.json(), indent=2) if response.content else "",
            )
        else:
            _LOG.info("Response: %s", response)

        if response.status_code == 200:
            # TODO: extract the results from JSON response
            return (Status.SUCCEEDED, config)

        if response.status_code == 202:
            # The operation is still running; hand back the URL to poll for results.
            return (
                Status.PENDING,
                {**config, "asyncResultsUrl": response.headers.get("Azure-AsyncOperation")},
            )

        _LOG.error("Response: %s :: %s", response, response.text)
        # _LOG.error("Bad Request:\n%s", response.request.body)
        return (Status.FAILED, {})

    def get_remote_exec_results(self, config: dict) -> Tuple[Status, dict]:
        """
        Get the results of the asynchronously running command.

        Parameters
        ----------
        config : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            Must have the "asyncResultsUrl" key to get the results.
            If the key is not present, return Status.PENDING.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED, TIMED_OUT}
            A dict can have an "stdout" key with the remote output.
        """
        _LOG.info("Check the results on VM: %s", config.get("vmName"))
        (status, result) = self.wait_host_operation(config)
        _LOG.debug("Result: %s :: %s", status, result)
        if not status.is_succeeded():
            # TODO: Extract the telemetry and status from stdout, if available
            return (status, result)
        # On success, report the "message" of the first output value (if any) as stdout.
        val = result.get("properties", {}).get("output", {}).get("value", [])
        return (status, {"stdout": val[0].get("message", "")} if val else {})