2026寒假练-基于树莓派RP2040游戏机实现AI关卡生成推箱子迷宫游戏
1. 项目描述
项目名称:【RP2040游戏机】AI关卡生成迷宫推箱子(Sokoban)
目标:在 RP2040 Game Kit 上实现可运行渲染的轻量解谜游戏‘’推箱子‘’,并由上位机使用 AI/脚本生成关卡,通过 USB 串口发送到游戏机端加载游玩;支持下一关、重开、难度选择;可选加分实现姿态传感器倾斜控制并在菜单切换。
设计思路概述
- 双端架构:
- 上位机(PC 端)负责“生成 + 验证 + 发送”关卡:先用随机/AI 产生 seed,再生成迷宫底图、从“已解状态反向打乱”、并用推箱子 BFS 在设备同尺度(40×40 子格)做可通关验证,最后通过串口协议发送。
- 下位机(RP2040 端)负责“接收 + 解析 + 渲染 + 交互”:接收 20×20 文本地图,扩展成 40×40 子格用于更顺滑的移动与更宽的走廊;提供菜单、难度、下一关、重开;并支持摇杆或 IMU 倾斜输入控制。
- 一致性:PC 端验证使用与设备端相同的“20×20 → 40×40(2×2 扩展)”规则,确保“看起来能走”与“实际通关”一致,避免不可解与死角等问题。
- 内存/稳定性:RP2040 端用
bytearray保存墙体(40×40=1600),关卡对象用set存索引,串口接收用tuple保存 20 行文本,配合gc.collect()与异常兜底,避免回到 REPL 出现>干扰通信。
硬件框图
- 主控:RP2040(MicroPython)
- 显示:240×240 SPI LCD(ST7789)
- 输入:四向摇杆(ADC)、A/B/Start/Select 按键(GPIO 上拉输入)
- 可选输入:MMA7660FC 三轴姿态传感器(I2C1)
- 通信:USB 串口与 PC 通信(COM11,115200)

软件流程
- 设备端启动:初始化 SPI-LCD → 初始化按键/摇杆 → 初始化 I2C1-IMU → 加载内置关卡 → 显示 READY
- PC 端循环生成:Ollama 产 seed → 生成迷宫 → 反向打乱箱子 → 子格 BFS 验证可解 → 打包
@LVL协议 → 串口发送 → 等待设备OK/ERR - 设备端运行:
- 串口接收
@LVL … @END→ 解析 20×20 → 扩展 40×40 → 更新显示 - 玩家摇杆/倾斜控制移动,推箱子判定
Start重开,B下一关(同难度匹配),A进入菜单(难度/控制方式切换)
- 串口接收
2. 硬件介绍
本项目使用 RP2040 Game Kit 板卡,LCD 使用 SPI0,摇杆 ADC=GP28/29,按键 GP5~8,IMU I2C1=GP10/11,包括:
- ST7789 240×240 彩屏,通过 SPI 驱动;代码中使用
SPI(0)并指定 LCD 的RST/DC/SCK/MOSI引脚初始化显示并使用全屏 FrameBuffer 快速刷新。 - 四向摇杆:使用 ADC 采样(代码里
ADC(Pin(28))与ADC(Pin(29))),启动时进行多次采样校准中心值,移动时设置死区避免漂移。 - A/B/Start/Select 按键:GPIO 上拉输入(代码中
Pin.IN, Pin.PULL_UP),通过下降沿检测实现“按一下触发一次”。 - IMU:MMA7660FC:I2C1 接口(代码
I2C(1, scl=Pin(11), sda=Pin(10))),运行中可读 XYZ,实现倾斜控制并可在菜单中切换
3. 实现的功能及展示
任务 1:在游戏机上实现迷宫/推箱子等解谜游戏运行与渲染
- 实现情况:
设备端main.py将关卡从 20×20 扩展为 40×40 子格(SCALE=2),每个子格映射为 6×6 像素(TILE=6),循环绘制墙/地板/目标/箱子/玩家,保证 240×240 全屏对齐。推箱子逻辑由try_move()完成:玩家试图移动→若前方是箱子则判断箱子下一格是否可推→更新箱子集合与玩家坐标→刷新画面。
任务 2:上位机 AI 生成关卡文本/位图,通过 USB 串口发送到游戏机
实现情况:
PC 端脚本使用 Ollama(llama3.1:8b)只生成随机 seed(ollama_get_seed()),关卡本体由脚本算法生成并验证,最后通过 serial.Serial(COM11,115200) 发送到设备端。
发送采用分块写入 write_payload_chunked(chunk=256) + 启动清理 drain_serial(),并用 wait_device_ack() 只识别 OK/ERR,忽略 >>> 等噪声,提高稳定性。
任务 3:游戏机解析并显示关卡,摇杆/按键游玩;支持下一关、重开、难度选择
实现情况:
- 解析显示:设备端
LevelReceiver监听串口协议帧,收齐 20 行后调用LevelRuntime.load_from_20x20_lines()解析并绘制。 - 游玩控制:默认摇杆控制(
Joy.dir()输出方向);MOVE_PERIOD_BY_DIFF=[180,130,90]控制不同难度的移动节奏。 - 重开:
Start触发重新从当前 base_pkt 载入。 - 下一关:
B进入下一关,并通过next_level_matching()保证“同难度 diff 匹配切换”。 - 难度选择:菜单中
Difficulty可循环切换(影响移动速度与 PC 端箱子数/迷宫复杂度策略)。
- 已通关的显示

任务 4:报告写明关卡数据格式与串口协议;视频演示至少 5 个 AI 关卡
实现情况:
本项目串口关卡协议为纯文本帧,便于调试与扩展:
PC端
def frame_level(name, diff, lines):
out = [f"@LVL name={name} diff={diff} w=20 h=20"]
out.extend(lines)
out.append("@END")
return "\n".join(out) + "\n"
def wait_device_ack(ser, timeout_s=15.0):
t0 = time.time()
last_nonempty = ""
while time.time() - t0 < timeout_s:
raw = ser.readline()
if not raw:
continue
s = raw.decode("utf-8", errors="ignore").strip()
if not s:
continue
last_nonempty = s
if _is_repl_noise(s):
continue
if s.startswith("OK") or s.startswith("ERR"):
return s
return "(timeout, last=%r)" % (last_nonempty,)
ap.add_argument("--count", type=int, default=6)
for i in range(args.count):
diff = (i % 3) if args.diff < 0 else args.diff
name = f"AI-D{diff}-{i+1}"
lines = gen_one_level(diff, seed)
send_one_level_with_retry(ser, name, diff, lines, seed)
设备端
if line.startswith("@LVL"):
self.in_level = True
self.meta = self._parse_meta(line)
self.lines = []
return None
if line.startswith("@END"):
try:
w = int(self.meta.get("w","20"))
h = int(self.meta.get("h","20"))
if w != 20 or h != 20:
raise ValueError("only supports 20x20")
if len(self.lines) != 20:
raise ValueError("need 20 lines, got %d" % len(self.lines))
for i in range(20):
if len(self.lines[i]) != 20:
raise ValueError("bad width at line %d" % i)
pkt = {
"name": self.meta.get("name","(unnamed)"),
"diff": int(self.meta.get("diff","0")),
"lines": tuple(self.lines),
}
print("OK", pkt["name"])
self._reset()
gc.collect()
return pkt
except Exception as e:
print("ERR", str(e))
self._reset()
gc.collect()
return None
下发格式:
- 帧头:
@LVL name=<关卡名> diff=<0|1|2> w=20 h=20 - 中间:紧接 20 行地图字符串(每行 20 字符)
- 帧尾:
@END
地图字符约定:
#墙.地面P玩家起点B箱子T目标点*箱子在目标点+玩家在目标点
设备端应答:
- 成功:
OK <name> - 失败:
ERR <原因>(尺寸不对/玩家数量不为 1 等原因)
运行 PC 脚本 --count 6,每发送一关设备都会显示 “Loaded: …”,并可按 B 切换查看至少 5 个 AI 生成关卡。

任务 5(可选加分):姿态传感器另一套控制方式,并可在菜单切换
实现情况:
设备端实现 MMA7660 类,I2C 扫描到 0x4C/0x4D 后写寄存器使能 active;运行中若 tilt_on=True 则用倾斜阈值触发方向(abs(x)>6 or abs(y)>6),可以在菜单中 Control: Tilt/Joystick 一键切换;当 IMU 不存在或异常则自动回退摇杆控制,保证主流程不崩溃。
4. 主要代码片段及说明
- 设备端关卡接收与校验:
LevelReceiver.tick() class LevelReceiver:
def __init__(self):
self.poll = uselect.poll()
self.poll.register(sys.stdin, uselect.POLLIN)
self.in_level = False
self.meta = {}
self.lines = []
def _parse_meta(self, header):
meta = {"name":"(unnamed)","diff":"0","w":"20","h":"20"}
parts = header.strip().split()
for p in parts[1:]:
if "=" in p:
k, v = p.split("=", 1)
meta[k.strip()] = v.strip()
return meta
def _reset(self):
self.in_level = False
self.meta = {}
self.lines = []
def tick(self):
ev = self.poll.poll(0)
if not ev:
return None
try:
line = sys.stdin.readline()
except Exception:
return None
if not line:
return None
line = line.strip("\r\n")
if (not line) or line in (">", ">>>", "..."):
return None
if line.startswith("@LVL"):
self.in_level = True
self.meta = self._parse_meta(line)
self.lines = []
return None
if not self.in_level:
return None
if line.startswith("@END"):
try:
w = int(self.meta.get("w","20"))
h = int(self.meta.get("h","20"))
if w != 20 or h != 20:
raise ValueError("only supports 20x20")
if len(self.lines) != 20:
raise ValueError("need 20 lines, got %d" % len(self.lines))
for i in range(20):
if len(self.lines[i]) != 20:
raise ValueError("bad width at line %d" % i)
pkt = {
"name": self.meta.get("name","(unnamed)"),
"diff": int(self.meta.get("diff","0")),
"lines": tuple(self.lines), # tuple saves RAM vs list
}
print("OK", pkt["name"])
self._reset()
gc.collect()
return pkt
except Exception as e:
print("ERR", str(e))
self._reset()
gc.collect()
return None
if len(self.lines) < 20:
self.lines.append(line)
return None- 以
@LVL进入接收态 → 收满 20 行 →@END校验尺寸 → 生成pkt并打印OK name - 设备端使用
uselect.poll()非阻塞读取sys.stdin,识别@LVL开始接收,累计 20 行地图;遇到@END时严格检查w/h=20、行数=20、每行长度=20,成功则回OK name并返回关卡包。
- 以
- 20×20→40×40 扩展与运行态数据结构:
LevelRuntime.load_from_20x20_lines() def idx_of(x, y):
return y*SW + x
class LevelRuntime:
def __init__(self):
self.name = "(none)"
self.diff = 0
self.baseS = bytearray(SW*SH) # 0/1
self.targets = set() # idx
self.boxes = set() # idx
self.px = 0
self.py = 0
def is_wall(self, x, y):
return self.baseS[idx_of(x,y)] == 1
def load_from_20x20_lines(self, name, diff, lines20):
if len(lines20) != 20:
raise ValueError("bad height")
for i in range(20):
if len(lines20[i]) != 20:
raise ValueError("bad width")
self.name = name
self.diff = diff
bs = self.baseS
for i in range(len(bs)):
bs[i] = 0
self.targets.clear()
self.boxes.clear()
self.px = 0
self.py = 0
found_p = 0
# expand base: each cell -> 2x2 subcells
for y in range(GH):
row = lines20[y]
for x in range(GW):
ch = row[x]
is_wall = (ch == '#')
sx = x*SCALE
sy = y*SCALE
base_idx = idx_of(sx, sy)
if is_wall:
bs[base_idx] = 1
bs[base_idx+1] = 1
bs[base_idx+SW] = 1
bs[base_idx+SW+1] = 1
# force expanded border walls
for x in range(SW):
bs[idx_of(x,0)] = 1
bs[idx_of(x,SH-1)] = 1
for y in range(SH):
bs[idx_of(0,y)] = 1
bs[idx_of(SW-1,y)] = 1
# objects at (2x,2y)
for y in range(GH):
row = lines20[y]
for x in range(GW):
ch = row[x]
sx = x*SCALE
sy = y*SCALE
ii = idx_of(sx, sy)
if ch == 'T':
self.targets.add(ii)
elif ch == 'B':
self.boxes.add(ii)
elif ch == '*':
self.targets.add(ii)
self.boxes.add(ii)
elif ch == 'P':
self.px, self.py = sx, sy
found_p += 1
elif ch == '+':
self.targets.add(ii)
self.px, self.py = sx, sy
found_p += 1
if found_p != 1:
raise ValueError("need exactly one player")
gc.collect()- 墙体写入
bytearray baseS(1=墙/0=地) - 箱子/目标点用
set(idx),玩家用(px,py),避免大对象复制
- 墙体写入
- 推箱子移动判定:
try_move() def try_move(lvl, dx, dy):
nx = lvl.px + dx
ny = lvl.py + dy
if nx < 0 or nx >= SW or ny < 0 or ny >= SH:
return False
if lvl.is_wall(nx, ny):
return False
nidx = idx_of(nx, ny)
if nidx in lvl.boxes:
bx = nx + dx
by = ny + dy
if bx < 0 or bx >= SW or by < 0 or by >= SH:
return False
if lvl.is_wall(bx, by):
return False
bidx = idx_of(bx, by)
if bidx in lvl.boxes:
return False
lvl.boxes.remove(nidx)
lvl.boxes.add(bidx)
lvl.px, lvl.py = nx, ny
return True- 先判墙,再判箱子可推性,再更新集合与玩家位置。设备端每次移动先检查边界与墙;若前方为箱子,则要求箱子下一格为空地且非墙且无箱子,才允许推;最后更新玩家位置与箱子集合。该逻辑是推箱玩法正确性的核心。
- PC 端“先验证再发送”:
gen_one_level()+solvable_push_bfs_subgrid() def solvable_push_bfs_subgrid(baseS, playerS, boxesS, targetsS):
boxes0 = frozenset(boxesS)
tgt = frozenset(targetsS)
for (bx,by) in boxes0:
if is_corner_deadS(baseS, bx, by, tgt):
return False
cp, _ = canonical_playerS(baseS, playerS, boxes0)
if cp is None:
return False
q=deque([(boxes0, playerS, 0)])
seen=set([(boxes0, cp)])
expansions=0
while q:
bxs, ppos, depth = q.popleft()
if bxs == tgt:
return True
if depth >= MAX_PUSH_DEPTH:
continue
expansions += 1
if expansions > MAX_PUSH_STATES:
return False
cp, reach = canonical_playerS(baseS, ppos, bxs)
if cp is None:
continue
bset=set(bxs)
for (bx,by) in bset:
for dx,dy in SDIRS:
stand=(bx-dx, by-dy)
dest =(bx+dx, by+dy)
if not inbS(*stand) or not inbS(*dest):
continue
if stand not in reach:
continue
if baseS[dest[1]][dest[0]] == '#':
continue
if dest in bset:
continue
if is_corner_deadS(baseS, dest[0], dest[1], tgt):
continue
nb=set(bset)
nb.remove((bx,by))
nb.add(dest)
nb=frozenset(nb)
np=(bx,by)
ncp, _ = canonical_playerS(baseS, np, nb)
if ncp is None:
continue
key=(nb, ncp)
if key in seen:
continue
seen.add(key)
q.append((nb, np, depth+1))
return Falsedef expand_to_subgrid(base20, player20, boxes20, targets20):
baseS = [['.' for _ in range(SW)] for _ in range(SH)]
for y in range(H):
for x in range(W):
ch = base20[y][x]
for yy in range(y*SCALE, y*SCALE+SCALE):
for xx in range(x*SCALE, x*SCALE+SCALE):
baseS[yy][xx] = '#' if ch == '#' else '.'
for x in range(SW):
baseS[0][x]='#'; baseS[SH-1][x]='#'
for y in range(SH):
baseS[y][0]='#'; baseS[y][SW-1]='#'
px,py = player20
pS = (px*SCALE, py*SCALE)
bS = set((bx*SCALE, by*SCALE) for (bx,by) in boxes20)
tS = set((tx*SCALE, ty*SCALE) for (tx,ty) in targets20)
return baseS, pS, bS, tS
- 迷宫生成(DFS carve + room/loop breaks)→ 目标点挑选(间距/开阔度约束)→ 从解态反推 scramble(保证存在解)→ 子格 BFS 推箱验证可解 → 输出 20×20 文本。PC 端不是“看起来像推箱子”就下发,而是先将 20×20 按同样
SCALE=2扩展到 40×40,然后用 push-BFS 搜索推箱状态空间,加入角死锁检查与状态上限,确保关卡在设备端真实可通关。 - 可解性 BFS 核心及扩展一致性
- 迷宫生成(DFS carve + room/loop breaks)→ 目标点挑选(间距/开阔度约束)→ 从解态反推 scramble(保证存在解)→ 子格 BFS 推箱验证可解 → 输出 20×20 文本。PC 端不是“看起来像推箱子”就下发,而是先将 20×20 按同样
- 串口鲁棒传输:
drain_serial()+wait_device_ack() def drain_serial(ser, ms=600):
"""清空设备端已有输出(含 >>> / > / READY / 旧日志),避免干扰 ACK 判定。"""
t0 = time.time()
while (time.time() - t0) * 1000 < ms:
try:
s = ser.readline().decode("utf-8", errors="ignore").strip()
except Exception:
s = ""
if not s:
time.sleep(0.01)
def _is_repl_noise(line: str) -> bool:
s = line.strip()
if not s:
return True
if s in (">", ">>>", ">>", "....", "..."):
return True
if s.endswith(">>>") or s.endswith(">>") or s.endswith(">"):
if len(s) <= 5:
return True
if s.startswith("MicroPython") or s.startswith("MPY:"):
return True
return False
def wait_device_ack(ser, timeout_s=15.0):
"""只认 OK/ERR,忽略 '>' '>>>' 等 REPL 噪声。"""
t0 = time.time()
last_nonempty = ""
while time.time() - t0 < timeout_s:
raw = ser.readline()
if not raw:
continue
s = raw.decode("utf-8", errors="ignore").strip()
if not s:
continue
last_nonempty = s
if _is_repl_noise(s):
continue
if s.startswith("OK") or s.startswith("ERR"):
return s
return "(timeout, last=%r)" % (last_nonempty,)- 为避免 MicroPython REPL 的
>>>、>等输出影响判定,PC端先 drain,再只接受设备端OK/ERR。若超时(设备端正刷新画面/菜单错过读),只重发一次,保证演示时稳定显示“至少 5 关 AI 关卡”。
- 为避免 MicroPython REPL 的
- IMU 倾斜控制 + 菜单切换:设备端 I2C1 扫描
0x4C/0x4D自动识别 MMA7660FC,菜单可切换Tilt/Joystick。倾斜阈值abs(x)>6 or abs(y)>6触发方向,IMU 不可用则自动回退摇杆,保证演示稳定。
class MMA7660:
def __init__(self, i2c):
self.i2c = i2c
self.addr = None
try:
addrs = i2c.scan()
except Exception:
addrs = []
for a in addrs:
if a in (0x4C, 0x4D):
self.addr = a
break
self.ok = self.addr is not None
if self.ok:
try:
self.i2c.writeto_mem(self.addr, 0x07, b"\x01") # active
except Exception:
self.ok = False
def _read_axis(self, reg):
v = self.i2c.readfrom_mem(self.addr, reg, 1)[0] & 0x3F
if v & 0x20:
v -= 0x40
return v
def read_xyz(self):
if not self.ok:
return 0, 0, 0
try:
return (self._read_axis(0x00), self._read_axis(0x01), self._read_axis(0x02))
except Exception:
return 0, 0, 0
# Task5: tilt control toggle in menu
if tilt_on and imu.ok:
x, y, z = imu.read_xyz()
if abs(x) > 6 or abs(y) > 6:
if abs(x) > abs(y):
dx = 1 if x > 0 else -1
else:
dy = 1 if y > 0 else -1
else:
dx, dy = joy.dir(dead=5000)
5. 遇到的主要问题及解决方法
- 串口通信易被 REPL 提示符干扰:PC 端经常出现
(timeout, last='>')或直接超时。通常不是“没发送成功”,而是设备端因为异常或软重启回到了 REPL,输出了>/>>>等提示符,PC 端如果把这些当成 ACK,就会误判;同时设备端在刷新屏幕/进入菜单时也可能短时间没及时读串口,看起来像丢包。 - 解决:设备端主循环外层用
try/except做兜底,出错只打印ERR runtime ...,避免崩回 REPL;PC 端在发送前先drain_serial()清空历史输出,并在wait_device_ack()中显式过滤>>>、>、MPY:等噪声,只接受以OK/ERR开头的应答;若超时则只重发一次,兼顾稳定性与避免无限重发。
- 解决:设备端主循环外层用
- 内存分配失败(MicroPython heap 紧张):项目使用 240×240 的全屏 FrameBuffer(RGB565)本身就占用较大内存,再叠加关卡数据结构如果使用 list-of-list、频繁拷贝或临时对象过多,就会出现
ERR memory allocation failed, allocating 256 bytes。这类错误常发生在“生成多关/频繁切关/接收新关卡后立刻刷新”的组合场景。 - 解决:设备端把墙体用
bytearray(SW*SH)存成 0/1,箱子/目标点用set(int idx),并且接收到的 20 行文本用tuple保存,减少 list 拷贝;每次加载新关或切关后主动gc.collect(),避免在运行中克隆关卡结构。PC 端做分块发送(chunk=256),降低串口峰值堆积带来的额外压力。
- 解决:设备端把墙体用
- “看起来可走但实际不可解”的推箱死局:仅靠随机摆放箱子/终点或简单连通性检查,容易生成“视觉上可走”但实际无法把箱子推到目标的关卡,典型问题:箱子推入 L 形死角后无法拉回、目标点周围空间不足、箱子/目标贴边导致推不进去、以及局部封闭造成实际路径不可达。生成多关时也遇到过“生成失败/尝试次数耗尽”的情况,本质就是约束太强但验证不够一致。
- 解决:PC 端采用“从解态反向打乱(scramble)”来保证存在解的起点,再用与设备端完全一致的 20×20→40×40 子格扩展后进行 push-BFS 可解性验证,并加入角死锁检测、全地面连通性检查、以及箱子/目标/玩家与边界的 margin 约束;若多次尝试仍失败则回退到
ultra_safe_level(),保证演示时永远有可玩的关卡,不会卡生成不了。
- 解决:PC 端采用“从解态反向打乱(scramble)”来保证存在解的起点,再用与设备端完全一致的 20×20→40×40 子格扩展后进行 push-BFS 可解性验证,并加入角死锁检测、全地面连通性检查、以及箱子/目标/玩家与边界的 margin 约束;若多次尝试仍失败则回退到
- 摇杆漂移与误触发:在实际板子上,ADC 摇杆中心值不是固定的,受供电噪声、温度和器件偏差影响,角色可能出现轻微“自走”,想向一个方向却被判成另一个方向,尤其移动速度和迷宫狭窄通道结合时更明显。
- 解决:设备端启动时对摇杆做多次采样平均来校准中心(
calibrate()),并设置 deadzone(例如dead=5000/7000)屏蔽小幅波动;同时每次只取主轴方向(abs(vx)>abs(vy) 的判定)避免斜向抖动造成误判。对菜单操作与游戏操作使用不同 deadzone,也能减少“菜单乱跳”。
- 解决:设备端启动时对摇杆做多次采样平均来校准中心(
6. 未来计划或改进建议
- 更丰富的关卡标签:在
@LVLheader 增加boxes=、seed=、steps_hint=等字段,设备端 banner 直接显示关卡信息,便于演示与复现。 - 渲染优化:当前全屏刷新简单稳定,后续可改为“局部刷新脏矩形”,进一步提升帧率并降低功耗。
- 更强的死锁剪枝:PC 端 BFS 可加入“不可达目标区域预判”“二箱互锁”等剪枝,在更复杂迷宫下提升生成速度。
- 关卡持久化:设备端可把已接收关卡写入 flash 文件(如
/levels.txt),断电保留,形成“关卡库”。