55import java .nio .ByteBuffer ;
66import java .nio .channels .AsynchronousCloseException ;
77import java .nio .channels .AsynchronousSocketChannel ;
8+ import java .nio .channels .ClosedChannelException ;
89import java .nio .channels .CompletionHandler ;
910import java .nio .channels .InterruptedByTimeoutException ;
10- import java .util .concurrent .CompletableFuture ;
1111import java .util .concurrent .TimeUnit ;
1212import java .util .concurrent .atomic .AtomicBoolean ;
13+ import java .util .concurrent .atomic .AtomicInteger ;
1314import java .util .function .Consumer ;
1415import javasabr .rlib .common .util .BufferUtils ;
1516import javasabr .rlib .network .BufferAllocator ;
16- import javasabr .rlib .network .Connection ;
1717import javasabr .rlib .network .Network ;
1818import javasabr .rlib .network .NetworkConfig ;
1919import javasabr .rlib .network .UnsafeConnection ;
@@ -55,6 +55,7 @@ public void failed(Throwable exc, ByteBuffer readingBuffer) {
5555 };
5656
5757 final AtomicBoolean reading = new AtomicBoolean (false );
58+ final AtomicInteger emptyReadsCounter = new AtomicInteger (0 );
5859
5960 final C connection ;
6061 final AsynchronousSocketChannel socketChannel ;
@@ -117,8 +118,9 @@ protected void startReadImpl() {
117118 socketChannel .read (buffer , buffer , readChannelHandler );
118119 } catch (RuntimeException ex ) {
119120 log .error (ex );
120- reading .compareAndSet (true , false );
121- retryReadLater ();
121+ if (reading .compareAndSet (true , false )) {
122+ retryReadLater ();
123+ }
122124 }
123125 }
124126
@@ -392,20 +394,11 @@ protected void freeTempBigBuffers() {
392394 protected void handleReceivedData (int receivedBytes , ByteBuffer readingBuffer ) {
393395 updateActivityFunction .run ();
394396 if (receivedBytes == -1 ) {
395- log .debug (remoteAddress (), "[%s] Received empty bytes from channel" ::formatted );
396- if (connection .closed ()) {
397- reading .compareAndSet (true , false );
398- return ;
399- } else if (!socketChannel .isOpen ()) {
400- connection .close ();
401- return ;
402- }
403- if (reading .compareAndSet (false , true )) {
404- retryReadLater ();
405- }
397+ handleEmptyReadFromChannel ();
406398 } else {
407399 log .debug (remoteAddress (), receivedBytes , "[%s] Received [%s] bytes from channel" ::formatted );
408400 readingBuffer .flip ();
401+ emptyReadsCounter .set (0 );
409402 try {
410403 readPackets (readingBuffer );
411404 } catch (Exception e ) {
@@ -415,6 +408,31 @@ protected void handleReceivedData(int receivedBytes, ByteBuffer readingBuffer) {
415408 }
416409 }
417410
411+ protected void handleEmptyReadFromChannel () {
412+ log .debug (remoteAddress (), "[%s] Received empty bytes from channel" ::formatted );
413+
414+ if (connection .closed ()) {
415+ reading .compareAndSet (true , false );
416+ return ;
417+ } else if (!socketChannel .isOpen ()) {
418+ connection .close ();
419+ return ;
420+ }
421+
422+ NetworkConfig config = connection
423+ .network ()
424+ .config ();
425+
426+ if (emptyReadsCounter .incrementAndGet () > config .maxEmptyReadsBeforeClose ()) {
427+ connection .close ();
428+ return ;
429+ }
430+
431+ if (reading .compareAndSet (true , false )) {
432+ retryReadLater ();
433+ }
434+ }
435+
418436 /**
419437 * Handles the exception during receiving data from the channel.
420438 *
@@ -424,12 +442,14 @@ protected void handleReceivedData(int receivedBytes, ByteBuffer readingBuffer) {
424442 protected void handleFailedReceiving (Throwable exception , ByteBuffer readingBuffer ) {
425443 if (exception instanceof InterruptedByTimeoutException ) {
426444 if (reading .compareAndSet (true , false )) {
427- startRead ();
445+ retryReadLater ();
428446 }
429447 return ;
430448 }
431449 if (exception instanceof AsynchronousCloseException ) {
432450 log .info (remoteAddress (), "[%s] Connection was closed" ::formatted );
451+ } else if (exception instanceof ClosedChannelException ) {
452+ log .info (remoteAddress (), "[%s] Connection was closed" ::formatted );
433453 } else {
434454 log .error (exception );
435455 connection .close ();
0 commit comments