Coverage for mlos_bench/mlos_bench/tests/event_loop_context_test.py: 98%

83 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-07 01:52 +0000

1# 

2# Copyright (c) Microsoft Corporation. 

3# Licensed under the MIT License. 

4# 

5"""Tests for mlos_bench.event_loop_context background thread logic.""" 

6 

7import asyncio 

8import sys 

9import time 

10from asyncio import AbstractEventLoop 

11from threading import Thread 

12from types import TracebackType 

13from typing import Optional, Type 

14 

15import pytest 

16from typing_extensions import Literal 

17 

18from mlos_bench.event_loop_context import EventLoopContext 

19 

20 

class EventLoopContextCaller:
    """
    Minimal context manager used to exercise the EventLoopContext.

    Every instance goes through the single class-level EVENT_LOOP_CONTEXT:
    entering an instance calls its ``enter()`` and leaving calls its ``exit()``.

    See Also: SshService
    """

    # One context object shared by all instances of this class.
    EVENT_LOOP_CONTEXT = EventLoopContext()

    def __init__(self, instance_id: int) -> None:
        """Remember the instance id and start outside of the context."""
        self._in_context = False
        self._id = instance_id

    def __repr__(self) -> str:
        """Return a short ``ClassName(id=N)`` description of this instance."""
        class_name = type(self).__name__
        return f"{class_name}(id={self._id})"

    def __enter__(self) -> None:
        """Enter the shared event loop context; nested re-entry is not allowed."""
        assert not self._in_context
        self.EVENT_LOOP_CONTEXT.enter()
        # Only flag the instance as active once enter() has returned.
        self._in_context = True

    def __exit__(
        self,
        ex_type: Optional[Type[BaseException]],
        ex_val: Optional[BaseException],
        ex_tb: Optional[TracebackType],
    ) -> Literal[False]:
        """Leave the shared event loop context without suppressing exceptions."""
        assert self._in_context
        self.EVENT_LOOP_CONTEXT.exit()
        self._in_context = False
        # False => any exception raised inside the `with` body propagates.
        return False

52 

53 

def _assert_runs_sleep_coroutine(caller: EventLoopContextCaller) -> None:
    """
    Submit a short sleep coroutine through the caller's context and time it.

    Checks that ``run_coroutine`` hands back a future without blocking for the
    duration of the sleep, and that the future then yields the expected result
    within the expected time window.

    Parameters
    ----------
    caller : EventLoopContextCaller
        An instance that is currently inside its context.
    """
    # Use a monotonic clock for elapsed time: unlike time.time() it cannot jump
    # when the system clock is adjusted, and it is the kind of clock the
    # asyncio event loop uses to schedule the sleep.
    start = time.monotonic()
    future = caller.EVENT_LOOP_CONTEXT.run_coroutine(asyncio.sleep(0.1, result="foo"))
    # Submitting the coroutine should return before the sleep completes.
    assert 0.0 <= time.monotonic() - start < 0.1
    assert future.result(timeout=0.2) == "foo"
    assert 0.1 <= time.monotonic() - start <= 0.2


@pytest.mark.filterwarnings(
    "ignore:.*(coroutine 'sleep' was never awaited).*:RuntimeWarning:.*event_loop_context_test.*:0"
)
def test_event_loop_context() -> None:
    """
    Test event loop context background thread setup/cleanup handling.

    Walks through the lifecycle checked by the assertions below: no thread
    before the first entry, one shared thread and loop while any instance is
    inside its context, the thread released after the last exit, and the same
    event loop object being running again after re-entry.
    """
    # pylint: disable=protected-access,too-many-statements

    # Should start with no event loop thread.
    assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread is None

    # The background thread should only be created upon context entry.
    event_loop_caller_instance_1 = EventLoopContextCaller(1)
    assert event_loop_caller_instance_1
    assert not event_loop_caller_instance_1._in_context
    assert event_loop_caller_instance_1.EVENT_LOOP_CONTEXT._event_loop_thread is None

    event_loop: Optional[AbstractEventLoop] = None

    # After we enter the instance context, we should have a background thread.
    with event_loop_caller_instance_1:
        assert event_loop_caller_instance_1._in_context
        assert (  # type: ignore[unreachable]
            isinstance(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread, Thread)
        )
        # Give the thread a chance to start.
        # Mostly important on the underpowered Windows CI machines.
        time.sleep(0.25)
        assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread.is_alive()
        assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread_refcnt == 1
        assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop is not None
        assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop.is_running()
        # Remember the loop so we can check it is the same one after re-entry.
        event_loop = EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop

        event_loop_caller_instance_2 = EventLoopContextCaller(instance_id=2)
        assert event_loop_caller_instance_2
        assert not event_loop_caller_instance_2._in_context

        with event_loop_caller_instance_2:
            assert event_loop_caller_instance_2._in_context
            assert event_loop_caller_instance_1._in_context
            assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread_refcnt == 2
            # We should only get one thread for all instances.
            assert (
                EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread
                is event_loop_caller_instance_1.EVENT_LOOP_CONTEXT._event_loop_thread
                is event_loop_caller_instance_2.EVENT_LOOP_CONTEXT._event_loop_thread
            )
            assert (
                EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop
                is event_loop_caller_instance_1.EVENT_LOOP_CONTEXT._event_loop
                is event_loop_caller_instance_2.EVENT_LOOP_CONTEXT._event_loop
            )

        assert not event_loop_caller_instance_2._in_context

        # The background thread should remain running since we have another context still open.
        assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread_refcnt == 1
        assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread is not None
        assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread.is_alive()
        assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop is not None
        assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop.is_running()

        _assert_runs_sleep_coroutine(event_loop_caller_instance_1)

    # Once we exit the last context, the background thread should be stopped
    # and unusable for running co-routines.

    assert (  # type: ignore[unreachable] # (false positives)
        EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread is None
    )
    assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread_refcnt == 0
    assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop is event_loop is not None
    assert not EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop.is_running()
    # Check that the event loop has no more tasks.
    assert hasattr(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop, "_ready")
    # Windows ProactorEventLoopPolicy adds a dummy task.
    if sys.platform == "win32" and isinstance(
        EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop, asyncio.ProactorEventLoop
    ):
        assert len(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop._ready) == 1
    else:
        assert len(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop._ready) == 0
    assert hasattr(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop, "_scheduled")
    assert len(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop._scheduled) == 0

    # With no context open, run_coroutine() is expected to fail an assertion.
    # The sleep coroutine created for the call is then never awaited; the
    # resulting RuntimeWarning is silenced by the filterwarnings mark above.
    with pytest.raises(AssertionError):
        future = event_loop_caller_instance_1.EVENT_LOOP_CONTEXT.run_coroutine(
            asyncio.sleep(0.1, result="foo")
        )
        # Only reached if run_coroutine() unexpectedly succeeded: fail the test.
        raise ValueError(f"Future should not have been available to wait on {future.result()}")

    # Test that when re-entering the context we have the same event loop.
    with event_loop_caller_instance_1:
        assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop is not None
        assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop.is_running()
        assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop is event_loop

        # Test running again.
        _assert_runs_sleep_coroutine(event_loop_caller_instance_1)

167 

168 

if __name__ == "__main__":
    # For debugging in Windows which has issues with pytest detection in vscode.
    # Run only this module's test, in a single worker with no distribution.
    _debug_pytest_args = ["-n1", "--dist=no", "-k", "test_event_loop_context"]
    pytest.main(_debug_pytest_args)