@@ -89,22 +89,26 @@ export default class MapReduceMasterJob implements Job {
8989 this . mapWorkersJobAcks . set ( masterP2PConnection . slaveId , false ) ;
9090 if ( this . mapWorkers . length === this . mapWorkersToReach ) {
9191 this . status = Status . REDUCE_WORKERS_RECRUITMENT ;
92- console . log ( 'MAP WORKERS RECRUITED, EMITTING NEW RECRUITMENT REQUEST' ) ;
93- this . brokerServiceSocket . emit ( BrokerServiceChannels . RECRUITMENT_REQUEST , {
94- operationId : this . operationId ,
95- nodesToReach : this . reduceWorkersToReach ,
96- masterId : this . interconnectedNodeId ,
97- masterRole : 'NODE' ,
98- } ) ;
92+ console . log ( 'MAP WORKERS RECRUITED' ) ;
9993 const switchToReduceWorkersRecruitmentInterval = setInterval ( ( ) => {
10094 if ( this . mapWorkers . every ( ( mw ) => mw . remoteDescription !== undefined ) ) {
95+ clearInterval ( switchToReduceWorkersRecruitmentInterval ) ;
96+ console . log ( 'EMITTING NEW RECRUITMENT REQUEST FOR REDUCE WORKERS' ) ;
97+ this . brokerServiceSocket . emit (
98+ BrokerServiceChannels . RECRUITMENT_REQUEST ,
99+ {
100+ operationId : this . operationId ,
101+ nodesToReach : this . reduceWorkersToReach ,
102+ masterId : this . interconnectedNodeId ,
103+ masterRole : 'NODE' ,
104+ }
105+ ) ;
101106 while ( this . enqueuedTasks . length > 0 ) {
102107 const retrievedTask = this . enqueuedTasks . shift ( ) ;
103108 if ( retrievedTask !== undefined ) {
104109 this . executeSplit ( retrievedTask ) ;
105110 }
106111 }
107- clearInterval ( switchToReduceWorkersRecruitmentInterval ) ;
108112 }
109113 } , 100 ) ;
110114 }
0 commit comments