#!/usr/bin/env python3
"""Benchmark repository commands with hyperfine and collect the results.

For every directory found under ``repos/`` (next to this script), each command
in ``REPO_COMMANDS`` is run through ``hyperfine`` using the executable(s)
described in ``config.json``.  The hyperfine JSON export is merged with the
"duration of ..." lines scraped from the executable's own output and written
to ``results/<repo name>/<executable>_<command>.json``.
"""
import argparse
import json
import os
import re
import shlex
import subprocess
import sys
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, Iterator, List
from urllib.parse import quote
from uuid import UUID, uuid4

BASE_DIR = Path(__file__).parent.absolute()
RESULTS_DIR = BASE_DIR / "results"
LOGGING_DIR = BASE_DIR / "logs"
REPOS_DIR = BASE_DIR / "repos"

# Sub-commands appended to the executable for each benchmark run.
REPO_COMMANDS = ["st", "st -mard", "diff"]

# Patterns for the per-step timing lines each executable kind prints.
UPSTREAM_INTERMEDIATE_RESULTS_RE = re.compile(r"Duration of `([^`]+)`: (.+)")
JANE_INTERMEDIATE_RESULTS_RE = re.compile(r"duration of (.+): (.+)")


def get_repos() -> Iterator[Path]:
    """Yield the path of every entry inside ``REPOS_DIR``.

    Entries are yielded in sorted order so that successive runs visit the
    repositories in the same sequence.  Non-directory entries are yielded too;
    filtering them out is left to the caller.

    Exits the process with status 255 (on first iteration) when ``REPOS_DIR``
    does not exist.
    """
    if not REPOS_DIR.exists():
        print("abort: no 'repos' directory found", file=sys.stderr)
        sys.exit(255)

    for name in sorted(os.listdir(REPOS_DIR)):
        yield REPOS_DIR / name


def get_intermediate_timings(exec_kind: str, uid: str) -> Dict[str, List[str]]:
    """Collect the intermediate timings logged during one benchmark run.

    Args:
        exec_kind: ``"upstream"`` reads the run's ``stdout`` log and uses the
            upstream pattern; any other value reads ``stderr`` and uses the
            other pattern.
        uid: Hex id of the run, i.e. the name of its directory in
            ``LOGGING_DIR``.

    Returns:
        A mapping from the timed step's name to the list of raw duration
        strings found for it, in the order they appear in the log.

    Raises:
        FileNotFoundError: If the expected log file does not exist.
    """
    is_upstream = exec_kind == "upstream"
    log_file = LOGGING_DIR / uid / ("stdout" if is_upstream else "stderr")

    # The log is written by the benchmarked program; tolerate stray bytes
    # instead of aborting the whole post-processing step on a decode error.
    with log_file.open(mode="r", encoding="utf-8", errors="replace") as f:
        log_data = f.read()

    regex = (
        UPSTREAM_INTERMEDIATE_RESULTS_RE
        if is_upstream
        else JANE_INTERMEDIATE_RESULTS_RE
    )

    timings = defaultdict(list)
    for name, duration in regex.findall(log_data):
        timings[name].append(duration)
    return timings


def post_process_results(
    repo_path: Path,
    run_uid: UUID,
    exec_kind: str,
    exec_config: Dict[str, Any],
    cmd: str,
    result_path: Path,
) -> None:
    """Merge hyperfine's export with the logged timings into the result file.

    Reads ``<result_path>.tmp`` (the hyperfine JSON export), combines it with
    the run metadata and the intermediate timings of ``run_uid``, writes the
    combined document to ``result_path`` and finally removes the temporary
    file.

    Raises:
        FileNotFoundError: If the temporary export or the run's log file is
            missing.
    """
    tmp_path = Path(f"{result_path}.tmp")

    with tmp_path.open(mode="r", encoding="utf-8") as f:
        hyperfine_data = json.load(f)

    timings = get_intermediate_timings(exec_kind, run_uid.hex)

    data = {
        "version": 0,
        "run_uid": run_uid.hex,
        "repository": str(repo_path.absolute()),
        "config": exec_config,
        "command": cmd,
        "hyperfine": hyperfine_data,
        "intermediate_timings": timings,
    }

    print(f"Writing results to {result_path}")
    with result_path.open(mode="w", encoding="utf-8") as f:
        json.dump(data, f, sort_keys=True, indent=2)

    tmp_path.unlink(missing_ok=True)


def run_repo_command(
    repo_path: Path, exec_kind: str, exec_config: Dict[str, Any], cmd: str
) -> None:
    """Benchmark one command in one repository and store its results.

    Args:
        repo_path: Repository directory; used as hyperfine's working directory.
        exec_kind: Kind of executable, forwarded to the post-processing step.
        exec_config: Executable description. ``"path"`` is required; ``"args"``
            (extra argument string) and ``"env"`` (mapping of environment
            variables) are optional.
        cmd: Sub-command passed to the executable.

    Raises:
        subprocess.CalledProcessError: If hyperfine exits with a non-zero
            status or cannot be started.
    """
    executable = exec_config["path"]
    subprocess_args = exec_config.get("args", "")
    exec_name = Path(executable).name
    filename = f"{exec_name}_{quote(cmd)}"

    # Every run gets its own log directory so timings are not mixed up.
    uid = uuid4()
    log_dir = LOGGING_DIR / uid.hex
    log_dir.mkdir(parents=True, exist_ok=True)

    out_path = RESULTS_DIR / repo_path.name / f"{filename}.json"
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.touch(exist_ok=True)

    # Pass the env to the subcommand of hyperfine, otherwise it won't work
    env = exec_config.get("env", {})
    env_string = " ".join(f"{k}={v}" for k, v in env.items())

    # Only the paths generated by this script are quoted: the configured
    # executable, arguments and env values are deliberately left for the
    # shell that hyperfine spawns to interpret, as before.
    stdout_log = shlex.quote(str(log_dir / "stdout"))
    stderr_log = shlex.quote(str(log_dir / "stderr"))
    benchmarked_cmd = (
        f"{env_string} {executable} {cmd} {subprocess_args} "
        f">> {stdout_log} 2>> {stderr_log}"
    )

    # An argument list avoids a second layer of shell quoting around the
    # benchmarked command and around the export path.
    hyperfine_argv = [
        "hyperfine",
        "--warmup",
        "3",
        "--show-output",
        "--export-json",
        f"{out_path}.tmp",
        benchmarked_cmd,
    ]
    try:
        subprocess.run(hyperfine_argv, check=True, cwd=str(repo_path))
    except FileNotFoundError as err:
        # Keep the exception contract of the shell-based invocation, where a
        # command that cannot be found surfaces as exit status 127.
        raise subprocess.CalledProcessError(127, hyperfine_argv) from err

    post_process_results(repo_path, uid, exec_kind, exec_config, cmd, out_path)


def bench(exec_kind: str, exec_config: dict) -> None:
    """Run every command of ``REPO_COMMANDS`` in every repository directory."""
    repos_tested = 0
    for repo_path in get_repos():
        if not repo_path.is_dir():
            continue

        print(f"[MAIN] Switching to repo '{repo_path}'")
        repos_tested += 1
        for command in REPO_COMMANDS:
            run_repo_command(repo_path, exec_kind, exec_config, command)

    print(f"[MAIN] Tested {repos_tested} repositories")


def main(args: argparse.Namespace) -> None:
    """Benchmark each executable kind listed in ``args.executables``.

    Exits the process with status 1 when ``config.json`` is missing.  A failed
    benchmark command is reported on stderr and ends the run for that
    executable kind only; the remaining kinds are still benchmarked.
    """
    config_file = BASE_DIR / "config.json"
    if not config_file.exists():
        print(
            "abort: no 'config.json' found, see README for info",
            file=sys.stderr,
        )
        sys.exit(1)

    # The configuration does not change between executables: load it once.
    with config_file.open(mode="r", encoding="utf-8") as f:
        config = json.load(f)

    for exec_kind in args.executables:
        print(f"[MAIN] Using {exec_kind} executable")
        try:
            bench(exec_kind, config["executables"][exec_kind])
        except subprocess.CalledProcessError as err:
            # Best effort: report the failure and keep benching.
            print(
                f"[MAIN] Benchmark with {exec_kind} executable failed: {err}",
                file=sys.stderr,
            )


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-x",
        "--executable",
        dest="executables",
        action="append",
        required=True,
        choices=["upstream", "jane"],
        help="Type(s) of executable to run the tests with",
    )
    main(parser.parse_args())