Coverage for mlos_bench/mlos_bench/services/local/local_exec.py: 91%

78 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-21 01:50 +0000

1# 

2# Copyright (c) Microsoft Corporation. 

3# Licensed under the MIT License. 

4# 

5"""Helper functions to run scripts and commands locally on the scheduler side.""" 

6 

7import errno 

8import logging 

9import os 

10import shlex 

11import subprocess 

12import sys 

13from collections.abc import Callable, Iterable, Mapping 

14from string import Template 

15from typing import TYPE_CHECKING, Any 

16 

17from mlos_bench.os_environ import environ 

18from mlos_bench.services.base_service import Service 

19from mlos_bench.services.local.temp_dir_context import TempDirContextService 

20from mlos_bench.services.types.local_exec_type import SupportsLocalExec 

21 

22if TYPE_CHECKING: 

23 from mlos_bench.tunables.tunable import TunableValue 

24 

25_LOG = logging.getLogger(__name__) 

26 

27 

def split_cmdline(cmdline: str) -> Iterable[list[str]]:
    """
    Split a command line into subcommands and the separators between them.

    A single command line may contain multiple commands joined by special characters
    (e.g., `&&`, `||`, `|`, `;`). The line is tokenized with POSIX `shlex` rules and
    regrouped so that every subcommand and every separator is yielded as its own
    list of tokens.

    Parameters
    ----------
    cmdline : str
        The commandline to split.

    Yields
    ------
    list[str]
        Either the tokens of one subcommand, or a single-element list holding a
        separator. Flattening the yielded lists restores the original token order.

    Notes
    -----
    Quotes are consumed by the tokenizer, so a quoted token that starts with a
    punctuation character (e.g., `';'`) cannot be told apart from a real separator
    and is yielded as one.
    """
    lexer = shlex.shlex(cmdline, posix=True, punctuation_chars=True)
    lexer.whitespace_split = True
    subcmd: list[str] = []
    for token in lexer:
        # An empty token is produced by an empty quoted argument (`''` or `""`).
        # It is a regular argument, and indexing it would raise IndexError.
        if not token or token[0] not in lexer.punctuation_chars:
            subcmd.append(token)
            continue
        # Separator encountered. Yield any non-empty subcommand collected so far ...
        if subcmd:
            yield subcmd
        # ... then the separator itself, and start a fresh subcommand.
        yield [token]
        subcmd = []
    # Return the trailing subcommand, if any.
    if subcmd:
        yield subcmd

61 

62 

class LocalExecService(TempDirContextService, SupportsLocalExec):
    """
    Service that runs scripts and shell commands in child processes on the node
    acting as the scheduler.

    Handy for data processing steps, because it sidesteps much of the dependency
    management that running the same code in the target environment would need.
    """

    def __init__(
        self,
        config: dict[str, Any] | None = None,
        global_config: dict[str, Any] | None = None,
        parent: Service | None = None,
        methods: dict[str, Callable] | list[Callable] | None = None,
    ):
        """
        Set up a service that executes scripts on the local machine.

        Parameters
        ----------
        config : dict
            Free-format dictionary of service parameters.
            The optional `abort_on_error` key (default `True`) controls whether
            `local_exec` stops at the first command with a non-zero return code.
        global_config : dict
            Free-format dictionary of global parameters.
        parent : Service
            An optional parent service that can provide mixin functions.
        methods : Union[dict[str, Callable], list[Callable], None]
            New methods to register with the service, in addition to `local_exec`.
        """
        exposed_methods = self.merge_methods(methods, [self.local_exec])
        super().__init__(config, global_config, parent, exposed_methods)
        self.abort_on_error = self.config.get("abort_on_error", True)

    def local_exec(
        self,
        script_lines: Iterable[str],
        env: Mapping[str, "TunableValue"] | None = None,
        cwd: str | None = None,
    ) -> tuple[int, str, str]:
        """
        Run each of the `script_lines` as a separate command in a local process.

        Parameters
        ----------
        script_lines : Iterable[str]
            Commands to run locally, one command per item.
        env : Mapping[str, Union[int, float, str]]
            Environment variables (optional).
        cwd : str
            Work directory to run the commands in.
            If omitted, use `temp_dir` or create a temporary dir.

        Returns
        -------
        (return_code, stdout, stderr) : (int, str, str)
            Return code of the last command that was run (0 if there were none),
            followed by the concatenated stdout and the concatenated stderr of
            all commands that were run.
        """
        exit_code = 0
        out_chunks: list[str] = []
        err_chunks: list[str] = []

        with self.temp_dir_context(cwd) as work_dir:
            _LOG.debug("Run in directory: %s", work_dir)
            for script_line in script_lines:
                (exit_code, out, err) = self._local_exec_script(script_line, env, work_dir)
                out_chunks.append(out)
                err_chunks.append(err)
                # Stop at the first failure unless configured to keep going.
                if exit_code != 0 and self.abort_on_error:
                    break

        all_stdout = "".join(out_chunks)
        all_stderr = "".join(err_chunks)
        _LOG.debug("Run: stdout:\n%s", all_stdout)
        _LOG.debug("Run: stderr:\n%s", all_stderr)
        return (exit_code, all_stdout, all_stderr)

    def _resolve_cmdline_script_path(self, subcmd_tokens: list[str]) -> list[str]:
        """
        Replace the first token of a (sub)command with the full path of the local
        script it refers to, if there is one.

        Parameters
        ----------
        subcmd_tokens : list[str]
            The previously split tokens of the subcmd. Modified in place.

        Returns
        -------
        list[str]
            The same list, with the script path resolved. For `.py` scripts the
            current Python executable is additionally inserted at the front.
        """
        candidate = self.config_loader_service.resolve_path(subcmd_tokens[0])
        # Only regular files qualify. This also filters out a lone `.`, which
        # means both `source` and "current directory" in posix shells.
        if not os.path.exists(candidate) or not os.path.isfile(candidate):
            return subcmd_tokens

        subcmd_tokens[0] = os.path.abspath(candidate)
        # Run python scripts through the currently executing interpreter, so
        # that they need neither executable mode bits nor a shebang.
        is_python_script = candidate.strip().lower().endswith(".py")
        if is_python_script:
            subcmd_tokens.insert(0, sys.executable)
        return subcmd_tokens

    def _local_exec_script(
        self,
        script_line: str,
        env_params: Mapping[str, "TunableValue"] | None,
        cwd: str,
    ) -> tuple[int, str, str]:
        """
        Run a single `script_line` through the shell in a local process.

        Parameters
        ----------
        script_line : str
            Line of the script to run in the local process.
        env_params : Mapping[str, Union[int, float, str]]
            Environment variables. Values are converted to strings.
        cwd : str
            Work directory to run the script at.

        Returns
        -------
        (return_code, stdout, stderr) : (int, str, str)
            A 3-tuple of return code, stdout, and stderr of the script process.
            If launching the process raises `FileNotFoundError`, the result is
            `(errno.ENOENT, "", "File not found")`.
        """
        # Split the line into subcommands and separators, fix up the script path
        # of every piece, and collect the tokens back in their original order.
        cmd: list[str] = []
        for subcmd in split_cmdline(script_line):
            cmd.extend(self._resolve_cmdline_script_path(subcmd))

        env: dict[str, str] = (
            {name: str(value) for (name, value) in env_params.items()} if env_params else {}
        )

        if sys.platform == "win32":
            # A hack to run Python on Windows with env variables set:
            # start from the current environment, blank out PYTHONPATH,
            # and let the caller's variables take precedence.
            merged_env = environ.copy()
            merged_env["PYTHONPATH"] = ""
            merged_env.update(env)
            env = merged_env
        else:
            # With `shell=True` on POSIX only the first item of a sequence is the
            # command string, so collapse the tokens into a single string.
            cmd = [" ".join(cmd)]

        try:
            _LOG.info("Run: %s", cmd)
            if _LOG.isEnabledFor(logging.DEBUG):
                expanded = Template(" ".join(cmd)).safe_substitute(env)
                _LOG.debug("Expands to: %s", expanded)
                _LOG.debug("Current working dir: %s", cwd)

            proc = subprocess.run(
                cmd,
                # An empty mapping becomes None, i.e. inherit the current environment.
                env=env or None,
                cwd=cwd,
                shell=True,
                text=True,
                check=False,
                capture_output=True,
            )
        except FileNotFoundError as ex:
            _LOG.warning("File not found: %s", cmd, exc_info=ex)
            return (errno.ENOENT, "", "File not found")

        _LOG.debug("Run: return code = %d", proc.returncode)
        return (proc.returncode, proc.stdout, proc.stderr)