feat: add incident operation chains and basic pipeline support

This commit is contained in:
likingcode
2026-03-10 17:13:45 +08:00
parent 1e9b3a45c5
commit 8880f7d383
2 changed files with 308 additions and 2 deletions

View File

@@ -1,6 +1,6 @@
{
"meta": {
"version": "4.1",
"version": "4.2",
"title": "Linux 系统学习课程(运维全场景版)",
"author": "OpenClaw Dev",
"updated": "2026-03-10",
@@ -1952,6 +1952,50 @@
"type": "scenario",
"question": "如果端口没监听,你下一步更应该看什么?",
"answer": "看服务状态和日志,确认是否启动失败或启动后立即退出"
},
{
"id": "m10_l1_service_down_op1",
"type": "operation",
"title": "第一步:确认服务状态",
"hint": "systemctl status nginx",
"success_test": "cmd == 'systemctl status nginx'",
"solution": [
"systemctl status nginx"
],
"success_msg": "✅ 通过:继续下一步"
},
{
"id": "m10_l1_service_down_op2",
"type": "operation",
"title": "第二步:确认端口监听",
"hint": "ss -ltnp | grep 80",
"success_test": "'80' in output",
"solution": [
"ss -ltnp | grep 80"
],
"success_msg": "✅ 通过:继续下一步"
},
{
"id": "m10_l1_service_down_op3",
"type": "operation",
"title": "第三步:看最近日志",
"hint": "journalctl -u nginx -n 50",
"success_test": "'Started' in output or 'connect() failed' in output",
"solution": [
"journalctl -u nginx -n 50"
],
"success_msg": "✅ 通过:继续下一步"
},
{
"id": "m10_l1_service_down_op4",
"type": "operation",
"title": "第四步:本机请求验证",
"hint": "curl -I http://127.0.0.1",
"success_test": "'hello' in output or '200' in output or 'html' in output",
"solution": [
"curl -I http://127.0.0.1"
],
"success_msg": "✅ 通过:继续下一步"
}
],
"related_commands": [
@@ -2024,6 +2068,39 @@
"type": "scenario",
"question": "如果 /var/log 特别大,你会想到哪些命令组合?",
"answer": "df、du、find、sort 组合起来定位大文件和大目录"
},
{
"id": "m10_l2_disk_full_op1",
"type": "operation",
"title": "第一步:确认哪个挂载点满了",
"hint": "df -h",
"success_test": "'Filesystem' in output",
"solution": [
"df -h"
],
"success_msg": "✅ 通过:继续下一步"
},
{
"id": "m10_l2_disk_full_op2",
"type": "operation",
"title": "第二步:定位大目录(示例:/var/log",
"hint": "du -sh /var/log",
"success_test": "'/var/log' in output",
"solution": [
"du -sh /var/log"
],
"success_msg": "✅ 通过:继续下一步"
},
{
"id": "m10_l2_disk_full_op3",
"type": "operation",
"title": "第三步:找日志相关文件(示例)",
"hint": "find /var/log -type f",
"success_test": "'/var/log' in output",
"solution": [
"find /var/log -type f"
],
"success_msg": "✅ 通过:继续下一步"
}
],
"related_commands": [
@@ -2094,6 +2171,39 @@
"type": "scenario",
"question": "如果脚本明明存在却执行不了,你会从哪几类信息开始看?",
"answer": "先看 whoami/id再看文件权限和属主属组必要时看相关日志"
},
{
"id": "m10_l3_login_fail_op1",
"type": "operation",
"title": "第一步:确认当前身份",
"hint": "id",
"success_test": "'uid=' in output",
"solution": [
"id"
],
"success_msg": "✅ 通过:继续下一步"
},
{
"id": "m10_l3_login_fail_op2",
"type": "operation",
"title": "第二步:查看认证日志尾部",
"hint": "cat /var/log/auth.log | tail -n 1",
"success_test": "'sshd' in output",
"solution": [
"cat /var/log/auth.log | tail -n 1"
],
"success_msg": "✅ 通过:继续下一步"
},
{
"id": "m10_l3_login_fail_op3",
"type": "operation",
"title": "第三步:确认用户是否存在",
"hint": "grep sandbox_user /etc/passwd",
"success_test": "'sandbox_user' in output",
"solution": [
"grep sandbox_user /etc/passwd"
],
"success_msg": "✅ 通过:继续下一步"
}
],
"related_commands": [

View File

@@ -184,6 +184,34 @@ class LinuxSandbox:
matches = re.findall(r"'[^']*'|\"[^\"]*\"|\S+", cmd)
return [m[1:-1] if (m.startswith("'") and m.endswith("'")) or (m.startswith('"') and m.endswith('"')) else m for m in matches]
def _split_pipes(self, raw: str) -> list[str]:
"""Split a command string by pipes, respecting simple quotes.
Minimal support for common teaching use-cases like:
- ss -ltnp | grep 80
- cat file | tail -n 20
"""
parts: list[str] = []
buf: list[str] = []
in_sq = False
in_dq = False
for ch in raw:
if ch == "'" and not in_dq:
in_sq = not in_sq
elif ch == '"' and not in_sq:
in_dq = not in_dq
if ch == "|" and not in_sq and not in_dq:
seg = "".join(buf).strip()
if seg:
parts.append(seg)
buf = []
continue
buf.append(ch)
tail = "".join(buf).strip()
if tail:
parts.append(tail)
return parts
def _expand_alias(self, cmd: str) -> str:
if not cmd:
return cmd
@@ -346,6 +374,125 @@ class LinuxSandbox:
results.append((":".join(prefix) + (":" if prefix else "")) + line)
return "\n".join(results)
def _simulate_grep_stdin(self, args: list[str], stdin: str) -> str:
flags = {a for a in args if a.startswith("-")}
items = [a for a in args if not a.startswith("-")]
if not items:
return "grep: pattern missing"
pattern = items[0]
insensitive = "-i" in flags
show_num = "-n" in flags
invert = "-v" in flags
out: list[str] = []
for idx, line in enumerate((stdin or "").splitlines(), 1):
hay = line.lower() if insensitive else line
needle = pattern.lower() if insensitive else pattern
matched = needle in hay
if invert:
matched = not matched
if matched:
out.append((f"{idx}:" if show_num else "") + line)
return "\n".join(out)
def _simulate_sort_stdin(self, args: list[str], stdin: str) -> str:
lines = (stdin or "").splitlines()
numeric = "-n" in args
reverse = "-r" in args
def key_fn(x: str):
if numeric:
try:
return float(re.findall(r"[-+]?[0-9]*\.?[0-9]+", x)[0])
except Exception:
return 0.0
return x
lines_sorted = sorted(lines, key=key_fn, reverse=reverse)
return "\n".join(lines_sorted)
def _simulate_uniq_stdin(self, args: list[str], stdin: str) -> str:
lines = (stdin or "").splitlines()
count = "-c" in args
out: list[str] = []
prev = None
n = 0
# add a sentinel to flush last group
for line in lines + [None]:
if line == prev:
n += 1
continue
if prev is not None:
out.append((f"{n} {prev}" if count else prev))
prev = line
n = 1
return "\n".join(out)
def _simulate_cut_stdin(self, args: list[str], stdin: str) -> str:
delim = "\t"
fields = None
if "-d" in args:
i = args.index("-d")
if i + 1 < len(args):
delim = args[i + 1]
if "-f" in args:
i = args.index("-f")
if i + 1 < len(args):
raw = args[i + 1]
try:
fields = [int(x) for x in raw.split(",")]
except Exception:
fields = None
out: list[str] = []
for line in (stdin or "").splitlines():
parts = line.split(delim)
if not fields:
out.append(line)
continue
picked = []
for f in fields:
if 1 <= f <= len(parts):
picked.append(parts[f - 1])
out.append(delim.join(picked))
return "\n".join(out)
def _simulate_awk_stdin(self, args: list[str], stdin: str) -> str:
prog = " ".join(args).strip()
m = re.search(r"print\s+([^}]+)", prog)
if not m:
return "awk output simulated"
expr = m.group(1).strip()
cols = []
for tok in re.split(r"\s*,\s*|\s+", expr):
tok = tok.strip()
if tok.startswith("$"):
try:
cols.append(int(tok[1:]))
except Exception:
pass
out: list[str] = []
for line in (stdin or "").splitlines():
parts = line.split()
picked = []
for c in cols:
if 1 <= c <= len(parts):
picked.append(parts[c - 1])
out.append(" ".join(picked) if picked else "")
return "\n".join(out)
def _simulate_sed_stdin(self, args: list[str], stdin: str) -> str:
prog = " ".join(args).strip().strip("'\"")
m = re.match(r"s/(.*?)/(.*?)/([g]?)", prog)
if not m:
return "sed output simulated"
old, new, flags = m.group(1), m.group(2), m.group(3)
out_lines = []
for line in (stdin or "").splitlines():
if flags == "g":
out_lines.append(line.replace(old, new))
else:
out_lines.append(line.replace(old, new, 1))
return "\n".join(out_lines)
def _simulate_find(self, args: list[str]) -> str:
path = self.cwd
file_type = None
@@ -556,6 +703,7 @@ class LinuxSandbox:
"df": "Filesystem Size Used Avail Use% Mounted on\n/dev/vda1 40G 12G 26G 32% /\ntmpfs 512M 8.0M 504M 2% /run",
"free": " total used free shared buff/cache available\nMem: 2.0Gi 1.1Gi 256Mi 48Mi 700Mi 750Mi",
"mount": "/dev/vda1 on / type ext4 (rw,relatime)\ntmpfs on /run type tmpfs (rw,nosuid,nodev)",
"lsof": "COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME\nnginx 1042 root 6u IPv4 12345 0t0 TCP *:80 (LISTEN)\npython3 1234 sandbox_user 3u IPv4 22334 0t0 TCP 127.0.0.1:8080 (LISTEN)",
"fdisk": "Disk /dev/vda: 40 GiB, 42949672960 bytes, 83886080 sectors",
"top": "top - 10:00:00 up 5 days, 1 user, load average: 0.10, 0.12, 0.08\nTasks: 132 total, 1 running, 131 sleeping",
"kill": "signal sent (simulated)",
@@ -568,7 +716,7 @@ class LinuxSandbox:
"ip": "1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536\n inet 127.0.0.1/8 scope host lo\n2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500\n inet 192.168.1.20/24 scope global eth0",
"ping": "PING 127.0.0.1 (127.0.0.1): 56 data bytes\n64 bytes from 127.0.0.1: icmp_seq=0 ttl=64 time=0.025 ms\n--- 127.0.0.1 ping statistics ---\n4 packets transmitted, 4 received, 0% packet loss",
"netstat": "Active Internet connections (only servers)\nProto Local Address Foreign Address State PID/Program name\ntcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN 102/sshd",
"ss": "State Recv-Q Send-Q Local Address:Port Peer Address:Port Process\nLISTEN 0 128 0.0.0.0:22 0.0.0.0:* users:((\"sshd\",pid=102,fd=3))",
"ss": "State Recv-Q Send-Q Local Address:Port Peer Address:Port Process\nLISTEN 0 128 0.0.0.0:22 0.0.0.0:* users:((\"sshd\",pid=102,fd=3))\nLISTEN 0 511 0.0.0.0:80 0.0.0.0:* users:((\"nginx\",pid=1042,fd=6))",
"curl": "<!doctype html><html><body>hello localhost</body></html>",
"wget": "Saving to: '/tmp/test.html'\n2026-03-06 10:00:00 (1.2 MB/s) - '/tmp/test.html' saved [612/612]",
"traceroute": "traceroute to 8.8.8.8, 30 hops max\n 1 192.168.1.1 1.012 ms\n 2 10.0.0.1 4.221 ms",
@@ -585,6 +733,11 @@ class LinuxSandbox:
"alias": "alias ll='ls -l'" if not args else "alias set",
"date": datetime.now().strftime("%a %b %d %H:%M:%S CST %Y"),
"cal": " March 2026\nSu Mo Tu We Th Fr Sa\n 1 2 3 4 5 6 7\n 8 9 10 11 12 13 14",
"sort": "alpha\nbeta\ngamma",
"uniq": "unique lines output",
"cut": "field output simulated",
"awk": "awk output simulated",
"sed": "sed output simulated",
"bc": "2",
"tar": "tar operation simulated",
"crontab": "no crontab for sandbox_user",
@@ -652,6 +805,49 @@ class LinuxSandbox:
cmd = self._expand_alias(cmd)
self.history.append(cmd)
# Minimal pipe support for teaching (cmd1 | cmd2 | cmd3)
pipeline = self._split_pipes(cmd)
if len(pipeline) > 1:
stdin = ""
last_cwd = self.cwd
for seg in pipeline:
seg_parts = self._safe_shell_split(seg)
if not seg_parts:
continue
name, a = seg_parts[0], seg_parts[1:]
# producers
if name == "cat":
stdin = self._simulate_cat(a)
elif name in {"head", "tail"}:
# allow head/tail on stdin (very minimal): if last output exists
if stdin:
tmp = "\n".join(stdin.splitlines()[:10]) if name == "head" else "\n".join(stdin.splitlines()[-10:])
stdin = tmp
else:
stdin = self._simulate_head_tail(name, a)
elif name == "ss":
stdin = self._simulate_system_text("ss", a)
elif name == "netstat":
stdin = self._simulate_system_text("netstat", a)
# filters/transforms
elif name == "grep":
stdin = self._simulate_grep_stdin(a, stdin)
elif name == "sort":
stdin = self._simulate_sort_stdin(a, stdin)
elif name == "uniq":
stdin = self._simulate_uniq_stdin(a, stdin)
elif name == "cut":
stdin = self._simulate_cut_stdin(a, stdin)
elif name == "awk":
stdin = self._simulate_awk_stdin(a, stdin)
elif name == "sed":
stdin = self._simulate_sed_stdin(a, stdin)
else:
stdin = f"{name}: simulated"
last_cwd = self.cwd
return {"output": stdin, "success": True, "message": "✅ 命令执行成功", "cwd": last_cwd}
parts = self._safe_shell_split(cmd)
if not parts:
return {"output": "", "success": True, "message": ""}