Coverage for mlos_bench/mlos_bench/tests/event_loop_context_test.py: 98%
83 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-07 01:52 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-07 01:52 +0000
1#
2# Copyright (c) Microsoft Corporation.
3# Licensed under the MIT License.
4#
5"""Tests for mlos_bench.event_loop_context background thread logic."""
7import asyncio
8import sys
9import time
10from asyncio import AbstractEventLoop
11from threading import Thread
12from types import TracebackType
13from typing import Optional, Type
15import pytest
16from typing_extensions import Literal
18from mlos_bench.event_loop_context import EventLoopContext
21class EventLoopContextCaller:
22 """
23 Simple class to test the EventLoopContext.
25 See Also: SshService
26 """
28 EVENT_LOOP_CONTEXT = EventLoopContext()
30 def __init__(self, instance_id: int) -> None:
31 self._id = instance_id
32 self._in_context = False
34 def __repr__(self) -> str:
35 return f"{self.__class__.__name__}(id={self._id})"
37 def __enter__(self) -> None:
38 assert not self._in_context
39 self.EVENT_LOOP_CONTEXT.enter()
40 self._in_context = True
42 def __exit__(
43 self,
44 ex_type: Optional[Type[BaseException]],
45 ex_val: Optional[BaseException],
46 ex_tb: Optional[TracebackType],
47 ) -> Literal[False]:
48 assert self._in_context
49 self.EVENT_LOOP_CONTEXT.exit()
50 self._in_context = False
51 return False
54@pytest.mark.filterwarnings(
55 "ignore:.*(coroutine 'sleep' was never awaited).*:RuntimeWarning:.*event_loop_context_test.*:0"
56)
57def test_event_loop_context() -> None:
58 """Test event loop context background thread setup/cleanup handling."""
59 # pylint: disable=protected-access,too-many-statements
61 # Should start with no event loop thread.
62 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread is None
64 # The background thread should only be created upon context entry.
65 event_loop_caller_instance_1 = EventLoopContextCaller(1)
66 assert event_loop_caller_instance_1
67 assert not event_loop_caller_instance_1._in_context
68 assert event_loop_caller_instance_1.EVENT_LOOP_CONTEXT._event_loop_thread is None
70 event_loop: Optional[AbstractEventLoop] = None
72 # After we enter the instance context, we should have a background thread.
73 with event_loop_caller_instance_1:
74 assert event_loop_caller_instance_1._in_context
75 assert ( # type: ignore[unreachable]
76 isinstance(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread, Thread)
77 )
78 # Give the thread a chance to start.
79 # Mostly important on the underpowered Windows CI machines.
80 time.sleep(0.25)
81 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread.is_alive()
82 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread_refcnt == 1
83 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop is not None
84 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop.is_running()
85 event_loop = EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop
87 event_loop_caller_instance_2 = EventLoopContextCaller(instance_id=2)
88 assert event_loop_caller_instance_2
89 assert not event_loop_caller_instance_2._in_context
91 with event_loop_caller_instance_2:
92 assert event_loop_caller_instance_2._in_context
93 assert event_loop_caller_instance_1._in_context
94 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread_refcnt == 2
95 # We should only get one thread for all instances.
96 assert (
97 EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread
98 is event_loop_caller_instance_1.EVENT_LOOP_CONTEXT._event_loop_thread
99 is event_loop_caller_instance_2.EVENT_LOOP_CONTEXT._event_loop_thread
100 )
101 assert (
102 EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop
103 is event_loop_caller_instance_1.EVENT_LOOP_CONTEXT._event_loop
104 is event_loop_caller_instance_2.EVENT_LOOP_CONTEXT._event_loop
105 )
107 assert not event_loop_caller_instance_2._in_context
109 # The background thread should remain running since we have another context still open.
110 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread_refcnt == 1
111 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread is not None
112 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread.is_alive()
113 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop is not None
114 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop.is_running()
116 start = time.time()
117 future = event_loop_caller_instance_1.EVENT_LOOP_CONTEXT.run_coroutine(
118 asyncio.sleep(0.1, result="foo")
119 )
120 assert 0.0 <= time.time() - start < 0.1
121 assert future.result(timeout=0.2) == "foo"
122 assert 0.1 <= time.time() - start <= 0.2
124 # Once we exit the last context, the background thread should be stopped
125 # and unusable for running co-routines.
127 assert ( # type: ignore[unreachable] # (false positives)
128 EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread is None
129 )
130 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread_refcnt == 0
131 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop is event_loop is not None
132 assert not EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop.is_running()
133 # Check that the event loop has no more tasks.
134 assert hasattr(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop, "_ready")
135 # Windows ProactorEventLoopPolicy adds a dummy task.
136 if sys.platform == "win32" and isinstance(
137 EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop, asyncio.ProactorEventLoop
138 ):
139 assert len(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop._ready) == 1
140 else:
141 assert len(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop._ready) == 0
142 assert hasattr(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop, "_scheduled")
143 assert len(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop._scheduled) == 0
145 with pytest.raises(
146 AssertionError
147 ): # , pytest.warns(RuntimeWarning, match=r".*coroutine 'sleep' was never awaited"):
148 future = event_loop_caller_instance_1.EVENT_LOOP_CONTEXT.run_coroutine(
149 asyncio.sleep(0.1, result="foo")
150 )
151 raise ValueError(f"Future should not have been available to wait on {future.result()}")
153 # Test that when re-entering the context we have the same event loop.
154 with event_loop_caller_instance_1:
155 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop is not None
156 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop.is_running()
157 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop is event_loop
159 # Test running again.
160 start = time.time()
161 future = event_loop_caller_instance_1.EVENT_LOOP_CONTEXT.run_coroutine(
162 asyncio.sleep(0.1, result="foo")
163 )
164 assert 0.0 <= time.time() - start < 0.1
165 assert future.result(timeout=0.2) == "foo"
166 assert 0.1 <= time.time() - start <= 0.2
169if __name__ == "__main__":
170 # For debugging in Windows which has issues with pytest detection in vscode.
171 pytest.main(["-n1", "--dist=no", "-k", "test_event_loop_context"])