import asyncio
import atexit
import socket
from types import TracebackType
from typing import Any, Dict, Optional, TypeAlias
from urllib.parse import urljoin
import aiofiles
from aiohttp import ClientResponse, ClientSession, ClientTimeout, TCPConnector
from pydantic import ValidationError
from scapi.consts import Defaults
from scapi.exceptions import RequestError
from .params import Params
from .ratelimit import RateLimit
Headers: TypeAlias = Dict[str, str]
Json: TypeAlias = Dict[str, Any]
Data: TypeAlias = Json | str | bytes
[docs]
class HTTPClient:
"""HTTP requests client."""
_TTL_DNS_CACHE: int = 300
_STREAM_CHUNK_SIZE: int = 1024 * 8
def __init__(
self,
*,
base_url: str = "",
timeout: int = Defaults.TIMEOUT,
headers: Optional[Headers] = None,
):
"""
Initialize HTTP client.
Args:
base_url (optional): Base URL for requests.
timeout (optional): Request timeout in seconds. Defaults to `60s`.
headers (optional): Default HTTP headers.
"""
self._base_url = base_url.rstrip("/")
self._timeout = timeout
self._headers = headers or {}
self._session: Optional[ClientSession] = None
self._ratelimit: RateLimit = RateLimit.model_construct()
atexit.register(self._cleanup)
@property
def ratelimit(self) -> RateLimit:
"""Current ratelimit status."""
return self._ratelimit
[docs]
async def request(
self,
method: str,
url: str,
params: Optional[Params] = None,
headers: Optional[Headers] = None,
data: Optional[Data] = None,
filename: Optional[str] = None,
raw: bool = False,
) -> Any:
"""
Send HTTP request.
Args:
method: HTTP method.
url: Request URL.
params (optional): Query parameters.
headers (optional): Request headers.
data (optional): Request body.
filename (optional): File path to streaming.
raw (optional): Return raw bytes without parsing.
Returns:
Parsed response data.
"""
url = urljoin(self._base_url + "/", url.lstrip("/"))
# Prepare request options
rparams = params.to_dict() if isinstance(params, Params) else {}
rheaders = {**self._headers, **(headers or {})}
# Create session
session = await self._use_session()
# Send http request
async with session.request(method=method, url=url, params=rparams, headers=rheaders, data=data) as response:
# Update ratelimit by response headers
await self._update_ratelimit(response)
# Validate response status code
if not (200 <= response.status < 300):
await self._on_error(response)
# Stream response data to file path
if filename:
return await self._stream_to_file(response, filename)
# Parse response by content type
try:
data = await self._parse(response, raw)
finally:
await response.release()
return data
async def _use_session(self) -> ClientSession:
"""Create or reuse aiohttp client session."""
if self._session is None:
self._session = ClientSession(
timeout=ClientTimeout(total=self._timeout),
connector=TCPConnector(
family=socket.AF_INET,
ttl_dns_cache=self._TTL_DNS_CACHE,
force_close=True,
),
)
return self._session
async def _parse(self, response: ClientResponse, raw: bool) -> Any:
"""Parse response based on content type."""
if not raw:
content_type = response.headers.get("content-type", "").lower()
if "application/json" in content_type:
return await response.json()
if content_type.startswith("text/") or "xml" in content_type:
return await response.text()
return await response.read()
async def _stream_to_file(self, response: ClientResponse, filename: str) -> str:
"""Stream response content to local file."""
async with aiofiles.open(filename, "wb") as fp:
async for chunk in response.content.iter_chunked(self._STREAM_CHUNK_SIZE):
await fp.write(chunk)
return filename
async def _update_ratelimit(self, response: ClientResponse) -> None:
"""Update ratelimit values from response headers."""
try:
current = self._ratelimit.model_dump()
updated = RateLimit.model_validate(response.headers).model_dump(exclude_none=True)
merged = {**current, **updated}
self._ratelimit = RateLimit.model_construct(**merged)
except (Exception, ValidationError):
pass
async def _on_error(self, response: ClientResponse) -> None:
"""Handle HTTP error responses."""
default = "Unknown error"
try:
data = await response.json()
except Exception:
data = await response.text(errors="replace") or default
exception = RequestError._registry.get(response.status, RequestError)
raise exception(data=data, status=response.status, method=response.method, url=response.url)
[docs]
async def HEAD(
self,
url: str,
params: Optional[Params] = None,
headers: Optional[Headers] = None,
raw: bool = False,
):
return await self.request(
method="HEAD",
url=url,
params=params,
headers=headers,
raw=raw,
)
[docs]
async def GET(
self,
url: str,
params: Optional[Params] = None,
headers: Optional[Headers] = None,
filename: Optional[str] = None,
raw: bool = False,
):
return await self.request(
method="GET",
url=url,
params=params,
headers=headers,
filename=filename,
raw=raw,
)
[docs]
async def POST(
self,
url: str,
params: Optional[Params] = None,
headers: Optional[Headers] = None,
data: Optional[Data] = None,
raw: bool = False,
):
return await self.request(
method="POST",
url=url,
params=params,
data=data,
headers=headers,
raw=raw,
)
[docs]
async def PUT(
self,
url: str,
params: Optional[Params] = None,
headers: Optional[Headers] = None,
data: Optional[Data] = None,
raw: bool = False,
):
return await self.request(
method="PUT",
url=url,
params=params,
data=data,
headers=headers,
raw=raw,
)
[docs]
async def DELETE(
self,
url: str,
params: Optional[Params] = None,
headers: Optional[Headers] = None,
raw: bool = False,
):
return await self.request(
method="DELETE",
url=url,
params=params,
headers=headers,
raw=raw,
)
[docs]
async def PATCH(
self,
url: str,
params: Optional[Params] = None,
headers: Optional[Headers] = None,
data: Optional[Data] = None,
raw: bool = False,
):
return await self.request(
method="PATCH",
url=url,
params=params,
data=data,
headers=headers,
raw=raw,
)
[docs]
async def close(self) -> None:
"""Close client session."""
if self._session and not self._session.closed:
await self._session.close()
def _cleanup(self) -> None:
if self._session and not self._session.closed:
try:
loop = asyncio.new_event_loop()
loop.run_until_complete(self._session.close())
loop.close()
except Exception:
pass
async def __aenter__(self):
return self
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
):
await self.close()
return False
def __repr__(self):
return f"{self.__class__.__name__}(base_url='{self._base_url}', timeout={self._timeout}, ratelimit={repr(self._ratelimit)})"