Barretenberg
The ZK-SNARK library at the core of Aztec
Loading...
Searching...
No Matches
msgpack_client_async.cpp
Go to the documentation of this file.
3#include "napi.h"
4#include <cstdint>
5#include <vector>
6
7using namespace bb::nodejs::msgpack_client;
8
10 : ObjectWrap(info)
11{
12 Napi::Env env = info.Env();
13
14 // Arg 0: shared memory base name (string)
15 if (info.Length() < 1 || !info[0].IsString()) {
16 throw Napi::TypeError::New(env, "First argument must be a string (shared memory name)");
17 }
18 std::string shm_name = info[0].As<Napi::String>();
19
20 size_t client_id = 0;
21 if (info.Length() >= 2 && info[1].IsNumber()) {
22 client_id = static_cast<size_t>(info[1].As<Napi::Number>().Uint32Value());
23 }
24
25 client_ = bb::ipc::IpcClient::create_mpsc_shm(shm_name, client_id);
26
27 // Connect to bb server
28 if (!client_->connect()) {
29 throw Napi::Error::New(env, "Failed to connect to shared memory server");
30 }
31}
32
33Napi::Value MsgpackClientAsync::setResponseCallback(const Napi::CallbackInfo& info)
34{
35 Napi::Env env = info.Env();
36
37 // Arg 0: JavaScript callback function
38 if (info.Length() < 1 || !info[0].IsFunction()) {
39 throw Napi::TypeError::New(env, "First argument must be a function");
40 }
41
42 // Store the callback for lazy TSFN creation
43 // Don't create TSFN yet - it will be created on first acquire()
44 js_callback_ = Napi::Persistent(info[0].As<Napi::Function>());
45
46 // Start background polling thread now that callback is registered
48
49 // Detach the thread - it will run until process exits
50 // No need for explicit shutdown or join
51 poll_thread_.detach();
52
53 return env.Undefined();
54}
55
57{
58 constexpr uint64_t TIMEOUT_NS = 1000000000; // 1s
59
60 while (true) { // Run forever until process exits
61 // Poll for response (blocks with timeout using futex)
62 std::span<const uint8_t> response = client_->receive(TIMEOUT_NS);
63
64 if (response.empty()) {
65 // Timeout - just continue polling
66 continue;
67 }
68
69 // Copy response data before releasing (span is invalidated by release())
70 auto* response_data = new std::vector<uint8_t>(response.begin(), response.end());
71
72 // Release the message in ring buffer to free space
73 client_->release(response.size());
74
75 // Lock mutex to safely access TSFN
76 {
78
79 // TSFN is active - invoke JavaScript callback
80 // The callback will handle matching this response to the correct promise
81 auto status = tsfn_.NonBlockingCall(
82 response_data, [](Napi::Env env, Napi::Function js_callback, std::vector<uint8_t>* data) {
83 // This lambda runs on the JavaScript main thread!
84 // Safe to create JS objects and call functions here
85
86 // Create Buffer with response data
87 auto js_buffer = Napi::Buffer<uint8_t>::Copy(env, data->data(), data->size());
88
89 // Call the registered JavaScript callback with the response
90 // TypeScript will pop its queue and resolve the appropriate promise
91 js_callback.Call({ js_buffer });
92
93 // Clean up response data
94 delete data;
95 });
96
97 if (status != napi_ok) {
98 // Failed to queue callback - likely process is exiting
99 // Just clean up and continue (process will exit soon anyway)
100 delete response_data;
101 }
102 }
103 }
104}
105
106Napi::Value MsgpackClientAsync::call(const Napi::CallbackInfo& info)
107{
108 Napi::Env env = info.Env();
109
110 // Arg 0: msgpack buffer to send
111 if (info.Length() < 1 || !info[0].IsBuffer()) {
112 throw Napi::TypeError::New(env, "First argument must be a Buffer");
113 }
114
115 auto input_buffer = info[0].As<Napi::Buffer<uint8_t>>();
116 const uint8_t* input_data = input_buffer.Data();
117 size_t input_len = input_buffer.Length();
118
119 // Send request (non-blocking write to ring buffer with no timeout)
120 // TypeScript will handle promise creation and queueing
121 if (!client_->send(input_data, input_len, 0)) {
122 throw Napi::Error::New(env, "Failed to send request, ring buffer full. Make it bigger?");
123 }
124
125 // Return undefined - TypeScript manages promises
126 return env.Undefined();
127}
128
129Napi::Value MsgpackClientAsync::acquire(const Napi::CallbackInfo& info)
130{
131 Napi::Env env = info.Env();
132
134
135 if (ref_count_ == 0) {
136 // Lazily create TSFN when first needed (0 → 1)
137 tsfn_ = Napi::ThreadSafeFunction::New(env,
138 js_callback_.Value(), // The actual JS function to call
139 "ShmResponseCallback", // Resource name for debugging
140 0, // Unlimited queue size
141 1 // Initial thread count (must be >= 1)
142 );
143 }
144
145 ref_count_++;
146 return env.Undefined();
147}
148
149Napi::Value MsgpackClientAsync::release(const Napi::CallbackInfo& info)
150{
152
153 ref_count_--;
154
155 if (ref_count_ == 0) {
156 // Destroy TSFN when no longer needed (1 → 0)
157 // This releases the initial reference, bringing ref count to 0
158 tsfn_.Release();
159 }
160
161 return info.Env().Undefined();
162}
163
164Napi::Function MsgpackClientAsync::get_class(Napi::Env env)
165{
166 return DefineClass(
167 env,
168 "MsgpackClientAsync",
169 {
170 MsgpackClientAsync::InstanceMethod("setResponseCallback", &MsgpackClientAsync::setResponseCallback),
171 MsgpackClientAsync::InstanceMethod("call", &MsgpackClientAsync::call),
172 MsgpackClientAsync::InstanceMethod("acquire", &MsgpackClientAsync::acquire),
173 MsgpackClientAsync::InstanceMethod("release", &MsgpackClientAsync::release),
174 });
175}
static std::unique_ptr< IpcClient > create_mpsc_shm(const std::string &base_name, size_t client_id)
static Napi::Function get_class(Napi::Env env)
void poll_responses()
Background thread function that polls for responses.
Napi::Value call(const Napi::CallbackInfo &info)
Send a msgpack buffer asynchronously.
Napi::Value release(const Napi::CallbackInfo &info)
Release a reference to allow the event loop to exit Called by TypeScript when there are no pending ca...
Napi::Value setResponseCallback(const Napi::CallbackInfo &info)
Set the JavaScript callback to be invoked when responses arrive.
std::unique_ptr< bb::ipc::IpcClient > client_
Napi::Value acquire(const Napi::CallbackInfo &info)
Acquire a reference to keep the event loop alive Called by TypeScript when there are pending callback...
#define info(...)
Definition log.hpp:93
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept
Definition tuple.hpp:13
std::byte * data