"""Scrape web-serial tables of contents and build one EPUB per volume.

Running this module downloads a cover image, then processes two
table-of-contents pages concurrently (each in an executor thread) and
writes the resulting EPUBs under ``OUTPUT_DIR``.
"""
from asyncio import gather, get_event_loop, run
from functools import partial, wraps
from io import BytesIO
from os import makedirs, path
from typing import List

from PIL import Image
from bs4 import BeautifulSoup, Tag
from lxml.html import fromstring
from pypub import Chapter, Epub, create_chapter_from_string  # type: ignore
from pypub.const import SUPPORTED_TAGS  # type: ignore
from requests import get

# Tags that are dropped together with everything inside them.
UNSUPPORTED_TAGS = ["a", "h3"]
OUTPUT_DIR = "output"
# Seconds before an HTTP request is abandoned; without a timeout a stalled
# server would block the worker thread indefinitely.
REQUEST_TIMEOUT = 30


class MyChapter(Chapter):
    """Chapter variant that drops some tags entirely and shrinks images."""

    def parse_etree(self):
        """Generate a new, filtered element tree from ``self.content``.

        Elements whose tag is in ``UNSUPPORTED_TAGS`` are removed outright,
        elements whose tag is in ``SUPPORTED_TAGS`` keep only ``style`` and
        the attributes listed for that tag, and every other element is
        replaced by its children.

        Returns:
            The filtered lxml element (``<article>`` or ``<body>`` when
            present, otherwise the parsed root).
        """
        etree = fromstring(self.content)
        # check if we can minimalize the scope
        body = etree.xpath(".//body")
        etree = body[0] if body else etree
        article = etree.xpath(".//article")
        etree = article[0] if article else etree
        # Snapshot the elements first (skipping the root itself) because the
        # tree is modified while we walk it.
        for elem in list(etree.iter())[1:]:
            if elem.tag in UNSUPPORTED_TAGS:
                parent = elem.getparent()
                parent.remove(elem)
            # if element tag is supported
            elif elem.tag in SUPPORTED_TAGS:
                # remove attributes not approved for specific tag; iterate
                # over a copy of the names since we pop while looping
                allowed = SUPPORTED_TAGS[elem.tag]
                for attr in list(elem.attrib):
                    if attr != "style" and attr not in allowed:
                        elem.attrib.pop(attr)
            # if element is not supported, append children to parent
            else:
                parent = elem.getparent()
                for child in elem.getchildren():
                    parent.append(child)
                parent.remove(elem)
                # NOTE: this is a bug with lxml, some children have
                # text in the parent included in the tail rather
                # than text attribute, so we also append tail to text
                if elem.tail and elem.tail.strip():
                    parent.text = (parent.text or "") + elem.tail.strip()
        # ensure all images with no src are removed
        for img in etree.xpath(".//img"):
            if "src" not in img.attrib:
                img.getparent().remove(img)
        # return new element-tree
        return etree

    def replace_images(self, image_dir: str, timeout: int = 10):
        """Run the parent image replacement, then clean up the results.

        After the parent implementation has run, every ``<img>`` whose
        ``src`` still starts with ``http`` is removed, and every other image
        file is resized in place to fit within 1000x2000 pixels.

        Args:
            image_dir: Directory passed to the parent implementation; image
                ``src`` values are resolved against its parent directory.
            timeout: Timeout forwarded to the parent implementation.
        """
        try:
            super().replace_images(image_dir, timeout)
        except TimeoutError:
            # Deliberate best-effort: images still pointing at a remote URL
            # are dropped by the loop below.
            pass
        base_dir = path.dirname(image_dir)
        for img in self.etree.xpath(".//img"):
            if img.attrib["src"].startswith("http"):
                img.getparent().remove(img)
            else:
                src = f"{base_dir}/{img.attrib['src']}"
                # Context manager guarantees the file handle is released.
                with Image.open(src) as image:
                    image.thumbnail((1000, 2000))
                    image.save(src)


def async_wrap(func):
    """Turn a blocking callable into a coroutine function.

    The returned coroutine runs ``func(*args, **kwargs)`` through
    ``loop.run_in_executor``. The keyword-only arguments ``loop`` and
    ``executor`` are consumed by the wrapper and are not passed to ``func``;
    when ``loop`` is omitted the current event loop is used.
    """

    # Named ``wrapper`` so it does not shadow the imported ``asyncio.run``.
    @wraps(func)
    async def wrapper(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = get_event_loop()
        pfunc = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, pfunc)

    return wrapper


# NOTE: the cover download below runs at import time.
makedirs(name=OUTPUT_DIR, exist_ok=True)
cover_req = get(
    url="https://i0.wp.com/wanderinginn.com/wp-content/uploads/2023/03/Wandering_Inn-Vol1-eCover.jpg",  # noqa: E501
    timeout=REQUEST_TIMEOUT,
)
# Fail loudly on an HTTP error instead of handing an error page to Pillow.
cover_req.raise_for_status()
with Image.open(BytesIO(cover_req.content)) as cover_image:
    cover_image.thumbnail((500, 1000))
    cover_image.save(f"{OUTPUT_DIR}/cover.png")


def process_volume(epub: Epub, urls: List[str]):
    """Download every chapter page in ``urls`` and write ``epub`` to disk.

    Pages whose title is empty after stripping are skipped. Tiled image
    galleries are removed from the content before the chapter is created.

    Args:
        epub: Book that receives the chapters and is finally created in
            ``OUTPUT_DIR``.
        urls: Chapter page URLs, in reading order.

    Raises:
        Exception: If a page lacks ``div.entry-content`` or
            ``h1.entry-title``.
        requests.HTTPError: If a page request returns an error status.
    """
    for url in urls:
        page_req = get(url, timeout=REQUEST_TIMEOUT)
        page_req.raise_for_status()
        page_html = BeautifulSoup(markup=page_req.text, features="lxml")
        page_content = page_html.select_one("div.entry-content")
        page_title = page_html.select_one("h1.entry-title")
        if not page_content or not page_title:
            raise Exception("Missing title or content")
        title = page_title.get_text().strip()
        if not title:
            continue
        for gallery in page_content.select("div.tiled-gallery"):
            gallery.decompose()
        chapter = create_chapter_from_string(
            html=page_content.prettify(),
            title=title,
            factory=MyChapter,
        )
        print(f"{epub.title} - {title}")
        epub.add_chapter(chapter)
    epub.create_epub(OUTPUT_DIR)


@async_wrap
def process_book(
    url: str, creator: str, publisher: str, language: str, strip_first=False
):
    """Build one EPUB per volume listed on a table-of-contents page.

    The ``div.entry-content > p`` paragraphs are read in pairs: an
    even-indexed paragraph supplies the volume name and the following
    odd-indexed paragraph supplies the chapter links for that volume.
    Pairs whose volume name is empty are skipped.

    Args:
        url: Table-of-contents page URL.
        creator: Creator metadata for each EPUB.
        publisher: Publisher metadata for each EPUB.
        language: Language metadata for each EPUB.
        strip_first: When true, discard the first paragraph before pairing.

    Raises:
        Exception: If the site title or the ``article:modified_time`` meta
            tag cannot be found.
        requests.HTTPError: If the page request returns an error status.
    """
    toc_req = get(url, timeout=REQUEST_TIMEOUT)
    toc_req.raise_for_status()
    toc_html = BeautifulSoup(markup=toc_req.text, features="lxml")
    toc_content = toc_html.select("div.entry-content > p")
    toc_title = toc_html.select_one("#site-title > span > a")
    toc_date = toc_html.find(name="meta", property="article:modified_time")
    if not toc_title or not isinstance(toc_date, Tag):
        raise Exception("Missing title or date")
    title = toc_title.get_text().strip()
    if strip_first:
        # Slice deletion is a no-op on an empty list, unlike pop(0).
        del toc_content[:1]
    volume = ""
    for i, toc_line in enumerate(toc_content):
        if i % 2 == 0:
            volume = toc_line.get_text().strip()
        elif volume:
            epub = Epub(
                title=f"{title} - {volume}",
                creator=creator,
                language=language,
                publisher=publisher,
                date=toc_date["content"],
                cover=f"{OUTPUT_DIR}/cover.png",
                epub_dir=f"{OUTPUT_DIR}/{title} - {volume}",
            )
            # Only anchors that actually carry an href are chapter links.
            urls = [link.attrs["href"] for link in toc_line.select("a[href]")]
            process_volume(epub=epub, urls=urls)


async def create_books():
    """Process the French and the English table of contents concurrently."""
    await gather(
        process_book(
            url="https://aubergevagabonde.wordpress.com/sommaire/",
            creator="Pirateaba",
            publisher="ElliVia",
            language="fr",
        ),
        process_book(
            url="https://wanderinginn.com/table-of-contents/",
            creator="Pirateaba",
            publisher="Xefir",
            language="en",
            strip_first=True,
        ),
    )


run(create_books(), debug=True)