This repository has been archived on 2023-10-01. You can view files and clone it, but cannot push or open issues or pull requests.
PyNyaaTa/pynyaata/connectors/core.py
Michel Roux bc65d05f46
Some checks failed
continuous-integration/drone/push Build is failing
Cache managed by redis now
2021-07-10 09:25:35 +02:00

201 lines
5.0 KiB
Python

from abc import ABC, abstractmethod
from enum import Enum
from functools import wraps
from json import dumps, loads
from urllib.parse import urlencode
import requests
from requests import RequestException
from ..config import CACHE_TIMEOUT, REQUESTS_TIMEOUT, CLOUDPROXY_ENDPOINT, logger, REDIS_ENABLED
# The cache handle is only imported when caching is switched on in the
# configuration; every use of `cache` in this module is guarded by the same
# REDIS_ENABLED flag.
if REDIS_ENABLED:
    from ..config import cache

# Module-wide CloudProxy session identifier. It starts empty, is filled in by
# curl_content() the first time a proxied request is made, and is reset to
# None there after the session has been destroyed.
cloudproxy_session = None
class ConnectorReturn(Enum):
    """Kind of result a connector is asked to produce.

    ``ConnectorCore.run`` dispatches on this value: ``SEARCH`` triggers
    ``search()`` and ``HISTORY`` triggers ``get_history()``.
    """

    # Run the connector's search().
    SEARCH = 1

    # Run the connector's get_history().
    HISTORY = 2
class ConnectorLang(Enum):
    """Language markers; each member's value is a flag emoji string."""

    # French flag emoji.
    FR = '🇫🇷'

    # Japanese flag emoji.
    JP = '🇯🇵'
class Cache:
    """Provides the ``cache_data`` decorator used on connector methods."""

    def cache_data(self, f):
        """Decorate connector method ``f`` so its results are cached.

        The wrapped method must receive the connector as its first positional
        argument. The cache key is built from the connector class name, the
        method name, and the connector's ``query`` and ``page``.

        When ``REDIS_ENABLED`` is set and a cached entry exists, the
        connector's ``data`` and ``is_more`` are restored from it,
        ``on_error`` is cleared, and ``f`` is not called (the wrapper returns
        ``None``). Otherwise ``f`` runs, and its outcome is stored for
        ``CACHE_TIMEOUT`` when it did not leave the connector in error and
        caching is enabled.
        """

        @wraps(f)
        def wrapper(*args, **kwds):
            connector = args[0]
            cache_key = 'pynyaata.%s.%s.%s.%s' % (
                connector.__class__.__name__,
                f.__name__,
                connector.query,
                connector.page,
            )

            # Only consult the cache backend when caching is enabled.
            cached = cache.get(cache_key) if REDIS_ENABLED else None
            if cached:
                payload = loads(cached)
                connector.data = payload['data']
                connector.is_more = payload['is_more']
                connector.on_error = False
                return None

            result = f(*args, **kwds)

            # Never store failed results, and never touch the cache when it is
            # disabled.
            if connector.on_error or not REDIS_ENABLED:
                return result

            serialized = dumps({
                'data': connector.data,
                'is_more': connector.is_more
            })
            cache.set(cache_key, serialized, CACHE_TIMEOUT)
            return result

        return wrapper


# Shared instance whose cache_data method serves as the decorator.
ConnectorCache = Cache()
def curl_content(url, params=None, ajax=False, debug=True):
    """Fetch ``url`` and return its body together with a status code.

    The request is sent directly with ``requests`` unless the connector
    instance resolved for ``url`` reports ``is_behind_cloudflare``; in that
    case it is relayed through ``CLOUDPROXY_ENDPOINT`` (when configured),
    reusing the module-level ``cloudproxy_session``.

    Args:
        url: Address to fetch.
        params: Form data. When not ``None`` the request is a POST, otherwise
            a GET.
        ajax: When true, send the ``X-Requested-With: XMLHttpRequest`` header.
        debug: When true, log any handled exception through ``logger``.

    Returns:
        A dict ``{'http_code': int, 'output': str}``. On any handled failure,
        or when the site is behind Cloudflare and no proxy endpoint is
        configured, ``http_code`` is 500 and ``output`` is normally empty. On
        the proxy path ``http_code`` is the status of the proxy's own reply.
    """
    # Imported here rather than at module level, presumably to avoid a
    # circular import with the package __init__ -- TODO confirm.
    from . import get_instance

    global cloudproxy_session

    output = ''
    http_code = 500
    method = 'post' if (params is not None) else 'get'
    instance = get_instance(url)
    headers = {'X-Requested-With': 'XMLHttpRequest'} if ajax else {}

    try:
        if not instance.is_behind_cloudflare:
            if method == 'post':
                response = requests.post(url, params, timeout=REQUESTS_TIMEOUT, headers=headers)
            else:
                response = requests.get(url, timeout=REQUESTS_TIMEOUT, headers=headers)

            output = response.text
            http_code = response.status_code
        elif CLOUDPROXY_ENDPOINT:
            # NOTE(review): none of the proxy calls below pass a timeout.

            # Lazily open one proxy session and keep it for later calls.
            if not cloudproxy_session:
                json_session = requests.post(CLOUDPROXY_ENDPOINT, headers=headers, data=dumps({
                    'cmd': 'sessions.create'
                }))
                response_session = loads(json_session.text)
                cloudproxy_session = response_session['session']

            headers['Content-Type'] = 'application/x-www-form-urlencoded' if (method == 'post') else 'application/json'

            json_response = requests.post(CLOUDPROXY_ENDPOINT, headers=headers, data=dumps({
                'cmd': 'request.%s' % method,
                'url': url,
                'session': cloudproxy_session,
                'postData': urlencode(params) if (method == 'post') else ''
            }))

            http_code = json_response.status_code
            response = loads(json_response.text)
            if 'solution' in response:
                output = response['solution']['response']

            # A 500 from the proxy: drop the session so the next call opens a
            # fresh one.
            if http_code == 500:
                requests.post(CLOUDPROXY_ENDPOINT, headers=headers, data=dumps({
                    'cmd': 'sessions.destroy',
                    'session': cloudproxy_session,
                }))
                cloudproxy_session = None
    except RequestException as e:
        if debug:
            logger.exception(e)
    except (ValueError, KeyError) as e:
        # The proxy answered with something that is not the expected JSON
        # (json.JSONDecodeError is a ValueError) or lacks an expected key.
        # Report it as a failed fetch instead of letting it crash the caller.
        http_code = 500
        output = ''
        if debug:
            logger.exception(e)

    return {'http_code': http_code, 'output': output}
class ConnectorCore(ABC):
    """Abstract base class shared by every connector.

    Concrete connectors supply the descriptive attributes declared below and
    implement ``get_full_search_url``, ``search``, ``get_history`` and
    ``is_vf``. Results are exposed through ``data``, ``is_more`` and
    ``on_error``.
    """

    @property
    @abstractmethod
    def color(self):
        """Colour identifier of the connector (abstract)."""

    @property
    @abstractmethod
    def title(self):
        """Title of the connector (abstract)."""

    @property
    @abstractmethod
    def favicon(self):
        """Favicon of the connector (abstract)."""

    @property
    @abstractmethod
    def base_url(self):
        """Base URL of the connector (abstract)."""

    @property
    @abstractmethod
    def is_light(self):
        """Boolean flag supplied by each connector (abstract)."""

    @property
    @abstractmethod
    def is_behind_cloudflare(self):
        """Whether requests to this connector go through the proxy path of
        ``curl_content`` (abstract)."""

    def __init__(self, query, page=1, return_type=ConnectorReturn.SEARCH):
        """Store the request parameters and reset the result state.

        Args:
            query: Search query.
            page: Page number, 1 by default.
            return_type: ``ConnectorReturn`` member selecting what ``run``
                executes.
        """
        # What was asked for.
        self.query = query
        self.page = page
        self.return_type = return_type

        # Result state. on_error starts True, so run() performs the fetch
        # until something marks the connector as successfully populated.
        self.data = []
        self.is_more = False
        self.on_error = True

    @abstractmethod
    def get_full_search_url(self):
        """Abstract hook; concrete connectors must implement it."""

    @abstractmethod
    def search(self):
        """Abstract hook called by ``run`` for ``ConnectorReturn.SEARCH``."""

    @abstractmethod
    def get_history(self):
        """Abstract hook called by ``run`` for ``ConnectorReturn.HISTORY``."""

    @abstractmethod
    def is_vf(self, url):
        """Abstract hook taking a ``url``; concrete connectors must
        implement it."""

    async def run(self):
        """Execute the action selected by ``return_type`` and return ``self``.

        Nothing is executed when ``on_error`` is already false. The coroutine
        contains no ``await``; the selected method is called synchronously.
        """
        if not self.on_error:
            return self

        requested = self.return_type
        if requested is ConnectorReturn.SEARCH:
            self.search()
        elif requested is ConnectorReturn.HISTORY:
            self.get_history()

        return self
class Other(ConnectorCore):
    """Connector with inert behaviour.

    It performs no search and no history retrieval, and ``is_vf`` always
    answers ``False``.
    """

    # Plain class attributes satisfy the abstract properties of ConnectorCore.
    color = 'is-danger'
    title = 'Other'
    favicon = 'blank.png'
    base_url = ''
    is_light = True
    is_behind_cloudflare = False

    def get_full_search_url(self):
        """Do nothing and return ``None``."""
        return None

    def search(self):
        """Do nothing and return ``None``."""
        return None

    def get_history(self):
        """Do nothing and return ``None``."""
        return None

    def is_vf(self, url):
        """Return ``False`` for every ``url``."""
        return False