Fix PythonVirtualenvOperator failing to access Variables/Connections in Airflow 3.x #61629

Open
andreahlert wants to merge 1 commit into apache:main from
andreahlert:fix/virtualenv-supervisor-comms-propagation

Conversation

@andreahlert
Contributor

What

Fixes PythonVirtualenvOperator (and ExternalPythonOperator) failing to access Variable.get(), Connection.get(), and XCom via the Task SDK in Airflow 3.x environments (particularly on Kubernetes).

Closes: #58724

Why

In the normal forked execution path, the supervisor communicates with the task process via a Unix socketpair mapped to fd 0 (stdin). However, the __AIRFLOW_SUPERVISOR_FD environment variable is never set in this path - it is only set by InProcessTestSupervisor when using dag.test().

When PythonVirtualenvOperator launches a subprocess via Popen(close_fds=False), while fd 0 is technically inherited, the virtualenv subprocess has no reliable way to know which fd carries the supervisor comms channel. The reinit_supervisor_comms() function defaults to fd 0, but this is fragile and breaks when:

  1. The airflow.sdk.execution_time.task_runner import fails with ImportError (not just ModuleNotFoundError) due to missing transitive dependencies in the virtualenv
  2. reinit_supervisor_comms() itself crashes (e.g., socket validation fails), killing the entire virtualenv script

Without supervisor comms, the secrets backend falls back to EnvironmentVariablesBackend only, silently losing access to all DB-stored Variables and Connections.

How

Two-pronged fix:

1. Operator side (python.py)

In _BasePythonVirtualenvOperator._execute_python_callable_in_subprocess(), added an elif block that:

  • Detects the current SUPERVISOR_COMMS socket fd from task_runner
  • Marks it as inheritable via os.set_inheritable()
  • Propagates it to the subprocess via __AIRFLOW_SUPERVISOR_FD env var

This makes the fd propagation explicit rather than relying on implicit fd 0 inheritance.
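
The propagation pattern described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the code from the PR: a plain socket.socketpair() stands in for the supervisor channel (Airflow's real SUPERVISOR_COMMS object is not used), and propagate_fd_to_child and demo are hypothetical names. Only the environment variable name is taken from the PR description. POSIX only.

```python
import os
import socket
import subprocess
import sys

ENV_KEY = "__AIRFLOW_SUPERVISOR_FD"  # name taken from the PR description

# Child program: reads the fd number from the environment, wraps it in a
# socket object, and sends one message back over it.
CHILD_SRC = (
    "import os, socket\n"
    f"fd = int(os.environ['{ENV_KEY}'])\n"
    "sock = socket.socket(fileno=fd)\n"
    "sock.sendall(b'hello from child')\n"
    "sock.close()\n"
)


def propagate_fd_to_child(comms_sock: socket.socket) -> subprocess.Popen:
    """Hypothetical helper: hand an open socket to a subprocess by fd number."""
    fd = comms_sock.fileno()
    # File descriptors are non-inheritable by default since Python 3.4.
    os.set_inheritable(fd, True)
    env = dict(os.environ)
    env[ENV_KEY] = str(fd)
    # With close_fds=False, inheritable fds stay open in the child.
    return subprocess.Popen(
        [sys.executable, "-c", CHILD_SRC], env=env, close_fds=False
    )


def demo() -> bytes:
    parent_end, child_end = socket.socketpair()
    proc = propagate_fd_to_child(child_end)
    child_end.close()  # the parent no longer needs its copy of the child's end
    data = parent_end.recv(1024)
    proc.wait()
    parent_end.close()
    return data
```

Calling demo() should return the bytes the child wrote, which shows that the child located the channel purely from the environment variable rather than from a fixed fd number.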

2. Template side (python_virtualenv_script.jinja2)

  • Broadened except ModuleNotFoundError to except (ModuleNotFoundError, ImportError) to handle cases where task_runner can be found but has failing transitive dependencies
  • Wrapped reinit_supervisor_comms() call in try/except Exception to prevent the entire virtualenv script from crashing if socket communication setup fails
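
For reference when reading the first bullet: ModuleNotFoundError is a subclass of ImportError, so except (ModuleNotFoundError, ImportError) catches the same exceptions as except ImportError alone. Which of the two is raised depends on the failure. The sketch below uses only built-in Python behaviour; import_error_type and the deliberately nonexistent module and attribute names are made up for illustration.

```python
def import_error_type(statement: str) -> str:
    """Run an import statement and return the name of the exception type raised."""
    try:
        exec(statement, {})
    except ImportError as exc:  # also catches ModuleNotFoundError
        return type(exc).__name__
    return "no error"


print(issubclass(ModuleNotFoundError, ImportError))
# → True

# A module that cannot be found at all raises the subclass.
print(import_error_type("import a_module_that_does_not_exist_xyz"))
# → ModuleNotFoundError

# A module that exists but lacks the requested name raises plain ImportError.
print(import_error_type("from os import a_name_that_does_not_exist_xyz"))
# → ImportError
```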

Testing

The existing test test_reinit_supervisor_comms in task-sdk/tests/task_sdk/execution_time/test_supervisor.py validates the basic mechanism (subprocess reinits comms and fetches a connection). The fix ensures this mechanism is properly triggered in production by explicitly propagating the fd.

…in Airflow 3.x

In the normal forked execution path, the supervisor communicates with
the task process via fd 0 (stdin), but __AIRFLOW_SUPERVISOR_FD env var
is never set. When PythonVirtualenvOperator launches a subprocess via
Popen, the virtualenv process cannot re-establish supervisor comms,
causing Variable.get() and Connection.get() to fail silently.

This fix explicitly detects the SUPERVISOR_COMMS socket fd in the
operator and propagates it via __AIRFLOW_SUPERVISOR_FD to the
subprocess. It also hardens the virtualenv script template to catch
ImportError (not just ModuleNotFoundError) and wraps
reinit_supervisor_comms() in try/except to prevent script crashes.

Closes: apache#58724
@andreahlert
Contributor Author

@ashb @potiuk could you take a look at this? This fixes the supervisor comms propagation for PythonVirtualenvOperator in the Airflow 3.x forked execution path: the __AIRFLOW_SUPERVISOR_FD env var is never set, so the virtualenv subprocess can't access Variables/Connections via the Task SDK.

@ashb
Member

ashb commented Feb 8, 2026

airflow.sdk.execution_time.task_runner import fails with ImportError (not just ModuleNotFoundError) due to missing transitive dependencies in the virtualenv

What is the error you see in this case? How does it present to the user/task logs?

Member

@ashb ashb left a comment

Not yet convinced this is the right behaviour

try:
    reinit_supervisor_comms()
except Exception:
    pass
Member

No. If expect_airflow is true, an error here should fail the task.

Regardless, silently ignoring the error is the worst of all options.

Contributor Author

Yeah you're right about the silent pass, I'll add a warning to stderr.

Contributor Author

But I'd keep the try/except itself. reinit failing doesn't mean airflow isn't there, it just means the socket setup broke.

If the callable actually needs Variables/Connections it'll fail with a proper error at that point anyway. Crashing the whole task on a socket error before we even get to run user code feels wrong to me.

The real fix is the fd propagation in python.py, this is just a fallback.

What do ya think?

Member

This should never happen. It means your environment is broken. If you don't want this, set expect_airflow to False.

Member

Oh, this isn't in an expect_airflow block.

Still, if it imports at all, it should be complete; otherwise all sorts of things will break.

I think we should either move this (without the exception handling) into the expect_airflow block, or still try the import but only fail/log if expect_airflow is set.
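
One possible reading of this suggestion, combined with the contributor's earlier offer to warn on stderr, is sketched below. It is not the code from the PR or from the template: init_comms is a hypothetical wrapper, and reinit is a stand-in callable for reinit_supervisor_comms(). The point is only the control flow, where a failure is fatal when Airflow is expected and is reported rather than swallowed otherwise.

```python
import sys
from typing import Callable


def init_comms(reinit: Callable[[], None], expect_airflow: bool) -> bool:
    """Hypothetical wrapper: return True if supervisor comms were set up.

    If expect_airflow is True, any failure propagates and fails the task.
    Otherwise the failure is reported on stderr and execution continues.
    """
    try:
        reinit()
    except Exception as exc:
        if expect_airflow:
            raise  # a broken environment should not be hidden
        print(f"WARNING: supervisor comms unavailable: {exc!r}", file=sys.stderr)
        return False
    return True
```

With this shape, no path ignores the error silently: it either fails the task or leaves a trace in the task's stderr.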

env_vars = dict(os.environ) if self.inherit_env else {}
if fd := os.getenv("__AIRFLOW_SUPERVISOR_FD"):
    env_vars["__AIRFLOW_SUPERVISOR_FD"] = fd
elif AIRFLOW_V_3_0_PLUS and "__AIRFLOW_SUPERVISOR_FD" not in env_vars:
Member

When is this path hit?

Contributor Author

Normal forked execution. The supervisor dup2's the comms socket onto fd 0 but never sets __AIRFLOW_SUPERVISOR_FD in the environment. So when the virtualenv operator spawns a subprocess via Popen on top of that, getenv returns None and we land here.

We grab the fd from the existing SUPERVISOR_COMMS and pass it through. Otherwise reinit in the child defaults to fd 0, which is the child's stdin at that point because Popen redirected it.

Member

Connections/vars work without this change. stdin is not a normal stdin at this point; it's already a socket, so the default of 0 is correct and works.
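
The general mechanism behind this point, a child process finding a socket on its own fd 0, can be sketched without Airflow. The example below is an illustration only: a socket.socketpair() stands in for the supervisor channel, and passing it as stdin to Popen simulates a parent whose fd 0 is already that socket. The name demo is hypothetical. POSIX only.

```python
import socket
import subprocess
import sys

# Child program: treats its own fd 0 as a socket and writes to it.
# detach() releases the wrapper without closing fd 0.
CHILD_SRC = (
    "import socket\n"
    "sock = socket.socket(fileno=0)\n"
    "sock.sendall(b'fd 0 is a socket')\n"
    "sock.detach()\n"
)


def demo() -> bytes:
    parent_end, child_end = socket.socketpair()
    # Passing the socket's fd as stdin places it on fd 0 in the child,
    # standing in for a task process whose fd 0 is the supervisor socket.
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD_SRC], stdin=child_end.fileno()
    )
    child_end.close()
    data = parent_end.recv(1024)
    proc.wait()
    parent_end.close()
    return data
```

Here the child needs no environment variable to locate the channel; it relies entirely on the convention that the socket lives on fd 0.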

@andreahlert
Contributor Author

airflow.sdk.execution_time.task_runner import fails with ImportError (not just ModuleNotFoundError) due to missing transitive dependencies in the virtualenv

What is the error you see in this case? How does it present to the user/task logs?

If the SDK is installed but some transitive dep is missing (httpx, structlog, etc), the import blows up with ImportError, not ModuleNotFoundError. The script just dies before the callable even runs - pretty bad UX.

@ashb
Member

ashb commented Feb 8, 2026

airflow.sdk.execution_time.task_runner import fails with ImportError (not just ModuleNotFoundError) due to missing transitive dependencies in the virtualenv

What is the error you see in this case? How does it present to the user/task logs?

If the SDK is installed but some transitive dep is missing (httpx, structlog, etc),

How is that possible? The SDK lists those modules as dependencies.

the import blows up with ImportError, not ModuleNotFoundError. The script just dies before the callable even runs - pretty bad UX.

Does it show the error in task logs?

Development

Successfully merging this pull request may close these issues.

PythonVirtualenvOperator fails to access Variables/Connections via Task SDK in Airflow 3.1.3 on Kubernetes

2 participants