Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdks/python/apache_beam/yaml/standard_io.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@
path: 'path'
'WriteToJson':
path: 'path'
num_shards: 'num_shards'
'ReadFromParquet':
path: 'file_pattern'
'WriteToParquet':
Expand Down
249 changes: 165 additions & 84 deletions sdks/python/apache_beam/yaml/yaml_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,15 @@
#

"""This module defines the basic MapToFields operation."""
import atexit
import importlib
import itertools
import os
import queue
import re
import sys
import threading
import uuid
from collections import abc
from collections.abc import Callable
from collections.abc import Collection
Expand Down Expand Up @@ -53,14 +60,6 @@
from apache_beam.yaml.yaml_errors import maybe_with_exception_handling_transform_fn
from apache_beam.yaml.yaml_provider import dicts_to_rows

# Import js2py package if it exists
try:
import js2py
from js2py.base import JsObjectWrapper
except ImportError:
js2py = None
JsObjectWrapper = object

_str_expression_fields = {
'AssignTimestamps': 'timestamp',
'Filter': 'keep',
Expand Down Expand Up @@ -178,20 +177,6 @@ def _check_mapping_arguments(
raise ValueError(f'{transform_name} cannot specify "name" without "path"')


# js2py's JsObjectWrapper object has a self-referencing __dict__ property
# that cannot be pickled without implementing the __getstate__ and
# __setstate__ methods.
class _CustomJsObjectWrapper(JsObjectWrapper):
def __init__(self, js_obj):
super().__init__(js_obj.__dict__['_obj'])

def __getstate__(self):
return self.__dict__.copy()

def __setstate__(self, state):
self.__dict__.update(state)


# TODO(yaml) Improve type inferencing for JS UDF's
def py_value_to_js_dict(py_value):
if ((isinstance(py_value, tuple) and hasattr(py_value, '_asdict')) or
Expand All @@ -205,85 +190,181 @@ def py_value_to_js_dict(py_value):
return py_value


# TODO(yaml) Consider adding optional language version parameter to support
# ECMAScript 5 and 6
class PythonMonkeyDispatcher:
"""Dispatcher for executing JavaScript code using pythonmonkey.

This class manages a worker thread to execute JavaScript, ensuring that
pythonmonkey is only imported and used within that thread. It also handles
process shutdown carefully to avoid segmentation faults known to occur
when pythonmonkey is present during standard Python interpreter finalization.
"""
def __init__(self):
self._req_queue = queue.Queue()
self._resp_events = {}
self._resp_data = {}
self._lock = threading.Lock()
self._thread = threading.Thread(target=self._worker, daemon=True)
self._started = False
# Register the stop method to be called on exit.
# atexit handlers are executed in LIFO order. By registering at import time,
# we ensure this handler runs last, allowing other cleanup handlers
# (registered later) to execute first.
atexit.register(self.stop)

def start(self):
with self._lock:
if not self._started:
self._thread.start()
self._started = True

def stop(self):
# This method is called on process exit.
if not self._started:
return
# Flush standard streams before forced exit to avoid data loss.
try:
sys.stdout.flush()
sys.stderr.flush()
except Exception:
pass
# Force an immediate exit to avoid a segmentation fault that occurs with
# pythonmonkey during standard interpreter finalization.
# Since this runs as one of the last atexit handlers (due to import-time
# registration), most other cleanup should have already completed.
os._exit(0)

def _worker(self):
try:
import pythonmonkey as pm
except ImportError:
pm = None

self._pm = pm
self._cache = {}

while True:
req = self._req_queue.get()
if req is None:
break

req_id, type_str, payload = req
res = None
is_err = False
try:
if self._pm is None:
raise ImportError(
"PythonMonkey not installed or failed to import in worker thread."
)

if type_str == 'exec':
source, row = payload
if source not in self._cache:
self._cache[source] = self._pm.eval(f"({source})")
func = self._cache[source]
res = func(row)
except Exception as e:
res = e
is_err = True

with self._lock:
if req_id in self._resp_events:
self._resp_data[req_id] = (is_err, res)
self._resp_events[req_id].set()

def eval_and_run(self, source, row):
if not self._started:
self.start()

req_id = str(uuid.uuid4())
event = threading.Event()
with self._lock:
self._resp_events[req_id] = event

self._req_queue.put((req_id, 'exec', (source, row)))
event.wait()

with self._lock:
is_err, result = self._resp_data.pop(req_id)
del self._resp_events[req_id]

if is_err:
raise result
return result


_pythonmonkey_dispatcher = PythonMonkeyDispatcher()


class JavaScriptCallable:
def __init__(self, source, name=None):
self._source = source
self._name = name

def __call__(self, row):
# Check for pythonmonkey availability lazily (on first call)
if importlib.util.find_spec("pythonmonkey") is None:
raise RuntimeError(
"PythonMonkey is not installed. Please install 'apache_beam[yaml]' "
"to use JavaScript mapping functions.")

row_as_dict = py_value_to_js_dict(row)
try:
# If we have a name, it means we evaluated a file and need to call
# a specific function.
# Dispatcher expects a self-contained source/expression.
if self._name:
# Wrap: (function() { <source>; return <name>; })()
effective_source = (
f"(function() {{ {self._source}; return {self._name}; }})()")
else:
# Expression/Callable case: Wrap in parens to be safe
effective_source = f"({self._source})"

js_result = _pythonmonkey_dispatcher.eval_and_run(
effective_source, row_as_dict)

except Exception as exn:
raise RuntimeError(
f"Error evaluating javascript expression: {exn}") from exn
return dicts_to_rows(_finalize_js_result(js_result))


def _finalize_js_result(obj):
"""Coerce pythonmonkey objects to native Python objects (specifically
strings).
"""
if isinstance(obj, str):
return str(obj)
if isinstance(obj, list):
return [_finalize_js_result(x) for x in obj]
if isinstance(obj, dict):
return {k: _finalize_js_result(v) for k, v in obj.items()}
return obj


def _expand_javascript_mapping_func(
original_fields, expression=None, callable=None, path=None, name=None):

# Check for installed js2py package
if js2py is None:
if importlib.util.find_spec("pythonmonkey") is None:
raise ValueError(
"Javascript mapping functions are not supported on"
" Python 3.12 or later.")

# import remaining js2py objects
from js2py import base
from js2py.constructors import jsdate
from js2py.internals import simplex

js_array_type = (
base.PyJsArray,
base.PyJsArrayBuffer,
base.PyJsInt8Array,
base.PyJsUint8Array,
base.PyJsUint8ClampedArray,
base.PyJsInt16Array,
base.PyJsUint16Array,
base.PyJsInt32Array,
base.PyJsUint32Array,
base.PyJsFloat32Array,
base.PyJsFloat64Array)

def _js_object_to_py_object(obj):
if isinstance(obj, (base.PyJsNumber, base.PyJsString, base.PyJsBoolean)):
return base.to_python(obj)
elif isinstance(obj, js_array_type):
return [_js_object_to_py_object(value) for value in obj.to_list()]
elif isinstance(obj, jsdate.PyJsDate):
return obj.to_utc_dt()
elif isinstance(obj, (base.PyJsNull, base.PyJsUndefined)):
return None
elif isinstance(obj, base.PyJsError):
raise RuntimeError(obj['message'])
elif isinstance(obj, base.PyJsObject):
return {
key: _js_object_to_py_object(value['value'])
for (key, value) in obj.own.items()
}
elif isinstance(obj, base.JsObjectWrapper):
return _js_object_to_py_object(obj._obj)

return obj
"PythonMonkey is not installed. Please install 'apache_beam[yaml]' "
"to use JavaScript mapping functions.")

if expression:
source = '\n'.join(['function(__row__) {'] + [
f' {name} = __row__.{name}'
for name in original_fields if name in expression
] + [' return (' + expression + ')'] + ['}'])
js_func = _CustomJsObjectWrapper(js2py.eval_js(source))
return JavaScriptCallable(source)

elif callable:
js_func = _CustomJsObjectWrapper(js2py.eval_js(callable))
return JavaScriptCallable(callable)

else:
if not path.endswith('.js'):
raise ValueError(f'File "{path}" is not a valid .js file.')
udf_code = FileSystems.open(path).read().decode()
js = js2py.EvalJs()
js.eval(udf_code)
js_func = _CustomJsObjectWrapper(getattr(js, name))

def js_wrapper(row):
row_as_dict = py_value_to_js_dict(row)
try:
js_result = js_func(row_as_dict)
except simplex.JsException as exn:
raise RuntimeError(
f"Error evaluating javascript expression: "
f"{exn.mes['message']}") from exn
return dicts_to_rows(_js_object_to_py_object(js_result))

return js_wrapper
return JavaScriptCallable(udf_code, name=name)


def _expand_python_mapping_func(
Expand Down
21 changes: 14 additions & 7 deletions sdks/python/apache_beam/yaml/yaml_udf_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import importlib
import logging
import os
import shutil
Expand All @@ -31,11 +32,17 @@
from apache_beam.yaml.yaml_provider import dicts_to_rows
from apache_beam.yaml.yaml_transform import YamlTransform

# We use find_spec to check for pythonmonkey availability without importing it.
# Importing pythonmonkey initializes the engine and binds it to the current
# thread (MainThread). This causes "too much recursion" errors when the
# Dispatcher later tries to use it from a background thread.
try:
import js2py
pm_available = importlib.util.find_spec("pythonmonkey") is not None
except ImportError:
js2py = None
logging.warning('js2py is not installed; some tests will be skipped.')
pm_available = False

if not pm_available:
logging.warning('pythonmonkey is not installed; some tests will be skipped.')


def as_rows():
Expand Down Expand Up @@ -63,7 +70,7 @@ def setUp(self):
def tearDown(self):
shutil.rmtree(self.tmpdir)

@unittest.skipIf(js2py is None, 'js2py not installed.')
@unittest.skipIf(not pm_available, 'pythonmonkey not installed.')
def test_map_to_fields_filter_inline_js(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle', yaml_experimental_features=['javascript'
Expand Down Expand Up @@ -197,7 +204,7 @@ def test_map_to_fields_sql_reserved_keyword_append():
beam.Row(label='389a', timestamp=2, label_copy="389a"),
]))

@unittest.skipIf(js2py is None, 'js2py not installed.')
@unittest.skipIf(not pm_available, 'pythonmonkey not installed.')
def test_filter_inline_js(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle', yaml_experimental_features=['javascript'
Expand Down Expand Up @@ -252,7 +259,7 @@ def test_filter_inline_py(self):
row=beam.Row(rank=2, values=[7, 8, 9])),
]))

@unittest.skipIf(js2py is None, 'js2py not installed.')
@unittest.skipIf(not pm_available, 'pythonmonkey not installed.')
def test_filter_expression_js(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle', yaml_experimental_features=['javascript'
Expand Down Expand Up @@ -296,7 +303,7 @@ def test_filter_expression_py(self):
row=beam.Row(rank=0, values=[1, 2, 3])),
]))

@unittest.skipIf(js2py is None, 'js2py not installed.')
@unittest.skipIf(not pm_available, 'pythonmonkey not installed.')
def test_filter_inline_js_file(self):
data = '''
function f(x) {
Expand Down
5 changes: 3 additions & 2 deletions sdks/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,8 +574,9 @@ def get_portability_package_data():
'docstring-parser>=0.15,<1.0',
'jinja2>=3.0,<3.2',
'virtualenv-clone>=0.5,<1.0',
# https://github.com/PiotrDabkowski/Js2Py/issues/317
'js2py>=0.74,<1; python_version<"3.12"',
# pythonmonkey is used for Javascript mapping support
# Please install NPM and Node.js before installing PythonMonkey.
'pythonmonkey>=1.3.0',
'jsonschema>=4.0.0,<5.0.0',
] + dataframe_dependency,
# Keep the following dependencies in line with what we test against
Expand Down
Loading