Skip to content

Commit 39b0d57

Browse files
committed
handled reduce task sending without usign signal
1 parent 020e070 commit 39b0d57

1 file changed

Lines changed: 21 additions & 5 deletions

File tree

src/interconnected_node/contribution/mapreduce/jobs/MapReduceMasterJob.ts

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ export default class MapReduceMasterJob implements Job {
3030
private reduceFunction: string;
3131
private enqueuedTasks: Array<Task>;
3232
private intermediateResults: Map<string, Object[]>;
33+
private reduceTasksToSendQueue: Array<any>;
34+
private isSendingReduceTasks: boolean;
3335

3436
constructor(
3537
params: any,
@@ -60,6 +62,8 @@ export default class MapReduceMasterJob implements Job {
6062
this.mapWorkersToReach / this.reduceWorkersToReach
6163
);
6264
this.intermediateResults = new Map();
65+
this.reduceTasksToSendQueue = new Array<Task>();
66+
this.isSendingReduceTasks = false;
6367
}
6468

6569
get operationId(): string {
@@ -222,10 +226,12 @@ export default class MapReduceMasterJob implements Job {
222226
}, 100);
223227
}
224228

225-
private sendReduceTask(regionId: string, intermediateResult: Object[]): void {
226-
const interval = setInterval(() => {
227-
if (this.reduceWorkersJobGuard === true) {
228-
clearInterval(interval);
229+
private shiftReduceTaskToSendFromQueue(): void {
230+
if (this.reduceWorkersJobGuard === true && !this.isSendingReduceTasks) {
231+
const rt = this.reduceTasksToSendQueue.shift();
232+
if (rt !== undefined) {
233+
const regionId = rt.regionId;
234+
const intermediateResult = rt.intermediateResult;
229235
const rw = this.reduceWorkers[this.currentUsedReduceWorkerIndex];
230236
if (++this.currentUsedReduceWorkerIndex === this.reduceWorkers.length) {
231237
this.currentUsedReduceWorkerIndex = 0;
@@ -242,8 +248,17 @@ export default class MapReduceMasterJob implements Job {
242248
},
243249
})
244250
);
251+
this.shiftReduceTaskToSendFromQueue();
245252
}
246-
}, 100);
253+
}
254+
}
255+
256+
private sendReduceTask(regionId: string, intermediateResult: Object[]): void {
257+
this.reduceTasksToSendQueue.push({
258+
regionId: regionId,
259+
intermediateResult: intermediateResult,
260+
});
261+
this.shiftReduceTaskToSendFromQueue();
247262
}
248263

249264
enqueueTask(task: Task): Promise<boolean> {
@@ -287,6 +302,7 @@ export default class MapReduceMasterJob implements Job {
287302
})
288303
) {
289304
this.reduceWorkersJobGuard = true;
305+
this.shiftReduceTaskToSendFromQueue();
290306
console.log('REDUCE WORKERS JOB GUARD DISABLED');
291307
}
292308
} else if (

0 commit comments

Comments
 (0)