Divent/divent/bot.py

348 lines
10 KiB
Python
Raw Normal View History

2022-05-08 14:02:19 +00:00
import json
import logging
from datetime import datetime, timedelta
2022-09-12 17:39:50 +00:00
from functools import wraps
2023-01-16 16:21:09 +00:00
from os import getenv, path
2023-11-07 09:52:38 +00:00
from typing import Dict, Optional, Union
2022-05-08 14:02:19 +00:00
2023-11-07 09:52:38 +00:00
from disnake import Asset, Client, Guild, Intents, Member, User
from disnake.guild_scheduled_event import GuildScheduledEvent
2022-05-08 14:02:19 +00:00
from dotenv import load_dotenv
2022-09-08 18:09:04 +00:00
from ics import Calendar, ContentLine, Event
2022-05-08 14:02:19 +00:00
from ics.alarm import DisplayAlarm
2023-05-26 10:13:15 +00:00
from oauthlib.oauth2 import OAuth2Error
2022-09-11 22:05:05 +00:00
from quart import Quart, redirect, render_template, request, session, url_for
2022-09-12 18:00:44 +00:00
from requests_oauthlib import OAuth2Session # type: ignore
2023-05-26 10:13:15 +00:00
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
2022-05-08 14:02:19 +00:00
2022-09-11 22:05:05 +00:00
2022-05-08 14:02:19 +00:00
# Pull variables from a local `.env` file (when present) into the environment
# before any of the settings below are read.
load_dotenv()

# Credentials: the bot token is used to start the Discord client, the OAuth2
# pair is used by the web login flow.
DISCORD_TOKEN = getenv("DISCORD_TOKEN")
OAUTH2_CLIENT_ID = getenv("OAUTH2_CLIENT_ID")
OAUTH2_CLIENT_SECRET = getenv("OAUTH2_CLIENT_SECRET")

# Fail at import time rather than on the first request if a credential is
# missing or empty.
if not DISCORD_TOKEN:
    raise Exception("Missing DISCORD_TOKEN")
if not OAUTH2_CLIENT_ID:
    raise Exception("Missing OAUTH2_CLIENT_ID")
if not OAUTH2_CLIENT_SECRET:
    raise Exception("Missing OAUTH2_CLIENT_SECRET")

# Environment values are strings, so a plain truthiness test would treat
# QUART_DEBUG=0 or QUART_DEBUG=false as "enabled". Parse it into a real bool;
# any value other than the usual "off" spellings still enables debug mode.
QUART_DEBUG = (getenv("QUART_DEBUG") or "").strip().lower() not in (
    "",
    "0",
    "false",
    "no",
    "off",
)
if QUART_DEBUG:
    logging.basicConfig(level=logging.DEBUG)

# Discord REST endpoints used by the OAuth2 flow and the user lookups.
API_BASE_URL = getenv("API_BASE_URL", "https://discordapp.com/api")
AUTHORIZATION_BASE_URL = f"{API_BASE_URL}/oauth2/authorize"
TOKEN_URL = f"{API_BASE_URL}/oauth2/token"

# Language code -> translation catalog (source string -> translated string).
CATALOG_CACHE: Dict[str, Dict[str, str]] = {}
# Scheduled event id -> ids of the users subscribed to that event.
EVENTS_CACHE: Dict[int, list] = {}
2022-05-08 14:02:19 +00:00
class Discord(Client):
    """Discord client that mirrors event subscriptions into ``EVENTS_CACHE``.

    ``EVENTS_CACHE`` maps a scheduled event id to the list of ids of the users
    subscribed to it. The handlers below keep that mapping up to date so the
    per-user calendar can be built without querying Discord on every request.
    """

    async def on_ready(self):
        """Rebuild the subscriber list of every scheduled event of every guild."""
        print(f"Logged on as {self.user}!", flush=True)

        for guild in self.guilds:
            for scheduled_event in guild.scheduled_events:
                EVENTS_CACHE[scheduled_event.id] = [
                    member.id
                    for member in await scheduled_event.fetch_users().flatten()
                ]

        print("Events synchronised!", flush=True)

    async def on_guild_scheduled_event_subscribe(
        self, event: GuildScheduledEvent, user: Union[Member, User]
    ):
        """Record that ``user`` subscribed to ``event``."""
        # The event may not be cached yet (e.g. it was never seen by on_ready),
        # so create its entry on demand instead of raising KeyError.
        subscribers = EVENTS_CACHE.setdefault(event.id, [])
        # Avoid duplicates if the user was already picked up by a full sync.
        if user.id not in subscribers:
            subscribers.append(user.id)

    async def on_guild_scheduled_event_unsubscribe(
        self, event: GuildScheduledEvent, user: Union[Member, User]
    ):
        """Record that ``user`` unsubscribed from ``event``."""
        # Unknown event or unknown subscriber: nothing to undo.
        subscribers = EVENTS_CACHE.get(event.id)
        if subscribers and user.id in subscribers:
            subscribers.remove(user.id)

    async def on_guild_scheduled_event_create(self, event: GuildScheduledEvent):
        """Start tracking a newly created event with its current subscribers."""
        EVENTS_CACHE[event.id] = [
            member.id for member in await event.fetch_users().flatten()
        ]

    async def on_guild_scheduled_event_delete(self, event: GuildScheduledEvent):
        """Stop tracking a deleted event; a no-op if it was never cached."""
        EVENTS_CACHE.pop(event.id, None)
# Discord side: default intents plus the two the bot explicitly switches on
# (guild members and guild scheduled events).
intents = Intents.default()
intents.members = True
intents.guild_scheduled_events = True
client = Discord(intents=intents)

# Web side: the Quart application, configured in one go and wrapped so that
# forwarded proxy headers are honoured for every upstream host ("*").
app = Quart(__name__)
app.config.update(
    SECRET_KEY=OAUTH2_CLIENT_SECRET,
    EXPLAIN_TEMPLATE_LOADING=QUART_DEBUG,
)
app.asgi_app = ProxyHeadersMiddleware(app.asgi_app, "*")  # type: ignore
2022-05-08 14:02:19 +00:00
def get_guild_by_id(guild_id: str) -> Optional[Guild]:
    """Find a guild the bot belongs to by numeric id or vanity URL code.

    Returns the first guild in ``client.guilds`` whose stringified id or
    ``vanity_url_code`` equals ``guild_id``, or ``None`` when ``guild_id`` is
    empty or nothing matches.
    """
    if not guild_id:
        return None

    matching = (
        candidate
        for candidate in client.guilds
        if guild_id in (str(candidate.id), candidate.vanity_url_code)
    )
    return next(matching, None)
@app.errorhandler(500)
async def errorhandler(error: Exception):
    """Print ``error`` in red on stdout and render the error page (HTTP 500)."""
    message = str(error)
    # \33[31m ... \33[m wraps the message in the ANSI "red" escape sequence.
    print(f"\33[31m{message}\33[m", flush=True)
    page = await render_template("error.html.j2", error=message)
    return page, 500
@app.errorhandler(404)
async def not_found(error: Exception):
    """Render the shared error page for unknown routes (HTTP 404)."""
    page = await render_template("error.html.j2", error=str(error))
    return page, 404
2022-09-12 17:39:50 +00:00
def token_updater(token: dict):
    """Store the OAuth2 token in the current user's session.

    Handed to ``OAuth2Session`` as its ``token_updater`` callback in
    ``make_session`` and called directly from ``callback`` with the value
    returned by ``fetch_token``. The stored value is later passed back to
    ``make_session`` as its ``token`` mapping.
    """
    session["oauth2_token"] = token
2022-09-12 18:38:40 +00:00
def make_session(
    token: Optional[Dict[str, str]] = None, state: Optional[str] = None
) -> OAuth2Session:
    """Build an OAuth2 session for the current request.

    ``token`` and ``state`` are forwarded as-is. The redirect URI is derived
    from the host of the request being served, so this must be called while a
    request is active. Refreshed tokens are written back to the user's session
    through ``token_updater``.
    """
    refresh_credentials = {
        "client_id": OAUTH2_CLIENT_ID,
        "client_secret": OAUTH2_CLIENT_SECRET,
    }
    callback_uri = f"{request.host_url}callback"
    requested_scopes = ["identify", "guilds"]

    return OAuth2Session(
        client_id=OAUTH2_CLIENT_ID,
        token=token,
        state=state,
        scope=requested_scopes,
        redirect_uri=callback_uri,
        auto_refresh_kwargs=refresh_credentials,
        auto_refresh_url=TOKEN_URL,
        token_updater=token_updater,
    )
2022-05-08 14:02:19 +00:00
def i18n(str: str) -> str:
    """Translate ``str`` into the language preferred by the current request.

    The language is negotiated between the request's ``Accept-Language``
    header and the supported codes (``en``, ``fr``). Catalogs are JSON files
    under ``translations/<lang>.json`` next to this module and are loaded once
    per language into ``CATALOG_CACHE``. When no language matches, no catalog
    exists, or the catalog has no entry for ``str``, ``str`` itself is
    returned.

    Note: the parameter name shadows the ``str`` builtin; it is kept because
    it is part of the function's signature.
    """
    lang = request.accept_languages.best_match(["en", "fr"])
    if lang is None:
        # No acceptable language: do not build a path from None.
        return str

    if lang not in CATALOG_CACHE:
        catalog_file = f"{path.dirname(__file__)}/translations/{lang}.json"
        catalog = {}
        if path.exists(catalog_file):
            # Read as UTF-8 explicitly so translated text does not depend on
            # the platform's default encoding.
            with open(catalog_file, encoding="utf-8") as catalog_json:
                catalog = json.load(catalog_json)
        # Missing catalogs are cached as empty so the disk is not probed
        # again on every translated string.
        CATALOG_CACHE[lang] = catalog

    return CATALOG_CACHE[lang].get(str, str)
def days_before_failure() -> int:
    """Return the whole days left until 3 June of the next multiple-of-five year.

    The target year is always strictly later than the current year, so during
    a year that is itself a multiple of five the countdown already points five
    years ahead.
    """
    now = datetime.now()
    # Round the year up to the next multiple of five (2023 -> 2025, 2025 -> 2030).
    target_year = (now.year // 5 + 1) * 5
    deadline = datetime(target_year, 6, 3)
    return (deadline - now).days
@app.context_processor
def context_processor():
    """Expose the translator, the Discord client and the countdown to templates."""
    return {
        "_": i18n,
        "client": client,
        "days_before_failure": days_before_failure(),
    }
def login_required(fn):
    """Decorate an async view so it only runs for users holding an OAuth2 token.

    Visitors without a token in their session are sent to the login route;
    the path they asked for is remembered in the session under
    ``redirect_url``.
    """

    @wraps(fn)
    async def wrapper(*args, **kwargs):
        if not session.get("oauth2_token"):
            session["redirect_url"] = request.path
            return redirect(url_for(".login"))

        return await fn(*args, **kwargs)

    return wrapper
2022-05-08 14:02:19 +00:00
@app.route("/")
async def index():
    """Render the landing page."""
    page = await render_template("index.html.j2")
    return page
@app.route("/login")
async def login():
    """Start the OAuth2 flow: remember the state and redirect to the provider."""
    oauth = make_session()
    destination, oauth_state = oauth.authorization_url(AUTHORIZATION_BASE_URL)
    # The state is read back in the callback route when the session is rebuilt.
    session["oauth2_state"] = oauth_state
    return redirect(destination)
@app.route("/callback")
async def callback():
    """Finish the OAuth2 flow and send the user back to where they came from.

    If the provider reported an error, or the token exchange fails with an
    ``OAuth2Error``, the error page is rendered. Otherwise the token is stored
    in the session and the user is redirected to the path remembered by
    ``login_required`` (defaulting to the guild list).
    """
    request_values = await request.values
    error = request_values.get("error")
    if error:
        # errorhandler is a coroutine function: it must be awaited, otherwise
        # a coroutine object is returned instead of a response.
        return await errorhandler(error)

    try:
        discord = make_session(state=session.get("oauth2_state"))
        token = discord.fetch_token(
            TOKEN_URL,
            client_secret=OAUTH2_CLIENT_SECRET,
            authorization_response=request.url,
        )
    except OAuth2Error as e:
        return await errorhandler(e)

    token_updater(token)
    return redirect(session.pop("redirect_url", url_for(".guilds")))
2022-09-12 17:39:50 +00:00
@app.route("/guilds")
@login_required
async def guilds():
    """List the guilds shared by the logged-in user and the bot.

    A ``guild`` query parameter short-circuits to that guild's subscribe page.
    If the stored token is rejected (OAuth2 error or HTTP 401), the user is
    sent through the login flow again.
    """
    guild = request.args.get("guild")
    if guild:
        return redirect(url_for(".subscribe", entity_id=guild))

    try:
        discord = make_session(token=session.get("oauth2_token"))
        user_response = discord.get(f"{API_BASE_URL}/users/@me")
        guilds_response = discord.get(f"{API_BASE_URL}/users/@me/guilds")
    except OAuth2Error:
        return redirect(url_for(".login"))

    # A rejected token does not raise: the API answers 401 with an error
    # object, which must not be processed as a user / guild list.
    if 401 in (user_response.status_code, guilds_response.status_code):
        return redirect(url_for(".login"))

    user = user_response.json()
    user_guilds = guilds_response.json()

    # Set lookup instead of a nested loop; the bot's guild order is preserved.
    user_guild_ids = {user_guild["id"] for user_guild in user_guilds}
    common_guilds = [
        bot_guild for bot_guild in client.guilds if str(bot_guild.id) in user_guild_ids
    ]

    # The avatar hash is null for users without a custom avatar; building an
    # Asset from it would raise, so pass None to the template instead.
    avatar_hash = user.get("avatar")
    avatar = (
        Asset._from_avatar(None, user["id"], avatar_hash) if avatar_hash else None
    )

    return await render_template(
        "guilds.html.j2",
        user=user,
        avatar=avatar,
        common_guilds=common_guilds,
    )
2022-05-08 14:02:19 +00:00
2023-11-07 00:26:12 +00:00
@app.route("/subscribe/<entity_id>")
@login_required
async def subscribe(entity_id: str):
    """Render the subscription page for a guild or a user.

    ``entity_id`` is first resolved as a guild (numeric id or vanity URL
    code). The page is only shown if the logged-in user is a member of that
    guild. Otherwise ``entity_id`` is treated as a numeric user id. Every
    failure path redirects to the login route.
    """
    guild = get_guild_by_id(entity_id)

    if guild:
        try:
            discord = make_session(token=session.get("oauth2_token"))
            response = discord.get(f"{API_BASE_URL}/users/@me/guilds")
        except OAuth2Error:
            return redirect(url_for(".login"))

        # A rejected token does not raise: the API answers 401 with an error
        # object, which must not be iterated as if it were the guild list.
        if response.status_code == 401:
            return redirect(url_for(".login"))

        guild_id = str(guild.id)
        user_guilds = response.json()
        if not any(guild_id == user_guild["id"] for user_guild in user_guilds):
            return redirect(url_for(".login"))

        return await render_template(
            "subscribe.html.j2",
            avatar=guild.icon,
            entity_id=guild.vanity_url_code or guild.id,
        )

    try:
        user = await client.get_or_fetch_user(int(entity_id))
    except ValueError:
        # entity_id is neither a known guild nor a numeric id.
        return redirect(url_for(".login"))

    if user and str(user.id) == entity_id:
        return await render_template(
            "subscribe.html.j2", avatar=user.avatar, entity_id=user.id
        )

    return redirect(url_for(".login"))
2022-05-08 14:02:19 +00:00
2023-11-07 00:32:47 +00:00
def make_event(scheduled_event: GuildScheduledEvent) -> Event:
    """Convert a Discord scheduled event into a calendar event.

    The calendar event reuses the Discord event id as its UID and carries a
    display alarm one hour before the start. When Discord provides no end
    time, the event is given a default duration of two hours.
    """
    event = Event()
    event.summary = scheduled_event.name
    event.begin = scheduled_event.scheduled_start_time

    # An end time and a duration are mutually exclusive ways of closing an
    # event, so the two-hour default is only applied when no end time exists.
    if scheduled_event.scheduled_end_time:
        event.end = scheduled_event.scheduled_end_time
    else:
        event.duration = timedelta(hours=2)

    event.uid = str(scheduled_event.id)
    event.description = scheduled_event.description
    event.url = scheduled_event.url
    event.location = (
        scheduled_event.entity_metadata.location
        if scheduled_event.entity_metadata
        else None
    )

    alarm = DisplayAlarm()
    alarm.trigger = timedelta(hours=-1)
    event.alarms.append(alarm)

    return event
def _new_calendar(name, description=None):
    """Create a calendar carrying the feed headers shared by all feeds."""
    calendar = Calendar()
    headers = [
        ("REFRESH-INTERVAL", "PT1H"),
        ("X-PUBLISHED-TTL", "PT1H"),
        ("NAME", name),
        ("X-WR-CALNAME", name),
    ]
    if description:
        headers.append(("DESCRIPTION", description))
        headers.append(("X-WR-CALDESC", description))

    for header_name, header_value in headers:
        calendar.extra.append(ContentLine(name=header_name, value=header_value))
    return calendar


def _calendar_response(calendar):
    """Serialize ``calendar`` and label the response as iCalendar data."""
    return calendar.serialize(), {"Content-Type": "text/calendar; charset=utf-8"}


@app.route("/<entity_id>.ics")
async def ical(entity_id: str):
    """Serve the iCalendar feed of a guild or of a user.

    If ``entity_id`` resolves to a guild (numeric id or vanity URL code), the
    feed contains all of that guild's scheduled events. Otherwise it is
    treated as a numeric user id and the feed contains the events, across all
    guilds of the bot, that ``EVENTS_CACHE`` lists the user as subscribed to.
    Unresolvable ids redirect to the login route.
    """
    guild = get_guild_by_id(entity_id)

    if guild:
        calendar = _new_calendar(guild.name, guild.description)
        for scheduled_event in guild.scheduled_events:
            calendar.events.append(make_event(scheduled_event))
        return _calendar_response(calendar)

    try:
        user = await client.get_or_fetch_user(int(entity_id))
    except ValueError:
        # entity_id is neither a known guild nor a numeric id.
        return redirect(url_for(".login"))

    if user:
        calendar = _new_calendar(client.user.display_name)
        for guild in client.guilds:
            for scheduled_event in guild.scheduled_events:
                # An event may be missing from the cache (not synchronised
                # yet); treat it as having no subscribers instead of failing.
                if user.id in EVENTS_CACHE.get(scheduled_event.id, ()):
                    calendar.events.append(make_event(scheduled_event))
        return _calendar_response(calendar)

    return redirect(url_for(".login"))
2022-05-08 14:02:19 +00:00
2024-01-24 11:25:33 +00:00
def run():
    """Start the Discord client and serve the web app on the same event loop."""
    loop = client.loop
    # Schedule the bot first; it starts running once app.run drives the loop.
    loop.create_task(client.start(DISCORD_TOKEN))
    app.run("0.0.0.0", loop=loop)