diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index e62b3a562c30..d510d73562c5 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -145,6 +145,7 @@ path: 'path' 'WriteToJson': path: 'path' + num_shards: 'num_shards' 'ReadFromParquet': path: 'file_pattern' 'WriteToParquet': diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index a6b2b5704751..69b38db4173c 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -16,8 +16,15 @@ # """This module defines the basic MapToFields operation.""" +import atexit +import importlib import itertools +import os +import queue import re +import sys +import threading +import uuid from collections import abc from collections.abc import Callable from collections.abc import Collection @@ -53,14 +60,6 @@ from apache_beam.yaml.yaml_errors import maybe_with_exception_handling_transform_fn from apache_beam.yaml.yaml_provider import dicts_to_rows -# Import js2py package if it exists -try: - import js2py - from js2py.base import JsObjectWrapper -except ImportError: - js2py = None - JsObjectWrapper = object - _str_expression_fields = { 'AssignTimestamps': 'timestamp', 'Filter': 'keep', @@ -178,20 +177,6 @@ def _check_mapping_arguments( raise ValueError(f'{transform_name} cannot specify "name" without "path"') -# js2py's JsObjectWrapper object has a self-referencing __dict__ property -# that cannot be pickled without implementing the __getstate__ and -# __setstate__ methods. -class _CustomJsObjectWrapper(JsObjectWrapper): - def __init__(self, js_obj): - super().__init__(js_obj.__dict__['_obj']) - - def __getstate__(self): - return self.__dict__.copy() - - def __setstate__(self, state): - self.__dict__.update(state) - - # TODO(yaml) Improve type inferencing for JS UDF's def py_value_to_js_dict(py_value): if ((isinstance(py_value, tuple) and hasattr(py_value, '_asdict')) or @@ -205,85 +190,181 @@ def py_value_to_js_dict(py_value): return py_value -# TODO(yaml) Consider adding optional language version parameter to support -# ECMAScript 5 and 6 +class PythonMonkeyDispatcher: + """Dispatcher for executing JavaScript code using pythonmonkey. + + This class manages a worker thread to execute JavaScript, ensuring that + pythonmonkey is only imported and used within that thread. It also handles + process shutdown carefully to avoid segmentation faults known to occur + when pythonmonkey is present during standard Python interpreter finalization. + """ + def __init__(self): + self._req_queue = queue.Queue() + self._resp_events = {} + self._resp_data = {} + self._lock = threading.Lock() + self._thread = threading.Thread(target=self._worker, daemon=True) + self._started = False + # Register the stop method to be called on exit. + # atexit handlers are executed in LIFO order. By registering at import time, + # we ensure this handler runs last, allowing other cleanup handlers + # (registered later) to execute first. + atexit.register(self.stop) + + def start(self): + with self._lock: + if not self._started: + self._thread.start() + self._started = True + + def stop(self): + # This method is called on process exit. + if not self._started: + return + # Flush standard streams before forced exit to avoid data loss. + try: + sys.stdout.flush() + sys.stderr.flush() + except Exception: + pass + # Force an immediate exit to avoid a segmentation fault that occurs with + # pythonmonkey during standard interpreter finalization. + # Since this runs as one of the last atexit handlers (due to import-time + # registration), most other cleanup should have already completed. + os._exit(0) + + def _worker(self): + try: + import pythonmonkey as pm + except ImportError: + pm = None + + self._pm = pm + self._cache = {} + + while True: + req = self._req_queue.get() + if req is None: + break + + req_id, type_str, payload = req + res = None + is_err = False + try: + if self._pm is None: + raise ImportError( + "PythonMonkey not installed or failed to import in worker thread." + ) + + if type_str == 'exec': + source, row = payload + if source not in self._cache: + self._cache[source] = self._pm.eval(f"({source})") + func = self._cache[source] + res = func(row) + except Exception as e: + res = e + is_err = True + + with self._lock: + if req_id in self._resp_events: + self._resp_data[req_id] = (is_err, res) + self._resp_events[req_id].set() + + def eval_and_run(self, source, row): + if not self._started: + self.start() + + req_id = str(uuid.uuid4()) + event = threading.Event() + with self._lock: + self._resp_events[req_id] = event + + self._req_queue.put((req_id, 'exec', (source, row))) + event.wait() + + with self._lock: + is_err, result = self._resp_data.pop(req_id) + del self._resp_events[req_id] + + if is_err: + raise result + return result + + +_pythonmonkey_dispatcher = PythonMonkeyDispatcher() + + +class JavaScriptCallable: + def __init__(self, source, name=None): + self._source = source + self._name = name + + def __call__(self, row): + # Check for pythonmonkey availability lazily (on first call) + if importlib.util.find_spec("pythonmonkey") is None: + raise RuntimeError( + "PythonMonkey is not installed. Please install 'apache_beam[yaml]' " + "to use JavaScript mapping functions.") + + row_as_dict = py_value_to_js_dict(row) + try: + # If we have a name, it means we evaluated a file and need to call + # a specific function. + # Dispatcher expects a self-contained source/expression. + if self._name: + # Wrap: (function() { ; return ; })() + effective_source = ( + f"(function() {{ {self._source}; return {self._name}; }})()") + else: + # Expression/Callable case: Wrap in parens to be safe + effective_source = f"({self._source})" + + js_result = _pythonmonkey_dispatcher.eval_and_run( + effective_source, row_as_dict) + + except Exception as exn: + raise RuntimeError( + f"Error evaluating javascript expression: {exn}") from exn + return dicts_to_rows(_finalize_js_result(js_result)) + + +def _finalize_js_result(obj): + """Coerce pythonmonkey objects to native Python objects (specifically + strings). + """ + if isinstance(obj, str): + return str(obj) + if isinstance(obj, list): + return [_finalize_js_result(x) for x in obj] + if isinstance(obj, dict): + return {k: _finalize_js_result(v) for k, v in obj.items()} + return obj + + def _expand_javascript_mapping_func( original_fields, expression=None, callable=None, path=None, name=None): - # Check for installed js2py package - if js2py is None: + if importlib.util.find_spec("pythonmonkey") is None: raise ValueError( - "Javascript mapping functions are not supported on" - " Python 3.12 or later.") - - # import remaining js2py objects - from js2py import base - from js2py.constructors import jsdate - from js2py.internals import simplex - - js_array_type = ( - base.PyJsArray, - base.PyJsArrayBuffer, - base.PyJsInt8Array, - base.PyJsUint8Array, - base.PyJsUint8ClampedArray, - base.PyJsInt16Array, - base.PyJsUint16Array, - base.PyJsInt32Array, - base.PyJsUint32Array, - base.PyJsFloat32Array, - base.PyJsFloat64Array) - - def _js_object_to_py_object(obj): - if isinstance(obj, (base.PyJsNumber, base.PyJsString, base.PyJsBoolean)): - return base.to_python(obj) - elif isinstance(obj, js_array_type): - return [_js_object_to_py_object(value) for value in obj.to_list()] - elif isinstance(obj, jsdate.PyJsDate): - return obj.to_utc_dt() - elif isinstance(obj, (base.PyJsNull, base.PyJsUndefined)): - return None - elif isinstance(obj, base.PyJsError): - raise RuntimeError(obj['message']) - elif isinstance(obj, base.PyJsObject): - return { - key: _js_object_to_py_object(value['value']) - for (key, value) in obj.own.items() - } - elif isinstance(obj, base.JsObjectWrapper): - return _js_object_to_py_object(obj._obj) - - return obj + "PythonMonkey is not installed. Please install 'apache_beam[yaml]' " + "to use JavaScript mapping functions.") if expression: source = '\n'.join(['function(__row__) {'] + [ f' {name} = __row__.{name}' for name in original_fields if name in expression ] + [' return (' + expression + ')'] + ['}']) - js_func = _CustomJsObjectWrapper(js2py.eval_js(source)) + return JavaScriptCallable(source) elif callable: - js_func = _CustomJsObjectWrapper(js2py.eval_js(callable)) + return JavaScriptCallable(callable) else: if not path.endswith('.js'): raise ValueError(f'File "{path}" is not a valid .js file.') udf_code = FileSystems.open(path).read().decode() - js = js2py.EvalJs() - js.eval(udf_code) - js_func = _CustomJsObjectWrapper(getattr(js, name)) - - def js_wrapper(row): - row_as_dict = py_value_to_js_dict(row) - try: - js_result = js_func(row_as_dict) - except simplex.JsException as exn: - raise RuntimeError( - f"Error evaluating javascript expression: " - f"{exn.mes['message']}") from exn - return dicts_to_rows(_js_object_to_py_object(js_result)) - - return js_wrapper + return JavaScriptCallable(udf_code, name=name) def _expand_python_mapping_func( diff --git a/sdks/python/apache_beam/yaml/yaml_udf_test.py b/sdks/python/apache_beam/yaml/yaml_udf_test.py index 3d664ab9de41..e6d0a0af41a2 100644 --- a/sdks/python/apache_beam/yaml/yaml_udf_test.py +++ b/sdks/python/apache_beam/yaml/yaml_udf_test.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import importlib import logging import os import shutil @@ -31,11 +32,17 @@ from apache_beam.yaml.yaml_provider import dicts_to_rows from apache_beam.yaml.yaml_transform import YamlTransform +# We use find_spec to check for pythonmonkey availability without importing it. +# Importing pythonmonkey initializes the engine and binds it to the current +# thread (MainThread). This causes "too much recursion" errors when the +# Dispatcher later tries to use it from a background thread. try: - import js2py + pm_available = importlib.util.find_spec("pythonmonkey") is not None except ImportError: - js2py = None - logging.warning('js2py is not installed; some tests will be skipped.') + pm_available = False + +if not pm_available: + logging.warning('pythonmonkey is not installed; some tests will be skipped.') def as_rows(): @@ -63,7 +70,7 @@ def setUp(self): def tearDown(self): shutil.rmtree(self.tmpdir) - @unittest.skipIf(js2py is None, 'js2py not installed.') + @unittest.skipIf(not pm_available, 'pythonmonkey not installed.') def test_map_to_fields_filter_inline_js(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle', yaml_experimental_features=['javascript' @@ -197,7 +204,7 @@ def test_map_to_fields_sql_reserved_keyword_append(): beam.Row(label='389a', timestamp=2, label_copy="389a"), ])) - @unittest.skipIf(js2py is None, 'js2py not installed.') + @unittest.skipIf(not pm_available, 'pythonmonkey not installed.') def test_filter_inline_js(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle', yaml_experimental_features=['javascript' @@ -252,7 +259,7 @@ def test_filter_inline_py(self): row=beam.Row(rank=2, values=[7, 8, 9])), ])) - @unittest.skipIf(js2py is None, 'js2py not installed.') + @unittest.skipIf(not pm_available, 'pythonmonkey not installed.') def test_filter_expression_js(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle', yaml_experimental_features=['javascript' @@ -296,7 +303,7 @@ def test_filter_expression_py(self): row=beam.Row(rank=0, values=[1, 2, 3])), ])) - @unittest.skipIf(js2py is None, 'js2py not installed.') + @unittest.skipIf(not pm_available, 'pythonmonkey not installed.') def test_filter_inline_js_file(self): data = ''' function f(x) { diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 3b195cdbc087..8500830a17b1 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -574,8 +574,9 @@ def get_portability_package_data(): 'docstring-parser>=0.15,<1.0', 'jinja2>=3.0,<3.2', 'virtualenv-clone>=0.5,<1.0', - # https://github.com/PiotrDabkowski/Js2Py/issues/317 - 'js2py>=0.74,<1; python_version<"3.12"', + # pythonmonkey is used for Javascript mapping support + # Please install NPM and Node.js before installing PythonMonkey. + 'pythonmonkey>=1.3.0', 'jsonschema>=4.0.0,<5.0.0', ] + dataframe_dependency, # Keep the following dependencies in line with what we test against