"""Quart web front-end paired with a Discord (disnake) client.

Users log in through Discord OAuth2, see the guilds they belong to, and can
fetch a guild's scheduled events as an iCalendar document. The web server and
the Discord client run as two tasks on the same asyncio event loop (see
``run``).
"""

import json
import logging
from asyncio import new_event_loop
from datetime import datetime, timedelta
from html import escape
from os import environ, path
from typing import List, Optional

from disnake import Client, Guild
from dotenv import load_dotenv
from ics import Calendar, Event  # type: ignore
from ics.alarm.display import DisplayAlarm  # type: ignore
from quart import Quart, redirect, render_template, request, session, url_for
from requests_oauthlib import OAuth2Session  # type: ignore

load_dotenv()


def _env_flag(name: str) -> bool:
    """Return True when env variable *name* is set to an affirmative value.

    Unset, empty, "0", "false", "no" and "off" (case-insensitive) count as
    False, so ``QUART_DEBUG=false`` no longer switches debugging on.
    """
    value = environ.get(name, "").strip().lower()
    return value not in ("", "0", "false", "no", "off")


QUART_DEBUG = _env_flag("QUART_DEBUG")
DISCORD_TOKEN = environ.get("DISCORD_TOKEN")
OAUTH2_CLIENT_ID = environ.get("OAUTH2_CLIENT_ID")
OAUTH2_CLIENT_SECRET = environ.get("OAUTH2_CLIENT_SECRET")
OAUTH2_REDIRECT_URI = environ.get("OAUTH2_REDIRECT_URI", "callback")

# Fail fast at import time: nothing below works without these credentials.
if not DISCORD_TOKEN or not OAUTH2_CLIENT_ID or not OAUTH2_CLIENT_SECRET:
    raise Exception(
        "Missing some env variables "
        "(could be DISCORD_TOKEN, OAUTH2_CLIENT_ID or OAUTH2_CLIENT_SECRET)"
    )

API_BASE_URL = environ.get("API_BASE_URL", "https://discordapp.com/api")
AUTHORIZATION_BASE_URL = API_BASE_URL + "/oauth2/authorize"
TOKEN_URL = API_BASE_URL + "/oauth2/token"

if QUART_DEBUG:
    logging.basicConfig(level=logging.DEBUG)


class Discord(Client):
    """Discord client that announces on stdout once it is connected."""

    async def on_ready(self):
        print(f"Logged on as {self.user}!")


client = Discord()
app = Quart(__name__)
# The OAuth2 client secret doubles as the key signing the session cookie.
app.config["SECRET_KEY"] = OAUTH2_CLIENT_SECRET


def get_guild_by_id(guild_id: str) -> Optional[Guild]:
    """Find a guild the bot is in by numeric id or by vanity URL code.

    Args:
        guild_id: Either the guild id rendered as a string, or the guild's
            vanity URL code.

    Returns:
        The first matching guild from ``client.guilds``, or ``None``.
    """
    for guild in client.guilds:
        if str(guild.id) == guild_id or guild.vanity_url_code == guild_id:
            return guild
    return None


def token_updater(token: dict) -> None:
    """Store the OAuth2 token in the current user's session."""
    session["oauth2_token"] = token


def make_oauth_session(
    host_url: str,
    token: Optional[dict] = None,
    state: Optional[str] = None,
    scope: Optional[List[str]] = None,
) -> OAuth2Session:
    """Build an ``OAuth2Session`` configured for this application.

    Args:
        host_url: Base URL of the current request; ``OAUTH2_REDIRECT_URI`` is
            appended to it verbatim to form the redirect URI.
        token: Previously obtained OAuth2 token, if any.
        state: OAuth2 ``state`` value saved during the login step.
        scope: OAuth2 scopes to request.

    Returns:
        A session that refreshes tokens against ``TOKEN_URL`` and persists
        refreshed tokens through ``token_updater``.
    """
    return OAuth2Session(
        client_id=OAUTH2_CLIENT_ID,
        token=token,
        state=state,
        scope=scope,
        redirect_uri=host_url + OAUTH2_REDIRECT_URI,
        auto_refresh_kwargs={
            "client_id": OAUTH2_CLIENT_ID,
            "client_secret": OAUTH2_CLIENT_SECRET,
        },
        auto_refresh_url=TOKEN_URL,
        token_updater=token_updater,
    )


# Translation catalogs keyed by language code, loaded lazily by ``i18n``.
CATALOG_CACHE = {}


@app.errorhandler(500)
async def errorhandler(error: Exception):
    """Print the error in red on stdout and render the error page (500)."""
    print(f"\33[31m{error}\33[m")
    return await render_template("error.html.j2", error=str(error)), 500


@app.errorhandler(404)
async def not_found(error: Exception):
    """Render the error page with a 404 status."""
    return await render_template("error.html.j2", error=str(error)), 404


def i18n(str: str) -> str:
    """Translate *str* into the best language accepted by the request.

    Catalogs live in ``translations/<lang>.json`` next to this file and are
    cached in ``CATALOG_CACHE`` after the first successful load.

    Args:
        str: Source text, also used as the catalog key. (The parameter name
            shadows the builtin; it is kept for caller compatibility.)

    Returns:
        The translated text, or *str* unchanged when no language matches, no
        catalog exists for it, or the catalog has no entry for *str*.
    """
    lang = request.accept_languages.best_match(["en", "fr"])
    if lang is None:
        # No supported language was requested; skip the pointless disk probe.
        return str

    if lang not in CATALOG_CACHE:
        catalog_file = f"{path.dirname(__file__)}/translations/{lang}.json"
        if path.exists(catalog_file):
            # JSON is UTF-8; do not depend on the platform default encoding.
            with open(catalog_file, encoding="utf-8") as catalog_json:
                CATALOG_CACHE[lang] = json.load(catalog_json)

    if lang in CATALOG_CACHE and str in CATALOG_CACHE[lang]:
        return CATALOG_CACHE[lang][str]
    return str


def days_before_failure():
    """Return the whole days left until the next 3 June of a multiple-of-5 year.

    The target year is the smallest multiple of five strictly greater than the
    current year.
    """
    now = datetime.now()
    next_year = now.year + 5 - ((now.year + 5) % 5)
    next_date = datetime(year=next_year, month=6, day=3)
    return (next_date - now).days


def avatar_cdn(type: str, id: str, hash: str):
    """Build a Discord CDN image URL.

    Args:
        type: CDN path segment placed right after the host.
        id: Identifier of the owning object.
        hash: Image hash; a leading ``a_`` selects the ``gif`` extension,
            anything else ``png``.

    Returns:
        The full ``https://cdn.discordapp.com/...`` URL.
    """
    ext = "gif" if hash.startswith("a_") else "png"
    return f"https://cdn.discordapp.com/{type}/{id}/{hash}.{ext}"


@app.context_processor
def context_processor():
    """Expose helpers and shared values to every rendered template."""
    return dict(
        _=i18n,
        avatar_cdn=avatar_cdn,
        days_before_failure=days_before_failure(),
        bot=client.user,
    )


@app.route("/")
async def index():
    """Render the landing page."""
    return await render_template("index.html.j2")


@app.route("/login")
async def login():
    """Start the OAuth2 flow, or go straight to the guild list if logged in."""
    token = session.get("oauth2_token")
    if token is not None:
        return redirect(url_for(".guilds"))

    discord = make_oauth_session(
        host_url=request.host_url, scope=["identify", "guilds"]
    )
    authorization_url, state = discord.authorization_url(AUTHORIZATION_BASE_URL)
    # Remembered so the callback can validate the round trip.
    session["oauth2_state"] = state
    return redirect(authorization_url)


@app.route("/callback")
async def callback():
    """Finish the OAuth2 flow: exchange the code for a token and store it."""
    request_values = await request.values
    if request_values.get("error"):
        # The value comes from the query string and is sent back as HTML, so
        # it must be escaped to avoid reflected script injection.
        return escape(request_values["error"])

    discord = make_oauth_session(
        host_url=request.host_url, state=session.get("oauth2_state")
    )
    # NOTE(review): this call is not awaited, so it presumably blocks the
    # event loop shared with the Discord client while the request is in
    # flight - confirm whether that is acceptable.
    token = discord.fetch_token(
        TOKEN_URL,
        client_secret=OAUTH2_CLIENT_SECRET,
        authorization_response=request.url,
    )
    token_updater(token)
    return redirect(url_for(".guilds"))


@app.route("/guilds")
async def guilds():
    """List the logged-in user's guilds; bounce to login when not authorised."""
    token = session.get("oauth2_token")
    if token is None:
        return redirect(url_for(".login"))

    discord = make_oauth_session(host_url=request.host_url, token=token)
    # NOTE(review): non-awaited HTTP calls, same event-loop concern as in
    # ``callback``.
    user_response = discord.get(API_BASE_URL + "/users/@me")
    guilds_response = discord.get(API_BASE_URL + "/users/@me/guilds")

    # Any non-200 answer is treated as "needs to log in again".
    if user_response.status_code != 200 or guilds_response.status_code != 200:
        return redirect(url_for(".login"))

    return await render_template(
        "guilds.html.j2", guilds=guilds_response.json(), user=user_response.json()
    )


# The URL variable is required: the handler takes ``guild_id`` as an argument.
@app.route("/<guild_id>.ics")
async def ical(guild_id: str):
    """Serve a guild's scheduled events as an iCalendar document.

    Args:
        guild_id: Guild id or vanity URL code taken from the URL.

    Returns:
        The serialised calendar, or a redirect to the login page when the bot
        does not know the guild.
    """
    guild = get_guild_by_id(guild_id)
    if guild is None:
        return redirect(url_for(".login"))

    calendar = Calendar()
    for scheduled_event in guild.scheduled_events:
        event = Event()
        event.name = scheduled_event.name
        event.begin = scheduled_event.scheduled_start_time
        # Events without an end time are given a two-hour duration.
        event.end = (
            scheduled_event.scheduled_end_time
            or scheduled_event.scheduled_start_time + timedelta(hours=2)
        )
        event.uid = str(scheduled_event.id)
        event.description = scheduled_event.description
        event.url = f"https://discord.com/events/{guild_id}/{scheduled_event.id}"
        event.location = (
            scheduled_event.entity_metadata.location
            if scheduled_event.entity_metadata
            else None
        )

        alarm = DisplayAlarm()
        # NOTE(review): this is a positive one-hour offset; if a reminder
        # *before* the event was intended, the sign likely needs flipping -
        # confirm against the ics alarm documentation.
        alarm.trigger = timedelta(hours=1)
        event.alarms.append(alarm)

        calendar.events.add(event)

    # NOTE(review): returned without an explicit content type - confirm
    # whether calendar clients need ``text/calendar`` here.
    return str(calendar)


def run():
    """Run the Discord client and the web server on one fresh event loop.

    Both are scheduled as tasks; as soon as either one finishes, the loop is
    stopped and this function returns.
    """
    loop = new_event_loop()

    discord_task = loop.create_task(client.start(DISCORD_TOKEN))
    discord_task.add_done_callback(lambda f: loop.stop())

    quart_task = loop.create_task(app.run_task())
    quart_task.add_done_callback(lambda f: loop.stop())

    loop.run_forever()