Skip to content

Commit 595f6a8

Browse files
committed
fix(net): fix RejectedExecutionException during shutdown trxHandlePool
1 parent 039821c commit 595f6a8

File tree

2 files changed

+97
-3
lines changed

2 files changed

+97
-3
lines changed

framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.util.concurrent.BlockingQueue;
44
import java.util.concurrent.ExecutorService;
55
import java.util.concurrent.LinkedBlockingQueue;
6+
import java.util.concurrent.RejectedExecutionException;
67
import java.util.concurrent.ScheduledExecutorService;
78
import java.util.concurrent.TimeUnit;
89
import lombok.Getter;
@@ -44,6 +45,7 @@ public class TransactionsMsgHandler implements TronMsgHandler {
4445

4546
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue();
4647

48+
private volatile boolean isClosed = false;
4749
private int threadNum = Args.getInstance().getValidateSignThreadNum();
4850
private final String trxEsName = "trx-msg-handler";
4951
private ExecutorService trxHandlePool = ExecutorServiceManager.newThreadPoolExecutor(
@@ -58,8 +60,14 @@ public void init() {
5860
}
5961

6062
public void close() {
61-
ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName);
63+
isClosed = true;
64+
// Stop the scheduler first so no new tasks are drained from smartContractQueue.
6265
ExecutorServiceManager.shutdownAndAwaitTermination(smartContractExecutor, smartEsName);
66+
// Then shutdown the worker pool to finish already-submitted tasks.
67+
ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName);
68+
// Discard any remaining items and release references.
69+
smartContractQueue.clear();
70+
queue.clear();
6371
}
6472

6573
public boolean isBusy() {
@@ -68,6 +76,10 @@ public boolean isBusy() {
6876

6977
@Override
7078
public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException {
79+
if (isClosed) {
80+
logger.warn("TransactionsMsgHandler is closed, drop message");
81+
return;
82+
}
7183
TransactionsMessage transactionsMessage = (TransactionsMessage) msg;
7284
check(peer, transactionsMessage);
7385
for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) {
@@ -78,6 +90,10 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
7890
int trxHandlePoolQueueSize = 0;
7991
int dropSmartContractCount = 0;
8092
for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) {
93+
if (isClosed) {
94+
logger.warn("TransactionsMsgHandler is closed during processing, stop submit.");
95+
break;
96+
}
8197
int type = trx.getRawData().getContract(0).getType().getNumber();
8298
if (type == ContractType.TriggerSmartContract_VALUE
8399
|| type == ContractType.CreateSmartContract_VALUE) {
@@ -87,8 +103,13 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
87103
dropSmartContractCount++;
88104
}
89105
} else {
90-
ExecutorServiceManager.submit(
91-
trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx)));
106+
try {
107+
ExecutorServiceManager.submit(
108+
trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx)));
109+
} catch (RejectedExecutionException e) {
110+
logger.warn("Submit task to {} failed", trxEsName, e);
111+
break;
112+
}
92113
}
93114
}
94115

framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.google.protobuf.Any;
44
import com.google.protobuf.ByteString;
55
import java.lang.reflect.Field;
6+
import java.lang.reflect.Method;
67
import java.util.ArrayList;
78
import java.util.List;
89
import java.util.Map;
@@ -20,7 +21,10 @@
2021
import org.tron.common.TestConstants;
2122
import org.tron.common.runtime.TvmTestUtils;
2223
import org.tron.common.utils.ByteArray;
24+
import org.tron.core.ChainBaseManager;
2325
import org.tron.core.config.args.Args;
26+
import org.tron.core.exception.P2pException;
27+
import org.tron.core.exception.P2pException.TypeEnum;
2428
import org.tron.core.net.TronNetDelegate;
2529
import org.tron.core.net.message.adv.TransactionMessage;
2630
import org.tron.core.net.message.adv.TransactionsMessage;
@@ -132,6 +136,75 @@ public void testProcessMessage() {
132136
}
133137
}
134138

139+
@Test
140+
public void testProcessMessageAfterClose() throws Exception {
141+
TransactionsMsgHandler handler = new TransactionsMsgHandler();
142+
handler.init();
143+
handler.close();
144+
145+
PeerConnection peer = Mockito.mock(PeerConnection.class);
146+
TransactionsMessage msg = Mockito.mock(TransactionsMessage.class);
147+
148+
handler.processMessage(peer, msg);
149+
150+
Mockito.verify(msg, Mockito.never()).getTransactions();
151+
Mockito.verifyNoInteractions(peer);
152+
}
153+
154+
@Test
155+
public void testHandleTransaction() throws Exception {
156+
TransactionsMsgHandler handler = new TransactionsMsgHandler();
157+
158+
TronNetDelegate tronNetDelegate = Mockito.mock(TronNetDelegate.class);
159+
AdvService advService = Mockito.mock(AdvService.class);
160+
ChainBaseManager chainBaseManager = Mockito.mock(ChainBaseManager.class);
161+
162+
Field f1 = TransactionsMsgHandler.class.getDeclaredField("tronNetDelegate");
163+
f1.setAccessible(true);
164+
f1.set(handler, tronNetDelegate);
165+
Field f2 = TransactionsMsgHandler.class.getDeclaredField("advService");
166+
f2.setAccessible(true);
167+
f2.set(handler, advService);
168+
Field f3 = TransactionsMsgHandler.class.getDeclaredField("chainBaseManager");
169+
f3.setAccessible(true);
170+
f3.set(handler, chainBaseManager);
171+
172+
PeerConnection peer = Mockito.mock(PeerConnection.class);
173+
174+
BalanceContract.TransferContract tc = BalanceContract.TransferContract.newBuilder()
175+
.setAmount(10)
176+
.setOwnerAddress(ByteString.copyFrom(ByteArray.fromHexString("121212a9cf")))
177+
.setToAddress(ByteString.copyFrom(ByteArray.fromHexString("232323a9cf")))
178+
.build();
179+
long now = System.currentTimeMillis();
180+
Protocol.Transaction trx = Protocol.Transaction.newBuilder().setRawData(
181+
Protocol.Transaction.raw.newBuilder()
182+
.setTimestamp(now)
183+
.setExpiration(now + 60_000)
184+
.setRefBlockNum(1)
185+
.addContract(Protocol.Transaction.Contract.newBuilder()
186+
.setType(Protocol.Transaction.Contract.ContractType.TransferContract)
187+
.setParameter(Any.pack(tc)).build()).build())
188+
.build();
189+
TransactionMessage trxMsg = new TransactionMessage(trx);
190+
191+
Method handleTx = TransactionsMsgHandler.class.getDeclaredMethod(
192+
"handleTransaction", PeerConnection.class, TransactionMessage.class);
193+
handleTx.setAccessible(true);
194+
195+
// happy path → push and broadcast
196+
Mockito.when(chainBaseManager.getNextBlockSlotTime()).thenReturn(now);
197+
handleTx.invoke(handler, peer, trxMsg);
198+
Mockito.verify(advService).broadcast(trxMsg);
199+
200+
// P2pException BAD_TRX → disconnect
201+
Mockito.doThrow(new P2pException(TypeEnum.BAD_TRX, "bad"))
202+
.when(tronNetDelegate).pushTransaction(Mockito.any());
203+
handleTx.invoke(handler, peer, trxMsg);
204+
Mockito.verify(peer).setBadPeer(true);
205+
Mockito.verify(peer).disconnect(Protocol.ReasonCode.BAD_TX);
206+
}
207+
135208
class TrxEvent {
136209

137210
@Getter

0 commit comments

Comments
 (0)