Skip to content

Commit b5e1b73

Browse files
committed
master is able to determine when it has received all intermediate results for a mapped region
1 parent 6f4072b commit b5e1b73

1 file changed

Lines changed: 27 additions & 8 deletions

File tree

src/interconnected_node/contribution/mapreduce/jobs/MapReduceMasterJob.ts

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ export default class MapReduceMasterJob implements Job {
2424
private mapFunction: string;
2525
private reduceFunction: string;
2626
private enqueuedTasks: Array<Task>;
27+
private intermediateResults: Map<string, Object[]>;
2728

2829
constructor(
2930
params: any,
@@ -47,6 +48,7 @@ export default class MapReduceMasterJob implements Job {
4748
this.mapWorkersPerRegion = Math.ceil(
4849
this.mapWorkersToReach / this.reduceWorkersToReach
4950
);
51+
this.intermediateResults = new Map();
5052
}
5153

5254
get operationId(): string {
@@ -235,14 +237,31 @@ export default class MapReduceMasterJob implements Job {
235237
parsedMsg.channel === 'TASK_COMPLETED' &&
236238
parsedMsg.payload.name === 'MAPREDUCE_MAP'
237239
) {
238-
console.log(
239-
'received map result: ' +
240-
parsedMsg.payload.params.regionId +
241-
' ' +
242-
parsedMsg.payload.params.splitsTotal +
243-
'\n' +
244-
parsedMsg.payload.params.intermediateResults
245-
);
240+
const regionId = parsedMsg.payload.params.regionId;
241+
const mapWorkerIntermediateResults =
242+
parsedMsg.payload.params.intermediateResults;
243+
const accumulatedMWIntermediateResults =
244+
this.intermediateResults.get(regionId);
245+
let updatedIntermediateResults: Object[];
246+
if (accumulatedMWIntermediateResults === undefined) {
247+
updatedIntermediateResults = mapWorkerIntermediateResults;
248+
} else {
249+
updatedIntermediateResults = accumulatedMWIntermediateResults.concat(
250+
mapWorkerIntermediateResults
251+
);
252+
}
253+
this.intermediateResults.set(regionId, updatedIntermediateResults);
254+
if (
255+
updatedIntermediateResults.length ===
256+
parsedMsg.payload.params.splitsTotal
257+
) {
258+
console.log(
259+
'RECEIVED ALL IRs FOR REGION ' +
260+
regionId +
261+
'\n' +
262+
updatedIntermediateResults
263+
);
264+
}
246265
}
247266
return new Promise<void>((resolve) => resolve());
248267
}

0 commit comments

Comments
 (0)