This repository has been archived on 2023-10-01. You can view files and clone it, but cannot push or open issues or pull requests.
PyNyaaTa/pynyaata2/types.py

79 lines
1.7 KiB
Python
Raw Normal View History

2022-10-26 13:06:04 +00:00
from abc import ABC, abstractmethod
2023-01-13 22:11:55 +00:00
from asyncio import get_event_loop
2022-10-26 13:06:04 +00:00
from datetime import datetime
from enum import Enum
2023-01-13 22:11:55 +00:00
from functools import partial, wraps
2022-10-26 13:06:04 +00:00
from typing import List, Optional
from pydantic import BaseModel, ByteSize, HttpUrl
class Color(str, Enum):
WHITE = "white"
BLACK = "black"
LIGHT = "light"
DARK = "dark"
PRIMARY = "primary"
LINK = "link"
INFO = "info"
SUCCESS = "success"
WARNING = "warning"
DANGER = "danger"
DEFAULT = "default"
class RemoteFile(BaseModel):
2022-12-22 00:01:21 +00:00
bridge: str
2022-10-26 13:06:04 +00:00
id: int
category: str
color: Optional[Color]
name: str
link: HttpUrl
comment: int = 0
2023-01-06 13:17:11 +00:00
comment_url: Optional[HttpUrl]
2022-10-26 13:06:04 +00:00
magnet: Optional[str]
torrent: Optional[HttpUrl]
size: Optional[ByteSize]
date: Optional[datetime]
seeds: Optional[int]
leechs: Optional[int]
downloads: Optional[int]
nb_pages: int = 1
class Bridge(BaseModel, ABC):
color: Color
title: str
favicon: HttpUrl
base_url: HttpUrl
@abstractmethod
def search_url(self, query: str = "", page: int = 1) -> HttpUrl:
pass
@abstractmethod
2023-01-04 15:57:16 +00:00
async def search(self, query: str = "", page: int = 1) -> List[RemoteFile]:
2022-10-26 13:06:04 +00:00
pass
class Cache(ABC):
@abstractmethod
def get(self, key: str) -> Optional[List[RemoteFile]]:
pass
@abstractmethod
def set(self, key: str, data: List[RemoteFile]):
pass
2023-01-04 21:32:27 +00:00
2023-01-13 22:11:55 +00:00
def async_wrap(func):
@wraps(func)
async def run(*args, loop=None, executor=None, **kwargs):
if loop is None:
loop = get_event_loop()
2023-01-04 21:32:27 +00:00
2023-01-13 22:11:55 +00:00
pfunc = partial(func, *args, **kwargs)
return await loop.run_in_executor(executor, pfunc)
return run