@@ -21,6 +21,8 @@ export default class MapReduceMasterJob implements Job {
2121 private mapWorkersPerRegion : number ;
2222 private reduceWorkersToReach : number ;
2323 private reduceWorkers : Array < MasterP2PConnection > ;
24+ private reduceWorkersJobAcks : Map < string , boolean > ;
25+ private reduceWorkersJobGuard : boolean ;
2426 private mapFunction : string ;
2527 private reduceFunction : string ;
2628 private enqueuedTasks : Array < Task > ;
@@ -43,6 +45,8 @@ export default class MapReduceMasterJob implements Job {
4345 this . reduceWorkersToReach = params . reduceWorkers ;
4446 this . reduceWorkers = new Array < MasterP2PConnection > ( ) ;
4547 this . mapFunction = params . mapFunction ;
48+ this . reduceWorkersJobAcks = new Map ( ) ;
49+ this . reduceWorkersJobGuard = false ;
4650 this . reduceFunction = params . reduceFunction ;
4751 this . enqueuedTasks = new Array ( ) ;
4852 this . mapWorkersPerRegion = Math . ceil (
@@ -116,7 +120,8 @@ export default class MapReduceMasterJob implements Job {
116120 masterP2PConnection : MasterP2PConnection
117121 ) : Promise < void > {
118122 this . reduceWorkers . push ( masterP2PConnection ) ;
119- if ( this . mapWorkers . length === this . mapWorkersToReach ) {
123+ this . reduceWorkersJobAcks . set ( masterP2PConnection . slaveId , false ) ;
124+ if ( this . reduceWorkers . length === this . reduceWorkersToReach ) {
120125 this . status = Status . RECRUITMENT_COMPLETED ;
121126 console . log ( 'RECRUITMENT COMPLETED' ) ;
122127 }
@@ -233,6 +238,21 @@ export default class MapReduceMasterJob implements Job {
233238 ) {
234239 this . mapWorkersJobGuard = true ;
235240 }
241+ } else if (
242+ parsedMsg . channel === 'START_JOB' &&
243+ parsedMsg . payload . name === 'REDUCE_WORKER' &&
244+ parsedMsg . payload . result === 'ACK'
245+ ) {
246+ this . reduceWorkersJobAcks . set ( masterP2PConnection . slaveId , true ) ;
247+ if (
248+ this . reduceWorkersJobAcks . size === this . reduceWorkersToReach &&
249+ Array . from ( this . reduceWorkersJobAcks . values ( ) ) . every ( ( s ) => {
250+ return s === true ;
251+ } )
252+ ) {
253+ this . reduceWorkersJobGuard = true ;
254+ console . log ( 'REDUCE WORKERS JOB GUARD DISABLED' ) ;
255+ }
236256 } else if (
237257 parsedMsg . channel === 'TASK_COMPLETED' &&
238258 parsedMsg . payload . name === 'MAPREDUCE_MAP'
0 commit comments