Move to poetry, mypy and black
continuous-integration/drone/push: Build is failing

Michel Roux 2022-09-01 18:52:02 +00:00
parent aa4bf1bbb2
commit 46977c7e38
24 changed files with 2307 additions and 387 deletions


@ -1,3 +0,0 @@
.idea
.venv
.db


@ -3,11 +3,15 @@ name: default
type: docker
steps:
- name: flake8
image: python:slim
- name: lint
image: python:3.7-slim
commands:
- pip install flake8
- flake8 pynyaata --ignore=E501
- pip install poetry
- poetry install
- poetry run flake8
- poetry run mypy .
- poetry run djlint .
- name: docker
image: plugins/docker
settings:
@ -17,6 +21,7 @@ steps:
from_secret: docker_username
password:
from_secret: docker_password
- name: pypi
image: plugins/pypi
settings:
@ -24,6 +29,7 @@ steps:
from_secret: pypi_username
password:
from_secret: pypi_password
skip_build: true
when:
branch:
- master

.flake8 (Normal file, 2 lines changed)

@ -0,0 +1,2 @@
[flake8]
max-line-length = 100

.gitignore (vendored, 162 lines changed)

@ -1,10 +1,154 @@
.idea
.venv
.vscode
.db
# https://github.com/github/gitignore/blob/main/Python.gitignore
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
dist
build
*.egg*
__pycache__
test.py
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/


@ -1,7 +1,12 @@
FROM python:3.10.6-slim as build
WORKDIR /app
COPY . .
RUN pip install poetry && poetry build
FROM python:3.10.6-slim
COPY pynyaata /app/pynyaata
COPY requirements.txt *.py /app/
WORKDIR /app
RUN pip install -r requirements.txt
CMD ["python", "run.py"]
COPY --from=build /app/dist /tmp/dist
RUN pip install /tmp/dist/*.whl && rm -rf /tmp/dist
CMD ["pynyaata"]


@ -1,6 +0,0 @@
include run.py
include get404.py
include README.md
include requirements.txt
recursive-include pynyaata/static *
recursive-include pynyaata/templates *


@ -27,7 +27,7 @@ After a good rewrite in Python, it's time to show it to the public, and here it
## Features
* Search on [Nyaa.si](https://nyaa.si/), [Nyaa.net (codename Pantsu)](https://nyaa.net/), [YggTorrent](https://duckduckgo.com/?q=yggtorrent) and [Anime-Ultime](http://www.anime-ultime.net/index-0-1)
* Search on [Nyaa.si](https://nyaa.si/), [YggTorrent](https://duckduckgo.com/?q=yggtorrent) and [Anime-Ultime](http://www.anime-ultime.net/index-0-1)
* Provide useful links to [TheTVDB](https://www.thetvdb.com/) and [Nautiljon](https://www.nautiljon.com/) during a search
* Color official and bad links
* Add seeded links to a database


@ -1,21 +0,0 @@
from pynyaata.connectors.core import curl_content
from pynyaata.models import AnimeLink
links = AnimeLink.query.all()
for link in links:
html = curl_content(link.link, debug=False, cloudflare=True)
if html['http_code'] != 200 and html['http_code'] != 500:
print('(%d) %s %s : %s' % (
html['http_code'],
link.title.name,
link.season,
link.link
))
elif 'darkgray' in str(html['output']):
print('(darkgray) %s %s : %s' % (
link.title.name,
link.season,
link.link
))

poetry.lock (generated Normal file, 1727 lines changed)

File diff suppressed because it is too large.


@ -1,14 +1,23 @@
from asyncio import get_event_loop, set_event_loop, SelectorEventLoop
from asyncio import SelectorEventLoop, get_event_loop, set_event_loop
from functools import wraps
from operator import attrgetter, itemgetter
from flask import redirect, render_template, request, url_for, abort
from flask import abort, redirect, render_template, request, url_for
from . import utils
from .config import app, auth, ADMIN_USERNAME, ADMIN_PASSWORD, DB_ENABLED, APP_PORT, IS_DEBUG, TRANSMISSION_ENABLED
from .connectors import get_instance, run_all, Nyaa
from .config import (
ADMIN_PASSWORD,
ADMIN_USERNAME,
APP_PORT,
DB_ENABLED,
IS_DEBUG,
TRANSMISSION_ENABLED,
app,
auth,
)
from .connectors import Nyaa, get_instance, run_all
from .connectors.core import ConnectorLang, ConnectorReturn
from .forms import SearchForm, DeleteForm, EditForm, FolderDeleteForm, FolderEditForm
from .forms import DeleteForm, EditForm, FolderDeleteForm, FolderEditForm, SearchForm
if DB_ENABLED:
from .config import db
@ -29,7 +38,8 @@ def db_required(f):
def clean_titles():
db.engine.execute("""
db.engine.execute(
"""
DELETE
FROM anime_title
WHERE id IN (
@ -38,7 +48,8 @@ WHERE id IN (
LEFT JOIN anime_link ON anime_title.id = anime_link.title_id
WHERE anime_link.id IS NULL
)
""")
"""
)
@auth.verify_password
@ -46,9 +57,9 @@ def verify_password(username, password):
return username == ADMIN_USERNAME and ADMIN_PASSWORD == password
@app.template_filter('boldify')
@app.template_filter("boldify")
def boldify(name):
query = request.args.get('q', '')
query = request.args.get("q", "")
name = utils.boldify(name, query)
if DB_ENABLED:
for keyword in db.session.query(AnimeTitle.keyword.distinct()).all():
@ -57,12 +68,12 @@ def boldify(name):
return name
@app.template_filter('flagify')
@app.template_filter("flagify")
def flagify(is_vf):
return ConnectorLang.FR.value if is_vf else ConnectorLang.JP.value
@app.template_filter('colorify')
@app.template_filter("colorify")
def colorify(model):
return get_instance(model.link, model.title.keyword).color
@ -72,53 +83,62 @@ def inject_user():
return dict(db_disabled=not DB_ENABLED)
@app.route('/')
@app.route("/")
def home():
return render_template('layout.html', search_form=SearchForm(), title='Anime torrents search engine')
return render_template(
"layout.html", search_form=SearchForm(), title="Anime torrents search engine"
)
@app.route('/search')
@app.route("/search")
def search():
query = request.args.get('q')
query = request.args.get("q")
if not query:
return redirect(url_for('home'))
return redirect(url_for("home"))
set_event_loop(SelectorEventLoop())
torrents = get_event_loop().run_until_complete(run_all(query))
return render_template('search.html', search_form=SearchForm(), connectors=torrents)
return render_template("search.html", search_form=SearchForm(), connectors=torrents)
@app.route('/latest')
@app.route('/latest/<int:page>')
@app.route("/latest")
@app.route("/latest/<int:page>")
def latest(page=1):
set_event_loop(SelectorEventLoop())
torrents = get_event_loop().run_until_complete(
run_all('', return_type=ConnectorReturn.HISTORY, page=page)
run_all("", return_type=ConnectorReturn.HISTORY, page=page)
)
results = []
for torrent in torrents:
results = results + torrent.data
for result in results:
result['self'] = get_instance(result['href'])
results.sort(key=itemgetter('date'), reverse=True)
result["self"] = get_instance(result["href"])
results.sort(key=itemgetter("date"), reverse=True)
return render_template('latest.html', search_form=SearchForm(), torrents=results, page=page)
return render_template(
"latest.html", search_form=SearchForm(), torrents=results, page=page
)
@app.route('/list')
@app.route('/list/<url_filters>')
@app.route("/list")
@app.route("/list/<url_filters>")
@db_required
def list_animes(url_filters='nyaa,yggtorrent'):
def list_animes(url_filters="nyaa,yggtorrent"):
filters = None
for i, to_filter in enumerate(url_filters.split(',')):
for i, to_filter in enumerate(url_filters.split(",")):
if not i:
filters = AnimeLink.link.contains(to_filter)
else:
filters = filters | AnimeLink.link.contains(to_filter)
titles = db.session.query(AnimeTitle, AnimeLink).join(
AnimeLink).filter(filters).order_by(AnimeTitle.name).all()
titles = (
db.session.query(AnimeTitle, AnimeLink)
.join(AnimeLink)
.filter(filters)
.order_by(AnimeTitle.name)
.all()
)
results = {}
for title, link in titles:
@ -127,10 +147,10 @@ def list_animes(url_filters='nyaa,yggtorrent'):
else:
results[title.id].append(link)
return render_template('list.html', search_form=SearchForm(), titles=results)
return render_template("list.html", search_form=SearchForm(), titles=results)
@app.route('/admin', methods=['GET', 'POST'])
@app.route("/admin", methods=["GET", "POST"])
@db_required
@auth.login_required
def admin():
@ -139,9 +159,9 @@ def admin():
if form.validate_on_submit():
link = AnimeLink.query.filter_by(id=int(form.id.data)).first()
if link:
form.message = '%s (%s) has been successfully deleted' % (
form.message = "%s (%s) has been successfully deleted" % (
link.title.name,
link.season
link.season,
)
db.session.delete(link)
db.session.commit()
@ -152,19 +172,21 @@ def admin():
db.session.commit()
else:
form._errors = {
'id': ['Id %s was not found in the database' % form.id.data]
"id": ["Id %s was not found in the database" % form.id.data]
}
folders = AnimeFolder.query.all()
for folder in folders:
for title in folder.titles:
title.links.sort(key=attrgetter('season'))
folder.titles.sort(key=attrgetter('name'))
title.links.sort(key=attrgetter("season"))
folder.titles.sort(key=attrgetter("name"))
return render_template('admin/list.html', search_form=SearchForm(), folders=folders, action_form=form)
return render_template(
"admin/list.html", search_form=SearchForm(), folders=folders, action_form=form
)
@app.route('/admin/folder', methods=['GET', 'POST'])
@app.route("/admin/folder", methods=["GET", "POST"])
@db_required
@auth.login_required
def folder_list():
@ -173,31 +195,33 @@ def folder_list():
if form.validate_on_submit():
folder = AnimeFolder.query.filter_by(id=int(form.id.data)).first()
if folder:
form.message = '%s has been successfully deleted' % folder.name
form.message = "%s has been successfully deleted" % folder.name
db.session.delete(folder)
db.session.commit()
else:
form._errors = {
'id': ['Id %s was not found in the database' % form.id.data]
"id": ["Id %s was not found in the database" % form.id.data]
}
folders = AnimeFolder.query.all()
return render_template('admin/folder/list.html', search_form=SearchForm(), folders=folders, action_form=form)
return render_template(
"admin/folder/list.html",
search_form=SearchForm(),
folders=folders,
action_form=form,
)
@app.route('/admin/folder/edit', methods=['GET', 'POST'])
@app.route('/admin/folder/edit/<int:folder_id>', methods=['GET', 'POST'])
@app.route("/admin/folder/edit", methods=["GET", "POST"])
@app.route("/admin/folder/edit/<int:folder_id>", methods=["GET", "POST"])
@db_required
@auth.login_required
def folder_edit(folder_id=None):
folder = AnimeFolder.query.filter_by(id=folder_id).first()
folder = folder if folder else AnimeFolder()
form = FolderEditForm(
request.form,
id=folder.id,
name=folder.name,
path=folder.path
request.form, id=folder.id, name=folder.name, path=folder.path
)
if form.validate_on_submit():
@ -206,13 +230,15 @@ def folder_edit(folder_id=None):
folder.path = form.path.data
db.session.add(folder)
db.session.commit()
return redirect(url_for('folder_list'))
return redirect(url_for("folder_list"))
return render_template('admin/folder/edit.html', search_form=SearchForm(), action_form=form)
return render_template(
"admin/folder/edit.html", search_form=SearchForm(), action_form=form
)
@app.route('/admin/edit', methods=['GET', 'POST'])
@app.route('/admin/edit/<int:link_id>', methods=['GET', 'POST'])
@app.route("/admin/edit", methods=["GET", "POST"])
@app.route("/admin/edit/<int:link_id>", methods=["GET", "POST"])
@db_required
@auth.login_required
def admin_edit(link_id=None):
@ -228,9 +254,9 @@ def admin_edit(link_id=None):
link=link.link,
season=link.season,
comment=link.comment,
keyword=link.title.keyword if link.title else None
keyword=link.title.keyword if link.title else None,
)
form.folder.choices = [('', '')] + [(g.id, g.name) for g in folders]
form.folder.choices = [("", "")] + [(g.id, g.name) for g in folders]
if form.validate_on_submit():
# Instance for VF tag
@ -238,9 +264,9 @@ def admin_edit(link_id=None):
# Title
title = AnimeTitle.query.filter_by(id=link.title_id).first()
title = title if title else AnimeTitle.query.filter_by(
name=form.name.data
).first()
title = (
title if title else AnimeTitle.query.filter_by(name=form.name.data).first()
)
title = title if title else AnimeTitle()
title.folder_id = form.folder.data
title.name = form.name.data
@ -262,23 +288,21 @@ def admin_edit(link_id=None):
# Transmission
if TRANSMISSION_ENABLED and isinstance(instance, Nyaa):
if title.folder.path is not None and title.folder.path != '':
download_url = link.link.replace(
'/view/',
'/download/'
) + '.torrent'
torrent_path = '%s/%s' % (title.folder.path, title.name)
if title.folder.path is not None and title.folder.path != "":
download_url = link.link.replace("/view/", "/download/") + ".torrent"
torrent_path = "%s/%s" % (title.folder.path, title.name)
torrent = transmission.add_torrent(
download_url,
download_dir=torrent_path
download_url, download_dir=torrent_path
)
transmission.move_torrent_data(torrent.id, torrent_path)
transmission.start_torrent(torrent.id)
return redirect(url_for('admin'))
return redirect(url_for("admin"))
return render_template('admin/edit.html', search_form=SearchForm(), folders=folders, action_form=form)
return render_template(
"admin/edit.html", search_form=SearchForm(), folders=folders, action_form=form
)
def run():
app.run('0.0.0.0', APP_PORT, IS_DEBUG)
app.run("0.0.0.0", APP_PORT, IS_DEBUG)
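As a quick aside (not part of the commit), the reformatted routes above can be smoke-tested with Flask's test client; a minimal sketch, assuming the package is installed and no DATABASE_URI is set, so the DB-only routes stay disabled:

from pynyaata import app

client = app.test_client()
print(client.get("/").status_code)        # expected 200: home renders layout.html
print(client.get("/search").status_code)  # expected 302: redirect to home when ?q= is missing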


@ -10,19 +10,23 @@ from transmission_rpc.client import Client
load_dotenv()
IS_DEBUG = environ.get('FLASK_ENV', 'production') == 'development'
ADMIN_USERNAME = environ.get('ADMIN_USERNAME', 'admin')
ADMIN_PASSWORD = environ.get('ADMIN_PASSWORD', 'secret')
APP_PORT = int(environ.get('FLASK_PORT', 5000))
CACHE_TIMEOUT = int(environ.get('CACHE_TIMEOUT', 60 * 60))
REQUESTS_TIMEOUT = int(environ.get('REQUESTS_TIMEOUT', 5))
BLACKLIST_WORDS = environ.get('BLACKLIST_WORDS', '').split(',') if environ.get('BLACKLIST_WORDS', '') else []
IS_DEBUG = environ.get("FLASK_ENV", "production") == "development"
ADMIN_USERNAME = environ.get("ADMIN_USERNAME", "admin")
ADMIN_PASSWORD = environ.get("ADMIN_PASSWORD", "secret")
APP_PORT = int(environ.get("FLASK_PORT", 5000))
CACHE_TIMEOUT = int(environ.get("CACHE_TIMEOUT", 60 * 60))
REQUESTS_TIMEOUT = int(environ.get("REQUESTS_TIMEOUT", 5))
BLACKLIST_WORDS = (
environ.get("BLACKLIST_WORDS", "").split(",")
if environ.get("BLACKLIST_WORDS", "")
else []
)
DB_ENABLED = False
REDIS_ENABLED = False
TRANSMISSION_ENABLED = False
app = Flask(__name__)
app.name = 'PyNyaaTa'
app.name = "PyNyaaTa"
app.debug = IS_DEBUG
app.secret_key = urandom(24).hex()
app.url_map.strict_slashes = False
@ -30,33 +34,31 @@ auth = HTTPBasicAuth()
logging.basicConfig(level=(logging.DEBUG if IS_DEBUG else logging.INFO))
logger = logging.getLogger(app.name)
db_uri = environ.get('DATABASE_URI')
db_uri = environ.get("DATABASE_URI")
if db_uri:
DB_ENABLED = True
app.config['SQLALCHEMY_DATABASE_URI'] = db_uri
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.config['SQLALCHEMY_ECHO'] = IS_DEBUG
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
'pool_recycle': 200
}
app.config["SQLALCHEMY_DATABASE_URI"] = db_uri
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = True
app.config["SQLALCHEMY_ECHO"] = IS_DEBUG
app.config["SQLALCHEMY_ENGINE_OPTIONS"] = {"pool_recycle": 200}
db = SQLAlchemy(app)
from .models import create_all
create_all()
cache_host = environ.get('REDIS_SERVER')
cache_host = environ.get("REDIS_SERVER")
if cache_host:
REDIS_ENABLED = True
cache = Redis(cache_host)
transmission_host = environ.get('TRANSMISSION_SERVER')
transmission_host = environ.get("TRANSMISSION_SERVER")
if transmission_host:
TRANSMISSION_ENABLED = True
transmission_username = environ.get('TRANSMISSION_RPC_USERNAME')
transmission_password = environ.get('TRANSMISSION_RPC_PASSWORD')
transmission_username = environ.get("TRANSMISSION_RPC_USERNAME")
transmission_password = environ.get("TRANSMISSION_RPC_PASSWORD")
transmission = Client(
username=transmission_username,
password=transmission_password,
host=transmission_host,
logger=logger
logger=logger,
)
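For reference, the BLACKLIST_WORDS expression reformatted above behaves like this standalone sketch (the value is invented):

from os import environ

environ["BLACKLIST_WORDS"] = "1080p,multi"  # hypothetical value
words = (
    environ.get("BLACKLIST_WORDS", "").split(",")
    if environ.get("BLACKLIST_WORDS", "")
    else []
)
print(words)  # ['1080p', 'multi']; an unset or empty variable yields []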


@ -3,24 +3,26 @@ from asyncio import gather
from .animeultime import AnimeUltime
from .core import Other
from .nyaa import Nyaa
from .yggtorrent import YggTorrent, YggAnimation
from .yggtorrent import YggAnimation, YggTorrent
async def run_all(*args, **kwargs):
coroutines = [Nyaa(*args, **kwargs).run(),
AnimeUltime(*args, **kwargs).run(),
YggTorrent(*args, **kwargs).run(),
YggAnimation(*args, **kwargs).run()]
coroutines = [
Nyaa(*args, **kwargs).run(),
AnimeUltime(*args, **kwargs).run(),
YggTorrent(*args, **kwargs).run(),
YggAnimation(*args, **kwargs).run(),
]
return list(await gather(*coroutines))
def get_instance(url, query=''):
if 'nyaa.si' in url:
def get_instance(url, query=""):
if "nyaa.si" in url:
return Nyaa(query)
elif 'anime-ultime' in url:
elif "anime-ultime" in url:
return AnimeUltime(query)
elif 'ygg' in url:
elif "ygg" in url:
return YggTorrent(query)
else:
return Other(query)
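A hedged usage sketch for the dispatch above (not part of the diff; it needs the package installed and, for run_all, live network access; the query and URL are examples):

from asyncio import SelectorEventLoop, get_event_loop, set_event_loop

from pynyaata.connectors import get_instance, run_all

print(get_instance("https://nyaa.si/view/1234567").title)  # "Nyaa"

set_event_loop(SelectorEventLoop())
connectors = get_event_loop().run_until_complete(run_all("one piece"))  # performs real HTTP requests
print([c.title for c in connectors])  # one entry per connector, in the order listed above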


@ -2,80 +2,79 @@ from datetime import datetime, timedelta
from bs4 import BeautifulSoup
from .core import ConnectorCore, ConnectorReturn, ConnectorCache, curl_content
from ..utils import parse_date, link_exist_in_db
from .core import ConnectorCache, ConnectorCore, ConnectorReturn, curl_content
from ..utils import link_exist_in_db, parse_date
class AnimeUltime(ConnectorCore):
color = 'is-warning'
title = 'Anime-Ultime'
favicon = 'animeultime.png'
base_url = 'http://www.anime-ultime.net'
color = "is-warning"
title = "Anime-Ultime"
favicon = "animeultime.png"
base_url = "http://www.anime-ultime.net"
is_light = True
def get_full_search_url(self):
from_date = ''
sort_type = 'search'
from_date = ""
sort_type = "search"
if self.return_type is ConnectorReturn.HISTORY:
try:
page_date = datetime.now() - timedelta((int(self.page) - 1) * 365 / 12)
except OverflowError:
page_date = datetime.fromtimestamp(0)
from_date = page_date.strftime('%m%Y')
sort_type = 'history'
from_date = page_date.strftime("%m%Y")
sort_type = "history"
return '%s/%s-0-1/%s' % (self.base_url, sort_type, from_date)
return "%s/%s-0-1/%s" % (self.base_url, sort_type, from_date)
@ConnectorCache.cache_data
def search(self):
response = curl_content(self.get_full_search_url(), {
'search': self.query
})
response = curl_content(self.get_full_search_url(), {"search": self.query})
if response['http_code'] == 200:
html = BeautifulSoup(response['output'], 'html.parser')
title = html.select('div.title')
player = html.select('div.AUVideoPlayer')
if response["http_code"] == 200:
html = BeautifulSoup(response["output"], "html.parser")
title = html.select("div.title")
player = html.select("div.AUVideoPlayer")
if len(title) > 0 and 'Recherche' in title[0].get_text():
trs = html.select('table.jtable tr')
if len(title) > 0 and "Recherche" in title[0].get_text():
trs = html.select("table.jtable tr")
for i, tr in enumerate(trs):
if not i:
continue
tds = tr.findAll('td')
tds = tr.findAll("td")
if len(tds) < 2:
continue
url = tds[0].a
href = '%s/%s' % (self.base_url, url['href'])
href = "%s/%s" % (self.base_url, url["href"])
if not any(href == d['href'] for d in self.data):
self.data.append({
'vf': self.is_vf(),
'href': href,
'name': url.get_text(),
'type': tds[1].get_text(),
'class': self.color if link_exist_in_db(href) else ''
})
if not any(href == d["href"] for d in self.data):
self.data.append(
{
"vf": self.is_vf(),
"href": href,
"name": url.get_text(),
"type": tds[1].get_text(),
"class": self.color if link_exist_in_db(href) else "",
}
)
elif len(player) > 0:
name = html.select('h1')
ani_type = html.select('div.titre')
href = '%s/file-0-1/%s' % (
self.base_url,
player[0]['data-serie']
)
name = html.select("h1")
ani_type = html.select("div.titre")
href = "%s/file-0-1/%s" % (self.base_url, player[0]["data-serie"])
self.data.append({
'vf': self.is_vf(),
'href': href,
'name': name[0].get_text(),
'type': ani_type[0].get_text().replace(':', ''),
'class': self.color if link_exist_in_db(href) else ''
})
self.data.append(
{
"vf": self.is_vf(),
"href": href,
"name": name[0].get_text(),
"type": ani_type[0].get_text().replace(":", ""),
"class": self.color if link_exist_in_db(href) else "",
}
)
self.on_error = False
@ -83,31 +82,33 @@ class AnimeUltime(ConnectorCore):
def get_history(self):
response = curl_content(self.get_full_search_url())
if response['http_code'] == 200:
html = BeautifulSoup(response['output'], 'html.parser')
tables = html.select('table.jtable')
h3s = html.findAll('h3')
if response["http_code"] == 200:
html = BeautifulSoup(response["output"], "html.parser")
tables = html.select("table.jtable")
h3s = html.findAll("h3")
for i, table in enumerate(tables):
for j, tr in enumerate(table.findAll('tr')):
for j, tr in enumerate(table.findAll("tr")):
if not j:
continue
tds = tr.findAll('td')
tds = tr.findAll("td")
link = tds[0].a
href = '%s/%s' % (self.base_url, link['href'])
href = "%s/%s" % (self.base_url, link["href"])
self.data.append({
'vf': self.is_vf(),
'href': href,
'name': link.get_text(),
'type': tds[4].get_text(),
'date': parse_date(h3s[i].string[:-3], '%A %d %B %Y'),
'class': self.color if link_exist_in_db(href) else ''
})
self.data.append(
{
"vf": self.is_vf(),
"href": href,
"name": link.get_text(),
"type": tds[4].get_text(),
"date": parse_date(h3s[i].string[:-3], "%A %d %B %Y"),
"class": self.color if link_exist_in_db(href) else "",
}
)
self.on_error = False
@ConnectorCache.cache_data
def is_vf(self, url=''):
def is_vf(self, url=""):
return False


@ -3,11 +3,11 @@ from enum import Enum
from functools import wraps
from json import dumps, loads
from redis.exceptions import RedisError
import requests
from requests import RequestException
from redis.exceptions import RedisError
from ..config import CACHE_TIMEOUT, REQUESTS_TIMEOUT, logger, REDIS_ENABLED
from ..config import CACHE_TIMEOUT, REDIS_ENABLED, REQUESTS_TIMEOUT, logger
if REDIS_ENABLED:
from ..config import cache
@ -21,8 +21,8 @@ class ConnectorReturn(Enum):
class ConnectorLang(Enum):
FR = '🇫🇷'
JP = '🇯🇵'
FR = "🇫🇷"
JP = "🇯🇵"
class Cache:
@ -30,11 +30,11 @@ class Cache:
@wraps(f)
def wrapper(*args, **kwds):
connector = args[0]
key = 'pynyaata.%s.%s.%s.%s' % (
key = "pynyaata.%s.%s.%s.%s" % (
connector.__class__.__name__,
f.__name__,
connector.query,
connector.page
connector.page,
)
if REDIS_ENABLED:
@ -47,8 +47,8 @@ class Cache:
if json:
data = loads(json)
connector.data = data['data']
connector.is_more = data['is_more']
connector.data = data["data"]
connector.is_more = data["is_more"]
connector.on_error = False
return
@ -56,10 +56,11 @@ class Cache:
if not connector.on_error and REDIS_ENABLED:
try:
cache.set(key, dumps({
'data': connector.data,
'is_more': connector.is_more
}), CACHE_TIMEOUT)
cache.set(
key,
dumps({"data": connector.data, "is_more": connector.is_more}),
CACHE_TIMEOUT,
)
except RedisError:
pass
@ -72,31 +73,24 @@ ConnectorCache = Cache()
def curl_content(url, params=None, ajax=False, debug=True, cloudflare=False):
output = ''
output = ""
http_code = 500
method = 'post' if (params is not None) else 'get'
method = "post" if (params is not None) else "get"
headers = {}
if ajax:
headers['X-Requested-With'] = 'XMLHttpRequest'
headers["X-Requested-With"] = "XMLHttpRequest"
if cloudflare:
headers['User-Agent'] = 'Googlebot/2.1 (+http://www.google.com/bot.html)'
headers["User-Agent"] = "Googlebot/2.1 (+http://www.google.com/bot.html)"
try:
if method == 'post':
if method == "post":
response = requests.post(
url,
params,
timeout=REQUESTS_TIMEOUT,
headers=headers
url, params, timeout=REQUESTS_TIMEOUT, headers=headers
)
else:
response = requests.get(
url,
timeout=REQUESTS_TIMEOUT,
headers=headers
)
response = requests.get(url, timeout=REQUESTS_TIMEOUT, headers=headers)
output = response.text
http_code = response.status_code
@ -104,7 +98,7 @@ def curl_content(url, params=None, ajax=False, debug=True, cloudflare=False):
if debug:
logger.exception(e)
return {'http_code': http_code, 'output': output}
return {"http_code": http_code, "output": output}
class ConnectorCore(ABC):
@ -167,10 +161,10 @@ class ConnectorCore(ABC):
class Other(ConnectorCore):
color = 'is-danger'
title = 'Other'
favicon = 'blank.png'
base_url = ''
color = "is-danger"
title = "Other"
favicon = "blank.png"
base_url = ""
is_light = True
def get_full_search_url(self):
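As an aside, the curl_content helper above can be exercised on its own; a minimal sketch, assuming the package is importable and the URL (a placeholder) is reachable:

from pynyaata.connectors.core import curl_content

response = curl_content("https://example.org")  # GET, since no params are passed
if response["http_code"] == 200:
    print(len(response["output"]))  # body text; on a RequestException the defaults {"http_code": 500, "output": ""} come back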


@ -1,28 +1,33 @@
from bs4 import BeautifulSoup
from .core import ConnectorCore, ConnectorReturn, ConnectorCache, curl_content
from ..utils import link_exist_in_db, check_blacklist_words, check_if_vf
from .core import ConnectorCache, ConnectorCore, ConnectorReturn, curl_content
from ..utils import check_blacklist_words, check_if_vf, link_exist_in_db
class Nyaa(ConnectorCore):
color = 'is-link'
title = 'Nyaa'
favicon = 'nyaa.png'
base_url = 'https://nyaa.si'
color = "is-link"
title = "Nyaa"
favicon = "nyaa.png"
base_url = "https://nyaa.si"
is_light = False
def get_full_search_url(self):
sort_type = 'size'
sort_type = "size"
if self.return_type is ConnectorReturn.HISTORY:
sort_type = 'id'
sort_type = "id"
to_query = '(%s vf)|(%s vostfr)|(%s multi)|(%s french)' % (
to_query = "(%s vf)|(%s vostfr)|(%s multi)|(%s french)" % (
self.query,
self.query,
self.query,
self.query,
self.query
)
return '%s/?f=0&c=1_3&s=%s&o=desc&q=%s&p=%s' % (self.base_url, sort_type, to_query, self.page)
return "%s/?f=0&c=1_3&s=%s&o=desc&q=%s&p=%s" % (
self.base_url,
sort_type,
to_query,
self.page,
)
def get_history(self):
self.search()
@ -31,21 +36,21 @@ class Nyaa(ConnectorCore):
def search(self):
response = curl_content(self.get_full_search_url())
if response['http_code'] == 200:
html = BeautifulSoup(response['output'], 'html.parser')
trs = html.select('table.torrent-list tr')
if response["http_code"] == 200:
html = BeautifulSoup(response["output"], "html.parser")
trs = html.select("table.torrent-list tr")
valid_trs = 0
for i, tr in enumerate(trs):
if not i:
continue
tds = tr.findAll('td')
tds = tr.findAll("td")
check_downloads = int(tds[7].get_text())
check_seeds = int(tds[5].get_text())
if check_downloads or check_seeds:
urls = tds[1].findAll('a')
urls = tds[1].findAll("a")
if len(urls) > 1:
url = urls[1]
@ -60,21 +65,31 @@ class Nyaa(ConnectorCore):
continue
valid_trs = valid_trs + 1
href = self.base_url + url['href']
href = self.base_url + url["href"]
self.data.append({
'vf': check_if_vf(url_safe),
'href': href,
'name': url_safe,
'comment': str(urls[0]).replace('/view/', self.base_url + '/view/') if has_comment else '',
'link': tds[2].decode_contents().replace('/download/', self.base_url + '/download/'),
'size': tds[3].get_text(),
'date': tds[4].get_text(),
'seeds': check_seeds,
'leechs': tds[6].get_text(),
'downloads': check_downloads,
'class': self.color if link_exist_in_db(href) else 'is-%s' % tr['class'][0]
})
self.data.append(
{
"vf": check_if_vf(url_safe),
"href": href,
"name": url_safe,
"comment": str(urls[0]).replace(
"/view/", self.base_url + "/view/"
)
if has_comment
else "",
"link": tds[2]
.decode_contents()
.replace("/download/", self.base_url + "/download/"),
"size": tds[3].get_text(),
"date": tds[4].get_text(),
"seeds": check_seeds,
"leechs": tds[6].get_text(),
"downloads": check_downloads,
"class": self.color
if link_exist_in_db(href)
else "is-%s" % tr["class"][0],
}
)
self.on_error = False
self.is_more = valid_trs and valid_trs != len(trs) - 1
@ -83,9 +98,9 @@ class Nyaa(ConnectorCore):
def is_vf(self, url):
response = curl_content(url)
if response['http_code'] == 200:
html = BeautifulSoup(response['output'], 'html.parser')
title = html.select('h3.panel-title')
if response["http_code"] == 200:
html = BeautifulSoup(response["output"], "html.parser")
title = html.select("h3.panel-title")
return check_if_vf(title[0].get_text())
return False
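Illustration only (not in the diff): the URL builder reformatted above expands to something like the following, assuming "yurucamp" as the query and a default page of 1:

query, page, sort_type = "yurucamp", 1, "size"
to_query = "(%s vf)|(%s vostfr)|(%s multi)|(%s french)" % (query, query, query, query)
print("https://nyaa.si/?f=0&c=1_3&s=%s&o=desc&q=%s&p=%s" % (sort_type, to_query, page))
# https://nyaa.si/?f=0&c=1_3&s=size&o=desc&q=(yurucamp vf)|(yurucamp vostfr)|(yurucamp multi)|(yurucamp french)&p=1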


@ -4,28 +4,27 @@ from urllib.parse import quote
from bs4 import BeautifulSoup
from .core import ConnectorCore, ConnectorReturn, ConnectorCache, curl_content
from ..utils import parse_date, link_exist_in_db, check_blacklist_words, check_if_vf
from .core import ConnectorCache, ConnectorCore, ConnectorReturn, curl_content
from ..utils import check_blacklist_words, check_if_vf, link_exist_in_db, parse_date
class YggTorrent(ConnectorCore):
color = 'is-success'
title = 'YggTorrent'
favicon = 'yggtorrent.png'
base_url = 'https://www5.yggtorrent.fi'
color = "is-success"
title = "YggTorrent"
favicon = "yggtorrent.png"
base_url = "https://www5.yggtorrent.fi"
is_light = False
category = 2179
def get_full_search_url(self):
sort_type = 'size'
sort_type = "size"
if self.return_type is ConnectorReturn.HISTORY:
sort_type = 'publish_date'
sort_page = '&page=%s' % (
(self.page - 1) * 50
) if self.page > 1 else ''
sort_type = "publish_date"
sort_page = "&page=%s" % ((self.page - 1) * 50) if self.page > 1 else ""
return '%s/engine/search?name=%s&category=2145&sub_category=%s&do=search&order=desc&sort=%s%s' % (
self.base_url, self.query, self.category, sort_type, sort_page
return (
"%s/engine/search?name=%s&category=2145&sub_category=%s&do=search&order=desc&sort=%s%s"
% (self.base_url, self.query, self.category, sort_type, sort_page)
)
def get_history(self):
@ -34,20 +33,18 @@ class YggTorrent(ConnectorCore):
@ConnectorCache.cache_data
def search(self):
if self.category:
response = curl_content(
self.get_full_search_url(), cloudflare=True
)
response = curl_content(self.get_full_search_url(), cloudflare=True)
if response['http_code'] == 200:
html = BeautifulSoup(response['output'], 'html.parser')
trs = html.select('table.table tr')
if response["http_code"] == 200:
html = BeautifulSoup(response["output"], "html.parser")
trs = html.select("table.table tr")
valid_trs = 0
for i, tr in enumerate(trs):
if not i:
continue
tds = tr.findAll('td')
tds = tr.findAll("td")
check_downloads = int(tds[6].get_text())
check_seeds = int(tds[7].get_text())
@ -60,23 +57,35 @@ class YggTorrent(ConnectorCore):
valid_trs = valid_trs + 1
self.data.append({
'vf': check_if_vf(url_safe),
'href': url['href'],
'name': url_safe,
'comment': '<a href="%s#comm" target="_blank"><i class="fa fa-comments-o"></i>%s</a>' %
(url['href'], tds[3].decode_contents()),
'link': '<a href="%s/engine/download_torrent?id=%s">'
'<i class="fa fa-fw fa-download"></i>'
'</a>' % (self.base_url,
re.search(r'/(\d+)', url['href']).group(1)),
'size': tds[5].get_text(),
'date': parse_date(datetime.fromtimestamp(int(tds[4].div.get_text()))),
'seeds': check_seeds,
'leechs': tds[8].get_text(),
'downloads': check_downloads,
'class': self.color if link_exist_in_db(quote(url['href'], '/+:')) else ''
})
self.data.append(
{
"vf": check_if_vf(url_safe),
"href": url["href"],
"name": url_safe,
"comment": (
'<a href="%s#comm" target="_blank">'
'<i class="fa fa-comments-o"></i>%s</a>'
)
% (url["href"], tds[3].decode_contents()),
"link": '<a href="%s/engine/download_torrent?id=%s">'
'<i class="fa fa-fw fa-download"></i>'
"</a>"
% (
self.base_url,
re.search(r"/(\d+)", url["href"]).group(1),
),
"size": tds[5].get_text(),
"date": parse_date(
datetime.fromtimestamp(int(tds[4].div.get_text()))
),
"seeds": check_seeds,
"leechs": tds[8].get_text(),
"downloads": check_downloads,
"class": self.color
if link_exist_in_db(quote(url["href"], "/+:"))
else "",
}
)
self.on_error = False
self.is_more = valid_trs and valid_trs != len(trs) - 1
@ -85,14 +94,14 @@ class YggTorrent(ConnectorCore):
def is_vf(self, url):
response = curl_content(url)
if response['http_code'] == 200:
html = BeautifulSoup(response['output'], 'html.parser')
title = html.select('#title h1')
if response["http_code"] == 200:
html = BeautifulSoup(response["output"], "html.parser")
title = html.select("#title h1")
return check_if_vf(title[0].get_text())
return False
class YggAnimation(YggTorrent):
title = 'YggAnimation'
title = "YggAnimation"
category = 2178


@ -1,38 +1,38 @@
from flask_wtf import FlaskForm
from wtforms import HiddenField, StringField, SelectField
from wtforms import HiddenField, SelectField, StringField
from wtforms.fields.html5 import SearchField, URLField
from wtforms.validators import DataRequired
class SearchForm(FlaskForm):
q = SearchField('search', validators=[DataRequired()])
q = SearchField("search", validators=[DataRequired()])
class DeleteForm(FlaskForm):
class Meta:
csrf = False
id = HiddenField('id', validators=[DataRequired()])
id = HiddenField("id", validators=[DataRequired()])
class EditForm(FlaskForm):
id = HiddenField('id')
folder = SelectField('folder', validators=[DataRequired()])
name = StringField('name', validators=[DataRequired()])
link = URLField('link', validators=[DataRequired()])
season = StringField('season', validators=[DataRequired()])
comment = StringField('comment')
keyword = StringField('keyword', validators=[DataRequired()])
id = HiddenField("id")
folder = SelectField("folder", validators=[DataRequired()])
name = StringField("name", validators=[DataRequired()])
link = URLField("link", validators=[DataRequired()])
season = StringField("season", validators=[DataRequired()])
comment = StringField("comment")
keyword = StringField("keyword", validators=[DataRequired()])
class FolderEditForm(FlaskForm):
id = HiddenField('id')
name = StringField('name', validators=[DataRequired()])
path = StringField('path')
id = HiddenField("id")
name = StringField("name", validators=[DataRequired()])
path = StringField("path")
class FolderDeleteForm(FlaskForm):
class Meta:
csrf = False
id = HiddenField('id', validators=[DataRequired()])
id = HiddenField("id", validators=[DataRequired()])

pynyaata/get404.py (Normal file, 15 lines changed)

@ -0,0 +1,15 @@
from .connectors.core import curl_content
from .models import AnimeLink
links = AnimeLink.query.all()
for link in links:
html = curl_content(link.link, debug=False, cloudflare=True)
if html["http_code"] != 200 and html["http_code"] != 500:
print(
"(%d) %s %s : %s"
% (html["http_code"], link.title.name, link.season, link.link)
)
elif "darkgray" in str(html["output"]):
print("(darkgray) %s %s : %s" % (link.title.name, link.season, link.link))


@ -6,9 +6,7 @@ class AnimeFolder(db.Model):
name = db.Column(db.String(length=100), unique=True, nullable=False)
path = db.Column(db.String(length=100))
titles = db.relationship(
"AnimeTitle",
backref="folder",
cascade='all,delete-orphan'
"AnimeTitle", backref="folder", cascade="all,delete-orphan"
)
@ -16,12 +14,8 @@ class AnimeTitle(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(length=100), unique=True, nullable=False)
keyword = db.Column(db.Text(), nullable=False)
folder_id = db.Column(db.Integer, db.ForeignKey('anime_folder.id'))
links = db.relationship(
'AnimeLink',
backref="title",
cascade='all,delete-orphan'
)
folder_id = db.Column(db.Integer, db.ForeignKey("anime_folder.id"))
links = db.relationship("AnimeLink", backref="title", cascade="all,delete-orphan")
class AnimeLink(db.Model):
@ -30,7 +24,7 @@ class AnimeLink(db.Model):
season = db.Column(db.Text(), nullable=False)
comment = db.Column(db.Text())
vf = db.Column(db.Boolean, nullable=False)
title_id = db.Column(db.Integer, db.ForeignKey('anime_title.id'))
title_id = db.Column(db.Integer, db.ForeignKey("anime_title.id"))
def create_all():


@ -2,17 +2,18 @@ import re
from datetime import datetime
from dateparser import parse
from .config import DB_ENABLED, BLACKLIST_WORDS
from .config import BLACKLIST_WORDS, DB_ENABLED
def link_exist_in_db(href):
if DB_ENABLED:
from .models import AnimeLink
return AnimeLink.query.filter_by(link=href).first()
return False
def parse_date(str_to_parse, date_format=''):
def parse_date(str_to_parse, date_format=""):
if str_to_parse is None:
date_to_format = datetime.fromtimestamp(0)
elif isinstance(str_to_parse, datetime):
@ -24,12 +25,14 @@ def parse_date(str_to_parse, date_format=''):
else:
date_to_format = datetime.fromtimestamp(0)
return date_to_format.isoformat(' ', 'minutes')
return date_to_format.isoformat(" ", "minutes")
def boldify(str_to_replace, keyword):
if keyword:
return re.sub('(%s)' % keyword, r'<b>\1</b>', str_to_replace, flags=re.IGNORECASE)
return re.sub(
"(%s)" % keyword, r"<b>\1</b>", str_to_replace, flags=re.IGNORECASE
)
else:
return str_to_replace
@ -39,4 +42,4 @@ def check_blacklist_words(url):
def check_if_vf(title):
return any(word.lower() in title.lower() for word in ['vf', 'multi', 'french'])
return any(word.lower() in title.lower() for word in ["vf", "multi", "french"])
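A short usage sketch for the helpers touched above (standalone apart from importing the package; the titles are invented):

from pynyaata.utils import boldify, check_if_vf

print(boldify("One Piece 1037 VOSTFR", "piece"))  # One <b>Piece</b> 1037 VOSTFR
print(check_if_vf("One Piece 1037 MULTI"))        # True: "multi" matches case-insensitively
print(check_if_vf("One Piece 1037 VOSTFR"))       # False: no "vf", "multi" or "french" substring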

pyproject.toml (Normal file, 56 lines changed)

@ -0,0 +1,56 @@
[tool.poetry]
name = "pynyaata"
version = "2.0.0"
description = "π 😼た, Xéfir's personal animes torrent search engine"
authors = ["Xéfir Destiny <xefir@crystalyx.net>"]
license = "WTFPL"
readme = "README.md"
homepage = "https://nyaa.crystalyx.net/"
repository = "https://git.crystalyx.net/Xefir/PyNyaaTa"
classifiers = [
"Programming Language :: Python :: 3",
"Operating System :: OS Independent"
]
[tool.poetry.scripts]
pynyaata = 'pynyaata:run'
[tool.poetry.dependencies]
python = "^3.7"
Flask = "^2.2.2"
Flask-SQLAlchemy = "^2.5.1"
Flask-HTTPAuth = "^4.7.0"
Flask-WTF = "^1.0.1"
WTForms = "^3.0.1"
PyMySQL = "^1.0.2"
pg8000 = "^1.29.1"
requests = "^2.28.1"
beautifulsoup4 = "^4.11.1"
python-dotenv = "^0.20.0"
dateparser = "^1.1.1"
redis = "^4.3.4"
transmission-rpc = "^3.3.2"
[tool.poetry.group.dev.dependencies]
flake8 = "3.9.2"
black = "^22.8.0"
mypy = "^0.971"
djlint = "1.9.3"
pytest = "^7.1.2"
pytest-cov = "^3.0.0"
flake8-black = "^0.3.3"
flake8-alphabetize = "^0.0.17"
types-dateparser = "^1.1.4"
types-redis = "^4.3.19"
types-requests = "^2.28.9"
Flask-HTTPAuth-stubs = "^0.1.5"
types-Flask-SQLAlchemy = "^2.5.9"
types-beautifulsoup4 = "^4.11.5"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
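The [tool.poetry.scripts] entry above makes poetry/pip generate a pynyaata console script equivalent to the run.py deleted further down, i.e. roughly:

from pynyaata import run

if __name__ == "__main__":
    run()  # app.run("0.0.0.0", APP_PORT, IS_DEBUG), as defined in pynyaata/__init__.py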


@ -1,13 +0,0 @@
Flask==2.2.2
Flask-SQLAlchemy==2.5.1
Flask-HTTPAuth==4.7.0
Flask-WTF==1.0.1
WTForms==2.3.3
PyMySQL==1.0.2
pg8000==1.29.1
requests==2.28.1
beautifulsoup4==4.11.1
python-dotenv==0.20.0
dateparser==1.1.1
redis==4.3.4
transmission-rpc==3.3.2

run.py (5 lines changed)

@ -1,5 +0,0 @@
#!/usr/bin/env python3
from pynyaata import run
if __name__ == "__main__":
run()


@ -1,31 +0,0 @@
from datetime import datetime
from setuptools import setup, find_packages
with open("README.md") as readme_file:
long_description = readme_file.read()
with open("requirements.txt") as requirements_file:
requirements = requirements_file.read().splitlines()
setup(
name="PyNyaaTa",
version=datetime.now().strftime("%Y%m%d%H%M"),
author="Xéfir Destiny",
author_email="xefir@crystalyx.net",
description="π 😼た, Xéfir's personal animes torrent search engine",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://git.crystalyx.net/Xefir/PyNyaaTa",
packages=find_packages(),
install_requires=requirements,
include_package_data=True,
entry_points={
"console_scripts": ["pynyaata=pynyaata:run"],
},
classifiers=[
"Programming Language :: Python :: 3",
"Operating System :: OS Independent",
],
python_requires=">=3.5",
)