Skip to content

Commit e70025f

Browse files
committed
first proper map execution
1 parent 29508df commit e70025f

2 files changed

Lines changed: 14 additions & 5 deletions

File tree

src/interconnected_node/contribution/mapreduce/jobs/MapWorkerJob.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ export default class MapWorkerJob implements Job {
1818
console.log(
1919
'I am a MapWorker that has received the code:\n' + this.mapFunction
2020
);
21-
console.log('executing the received code');
22-
eval(this.mapFunction);
2321
return new Promise<void>((resolve) => {
2422
resolve();
2523
});
@@ -30,8 +28,12 @@ export default class MapWorkerJob implements Job {
3028
}
3129

3230
enqueueTask(task: Task): Promise<boolean> {
33-
console.log('MAP WORKER RECEIVED MAP TASK');
3431
return new Promise<boolean>((resolve) => {
32+
task.execute(
33+
{ mapFunction: this.mapFunction },
34+
() => console.log('COMPLETED MAP TASK'),
35+
() => console.log('ERROR ON MAP TASK')
36+
);
3537
resolve(true);
3638
});
3739
}
Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,22 @@
11
import Task from '../../common/Task';
22

33
export default class MapReduceMapTask implements Task {
4+
private regionId: string;
5+
private splits: Object[];
6+
47
constructor(params: any) {
5-
//TODO implement
8+
this.regionId = params.regionId;
9+
this.splits = params.splits;
610
}
711

812
execute(
913
jobParams: any,
1014
onCompletionCallback: () => void,
1115
onErrorCallback: () => void
1216
): Promise<void> {
13-
throw new Error('Method not implemented.');
17+
const mapFunction = jobParams.mapFunction;
18+
this.splits.forEach((s) => eval(mapFunction)(s));
19+
onCompletionCallback();
20+
return new Promise<void>((resolve) => resolve());
1421
}
1522
}

0 commit comments

Comments
 (0)