从零搭建”一物一码”合格证查询系统——安全与防篡改设计
本文是系列教程的第 4 篇。前三篇我们完成了需求分析、数据建模与前后端开发;本篇将深入系统中最核心的安全架构——数字签名、审计哈希链、不可变性守卫、频率限制与双人复核机制。如果你对”如何让一张电子合格证无法被伪造”这个问题感兴趣,这篇文章值得你从头读到尾。
1. Ed25519 数字签名
1.1 为什么选 Ed25519?
在非对称签名算法的选择上,我们放弃了 RSA 和 ECDSA,转而采用 Ed25519。理由如下:
| 维度 |
RSA-2048 |
ECDSA (P-256) |
Ed25519 |
| 公钥长度 |
256 bytes |
64 bytes |
32 bytes |
| 签名长度 |
256 bytes |
64 bytes |
64 bytes |
| 签名速度 |
慢 |
中等 |
极快 |
| 参数选择 |
需选 e/padding |
需选曲线 |
无需任何参数 |
| 时序攻击 |
需额外防护 |
需额外防护 |
天然免疫 |
对于合格证这种需要批量签发、移动端快速验证的场景,Ed25519 的 32 字节密钥和微秒级签名速度是决定性优势。更重要的是,它不需要开发者做任何参数选择——消除了”选错曲线”或”用错 padding”这类隐患。
1.2 签名流程详解
整个签名过程在 signing.py 模块中实现,分为四个确定性步骤:
1
| 证书对象 → 构建规范载荷 → 序列化为字节 → 计算摘要 → Ed25519签名
|
第一步:构建规范载荷(Canonical Payload)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| def build_canonical_payload(cert) -> dict: """ 从证书对象中提取需要签名的字段,构建确定性字典。 只包含业务关键字段,排除签名本身等元数据。 """ return { "cert_id": str(cert.id), "cert_version": cert.cert_version, "product_name": cert.product_name, "batch_number": cert.batch_number, "standard_ref": cert.standard_ref, "result": cert.result, "issued_at": cert.issued_at.isoformat() if cert.issued_at else None, "issuer_name": cert.issuer_name, }
|
这里的关键设计是只提取业务字段。签名字段(signature、content_digest 等)不参与载荷构建,否则会形成循环依赖。
第二步:确定性序列化
1 2 3 4 5 6 7 8 9
| def canonical_bytes(payload: dict) -> bytes: """ 将字典序列化为确定性的 JSON 字节串。 sort_keys 保证字段顺序一致;紧凑 separators 消除空格差异。 任何平台、任何语言对同一数据都能产出完全相同的字节。 """ return json.dumps( payload, sort_keys=True, separators=(",", ":"), ensure_ascii=False ).encode("utf-8")
|
为什么要如此严格?因为 {"a":1,"b":2} 和 {"b": 2, "a": 1} 在语义上相同,但字节完全不同。如果不做规范化,同一张证书每次序列化可能得到不同字节,签名验证就会随机失败。
第三步:计算摘要
1 2 3 4 5 6 7
| def compute_digest(payload: dict) -> str: """ 对规范化字节计算 SHA-256 摘要,返回十六进制字符串。 摘要同时存入证书记录,方便快速比对。 """ raw = canonical_bytes(payload) return hashlib.sha256(raw).hexdigest()
|
第四步:签名并写入证书
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| def sign_certificate(cert) -> None: """ 完整签名流程:加载私钥 → 构建载荷 → 计算摘要 → 签名 → 写回证书。 签名对象是摘要字符串的 UTF-8 编码,而非原始载荷字节。 """ kid, priv = _load_private_key() payload = build_canonical_payload(cert) digest = compute_digest(payload) sig = priv.sign(digest.encode()) cert.content_digest = digest cert.signature = base64.b64encode(sig).decode() cert.signature_algorithm = "ed25519" cert.signature_kid = kid cert.signed_at = timezone.now()
|
1.3 验证流程
验证是签名的逆过程,任何人(包括前端)都可以执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| def verify_certificate(cert) -> bool: """ 验证三步走: 1. 重新计算摘要,与存储的摘要比对(检测字段篡改) 2. 根据 kid 从数据库加载对应公钥 3. 用公钥验证签名(检测摘要/签名篡改) """ payload = build_canonical_payload(cert) digest = compute_digest(payload)
if digest != cert.content_digest: return False
key_record = SigningKey.objects.get(kid=cert.signature_kid, is_active=True) pub = Ed25519PublicKey.from_public_bytes(base64.b64decode(key_record.public_key))
try: pub.verify(base64.b64decode(cert.signature), digest.encode()) return True except InvalidSignature: return False
|
1.4 密钥管理
当前实现采用环境变量存储私钥,公钥存入 SigningKey 数据模型:
1 2 3
| # .env(开发环境) SIGNING_PRIVATE_KEY=base64编码的私钥 SIGNING_ACTIVE_KID=key-2024-001
|
SigningKey 模型存储公钥及元数据(kid、创建时间、是否激活),支持密钥轮换——新密钥上线后旧证书仍可用旧公钥验证。生产环境强烈建议将私钥迁移至 AWS KMS、HashiCorp Vault 等硬件安全模块,环境变量方案仅适用于开发测试。
2. 二维码签名载荷
2.1 从明文 URL 到签名载荷
传统做法是让二维码包含一个查询 URL,例如 https://example.com/check?token=abc123。问题在于:任何人都能构造这样的 URL。攻击者只需猜测或枚举 token 格式,就能生成看似合法的二维码。
我们的方案是让二维码本身携带签名信息,格式如下:
1
| qrc1.<base64url编码的JSON载荷>.<base64url编码的签名>
|
前缀 qrc1 是协议版本号,方便未来升级解析逻辑而不破坏旧二维码的兼容性。
2.2 载荷构建
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| def build_qr_payload(cert) -> str: """ 构建签名二维码内容。 载荷字段尽量短(用缩写),因为 QR 码容量有限。 """ payload = { "t": cert.cargo.token, "c": cert.id, "v": cert.cert_version, "iat": int(time.time()), "k": settings.SIGNING_ACTIVE_KID } body = canonical_bytes(payload) kid, priv = _load_private_key() sig = priv.sign(body)
return "qrc1." + base64url_encode(body) + "." + base64url_encode(sig)
|
注意这里签名的对象是载荷字节本身而非摘要,因为 QR 载荷本身就很短(通常不到 200 字节),没必要增加一层哈希。
2.3 扫码验证流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| def verify_qr_payload(raw: str) -> dict: """ 解析并验证 QR 载荷,返回解码后的 payload 字典。 任何验证失败都抛出 InvalidQRCode 异常。 """ parts = raw.split(".") if len(parts) != 3 or parts[0] != "qrc1": raise InvalidQRCode("格式不合法")
body = base64url_decode(parts[1]) sig = base64url_decode(parts[2]) payload = json.loads(body)
kid = payload.get("k") key_record = SigningKey.objects.get(kid=kid, is_active=True) pub = load_public_key(key_record.public_key)
try: pub.verify(sig, body) except InvalidSignature: raise InvalidQRCode("签名验证失败,该二维码可能被伪造")
return payload
|
这个设计让离线验证成为可能——只要客户端预置了公钥,不联网也能判断二维码是否由合法系统签发。
3. 审计哈希链(Audit Hash Chain)
3.1 设计理念
审计日志的核心需求是不可篡改。我们借鉴区块链的思路,在 AuditEntry 模型中实现了哈希链结构——每条记录的哈希值包含前一条记录的哈希,形成链式依赖。
1 2 3 4 5 6
| Entry #1 Entry #2 Entry #3 ┌──────────┐ ┌──────────┐ ┌──────────┐ │ data │ │ data │ │ data │ │ prev: 0x0│────>│ prev: H1 │────>│ prev: H2 │ │ curr: H1 │ │ curr: H2 │ │ curr: H3 │ └──────────┘ └──────────┘ └──────────┘
|
如果有人试图修改 Entry #2 的数据,其 curr_hash 就会改变,导致 Entry #3 的 prev_hash 不再匹配——篡改会像多米诺骨牌一样暴露整条链。
3.2 模型定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| class AuditEntry(models.Model): """ 审计日志条目 —— 不可变、链式哈希。 """ chain_seq = models.BigIntegerField(unique=True) target_type = models.CharField(max_length=50) target_id = models.CharField(max_length=100) action = models.CharField(max_length=30) actor = models.CharField(max_length=150) actor_ip = models.GenericIPAddressField(null=True) snapshot = models.JSONField(default=dict) diff = models.JSONField(default=dict) prev_hash = models.CharField(max_length=64) curr_hash = models.CharField(max_length=64) event_signature = models.TextField(blank=True, default="") created_at = models.DateTimeField(auto_now_add=True)
def save(self, *args, **kwargs): if self.pk and AuditEntry.objects.filter(pk=self.pk).exists(): raise PermissionError("审计条目不可修改") last = AuditEntry.objects.order_by("-chain_seq").first() self.chain_seq = (last.chain_seq + 1) if last else 1 self.prev_hash = last.curr_hash if last else ("0" * 64) self.curr_hash = self._compute_hash() super().save(*args, **kwargs)
def delete(self, *args, **kwargs): raise PermissionError("审计条目不可删除")
def _compute_hash(self) -> str: """将关键字段与前一条哈希拼接后取 SHA-256""" content = f"{self.chain_seq}|{self.target_type}|{self.target_id}|" \ f"{self.action}|{self.actor}|{json.dumps(self.snapshot, sort_keys=True)}|" \ f"{self.prev_hash}" return hashlib.sha256(content.encode()).hexdigest()
|
3.3 信号驱动的自动审计
审计条目的创建由 Django 的 post_save 信号自动触发,开发者无需手动调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @receiver(post_save, sender=Certificate) def audit_certificate_change(sender, instance, created, **kwargs): """ 证书每次保存后自动写入审计日志。 操作人信息来自 CurrentActorMiddleware(基于 threading.local)。 """ actor_info = get_current_actor() AuditEntry.objects.create( target_type="certificate", target_id=str(instance.id), action="create" if created else "update", actor=actor_info.get("user", "system"), actor_ip=actor_info.get("ip"), snapshot=build_canonical_payload(instance), diff=compute_field_diff(instance) if not created else {}, )
|
CurrentActorMiddleware 是一个简单的 Django 中间件,利用 threading.local() 将当前请求的用户名和 IP 地址注入线程上下文,使得信号处理器无需访问 request 对象也能获取操作人信息。
4. 证书不可变性守卫(Frozen Fields)
数字签名防止的是外部伪造,而不可变性守卫防止的是内部误改——即使是通过 Django Admin 或直接调用 ORM,也无法静默修改已发布证书的关键字段。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| FROZEN_FIELDS = [ "product_name", "batch_number", "standard_ref", "result", "issuer_name", "issued_at", ]
ALLOWED_TRANSITIONS = { ("pass", "superseded"), ("pass", "revoked"), }
class Certificate(models.Model):
def save(self, *args, **kwargs): if self.pk: old = Certificate.objects.get(pk=self.pk)
if old.result != self.result: transition = (old.result, self.result) if transition not in ALLOWED_TRANSITIONS: raise PermissionError( f"不允许的状态转换: {old.result} → {self.result}" )
if old.published_at is not None: for field in FROZEN_FIELDS: old_val = getattr(old, field) new_val = getattr(self, field) if old_val != new_val: raise PermissionError( f"已发布证书的 '{field}' 字段不可修改 " f"(原值={old_val!r}, 新值={new_val!r})" ) super().save(*args, **kwargs)
|
这段代码的设计哲学是在 ORM 层做最后一道防线。无论调用方是 API 视图、管理后台还是 manage.py shell,只要试图修改已发布证书的冻结字段,都会收到 PermissionError。
5. QR 扫码频率限制与风险标记
5.1 多层限流策略
系统在两个层面实施频率限制:
登录接口使用 django-ratelimit 装饰器:
1 2 3 4 5 6
| from django_ratelimit.decorators import ratelimit
@ratelimit(key="ip", rate="5/m", method="POST", block=True) def login_view(request): ...
|
QR 扫码接口使用自定义限流逻辑,提供更细粒度的控制:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| QR_SCAN_RATE_WINDOW = 60 QR_SCAN_RATE_LIMIT = 3 QR_SCAN_BLOCK_DURATION = 300
def check_scan_rate(cargo, client_ip: str) -> bool: """ 检查扫码频率。如果超限,封锁该货物的扫码入口。 返回 True 表示允许,False 表示已封锁。 """ if cargo.access_blocked_until and cargo.access_blocked_until > timezone.now(): return False
window_start = timezone.now() - timedelta(seconds=settings.QR_SCAN_RATE_WINDOW) recent_count = ScanLog.objects.filter( cargo=cargo, scanned_at__gte=window_start ).count()
if recent_count >= settings.QR_SCAN_RATE_LIMIT: cargo.access_blocked_until = timezone.now() + timedelta( seconds=settings.QR_SCAN_BLOCK_DURATION ) cargo.save(update_fields=["access_blocked_until"]) return False
return True
|
5.2 扫码日志与风险标记
每次扫码都会写入 ScanLog,与审计日志一样不可删除:
1 2 3 4 5 6 7 8 9 10
| class ScanLog(models.Model): cargo = models.ForeignKey(Cargo, on_delete=models.CASCADE) client_ip = models.GenericIPAddressField() user_agent = models.TextField(blank=True) scanned_at = models.DateTimeField(auto_now_add=True) risk_flag = models.BooleanField(default=False) risk_reason = models.CharField(max_length=200, blank=True)
def delete(self, *args, **kwargs): raise PermissionError("扫码日志不可删除")
|
当检测到异常模式时(如短时间内同一 IP 扫描大量不同货物、非工作时间的批量扫码等),系统将 risk_flag 置为 True 并记录原因,供管理员在后台审查。
被封锁的货物在扫码时返回 HTTP 429:
1 2 3 4 5
| if not check_scan_rate(cargo, client_ip): return JsonResponse({ "error": "扫码过于频繁,请稍后再试", "blocked_until": cargo.access_blocked_until.isoformat() }, status=429)
|
6. 双人复核(Dual Control)
在高价值场景下,单人操作是审计和合规的重大隐患。系统通过 CERT_DUAL_CONTROL_ENABLED 配置项(默认开启)强制实施双人复核:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| CERT_DUAL_CONTROL_ENABLED = True
def review_certificate(request, cert_id): cert = Certificate.objects.get(id=cert_id)
if settings.CERT_DUAL_CONTROL_ENABLED: if cert.created_by == request.user: return JsonResponse({ "error": "双人复核要求:审核人不能是证书的创建者" }, status=403)
cert.reviewed_by = request.user cert.reviewed_at = timezone.now() cert.status = "approved" cert.save()
|
这个简单的 if 判断实现了职责分离——创建者不能自己审核自己的证书。在企业合规审计中,这是最基本也最有效的防欺诈手段之一。该配置在开发环境可以关闭以简化测试流程,但生产环境务必保持开启。
7. 前端完整性展示
所有安全机制如果不向用户展示,就失去了一半的价值。CertView.vue 组件负责在扫码结果页呈现证书的完整性验证状态:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| <template> <div class="integrity-section"> <!-- 验证通过:绿色盾牌 --> <div v-if="integrity.status === 'verified'" class="integrity-badge verified" > <ShieldCheckIcon class="icon-green" /> <span>数字签名验证通过</span> </div>
<!-- 验证失败:红色警告 --> <div v-else-if="integrity.status === 'failed'" class="integrity-badge failed" > <AlertTriangleIcon class="icon-red" /> <span>警告:签名验证失败,该证书可能被篡改</span> </div>
<!-- 无签名:灰色提示 --> <div v-else class="integrity-badge unsigned"> <ShieldOffIcon class="icon-gray" /> <span>该证书尚未签名</span> </div>
<!-- 可展开的技术细节 --> <details v-if="integrity.details" class="tech-details"> <summary>查看技术详情</summary> <dl> <dt>签名算法</dt> <dd>{{ integrity.details.algorithm }}</dd> <dt>密钥标识</dt> <dd>{{ integrity.details.kid }}</dd> <dt>内容摘要</dt> <dd class="monospace">{{ integrity.details.digest }}</dd> <dt>签名时间</dt> <dd>{{ formatTime(integrity.details.signed_at) }}</dd> </dl> </details> </div> </template>
|
三种状态的视觉区分让普通消费者也能一眼判断证书是否可信:绿色盾牌代表安全,红色三角代表危险,灰色代表未知。可展开的技术详情则为专业用户提供了完整的验证信息,包括算法类型、密钥标识、摘要值和签名时间。
安全架构总览
将以上七层防护串联起来,形成纵深防御体系:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| ┌─────────────────────────────────────────────────────────┐ │ 前端展示层 │ │ CertView.vue 完整性状态展示 │ ├─────────────────────────────────────────────────────────┤ │ 接入控制层 │ │ django-ratelimit · QR扫码频率限制 · 风险标记 │ ├─────────────────────────────────────────────────────────┤ │ 业务流程层 │ │ 双人复核 · 状态转换白名单 │ ├─────────────────────────────────────────────────────────┤ │ 数据完整性层 │ │ Ed25519签名 · QR签名载荷 · 冻结字段守卫 │ ├─────────────────────────────────────────────────────────┤ │ 审计追溯层 │ │ 哈希链审计日志 · 不可删除扫码记录 · 信号驱动自动记录 │ └─────────────────────────────────────────────────────────┘
|
每一层都独立生效:即使攻击者绕过了频率限制,伪造的二维码依然无法通过签名验证;即使内部人员试图修改已发布的证书,冻结字段守卫和审计哈希链会同时拦截和记录。这就是纵深防御的核心思想——不依赖任何单一机制,而是让多层防护相互补充。
小结与预告
本篇我们详细拆解了系统的七层安全架构:从 Ed25519 数字签名的每一行代码,到审计哈希链的不可篡改设计,再到前端的直观展示。这些机制共同确保了一个核心目标——每一张合格证从签发到查验的全生命周期都可验证、可追溯、不可伪造。
在下一篇(也是本系列的最后一篇)中,我们将把目光从代码转向运维,聊聊部署与上线:Docker 容器化编排、Nginx 反向代理配置、环境变量管理、数据库迁移策略,以及生产环境的监控与告警方案。让这套系统真正跑在线上,服务真实用户。
下篇预告:从零搭建”一物一码”合格证查询系统——部署与上线