@@ -89,18 +89,15 @@ export default class MapReduceMasterJob implements Job {
8989 this . mapWorkersJobAcks . set ( masterP2PConnection . slaveId , false ) ;
9090 if ( this . mapWorkers . length === this . mapWorkersToReach ) {
9191 this . status = Status . REDUCE_WORKERS_RECRUITMENT ;
92- console . log ( 'MAP WORKERS RECRUITED' ) ;
92+ console . log ( 'MAP WORKERS RECRUITED, EMITTING NEW RECRUITMENT REQUEST' ) ;
93+ this . brokerServiceSocket . emit ( BrokerServiceChannels . RECRUITMENT_REQUEST , {
94+ operationId : this . operationId ,
95+ nodesToReach : this . reduceWorkersToReach ,
96+ masterId : this . interconnectedNodeId ,
97+ masterRole : 'NODE' ,
98+ } ) ;
9399 const switchToReduceWorkersRecruitmentInterval = setInterval ( ( ) => {
94100 if ( this . mapWorkers . every ( ( mw ) => mw . remoteDescription !== undefined ) ) {
95- this . brokerServiceSocket . emit (
96- BrokerServiceChannels . RECRUITMENT_REQUEST ,
97- {
98- operationId : this . operationId ,
99- nodesToReach : this . reduceWorkersToReach ,
100- masterId : this . interconnectedNodeId ,
101- masterRole : 'NODE' ,
102- }
103- ) ;
104101 while ( this . enqueuedTasks . length > 0 ) {
105102 const retrievedTask = this . enqueuedTasks . shift ( ) ;
106103 if ( retrievedTask !== undefined ) {
@@ -231,6 +228,7 @@ export default class MapReduceMasterJob implements Job {
231228 const rt = this . reduceTasksToSendQueue . shift ( ) ;
232229 if ( rt !== undefined ) {
233230 const regionId = rt . regionId ;
231+ console . log ( 'SENDING REDUCE TASK FOR REGION ' + regionId ) ;
234232 const intermediateResult = rt . intermediateResult ;
235233 const rw = this . reduceWorkers [ this . currentUsedReduceWorkerIndex ] ;
236234 if ( ++ this . currentUsedReduceWorkerIndex === this . reduceWorkers . length ) {
@@ -280,6 +278,7 @@ export default class MapReduceMasterJob implements Job {
280278 parsedMsg . payload . name === 'MAP_WORKER' &&
281279 parsedMsg . payload . result === 'ACK'
282280 ) {
281+ console . log ( 'RECEIVED MAP WORKER JOB ACK' ) ;
283282 this . mapWorkersJobAcks . set ( masterP2PConnection . slaveId , true ) ;
284283 if (
285284 this . mapWorkersJobAcks . size === this . mapWorkersToReach &&
@@ -288,12 +287,14 @@ export default class MapReduceMasterJob implements Job {
288287 } )
289288 ) {
290289 this . mapWorkersJobGuard = true ;
290+ console . log ( 'MAP WORKERS JOB GUARD DISABLED' ) ;
291291 }
292292 } else if (
293293 parsedMsg . channel === 'START_JOB' &&
294294 parsedMsg . payload . name === 'REDUCE_WORKER' &&
295295 parsedMsg . payload . result === 'ACK'
296296 ) {
297+ console . log ( 'RECEIVED REDUCE WORKER JOB ACK' ) ;
297298 this . reduceWorkersJobAcks . set ( masterP2PConnection . slaveId , true ) ;
298299 if (
299300 this . reduceWorkersJobAcks . size === this . reduceWorkersToReach &&
@@ -302,8 +303,8 @@ export default class MapReduceMasterJob implements Job {
302303 } )
303304 ) {
304305 this . reduceWorkersJobGuard = true ;
305- this . shiftReduceTaskToSendFromQueue ( ) ;
306306 console . log ( 'REDUCE WORKERS JOB GUARD DISABLED' ) ;
307+ this . shiftReduceTaskToSendFromQueue ( ) ;
307308 }
308309 } else if (
309310 parsedMsg . channel === 'TASK_COMPLETED' &&
@@ -327,13 +328,17 @@ export default class MapReduceMasterJob implements Job {
327328 updatedIntermediateResults . length ===
328329 parsedMsg . payload . params . splitsTotal
329330 ) {
331+ console . log ( 'COMPLETED MAPPING REGION ' + regionId ) ;
330332 this . intermediateResults . delete ( regionId ) ;
331333 this . sendReduceTask ( regionId , updatedIntermediateResults ) ;
332334 }
333335 } else if (
334336 parsedMsg . channel === 'TASK_COMPLETED' &&
335337 parsedMsg . payload . name === 'MAPREDUCE_REDUCE'
336338 ) {
339+ console . log (
340+ 'RECEIVED REDUCE RESULT FOR REGION ' + parsedMsg . payload . params . regionId
341+ ) ;
337342 this . slaveP2PConnection . sendMessage (
338343 JSON . stringify ( {
339344 channel : 'TASK_COMPLETED' ,
0 commit comments