Skip to content

Commit 1730662

Browse files
committed
Remove subinterpreter pool
The shared-GIL subinterpreter pool created 32 subinterpreters at startup but caused fatal errors on shutdown when numpy/torch threads were running. Changes: - Remove subinterp context mode (worker and owngil remain) - Delete py_subinterp_pool.c/h - Remove pool_slot from py_context_t and py_env_resource_t - Simplify py_context_acquire/release - Update tests to use worker mode
1 parent 522114e commit 1730662

8 files changed

Lines changed: 27 additions & 862 deletions

File tree

c_src/py_nif.c

Lines changed: 4 additions & 203 deletions
Original file line numberDiff line numberDiff line change
@@ -112,25 +112,8 @@ static void py_env_resource_dtor(ErlNifEnv *env, void *obj) {
112112
PyGILState_STATE gstate = PyGILState_Ensure();
113113

114114
#ifdef HAVE_SUBINTERPRETERS
115-
if (res->pool_slot >= 0) {
116-
/* Created in a shared-GIL subinterpreter - must DECREF in correct interpreter */
117-
subinterp_slot_t *slot = subinterp_pool_get(res->pool_slot);
118-
119-
/* Verify slot is still valid and has same interpreter */
120-
if (slot != NULL && slot->initialized && slot->interp != NULL) {
121-
int64_t slot_interp_id = PyInterpreterState_GetID(slot->interp);
122-
if (slot_interp_id == res->interp_id) {
123-
/* Same interpreter, safe to DECREF */
124-
PyThreadState *saved = PyThreadState_Swap(slot->tstate);
125-
Py_XDECREF(res->globals);
126-
Py_XDECREF(res->locals);
127-
PyThreadState_Swap(saved);
128-
}
129-
/* If interp_id mismatch, slot was reused - skip DECREF */
130-
}
131-
/* If slot invalid/not initialized, interpreter destroyed - skip DECREF */
132-
} else if (res->interp_id != 0) {
133-
/* OWN_GIL subinterpreter: pool_slot == -1 but interp_id != 0
115+
if (res->interp_id != 0) {
116+
/* OWN_GIL subinterpreter: interp_id != 0
134117
* These dicts were created in an OWN_GIL interpreter. We cannot safely
135118
* DECREF them here because:
136119
* 1. The interpreter might already be destroyed
@@ -296,7 +279,6 @@ static int is_inline_schedule_marker(PyObject *obj);
296279
#include "py_event_loop.c"
297280
#include "py_worker_pool.h"
298281
#include "py_worker_pool.c"
299-
#include "py_subinterp_pool.c"
300282
#include "py_subinterp_thread.c"
301283
#include "py_reactor_buffer.c"
302284
#include "py_channel.c"
@@ -409,36 +391,6 @@ static void context_destructor(ErlNifEnv *env, void *obj) {
409391
return;
410392
}
411393

412-
#ifdef HAVE_SUBINTERPRETERS
413-
/* For subinterpreter mode: clean up context's own dictionaries and release pool slot */
414-
if (ctx->is_subinterp && ctx->pool_slot >= 0) {
415-
/* Clean up Python objects with GIL */
416-
if (runtime_is_running()) {
417-
subinterp_slot_t *slot = subinterp_pool_get(ctx->pool_slot);
418-
if (slot != NULL && slot->initialized) {
419-
PyGILState_STATE gstate = PyGILState_Ensure();
420-
PyThreadState *saved = PyThreadState_Swap(slot->tstate);
421-
422-
Py_XDECREF(ctx->module_cache);
423-
Py_XDECREF(ctx->globals);
424-
Py_XDECREF(ctx->locals);
425-
426-
PyThreadState_Swap(saved);
427-
PyGILState_Release(gstate);
428-
}
429-
}
430-
ctx->module_cache = NULL;
431-
ctx->globals = NULL;
432-
ctx->locals = NULL;
433-
434-
subinterp_pool_free(ctx->pool_slot);
435-
ctx->pool_slot = -1;
436-
ctx->destroyed = true;
437-
atomic_fetch_add(&g_counters.ctx_destroyed, 1);
438-
return;
439-
}
440-
#endif
441-
442394
if (!runtime_is_running()) {
443395
return;
444396
}
@@ -1190,34 +1142,6 @@ static ERL_NIF_TERM nif_py_init(ErlNifEnv *env, int argc, const ERL_NIF_TERM arg
11901142
/* Save main thread state and release GIL for other threads */
11911143
g_main_thread_state = PyEval_SaveThread();
11921144

1193-
/* Initialize subinterpreter pool (Python 3.12+) before starting executors */
1194-
#ifdef HAVE_SUBINTERPRETERS
1195-
{
1196-
int pool_size = DEFAULT_POOL_SIZE; /* Default pool size */
1197-
/* Check for config */
1198-
if (argc > 0 && enif_is_map(env, argv[0])) {
1199-
ERL_NIF_TERM key = enif_make_atom(env, "pool_size");
1200-
ERL_NIF_TERM value;
1201-
if (enif_get_map_value(env, argv[0], key, &value)) {
1202-
enif_get_int(env, value, &pool_size);
1203-
}
1204-
}
1205-
1206-
/* Restore GIL temporarily to create subinterpreters */
1207-
PyEval_RestoreThread(g_main_thread_state);
1208-
int pool_result = subinterp_pool_init(pool_size);
1209-
g_main_thread_state = PyEval_SaveThread();
1210-
1211-
if (pool_result < 0) {
1212-
PyEval_RestoreThread(g_main_thread_state);
1213-
g_main_thread_state = NULL;
1214-
Py_Finalize();
1215-
atomic_store(&g_runtime_state, PY_STATE_STOPPED);
1216-
return make_error(env, "subinterp_pool_init_failed");
1217-
}
1218-
}
1219-
#endif
1220-
12211145
/* Start executors based on execution mode */
12221146
int executor_result = 0;
12231147
switch (g_execution_mode) {
@@ -1332,20 +1256,12 @@ static ERL_NIF_TERM nif_finalize(ErlNifEnv *env, int argc, const ERL_NIF_TERM ar
13321256
Py_XDECREF(g_numpy_ndarray_type);
13331257
g_numpy_ndarray_type = NULL;
13341258

1335-
#ifdef HAVE_SUBINTERPRETERS
1336-
/* Step 4: Shutdown subinterpreter pool - must be done with GIL held */
1337-
subinterp_pool_shutdown();
1338-
#endif
1339-
13401259
g_main_thread_state = PyEval_SaveThread();
13411260
} else {
13421261
/* Fallback to PyGILState if no main thread state saved */
13431262
PyGILState_STATE gstate = PyGILState_Ensure();
13441263
Py_XDECREF(g_numpy_ndarray_type);
13451264
g_numpy_ndarray_type = NULL;
1346-
#ifdef HAVE_SUBINTERPRETERS
1347-
subinterp_pool_shutdown();
1348-
#endif
13491265
PyGILState_Release(gstate);
13501266
}
13511267

@@ -3017,7 +2933,6 @@ static void owngil_execute_create_local_env(py_context_t *ctx) {
30172933
}
30182934

30192935
/* Store interpreter info for destructor */
3020-
res->pool_slot = -1; /* OWN_GIL doesn't use pool slots */
30212936
PyInterpreterState *interp = PyInterpreterState_Get();
30222937
if (interp != NULL) {
30232938
res->interp_id = PyInterpreterState_GetID(interp);
@@ -4064,7 +3979,6 @@ static ERL_NIF_TERM nif_context_create(ErlNifEnv *env, int argc, const ERL_NIF_T
40643979
return make_error(env, "invalid_mode");
40653980
}
40663981

4067-
bool use_subinterp = (strcmp(mode_str, "subinterp") == 0);
40683982
bool use_owngil = (strcmp(mode_str, "owngil") == 0);
40693983

40703984
/* Allocate context resource */
@@ -4075,7 +3989,7 @@ static ERL_NIF_TERM nif_context_create(ErlNifEnv *env, int argc, const ERL_NIF_T
40753989

40763990
/* Initialize fields */
40773991
ctx->interp_id = atomic_fetch_add(&g_context_id_counter, 1);
4078-
ctx->is_subinterp = use_subinterp || use_owngil;
3992+
ctx->is_subinterp = use_owngil;
40793993
ctx->destroyed = false;
40803994
ctx->has_callback_handler = false;
40813995
ctx->callback_pipe[0] = -1;
@@ -4091,7 +4005,6 @@ static ERL_NIF_TERM nif_context_create(ErlNifEnv *env, int argc, const ERL_NIF_T
40914005
}
40924006

40934007
#ifdef HAVE_SUBINTERPRETERS
4094-
ctx->pool_slot = -1; /* Default: not using pool */
40954008
ctx->uses_own_gil = false;
40964009

40974010
if (use_owngil) {
@@ -4107,76 +4020,7 @@ static ERL_NIF_TERM nif_context_create(ErlNifEnv *env, int argc, const ERL_NIF_T
41074020
enif_release_resource(ctx);
41084021
atomic_fetch_add(&g_counters.ctx_created, 1);
41094022
return enif_make_tuple3(env, ATOM_OK, ref, enif_make_uint(env, ctx->interp_id));
4110-
} else if (use_subinterp) {
4111-
/* Allocate a slot from the subinterpreter pool */
4112-
int slot = subinterp_pool_alloc();
4113-
if (slot < 0) {
4114-
close(ctx->callback_pipe[0]);
4115-
close(ctx->callback_pipe[1]);
4116-
enif_release_resource(ctx);
4117-
return make_error(env, "pool_exhausted");
4118-
}
4119-
4120-
ctx->pool_slot = slot;
4121-
4122-
/* Get the pool slot for interpreter access */
4123-
subinterp_slot_t *pool_slot = subinterp_pool_get(slot);
4124-
if (pool_slot == NULL || !pool_slot->initialized) {
4125-
subinterp_pool_free(slot);
4126-
close(ctx->callback_pipe[0]);
4127-
close(ctx->callback_pipe[1]);
4128-
enif_release_resource(ctx);
4129-
return make_error(env, "pool_slot_invalid");
4130-
}
4131-
4132-
/* Create context's own namespace dictionaries.
4133-
* Each context needs its own globals/locals for isolation,
4134-
* even though they share the interpreter. */
4135-
PyGILState_STATE gstate = PyGILState_Ensure();
4136-
PyThreadState *saved = PyThreadState_Swap(pool_slot->tstate);
4137-
4138-
ctx->globals = PyDict_New();
4139-
ctx->locals = PyDict_New();
4140-
ctx->module_cache = PyDict_New();
4141-
4142-
if (ctx->globals == NULL || ctx->locals == NULL || ctx->module_cache == NULL) {
4143-
Py_XDECREF(ctx->globals);
4144-
Py_XDECREF(ctx->locals);
4145-
Py_XDECREF(ctx->module_cache);
4146-
PyThreadState_Swap(saved);
4147-
PyGILState_Release(gstate);
4148-
subinterp_pool_free(slot);
4149-
close(ctx->callback_pipe[0]);
4150-
close(ctx->callback_pipe[1]);
4151-
enif_release_resource(ctx);
4152-
return make_error(env, "dict_alloc_failed");
4153-
}
4154-
4155-
/* Import __builtins__ into globals */
4156-
PyObject *builtins = PyEval_GetBuiltins();
4157-
PyDict_SetItemString(ctx->globals, "__builtins__", builtins);
4158-
4159-
/* Import erlang module into globals */
4160-
PyObject *erlang_module = PyImport_ImportModule("erlang");
4161-
if (erlang_module != NULL) {
4162-
PyDict_SetItemString(ctx->globals, "erlang", erlang_module);
4163-
Py_DECREF(erlang_module);
4164-
} else {
4165-
PyErr_Clear();
4166-
}
4167-
4168-
PyThreadState_Swap(saved);
4169-
PyGILState_Release(gstate);
4170-
4171-
#ifdef DEBUG
4172-
fprintf(stderr, "[NIF] Created context %u using pool slot %d with own namespace\n",
4173-
ctx->interp_id, slot);
4174-
fflush(stderr);
4175-
#endif
4176-
} else
4177-
#else
4178-
/* Pre-3.12 Python - ignore subinterp mode request */
4179-
(void)use_subinterp;
4023+
}
41804024
#endif
41814025
{
41824026
/* Worker mode - create a thread state in main interpreter */
@@ -4254,39 +4098,6 @@ static ERL_NIF_TERM nif_context_destroy(ErlNifEnv *env, int argc, const ERL_NIF_
42544098
atomic_fetch_add(&g_counters.ctx_destroyed, 1);
42554099
return ATOM_OK;
42564100
}
4257-
4258-
if (ctx->is_subinterp && ctx->pool_slot >= 0) {
4259-
/* Clean up context's own namespace dictionaries */
4260-
if (runtime_is_running()) {
4261-
subinterp_slot_t *slot = subinterp_pool_get(ctx->pool_slot);
4262-
if (slot != NULL && slot->initialized) {
4263-
PyGILState_STATE gstate = PyGILState_Ensure();
4264-
PyThreadState *saved = PyThreadState_Swap(slot->tstate);
4265-
4266-
Py_XDECREF(ctx->module_cache);
4267-
Py_XDECREF(ctx->globals);
4268-
Py_XDECREF(ctx->locals);
4269-
4270-
PyThreadState_Swap(saved);
4271-
PyGILState_Release(gstate);
4272-
}
4273-
}
4274-
ctx->globals = NULL;
4275-
ctx->locals = NULL;
4276-
ctx->module_cache = NULL;
4277-
4278-
/* Release the pool slot back to the pool */
4279-
subinterp_pool_free(ctx->pool_slot);
4280-
ctx->pool_slot = -1;
4281-
4282-
#ifdef DEBUG
4283-
fprintf(stderr, "[NIF] Destroyed context %u, released pool slot\n", ctx->interp_id);
4284-
fflush(stderr);
4285-
#endif
4286-
4287-
atomic_fetch_add(&g_counters.ctx_destroyed, 1);
4288-
return ATOM_OK;
4289-
}
42904101
#endif
42914102

42924103
/* Worker mode - clean up Python objects with GIL */
@@ -4803,7 +4614,6 @@ static ERL_NIF_TERM nif_create_local_env(ErlNifEnv *env, int argc, const ERL_NIF
48034614
res->globals = NULL;
48044615
res->locals = NULL;
48054616
res->interp_id = 0;
4806-
res->pool_slot = -1;
48074617

48084618
#ifdef HAVE_SUBINTERPRETERS
48094619
/* OWN_GIL mode: dispatch to the dedicated thread to create dicts */
@@ -4835,15 +4645,6 @@ static ERL_NIF_TERM nif_create_local_env(ErlNifEnv *env, int argc, const ERL_NIF
48354645
return make_error(env, "acquire_failed");
48364646
}
48374647

4838-
/* Store interpreter info for destructor */
4839-
#ifdef HAVE_SUBINTERPRETERS
4840-
if (ctx->is_subinterp && ctx->pool_slot >= 0) {
4841-
res->pool_slot = ctx->pool_slot;
4842-
PyInterpreterState *interp = PyInterpreterState_Get();
4843-
res->interp_id = PyInterpreterState_GetID(interp);
4844-
}
4845-
#endif
4846-
48474648
/* Copy globals from context to inherit preloaded code */
48484649
res->globals = PyDict_Copy(ctx->globals);
48494650
if (res->globals == NULL) {

c_src/py_nif.h

Lines changed: 1 addition & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,6 @@ static inline PyObject *Py_NewRef(PyObject *o) {
148148

149149
/** @} */
150150

151-
/* Include subinterpreter pool header for shared-GIL pool model */
152-
#include "py_subinterp_pool.h"
153-
154151
/* Include subinterpreter thread pool header for OWN_GIL parallelism */
155152
#include "py_subinterp_thread.h"
156153

@@ -814,9 +811,6 @@ typedef struct {
814811
int callback_pipe[2];
815812

816813
#ifdef HAVE_SUBINTERPRETERS
817-
/** @brief Index into subinterpreter pool (-1 = not using pool / worker mode) */
818-
int pool_slot;
819-
820814
/* ========== OWN_GIL mode fields ========== */
821815

822816
/** @brief Whether this context uses OWN_GIL mode (dedicated pthread) */
@@ -941,9 +935,6 @@ typedef enum {
941935
/** @brief Worker mode: Uses PyGILState_Ensure/Release (main interpreter) */
942936
PY_GUARD_WORKER,
943937

944-
/** @brief Subinterp mode: GIL + PyThreadState_Swap to pool slot */
945-
PY_GUARD_SUBINTERP,
946-
947938
/** @brief OWN_GIL mode: dispatch to dedicated pthread with its own GIL */
948939
PY_GUARD_OWN_GIL
949940
} py_guard_mode_t;
@@ -1009,28 +1000,9 @@ static inline py_context_guard_t py_context_acquire(py_context_t *ctx) {
10091000
return guard;
10101001
}
10111002

1012-
/* Acquire the GIL first (works for both modes) */
1003+
/* Acquire the GIL first */
10131004
guard.gstate = PyGILState_Ensure();
10141005

1015-
#ifdef HAVE_SUBINTERPRETERS
1016-
if (ctx->is_subinterp && ctx->pool_slot >= 0) {
1017-
/* Subinterpreter mode: swap to the pool slot's thread state */
1018-
subinterp_slot_t *slot = subinterp_pool_get(ctx->pool_slot);
1019-
1020-
if (slot == NULL || !slot->initialized) {
1021-
/* Pool slot invalid - release GIL and fail */
1022-
PyGILState_Release(guard.gstate);
1023-
return guard;
1024-
}
1025-
1026-
/* Swap to subinterpreter's thread state */
1027-
guard.saved_tstate = PyThreadState_Swap(slot->tstate);
1028-
guard.mode = PY_GUARD_SUBINTERP;
1029-
guard.acquired = true;
1030-
return guard;
1031-
}
1032-
#endif
1033-
10341006
/* Worker mode: just use the GIL we acquired */
10351007
guard.mode = PY_GUARD_WORKER;
10361008
guard.acquired = true;
@@ -1052,13 +1024,6 @@ static inline void py_context_release(py_context_guard_t *guard) {
10521024
return;
10531025
}
10541026

1055-
#ifdef HAVE_SUBINTERPRETERS
1056-
if (guard->mode == PY_GUARD_SUBINTERP) {
1057-
/* Swap back to saved thread state */
1058-
PyThreadState_Swap(guard->saved_tstate);
1059-
}
1060-
#endif
1061-
10621027
/* Release the GIL */
10631028
PyGILState_Release(guard->gstate);
10641029
guard->acquired = false;
@@ -1336,8 +1301,6 @@ typedef struct {
13361301
PyObject *locals;
13371302
/** @brief Interpreter ID that owns these dicts (0 = main interpreter) */
13381303
int64_t interp_id;
1339-
/** @brief Pool slot index (-1 for main interpreter) */
1340-
int pool_slot;
13411304
} py_env_resource_t;
13421305

13431306
/** @brief Resource type for py_env_resource_t (process-local Python environment) */

0 commit comments

Comments
 (0)