Source code for scapi.database.state

import time
from datetime import datetime
from typing import Optional


class CommitState:
    """Track a local commit hash against a remote hash cached with a TTL.

    The remote hash is stored together with an expiry measured on the
    monotonic clock. Before a remote hash has been stored, or once the
    expiry has passed, ``remote`` reads as an empty string and ``uptodate``
    is ``False``.

    Args:
        ttl: Lifetime in seconds of a stored remote hash. Any value that is
            not greater than zero makes a stored hash valid indefinitely.
        local: Local commit hash. Defaults to an empty string.
    """

    # Wall-clock minus monotonic time, sampled once when the class is created.
    # Retained only for backward compatibility: ``until`` no longer uses it,
    # because the two clocks can drift apart after this snapshot is taken.
    _time_offset = time.time() - time.monotonic()

    def __init__(self, ttl: float, local: str = ""):
        self.local = local
        self._ttl = ttl
        self._remote = ""
        # 0.0 doubles as the "remote never stored" marker checked by ``until``.
        self._until = 0.0

    def _expired(self) -> bool:
        """Return True once the monotonic clock has passed the expiry."""
        return time.monotonic() > self._until

    @property
    def remote(self) -> str:
        """Cached remote commit hash, or an empty string once expired."""
        if self._expired():
            return ""
        return self._remote

    @remote.setter
    def remote(self, value: str) -> None:
        """Store a remote hash and restart its expiry window."""
        self._remote = value
        if self._ttl > 0:
            self._until = time.monotonic() + self._ttl
        else:
            # A non-positive TTL means the stored value never expires.
            self._until = float("inf")

    @property
    def uptodate(self) -> bool:
        """Whether the local hash equals the still-valid remote hash.

        Always ``False`` when the local hash is empty or the cached remote
        hash has expired.
        """
        if not self.local or self._expired():
            return False
        return self.local == self._remote

    @property
    def until(self) -> Optional[datetime]:
        """Expiration time as a naive local datetime.

        Returns ``None`` when no remote hash has been stored yet or when the
        stored hash never expires.
        """
        if self._until == float("inf") or self._until == 0.0:
            return None
        # Translate the monotonic deadline to wall-clock time using the
        # clocks as they read now, not an offset frozen at import time, so
        # later wall-clock adjustments do not skew the reported expiry.
        remaining = self._until - time.monotonic()
        return datetime.fromtimestamp(time.time() + remaining)

    def __repr__(self) -> str:
        return (
            f"{self.__class__.__name__}(local='{self.local}', "
            f"remote='{self.remote}', uptodate={self.uptodate}, "
            f"until={self.until})"
        )