Divent/divent/bot.py

227 lines
6.4 KiB
Python
Raw Normal View History

2022-06-23 13:27:13 +00:00
import json
2022-07-26 13:39:00 +00:00
import logging
2022-06-08 21:02:14 +00:00
from datetime import datetime, timedelta
2022-06-23 13:27:13 +00:00
from os import environ, path
2022-05-30 15:31:33 +00:00
from typing import List, Optional
2022-05-08 14:02:19 +00:00
2022-05-30 06:30:11 +00:00
from disnake import Client, Guild
2022-05-08 14:02:19 +00:00
from dotenv import load_dotenv
from ics import Calendar, Event # type: ignore
from ics.alarm.display import DisplayAlarm # type: ignore
2022-05-30 15:31:33 +00:00
from quart import Quart, redirect, render_template, request, session, url_for
from requests_oauthlib import OAuth2Session # type: ignore
2022-09-01 19:06:40 +00:00
import sentry_sdk
2022-08-29 14:14:56 +00:00
from sentry_sdk.integrations.quart import QuartIntegration
2022-05-08 14:02:19 +00:00
load_dotenv()
2022-07-26 13:39:00 +00:00
QUART_DEBUG = environ.get("QUART_DEBUG", False)
2022-05-08 14:02:19 +00:00
DISCORD_TOKEN = environ.get("DISCORD_TOKEN")
OAUTH2_CLIENT_ID = environ.get("OAUTH2_CLIENT_ID")
OAUTH2_CLIENT_SECRET = environ.get("OAUTH2_CLIENT_SECRET")
OAUTH2_REDIRECT_URI = environ.get("OAUTH2_REDIRECT_URI", "callback")
if not DISCORD_TOKEN or not OAUTH2_CLIENT_ID or not OAUTH2_CLIENT_SECRET:
raise Exception(
"Missing some env variables "
"(could be DISCORD_TOKEN, OAUTH2_CLIENT_ID or OAUTH2_CLIENT_SECRET)"
)
2022-05-08 14:02:19 +00:00
API_BASE_URL = environ.get("API_BASE_URL", "https://discordapp.com/api")
AUTHORIZATION_BASE_URL = API_BASE_URL + "/oauth2/authorize"
TOKEN_URL = API_BASE_URL + "/oauth2/token"
2022-05-08 14:02:19 +00:00
2022-07-26 13:39:00 +00:00
if QUART_DEBUG:
logging.basicConfig(level=logging.DEBUG)
2022-08-29 14:14:56 +00:00
SENTRY_DSN = environ.get("SENTRY_DSN")
if SENTRY_DSN:
sentry_sdk.init(SENTRY_DSN, integrations=[QuartIntegration()])
2022-05-08 14:02:19 +00:00
class Discord(Client):
async def on_ready(self):
print(f"Logged on as {self.user}!")
client = Discord()
app = Quart(__name__)
app.config["SECRET_KEY"] = OAUTH2_CLIENT_SECRET
2022-05-08 14:02:19 +00:00
2022-05-30 15:31:33 +00:00
def get_guild_by_id(guild_id: str) -> Optional[Guild]:
2022-05-08 14:02:19 +00:00
for guild in client.guilds:
2022-05-30 15:31:33 +00:00
if str(guild.id) == guild_id or guild.vanity_url_code == guild_id:
2022-05-08 14:02:19 +00:00
return guild
return None
def token_updater(token: str) -> None:
session["oauth2_token"] = token
def make_oauth_session(
host_url: str,
token: Optional[str] = None,
state: Optional[str] = None,
2022-05-09 22:37:43 +00:00
scope: Optional[List[str]] = None,
) -> OAuth2Session:
return OAuth2Session(
client_id=OAUTH2_CLIENT_ID,
token=token,
state=state,
scope=scope,
redirect_uri=host_url + OAUTH2_REDIRECT_URI,
auto_refresh_kwargs={
"client_id": OAUTH2_CLIENT_ID,
"client_secret": OAUTH2_CLIENT_SECRET,
},
auto_refresh_url=TOKEN_URL,
token_updater=token_updater,
)
2022-06-23 13:27:13 +00:00
CATALOG_CACHE = {}
2022-07-26 13:39:00 +00:00
@app.errorhandler(500)
async def errorhandler(error: Exception):
print(f"\33[31m{error}\33[m")
return await render_template("error.html.j2", error=str(error)), 500
@app.errorhandler(404)
async def not_found(error: Exception):
return await render_template("error.html.j2", error=str(error)), 404
2022-06-23 13:27:13 +00:00
def i18n(str: str) -> str:
lang = request.accept_languages.best_match(["en", "fr"])
if lang not in CATALOG_CACHE:
catalog_file = f"{path.dirname(__file__)}/translations/{lang}.json"
if path.exists(catalog_file):
with open(catalog_file) as catalog_json:
catalog = json.load(catalog_json)
CATALOG_CACHE[lang] = catalog
if lang in CATALOG_CACHE and str in CATALOG_CACHE[lang]:
return CATALOG_CACHE[lang][str]
return str
2022-06-08 21:02:14 +00:00
def days_before_failure():
nextYear = datetime.today().year + 5 - ((datetime.today().year + 5) % 5)
nextDate = datetime(year=nextYear, month=6, day=3)
nextDelta = nextDate - datetime.now()
2022-06-23 13:27:13 +00:00
return nextDelta.days
2022-07-26 13:39:00 +00:00
def avatar_cdn(type: str, id: str, hash: str):
ext = "gif" if hash.startswith("a_") else "png"
return f"https://cdn.discordapp.com/{type}/{id}/{hash}.{ext}"
2022-07-05 16:23:05 +00:00
2022-06-23 13:27:13 +00:00
@app.context_processor
def context_processor():
2022-07-26 13:39:00 +00:00
return dict(
_=i18n,
avatar_cdn=avatar_cdn,
days_before_failure=days_before_failure(),
bot=client.user,
)
2022-06-08 21:02:14 +00:00
2022-05-30 15:31:33 +00:00
@app.route("/")
async def index():
2022-06-17 15:02:11 +00:00
return await render_template("index.html.j2")
2022-05-30 15:31:33 +00:00
@app.route("/login")
async def login():
2022-06-08 21:02:14 +00:00
token = session.get("oauth2_token")
if token is not None:
return redirect(url_for(".guilds"))
2022-07-05 16:23:05 +00:00
discord = make_oauth_session(
host_url=request.host_url, scope=["identify", "guilds"]
)
authorization_url, state = discord.authorization_url(AUTHORIZATION_BASE_URL)
session["oauth2_state"] = state
return redirect(authorization_url)
@app.route("/callback")
async def callback():
request_values = await request.values
if request_values.get("error"):
return request_values["error"]
discord = make_oauth_session(
host_url=request.host_url, state=session.get("oauth2_state")
)
token = discord.fetch_token(
TOKEN_URL,
client_secret=OAUTH2_CLIENT_SECRET,
authorization_response=request.url,
)
2022-07-26 13:39:00 +00:00
token_updater(token)
2022-06-08 21:02:14 +00:00
return redirect(url_for(".guilds"))
@app.route("/guilds")
async def guilds():
token = session.get("oauth2_token")
if token is None:
return redirect(url_for(".login"))
discord = make_oauth_session(host_url=request.host_url, token=token)
2022-07-05 16:23:05 +00:00
user = discord.get(API_BASE_URL + "/users/@me")
guilds = discord.get(API_BASE_URL + "/users/@me/guilds")
if user.status_code != 200 or guilds.status_code != 200:
return redirect(url_for(".login"))
2022-06-08 21:02:14 +00:00
2022-07-05 16:23:05 +00:00
return await render_template(
"guilds.html.j2", guilds=guilds.json(), user=user.json()
)
2022-05-08 14:02:19 +00:00
@app.route("/<guild_id>.ics")
2022-07-05 16:23:05 +00:00
async def ical(guild_id: str):
2022-05-08 14:02:19 +00:00
guild = get_guild_by_id(guild_id)
if guild is None:
return redirect(url_for(".login"))
2022-05-08 14:02:19 +00:00
calendar = Calendar()
for scheduled_event in guild.scheduled_events:
event = Event()
event.name = scheduled_event.name
event.begin = scheduled_event.scheduled_start_time
2022-05-30 06:30:11 +00:00
event.end = (
scheduled_event.scheduled_end_time
or scheduled_event.scheduled_start_time + timedelta(hours=2)
)
2022-05-08 14:02:19 +00:00
event.uid = str(scheduled_event.id)
event.description = scheduled_event.description
2022-05-30 06:30:11 +00:00
event.url = f"https://discord.com/events/{guild_id}/{scheduled_event.id}"
2022-05-08 14:02:19 +00:00
event.location = (
scheduled_event.entity_metadata.location
if scheduled_event.entity_metadata
else None
)
alarm = DisplayAlarm()
alarm.trigger = timedelta(hours=1)
event.alarms.append(alarm)
calendar.events.add(event)
return str(calendar)
2022-08-22 21:44:02 +00:00
quart_task = client.loop.create_task(app.run_task())
quart_task.add_done_callback(lambda f: client.loop.stop())
client.run(DISCORD_TOKEN)