diff --git a/cheroot/ssl/__init__.py b/cheroot/ssl/__init__.py index 23c1e85675..458bdcc3d6 100644 --- a/cheroot/ssl/__init__.py +++ b/cheroot/ssl/__init__.py @@ -2,6 +2,7 @@ import socket as _socket from abc import ABC, abstractmethod +from getpass import getpass as _ask_for_password_interactively from warnings import warn as _warn from .. import errors as _errors @@ -113,3 +114,8 @@ def get_environ(self): def makefile(self, sock, mode='r', bufsize=-1): """Return socket file object.""" raise NotImplementedError # pragma: no cover + + def _prompt_for_tls_password(self) -> str: + """Prompt for encrypted private key password interactively.""" + prompt = 'Enter PEM pass phrase: ' + return _ask_for_password_interactively(prompt) diff --git a/cheroot/ssl/__init__.pyi b/cheroot/ssl/__init__.pyi index 31abe293f4..78e7d56e59 100644 --- a/cheroot/ssl/__init__.pyi +++ b/cheroot/ssl/__init__.pyi @@ -1,3 +1,4 @@ +import collections.abc as _c import typing as _t from abc import ABC, abstractmethod @@ -6,8 +7,9 @@ class Adapter(ABC): private_key: _t.Any certificate_chain: _t.Any ciphers: _t.Any - private_key_password: str | bytes | None + private_key_password: _c.Callable[[], bytes | str] | bytes | str | None context: _t.Any + @abstractmethod def __init__( self, @@ -16,7 +18,10 @@ class Adapter(ABC): certificate_chain: _t.Any | None = ..., ciphers: _t.Any | None = ..., *, - private_key_password: str | bytes | None = ..., + private_key_password: _c.Callable[[], bytes | str] + | bytes + | str + | None = ..., ): ... def bind(self, sock): ... @abstractmethod @@ -25,3 +30,4 @@ class Adapter(ABC): def get_environ(self): ... @abstractmethod def makefile(self, sock, mode: str = ..., bufsize: int = ...): ... + def _prompt_for_tls_password(self) -> str: ... diff --git a/cheroot/ssl/builtin.py b/cheroot/ssl/builtin.py index fcde32f93e..b7fed72b21 100644 --- a/cheroot/ssl/builtin.py +++ b/cheroot/ssl/builtin.py @@ -247,6 +247,9 @@ def __init__( private_key_password=private_key_password, ) + if private_key_password is None: + private_key_password = self._prompt_for_tls_password + self.context = ssl.create_default_context( purpose=ssl.Purpose.CLIENT_AUTH, cafile=certificate_chain, diff --git a/cheroot/ssl/builtin.pyi b/cheroot/ssl/builtin.pyi index 6b3f0eebf0..4032a326e6 100644 --- a/cheroot/ssl/builtin.pyi +++ b/cheroot/ssl/builtin.pyi @@ -1,3 +1,4 @@ +import collections.abc as _c import typing as _t from . import Adapter @@ -14,7 +15,10 @@ class BuiltinSSLAdapter(Adapter): certificate_chain: _t.Any | None = ..., ciphers: _t.Any | None = ..., *, - private_key_password: str | bytes | None = ..., + private_key_password: _c.Callable[[], bytes | str] + | bytes + | str + | None = ..., ) -> None: ... @property def context(self): ... diff --git a/cheroot/ssl/pyopenssl.py b/cheroot/ssl/pyopenssl.py index 37fb136d74..6cf700341a 100644 --- a/cheroot/ssl/pyopenssl.py +++ b/cheroot/ssl/pyopenssl.py @@ -352,11 +352,20 @@ def wrap(self, sock): def _password_callback( self, password_max_length, - _verify_twice, - password, + verify_twice, + password_or_callback, /, ): """Pass a passphrase to password protected private key.""" + if callable(password_or_callback): + password = password_or_callback() + if verify_twice and password != password_or_callback(): + raise ValueError( + 'Verification failed: entered passwords do not match', + ) from None + else: + password = password_or_callback + b_password = b'' # returning a falsy value communicates an error if isinstance(password, str): b_password = password.encode('utf-8') @@ -381,6 +390,8 @@ def get_context(self): """ # See https://code.activestate.com/recipes/442473/ c = SSL.Context(SSL.SSLv23_METHOD) + if self.private_key_password is None: + self.private_key_password = self._prompt_for_tls_password c.set_passwd_cb(self._password_callback, self.private_key_password) c.use_privatekey_file(self.private_key) if self.certificate_chain: diff --git a/cheroot/ssl/pyopenssl.pyi b/cheroot/ssl/pyopenssl.pyi index 44c49e1530..6ca8d09eca 100644 --- a/cheroot/ssl/pyopenssl.pyi +++ b/cheroot/ssl/pyopenssl.pyi @@ -1,3 +1,4 @@ +import collections.abc as _c import typing as _t from OpenSSL import SSL @@ -32,14 +33,20 @@ class pyOpenSSLAdapter(Adapter): certificate_chain: _t.Any | None = ..., ciphers: _t.Any | None = ..., *, - private_key_password: str | bytes | None = ..., + private_key_password: _c.Callable[[], bytes | str] + | bytes + | str + | None = ..., ) -> None: ... def wrap(self, sock): ... def _password_callback( self, password_max_length: int, - _verify_twice: bool, - password: bytes | str | None, + verify_twice: bool, + password_or_callback: _c.Callable[[], bytes | str] + | bytes + | str + | None, /, ) -> bytes: ... def get_environ(self): ... diff --git a/cheroot/test/test_ssl.py b/cheroot/test/test_ssl.py index 77fb0b96c4..1859e6bc28 100644 --- a/cheroot/test/test_ssl.py +++ b/cheroot/test/test_ssl.py @@ -149,10 +149,18 @@ def make_tls_http_server(bind_addr, ssl_adapter, request): return httpserver +def get_key_password(): + """Return a predefined password string. + + It is to be used for decrypting private keys. + """ + return 'криївка' + + @pytest.fixture(scope='session') def private_key_password(): """Provide hardcoded password for private key.""" - return 'криївка' + return get_key_password() @pytest.fixture @@ -900,9 +908,17 @@ def test_http_over_https_ssl_handshake( ids=('encrypted-key', 'unencrypted-key'), ) @pytest.mark.parametrize( - 'password_as_bytes', - (True, False), - ids=('with-bytes-password', 'with-str-password'), + 'transform_password_arg', + ( + lambda pass_factory: pass_factory().encode('utf-8'), + lambda pass_factory: pass_factory(), + lambda pass_factory: pass_factory, + ), + ids=( + 'with-bytes-password', + 'with-str-password', + 'with-callable-password-provider', + ), ) # pylint: disable-next=too-many-positional-arguments def test_ssl_adapters_with_private_key_password( @@ -915,7 +931,7 @@ def test_ssl_adapters_with_private_key_password( tls_certificate_private_key_pem_path, adapter_type, encrypted_key, - password_as_bytes, + transform_password_arg, ): """Check server decrypts private TLS keys with password as bytes or str.""" key_file = ( @@ -923,17 +939,13 @@ def test_ssl_adapters_with_private_key_password( if encrypted_key else tls_certificate_private_key_pem_path ) - key_pass = ( - private_key_password.encode('utf-8') - if password_as_bytes - else private_key_password - ) + private_key_password = transform_password_arg(get_key_password) tls_adapter_cls = get_ssl_adapter_class(name=adapter_type) tls_adapter = tls_adapter_cls( certificate=tls_certificate_chain_pem_path, private_key=key_file, - private_key_password=key_pass, + private_key_password=private_key_password, ) interface, _host, port = _get_conn_data( @@ -1015,6 +1027,76 @@ def test_openssl_adapter_with_false_key_password( ) +@pytest.mark.parametrize( + 'adapter_type', + ('pyopenssl', 'builtin'), +) +def test_ssl_adapter_with_none_key_password( + tls_certificate_chain_pem_path, + tls_certificate_passwd_private_key_pem_path, + private_key_password, + adapter_type, + mocker, +): + """Check that TLS-adapters prompt for password when set as ``None``.""" + tls_adapter_cls = get_ssl_adapter_class(name=adapter_type) + mocker.patch( + 'cheroot.ssl._ask_for_password_interactively', + return_value=private_key_password, + ) + tls_adapter = tls_adapter_cls( + certificate=tls_certificate_chain_pem_path, + private_key=tls_certificate_passwd_private_key_pem_path, + ) + + assert tls_adapter.context is not None + + +class PasswordCallbackHelper: + """Collects helper methods for mocking password callback.""" + + def __init__(self, adapter: Adapter): + """Initialize helper variables.""" + self.counter = 0 + self.callback = adapter._password_callback + + def get_password(self): + """Provide correct password on first call, wrong on other calls.""" + self.counter += 1 + return get_key_password() * self.counter + + def verify_twice_callback(self, max_length, _verify_twice, userdata): + """Establish a mock callback for testing two-factor password prompt.""" + return self.callback(self, max_length, True, userdata) + + +@pytest.mark.parametrize('adapter_type', ('pyopenssl',)) +def test_openssl_adapter_verify_twice_callback( + tls_certificate_chain_pem_path, + tls_certificate_passwd_private_key_pem_path, + adapter_type, + mocker, +): + """Check that two-time password verification fails with correct error.""" + tls_adapter_cls = get_ssl_adapter_class(name=adapter_type) + helper = PasswordCallbackHelper(tls_adapter_cls) + + mocker.patch( + 'cheroot.ssl.pyopenssl.pyOpenSSLAdapter._password_callback', + side_effect=helper.verify_twice_callback, + ) + + with pytest.raises( + ValueError, + match='Verification failed: entered passwords do not match', + ): + tls_adapter_cls( + certificate=tls_certificate_chain_pem_path, + private_key=tls_certificate_passwd_private_key_pem_path, + private_key_password=helper.get_password, + ) + + @pytest.fixture def dummy_adapter(monkeypatch): """Provide a dummy SSL adapter instance.""" diff --git a/docs/changelog-fragments.d/798.bugfix.rst b/docs/changelog-fragments.d/798.bugfix.rst new file mode 100644 index 0000000000..582588e9b9 --- /dev/null +++ b/docs/changelog-fragments.d/798.bugfix.rst @@ -0,0 +1,7 @@ +Fixed prompting for the encrypted private key password interactively, +when the password in not set in the :py:attr:`private_key_password attribute +` in the +:py:class:`pyOpenSSL TLS adapter `. +Also improved the private key password to accept the :py:class:`~collections.abc.Callable` type. + +-- by :user:`jatalahd`