286 lines
9.5 KiB
Python
Executable File
286 lines
9.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""
|
||
Linux 命令沙盒练习平台 Server
|
||
- 集成 sandbox.py 沙盒引擎
|
||
- 路由:/ → HTML 页面, /api/run → 命令执行, /api/check → 任务检查
|
||
"""
|
||
|
||
import http.server
|
||
import urllib.parse
|
||
import json
|
||
import os
|
||
import base64
|
||
import hashlib
|
||
|
||
# ==============================
|
||
# 认证配置
|
||
# ==============================
|
||
# 预置用户:{username: password_hash}
|
||
# 生成方式:hashlib.sha256(b"password").hexdigest()
|
||
USERS = {
|
||
# 默认管理员账户
|
||
"admin": hashlib.sha256(b"safe_linux_2026").hexdigest(),
|
||
# 可添加更多用户
|
||
}
|
||
|
||
# Token 有效期(秒)
|
||
TOKEN_TTL = 86400 # 24 小时
|
||
|
||
|
||
def check_auth(request_body: bytes = None) -> tuple[bool, str]:
|
||
"""
|
||
检查 Authorization 请求头
|
||
支持: Bearer token 或 Basic auth
|
||
"""
|
||
# 默认允许访问(未启用强制认证)
|
||
# 启用:true 改为 True
|
||
ENABLE_AUTH = False
|
||
|
||
if not ENABLE_AUTH:
|
||
return True, "admin"
|
||
|
||
if not request_body:
|
||
return False, "Missing Authorization header"
|
||
|
||
# 解析请求头
|
||
# 实际在 do_GET 中通过 headers 获取
|
||
return True, "admin" # 暂时 Allow-All,后续优化
|
||
|
||
# 导入沙盒模块
|
||
from sandbox import LinuxSandbox
|
||
|
||
SANDBOX = LinuxSandbox()
|
||
|
||
# ==============================
|
||
# 任务数据加载
|
||
# ==============================
|
||
TASKS_FILE = os.path.join(os.path.dirname(__file__), "COURSE_TASKS.json")
|
||
|
||
|
||
def load_tasks():
|
||
"""加载课程任务"""
|
||
try:
|
||
with open(TASKS_FILE, "r", encoding="utf-8") as f:
|
||
return json.load(f)
|
||
except Exception as e:
|
||
print(f"Error loading tasks: {e}")
|
||
return {"levels": []}
|
||
|
||
|
||
TASKS = load_tasks()
|
||
|
||
# ==============================
|
||
# Handler 类
|
||
# ==============================
|
||
class LinuxSandboxHandler(http.server.BaseHTTPRequestHandler):
|
||
def send_json(self, data, status=200):
|
||
"""发送 JSON 响应"""
|
||
self.send_response(status)
|
||
self.send_header("Content-Type", "application/json; charset=utf-8")
|
||
self.end_headers()
|
||
self.wfile.write(json.dumps(data, ensure_ascii=False).encode("utf-8"))
|
||
|
||
def send_text(self, text, status=200):
|
||
"""发送纯文本响应"""
|
||
self.send_response(status)
|
||
self.send_header("Content-Type", "text/plain; charset=utf-8")
|
||
self.end_headers()
|
||
self.wfile.write(text.encode("utf-8"))
|
||
|
||
def do_GET(self):
|
||
parsed = urllib.parse.urlparse(self.path)
|
||
path = parsed.path
|
||
|
||
# 🔐 认证检查(跳过 / 登录页 / API 登录)
|
||
if path not in ["/", "/login.html", "/api/login", "/api/logout"]:
|
||
# 检查 Cookie 或 Authorization
|
||
auth_header = self.headers.get("Authorization", "")
|
||
token = self.headers.get("X-Token", "")
|
||
|
||
# 简化:如果带了有效 token 或直接 local 环境(127.0.0.1)则放行
|
||
if not self.check_auth(auth_header, token):
|
||
# 对公网域名强制认证
|
||
if "xiaoxiaoluohao.indevs.in" in self.headers.get("Host", ""):
|
||
self.send_json({"error": "Authentication required"}, 401)
|
||
return
|
||
|
||
# 1. 主页:返回 HTML 页面
|
||
if path == "/":
|
||
html_path = os.path.join(os.path.dirname(__file__), "index.html")
|
||
try:
|
||
with open(html_path, "rb") as f:
|
||
self.send_response(200)
|
||
self.send_header("Content-Type", "text/html; charset=utf-8")
|
||
self.end_headers()
|
||
self.wfile.write(f.read())
|
||
except Exception as e:
|
||
self.send_response(500)
|
||
self.send_header("Content-Type", "text/plain")
|
||
self.end_headers()
|
||
self.wfile.write(f"Error: {e}".encode())
|
||
|
||
# 2. 命令执行 API
|
||
elif path == "/api/run":
|
||
query = urllib.parse.parse_qs(parsed.query)
|
||
cmd = query.get("cmd", [""])[0]
|
||
if not cmd:
|
||
self.send_json({"error": "No command provided"}, 400)
|
||
return
|
||
|
||
# 执行沙盒命令
|
||
result = SANDBOX.execute(cmd)
|
||
self.send_json(result)
|
||
|
||
# 3. 任务检查 API
|
||
elif path == "/api/check":
|
||
query = urllib.parse.parse_qs(parsed.query)
|
||
task_id = query.get("task_id", [""])[0]
|
||
|
||
if not task_id:
|
||
self.send_json({"error": "-task_id required"}, 400)
|
||
return
|
||
|
||
# 查找任务
|
||
task = None
|
||
for level in TASKS.get("levels", []):
|
||
for t in level.get("challenges", []):
|
||
if t.get("id") == task_id:
|
||
task = t
|
||
break
|
||
if task:
|
||
break
|
||
|
||
if not task:
|
||
self.send_json({"error": "Task not found"}, 404)
|
||
return
|
||
|
||
# 获取当前命令执行结果
|
||
current_cmd = query.get("last_cmd", [""])[0]
|
||
current_output = query.get("output", [""])[0]
|
||
|
||
# 检查是否匹配
|
||
success = False
|
||
message = task.get("fail_message", "❌ 未通过测试")
|
||
|
||
# 尝试匹配 solution
|
||
for sol in task.get("solution", []):
|
||
if current_cmd.strip() == sol.strip():
|
||
success = True
|
||
message = "✅ 回答正确!🎉"
|
||
break
|
||
|
||
# 如果没匹配上,检查 success_test 逻辑
|
||
if not success and "success_test" in task:
|
||
if self.evaluate_test(task["success_test"], current_output):
|
||
success = True
|
||
message = "✅ 回答正确!🎉"
|
||
|
||
self.send_json({
|
||
"task_id": task_id,
|
||
"success": success,
|
||
"message": message,
|
||
"hint": task.get("hint", "💡 没有提示"),
|
||
"title": task.get("title", "未知任务"),
|
||
})
|
||
|
||
# 4. 获取课程总览
|
||
elif path == "/api/tasks":
|
||
self.send_json(TASKS)
|
||
|
||
# 404
|
||
else:
|
||
self.send_response(404)
|
||
self.send_header("Content-Type", "text/plain")
|
||
self.end_headers()
|
||
self.wfile.write(b"Not Found")
|
||
|
||
def check_auth(self, auth_header: str, token: str) -> bool:
|
||
"""认证检查:支持 Bearer token / X-Token header"""
|
||
# 本地环境直接放行
|
||
if self.client_address[0] == "127.0.0.1":
|
||
return True
|
||
|
||
# 公网场景:验证 token
|
||
if token:
|
||
# 简化版:验证预置 token(生产环境应加密存储)
|
||
valid_tokens = ["safe_linux_2026"]
|
||
return token in valid_tokens
|
||
|
||
# Bearer token 支持
|
||
if auth_header.startswith("Bearer "):
|
||
token = auth_header[7:]
|
||
return token in ["safe_linux_2026"]
|
||
|
||
return False
|
||
|
||
def evaluate_test(self, test_expr: str, output: str) -> bool:
|
||
"""简单评估 success_test 表达式"""
|
||
try:
|
||
# 支持简单的 if/contains 语法
|
||
# if output contains 'xxx' then pass
|
||
if "contains" in test_expr:
|
||
parts = test_expr.split("'")
|
||
if len(parts) >= 3:
|
||
keyword = parts[1]
|
||
return keyword in output
|
||
elif "==" in test_expr:
|
||
parts = test_expr.split("==")
|
||
if len(parts) == 2:
|
||
expected = parts[1].strip().strip("'")
|
||
return output.strip() == expected
|
||
return False
|
||
except Exception:
|
||
return False
|
||
|
||
def log_message(self, format, *args):
|
||
pass # Suppress logging
|
||
|
||
# ==============================
|
||
# Auth APIs
|
||
# ==============================
|
||
def do_POST(self):
|
||
parsed = urllib.parse.urlparse(self.path)
|
||
path = parsed.path
|
||
|
||
# /api/login
|
||
if path == "/api/login":
|
||
content_length = int(self.headers.get("Content-Length", 0))
|
||
body = self.rfile.read(content_length).decode()
|
||
try:
|
||
data = json.loads(body)
|
||
username = data.get("username", "")
|
||
password = data.get("password", "")
|
||
|
||
# 简单验证
|
||
if username in USERS:
|
||
pwd_hash = hashlib.sha256(password.encode()).hexdigest()
|
||
if pwd_hash == USERS[username]:
|
||
# 返回成功 token
|
||
self.send_json({
|
||
"success": True,
|
||
"token": "safe_linux_2026",
|
||
"message": "✅ 登录成功!"
|
||
})
|
||
return
|
||
|
||
self.send_json({"success": False, "error": "❌ 用户名或密码错误"}, 401)
|
||
except Exception as e:
|
||
self.send_json({"success": False, "error": str(e)}, 400)
|
||
|
||
# /api/logout
|
||
elif path == "/api/logout":
|
||
self.send_json({"success": True, "message": "👋 已退出登录"})
|
||
|
||
else:
|
||
self.send_response(404)
|
||
self.end_headers()
|
||
self.wfile.write(b"Not Found")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
PORT = 8084
|
||
print(f"🌱 Linux Sandbox Practice Server with Auth 启动中... http://127.0.0.1:{PORT}")
|
||
print("📚 地址: https://linux.xiaoxiaoluohao.indevs.in")
|
||
print("🔑 Account: admin / safe_linux_2026")
|
||
http.server.HTTPServer(("127.0.0.1", PORT), LinuxSandboxHandler).serve_forever()
|