Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion auth0/authentication/back_channel_login.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def back_channel_login(
login_hint: str,
scope: str,
authorization_details: Optional[Union[str, List[Dict]]] = None,
requested_expiry: Optional[int] = None,
**kwargs
) -> Any:
"""Send a Back-Channel Login.
Expand All @@ -31,6 +32,9 @@ def back_channel_login(
authorization_details (str, list of dict, optional): JSON string or a list of dictionaries representing
Rich Authorization Requests (RAR) details to include in the CIBA request.

requested_expiry (int, optional): Number of seconds the authentication request is valid for.
Auth0 defaults to 300 seconds (5 mins) if not provided.

**kwargs: Other fields to send along with the request.

Returns:
Expand All @@ -50,7 +54,12 @@ def back_channel_login(
data["authorization_details"] = authorization_details
elif isinstance(authorization_details, list):
data["authorization_details"] = json.dumps(authorization_details)


if requested_expiry is not None:
if not isinstance(requested_expiry, int) or requested_expiry <= 0:
raise ValueError("requested_expiry must be a positive integer")
data["requested_expiry"] = str(requested_expiry)

data.update(kwargs)

return self.authenticated_post(
Expand Down
32 changes: 20 additions & 12 deletions auth0/authentication/get_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ def backchannel_login(
use urn:openid:params:grant-type:ciba

Returns:
access_token, id_token
access_token, id_token, refresh_token, token_type, expires_in, scope and authorization_details
"""

return self.authenticated_post(
Expand All @@ -284,7 +284,8 @@ def access_token_for_connection(
subject_token: str,
requested_token_type: str,
connection: str | None = None,
grant_type: str = "urn:auth0:params:oauth:grant-type:token-exchange:federated-connection-access-token"
grant_type: str = "urn:auth0:params:oauth:grant-type:token-exchange:federated-connection-access-token",
login_hint: str = None
) -> Any:
"""Calls /oauth/token endpoint with federated-connection-access-token grant type

Expand All @@ -293,22 +294,29 @@ def access_token_for_connection(

subject_token (str): String containing the value of subject_token_type.

requested_token_type (str): String containing the type of rquested token.
requested_token_type (str): String containing the type of requested token.

connection (str, optional): Denotes the name of a social identity provider configured to your application

login_hint (str, optional): A hint to the OpenID Provider regarding the end-user for whom authentication is being requested

Returns:
access_token, scope, issued_token_type, token_type
access_token, scope, issued_token_type, token_type, expires_in
"""

data = {
"client_id": self.client_id,
"grant_type": grant_type,
"subject_token_type": subject_token_type,
"subject_token": subject_token,
"requested_token_type": requested_token_type,
"connection": connection,
}

if login_hint:
data["login_hint"] = login_hint

return self.authenticated_post(
f"{self.protocol}://{self.domain}/oauth/token",
data={
"client_id": self.client_id,
"grant_type": grant_type,
"subject_token_type": subject_token_type,
"subject_token": subject_token,
"requested_token_type": requested_token_type,
"connection": connection,
},
data=data,
)
2 changes: 2 additions & 0 deletions auth0/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ def __init__(
error_code: str,
message: str,
content: Any | None = None,
headers: Any | None = None,
) -> None:
self.status_code = status_code
self.error_code = error_code
self.message = message
self.content = content
self.headers = headers

def __str__(self) -> str:
return f"{self.status_code}: {self.message}"
Expand Down
2 changes: 2 additions & 0 deletions auth0/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,12 +296,14 @@ def content(self) -> Any:
error_code=self._error_code(),
message=self._error_message(),
content=self._content,
headers=self._headers
)

raise Auth0Error(
status_code=self._status_code,
error_code=self._error_code(),
message=self._error_message(),
headers=self._headers
)
else:
return self._content
Expand Down
72 changes: 72 additions & 0 deletions auth0/test/authentication/test_back_channel_login.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,23 @@ def test_ciba(self, mock_post):
},
)

@mock.patch("requests.request")
def test_server_error(self, mock_requests_request):
response = requests.Response()
response.status_code = 400
response._content = b'{"error":"foo"}'
mock_requests_request.return_value = response

g = BackChannelLogin("my.domain.com", "cid", client_secret="clsec")
with self.assertRaises(Auth0Error) as context:
g.back_channel_login(
binding_message="msg",
login_hint="hint",
scope="openid"
)
self.assertEqual(context.exception.status_code, 400)
self.assertEqual(context.exception.message, 'foo')

@mock.patch("auth0.rest.RestClient.post")
def test_should_require_binding_message(self, mock_post):
g = BackChannelLogin("my.domain.com", "cid", client_secret="clsec")
Expand Down Expand Up @@ -136,3 +153,58 @@ def test_with_authorization_details(self, mock_post):
"Request data does not match expected data after JSON serialization."
)

@mock.patch("auth0.rest.RestClient.post")
def test_with_request_expiry(self, mock_post):
g = BackChannelLogin("my.domain.com", "cid", client_secret="clsec")

g.back_channel_login(
binding_message="This is a binding message",
login_hint="{ \"format\": \"iss_sub\", \"iss\": \"https://my.domain.auth0.com/\", \"sub\": \"auth0|[USER ID]\" }",
scope="openid",
requested_expiry=100
)

args, kwargs = mock_post.call_args

self.assertEqual(args[0], "https://my.domain.com/bc-authorize")
self.assertEqual(
kwargs["data"],
{
"client_id": "cid",
"client_secret": "clsec",
"binding_message": "This is a binding message",
"login_hint": "{ \"format\": \"iss_sub\", \"iss\": \"https://my.domain.auth0.com/\", \"sub\": \"auth0|[USER ID]\" }",
"scope": "openid",
"requested_expiry": "100",
},
)

def test_requested_expiry_negative_raises(self):
g = BackChannelLogin("my.domain.com", "cid", client_secret="clsec")
with self.assertRaises(ValueError):
g.back_channel_login(
binding_message="msg",
login_hint="hint",
scope="openid",
requested_expiry=-10
)

def test_requested_expiry_zero_raises(self):
g = BackChannelLogin("my.domain.com", "cid", client_secret="clsec")
with self.assertRaises(ValueError):
g.back_channel_login(
binding_message="msg",
login_hint="hint",
scope="openid",
requested_expiry=0
)

def test_requested_non_int_raises(self):
g = BackChannelLogin("my.domain.com", "cid", client_secret="clsec")
with self.assertRaises(ValueError):
g.back_channel_login(
binding_message="msg",
login_hint="hint",
scope="openid",
requested_expiry="string_instead_of_int"
)
51 changes: 50 additions & 1 deletion auth0/test/authentication/test_get_token.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import unittest
import requests
from fnmatch import fnmatch
from unittest import mock
from unittest.mock import ANY

from cryptography.hazmat.primitives import asymmetric, serialization

from ... import Auth0Error
from ...authentication.get_token import GetToken


Expand Down Expand Up @@ -335,7 +337,25 @@ def test_backchannel_login(self, mock_post):
"grant_type": "urn:openid:params:grant-type:ciba",
},
)


@mock.patch("requests.request")
def test_backchannel_login_headers_on_failure(self, mock_requests_request):
response = requests.Response()
response.status_code = 400
response.headers = {"Retry-After": "100"}
response._content = b'{"error":"slow_down"}'
mock_requests_request.return_value = response

g = GetToken("my.domain.com", "cid", client_secret="csec")

with self.assertRaises(Auth0Error) as context:
g.backchannel_login(
auth_req_id="reqid",
grant_type="urn:openid:params:grant-type:ciba",
)
self.assertEqual(context.exception.headers["Retry-After"], "100")
self.assertEqual(context.exception.status_code, 400)

@mock.patch("auth0.rest.RestClient.post")
def test_connection_login(self, mock_post):
g = GetToken("my.domain.com", "cid", client_secret="csec")
Expand Down Expand Up @@ -364,4 +384,33 @@ def test_connection_login(self, mock_post):
"requested_token_type": "http://auth0.com/oauth/token-type/federated-connection-access-token",
"connection": "google-oauth2"
},
)

@mock.patch("auth0.rest.RestClient.post")
def test_connection_login_with_login_hint(self, mock_post):
g = GetToken("my.domain.com", "cid", client_secret="csec")

g.access_token_for_connection(
subject_token_type="urn:ietf:params:oauth:token-type:refresh_token",
subject_token="refid",
requested_token_type="http://auth0.com/oauth/token-type/federated-connection-access-token",
connection="google-oauth2",
login_hint="john.doe@example.com"
)

args, kwargs = mock_post.call_args

self.assertEqual(args[0], "https://my.domain.com/oauth/token")
self.assertEqual(
kwargs["data"],
{
"grant_type": "urn:auth0:params:oauth:grant-type:token-exchange:federated-connection-access-token",
"client_id": "cid",
"client_secret": "csec",
"subject_token_type": "urn:ietf:params:oauth:token-type:refresh_token",
"subject_token": "refid",
"requested_token_type": "http://auth0.com/oauth/token-type/federated-connection-access-token",
"connection": "google-oauth2",
"login_hint": "john.doe@example.com"
},
)