diff --git a/src/scriptengine/cli/se.py b/src/scriptengine/cli/se.py index e9202b1..de4dc48 100644 --- a/src/scriptengine/cli/se.py +++ b/src/scriptengine/cli/se.py @@ -65,6 +65,11 @@ def parse_cmd_line_args(): arg_parser.add_argument( "--nocolor", help="do not use colored terminal output", action="store_true" ) + arg_parser.add_argument( + "--parallel-io", + help="run loop iterations concurrently using threads (for I/O-bound tasks)", + action="store_true", + ) arg_parser.add_argument("files", help="YAML file(s) to read", nargs="+") return arg_parser.parse_args() @@ -146,6 +151,7 @@ def main(): "cli": { "cwd": os.getcwd(), "script_path": script_path, + "parallel": parsed_args.parallel_io, }, "tasks": { "timing": { diff --git a/src/scriptengine/jinja.py b/src/scriptengine/jinja.py index 3483cfa..9c90876 100644 --- a/src/scriptengine/jinja.py +++ b/src/scriptengine/jinja.py @@ -105,8 +105,9 @@ def render(arg, context, recursive=True, boolean=False): """ def render_with_context(string_arg): + if "{" not in string_arg: + return string_arg try: - # Render string in parameter environment using context return _param_env.from_string(string_arg).render(context) except jinja2.TemplateError as e: raise ScriptEngineParseJinjaError( diff --git a/src/scriptengine/jobs.py b/src/scriptengine/jobs.py index d856217..1b883da 100644 --- a/src/scriptengine/jobs.py +++ b/src/scriptengine/jobs.py @@ -6,6 +6,7 @@ """ import ast +import concurrent.futures import copy import logging import uuid @@ -134,21 +135,46 @@ def append(self, todo): self._todo.extend(todo_list) self.log_debug(f'Append: {",".join(t.shortid for t in todo_list)}') + def _run_iteration(self, context, items): + for t in self.todo: + c = t.run(Context({**context, **items})) + if c: + self.log_warning( + f"Context update from {t.shortid} discarded in parallel loop" + ) + def run(self, context): if self.when(context): local_context = Context(copy.deepcopy(context)) context_update = Context() - for items in self.loop(local_context): - if set(items) & set(local_context): - self.log_warning( - "The following loop variables collide with the " - f"context: {set(items) & set(local_context)}" - ) - for t in self.todo: - c = t.run(Context({**local_context, **items})) - if c: - local_context += c - context_update += c + + parallel = ( + self._loop + and context.get("se", {}).get("cli", {}).get("parallel", False) + ) + + if parallel: + all_items = list(self.loop(local_context)) + with concurrent.futures.ThreadPoolExecutor() as pool: + futures = [ + pool.submit(self._run_iteration, local_context, items) + for items in all_items + ] + for f in concurrent.futures.as_completed(futures): + f.result() + else: + for items in self.loop(local_context): + if set(items) & set(local_context): + self.log_warning( + "The following loop variables collide with the " + f"context: {set(items) & set(local_context)}" + ) + for t in self.todo: + c = t.run(Context({**local_context, **items})) + if c: + local_context += c + context_update += c + return context_update or None def _log(self, level, msg): diff --git a/test-se-run/bench/bench-copy.yml b/test-se-run/bench/bench-copy.yml new file mode 100644 index 0000000..ebd1db2 --- /dev/null +++ b/test-se-run/bench/bench-copy.yml @@ -0,0 +1,9 @@ +# Benchmark: base.copy loop +- base.make_dir: + path: "{{workdir}}" +- base.chdir: + path: "{{workdir}}" +- base.copy: + src: "{{srcdir}}/{{item}}" + dst: . + loop: "{{files}}" diff --git a/test-se-run/bench/bench-move-setup.yml b/test-se-run/bench/bench-move-setup.yml new file mode 100644 index 0000000..338c57a --- /dev/null +++ b/test-se-run/bench/bench-move-setup.yml @@ -0,0 +1,9 @@ +# Setup for base.move benchmark: copy source files into workdir +- base.make_dir: + path: "{{workdir}}" +- base.chdir: + path: "{{workdir}}" +- base.copy: + src: "{{srcdir}}/{{item}}" + dst: . + loop: "{{files}}" diff --git a/test-se-run/bench/bench-move.yml b/test-se-run/bench/bench-move.yml new file mode 100644 index 0000000..aee3835 --- /dev/null +++ b/test-se-run/bench/bench-move.yml @@ -0,0 +1,10 @@ +# Benchmark: base.move loop +# Assumes files are already present in workdir (copied by bench-move-setup.yml) +- base.make_dir: + path: "{{workdir}}/dest" +- base.chdir: + path: "{{workdir}}" +- base.move: + src: "{{item}}" + dst: dest/ + loop: "{{files}}" diff --git a/test-se-run/bench/run-bench.sh b/test-se-run/bench/run-bench.sh new file mode 100755 index 0000000..bf58451 --- /dev/null +++ b/test-se-run/bench/run-bench.sh @@ -0,0 +1,73 @@ +#!/usr/bin/env bash +# Benchmark: se --parallel-io vs se (sequential) +# Usage: bash run-bench.sh [iterations] +# Set BENCH_WORKDIR if /tmp is too small. +set -uo pipefail + +SRCDIR=${1:?'Usage: run-bench.sh [iterations]'} +N=${2:-5} +DIR=$(cd "$(dirname "$0")" && pwd) +W=${BENCH_WORKDIR:-${TMPDIR:-/tmp}/se-bench-$$} + +command -v se >/dev/null || { echo "ERROR: se not in PATH" >&2; exit 1; } + +# Pick 10 largest .nc files +mapfile -t FILES < <(find "$SRCDIR" -maxdepth 1 -name '*.nc' -size +1M -printf '%s %f\n' | sort -rn | head -10 | awk '{print $2}') +(( ${#FILES[@]} >= 5 )) || { echo "ERROR: need >=5 .nc files >1MB" >&2; exit 1; } + +YAML_LIST=$(printf ", '%s'" "${FILES[@]}"); YAML_LIST="[${YAML_LIST:2}]" + +mkdir -p "$W/src" +for f in "${FILES[@]}"; do [ -f "$W/src/$f" ] || cp "$SRCDIR/$f" "$W/src/"; done + +cat > "$W/ctx.yml" <&1 | tail -1 +} + +time_move() { + rm -rf "$W/run" + se --loglevel error "$W/ctx.yml" "$DIR/bench-move-setup.yml" >/dev/null 2>&1 + { time se --loglevel error $1 "$W/ctx.yml" "$DIR/bench-move.yml" ; } 2>&1 | tail -1 +} + +median() { printf '%s\n' "$@" | sort -n | awk -v n=$# 'NR==int(n/2)+1{print}'; } + +for op in copy move; do + par=() seq=() + fn="time_$op" + + $fn "" >/dev/null 2>&1 || true + $fn "--parallel-io" >/dev/null 2>&1 || true + + for ((i=1; i<=N; i++)); do + if (( i%2 )); then + par+=("$($fn "--parallel-io")") + seq+=("$($fn "")") + else + seq+=("$($fn "")") + par+=("$($fn "--parallel-io")") + fi + done + + mp=$(median "${par[@]}"); ms=$(median "${seq[@]}") + sp=$(awk "BEGIN{printf \"%.0f\", ($ms-$mp)/$ms*100}") + printf " %-10s par=%ss seq=%ss speedup=%s%%\n" "base.$op" "$mp" "$ms" "$sp" + printf " all par: %s\n" "${par[*]}" + printf " all seq: %s\n" "${seq[*]}" +done + +echo "" +rm -rf "$W" diff --git a/tests/jobs/test_looping.py b/tests/jobs/test_looping.py index 74dbf24..130ee6d 100644 --- a/tests/jobs/test_looping.py +++ b/tests/jobs/test_looping.py @@ -163,3 +163,33 @@ def test_loop_over_lists(capsys): captured = capsys.readouterr() assert "1 - 2" in captured.out assert "3 - 4" in captured.out + + +def test_parallel_loop(capsys): + j = from_yaml( + """ + base.echo: + msg: 'Hello {{item}}' + loop: [1, 2, 3] + """ + ) + j.run(Context({"se": {"cli": {"parallel": True}}})) + captured = capsys.readouterr() + assert "Hello 1" in captured.out + assert "Hello 2" in captured.out + assert "Hello 3" in captured.out + + +def test_parallel_loop_with_context_var(capsys): + j = from_yaml( + """ + base.echo: + msg: 'File {{item}}' + loop: '{{files}}' + """ + ) + j.run(Context({"files": ["a.nc", "b.nc", "c.nc"], "se": {"cli": {"parallel": True}}})) + captured = capsys.readouterr() + assert "File a.nc" in captured.out + assert "File b.nc" in captured.out + assert "File c.nc" in captured.out