Coverage for mlos_bench/mlos_bench/launcher.py: 94%

203 statements  


#
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
#
"""
A helper class to load the configuration files, parse the command line parameters, and
instantiate the main components of the mlos_bench system.

It is used in the :py:mod:`mlos_bench.run` module to run the benchmark/optimizer
from the command line.
"""

12 

13import argparse 

14import logging 

15import sys 

16from collections.abc import Iterable 

17from typing import Any 

18 

19from mlos_bench.config.schemas import ConfigSchema 

20from mlos_bench.dict_templater import DictTemplater 

21from mlos_bench.environments.base_environment import Environment 

22from mlos_bench.optimizers.base_optimizer import Optimizer 

23from mlos_bench.optimizers.mock_optimizer import MockOptimizer 

24from mlos_bench.optimizers.one_shot_optimizer import OneShotOptimizer 

25from mlos_bench.schedulers.base_scheduler import Scheduler 

26from mlos_bench.schedulers.trial_runner import TrialRunner 

27from mlos_bench.services.base_service import Service 

28from mlos_bench.services.config_persistence import ConfigPersistenceService 

29from mlos_bench.services.local.local_exec import LocalExecService 

30from mlos_bench.services.types.config_loader_type import SupportsConfigLoading 

31from mlos_bench.storage.base_storage import Storage 

32from mlos_bench.tunables.tunable import TunableValue 

33from mlos_bench.tunables.tunable_groups import TunableGroups 

34from mlos_bench.util import try_parse_val 

35 

36_LOG_LEVEL = logging.INFO 

37_LOG_FORMAT = "%(asctime)s %(filename)s:%(lineno)d %(funcName)s %(levelname)s %(message)s" 

38logging.basicConfig(level=_LOG_LEVEL, format=_LOG_FORMAT) 

39 

40_LOG = logging.getLogger(__name__) 

41 

42 

43class Launcher: 

44 # pylint: disable=too-few-public-methods,too-many-instance-attributes 

45 """Command line launcher for mlos_bench and mlos_core.""" 

46 

47 def __init__(self, description: str, long_text: str = "", argv: list[str] | None = None): 

48 # pylint: disable=too-many-statements 

49 # pylint: disable=too-complex 

50 # pylint: disable=too-many-locals 

51 _LOG.info("Launch: %s", description) 

52 epilog = """ 

53 Additional --key=value pairs can be specified to augment or override 

54 values listed in --globals. 

55 Other required_args values can also be pulled from shell environment 

56 variables. 

57 

58 For additional details, please see the website or the README.md files in 

59 the source tree: 

60 <https://github.com/microsoft/MLOS/tree/main/mlos_bench/> 

61 """ 

62 parser = argparse.ArgumentParser(description=f"{description} : {long_text}", epilog=epilog) 

63 (args, path_args, args_rest) = self._parse_args(parser, argv) 

64 

65 # Bootstrap config loader: command line takes priority. 

66 config_path = args.config_path or [] 

67 self._config_loader = ConfigPersistenceService({"config_path": config_path}) 

68 if args.config: 

69 config = self._config_loader.load_config(args.config, ConfigSchema.CLI) 

70 assert isinstance(config, dict) 

71 # Merge the args paths for the config loader with the paths from JSON file. 

72 config_path += config.get("config_path", []) 

73 self._config_loader = ConfigPersistenceService({"config_path": config_path}) 

74 else: 

75 config = {} 

76 

77 log_level = args.log_level or config.get("log_level", _LOG_LEVEL) 

78 try: 

79 log_level = int(log_level) 

80 except ValueError: 

81 # failed to parse as an int - leave it as a string and let logging 

82 # module handle whether it's an appropriate log name or not 

83 log_level = logging.getLevelName(log_level) 

84 logging.root.setLevel(log_level) 
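        # Note: both numeric and symbolic levels are accepted here; e.g. (illustrative)
        # "--log-level 10" and "--log-level DEBUG" both resolve to logging.DEBUG,
        # the latter via logging.getLevelName().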

        log_file = args.log_file or config.get("log_file")
        if log_file:
            log_handler = logging.FileHandler(log_file)
            log_handler.setLevel(log_level)
            log_handler.setFormatter(logging.Formatter(_LOG_FORMAT))
            logging.root.addHandler(log_handler)

        # Prepare global_config from a combination of global config files, cli
        # configs, and cli args.
        args_dict = vars(args)
        # teardown (bool) conflicts with Environment configs that use it for shell
        # commands (list), so we exclude it from copying over.
        excluded_cli_args = path_args + ["teardown"]
        # Include (almost) any item from the cli config file that either isn't in
        # the cli args at all or whose cli arg is missing.
        cli_config_args = {
            key: val
            for (key, val) in config.items()
            if (args_dict.get(key) is None) and key not in excluded_cli_args
        }

        self.global_config = self._load_config(
            args_globals=config.get("globals", []) + (args.globals or []),
            config_path=(args.config_path or []) + config.get("config_path", []),
            args_rest=args_rest,
            global_config=cli_config_args,
        )
        # TODO: Can we generalize these two rules using excluded_cli_args?
        # experiment_id is generally taken from --globals files, but we also allow
        # overriding it on the CLI.
        # It's useful to keep it there explicitly, mostly for the --help output.
        if args.experiment_id:
            self.global_config["experiment_id"] = args.experiment_id
        # trial_config_repeat_count is a scheduler property, but it's convenient to
        # set it via the command line.
        if args.trial_config_repeat_count:
            self.global_config["trial_config_repeat_count"] = args.trial_config_repeat_count
        self.global_config.setdefault("num_trial_runners", 1)
        if args.num_trial_runners:
            self.global_config["num_trial_runners"] = args.num_trial_runners
        if self.global_config["num_trial_runners"] <= 0:
            raise ValueError(
                f"""Invalid num_trial_runners: {self.global_config["num_trial_runners"]}"""
            )
        # Ensure that the trial_id is present, since it gets used by some other
        # configs but is typically controlled by the run optimize loop.
        self.global_config.setdefault("trial_id", 1)

        self.global_config = DictTemplater(self.global_config).expand_vars(use_os_env=True)
        assert isinstance(self.global_config, dict)
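        # expand_vars() resolves "$var"-style references against other entries in
        # the global config and, with use_os_env=True, OS environment variables.
        # Illustrative (hypothetical) example:
        #   {"experiment_id": "exp1", "output_dir": "results/$experiment_id"}
        #   expands "output_dir" to "results/exp1".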

        # --service cli args should override the config file values.
        service_files: list[str] = config.get("services", []) + (args.service or [])
        # Add a LocalExecService as the parent service for all other services.
        self._parent_service: Service = LocalExecService(parent=self._config_loader)
        assert isinstance(self._parent_service, SupportsConfigLoading)
        self._parent_service = self._parent_service.load_services(
            service_files,
            self.global_config,
            self._parent_service,
        )

        self.storage = self._load_storage(
            args.storage or config.get("storage"),
            lazy_schema_create=False if args.create_update_storage_schema_only else None,
        )
        _LOG.info("Init storage: %s", self.storage)

        if args.create_update_storage_schema_only:
            _LOG.info("Create/update storage schema only.")
            self.storage.update_schema()
            sys.exit(0)

        env_path = args.environment or config.get("environment")
        if not env_path:
            _LOG.error("No environment config specified.")
            parser.error(
                "At least the Environment config must be specified."
                " Run `mlos_bench --help` and consult `README.md` for more info."
            )
        self.root_env_config = self._config_loader.resolve_path(env_path)

        # Create the TrialRunners and their Environments and Services from the JSON files.
        self.trial_runners = TrialRunner.create_from_json(
            config_loader=self._config_loader,
            global_config=self.global_config,
            svcs_json=service_files,
            env_json=self.root_env_config,
            num_trial_runners=self.global_config["num_trial_runners"],
        )

        _LOG.info(
            "Init %d trial runners for environments: %s",
            len(self.trial_runners),
            [trial_runner.environment for trial_runner in self.trial_runners],
        )

        # NOTE: Init tunable values *after* the Environment(s), but *before* the Optimizer
        # TODO: should we assign the same or different tunables for all TrialRunner Environments?
        self.tunables = self._init_tunable_values(
            self.trial_runners[0].environment,
            args.random_init or config.get("random_init", False),
            config.get("random_seed") if args.random_seed is None else args.random_seed,
            config.get("tunable_values", []) + (args.tunable_values or []),
        )
        _LOG.info("Init tunables: %s", self.tunables)

        self.optimizer = self._load_optimizer(args.optimizer or config.get("optimizer"))
        _LOG.info("Init optimizer: %s", self.optimizer)
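        # The --no-teardown flag stores False into args.teardown; when the flag is
        # absent, args.teardown stays None and the CLI config value (default: True)
        # decides whether the environment gets torn down after the run.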

        self.teardown: bool = (
            bool(args.teardown)
            if args.teardown is not None
            else bool(config.get("teardown", True))
        )
        self.scheduler = self._load_scheduler(args.scheduler or config.get("scheduler"))
        _LOG.info("Init scheduler: %s", self.scheduler)

    @property
    def config_loader(self) -> ConfigPersistenceService:
        """Get the config loader service."""
        return self._config_loader

    @property
    def root_environment(self) -> Environment:
        """
        Get the root (prototypical) Environment from the first TrialRunner.

        Notes
        -----
        All TrialRunners share the same Environment config and are distinguished
        only by the unique trial_runner_id assigned to each TrialRunner's
        Environment's global_config.

        This property is mostly for convenience and backwards compatibility.
        """
        return self.trial_runners[0].environment

    @property
    def service(self) -> Service:
        """Get the parent service."""
        return self._parent_service

    @staticmethod
    def _parse_args(
        parser: argparse.ArgumentParser,
        argv: list[str] | None,
    ) -> tuple[argparse.Namespace, list[str], list[str]]:
        """Parse the command line arguments."""

        class PathArgsTracker:
            """Simple class to help track which arguments are paths."""

            def __init__(self, parser: argparse.ArgumentParser):
                self._parser = parser
                self.path_args: list[str] = []

            def add_argument(self, *args: Any, **kwargs: Any) -> None:
                """Add an argument to the parser and track its destination."""
                self.path_args.append(self._parser.add_argument(*args, **kwargs).dest)
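        # Each dest registered through the tracker ends up in .path_args, which
        # __init__ later uses (via excluded_cli_args) to avoid copying path-valued
        # options from the CLI config file into the merged global config.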

        path_args_tracker = PathArgsTracker(parser)

        path_args_tracker.add_argument(
            "--config",
            required=False,
            help=(
                "Main JSON5 configuration file. Its keys are the same as the "
                "command line options and can be overridden by the latter.\n"
                "\n"
                "See the `mlos_bench/config/` tree at https://github.com/microsoft/MLOS/ "
                "for additional config examples for this and other arguments."
            ),
        )

        path_args_tracker.add_argument(
            "--log_file",
            "--log-file",
            required=False,
            help="Path to the log file. Use stdout if omitted.",
        )

        parser.add_argument(
            "--log_level",
            "--log-level",
            required=False,
            type=str,
            help=(
                f"Logging level. Default is {logging.getLevelName(_LOG_LEVEL)}. "
                "Set to DEBUG for debug, WARNING for warnings only."
            ),
        )

        path_args_tracker.add_argument(
            "--config_path",
            "--config-path",
            "--config-paths",
            "--config_paths",
            nargs="+",
            action="extend",
            required=False,
            help="One or more locations of JSON config files.",
        )

        path_args_tracker.add_argument(
            "--service",
            "--services",
            nargs="+",
            action="extend",
            required=False,
            help=(
                "Path to JSON file with the configuration "
                "of the service(s) for the environment(s) to use."
            ),
        )

        path_args_tracker.add_argument(
            "--environment",
            required=False,
            help="Path to JSON file with the configuration of the benchmarking environment(s).",
        )

        path_args_tracker.add_argument(
            "--optimizer",
            required=False,
            help=(
                "Path to the optimizer configuration file. If omitted, run "
                "a single trial with the default tunable values (or those "
                "specified via --tunable_values)."
            ),
        )

        parser.add_argument(
            "--trial_config_repeat_count",
            "--trial-config-repeat-count",
            required=False,
            type=int,
            help=(
                "Number of times to repeat each config. "
                "Default is 1 trial per config, though more may be advised."
            ),
        )

        parser.add_argument(
            "--num_trial_runners",
            "--num-trial-runners",
            required=False,
            type=int,
            help=(
                "Number of TrialRunners to use for executing benchmark Environments. "
                "Individual TrialRunners can be identified in configs with $trial_runner_id "
                "and optionally run in parallel."
            ),
        )

        path_args_tracker.add_argument(
            "--scheduler",
            required=False,
            help=(
                "Path to the scheduler configuration file. By default, use "
                "a single worker synchronous scheduler."
            ),
        )

        path_args_tracker.add_argument(
            "--storage",
            required=False,
            help=(
                "Path to the storage configuration file. "
                "If omitted, use the ephemeral in-memory SQL storage."
            ),
        )

        parser.add_argument(
            "--random_init",
            "--random-init",
            required=False,
            default=False,
            dest="random_init",
            action="store_true",
            help="Initialize tunables with random values (before applying --tunable_values).",
        )

        parser.add_argument(
            "--random_seed",
            "--random-seed",
            required=False,
            type=int,
            help="Seed to use with --random_init.",
        )

        path_args_tracker.add_argument(
            "--tunable_values",
            "--tunable-values",
            nargs="+",
            action="extend",
            required=False,
            help=(
                "Path to one or more JSON files that contain values of the tunable "
                "parameters. This can be used for a single trial (when no --optimizer "
                "is specified) or as default values for the first run in optimization."
            ),
        )

        path_args_tracker.add_argument(
            "--globals",
            nargs="+",
            action="extend",
            required=False,
            help=(
                "Path to one or more JSON files that contain additional "
                "[private] parameters of the benchmarking environment."
            ),
        )

        parser.add_argument(
            "--no_teardown",
            "--no-teardown",
            required=False,
            default=None,
            dest="teardown",
            action="store_false",
            help="Disable teardown of the environment after the benchmark.",
        )

        parser.add_argument(
            "--experiment_id",
            "--experiment-id",
            required=False,
            default=None,
            help="""
                Experiment ID to use for the benchmark.
                If omitted, the value from the --cli config or --globals is used.

                This is used to store and reload trial results from the storage.
                NOTE: It is **important** to change this value when incompatible
                changes are made to config files, scripts, versions, etc.
                This is left as a manual operation, as detecting what counts as
                an "incompatible" change is not easily automatable across systems.
                """,
        )

        parser.add_argument(
            "--create-update-storage-schema-only",
            required=False,
            default=False,
            dest="create_update_storage_schema_only",
            action="store_true",
            help=(
                "Makes sure that the storage schema is up to date "
                "for the current version of mlos_bench."
            ),
        )

        # By default we use the command line arguments, but allow the caller to
        # provide some explicitly for testing purposes.
        if argv is None:
            argv = sys.argv[1:].copy()
        (args, args_rest) = parser.parse_known_args(argv)

        return (args, path_args_tracker.path_args, args_rest)

    @staticmethod
    def _try_parse_extra_args(cmdline: Iterable[str]) -> dict[str, TunableValue]:
        """Helper function to parse global key/value pairs from the command line."""
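        # Accepts both "--key=value" and "--key value" forms. Illustrative example:
        #   ["--max_suggestions=10", "--storage-host", "localhost"]
        #   parses to {"max_suggestions": 10, "storage_host": "localhost"}
        # (dashes in keys are normalized to underscores below).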

        _LOG.debug("Extra args: %s", cmdline)

        config: dict[str, TunableValue] = {}
        key = None
        for elem in cmdline:
            if elem.startswith("--"):
                if key is not None:
                    raise ValueError("Command line argument has no value: " + key)
                key = elem[2:]
                kv_split = key.split("=", 1)
                if len(kv_split) == 2:
                    config[kv_split[0].strip()] = try_parse_val(kv_split[1])
                    key = None
            else:
                if key is None:
                    raise ValueError("Command line argument has no key: " + elem)
                config[key.strip()] = try_parse_val(elem)
                key = None

        if key is not None:
            # Handles a missing trailing value for the last --key argument.
            raise ValueError("Command line argument has no value: " + key)

        # Convert "max-suggestions" to "max_suggestions" for compatibility with
        # other CLI options to use as common python/json variable replacements.
        config = {k.replace("-", "_"): v for k, v in config.items()}

        _LOG.debug("Parsed config: %s", config)
        return config

    def _load_config(
        self,
        *,
        args_globals: Iterable[str],
        config_path: Iterable[str],
        args_rest: Iterable[str],
        global_config: dict[str, Any],
    ) -> dict[str, Any]:
        """Get key/value pairs of the global configuration parameters from the
        specified config files (if any) and command line arguments.
        """
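        # Merge order (later entries win): values carried over from the CLI config
        # file, then each --globals file in order, then ad-hoc --key=value args.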

        for config_file in args_globals or []:
            conf = self._config_loader.load_config(config_file, ConfigSchema.GLOBALS)
            assert isinstance(conf, dict)
            global_config.update(conf)
        global_config.update(Launcher._try_parse_extra_args(args_rest))
        if config_path:
            global_config["config_path"] = config_path
        return global_config

    def _init_tunable_values(
        self,
        env: Environment,
        random_init: bool,
        seed: int | None,
        args_tunables: list[str] | None,
    ) -> TunableGroups:
        """Initialize the tunables and load key/value pairs of the tunable values from
        the given JSON files, if specified.
        """
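        # A --tunable_values file is a flat JSON dict of parameter assignments,
        # e.g. (hypothetical tunable names): {"vm_size": "Standard_B2s", "idle": "mwait"}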

        tunables = env.tunable_params
        _LOG.debug("Init tunables: default = %s", tunables)

        if random_init:
            tunables = MockOptimizer(
                tunables=tunables,
                service=None,
                config={"start_with_defaults": False, "seed": seed},
            ).suggest()
            _LOG.debug("Init tunables: random = %s", tunables)

        if args_tunables is not None:
            for data_file in args_tunables:
                values = self._config_loader.load_config(data_file, ConfigSchema.TUNABLE_VALUES)
                assert isinstance(values, dict)
                tunables.assign(values)
                _LOG.debug("Init tunables: load %s = %s", data_file, tunables)

        return tunables

    def _load_optimizer(self, args_optimizer: str | None) -> Optimizer:
        """
        Instantiate the Optimizer object from the JSON config file, if specified in
        the --optimizer command line option.

        If the config file is not specified, create a one-shot optimizer to run a
        single benchmark trial.
        """
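        # An illustrative (hypothetical) --optimizer config file:
        #   {
        #     "class": "mlos_bench.optimizers.MlosCoreOptimizer",
        #     "config": {"max_suggestions": 100, "optimization_targets": {"score": "min"}}
        #   }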

        if args_optimizer is None:
            # global_config may contain additional properties, so we need to
            # strip those out before instantiating the basic one-shot optimizer.
            config = {
                key: val
                for key, val in self.global_config.items()
                if key in OneShotOptimizer.BASE_SUPPORTED_CONFIG_PROPS
            }
            return OneShotOptimizer(self.tunables, config=config, service=self._parent_service)
        class_config = self._config_loader.load_config(args_optimizer, ConfigSchema.OPTIMIZER)
        assert isinstance(class_config, dict)
        optimizer = self._config_loader.build_optimizer(
            tunables=self.tunables,
            service=self._parent_service,
            config=class_config,
            global_config=self.global_config,
        )
        return optimizer

    def _load_storage(
        self,
        args_storage: str | None,
        lazy_schema_create: bool | None = None,
    ) -> Storage:
        """
        Instantiate the Storage object from the JSON file provided in the --storage
        command line parameter.

        If omitted, create an ephemeral in-memory SQL storage instead.
        """
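        # An illustrative (hypothetical) --storage config file:
        #   {
        #     "class": "mlos_bench.storage.sql.storage.SqlStorage",
        #     "config": {"drivername": "sqlite", "database": "mlos_bench.sqlite"}
        #   }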

        if args_storage is None:
            # pylint: disable=import-outside-toplevel
            from mlos_bench.storage.sql.storage import SqlStorage

            return SqlStorage(
                service=self._parent_service,
                config={
                    "drivername": "sqlite",
                    "database": ":memory:",
                    "lazy_schema_create": True,
                },
            )
        class_config = self._config_loader.load_config(args_storage, ConfigSchema.STORAGE)
        assert isinstance(class_config, dict)
        if lazy_schema_create is not None:
            class_config["lazy_schema_create"] = lazy_schema_create
        storage = self._config_loader.build_storage(
            service=self._parent_service,
            config=class_config,
            global_config=self.global_config,
        )
        return storage

    def _load_scheduler(self, args_scheduler: str | None) -> Scheduler:
        """
        Instantiate the Scheduler object from the JSON file provided in the
        --scheduler command line parameter.

        Create a simple synchronous single-threaded scheduler if omitted.
        """
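        # An illustrative (hypothetical) --scheduler config file:
        #   {
        #     "class": "mlos_bench.schedulers.SyncScheduler",
        #     "config": {"trial_config_repeat_count": 3, "max_trials": -1}
        #   }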

        # Set `teardown` for the scheduler only, to prevent conflicts with other configs.
        global_config = self.global_config.copy()
        global_config.setdefault("teardown", self.teardown)
        if args_scheduler is None:
            # pylint: disable=import-outside-toplevel
            from mlos_bench.schedulers.sync_scheduler import SyncScheduler

            return SyncScheduler(
                # All config values can be overridden from the global config.
                config={
                    "experiment_id": "UNDEFINED - override from global config",
                    "trial_id": 0,
                    "config_id": -1,
                    "trial_config_repeat_count": 1,
                    "teardown": self.teardown,
                },
                global_config=global_config,
                trial_runners=self.trial_runners,
                optimizer=self.optimizer,
                storage=self.storage,
                root_env_config=self.root_env_config,
            )
        class_config = self._config_loader.load_config(args_scheduler, ConfigSchema.SCHEDULER)
        assert isinstance(class_config, dict)
        return self._config_loader.build_scheduler(
            config=class_config,
            global_config=global_config,
            trial_runners=self.trial_runners,
            optimizer=self.optimizer,
            storage=self.storage,
            root_env_config=self.root_env_config,
        )