Skip to content

Commit df8a12a

Browse files
[Bug] Add error handling for submitting jobs using pysqa (#961)
* Add error handling for submitting jobs using pysqa
* [pre-commit.ci] auto fixes from pre-commit.com hooks

  for more information, see https://pre-commit.ci
* Add more tests

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 7f1aa33 commit df8a12a

2 files changed

Lines changed: 47 additions & 2 deletions

File tree

src/executorlib/task_scheduler/file/spawner_pysqa.py

Lines changed: 9 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,5 @@
11
import os
2+
from subprocess import CalledProcessError
23
from typing import Optional
34

45
from pysqa import QueueAdapter
@@ -90,8 +91,14 @@ def execute_with_pysqa(
9091
)
9192
submit_kwargs.update(resource_dict)
9293
set_current_directory_in_environment()
93-
queue_id = qa.submit_job(**submit_kwargs)
94-
dump(file_name=file_name, data_dict={"queue_id": queue_id})
94+
try:
95+
queue_id = qa.submit_job(**submit_kwargs)
96+
except (ValueError, CalledProcessError) as error:
97+
dump(file_name=file_name, data_dict={"error": error})
98+
file_name_out = os.path.splitext(file_name)[0][:-2]
99+
os.rename(file_name_out + "_i.h5", file_name_out + "_o.h5")
100+
else:
101+
dump(file_name=file_name, data_dict={"queue_id": queue_id})
95102
return queue_id
96103

97104

tests/unit/executor/test_flux_cluster.py

Lines changed: 38 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,27 @@
2525
skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None
2626

2727

28+
template = """\
29+
#!/bin/bash
30+
# flux: --job-name={{job_name}}
31+
# flux: --env=CORES={{cores}}
32+
# flux: --output=time.out
33+
# flux: --error=error.out
34+
# flux: --nslots={{cores}}
35+
# flux: --queue={{queue}}
36+
{%- if run_time_max %}
37+
# flux: --time-limit={{run_time_max}}s
38+
{%- endif %}
39+
{%- if dependency_list %}
40+
{%- for jobid in dependency_list %}
41+
# flux: --dependency=afterok:{{jobid}}
42+
{%- endfor %}
43+
{%- endif %}
44+
45+
{{command}}
46+
"""
47+
48+
2849
def echo(i):
2950
sleep(1)
3051
return i
@@ -71,6 +92,23 @@ def test_executor(self):
7192
self.assertEqual(len(os.listdir("executorlib_cache")), 4)
7293
self.assertTrue(fs1.done())
7394

95+
def test_executor_wrong_queue_name(self):
96+
with self.assertRaises(ValueError):
97+
with FluxClusterExecutor(
98+
resource_dict={
99+
"cores": 2,
100+
"cwd": "executorlib_cache",
101+
"submission_template": template,
102+
"queue": "test",
103+
},
104+
block_allocation=False,
105+
cache_directory="executorlib_cache",
106+
pmi_mode=pmi,
107+
) as exe:
108+
cloudpickle_register(ind=1)
109+
fs1 = exe.submit(mpi_funct, 1)
110+
print(fs1.result())
111+
74112
def test_executor_no_cwd(self):
75113
with FluxClusterExecutor(
76114
resource_dict={"cores": 2},

0 commit comments

Comments
 (0)