Coverage for mlos_bench/mlos_bench/services/config_persistence.py: 95%
157 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-07 01:52 +0000
1#
2# Copyright (c) Microsoft Corporation.
3# Licensed under the MIT License.
4#
5"""Helper functions to load, instantiate, and serialize Python objects that encapsulate
6benchmark environments, tunable parameters, and service functions.
7"""
9import json # For logging only
10import logging
11import os
12import sys
13from typing import (
14 TYPE_CHECKING,
15 Any,
16 Callable,
17 Dict,
18 Iterable,
19 List,
20 Optional,
21 Tuple,
22 Union,
23)
25import json5 # To read configs with comments and other JSON5 syntax features
26from jsonschema import SchemaError, ValidationError
28from mlos_bench.config.schemas import ConfigSchema
29from mlos_bench.environments.base_environment import Environment
30from mlos_bench.optimizers.base_optimizer import Optimizer
31from mlos_bench.services.base_service import Service
32from mlos_bench.services.types.config_loader_type import SupportsConfigLoading
33from mlos_bench.tunables.tunable import TunableValue
34from mlos_bench.tunables.tunable_groups import TunableGroups
35from mlos_bench.util import (
36 instantiate_from_config,
37 merge_parameters,
38 path_join,
39 preprocess_dynamic_configs,
40)
42if sys.version_info < (3, 10):
43 from importlib_resources import files
44else:
45 from importlib.resources import files
47if TYPE_CHECKING:
48 from mlos_bench.schedulers.base_scheduler import Scheduler
49 from mlos_bench.storage.base_storage import Storage
52_LOG = logging.getLogger(__name__)
class ConfigPersistenceService(Service, SupportsConfigLoading):
    """Collection of methods to deserialize the Environment, Service, and TunableGroups
    objects.

    The service keeps an ordered list of directories (`config_paths`) that are
    searched when a relative config or script path has to be resolved.
    """

    # Location of the configs shipped inside the `mlos_bench.config` package,
    # with backslashes replaced by forward slashes.
    BUILTIN_CONFIG_PATH = str(files("mlos_bench.config").joinpath("")).replace("\\", "/")

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        global_config: Optional[Dict[str, Any]] = None,
        parent: Optional[Service] = None,
        methods: Union[Dict[str, Callable], List[Callable], None] = None,
    ):
        """
        Create a new instance of config persistence service.

        Parameters
        ----------
        config : dict
            Free-format dictionary that contains parameters for the service.
            (E.g., root path for config files, etc.)
        global_config : dict
            Free-format dictionary of global parameters.
        parent : Service
            An optional parent service that can provide mixin functions.
        methods : Union[Dict[str, Callable], List[Callable], None]
            New methods to register with the service.
        """
        super().__init__(
            config,
            global_config,
            parent,
            self.merge_methods(
                methods,
                [
                    self.resolve_path,
                    self.load_config,
                    self.prepare_class_load,
                    self.build_service,
                    self.build_environment,
                    self.load_services,
                    self.load_environment,
                    self.load_environment_list,
                ],
            ),
        )
        self._config_loader_service = self

        # Normalize and deduplicate config paths, but maintain order.
        # The membership test has to use the same value that gets stored
        # (the `path_join` result); testing the raw input would let the same
        # directory be appended more than once.
        self._config_path: List[str] = []
        for path in self.config.get("config_path", []):
            norm_path = path_join(path, abs_path=True)
            if norm_path not in self._config_path:
                self._config_path.append(norm_path)
        # Prepend the cwd if not already on the list.
        cwd = path_join(os.getcwd(), abs_path=True)
        if cwd not in self._config_path:
            self._config_path.insert(0, cwd)
        # Append the built-in config path if not already on the list.
        if self.BUILTIN_CONFIG_PATH not in self._config_path:
            self._config_path.append(self.BUILTIN_CONFIG_PATH)

    @property
    def config_paths(self) -> List[str]:
        """
        Gets the list of config paths this service will search for config files.

        Returns
        -------
        List[str]
            A copy of the internal search path list, in search order.
        """
        return list(self._config_path)  # make a copy to avoid modifications

    def resolve_path(self, file_path: str, extra_paths: Optional[Iterable[str]] = None) -> str:
        """
        Resolve `file_path` against the search paths if it is not absolute.

        The directories in `extra_paths` (if any) are tried first, followed by the
        service's own config paths. The first candidate that exists on the file
        system wins. If `file_path` is absolute, or if no candidate exists, it is
        returned as is.

        Parameters
        ----------
        file_path : str
            Path to the input config file.
        extra_paths : Iterable[str]
            Additional directories to prepend to the list of search paths.

        Returns
        -------
        path : str
            An actual path to the config or script.
        """
        path_list = list(extra_paths or []) + self._config_path
        _LOG.debug("Resolve path: %s in: %s", file_path, path_list)
        if os.path.isabs(file_path):
            _LOG.debug("Path is absolute: %s", file_path)
            return file_path
        for path in path_list:
            full_path = path_join(path, file_path, abs_path=True)
            if os.path.exists(full_path):
                _LOG.debug("Path resolved: %s", full_path)
                return full_path
        _LOG.debug("Path not resolved: %s", file_path)
        return file_path

    def load_config(
        self,
        json_file_name: str,
        schema_type: Optional[ConfigSchema],
    ) -> Dict[str, Any]:
        """
        Load JSON config file. Search for a file relative to the config paths if the
        input path is not absolute. This method is exported to be used as a service.

        Parameters
        ----------
        json_file_name : str
            Path to the input config file.
        schema_type : Optional[ConfigSchema]
            The schema type to validate the config against.
            If None, the config is loaded without validation (a warning is logged).

        Returns
        -------
        config : dict
            Free-format dictionary that contains the configuration.
            The value is returned exactly as parsed by `json5`, except that a
            non-empty top-level "$schema" entry is removed after validation.

        Raises
        ------
        ValueError
            If the config fails validation against `schema_type`.
        """
        json_file_name = self.resolve_path(json_file_name)
        _LOG.info("Load config: %s", json_file_name)
        with open(json_file_name, mode="r", encoding="utf-8") as fh_json:
            config = json5.load(fh_json)
        if schema_type is not None:
            try:
                schema_type.validate(config)
            except (ValidationError, SchemaError) as ex:
                _LOG.error(
                    "Failed to validate config %s against schema type %s at %s",
                    json_file_name,
                    schema_type.name,
                    schema_type.value,
                )
                raise ValueError(
                    f"Failed to validate config {json_file_name} against "
                    f"schema type {schema_type.name} at {schema_type.value}"
                ) from ex
            if isinstance(config, dict) and config.get("$schema"):
                # Remove $schema attributes from the config after we've validated
                # them to avoid passing them on to other objects
                # (e.g. SqlAlchemy based storage initializers).
                # NOTE: we only do this for internal schemas.
                # Other configs that get loaded may need the schema field
                # (e.g. Azure ARM templates).
                del config["$schema"]
        else:
            _LOG.warning("Config %s is not validated against a schema.", json_file_name)
        return config  # type: ignore[no-any-return]

    def prepare_class_load(
        self,
        config: Dict[str, Any],
        global_config: Optional[Dict[str, Any]] = None,
        parent_args: Optional[Dict[str, TunableValue]] = None,
    ) -> Tuple[str, Dict[str, Any]]:
        """
        Extract the class instantiation parameters from the configuration. Mix-in the
        global parameters and resolve the local file system paths, where it is required.

        Note that `config` is updated in place: a missing "config" section is
        created, and the returned `class_config` is that same nested dictionary.

        Parameters
        ----------
        config : dict
            Configuration of the class to instantiate, with a mandatory "class"
            field and an optional "config" section.
        global_config : dict
            Global configuration parameters (optional).
        parent_args : Dict[str, TunableValue]
            An optional reference of the parent CompositeEnv's const_args used to
            expand dynamic config parameters from.

        Returns
        -------
        (class_name, class_config) : (str, dict)
            Name of the class to instantiate and its configuration.

        Raises
        ------
        ValueError
            If a property listed in "resolve_config_property_paths" is neither a
            string nor a list/tuple.
        """
        class_name = config["class"]
        class_config = config.setdefault("config", {})

        # Replace any appearance of "$param_name" in the const_arg values with
        # the value from the parent CompositeEnv.
        # Note: we could consider expanding this feature to additional config
        # sections in the future, but for now only use it in const_args.
        if class_name.startswith("mlos_bench.environments."):
            const_args = class_config.get("const_args", {})
            preprocess_dynamic_configs(dest=const_args, source=parent_args)

        merge_parameters(dest=class_config, source=global_config)

        # Only the properties explicitly listed in the config get their values
        # resolved as file system paths.
        for key in set(class_config).intersection(config.get("resolve_config_property_paths", [])):
            if isinstance(class_config[key], str):
                class_config[key] = self.resolve_path(class_config[key])
            elif isinstance(class_config[key], (list, tuple)):
                class_config[key] = [self.resolve_path(path) for path in class_config[key]]
            else:
                raise ValueError(f"Parameter {key} must be a string or a list")

        # Guard the call so that the JSON dump is only produced when needed.
        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug(
                "Instantiating: %s with config:\n%s",
                class_name,
                json.dumps(class_config, indent=2),
            )

        return (class_name, class_config)

    def build_optimizer(
        self,
        *,
        tunables: TunableGroups,
        service: Service,
        config: Dict[str, Any],
        global_config: Optional[Dict[str, Any]] = None,
    ) -> Optimizer:
        """
        Instantiation of mlos_bench Optimizer that depends on Service and TunableGroups.

        A class *MUST* have a constructor that takes four named arguments:
        (tunables, config, global_config, service)

        Parameters
        ----------
        tunables : TunableGroups
            Tunable parameters of the environment. We need them to validate the
            configurations of merged-in experiments and restored/pending trials.
        service: Service
            An optional service object (e.g., providing methods to load config files, etc.)
        config : dict
            Configuration of the class to instantiate, as loaded from JSON.
        global_config : dict
            Global configuration parameters (optional).

        Returns
        -------
        inst : Optimizer
            A new instance of the `Optimizer` class.
        """
        tunables_path = config.get("include_tunables")
        if tunables_path is not None:
            tunables = self._load_tunables(tunables_path, tunables)
        (class_name, class_config) = self.prepare_class_load(config, global_config)
        inst = instantiate_from_config(
            Optimizer,  # type: ignore[type-abstract]
            class_name,
            tunables=tunables,
            config=class_config,
            global_config=global_config,
            service=service,
        )
        _LOG.info("Created: Optimizer %s", inst)
        return inst

    def build_storage(
        self,
        *,
        service: Service,
        config: Dict[str, Any],
        global_config: Optional[Dict[str, Any]] = None,
    ) -> "Storage":
        """
        Instantiation of mlos_bench Storage objects.

        Parameters
        ----------
        service: Service
            An optional service object (e.g., providing methods to load config files, etc.)
        config : dict
            Configuration of the class to instantiate, as loaded from JSON.
        global_config : dict
            Global configuration parameters (optional).

        Returns
        -------
        inst : Storage
            A new instance of the Storage class.
        """
        (class_name, class_config) = self.prepare_class_load(config, global_config)
        # The base class is imported here rather than at module level
        # (at module level it is only imported under TYPE_CHECKING).
        # pylint: disable=import-outside-toplevel
        from mlos_bench.storage.base_storage import Storage

        inst = instantiate_from_config(
            Storage,  # type: ignore[type-abstract]
            class_name,
            config=class_config,
            global_config=global_config,
            service=service,
        )
        _LOG.info("Created: Storage %s", inst)
        return inst

    def build_scheduler(  # pylint: disable=too-many-arguments
        self,
        *,
        config: Dict[str, Any],
        global_config: Dict[str, Any],
        environment: Environment,
        optimizer: Optimizer,
        storage: "Storage",
        root_env_config: str,
    ) -> "Scheduler":
        """
        Instantiation of mlos_bench Scheduler.

        Parameters
        ----------
        config : dict
            Configuration of the class to instantiate, as loaded from JSON.
        global_config : dict
            Global configuration parameters.
        environment : Environment
            The environment to benchmark/optimize.
        optimizer : Optimizer
            The optimizer to use.
        storage : Storage
            The storage to use.
        root_env_config : str
            Path to the root environment configuration.

        Returns
        -------
        inst : Scheduler
            A new instance of the Scheduler.
        """
        (class_name, class_config) = self.prepare_class_load(config, global_config)
        # The base class is imported here rather than at module level
        # (at module level it is only imported under TYPE_CHECKING).
        # pylint: disable=import-outside-toplevel
        from mlos_bench.schedulers.base_scheduler import Scheduler

        inst = instantiate_from_config(
            Scheduler,  # type: ignore[type-abstract]
            class_name,
            config=class_config,
            global_config=global_config,
            environment=environment,
            optimizer=optimizer,
            storage=storage,
            root_env_config=root_env_config,
        )
        _LOG.info("Created: Scheduler %s", inst)
        return inst

    def build_environment(  # pylint: disable=too-many-arguments
        self,
        config: Dict[str, Any],
        tunables: TunableGroups,
        global_config: Optional[Dict[str, Any]] = None,
        parent_args: Optional[Dict[str, TunableValue]] = None,
        service: Optional[Service] = None,
    ) -> Environment:
        """
        Factory method for a new environment with a given config.

        Parameters
        ----------
        config : dict
            A dictionary with three mandatory fields:
                "name": Human-readable string describing the environment;
                "class": FQN of a Python class to instantiate;
                "config": Free-format dictionary to pass to the constructor.
            The optional "include_services" and "include_tunables" fields list
            additional config files to load for this environment.
        tunables : TunableGroups
            A (possibly empty) collection of groups of tunable parameters for
            all environments.
        global_config : dict
            Global parameters to add to the environment config.
        parent_args : Dict[str, TunableValue]
            An optional reference of the parent CompositeEnv's const_args used to
            expand dynamic config parameters from.
        service: Service
            An optional service object (e.g., providing methods to
            deploy or reboot a VM, etc.).

        Returns
        -------
        env : Environment
            An instance of the `Environment` class initialized with `config`.
        """
        env_name = config["name"]
        (env_class, env_config) = self.prepare_class_load(config, global_config, parent_args)

        env_services_path = config.get("include_services")
        if env_services_path is not None:
            service = self.load_services(env_services_path, global_config, service)

        env_tunables_path = config.get("include_tunables")
        if env_tunables_path is not None:
            tunables = self._load_tunables(env_tunables_path, tunables)

        _LOG.debug("Creating env: %s :: %s", env_name, env_class)
        env = Environment.new(
            env_name=env_name,
            class_name=env_class,
            config=env_config,
            global_config=global_config,
            tunables=tunables,
            service=service,
        )

        _LOG.info("Created env: %s :: %s", env_name, env)
        return env

    def _build_standalone_service(
        self,
        config: Dict[str, Any],
        global_config: Optional[Dict[str, Any]] = None,
        parent: Optional[Service] = None,
    ) -> Service:
        """
        Factory method for a new service with a given config.

        Parameters
        ----------
        config : dict
            A dictionary with two mandatory fields:
                "class": FQN of a Python class to instantiate;
                "config": Free-format dictionary to pass to the constructor.
        global_config : dict
            Global parameters to add to the service config.
        parent: Service
            An optional reference of the parent service to mix in.

        Returns
        -------
        svc : Service
            An instance of the `Service` class initialized with `config`.
        """
        (svc_class, svc_config) = self.prepare_class_load(config, global_config)
        service = Service.new(svc_class, svc_config, global_config, parent)
        _LOG.info("Created service: %s", service)
        return service

    def _build_composite_service(
        self,
        config_list: Iterable[Dict[str, Any]],
        global_config: Optional[Dict[str, Any]] = None,
        parent: Optional[Service] = None,
    ) -> Service:
        """
        Factory method for a new mix-in service built from a list of configs.

        Parameters
        ----------
        config_list : a list of dict
            A list where each element is a dictionary with 2 mandatory fields:
                "class": FQN of a Python class to instantiate;
                "config": Free-format dictionary to pass to the constructor.
        global_config : dict
            Global parameters to add to the service config.
        parent: Service
            An optional reference of the parent service to mix in.

        Returns
        -------
        svc : Service
            An instance of the `Service` class that is a combination of all
            services from the list plus the parent mix-in.
        """
        service = Service()
        if parent:
            service.register(parent.export())

        # Each new service receives the mix-in accumulated so far as its parent,
        # and its exported methods are then registered on the mix-in as well.
        for config in config_list:
            service.register(
                self._build_standalone_service(config, global_config, service).export()
            )

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug("Created mix-in service: %s", service)

        return service

    def build_service(
        self,
        config: Dict[str, Any],
        global_config: Optional[Dict[str, Any]] = None,
        parent: Optional[Service] = None,
    ) -> Service:
        """
        Factory method for a new service with a given config.

        Parameters
        ----------
        config : dict
            Either a single service config, i.e., a dictionary with 2 mandatory fields:
                "class": FQN of a Python class to instantiate;
                "config": Free-format dictionary to pass to the constructor.
            or a dictionary with a "services" field that holds a list of such configs.
        global_config : dict
            Global parameters to add to the service config.
        parent: Service
            An optional reference of the parent service to mix in.

        Returns
        -------
        svc : Service
            An instance of the `Service` class that is a combination of all
            services from the list plus the parent mix-in.
            A single service config without a parent yields that service directly.
        """
        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug("Build service from config:\n%s", json.dumps(config, indent=2))

        assert isinstance(config, dict)
        config_list: List[Dict[str, Any]]
        if "class" not in config:
            # Top level config is a simple object with a list of services
            config_list = config["services"]
        else:
            # Top level config is a single service
            if parent is None:
                return self._build_standalone_service(config, global_config)
            config_list = [config]

        return self._build_composite_service(config_list, global_config, parent)

    def load_environment(  # pylint: disable=too-many-arguments
        self,
        json_file_name: str,
        tunables: TunableGroups,
        global_config: Optional[Dict[str, Any]] = None,
        parent_args: Optional[Dict[str, TunableValue]] = None,
        service: Optional[Service] = None,
    ) -> Environment:
        """
        Load and build new environment from the config file.

        Parameters
        ----------
        json_file_name : str
            The environment JSON configuration file.
        tunables : TunableGroups
            A (possibly empty) collection of tunables to add to the environment.
        global_config : dict
            Global parameters to add to the environment config.
        parent_args : Dict[str, TunableValue]
            An optional reference of the parent CompositeEnv's const_args used to
            expand dynamic config parameters from.
        service : Service
            An optional reference of the parent service to mix in.

        Returns
        -------
        env : Environment
            A new benchmarking environment.
        """
        config = self.load_config(json_file_name, ConfigSchema.ENVIRONMENT)
        assert isinstance(config, dict)
        return self.build_environment(config, tunables, global_config, parent_args, service)

    def load_environment_list(  # pylint: disable=too-many-arguments
        self,
        json_file_name: str,
        tunables: TunableGroups,
        global_config: Optional[Dict[str, Any]] = None,
        parent_args: Optional[Dict[str, TunableValue]] = None,
        service: Optional[Service] = None,
    ) -> List[Environment]:
        """
        Load and build a list of environments from the config file.

        The config file is loaded as a single environment config, and the
        environment built from it is returned wrapped in a one-element list.

        Parameters
        ----------
        json_file_name : str
            The environment JSON configuration file.
        tunables : TunableGroups
            A (possibly empty) collection of tunables to add to the environment.
        global_config : dict
            Global parameters to add to the environment config.
        parent_args : Dict[str, TunableValue]
            An optional reference of the parent CompositeEnv's const_args used to
            expand dynamic config parameters from.
        service : Service
            An optional reference of the parent service to mix in.

        Returns
        -------
        env : List[Environment]
            A list of new benchmarking environments.
        """
        config = self.load_config(json_file_name, ConfigSchema.ENVIRONMENT)
        return [self.build_environment(config, tunables, global_config, parent_args, service)]

    def load_services(
        self,
        json_file_names: Iterable[str],
        global_config: Optional[Dict[str, Any]] = None,
        parent: Optional[Service] = None,
    ) -> Service:
        """
        Read the configuration files and bundle all service methods from those configs
        into a single Service object.

        Parameters
        ----------
        json_file_names : list of str
            A list of service JSON configuration files.
        global_config : dict
            Global parameters to add to the service config.
        parent : Service
            An optional reference of the parent service to mix in.

        Returns
        -------
        service : Service
            A collection of service methods.
        """
        _LOG.info("Load services: %s parent: %s", json_file_names, parent.__class__.__name__)
        service = Service({}, global_config, parent)
        # Files are processed in the given order; each service is built with the
        # bundle accumulated so far as its parent.
        for fname in json_file_names:
            config = self.load_config(fname, ConfigSchema.SERVICE)
            service.register(self.build_service(config, global_config, service).export())
        return service

    def _load_tunables(
        self,
        json_file_names: Iterable[str],
        parent: TunableGroups,
    ) -> TunableGroups:
        """
        Load a collection of tunable parameters from JSON files and merge them into
        a copy of the parent TunableGroups.

        This helps allow standalone environment configs to reference
        overlapping tunable groups configs but still allow combining them into
        a single instance that each environment can reference.

        Parameters
        ----------
        json_file_names : list of str
            A list of JSON files to load.
        parent : TunableGroups
            A (possibly empty) collection of tunables to add to the new collection.

        Returns
        -------
        tunables : TunableGroups
            The larger collection of tunable parameters.
        """
        _LOG.info("Load tunables: '%s'", json_file_names)
        # Work on a copy, so that the merges below are applied to the copy
        # rather than to `parent`.
        tunables = parent.copy()
        for fname in json_file_names:
            config = self.load_config(fname, ConfigSchema.TUNABLE_PARAMS)
            assert isinstance(config, dict)
            tunables.merge(TunableGroups(config))
        return tunables