Divent/divent/bot.py
Michel Roux cdf9d77378
Some checks failed
continuous-integration/drone/push Build is failing
Switch to flake8-alphabetize
2022-09-01 19:06:40 +00:00

227 lines
6.4 KiB
Python

import json
import logging
from datetime import datetime, timedelta
from os import environ, path
from typing import List, Optional
from disnake import Client, Guild
from dotenv import load_dotenv
from ics import Calendar, Event # type: ignore
from ics.alarm.display import DisplayAlarm # type: ignore
from quart import Quart, redirect, render_template, request, session, url_for
from requests_oauthlib import OAuth2Session # type: ignore
import sentry_sdk
from sentry_sdk.integrations.quart import QuartIntegration
# Let python-dotenv run before the settings below are read from the environment.
load_dotenv()

# Environment values are strings, so using the raw value as a flag would make
# "0" or "false" enable debugging. The flag is off only when unset/empty or set
# to one of the usual "disabled" spellings; any other value still enables it.
QUART_DEBUG = environ.get("QUART_DEBUG", "").strip().lower() not in (
    "",
    "0",
    "false",
    "no",
    "off",
)
DISCORD_TOKEN = environ.get("DISCORD_TOKEN")
OAUTH2_CLIENT_ID = environ.get("OAUTH2_CLIENT_ID")
OAUTH2_CLIENT_SECRET = environ.get("OAUTH2_CLIENT_SECRET")
# Appended to the request's host URL to build the OAuth2 redirect URI.
OAUTH2_REDIRECT_URI = environ.get("OAUTH2_REDIRECT_URI", "callback")

# Fail at import time when a mandatory credential is missing or empty.
if not DISCORD_TOKEN or not OAUTH2_CLIENT_ID or not OAUTH2_CLIENT_SECRET:
    raise Exception(
        "Missing some env variables "
        "(could be DISCORD_TOKEN, OAUTH2_CLIENT_ID or OAUTH2_CLIENT_SECRET)"
    )

# Both OAuth2 endpoints are derived from the (overridable) API base URL.
API_BASE_URL = environ.get("API_BASE_URL", "https://discordapp.com/api")
AUTHORIZATION_BASE_URL = API_BASE_URL + "/oauth2/authorize"
TOKEN_URL = API_BASE_URL + "/oauth2/token"

if QUART_DEBUG:
    logging.basicConfig(level=logging.DEBUG)

# Sentry reporting is opt-in: it is only initialised when a DSN is configured.
SENTRY_DSN = environ.get("SENTRY_DSN")
if SENTRY_DSN:
    sentry_sdk.init(SENTRY_DSN, integrations=[QuartIntegration()])
class Discord(Client):
    """Discord client that reports on stdout once it is logged on."""

    async def on_ready(self) -> None:
        """Print the user this client is logged on as."""
        print(f"Logged on as {self.user}!")
# Module-wide Discord client and Quart application used by every handler below.
client = Discord()
app = Quart(__name__)
# NOTE(review): the application's SECRET_KEY is the OAuth2 client secret.
app.config.update(SECRET_KEY=OAUTH2_CLIENT_SECRET)
def get_guild_by_id(guild_id: str) -> Optional[Guild]:
    """Look up one of the client's guilds by id or by vanity URL code.

    Args:
        guild_id: Either the guild id rendered as a string, or the guild's
            vanity URL code.

    Returns:
        The first guild in ``client.guilds`` that matches, or ``None`` when
        no guild matches.
    """
    matching = (
        candidate
        for candidate in client.guilds
        if str(candidate.id) == guild_id
        or candidate.vanity_url_code == guild_id
    )
    return next(matching, None)
def token_updater(token: str) -> None:
    """Store ``token`` in the current session under ``"oauth2_token"``.

    Also handed to the OAuth2 session as its ``token_updater`` callback.
    """
    session["oauth2_token"] = token
def make_oauth_session(
    host_url: str,
    token: Optional[str] = None,
    state: Optional[str] = None,
    scope: Optional[List[str]] = None,
) -> OAuth2Session:
    """Build an ``OAuth2Session`` configured for this application.

    Args:
        host_url: Base URL of the current request; the configured redirect
            path is appended to it to form the redirect URI.
        token: Previously obtained token, if any.
        state: OAuth2 ``state`` value saved from the authorization step.
        scope: Scopes to request.

    Returns:
        A session set up to refresh tokens against ``TOKEN_URL`` with the
        client credentials, using ``token_updater`` as its update callback.
    """
    callback_url = host_url + OAUTH2_REDIRECT_URI
    refresh_credentials = {
        "client_id": OAUTH2_CLIENT_ID,
        "client_secret": OAUTH2_CLIENT_SECRET,
    }
    return OAuth2Session(
        client_id=OAUTH2_CLIENT_ID,
        token=token,
        state=state,
        scope=scope,
        redirect_uri=callback_url,
        auto_refresh_kwargs=refresh_credentials,
        auto_refresh_url=TOKEN_URL,
        token_updater=token_updater,
    )
# Translation catalogs already loaded from disk, keyed by language code.
CATALOG_CACHE: dict = {}
@app.errorhandler(500)
async def errorhandler(error: Exception):
    """Log a 500 error to stdout in red and render the error page.

    Returns:
        The rendered ``error.html.j2`` template together with status 500.
    """
    # \33[31m switches the terminal to red, \33[m resets it.
    print(f"\33[31m{error}\33[m")
    page = await render_template("error.html.j2", error=str(error))
    return page, 500
@app.errorhandler(404)
async def not_found(error: Exception):
    """Render the error page for a 404 and return it with status 404."""
    page = await render_template("error.html.j2", error=str(error))
    return page, 404
def i18n(str: str) -> str:
    """Translate ``str`` for the language negotiated with the current request.

    The language is picked among ``en`` and ``fr`` from the request's
    ``Accept-Language`` header. Catalogs are JSON files stored in the
    ``translations`` directory next to this module and are kept in
    ``CATALOG_CACHE`` once loaded.

    Args:
        str: The message to translate (also used as the catalog key).

    Returns:
        The catalog entry for ``str`` when one exists, otherwise ``str``
        unchanged.
    """
    lang = request.accept_languages.best_match(["en", "fr"])
    if lang is None:
        # Nothing was negotiated, so there is no catalog to look for on disk.
        return str

    if lang not in CATALOG_CACHE:
        # path.join avoids producing "/translations/..." (filesystem root)
        # when dirname(__file__) is an empty string.
        catalog_file = path.join(
            path.dirname(__file__), "translations", f"{lang}.json"
        )
        if path.exists(catalog_file):
            # JSON is UTF-8; do not depend on the platform's default encoding.
            with open(catalog_file, encoding="utf-8") as catalog_json:
                CATALOG_CACHE[lang] = json.load(catalog_json)

    if lang in CATALOG_CACHE and str in CATALOG_CACHE[lang]:
        return CATALOG_CACHE[lang][str]
    return str
def days_before_failure(now: Optional[datetime] = None) -> int:
    """Return the number of whole days until the next target date.

    The target is June 3rd of the first year strictly after the current one
    that is a multiple of five (e.g. 2022 -> 2025-06-03, 2025 -> 2030-06-03).

    Args:
        now: Naive datetime to measure from. Defaults to ``datetime.now()``.

    Returns:
        The ``days`` component of ``target - now``.
    """
    # A single clock reading is used for both the year and the subtraction,
    # so the two cannot disagree when the call straddles a year boundary.
    if now is None:
        now = datetime.now()

    # NOTE(review): in a year that is itself a multiple of five this jumps to
    # the following multiple even before June 3rd - confirm this is intended.
    next_year = now.year - now.year % 5 + 5
    next_date = datetime(year=next_year, month=6, day=3)
    return (next_date - now).days
def avatar_cdn(type: str, id: str, hash: str):
    """Build the ``cdn.discordapp.com`` URL of an image.

    Args:
        type: First path segment of the URL.
        id: Second path segment of the URL.
        hash: Image hash; a hash starting with ``a_`` gets the ``gif``
            extension, any other hash gets ``png``.

    Returns:
        The URL as a string.
    """
    if hash.startswith("a_"):
        extension = "gif"
    else:
        extension = "png"
    return f"https://cdn.discordapp.com/{type}/{id}/{hash}.{extension}"
@app.context_processor
def context_processor():
    """Return the extra names made available to the templates.

    ``days_before_failure`` is called here, so templates receive its result
    rather than the function itself.
    """
    return {
        "_": i18n,
        "avatar_cdn": avatar_cdn,
        "days_before_failure": days_before_failure(),
        "bot": client.user,
    }
@app.route("/")
async def index():
    """Render the ``index.html.j2`` template for the root URL."""
    page = await render_template("index.html.j2")
    return page
@app.route("/login")
async def login():
    """Start the OAuth2 authorization flow.

    When the session already holds a token the user is sent directly to the
    guild list. Otherwise the generated ``state`` is saved in the session and
    the user is redirected to the authorization URL.
    """
    if session.get("oauth2_token") is not None:
        return redirect(url_for(".guilds"))

    requested_scopes = ["identify", "guilds"]
    discord = make_oauth_session(
        host_url=request.host_url, scope=requested_scopes
    )
    target, oauth_state = discord.authorization_url(AUTHORIZATION_BASE_URL)
    # Kept so that the callback can hand the same state back to the session.
    session["oauth2_state"] = oauth_state
    return redirect(target)
@app.route("/callback")
async def callback():
    """Finish the OAuth2 flow started by ``login``.

    When the request carries an ``error`` value, that value is returned as
    the response body. Otherwise the authorization response is exchanged for
    a token, the token is stored in the session and the user is redirected
    to the guild list.
    """
    # Local import: keeps this handler self-contained (stdlib only).
    from html import escape

    request_values = await request.values
    error = request_values.get("error")
    if error:
        # The value comes straight from the request; escape it so it cannot
        # inject markup into the response.
        return escape(error)

    discord = make_oauth_session(
        host_url=request.host_url, state=session.get("oauth2_state")
    )
    # NOTE(review): fetch_token is called without await; if it performs
    # blocking network I/O it stalls the event loop for its duration - confirm.
    token = discord.fetch_token(
        TOKEN_URL,
        client_secret=OAUTH2_CLIENT_SECRET,
        authorization_response=request.url,
    )
    token_updater(token)
    return redirect(url_for(".guilds"))
@app.route("/guilds")
async def guilds():
    """Show the guild list of the logged-in user.

    Users without a token in the session are redirected to ``login``. The
    user and guild data are fetched from ``/users/@me`` and
    ``/users/@me/guilds`` and passed to the ``guilds.html.j2`` template.
    """
    token = session.get("oauth2_token")
    if token is None:
        return redirect(url_for(".login"))

    discord = make_oauth_session(host_url=request.host_url, token=token)
    user_response = discord.get(API_BASE_URL + "/users/@me")
    guilds_response = discord.get(API_BASE_URL + "/users/@me/guilds")

    if user_response.status_code != 200 or guilds_response.status_code != 200:
        # login() redirects back here whenever a token is in the session, so
        # the rejected token must be dropped first or the two routes would
        # bounce the browser between each other indefinitely.
        session.pop("oauth2_token", None)
        return redirect(url_for(".login"))

    return await render_template(
        "guilds.html.j2",
        guilds=guilds_response.json(),
        user=user_response.json(),
    )
@app.route("/<guild_id>.ics")
async def ical(guild_id: str):
    """Serve the scheduled events of a guild as an iCalendar feed.

    Args:
        guild_id: Guild id or vanity URL code taken from the URL.

    Returns:
        A redirect to ``login`` when the guild is not found; otherwise the
        serialized calendar with a ``text/calendar`` content type.
    """
    guild = get_guild_by_id(guild_id)
    if guild is None:
        return redirect(url_for(".login"))

    calendar = Calendar()
    for scheduled_event in guild.scheduled_events:
        start = scheduled_event.scheduled_start_time

        event = Event()
        event.name = scheduled_event.name
        event.begin = start
        # Events without an end time are given a two-hour duration.
        event.end = scheduled_event.scheduled_end_time or start + timedelta(
            hours=2
        )
        event.uid = str(scheduled_event.id)
        event.description = scheduled_event.description
        # Use the guild's own id: the path segment may be a vanity URL code,
        # which would not form a valid event link.
        event.url = f"https://discord.com/events/{guild.id}/{scheduled_event.id}"
        event.location = (
            scheduled_event.entity_metadata.location
            if scheduled_event.entity_metadata
            else None
        )

        alarm = DisplayAlarm()
        # A relative trigger is measured from the event start; it has to be
        # negative to remind one hour *before* the event rather than after.
        alarm.trigger = timedelta(hours=-1)
        event.alarms.append(alarm)

        calendar.events.add(event)

    # Declare the payload as iCalendar instead of the default content type.
    return str(calendar), {"Content-Type": "text/calendar; charset=utf-8"}
def _stop_client_loop(_finished_task) -> None:
    """Stop the client's event loop; used as the web task's done callback."""
    client.loop.stop()


# Schedule the Quart application as a task on the Discord client's loop.
quart_task = client.loop.create_task(app.run_task())
# When the web task is done, stop the loop it was running on.
quart_task.add_done_callback(_stop_client_loop)
# Finally start the Discord client with the configured token.
client.run(DISCORD_TOKEN)