dl/commands/pyyg.py
Michel Roux 8fedf9222e
All checks were successful
continuous-integration/drone/push Build is passing
Forgot python shebang
2022-11-13 17:33:32 +01:00

171 lines
4.6 KiB
Python

#!/usr/bin/env python3
import argparse
import json
import re
from urllib.parse import urlencode, urlparse
from bs4 import BeautifulSoup
from dns import rdatatype, resolver
from requests import Session, adapters
from urllib3.util.connection import HAS_IPV6
BLACKLIST_WORDS = ["dvd", "iso"]
parser = argparse.ArgumentParser()
parser.add_argument("-u", "--uploader", action="append")
parser.add_argument("-y", "--year", type=int)
parser.add_argument("query")
args = parser.parse_args()
def parse_size(size):
units = {"o": 1, "Ko": 10**3, "Mo": 10**6, "Go": 10**9, "To": 10**12}
match = re.search("([0-9.]+)([^0-9]+)", size)
number = match.group(1).strip()
unit = match.group(2).strip()
return int(float(number) * units[unit])
DNS_RESOLVER = resolver.Resolver()
DNS_RESOLVER.cache = resolver.LRUCache() # type: ignore
class DNSAdapter(adapters.HTTPAdapter):
def __init__(self, nameservers):
self.nameservers = nameservers
super().__init__()
def resolve(self, host, nameservers):
DNS_RESOLVER.nameservers = nameservers
if HAS_IPV6:
try:
answers_v6 = DNS_RESOLVER.resolve(host, rdatatype.AAAA)
for rdata_v6 in answers_v6:
return f"[{str(rdata_v6)}]"
except resolver.NoAnswer:
pass
answers_v4 = DNS_RESOLVER.resolve(host, rdatatype.A)
for rdata_v4 in answers_v4:
return str(rdata_v4)
def send(self, request, **kwargs):
connection_pool_kwargs = self.poolmanager.connection_pool_kw
result = urlparse(request.url)
resolved_ip = self.resolve(result.hostname, self.nameservers)
request.url = request.url.replace(result.hostname, resolved_ip)
request.headers["Host"] = result.hostname
request.headers[
"User-Agent"
] = "Googlebot/2.1 (+http://www.google.com/bot.html)"
if result.scheme == "https":
connection_pool_kwargs["server_hostname"] = result.hostname
connection_pool_kwargs["assert_hostname"] = result.hostname
return super().send(request, **kwargs)
session = Session()
session.mount("http://", DNSAdapter(["1.1.1.1"]))
session.mount("https://", DNSAdapter(["1.1.1.1"]))
def get_files(id):
req = session.get(
"https://www5.yggtorrent.fi/engine/get_files", params={"torrent": id}
)
files = json.loads(req.text)
html = BeautifulSoup(files["html"], "html.parser")
trs = html.select("tr")
return len(trs)
def search_ygg(query, multi):
ygg_params = {
"name": f"{query} {args.year}",
"description": "",
"file": "",
"uploader": "",
"category": "2145",
"sub_category": "2183",
"do": "search",
"order": "asc",
"sort": "publish_date",
}
if multi:
ygg_params["option_langue"] = ["4"]
req = session.get("https://www5.yggtorrent.fi/engine/search", params=ygg_params)
html = BeautifulSoup(req.text, "html.parser")
trs = html.select("table.table tr")
if len(trs) > 1:
for i, tr in enumerate(trs):
if not i:
continue
tds = tr.find_all("td")
size = tds[5].get_text()
name = tds[1].get_text().lower().strip()
if parse_size(size) > parse_size("10Go"):
continue
if any(word.lower() in name for word in BLACKLIST_WORDS):
continue
if args.uploader and not any(
uploader.lower() in name for uploader in args.uploader
):
continue
link = tds[1].a["href"]
id = link.split("/")[-1].split("-")[0]
if get_files(id) > 1:
continue
print(f"{name} {args.year} {link}")
exit(0)
query_string = {"query": args.query, "filters": "type:movie"}
if args.year:
query_string["filters"] += " AND year:" + str(args.year)
tvdb = session.post(
"https://tvshowtime-dsn.algolia.net/1/indexes/TVDB/query",
params={
"x-algolia-application-id": "tvshowtime",
"x-algolia-api-key": "c9d5ec1316cec12f093754c69dd879d3",
},
json={"params": urlencode(query_string)},
)
tvdata = json.loads(tvdb.text)
if not tvdata["nbHits"] > 0:
print("Can't find query on TheTVDB")
exit(1)
eng = tvdata["hits"][0]["name"]
fra = (
tvdata["hits"][0]["translations"]["fra"]
if "fra" in tvdata["hits"][0]["translations"]
else args.query
)
search_ygg(args.query, True)
search_ygg(fra, True)
search_ygg(eng, True)
search_ygg(args.query, False)
search_ygg(fra, False)
search_ygg(eng, False)