915 lines
42 KiB
Python
915 lines
42 KiB
Python
#!/usr/bin/env python3
|
|
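"""
Linux command sandbox simulator (enhanced edition)
- Zero risk: system commands are never actually executed
- Mutable virtual filesystem: supports create / delete / move / permission changes
- Covers more common Linux commands so that course tasks can run end to end
"""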
|
|
|
|
from __future__ import annotations
|
|
|
|
from copy import deepcopy
|
|
from fnmatch import fnmatch
|
|
from datetime import datetime
|
|
import re
|
|
|
|
|
|
# Pristine virtual filesystem, keyed by absolute path.
# Directory nodes list their children by name in "children"; file nodes keep
# their text in "content". "perm" is a three-digit octal permission string.
BASE_FS = {
    "/": {"type": "dir", "perm": "755", "children": ["users", "projects", "logs", "sandbox_tips", "tmp", "etc", "var", "home", "bin", "usr"]},
    "/home": {"type": "dir", "perm": "755", "children": ["sandbox_user"]},
    "/home/sandbox_user": {"type": "dir", "perm": "755", "children": ["notes.txt", ".bashrc"]},
    "/home/sandbox_user/notes.txt": {"type": "file", "perm": "644", "content": "Practice Linux every day.\n"},
    "/home/sandbox_user/.bashrc": {"type": "file", "perm": "644", "content": "alias ll='ls -l'\nexport PATH=/usr/local/bin:/usr/bin:/bin\n"},
    "/tmp": {"type": "dir", "perm": "777", "children": []},
    "/users": {"type": "dir", "perm": "755", "children": ["alice.txt", "bob.txt", "charlie"]},
    "/users/alice.txt": {"type": "file", "perm": "644", "content": "Hi, I'm Alice. I love Linux!\n"},
    "/users/bob.txt": {"type": "file", "perm": "644", "content": "Hello from Bob. Learning Linux daily.\n"},
    "/users/charlie": {"type": "dir", "perm": "755", "children": ["profile.md", "skills.txt"]},
    "/users/charlie/profile.md": {"type": "file", "perm": "644", "content": "# Charlie\nLoves open source and Linux commands.\n"},
    "/users/charlie/skills.txt": {"type": "file", "perm": "644", "content": "Linux\nPython\nGit\nDocker\n"},
    "/projects": {"type": "dir", "perm": "755", "children": ["web", "backend"]},
    "/projects/web": {"type": "dir", "perm": "755", "children": ["index.html", "style.css"]},
    "/projects/web/index.html": {"type": "file", "perm": "644", "content": "<html><body>Hello Linux Sandbox</body></html>\n"},
    "/projects/web/style.css": {"type": "file", "perm": "644", "content": "body { font-family: sans-serif; }\n"},
    "/projects/backend": {"type": "dir", "perm": "755", "children": ["app.py", "model.py", "utils.py"]},
    "/projects/backend/app.py": {"type": "file", "perm": "644", "content": "def main():\n print('Hello, Linux!')\n\nif __name__ == '__main__':\n main()\n"},
    "/projects/backend/model.py": {"type": "file", "perm": "644", "content": "class User:\n def __init__(self, name):\n self.name = name\n"},
    "/projects/backend/utils.py": {"type": "file", "perm": "644", "content": "def log(msg):\n print(f'[LOG] {msg}')\n"},
    "/logs": {"type": "dir", "perm": "755", "children": ["access.log", "error.log"]},
    "/logs/access.log": {"type": "file", "perm": "644", "content": "2024-01-01 10:00 GET /index.html 200\n2024-01-01 10:01 POST /api/login 200\n2024-01-01 10:02 GET /about 404\n"},
    "/logs/error.log": {"type": "file", "perm": "644", "content": "2024-01-01 10:02 ERROR Page not found\n2024-01-01 10:03 WARNING High memory usage\n"},
    "/sandbox_tips": {"type": "dir", "perm": "755", "children": ["welcome.txt"]},
    "/sandbox_tips/welcome.txt": {"type": "file", "perm": "644", "content": "Welcome to Linux Sandbox!\nTry ls, cat, grep, find, ps, df and more.\n"},
    "/etc": {"type": "dir", "perm": "755", "children": ["passwd", "group", "hosts", "nginx.conf", "ssh.conf", "app.conf", "skel"]},
    "/etc/passwd": {"type": "file", "perm": "644", "content": "root:x:0:0:root:/root:/bin/bash\ndaemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin\nsandbox_user:x:1000:1000:Sandbox User:/home/sandbox_user:/bin/bash\n"},
    "/etc/group": {"type": "file", "perm": "644", "content": "root:x:0:\nusers:x:100:alice,bob\ndocker:x:999:sandbox_user\n"},
    "/etc/hosts": {"type": "file", "perm": "644", "content": "127.0.0.1 localhost\n127.0.1.1 sandbox\n"},
    "/etc/nginx.conf": {"type": "file", "perm": "644", "content": "user nginx;\nworker_processes auto;\n"},
    "/etc/ssh.conf": {"type": "file", "perm": "644", "content": "Port 22\nPermitRootLogin no\n"},
    "/etc/app.conf": {"type": "file", "perm": "644", "content": "APP_ENV=prod\nAPP_DEBUG=false\n"},
    "/etc/skel": {"type": "dir", "perm": "755", "children": [".bash_logout", ".bashrc"]},
    "/etc/skel/.bash_logout": {"type": "file", "perm": "644", "content": "# logout\n"},
    "/etc/skel/.bashrc": {"type": "file", "perm": "644", "content": "# skeleton bashrc\n"},
    "/var": {"type": "dir", "perm": "755", "children": ["log"]},
    "/var/log": {"type": "dir", "perm": "755", "children": ["syslog", "auth.log", "nginx"]},
    "/var/log/syslog": {"type": "file", "perm": "644", "content": "Mar 06 10:00 kernel: system boot complete\nMar 06 10:02 app: error database connection timeout\nMar 06 10:04 sshd: Accepted password for sandbox_user\nMar 06 10:05 nginx: worker started\nMar 06 10:06 app: error failed to load config\n"},
    "/var/log/auth.log": {"type": "file", "perm": "640", "content": "Mar 06 10:01 sshd[123]: Failed password\nMar 06 10:04 sshd[124]: Accepted password for sandbox_user\n"},
    "/var/log/nginx": {"type": "dir", "perm": "755", "children": ["access.log", "error.log"]},
    "/var/log/nginx/access.log": {"type": "file", "perm": "644", "content": "127.0.0.1 - - [06/Mar/2026] \"GET / HTTP/1.1\" 200 612\n"},
    "/var/log/nginx/error.log": {"type": "file", "perm": "644", "content": "2026/03/06 [error] connect() failed\n"},
    "/bin": {"type": "dir", "perm": "755", "children": ["ls", "cat", "grep", "find", "bash"]},
    # Placeholder nodes for the listed binaries: every name in a directory's
    # "children" must resolve to a node, otherwise `ls -l` silently drops the
    # entry and recursive walkers hit a missing key.
    "/bin/ls": {"type": "file", "perm": "755", "content": ""},
    "/bin/cat": {"type": "file", "perm": "755", "content": ""},
    "/bin/grep": {"type": "file", "perm": "755", "content": ""},
    "/bin/find": {"type": "file", "perm": "755", "content": ""},
    "/bin/bash": {"type": "file", "perm": "755", "content": ""},
    "/usr": {"type": "dir", "perm": "755", "children": ["bin"]},
    "/usr/bin": {"type": "dir", "perm": "755", "children": ["vim", "curl", "wget", "python3"]},
    "/usr/bin/vim": {"type": "file", "perm": "755", "content": ""},
    "/usr/bin/curl": {"type": "file", "perm": "755", "content": ""},
    "/usr/bin/wget": {"type": "file", "perm": "755", "content": ""},
    "/usr/bin/python3": {"type": "file", "perm": "755", "content": ""},
}
|
|
|
|
|
|
# Maps a command name to the absolute path reported for it.
# Built from the two install directories; insertion order is /bin first,
# then /usr/bin.
COMMAND_INDEX = {
    **{name: f"/bin/{name}" for name in ("ls", "cat", "grep", "find", "bash")},
    **{name: f"/usr/bin/{name}" for name in ("vim", "curl", "wget", "python3")},
}
|
|
|
|
|
|
class LinuxSandbox:
|
|
def __init__(self):
    """Create a sandbox; all state is initialized by ``reset()``."""
    # Construction and "start over" share one code path.
    self.reset()
|
|
|
|
def reset(self):
    """Restore the sandbox to its initial state.

    Replaces the filesystem with a fresh deep copy of ``BASE_FS`` and
    re-creates the working directory, user identity, command history,
    environment variables and aliases.
    """
    home_dir = "/home/sandbox_user"
    login = "sandbox_user"
    # Deep copy so that mutations never leak back into the template.
    self.fs = deepcopy(BASE_FS)
    self.cwd = "/"
    self.home = home_dir
    self.user = login
    self.history: list[str] = []
    self.env = dict(
        HOME=home_dir,
        USER=login,
        PATH="/usr/local/bin:/usr/bin:/bin",
        SHELL="/bin/bash",
    )
    self.aliases = dict(ll="ls -l")
|
|
|
|
# ---------- FS helpers ----------
|
|
def resolve_path(self, path: str | None) -> str:
    """Turn a user-supplied path into a normalized absolute path.

    Empty input and "." resolve to the current directory. "~" and "~/..."
    are expanded against ``self.home``; other relative paths are joined onto
    ``self.cwd``. Empty and "." segments are dropped and ".." removes one
    level (never going above the root).

    Args:
        path: The path as typed by the user, or None.

    Returns:
        The absolute path. Existence is not checked.
    """
    if not path or path == ".":
        return self.cwd
    if path == "~" or path.startswith("~/"):
        # Expand the tilde first so the result is normalized like every
        # other path ("~/../x", "~/docs/").
        path = self.home + path[1:]
    elif not path.startswith("/"):
        path = f"{self.cwd.rstrip('/')}/{path}"
    parts: list[str] = []
    for part in path.split("/"):
        if part in ("", "."):
            continue
        if part == "..":
            if parts:
                parts.pop()
        else:
            parts.append(part)
    return "/" + "/".join(parts)
|
|
|
|
def get_node(self, path: str):
    """Return the node stored under ``path`` in ``self.fs``, or None.

    The path is used as given; it is not resolved or normalized here.
    """
    table = self.fs
    return table[path] if path in table else None
|
|
|
|
def exists(self, path: str) -> bool:
    """Report whether ``path``, once resolved, names a node in ``self.fs``."""
    resolved = self.resolve_path(path)
    return resolved in self.fs
|
|
|
|
def is_executable(self, path: str) -> bool:
    """Report whether the node at ``path`` carries any execute permission.

    A node counts as executable when any digit of its octal ``perm`` has the
    execute bit set (so "755" and "641" both qualify), when a non-octal
    ``perm`` string contains "x", or when an optional ``perm_human`` entry
    on the node contains "x".

    Args:
        path: Path to check; resolved with ``resolve_path`` first.

    Returns:
        True if the node exists and is executable, otherwise False.
    """
    node = self.get_node(self.resolve_path(path))
    if not node:
        return False
    perm = node.get("perm", "")
    if perm.isdigit() and len(perm) >= 3:
        # The execute bit is the lowest bit of each octal digit.
        octal_exec = any(int(digit) & 1 for digit in perm[-3:])
    else:
        octal_exec = "x" in perm
    return octal_exec or "x" in node.get("perm_human", "")
|
|
|
|
def _perm_human(self, perm: str, is_dir: bool) -> str:
    """Render a permission string in ``ls -l`` style.

    A three-digit numeric ``perm`` is expanded digit by digit into rwx
    triplets (digits outside 0-7 become "---"). Any other value is appended
    unchanged. The result is prefixed with "d" for directories and "-"
    otherwise.
    """
    kind = "d" if is_dir else "-"
    if not (perm.isdigit() and len(perm) == 3):
        return kind + perm
    # Build the digit -> triplet table from the individual permission bits.
    triplets = {
        str(value): "".join(
            letter if value & mask else "-"
            for letter, mask in (("r", 4), ("w", 2), ("x", 1))
        )
        for value in range(8)
    }
    return kind + "".join(triplets.get(digit, "---") for digit in perm)
|
|
|
|
def _ensure_dir(self, path: str):
    """Create ``path`` and any missing ancestors as directories.

    Does nothing when ``path`` is already present in ``self.fs``. New
    directories get permission "755" and are registered in their parent's
    child list, outermost ancestor first.
    """
    # Walk upwards collecting every path that still has to be created.
    missing = []
    cursor = path
    while cursor not in self.fs:
        missing.append(cursor)
        above = self.parent_dir(cursor)
        if above == cursor:
            break
        cursor = above
    # Create from the top down so each parent exists before its child.
    for target in reversed(missing):
        self.fs[target] = {"type": "dir", "perm": "755", "children": []}
        self._add_child(self.parent_dir(target), target.split("/")[-1])
|
|
|
|
def _add_child(self, parent: str, name: str):
    """Record ``name`` in the child list of directory ``parent``.

    Silently does nothing when ``parent`` is missing or not a directory, or
    when ``name`` is already listed.
    """
    holder = self.fs.get(parent)
    if not holder or holder["type"] != "dir":
        return
    entries = holder.setdefault("children", [])
    if name not in entries:
        entries.append(name)
|
|
|
|
def _remove_child(self, parent: str, name: str):
    """Drop ``name`` from the child list of directory ``parent``.

    Silently does nothing when ``parent`` is missing or not a directory, or
    when ``name`` is not listed.
    """
    holder = self.fs.get(parent)
    if not holder or holder["type"] != "dir":
        return
    entries = holder.get("children", [])
    if name in entries:
        entries.remove(name)
|
|
|
|
def parent_dir(self, path: str) -> str:
    """Return the parent directory of an absolute path.

    The root is its own parent, and any top-level entry has "/" as parent.
    A trailing slash on ``path`` is ignored.
    """
    if path == "/":
        return "/"
    *ancestors, _leaf = path.rstrip("/").split("/")
    # ancestors[0] is the empty string in front of the leading slash.
    inner = ancestors[1:]
    if not inner:
        return "/"
    return "/" + "/".join(inner)
|
|
|
|
def _copy_node(self, src: str, dst: str):
    """Copy the node at ``src`` (and, for directories, its subtree) to ``dst``.

    Each copied node is a deep copy registered in its new parent's child
    list. Children are visited depth-first in listed order. A child name
    that has no node in ``self.fs`` is skipped instead of raising.

    Args:
        src: Absolute path of an existing node.
        dst: Absolute path the copy is stored under.

    Raises:
        KeyError: If ``src`` itself is not in ``self.fs``.
    """
    pending = [(src, dst)]
    while pending:
        source, target = pending.pop()
        clone = deepcopy(self.fs[source])
        self.fs[target] = clone
        self._add_child(self.parent_dir(target), target.split("/")[-1])
        if clone["type"] != "dir":
            continue
        names = list(clone.get("children", []))
        # The copy starts empty; each copied child re-registers itself.
        clone["children"] = []
        source_base = source.rstrip("/")
        target_base = target.rstrip("/")
        # Push in reverse so the first child is processed first.
        for name in reversed(names):
            child_source = f"{source_base}/{name}"
            if child_source in self.fs:
                pending.append((child_source, f"{target_base}/{name}"))
|
|
|
|
def _delete_node(self, path: str):
    """Remove the node at ``path`` together with everything beneath it.

    Children are deleted first, then the node is unlinked from its parent's
    child list and dropped from ``self.fs``. A missing path is ignored.
    """
    entry = self.fs.get(path)
    if not entry:
        return
    base = path.rstrip("/")
    # Snapshot the names: each recursive call edits the live child list.
    names = list(entry.get("children", [])) if entry["type"] == "dir" else []
    for name in names:
        self._delete_node(f"{base}/{name}")
    self._remove_child(self.parent_dir(path), path.split("/")[-1])
    self.fs.pop(path, None)
|
|
|
|
def _safe_shell_split(self, cmd: str) -> list[str]:
    """Split a command line into words, honouring simple quoting.

    A word is a run of unquoted characters and quoted segments with no
    whitespace between them, so ``ll='ls -la'`` is one word. Balanced single
    or double quotes are removed and protect the whitespace inside them. A
    quote character with no closing partner is kept literally. Backslash
    escapes are not interpreted.

    Args:
        cmd: The raw command text.

    Returns:
        The list of words with balanced quotes stripped.
    """
    word_re = r"(?:[^\s'\"]+|'[^']*'|\"[^\"]*\"|['\"])+"
    quoted_re = r"'([^']*)'|\"([^\"]*)\""

    def unquote(match):
        # Exactly one of the two groups took part in the match.
        single = match.group(1)
        return single if single is not None else match.group(2)

    return [re.sub(quoted_re, unquote, word) for word in re.findall(word_re, cmd)]
|
|
|
|
def _split_pipes(self, raw: str) -> list[str]:
    """Split a command string on "|" characters that are outside quotes.

    Minimal support for common teaching use-cases such as:
    - ss -ltnp | grep 80
    - cat file | tail -n 20

    Quote characters stay in the returned segments. Each segment is
    stripped of surrounding whitespace and empty segments are dropped.
    """
    segments: list[str] = []
    current: list[str] = []
    open_quote = ""  # the quote character we are inside of, "" when outside
    for ch in raw:
        if open_quote:
            if ch == open_quote:
                open_quote = ""
        elif ch in ("'", '"'):
            open_quote = ch
        elif ch == "|":
            piece = "".join(current).strip()
            if piece:
                segments.append(piece)
            current = []
            continue
        current.append(ch)
    piece = "".join(current).strip()
    if piece:
        segments.append(piece)
    return segments
|
|
|
|
def _expand_alias(self, cmd: str) -> str:
    """Replace a leading alias name in ``cmd`` with its definition.

    Only the first word is examined. Input that is empty, whitespace-only,
    or does not start with a known alias is returned unchanged.

    Args:
        cmd: The command line to expand.

    Returns:
        The expanded command (stripped), or ``cmd`` itself if no alias applies.
    """
    # split(None, 1) tolerates leading whitespace and yields [] for blank input.
    words = cmd.split(None, 1) if cmd else []
    if not words or words[0] not in self.aliases:
        return cmd
    rest = words[1] if len(words) > 1 else ""
    return f"{self.aliases[words[0]]} {rest}".strip()
|
|
|
|
def _expand_env(self, text: str) -> str:
    """Substitute ``$NAME`` and ``${NAME}`` references from ``self.env``.

    The whole variable name is matched before lookup, so ``$HOMEDIR`` is not
    mistaken for ``$HOME`` followed by "DIR". References to names that are
    not in ``self.env`` are left in the text unchanged.

    Args:
        text: Text that may contain variable references.

    Returns:
        The text with known variables replaced by their values.
    """
    def substitute(match):
        name = match.group(1) or match.group(2)
        return self.env.get(name, match.group(0))

    return re.sub(r"\$\{(\w+)\}|\$(\w+)", substitute, text)
|
|
|
|
# ---------- Commands ----------
|
|
def _simulate_ls(self, args: list[str]) -> str:
    """Simulate ``ls`` for one or more targets.

    Recognised option letters are ``l`` (long format) and ``a`` (also list
    "." and ".."); they may be given separately or clustered ("-la"). Other
    option letters are accepted and ignored. With no target, the current
    directory is listed.

    Args:
        args: Command arguments without the command name.

    Returns:
        The listing, one line per entry in long format or one
        space-separated line per directory otherwise. Missing targets
        produce an error line in place of a listing.
    """
    # Collect individual letters so that clustered flags are honoured.
    flags = {
        letter
        for arg in args
        if arg.startswith("-") and not arg.startswith("--")
        for letter in arg[1:]
    }
    long_format = "l" in flags
    show_all = "a" in flags
    targets = [a for a in args if not a.startswith("-")] or [self.cwd]
    placeholder = {"type": "dir", "perm": "755", "children": []}

    def long_line(node, name):
        # Directories report a synthetic size derived from their child count.
        is_dir = node["type"] == "dir"
        if is_dir:
            size = max(4, len(node.get("children", [])) * 4)
        else:
            size = len(node.get("content", ""))
        perm = node.get("perm", "755" if is_dir else "644")
        return f"{self._perm_human(perm, is_dir)} 1 sandbox sandbox {size:>4} Mar 6 10:00 {name}"

    lines = []
    for target in targets:
        path = self.resolve_path(target)
        node = self.get_node(path)
        if not node:
            lines.append(f"ls: cannot access '{target}': No such file or directory")
            continue
        if node["type"] == "file":
            name = path.split("/")[-1]
            lines.append(long_line(node, name) if long_format else name)
            continue
        children = sorted(node.get("children", []))
        if show_all:
            children = [".", ".."] + children
        if not long_format:
            lines.append(" ".join(children))
            continue
        lines.append("total " + str(max(len(children), 1)))
        for child in children:
            if child == ".":
                cnode = self.get_node(path) or placeholder
            elif child == "..":
                cnode = self.get_node(self.parent_dir(path)) or placeholder
            else:
                cnode = self.get_node(f"{path.rstrip('/')}/{child}")
            if not cnode:
                # A listed name without a node is skipped.
                continue
            lines.append(long_line(cnode, child))
    return "\n".join(line for line in lines if line != "")
|
|
|
|
def _simulate_pwd(self) -> str:
    """Simulate ``pwd``: return the sandbox's current working directory."""
    here = self.cwd
    return here
|
|
|
|
def _simulate_cd(self, args: list[str]) -> str:
    """Simulate ``cd``.

    With no argument the home directory is used. ``cd -`` returns to the
    directory that was current before the last successful ``cd`` (kept in
    the ``OLDPWD`` environment variable), falling back to home when there is
    none yet.

    Args:
        args: Command arguments; only the first is used.

    Returns:
        The new working directory on success, otherwise an error message.
    """
    target = args[0] if args else self.home
    if target == "-":
        target = self.env.get("OLDPWD") or self.home
    path = self.resolve_path(target)
    node = self.get_node(path)
    if not node:
        return f"cd: {target}: No such file or directory"
    if node["type"] != "dir":
        return f"cd: {target}: Not a directory"
    # Remember where we came from so that "cd -" can go back.
    self.env["OLDPWD"] = self.cwd
    self.cwd = path
    return self.cwd
|
|
|
|
def _simulate_echo(self, args: list[str]) -> str:
    """Simulate ``echo``: join the arguments with spaces and expand variables."""
    return self._expand_env(" ".join(args))
|
|
|
|
def _simulate_cat(self, args: list[str]) -> str:
    """Simulate ``cat`` over the given files.

    ``-n`` numbers the lines of each file, restarting at 1 per file. Other
    arguments starting with "-" are ignored. Missing paths and directories
    contribute an error line instead of content.

    Returns:
        The outputs of all operands joined by newlines, with each file's
        trailing newline removed.
    """
    if not args:
        return "cat: missing operand"
    numbered = "-n" in args
    chunks = []
    for name in (a for a in args if not a.startswith("-")):
        entry = self.get_node(self.resolve_path(name))
        if not entry:
            chunks.append(f"cat: {name}: No such file or directory")
            continue
        if entry["type"] != "file":
            chunks.append(f"cat: {name}: Is a directory")
            continue
        text = entry.get("content", "")
        if numbered:
            rows = text.rstrip("\n").split("\n")
            text = "\n".join(f"{number:>6} {row}" for number, row in enumerate(rows, 1))
        chunks.append(text.rstrip("\n"))
    return "\n".join(chunks)
|
|
|
|
def _simulate_head_tail(self, cmd_name: str, args: list[str]) -> str:
    """Simulate ``head`` or ``tail`` on a file.

    Supports ``-n N`` and ``-N`` for the line count (default 10) and ``-f``,
    which returns a canned "streaming" notice instead of file content. When
    several files are given, the last one is used.

    Args:
        cmd_name: "head" or "tail"; anything other than "head" takes lines
            from the end.
        args: Command arguments without the command name.

    Returns:
        The selected lines joined by newlines, or an error message when the
        count is missing or not an integer, no file is given, or the file
        cannot be opened.
    """
    count = 10
    follow = False
    files = []
    i = 0
    while i < len(args):
        arg = args[i]
        if arg == "-n":
            if i + 1 >= len(args):
                return f"{cmd_name}: option requires an argument -- 'n'"
            try:
                count = int(args[i + 1])
            except ValueError:
                return f"{cmd_name}: invalid number of lines: '{args[i + 1]}'"
            i += 2
            continue
        if arg.startswith("-") and arg[1:].isdigit():
            count = int(arg[1:])
        elif arg == "-f":
            follow = True
        else:
            files.append(arg)
        i += 1
    if follow:
        target = files[-1] if files else "/var/log/syslog"
        return f"==> {target} <==\n(log streaming simulated)\nPress Ctrl+C to exit"
    if not files:
        return f"{cmd_name}: missing file operand"
    node = self.get_node(self.resolve_path(files[-1]))
    if not node or node["type"] != "file":
        return f"{cmd_name}: cannot open '{files[-1]}'"
    lines = node.get("content", "").rstrip("\n").split("\n")
    if cmd_name == "head":
        picked = lines[:count]
    else:
        # lines[-0:] would be the whole list, so zero needs its own case.
        picked = lines[-count:] if count else []
    return "\n".join(picked)
|
|
|
|
def _simulate_grep(self, args: list[str]) -> str:
    """Simulate ``grep PATTERN TARGET...`` with plain substring matching.

    Recognised option letters (separate or clustered, e.g. "-rn") are
    ``i`` (ignore case), ``r``/``R`` (recurse into directories), ``n``
    (line numbers) and ``v`` (invert). A target containing "*" is treated
    as a shell-style wildcard over the absolute paths in the filesystem.

    Args:
        args: Command arguments without the command name.

    Returns:
        Matching lines joined by newlines. Lines are prefixed with the file
        path when recursing or when a target expands to several files, and
        with the line number when ``n`` is set.
    """
    flags = {
        letter
        for arg in args
        if arg.startswith("-") and not arg.startswith("--")
        for letter in arg[1:]
    }
    items = [a for a in args if not a.startswith("-")]
    if len(items) < 2:
        return "grep: pattern or file missing"
    pattern, *targets = items
    insensitive = "i" in flags
    recursive = bool(flags & {"r", "R"})
    show_num = "n" in flags
    invert = "v" in flags
    needle = pattern.lower() if insensitive else pattern
    results = []
    for target in targets:
        resolved = self.resolve_path(target)
        node = self.get_node(resolved)
        if "*" in target:
            # Match the wildcard against every stored absolute path.
            candidate_paths = [p for p in self.fs if fnmatch(p, resolved)]
        elif recursive and node and node["type"] == "dir":
            subtree = resolved.rstrip("/") + "/"
            candidate_paths = [
                p for p in self.fs
                if p.startswith(subtree) and self.fs[p]["type"] == "file"
            ]
        else:
            candidate_paths = [resolved]
        with_name = recursive or len(candidate_paths) > 1
        for path in candidate_paths:
            node = self.get_node(path)
            if not node or node["type"] != "file":
                continue
            for idx, line in enumerate(node.get("content", "").splitlines(), 1):
                hay = line.lower() if insensitive else line
                if (needle in hay) == invert:
                    continue
                prefix = ([path] if with_name else []) + ([str(idx)] if show_num else [])
                results.append("".join(part + ":" for part in prefix) + line)
    return "\n".join(results)
|
|
|
|
def _simulate_grep_stdin(self, args: list[str], stdin: str) -> str:
    """Filter piped text the way ``grep PATTERN`` would, by substring.

    Honours the separate flags ``-i`` (ignore case), ``-n`` (prefix line
    numbers) and ``-v`` (keep non-matching lines). The first non-flag
    argument is the pattern.

    Returns:
        The kept lines joined by newlines, or an error message when no
        pattern is given.
    """
    switches = {a for a in args if a.startswith("-")}
    operands = [a for a in args if not a.startswith("-")]
    if not operands:
        return "grep: pattern missing"
    fold_case = "-i" in switches
    numbered = "-n" in switches
    negate = "-v" in switches
    needle = operands[0].lower() if fold_case else operands[0]
    kept: list[str] = []
    for lineno, text in enumerate((stdin or "").splitlines(), 1):
        subject = text.lower() if fold_case else text
        # Keep the line when "contains the needle" differs from "negate".
        if (needle in subject) != negate:
            kept.append(f"{lineno}:{text}" if numbered else text)
    return "\n".join(kept)
|
|
|
|
def _simulate_sort_stdin(self, args: list[str], stdin: str) -> str:
    """Sort piped lines the way ``sort`` would.

    ``-n`` orders lines by the first number found in each line (lines
    without a number sort as 0); ``-r`` reverses the order. Without ``-n``
    lines are compared as plain strings.
    """
    rows = (stdin or "").splitlines()
    descending = "-r" in args
    if "-n" not in args:
        return "\n".join(sorted(rows, reverse=descending))

    number_re = re.compile(r"[-+]?[0-9]*\.?[0-9]+")

    def first_number(row: str) -> float:
        found = number_re.search(row)
        if found is None:
            return 0.0
        try:
            return float(found.group(0))
        except ValueError:
            return 0.0

    return "\n".join(sorted(rows, key=first_number, reverse=descending))
|
|
|
|
def _simulate_uniq_stdin(self, args: list[str], stdin: str) -> str:
    """Collapse runs of identical adjacent lines, like ``uniq``.

    With ``-c`` each output line is prefixed by the length of its run and a
    space.
    """
    with_counts = "-c" in args
    runs: list[list] = []  # [text, run length] for each group of repeats
    for text in (stdin or "").splitlines():
        if runs and runs[-1][0] == text:
            runs[-1][1] += 1
        else:
            runs.append([text, 1])
    return "\n".join(
        f"{length} {text}" if with_counts else text for text, length in runs
    )
|
|
|
|
def _simulate_cut_stdin(self, args: list[str], stdin: str) -> str:
    """Select delimited fields from piped text, like ``cut -d -f``.

    The delimiter (default tab) and the field list may be given detached
    ("-d : -f 1") or attached ("-d: -f1"). The field list accepts single
    numbers and ranges ("1,3", "2-4", "3-", "-2"). Fields are emitted in the
    order listed. Without a usable field list, lines pass through unchanged.

    Args:
        args: Command arguments without the command name.
        stdin: Text received from the previous pipeline stage.

    Returns:
        One output line per input line, selected fields joined by the
        delimiter.
    """
    def option_value(flag):
        # First occurrence wins, in either "-f 1" or "-f1" spelling.
        for pos, arg in enumerate(args):
            if arg == flag:
                return args[pos + 1] if pos + 1 < len(args) else None
            if arg.startswith(flag) and len(arg) > len(flag):
                return arg[len(flag):]
        return None

    delim = option_value("-d") or "\t"
    ranges = None
    raw = option_value("-f")
    if raw:
        try:
            ranges = []
            for piece in raw.split(","):
                first, dash, last = piece.partition("-")
                if dash:
                    ranges.append((int(first) if first else 1, int(last) if last else None))
                else:
                    ranges.append((int(first), int(first)))
        except ValueError:
            ranges = None
    out: list[str] = []
    for line in (stdin or "").splitlines():
        if not ranges:
            out.append(line)
            continue
        parts = line.split(delim)
        picked = []
        for first, last in ranges:
            stop = len(parts) if last is None else min(last, len(parts))
            picked.extend(parts[max(first, 1) - 1:stop])
        out.append(delim.join(picked))
    return "\n".join(out)
|
|
|
|
def _simulate_awk_stdin(self, args: list[str], stdin: str) -> str:
    """Simulate the ``awk '{print $N, ...}'`` subset on piped text.

    Only the field references of the first ``print`` statement are
    evaluated: ``$1``, ``$2``, ... select fields, ``$0`` is the whole line
    and ``$NF`` is the last field. Fields are split on whitespace unless a
    separator is given with ``-F`` (attached or detached). Selected fields
    are joined with a single space.

    Args:
        args: Command arguments without the command name.
        stdin: Text received from the previous pipeline stage.

    Returns:
        One output line per input line, or the placeholder
        "awk output simulated" when the program has no ``print``.
    """
    separator = None
    program_args = []
    i = 0
    while i < len(args):
        arg = args[i]
        if arg == "-F" and i + 1 < len(args):
            separator = args[i + 1]
            i += 2
        elif arg.startswith("-F") and len(arg) > 2:
            separator = arg[2:]
            i += 1
        else:
            program_args.append(arg)
            i += 1
    if separator == "\\t":
        separator = "\t"
    prog = " ".join(program_args).strip()
    m = re.search(r"print\s+([^}]+)", prog)
    if not m:
        return "awk output simulated"
    cols = []
    for tok in re.split(r"\s*,\s*|\s+", m.group(1).strip()):
        tok = tok.strip()
        if tok == "$NF":
            cols.append("NF")
        elif tok.startswith("$") and tok[1:].isdigit():
            cols.append(int(tok[1:]))
    out: list[str] = []
    for line in (stdin or "").splitlines():
        parts = line.split(separator) if separator else line.split()
        picked = []
        for col in cols:
            if col == "NF":
                if parts:
                    picked.append(parts[-1])
            elif col == 0:
                picked.append(line)
            elif 1 <= col <= len(parts):
                picked.append(parts[col - 1])
        out.append(" ".join(picked) if picked else "")
    return "\n".join(out)
|
|
|
|
def _simulate_sed_stdin(self, args: list[str], stdin: str) -> str:
    """Apply a ``s/old/new/[g]`` substitution to piped text.

    ``old`` is treated as literal text, not as a regular expression.
    Without the ``g`` flag only the first occurrence on each line is
    replaced. Any script that does not start with that form yields the
    placeholder "sed output simulated".
    """
    script = " ".join(args).strip().strip("'\"")
    parsed = re.match(r"s/(.*?)/(.*?)/([g]?)", script)
    if parsed is None:
        return "sed output simulated"
    old, new, flag = parsed.groups()
    # str.replace treats a negative count as "replace all".
    limit = -1 if flag == "g" else 1
    return "\n".join(
        line.replace(old, new, limit) for line in (stdin or "").splitlines()
    )
|
|
|
|
def _simulate_find(self, args: list[str]) -> str:
    """Simulate ``find [PATH] [-type f|d] [-name GLOB] [-maxdepth N] [-exec ...]``.

    Lists every node strictly below the start path (default: the current
    directory) whose base name matches the glob. The start path itself is
    not listed. Options that take a value but are not implemented have
    their value skipped so it is not mistaken for the start path. When the
    ``-exec`` command is ``rm``, the matched nodes are also deleted.

    Args:
        args: Command arguments without the command name.

    Returns:
        The matched paths joined by newlines, or an error message when the
        start path does not exist.
    """
    # Value-taking options that are parsed but have no effect here.
    ignored_with_value = {
        "-mindepth", "-mtime", "-mmin", "-atime", "-ctime", "-size", "-user",
        "-group", "-perm", "-newer", "-iname", "-path", "-regex",
    }
    start_arg = None
    path = self.cwd
    file_type = None
    name_pattern = "*"
    max_depth = None
    exec_cmd: list[str] = []
    i = 0
    while i < len(args):
        arg = args[i]
        has_value = i + 1 < len(args)
        if arg == "-type" and has_value:
            file_type = args[i + 1]
            i += 2
        elif arg == "-name" and has_value:
            name_pattern = args[i + 1].strip("'\"")
            i += 2
        elif arg == "-maxdepth" and has_value:
            if args[i + 1].isdigit():
                max_depth = int(args[i + 1])
            i += 2
        elif arg == "-exec":
            # Everything after -exec belongs to the command to run.
            exec_cmd = args[i + 1:]
            break
        elif arg in ignored_with_value and has_value:
            i += 2
        elif arg.startswith("-"):
            i += 1
        else:
            start_arg = arg
            path = self.resolve_path(arg)
            i += 1
    if path not in self.fs:
        shown = start_arg if start_arg is not None else path
        return f"find: '{shown}': No such file or directory"
    # Require the separator so "/log" does not also match "/logs/...".
    prefix = path.rstrip("/") + "/"
    results = []
    for key, node in self.fs.items():
        if key == path or not key.startswith(prefix):
            continue
        if max_depth is not None and key[len(prefix):].count("/") + 1 > max_depth:
            continue
        if file_type == "f" and node["type"] != "file":
            continue
        if file_type == "d" and node["type"] != "dir":
            continue
        if fnmatch(key.split("/")[-1], name_pattern):
            results.append(key)
    if exec_cmd[:1] == ["rm"]:
        for result in list(results):
            self._delete_node(result)
    return "\n".join(results)
|
|
|
|
def _simulate_du(self, args: list[str]) -> str:
    """Simulate ``du`` for a single target with a synthetic size model.

    A file counts as one unit per 16 characters of content (at least 1); a
    directory counts as 4 plus the sizes of its children. A child name with
    no node contributes nothing. Any option cluster containing ``h`` adds a
    "K" suffix to the size.

    Args:
        args: Command arguments; the first non-flag one is the target
            (default: the current directory).

    Returns:
        "<size>\\t<absolute path>", or an error message when the target does
        not exist.
    """
    target = next((a for a in args if not a.startswith("-")), self.cwd)
    target = self.resolve_path(target)
    if not self.get_node(target):
        return f"du: cannot access '{target}'"

    def calc_size(path: str) -> int:
        item = self.fs.get(path)
        if item is None:
            # Listed in a parent's children but never created.
            return 0
        if item["type"] == "file":
            return max(1, len(item.get("content", "")) // 16)
        base = path.rstrip("/")
        return 4 + sum(calc_size(f"{base}/{child}") for child in item.get("children", []))

    size = calc_size(target)
    human = any(
        a.startswith("-") and not a.startswith("--") and "h" in a for a in args
    )
    return f"{size}K\t{target}" if human else f"{size}\t{target}"
|
|
|
|
def _simulate_wc(self, args: list[str]) -> str:
    """Simulate ``wc`` for a single file.

    The first argument starting with "-" selects the output: ``-l`` lines,
    ``-w`` words, ``-c`` characters. Any other flag, or none, prints all
    three. With no file operand all counts are zero.

    Args:
        args: Command arguments without the command name.

    Returns:
        The requested count(s) as text, or an error message when the
        operand is missing from the filesystem or is a directory.
    """
    flag = next((a for a in args if a.startswith("-")), "")
    target = next((a for a in args if not a.startswith("-")), None)
    content = ""
    if target:
        node = self.get_node(self.resolve_path(target))
        # Report bad operands instead of pretending the file was empty.
        if not node:
            return f"wc: {target}: No such file or directory"
        if node["type"] != "file":
            return f"wc: {target}: Is a directory"
        content = node.get("content", "")
    lines = len(content.splitlines())
    words = len(content.split())
    chars = len(content)
    single = {"-l": lines, "-w": words, "-c": chars}
    if flag in single:
        return str(single[flag])
    return f"{lines:>4} {words:>4} {chars:>4}"
|
|
|
|
def _simulate_mkdir(self, args: list[str]) -> str:
    """Simulate ``mkdir`` for one or more targets.

    Targets that already exist are skipped silently. With ``-p`` missing
    parent directories are created first; without it a missing parent stops
    processing with an error.

    Returns:
        An empty string on success, otherwise an error message.
    """
    if not args:
        return "mkdir: missing operand"
    make_parents = "-p" in args
    for target in (a for a in args if not a.startswith("-")):
        path = self.resolve_path(target)
        if self.exists(path):
            continue
        parent = self.parent_dir(path)
        if make_parents:
            self._ensure_dir(parent)
        elif not self.exists(parent):
            return f"mkdir: cannot create directory '{target}': No such file or directory"
        self.fs[path] = dict(type="dir", perm="755", children=[])
        self._add_child(parent, path.split("/")[-1])
    return ""
|
|
|
|
def _simulate_touch(self, args: list[str]) -> str:
    """Simulate ``touch``: create an empty file for each missing target.

    Existing targets are left untouched. Every argument is treated as a
    path. A target whose parent directory does not exist stops processing
    with an error.

    Returns:
        An empty string on success, otherwise an error message.
    """
    if not args:
        return "touch: missing file operand"
    for target in args:
        path = self.resolve_path(target)
        if self.exists(path):
            continue
        parent = self.parent_dir(path)
        if not self.exists(parent):
            return f"touch: cannot touch '{target}': No such file or directory"
        self.fs[path] = dict(type="file", perm="644", content="")
        self._add_child(parent, path.split("/")[-1])
    return ""
|
|
|
|
def _simulate_cp(self, args: list[str]) -> str:
    """Simulate ``cp SRC DST`` for a single source.

    Directories are copied only when a recursive option is present: any
    short-option cluster containing ``r``, ``R`` or ``a``, or
    ``--recursive``. When ``DST`` is an existing directory the source is
    copied into it under its own name.

    Args:
        args: Command arguments without the command name.

    Returns:
        An empty string on success, otherwise an error message (missing
        operand, missing source, directory without a recursive flag, source
        and destination identical, or missing destination directory).
    """
    letters = {
        letter
        for arg in args
        if arg.startswith("-") and not arg.startswith("--")
        for letter in arg[1:]
    }
    recursive = bool(letters & {"r", "R", "a"}) or "--recursive" in args
    items = [a for a in args if not a.startswith("-")]
    if len(items) < 2:
        return "cp: missing file operand"
    src, dst = self.resolve_path(items[0]), self.resolve_path(items[1])
    src_node = self.get_node(src)
    if not src_node:
        return f"cp: cannot stat '{items[0]}'"
    if src_node["type"] == "dir" and not recursive:
        return f"cp: -r not specified; omitting directory '{items[0]}'"
    dst_node = self.get_node(dst)
    if dst_node and dst_node["type"] == "dir":
        dst = f"{dst.rstrip('/')}/{src.split('/')[-1]}"
    if dst == src:
        return f"cp: '{items[0]}' and '{items[1]}' are the same file"
    # Without an existing parent the copy would be unreachable from any
    # directory listing.
    parent_node = self.get_node(self.parent_dir(dst))
    if not parent_node or parent_node["type"] != "dir":
        return f"cp: cannot create regular file '{items[1]}': No such file or directory"
    self._copy_node(src, dst)
    return ""
|
|
|
|
def _simulate_mv(self, args: list[str]) -> str:
    """Simulate ``mv SRC DST`` for a single source.

    Arguments starting with "-" are treated as options and ignored. When
    ``DST`` is an existing directory the source is moved into it under its
    own name. The move is a copy followed by a delete of the source, so it
    is refused when source and destination are the same path or when the
    destination lies inside the source; either would otherwise destroy the
    data.

    Args:
        args: Command arguments without the command name.

    Returns:
        An empty string on success, otherwise an error message.
    """
    items = [a for a in args if not a.startswith("-")]
    if len(items) < 2:
        return "mv: missing file operand"
    src_arg, dst_arg = items[0], items[1]
    src, dst = self.resolve_path(src_arg), self.resolve_path(dst_arg)
    if not self.get_node(src):
        return f"mv: cannot stat '{src_arg}'"
    dst_node = self.get_node(dst)
    if dst_node and dst_node["type"] == "dir":
        dst = f"{dst.rstrip('/')}/{src.split('/')[-1]}"
    if dst == src:
        return f"mv: '{src_arg}' and '{dst_arg}' are the same file"
    if dst.startswith(src.rstrip("/") + "/"):
        return f"mv: cannot move '{src_arg}' to a subdirectory of itself, '{dst_arg}'"
    parent_node = self.get_node(self.parent_dir(dst))
    if not parent_node or parent_node["type"] != "dir":
        return f"mv: cannot move '{src_arg}' to '{dst_arg}': No such file or directory"
    self._copy_node(src, dst)
    self._delete_node(src)
    return ""
|
|
|
|
def _simulate_rm(self, args: list[str]) -> str:
    """Simulate ``rm`` for one or more targets.

    Directories are removed only when a recursive option is present: any
    short-option cluster containing ``r`` or ``R`` ("-r", "-R", "-rf",
    "-fr", "-rv", ...) or ``--recursive``. Targets that do not exist are
    skipped silently.

    Args:
        args: Command arguments without the command name.

    Returns:
        An empty string on success; an error message when no target is
        given or a directory is named without a recursive option
        (processing stops at that target).
    """
    letters = {
        letter
        for arg in args
        if arg.startswith("-") and not arg.startswith("--")
        for letter in arg[1:]
    }
    recursive = bool(letters & {"r", "R"}) or "--recursive" in args
    targets = [a for a in args if not a.startswith("-")]
    if not targets:
        return "rm: missing operand"
    for target in targets:
        path = self.resolve_path(target)
        node = self.get_node(path)
        if not node:
            continue
        if node["type"] == "dir" and not recursive:
            return f"rm: cannot remove '{target}': Is a directory"
        self._delete_node(path)
    return ""
|
|
|
|
def _simulate_chmod(self, args: list[str]) -> str:
    """Simulate ``chmod [-R] MODE TARGET...``.

    ``MODE`` is either three or four octal digits (the last three are
    stored) or a comma-separated list of symbolic clauses such as ``+x``,
    ``u+x``, ``go-rw``, ``a=r`` or ``+X``. An omitted "who" part means all
    classes. ``-R`` also applies the mode to everything below a target; the
    options ``-v``, ``-f`` and ``-c`` are accepted and ignored.

    Args:
        args: Command arguments without the command name.

    Returns:
        An empty string on success, otherwise an error message (missing
        operand, unknown target or invalid mode).
    """
    option_flags = ("-R", "-v", "-f", "-c")
    recursive = "-R" in args
    items = [a for a in args if a not in option_flags]
    if len(items) < 2:
        return "chmod: missing operand"
    mode, targets = items[0], items[1:]
    who_masks = {"u": 0o700, "g": 0o070, "o": 0o007, "a": 0o777}
    perm_bits = {"r": 0o444, "w": 0o222, "x": 0o111}

    def apply_mode(current: str, is_dir: bool):
        """Return the new octal string, or None when the mode is invalid."""
        if re.fullmatch(r"[0-7]{3,4}", mode):
            return mode[-3:]
        value = int(current, 8) if re.fullmatch(r"[0-7]{3}", current) else 0o644
        for clause in mode.split(","):
            parsed = re.fullmatch(r"([ugoa]*)([-+=])([rwxX]*)", clause)
            if not parsed:
                return None
            who, op, what = parsed.groups()
            who_mask = 0
            for letter in who or "a":
                who_mask |= who_masks[letter]
            bits = 0
            for letter in what:
                if letter == "X":
                    # X grants execute only to directories and to files
                    # that are already executable by someone.
                    if is_dir or value & 0o111:
                        bits |= 0o111
                else:
                    bits |= perm_bits[letter]
            bits &= who_mask
            if op == "+":
                value |= bits
            elif op == "-":
                value &= ~bits
            else:
                value = (value & ~who_mask) | bits
        return f"{value:03o}"

    for target in targets:
        path = self.resolve_path(target)
        node = self.get_node(path)
        if not node:
            return f"chmod: cannot access '{target}'"
        affected = [node]
        if recursive:
            subtree = path.rstrip("/") + "/"
            affected += [n for p, n in self.fs.items() if p.startswith(subtree)]
        for item in affected:
            is_dir = item["type"] == "dir"
            new_perm = apply_mode(item.get("perm", "644"), is_dir)
            if new_perm is None:
                return f"chmod: invalid mode: '{mode}'"
            item["perm"] = new_perm
            # is_executable also consults "perm_human", so keep it in step
            # with the octal value.
            item["perm_human"] = self._perm_human(new_perm, is_dir)
    return ""
|
|
|
|
def _simulate_chown_group(self, cmd_name: str, args: list[str]) -> str:
    """Simulate ``chown``/``chgrp`` as a validated no-op.

    Nothing is stored: the call only checks that the last argument names an
    existing path.

    Returns:
        An empty string when the path exists, otherwise an error message
        prefixed with ``cmd_name``.
    """
    if not args:
        return f"{cmd_name}: missing operand"
    operand = args[-1]
    if self.exists(self.resolve_path(operand)):
        return ""
    return f"{cmd_name}: cannot access '{operand}'"
|
|
|
|
def _simulate_stat(self, args: list[str]) -> str:
    """Simulate ``stat`` for the first argument.

    Reports the resolved path, a size (content length for files, a
    synthetic value based on the child count for directories), the node
    kind, the permissions in octal and rwx form, and a fixed modification
    time.

    Returns:
        The four-line report, or an error message when the operand is
        missing or does not exist.
    """
    if not args:
        return "stat: missing operand"
    path = self.resolve_path(args[0])
    entry = self.get_node(path)
    if not entry:
        return f"stat: cannot stat '{args[0]}'"
    is_dir = entry["type"] == "dir"
    if is_dir:
        size = max(4, len(entry.get("children", [])) * 4)
        kind = "directory"
    else:
        size = len(entry.get("content", ""))
        kind = "regular file"
    perm = entry.get("perm", "755")
    report = [
        f" File: {path}",
        f" Size: {size} IO Block: 4096 {kind}",
        f"Access: ({perm}/{self._perm_human(perm, is_dir)})",
        "Modify: 2026-03-06 10:00:00",
    ]
    return "\n".join(report)
|
|
|
|
def _simulate_history(self, args: list[str]) -> str:
    """Simulate ``history``: list the most recent commands.

    The number of entries defaults to 10 and can be given as ``history N``
    or ``history -n N``. Each entry is numbered with its position in the
    full history, so numbers stay correct when older entries are cut off.

    Args:
        args: Command arguments without the command name.

    Returns:
        One line per entry in the form " <number> <command>".
    """
    limit = 10
    if args and args[0].isdigit():
        limit = int(args[0])
    elif len(args) > 1 and args[0] == "-n" and args[1].isdigit():
        limit = int(args[1])
    # Compute the start index explicitly: history[-0:] would be everything.
    start = max(len(self.history) - limit, 0)
    return "\n".join(
        f" {number} {cmd}"
        for number, cmd in enumerate(self.history[start:], start + 1)
    )
|
|
|
|
def _simulate_ps(self, args: list[str]) -> str:
    """Simulate ``ps`` with canned output.

    Exactly ``ps aux`` or ``ps -ef`` returns a fixed full process table;
    any other invocation returns a fixed short listing.
    """
    if args in (["aux"], ["-ef"]):
        full_table = (
            "USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND",
            "root 1 0.0 0.1 10000 2048 ? Ss 10:00 0:01 /sbin/init",
            "sandbox 1234 0.1 0.3 20480 4096 pts/0 S+ 10:05 0:00 python3 server.py",
            "nginx 1400 0.0 0.2 18900 3500 ? S 10:06 0:00 nginx: worker",
        )
        return "\n".join(full_table)
    return "\n".join((" PID TTY TIME CMD", " 1234 pts/0 00:00:00 bash"))
|
|
|
|
def _simulate_system_text(self, cmd_name: str, args: list[str]) -> str:
    """Return canned or lightly computed output for "system" commands.

    Most commands map straight to a fixed text. A few have behaviour:
    ``systemctl`` adapts its text to the sub-command and unit, ``head`` and
    ``tail`` delegate to ``_simulate_head_tail``, ``which``/``whereis`` look
    the name up in ``COMMAND_INDEX``, ``export NAME=VALUE`` and
    ``alias NAME=VALUE`` update ``self.env`` / ``self.aliases``, ``alias``
    without arguments lists the current aliases, and ``tar`` creates the
    archive node (``-czf``) or the extraction directory (``-xzf ... -C``).

    Args:
        cmd_name: The command name.
        args: Command arguments without the command name.

    Returns:
        The simulated output; unknown commands yield "<name>: simulated".
    """
    canned = {
        "clear": "[screen cleared]",
        "id": "uid=1000(sandbox_user) gid=1000(sandbox_user) groups=1000(sandbox_user),999(docker)",
        "uptime": " 10:00:00 up 5 days, 2 users, load average: 0.10, 0.12, 0.08",
        "w": " 10:00:00 up 5 days, 2 users, load average: 0.10, 0.12, 0.08\nUSER TTY FROM LOGIN@ IDLE JCPU PCPU WHAT\nsandbox pts/0 localhost 10:00 1:20 0.01s 0.00s bash",
        "last": "sandbox_user pts/0 localhost Mon Mar 6 10:04 still logged in\nreboot system boot 5.15.0 Mon Mar 6 10:00",
        "passwd": "Changing password for sandbox_user... (simulated)",
        "su": "Switched to root (simulated)",
        "df": "Filesystem Size Used Avail Use% Mounted on\n/dev/vda1 40G 12G 26G 32% /\ntmpfs 512M 8.0M 504M 2% /run",
        "free": " total used free shared buff/cache available\nMem: 2.0Gi 1.1Gi 256Mi 48Mi 700Mi 750Mi",
        "mount": "/dev/vda1 on / type ext4 (rw,relatime)\ntmpfs on /run type tmpfs (rw,nosuid,nodev)",
        "lsof": "COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME\nnginx 1042 root 6u IPv4 12345 0t0 TCP *:80 (LISTEN)\npython3 1234 sandbox_user 3u IPv4 22334 0t0 TCP 127.0.0.1:8080 (LISTEN)",
        "fdisk": "Disk /dev/vda: 40 GiB, 42949672960 bytes, 83886080 sectors",
        "top": "top - 10:00:00 up 5 days, 1 user, load average: 0.10, 0.12, 0.08\nTasks: 132 total, 1 running, 131 sleeping",
        "kill": "signal sent (simulated)",
        "pkill": "matched processes terminated (simulated)",
        "nohup": "appending output to nohup.out",
        "systemctl": "● nginx.service - A high performance web server\n Loaded: loaded (/usr/lib/systemd/system/nginx.service; enabled)\n Active: active (running) since Mon 2026-03-06 10:00:00 CST; 5 days ago\n Main PID: 1042 (nginx)\n Tasks: 3\n Memory: 12.5M",
        "service": "nginx is running",
        "journalctl": "Mar 06 10:00:00 systemd[1]: Started nginx.service\nMar 06 10:00:01 nginx[1042]: worker process started\nMar 06 10:04:12 nginx[1042]: connect() failed (111: Connection refused)",
        "ifconfig": "eth0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST> mtu 1500\n inet 192.168.1.20 netmask 255.255.255.0\nlo: flags=73<UP,LOOPBACK,RUNNING> mtu 65536\n inet 127.0.0.1 netmask 255.0.0.0",
        "ip": "1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536\n inet 127.0.0.1/8 scope host lo\n2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500\n inet 192.168.1.20/24 scope global eth0",
        "ping": "PING 127.0.0.1 (127.0.0.1): 56 data bytes\n64 bytes from 127.0.0.1: icmp_seq=0 ttl=64 time=0.025 ms\n--- 127.0.0.1 ping statistics ---\n4 packets transmitted, 4 received, 0% packet loss",
        "netstat": "Active Internet connections (only servers)\nProto Local Address Foreign Address State PID/Program name\ntcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN 102/sshd",
        "ss": "State Recv-Q Send-Q Local Address:Port Peer Address:Port Process\nLISTEN 0 128 0.0.0.0:22 0.0.0.0:* users:((\"sshd\",pid=102,fd=3))\nLISTEN 0 511 0.0.0.0:80 0.0.0.0:* users:((\"nginx\",pid=1042,fd=6))",
        "curl": "<!doctype html><html><body>hello localhost</body></html>",
        "wget": "Saving to: '/tmp/test.html'\n2026-03-06 10:00:00 (1.2 MB/s) - '/tmp/test.html' saved [612/612]",
        "traceroute": "traceroute to 8.8.8.8, 30 hops max\n 1 192.168.1.1 1.012 ms\n 2 10.0.0.1 4.221 ms",
        "yum": "Loaded plugins: fastestmirror\nInstalled Packages\nnginx.x86_64 1.24.0-1 @base",
        "rpm": "bash-5.1.8-6.el9.x86_64",
        "apt": "Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease\nReading package lists... Done",
        "dpkg": "ii bash 5.1-6ubuntu1 amd64 GNU Bourne Again SHell",
        "vim": "[Opened in vim simulator]",
        "vi": "[Opened in vim simulator]",
        "env": "\n".join(f"{k}={v}" for k, v in self.env.items()),
        "export": "environment updated",
        # List the aliases that are actually defined, not a fixed sample.
        "alias": "\n".join(f"alias {k}='{v}'" for k, v in self.aliases.items()) if not args else "alias set",
        "date": datetime.now().strftime("%a %b %d %H:%M:%S CST %Y"),
        "cal": " March 2026\nSu Mo Tu We Th Fr Sa\n 1 2 3 4 5 6 7\n 8 9 10 11 12 13 14",
        "sort": "alpha\nbeta\ngamma",
        "uniq": "unique lines output",
        "cut": "field output simulated",
        "awk": "awk output simulated",
        "sed": "sed output simulated",
        "bc": "2",
        "tar": "tar operation simulated",
        "crontab": "no crontab for sandbox_user",
        "more": "--More-- (simulated pagination)",
        "less": "/etc/passwd (simulated pager)",
    }
    if cmd_name == "systemctl":
        if args[:1] == ["status"]:
            target = args[1] if len(args) > 1 else "nginx"
            return canned["systemctl"].replace("nginx", target)
        if args[:1] in (["restart"], ["start"], ["stop"], ["reload"], ["enable"]):
            target = args[1] if len(args) > 1 else "service"
            return f"{args[0]} operation completed for {target} (simulated)"
        return canned["systemctl"]
    if cmd_name in ("tail", "head"):
        return self._simulate_head_tail(cmd_name, args)
    if cmd_name == "which":
        return COMMAND_INDEX.get(args[0], "") if args else ""
    if cmd_name == "whereis":
        if not args:
            return ""
        return f"{args[0]}: {COMMAND_INDEX.get(args[0], '')} /usr/share/man/man1/{args[0]}.1.gz"
    if cmd_name == "dig":
        return ";; ANSWER SECTION:\nexample.com. 300 IN A 93.184.216.34"
    if cmd_name == "export" and args and "=" in args[0]:
        key, value = args[0].split("=", 1)
        self.env[key] = value
        return f"exported {key}={value}"
    if cmd_name == "alias" and args and "=" in args[0]:
        key, value = args[0].split("=", 1)
        self.aliases[key] = value.strip("'\"")
        return f"alias {key}='{self.aliases[key]}'"
    if cmd_name == "bc":
        expr = " ".join(args)
        if "|" in expr:
            match = re.search(r"'([^']+)'", expr)
            # SECURITY: this text comes from the user and is passed to
            # eval(). Emptying __builtins__ is not a sandbox, so only plain
            # arithmetic characters are let through, and "**" is refused
            # because it can build arbitrarily large numbers.
            if (
                match
                and re.fullmatch(r"[0-9+\-*/%(). ]+", match.group(1))
                and "**" not in match.group(1)
            ):
                try:
                    return str(eval(match.group(1), {"__builtins__": {}}, {}))
                except Exception:
                    return "2"
        return "2"
    if cmd_name == "tar":
        if "-czf" in args and len(args) >= 3:
            pos = args.index("-czf") + 1
            # Guard the index: the flag may be the last argument.
            if pos < len(args):
                archive = self.resolve_path(args[pos])
                self.fs[archive] = {"type": "file", "perm": "644", "content": "tarball"}
                self._add_child(self.parent_dir(archive), archive.split("/")[-1])
        if "-xzf" in args and "-C" in args:
            pos = args.index("-C") + 1
            if pos < len(args):
                self._ensure_dir(self.resolve_path(args[pos]))
        return canned["tar"]
    return canned.get(cmd_name, f"{cmd_name}: simulated")
|
|
|
|
# ---------- Main ----------
|
|
def execute(self, raw_cmd: str) -> dict:
    """Run one simulated command line and report the outcome.

    The command is alias-expanded, appended to ``self.history``, checked
    against the dangerous-command blocklist, and then handled either as a
    pipeline (``cmd1 | cmd2 | ...``) or as a single command.

    Args:
        raw_cmd: The command line as typed by the user.

    Returns:
        A dict with ``output``, ``success`` and ``message`` keys. Results of
        commands that ran to completion also carry ``cwd``. Blank input,
        blocked commands and failures do not include ``cwd``.
    """
    cmd = raw_cmd.strip()
    if not cmd:
        return {"output": "", "success": True, "message": ""}

    cmd = self._expand_alias(cmd)
    self.history.append(cmd)

    # The blocklist must be checked before pipe splitting: the fork-bomb
    # pattern itself contains "|", so it would otherwise be treated as a
    # pipeline and reported as a successful command.
    if any(pattern in cmd for pattern in ("sudo rm -rf /", ":(){:|:&};:")):
        return {"output": "", "success": False, "message": "❌ 危险命令已拦截,沙盒环境禁止执行。"}

    def stdin_line_count(opts, default=10):
        """Extract the line count from head/tail options (-n N, -nN, -N, --lines=N)."""
        for pos, opt in enumerate(opts):
            if opt == "-n" and pos + 1 < len(opts):
                value = opts[pos + 1]
            elif opt.startswith("--lines="):
                value = opt[len("--lines="):]
            elif opt.startswith("-n") and len(opt) > 2:
                value = opt[2:]
            elif opt.startswith("-") and opt[1:].isdigit():
                value = opt[1:]
            else:
                continue
            # Only plain non-negative integers are understood; anything else
            # (e.g. "+3", "-3") falls back to the default count.
            if value.isdigit():
                return int(value)
        return default

    # Minimal pipe support for teaching (cmd1 | cmd2 | cmd3)
    pipeline = self._split_pipes(cmd)
    if len(pipeline) > 1:
        stdin = ""
        try:
            for seg in pipeline:
                seg_parts = self._safe_shell_split(seg)
                if not seg_parts:
                    continue
                name, a = seg_parts[0], seg_parts[1:]

                # producers
                if name == "cat":
                    stdin = self._simulate_cat(a)
                elif name in {"head", "tail"}:
                    if stdin:
                        # Act as a filter on the previous stage's output.
                        lines = stdin.splitlines()
                        count = stdin_line_count(a)
                        if name == "head":
                            kept = lines[:count]
                        else:
                            # max() keeps the slice start valid when count
                            # is 0 or exceeds the number of lines.
                            kept = lines[max(len(lines) - count, 0):]
                        stdin = "\n".join(kept)
                    else:
                        # No piped input yet: read from the file arguments.
                        stdin = self._simulate_head_tail(name, a)
                elif name in {"ss", "netstat"}:
                    stdin = self._simulate_system_text(name, a)
                # filters/transforms
                elif name in {"grep", "sort", "uniq", "cut", "awk", "sed"}:
                    stdin = getattr(self, f"_simulate_{name}_stdin")(a, stdin)
                else:
                    stdin = f"{name}: simulated"
        except Exception as e:
            # Same top-level boundary as the single-command path below, so a
            # failing stage yields an error result instead of propagating.
            return {"output": f"Error: {e}", "success": False, "message": "❌ 执行失败"}
        return {"output": stdin, "success": True, "message": "✅ 命令执行成功", "cwd": self.cwd}

    parts = self._safe_shell_split(cmd)
    if not parts:
        return {"output": "", "success": True, "message": ""}
    cmd_name, args = parts[0], parts[1:]

    # Commands whose handler takes only the argument list, keyed by command
    # name. Handlers are looked up by name at call time.
    args_only_handlers = {
        "ls": "_simulate_ls",
        "cd": "_simulate_cd",
        "echo": "_simulate_echo",
        "cat": "_simulate_cat",
        "grep": "_simulate_grep",
        "find": "_simulate_find",
        "du": "_simulate_du",
        "wc": "_simulate_wc",
        "mkdir": "_simulate_mkdir",
        "touch": "_simulate_touch",
        "cp": "_simulate_cp",
        "mv": "_simulate_mv",
        "rm": "_simulate_rm",
        "chmod": "_simulate_chmod",
        "stat": "_simulate_stat",
        "history": "_simulate_history",
    }

    try:
        if cmd_name == "pwd":
            output = self._simulate_pwd()
        elif cmd_name == "whoami":
            output = self.user
        elif cmd_name in {"head", "tail"}:
            output = self._simulate_head_tail(cmd_name, args)
        elif cmd_name in {"chown", "chgrp"}:
            output = self._simulate_chown_group(cmd_name, args)
        elif cmd_name in args_only_handlers:
            output = getattr(self, args_only_handlers[cmd_name])(args)
        else:
            # Everything else (systemctl, service, journalctl, dig, ...)
            # is answered by the system-text simulator.
            output = self._simulate_system_text(cmd_name, args)
    except Exception as e:
        return {"output": f"Error: {e}", "success": False, "message": "❌ 执行失败"}

    return {"output": output, "success": True, "message": "✅ 命令执行成功", "cwd": self.cwd}
|
|
|
|
|
|
if __name__ == "__main__":
    # Smoke-test the sandbox with a few representative commands and print
    # the raw result dict of each one.
    sb = LinuxSandbox()
    for demo_command in ("pwd", "mkdir -p /tmp/a/b", "find /etc -name '*.conf'"):
        print(sb.execute(demo_command))
|