@@ -13,6 +13,8 @@ import (
1313
1414 "github.com/spf13/cobra"
1515
16+ "wsnet2/binary"
17+ "wsnet2/client"
1618 "wsnet2/pb"
1719)
1820
@@ -89,7 +91,7 @@ func runLoad(ctx context.Context, roomCount, players, watchers int, withWatchabl
8991 wg .Done ()
9092 }(roomCount )
9193 }
92- for i := 0 ; i < roomCount ; i ++ {
94+ for i := range roomCount {
9395 time .Sleep (5 * time .Millisecond )
9496 go func (i int ) {
9597 wid := fmt .Sprintf ("%v-%v" , pid , i )
@@ -157,13 +159,24 @@ func runLoadRoom(ctx context.Context, p, w int, group uint32, lifetime time.Dura
157159 wg .Wait ()
158160 }()
159161
160- masterId := fmt .Sprintf ("master-%v" , cidsuffix )
162+ playerIds := make ([]string , 0 , p )
163+ for n := range p {
164+ playerIds = append (playerIds , fmt .Sprintf ("player-%v-%d" , cidsuffix , n ))
165+ }
166+ propKey := "cidsuffix"
167+ propVal := binary .MarshalStr8 (cidsuffix )
168+ query := client .NewQuery ().Equal (propKey , propVal )
169+ props := binary .MarshalDict (binary.Dict {propKey : propVal })
170+ ready := make (chan struct {})
171+
172+ masterId := playerIds [0 ]
161173 logger .Debugf ("%s create %s" , logprefix , masterId )
162174 room , master , err := createRoom (context .Background (), masterId , & pb.RoomOption {
163175 Visible : true ,
164176 Joinable : true ,
165177 Watchable : true ,
166178 SearchGroup : group ,
179+ PublicProps : props ,
167180 })
168181 if err != nil {
169182 return err
@@ -172,45 +185,236 @@ func runLoadRoom(ctx context.Context, p, w int, group uint32, lifetime time.Dura
172185
173186 wg .Add (1 )
174187 go func () {
175- rttSum , rttCnt , rttMax , avg := runMaster (ctx , master , lifetime , group , logprefix )
188+ rttSum , rttCnt , rttMax , avg := runLoadMaster (ctx , master , lifetime , ready , playerIds , logprefix )
176189 logger .Infof ("%s end RTT sum=%v cnt=%v avg=%v max=%v" , logprefix , rttSum , rttCnt , avg , rttMax )
177190 wg .Done ()
178191 }()
179192
180193 time .Sleep (time .Second )
181194
182- for i := 0 ; i < p ; i ++ {
183- playerId := fmt .Sprintf ("player-%v-%v" , cidsuffix , i )
184-
185- logger .Debugf ("%s watch %s" , logprefix , playerId )
186- _ , player , err := joinRoom (context .Background (), playerId , room .Id , nil )
195+ for _ , playerId := range playerIds [1 :] {
196+ time .Sleep (5 * time .Millisecond )
197+ logger .Debugf ("%s join %s" , logprefix , playerId )
198+ _ , player , err := joinRandom (context .Background (), playerId , group , query )
187199 if err != nil {
188200 return err
189201 }
190202
191203 wg .Add (1 )
192204 go func () {
193- runPlayer (ctx , player , masterId , logprefix )
205+ runLoadPlayer (ctx , player , lifetime , ready , playerIds , logprefix )
194206 wg .Done ()
195207 }()
196208 }
197209
198- for i := 0 ; i < w ; i ++ {
210+ for i := range w {
199211 watcherId := fmt .Sprintf ("watcher-%v-%v" , cidsuffix , i )
200212
201- logger .Debugf ("%s join %s" , logprefix , watcherId )
213+ logger .Debugf ("%s watch %s" , logprefix , watcherId )
202214 _ , watcher , err := watchRoom (context .Background (), watcherId , room .Id , nil )
203215 if err != nil {
204216 return err
205217 }
206218
207219 wg .Add (1 )
208220 go func () {
209- runWatcher (ctx , watcher , logprefix )
221+ runLoadWatcher (ctx , watcher , logprefix )
210222 wg .Done ()
211223 }()
212224 }
213225
214226 wg .Wait ()
215227 return nil
216228}
229+
230+ // runLoadMaster runs master player for loadtest
231+ //
232+ // 1. 全員入ってくるまで待つ
233+ // - ready chanで通知
234+ //
235+ // 2. メッセージ送信
236+ // - size: 300±150b、5%の確率で+1kb
237+ // - freq: 15msg / 1sec
238+ // - type: broadcastとtargets(全員)を交互に
239+ //
240+ // 3. lifetime経過でLeave
241+ func runLoadMaster (ctx context.Context , conn * client.Connection , lifetime time.Duration , ready chan struct {}, pids []string , logprefix string ) (rttSum , rttCnt , rttMax int64 , rttAvg float64 ) {
242+ logger .Debugf ("%s %s start" , logprefix , conn .UserId ())
243+ sendctx , cancel := context .WithCancel (ctx )
244+ var wg sync.WaitGroup
245+ wg .Add (3 )
246+
247+ go func () {
248+ defer wg .Done ()
249+
250+ var c <- chan time.Time
251+ if lifetime > 0 {
252+ c = time .After (lifetime )
253+ }
254+ var msg string
255+ select {
256+ case <- ctx .Done ():
257+ msg = "context done"
258+ case <- c :
259+ msg = "lifetime elapsed"
260+ }
261+ cancel ()
262+ logger .Debugf ("%v master leave" , logprefix )
263+ conn .Leave (msg )
264+ }()
265+
266+ go func () {
267+ defer wg .Done ()
268+
269+ for range len (pids ) {
270+ _ , ok := waitEvent (conn , lifetime , binary .EvTypeJoined )
271+ if ! ok {
272+ return
273+ }
274+ }
275+ logger .Debugf ("%v all players joined" , logprefix )
276+ close (ready )
277+
278+ for {
279+ ev , ok := waitEvent (conn , lifetime , binary .EvTypePong )
280+ if ! ok {
281+ return
282+ }
283+ p , _ := binary .UnmarshalEvPongPayload (ev .Payload ())
284+ rtt := time .Now ().UnixMilli () - int64 (p .Timestamp )
285+ if rtt > RttThreshold {
286+ logger .Warnf ("%s master rtt=%d" , logprefix , rtt )
287+ }
288+ rttSum += rtt
289+ rttCnt ++
290+ if rttMax < rtt {
291+ rttMax = rtt
292+ }
293+ }
294+ }()
295+
296+ go func () {
297+ defer wg .Done ()
298+
299+ select {
300+ case <- sendctx .Done ():
301+ return
302+ case <- ready :
303+ }
304+
305+ logger .Debugf ("%v change room unjoinable" , logprefix )
306+ conn .Send (binary .MsgTypeRoomProp , binary .MarshalRoomPropPayload (
307+ true , false , true , LoadSearchGroup , uint32 (len (pids )), 0 , nil , nil ))
308+
309+ tick := time .NewTicker (time .Second / 10 )
310+ defer tick .Stop ()
311+
312+ broadcast := true
313+ for {
314+ select {
315+ case <- sendctx .Done ():
316+ return
317+ case <- tick .C :
318+ }
319+
320+ // 300±150、300周辺が高頻度になるように
321+ size := 150 + rand .IntN (101 ) + rand .IntN (101 ) + rand .IntN (101 )
322+ if rand .IntN (20 ) == 0 { // 5%
323+ size += 1000
324+ }
325+
326+ if broadcast {
327+ conn .Broadcast (msgBody [:size ])
328+ } else {
329+ conn .ToTargets (msgBody [:size ], pids ... )
330+ }
331+ broadcast = ! broadcast
332+ }
333+ }()
334+
335+ msg , err := conn .Wait (ctx )
336+ if err != nil {
337+ logger .Errorf ("%s %v error: %v" , logprefix , conn .UserId (), err )
338+ }
339+ logger .Debugf ("%s %v end: %v" , logprefix , conn .UserId (), msg )
340+
341+ wg .Wait ()
342+ return rttSum , rttCnt , rttMax , float64 (rttSum ) / float64 (rttCnt )
343+ }
344+
345+ // runLoadPlayer runs player for loadtest
346+ //
347+ // 1. 全員入室を待つ
348+ // - ready chanを待つ
349+ //
350+ // 2. メッセージ送信
351+ // - size: 250±150b、5%の確率で+1kb
352+ // - freq: 10msg / 1sec
353+ // - type: broadcastとtargets(全員)を交互に
354+ //
355+ // 3. 誰かが抜けたら終了
356+ // - 正常系ならMasterが最初に抜ける
357+ func runLoadPlayer (ctx context.Context , conn * client.Connection , lifetime time.Duration , ready chan struct {}, pids []string , logprefix string ) {
358+ logger .Debugf ("%s %s start" , logprefix , conn .UserId ())
359+
360+ sendctx , cancel := context .WithCancel (ctx )
361+ defer cancel ()
362+
363+ go func () {
364+ _ , ok := waitEvent (conn , lifetime , binary .EvTypeLeft )
365+ cancel ()
366+ if ! ok {
367+ return
368+ }
369+ conn .Leave ("leave" )
370+ discardEvents (conn )
371+ }()
372+
373+ go func () {
374+ select {
375+ case <- sendctx .Done ():
376+ return
377+ case <- ready :
378+ }
379+
380+ tick := time .NewTicker (time .Second / 10 )
381+ defer tick .Stop ()
382+
383+ broadcast := true
384+ for {
385+ select {
386+ case <- sendctx .Done ():
387+ return
388+ case <- tick .C :
389+ }
390+
391+ // 200±150
392+ size := 50 + rand .IntN (101 ) + rand .IntN (101 ) + rand .IntN (101 )
393+ if rand .IntN (20 ) == 0 { // 5%
394+ size += 1000
395+ }
396+
397+ logger .Debugf ("%v msgsize = %v" , logprefix , size )
398+
399+ if broadcast {
400+ conn .Broadcast (msgBody [:size ])
401+ } else {
402+ conn .ToTargets (msgBody [:size ], pids ... )
403+ }
404+ broadcast = ! broadcast
405+ }
406+ }()
407+
408+ msg , err := conn .Wait (ctx )
409+ if err != nil {
410+ logger .Errorf ("%s %v error: %v" , logprefix , conn .UserId (), err )
411+ }
412+ logger .Debugf ("%s %v end: %v" , logprefix , conn .UserId (), msg )
413+ }
414+
415+ // runLoadWatcher runs watcher for loadtest
416+ //
417+ // pattern: same as soak test
418+ func runLoadWatcher (ctx context.Context , conn * client.Connection , logprefix string ) {
419+ runSoakWatcher (ctx , conn , logprefix )
420+ }
0 commit comments