2022-05-08 14:02:19 +00:00
|
|
|
from asyncio import new_event_loop
|
|
|
|
from datetime import timedelta
|
|
|
|
from os import environ
|
2022-05-09 22:37:43 +00:00
|
|
|
from typing import List, Optional, Union
|
2022-05-08 14:02:19 +00:00
|
|
|
|
|
|
|
from disnake import Client, Guild, Member
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
from ics import Calendar, Event # type: ignore
|
|
|
|
from ics.alarm.display import DisplayAlarm # type: ignore
|
2022-05-09 22:31:50 +00:00
|
|
|
from quart import Quart, redirect, request, session, url_for
|
|
|
|
from requests_oauthlib import OAuth2Session # type: ignore
|
2022-05-08 14:02:19 +00:00
|
|
|
|
|
|
|
# Let python-dotenv populate the environment before any variable is read.
load_dotenv()

# Credentials for the bot account and for the OAuth2 application.
DISCORD_TOKEN = environ.get("DISCORD_TOKEN")
OAUTH2_CLIENT_ID = environ.get("OAUTH2_CLIENT_ID")
OAUTH2_CLIENT_SECRET = environ.get("OAUTH2_CLIENT_SECRET")
# Path appended to the request's host URL to build the OAuth2 redirect URI.
OAUTH2_REDIRECT_URI = environ.get("OAUTH2_REDIRECT_URI", "callback")

# Fail fast at import time when any mandatory credential is unset or empty.
if not (DISCORD_TOKEN and OAUTH2_CLIENT_ID and OAUTH2_CLIENT_SECRET):
    raise Exception(
        "Missing some env variables "
        "(could be DISCORD_TOKEN, OAUTH2_CLIENT_ID or OAUTH2_CLIENT_SECRET)"
    )
|
2022-05-08 14:02:19 +00:00
|
|
|
|
2022-05-09 22:31:50 +00:00
|
|
|
# Root of the Discord HTTP API; can be overridden through the environment.
API_BASE_URL = environ.get("API_BASE_URL", "https://discordapp.com/api")
# OAuth2 endpoints, both derived from the API root above.
AUTHORIZATION_BASE_URL = f"{API_BASE_URL}/oauth2/authorize"
TOKEN_URL = f"{API_BASE_URL}/oauth2/token"
|
2022-05-08 14:02:19 +00:00
|
|
|
|
|
|
|
|
|
|
|
class Discord(Client):
    """Discord client that prints a login message from its ``on_ready`` handler."""

    async def on_ready(self) -> None:
        """Print which account the client is logged in as."""
        banner = f"Logged on as {self.user}!"
        print(banner)
|
|
|
|
|
|
|
|
|
|
|
|
# Module-wide singletons: the Discord client and the Quart web application.
client = Discord()
app = Quart(__name__)
# The OAuth2 client secret doubles as the application's SECRET_KEY.
app.config.update(SECRET_KEY=OAUTH2_CLIENT_SECRET)
|
2022-05-08 14:02:19 +00:00
|
|
|
|
|
|
|
|
2022-05-08 14:25:25 +00:00
|
|
|
def get_guild_by_id(guild_id: Union[int, str]) -> Optional[Guild]:
    """Find a guild the bot has joined by numeric ID or by vanity URL code.

    Args:
        guild_id: Either the guild's numeric ID (as ``int`` or digit string)
            or its vanity URL code.

    Returns:
        The first guild in ``client.guilds`` whose ``id`` or
        ``vanity_url_code`` matches, or ``None`` when nothing matches.
    """
    vanity_code = str(guild_id)
    # A vanity code is usually not numeric; converting it unconditionally
    # would raise ValueError before the vanity comparison could ever run.
    try:
        numeric_id: Optional[int] = int(guild_id)
    except (TypeError, ValueError):
        numeric_id = None

    for guild in client.guilds:
        if guild.id == numeric_id or guild.vanity_url_code == vanity_code:
            return guild
    return None
|
|
|
|
|
|
|
|
|
2022-05-08 14:25:25 +00:00
|
|
|
def get_bot_member(guild: Guild) -> Optional[Member]:
    """Return the member of ``guild`` whose ID equals the client user's ID.

    Returns ``None`` when no entry in ``guild.members`` matches.
    """
    own_entries = (
        candidate
        for candidate in guild.members
        if candidate.id == client.user.id
    )
    return next(own_entries, None)
|
|
|
|
|
|
|
|
|
2022-05-08 14:25:25 +00:00
|
|
|
async def get_guild_tag(member: Member) -> Optional[str]:
    """Return a code usable in a ``discord.gg/<code>`` link for the member's guild.

    Args:
        member: The member whose guild permissions decide whether a code
            may be looked up.

    Returns:
        The guild's vanity URL code when it has one, otherwise the code of
        the first invite returned by ``guild.invites()``. ``None`` when the
        member lacks the ``manage_guild`` permission or when the guild has
        neither a vanity code nor any invite.
    """
    if not member.guild_permissions.manage_guild:
        return None

    guild = member.guild
    if guild.vanity_url_code:
        return guild.vanity_url_code

    invites = await guild.invites()
    # A guild may have no invites at all; indexing blindly would raise
    # IndexError and break the whole calendar request.
    if not invites:
        return None
    return invites[0].code
|
|
|
|
|
|
|
|
|
2022-05-09 22:31:50 +00:00
|
|
|
def token_updater(token: str) -> None:
    """Save ``token`` in the session under the ``"oauth2_token"`` key.

    Handed to ``OAuth2Session`` as its ``token_updater`` callback.
    """
    # NOTE(review): annotated as ``str``, but the value stored under this key
    # elsewhere is whatever ``fetch_token`` returns - confirm the real type.
    session.update(oauth2_token=token)
|
|
|
|
|
|
|
|
|
|
|
|
def make_oauth_session(
    host_url: str,
    token: Optional[str] = None,
    state: Optional[str] = None,
    scope: Optional[List[str]] = None,
) -> OAuth2Session:
    """Build an ``OAuth2Session`` configured for this application.

    Args:
        host_url: Base URL of the current request; ``OAUTH2_REDIRECT_URI`` is
            appended to it to form the redirect URI.
        token: Previously obtained token, if any.
        state: OAuth2 ``state`` value to validate against, if any.
        scope: Scopes to request, if any.

    Returns:
        A session that refreshes tokens against ``TOKEN_URL`` and reports
        refreshed tokens through ``token_updater``.
    """
    redirect_uri = host_url + OAUTH2_REDIRECT_URI
    print(redirect_uri)

    # Credentials sent along with automatic token refresh requests.
    refresh_credentials = {
        "client_id": OAUTH2_CLIENT_ID,
        "client_secret": OAUTH2_CLIENT_SECRET,
    }
    return OAuth2Session(
        client_id=OAUTH2_CLIENT_ID,
        redirect_uri=redirect_uri,
        scope=scope,
        state=state,
        token=token,
        auto_refresh_url=TOKEN_URL,
        auto_refresh_kwargs=refresh_credentials,
        token_updater=token_updater,
    )
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/login")
async def login():
    """Start the OAuth2 flow by redirecting to the authorization URL.

    The ``state`` value generated for the request is kept in the session
    under ``"oauth2_state"`` so the callback can pass it back to the
    OAuth2 session.
    """
    oauth = make_oauth_session(host_url=request.host_url, scope=["guilds"])
    target_url, issued_state = oauth.authorization_url(AUTHORIZATION_BASE_URL)
    session["oauth2_state"] = issued_state
    return redirect(target_url)
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/callback")
async def callback():
    """Finish the OAuth2 flow: exchange the authorization response for a token.

    Returns:
        The HTML-escaped ``error`` request value when one is present.
        Otherwise an empty body, after the fetched token has been stored in
        the session under ``"oauth2_token"``.
    """
    # Function-scope imports keep this handler self-contained; none of these
    # names are imported at module level.
    from asyncio import get_running_loop
    from functools import partial
    from html import escape

    request_values = await request.values
    error = request_values.get("error")
    if error:
        # The value is caller-controlled; escape it so it cannot inject
        # markup into the response.
        return escape(error)

    discord = make_oauth_session(
        host_url=request.host_url, state=session.get("oauth2_state")
    )
    # fetch_token performs a synchronous HTTP request. Run it in the default
    # executor so it does not stall the event loop shared with the Discord
    # client. request.url is read here, before leaving the request context.
    fetch_token = partial(
        discord.fetch_token,
        TOKEN_URL,
        client_secret=OAUTH2_CLIENT_SECRET,
        authorization_response=request.url,
    )
    token = await get_running_loop().run_in_executor(None, fetch_token)

    # The token is a credential: store it, but never print or log it.
    session["oauth2_token"] = token
    return ""
|
|
|
|
|
|
|
|
|
2022-05-08 14:02:19 +00:00
|
|
|
def _to_calendar_event(scheduled_event, guild_tag: Optional[str]) -> Event:
    """Convert one guild scheduled event into an ics ``Event``."""
    event = Event()
    event.name = scheduled_event.name
    event.begin = scheduled_event.scheduled_start_time
    event.end = scheduled_event.scheduled_end_time
    event.uid = str(scheduled_event.id)
    event.description = scheduled_event.description

    metadata = scheduled_event.entity_metadata
    event.location = metadata.location if metadata else None

    # A link is only possible when an invite/vanity code is known.
    if guild_tag:
        event.url = f"https://discord.gg/{guild_tag}?event={scheduled_event.id}"

    alarm = DisplayAlarm()
    # NOTE(review): a positive offset presumably places the alarm one hour
    # AFTER the start; if a reminder before the event is intended this
    # should be negative - confirm against the ics library.
    alarm.trigger = timedelta(hours=1)
    event.alarms.append(alarm)
    return event


@app.route("/<guild_id>.ics")
async def index(guild_id):
    """Serve the scheduled events of a guild as an iCalendar document.

    Args:
        guild_id: Value taken from the URL and passed to ``get_guild_by_id``.

    Returns:
        A redirect to the login route when the guild is not found.
        Otherwise the serialized calendar with a ``text/calendar`` content
        type.
    """
    guild = get_guild_by_id(guild_id)
    if guild is None:
        return redirect(url_for(".login"))

    # get_bot_member may return None; without a member no guild tag can be
    # resolved, so events are simply emitted without a URL.
    bot = get_bot_member(guild)
    guild_tag = await get_guild_tag(bot) if bot is not None else None

    calendar = Calendar()
    for scheduled_event in guild.scheduled_events:
        calendar.events.add(_to_calendar_event(scheduled_event, guild_tag))

    # Declare the iCalendar media type explicitly instead of relying on the
    # framework's default content type for string responses.
    return str(calendar), 200, {"Content-Type": "text/calendar; charset=utf-8"}
|
|
|
|
|
|
|
|
|
|
|
|
# Run the Discord client and the Quart server side by side on one event loop.
loop = new_event_loop()
discord_task = loop.create_task(client.start(DISCORD_TOKEN))
quart_task = loop.create_task(app.run_task())

# As soon as either task completes, stop the loop so run_forever() returns.
for service_task in (discord_task, quart_task):
    service_task.add_done_callback(lambda _finished: loop.stop())

loop.run_forever()
|