-
Notifications
You must be signed in to change notification settings - Fork 968
Expand file tree
/
Copy pathtest_process_pool_recovery.py
More file actions
157 lines (123 loc) · 5.5 KB
/
test_process_pool_recovery.py
File metadata and controls
157 lines (123 loc) · 5.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""
Tests for process pool crash recovery
"""
import asyncio
import os
import tempfile
import unittest
from unittest.mock import MagicMock, patch
from concurrent.futures import BrokenExecutor, Future
# Set dummy API key for testing
os.environ["OPENAI_API_KEY"] = "test"
from openevolve.config import Config
from openevolve.database import Program, ProgramDatabase
from openevolve.process_parallel import ProcessParallelController, SerializableResult
class TestProcessPoolRecovery(unittest.TestCase):
    """Tests for process pool crash recovery"""

    def setUp(self):
        """Set up test environment"""
        # Isolated scratch directory for the evaluator file; removed in tearDown.
        self.test_dir = tempfile.mkdtemp()

        # Create test config
        # Small limits keep the tests fast; two workers / two islands is the
        # minimum that still exercises parallel scheduling.
        self.config = Config()
        self.config.max_iterations = 10
        self.config.evaluator.parallel_evaluations = 2
        self.config.evaluator.timeout = 10
        self.config.database.num_islands = 2
        self.config.database.in_memory = True
        self.config.checkpoint_interval = 5

        # Create test evaluation file
        # Minimal evaluator returning a fixed score; written to disk because
        # the controller loads the evaluator by path in worker processes.
        self.eval_content = """
def evaluate(program_path):
    return {"score": 0.5}
"""
        self.eval_file = os.path.join(self.test_dir, "evaluator.py")
        with open(self.eval_file, "w") as f:
            f.write(self.eval_content)

        # Create test database
        self.database = ProgramDatabase(self.config.database)

        # Add some test programs
        # Seed programs so run_evolution has parents to sample from.
        for i in range(2):
            program = Program(
                id=f"test_{i}",
                code=f"def func_{i}(): return {i}",
                language="python",
                metrics={"score": 0.5},
                iteration_found=0,
            )
            self.database.add(program)

    def tearDown(self):
        """Clean up test environment"""
        import shutil

        # ignore_errors: best-effort cleanup; a failed rmtree must not mask
        # the actual test outcome.
        shutil.rmtree(self.test_dir, ignore_errors=True)

    def test_controller_has_recovery_tracking(self):
        """Test that controller initializes with recovery tracking attributes"""
        controller = ProcessParallelController(self.config, self.eval_file, self.database)

        # Fresh controller: no recovery attempted yet, cap defaults to 3.
        self.assertEqual(controller.recovery_attempts, 0)
        self.assertEqual(controller.max_recovery_attempts, 3)

    def test_recover_process_pool_recreates_executor(self):
        """Test that _recover_process_pool recreates the executor"""
        controller = ProcessParallelController(self.config, self.eval_file, self.database)

        # Start the controller to create initial executor
        controller.start()
        self.assertIsNotNone(controller.executor)
        original_executor = controller.executor

        # Simulate recovery
        # Patch time.sleep so the recovery backoff does not slow the test.
        with patch("time.sleep"):
            controller._recover_process_pool()

        # Verify executor was recreated
        # Identity check (assertIsNot): recovery must build a NEW pool, not
        # reuse the broken one.
        self.assertIsNotNone(controller.executor)
        self.assertIsNot(controller.executor, original_executor)

        # Clean up
        controller.stop()

    def test_broken_executor_triggers_recovery_and_resets_on_success(self):
        """Test that BrokenExecutor triggers recovery and counter resets on success"""

        async def run_test():
            controller = ProcessParallelController(self.config, self.eval_file, self.database)

            # Track recovery calls
            # Stub out the real recovery so we only observe that (and how)
            # it was invoked, without actually rebuilding a pool.
            recovery_called = []

            def mock_recover(failed_iterations=None):
                recovery_called.append(failed_iterations)

            controller._recover_process_pool = mock_recover

            # First call raises BrokenExecutor, subsequent calls succeed
            # call_count is a one-element list so the closure can mutate it.
            call_count = [0]

            def mock_submit(iteration, island_id):
                call_count[0] += 1
                mock_future = MagicMock(spec=Future)
                if call_count[0] == 1:
                    # First future raises BrokenExecutor when result() is called
                    mock_future.done.return_value = True
                    mock_future.result.side_effect = BrokenExecutor("Pool crashed")
                else:
                    # Subsequent calls succeed
                    mock_result = SerializableResult(
                        child_program_dict={
                            "id": f"child_{call_count[0]}",
                            "code": "def evolved(): return 1",
                            "language": "python",
                            "parent_id": "test_0",
                            "generation": 1,
                            "metrics": {"score": 0.7},
                            "iteration_found": iteration,
                            "metadata": {"island": island_id},
                        },
                        parent_id="test_0",
                        iteration_time=0.1,
                        iteration=iteration,
                    )
                    mock_future.done.return_value = True
                    mock_future.result.return_value = mock_result
                mock_future.cancel.return_value = True
                return mock_future

            with patch.object(controller, "_submit_iteration", side_effect=mock_submit):
                controller.start()

                # Run evolution - should recover from crash and reset counter on success
                await controller.run_evolution(
                    start_iteration=1, max_iterations=2, target_score=None
                )

                # Verify recovery was triggered
                self.assertEqual(len(recovery_called), 1)

                # Verify counter was reset after successful iteration
                self.assertEqual(controller.recovery_attempts, 0)

        asyncio.run(run_test())
# Allow running this test module directly: python test_process_pool_recovery.py
if __name__ == "__main__":
    unittest.main()