Skip to content

Commit adcb175

Browse files
committed
created guard for split task
1 parent 1916d57 commit adcb175

8 files changed

Lines changed: 84 additions & 16 deletions

File tree

src/interconnected_node/broker_service_socket/handlers/applyOnRecruitmentAcceptHandler.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ export default function applyOnRecruitmentAcceptHandler(
2828
});
2929
})
3030
.setOnMessageHandler((msg: any) => {
31-
// TODO implement actual handler
32-
console.log('Slave sent this message: ' + msg);
31+
jobsRepository
32+
.get(payload.operationId)
33+
?.notifyNewMessage(masterP2PConnection, msg);
3334
})
3435
.setOnDisconnectionHandler(() => {
3536
const masterConnection = masterP2PConnectionsHub.getBySlaveId(

src/interconnected_node/contribution/common/Job.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,9 @@ export default interface Job {
1313
notifyNewMasterP2PConnection(
1414
masterP2PConnection: MasterP2PConnection
1515
): Promise<void>;
16+
17+
notifyNewMessage(
18+
masterP2PConnection: MasterP2PConnection,
19+
msg: any
20+
): Promise<void>;
1621
}

src/interconnected_node/contribution/mapreduce/jobs/MapReduceMasterJob.ts

Lines changed: 56 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ export default class MapReduceMasterJob implements Job {
1616
private currentUsedMapWorkerIndex: number;
1717
private mapWorkersToReach: number;
1818
private mapWorkers: Array<MasterP2PConnection>;
19+
private mapWorkersJobAcks: Map<string, boolean>;
20+
private mapWorkersJobGuard: boolean;
1921
private mapWorkersPerRegion: number;
2022
private reduceWorkersToReach: number;
2123
private reduceWorkers: Array<MasterP2PConnection>;
@@ -35,6 +37,8 @@ export default class MapReduceMasterJob implements Job {
3537
Math.random() * this.mapWorkersToReach
3638
);
3739
this.mapWorkers = new Array<MasterP2PConnection>();
40+
this.mapWorkersJobAcks = new Map();
41+
this.mapWorkersJobGuard = false;
3842
this.reduceWorkersToReach = params.reduceWorkers;
3943
this.reduceWorkers = new Array<MasterP2PConnection>();
4044
this.mapFunction = params.mapFunction;
@@ -65,8 +69,10 @@ export default class MapReduceMasterJob implements Job {
6569
masterP2PConnection: MasterP2PConnection
6670
): Promise<void> {
6771
this.mapWorkers.push(masterP2PConnection);
72+
this.mapWorkersJobAcks.set(masterP2PConnection.slaveId, false);
6873
if (this.mapWorkers.length === this.mapWorkersToReach) {
6974
this.status = Status.REDUCE_WORKERS_RECRUITMENT;
75+
console.log('MAP WORKERS RECRUITED');
7076
const switchToReduceWorkersRecruitmentInterval = setInterval(() => {
7177
if (this.mapWorkers.every((mw) => mw.remoteDescription !== undefined)) {
7278
this.brokerServiceSocket.emit(
@@ -78,6 +84,12 @@ export default class MapReduceMasterJob implements Job {
7884
masterRole: 'NODE',
7985
}
8086
);
87+
while (this.enqueuedTasks.length > 0) {
88+
const poppedTask = this.enqueuedTasks.pop();
89+
if (poppedTask !== undefined) {
90+
this.executeSplit(poppedTask);
91+
}
92+
}
8193
clearInterval(switchToReduceWorkersRecruitmentInterval);
8294
}
8395
}, 100);
@@ -105,12 +117,6 @@ export default class MapReduceMasterJob implements Job {
105117
if (this.mapWorkers.length === this.mapWorkersToReach) {
106118
this.status = Status.RECRUITMENT_COMPLETED;
107119
console.log('RECRUITMENT COMPLETED');
108-
while (this.enqueuedTasks.length > 0) {
109-
const poppedTask = this.enqueuedTasks.pop();
110-
if (poppedTask !== undefined) {
111-
this.executeSplit(poppedTask);
112-
}
113-
}
114120
}
115121
return new Promise<void>((resolve) => {
116122
masterP2PConnection.sendMessage(
@@ -154,14 +160,20 @@ export default class MapReduceMasterJob implements Job {
154160
const mwUsed = new Array<MasterP2PConnection>();
155161
const remainingMWsFromIndex =
156162
this.mapWorkers.length - this.currentUsedMapWorkerIndex;
163+
console.log('length ' + this.mapWorkers.length.toString());
164+
console.log('index ' + this.currentUsedMapWorkerIndex.toString());
165+
console.log('remaining ' + remainingMWsFromIndex.toString());
166+
console.log('per region ' + this.mapWorkersPerRegion);
157167
if (remainingMWsFromIndex >= this.mapWorkersPerRegion) {
168+
console.log('A');
158169
mwUsed.push(
159170
...this.mapWorkers.slice(
160171
this.currentUsedMapWorkerIndex,
161172
this.mapWorkersPerRegion
162173
)
163174
);
164175
} else {
176+
console.log('B');
165177
mwUsed.push(
166178
...this.mapWorkers.slice(
167179
this.currentUsedMapWorkerIndex,
@@ -175,28 +187,58 @@ export default class MapReduceMasterJob implements Job {
175187
)
176188
);
177189
}
190+
console.log(mwUsed.length.toString());
178191
this.currentUsedMapWorkerIndex += this.mapWorkersPerRegion;
179192
if (this.currentUsedMapWorkerIndex >= this.mapWorkers.length) {
180193
this.currentUsedMapWorkerIndex =
181194
this.mapWorkersPerRegion - remainingMWsFromIndex;
182195
}
183-
task.execute(
184-
{ mapWorkers: mwUsed },
185-
() => {
186-
console.log('MASTER COMPLETED SLICE TASK');
187-
},
188-
() => {
189-
'MASTER HAD AN ERROR COMPLETING SLICE TASK';
196+
const interval = setInterval(() => {
197+
if (this.mapWorkersJobGuard === true) {
198+
clearInterval(interval);
199+
task.execute(
200+
{ mapWorkers: mwUsed },
201+
() => {
202+
console.log('MASTER COMPLETED SLICE TASK');
203+
},
204+
() => {
205+
'MASTER HAD AN ERROR COMPLETING SLICE TASK';
206+
}
207+
);
190208
}
191-
);
209+
}, 100);
192210
}
193211

194212
enqueueTask(task: Task): Promise<boolean> {
195213
if (this.status === Status.MAP_WORKERS_RECRUITMENT) {
214+
console.log('RECEIVED TASK, BUT ENQUEUED IT');
196215
this.enqueuedTasks.push(task);
197216
} else {
198217
this.executeSplit(task);
199218
}
200219
return new Promise<boolean>((resolve) => resolve(true));
201220
}
221+
222+
notifyNewMessage(
223+
masterP2PConnection: MasterP2PConnection,
224+
msg: any
225+
): Promise<void> {
226+
const parsedMsg = JSON.parse(msg);
227+
if (
228+
parsedMsg.channel === 'START_JOB' &&
229+
parsedMsg.payload.name === 'MAP_WORKER' &&
230+
parsedMsg.payload.result === 'ACK'
231+
) {
232+
this.mapWorkersJobAcks.set(masterP2PConnection.slaveId, true);
233+
if (
234+
this.mapWorkersJobAcks.size === this.mapWorkersToReach &&
235+
Array.from(this.mapWorkersJobAcks.values()).every((s) => {
236+
return s === true;
237+
})
238+
) {
239+
this.mapWorkersJobGuard = true;
240+
}
241+
}
242+
return new Promise<void>((resolve) => resolve());
243+
}
202244
}

src/interconnected_node/contribution/mapreduce/jobs/MapWorkerJob.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,11 @@ export default class MapWorkerJob implements Job {
4444
resolve();
4545
});
4646
}
47+
48+
notifyNewMessage(
49+
masterP2PConnection: MasterP2PConnection,
50+
msg: any
51+
): Promise<void> {
52+
throw new Error('Method not implemented.');
53+
}
4754
}

src/interconnected_node/contribution/mapreduce/jobs/ReduceWorkerJob.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,11 @@ export default class ReduceWorkerJob implements Job {
4141
resolve();
4242
});
4343
}
44+
45+
notifyNewMessage(
46+
masterP2PConnection: MasterP2PConnection,
47+
msg: any
48+
): Promise<void> {
49+
throw new Error('Method not implemented.');
50+
}
4451
}

src/interconnected_node/contribution/mapreduce/tasks/MapReduceRegionSplitsTask.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ export default class MapReduceRegionSplitsTask implements Task {
1818
return new Promise<void>((resolve) => {
1919
try {
2020
const mapWorkers: MasterP2PConnection[] = jobParams.mapWorkers;
21+
console.log(
22+
'SPLIT TASK map workers length' + mapWorkers.length.toString()
23+
);
2124
const roundedUpSplitsPerWorker = Math.ceil(
2225
this.splits.length / mapWorkers.length
2326
);

src/interconnected_node/p2p/message_handlers/slave/handleSlaveP2PConnectionMessage.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ export default function handleSlaveP2PConnectionMessage(
1212
jobsRepository: JobsRepository
1313
) {
1414
const parsedMsg = JSON.parse(msg);
15+
console.log('channel: ' + parsedMsg.channel);
1516
switch (parsedMsg.channel) {
1617
case 'START_JOB':
1718
onSlaveStartJobMessageHandler(
@@ -28,6 +29,7 @@ export default function handleSlaveP2PConnectionMessage(
2829
slaveP2PConnection,
2930
jobsRepository
3031
);
32+
break;
3133
default:
3234
throw new Error('Unrecognized P2P message received');
3335
}

src/interconnected_node/p2p/message_handlers/slave/handlers/onSlaveStartJobMessageHandler.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ export default function onSlaveStartJobMessageHandler(
2020
const response = {
2121
channel: 'START_JOB',
2222
payload: {
23+
name: payload.name,
2324
result: 'ACK',
2425
},
2526
};

0 commit comments

Comments
 (0)