Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions msal/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,9 @@ def initiate_auth_code_flow(
):
"""Initiate an auth code flow.

.. deprecated::
The response_mode parameter is deprecated and will be removed in a future version.

Later when the response reaches your redirect_uri,
you can use :func:`~acquire_token_by_auth_code_flow()`
to complete the authentication/authorization.
Expand Down Expand Up @@ -960,18 +963,10 @@ def initiate_auth_code_flow(
New in version 1.15.

:param str response_mode:
OPTIONAL. Specifies the method with which response parameters should be returned.
The default value is equivalent to ``query``, which is still secure enough in MSAL Python
(because MSAL Python does not transfer tokens via query parameter in the first place).
For even better security, we recommend using the value ``form_post``.
In "form_post" mode, response parameters
will be encoded as HTML form values that are transmitted via the HTTP POST method and
encoded in the body using the application/x-www-form-urlencoded format.
Valid values can be either "form_post" for HTTP POST to callback URI or
"query" (the default) for HTTP GET with parameters encoded in query string.
More information on possible values
`here <https://openid.net/specs/oauth-v2-multiple-response-types-1_0.html#ResponseModes>`
and `here <https://openid.net/specs/oauth-v2-form-post-response-mode-1_0.html#FormPostResponseMode>`
.. deprecated::
This parameter is deprecated and will be removed in a future version.
The response_mode is always set to ``form_post`` for security reasons,
regardless of the value provided. Do not use this parameter.

:return:
The auth code flow. It is a dict in this form::
Expand All @@ -991,6 +986,13 @@ def initiate_auth_code_flow(
3. and then relay this dict and subsequent auth response to
:func:`~acquire_token_by_auth_code_flow()`.
"""
if response_mode is not None:
import warnings
warnings.warn(
"The 'response_mode' parameter is deprecated and will be removed in a future version. "
"Response mode is always 'form_post' for security reasons.",
DeprecationWarning,
stacklevel=2)
client = _ClientWithCcsRoutingInfo(
{"authorization_endpoint": self.authority.authorization_endpoint},
self.client_id,
Expand Down
90 changes: 71 additions & 19 deletions msal/oauth2cli/authcode.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,27 +111,76 @@ class _AuthCodeHandler(BaseHTTPRequestHandler):
def do_GET(self):
# For flexibility, we choose to not check self.path matching redirect_uri
#assert self.path.startswith('/THE_PATH_REGISTERED_BY_THE_APP')
qs = parse_qs(urlparse(self.path).query)
if qs.get('code') or qs.get("error"): # So, it is an auth response
parsed_url = urlparse(self.path)
qs = parse_qs(parsed_url.query)

if qs.get('code'): # Auth code via GET is a security risk - reject it

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about having a grace period that both GET and POST are supported? Otherwise if an app can't just switch to form post it'll be stuck with an older version of MSAL

logger.error("Received auth code via query string (GET request). "
"This is a security risk. Only form_post (POST) is supported.")
self._send_full_response(
"Authentication method not supported. "
"The application requires response_mode=form_post for security. "
"Please ensure your application registration uses form_post response mode.",
is_ok=False)
elif qs.get("error"): # Errors can come via GET - process them
auth_response = _qs2kv(qs)
logger.debug("Got auth response: %s", auth_response)
if self.server.auth_state and self.server.auth_state != auth_response.get("state"):
# OAuth2 successful and error responses contain state when it was used
# https://www.rfc-editor.org/rfc/rfc6749#section-4.2.2.1
self._send_full_response("State mismatch") # Possibly an attack
else:
template = (self.server.success_template
if "code" in qs else self.server.error_template)
if _is_html(template.template):
safe_data = _escape(auth_response) # Foiling an XSS attack
else:
safe_data = auth_response
self._send_full_response(template.safe_substitute(**safe_data))
self.server.auth_response = auth_response # Set it now, after the response is likely sent
self._process_auth_response(auth_response)
elif not qs and parsed_url.path == '/':
# GET request to root with no query params - this is likely from clicking OK on eSTS error
# Treat it as an error since success would come via POST
logger.warning("Received GET request to root path with no query parameters - treating as authentication error")
auth_response = {
"error": "authentication_failed",
"error_description": "Please close this window and try again."
}
# Include the original state so state validation doesn't fail
if self.server.auth_state:
auth_response["state"] = self.server.auth_state
self._process_auth_response(auth_response)
else:
self._send_full_response(self.server.welcome_page)
# NOTE: Don't do self.server.shutdown() here. It'll halt the server.

def do_POST(self):
# Handle form_post response mode where auth code is sent via POST body
content_length = int(self.headers.get('Content-Length', 0))
post_data = self.rfile.read(content_length).decode('utf-8')
try:
from urllib.parse import parse_qs as parse_qs_post
except ImportError:
from urlparse import parse_qs as parse_qs_post

qs = parse_qs_post(post_data)
if qs.get('code') or qs.get('error'): # So, it is an auth response
auth_response = _qs2kv(qs)
logger.debug("Got auth response via POST: %s", auth_response)
self._process_auth_response(auth_response)
else:
self._send_full_response("Invalid POST request", is_ok=False)

def _process_auth_response(self, auth_response):
"""Process the auth response from either GET or POST request."""
if self.server.auth_state and self.server.auth_state != auth_response.get("state"):
# OAuth2 successful and error responses contain state when it was used
# https://www.rfc-editor.org/rfc/rfc6749#section-4.2.2.1
self._send_full_response("State mismatch") # Possibly an attack
# Don't set auth_response for security, but mark as done to avoid hanging
self.server.done = True
else:
template = (self.server.success_template
if "code" in auth_response else self.server.error_template)
if _is_html(template.template):
safe_data = _escape(auth_response) # Foiling an XSS attack
else:
safe_data = dict(auth_response) # Make a copy to avoid mutating original
# Provide default values for common OAuth2 response fields
# to avoid showing literal placeholder text like "$error_description"
safe_data.setdefault("error", "")
safe_data.setdefault("error_description", "")
safe_data.setdefault("error_uri", "")
self._send_full_response(template.safe_substitute(**safe_data))
self.server.auth_response = auth_response # Set it now, after the response is likely sent

def _send_full_response(self, body, is_ok=True):
self.send_response(200 if is_ok else 400)
content_type = 'text/html' if _is_html(body) else 'text/plain'
Expand Down Expand Up @@ -317,19 +366,22 @@ def _get_auth_response(self, result, auth_uri=None, timeout=None, state=None,
auth_uri_callback(_uri)

self._server.success_template = Template(success_template or
"Authentication completed. You can close this window now.")
"Authentication complete. You can return to the application. Please close this browser tab.\n\n"
"For your security: Do not share the contents of this page, the address bar, or take screenshots.")
self._server.error_template = Template(error_template or
"Authentication failed. $error: $error_description. ($error_uri)")
"Authentication failed. $error: $error_description.\n\n"
"For your security: Do not share the contents of this page, the address bar, or take screenshots.")

self._server.timeout = timeout # Otherwise its handle_timeout() won't work
self._server.auth_response = {} # Shared with _AuthCodeHandler
self._server.auth_state = state # So handler will check it before sending response
self._server.done = False # Flag to indicate completion without setting auth_response
while not self._closing: # Otherwise, the handle_request() attempt
# would yield noisy ValueError trace
# Derived from
# https://docs.python.org/2/library/basehttpserver.html#more-examples
self._server.handle_request()
if self._server.auth_response:
if self._server.auth_response or self._server.done:
break
result.update(self._server.auth_response) # Return via writable result param

Expand Down
16 changes: 15 additions & 1 deletion msal/oauth2cli/oauth2.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,14 @@ def _build_auth_request_params(self, response_type, **kwargs):
response_type = self._stringify(response_type)

params = {'client_id': self.client_id, 'response_type': response_type}
params.update(kwargs) # Note: None values will override params
# Strictly enforce form_post for security - query string is not allowed
params['response_mode'] = 'form_post'
if 'response_mode' in kwargs and kwargs['response_mode'] != 'form_post':
import warnings
warnings.warn(
"The 'response_mode' parameter will be overridden to 'form_post' for better security.",
UserWarning)
params.update({k: v for k, v in kwargs.items() if k != 'response_mode'}) # Exclude response_mode from kwargs
params = {k: v for k, v in params.items() if v is not None} # clean up
if params.get('scope'):
params['scope'] = self._stringify(params['scope'])
Expand Down Expand Up @@ -467,6 +474,13 @@ def initiate_auth_code_flow(
3. and then relay this dict and subsequent auth response to
:func:`~obtain_token_by_auth_code_flow()`.
"""
if "response_mode" in kwargs:
import warnings
warnings.warn(
"The 'response_mode' parameter is deprecated and will be removed in a future version. "
"Response mode is always 'form_post' for security reasons.",
DeprecationWarning,
stacklevel=2)
response_type = kwargs.pop("response_type", "code") # Auth Code flow
# Must be "code" when you are using Authorization Code Grant.
# The "token" for Implicit Grant is not applicable thus not allowed.
Expand Down
148 changes: 145 additions & 3 deletions tests/test_authcode.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,159 @@ def test_no_two_concurrent_receivers_can_listen_on_same_port(self):
pass

def test_template_should_escape_input(self):
"""Test that POST request with HTML in error is properly escaped"""
with AuthCodeReceiver() as receiver:
receiver._scheduled_actions = [( # Injection happens here when the port is known
1, # Delay it until the receiver is activated by get_auth_response()
lambda: self.assertEqual(
"<html>&lt;tag&gt;foo&lt;/tag&gt;</html>",
requests.get("http://localhost:{}?error=<tag>foo</tag>".format(
receiver.get_port())).text,
"Unsafe data in HTML should be escaped",
requests.post(
"http://localhost:{}".format(receiver.get_port()),
data={"error": "<tag>foo</tag>"},
headers={'Content-Type': 'application/x-www-form-urlencoded'}
).text,
))]
receiver.get_auth_response( # Starts server and hang until timeout
timeout=3,
error_template="<html>$error</html>",
)

def test_get_request_with_auth_code_is_rejected(self):
"""Test that GET request with auth code is rejected for security"""
with AuthCodeReceiver() as receiver:
test_code = "test_auth_code_12345"
test_state = "test_state_67890"
receiver._scheduled_actions = [(
1,
lambda: self._verify_get_rejection(
receiver.get_port(),
code=test_code,
state=test_state
)
)]
result = receiver.get_auth_response(
timeout=3,
state=test_state,
)
# Should not receive auth response via GET
self.assertIsNone(result)

def _verify_get_rejection(self, port, **params):
"""Helper to verify GET requests with auth codes are rejected"""
try:
from urllib.parse import urlencode
except ImportError:
from urllib import urlencode

response = requests.get(
"http://localhost:{}?{}".format(port, urlencode(params))
)
# Verify error message about unsupported method
self.assertIn("not supported", response.text.lower())
self.assertEqual(response.status_code, 400)

def test_post_request_with_auth_code(self):
"""Test that POST request with auth code is handled correctly (form_post response mode)"""
with AuthCodeReceiver() as receiver:
test_code = "test_auth_code_12345"
test_state = "test_state_67890"
receiver._scheduled_actions = [(
1,
lambda: self._send_post_auth_response(
receiver.get_port(),
code=test_code,
state=test_state
)
)]
result = receiver.get_auth_response(
timeout=3,
state=test_state,
success_template="Success: Got code $code",
)
self.assertIsNotNone(result)
self.assertEqual(result.get("code"), test_code)
self.assertEqual(result.get("state"), test_state)

def test_post_request_with_error(self):
"""Test that POST request with error is handled correctly"""
with AuthCodeReceiver() as receiver:
test_error = "access_denied"
test_error_description = "User denied access"
receiver._scheduled_actions = [(
1,
lambda: self._send_post_auth_response(
receiver.get_port(),
error=test_error,
error_description=test_error_description
)
)]
result = receiver.get_auth_response(
timeout=3,
error_template="Error: $error - $error_description",
)
self.assertIsNotNone(result)
self.assertEqual(result.get("error"), test_error)
self.assertEqual(result.get("error_description"), test_error_description)

def test_post_request_state_mismatch(self):
"""Test that POST request with mismatched state is rejected"""
with AuthCodeReceiver() as receiver:
test_code = "test_auth_code_12345"
wrong_state = "wrong_state"
expected_state = "expected_state"
receiver._scheduled_actions = [(
1,
lambda: self._send_post_auth_response(
receiver.get_port(),
code=test_code,
state=wrong_state
)
)]
result = receiver.get_auth_response(
timeout=3,
state=expected_state,
)
# When state mismatches, the response should not be set
self.assertIsNone(result)

def test_post_request_escapes_html(self):
"""Test that POST request with HTML in error is properly escaped"""
with AuthCodeReceiver() as receiver:
malicious_error = "<script>alert('xss')</script>"
receiver._scheduled_actions = [(
1,
lambda: self._verify_post_escaping(receiver.get_port(), malicious_error)
)]
receiver.get_auth_response(
timeout=3,
error_template="<html>$error</html>",
)

def _send_post_auth_response(self, port, **params):
"""Helper to send POST request with auth response"""
try:
from urllib.parse import urlencode
except ImportError:
from urllib import urlencode

response = requests.post(
"http://localhost:{}".format(port),
data=params,
headers={'Content-Type': 'application/x-www-form-urlencoded'}
)
return response

def _verify_post_escaping(self, port, malicious_error):
"""Helper to verify HTML escaping in POST requests"""
response = self._send_post_auth_response(port, error=malicious_error)
# Verify that the malicious script is escaped
self.assertIn("&lt;script&gt;", response.text)
self.assertNotIn("<script>", response.text)
# Note: The escape function also escapes single quotes to &#x27;
expected = "<html>&lt;script&gt;alert(&#x27;xss&#x27;)&lt;/script&gt;</html>"
self.assertEqual(
expected,
response.text,
"HTML in POST data should be escaped to prevent XSS"
)

Loading
Loading