Coverage for mlos_bench/mlos_bench/services/local/local_exec.py: 91%
77 statements
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-14 01:58 +0000
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-14 01:58 +0000
1#
2# Copyright (c) Microsoft Corporation.
3# Licensed under the MIT License.
4#
5"""Helper functions to run scripts and commands locally on the scheduler side."""
7import errno
8import logging
9import os
10import shlex
11import subprocess
12import sys
13from string import Template
14from typing import (
15 TYPE_CHECKING,
16 Any,
17 Callable,
18 Dict,
19 Iterable,
20 List,
21 Mapping,
22 Optional,
23 Tuple,
24 Union,
25)
27from mlos_bench.os_environ import environ
28from mlos_bench.services.base_service import Service
29from mlos_bench.services.local.temp_dir_context import TempDirContextService
30from mlos_bench.services.types.local_exec_type import SupportsLocalExec
32if TYPE_CHECKING:
33 from mlos_bench.tunables.tunable import TunableValue
35_LOG = logging.getLogger(__name__)
def split_cmdline(cmdline: str) -> Iterable[List[str]]:
    """
    Split a command line into subcommands and the separators between them.

    A single command line may contain multiple commands separated by special characters
    (e.g., &&, ||, etc.) so further split the commandline into an array of subcommand
    arrays.

    Parameters
    ----------
    cmdline: str
        The commandline to split.

    Yields
    ------
    Iterable[List[str]]
        A list of subcommands or separators, each one a list of tokens.
        Can be rejoined as a flattened array.

    Notes
    -----
    Tokens are produced by `shlex` in POSIX mode, so quotes are already stripped
    when a token is inspected here. As a consequence, a quoted argument that
    starts with a punctuation character (e.g., ``"(x)"``) is reported as a
    separator, same as an unquoted one.
    """
    cmdline_tokens = shlex.shlex(cmdline, posix=True, punctuation_chars=True)
    cmdline_tokens.whitespace_split = True
    separators = cmdline_tokens.punctuation_chars
    subcmd: List[str] = []
    for token in cmdline_tokens:
        # In POSIX mode an empty quoted argument (`""` or `''`) comes back as an
        # empty token. It is a regular argument, and indexing it would raise
        # IndexError, so test for emptiness before looking at the first char.
        if token and token[0] in separators:
            # Separator encountered. Yield any non-empty previous subcmd we accumulated.
            if subcmd:
                yield subcmd
            # Also return the separators.
            yield [token]
            # Rebind (rather than clear) so the list already yielded stays intact.
            subcmd = []
        else:
            subcmd.append(token)
    # Return the trailing subcommand.
    if subcmd:
        yield subcmd
class LocalExecService(TempDirContextService, SupportsLocalExec):
    """
    Service that runs scripts and commands in an external process on the node
    acting as the scheduler.

    Can be useful for data processing due to reduced dependency management complications
    vs the target environment.
    """

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        global_config: Optional[Dict[str, Any]] = None,
        parent: Optional[Service] = None,
        methods: Union[Dict[str, Callable], List[Callable], None] = None,
    ):
        """
        Set up a service instance for running scripts locally.

        Parameters
        ----------
        config : dict
            Free-format dictionary that contains parameters for the service.
            (E.g., root path for config files, etc.)
        global_config : dict
            Free-format dictionary of global parameters.
        parent : Service
            An optional parent service that can provide mixin functions.
        methods : Union[Dict[str, Callable], List[Callable], None]
            New methods to register with the service.
        """
        # Register `local_exec` alongside whatever methods the caller supplied.
        registered_methods = self.merge_methods(methods, [self.local_exec])
        super().__init__(config, global_config, parent, registered_methods)
        # When set, `local_exec` stops at the first line with a non-zero return code.
        self.abort_on_error = self.config.get("abort_on_error", True)

    def local_exec(
        self,
        script_lines: Iterable[str],
        env: Optional[Mapping[str, "TunableValue"]] = None,
        cwd: Optional[str] = None,
    ) -> Tuple[int, str, str]:
        """
        Run each entry of `script_lines` as a separate command in a local process.

        Parameters
        ----------
        script_lines : Iterable[str]
            Lines of the script to run locally.
            Treat every line as a separate command to run.
        env : Mapping[str, Union[int, float, str]]
            Environment variables (optional).
        cwd : str
            Work directory to run the script at.
            If omitted, use `temp_dir` or create a temporary dir.

        Returns
        -------
        (return_code, stdout, stderr) : (int, str, str)
            A 3-tuple of the return code of the last command executed and the
            concatenated stdout and stderr of all commands executed.
        """
        return_code = 0
        stdout_chunks: List[str] = []
        stderr_chunks: List[str] = []

        with self.temp_dir_context(cwd) as temp_dir:
            _LOG.debug("Run in directory: %s", temp_dir)
            for line in script_lines:
                return_code, line_stdout, line_stderr = self._local_exec_script(
                    line,
                    env,
                    temp_dir,
                )
                stdout_chunks.append(line_stdout)
                stderr_chunks.append(line_stderr)
                if self.abort_on_error and return_code != 0:
                    # Skip the remaining lines after the first failure.
                    break

        stdout = "".join(stdout_chunks)
        stderr = "".join(stderr_chunks)

        _LOG.debug("Run: stdout:\n%s", stdout)
        _LOG.debug("Run: stderr:\n%s", stderr)

        return (return_code, stdout, stderr)

    def _resolve_cmdline_script_path(self, subcmd_tokens: List[str]) -> List[str]:
        """
        Replace the first token of a (sub)command with the full path of the local
        script it refers to, if there is one.

        Parameters
        ----------
        subcmd_tokens : List[str]
            The previously split tokens of the subcmd. Updated in place.

        Returns
        -------
        List[str]
            The same list, with the script path resolved when the first token
            names an existing file.
        """
        script_path = self.config_loader_service.resolve_path(subcmd_tokens[0])
        # Requiring a regular file also covers the lone `.` token, which means both
        # `source` and "current directory" (which isn't executable) in posix shells.
        if not os.path.exists(script_path) or not os.path.isfile(script_path):
            return subcmd_tokens

        resolved = [os.path.abspath(script_path)]
        if script_path.strip().lower().endswith(".py"):
            # Run python scripts with the currently executing interpreter to
            # avoid requiring executable mode bits or a shebang.
            resolved.insert(0, sys.executable)
        # Slice assignment keeps the update in place on the caller's list.
        subcmd_tokens[:1] = resolved
        return subcmd_tokens

    def _local_exec_script(
        self,
        script_line: str,
        env_params: Optional[Mapping[str, "TunableValue"]],
        cwd: str,
    ) -> Tuple[int, str, str]:
        """
        Execute a single script line in a local process.

        Parameters
        ----------
        script_line : str
            Line of the script to run in the local process.
        env_params : Mapping[str, Union[int, float, str]]
            Environment variables.
        cwd : str
            Work directory to run the script at.

        Returns
        -------
        (return_code, stdout, stderr) : (int, str, str)
            A 3-tuple of return code, stdout, and stderr of the script process.
            If `FileNotFoundError` is raised while launching the process, the
            result is `(errno.ENOENT, "", "File not found")`.
        """
        # Split the line into subcommands, fix up the script path of each one,
        # and flatten the result back into a single list of tokens.
        cmd: List[str] = []
        for subcmd in split_cmdline(script_line):
            cmd.extend(self._resolve_cmdline_script_path(subcmd))

        env: Dict[str, str] = {}
        if env_params:
            for key, val in env_params.items():
                env[key] = str(val)

        if sys.platform == "win32":
            # A hack to run Python on Windows with env variables set:
            merged_env = environ.copy()
            merged_env["PYTHONPATH"] = ""
            merged_env.update(env)
            env = merged_env
        else:
            # Elsewhere the command is handed to the shell as one string.
            cmd = [" ".join(cmd)]

        _LOG.info("Run: %s", cmd)
        if _LOG.isEnabledFor(logging.DEBUG):
            expanded = Template(" ".join(cmd)).safe_substitute(env)
            _LOG.debug("Expands to: %s", expanded)
            _LOG.debug("Current working dir: %s", cwd)

        try:
            proc = subprocess.run(
                cmd,
                env=env or None,  # An empty mapping means "inherit the environment".
                cwd=cwd,
                shell=True,
                text=True,
                check=False,
                capture_output=True,
            )
        except FileNotFoundError as ex:
            _LOG.warning("File not found: %s", cmd, exc_info=ex)
            return (errno.ENOENT, "", "File not found")

        _LOG.debug("Run: return code = %d", proc.returncode)
        return (proc.returncode, proc.stdout, proc.stderr)