Skip to content

Commit 1070de9

Browse files
committed
Wait while the task is alive
To confirm task is alive, wait task_heartbeat in set_task.
1 parent fb94884 commit 1070de9

4 files changed

Lines changed: 42 additions & 25 deletions

File tree

lib/perfectqueue/task_metadata.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ def timeout
5757
end
5858
end
5959

60+
def last_heartbeat
61+
@attributes[:timeout] || 0
62+
end
63+
6064
def finished?
6165
status == TaskStatus::FINISHED
6266
end

lib/perfectqueue/task_monitor.rb

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ def initialize(config, child_heartbeat=nil, force_stop=nil)
2828
@child_heartbeat_interval = (@config[:child_heartbeat_interval] || 2).to_i
2929
@task_heartbeat_interval = (@config[:task_heartbeat_interval] || 2).to_i
3030
@last_child_heartbeat = Time.now.to_i
31-
@last_task_heartbeat = Time.now.to_i
3231

3332
@task = nil
3433

@@ -59,9 +58,11 @@ def set_task(task, runner)
5958
task.runner = runner
6059
@mutex.synchronize {
6160
@task = task
62-
@last_task_heartbeat = @task.timeout.to_i
63-
Thread.pass until @thread.stop? if @thread
6461
}
62+
now = Time.now.to_i
63+
while @task && @task.last_heartbeat + @task_heartbeat_interval < now
64+
sleep 1
65+
end
6566
end
6667

6768
def stop_task(immediate)
@@ -102,7 +103,6 @@ def external_task_heartbeat(task, &block)
102103
@mutex.synchronize {
103104
if task == @task
104105
ret = block.call if block
105-
@last_task_heartbeat = Time.now.to_i
106106
end
107107
ret
108108
}
@@ -116,20 +116,18 @@ def run
116116
next_child_heartbeat = @last_child_heartbeat + @child_heartbeat_interval
117117

118118
if @task
119-
next_task_heartbeat = @last_task_heartbeat + @task_heartbeat_interval
119+
next_task_heartbeat = @task.last_heartbeat + @task_heartbeat_interval
120120
next_time = [next_child_heartbeat, next_task_heartbeat].min
121121
else
122-
next_task_heartbeat = nil
123122
next_time = next_child_heartbeat
124123
end
125124

126125
next_wait = next_time - now
127126
@cond.wait(next_wait) if next_wait > 0
128127

129128
now = Time.now.to_i
130-
if @task && next_task_heartbeat && next_task_heartbeat <= now
129+
if @task && @task.last_heartbeat + @task_heartbeat_interval <= now
131130
task_heartbeat
132-
@last_task_heartbeat = now
133131
end
134132

135133
if next_child_heartbeat <= now
@@ -146,12 +144,12 @@ def run
146144

147145
private
148146
def task_heartbeat
149-
v = @task.heartbeat!(last_heartbeat: @last_task_heartbeat)
150-
@task.attributes[:timeout] = v
151-
v
147+
task = @task
148+
task.attributes[:timeout] = task.heartbeat!(last_heartbeat: task.last_heartbeat)
152149
rescue
153150
# finished, cancel_requested, preempted, etc.
154151
kill_task($!)
152+
@task = nil
155153
end
156154
end
157155

spec/multiprocess/child_process_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@
7373
end
7474

7575
describe '#process' do
76-
let (:task){ double('task', key: double, timeout: Time.now.to_i) }
76+
let (:task){ double('task', key: double, last_heartbeat: Time.now.to_i) }
7777
before do
7878
expect(runner_insntace).to receive(:run)
7979
end

spec/task_monitor_spec.rb

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
ret = double('ret')
2424
tm.instance_variable_set(:@task, task)
2525
expect(tm.external_task_heartbeat(task){ret}).to eq(ret)
26-
expect(tm.instance_variable_get(:@last_task_heartbeat)).to eq(epoch)
2726
end
2827
end
2928

@@ -44,7 +43,8 @@
4443
let (:tm){ PerfectQueue::TaskMonitor.new(logger: double('logger').as_null_object, task_heartbeat_interval: 1) }
4544
let (:err){ StandardError.new('heartbeat preempted') }
4645
let (:now){ Time.now.to_i }
47-
let (:task){ double('task', timeout: now) }
46+
let (:task){ double('task', attributes: {}, last_heartbeat: now) }
47+
let (:runner){ double('runner') }
4848
before do
4949
tm.set_task(task, double('runner'))
5050
end
@@ -66,13 +66,10 @@
6666
it 'update timeout' do
6767
tasks = client.acquire(now: now-80)
6868
task = tasks[0]
69-
expect(task.timeout.to_i).to eq(now-80+config[:alive_time])
70-
tm.set_task(task, double('runner'))
69+
expect(task.last_heartbeat).to eq(now-80+config[:alive_time])
7170
allow(Time).to receive(:now).and_return(now-50)
72-
Timeout.timeout(5) do
73-
sleep 0.5 until tm.instance_variable_get(:@last_task_heartbeat) == now-50
74-
end
75-
expect(task.timeout.to_i).to eq(now-50+config[:alive_time])
71+
tm.set_task(task, runner)
72+
expect(task.last_heartbeat).to eq(now-50+config[:alive_time])
7673
end
7774
end
7875
context 'stolen' do
@@ -94,20 +91,38 @@
9491
task2 = tasks[0]
9592
expect(task2.timeout.to_i).to eq(now-60+config[:alive_time])
9693

97-
tm.set_task(task1, double('runner'))
9894
allow(Time).to receive(:now).and_return(now-50)
95+
expect(runner).to receive(:kill)
96+
tm.set_task(task1, runner)
97+
end
98+
end
99+
context 'timeout but can acquire' do
100+
before do
101+
client.backend.db.tap{|s| s.tables.each{|t| s.drop_table(t) } }
102+
client.init_database
103+
client.submit('key', 'test1', {'foo' => 1}, {now: now-90,compression: 'gzip'})
104+
tm.start
105+
end
106+
after do
107+
tm.stop
108+
end
109+
it 'raise error' do
110+
tasks = client.acquire(now: now-80)
111+
task1 = tasks[0]
112+
expect(task1.timeout.to_i).to eq(now-80+config[:alive_time])
113+
114+
allow(Time).to receive(:now).and_return(now-50)
115+
tm.set_task(task1, runner)
99116

100-
flag = false
101-
expect(task1.runner).to receive(:kill){flag = true}
102-
Timeout.timeout(5){ sleep 0.5 until flag }
117+
expect(task1.runner).to eq(runner)
103118
end
104119
end
105120
end
106121
end
107122

108123
describe PerfectQueue::TaskMonitorHook do
109124
let (:task) do
110-
obj = AcquiredTask.new(double(:client).as_null_object, 'key', {}, double)
125+
obj = AcquiredTask.new(double(:client).as_null_object, 'key', {timeout: Time.now.to_i}, double)
111126
tm = TaskMonitor.new(logger: double('logger').as_null_object)
112127
tm.set_task(obj, double('runner'))
113128
obj

0 commit comments

Comments
 (0)