#!/usr/bin/env python3
"""
Linux command sandbox practice platform - HTTP server.

- Integrates the sandbox engine from sandbox.py.
- Routes:
    GET  /            -> HTML page (index.html next to this file)
    GET  /api/run     -> execute a command in the sandbox
    GET  /api/check   -> check a submitted answer against a course task
    GET  /api/tasks   -> full course/task overview
    POST /api/login   -> username/password login, returns a token
    POST /api/logout  -> logout acknowledgement
"""
import base64
import hashlib
import hmac
import http.server
import json
import os
import urllib.parse
from typing import Optional

# Sandbox engine (project-local module).
from sandbox import LinuxSandbox

# ==============================
# Authentication configuration
# ==============================
# Preset users: {username: password_hash}
# Generate a hash with: hashlib.sha256(b"password").hexdigest()
USERS = {
    # Default administrator account
    "admin": hashlib.sha256(b"safe_linux_2026").hexdigest(),
    # More users can be added here
}

# Token lifetime in seconds (24 hours). Not enforced anywhere in this module.
TOKEN_TTL = 86400

# NOTE(review): the API token is a single static value that is identical to
# the admin password. It is kept unchanged so existing clients keep working,
# but it should be replaced by per-session tokens (secrets.token_urlsafe)
# that expire after TOKEN_TTL.
STATIC_TOKEN = "safe_linux_2026"
VALID_TOKENS = (STATIC_TOKEN,)

# Host header substring for which authentication is enforced in do_GET.
PUBLIC_HOST = "xiaoxiaoluohao.indevs.in"

# GET paths that never require authentication.
AUTH_EXEMPT_PATHS = ("/", "/login.html", "/api/login", "/api/logout")

# Upper bound for the /api/login request body; a login payload is tiny.
MAX_LOGIN_BODY_BYTES = 64 * 1024

# TCP port the server listens on (loopback only).
PORT = 8084


def check_auth(request_body: Optional[bytes] = None) -> tuple[bool, str]:
    """
    Module-level authentication placeholder.

    With the local ENABLE_AUTH switch off (the default) every caller is
    accepted as "admin". With it on, an empty/missing ``request_body`` is
    rejected and anything else is still accepted (allow-all, to be refined).

    Returns:
        (ok, detail): ``ok`` is the decision; ``detail`` is the user name on
        success or an error message on failure.
    """
    # Access is allowed by default (forced authentication is not enabled).
    # To enable it, change this to True.
    ENABLE_AUTH = False
    if not ENABLE_AUTH:
        return True, "admin"
    if not request_body:
        return False, "Missing Authorization header"
    # Header parsing actually happens in the request handler (do_GET).
    return True, "admin"  # Temporary allow-all, to be improved later


SANDBOX = LinuxSandbox()

# ==============================
# Task data loading
# ==============================
TASKS_FILE = os.path.join(os.path.dirname(__file__), "COURSE_TASKS.json")


def load_tasks():
    """Load the course tasks from TASKS_FILE.

    Returns the parsed JSON document, or ``{"levels": []}`` when the file
    cannot be read or is not valid JSON (the error is printed, not raised).
    """
    try:
        with open(TASKS_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"Error loading tasks: {e}")
        return {"levels": []}


TASKS = load_tasks()


def _find_task(task_id):
    """Return the challenge dict whose "id" equals *task_id*, or None."""
    for level in TASKS.get("levels", []):
        for challenge in level.get("challenges", []):
            if challenge.get("id") == task_id:
                return challenge
    return None


def _is_valid_token(candidate: str) -> bool:
    """Return True when *candidate* equals one of VALID_TOKENS.

    Comparison is constant-time; operands are encoded to bytes because
    hmac.compare_digest rejects non-ASCII str arguments.
    """
    candidate_bytes = candidate.encode("utf-8")
    # Evaluate every comparison (no short-circuit) before combining.
    matches = [
        hmac.compare_digest(candidate_bytes, valid.encode("utf-8"))
        for valid in VALID_TOKENS
    ]
    return any(matches)


# ==============================
# Handler class
# ==============================
class LinuxSandboxHandler(http.server.BaseHTTPRequestHandler):
    """Request handler for the sandbox practice platform."""

    def send_json(self, data, status=200):
        """Send *data* as a UTF-8 JSON response with HTTP status *status*."""
        payload = json.dumps(data, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def send_text(self, text, status=200):
        """Send *text* as a UTF-8 plain-text response with status *status*."""
        payload = text.encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "text/plain; charset=utf-8")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def do_GET(self):
        """Dispatch GET requests to the page / API handlers."""
        parsed = urllib.parse.urlparse(self.path)
        path = parsed.path

        # Authentication check (skipped for the index page and login routes).
        if path not in AUTH_EXEMPT_PATHS:
            auth_header = self.headers.get("Authorization", "")
            token = self.headers.get("X-Token", "")
            # A failed check only blocks requests addressed to the public
            # domain; any other Host is let through.
            if not self.check_auth(auth_header, token):
                if PUBLIC_HOST in self.headers.get("Host", ""):
                    self.send_json({"error": "Authentication required"}, 401)
                    return

        if path == "/":
            # 1. Index: return the HTML page
            self._serve_index()
        elif path == "/api/run":
            # 2. Command execution API
            self._handle_run(urllib.parse.parse_qs(parsed.query))
        elif path == "/api/check":
            # 3. Task check API
            self._handle_check(urllib.parse.parse_qs(parsed.query))
        elif path == "/api/tasks":
            # 4. Course overview
            self.send_json(TASKS)
        else:
            self.send_response(404)
            self.send_header("Content-Type", "text/plain")
            self.end_headers()
            self.wfile.write(b"Not Found")

    def _serve_index(self):
        """Serve index.html located next to this file, or a 500 on failure."""
        html_path = os.path.join(os.path.dirname(__file__), "index.html")
        # Read the file BEFORE sending any header so that a read failure
        # yields a clean 500 instead of a second response appended to a
        # half-sent 200.
        try:
            with open(html_path, "rb") as f:
                content = f.read()
        except OSError as e:
            self.send_response(500)
            self.send_header("Content-Type", "text/plain")
            self.end_headers()
            self.wfile.write(f"Error: {e}".encode())
            return
        self.send_response(200)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(content)))
        self.end_headers()
        self.wfile.write(content)

    def _handle_run(self, query):
        """Run the ``cmd`` query parameter in the sandbox and return its result."""
        cmd = query.get("cmd", [""])[0]
        if not cmd:
            self.send_json({"error": "No command provided"}, 400)
            return
        # Execute the command inside the sandbox engine.
        result = SANDBOX.execute(cmd)
        self.send_json(result)

    def _handle_check(self, query):
        """Check the submitted command/output against task ``task_id``.

        Reads ``task_id``, ``last_cmd`` and ``output`` from the query string.
        A task passes when the command equals one of its ``solution`` entries
        (whitespace-trimmed) or when its ``success_test`` accepts the output.
        """
        task_id = query.get("task_id", [""])[0]
        if not task_id:
            self.send_json({"error": "-task_id required"}, 400)
            return

        task = _find_task(task_id)
        if not task:
            self.send_json({"error": "Task not found"}, 404)
            return

        # Command and output reported by the client.
        current_cmd = query.get("last_cmd", [""])[0]
        current_output = query.get("output", [""])[0]

        # A bare string is treated as a single solution (iterating it would
        # compare against individual characters); null means no solutions.
        solutions = task.get("solution") or []
        if isinstance(solutions, str):
            solutions = [solutions]
        submitted = current_cmd.strip()
        success = any(
            isinstance(sol, str) and submitted == sol.strip()
            for sol in solutions
        )

        # No exact solution matched: fall back to the success_test rule.
        if not success and "success_test" in task:
            success = self.evaluate_test(task["success_test"], current_output)

        if success:
            message = "✅ 回答正确!🎉"
        else:
            message = task.get("fail_message", "❌ 未通过测试")

        self.send_json({
            "task_id": task_id,
            "success": success,
            "message": message,
            "hint": task.get("hint", "💡 没有提示"),
            "title": task.get("title", "未知任务"),
        })

    def check_auth(self, auth_header: str, token: str) -> bool:
        """Authentication check: supports X-Token and Bearer token headers.

        Args:
            auth_header: value of the Authorization header ("" if absent).
            token: value of the X-Token header ("" if absent).

        Returns:
            True for loopback clients or a valid token, False otherwise.
            A non-empty X-Token is decisive: the Authorization header is
            only consulted when X-Token is empty.
        """
        # Local clients are always allowed.
        # NOTE(review): the server binds to 127.0.0.1, so if it is reached
        # through a local reverse proxy every request presumably arrives
        # from 127.0.0.1 and this bypass always fires - confirm deployment.
        if self.client_address[0] == "127.0.0.1":
            return True

        # Public scenario: validate the token.
        if token:
            return _is_valid_token(token)

        # Bearer token support.
        if auth_header.startswith("Bearer "):
            return _is_valid_token(auth_header[7:])

        return False

    def evaluate_test(self, test_expr: str, output: str) -> bool:
        """Evaluate a simple ``success_test`` expression against *output*.

        Two forms are recognised:
          * ``... contains 'xxx' ...`` - passes when the first single-quoted
            text occurs in *output*.
          * ``lhs == 'xxx'`` (exactly one ``==``) - passes when the stripped
            *output* equals the right-hand side with surrounding whitespace
            and single quotes removed.
        Anything else, including non-string arguments, yields False.
        """
        try:
            if "contains" in test_expr:
                parts = test_expr.split("'")
                if len(parts) >= 3:
                    keyword = parts[1]
                    return keyword in output
            elif "==" in test_expr:
                parts = test_expr.split("==")
                if len(parts) == 2:
                    expected = parts[1].strip().strip("'")
                    return output.strip() == expected
            return False
        except (AttributeError, TypeError):
            # Non-string test_expr/output (e.g. a malformed task entry).
            return False

    def log_message(self, format, *args):
        """Suppress the default per-request logging."""
        pass

    # ==============================
    # Auth APIs
    # ==============================
    def do_POST(self):
        """Dispatch POST requests: /api/login and /api/logout."""
        parsed = urllib.parse.urlparse(self.path)
        path = parsed.path

        if path == "/api/login":
            self._handle_login()
        elif path == "/api/logout":
            self.send_json({"success": True, "message": "👋 已退出登录"})
        else:
            self.send_response(404)
            self.end_headers()
            self.wfile.write(b"Not Found")

    def _handle_login(self):
        """Validate a JSON ``{"username", "password"}`` body and issue a token.

        Responds 200 with the token on success, 401 on wrong credentials,
        400 on a malformed request and 413 on an oversized body.
        """
        try:
            content_length = int(self.headers.get("Content-Length", 0))
        except ValueError:
            self.send_json(
                {"success": False, "error": "Invalid Content-Length"}, 400
            )
            return
        # A negative length would make rfile.read() block until EOF.
        if content_length < 0:
            self.send_json(
                {"success": False, "error": "Invalid Content-Length"}, 400
            )
            return
        if content_length > MAX_LOGIN_BODY_BYTES:
            self.send_json(
                {"success": False, "error": "Request body too large"}, 413
            )
            return

        raw_body = self.rfile.read(content_length)
        try:
            # json.loads accepts bytes; ValueError covers both JSON syntax
            # errors and undecodable input.
            data = json.loads(raw_body)
        except ValueError as e:
            self.send_json({"success": False, "error": str(e)}, 400)
            return

        if not isinstance(data, dict):
            self.send_json(
                {"success": False, "error": "JSON object expected"}, 400
            )
            return

        username = data.get("username", "")
        password = data.get("password", "")
        if not isinstance(username, str) or not isinstance(password, str):
            self.send_json(
                {
                    "success": False,
                    "error": "username and password must be strings",
                },
                400,
            )
            return

        # Simple verification: constant-time compare of the SHA-256 digests.
        expected_hash = USERS.get(username)
        supplied_hash = hashlib.sha256(password.encode()).hexdigest()
        if expected_hash is not None and hmac.compare_digest(
            supplied_hash.encode("ascii"), expected_hash.encode("ascii")
        ):
            self.send_json({
                "success": True,
                "token": STATIC_TOKEN,
                "message": "✅ 登录成功!",
            })
            return

        self.send_json({"success": False, "error": "❌ 用户名或密码错误"}, 401)


def main() -> None:
    """Print the startup banner and serve forever on 127.0.0.1:PORT."""
    print(f"🌱 Linux Sandbox Practice Server with Auth 启动中... http://127.0.0.1:{PORT}")
    print("📚 地址: https://linux.xiaoxiaoluohao.indevs.in")
    print("🔑 Account: admin / safe_linux_2026")
    http.server.HTTPServer(("127.0.0.1", PORT), LinuxSandboxHandler).serve_forever()


if __name__ == "__main__":
    main()