Skip to content

Commit 4e657af

Browse files
BoxStore: add helper methods to create safe executors #287
1 parent 217ad2b commit 4e657af

7 files changed

Lines changed: 431 additions & 8 deletions

File tree

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@ For more insights into what changed in the ObjectBox C++ core, [check the Object
66

77
## Next release
88

9+
- Add [ObjectBoxThreadPoolExecutor](objectbox-java/src/main/java/io/objectbox/ObjectBoxThreadPoolExecutor.java), a
10+
default implementation of a `ThreadPoolExecutor` that properly cleans up thread-local Box resources.
11+
- Add methods `newCachedThreadPoolExecutor()` and `newFixedThreadPoolExecutor()` to `BoxStore` to help create instances
12+
of `ObjectBoxThreadPoolExecutor` for common uses.
13+
- Add methods `newCachedThreadPoolDispatcher()` and `newFixedThreadPoolDispatcher()` to the Kotlin extension functions
14+
for `BoxStore` to help create common coroutine dispatchers backed by an `ObjectBoxThreadPoolExecutor`.
915
- `BoxStore.runInTx` and `callInTx` close a write cursor even if the runnable or callable throws. This would previously
1016
result in cursor not closed warnings when the cursor was closed by the finalizer daemon.
1117

objectbox-java/src/main/java/io/objectbox/Box.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,6 @@ Cursor<T> getReader() {
8888
return cursor;
8989
}
9090

91-
/**
92-
* Returns if for the calling thread this has a Cursor, if any, for the currently active transaction.
93-
*/
94-
boolean hasActiveTxCursorForCurrentThread() {
95-
return activeTxCursor.get() != null;
96-
}
97-
9891
Cursor<T> getActiveTxCursor() {
9992
Transaction activeTx = store.activeTx.get();
10093
if (activeTx != null) {
@@ -172,6 +165,13 @@ public void closeThreadResources() {
172165
}
173166
}
174167

168+
/**
169+
* Returns if for the calling thread this has a reader Cursor.
170+
*/
171+
boolean hasReaderCursorForCurrentThread() {
172+
return threadLocalReader.get() != null;
173+
}
174+
175175
/**
176176
* If there is one, and it was created using the given {@code tx}, removes and closes the {@link #activeTxCursor}
177177
* for the current thread.
@@ -189,6 +189,13 @@ void closeActiveTxCursorForCurrentThread(Transaction tx) {
189189
}
190190
}
191191

192+
/**
193+
* Returns if for the calling thread this has a Cursor, if any, for the currently active transaction.
194+
*/
195+
boolean hasActiveTxCursorForCurrentThread() {
196+
return activeTxCursor.get() != null;
197+
}
198+
192199
/** Used by tests */
193200
int getPropertyId(String propertyName) {
194201
Cursor<T> reader = getReader();

objectbox-java/src/main/java/io/objectbox/BoxStore.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,11 @@
3636
import java.util.concurrent.Callable;
3737
import java.util.concurrent.ConcurrentHashMap;
3838
import java.util.concurrent.ExecutorService;
39+
import java.util.concurrent.Executors;
3940
import java.util.concurrent.Future;
41+
import java.util.concurrent.LinkedBlockingQueue;
42+
import java.util.concurrent.SynchronousQueue;
43+
import java.util.concurrent.ThreadFactory;
4044
import java.util.concurrent.TimeUnit;
4145

4246
import javax.annotation.Nullable;
@@ -1339,6 +1343,59 @@ public void setDbExceptionListener(@Nullable DbExceptionListener dbExceptionList
13391343
nativeSetDbExceptionListener(getNativeStore(), dbExceptionListener);
13401344
}
13411345

1346+
/**
1347+
* Like {@link Executors#newCachedThreadPool()} but uses {@link ObjectBoxThreadPoolExecutor} to ensure proper
1348+
* cleanup of thread-local resources.
1349+
*
1350+
* @return a new {@link ObjectBoxThreadPoolExecutor}
1351+
*/
1352+
public ObjectBoxThreadPoolExecutor newCachedThreadPoolExecutor() {
1353+
return new ObjectBoxThreadPoolExecutor(this, 0, Integer.MAX_VALUE,
1354+
60L, TimeUnit.SECONDS,
1355+
new SynchronousQueue<>());
1356+
}
1357+
1358+
/**
1359+
* Like {@link Executors#newCachedThreadPool(ThreadFactory)} but uses {@link ObjectBoxThreadPoolExecutor} to ensure
1360+
* proper cleanup of thread-local resources.
1361+
*
1362+
* @return a new {@link ObjectBoxThreadPoolExecutor}
1363+
*/
1364+
public ObjectBoxThreadPoolExecutor newCachedThreadPoolExecutor(ThreadFactory threadFactory) {
1365+
return new ObjectBoxThreadPoolExecutor(this, 0, Integer.MAX_VALUE,
1366+
60L, TimeUnit.SECONDS,
1367+
new SynchronousQueue<>(),
1368+
threadFactory);
1369+
}
1370+
1371+
/**
1372+
* Like {@link Executors#newFixedThreadPool(int)} but uses {@link ObjectBoxThreadPoolExecutor} to ensure proper
1373+
* cleanup of thread-local resources.
1374+
*
1375+
* @param nThreads the number of threads in the pool
1376+
* @return a new {@link ObjectBoxThreadPoolExecutor}
1377+
*/
1378+
public ObjectBoxThreadPoolExecutor newFixedThreadPoolExecutor(int nThreads) {
1379+
return new ObjectBoxThreadPoolExecutor(this, nThreads, nThreads,
1380+
0L, TimeUnit.MILLISECONDS,
1381+
new LinkedBlockingQueue<>());
1382+
}
1383+
1384+
/**
1385+
* Like {@link Executors#newFixedThreadPool(int, ThreadFactory)} but uses {@link ObjectBoxThreadPoolExecutor} to
1386+
* ensure proper cleanup of thread-local resources.
1387+
*
1388+
* @param nThreads the number of threads in the pool
1389+
* @param threadFactory the factory to use when creating new threads
1390+
* @return a new {@link ObjectBoxThreadPoolExecutor}
1391+
*/
1392+
public ObjectBoxThreadPoolExecutor newFixedThreadPoolExecutor(int nThreads, ThreadFactory threadFactory) {
1393+
return new ObjectBoxThreadPoolExecutor(this, nThreads, nThreads,
1394+
0L, TimeUnit.MILLISECONDS,
1395+
new LinkedBlockingQueue<>(),
1396+
threadFactory);
1397+
}
1398+
13421399
@Internal
13431400
public Future<?> internalScheduleThread(Runnable runnable) {
13441401
return internalThreadPool().submit(runnable);
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Copyright © 2026 ObjectBox Ltd. <https://objectbox.io>
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.objectbox;
18+
19+
import java.util.concurrent.BlockingQueue;
20+
import java.util.concurrent.RejectedExecutionHandler;
21+
import java.util.concurrent.ThreadFactory;
22+
import java.util.concurrent.ThreadPoolExecutor;
23+
import java.util.concurrent.TimeUnit;
24+
25+
/**
26+
* A {@link ThreadPoolExecutor} that automatically releases thread-local ObjectBox resources after each task execution
27+
* by calling {@link BoxStore#closeThreadResources()}.
28+
* <p>
29+
* This is useful when using a thread pool with ObjectBox to ensure that thread-local resources (currently readers only)
30+
* are properly cleaned up after each task completes.
31+
* <p>
32+
* <b>Recommended:</b> Use the factory methods {@link BoxStore#newFixedThreadPoolExecutor(int)} or
33+
* {@link BoxStore#newCachedThreadPoolExecutor()} to create instances of this executor.
34+
* <p>
35+
* Example usage:
36+
* <pre>
37+
* BoxStore boxStore = MyObjectBox.builder().build();
38+
*
39+
* // Recommended: Use BoxStore factory methods
40+
* ObjectBoxThreadPoolExecutor executor = boxStore.newFixedThreadPoolExecutor(4);
41+
*
42+
* // Or for a cached thread pool
43+
* ObjectBoxThreadPoolExecutor cachedExecutor = boxStore.newCachedThreadPoolExecutor();
44+
*
45+
* // Advanced: Direct construction for custom configuration
46+
* ObjectBoxThreadPoolExecutor customExecutor = new ObjectBoxThreadPoolExecutor(
47+
* boxStore,
48+
* 4, // core pool size
49+
* 8, // maximum pool size
50+
* 60L, TimeUnit.SECONDS, // keep-alive time
51+
* new LinkedBlockingQueue&lt;&gt;()
52+
* );
53+
* </pre>
54+
*/
55+
public class ObjectBoxThreadPoolExecutor extends ThreadPoolExecutor {
56+
57+
private final BoxStore boxStore;
58+
59+
/**
60+
* Creates a new ObjectBoxThreadPoolExecutor with the given parameters.
61+
* <p>
62+
* See {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue)} for parameter
63+
* details.
64+
*
65+
* @param boxStore the BoxStore instance for which to close thread resources
66+
*/
67+
public ObjectBoxThreadPoolExecutor(BoxStore boxStore, int corePoolSize, int maximumPoolSize,
68+
long keepAliveTime, TimeUnit unit,
69+
BlockingQueue<Runnable> workQueue) {
70+
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
71+
this.boxStore = boxStore;
72+
}
73+
74+
/**
75+
* Creates a new ObjectBoxThreadPoolExecutor with the given parameters.
76+
* <p>
77+
* See {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue, ThreadFactory)} for
78+
* parameter details.
79+
*
80+
* @param boxStore the BoxStore instance for which to close thread resources
81+
*/
82+
public ObjectBoxThreadPoolExecutor(BoxStore boxStore, int corePoolSize, int maximumPoolSize,
83+
long keepAliveTime, TimeUnit unit,
84+
BlockingQueue<Runnable> workQueue,
85+
ThreadFactory threadFactory) {
86+
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
87+
this.boxStore = boxStore;
88+
}
89+
90+
/**
91+
* Creates a new ObjectBoxThreadPoolExecutor with the given parameters.
92+
* <p>
93+
* See
94+
* {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue, RejectedExecutionHandler)}
95+
* for parameter details.
96+
*
97+
* @param boxStore the BoxStore instance for which to close thread resources
98+
*/
99+
public ObjectBoxThreadPoolExecutor(BoxStore boxStore, int corePoolSize, int maximumPoolSize,
100+
long keepAliveTime, TimeUnit unit,
101+
BlockingQueue<Runnable> workQueue,
102+
RejectedExecutionHandler handler) {
103+
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
104+
this.boxStore = boxStore;
105+
}
106+
107+
/**
108+
* Creates a new ObjectBoxThreadPoolExecutor with the given parameters.
109+
* <p>
110+
* See
111+
* {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue, ThreadFactory,
112+
* RejectedExecutionHandler)} for parameter details.
113+
*
114+
* @param boxStore the BoxStore instance for which to close thread resources
115+
*/
116+
public ObjectBoxThreadPoolExecutor(BoxStore boxStore, int corePoolSize, int maximumPoolSize,
117+
long keepAliveTime, TimeUnit unit,
118+
BlockingQueue<Runnable> workQueue,
119+
ThreadFactory threadFactory,
120+
RejectedExecutionHandler handler) {
121+
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
122+
this.boxStore = boxStore;
123+
}
124+
125+
/**
126+
* Releases thread-local ObjectBox resources after each task execution.
127+
*/
128+
@Override
129+
protected void afterExecute(Runnable runnable, Throwable throwable) {
130+
super.afterExecute(runnable, throwable);
131+
boxStore.closeThreadResources();
132+
}
133+
}

objectbox-kotlin/src/main/kotlin/io/objectbox/kotlin/BoxStore.kt

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ package io.objectbox.kotlin
1818

1919
import io.objectbox.Box
2020
import io.objectbox.BoxStore
21+
import kotlinx.coroutines.asCoroutineDispatcher
2122
import java.util.concurrent.Callable
23+
import java.util.concurrent.ThreadFactory
2224
import kotlin.coroutines.resume
2325
import kotlin.coroutines.resumeWithException
2426
import kotlin.coroutines.suspendCoroutine
@@ -51,3 +53,42 @@ suspend fun <V : Any> BoxStore.awaitCallInTx(callable: Callable<V?>): V? {
5153
}
5254
}
5355
}
56+
57+
/**
58+
* Creates a coroutine dispatcher backed by a thread pool created with [BoxStore.newCachedThreadPoolExecutor] that
59+
* automatically cleans up thread-local ObjectBox resources after each task.
60+
*
61+
* @return a [kotlinx.coroutines.CoroutineDispatcher] backed by an ObjectBox-aware cached thread pool
62+
* @see BoxStore.newCachedThreadPoolExecutor
63+
*/
64+
fun BoxStore.newCachedThreadPoolDispatcher() = newCachedThreadPoolExecutor().asCoroutineDispatcher()
65+
66+
/**
67+
* Creates a coroutine dispatcher backed by a thread pool created with [BoxStore.newCachedThreadPoolExecutor] that
68+
* automatically cleans up thread-local ObjectBox resources after each task.
69+
*
70+
* @return a [kotlinx.coroutines.CoroutineDispatcher] backed by an ObjectBox-aware cached thread pool
71+
* @see BoxStore.newCachedThreadPoolExecutor
72+
*/
73+
fun BoxStore.newCachedThreadPoolDispatcher(threadFactory: ThreadFactory) =
74+
newCachedThreadPoolExecutor(threadFactory).asCoroutineDispatcher()
75+
76+
/**
77+
* Creates a coroutine dispatcher backed by a thread pool created with [BoxStore.newFixedThreadPoolExecutor] that
78+
* automatically cleans up thread-local ObjectBox resources after each task.
79+
*
80+
* @return a [kotlinx.coroutines.CoroutineDispatcher] backed by an ObjectBox-aware fixed thread pool
81+
* @see BoxStore.newFixedThreadPoolExecutor
82+
*/
83+
fun BoxStore.newFixedThreadPoolDispatcher(nThreads: Int) =
84+
newFixedThreadPoolExecutor(nThreads).asCoroutineDispatcher()
85+
86+
/**
87+
* Creates a coroutine dispatcher backed by a thread pool created with [BoxStore.newFixedThreadPoolExecutor] that
88+
* automatically cleans up thread-local ObjectBox resources after each task.
89+
*
90+
* @return a [kotlinx.coroutines.CoroutineDispatcher] backed by an ObjectBox-aware fixed thread pool
91+
* @see BoxStore.newFixedThreadPoolExecutor
92+
*/
93+
fun BoxStore.newFixedThreadPoolDispatcher(nThreads: Int, threadFactory: ThreadFactory) =
94+
newFixedThreadPoolExecutor(nThreads, threadFactory).asCoroutineDispatcher()

tests/objectbox-java-test/src/test/java/io/objectbox/BoxStoreTestK.kt

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,15 @@ package io.objectbox
22

33
import io.objectbox.kotlin.awaitCallInTx
44
import io.objectbox.kotlin.boxFor
5+
import io.objectbox.kotlin.newCachedThreadPoolDispatcher
6+
import io.objectbox.kotlin.newFixedThreadPoolDispatcher
7+
import kotlinx.coroutines.ExecutorCoroutineDispatcher
58
import kotlinx.coroutines.ExperimentalCoroutinesApi
69
import kotlinx.coroutines.test.runTest
7-
import org.junit.Assert.assertEquals
10+
import kotlinx.coroutines.withContext
11+
import org.junit.Assert.*
812
import org.junit.Test
13+
import java.util.concurrent.Executors
914

1015

1116
class BoxStoreTestK : AbstractObjectBoxTest() {
@@ -42,4 +47,66 @@ class BoxStoreTestK : AbstractObjectBoxTest() {
4247
assertEquals("Hello", note!!.simpleString)
4348
}
4449
}
50+
51+
/**
52+
* This appears to duplicate [ObjectBoxThreadPoolExecutorTest.executor_cleansThreadResources], however,
53+
* this makes sure that a coroutine continues to be executed like a runnable and clean up of
54+
* [ObjectBoxThreadPoolExecutor] continues to work in this case.
55+
*/
56+
@Test
57+
fun dispatcherBackedByObjectBoxExecutor_cleansThreadResources() {
58+
runTest {
59+
// Use a single thread to make it easy to check thread-local Box resources have been cleaned up
60+
val dispatcher = store.newFixedThreadPoolDispatcher(1)
61+
62+
val hasReaderCursor = withContext(dispatcher) {
63+
val box = store.boxFor<TestEntity>()
64+
val entity = createTestEntity("dispatcher-test", 1)
65+
val id = box.put(entity)
66+
box.get(id)
67+
box.hasReaderCursorForCurrentThread()
68+
}
69+
// Verify that a thread-local reader cursor was created
70+
assertTrue(hasReaderCursor)
71+
72+
// Check the thread-local reader cursor was released after the previous coroutine was executed
73+
val hasReaderCursorAfter = withContext(dispatcher) {
74+
store.boxFor<TestEntity>().hasReaderCursorForCurrentThread()
75+
}
76+
assertFalse(hasReaderCursorAfter)
77+
}
78+
}
79+
80+
@Test
81+
fun newCachedThreadPoolDispatcher_works() {
82+
runTest {
83+
assertDispatcher(store.newCachedThreadPoolDispatcher())
84+
assertDispatcher(store.newCachedThreadPoolDispatcher(Executors.defaultThreadFactory()))
85+
}
86+
}
87+
88+
@Test
89+
fun newFixedThreadPoolDispatcher_works() {
90+
runTest {
91+
assertDispatcher(store.newFixedThreadPoolDispatcher(2))
92+
assertDispatcher(store.newFixedThreadPoolDispatcher(2, Executors.defaultThreadFactory()))
93+
}
94+
}
95+
96+
/**
97+
* Quickly checks the pre-configured dispatchers work.
98+
*/
99+
private suspend fun assertDispatcher(dispatcher: ExecutorCoroutineDispatcher) {
100+
dispatcher.use { dispatcher ->
101+
// Create at least a write and a read transaction
102+
val testEntity: TestEntity? = withContext(dispatcher) {
103+
val box = store.boxFor<TestEntity>()
104+
val entity = createTestEntity("dispatcher-test", 1)
105+
val id = box.put(entity)
106+
107+
box.get(id)
108+
}
109+
assertEquals("dispatcher-test", testEntity!!.simpleString)
110+
}
111+
}
45112
}

0 commit comments

Comments
 (0)