Coverage for mlos_bench/mlos_bench/services/local/local_exec.py: 91%
78 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-21 01:50 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-21 01:50 +0000
1#
2# Copyright (c) Microsoft Corporation.
3# Licensed under the MIT License.
4#
5"""Helper functions to run scripts and commands locally on the scheduler side."""
7import errno
8import logging
9import os
10import shlex
11import subprocess
12import sys
13from collections.abc import Callable, Iterable, Mapping
14from string import Template
15from typing import TYPE_CHECKING, Any
17from mlos_bench.os_environ import environ
18from mlos_bench.services.base_service import Service
19from mlos_bench.services.local.temp_dir_context import TempDirContextService
20from mlos_bench.services.types.local_exec_type import SupportsLocalExec
22if TYPE_CHECKING:
23 from mlos_bench.tunables.tunable import TunableValue
25_LOG = logging.getLogger(__name__)
def split_cmdline(cmdline: str) -> Iterable[list[str]]:
    """
    Split a command line into its subcommands and the separators between them.

    A single command line may contain multiple commands separated by special
    characters (e.g., &&, ||, etc.), so the command line is further split into
    an array of subcommand arrays.

    Parameters
    ----------
    cmdline : str
        The commandline to split.

    Yields
    ------
    list[str]
        Either the tokens of one subcommand, or a single-element list holding
        one separator token, in the order they appear in `cmdline`.
        Can be rejoined as a flattened array.

    Notes
    -----
    Tokenization is done by `shlex` in POSIX mode, so quotes are already
    stripped by the time a token is classified: a quoted argument that begins
    with a punctuation character cannot be told apart from a separator here.
    """
    cmdline_tokens = shlex.shlex(cmdline, posix=True, punctuation_chars=True)
    cmdline_tokens.whitespace_split = True
    separator_chars = cmdline_tokens.punctuation_chars
    subcmd: list[str] = []
    for token in cmdline_tokens:
        # An explicitly quoted empty argument (`""`) arrives as an empty token.
        # It is a regular argument, so test for emptiness before indexing it.
        if token and token[0] in separator_chars:
            # Separator encountered. Yield any non-empty previous subcmd first.
            if subcmd:
                yield subcmd
            # Also return the separator itself.
            yield [token]
            # Rebind (do not clear): the consumer may still hold the old list.
            subcmd = []
        else:
            subcmd.append(token)
    # Return the trailing subcommand.
    if subcmd:
        yield subcmd
class LocalExecService(TempDirContextService, SupportsLocalExec):
    """
    Service that runs scripts and commands in an external process on the node
    acting as the scheduler.

    Can be useful for data processing due to reduced dependency management
    complications vs the target environment.
    """

    def __init__(
        self,
        config: dict[str, Any] | None = None,
        global_config: dict[str, Any] | None = None,
        parent: Service | None = None,
        methods: dict[str, Callable] | list[Callable] | None = None,
    ):
        """
        Create a new instance of a service to run scripts locally.

        Parameters
        ----------
        config : dict
            Free-format dictionary that contains parameters for the service.
            (E.g., root path for config files, etc.)
            The optional "abort_on_error" key (default: True) controls whether
            `local_exec` stops at the first script line that fails.
        global_config : dict
            Free-format dictionary of global parameters.
        parent : Service
            An optional parent service that can provide mixin functions.
        methods : Union[dict[str, Callable], list[Callable], None]
            New methods to register with the service.
        """
        # Register `local_exec` alongside whatever methods the caller supplied.
        registered_methods = self.merge_methods(methods, [self.local_exec])
        super().__init__(config, global_config, parent, registered_methods)
        self.abort_on_error = self.config.get("abort_on_error", True)

    def local_exec(
        self,
        script_lines: Iterable[str],
        env: Mapping[str, "TunableValue"] | None = None,
        cwd: str | None = None,
    ) -> tuple[int, str, str]:
        """
        Run each line of `script_lines` as a separate command in a local process.

        Parameters
        ----------
        script_lines : Iterable[str]
            Lines of the script to run locally.
            Every line is treated as a separate command to run.
        env : Mapping[str, Union[int, float, str]]
            Environment variables (optional).
        cwd : str
            Work directory to run the script at.
            If omitted, use `temp_dir` or create a temporary dir.

        Returns
        -------
        (return_code, stdout, stderr) : (int, str, str)
            The return code of the last command that was run (0 if there were
            no commands), and the concatenated stdout and stderr of all the
            commands that were run.
        """
        return_code = 0
        stdout_chunks: list[str] = []
        stderr_chunks: list[str] = []

        with self.temp_dir_context(cwd) as temp_dir:
            _LOG.debug("Run in directory: %s", temp_dir)
            for line in script_lines:
                (return_code, out, err) = self._local_exec_script(line, env, temp_dir)
                stdout_chunks.append(out)
                stderr_chunks.append(err)
                failed = return_code != 0
                if failed and self.abort_on_error:
                    # Stop at the first failure; its output is already recorded.
                    break

        stdout = "".join(stdout_chunks)
        stderr = "".join(stderr_chunks)
        _LOG.debug("Run: stdout:\n%s", stdout)
        _LOG.debug("Run: stderr:\n%s", stderr)
        return (return_code, stdout, stderr)

    def _resolve_cmdline_script_path(self, subcmd_tokens: list[str]) -> list[str]:
        """
        Replace the first token of a (sub)command with the full path of the
        local script it refers to, if there is one.

        Parameters
        ----------
        subcmd_tokens : list[str]
            The previously split tokens of the subcmd. Modified in place.

        Returns
        -------
        list[str]
            The same list, with the script path resolved. If the script is a
            python file, the current python executable is prepended.
        """
        resolved = self.config_loader_service.resolve_path(subcmd_tokens[0])
        # Only regular files qualify. This also covers the lone `.` special
        # case, which means both `source` and "current directory" (which isn't
        # executable) in posix shells.
        if not (os.path.exists(resolved) and os.path.isfile(resolved)):
            return subcmd_tokens

        # The script exists, so use its absolute path.
        subcmd_tokens[0] = os.path.abspath(resolved)
        # For python scripts, prepend the currently executing python
        # executable to avoid requiring executable mode bits or a shebang.
        if resolved.strip().lower().endswith(".py"):
            subcmd_tokens.insert(0, sys.executable)
        return subcmd_tokens

    def _local_exec_script(
        self,
        script_line: str,
        env_params: Mapping[str, "TunableValue"] | None,
        cwd: str,
    ) -> tuple[int, str, str]:
        """
        Execute a single script line in a local process.

        Parameters
        ----------
        script_line : str
            Line of the script to run in the local process.
        env_params : Mapping[str, Union[int, float, str]]
            Environment variables.
        cwd : str
            Work directory to run the script at.

        Returns
        -------
        (return_code, stdout, stderr) : (int, str, str)
            A 3-tuple of return code, stdout, and stderr of the script process.
            If running the process raises `FileNotFoundError`, the result is
            `(errno.ENOENT, "", "File not found")`.
        """
        # Split the command line into subcommands, fix up the script path of
        # each one, and flatten the result back into a single token list.
        cmd: list[str] = []
        for subcmd in split_cmdline(script_line):
            cmd.extend(self._resolve_cmdline_script_path(subcmd))

        # Environment variable values must be strings for the subprocess.
        env: dict[str, str] = (
            {key: str(val) for (key, val) in env_params.items()} if env_params else {}
        )

        if sys.platform == "win32":
            # A hack to run Python on Windows with env variables set:
            # start from the current environment, blank out PYTHONPATH,
            # and layer the requested variables on top.
            merged_env = environ.copy()
            merged_env["PYTHONPATH"] = ""
            merged_env.update(env)
            env = merged_env

        try:
            if sys.platform != "win32":
                # Elsewhere, hand the shell one single command string.
                cmd = [" ".join(cmd)]

            _LOG.info("Run: %s", cmd)
            if _LOG.isEnabledFor(logging.DEBUG):
                expanded = Template(" ".join(cmd)).safe_substitute(env)
                _LOG.debug("Expands to: %s", expanded)
                _LOG.debug("Current working dir: %s", cwd)

            proc = subprocess.run(
                cmd,
                shell=True,
                cwd=cwd,
                # An empty mapping means "inherit the parent environment".
                env=env or None,
                capture_output=True,
                text=True,
                check=False,
            )

            _LOG.debug("Run: return code = %d", proc.returncode)
            return (proc.returncode, proc.stdout, proc.stderr)

        except FileNotFoundError as ex:
            _LOG.warning("File not found: %s", cmd, exc_info=ex)
            return (errno.ENOENT, "", "File not found")