286 lines
9.5 KiB
Python
286 lines
9.5 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
"""
|
|||
|
|
Linux 命令沙盒练习平台 Server
|
|||
|
|
- 集成 sandbox.py 沙盒引擎
|
|||
|
|
- 路由:/ → HTML 页面, /api/run → 命令执行, /api/check → 任务检查
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import http.server
|
|||
|
|
import urllib.parse
|
|||
|
|
import json
|
|||
|
|
import os
|
|||
|
|
import base64
|
|||
|
|
import hashlib
|
|||
|
|
|
|||
|
|
# ==============================
# Authentication settings
# ==============================
# Built-in accounts, stored as {username: SHA-256 hex digest of the password}.
# A digest is produced with: hashlib.sha256(b"password").hexdigest()
USERS = {
    account: hashlib.sha256(secret).hexdigest()
    for account, secret in (
        # Default administrator account.
        ("admin", b"safe_linux_2026"),
        # Further (username, password-bytes) pairs can be listed here.
    )
}

# Token lifetime in seconds (24 hours).
TOKEN_TTL = 24 * 60 * 60
|
|||
|
|
|
|||
|
|
|
|||
|
|
def check_auth(request_body: bytes = None) -> tuple[bool, str]:
    """Placeholder authentication gate returning ``(allowed, detail)``.

    Enforcement is switched off by the local ``ENABLE_AUTH`` flag, so every
    call is currently approved as ``"admin"``. To turn enforcement on, set
    ``ENABLE_AUTH`` to ``True``; even then the only rejection implemented is
    for an empty/missing ``request_body`` -- no header is actually parsed
    here yet.

    Args:
        request_body: Raw request data; only its truthiness is inspected.

    Returns:
        ``(True, "admin")`` when access is granted, otherwise
        ``(False, "Missing Authorization header")``.
    """
    ENABLE_AUTH = False

    # Single guard clause: reject only when enforcement is on and nothing
    # was supplied.
    if ENABLE_AUTH and not request_body:
        return False, "Missing Authorization header"

    # Either enforcement is off, or the allow-all stub applies.
    return True, "admin"
|
|||
|
|
|
|||
|
|
# Sandbox engine (project-local module sandbox.py).
from sandbox import LinuxSandbox

# One sandbox instance is created at import time; the request handler in this
# file calls SANDBOX.execute() on this shared object for every /api/run request.
SANDBOX = LinuxSandbox()

# ==============================
# Task data loading
# ==============================
# Course definitions are read from COURSE_TASKS.json in this file's directory.
TASKS_FILE = os.path.join(os.path.dirname(__file__), "COURSE_TASKS.json")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def load_tasks(path=None):
    """Load the course task definitions from a JSON file.

    Args:
        path: JSON file to read. Defaults to the module-level ``TASKS_FILE``.

    Returns:
        The parsed JSON object. When the file cannot be opened, is not valid
        UTF-8 JSON, or its top level is not a JSON object, the problem is
        printed and the empty course ``{"levels": []}`` is returned so that
        callers can always use ``.get("levels", [])`` on the result.
    """
    source = TASKS_FILE if path is None else path
    try:
        with open(source, "r", encoding="utf-8") as f:
            tasks = json.load(f)
    except (OSError, ValueError) as e:
        # OSError: missing/unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        print(f"Error loading tasks: {e}")
        return {"levels": []}

    # A list/string/number at the top level would break every later
    # ``TASKS.get(...)`` call, so treat it like an unreadable file.
    if not isinstance(tasks, dict):
        print(
            "Error loading tasks: expected a JSON object, "
            f"got {type(tasks).__name__}"
        )
        return {"levels": []}
    return tasks
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Course data is loaded once, at import time; nothing in this file reloads it.
TASKS = load_tasks()
|
|||
|
|
|
|||
|
|
# ==============================
|
|||
|
|
# Handler 类
|
|||
|
|
# ==============================
|
|||
|
|
class LinuxSandboxHandler(http.server.BaseHTTPRequestHandler):
    """HTTP request handler for the Linux command sandbox practice server.

    GET routes:
        /            -> index.html stored next to this file
        /api/run     -> run ``cmd`` through the module-level ``SANDBOX``
        /api/check   -> grade a challenge taken from the module-level ``TASKS``
        /api/tasks   -> the whole ``TASKS`` document
    POST routes:
        /api/login   -> exchange username/password for the shared token
        /api/logout  -> stateless acknowledgement
    """

    # Paths that are served without running the authentication check.
    _OPEN_PATHS = frozenset({"/", "/login.html", "/api/login", "/api/logout"})
    # A failed auth check is answered with 401 only when the Host header
    # contains this domain.
    _PUBLIC_HOST = "xiaoxiaoluohao.indevs.in"
    # Accepted API tokens; the first one is what /api/login hands out.
    _VALID_TOKENS = ("safe_linux_2026",)
    # Upper bound for the /api/login request body, in bytes.
    _MAX_LOGIN_BODY = 64 * 1024
    # Message returned by /api/check when a challenge is solved.
    _PASS_MESSAGE = "✅ 回答正确!🎉"

    # ------------------------------
    # Response helpers
    # ------------------------------
    def send_json(self, data, status=200):
        """Send ``data`` as a UTF-8 JSON response with the given status."""
        # Serialise before emitting the status line so that an unserialisable
        # payload cannot leave a half-written response on the wire.
        payload = json.dumps(data, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def send_text(self, text, status=200):
        """Send ``text`` as a UTF-8 plain-text response with the given status."""
        payload = text.encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "text/plain; charset=utf-8")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    # ------------------------------
    # GET routing
    # ------------------------------
    def do_GET(self):
        """Dispatch GET requests to the page and API routes."""
        parsed = urllib.parse.urlparse(self.path)
        path = parsed.path

        # The landing page and the login/logout endpoints skip the auth check.
        if path not in self._OPEN_PATHS and self._reject_unauthenticated():
            return

        if path == "/":
            self._serve_index()
        elif path == "/api/run":
            self._handle_run(parsed.query)
        elif path == "/api/check":
            self._handle_check(parsed.query)
        elif path == "/api/tasks":
            self.send_json(TASKS)
        else:
            self.send_text("Not Found", 404)

    def _reject_unauthenticated(self) -> bool:
        """Send a 401 and return True when the request must be refused."""
        auth_header = self.headers.get("Authorization", "")
        token = self.headers.get("X-Token", "")
        if self.check_auth(auth_header, token):
            return False

        if self._PUBLIC_HOST in self.headers.get("Host", ""):
            self.send_json({"error": "Authentication required"}, 401)
            return True

        # NOTE(review): requests that fail the check but carry any other Host
        # value are still served (existing policy, kept as-is). The Host
        # header is client-controlled, and behind a local reverse proxy every
        # client would appear as 127.0.0.1 -- confirm this is intended.
        return False

    def _serve_index(self):
        """Serve index.html from this file's directory."""
        html_path = os.path.join(os.path.dirname(__file__), "index.html")
        # Read the whole file before sending any header, so a read failure
        # yields one clean 500 instead of a 500 appended to a started 200.
        try:
            with open(html_path, "rb") as f:
                page = f.read()
        except OSError as e:
            self.send_text(f"Error: {e}", 500)
            return

        self.send_response(200)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(page)))
        self.end_headers()
        self.wfile.write(page)

    def _handle_run(self, query_string):
        """Run the ``cmd`` query parameter in the sandbox and return its result."""
        cmd = urllib.parse.parse_qs(query_string).get("cmd", [""])[0]
        if not cmd:
            self.send_json({"error": "No command provided"}, 400)
            return
        self.send_json(SANDBOX.execute(cmd))

    @staticmethod
    def _find_task(task_id):
        """Return the challenge whose ``id`` equals ``task_id``, or None."""
        for level in TASKS.get("levels", []):
            for challenge in level.get("challenges", []):
                if challenge.get("id") == task_id:
                    return challenge
        return None

    def _handle_check(self, query_string):
        """Grade a challenge from ``task_id``, ``last_cmd`` and ``output``.

        A challenge passes when the submitted command equals one of its
        ``solution`` entries (ignoring surrounding whitespace) or, failing
        that, when its ``success_test`` expression accepts the output.
        """
        params = urllib.parse.parse_qs(query_string)
        task_id = params.get("task_id", [""])[0]
        if not task_id:
            self.send_json({"error": "task_id required"}, 400)
            return

        task = self._find_task(task_id)
        if not task:
            self.send_json({"error": "Task not found"}, 404)
            return

        # Both values are supplied by the client, not re-executed here.
        submitted = params.get("last_cmd", [""])[0].strip()
        output = params.get("output", [""])[0]

        success = any(
            submitted == solution.strip()
            for solution in task.get("solution", [])
        )
        if not success and "success_test" in task:
            success = self.evaluate_test(task["success_test"], output)

        if success:
            message = self._PASS_MESSAGE
        else:
            message = task.get("fail_message", "❌ 未通过测试")

        self.send_json({
            "task_id": task_id,
            "success": success,
            "message": message,
            "hint": task.get("hint", "💡 没有提示"),
            "title": task.get("title", "未知任务"),
        })

    # ------------------------------
    # Authentication
    # ------------------------------
    @staticmethod
    def _secrets_match(supplied: str, expected: str) -> bool:
        """Compare two secrets without leaking the mismatch position via timing."""
        # Local import: the module's import block does not provide hmac.
        import hmac

        # Compare as bytes; compare_digest rejects non-ASCII str arguments.
        return hmac.compare_digest(
            supplied.encode("utf-8"), expected.encode("utf-8")
        )

    def _is_valid_token(self, candidate: str) -> bool:
        """Return True when ``candidate`` equals one of the accepted tokens."""
        return any(
            self._secrets_match(candidate, known) for known in self._VALID_TOKENS
        )

    def check_auth(self, auth_header: str, token: str) -> bool:
        """Decide whether the request is authenticated.

        Args:
            auth_header: Value of the ``Authorization`` header ("" if absent).
            token: Value of the ``X-Token`` header ("" if absent).

        Returns:
            True for clients connecting from 127.0.0.1, or when a valid token
            is supplied. A non-empty ``X-Token`` takes precedence: if it is
            wrong, the ``Bearer`` header is not consulted.
        """
        # Local clients are always trusted.
        if self.client_address[0] == "127.0.0.1":
            return True

        if token:
            return self._is_valid_token(token)

        if auth_header.startswith("Bearer "):
            return self._is_valid_token(auth_header[len("Bearer "):])

        return False

    def evaluate_test(self, test_expr: str, output: str) -> bool:
        """Evaluate a challenge's ``success_test`` expression against ``output``.

        Two forms are understood:
            ``... contains 'text' ...`` -> passes when ``text`` (the first
                single-quoted part) occurs in ``output``.
            ``... == 'text'``           -> passes when the stripped ``output``
                equals everything after the first ``==`` (quotes removed).

        Any other or malformed expression, including a non-string one,
        evaluates to False.
        """
        try:
            if "contains" in test_expr:
                pieces = test_expr.split("'")
                # A quoted keyword yields at least three pieces.
                if len(pieces) >= 3:
                    return pieces[1] in output
                return False

            if "==" in test_expr:
                # Split on the first "==" only, so an expected value that
                # itself contains "==" is still compared correctly.
                _, _, expected = test_expr.partition("==")
                return output.strip() == expected.strip().strip("'")

            return False
        except (TypeError, AttributeError):
            # Non-string expression or output coming from the task data.
            return False

    def log_message(self, format, *args):
        """Silence the default per-request logging."""
        pass

    # ==============================
    # Auth APIs
    # ==============================
    def do_POST(self):
        """Dispatch POST requests to the login/logout endpoints."""
        path = urllib.parse.urlparse(self.path).path

        if path == "/api/login":
            self._handle_login()
        elif path == "/api/logout":
            self.send_json({"success": True, "message": "👋 已退出登录"})
        else:
            self.send_text("Not Found", 404)

    def _handle_login(self):
        """Validate JSON credentials and answer with the shared token.

        Responds 400 for a malformed request (bad Content-Length, oversized
        or undecodable body, non-object JSON, non-string fields), 401 for
        wrong credentials and 200 with the token on success.
        """
        try:
            length = int(self.headers.get("Content-Length", 0))
            # A negative length would make rfile.read() wait for EOF.
            if length < 0:
                raise ValueError("Invalid Content-Length")
            if length > self._MAX_LOGIN_BODY:
                raise ValueError("Request body too large")

            data = json.loads(self.rfile.read(length).decode("utf-8"))
            if not isinstance(data, dict):
                raise ValueError("JSON object expected")

            username = data.get("username", "")
            password = data.get("password", "")
            if not isinstance(username, str) or not isinstance(password, str):
                raise ValueError("username and password must be strings")

            supplied_hash = hashlib.sha256(password.encode("utf-8")).hexdigest()
        except ValueError as e:
            # Covers int() failures, UnicodeDecodeError/UnicodeEncodeError and
            # json.JSONDecodeError, which are all ValueError subclasses.
            self.send_json({"success": False, "error": str(e)}, 400)
            return

        expected_hash = USERS.get(username)
        if expected_hash is not None and self._secrets_match(
            supplied_hash, expected_hash
        ):
            self.send_json({
                "success": True,
                "token": self._VALID_TOKENS[0],
                "message": "✅ 登录成功!",
            })
            return

        self.send_json({"success": False, "error": "❌ 用户名或密码错误"}, 401)
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
    # Script entry point: announce the endpoints, then serve on loopback only.
    PORT = 8084
    startup_lines = (
        f"🌱 Linux Sandbox Practice Server with Auth 启动中... http://127.0.0.1:{PORT}",
        "📚 地址: https://linux.xiaoxiaoluohao.indevs.in",
        "🔑 Account: admin / safe_linux_2026",
    )
    print(*startup_lines, sep="\n")

    httpd = http.server.HTTPServer(("127.0.0.1", PORT), LinuxSandboxHandler)
    httpd.serve_forever()
|