feat: overhaul linux practice sandbox and learning UX

This commit is contained in:
likingcode
2026-03-10 07:30:42 +08:00
parent 5686831d9a
commit dacc1cb735
3 changed files with 1325 additions and 1567 deletions

1518
index.html

File diff suppressed because it is too large Load Diff

1037
sandbox.py

File diff suppressed because it is too large Load Diff

337
server.py
View File

@@ -1,285 +1,214 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Linux 命令沙盒练习平台 Server Linux 命令沙盒练习平台 Server(增强版)
- 集成 sandbox.py 沙盒引擎 - 集成 sandbox.py 沙盒引擎
- 路由:/ → HTML 页面, /api/run → 命令执行, /api/check → 任务检查 - 提供课程、命令执行、任务判定、学习建议等 API
""" """
from __future__ import annotations
import hashlib
import http.server import http.server
import urllib.parse
import json import json
import os import os
import base64 import urllib.parse
import hashlib from typing import Any
from sandbox import LinuxSandbox
# ==============================
# 认证配置
# ==============================
# 预置用户:{username: password_hash}
# 生成方式hashlib.sha256(b"password").hexdigest()
USERS = { USERS = {
# 默认管理员账户
"admin": hashlib.sha256(b"safe_linux_2026").hexdigest(), "admin": hashlib.sha256(b"safe_linux_2026").hexdigest(),
# 可添加更多用户
} }
# Token 有效期(秒) TOKEN_TTL = 86400
TOKEN_TTL = 86400 # 24 小时 TASKS_FILE = os.path.join(os.path.dirname(__file__), "COURSE_TASKS.json")
HTML_FILE = os.path.join(os.path.dirname(__file__), "index.html")
def check_auth(request_body: bytes = None) -> tuple[bool, str]:
"""
检查 Authorization 请求头
支持: Bearer token 或 Basic auth
"""
# 默认允许访问(未启用强制认证)
# 启用:true 改为 True
ENABLE_AUTH = False
if not ENABLE_AUTH:
return True, "admin"
if not request_body:
return False, "Missing Authorization header"
# 解析请求头
# 实际在 do_GET 中通过 headers 获取
return True, "admin" # 暂时 Allow-All后续优化
# 导入沙盒模块
from sandbox import LinuxSandbox
SANDBOX = LinuxSandbox() SANDBOX = LinuxSandbox()
# ==============================
# 任务数据加载
# ==============================
TASKS_FILE = os.path.join(os.path.dirname(__file__), "COURSE_TASKS.json")
def load_tasks() -> dict[str, Any]:
def load_tasks():
"""加载课程任务"""
try: try:
with open(TASKS_FILE, "r", encoding="utf-8") as f: with open(TASKS_FILE, "r", encoding="utf-8") as f:
return json.load(f) return json.load(f)
except Exception as e: except Exception as e:
print(f"Error loading tasks: {e}") print(f"Error loading tasks: {e}")
return {"levels": []} return {"meta": {}, "levels": []}
TASKS = load_tasks() TASKS = load_tasks()
# ==============================
# Handler 类 def find_task(task_id: str) -> dict[str, Any] | None:
# ============================== for level in TASKS.get("levels", []):
for task in level.get("challenges", []):
if task.get("id") == task_id:
task["level_title"] = level.get("title", "")
task["level_id"] = level.get("id", "")
return task
return None
class LinuxSandboxHandler(http.server.BaseHTTPRequestHandler): class LinuxSandboxHandler(http.server.BaseHTTPRequestHandler):
def send_json(self, data, status=200): def log_message(self, format, *args):
"""发送 JSON 响应""" pass
def send_json(self, data: dict[str, Any], status=200):
raw = json.dumps(data, ensure_ascii=False).encode("utf-8")
self.send_response(status) self.send_response(status)
self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(raw)))
self.end_headers() self.end_headers()
self.wfile.write(json.dumps(data, ensure_ascii=False).encode("utf-8")) self.wfile.write(raw)
def send_text(self, text, status=200): def _send_file(self, path: str, content_type: str):
"""发送纯文本响应""" with open(path, "rb") as f:
self.send_response(status) content = f.read()
self.send_header("Content-Type", "text/plain; charset=utf-8") self.send_response(200)
self.send_header("Content-Type", content_type)
self.send_header("Content-Length", str(len(content)))
self.end_headers() self.end_headers()
self.wfile.write(text.encode("utf-8")) self.wfile.write(content)
def check_auth(self, auth_header: str, token: str) -> bool:
if self.client_address[0] == "127.0.0.1":
return True
if token == "safe_linux_2026":
return True
if auth_header.startswith("Bearer ") and auth_header[7:] == "safe_linux_2026":
return True
return False
def do_GET(self): def do_GET(self):
parsed = urllib.parse.urlparse(self.path) parsed = urllib.parse.urlparse(self.path)
path = parsed.path path = parsed.path
# 🔐 认证检查(跳过 / 登录页 / API 登录) if path not in ["/", "/login.html", "/api/login", "/api/logout", "/api/tasks", "/api/health"]:
if path not in ["/", "/login.html", "/api/login", "/api/logout"]:
# 检查 Cookie 或 Authorization
auth_header = self.headers.get("Authorization", "") auth_header = self.headers.get("Authorization", "")
token = self.headers.get("X-Token", "") token = self.headers.get("X-Token", "")
if not self.check_auth(auth_header, token) and "xiaoxiaoluohao.indevs.in" in self.headers.get("Host", ""):
# 简化:如果带了有效 token 或直接 local 环境127.0.0.1)则放行 self.send_json({"error": "Authentication required"}, 401)
if not self.check_auth(auth_header, token): return
# 对公网域名强制认证
if "xiaoxiaoluohao.indevs.in" in self.headers.get("Host", ""):
self.send_json({"error": "Authentication required"}, 401)
return
# 1. 主页:返回 HTML 页面
if path == "/": if path == "/":
html_path = os.path.join(os.path.dirname(__file__), "index.html") self._send_file(HTML_FILE, "text/html; charset=utf-8")
try: return
with open(html_path, "rb") as f:
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.end_headers()
self.wfile.write(f.read())
except Exception as e:
self.send_response(500)
self.send_header("Content-Type", "text/plain")
self.end_headers()
self.wfile.write(f"Error: {e}".encode())
# 2. 命令执行 API if path == "/api/health":
elif path == "/api/run": self.send_json({"ok": True, "cwd": SANDBOX.cwd, "user": SANDBOX.user})
return
if path == "/api/run":
query = urllib.parse.parse_qs(parsed.query) query = urllib.parse.parse_qs(parsed.query)
cmd = query.get("cmd", [""])[0] cmd = query.get("cmd", [""])[0]
if not cmd: if not cmd:
self.send_json({"error": "No command provided"}, 400) self.send_json({"error": "No command provided"}, 400)
return return
# 执行沙盒命令
result = SANDBOX.execute(cmd) result = SANDBOX.execute(cmd)
self.send_json(result) self.send_json(result)
return
# 3. 任务检查 API if path == "/api/check":
elif path == "/api/check":
query = urllib.parse.parse_qs(parsed.query) query = urllib.parse.parse_qs(parsed.query)
task_id = query.get("task_id", [""])[0] task_id = query.get("task_id", [""])[0]
cmd = query.get("last_cmd", [""])[0]
output = query.get("output", [""])[0]
sandbox_state = {
"cwd": SANDBOX.cwd,
"exists": SANDBOX.exists,
"is_executable": SANDBOX.is_executable,
"cmd": cmd,
"output": output,
}
if not task_id: if not task_id:
self.send_json({"error": "-task_id required"}, 400) self.send_json({"error": "task_id required"}, 400)
return return
task = find_task(task_id)
# 查找任务
task = None
for level in TASKS.get("levels", []):
for t in level.get("challenges", []):
if t.get("id") == task_id:
task = t
break
if task:
break
if not task: if not task:
self.send_json({"error": "Task not found"}, 404) self.send_json({"error": "Task not found"}, 404)
return return
success, reason = self.evaluate_task(task, sandbox_state)
# 获取当前命令执行结果
current_cmd = query.get("last_cmd", [""])[0]
current_output = query.get("output", [""])[0]
# 检查是否匹配
success = False
message = task.get("fail_message", "❌ 未通过测试")
# 尝试匹配 solution
for sol in task.get("solution", []):
if current_cmd.strip() == sol.strip():
success = True
message = "✅ 回答正确!🎉"
break
# 如果没匹配上,检查 success_test 逻辑
if not success and "success_test" in task:
if self.evaluate_test(task["success_test"], current_output):
success = True
message = "✅ 回答正确!🎉"
self.send_json({ self.send_json({
"task_id": task_id, "task_id": task_id,
"success": success, "success": success,
"message": message, "message": task.get("success_msg", "✅ 回答正确!") if success else reason or task.get("fail_message", "❌ 未通过测试"),
"hint": task.get("hint", "💡 没有提示"), "hint": task.get("hint", "继续尝试,或者看看相关命令示例"),
"title": task.get("title", "未知任务"), "title": task.get("title", "未知任务"),
"next_suggestion": self.build_next_suggestion(task_id),
}) })
return
# 4. 获取课程总览 if path == "/api/tasks":
elif path == "/api/tasks":
self.send_json(TASKS) self.send_json(TASKS)
return
# 404 self.send_response(404)
else: self.end_headers()
self.send_response(404) self.wfile.write(b"Not Found")
self.send_header("Content-Type", "text/plain")
self.end_headers()
self.wfile.write(b"Not Found")
def check_auth(self, auth_header: str, token: str) -> bool:
"""认证检查:支持 Bearer token / X-Token header"""
# 本地环境直接放行
if self.client_address[0] == "127.0.0.1":
return True
# 公网场景:验证 token
if token:
# 简化版:验证预置 token生产环境应加密存储
valid_tokens = ["safe_linux_2026"]
return token in valid_tokens
# Bearer token 支持
if auth_header.startswith("Bearer "):
token = auth_header[7:]
return token in ["safe_linux_2026"]
return False
def evaluate_test(self, test_expr: str, output: str) -> bool:
"""简单评估 success_test 表达式"""
try:
# 支持简单的 if/contains 语法
# if output contains 'xxx' then pass
if "contains" in test_expr:
parts = test_expr.split("'")
if len(parts) >= 3:
keyword = parts[1]
return keyword in output
elif "==" in test_expr:
parts = test_expr.split("==")
if len(parts) == 2:
expected = parts[1].strip().strip("'")
return output.strip() == expected
return False
except Exception:
return False
def log_message(self, format, *args):
pass # Suppress logging
# ==============================
# Auth APIs
# ==============================
def do_POST(self): def do_POST(self):
parsed = urllib.parse.urlparse(self.path) parsed = urllib.parse.urlparse(self.path)
path = parsed.path path = parsed.path
content_length = int(self.headers.get("Content-Length", 0))
raw = self.rfile.read(content_length).decode() if content_length else "{}"
# /api/login
if path == "/api/login": if path == "/api/login":
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length).decode()
try: try:
data = json.loads(body) data = json.loads(raw)
username = data.get("username", "") except Exception:
password = data.get("password", "") self.send_json({"success": False, "error": "Invalid JSON"}, 400)
return
# 简单验证 username = data.get("username", "")
if username in USERS: password = data.get("password", "")
pwd_hash = hashlib.sha256(password.encode()).hexdigest() if username in USERS and hashlib.sha256(password.encode()).hexdigest() == USERS[username]:
if pwd_hash == USERS[username]: self.send_json({"success": True, "token": "safe_linux_2026", "message": "✅ 登录成功!"})
# 返回成功 token return
self.send_json({ self.send_json({"success": False, "error": "❌ 用户名或密码错误"}, 401)
"success": True, return
"token": "safe_linux_2026",
"message": "✅ 登录成功!"
})
return
self.send_json({"success": False, "error": "❌ 用户名或密码错误"}, 401)
except Exception as e:
self.send_json({"success": False, "error": str(e)}, 400)
# /api/logout if path == "/api/logout":
elif path == "/api/logout":
self.send_json({"success": True, "message": "👋 已退出登录"}) self.send_json({"success": True, "message": "👋 已退出登录"})
return
else: if path == "/api/reset":
self.send_response(404) SANDBOX.reset()
self.end_headers() self.send_json({"success": True, "message": "♻️ 沙盒环境已重置"})
self.wfile.write(b"Not Found") return
self.send_response(404)
self.end_headers()
self.wfile.write(b"Not Found")
def evaluate_task(self, task: dict[str, Any], state: dict[str, Any]) -> tuple[bool, str]:
cmd = state["cmd"]
output = state["output"]
success_test = task.get("success_test")
if task.get("solution"):
for sol in task["solution"]:
if cmd.strip() == sol.strip():
return True, ""
if not success_test:
return False, "❌ 还没达到任务要求"
try:
ok = bool(eval(success_test, {"__builtins__": {}}, state))
return ok, "❌ 命令执行了,但结果还没达到题目要求"
except Exception:
return False, "❌ 当前判题规则未命中,换个更准确的命令试试"
def build_next_suggestion(self, current_task_id: str) -> str | None:
all_tasks = []
for level in TASKS.get("levels", []):
all_tasks.extend(level.get("challenges", []))
for idx, task in enumerate(all_tasks):
if task.get("id") == current_task_id and idx + 1 < len(all_tasks):
nxt = all_tasks[idx + 1]
return f"下一题:{nxt.get('title', '继续挑战')}"
return None
if __name__ == "__main__": if __name__ == "__main__":
PORT = 8084 PORT = 8084
print(f"🌱 Linux Sandbox Practice Server with Auth 启动中... http://127.0.0.1:{PORT}") print(f"🌱 Linux Sandbox Practice Server 启动中... http://127.0.0.1:{PORT}")
print("📚 地址: https://linux.xiaoxiaoluohao.indevs.in") print("📚 地址: https://linux.xiaoxiaoluohao.indevs.in")
print("🔑 Account: admin / safe_linux_2026") print("🔑 Account: admin / safe_linux_2026")
http.server.HTTPServer(("127.0.0.1", PORT), LinuxSandboxHandler).serve_forever() http.server.ThreadingHTTPServer(("127.0.0.1", PORT), LinuxSandboxHandler).serve_forever()