Coverage for mlos_bench/mlos_bench/services/config_persistence.py: 95%

157 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-07 01:52 +0000

1# 

2# Copyright (c) Microsoft Corporation. 

3# Licensed under the MIT License. 

4# 

5"""Helper functions to load, instantiate, and serialize Python objects that encapsulate 

6benchmark environments, tunable parameters, and service functions. 

7""" 

8 

9import json # For logging only 

10import logging 

11import os 

12import sys 

13from typing import ( 

14 TYPE_CHECKING, 

15 Any, 

16 Callable, 

17 Dict, 

18 Iterable, 

19 List, 

20 Optional, 

21 Tuple, 

22 Union, 

23) 

24 

25import json5 # To read configs with comments and other JSON5 syntax features 

26from jsonschema import SchemaError, ValidationError 

27 

28from mlos_bench.config.schemas import ConfigSchema 

29from mlos_bench.environments.base_environment import Environment 

30from mlos_bench.optimizers.base_optimizer import Optimizer 

31from mlos_bench.services.base_service import Service 

32from mlos_bench.services.types.config_loader_type import SupportsConfigLoading 

33from mlos_bench.tunables.tunable import TunableValue 

34from mlos_bench.tunables.tunable_groups import TunableGroups 

35from mlos_bench.util import ( 

36 instantiate_from_config, 

37 merge_parameters, 

38 path_join, 

39 preprocess_dynamic_configs, 

40) 

41 

42if sys.version_info < (3, 10): 

43 from importlib_resources import files 

44else: 

45 from importlib.resources import files 

46 

47if TYPE_CHECKING: 

48 from mlos_bench.schedulers.base_scheduler import Scheduler 

49 from mlos_bench.storage.base_storage import Storage 

50 

51 

52_LOG = logging.getLogger(__name__) 

53 

54 

55class ConfigPersistenceService(Service, SupportsConfigLoading): 

56 """Collection of methods to deserialize the Environment, Service, and TunableGroups 

57 objects. 

58 """ 

59 

60 BUILTIN_CONFIG_PATH = str(files("mlos_bench.config").joinpath("")).replace("\\", "/") 

61 

def __init__(
    self,
    config: Optional[Dict[str, Any]] = None,
    global_config: Optional[Dict[str, Any]] = None,
    parent: Optional[Service] = None,
    methods: Union[Dict[str, Callable], List[Callable], None] = None,
):
    """
    Create a new instance of config persistence service.

    Parameters
    ----------
    config : dict
        Free-format dictionary that contains parameters for the service.
        (E.g., root path for config files, etc.)
        The optional "config_path" entry lists extra directories to search.
    global_config : dict
        Free-format dictionary of global parameters.
    parent : Service
        An optional parent service that can provide mixin functions.
    methods : Union[Dict[str, Callable], List[Callable], None]
        New methods to register with the service.
    """
    super().__init__(
        config,
        global_config,
        parent,
        self.merge_methods(
            methods,
            [
                self.resolve_path,
                self.load_config,
                self.prepare_class_load,
                self.build_service,
                self.build_environment,
                self.load_services,
                self.load_environment,
                self.load_environment_list,
            ],
        ),
    )
    self._config_loader_service = self

    # Normalize and deduplicate config paths, but maintain order.
    # The membership test must use the *normalized* path: the list only ever
    # holds normalized entries, so comparing the raw value would let the same
    # directory in (e.g. once as a relative and once as an absolute path).
    self._config_path: List[str] = []
    for path in self.config.get("config_path", []):
        normalized = path_join(path, abs_path=True)
        if normalized not in self._config_path:
            self._config_path.append(normalized)
    # Prepend the cwd if not already on the list.
    cwd = path_join(os.getcwd(), abs_path=True)
    if cwd not in self._config_path:
        self._config_path.insert(0, cwd)
    # Append the built-in config path if not already on the list.
    if self.BUILTIN_CONFIG_PATH not in self._config_path:
        self._config_path.append(self.BUILTIN_CONFIG_PATH)

116 

@property
def config_paths(self) -> List[str]:
    """
    Directories this service searches for config files, in lookup order.

    Returns
    -------
    List[str]
        A new list holding the same entries as the internal search path.
    """
    # Hand out a copy so that callers cannot alter the internal search path.
    return [*self._config_path]

127 

def resolve_path(self, file_path: str, extra_paths: Optional[Iterable[str]] = None) -> str:
    """
    Locate `file_path` in the search directories if it is not absolute.

    An absolute `file_path` is returned unchanged. Otherwise each search
    directory is tried in order (`extra_paths` first, then the service's
    own config paths) and the first existing candidate is returned.
    If nothing matches, `file_path` is returned as is.

    Parameters
    ----------
    file_path : str
        Path to the input config file.
    extra_paths : Iterable[str]
        Additional directories to prepend to the list of search paths.

    Returns
    -------
    path : str
        An actual path to the config or script.
    """
    search_dirs = [*(extra_paths or []), *self._config_path]
    _LOG.debug("Resolve path: %s in: %s", file_path, search_dirs)

    if os.path.isabs(file_path):
        _LOG.debug("Path is absolute: %s", file_path)
        return file_path

    # Lazily build the candidates so we stop probing at the first hit.
    candidates = (path_join(base, file_path, abs_path=True) for base in search_dirs)
    for candidate in candidates:
        if not os.path.exists(candidate):
            continue
        _LOG.debug("Path resolved: %s", candidate)
        return candidate

    _LOG.debug("Path not resolved: %s", file_path)
    return file_path

157 

def load_config(
    self,
    json_file_name: str,
    schema_type: Optional[ConfigSchema],
) -> Dict[str, Any]:
    """
    Read a JSON5 config file and optionally validate it against a schema.

    The file is looked up with `resolve_path` if the input path is not
    absolute. This method is exported to be used as a service.

    Parameters
    ----------
    json_file_name : str
        Path to the input config file.
    schema_type : Optional[ConfigSchema]
        The schema type to validate the config against.
        If `None`, validation is skipped and a warning is logged.

    Returns
    -------
    config : Union[dict, List[dict]]
        Free-format dictionary that contains the configuration.

    Raises
    ------
    ValueError
        If the config does not pass the schema validation.
    """
    resolved_name = self.resolve_path(json_file_name)
    _LOG.info("Load config: %s", resolved_name)
    with open(resolved_name, mode="r", encoding="utf-8") as fh_json:
        config = json5.load(fh_json)

    if schema_type is None:
        _LOG.warning("Config %s is not validated against a schema.", resolved_name)
        return config  # type: ignore[no-any-return]

    try:
        schema_type.validate(config)
    except (ValidationError, SchemaError) as ex:
        _LOG.error(
            "Failed to validate config %s against schema type %s at %s",
            resolved_name,
            schema_type.name,
            schema_type.value,
        )
        raise ValueError(
            f"Failed to validate config {resolved_name} against "
            f"schema type {schema_type.name} at {schema_type.value}"
        ) from ex

    # Drop the $schema attribute once the config has been validated, so that
    # it is not passed on to other objects (e.g. SqlAlchemy based storage
    # initializers).
    # NOTE: this is only done for configs validated against internal schemas.
    # Other configs that get loaded may need the schema field
    # (e.g. Azure ARM templates).
    has_schema_attr = isinstance(config, dict) and config.get("$schema")
    if has_schema_attr:
        del config["$schema"]

    return config  # type: ignore[no-any-return]

208 

def prepare_class_load(
    self,
    config: Dict[str, Any],
    global_config: Optional[Dict[str, Any]] = None,
    parent_args: Optional[Dict[str, TunableValue]] = None,
) -> Tuple[str, Dict[str, Any]]:
    """
    Extract the class name and its constructor config from `config`.

    Global parameters are mixed into the class config, and the config
    properties listed under "resolve_config_property_paths" are resolved
    to actual file system paths. Note that an empty "config" section is
    added to `config` if it is missing.

    Parameters
    ----------
    config : dict
        Configuration of the class to instantiate.
        Must have the "class" key; "config" is optional.
    global_config : dict
        Global configuration parameters (optional).
    parent_args : Dict[str, TunableValue]
        An optional reference of the parent CompositeEnv's const_args used to
        expand dynamic config parameters from.

    Returns
    -------
    (class_name, class_config) : (str, dict)
        Name of the class to instantiate and its configuration.

    Raises
    ------
    ValueError
        If a property that requires path resolution is neither a string
        nor a list/tuple.
    """
    class_name = config["class"]
    class_config = config.setdefault("config", {})

    # Replace any appearance of "$param_name" in the const_arg values with
    # the value from the parent CompositeEnv.
    # Note: we could consider expanding this feature to additional config
    # sections in the future, but for now only use it in const_args.
    if class_name.startswith("mlos_bench.environments."):
        preprocess_dynamic_configs(
            dest=class_config.get("const_args", {}),
            source=parent_args,
        )

    merge_parameters(dest=class_config, source=global_config)

    # Only resolve the properties that are both requested and present.
    path_properties = config.get("resolve_config_property_paths", [])
    for key in set(class_config).intersection(path_properties):
        value = class_config[key]
        if isinstance(value, str):
            class_config[key] = self.resolve_path(value)
        elif isinstance(value, (list, tuple)):
            class_config[key] = [self.resolve_path(item) for item in value]
        else:
            raise ValueError(f"Parameter {key} must be a string or a list")

    # Guard the call to avoid serializing the config when not debugging.
    if _LOG.isEnabledFor(logging.DEBUG):
        config_dump = json.dumps(class_config, indent=2)
        _LOG.debug("Instantiating: %s with config:\n%s", class_name, config_dump)

    return (class_name, class_config)

263 

def build_optimizer(
    self,
    *,
    tunables: TunableGroups,
    service: Service,
    config: Dict[str, Any],
    global_config: Optional[Dict[str, Any]] = None,
) -> Optimizer:
    """
    Create an mlos_bench Optimizer from its config.

    The class to instantiate *MUST* have a constructor that takes four
    named arguments: (tunables, config, global_config, service)

    Parameters
    ----------
    tunables : TunableGroups
        Tunable parameters of the environment. We need them to validate the
        configurations of merged-in experiments and restored/pending trials.
        Extended with the tunables from the "include_tunables" files, if any.
    service: Service
        An optional service object (e.g., providing methods to load config files, etc.)
    config : dict
        Configuration of the class to instantiate, as loaded from JSON.
    global_config : dict
        Global configuration parameters (optional).

    Returns
    -------
    inst : Optimizer
        A new instance of the `Optimizer` class.
    """
    extra_tunables_files = config.get("include_tunables")
    if extra_tunables_files is not None:
        tunables = self._load_tunables(extra_tunables_files, tunables)

    class_name, class_config = self.prepare_class_load(config, global_config)
    ctor_kwargs = {
        "tunables": tunables,
        "config": class_config,
        "global_config": global_config,
        "service": service,
    }
    optimizer = instantiate_from_config(
        Optimizer,  # type: ignore[type-abstract]
        class_name,
        **ctor_kwargs,
    )
    _LOG.info("Created: Optimizer %s", optimizer)
    return optimizer

309 

def build_storage(
    self,
    *,
    service: Service,
    config: Dict[str, Any],
    global_config: Optional[Dict[str, Any]] = None,
) -> "Storage":
    """
    Create an mlos_bench Storage object from its config.

    Parameters
    ----------
    service: Service
        An optional service object (e.g., providing methods to load config files, etc.)
    config : dict
        Configuration of the class to instantiate, as loaded from JSON.
    global_config : dict
        Global configuration parameters (optional).

    Returns
    -------
    inst : Storage
        A new instance of the Storage class.
    """
    class_name, class_config = self.prepare_class_load(config, global_config)

    # Storage is imported at module level for type checking only,
    # so bring the actual class into scope here.
    # pylint: disable=import-outside-toplevel
    from mlos_bench.storage.base_storage import Storage

    ctor_kwargs = {
        "config": class_config,
        "global_config": global_config,
        "service": service,
    }
    storage = instantiate_from_config(
        Storage,  # type: ignore[type-abstract]
        class_name,
        **ctor_kwargs,
    )
    _LOG.info("Created: Storage %s", storage)
    return storage

347 

def build_scheduler(  # pylint: disable=too-many-arguments
    self,
    *,
    config: Dict[str, Any],
    global_config: Dict[str, Any],
    environment: Environment,
    optimizer: Optimizer,
    storage: "Storage",
    root_env_config: str,
) -> "Scheduler":
    """
    Create an mlos_bench Scheduler from its config.

    Parameters
    ----------
    config : dict
        Configuration of the class to instantiate, as loaded from JSON.
    global_config : dict
        Global configuration parameters.
    environment : Environment
        The environment to benchmark/optimize.
    optimizer : Optimizer
        The optimizer to use.
    storage : Storage
        The storage to use.
    root_env_config : str
        Path to the root environment configuration.

    Returns
    -------
    inst : Scheduler
        A new instance of the Scheduler.
    """
    class_name, class_config = self.prepare_class_load(config, global_config)

    # Scheduler is imported at module level for type checking only,
    # so bring the actual class into scope here.
    # pylint: disable=import-outside-toplevel
    from mlos_bench.schedulers.base_scheduler import Scheduler

    ctor_kwargs = {
        "config": class_config,
        "global_config": global_config,
        "environment": environment,
        "optimizer": optimizer,
        "storage": storage,
        "root_env_config": root_env_config,
    }
    scheduler = instantiate_from_config(
        Scheduler,  # type: ignore[type-abstract]
        class_name,
        **ctor_kwargs,
    )
    _LOG.info("Created: Scheduler %s", scheduler)
    return scheduler

397 

def build_environment(  # pylint: disable=too-many-arguments
    self,
    config: Dict[str, Any],
    tunables: TunableGroups,
    global_config: Optional[Dict[str, Any]] = None,
    parent_args: Optional[Dict[str, TunableValue]] = None,
    service: Optional[Service] = None,
) -> Environment:
    """
    Build a new environment from the given config.

    Parameters
    ----------
    config : dict
        A dictionary with three mandatory fields:
            "name": Human-readable string describing the environment;
            "class": FQN of a Python class to instantiate;
            "config": Free-format dictionary to pass to the constructor.
        The optional "include_services" and "include_tunables" fields list
        extra config files to load for this environment.
    tunables : TunableGroups
        A (possibly empty) collection of groups of tunable parameters for
        all environments.
    global_config : dict
        Global parameters to add to the environment config.
    parent_args : Dict[str, TunableValue]
        An optional reference of the parent CompositeEnv's const_args used to
        expand dynamic config parameters from.
    service: Service
        An optional service object (e.g., providing methods to
        deploy or reboot a VM, etc.).

    Returns
    -------
    env : Environment
        An instance of the `Environment` class initialized with `config`.
    """
    env_name = config["name"]
    env_class, env_config = self.prepare_class_load(config, global_config, parent_args)

    # Services declared by the environment are layered on top of the given one.
    services_files = config.get("include_services")
    if services_files is not None:
        service = self.load_services(services_files, global_config, service)

    # Likewise, the environment can bring in additional tunables.
    tunables_files = config.get("include_tunables")
    if tunables_files is not None:
        tunables = self._load_tunables(tunables_files, tunables)

    _LOG.debug("Creating env: %s :: %s", env_name, env_class)
    new_env = Environment.new(
        env_name=env_name,
        class_name=env_class,
        config=env_config,
        global_config=global_config,
        tunables=tunables,
        service=service,
    )
    _LOG.info("Created env: %s :: %s", env_name, new_env)
    return new_env

456 

def _build_standalone_service(
    self,
    config: Dict[str, Any],
    global_config: Optional[Dict[str, Any]] = None,
    parent: Optional[Service] = None,
) -> Service:
    """
    Build a single new service from the given config.

    Parameters
    ----------
    config : dict
        A dictionary with two mandatory fields:
            "class": FQN of a Python class to instantiate;
            "config": Free-format dictionary to pass to the constructor.
    global_config : dict
        Global parameters to add to the service config.
    parent: Service
        An optional reference of the parent service to mix in.

    Returns
    -------
    svc : Service
        An instance of the `Service` class initialized with `config`.
    """
    class_name, class_config = self.prepare_class_load(config, global_config)
    new_service = Service.new(class_name, class_config, global_config, parent)
    _LOG.info("Created service: %s", new_service)
    return new_service

486 

def _build_composite_service(
    self,
    config_list: Iterable[Dict[str, Any]],
    global_config: Optional[Dict[str, Any]] = None,
    parent: Optional[Service] = None,
) -> Service:
    """
    Build one mix-in service out of a list of service configs.

    Parameters
    ----------
    config_list : a list of dict
        A list where each element is a dictionary with 2 mandatory fields:
            "class": FQN of a Python class to instantiate;
            "config": Free-format dictionary to pass to the constructor.
    global_config : dict
        Global parameters to add to the service config.
    parent: Service
        An optional reference of the parent service to mix in.

    Returns
    -------
    svc : Service
        An instance of the `Service` class that is a combination of all
        services from the list plus the parent mix-in.
    """
    composite = Service()
    if parent:
        composite.register(parent.export())

    # Each new service gets the composite built so far as its parent,
    # and then adds its own exported methods to that composite.
    for svc_config in config_list:
        member = self._build_standalone_service(svc_config, global_config, composite)
        composite.register(member.export())

    if _LOG.isEnabledFor(logging.DEBUG):
        _LOG.debug("Created mix-in service: %s", composite)

    return composite

526 

def build_service(
    self,
    config: Dict[str, Any],
    global_config: Optional[Dict[str, Any]] = None,
    parent: Optional[Service] = None,
) -> Service:
    """
    Build a new service (standalone or composite) from the given config.

    Parameters
    ----------
    config : dict
        Either a single service config, i.e., a dictionary with 2 mandatory fields:
            "class": FQN of a Python class to instantiate;
            "config": Free-format dictionary to pass to the constructor.
        or a dictionary with the "services" field that holds a list of such configs.
    global_config : dict
        Global parameters to add to the service config.
    parent: Service
        An optional reference of the parent service to mix in.

    Returns
    -------
    svc : Service
        An instance of the `Service` class that is a combination of all
        services from the list plus the parent mix-in.
    """
    if _LOG.isEnabledFor(logging.DEBUG):
        _LOG.debug("Build service from config:\n%s", json.dumps(config, indent=2))

    assert isinstance(config, dict)
    service_configs: List[Dict[str, Any]]
    if "class" in config:
        # Top level config is a single service.
        # Without a parent there is nothing to mix in, so build it directly.
        if parent is None:
            return self._build_standalone_service(config, global_config)
        service_configs = [config]
    else:
        # Top level config is a simple object with a list of services
        service_configs = config["services"]

    return self._build_composite_service(service_configs, global_config, parent)

568 

def load_environment(  # pylint: disable=too-many-arguments
    self,
    json_file_name: str,
    tunables: TunableGroups,
    global_config: Optional[Dict[str, Any]] = None,
    parent_args: Optional[Dict[str, TunableValue]] = None,
    service: Optional[Service] = None,
) -> Environment:
    """
    Read the environment config file and build a new environment from it.

    Parameters
    ----------
    json_file_name : str
        The environment JSON configuration file.
    tunables : TunableGroups
        A (possibly empty) collection of tunables to add to the environment.
    global_config : dict
        Global parameters to add to the environment config.
    parent_args : Dict[str, TunableValue]
        An optional reference of the parent CompositeEnv's const_args used to
        expand dynamic config parameters from.
    service : Service
        An optional reference of the parent service to mix in.

    Returns
    -------
    env : Environment
        A new benchmarking environment.
    """
    env_config = self.load_config(json_file_name, ConfigSchema.ENVIRONMENT)
    assert isinstance(env_config, dict)
    return self.build_environment(
        env_config,
        tunables,
        global_config,
        parent_args,
        service,
    )

602 

def load_environment_list(  # pylint: disable=too-many-arguments
    self,
    json_file_name: str,
    tunables: TunableGroups,
    global_config: Optional[Dict[str, Any]] = None,
    parent_args: Optional[Dict[str, TunableValue]] = None,
    service: Optional[Service] = None,
) -> List[Environment]:
    """
    Read the environment config file and build a list of environments from it.

    NOTE: the loaded config is currently treated as a single environment,
    so the returned list always has exactly one element.

    Parameters
    ----------
    json_file_name : str
        The environment JSON configuration file.
    tunables : TunableGroups
        A (possibly empty) collection of tunables to add to the environment.
    global_config : dict
        Global parameters to add to the environment config.
    parent_args : Dict[str, TunableValue]
        An optional reference of the parent CompositeEnv's const_args used to
        expand dynamic config parameters from.
    service : Service
        An optional reference of the parent service to mix in.

    Returns
    -------
    env : List[Environment]
        A list of new benchmarking environments.
    """
    env_config = self.load_config(json_file_name, ConfigSchema.ENVIRONMENT)
    single_env = self.build_environment(
        env_config,
        tunables,
        global_config,
        parent_args,
        service,
    )
    return [single_env]

636 

def load_services(
    self,
    json_file_names: Iterable[str],
    global_config: Optional[Dict[str, Any]] = None,
    parent: Optional[Service] = None,
) -> Service:
    """
    Load the service config files and combine all service methods from those
    configs in a single Service object.

    Parameters
    ----------
    json_file_names : list of str
        A list of service JSON configuration files.
    global_config : dict
        Global parameters to add to the service config.
    parent : Service
        An optional reference of the parent service to mix in.

    Returns
    -------
    service : Service
        A collection of service methods.
    """
    _LOG.info("Load services: %s parent: %s", json_file_names, parent.__class__.__name__)
    bundle = Service({}, global_config, parent)
    # Services from each file see the bundle built so far as their parent.
    for file_name in json_file_names:
        svc_config = self.load_config(file_name, ConfigSchema.SERVICE)
        loaded = self.build_service(svc_config, global_config, bundle)
        bundle.register(loaded.export())
    return bundle

667 

def _load_tunables(
    self,
    json_file_names: Iterable[str],
    parent: TunableGroups,
) -> TunableGroups:
    """
    Read tunable parameters from JSON files and merge them with a copy of
    the parent TunableGroups.

    This helps allow standalone environment configs to reference
    overlapping tunable groups configs but still allow combining them into
    a single instance that each environment can reference.

    Parameters
    ----------
    json_file_names : list of str
        A list of JSON files to load.
    parent : TunableGroups
        A (possibly empty) collection of tunables to add to the new collection.

    Returns
    -------
    tunables : TunableGroups
        The larger collection of tunable parameters.
    """
    _LOG.info("Load tunables: '%s'", json_file_names)
    # Merge into a copy of the parent rather than into the parent itself.
    combined = parent.copy()
    for file_name in json_file_names:
        params_config = self.load_config(file_name, ConfigSchema.TUNABLE_PARAMS)
        assert isinstance(params_config, dict)
        loaded_groups = TunableGroups(params_config)
        combined.merge(loaded_groups)
    return combined