Skip to content

Commit 020e070

Browse files
committed
implemented an actual queue system in MapWorkerJob and ReduceWorkerJob
1 parent 465c865 commit 020e070

2 files changed

Lines changed: 66 additions & 16 deletions

File tree

src/interconnected_node/contribution/mapreduce/jobs/MapWorkerJob.ts

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,13 @@ import Task from '../../common/Task';
55

66
export default class MapWorkerJob implements Job {
77
private mapFunction: string;
8+
private isExecutingTask: boolean;
9+
private tasks: Array<Task>;
810

911
constructor(params: any, private slaveP2PConnection: SlaveP2PConnection) {
1012
this.mapFunction = params.mapFunction;
13+
this.isExecutingTask = false;
14+
this.tasks = new Array<Task>();
1115
}
1216

1317
get operationId(): string {
@@ -27,16 +31,37 @@ export default class MapWorkerJob implements Job {
2731
});
2832
}
2933

34+
private taskExecution(task: Task): Promise<void> {
35+
return new Promise<void>((resolve) => {
36+
this.isExecutingTask = true;
37+
task
38+
.execute(
39+
{
40+
mapFunction: this.mapFunction,
41+
slaveP2PConnection: this.slaveP2PConnection,
42+
},
43+
() => console.log('COMPLETED MAP TASK'),
44+
() => console.log('ERROR ON MAP TASK')
45+
)
46+
.then(() => {
47+
const nextTask = this.tasks.shift();
48+
if (nextTask !== undefined) {
49+
this.taskExecution(nextTask);
50+
} else {
51+
this.isExecutingTask = false;
52+
}
53+
resolve();
54+
});
55+
});
56+
}
57+
3058
enqueueTask(task: Task): Promise<boolean> {
3159
return new Promise<boolean>((resolve) => {
32-
task.execute(
33-
{
34-
mapFunction: this.mapFunction,
35-
slaveP2PConnection: this.slaveP2PConnection,
36-
},
37-
() => console.log('COMPLETED MAP TASK'),
38-
() => console.log('ERROR ON MAP TASK')
39-
);
60+
if (this.isExecutingTask) {
61+
this.tasks.push(task);
62+
} else {
63+
this.taskExecution(task);
64+
}
4065
resolve(true);
4166
});
4267
}

src/interconnected_node/contribution/mapreduce/jobs/ReduceWorkerJob.ts

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,13 @@ import Task from '../../common/Task';
55

66
export default class ReduceWorkerJob implements Job {
77
private reduceFunction: string;
8+
private isExecutingTask: boolean;
9+
private tasks: Array<Task>;
810

911
constructor(params: any, private slaveP2PConnection: SlaveP2PConnection) {
1012
this.reduceFunction = params.reduceFunction;
13+
this.isExecutingTask = false;
14+
this.tasks = new Array<Task>();
1115
}
1216

1317
get operationId(): string {
@@ -27,16 +31,37 @@ export default class ReduceWorkerJob implements Job {
2731
});
2832
}
2933

34+
private taskExecution(task: Task): Promise<void> {
35+
return new Promise<void>((resolve) => {
36+
this.isExecutingTask = true;
37+
task
38+
.execute(
39+
{
40+
reduceFunction: this.reduceFunction,
41+
slaveP2PConnection: this.slaveP2PConnection,
42+
},
43+
() => console.log('COMPLETED REDUCE TASK'),
44+
() => console.log('ERROR ON REDUCE TASK')
45+
)
46+
.then(() => {
47+
const nextTask = this.tasks.shift();
48+
if (nextTask !== undefined) {
49+
this.taskExecution(nextTask);
50+
} else {
51+
this.isExecutingTask = false;
52+
}
53+
resolve();
54+
});
55+
});
56+
}
57+
3058
enqueueTask(task: Task): Promise<boolean> {
3159
return new Promise<boolean>((resolve) => {
32-
task.execute(
33-
{
34-
reduceFunction: this.reduceFunction,
35-
slaveP2PConnection: this.slaveP2PConnection,
36-
},
37-
() => console.log('COMPLETED REDUCE TASK'),
38-
() => console.log('ERROR ON REDUCE TASK')
39-
);
60+
if (this.isExecutingTask) {
61+
this.tasks.push(task);
62+
} else {
63+
this.taskExecution(task);
64+
}
4065
resolve(true);
4166
});
4267
}

0 commit comments

Comments
 (0)