-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathRuntimeNodeApiAsync.cpp
More file actions
273 lines (240 loc) · 7.52 KB
/
RuntimeNodeApiAsync.cpp
File metadata and controls
273 lines (240 loc) · 7.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
#include "RuntimeNodeApiAsync.hpp"
#include <ReactCommon/CallInvoker.h>
#include "Logger.hpp"
#include "ThreadsafeFunction.hpp"
// Bookkeeping record for one unit of async work created through
// napi_create_async_work. The address of the record doubles as the opaque
// napi_async_work handle handed to callers (see fromWork / toWork).
struct AsyncJob {
  using IdType = uint64_t;

  // Lifecycle of a job. Scoped so the enumerators stay inside State; the
  // first enumerator (Created) is also the value-initialized default.
  enum class State { Created, Queued, Completed, Cancelled, Deleted };

  // Registry-assigned identifier.
  IdType id{};
  State state{State::Created};

  // Values captured verbatim from napi_create_async_work. All of them are
  // default-initialized so a default-constructed job never carries
  // indeterminate handles or callbacks.
  napi_env env{};
  napi_value async_resource{};
  napi_value async_resource_name{};
  napi_async_execute_callback execute{nullptr};
  napi_async_complete_callback complete{nullptr};
  void* data{nullptr};

  // Reinterprets an opaque handle as the job pointer it was created from.
  // This is a pure cast: it does not check that the job is still alive.
  static AsyncJob* fromWork(napi_async_work work) {
    return reinterpret_cast<AsyncJob*>(work);
  }

  // Produces the opaque handle for a job (the inverse of fromWork).
  static napi_async_work toWork(AsyncJob* job) {
    return reinterpret_cast<napi_async_work>(job);
  }
};
class AsyncWorkRegistry {
public:
using IdType = AsyncJob::IdType;
std::shared_ptr<AsyncJob> create(napi_env env,
napi_value async_resource,
napi_value async_resource_name,
napi_async_execute_callback execute,
napi_async_complete_callback complete,
void* data) {
const auto job = std::shared_ptr<AsyncJob>(new AsyncJob{
.id = next_id(),
.state = AsyncJob::State::Created,
.env = env,
.async_resource = async_resource,
.async_resource_name = async_resource_name,
.execute = execute,
.complete = complete,
.data = data,
});
jobs_[job->id] = job;
return job;
}
std::shared_ptr<AsyncJob> get(napi_async_work work) const {
const auto job = AsyncJob::fromWork(work);
if (!job) {
return {};
}
if (const auto it = jobs_.find(job->id); it != jobs_.end()) {
return it->second;
}
return {};
}
bool release(IdType id) {
if (const auto it = jobs_.find(id); it != jobs_.end()) {
it->second->state = AsyncJob::State::Deleted;
jobs_.erase(it);
return true;
}
return false;
}
private:
IdType next_id() {
if (current_id_ == std::numeric_limits<IdType>::max()) [[unlikely]] {
current_id_ = 0;
}
return ++current_id_;
}
IdType current_id_{0};
std::unordered_map<IdType, std::shared_ptr<AsyncJob>> jobs_;
};
// Per-environment CallInvoker lookup table. Entries are written by
// setCallInvoker and read by getCallInvoker; only weak references are stored,
// so this table does not keep an invoker alive.
static std::unordered_map<napi_env, std::weak_ptr<facebook::react::CallInvoker>>
callInvokers;
// File-wide registry backing the napi_*_async_work functions below.
static AsyncWorkRegistry asyncWorkRegistry;
namespace callstack::nodeapihost {
// Associates `invoker` with `env`, replacing any invoker registered for that
// environment before. Only a weak reference to the invoker is retained.
void setCallInvoker(napi_env env,
    const std::shared_ptr<facebook::react::CallInvoker>& invoker) {
  callInvokers.insert_or_assign(env, invoker);
}
// Returns the weak CallInvoker reference registered for `env`, or an empty
// weak_ptr when nothing has been registered for that environment.
std::weak_ptr<facebook::react::CallInvoker> getCallInvoker(napi_env env) {
  if (const auto entry = callInvokers.find(env); entry != callInvokers.end()) {
    return entry->second;
  }
  return {};
}
// Creates an async work item and writes its opaque handle to `*result`.
//
// Returns napi_invalid_arg when `execute` or `result` is null (the work could
// never run, or the handle could not be returned), napi_generic_failure when
// the registry fails to produce a job, and napi_ok otherwise. `complete` may
// be null.
napi_status napi_create_async_work(napi_env env,
    napi_value async_resource,
    napi_value async_resource_name,
    napi_async_execute_callback execute,
    napi_async_complete_callback complete,
    void* data,
    napi_async_work* result) {
  // Validate before registering anything so a bad call leaves no job behind.
  if (!execute || !result) {
    log_debug("Error: Invalid argument passed to napi_create_async_work");
    return napi_invalid_arg;
  }
  const auto job = asyncWorkRegistry.create(
      env, async_resource, async_resource_name, execute, complete, data);
  if (!job) {
    log_debug("Error: Failed to create async work job");
    return napi_generic_failure;
  }
  // The registry keeps the job alive; the caller only gets the raw handle.
  *result = AsyncJob::toWork(job.get());
  return napi_ok;
}
// Schedules a previously created async work item on the CallInvoker
// registered for `env`.
//
// The scheduled task runs `execute` (unless the job was cancelled in the
// meantime) and then `complete`, both with the environment the job was
// created with. `complete` receives napi_cancelled for a cancelled job and
// napi_ok otherwise. If the job has been deleted before the task runs, the
// task does nothing.
//
// Returns napi_invalid_arg when the handle is unknown or no CallInvoker is
// available for `env`, napi_ok once the task has been handed to the invoker.
napi_status napi_queue_async_work(
    node_api_basic_env env, napi_async_work work) {
  const auto job = asyncWorkRegistry.get(work);
  if (!job) {
    log_debug("Error: Received null job in napi_queue_async_work");
    return napi_invalid_arg;
  }
  const auto invoker = getCallInvoker(env).lock();
  if (!invoker) {
    log_debug("Error: No CallInvoker available for async work");
    return napi_invalid_arg;
  }
  // Mark the job as queued BEFORE handing it to the invoker: if the invoker
  // runs the task before invokeAsync returns, the task must already see
  // Queued, otherwise `execute` would be skipped.
  job->state = AsyncJob::State::Queued;
  // Capture weakly so a job deleted while waiting is not kept alive.
  invoker->invokeAsync([weakJob = std::weak_ptr{job}]() {
    const auto liveJob = weakJob.lock();
    if (!liveJob) {
      log_debug("Error: Async job has been deleted before execution");
      return;
    }
    if (liveJob->state == AsyncJob::State::Queued && liveJob->execute) {
      liveJob->execute(liveJob->env, liveJob->data);
    }
    // A job may be created without a completion callback.
    if (liveJob->complete) {
      liveJob->complete(liveJob->env,
          liveJob->state == AsyncJob::State::Cancelled ? napi_cancelled
                                                       : napi_ok,
          liveJob->data);
    }
    liveJob->state = AsyncJob::State::Completed;
  });
  return napi_ok;
}
// Removes an async work item from the registry.
// Returns napi_invalid_arg for a handle the registry does not know,
// napi_generic_failure if the registry refuses to release the job, and
// napi_ok on success. `env` is not used.
napi_status napi_delete_async_work(
    node_api_basic_env env, napi_async_work work) {
  const auto target = asyncWorkRegistry.get(work);
  if (target) {
    if (asyncWorkRegistry.release(target->id)) {
      return napi_ok;
    }
    log_debug("Error: Failed to release async work job");
    return napi_generic_failure;
  }
  log_debug("Error: Received non-existent job in napi_delete_async_work");
  return napi_invalid_arg;
}
// Flags an async work item as cancelled.
// - unknown handle            -> napi_invalid_arg
// - already completed/deleted -> napi_generic_failure
// - already cancelled         -> napi_ok (nothing to do)
// - any other state           -> state becomes Cancelled, napi_ok
// `env` is not used.
napi_status napi_cancel_async_work(
    node_api_basic_env env, napi_async_work work) {
  const auto target = asyncWorkRegistry.get(work);
  if (!target) {
    log_debug("Error: Received null job in napi_cancel_async_work");
    return napi_invalid_arg;
  }

  const auto current = target->state;
  if (current == AsyncJob::State::Completed) {
    log_debug("Error: Cannot cancel async work that is already completed");
    return napi_generic_failure;
  }
  if (current == AsyncJob::State::Deleted) {
    log_debug("Warning: Async work job is already deleted");
    return napi_generic_failure;
  }
  if (current == AsyncJob::State::Cancelled) {
    log_debug("Warning: Async work job is already cancelled");
    return napi_ok;
  }

  target->state = AsyncJob::State::Cancelled;
  return napi_ok;
}
// Creates a ThreadSafeFunction bound to the CallInvoker registered for `env`
// and writes its handle to `*result`.
//
// Returns napi_invalid_arg when `result` is null, napi_generic_failure when
// ThreadSafeFunction::create yields no object, and napi_ok otherwise. All
// remaining arguments are forwarded to ThreadSafeFunction::create unchanged.
napi_status napi_create_threadsafe_function(napi_env env,
    napi_value func,
    napi_value async_resource,
    napi_value async_resource_name,
    size_t max_queue_size,
    size_t initial_thread_count,
    void* thread_finalize_data,
    napi_finalize thread_finalize_cb,
    void* context,
    napi_threadsafe_function_call_js call_js_cb,
    napi_threadsafe_function* result) {
  // Reject a null out-parameter before creating anything.
  if (!result) {
    return napi_invalid_arg;
  }
  const auto function = ThreadSafeFunction::create(getCallInvoker(env),
      env,
      func,
      async_resource,
      async_resource_name,
      max_queue_size,
      initial_thread_count,
      thread_finalize_data,
      thread_finalize_cb,
      context,
      call_js_cb);
  // Guard the dereference below, matching the checks in the sibling wrappers.
  if (!function) {
    return napi_generic_failure;
  }
  *result = function->getHandle();
  return napi_ok;
}
// Forwards to ThreadSafeFunction::getContext for the function behind `func`.
// Returns napi_invalid_arg when the handle does not resolve to a function.
napi_status napi_get_threadsafe_function_context(
    napi_threadsafe_function func, void** result) {
  if (const auto target = ThreadSafeFunction::get(func)) {
    return target->getContext(result);
  }
  return napi_invalid_arg;
}
// Forwards `data` and the blocking mode to ThreadSafeFunction::call for the
// function behind `func`. Returns napi_invalid_arg when the handle does not
// resolve to a function.
napi_status napi_call_threadsafe_function(napi_threadsafe_function func,
    void* data,
    napi_threadsafe_function_call_mode is_blocking) {
  if (const auto target = ThreadSafeFunction::get(func)) {
    return target->call(data, is_blocking);
  }
  return napi_invalid_arg;
}
// Forwards to ThreadSafeFunction::acquire for the function behind `func`.
// Returns napi_invalid_arg when the handle does not resolve to a function.
napi_status napi_acquire_threadsafe_function(napi_threadsafe_function func) {
  if (const auto target = ThreadSafeFunction::get(func)) {
    return target->acquire();
  }
  return napi_invalid_arg;
}
// Forwards `mode` to ThreadSafeFunction::release for the function behind
// `func`. Returns napi_invalid_arg when the handle does not resolve to a
// function.
napi_status napi_release_threadsafe_function(
    napi_threadsafe_function func, napi_threadsafe_function_release_mode mode) {
  if (const auto target = ThreadSafeFunction::get(func)) {
    return target->release(mode);
  }
  return napi_invalid_arg;
}
// Forwards to ThreadSafeFunction::unref for the function behind `func`.
// Returns napi_invalid_arg when the handle does not resolve to a function.
// `env` is not used.
napi_status napi_unref_threadsafe_function(
    node_api_basic_env env, napi_threadsafe_function func) {
  if (const auto target = ThreadSafeFunction::get(func)) {
    // There is no libuv loop in RN to unref; only internal state is updated
    // so the API behaves like its Node counterpart.
    return target->unref();
  }
  return napi_invalid_arg;
}
// Forwards to ThreadSafeFunction::ref for the function behind `func`.
// Returns napi_invalid_arg when the handle does not resolve to a function.
// `env` is not used.
napi_status napi_ref_threadsafe_function(
    node_api_basic_env env, napi_threadsafe_function func) {
  if (const auto target = ThreadSafeFunction::get(func)) {
    // There is no libuv loop in RN to ref; only internal state is updated so
    // the API behaves like its Node counterpart.
    return target->ref();
  }
  return napi_invalid_arg;
}
} // namespace callstack::nodeapihost