# Source code for scapi.http.client

import asyncio
import atexit
import socket
from types import TracebackType
from typing import Any, Dict, Optional, TypeAlias
from urllib.parse import urljoin

import aiofiles
from aiohttp import ClientResponse, ClientSession, ClientTimeout, TCPConnector
from pydantic import ValidationError

from scapi.consts import Defaults
from scapi.exceptions import RequestError

from .params import Params
from .ratelimit import RateLimit


# HTTP headers as a plain mapping of header name to header value.
Headers: TypeAlias = Dict[str, str]
# JSON-style object: string keys mapped to arbitrary values.
Json: TypeAlias = Dict[str, Any]
# Payloads accepted as a request body: a JSON-style dict, text, or raw bytes.
Data: TypeAlias = Json | str | bytes


class HTTPClient:
    """Asynchronous HTTP requests client built on ``aiohttp``.

    The underlying ``ClientSession`` is created lazily on the first request
    and reused afterwards. The client can be used as an async context
    manager; leaving the context closes the session.
    """

    # Passed to ``TCPConnector(ttl_dns_cache=...)``.
    _TTL_DNS_CACHE: int = 300
    # Chunk size (in bytes) used when streaming a response body to a file.
    _STREAM_CHUNK_SIZE: int = 1024 * 8

    def __init__(
        self,
        *,
        base_url: str = "",
        timeout: int = Defaults.TIMEOUT,
        headers: Optional[Headers] = None,
    ):
        """
        Initialize HTTP client.

        Args:
            base_url (optional): Base URL for requests. A trailing ``/`` is
                stripped.
            timeout (optional): Total request timeout in seconds.
                Defaults to ``Defaults.TIMEOUT``.
            headers (optional): Default HTTP headers sent with every request.
        """
        self._base_url = base_url.rstrip("/")
        self._timeout = timeout
        self._headers = headers or {}
        self._session: Optional[ClientSession] = None
        self._ratelimit: RateLimit = RateLimit.model_construct()
        # NOTE: the atexit hook is registered in `_use_session`, i.e. only
        # while a session actually exists, so idle or closed clients are not
        # kept alive by the atexit registry until interpreter shutdown.

    @property
    def ratelimit(self) -> RateLimit:
        """Current ratelimit status."""
        return self._ratelimit

    async def request(
        self,
        method: str,
        url: str,
        params: Optional[Params] = None,
        headers: Optional[Headers] = None,
        data: Optional[Data] = None,
        filename: Optional[str] = None,
        raw: bool = False,
    ) -> Any:
        """
        Send HTTP request.

        Args:
            method: HTTP method.
            url: Request URL, resolved against the client's base URL.
            params (optional): Query parameters. Anything that is not a
                ``Params`` instance is treated as "no parameters".
            headers (optional): Request headers; they override the client's
                default headers on key collisions.
            data (optional): Request body, forwarded unchanged to aiohttp's
                ``data=`` argument.
            filename (optional): File path to stream the response body into.
            raw (optional): Return raw bytes without parsing.

        Returns:
            ``filename`` when streaming to a file, otherwise the response
            body parsed according to its content type (see ``_parse``).

        Raises:
            RequestError: (or the subclass registered for the status code)
                when the response status is outside the 2xx range.
        """
        url = urljoin(self._base_url + "/", url.lstrip("/"))

        # Prepare request options
        rparams = params.to_dict() if isinstance(params, Params) else {}
        rheaders = {**self._headers, **(headers or {})}

        # Create session
        session = await self._use_session()

        # Send http request. The context manager releases the response on
        # exit, so no explicit `response.release()` is needed.
        async with session.request(
            method=method,
            url=url,
            params=rparams,
            headers=rheaders,
            data=data,
        ) as response:
            # Update ratelimit by response headers
            await self._update_ratelimit(response)

            # Validate response status code (`_on_error` always raises)
            if not (200 <= response.status < 300):
                await self._on_error(response)

            # Stream response data to file path
            if filename:
                return await self._stream_to_file(response, filename)

            # Parse response by content type
            return await self._parse(response, raw)

    async def _use_session(self) -> ClientSession:
        """Create or reuse aiohttp client session.

        A new session is also created when the previous one has been closed,
        so the client stays usable after ``close()`` / ``async with``.
        """
        if self._session is None or self._session.closed:
            self._session = ClientSession(
                timeout=ClientTimeout(total=self._timeout),
                connector=TCPConnector(
                    family=socket.AF_INET,
                    ttl_dns_cache=self._TTL_DNS_CACHE,
                    force_close=True,
                ),
            )
            # Keep exactly one exit hook per live session. `unregister` is a
            # no-op when the hook is not registered.
            atexit.unregister(self._cleanup)
            atexit.register(self._cleanup)
        return self._session

    async def _parse(self, response: ClientResponse, raw: bool) -> Any:
        """Parse response based on content type.

        Returns decoded JSON for ``application/json``, text for ``text/*``
        or XML content types, and raw bytes otherwise or when ``raw`` is set.
        """
        if not raw:
            content_type = response.headers.get("content-type", "").lower()
            if "application/json" in content_type:
                return await response.json()
            if content_type.startswith("text/") or "xml" in content_type:
                return await response.text()
        return await response.read()

    async def _stream_to_file(self, response: ClientResponse, filename: str) -> str:
        """Stream response content to local file and return its path."""
        async with aiofiles.open(filename, "wb") as fp:
            async for chunk in response.content.iter_chunked(self._STREAM_CHUNK_SIZE):
                await fp.write(chunk)
        return filename

    async def _update_ratelimit(self, response: ClientResponse) -> None:
        """Update ratelimit values from response headers.

        Values present in the response override the current ones; values
        that are absent (``None``) keep their previous state.
        """
        try:
            current = self._ratelimit.model_dump()
            updated = RateLimit.model_validate(response.headers).model_dump(exclude_none=True)
            self._ratelimit = RateLimit.model_construct(**{**current, **updated})
        except Exception:
            # Deliberately best-effort: ratelimit bookkeeping must never make
            # a request fail. `Exception` already covers validation errors.
            pass

    async def _on_error(self, response: ClientResponse) -> None:
        """Handle HTTP error responses.

        Always raises: the exception class registered for the status code in
        ``RequestError._registry`` or ``RequestError`` itself. The error
        payload is the JSON body when it can be decoded, otherwise the body
        text, otherwise a generic message.
        """
        default = "Unknown error"
        try:
            data = await response.json()
        except Exception:
            data = await response.text(errors="replace") or default
        exception = RequestError._registry.get(response.status, RequestError)
        raise exception(
            data=data,
            status=response.status,
            method=response.method,
            url=response.url,
        )

    async def HEAD(
        self,
        url: str,
        params: Optional[Params] = None,
        headers: Optional[Headers] = None,
        raw: bool = False,
    ) -> Any:
        """Send a ``HEAD`` request; see ``request`` for the arguments."""
        return await self.request(
            method="HEAD",
            url=url,
            params=params,
            headers=headers,
            raw=raw,
        )

    async def GET(
        self,
        url: str,
        params: Optional[Params] = None,
        headers: Optional[Headers] = None,
        filename: Optional[str] = None,
        raw: bool = False,
    ) -> Any:
        """Send a ``GET`` request; see ``request`` for the arguments."""
        return await self.request(
            method="GET",
            url=url,
            params=params,
            headers=headers,
            filename=filename,
            raw=raw,
        )

    async def POST(
        self,
        url: str,
        params: Optional[Params] = None,
        headers: Optional[Headers] = None,
        data: Optional[Data] = None,
        raw: bool = False,
    ) -> Any:
        """Send a ``POST`` request; see ``request`` for the arguments."""
        return await self.request(
            method="POST",
            url=url,
            params=params,
            data=data,
            headers=headers,
            raw=raw,
        )

    async def PUT(
        self,
        url: str,
        params: Optional[Params] = None,
        headers: Optional[Headers] = None,
        data: Optional[Data] = None,
        raw: bool = False,
    ) -> Any:
        """Send a ``PUT`` request; see ``request`` for the arguments."""
        return await self.request(
            method="PUT",
            url=url,
            params=params,
            data=data,
            headers=headers,
            raw=raw,
        )

    async def DELETE(
        self,
        url: str,
        params: Optional[Params] = None,
        headers: Optional[Headers] = None,
        raw: bool = False,
    ) -> Any:
        """Send a ``DELETE`` request; see ``request`` for the arguments."""
        return await self.request(
            method="DELETE",
            url=url,
            params=params,
            headers=headers,
            raw=raw,
        )

    async def PATCH(
        self,
        url: str,
        params: Optional[Params] = None,
        headers: Optional[Headers] = None,
        data: Optional[Data] = None,
        raw: bool = False,
    ) -> Any:
        """Send a ``PATCH`` request; see ``request`` for the arguments."""
        return await self.request(
            method="PATCH",
            url=url,
            params=params,
            data=data,
            headers=headers,
            raw=raw,
        )

    async def close(self) -> None:
        """Close client session.

        Safe to call repeatedly. The client may be used again afterwards; a
        fresh session is created on the next request.
        """
        if self._session and not self._session.closed:
            await self._session.close()
        # Nothing left to clean up at exit; drop the hook so it no longer
        # holds a reference to this client.
        atexit.unregister(self._cleanup)

    def _cleanup(self) -> None:
        """Best-effort session close at interpreter exit (atexit hook).

        Runs the close coroutine on a temporary event loop and never raises.
        """
        if self._session and not self._session.closed:
            try:
                loop = asyncio.new_event_loop()
                try:
                    loop.run_until_complete(self._session.close())
                finally:
                    # Close the temporary loop even when closing the session
                    # fails.
                    loop.close()
            except Exception:
                pass

    async def __aenter__(self) -> "HTTPClient":
        """Enter the async context and return the client itself."""
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> bool:
        """Close the session on context exit; exceptions are not suppressed."""
        await self.close()
        return False

    def __repr__(self) -> str:
        """Return a debug representation with base URL, timeout and ratelimit."""
        return f"{self.__class__.__name__}(base_url='{self._base_url}', timeout={self._timeout}, ratelimit={repr(self._ratelimit)})"