237 lines
6.6 KiB
Python
237 lines
6.6 KiB
Python
from os import getenv, path
|
|
from typing import Annotated
|
|
|
|
from docker import errors, from_env
|
|
from docker.models.containers import Container
|
|
from dotenv import load_dotenv
|
|
from fastapi import Depends, FastAPI, HTTPException, Request
|
|
from fastapi.responses import PlainTextResponse
|
|
from fastapi.security import HTTPBasic, HTTPBasicCredentials
|
|
from fastapi.staticfiles import StaticFiles
|
|
from pydantic import BaseModel
|
|
from starlette import status, types
|
|
from uvicorn import run
|
|
|
|
load_dotenv()
|
|
client = from_env()
|
|
security = HTTPBasic()
|
|
|
|
|
|
def http_401() -> HTTPException:
|
|
return HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Incorrect username or password",
|
|
headers={"WWW-Authenticate": "Basic"},
|
|
)
|
|
|
|
|
|
async def check_auth(
|
|
credentials: Annotated[HTTPBasicCredentials, Depends(security)],
|
|
) -> HTTPBasicCredentials:
|
|
usernames = getenv("USERNAMES", "").split(",")
|
|
passwords = getenv("PASSWORDS", "").split(",")
|
|
|
|
if credentials.username not in usernames or credentials.password not in passwords:
|
|
raise http_401()
|
|
|
|
user_index = usernames.index(credentials.username)
|
|
password = passwords[user_index]
|
|
|
|
if credentials.password != password:
|
|
raise http_401()
|
|
|
|
return credentials
|
|
|
|
|
|
app = FastAPI(dependencies=[Depends(check_auth)])
|
|
|
|
|
|
class SerializedContainer(BaseModel):
|
|
id: str
|
|
name: str | None
|
|
image: str | None
|
|
labels: dict[str, str]
|
|
status: str
|
|
health: str
|
|
engine: str | None
|
|
owner: str | None
|
|
environment: list[str]
|
|
logs: str | None
|
|
|
|
|
|
def container_logs(container: Container, tail: int) -> str | None:
|
|
try:
|
|
return container.logs(tail=tail).decode()
|
|
except errors.APIError:
|
|
return None
|
|
|
|
|
|
def serialize_container(container: Container) -> SerializedContainer:
|
|
return SerializedContainer(
|
|
id=container.short_id,
|
|
name=container.name,
|
|
image=container.image.tags[0] if container.image else None,
|
|
labels=container.labels,
|
|
status=container.status,
|
|
health=container.health,
|
|
engine=container.labels.get("engine"),
|
|
owner=container.labels.get("owner"),
|
|
environment=container.attrs["Config"]["Env"],
|
|
logs=container_logs(container, 100),
|
|
)
|
|
|
|
|
|
def select_container(
|
|
container_name: str, credentials: Annotated[HTTPBasicCredentials, Depends(security)]
|
|
) -> Container:
|
|
try:
|
|
container = client.containers.get(container_name)
|
|
except errors.APIError:
|
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
|
|
|
|
if (
|
|
credentials.username != "admin"
|
|
and container.labels.get("engine") != "pilotwings"
|
|
and container.labels.get("owner") != credentials.username
|
|
):
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
|
|
|
|
return container
|
|
|
|
|
|
@app.get("/api/containers")
|
|
def get_containers(
|
|
credentials: Annotated[HTTPBasicCredentials, Depends(security)],
|
|
) -> list[SerializedContainer]:
|
|
if credentials.username == "admin":
|
|
return [
|
|
serialize_container(container)
|
|
for container in client.containers.list(
|
|
filters={"label": ["engine=pilotwings"]}
|
|
)
|
|
]
|
|
|
|
return [
|
|
serialize_container(container)
|
|
for container in client.containers.list(
|
|
filters={"label": ["engine=pilotwings", f"owner={credentials.username}"]},
|
|
)
|
|
]
|
|
|
|
|
|
@app.get("/api/container/{container_name}")
|
|
def get_container(
|
|
container_name: str,
|
|
credentials: Annotated[HTTPBasicCredentials, Depends(security)],
|
|
) -> SerializedContainer:
|
|
return serialize_container(select_container(container_name, credentials))
|
|
|
|
|
|
class ContainerRequest(BaseModel):
|
|
image: str
|
|
environment: list[str]
|
|
|
|
|
|
@app.post("/api/container/{container_name}")
|
|
def create_or_update_container(
|
|
container_name: str,
|
|
request_body: ContainerRequest,
|
|
credentials: Annotated[HTTPBasicCredentials, Depends(security)],
|
|
) -> SerializedContainer:
|
|
networks = client.networks.list(names=["pilotwings"])
|
|
|
|
if not networks:
|
|
client.networks.create("pilotwings")
|
|
|
|
client.images.pull(request_body.image)
|
|
|
|
try:
|
|
delete_container(container_name, credentials)
|
|
except HTTPException:
|
|
pass
|
|
|
|
return serialize_container(
|
|
client.containers.run(
|
|
request_body.image,
|
|
detach=True,
|
|
environment=request_body.environment,
|
|
labels={"engine": "pilotwings", "owner": credentials.username},
|
|
name=container_name,
|
|
network="pilotwings",
|
|
restart_policy={"Name": "always"},
|
|
)
|
|
)
|
|
|
|
|
|
@app.post("/api/container/{container_name}/pull")
|
|
def pull_container(
|
|
container_name: str,
|
|
credentials: Annotated[HTTPBasicCredentials, Depends(security)],
|
|
) -> SerializedContainer:
|
|
container = select_container(container_name, credentials)
|
|
|
|
if not container.image:
|
|
raise HTTPException(status_code=status.HTTP_410_GONE)
|
|
|
|
request_body = ContainerRequest(
|
|
image=container.image.tags[0], environment=container.attrs["Config"]["Env"]
|
|
)
|
|
|
|
return create_or_update_container(container_name, request_body, credentials)
|
|
|
|
|
|
@app.post("/api/container/{container_name}/restart")
|
|
def restart_container(
|
|
container_name: str,
|
|
credentials: Annotated[HTTPBasicCredentials, Depends(security)],
|
|
) -> SerializedContainer:
|
|
container = select_container(container_name, credentials)
|
|
container.restart()
|
|
return serialize_container(container)
|
|
|
|
|
|
@app.get("/api/container/{container_name}/logs", response_class=PlainTextResponse)
|
|
def get_container_logs(
|
|
container_name: str,
|
|
credentials: Annotated[HTTPBasicCredentials, Depends(security)],
|
|
) -> str:
|
|
container = select_container(container_name, credentials)
|
|
return container.logs().decode()
|
|
|
|
|
|
@app.delete("/api/container/{container_name}")
|
|
def delete_container(
|
|
container_name: str,
|
|
credentials: Annotated[HTTPBasicCredentials, Depends(security)],
|
|
) -> None:
|
|
container = select_container(container_name, credentials)
|
|
container.stop()
|
|
container.remove(v=True, force=True)
|
|
|
|
|
|
class AuthStaticFiles(StaticFiles):
|
|
async def __call__(
|
|
self, scope: types.Scope, receive: types.Receive, send: types.Send
|
|
) -> None:
|
|
request = Request(scope, receive)
|
|
credentials = await security(request)
|
|
|
|
if not credentials:
|
|
raise http_401()
|
|
|
|
await check_auth(credentials)
|
|
await super().__call__(scope, receive, send)
|
|
|
|
|
|
app.mount(
|
|
"/",
|
|
AuthStaticFiles(
|
|
directory=f"{path.dirname(path.realpath(__file__))}/dist", html=True
|
|
),
|
|
name="static",
|
|
)
|
|
|
|
|
|
def launch() -> None:
|
|
run(app, host="0.0.0.0")
|