feat: tighten linux login flow and refresh lab entry
This commit is contained in:
135
server.py
135
server.py
@@ -8,31 +8,40 @@ import http.server
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import secrets
|
||||
import time
|
||||
import urllib.parse
|
||||
from typing import Any
|
||||
|
||||
from sandbox import LinuxSandbox
|
||||
|
||||
DEFAULT_USERNAME = os.environ.get("LINUX_LAB_USERNAME", "admin")
|
||||
DEFAULT_PASSWORD = os.environ.get("LINUX_LAB_PASSWORD", "safe_linux_2026")
|
||||
USERS = {
|
||||
"admin": hashlib.sha256(b"safe_linux_2026").hexdigest(),
|
||||
DEFAULT_USERNAME: hashlib.sha256(DEFAULT_PASSWORD.encode("utf-8")).hexdigest(),
|
||||
}
|
||||
|
||||
TASKS_FILE = os.path.join(os.path.dirname(__file__), "COURSE_TASKS.json")
|
||||
HTML_FILE = os.path.join(os.path.dirname(__file__), "index.html")
|
||||
LOGIN_FILE = os.path.join(os.path.dirname(__file__), "login.html")
|
||||
PRIVACY_FILE = os.path.join(os.path.dirname(__file__), "privacy.html")
|
||||
SANDBOX = LinuxSandbox()
|
||||
SESSION_COOKIE = "linux_lab_session"
|
||||
SESSION_TTL_SECONDS = 8 * 60 * 60
|
||||
SESSIONS: dict[str, dict[str, Any]] = {}
|
||||
|
||||
PUBLIC_GET_PATHS = {
|
||||
"/",
|
||||
"/login.html",
|
||||
"/privacy",
|
||||
"/privacy.html",
|
||||
"/api/health",
|
||||
"/api/session",
|
||||
}
|
||||
PUBLIC_POST_PATHS = {
|
||||
"/api/login",
|
||||
"/api/logout",
|
||||
}
|
||||
SAFE_REMOTE_HOST = "xiaoxiaoluohao.indevs.in"
|
||||
|
||||
|
||||
def load_course() -> dict[str, Any]:
|
||||
@@ -47,6 +56,37 @@ def load_course() -> dict[str, Any]:
|
||||
COURSE = load_course()
|
||||
|
||||
|
||||
def secure_cookie_enabled() -> bool:
|
||||
return os.environ.get("LINUX_LAB_SECURE_COOKIE", "").lower() in {"1", "true", "yes", "on"}
|
||||
|
||||
|
||||
def create_session(username: str) -> str:
|
||||
session_id = secrets.token_urlsafe(24)
|
||||
SESSIONS[session_id] = {
|
||||
"user": username,
|
||||
"expires_at": time.time() + SESSION_TTL_SECONDS,
|
||||
}
|
||||
return session_id
|
||||
|
||||
|
||||
def get_session(session_id: str) -> dict[str, Any] | None:
|
||||
if not session_id:
|
||||
return None
|
||||
session = SESSIONS.get(session_id)
|
||||
if not session:
|
||||
return None
|
||||
if session.get("expires_at", 0) <= time.time():
|
||||
SESSIONS.pop(session_id, None)
|
||||
return None
|
||||
session["expires_at"] = time.time() + SESSION_TTL_SECONDS
|
||||
return session
|
||||
|
||||
|
||||
def delete_session(session_id: str):
|
||||
if session_id:
|
||||
SESSIONS.pop(session_id, None)
|
||||
|
||||
|
||||
def looks_garbled(value: Any) -> bool:
|
||||
if not isinstance(value, str) or not value.strip():
|
||||
return False
|
||||
@@ -980,11 +1020,13 @@ class LinuxLearningHandler(http.server.BaseHTTPRequestHandler):
|
||||
def log_message(self, format, *args):
|
||||
pass
|
||||
|
||||
def send_json(self, data: Any, status: int = 200):
|
||||
def send_json(self, data: Any, status: int = 200, extra_headers: dict[str, str] | None = None):
|
||||
raw = json.dumps(data, ensure_ascii=False).encode("utf-8")
|
||||
self.send_response(status)
|
||||
self.send_header("Content-Type", "application/json; charset=utf-8")
|
||||
self.send_header("Content-Length", str(len(raw)))
|
||||
for header, value in (extra_headers or {}).items():
|
||||
self.send_header(header, value)
|
||||
self.end_headers()
|
||||
self.wfile.write(raw)
|
||||
|
||||
@@ -997,6 +1039,36 @@ class LinuxLearningHandler(http.server.BaseHTTPRequestHandler):
|
||||
self.end_headers()
|
||||
self.wfile.write(body)
|
||||
|
||||
def send_redirect(self, location: str):
|
||||
self.send_response(302)
|
||||
self.send_header("Location", location)
|
||||
self.end_headers()
|
||||
|
||||
def cookie_header(self, session_id: str = "", clear: bool = False) -> str:
|
||||
base = f"{SESSION_COOKIE}={session_id if not clear else ''}; Path=/; HttpOnly; SameSite=Lax"
|
||||
if clear:
|
||||
return base + "; Max-Age=0"
|
||||
if secure_cookie_enabled():
|
||||
base += "; Secure"
|
||||
return base + f"; Max-Age={SESSION_TTL_SECONDS}"
|
||||
|
||||
def current_session_id(self) -> str:
|
||||
cookie = self.headers.get("Cookie", "")
|
||||
for part in cookie.split(";"):
|
||||
name, _, value = part.strip().partition("=")
|
||||
if name == SESSION_COOKIE:
|
||||
return value.strip()
|
||||
return ""
|
||||
|
||||
def current_session(self) -> dict[str, Any] | None:
|
||||
return get_session(self.current_session_id())
|
||||
|
||||
def current_user(self) -> str | None:
|
||||
session = self.current_session()
|
||||
if not session:
|
||||
return None
|
||||
return str(session.get("user") or "")
|
||||
|
||||
def is_public_path(self, path: str, method: str) -> bool:
|
||||
if method == "GET":
|
||||
return path in PUBLIC_GET_PATHS
|
||||
@@ -1004,19 +1076,10 @@ class LinuxLearningHandler(http.server.BaseHTTPRequestHandler):
|
||||
return path in PUBLIC_POST_PATHS
|
||||
return False
|
||||
|
||||
def check_auth(self, auth_header: str, token: str) -> bool:
|
||||
if token == "safe_linux_2026":
|
||||
return True
|
||||
if auth_header.startswith("Bearer ") and auth_header[7:] == "safe_linux_2026":
|
||||
return True
|
||||
return False
|
||||
|
||||
def require_auth_if_needed(self, path: str, method: str) -> bool:
|
||||
if self.is_public_path(path, method):
|
||||
return True
|
||||
auth_header = self.headers.get("Authorization", "")
|
||||
token = self.headers.get("X-Token", "")
|
||||
if not self.check_auth(auth_header, token):
|
||||
if not self.current_session():
|
||||
self.send_json({"error": "Authentication required"}, 401)
|
||||
return False
|
||||
return True
|
||||
@@ -1025,10 +1088,21 @@ class LinuxLearningHandler(http.server.BaseHTTPRequestHandler):
|
||||
parsed = urllib.parse.urlparse(self.path)
|
||||
path = parsed.path
|
||||
|
||||
if not self.require_auth_if_needed(path, "GET"):
|
||||
if path == "/":
|
||||
self.send_redirect("/app" if self.current_session() else "/login.html")
|
||||
return
|
||||
|
||||
if path == "/":
|
||||
if path == "/login.html":
|
||||
if self.current_session():
|
||||
self.send_redirect("/app")
|
||||
return
|
||||
self.send_file(LOGIN_FILE, "text/html; charset=utf-8")
|
||||
return
|
||||
|
||||
if path in {"/app", "/index.html"}:
|
||||
if not self.current_session():
|
||||
self.send_redirect("/login.html")
|
||||
return
|
||||
self.send_file(HTML_FILE, "text/html; charset=utf-8")
|
||||
return
|
||||
|
||||
@@ -1036,6 +1110,9 @@ class LinuxLearningHandler(http.server.BaseHTTPRequestHandler):
|
||||
self.send_file(PRIVACY_FILE, "text/html; charset=utf-8")
|
||||
return
|
||||
|
||||
if not self.require_auth_if_needed(path, "GET"):
|
||||
return
|
||||
|
||||
if path == "/api/health":
|
||||
overview = build_overview()
|
||||
self.send_json(
|
||||
@@ -1051,6 +1128,11 @@ class LinuxLearningHandler(http.server.BaseHTTPRequestHandler):
|
||||
)
|
||||
return
|
||||
|
||||
if path == "/api/session":
|
||||
user = self.current_user()
|
||||
self.send_json({"authenticated": bool(user), "user": user or ""})
|
||||
return
|
||||
|
||||
if path == "/api/course":
|
||||
self.send_json(COURSE)
|
||||
return
|
||||
@@ -1130,9 +1212,6 @@ class LinuxLearningHandler(http.server.BaseHTTPRequestHandler):
|
||||
parsed = urllib.parse.urlparse(self.path)
|
||||
path = parsed.path
|
||||
|
||||
if not self.require_auth_if_needed(path, "POST"):
|
||||
return
|
||||
|
||||
content_length = int(self.headers.get("Content-Length", 0))
|
||||
raw = self.rfile.read(content_length).decode("utf-8") if content_length else "{}"
|
||||
|
||||
@@ -1146,14 +1225,30 @@ class LinuxLearningHandler(http.server.BaseHTTPRequestHandler):
|
||||
username = data.get("username", "")
|
||||
password = data.get("password", "")
|
||||
if username in USERS and hashlib.sha256(password.encode("utf-8")).hexdigest() == USERS[username]:
|
||||
self.send_json({"success": True, "token": "safe_linux_2026", "message": "Login succeeded"})
|
||||
session_id = create_session(username)
|
||||
self.send_json(
|
||||
{
|
||||
"success": True,
|
||||
"user": username,
|
||||
"message": "Login succeeded",
|
||||
"expires_in": SESSION_TTL_SECONDS,
|
||||
},
|
||||
extra_headers={"Set-Cookie": self.cookie_header(session_id)},
|
||||
)
|
||||
return
|
||||
|
||||
self.send_json({"success": False, "error": "Invalid username or password"}, 401)
|
||||
return
|
||||
|
||||
if not self.require_auth_if_needed(path, "POST"):
|
||||
return
|
||||
|
||||
if path == "/api/logout":
|
||||
self.send_json({"success": True, "message": "Logged out"})
|
||||
delete_session(self.current_session_id())
|
||||
self.send_json(
|
||||
{"success": True, "message": "Logged out"},
|
||||
extra_headers={"Set-Cookie": self.cookie_header(clear=True)},
|
||||
)
|
||||
return
|
||||
|
||||
if path == "/api/reset":
|
||||
|
||||
Reference in New Issue
Block a user