Coverage for mlos_bench/mlos_bench/tests/event_loop_context_test.py: 98%
82 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-21 01:50 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-21 01:50 +0000
1#
2# Copyright (c) Microsoft Corporation.
3# Licensed under the MIT License.
4#
5"""Tests for mlos_bench.event_loop_context background thread logic."""
7import asyncio
8import sys
9import time
10from asyncio import AbstractEventLoop
11from threading import Thread
12from types import TracebackType
13from typing import Literal
15import pytest
17from mlos_bench.event_loop_context import EventLoopContext
20class EventLoopContextCaller:
21 """
22 Simple class to test the EventLoopContext.
24 See Also: SshService
25 """
27 EVENT_LOOP_CONTEXT = EventLoopContext()
29 def __init__(self, instance_id: int) -> None:
30 self._id = instance_id
31 self._in_context = False
33 def __repr__(self) -> str:
34 return f"{self.__class__.__name__}(id={self._id})"
36 def __enter__(self) -> None:
37 assert not self._in_context
38 self.EVENT_LOOP_CONTEXT.enter()
39 self._in_context = True
41 def __exit__(
42 self,
43 ex_type: type[BaseException] | None,
44 ex_val: BaseException | None,
45 ex_tb: TracebackType | None,
46 ) -> Literal[False]:
47 assert self._in_context
48 self.EVENT_LOOP_CONTEXT.exit()
49 self._in_context = False
50 return False
53@pytest.mark.filterwarnings(
54 "ignore:.*(coroutine 'sleep' was never awaited).*:RuntimeWarning:.*event_loop_context_test.*:0"
55)
56def test_event_loop_context() -> None:
57 """Test event loop context background thread setup/cleanup handling."""
58 # pylint: disable=protected-access,too-many-statements
60 # Should start with no event loop thread.
61 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread is None
63 # The background thread should only be created upon context entry.
64 event_loop_caller_instance_1 = EventLoopContextCaller(1)
65 assert event_loop_caller_instance_1
66 assert not event_loop_caller_instance_1._in_context
67 assert event_loop_caller_instance_1.EVENT_LOOP_CONTEXT._event_loop_thread is None
69 event_loop: AbstractEventLoop | None = None
71 # After we enter the instance context, we should have a background thread.
72 with event_loop_caller_instance_1:
73 assert event_loop_caller_instance_1._in_context
74 assert ( # type: ignore[unreachable]
75 isinstance(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread, Thread)
76 )
77 # Give the thread a chance to start.
78 # Mostly important on the underpowered Windows CI machines.
79 time.sleep(0.25)
80 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread.is_alive()
81 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread_refcnt == 1
82 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop is not None
83 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop.is_running()
84 event_loop = EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop
86 event_loop_caller_instance_2 = EventLoopContextCaller(instance_id=2)
87 assert event_loop_caller_instance_2
88 assert not event_loop_caller_instance_2._in_context
90 with event_loop_caller_instance_2:
91 assert event_loop_caller_instance_2._in_context
92 assert event_loop_caller_instance_1._in_context
93 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread_refcnt == 2
94 # We should only get one thread for all instances.
95 assert (
96 EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread
97 is event_loop_caller_instance_1.EVENT_LOOP_CONTEXT._event_loop_thread
98 is event_loop_caller_instance_2.EVENT_LOOP_CONTEXT._event_loop_thread
99 )
100 assert (
101 EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop
102 is event_loop_caller_instance_1.EVENT_LOOP_CONTEXT._event_loop
103 is event_loop_caller_instance_2.EVENT_LOOP_CONTEXT._event_loop
104 )
106 assert not event_loop_caller_instance_2._in_context
108 # The background thread should remain running since we have another context still open.
109 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread_refcnt == 1
110 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread is not None
111 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread.is_alive()
112 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop is not None
113 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop.is_running()
115 start = time.time()
116 future = event_loop_caller_instance_1.EVENT_LOOP_CONTEXT.run_coroutine(
117 asyncio.sleep(0.1, result="foo")
118 )
119 assert 0.0 <= time.time() - start < 0.1
120 assert future.result(timeout=0.2) == "foo"
121 assert 0.1 <= time.time() - start <= 0.2
123 # Once we exit the last context, the background thread should be stopped
124 # and unusable for running co-routines.
126 assert ( # type: ignore[unreachable] # (false positives)
127 EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread is None
128 )
129 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread_refcnt == 0
130 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop is event_loop is not None
131 assert not EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop.is_running()
132 # Check that the event loop has no more tasks.
133 assert hasattr(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop, "_ready")
134 # Windows ProactorEventLoopPolicy adds a dummy task.
135 if sys.platform == "win32" and isinstance(
136 EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop, asyncio.ProactorEventLoop
137 ):
138 assert len(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop._ready) == 1
139 else:
140 assert (
141 len(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop._ready) # pyright: ignore
142 == 0
143 )
144 assert hasattr(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop, "_scheduled")
145 assert (
146 len(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop._scheduled) # pyright: ignore
147 == 0
148 )
150 with pytest.raises(
151 AssertionError
152 ): # , pytest.warns(RuntimeWarning, match=r".*coroutine 'sleep' was never awaited"):
153 future = event_loop_caller_instance_1.EVENT_LOOP_CONTEXT.run_coroutine(
154 asyncio.sleep(0.1, result="foo")
155 )
156 raise ValueError(f"Future should not have been available to wait on {future.result()}")
158 # Test that when re-entering the context we have the same event loop.
159 with event_loop_caller_instance_1:
160 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop is not None
161 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop.is_running()
162 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop is event_loop
164 # Test running again.
165 start = time.time()
166 future = event_loop_caller_instance_1.EVENT_LOOP_CONTEXT.run_coroutine(
167 asyncio.sleep(0.1, result="foo")
168 )
169 assert 0.0 <= time.time() - start < 0.1
170 assert future.result(timeout=0.2) == "foo"
171 assert 0.1 <= time.time() - start <= 0.2
174if __name__ == "__main__":
175 # For debugging in Windows which has issues with pytest detection in vscode.
176 pytest.main(["-n1", "--dist=no", "-k", "test_event_loop_context"])