blob: 3947e183d82f3d1ea7cb019196619615a82e3009 [file] [log] [blame]
import os
import sys
import time
from subprocess import CalledProcessError, check_call
from typing import List, IO, Optional, Tuple
def which(command: str, paths: Optional[str] = None) -> Optional[str]:
"""which(command, [paths]) - Look up the given command in the paths string
(or the PATH environment variable, if unspecified)."""
if paths is None:
paths = os.environ.get('PATH', '')
# Check for absolute match first.
if os.path.exists(command):
return command
# Would be nice if Python had a lib function for this.
if not paths:
paths = os.defpath
# Get suffixes to search.
# On Cygwin, 'PATHEXT' may exist but it should not be used.
if os.pathsep == ';':
pathext = os.environ.get('PATHEXT', '').split(';')
else:
pathext = ['']
# Search the paths...
for path in paths.split(os.pathsep):
for ext in pathext:
p = os.path.join(path, command + ext)
if os.path.exists(p):
return p
return None
def has_no_extension(file_name: str) -> bool:
root, ext = os.path.splitext(file_name)
return ext == ""
def is_valid_single_input_file(file_name: str) -> bool:
root, ext = os.path.splitext(file_name)
return ext in (".i", ".ii", ".c", ".cpp", ".m", "")
def time_to_str(time: float) -> str:
"""
Convert given time in seconds into a human-readable string.
"""
return f"{time:.2f}s"
def memory_to_str(memory: int) -> str:
"""
Convert given number of bytes into a human-readable string.
"""
if memory:
try:
import humanize
return humanize.naturalsize(memory, gnu=True)
except ImportError:
# no formatter installed, let's keep it in bytes
return f"{memory}B"
# If memory is 0, we didn't succeed measuring it.
return "N/A"
def check_and_measure_call(*popenargs, **kwargs) -> Tuple[float, int]:
"""
Run command with arguments. Wait for command to complete and measure
execution time and peak memory consumption.
If the exit code was zero then return, otherwise raise
CalledProcessError. The CalledProcessError object will have the
return code in the returncode attribute.
The arguments are the same as for the call and check_call functions.
Return a tuple of execution time and peak memory.
"""
peak_mem = 0
start_time = time.time()
try:
import psutil as ps
def get_memory(process: ps.Process) -> int:
mem = 0
# we want to gather memory usage from all of the child processes
descendants = list(process.children(recursive=True))
descendants.append(process)
for subprocess in descendants:
try:
mem += subprocess.memory_info().rss
except (ps.NoSuchProcess, ps.AccessDenied):
continue
return mem
with ps.Popen(*popenargs, **kwargs) as process:
# while the process is running calculate resource utilization.
while (process.is_running() and
process.status() != ps.STATUS_ZOMBIE):
# track the peak utilization of the process
peak_mem = max(peak_mem, get_memory(process))
time.sleep(.5)
if process.is_running():
process.kill()
if process.returncode != 0:
cmd = kwargs.get("args")
if cmd is None:
cmd = popenargs[0]
raise CalledProcessError(process.returncode, cmd)
except ImportError:
# back off to subprocess if we don't have psutil installed
peak_mem = 0
check_call(*popenargs, **kwargs)
return time.time() - start_time, peak_mem
def run_script(script_path: str, build_log_file: IO, cwd: str,
out=sys.stdout, err=sys.stderr, verbose: int = 0):
"""
Run the provided script if it exists.
"""
if os.path.exists(script_path):
try:
if verbose == 1:
out.write(f" Executing: {script_path}\n")
check_call(f"chmod +x '{script_path}'", cwd=cwd,
stderr=build_log_file,
stdout=build_log_file,
shell=True)
check_call(f"'{script_path}'", cwd=cwd,
stderr=build_log_file,
stdout=build_log_file,
shell=True)
except CalledProcessError:
err.write(f"Error: Running {script_path} failed. "
f"See {build_log_file.name} for details.\n")
sys.exit(-1)
def is_comment_csv_line(entries: List[str]) -> bool:
"""
Treat CSV lines starting with a '#' as a comment.
"""
return len(entries) > 0 and entries[0].startswith("#")