from abc import ABC, abstractmethod
from datetime import datetime
from enum import Enum
from functools import wraps
from logging import exception
from typing import List, Optional
from pydantic import BaseModel, ByteSize, HttpUrl
class Color(str, Enum):
    """Closed set of color names usable by the models in this module.

    Members also subclass ``str``, so each one compares equal to its
    lowercase string value (``Color.DANGER == "danger"``).
    """

    # Neutral shades.
    WHITE = "white"
    BLACK = "black"
    LIGHT = "light"
    DARK = "dark"

    # Named accents.
    PRIMARY = "primary"
    LINK = "link"
    INFO = "info"
    SUCCESS = "success"
    WARNING = "warning"
    DANGER = "danger"

    DEFAULT = "default"
class RemoteFile(BaseModel):
    """Pydantic model describing a single remote file entry.

    Instances of this model are the element type of the list returned by
    ``Bridge.search`` and of the lists handled by ``Cache``.

    NOTE(review): the ``Optional[...]`` fields below carry no explicit
    default. Whether pydantic treats such fields as required or as
    defaulting to ``None`` depends on the pydantic major version in use —
    confirm against the pinned version before relying on either behavior.
    """

    # Identification (presumably ``bridge`` names the originating Bridge —
    # confirm against callers).
    bridge: str
    id: int
    category: str
    color: Optional[Color]
    name: str

    # Links attached to the entry.
    link: HttpUrl
    comment: int = 0
    comment_url: HttpUrl
    magnet: Optional[str]
    torrent: Optional[HttpUrl]

    # Optional metadata; may be absent for a given entry.
    size: Optional[ByteSize]
    date: Optional[datetime]
    seeds: Optional[int]
    leechs: Optional[int]
    downloads: Optional[int]

    nb_pages: int = 1
class Bridge(BaseModel, ABC):
    """Abstract pydantic model for a source that can be searched.

    Concrete subclasses supply the display fields below and implement the
    URL construction and the asynchronous search itself.
    """

    color: Color
    title: str
    favicon: HttpUrl
    base_url: HttpUrl

    @abstractmethod
    def search_url(self, query: str = "", page: int = 1) -> HttpUrl:
        """Return the URL to request for ``query`` at result page ``page``.

        Args:
            query: Search terms; defaults to the empty string.
            page: Result page number; defaults to 1.
        """

    # NOTE(review): the method header had no body in the reviewed text.
    # It is declared abstract here to match ``search_url`` and the ABC
    # base class — confirm this matches the intended contract.
    @abstractmethod
    async def search(self, query: str = "", page: int = 1) -> List[RemoteFile]:
        """Search for ``query`` and return the matching remote files.

        Args:
            query: Search terms; defaults to the empty string.
            page: Result page number; defaults to 1.

        Returns:
            The list of ``RemoteFile`` entries found.
        """
class Cache(ABC):
    """Abstract interface for storing lists of ``RemoteFile`` by string key.

    NOTE(review): both method headers had no body in the reviewed text.
    They are declared abstract here because the class derives from ``ABC``
    and provides no implementation — confirm this matches the intended
    contract.
    """

    @abstractmethod
    def get(self, key: str) -> Optional[List[RemoteFile]]:
        """Return the list stored under ``key``.

        The return type allows ``None``; presumably that signals that
        nothing is stored for ``key`` — confirm with implementations.
        """

    @abstractmethod
    def set(self, key: str, data: List[RemoteFile]):
        """Store ``data`` under ``key``."""
def log_async(f):
    """Decorate an async callable so that its exceptions are logged.

    The returned coroutine function awaits ``f`` with the same arguments
    and returns its result. If ``f`` raises an ``Exception``, the error is
    passed to ``logging.exception`` and then re-raised unchanged, so
    callers still see the original failure.

    Args:
        f: The coroutine function to wrap.

    Returns:
        A coroutine function carrying ``f``'s metadata (via ``wraps``).
    """

    @wraps(f)
    async def wrapper(*args, **kwargs):
        try:
            return await f(*args, **kwargs)
        except Exception as e:
            exception(e)
            # Bare ``raise`` re-raises the active exception as-is instead
            # of re-raising through the bound name.
            raise

    return wrapper