5353from apache_beam .yaml .yaml_errors import maybe_with_exception_handling_transform_fn
5454from apache_beam .yaml .yaml_provider import dicts_to_rows
5555
56- # Import js2py package if it exists
56+ # Import quickjs package if it exists
5757try :
58- import js2py
59- from js2py .base import JsObjectWrapper
58+ import quickjs
6059except ImportError :
61- js2py = None
62- JsObjectWrapper = object
60+ quickjs = None
6361
6462_str_expression_fields = {
6563 'AssignTimestamps' : 'timestamp' ,
@@ -178,18 +176,34 @@ def _check_mapping_arguments(
178176 raise ValueError (f'{ transform_name } cannot specify "name" without "path"' )
179177
180178
181- # js2py's JsObjectWrapper object has a self-referencing __dict__ property
182- # that cannot be pickled without implementing the __getstate__ and
183- # __setstate__ methods.
184- class _CustomJsObjectWrapper (JsObjectWrapper ):
185- def __init__ (self , js_obj ):
186- super ().__init__ (js_obj .__dict__ ['_obj' ])
class _QuickJsCallable:
  """Picklable, lazily compiled wrapper around a JavaScript function.

  Only the JavaScript source and the optional function name are stored (and
  pickled). The quickjs context and the function handle are created on first
  call and are never serialized, so instances can be shipped to workers.

  The quickjs package documents that a ``quickjs.Context`` may only be
  accessed from the thread that created it, so one context is compiled per
  calling thread rather than sharing a single cached handle.

  Args:
    source: JavaScript source code to evaluate.
    name: Name of the global function defined by ``source`` that should be
      called. If falsy, the value that ``source`` evaluates to is used as the
      function instead.
  """
  def __init__(self, source, name=None):
    import threading

    self.source = source
    self.name = name
    # Per-thread cache of the compiled function (and its owning context).
    self._local = threading.local()

  def _get_func(self):
    """Returns the compiled JS function for the current thread.

    Raises:
      ValueError: If quickjs is not installed, or if ``source`` does not
        produce a callable (for example ``name`` is not defined by it).
    """
    func = getattr(self._local, 'func', None)
    if func is None:
      if quickjs is None:
        raise ValueError("quickjs is not installed.")
      context = quickjs.Context()
      if self.name:
        context.eval(self.source)
        func = context.get(self.name)
      else:
        func = context.eval(self.source)
      if not callable(func):
        # Fail loudly here instead of with "'NoneType' object is not
        # callable" at the call site (and re-evaluating on every call).
        target = repr(self.name) if self.name else 'the evaluated source'
        raise ValueError(
            f"Javascript source does not define a callable for {target}.")
      # Keep the context referenced for as long as the function is cached.
      self._local.context = context
      self._local.func = func
    return func

  def __call__(self, *args, **kwargs):
    return self._get_func()(*args, **kwargs)

  def __getstate__(self):
    # The compiled function and context are not picklable; they are rebuilt
    # lazily from the source after unpickling.
    return {'source': self.source, 'name': self.name}

  def __setstate__(self, state):
    import threading

    self.source = state['source']
    self.name = state['name']
    self._local = threading.local()
193207
194208
195209# TODO(yaml) Improve type inference for JS UDFs
@@ -210,78 +224,49 @@ def py_value_to_js_dict(py_value):
def _expand_javascript_mapping_func(
    original_fields, expression=None, callable=None, path=None, name=None):
  """Builds a Python callable that applies a JavaScript UDF to a row.

  Exactly one JavaScript source is used, chosen in this order: ``expression``,
  then ``callable``, then the file at ``path`` (calling the function ``name``
  defined in it). Rows are passed to JavaScript as a JSON string and the
  result is passed back as JSON.

  Args:
    original_fields: Names of the input fields. Those that appear in
      ``expression`` and are valid identifiers are exposed to it as variables;
      every field remains reachable through ``__row__``.
    expression: A JavaScript expression evaluated once per row.
    callable: JavaScript source of a function taking the row as its argument.
    path: Path to a ``.js`` file containing the UDF.
    name: Name of the function inside the file at ``path`` to call.

  Returns:
    A function taking a row and returning the converted JavaScript result.

  Raises:
    ValueError: If quickjs is not installed or ``path`` is not a ``.js`` file.
  """
  # Check for installed quickjs package
  if quickjs is None:
    raise ValueError(
        "Javascript mapping functions require the 'quickjs' package.")

  import json

  if expression:
    # Only names that are usable as identifiers can be declared as variables;
    # anything else (e.g. "user-id") would make the generated function a
    # syntax error and is still available as __row__["user-id"].
    field_declarations = [
        f'  const {field} = __row__.{field};' for field in original_fields
        if field in expression and field.replace('$', '_').isidentifier()
    ]
    # The newline before the closing parenthesis keeps a trailing "//" comment
    # in the user's expression from commenting out the rest of the function.
    source = '\n'.join(
        ['function fn(json_row) {', '  const __row__ = JSON.parse(json_row);'] +
        field_declarations +
        ['  return JSON.stringify(' + expression + '\n);'] + ['}'])
    js_func = _QuickJsCallable(source, "fn")

  elif callable:
    # Wrap the user's function in a named bridge that exchanges JSON strings,
    # so that only strings cross the Python/JavaScript boundary.
    source = (
        f"function fn(json_row) {{ "
        f"const row = JSON.parse(json_row); "
        f"return JSON.stringify(({callable}\n)(row)); }}")
    js_func = _QuickJsCallable(source, "fn")

  else:
    if not path.endswith('.js'):
      raise ValueError(f'File "{path}" is not a valid .js file.')
    # Close the file handle once the UDF source has been read.
    with FileSystems.open(path) as udf_file:
      udf_code = udf_file.read().decode()
    bridge_source = (
        udf_code + f"\nfunction bridge_fn(json_row) {{ "
        f"return JSON.stringify({name}(JSON.parse(json_row))); }}")
    js_func = _QuickJsCallable(bridge_source, "bridge_fn")

  def js_wrapper(row):
    row_as_dict = py_value_to_js_dict(row)
    row_json = json.dumps(row_as_dict)
    try:
      js_result_json = js_func(row_json)
      # JSON.stringify(undefined) is undefined, which arrives here as None;
      # treat it like a null result instead of failing in json.loads.
      if js_result_json is None:
        js_result = None
      else:
        js_result = json.loads(js_result_json)
    except Exception as exn:
      raise RuntimeError(
          f"Error evaluating javascript expression: {exn}") from exn
    return dicts_to_rows(js_result)

  return js_wrapper
287272
0 commit comments