Coverage for mlos_bench/mlos_bench/schedulers/sync_scheduler.py: 88%

32 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-07 01:52 +0000

1# 

2# Copyright (c) Microsoft Corporation. 

3# Licensed under the MIT License. 

4# 

5"""A simple single-threaded synchronous optimization loop implementation.""" 

6 

7import logging 

8from datetime import datetime 

9 

10from pytz import UTC 

11 

12from mlos_bench.environments.status import Status 

13from mlos_bench.schedulers.base_scheduler import Scheduler 

14from mlos_bench.storage.base_storage import Storage 

15 

16_LOG = logging.getLogger(__name__) 

17 

18 

19class SyncScheduler(Scheduler): 

20 """A simple single-threaded synchronous optimization loop implementation.""" 

21 

22 def start(self) -> None: 

23 """Start the optimization loop.""" 

24 super().start() 

25 

26 is_warm_up = self.optimizer.supports_preload 

27 if not is_warm_up: 

28 _LOG.warning("Skip pending trials and warm-up: %s", self.optimizer) 

29 

30 not_done = True 

31 while not_done: 

32 _LOG.info("Optimization loop: Last trial ID: %d", self._last_trial_id) 

33 self._run_schedule(is_warm_up) 

34 not_done = self._schedule_new_optimizer_suggestions() 

35 is_warm_up = False 

36 

37 def run_trial(self, trial: Storage.Trial) -> None: 

38 """ 

39 Set up and run a single trial. 

40 

41 Save the results in the storage. 

42 """ 

43 super().run_trial(trial) 

44 

45 if not self.environment.setup(trial.tunables, trial.config(self.global_config)): 

46 _LOG.warning("Setup failed: %s :: %s", self.environment, trial.tunables) 

47 # FIXME: Use the actual timestamp from the environment. 

48 _LOG.info("QUEUE: Update trial results: %s :: %s", trial, Status.FAILED) 

49 trial.update(Status.FAILED, datetime.now(UTC)) 

50 return 

51 

52 # Block and wait for the final result. 

53 (status, timestamp, results) = self.environment.run() 

54 _LOG.info("Results: %s :: %s\n%s", trial.tunables, status, results) 

55 

56 # In async mode (TODO), poll the environment for status and telemetry 

57 # and update the storage with the intermediate results. 

58 (_status, _timestamp, telemetry) = self.environment.status() 

59 

60 # Use the status and timestamp from `.run()` as it is the final status of the experiment. 

61 # TODO: Use the `.status()` output in async mode. 

62 trial.update_telemetry(status, timestamp, telemetry) 

63 

64 trial.update(status, timestamp, results) 

65 _LOG.info("QUEUE: Update trial results: %s :: %s %s", trial, status, results)