from asyncio import gather, get_event_loop, run from functools import partial, wraps from os import makedirs, path from typing import List from PIL import Image from bs4 import BeautifulSoup, Tag from lxml.html import fromstring from pypub import Chapter, Epub, create_chapter_from_string # type: ignore from pypub.const import SUPPORTED_TAGS # type: ignore from requests import get UNSUPPORTED_TAGS = ["a", "h3"] OUTPUT_DIR = "output" class MyChapter(Chapter): def parse_etree(self): """generate new filtered element-tree""" etree = fromstring(self.content) # check if we can minimalize the scope body = etree.xpath(".//body") etree = body[0] if body else etree article = etree.xpath(".//article") etree = article[0] if article else etree # iterate elements in tree and delete/modify them for elem in [elem for elem in etree.iter()][1:]: if elem.tag in UNSUPPORTED_TAGS: parent = elem.getparent() parent.remove(elem) # if element tag is supported elif elem.tag in SUPPORTED_TAGS: # remove attributes not approved for specific tag for attr in elem.attrib: if attr != "style" and attr not in SUPPORTED_TAGS[elem.tag]: elem.attrib.pop(attr) # if element is not supported, append children to parent else: parent = elem.getparent() for child in elem.getchildren(): parent.append(child) parent.remove(elem) # NOTE: this is a bug with lxml, some children have # text in the parent included in the tail rather # than text attribute, so we also append tail to text if elem.tail and elem.tail.strip(): parent.text = (parent.text or "") + elem.tail.strip() # ensure all images with no src are removed for img in etree.xpath(".//img"): if "src" not in img.attrib: img.getparent().remove(img) # return new element-tree return etree def replace_images(self, image_dir: str, timeout: int = 10): super().replace_images(image_dir, timeout) for img in self.etree.xpath(".//img"): if img.attrib["src"].startswith("http"): img.getparent().remove(img) else: src = f"{path.dirname(image_dir)}/{img.attrib['src']}" image = Image.open(src) image.thumbnail((1000, 2000)) image.save(src) def async_wrap(func): @wraps(func) async def run(*args, loop=None, executor=None, **kwargs): if loop is None: loop = get_event_loop() pfunc = partial(func, *args, **kwargs) return await loop.run_in_executor(executor, pfunc) return run makedirs(name=OUTPUT_DIR, exist_ok=True) cover_req = get( url="https://i0.wp.com/thefantasyinn.com/wp-content/uploads/2018/08/twi.jpg", stream=True, ) Image.open(cover_req.raw).save(f"{OUTPUT_DIR}/cover.png") def process_volume(epub: Epub, urls: List[str]): for url in urls: page_req = get(url) page_html = BeautifulSoup(markup=page_req.text, features="lxml") page_content = page_html.select_one("div.entry-content") page_title = page_html.select_one("h1.entry-title") page_date = page_html.find(name="meta", property="article:modified_time") if not page_content or not page_title or type(page_date) is not Tag: raise Exception() galleries = page_content.select("div.tiled-gallery") for gallery in galleries: gallery.decompose() title = page_title.get_text().strip() chapter = create_chapter_from_string( html=page_content.prettify(), title=title, factory=MyChapter, ) print(f"{epub.title} - {title}") epub.add_chapter(chapter) epub.create_epub(OUTPUT_DIR) @async_wrap def process_book(url: str, creator: str, publisher: str, language: str): toc_req = get(url) toc_html = BeautifulSoup(markup=toc_req.text, features="lxml") toc_content = toc_html.select("div.entry-content > p") toc_title = toc_html.select_one("#site-title > span > a") toc_date = toc_html.find(name="meta", property="article:modified_time") if not toc_title or type(toc_date) is not Tag: raise Exception() for i, toc_line in enumerate(toc_content): if i % 2 == 0: title = toc_title.get_text().strip() volume = toc_line.get_text().strip() elif volume: epub = Epub( title=f"{title} - {volume}", creator=creator, language=language, publisher=publisher, date=toc_date["content"], cover=f"{OUTPUT_DIR}/cover.png", epub_dir=f"{OUTPUT_DIR}/{title} - {volume}", ) urls = [] for link in toc_line.select("a"): urls.append(link.attrs["href"]) process_volume(epub=epub, urls=urls) async def create_books(): await gather( process_book( "https://aubergevagabonde.wordpress.com/sommaire/", "Pirateaba", "ElliVia", "fr", ), process_book( "https://wanderinginn.com/table-of-contents/", "Pirateaba", "Xefir", "en" ), return_exceptions=True, ) run(create_books())