Coverage for mlos_bench/mlos_bench/event_loop_context.py: 96%
53 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-21 01:50 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-21 01:50 +0000
1#
2# Copyright (c) Microsoft Corporation.
3# Licensed under the MIT License.
4#
5"""EventLoopContext class definition."""
7import asyncio
8import logging
9import sys
10from asyncio import AbstractEventLoop
11from collections.abc import Coroutine
12from concurrent.futures import Future
13from threading import Lock as ThreadLock
14from threading import Thread
15from typing import Any, TypeAlias, TypeVar
17CoroReturnType = TypeVar("CoroReturnType") # pylint: disable=invalid-name
18"""Type variable for the return type of an :external:py:mod:`asyncio` coroutine."""
20FutureReturnType: TypeAlias = Future[CoroReturnType]
21"""Type variable for the return type of a :py:class:`~concurrent.futures.Future`."""
23_LOG = logging.getLogger(__name__)
class EventLoopContext:
    """
    EventLoopContext encapsulates a background thread for :external:py:mod:`asyncio`
    event loop processing as an aid for context managers.

    There is generally only expected to be one of these, either as a base class instance
    if it's specific to that functionality or for the full mlos_bench process to support
    parallel trial runners, for instance.

    Its :py:meth:`.enter` and :py:meth:`.exit` routines are expected to be called
    from the caller's context manager routines (e.g., __enter__ and __exit__).
    Calls are reference counted: the background thread is started by the first
    :py:meth:`.enter` and stopped by the matching last :py:meth:`.exit`.
    """

    def __init__(self) -> None:
        # Created on the first enter() and kept afterwards (exit() never resets it),
        # so the same loop object is reused if the context is entered again.
        self._event_loop: AbstractEventLoop | None = None
        # Background thread running the event loop; None while no thread is running.
        self._event_loop_thread: Thread | None = None
        # Serializes enter()/exit() so the thread and the refcount change together.
        self._event_loop_thread_lock = ThreadLock()
        # Number of enter() calls that have not yet been matched by an exit().
        self._event_loop_thread_refcnt: int = 0

    def _run_event_loop(self) -> None:
        """
        Runs the asyncio event loop in a background thread.

        This is the thread target: it installs the loop as the current event loop
        for the calling thread and blocks until the loop is stopped.
        """
        assert self._event_loop is not None
        asyncio.set_event_loop(self._event_loop)
        self._event_loop.run_forever()

    def enter(self) -> None:
        """
        Manages starting the background thread for event loop processing.

        The first call creates the event loop (if needed) and starts the daemon
        thread that runs it; every call increments the reference count.
        """
        # Start the background thread if it's not already running.
        with self._event_loop_thread_lock:
            if not self._event_loop_thread:
                assert self._event_loop_thread_refcnt == 0
                if self._event_loop is None:
                    if sys.platform == "win32":
                        # Use the selector event loop policy on Windows.
                        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
                    self._event_loop = asyncio.new_event_loop()
                assert not self._event_loop.is_running()
                self._event_loop_thread = Thread(target=self._run_event_loop, daemon=True)
                self._event_loop_thread.start()
            self._event_loop_thread_refcnt += 1

    def exit(self) -> None:
        """
        Manages cleaning up the background thread for event loop processing.

        Every call decrements the reference count; when it reaches zero the event
        loop is asked to stop and the background thread is joined.

        Raises
        ------
        AssertionError
            If called without a matching prior :py:meth:`.enter`.
        RuntimeError
            If the background thread does not stop within 3 seconds.
        """
        with self._event_loop_thread_lock:
            # Validate *before* decrementing: otherwise an unbalanced exit() would
            # leave the count at -1 and every later enter() would fail its
            # "refcnt == 0" check, making this instance permanently unusable.
            assert self._event_loop_thread_refcnt > 0
            self._event_loop_thread_refcnt -= 1
            if self._event_loop_thread_refcnt == 0:
                assert self._event_loop is not None
                # stop() must be scheduled from this (different) thread in a
                # thread-safe way; it takes effect inside the loop's own thread.
                self._event_loop.call_soon_threadsafe(self._event_loop.stop)
                _LOG.info("Waiting for event loop thread to stop...")
                assert self._event_loop_thread is not None
                self._event_loop_thread.join(timeout=3)
                if self._event_loop_thread.is_alive():
                    # NOTE: on this path the thread reference is intentionally left
                    # in place (it is only cleared after a successful join).
                    raise RuntimeError("Failed to stop event loop thread.")
                self._event_loop_thread = None

    def run_coroutine(
        self,
        coro: Coroutine[Any, Any, CoroReturnType],
    ) -> FutureReturnType[CoroReturnType]:
        """
        Runs the given coroutine in the background event loop thread and returns a
        Future that can be used to wait for the result.

        Must only be called between :py:meth:`.enter` and :py:meth:`.exit`.

        Parameters
        ----------
        coro : Coroutine[Any, Any, CoroReturnType]
            The coroutine to run.

        Returns
        -------
        concurrent.futures.Future[CoroReturnType]
            A future that will be completed when the coroutine completes.
        """
        assert self._event_loop_thread_refcnt > 0
        assert self._event_loop is not None
        return asyncio.run_coroutine_threadsafe(coro, self._event_loop)