This repository has been archived on 2023-10-01. You can view files and clone it, but cannot push or open issues or pull requests.
PyNyaaTa/pynyaata/cache/redis.py

23 lines
601 B
Python
Raw Normal View History

2022-10-26 13:06:04 +00:00
from json import dumps, loads
from typing import List, Optional
2022-12-22 00:01:21 +00:00
from pynyaata.cache import CACHE_TIMEOUT, REDIS_URL
2022-10-26 13:06:04 +00:00
from pynyaata.types import Cache, RemoteFile
2022-12-22 00:01:21 +00:00
from redis import ConnectionError, Redis
if not REDIS_URL:
raise ConnectionError(f"Invalid REDIS_URL: {REDIS_URL}")
2022-10-26 13:06:04 +00:00
client = Redis.from_url(REDIS_URL)
class RedisCache(Cache):
def get(self, key: str) -> Optional[List[RemoteFile]]:
2022-12-22 00:01:21 +00:00
data = client.get(key)
return loads(str(data)) if data else None
2022-10-26 13:06:04 +00:00
def set(self, key: str, data: List[RemoteFile]):
return client.set(key, dumps(data), CACHE_TIMEOUT)