Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
Expand Down Expand Up @@ -44,6 +45,7 @@ public class TransactionsMsgHandler implements TronMsgHandler {

private BlockingQueue<Runnable> queue = new LinkedBlockingQueue();

private volatile boolean isClosed = false;
private int threadNum = Args.getInstance().getValidateSignThreadNum();
private final String trxEsName = "trx-msg-handler";
private ExecutorService trxHandlePool = ExecutorServiceManager.newThreadPoolExecutor(
Expand All @@ -58,8 +60,14 @@ public void init() {
}

public void close() {
ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName);
isClosed = true;
// Stop the scheduler first so no new tasks are drained from smartContractQueue.
ExecutorServiceManager.shutdownAndAwaitTermination(smartContractExecutor, smartEsName);
// Then shutdown the worker pool to finish already-submitted tasks.
ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName);
Comment thread
0xbigapple marked this conversation as resolved.
Comment thread
0xbigapple marked this conversation as resolved.
// Discard any remaining items and release references.
smartContractQueue.clear();
Comment thread
0xbigapple marked this conversation as resolved.
queue.clear();
}

public boolean isBusy() {
Expand All @@ -68,6 +76,10 @@ public boolean isBusy() {

@Override
public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException {
if (isClosed) {
logger.warn("TransactionsMsgHandler is closed, drop message");
return;
}
TransactionsMessage transactionsMessage = (TransactionsMessage) msg;
Comment on lines +79 to 83
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

processMessage returns immediately when isClosed is true, before removing the received transactions from peer.getAdvInvRequest(). If close() is called while peers are still connected, those request entries can linger and later trigger TIME_OUT disconnects in PeerStatusCheck even though the peer actually responded. Consider still doing the minimal request cleanup (remove advInvRequest entries for txs in the message) while skipping validation/submission when closed, or clearing relevant state as part of shutdown.

Suggested change
if (isClosed) {
logger.warn("TransactionsMsgHandler is closed, drop message");
return;
}
TransactionsMessage transactionsMessage = (TransactionsMessage) msg;
TransactionsMessage transactionsMessage = (TransactionsMessage) msg;
if (isClosed) {
for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) {
Item item = new Item(new TransactionMessage(trx).getMessageId(), InventoryType.TRX);
peer.getAdvInvRequest().remove(item);
}
logger.warn("TransactionsMsgHandler is closed, drop message");
return;
}

Copilot uses AI. Check for mistakes.
check(peer, transactionsMessage);
for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) {
Expand All @@ -78,6 +90,10 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
int trxHandlePoolQueueSize = 0;
int dropSmartContractCount = 0;
for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) {
if (isClosed) {
logger.warn("TransactionsMsgHandler is closed during processing, stop submit");
break;
}
int type = trx.getRawData().getContract(0).getType().getNumber();
if (type == ContractType.TriggerSmartContract_VALUE
|| type == ContractType.CreateSmartContract_VALUE) {
Expand All @@ -87,8 +103,13 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
dropSmartContractCount++;
}
} else {
ExecutorServiceManager.submit(
trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx)));
try {
ExecutorServiceManager.submit(
trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx)));
} catch (RejectedExecutionException e) {
logger.warn("Submit task to {} failed", trxEsName);
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The RejectedExecutionException catch logs only a generic message and drops the exception details. Including e (or at least e.getMessage()) in the log will make shutdown/rejection issues diagnosable in production, especially since this path is explicitly handling a failure mode.

Suggested change
logger.warn("Submit task to {} failed", trxEsName);
logger.warn("Submit task to {} failed", trxEsName, e);

Copilot uses AI. Check for mistakes.
break;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;

import lombok.Getter;
import org.joda.time.DateTime;
Expand All @@ -20,7 +23,10 @@
import org.tron.common.TestConstants;
import org.tron.common.runtime.TvmTestUtils;
import org.tron.common.utils.ByteArray;
import org.tron.core.ChainBaseManager;
import org.tron.core.config.args.Args;
import org.tron.core.exception.P2pException;
import org.tron.core.exception.P2pException.TypeEnum;
import org.tron.core.net.TronNetDelegate;
import org.tron.core.net.message.adv.TransactionMessage;
import org.tron.core.net.message.adv.TransactionsMessage;
Expand Down Expand Up @@ -132,6 +138,159 @@ public void testProcessMessage() {
}
}

@Test
public void testProcessMessageAfterClose() throws Exception {
TransactionsMsgHandler handler = new TransactionsMsgHandler();
handler.init();
handler.close();

PeerConnection peer = Mockito.mock(PeerConnection.class);
TransactionsMessage msg = Mockito.mock(TransactionsMessage.class);

handler.processMessage(peer, msg);

Mockito.verify(msg, Mockito.never()).getTransactions();
Comment thread
0xbigapple marked this conversation as resolved.
Mockito.verifyNoInteractions(peer);
}

@Test
public void testRejectedExecution() throws Exception {
TransactionsMsgHandler handler = new TransactionsMsgHandler();
try {
ExecutorService mockPool = Mockito.mock(ExecutorService.class);
Mockito.when(mockPool.submit(Mockito.any(Runnable.class)))
.thenThrow(new RejectedExecutionException("pool closed"));
Field poolField = TransactionsMsgHandler.class.getDeclaredField("trxHandlePool");
poolField.setAccessible(true);
poolField.set(handler, mockPool);

PeerConnection peer = Mockito.mock(PeerConnection.class);
TransactionsMessage msg = buildTransferMessage(2);
stubAdvInvRequest(peer, msg);
// 2 transfer transactions, submit throws on the first → catch + break, only called once
handler.processMessage(peer, msg);

Mockito.verify(mockPool, Mockito.times(1)).submit(Mockito.any(Runnable.class));
} finally {
handler.close();
}
}

@Test
public void testCloseDuringProcessing() throws Exception {
TransactionsMsgHandler handler = new TransactionsMsgHandler();
try {
Field closedField = TransactionsMsgHandler.class.getDeclaredField("isClosed");
closedField.setAccessible(true);

ExecutorService mockPool = Mockito.mock(ExecutorService.class);
// on the first submit, flip isClosed to true so the second iteration breaks
Mockito.when(mockPool.submit(Mockito.any(Runnable.class))).thenAnswer(inv -> {
closedField.set(handler, true);
return null;
});
Field poolField = TransactionsMsgHandler.class.getDeclaredField("trxHandlePool");
poolField.setAccessible(true);
poolField.set(handler, mockPool);

PeerConnection peer = Mockito.mock(PeerConnection.class);
TransactionsMessage msg = buildTransferMessage(2);
stubAdvInvRequest(peer, msg);
handler.processMessage(peer, msg);

Mockito.verify(mockPool, Mockito.times(1)).submit(Mockito.any(Runnable.class));
} finally {
handler.close();
}
}

private TransactionsMessage buildTransferMessage(int count) {
List<Protocol.Transaction> txs = new ArrayList<>();
for (int i = 0; i < count; i++) {
BalanceContract.TransferContract tc = BalanceContract.TransferContract.newBuilder()
.setAmount(10 + i)
.setOwnerAddress(ByteString.copyFrom(ByteArray.fromHexString("121212a9cf")))
.setToAddress(ByteString.copyFrom(ByteArray.fromHexString("232323a9cf")))
.build();
txs.add(Protocol.Transaction.newBuilder().setRawData(
Protocol.Transaction.raw.newBuilder()
.setTimestamp(1_700_000_000_000L + i)
.setRefBlockNum(1)
.addContract(Protocol.Transaction.Contract.newBuilder()
.setType(Protocol.Transaction.Contract.ContractType.TransferContract)
.setParameter(Any.pack(tc)).build()).build())
.build());
}
return new TransactionsMessage(txs);
}

private void stubAdvInvRequest(PeerConnection peer, TransactionsMessage msg) {
Map<Item, Long> advInvRequest = new ConcurrentHashMap<>();
for (Protocol.Transaction trx : msg.getTransactions().getTransactionsList()) {
Item item = new Item(new TransactionMessage(trx).getMessageId(),
Protocol.Inventory.InventoryType.TRX);
advInvRequest.put(item, 0L);
}
Mockito.when(peer.getAdvInvRequest()).thenReturn(advInvRequest);
}

@Test
public void testHandleTransaction() throws Exception {
TransactionsMsgHandler handler = new TransactionsMsgHandler();
try {
TronNetDelegate tronNetDelegate = Mockito.mock(TronNetDelegate.class);
AdvService advService = Mockito.mock(AdvService.class);
ChainBaseManager chainBaseManager = Mockito.mock(ChainBaseManager.class);

Field f1 = TransactionsMsgHandler.class.getDeclaredField("tronNetDelegate");
f1.setAccessible(true);
f1.set(handler, tronNetDelegate);
Field f2 = TransactionsMsgHandler.class.getDeclaredField("advService");
f2.setAccessible(true);
f2.set(handler, advService);
Field f3 = TransactionsMsgHandler.class.getDeclaredField("chainBaseManager");
f3.setAccessible(true);
f3.set(handler, chainBaseManager);

PeerConnection peer = Mockito.mock(PeerConnection.class);

BalanceContract.TransferContract tc = BalanceContract.TransferContract.newBuilder()
.setAmount(10)
.setOwnerAddress(ByteString.copyFrom(ByteArray.fromHexString("121212a9cf")))
.setToAddress(ByteString.copyFrom(ByteArray.fromHexString("232323a9cf")))
.build();
long now = System.currentTimeMillis();
Protocol.Transaction trx = Protocol.Transaction.newBuilder().setRawData(
Protocol.Transaction.raw.newBuilder()
.setTimestamp(now)
.setExpiration(now + 60_000)
.setRefBlockNum(1)
.addContract(Protocol.Transaction.Contract.newBuilder()
.setType(Protocol.Transaction.Contract.ContractType.TransferContract)
.setParameter(Any.pack(tc)).build()).build())
.build();
TransactionMessage trxMsg = new TransactionMessage(trx);

Method handleTx = TransactionsMsgHandler.class.getDeclaredMethod(
"handleTransaction", PeerConnection.class, TransactionMessage.class);
handleTx.setAccessible(true);

// happy path → push and broadcast
Mockito.when(chainBaseManager.getNextBlockSlotTime()).thenReturn(now);
handleTx.invoke(handler, peer, trxMsg);
Mockito.verify(advService).broadcast(trxMsg);

// P2pException BAD_TRX → disconnect
Mockito.doThrow(new P2pException(TypeEnum.BAD_TRX, "bad"))
.when(tronNetDelegate).pushTransaction(Mockito.any());
handleTx.invoke(handler, peer, trxMsg);
Mockito.verify(peer).setBadPeer(true);
Mockito.verify(peer).disconnect(Protocol.ReasonCode.BAD_TX);
} finally {
handler.close();
}
}

class TrxEvent {

@Getter
Expand Down
Loading