Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/scriptengine/cli/se.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ def parse_cmd_line_args():
arg_parser.add_argument(
"--nocolor", help="do not use colored terminal output", action="store_true"
)
arg_parser.add_argument(
"--parallel-io",
help="run loop iterations concurrently using threads (for I/O-bound tasks)",
action="store_true",
)
arg_parser.add_argument("files", help="YAML file(s) to read", nargs="+")

return arg_parser.parse_args()
Expand Down Expand Up @@ -146,6 +151,7 @@ def main():
"cli": {
"cwd": os.getcwd(),
"script_path": script_path,
"parallel": parsed_args.parallel_io,
},
"tasks": {
"timing": {
Expand Down
3 changes: 2 additions & 1 deletion src/scriptengine/jinja.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,9 @@ def render(arg, context, recursive=True, boolean=False):
"""

def render_with_context(string_arg):
if "{" not in string_arg:
return string_arg
Comment on lines +108 to +109

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this little change slipped in from #128 and does not really belong here, does it?

try:
# Render string in parameter environment using context
return _param_env.from_string(string_arg).render(context)
except jinja2.TemplateError as e:
raise ScriptEngineParseJinjaError(
Expand Down
48 changes: 37 additions & 11 deletions src/scriptengine/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""

import ast
import concurrent.futures
import copy
import logging
import uuid
Expand Down Expand Up @@ -134,21 +135,46 @@ def append(self, todo):
self._todo.extend(todo_list)
self.log_debug(f'Append: {",".join(t.shortid for t in todo_list)}')

def _run_iteration(self, context, items):
for t in self.todo:
c = t.run(Context({**context, **items}))
if c:
self.log_warning(
f"Context update from {t.shortid} discarded in parallel loop"
)

def run(self, context):
if self.when(context):
local_context = Context(copy.deepcopy(context))
context_update = Context()
for items in self.loop(local_context):
if set(items) & set(local_context):
self.log_warning(
"The following loop variables collide with the "
f"context: {set(items) & set(local_context)}"
)
for t in self.todo:
c = t.run(Context({**local_context, **items}))
if c:
local_context += c
context_update += c

parallel = (
self._loop
and context.get("se", {}).get("cli", {}).get("parallel", False)
)

if parallel:
all_items = list(self.loop(local_context))
with concurrent.futures.ThreadPoolExecutor() as pool:
futures = [
pool.submit(self._run_iteration, local_context, items)
for items in all_items
]
for f in concurrent.futures.as_completed(futures):
f.result()
else:
for items in self.loop(local_context):
if set(items) & set(local_context):
self.log_warning(
"The following loop variables collide with the "
f"context: {set(items) & set(local_context)}"
)
for t in self.todo:
c = t.run(Context({**local_context, **items}))
if c:
local_context += c
context_update += c

return context_update or None

def _log(self, level, msg):
Expand Down
9 changes: 9 additions & 0 deletions test-se-run/bench/bench-copy.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Benchmark: base.copy loop
- base.make_dir:
path: "{{workdir}}"
- base.chdir:
path: "{{workdir}}"
- base.copy:
src: "{{srcdir}}/{{item}}"
dst: .
loop: "{{files}}"
9 changes: 9 additions & 0 deletions test-se-run/bench/bench-move-setup.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Setup for base.move benchmark: copy source files into workdir
- base.make_dir:
path: "{{workdir}}"
- base.chdir:
path: "{{workdir}}"
- base.copy:
src: "{{srcdir}}/{{item}}"
dst: .
loop: "{{files}}"
10 changes: 10 additions & 0 deletions test-se-run/bench/bench-move.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Benchmark: base.move loop
# Assumes files are already present in workdir (copied by bench-move-setup.yml)
- base.make_dir:
path: "{{workdir}}/dest"
- base.chdir:
path: "{{workdir}}"
- base.move:
src: "{{item}}"
dst: dest/
loop: "{{files}}"
73 changes: 73 additions & 0 deletions test-se-run/bench/run-bench.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#!/usr/bin/env bash
# Benchmark: se --parallel-io vs se (sequential)
# Usage: bash run-bench.sh <dir_with_nc_files> [iterations]
# Set BENCH_WORKDIR if /tmp is too small.
set -uo pipefail

SRCDIR=${1:?'Usage: run-bench.sh <dir_with_nc_files> [iterations]'}
N=${2:-5}
DIR=$(cd "$(dirname "$0")" && pwd)
W=${BENCH_WORKDIR:-${TMPDIR:-/tmp}/se-bench-$$}

command -v se >/dev/null || { echo "ERROR: se not in PATH" >&2; exit 1; }

# Pick 10 largest .nc files
mapfile -t FILES < <(find "$SRCDIR" -maxdepth 1 -name '*.nc' -size +1M -printf '%s %f\n' | sort -rn | head -10 | awk '{print $2}')
(( ${#FILES[@]} >= 5 )) || { echo "ERROR: need >=5 .nc files >1MB" >&2; exit 1; }

YAML_LIST=$(printf ", '%s'" "${FILES[@]}"); YAML_LIST="[${YAML_LIST:2}]"

mkdir -p "$W/src"
for f in "${FILES[@]}"; do [ -f "$W/src/$f" ] || cp "$SRCDIR/$f" "$W/src/"; done

cat > "$W/ctx.yml" <<EOF
- base.context:
files: $YAML_LIST
srcdir: "$W/src"
workdir: "$W/run"
EOF

echo "se --parallel-io benchmark: ${#FILES[@]} files, $N iterations"
echo ""

TIMEFORMAT='%R'

time_copy() {
rm -rf "$W/run"
{ time se --loglevel error $1 "$W/ctx.yml" "$DIR/bench-copy.yml" ; } 2>&1 | tail -1
}

time_move() {
rm -rf "$W/run"
se --loglevel error "$W/ctx.yml" "$DIR/bench-move-setup.yml" >/dev/null 2>&1
{ time se --loglevel error $1 "$W/ctx.yml" "$DIR/bench-move.yml" ; } 2>&1 | tail -1
}

median() { printf '%s\n' "$@" | sort -n | awk -v n=$# 'NR==int(n/2)+1{print}'; }

for op in copy move; do
par=() seq=()
fn="time_$op"

$fn "" >/dev/null 2>&1 || true
$fn "--parallel-io" >/dev/null 2>&1 || true

for ((i=1; i<=N; i++)); do
if (( i%2 )); then
par+=("$($fn "--parallel-io")")
seq+=("$($fn "")")
else
seq+=("$($fn "")")
par+=("$($fn "--parallel-io")")
fi
done

mp=$(median "${par[@]}"); ms=$(median "${seq[@]}")
sp=$(awk "BEGIN{printf \"%.0f\", ($ms-$mp)/$ms*100}")
printf " %-10s par=%ss seq=%ss speedup=%s%%\n" "base.$op" "$mp" "$ms" "$sp"
printf " all par: %s\n" "${par[*]}"
printf " all seq: %s\n" "${seq[*]}"
done

echo ""
rm -rf "$W"
30 changes: 30 additions & 0 deletions tests/jobs/test_looping.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,33 @@ def test_loop_over_lists(capsys):
captured = capsys.readouterr()
assert "1 - 2" in captured.out
assert "3 - 4" in captured.out


def test_parallel_loop(capsys):
j = from_yaml(
"""
base.echo:
msg: 'Hello {{item}}'
loop: [1, 2, 3]
"""
)
j.run(Context({"se": {"cli": {"parallel": True}}}))
captured = capsys.readouterr()
assert "Hello 1" in captured.out
assert "Hello 2" in captured.out
assert "Hello 3" in captured.out


def test_parallel_loop_with_context_var(capsys):
j = from_yaml(
"""
base.echo:
msg: 'File {{item}}'
loop: '{{files}}'
"""
)
j.run(Context({"files": ["a.nc", "b.nc", "c.nc"], "se": {"cli": {"parallel": True}}}))
captured = capsys.readouterr()
assert "File a.nc" in captured.out
assert "File b.nc" in captured.out
assert "File c.nc" in captured.out