从零搭建”一物一码”合格证查询系统——安全与防篡改设计

本文是系列教程的第 4 篇。前三篇我们完成了需求分析、数据建模与前后端开发;本篇将深入系统中最核心的安全架构——数字签名、审计哈希链、不可变性守卫、频率限制与双人复核机制。如果你对”如何让一张电子合格证无法被伪造”这个问题感兴趣,这篇文章值得你从头读到尾。


1. Ed25519 数字签名

1.1 为什么选 Ed25519?

在非对称签名算法的选择上,我们放弃了 RSA 和 ECDSA,转而采用 Ed25519。理由如下:

维度 RSA-2048 ECDSA (P-256) Ed25519
公钥长度 256 bytes 64 bytes 32 bytes
签名长度 256 bytes 64 bytes 64 bytes
签名速度 中等 极快
参数选择 需选 e/padding 需选曲线 无需任何参数
时序攻击 需额外防护 需额外防护 天然免疫

对于合格证这种需要批量签发、移动端快速验证的场景,Ed25519 的 32 字节密钥和微秒级签名速度是决定性优势。更重要的是,它不需要开发者做任何参数选择——消除了”选错曲线”或”用错 padding”这类隐患。

1.2 签名流程详解

整个签名过程在 signing.py 模块中实现,分为四个确定性步骤:

1
证书对象 → 构建规范载荷 → 序列化为字节 → 计算摘要 → Ed25519签名

第一步:构建规范载荷(Canonical Payload)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def build_canonical_payload(cert) -> dict:
"""
从证书对象中提取需要签名的字段,构建确定性字典。
只包含业务关键字段,排除签名本身等元数据。
"""
return {
"cert_id": str(cert.id),
"cert_version": cert.cert_version,
"product_name": cert.product_name,
"batch_number": cert.batch_number,
"standard_ref": cert.standard_ref,
"result": cert.result,
"issued_at": cert.issued_at.isoformat() if cert.issued_at else None,
"issuer_name": cert.issuer_name,
}

这里的关键设计是只提取业务字段。签名字段(signaturecontent_digest 等)不参与载荷构建,否则会形成循环依赖。

第二步:确定性序列化

1
2
3
4
5
6
7
8
9
def canonical_bytes(payload: dict) -> bytes:
"""
将字典序列化为确定性的 JSON 字节串。
sort_keys 保证字段顺序一致;紧凑 separators 消除空格差异。
任何平台、任何语言对同一数据都能产出完全相同的字节。
"""
return json.dumps(
payload, sort_keys=True, separators=(",", ":"), ensure_ascii=False
).encode("utf-8")

为什么要如此严格?因为 {"a":1,"b":2}{"b": 2, "a": 1} 在语义上相同,但字节完全不同。如果不做规范化,同一张证书每次序列化可能得到不同字节,签名验证就会随机失败。

第三步:计算摘要

1
2
3
4
5
6
7
def compute_digest(payload: dict) -> str:
"""
对规范化字节计算 SHA-256 摘要,返回十六进制字符串。
摘要同时存入证书记录,方便快速比对。
"""
raw = canonical_bytes(payload)
return hashlib.sha256(raw).hexdigest()

第四步:签名并写入证书

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def sign_certificate(cert) -> None:
"""
完整签名流程:加载私钥 → 构建载荷 → 计算摘要 → 签名 → 写回证书。
签名对象是摘要字符串的 UTF-8 编码,而非原始载荷字节。
"""
kid, priv = _load_private_key() # 从环境变量加载私钥
payload = build_canonical_payload(cert) # 提取业务字段
digest = compute_digest(payload) # SHA-256 摘要
sig = priv.sign(digest.encode()) # Ed25519 对摘要签名
cert.content_digest = digest
cert.signature = base64.b64encode(sig).decode()
cert.signature_algorithm = "ed25519"
cert.signature_kid = kid # 密钥标识符,支持密钥轮换
cert.signed_at = timezone.now()

1.3 验证流程

验证是签名的逆过程,任何人(包括前端)都可以执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def verify_certificate(cert) -> bool:
"""
验证三步走:
1. 重新计算摘要,与存储的摘要比对(检测字段篡改)
2. 根据 kid 从数据库加载对应公钥
3. 用公钥验证签名(检测摘要/签名篡改)
"""
payload = build_canonical_payload(cert)
digest = compute_digest(payload)

# 步骤1:摘要比对
if digest != cert.content_digest:
return False # 业务字段被篡改

# 步骤2:加载公钥
key_record = SigningKey.objects.get(kid=cert.signature_kid, is_active=True)
pub = Ed25519PublicKey.from_public_bytes(base64.b64decode(key_record.public_key))

# 步骤3:验签
try:
pub.verify(base64.b64decode(cert.signature), digest.encode())
return True
except InvalidSignature:
return False

1.4 密钥管理

当前实现采用环境变量存储私钥,公钥存入 SigningKey 数据模型:

1
2
3
# .env(开发环境)
SIGNING_PRIVATE_KEY=base64编码的私钥
SIGNING_ACTIVE_KID=key-2024-001

SigningKey 模型存储公钥及元数据(kid、创建时间、是否激活),支持密钥轮换——新密钥上线后旧证书仍可用旧公钥验证。生产环境强烈建议将私钥迁移至 AWS KMS、HashiCorp Vault 等硬件安全模块,环境变量方案仅适用于开发测试。


2. 二维码签名载荷

2.1 从明文 URL 到签名载荷

传统做法是让二维码包含一个查询 URL,例如 https://example.com/check?token=abc123。问题在于:任何人都能构造这样的 URL。攻击者只需猜测或枚举 token 格式,就能生成看似合法的二维码。

我们的方案是让二维码本身携带签名信息,格式如下:

1
qrc1.<base64url编码的JSON载荷>.<base64url编码的签名>

前缀 qrc1 是协议版本号,方便未来升级解析逻辑而不破坏旧二维码的兼容性。

2.2 载荷构建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def build_qr_payload(cert) -> str:
"""
构建签名二维码内容。
载荷字段尽量短(用缩写),因为 QR 码容量有限。
"""
payload = {
"t": cert.cargo.token, # 查询令牌
"c": cert.id, # 证书ID
"v": cert.cert_version, # 证书版本(防止旧版本被重放)
"iat": int(time.time()), # 签发时间戳
"k": settings.SIGNING_ACTIVE_KID # 密钥标识
}
body = canonical_bytes(payload) # 确定性JSON字节
kid, priv = _load_private_key()
sig = priv.sign(body) # 直接对载荷签名(非摘要)

# base64url 编码:URL安全,无填充符
return "qrc1." + base64url_encode(body) + "." + base64url_encode(sig)

注意这里签名的对象是载荷字节本身而非摘要,因为 QR 载荷本身就很短(通常不到 200 字节),没必要增加一层哈希。

2.3 扫码验证流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def verify_qr_payload(raw: str) -> dict:
"""
解析并验证 QR 载荷,返回解码后的 payload 字典。
任何验证失败都抛出 InvalidQRCode 异常。
"""
parts = raw.split(".")
if len(parts) != 3 or parts[0] != "qrc1":
raise InvalidQRCode("格式不合法")

body = base64url_decode(parts[1]) # 解码载荷
sig = base64url_decode(parts[2]) # 解码签名
payload = json.loads(body)

# 加载公钥并验签
kid = payload.get("k")
key_record = SigningKey.objects.get(kid=kid, is_active=True)
pub = load_public_key(key_record.public_key)

try:
pub.verify(sig, body)
except InvalidSignature:
raise InvalidQRCode("签名验证失败,该二维码可能被伪造")

return payload # 验证通过,返回 token 等信息供后续查询

这个设计让离线验证成为可能——只要客户端预置了公钥,不联网也能判断二维码是否由合法系统签发。


3. 审计哈希链(Audit Hash Chain)

3.1 设计理念

审计日志的核心需求是不可篡改。我们借鉴区块链的思路,在 AuditEntry 模型中实现了哈希链结构——每条记录的哈希值包含前一条记录的哈希,形成链式依赖。

1
2
3
4
5
6
Entry #1          Entry #2          Entry #3
┌──────────┐ ┌──────────┐ ┌──────────┐
│ data │ │ data │ │ data │
│ prev: 0x0│────>│ prev: H1 │────>│ prev: H2 │
│ curr: H1 │ │ curr: H2 │ │ curr: H3 │
└──────────┘ └──────────┘ └──────────┘

如果有人试图修改 Entry #2 的数据,其 curr_hash 就会改变,导致 Entry #3 的 prev_hash 不再匹配——篡改会像多米诺骨牌一样暴露整条链

3.2 模型定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class AuditEntry(models.Model):
"""
审计日志条目 —— 不可变、链式哈希。
"""
chain_seq = models.BigIntegerField(unique=True) # 单调递增序号
target_type = models.CharField(max_length=50) # 被审计对象类型
target_id = models.CharField(max_length=100) # 被审计对象ID
action = models.CharField(max_length=30) # 操作类型
actor = models.CharField(max_length=150) # 操作人
actor_ip = models.GenericIPAddressField(null=True) # 操作人IP
snapshot = models.JSONField(default=dict) # 操作后快照
diff = models.JSONField(default=dict) # 变更差异
prev_hash = models.CharField(max_length=64) # 前一条哈希
curr_hash = models.CharField(max_length=64) # 当前哈希
event_signature = models.TextField(blank=True, default="") # 可选:对事件签名
created_at = models.DateTimeField(auto_now_add=True)

def save(self, *args, **kwargs):
if self.pk and AuditEntry.objects.filter(pk=self.pk).exists():
raise PermissionError("审计条目不可修改") # 已有记录禁止更新
# 计算哈希链
last = AuditEntry.objects.order_by("-chain_seq").first()
self.chain_seq = (last.chain_seq + 1) if last else 1
self.prev_hash = last.curr_hash if last else ("0" * 64)
self.curr_hash = self._compute_hash()
super().save(*args, **kwargs)

def delete(self, *args, **kwargs):
raise PermissionError("审计条目不可删除") # 永远拒绝删除

def _compute_hash(self) -> str:
"""将关键字段与前一条哈希拼接后取 SHA-256"""
content = f"{self.chain_seq}|{self.target_type}|{self.target_id}|" \
f"{self.action}|{self.actor}|{json.dumps(self.snapshot, sort_keys=True)}|" \
f"{self.prev_hash}"
return hashlib.sha256(content.encode()).hexdigest()

3.3 信号驱动的自动审计

审计条目的创建由 Django 的 post_save 信号自动触发,开发者无需手动调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@receiver(post_save, sender=Certificate)
def audit_certificate_change(sender, instance, created, **kwargs):
"""
证书每次保存后自动写入审计日志。
操作人信息来自 CurrentActorMiddleware(基于 threading.local)。
"""
actor_info = get_current_actor() # 从线程局部变量获取当前用户和IP
AuditEntry.objects.create(
target_type="certificate",
target_id=str(instance.id),
action="create" if created else "update",
actor=actor_info.get("user", "system"),
actor_ip=actor_info.get("ip"),
snapshot=build_canonical_payload(instance),
diff=compute_field_diff(instance) if not created else {},
)

CurrentActorMiddleware 是一个简单的 Django 中间件,利用 threading.local() 将当前请求的用户名和 IP 地址注入线程上下文,使得信号处理器无需访问 request 对象也能获取操作人信息。


4. 证书不可变性守卫(Frozen Fields)

数字签名防止的是外部伪造,而不可变性守卫防止的是内部误改——即使是通过 Django Admin 或直接调用 ORM,也无法静默修改已发布证书的关键字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 已发布证书中不可修改的字段列表
FROZEN_FIELDS = [
"product_name", "batch_number", "standard_ref",
"result", "issuer_name", "issued_at",
]

# 允许的状态转换(仅用于复核和撤销流程)
ALLOWED_TRANSITIONS = {
("pass", "superseded"), # 换版:旧证书标记为已替代
("pass", "revoked"), # 撤销:发现问题后主动作废
}

class Certificate(models.Model):
# ... 字段定义 ...

def save(self, *args, **kwargs):
if self.pk: # 更新操作
old = Certificate.objects.get(pk=self.pk)

# 检查状态转换是否合法
if old.result != self.result:
transition = (old.result, self.result)
if transition not in ALLOWED_TRANSITIONS:
raise PermissionError(
f"不允许的状态转换: {old.result}{self.result}"
)

# 已发布证书的冻结字段检查
if old.published_at is not None:
for field in FROZEN_FIELDS:
old_val = getattr(old, field)
new_val = getattr(self, field)
if old_val != new_val:
raise PermissionError(
f"已发布证书的 '{field}' 字段不可修改 "
f"(原值={old_val!r}, 新值={new_val!r})"
)
super().save(*args, **kwargs)

这段代码的设计哲学是在 ORM 层做最后一道防线。无论调用方是 API 视图、管理后台还是 manage.py shell,只要试图修改已发布证书的冻结字段,都会收到 PermissionError


5. QR 扫码频率限制与风险标记

5.1 多层限流策略

系统在两个层面实施频率限制:

登录接口使用 django-ratelimit 装饰器:

1
2
3
4
5
6
from django_ratelimit.decorators import ratelimit

@ratelimit(key="ip", rate="5/m", method="POST", block=True)
def login_view(request):
# 每个 IP 每分钟最多 5 次登录尝试
...

QR 扫码接口使用自定义限流逻辑,提供更细粒度的控制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# settings.py
QR_SCAN_RATE_WINDOW = 60 # 滑动窗口:60秒
QR_SCAN_RATE_LIMIT = 3 # 窗口内最大扫码次数
QR_SCAN_BLOCK_DURATION = 300 # 触发后封锁时长:300秒

def check_scan_rate(cargo, client_ip: str) -> bool:
"""
检查扫码频率。如果超限,封锁该货物的扫码入口。
返回 True 表示允许,False 表示已封锁。
"""
# 检查是否在封锁期内
if cargo.access_blocked_until and cargo.access_blocked_until > timezone.now():
return False

# 统计滑动窗口内的扫码次数
window_start = timezone.now() - timedelta(seconds=settings.QR_SCAN_RATE_WINDOW)
recent_count = ScanLog.objects.filter(
cargo=cargo, scanned_at__gte=window_start
).count()

if recent_count >= settings.QR_SCAN_RATE_LIMIT:
# 超限:封锁并标记风险
cargo.access_blocked_until = timezone.now() + timedelta(
seconds=settings.QR_SCAN_BLOCK_DURATION
)
cargo.save(update_fields=["access_blocked_until"])
return False

return True

5.2 扫码日志与风险标记

每次扫码都会写入 ScanLog,与审计日志一样不可删除:

1
2
3
4
5
6
7
8
9
10
class ScanLog(models.Model):
cargo = models.ForeignKey(Cargo, on_delete=models.CASCADE)
client_ip = models.GenericIPAddressField()
user_agent = models.TextField(blank=True)
scanned_at = models.DateTimeField(auto_now_add=True)
risk_flag = models.BooleanField(default=False) # 异常标记
risk_reason = models.CharField(max_length=200, blank=True)

def delete(self, *args, **kwargs):
raise PermissionError("扫码日志不可删除")

当检测到异常模式时(如短时间内同一 IP 扫描大量不同货物、非工作时间的批量扫码等),系统将 risk_flag 置为 True 并记录原因,供管理员在后台审查。

被封锁的货物在扫码时返回 HTTP 429:

1
2
3
4
5
if not check_scan_rate(cargo, client_ip):
return JsonResponse({
"error": "扫码过于频繁,请稍后再试",
"blocked_until": cargo.access_blocked_until.isoformat()
}, status=429)

6. 双人复核(Dual Control)

在高价值场景下,单人操作是审计和合规的重大隐患。系统通过 CERT_DUAL_CONTROL_ENABLED 配置项(默认开启)强制实施双人复核:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# settings.py
CERT_DUAL_CONTROL_ENABLED = True

# views.py — 审核接口
def review_certificate(request, cert_id):
cert = Certificate.objects.get(id=cert_id)

if settings.CERT_DUAL_CONTROL_ENABLED:
if cert.created_by == request.user:
return JsonResponse({
"error": "双人复核要求:审核人不能是证书的创建者"
}, status=403)

cert.reviewed_by = request.user
cert.reviewed_at = timezone.now()
cert.status = "approved"
cert.save()

这个简单的 if 判断实现了职责分离——创建者不能自己审核自己的证书。在企业合规审计中,这是最基本也最有效的防欺诈手段之一。该配置在开发环境可以关闭以简化测试流程,但生产环境务必保持开启。


7. 前端完整性展示

所有安全机制如果不向用户展示,就失去了一半的价值。CertView.vue 组件负责在扫码结果页呈现证书的完整性验证状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
<template>
<div class="integrity-section">
<!-- 验证通过:绿色盾牌 -->
<div
v-if="integrity.status === 'verified'"
class="integrity-badge verified"
>
<ShieldCheckIcon class="icon-green" />
<span>数字签名验证通过</span>
</div>

<!-- 验证失败:红色警告 -->
<div
v-else-if="integrity.status === 'failed'"
class="integrity-badge failed"
>
<AlertTriangleIcon class="icon-red" />
<span>警告:签名验证失败,该证书可能被篡改</span>
</div>

<!-- 无签名:灰色提示 -->
<div v-else class="integrity-badge unsigned">
<ShieldOffIcon class="icon-gray" />
<span>该证书尚未签名</span>
</div>

<!-- 可展开的技术细节 -->
<details v-if="integrity.details" class="tech-details">
<summary>查看技术详情</summary>
<dl>
<dt>签名算法</dt>
<dd>{{ integrity.details.algorithm }}</dd>
<dt>密钥标识</dt>
<dd>{{ integrity.details.kid }}</dd>
<dt>内容摘要</dt>
<dd class="monospace">{{ integrity.details.digest }}</dd>
<dt>签名时间</dt>
<dd>{{ formatTime(integrity.details.signed_at) }}</dd>
</dl>
</details>
</div>
</template>

三种状态的视觉区分让普通消费者也能一眼判断证书是否可信:绿色盾牌代表安全,红色三角代表危险,灰色代表未知。可展开的技术详情则为专业用户提供了完整的验证信息,包括算法类型、密钥标识、摘要值和签名时间。


安全架构总览

将以上七层防护串联起来,形成纵深防御体系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
┌─────────────────────────────────────────────────────────┐
│ 前端展示层 │
│ CertView.vue 完整性状态展示 │
├─────────────────────────────────────────────────────────┤
│ 接入控制层 │
│ django-ratelimit · QR扫码频率限制 · 风险标记 │
├─────────────────────────────────────────────────────────┤
│ 业务流程层 │
│ 双人复核 · 状态转换白名单 │
├─────────────────────────────────────────────────────────┤
│ 数据完整性层 │
│ Ed25519签名 · QR签名载荷 · 冻结字段守卫 │
├─────────────────────────────────────────────────────────┤
│ 审计追溯层 │
│ 哈希链审计日志 · 不可删除扫码记录 · 信号驱动自动记录 │
└─────────────────────────────────────────────────────────┘

每一层都独立生效:即使攻击者绕过了频率限制,伪造的二维码依然无法通过签名验证;即使内部人员试图修改已发布的证书,冻结字段守卫和审计哈希链会同时拦截和记录。这就是纵深防御的核心思想——不依赖任何单一机制,而是让多层防护相互补充。


小结与预告

本篇我们详细拆解了系统的七层安全架构:从 Ed25519 数字签名的每一行代码,到审计哈希链的不可篡改设计,再到前端的直观展示。这些机制共同确保了一个核心目标——每一张合格证从签发到查验的全生命周期都可验证、可追溯、不可伪造

在下一篇(也是本系列的最后一篇)中,我们将把目光从代码转向运维,聊聊部署与上线:Docker 容器化编排、Nginx 反向代理配置、环境变量管理、数据库迁移策略,以及生产环境的监控与告警方案。让这套系统真正跑在线上,服务真实用户。

下篇预告:从零搭建”一物一码”合格证查询系统——部署与上线