Coverage for mlos_bench/mlos_bench/services/remote/azure/azure_vm_services.py: 98%

101 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-07 01:52 +0000

1# 

2# Copyright (c) Microsoft Corporation. 

3# Licensed under the MIT License. 

4# 

"""A collection of Service functions for managing VMs on Azure."""

6 

7import json 

8import logging 

9from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union 

10 

11import requests 

12 

13from mlos_bench.environments.status import Status 

14from mlos_bench.services.base_service import Service 

15from mlos_bench.services.remote.azure.azure_deployment_services import ( 

16 AzureDeploymentService, 

17) 

18from mlos_bench.services.types.host_ops_type import SupportsHostOps 

19from mlos_bench.services.types.host_provisioner_type import SupportsHostProvisioning 

20from mlos_bench.services.types.os_ops_type import SupportsOSOps 

21from mlos_bench.services.types.remote_exec_type import SupportsRemoteExec 

22from mlos_bench.util import merge_parameters 

23 

# Module-level logger shared by all methods of the Azure VM service below.
_LOG = logging.getLogger(__name__)

25 

26 

class AzureVMService(
    AzureDeploymentService,
    SupportsHostProvisioning,
    SupportsHostOps,
    SupportsOSOps,
    SupportsRemoteExec,
):
    """Helper methods to manage VMs on Azure."""

    # pylint: disable=too-many-ancestors

    # Azure Compute REST API calls as described in
    # https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/start
    _URL_START = (
        "https://management.azure.com"
        "/subscriptions/{subscription}"
        "/resourceGroups/{resource_group}"
        "/providers/Microsoft.Compute"
        "/virtualMachines/{vm_name}"
        "/start"
        "?api-version=2022-03-01"
    )

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/power-off
    _URL_STOP = (
        "https://management.azure.com"
        "/subscriptions/{subscription}"
        "/resourceGroups/{resource_group}"
        "/providers/Microsoft.Compute"
        "/virtualMachines/{vm_name}"
        "/powerOff"
        "?api-version=2022-03-01"
    )

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/deallocate
    _URL_DEALLOCATE = (
        "https://management.azure.com"
        "/subscriptions/{subscription}"
        "/resourceGroups/{resource_group}"
        "/providers/Microsoft.Compute"
        "/virtualMachines/{vm_name}"
        "/deallocate"
        "?api-version=2022-03-01"
    )

    # TODO: This is probably the more correct URL to use for the deprovision operation.
    # However, previous code used the deallocate URL above, so for now, we keep
    # that and handle that change later.
    # See Also: #498
    _URL_DEPROVISION = _URL_DEALLOCATE

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/delete
    # _URL_DEPROVISION = (
    #     "https://management.azure.com"
    #     "/subscriptions/{subscription}"
    #     "/resourceGroups/{resource_group}"
    #     "/providers/Microsoft.Compute"
    #     "/virtualMachines/{vm_name}"
    #     "/delete"
    #     "?api-version=2022-03-01"
    # )

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/restart
    _URL_REBOOT = (
        "https://management.azure.com"
        "/subscriptions/{subscription}"
        "/resourceGroups/{resource_group}"
        "/providers/Microsoft.Compute"
        "/virtualMachines/{vm_name}"
        "/restart"
        "?api-version=2022-03-01"
    )

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/run-command
    _URL_REXEC_RUN = (
        "https://management.azure.com"
        "/subscriptions/{subscription}"
        "/resourceGroups/{resource_group}"
        "/providers/Microsoft.Compute"
        "/virtualMachines/{vm_name}"
        "/runCommand"
        "?api-version=2022-03-01"
    )

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        global_config: Optional[Dict[str, Any]] = None,
        parent: Optional[Service] = None,
        methods: Union[Dict[str, Callable], List[Callable], None] = None,
    ):
        """
        Create a new instance of Azure VM services proxy.

        Parameters
        ----------
        config : dict
            Free-format dictionary that contains the benchmark environment
            configuration.
        global_config : dict
            Free-format dictionary of global parameters.
        parent : Service
            Parent service that can provide mixin functions.
        methods : Union[Dict[str, Callable], List[Callable], None]
            New methods to register with the service.

        Raises
        ------
        ValueError
            If both `customDataFile` (in config) and `customData` (in the
            deployment parameters) are specified.
        """
        super().__init__(
            config,
            global_config,
            parent,
            self.merge_methods(
                methods,
                [
                    # SupportsHostProvisioning
                    self.provision_host,
                    self.deprovision_host,
                    self.deallocate_host,
                    self.wait_host_deployment,
                    # SupportsHostOps
                    self.start_host,
                    self.stop_host,
                    self.restart_host,
                    self.wait_host_operation,
                    # SupportsOSOps
                    self.shutdown,
                    self.reboot,
                    self.wait_os_operation,
                    # SupportsRemoteExec
                    self.remote_exec,
                    self.get_remote_exec_results,
                ],
            ),
        )

        # As a convenience, allow reading customData out of a file, rather than
        # embedding it in a json config file.
        # Note: ARM templates expect this data to be base64 encoded, but that
        # can be done using the `base64()` string function inside the ARM template.
        self._custom_data_file = self.config.get("customDataFile", None)
        if self._custom_data_file:
            if self._deploy_params.get("customData", None):
                raise ValueError("Both customDataFile and customData are specified.")
            self._custom_data_file = self.config_loader_service.resolve_path(
                self._custom_data_file
            )
            with open(self._custom_data_file, "r", encoding="utf-8") as custom_data_fh:
                self._deploy_params["customData"] = custom_data_fh.read()

    def _set_default_params(self, params: dict) -> dict:  # pylint: disable=no-self-use
        """
        Fill in a default `deploymentName` of the form `<vmName>-deployment`.

        The default is only applied when `vmName` is present and `deploymentName`
        is not. Note: `params` is modified in place and also returned.
        """
        # Try and provide a semi sane default for the deploymentName if not provided
        # since this is a common way to set the deploymentName and can save some
        # config work for the caller.
        if "vmName" in params and "deploymentName" not in params:
            params["deploymentName"] = f"{params['vmName']}-deployment"
            _LOG.info(
                "deploymentName missing from params. Defaulting to '%s'.",
                params["deploymentName"],
            )
        return params

    def wait_host_deployment(self, params: dict, *, is_setup: bool) -> Tuple[Status, dict]:
        """
        Waits for a pending operation on an Azure VM to resolve to SUCCEEDED or FAILED.
        Return TIMED_OUT when timing out.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
        is_setup : bool
            If True, wait for VM being deployed; otherwise, wait for successful deprovisioning.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED, TIMED_OUT}
            Result is info on the operation runtime if SUCCEEDED, otherwise {}.
        """
        return self._wait_deployment(params, is_setup=is_setup)

    def wait_host_operation(self, params: dict) -> Tuple[Status, dict]:
        """
        Waits for a pending operation on an Azure VM to resolve to SUCCEEDED or FAILED.
        Return TIMED_OUT when timing out.

        Parameters
        ----------
        params: dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            Must have the "vmName" key.
            Must have the "asyncResultsUrl" key to get the results.
            If the key is not present, return Status.PENDING.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED, TIMED_OUT}
            Result is info on the operation runtime if SUCCEEDED, otherwise {}.
        """
        _LOG.info("Wait for operation on VM %s", params["vmName"])
        # Try and provide a semi sane default for the deploymentName.
        # Use the shared helper so that the *deploymentName* key gets defaulted
        # (rather than inserting a stray "<vmName>-deployment" key into params).
        params = self._set_default_params(params)
        return self._wait_while(self._check_operation_status, Status.RUNNING, params)

    def wait_os_operation(self, params: dict) -> Tuple[Status, dict]:
        """
        Waits for a pending OS-level operation (shutdown/reboot) on an Azure VM.

        On Azure, OS operations are implemented via the VM-level REST calls, so
        this simply delegates to `wait_host_operation`.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result, as returned by `wait_host_operation`.
        """
        return self.wait_host_operation(params)

    def provision_host(self, params: dict) -> Tuple[Status, dict]:
        """
        Check if Azure VM is ready. Deploy a new VM, if necessary.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            HostEnv tunables are variable parameters that, together with the
            HostEnv configuration, are sufficient to provision a VM.

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result. The result is the input `params` plus the
            parameters extracted from the response JSON, or {} if the status is FAILED.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        return self._provision_resource(params)

    def deprovision_host(self, params: dict) -> Tuple[Status, dict]:
        """
        Deprovisions the VM on Azure.

        Note: for now this issues a *deallocate* request (`_URL_DEPROVISION` is
        an alias of `_URL_DEALLOCATE`, see #498) rather than deleting the VM,
        and other resources created by the ARM template are not removed.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result. The result is always {}.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        params = self._set_default_params(params)
        config = merge_parameters(
            dest=self.config.copy(),
            source=params,
            required_keys=[
                "subscription",
                "resourceGroup",
                "deploymentName",
                "vmName",
            ],
        )
        _LOG.info("Deprovision VM: %s", config["vmName"])
        _LOG.info("Deprovision deployment: %s", config["deploymentName"])
        # TODO: Properly deprovision *all* resources specified in the ARM template.
        return self._azure_rest_api_post_helper(
            config,
            self._URL_DEPROVISION.format(
                subscription=config["subscription"],
                resource_group=config["resourceGroup"],
                vm_name=config["vmName"],
            ),
        )

    def deallocate_host(self, params: dict) -> Tuple[Status, dict]:
        """
        Deallocates the VM on Azure by shutting it down then releasing the compute
        resources.

        Note: This can cause the VM to arrive on a new host node when its
        restarted, which may have different performance characteristics.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result. The result is always {}.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        params = self._set_default_params(params)
        config = merge_parameters(
            dest=self.config.copy(),
            source=params,
            required_keys=[
                "subscription",
                "resourceGroup",
                "vmName",
            ],
        )
        _LOG.info("Deallocate VM: %s", config["vmName"])
        return self._azure_rest_api_post_helper(
            config,
            self._URL_DEALLOCATE.format(
                subscription=config["subscription"],
                resource_group=config["resourceGroup"],
                vm_name=config["vmName"],
            ),
        )

    def start_host(self, params: dict) -> Tuple[Status, dict]:
        """
        Start the VM on Azure.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result. The result is always {}.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        params = self._set_default_params(params)
        config = merge_parameters(
            dest=self.config.copy(),
            source=params,
            required_keys=[
                "subscription",
                "resourceGroup",
                "vmName",
            ],
        )
        _LOG.info("Start VM: %s :: %s", config["vmName"], params)
        return self._azure_rest_api_post_helper(
            config,
            self._URL_START.format(
                subscription=config["subscription"],
                resource_group=config["resourceGroup"],
                vm_name=config["vmName"],
            ),
        )

    def stop_host(self, params: dict, force: bool = False) -> Tuple[Status, dict]:
        """
        Stops (powers off) the VM on Azure.

        By default a graceful shutdown of the guest OS is requested first.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
        force : bool
            If True, force stop the Host/VM: the request is sent with
            `skipShutdown=true`, asking Azure for a non-graceful power off.

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result. The result is always {}.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        params = self._set_default_params(params)
        config = merge_parameters(
            dest=self.config.copy(),
            source=params,
            required_keys=[
                "subscription",
                "resourceGroup",
                "vmName",
            ],
        )
        _LOG.info("Stop VM: %s", config["vmName"])
        url = self._URL_STOP.format(
            subscription=config["subscription"],
            resource_group=config["resourceGroup"],
            vm_name=config["vmName"],
        )
        if force:
            # The URL template already carries a `?api-version=...` query string,
            # so the extra parameter is appended with `&`.
            url += "&skipShutdown=true"
        return self._azure_rest_api_post_helper(config, url)

    def shutdown(self, params: dict, force: bool = False) -> Tuple[Status, dict]:
        """
        Shut down the OS of the Azure VM by stopping the VM itself.

        Delegates to `stop_host`.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
        force : bool
            If True, request a forced (non-graceful) stop.

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result, as returned by `stop_host`.
        """
        return self.stop_host(params, force)

    def restart_host(self, params: dict, force: bool = False) -> Tuple[Status, dict]:
        """
        Reboot the VM on Azure by initiating a graceful shutdown.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
        force : bool
            If True, force restart the Host/VM.
            NOTE: currently ignored; the Azure restart REST call used here has no
            forced variant, so a regular restart is always requested.

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result. The result is always {}.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        params = self._set_default_params(params)
        config = merge_parameters(
            dest=self.config.copy(),
            source=params,
            required_keys=[
                "subscription",
                "resourceGroup",
                "vmName",
            ],
        )
        _LOG.info("Reboot VM: %s", config["vmName"])
        return self._azure_rest_api_post_helper(
            config,
            self._URL_REBOOT.format(
                subscription=config["subscription"],
                resource_group=config["resourceGroup"],
                vm_name=config["vmName"],
            ),
        )

    def reboot(self, params: dict, force: bool = False) -> Tuple[Status, dict]:
        """
        Reboot the OS of the Azure VM by restarting the VM itself.

        Delegates to `restart_host`.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
        force : bool
            Passed through to `restart_host`.

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result, as returned by `restart_host`.
        """
        return self.restart_host(params, force)

    def remote_exec(
        self,
        script: Iterable[str],
        config: dict,
        env_params: dict,
    ) -> Tuple[Status, dict]:
        """
        Run a command on Azure VM.

        Parameters
        ----------
        script : Iterable[str]
            A list of lines to execute as a script on a remote VM.
            Any iterable (including a one-shot generator) is accepted.
        config : dict
            Flat dictionary of (key, value) pairs of the Environment parameters.
            They usually come from `const_args` and `tunable_params`
            properties of the Environment.
        env_params : dict
            Parameters to pass as *shell* environment variables into the script.
            This is usually a subset of `config` with some possible conversions.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED}
            On PENDING, the result contains the "asyncResultsUrl" key
            (taken from the `Azure-AsyncOperation` response header).
        """
        # Materialize the script exactly once: it is used both in the log message
        # and in the request payload, and a one-shot iterator would otherwise be
        # exhausted by the logging before the request is built.
        script_lines = list(script)

        config = self._set_default_params(config)
        config = merge_parameters(
            dest=self.config.copy(),
            source=config,
            required_keys=[
                "subscription",
                "resourceGroup",
                "vmName",
            ],
        )

        if _LOG.isEnabledFor(logging.INFO):
            _LOG.info("Run a script on VM: %s\n %s", config["vmName"], "\n ".join(script_lines))

        json_req = {
            "commandId": "RunShellScript",
            "script": script_lines,
            "parameters": [{"name": key, "value": val} for (key, val) in env_params.items()],
        }

        url = self._URL_REXEC_RUN.format(
            subscription=config["subscription"],
            resource_group=config["resourceGroup"],
            vm_name=config["vmName"],
        )

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug("Request: POST %s\n%s", url, json.dumps(json_req, indent=2))

        response = requests.post(
            url,
            json=json_req,
            headers=self._get_headers(),
            timeout=self._request_timeout,
        )

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug(
                "Response: %s\n%s",
                response,
                json.dumps(response.json(), indent=2) if response.content else "",
            )
        else:
            _LOG.info("Response: %s", response)

        if response.status_code == 200:
            # TODO: extract the results from JSON response
            return (Status.SUCCEEDED, config)
        elif response.status_code == 202:
            # The command was accepted and runs asynchronously; the caller polls
            # the returned URL via `get_remote_exec_results`.
            return (
                Status.PENDING,
                {**config, "asyncResultsUrl": response.headers.get("Azure-AsyncOperation")},
            )
        else:
            _LOG.error("Response: %s :: %s", response, response.text)
            # _LOG.error("Bad Request:\n%s", response.request.body)
            return (Status.FAILED, {})

    def get_remote_exec_results(self, config: dict) -> Tuple[Status, dict]:
        """
        Get the results of the asynchronously running command.

        Parameters
        ----------
        config : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            Must have the "asyncResultsUrl" key to get the results.
            If the key is not present, return Status.PENDING.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED, TIMED_OUT}
            A dict can have an "stdout" key with the remote output.
        """
        _LOG.info("Check the results on VM: %s", config.get("vmName"))
        (status, result) = self.wait_host_operation(config)
        _LOG.debug("Result: %s :: %s", status, result)
        if not status.is_succeeded():
            # TODO: Extract the telemetry and status from stdout, if available
            return (status, result)
        # Only the first output entry's message is surfaced as "stdout".
        val = result.get("properties", {}).get("output", {}).get("value", [])
        return (status, {"stdout": val[0].get("message", "")} if val else {})