#!/usr/bin/env python3
"""Search YggTorrent for a movie release that is a single MKV file.

The query is first looked up on the TVDB Algolia index (restricted to
``type:movie``) to obtain the hit's ``name`` and its ``"fra"`` translation.
The raw query and both titles are then searched on YggTorrent; the first
result that passes every filter is printed and the script exits with
status 0.  If the lookup returns no hit the script exits with status 1.
"""
import argparse
import re
import sys
from urllib.parse import urlencode, urlparse

from bs4 import BeautifulSoup
from dns import rdatatype, resolver
from requests import Session, adapters
from urllib3.util.connection import HAS_IPV6

BLACKLIST_WORDS = ["dvd", "iso"]
YGGTORRENT_BASE_URL = "https://www5.yggtorrent.fi"
DNS_NAMESERVERS = ["1.1.1.1"]

parser = argparse.ArgumentParser()
parser.add_argument("-u", "--uploader", action="append")
parser.add_argument("-y", "--year", type=int)
parser.add_argument("-s", "--size", type=int, default=10)
parser.add_argument("query")
args = parser.parse_args()

# Multipliers are decimal (powers of ten); "o" counts as a single unit.
_SIZE_UNITS = {"o": 1, "Ko": 10**3, "Mo": 10**6, "Go": 10**9, "To": 10**12}
_SIZE_RE = re.compile(r"([0-9.]+)([^0-9]+)")


def parse_size(size):
    """Convert a size string such as ``"1.5Go"`` into an integer count.

    Args:
        size: Text containing a number immediately followed by a unit
            (``o``, ``Ko``, ``Mo``, ``Go`` or ``To``).

    Returns:
        The number multiplied by the unit's factor, truncated to ``int``.

    Raises:
        ValueError: If no ``<number><unit>`` pair can be found in *size*.
        KeyError: If the unit is not one of the known units.
    """
    match = _SIZE_RE.search(size)
    if match is None:
        raise ValueError(f"Unrecognised size: {size!r}")
    number = match.group(1)
    unit = match.group(2).strip()
    return int(float(number) * _SIZE_UNITS[unit])


DNS_RESOLVER = resolver.Resolver()
DNS_RESOLVER.cache = resolver.LRUCache()  # type: ignore


class DNSAdapter(adapters.HTTPAdapter):
    """HTTP adapter that resolves hostnames through explicit name servers.

    Each outgoing request has the hostname in its URL replaced by an IP
    address obtained from ``nameservers``, while the ``Host`` header (and,
    for HTTPS, the connection pool's hostname settings) keep the original
    hostname.
    """

    def __init__(self, nameservers):
        """Store the list of DNS server addresses used by :meth:`resolve`."""
        self.nameservers = nameservers
        super().__init__()

    def resolve(self, host, nameservers):
        """Return the first address found for *host* using *nameservers*.

        When ``HAS_IPV6`` is true an AAAA record is tried first and returned
        wrapped in square brackets; otherwise, or when the AAAA lookup has
        no answer, the first A record is returned.

        Raises:
            LookupError: If the lookups succeed but yield no record.
        """
        DNS_RESOLVER.nameservers = nameservers
        # dnspython 2.x deprecates ``query`` in favour of ``resolve``; use
        # the newer name when available and fall back on older releases.
        lookup = getattr(DNS_RESOLVER, "resolve", None) or DNS_RESOLVER.query
        if HAS_IPV6:
            try:
                for rdata_v6 in lookup(host, rdatatype.AAAA):
                    return f"[{rdata_v6}]"
            except resolver.NoAnswer:
                pass
        for rdata_v4 in lookup(host, rdatatype.A):
            return str(rdata_v4)
        # Previously this fell through and returned None, which only failed
        # later with an unrelated TypeError while rewriting the URL.
        raise LookupError(f"No address record found for {host!r}")

    def send(self, request, **kwargs):
        """Rewrite *request* to target the resolved IP, then send it."""
        parsed = urlparse(request.url)
        hostname = parsed.hostname
        resolved_ip = self.resolve(hostname, self.nameservers)
        # Replace only the first occurrence (the authority part): the same
        # text may legitimately appear again in the path or query string.
        request.url = request.url.replace(hostname, resolved_ip, 1)
        # Keep an explicit port in the Host header, since the server no
        # longer sees the original authority in the URL.
        if parsed.port is None:
            request.headers["Host"] = hostname
        else:
            request.headers["Host"] = f"{hostname}:{parsed.port}"
        request.headers[
            "User-Agent"
        ] = "Googlebot/2.1 (+http://www.google.com/bot.html)"
        if parsed.scheme == "https":
            # The URL now holds an IP, so hand the real hostname to the
            # connection pool for the TLS handshake and certificate check.
            connection_pool_kwargs = self.poolmanager.connection_pool_kw
            connection_pool_kwargs["server_hostname"] = hostname
            connection_pool_kwargs["assert_hostname"] = hostname
        return super().send(request, **kwargs)


session = Session()
session.mount("http://", DNSAdapter(DNS_NAMESERVERS))
session.mount("https://", DNSAdapter(DNS_NAMESERVERS))


def check_files(id):
    """Return True when torrent *id* lists exactly one row mentioning "mkv".

    The file listing is fetched from the ``/engine/get_files`` endpoint,
    whose JSON response carries an HTML fragment under the ``"html"`` key.
    """
    response = session.get(
        f"{YGGTORRENT_BASE_URL}/engine/get_files", params={"torrent": id}
    )
    listing = BeautifulSoup(response.json()["html"], "html.parser")
    rows = listing.select("tr")
    return len(rows) == 1 and "mkv" in rows[0].get_text().lower()


def search_ygg(query, multi, full):
    """Search YggTorrent and exit with status 0 on the first acceptable hit.

    Args:
        query: Title to search for.
        multi: When true, add the ``option_langue:multiple[]=4`` filter
            (presumably the multi-language option; confirm against the site).
        full: When true and ``--year`` was given, append the year to the
            searched name.

    A row is accepted when its size does not exceed ``--size`` (in ``Go``),
    it has at least 10 downloads, its name contains no blacklisted word,
    contains the requested year and one of the requested uploaders (when
    given), and :func:`check_files` approves it.  The function returns
    normally when no row is accepted.
    """
    ygg_params = {
        "name": query,
        "description": "",
        "file": "",
        "uploader": "",
        # Site-specific category identifiers (meaning not visible here).
        "category": "2145",
        "sub_category": "2183",
        "do": "search",
        "order": "asc",
        "sort": "publish_date",
    }
    if full and args.year:
        ygg_params["name"] += f" {args.year}"
    if multi:
        ygg_params["option_langue:multiple[]"] = "4"

    response = session.get(
        f"{YGGTORRENT_BASE_URL}/engine/search", params=ygg_params
    )
    html = BeautifulSoup(response.text, "html.parser")

    # Loop invariants, computed once instead of once per row.
    max_size = parse_size(f"{args.size}Go")
    wanted_uploaders = [uploader.lower() for uploader in args.uploader or []]

    # The first row is skipped, as in the original (it is the header row).
    for tr in html.select("table.table tr")[1:]:
        tds = tr.find_all("td")
        if len(tds) < 7:
            # Not a result row (too few cells to hold size and downloads).
            continue
        size = tds[5].get_text()
        downloads = tds[6].get_text()
        name = tds[1].get_text().lower().strip()
        if parse_size(size) > max_size:
            continue
        if int(downloads) < 10:
            continue
        if any(word.lower() in name for word in BLACKLIST_WORDS):
            continue
        if args.year and str(args.year) not in name:
            continue
        if wanted_uploaders and not any(
            uploader in name for uploader in wanted_uploaders
        ):
            continue
        link = tds[1].a["href"]
        # The torrent id is the leading "<id>-" part of the last URL segment.
        torrent_id = link.split("/")[-1].split("-")[0]
        if not check_files(torrent_id):
            continue
        print(f"{name} {link}")
        sys.exit(0)


query_string = {"query": args.query, "filters": "type:movie"}
if args.year:
    query_string["filters"] += f" AND year:{args.year}"
tvdb = session.post(
    "https://tvshowtime-dsn.algolia.net/1/indexes/TVDB/query",
    params={
        "x-algolia-application-id": "tvshowtime",
        "x-algolia-api-key": "c9d5ec1316cec12f093754c69dd879d3",
    },
    json={"params": urlencode(query_string)},
)
tvdata = tvdb.json()
if not tvdata["nbHits"] > 0:
    print("Can't find query on TheTVDB")
    sys.exit(1)

eng = tvdata["hits"][0]["name"]
# Fall back to the raw query when the first hit has no "fra" translation.
fra = tvdata["hits"][0]["translations"].get("fra", args.query)

# Same order as the original call list: year-qualified searches first
# (multi, then non-multi), and, only when a year was given, the same
# searches again without the year appended to the name.
titles = (args.query, fra, eng)
full_modes = (True, False) if args.year else (True,)
for full in full_modes:
    for multi in (True, False):
        for title in titles:
            search_ygg(title, multi, full)

print("No results :(")