Skip to content
Draft
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
b9d87fc
Replace cpu-benchmark with similar stress-ng monte-carlo test
quantumsteve Feb 9, 2026
d3778c2
Add dependency psutil
quantumsteve Feb 10, 2026
827c189
terminate io process
quantumsteve Feb 10, 2026
e41861c
check for division by zero
quantumsteve Feb 10, 2026
901e87b
mute stress-ng
quantumsteve Feb 10, 2026
8c560de
Added a --quiet flag to another stress-ng invocation
henricasanova Feb 11, 2026
ffac645
Fixed the zombie problem
henricasanova Feb 11, 2026
e802679
Since memsize argument document says MB (and not MiB), I changed
henricasanova Feb 12, 2026
45124d9
typo-- !!
henricasanova Feb 12, 2026
c92654e
try to workaround missing cpu_queue
quantumsteve Feb 12, 2026
510ca58
typos
quantumsteve Feb 13, 2026
8bb70bc
Rewrite/Re-engineering of wfbench so that the execution proceeds in
henricasanova Feb 20, 2026
ebe24ab
Made it to that even if wfbench is ^C-ed, it doesn't leave runaway
henricasanova Feb 20, 2026
85079cb
Minor fix
henricasanova Feb 20, 2026
8f46f0e
check container output
quantumsteve Mar 2, 2026
61fe80c
bug-- in bin/wfbench
henricasanova Mar 18, 2026
4dee4fd
bug-- in wfbench
henricasanova Mar 18, 2026
2feb997
Merge branch 'stress-ng_cpu_benchmark' of github.com:wfcommons/WfComm…
henricasanova Mar 18, 2026
50e42f9
Updated the create_benchmark() method to allow specifying the number of
henricasanova Mar 18, 2026
4da3f66
Merge branch 'main' into stress-ng_cpu_benchmark
henricasanova Mar 19, 2026
54c212e
Insane race-condition bug fix if wfbench.py (having to deal with killing
henricasanova Mar 19, 2026
68fb5db
cleanup
quantumsteve Mar 20, 2026
8bd49b5
cleanup
quantumsteve Mar 20, 2026
8639ee7
commented out code
quantumsteve Mar 20, 2026
90a6a49
Updagted wfbench to make it callable as a module
henricasanova Mar 21, 2026
9a77ef8
Made the Swift/T translator create a README file with instructions
henricasanova Mar 21, 2026
f627804
Modified swift-t translator fork-exec wfbench (which is known to be
henricasanova Mar 21, 2026
03b058a
Made Swift/T translator use python_exec()
henricasanova Mar 21, 2026
50bc158
test re-enabling
henricasanova Mar 21, 2026
22efdf1
Merge branch 'stress-ng_cpu_benchmark' into stress-ng_cpu_benchmark-w…
henricasanova Mar 21, 2026
32963ae
test updates
henricasanova Mar 21, 2026
f08f54e
Removed all traces of cpu-benchmark.cpp
henricasanova Mar 22, 2026
3aa7021
added a sleep to let redis server time to start in the swift/t container
henricasanova Mar 23, 2026
0a99665
small test fix/cleanup
henricasanova Mar 23, 2026
225b689
cleanup
quantumsteve Mar 23, 2026
a486c92
Update bin/wfbench
henricasanova Mar 27, 2026
6d58507
set type to integer
quantumsteve Apr 6, 2026
1325d1b
add hipified code
quantumsteve Apr 13, 2026
b633026
check CUDA/HIP return values
quantumsteve Apr 13, 2026
2b47fc3
missed file
quantumsteve Apr 14, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 48 additions & 27 deletions bin/wfbench
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2021-2025 The WfCommons Team.
Expand All @@ -20,13 +20,17 @@ import re
import json
import logging
import pandas as pd
import psutil

from io import StringIO
from filelock import FileLock
from pathos.helpers import mp as multiprocessing
from typing import List, Optional


int32_max = 2147483647


# Configure logging
logging.basicConfig(
level=logging.INFO, # Change this to control the verbosity
Expand All @@ -39,6 +43,20 @@ logging.basicConfig(
this_dir = pathlib.Path(__file__).resolve().parent


def kill_process_and_children(proc):
if proc is None:
return
try:
parent = psutil.Process(proc.pid)
children = parent.children(recursive=True)
for child in children:
child.kill()
parent.kill()

except psutil.NoSuchProcess:
pass # Process is already dead


def log_info(msg: str):
"""
Log an info message to stderr
Expand Down Expand Up @@ -165,34 +183,39 @@ def cpu_mem_benchmark(cpu_queue: multiprocessing.Queue,
:rtype: List
"""
total_mem = f"{total_mem}B" if total_mem else f"{100.0 / os.cpu_count()}%"
cpu_work_per_thread = int(cpu_work / cpu_threads)

cpu_procs = []
mem_procs = []
cpu_prog = [f"{this_dir.joinpath('cpu-benchmark')}", f"{cpu_work_per_thread}"]
cpu_work_per_thread = int(1000000 * cpu_work / (16384 * cpu_threads)) if cpu_threads != 0 else int32_max**2
cpu_samples = min(cpu_work_per_thread, int32_max)
cpu_ops = (cpu_work_per_thread + int32_max - 1) // int32_max
if cpu_ops > int32_max:
log_info("Exceeded maximum allowed value of cpu work.")
cpu_ops = int32_max

cpu_proc = None
mem_proc = None

cpu_prog = ["stress-ng", "--monte-carlo", f"{cpu_threads}",
"--monte-carlo-method", "pi",
"--monte-carlo-rand", "lcg",
"--monte-carlo-samples", f"{cpu_samples}",
"--monte-carlo-ops", f"{cpu_ops}",
"--quiet"]
mem_prog = ["stress-ng", "--vm", f"{mem_threads}",
"--vm-bytes", f"{total_mem}", "--vm-keep"]
"--vm-bytes", f"{total_mem}", "--vm-keep", "--quiet"]

for i in range(cpu_threads):
cpu_proc = subprocess.Popen(cpu_prog, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if cpu_threads > 0:
cpu_proc = subprocess.Popen(cpu_prog, preexec_fn=os.setsid)

# NOTE: might be a good idea to use psutil to set the affinity (works across platforms)
if core:
os.sched_setaffinity(cpu_proc.pid, {core})
cpu_procs.append(cpu_proc)

# Start a thread to monitor the progress of each CPU benchmark process
monitor_thread = multiprocessing.Process(target=monitor_progress, args=(cpu_proc, cpu_queue))
monitor_thread.start()

if mem_threads > 0:
# NOTE: add a check to use creationflags=subprocess.CREATE_NEW_PROCESS_GROUP for Windows
mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid)
if core:
os.sched_setaffinity(mem_proc.pid, {core})
mem_procs.append(mem_proc)

return cpu_procs, mem_procs
return [cpu_proc, mem_proc]


def io_read_benchmark_user_input_data_size(inputs,
Expand Down Expand Up @@ -446,35 +469,33 @@ def main():
log_debug(f"{args.name} acquired core {core}")

mem_threads=int(10 - 10 * args.percent_cpu)
cpu_procs, mem_procs = cpu_mem_benchmark(cpu_queue=cpu_queue,
[cpu_proc, mem_proc] = cpu_mem_benchmark(cpu_queue=cpu_queue,
cpu_threads=int(10 * args.percent_cpu),
mem_threads=mem_threads,
cpu_work=sys.maxsize if args.time else int(args.cpu_work),
cpu_work=int32_max**2 if args.time else int(args.cpu_work),
core=core,
total_mem=mem_bytes)

procs.extend(cpu_procs)
procs.append(cpu_proc)
if args.time:
time.sleep(int(args.time))
for proc in procs:
if isinstance(proc, multiprocessing.Process):
if proc.is_alive():
proc.terminate()
elif isinstance(proc, subprocess.Popen):
proc.terminate()
kill_process_and_children(proc)
else:
for proc in procs:
if isinstance(proc, subprocess.Popen):
proc.wait()
if io_proc is not None and io_proc.is_alive():
# io_proc.terminate()
io_proc.terminate()
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think calling terminate here is causing the io_proc to not finish.

Copy link
Copy Markdown
Contributor

@henricasanova henricasanova Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but the problem is that if we call join() instead of terminate, then it seems to hang... so the I/O process isn't the kind of process that ever terminates perhaps? I'll inspect the code tomorrow Thursday when I have a minute.

io_proc.join()

for mem_proc in mem_procs:
try:
os.kill(mem_proc.pid, signal.SIGKILL) # Force kill if SIGTERM fails
except subprocess.TimeoutExpired:
log_debug("Memory process did not terminate; force-killing.")
try:
kill_process_and_children(mem_proc)
except subprocess.TimeoutExpired:
log_debug("Memory process did not terminate; force-killing.")
# As a fallback, use pkill if any remaining instances are stuck
subprocess.Popen(["pkill", "-f", "stress-ng"]).wait()

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ dependencies = [
"networkx",
"numpy",
"pandas",
"psutil",
"python-dateutil",
"requests",
"scipy>=1.16.1",
"pyyaml",
"pandas",
"shortuuid",
"stringcase",
"filelock",
Expand Down
5 changes: 3 additions & 2 deletions tests/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command=
working_dir=working_dir,
user="wfcommons",
tty=True,
detach=True
detach=True,
init=True # For zombies
)

# Installing WfCommons on container
Expand Down Expand Up @@ -165,4 +166,4 @@ def _compare_workflows(workflow_1: Workflow, workflow_2: Workflow):
# sys.stderr.write(f"WORKFLOW2 OUTPUT FILE: {output_file.file_id} {output_file.size}\n")
workflow2_output_bytes += output_file.size
assert (workflow1_input_bytes == workflow2_input_bytes)
assert (workflow1_output_bytes == workflow2_output_bytes)
assert (workflow1_output_bytes == workflow2_output_bytes)
Loading