from typing import Optional
from scapi.consts import BaseUrl, Defaults
from scapi.http.api import APIClient
from scapi.http.client import HTTPClient
from scapi.http.params import Params
from . import models
# (stray "[docs]" Sphinx viewcode link marker removed -- extraction artifact, not code)
class OAuthClient(APIClient):
    """API Client for OAuth 2.0 authentication with EXBO services.

    Stores the application credentials and default request settings, builds
    the user authorization URL, and talks to the ``token`` and ``user``
    endpoints through an ``HTTPClient`` bound to the OAuth base URL.

    Responses are handed to ``self._parse`` together with the target model
    class (``_parse`` is not defined here; presumably inherited from
    ``APIClient`` -- verify there how the ``json`` flag is honoured).
    """

    def __init__(
        self,
        *,
        client_id: str,
        client_secret: str,
        base_url: str = BaseUrl.OAUTH,
        redirect_uri: str = Defaults.REDIRECT_URI,
        scope: str = Defaults.SCOPE,
        json: bool = Defaults.JSON,
    ) -> None:
        """
        Initialize OAuth client with application credentials.

        All arguments are keyword-only.

        Args:
            client_id: Application client identifier.
            client_secret: Application client secret.
            base_url (optional): OAuth server base URL. Defaults to `https://exbo.net/oauth`.
            redirect_uri (optional): Redirect URI for authorization flow. Defaults to `http://localhost`.
            scope (optional, stub): Requested access scope. Defaults to `""`.
            json (optional): Return JSON instead of models. Defaults to `False`.
        """
        self._client_id = client_id
        self._client_secret = client_secret
        self._base_url = base_url
        self._redirect_uri = redirect_uri
        self._scope = scope
        self._json = json
        self._http = HTTPClient(base_url=base_url)

    def get_authorize_url(
        self,
        state: Optional[str] = None,
        redirect_uri: Optional[str] = None,
        scope: Optional[str] = None,
    ) -> str:
        """
        Generate user authorization URL.

        No network request is made; the URL is only assembled locally.

        Args:
            state (optional): CSRF protection state string.
            redirect_uri (optional): Override default redirect URI. If given,
                pass the same value to `get_user_token` when exchanging the
                resulting code.
            scope (optional): Override default scope.

        Returns:
            URL for user authorization.
        """
        # NOTE(review): how Params renders a None `state` is decided by
        # Params itself -- presumably it is omitted; confirm in scapi.http.params.
        params = Params(
            client_id=self._client_id,
            redirect_uri=redirect_uri or self._redirect_uri,
            scope=scope or self._scope,
            state=state,
            response_type="code",
        )
        # Strip trailing slashes so a base URL written as ".../oauth/" does
        # not produce ".../oauth//authorize".
        return f"{self._base_url.rstrip('/')}/authorize?{params}"

    async def get_app_token(
        self,
        scope: Optional[str] = None,
    ) -> models.AppToken:
        """
        Request application token using client credentials grant.

        **NOTE:** New tokens replace and invalidate previous ones.

        Args:
            scope (optional): Override default scope.

        Returns:
            Application token data.
        """
        response = await self._http.POST(
            url="token",
            data={
                "client_id": self._client_id,
                "client_secret": self._client_secret,
                "scope": scope or self._scope,
                "grant_type": "client_credentials",
            },
        )
        return self._parse(response, models.AppToken)

    async def get_user_token(
        self,
        code: str,
        redirect_uri: Optional[str] = None,
    ) -> models.UserToken:
        """
        Exchange authorization code for user token.

        **NOTE:** New tokens replace and invalidate previous ones.

        Args:
            code: Authorization code from user redirect.
            redirect_uri (optional): Redirect URI that was used to obtain
                `code`. Only needed when `get_authorize_url` was called with
                an overridden `redirect_uri`; otherwise the client default
                is sent.

        Returns:
            User token data with access and refresh token.
        """
        response = await self._http.POST(
            url="token",
            data={
                "client_id": self._client_id,
                "client_secret": self._client_secret,
                "code": code,
                # OAuth 2.0 requires this to be identical to the redirect URI
                # of the authorization request, so honour a per-call override
                # the same way get_authorize_url does.
                "redirect_uri": redirect_uri or self._redirect_uri,
                "grant_type": "authorization_code",
            },
        )
        return self._parse(response, models.UserToken)

    async def refresh_user_token(
        self,
        refresh_token: str,
        scope: Optional[str] = None,
    ) -> models.UserToken:
        """
        Refresh expired user access token.

        Args:
            refresh_token: Refresh token from previous authorization.
            scope (optional): Override default scope.

        Returns:
            Refreshed user token data.
        """
        response = await self._http.POST(
            url="token",
            data={
                "client_id": self._client_id,
                "client_secret": self._client_secret,
                "refresh_token": refresh_token,
                "scope": scope or self._scope,
                "grant_type": "refresh_token",
            },
        )
        return self._parse(response, models.UserToken)

    async def validate_user_token(
        self,
        token: str,
    ) -> models.UserInfo:
        """
        Validate user token and retrieve user information.

        Sends the token as a ``Bearer`` credential in the ``Authorization``
        header of a GET request to the ``user`` endpoint.

        Args:
            token: User access token.

        Returns:
            User account information.
        """
        response = await self._http.GET(
            url="user",
            headers={"Authorization": f"Bearer {token}"},
        )
        return self._parse(response, models.UserInfo)