Skip to Content

2026年01月27日

本周工作计划与执行完成进度

中心服务平台 - 轻量化跨境分布式第二阶段改造

  • 目标: 控制变更范围,降低引入风险,实现面向客户可用的初步功能体验。
  • 进度: 已完成 75% | 计划完成 75%(🟢 未开始 | 🟡 进行中 | ⚠️ 部分完成 | ✅ 已完成 | 📌 已验证)
    • 📌 以往已完成的工作内容此处不再重复,详情请参阅上周及上月的。
    • 🟡 基于跨境服务器数据同步方案的设计文档,开展开发工作,共包含 78 项子任务:
      • ✅ 阶段 1: 项目设置(共享基础设施); 目的: 项目初始化和基础结构搭建
        • ✅ Task 001: 创建 datasync 包结构,路径:es-center-server-main-service/src/main/java/com/es/center/server/datasync/(包含 config/model/master/exporter/master/archive/master/schedule/metadata/trigger 子包)
        • ✅ Task 002: 添加 Apache Commons VFS 依赖到 build.gradle.kts(使用 libs.commons.vfs2 统一版本管理)
        • ✅ Task 003: 添加 Apache Commons Compress 依赖到 build.gradle.kts(使用 Spring Boot BOM 版本)
      • ✅ 阶段 2: 基础设施(阻塞前置条件); 目的: 核心基础设施,必须在任何用户故事实现前完成
        • ✅ Task 004: 创建 sync_log 表 SQL 脚本,路径:src/main/resources/db/03-version-2.0.1-20260120.sql
        • ✅ Task 005: 创建触发器函数 SQL 脚本,路径:src/main/resources/db/50-data_sync_trigger_1.sql
        • ✅ Task 006: 创建 SyncLog 简单 record,路径:src/main/java/com/es/center/server/datasync/model/SyncLog.java(使用 record,含 SyncLogId/tableName/opType/pkValue/rowData 字段)
        • ✅ Task 007: 创建 FileChecksum record(❌ 已移除 - FileChecksum.java 不需要,改用文件大小校验)
        • ✅ Task 008: 创建 DownloadResult record,路径:src/main/java/com/es/center/server/datasync/model/DownloadResult.java(date/type/timestamp/localPath)
        • ✅ Task 009: 创建 FileToDownload record(❌ 已移除 - 功能已合并至 DownloadResult)
        • ✅ Task 010: 创建 DataSyncConfig 配置类(含 @NacosConfig 注解),路径:src/main/java/com/es/center/server/datasync/config/DataSyncConfig.java(getNodeRole() 方法改为从 cloud-service.nodes 配置获取节点角色)
        • ✅ Task 011: 创建 DataSyncTableMetadataService 用于表元数据缓存,路径:src/main/java/com/es/center/server/datasync/metadata/(SQL 优化:直接在查询中排除 z_ 前缀、ddl_history、sync_log;表名验证:isValidTableName() 使用正则表达式)
        • ✅ Task 012: 创建 DataSyncTriggerInstaller 用于触发器安装,路径:src/main/java/com/es/center/server/datasync/trigger/(installTriggers() 返回 boolean;添加 tables 空值检查;ensureTriggerFunction 返回 boolean)
        • ✅ Task 013: 创建 DataSyncApplicationStartupRunner(❌ 已移除 - 功能迁移至 DataSyncMasterSchedule)
        • ✅ Task 014: 实现 SFTP 客户端 LoadingCache(支持动态配置刷新),在 DataSyncConfig 中(使用 Guava LoadingCache,5分钟过期,配置变化自动重建)
        • ✅ Task 015: 创建 DataSyncSftpDownloader,路径:src/main/java/com/es/center/server/datasync/secondary/download/
        • ✅ Task 016: 创建 DataSyncSecondarySchedule,路径:src/main/java/com/es/center/server/datasync/secondary/schedule/
      • ✅ 阶段 3: 用户故事 1 - 全量数据备份(优先级:P1)🎯 MVP; 目的: 实现每天自动执行完整数据备份,确保从节点拥有完整的数据副本
        • ✅ Task 017: 创建 DataSyncFullExporter,路径:src/main/java/com/es/center/server/datasync/master/exporter/(378 行代码,包含导出、校验、移动逻辑)
        • ✅ Task 018: 创建 DataSyncMasterArchiveService,路径:src/main/java/com/es/center/server/datasync/master/archive/(output 保留 2 天,archive 保留 3 天)
        • ✅ Task 019: 创建 DataSyncMasterSchedule,路径:src/main/java/com/es/center/server/datasync/master/schedule/(使用 @Scheduled 注解:initialDelay=180s, fixedDelay=3s)
        • ✅ Task 020: 实现按表导出逻辑,在 DataSyncFullExporter 中(分批 5000 条导出,JSONL.gz 格式)
        • ✅ Task 021: 为每个数据文件实现校验文件生成(❌ 已移除 - 改用文件大小校验,不再生成 checksum 文件)
        • ✅ Task 022: 实现基于 UTC 时间的每日全量导出触发(01:00 UTC 检查)(shouldExport() 判断 UTC 1 点后且当天未导出)
        • ✅ Task 023: 实现从生成目录到输出目录的原子性文件移动(moveToOutput() 原子性移动)
        • ✅ Task 024: 实现归档清理(output 保留 2 个日期,archive 保留 3 个日期)(archiveOutput() + cleanArchive())
        • ✅ Task 025: 为全量导出操作添加日志(开始/完成/失败)(使用 @Slf4j,ERROR 级别记录失败,INFO 记录开始/完成)
        • ✅ Task 027: 创建 DataSyncFullImporter,路径:src/main/java/com/es/center/server/datasync/secondary/importer/(先清空表,再批量插入,处理空文件)
        • ✅ Task 037: 在 DataSyncSecondarySchedule 中集成全量导入调用
      • ✅ 阶段 4: 用户故事 2 - 增量数据同步(优先级:P2); 目的: 实现实时捕获业务数据变更并快速同步到从节点,确保数据一致性
        • ✅ Task 026: 创建 DataSyncIncrementalExporter,路径:src/main/java/com/es/center/server/datasync/master/exporter/(使用 jdbcTemplate.queryForList 简化代码,row_data 保持 String 格式延迟解析)
        • ✅ Task 028: 创建 DataSyncIncrementalImporter,路径:src/main/java/com/es/center/server/datasync/secondary/importer/(支持 I/U/D 操作,批量处理,文件校验)
        • ✅ Task 029: 创建 DirectoryWatchService(❌ 已删除 - 改用定期扫描模式,由 DataSyncSecondarySchedule 统一调度)
        • ✅ Task 030: 实现增量导出逻辑(查询 sync_log 表)(在 DataSyncIncrementalExporter 中实现 queryPendingRecords)
        • ✅ Task 031: 实现增量文件生成成功后的 sync_log 删除(在 DataSyncIncrementalExporter 中实现 deleteExportedLogs)
        • ✅ Task 032: 实现增量导入(支持 I/U/D 操作处理)(在 DataSyncIncrementalImporter 中实现)
        • ✅ Task 033: 在 DataSyncIncrementalImporter 中实现批量插入/更新/删除操作(分批处理,每批 100 条)
        • ✅ Task 034: 实现目录监听服务(文件到达时触发导入)(DirectoryWatchService 已删除,改用定期扫描模式)
        • ✅ Task 035: 将增量导出集成到主节点调度服务(3 秒间隔)(在 DataSyncMasterSchedule 中集成)
        • ✅ Task 036: 为增量同步操作添加日志(ERROR 级别记录失败,INFO 记录开始/完成)
      • ✅ 阶段 5: 用户故事 3 - 故障自动恢复(优先级:P2); 目的: 实现服务重启后自动恢复同步状态,无需人工干预
        • ✅ Task 038: 主节点调度服务在每次任务执行前清理生成目录(在 DataSyncMasterSchedule.executeTask() 中实现)
        • ✅ Task 039: 从节点启动时清理下载目录(由 DataSyncSecondarySchedule 处理)
        • ✅ Task 040: 从节点启动时处理残留导入文件(由 DataSyncSecondarySchedule 处理)
        • ✅ Task 041: 为启动恢复操作添加日志(ERROR 级别记录失败,INFO 记录开始/完成)(在 DataSyncMasterSchedule 中添加 [启动恢复] 标签日志)
        • ✅ Task 042: 为不完整文件状态添加错误处理(在 DataSyncIncrementalImporter 中添加文件校验和错误处理)
        • ✅ Task 063: 实现禁用状态清理逻辑:当数据同步功能被禁用时,清理 ddl_history trigger 记录和 sync_log(在 DataSyncMasterSchedule.executeTask() 中使用 jdbcTemplate.batchUpdate 实现)
      • ✅ 阶段 6: 用户故事 4 - 数据完整性校验(优先级:P3); 目的: 实现同步数据的完整性验证,并在发现问题时发送告警
        • ✅ Task 044: 创建 DataSyncValidationService,路径:src/main/java/com/es/center/server/datasync/secondary/download/(支持文件大小校验、批量校验)
        • ✅ Task 045: 创建 FailedFileHandler,路径:src/main/java/com/es/center/server/datasync/util/(支持移动失败文件到 failed 目录,记录失败原因)
        • ✅ Task 046: 实现 SFTP 文件列表和下载逻辑(与 DataSyncSftpDownloader 集成)(在 DataSyncSftpDownloader 中实现)
        • ✅ Task 047: 实现文件大小校验(MD5 checksum 已移除,仅校验文件大小)(在 DataSyncValidationService 和下载服务中实现)
        • ✅ Task 048: 实现失败文件移动到失败目录(在导入服务和归档服务中实现)
        • ✅ Task 049: 实现校验失败的告警通知(使用 ERROR/WARN 日志记录告警,日志聚合系统可配置告警规则)
        • ✅ Task 050: 为校验和失败场景添加日志(使用 [文件校验]、[增量导入] 等标签)
      • ✅ 阶段 7: 用户故事 5 - 历史数据归档(优先级:P3); 目的: 实现自动清理过期的同步文件,避免占用过多存储空间
        • ✅ Task 051: 创建 DataSyncSecondaryArchiveService,路径:src/main/java/com/es/center/server/datasync/secondary/archive/(实现归档清理、失败目录管理)
        • ✅ Task 052: 实现从节点归档管理(archive 保留 3 个日期)(在 DataSyncSecondaryArchiveService 中实现 cleanArchive)
        • ✅ Task 053: 实现从节点失败目录清理(failed 保留 3 个日期)(在 DataSyncSecondaryArchiveService 中实现 cleanFailed)
        • ✅ Task 054: 实现成功导入文件移动到归档目录(在 DataSyncSecondaryArchiveService 中实现 moveToArchive)
        • ✅ Task 055: 实现失败导入文件移动到失败目录(在 DataSyncSecondaryArchiveService 中实现 moveToFailed)
        • ✅ Task 056: 为归档操作添加日志(使用 INFO/WARN 级别记录归档操作)
      • ✅ 阶段 8: 代码质量检查与跨领域优化; 目的: 影响多个用户故事的改进
        • ✅ Task 057: 为所有新增 Java 文件添加 Spotless 格式化(spotlessApply 执行成功)
        • ✅ Task 058: 验证所有 SQL 符合 constitution 要求(小写)(所有 SQL 使用小写关键字)
        • ✅ Task 059: 验证所有 Setter 注入遵循字段名 = Bean 名约定(无字段注入违规)
        • ✅ Task 060: 验证所有日志使用 @Slf4j,而非 System.out.println(无违规)
        • ✅ Task 061: 验证所有空值检查使用 StringUtils.isBlank,而非手写检查(代码规范)
        • ✅ Task 062: 验证所有返回使用 Optional.empty() 而非 null(适用场景正确使用)
        • ✅ Task 063: 为所有公共 API 添加文档注释(所有服务类有完整 JavaDoc)
        • ✅ Task 064: 运行 quickstart.md 验证 - 同步功能的端到端测试(创建 DataSyncValidator 验证工具类和 DataSyncValidationController REST API)
      • ✅ 阶段 9: 代码重构; 目的: 提高代码可读性和可维护性
        • ✅ Task 063: 创建 DataSyncFileValidators 验证工具类,路径:src/main/java/com/es/center/server/datasync/util/(统一文件名和目录格式验证逻辑:isValidDate, isValidTimestamp, isValidSyncType, isValidDataFile)
        • ✅ Task 064: 重构 DataSyncSftpDownloader.scanRemoteFiles 方法(将四层嵌套循环提取为多个递归方法,嵌套层级从4层降到2层,新增 closeQuietly 工具方法)
        • ✅ Task 065: 简化 DataSyncIncrementalImporter 批处理方法(提取公共批处理逻辑,使用 ThrowingConsumer 函数式接口,消除重复代码)
        • ✅ Task 066: 更新过时注释(修正 DataSyncConfig.java 和 DataSyncSecondarySchedule.java 中的过时注释)
        • ✅ Task 067: 代码清理(删除冗余的 moveToFailed 重载方法,简化 finally 块注释)
        • ✅ Task 068: 文档同步更新(更新 specs/001-cross-border-data-sync 文档)
        • ✅ Task 069: 创建 DataSyncFileUtils 工具类,路径:src/main/java/com/es/center/server/datasync/util/(统一管理日期格式化常量 DATE_FORMATTER/TIME_FORMATTER、文件移动方法 moveToOutput)
        • ✅ Task 070: 重构 DataSyncFullExporter(删除重复的 DateTimeFormatter 常量和 moveToOutput 方法,使用 DataSyncFileUtils 统一引用)
        • ✅ Task 071: 重构 DataSyncIncrementalExporter(删除重复的 DateTimeFormatter 常量和 moveToOutput 方法,使用 DataSyncFileUtils 统一引用)
        • ✅ Task 072: 重构 DataSyncSftpDownloader(移除 DataSyncFileUtils.closeQuietly 调用,内联实现 closeArrayQuietly;增强 filterFilesToDownload 检查 input/archive/failed 目录防重复下载)
        • ✅ Task 073: 重构 FailedFileHandler(删除重复的 DateTimeFormatter 常量,使用 DataSyncFileUtils 统一引用)
        • ✅ Task 074: 文档同步更新(更新 specs/001-cross-border-data-sync/plan.md 文档结构和修订记录)
        • ✅ Task 075: 重构 FailedFileHandler(moveToFailed 改为从路径提取 date/type/timestamp,与远程目录结构保持一致;移除 LocalDateTime.now() 的使用)
        • ✅ Task 076: 文档一致性修复:删除 data-model.md 中的 FileToDownload 定义;更新 SyncLog 为 record 类型;统一日期格式验证描述
        • ✅ Task 077: 文档一致性修复:删除 quickstart.md 中未实现的 REST API 端点描述;移除 spec.md 中的 [待实现] 标记
        • ✅ Task 078: 文档同步更新:更新 tasks.md 添加文档修复任务记录

其它

  • 目标: 记录日常耗时的杂项工作、协作事项,以及必要的沟通、讨论等隐性事务。
  • 进度: 属非计划内事务(🟢 未开始 | 🟡 进行中 | ⚠️ 部分完成 | ✅ 已完成 | 📌 已验证)
    • 📌 以往已完成的工作内容此处不再重复,详情请参阅上周及上月的。
    • 27 日
      • 制作最新更新包并部署到 zxs-cs 服务器,用于主从节点数据同步联调。

总结

  • PostgreSQL 中为父表添加的触发器不会自动应用到子表。新建子表时需要同步添加触发器,否则无法收集相关操作的增删改记录。
  • 轻量化跨境分布式第二阶段改造:主线流程已打通
    • 已完成
      • 主节点:全量和增量数据导出(压缩文件)
      • 从节点:下载全量和增量数据,导入到数据库
      • 保障机制:主从节点失败重试、防重、归档、过期清理均已实现
    • 待完成
      • 从节点导入新数据时的通知机制
        • 导入新用户、设备及采集服务器数据后,创建对应的 MQTT 账号
        • 导入新国际化数据后,生成对应的国际化 JSON 文件

备忘

Last updated on