import locale
import os
import re
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from subprocess import run
from sys import platform

import requests
from bs4 import BeautifulSoup
class ConnectorException(Exception):
    """Error raised when a connector cannot grab data from its source.

    The exception message embeds ``connector_type`` (the connectors in this
    file pass their ``title``).
    """

    def __init__(self, connector_type):
        message = "Error, can't grab data from %s" % connector_type
        super().__init__(message)
class Connector(ABC):
    """Abstract base class shared by every site connector in this module.

    A connector is built around a search ``query`` and exposes a search URL
    builder, a ``search`` method and a ``get_history`` method. This base class
    also provides the HTTP fetching helper and a few small text utilities.
    """

    # Words the concrete connectors check result titles against in order to
    # skip unwanted entries.
    blacklist_words = ['Chris44', 'Vol.']

    def __init__(self, query):
        """Store the search ``query`` used by all other methods."""
        self.query = query

    @abstractmethod
    def get_full_search_url(self, sort_type, page, category):
        """Return the URL (as a string) to request for the given arguments."""

    @abstractmethod
    def search(self, sort_type, page, category):
        """Run the search and return a ``(results, flag)`` tuple.

        ``results`` is a list of dicts describing each hit; the meaning of the
        boolean ``flag`` is defined by each connector.
        """

    @abstractmethod
    def get_history(self, sort_type, page, category):
        """Return a list of result dicts for the connector's history view."""

    def curl_content(self, url, params=None, ajax=False):
        """Fetch ``url`` and return ``{'http_code': ..., 'output': ...}``.

        * For ``YggTorrent`` instances the page is rendered by the external
          ``phantomjs`` binary running ``delay.js``; any failure is printed and
          reported as ``http_code`` 500 with an empty ``output``.
        * For every other connector ``requests`` is used: a POST carrying
          ``params`` when ``params`` is not ``None``, a GET otherwise. When
          ``ajax`` is true an ``X-Requested-With: XMLHttpRequest`` header is
          sent.
        """
        if isinstance(self, YggTorrent):
            try:
                # Extend the current environment instead of replacing it, so
                # PATH (needed to locate phantomjs) is still available.
                qt_env = dict(os.environ)
                if platform == 'linux':
                    qt_env['QT_QPA_PLATFORM'] = 'offscreen'
                # The URL embeds the user's query: pass an argument list (no
                # shell) so it can never be interpreted as shell syntax.
                # NOTE(review): subprocess timeouts are in seconds, so 7000 is
                # almost two hours -- possibly meant to be ~7; confirm.
                qt_output = run(
                    ['phantomjs', '--cookies-file=/tmp/cookies.json', 'delay.js', url, '5000'],
                    env=qt_env, check=True, capture_output=True, timeout=7000,
                )
                output = qt_output.stdout
                http_code = 200
            except Exception as e:
                # Deliberate best effort: callers turn the 500 into a
                # ConnectorException.
                output = ''
                http_code = 500
                print(e)
        else:
            headers = {'X-Requested-With': 'XMLHttpRequest'} if ajax else {}

            if params is not None:
                response = requests.post(url, params, timeout=10, headers=headers)
            else:
                response = requests.get(url, timeout=10, headers=headers)

            output = response.text
            http_code = response.status_code

        return {'http_code': http_code, 'output': output}

    @staticmethod
    def get_instance(url, query):
        """Return the connector matching ``url``, built with ``query``.

        Falls back to ``Other`` when the URL matches no known site.
        """
        if 'nyaa.si' in url:
            return Nyaa(query)
        elif 'nyaa.net' in url:
            return Pantsu(query)
        elif 'anime-ultime' in url:
            return AnimeUltime(query)
        elif 'ygg' in url:
            return YggTorrent(query)
        else:
            return Other(query)

    @staticmethod
    def get_lang(str_to_test):
        """Return ``'fr'`` if the text contains vf/multi/french, else ``'jp'``.

        The match is case-insensitive.
        """
        if re.search('(vf|multi|french)', str_to_test, re.IGNORECASE):
            return 'fr'
        return 'jp'

    def boldify(self, str_to_replace):
        """Wrap every case-insensitive occurrence of the query in ``<b>`` tags.

        The query is escaped first so that regex metacharacters typed by the
        user (``+``, ``(``, ``[`` ...) are matched literally instead of raising
        ``re.error``. The text is returned unchanged when the query is empty.
        """
        if not self.query:
            return str_to_replace
        # NOTE(review): the replacement used to be a bare r'\1' (a no-op); the
        # <b> markup is restored from the method's name -- confirm the markup.
        return re.sub('(%s)' % re.escape(self.query), r'<b>\1</b>', str_to_replace, flags=re.IGNORECASE)
class Nyaa(Connector):
    """Connector scraping the torrent list of nyaa.si."""

    color = 'is-link'
    title = 'Nyaa'
    favicon = 'nyaa.png'
    base_url = 'https://nyaa.si'
    default_sort = 'size'

    def get_full_search_url(self, sort_type=default_sort, page=1, category=None):
        """Build the search URL for the query combined with vf/vostfr/multi/french.

        ``category`` is accepted for interface compatibility and ignored.
        """
        to_query = '(%s vf)|(%s vostfr)|(%s multi)|(%s french)' % (self.query, self.query, self.query, self.query)
        return '%s/?f=0&c=1_3&s=%s&o=desc&q=%s&p=%s' % (self.base_url, sort_type, to_query, page)

    def get_history(self, sort_type=default_sort, page=1, category=None):
        """Return only the result list of :meth:`search`."""
        output = self.search(sort_type, page, category)
        return output[0]

    def search(self, sort_type=default_sort, page=1, category=None):
        """Scrape one result page.

        Returns ``(data, flag)`` where ``data`` is a list of result dicts and
        ``flag`` is true when the number of kept rows differs from the total
        number of ``<tr>`` rows found (header row included).

        Raises:
            ConnectorException: when the page could not be fetched (HTTP
                status other than 200).
        """
        data = []
        response = self.curl_content(self.get_full_search_url(sort_type, page))

        if response['http_code'] != 200:
            raise ConnectorException(self.title)

        html = BeautifulSoup(response['output'], 'html.parser')
        trs = html.select('table.torrent-list tr')
        valid_trs = 0

        # The first row is skipped (table header).
        for tr in trs[1:]:
            tds = tr.findAll('td')
            check_downloads = int(tds[7].string)
            check_seeds = int(tds[5].string)

            # Ignore torrents that have neither downloads nor seeders.
            if not (check_downloads or check_seeds):
                continue

            # When the title cell holds two links, the first one is the
            # comments link and the second one the torrent itself.
            urls = tds[1].findAll('a')
            has_comment = len(urls) > 1
            url = urls[1] if has_comment else urls[0]

            # Skip titles containing a blacklisted word.
            if any(word in url.string for word in self.blacklist_words):
                continue

            valid_trs += 1
            if has_comment:
                comment = str(urls[0]).replace('/view/', '%s%s' % (self.base_url, '/view/'))
            else:
                comment = ''

            data.append({
                'lang': self.get_lang(url.string),
                'href': '%s%s' % (self.base_url, url['href']),
                'name': self.boldify(url.string),
                'comment': comment,
                # Make the relative download links absolute.
                'link': tds[2].decode_contents().replace('/download/', '%s%s' % (self.base_url, '/download/')),
                'size': tds[3].string,
                'date': '%s:00' % tds[4].string,
                'seeds': check_seeds,
                'leechs': tds[6].string,
                'downloads': check_downloads,
                'class': 'is-%s' % tr['class'][0],
            })

        # Value comparison: identity (`is not`) is unreliable for integers.
        return data, valid_trs != len(trs)
class Pantsu(Connector):
    """Connector scraping the search results of nyaa.net (Pantsu)."""

    color = 'is-info'
    title = 'Pantsu'
    favicon = 'pantsu.png'
    base_url = 'https://nyaa.net'
    default_sort = 4

    def get_full_search_url(self, sort_type=default_sort, page=1, category=None):
        """Build the search URL for the query combined with vf/vostfr/multi/french.

        ``category`` is accepted for interface compatibility and ignored.
        """
        to_query = '(%s vf)|(%s vostfr)|(%s multi)|(%s french)' % (self.query, self.query, self.query, self.query)
        return '%s/search/%s?c=3_13&order=false&q=%s&sort=%s' % (self.base_url, page, to_query, sort_type)

    def get_history(self, sort_type=default_sort, page=1, category=None):
        """Return only the result list of :meth:`search`."""
        output = self.search(sort_type, page, category)
        return output[0]

    def search(self, sort_type=default_sort, page=1, category=None):
        """Scrape one result page.

        Returns ``(data, flag)`` where ``data`` is a list of result dicts and
        ``flag`` is true when the number of kept rows differs from the total
        number of ``<tr>`` rows found (header row included).

        Raises:
            ConnectorException: when the page could not be fetched (HTTP
                status other than 200).
        """
        data = []
        response = self.curl_content(self.get_full_search_url(sort_type, page))

        if response['http_code'] != 200:
            raise ConnectorException(self.title)

        html = BeautifulSoup(response['output'], 'html.parser')
        trs = html.select('div.results tr')
        valid_trs = 0

        # The first row is skipped (table header).
        for tr in trs[1:]:
            tds = tr.findAll('td')
            # The site shows '-' for an unknown count; treat it as zero.
            check_downloads = int(tds[6].string.replace('-', '0'))
            check_seeds = int(tds[4].string.replace('-', '0'))

            # Ignore torrents that have neither downloads nor seeders.
            if not (check_downloads or check_seeds):
                continue

            url = tds[1].a

            # Skip titles containing a blacklisted word.
            if any(word in url.string for word in self.blacklist_words):
                continue

            valid_trs += 1
            # Swap the site's icon classes for Font Awesome ones.
            link = (
                tds[2].decode_contents()
                .replace('icon-magnet', 'fa fa-fw fa-magnet')
                .replace('icon-floppy', 'fa fa-fw fa-download')
            )
            release_date = (
                datetime
                .strptime(tds[7]['title'], '%m/%d/%Y, %I:%M:%S %p %Z+0')
                .strftime('%Y-%m-%d %H:%M:%S')
            )

            data.append({
                'lang': self.get_lang(url.string),
                'href': '%s%s' % (self.base_url, url['href']),
                'name': self.boldify(url.string),
                'comment': '',
                'link': link,
                'size': tds[3].string,
                'date': release_date,
                'seeds': check_seeds,
                'leechs': tds[5].string,
                'downloads': check_downloads,
                'class': 'is-%s' % tr['class'][0],
            })

        # Value comparison: identity (`is not`) is unreliable for integers.
        return data, valid_trs != len(trs)
class YggTorrent(Connector):
    """Connector scraping the search results of YggTorrent.

    Every public method requires a ``category`` (sent as ``sub_category`` in
    the search URL) and raises ``ConnectorException`` when it is ``None``.
    """

    color = 'is-success'
    title = 'YggTorrent'
    favicon = 'yggtorrent.png'
    base_url = 'https://www2.yggtorrent.pe'
    default_sort = 'size'

    def get_full_search_url(self, sort_type=default_sort, page=1, category=None):
        """Build the search URL for the query in the given sub-category.

        Raises:
            ConnectorException: when ``category`` is ``None``.
        """
        if category is None:
            raise ConnectorException(self.title)

        return '%s/engine/search?do=search&order=desc&sort=%s&category=2145&sub_category=%s&name=%s&page=%s' % (
            self.base_url, sort_type, category, self.query, page
        )

    def get_history(self, sort_type=default_sort, page=1, category=None):
        """Return only the result list of :meth:`search`.

        Raises:
            ConnectorException: when ``category`` is ``None``.
        """
        if category is None:
            raise ConnectorException(self.title)

        output = self.search(sort_type, page, category)
        return output[0]

    def search(self, sort_type=default_sort, page=1, category=None):
        """Scrape one result page.

        Returns ``(data, flag)`` where ``data`` is a list of result dicts and
        ``flag`` is true when the number of kept rows equals the total number
        of ``<tr>`` rows found (header row included).

        Raises:
            ConnectorException: when ``category`` is ``None`` or the page could
                not be fetched (HTTP status other than 200).
        """
        if category is None:
            raise ConnectorException(self.title)

        data = []
        response = self.curl_content(self.get_full_search_url(sort_type, page, category))

        if response['http_code'] != 200:
            raise ConnectorException(self.title)

        html = BeautifulSoup(response['output'], 'html.parser')
        trs = html.select('table.table tr')
        valid_trs = 0

        # The first row is skipped (table header).
        for tr in trs[1:]:
            tds = tr.findAll('td')
            check_downloads = int(tds[6].string)
            check_seeds = int(tds[7].string)

            # Ignore torrents that have neither downloads nor seeders.
            if not (check_downloads or check_seeds):
                continue

            url = tds[1].a

            # Skip titles containing a blacklisted word.
            if any(word in url.string for word in self.blacklist_words):
                continue

            valid_trs += 1
            # The numeric torrent id is the first run of digits following a
            # slash in the torrent page URL.
            torrent_id = re.search(r'/(\d+)', url['href']).group(1)

            data.append({
                'lang': self.get_lang(url.string),
                'href': url['href'],
                'name': self.boldify(url.string),
                # NOTE(review): the previous format strings ('%s' and '') did
                # not have a placeholder for each argument, so building a row
                # always raised TypeError. The markup and the download path
                # below are a reconstruction -- confirm against the site and
                # the templates.
                'comment': '<a href="%s">%s</a>' % (url['href'], tds[3].string),
                'link': '<a href="%s/engine/download_torrent?id=%s"><i class="fa fa-fw fa-download"></i></a>' % (
                    self.base_url, torrent_id
                ),
                'size': tds[5].string,
                # The date cell holds a Unix timestamp inside a <div>.
                'date': datetime.fromtimestamp(int(tds[4].div.string)).strftime('%Y-%m-%d %H:%M:%S'),
                'seeds': check_seeds,
                'leechs': tds[8].string,
                'downloads': check_downloads,
                'class': '',
            })

        # Value comparison: identity (`is`) is unreliable for integers.
        return data, valid_trs == len(trs)
class AnimeUltime(Connector):
    """Connector scraping anime-ultime.net (search and monthly history)."""

    color = 'is-warning'
    title = 'Anime-Ultime'
    favicon = 'animeultime.png'
    base_url = 'http://www.anime-ultime.net'
    default_sort = 'search'

    def get_full_search_url(self, sort_type=default_sort, page=1, category=None):
        """Build the URL ``<base_url>/<sort_type>-0-1/<suffix>``.

        For ``sort_type == 'history'`` the suffix is a ``%m%Y`` date roughly
        ``page - 1`` months in the past; otherwise it is empty. ``category``
        is ignored.
        """
        if sort_type == 'history':
            # One "page" of history is approximated as 365 / 12 days.
            page_date = datetime.now() - timedelta((page - 1) * 365 / 12)
            from_date = page_date.strftime('%m%Y')
        else:
            from_date = ''

        return '%s/%s-0-1/%s' % (self.base_url, sort_type, from_date)

    def search(self, sort_type=default_sort, page=1, category=None):
        """POST the query to the site and return ``(data, False)``.

        When the page title contains 'Recherche' the result table is parsed
        into one dict per row; otherwise the page is handled as a single
        series page and exactly one dict is produced.

        Raises:
            ConnectorException: when the page could not be fetched (HTTP
                status other than 200).
        """
        data = []
        response = self.curl_content(self.get_full_search_url(sort_type, page), {'search': self.query})

        if response['http_code'] != 200:
            raise ConnectorException(self.title)

        html = BeautifulSoup(response['output'], 'html.parser')
        title = html.select('div.title')

        if 'Recherche' in title[0].string:
            trs = html.select('table.jtable tr')

            # The first row is skipped (table header).
            for tr in trs[1:]:
                tds = tr.findAll('td')

                if len(tds) < 2:
                    continue

                url = tds[0].a

                data.append({
                    'lang': 'jp',
                    'href': '%s/%s' % (self.base_url, url['href']),
                    'name': url.decode_contents(),
                    'type': tds[1].string,
                })
        else:
            player = html.select('div.AUVideoPlayer')
            name = html.select('h1')
            ani_type = html.select('div.titre')

            data.append({
                'lang': 'jp',
                'href': '%s%s' % (self.get_full_search_url('file'), player[0]['data-serie']),
                'name': self.boldify(name[0].string),
                'type': ani_type[0].string.replace(':', ''),
            })

        return data, False

    @staticmethod
    def _parse_release_date(raw_date):
        """Parse a French '%A %d %B %Y : ' heading into 'YYYY-mm-dd HH:MM:SS'.

        The process locale is switched to fr_FR.UTF-8 only for the duration of
        the parsing and is always restored, even when parsing fails.
        """
        # setlocale() without a value returns the full current setting, which
        # can be handed back to setlocale() to restore every category.
        saved_locale = locale.setlocale(locale.LC_ALL)
        locale.setlocale(locale.LC_ALL, ('fr_FR', 'UTF-8'))
        try:
            return datetime.strptime(raw_date, '%A %d %B %Y : ').strftime('%Y-%m-%d %H:%M:%S')
        finally:
            locale.setlocale(locale.LC_ALL, saved_locale)

    def get_history(self, sort_type=default_sort, page=1, category=None):
        """Return the releases listed on the history page for ``page``.

        Each ``table.jtable`` is paired by position with an ``<h3>`` heading
        holding its release date. ``sort_type`` and ``category`` are ignored.

        Raises:
            ConnectorException: when the page could not be fetched (HTTP
                status other than 200).
        """
        data = []
        response = self.curl_content(self.get_full_search_url('history', page))

        if response['http_code'] != 200:
            raise ConnectorException(self.title)

        html = BeautifulSoup(response['output'], 'html.parser')
        tables = html.select('table.jtable')
        h3s = html.findAll('h3')

        for i, table in enumerate(tables):
            # Parsed lazily, once per table, on its first data row.
            release_date = None

            # The first row is skipped (table header).
            for tr in table.findAll('tr')[1:]:
                tds = tr.findAll('td')
                link = tds[0].a

                if release_date is None:
                    release_date = self._parse_release_date(h3s[i].string)

                data.append({
                    'lang': 'jp',
                    'href': '%s%s' % (self.get_full_search_url('file'), link['href']),
                    'name': link.string,
                    'type': tds[4].string,
                    'date': release_date,
                })

        return data
class Other(Connector):
    """Fallback connector for unrecognised sites; it never returns results."""

    color = 'is-danger'
    title = 'Other'
    favicon = 'blank.png'

    def get_full_search_url(self, sort_type=None, page=1, category=None):
        """Return an empty URL: there is nothing to search."""
        return ''

    def search(self, sort_type=None, page=1, category=None):
        """Return an empty result list and a false flag."""
        return [], False

    def get_history(self, sort_type=None, page=1, category=None):
        """Return an empty history.

        The parameters are optional, matching ``search`` and the other
        connectors' ``get_history``.
        """
        return []