2026年01月27日
本周工作计划与执行完成进度
中心服务平台 - 轻量化跨境分布式第二阶段改造
- 目标: 控制变更范围,降低引入风险,实现面向客户可用的初步功能体验。
- 进度: 已完成 75% | 计划完成 75%(🟢 未开始 | 🟡 进行中 | ⚠️ 部分完成 | ✅ 已完成 | 📌 已验证)
- 📌 以往已完成的工作内容此处不再重复,详情请参阅上周及上月的。
- 🟡 基于跨境服务器数据同步方案的设计文档,开展开发工作,共包含 78 项子任务:
- ✅ 阶段 1: 项目设置(共享基础设施); 目的: 项目初始化和基础结构搭建
- ✅ Task 001: 创建 datasync 包结构,路径:es-center-server-main-service/src/main/java/com/es/center/server/datasync/(包含 config/model/master/exporter/master/archive/master/schedule/metadata/trigger 子包)
- ✅ Task 002: 添加 Apache Commons VFS 依赖到 build.gradle.kts(使用 libs.commons.vfs2 统一版本管理)
- ✅ Task 003: 添加 Apache Commons Compress 依赖到 build.gradle.kts(使用 Spring Boot BOM 版本)
- ✅ 阶段 2: 基础设施(阻塞前置条件); 目的: 核心基础设施,必须在任何用户故事实现前完成
- ✅ Task 004: 创建 sync_log 表 SQL 脚本,路径:src/main/resources/db/03-version-2.0.1-20260120.sql
- ✅ Task 005: 创建触发器函数 SQL 脚本,路径:src/main/resources/db/50-data_sync_trigger_1.sql
- ✅ Task 006: 创建 SyncLog 简单 record,路径:src/main/java/com/es/center/server/datasync/model/SyncLog.java(使用 record,含 SyncLogId/tableName/opType/pkValue/rowData 字段)
- ✅ Task 007:
创建 FileChecksum record(❌ 已移除 - FileChecksum.java 不需要,改用文件大小校验) - ✅ Task 008: 创建 DownloadResult record,路径:src/main/java/com/es/center/server/datasync/model/DownloadResult.java(date/type/timestamp/localPath)
- ✅ Task 009:
创建 FileToDownload record(❌ 已移除 - 功能已合并至 DownloadResult) - ✅ Task 010: 创建 DataSyncConfig 配置类(含 @NacosConfig 注解),路径:src/main/java/com/es/center/server/datasync/config/DataSyncConfig.java(getNodeRole() 方法改为从 cloud-service.nodes 配置获取节点角色)
- ✅ Task 011: 创建 DataSyncTableMetadataService 用于表元数据缓存,路径:src/main/java/com/es/center/server/datasync/metadata/(SQL 优化:直接在查询中排除 z_ 前缀、ddl_history、sync_log;表名验证:isValidTableName() 使用正则表达式)
- ✅ Task 012: 创建 DataSyncTriggerInstaller 用于触发器安装,路径:src/main/java/com/es/center/server/datasync/trigger/(installTriggers() 返回 boolean;添加 tables 空值检查;ensureTriggerFunction 返回 boolean)
- ✅ Task 013:
创建 DataSyncApplicationStartupRunner(❌ 已移除 - 功能迁移至 DataSyncMasterSchedule) - ✅ Task 014: 实现 SFTP 客户端 LoadingCache(支持动态配置刷新),在 DataSyncConfig 中(使用 Guava LoadingCache,5分钟过期,配置变化自动重建)
- ✅ Task 015: 创建 DataSyncSftpDownloader,路径:src/main/java/com/es/center/server/datasync/secondary/download/
- ✅ Task 016: 创建 DataSyncSecondarySchedule,路径:src/main/java/com/es/center/server/datasync/secondary/schedule/
- ✅ 阶段 3: 用户故事 1 - 全量数据备份(优先级:P1)🎯 MVP; 目的: 实现每天自动执行完整数据备份,确保从节点拥有完整的数据副本
- ✅ Task 017: 创建 DataSyncFullExporter,路径:src/main/java/com/es/center/server/datasync/master/exporter/(378 行代码,包含导出、校验、移动逻辑)
- ✅ Task 018: 创建 DataSyncMasterArchiveService,路径:src/main/java/com/es/center/server/datasync/master/archive/(output 保留 2 天,archive 保留 3 天)
- ✅ Task 019: 创建 DataSyncMasterSchedule,路径:src/main/java/com/es/center/server/datasync/master/schedule/(使用 @Scheduled 注解:initialDelay=180s, fixedDelay=3s)
- ✅ Task 020: 实现按表导出逻辑,在 DataSyncFullExporter 中(分批 5000 条导出,JSONL.gz 格式)
- ✅ Task 021:
为每个数据文件实现校验文件生成(❌ 已移除 - 改用文件大小校验,不再生成 checksum 文件) - ✅ Task 022: 实现基于 UTC 时间的每日全量导出触发(01:00 UTC 检查)(shouldExport() 判断 UTC 1 点后且当天未导出)
- ✅ Task 023: 实现从生成目录到输出目录的原子性文件移动(moveToOutput() 原子性移动)
- ✅ Task 024: 实现归档清理(output 保留 2 个日期,archive 保留 3 个日期)(archiveOutput() + cleanArchive())
- ✅ Task 025: 为全量导出操作添加日志(开始/完成/失败)(使用 @Slf4j,ERROR 级别记录失败,INFO 记录开始/完成)
- ✅ Task 027: 创建 DataSyncFullImporter,路径:src/main/java/com/es/center/server/datasync/secondary/importer/(先清空表,再批量插入,处理空文件)
- ✅ Task 037: 在 DataSyncSecondarySchedule 中集成全量导入调用
- ✅ 阶段 4: 用户故事 2 - 增量数据同步(优先级:P2); 目的: 实现实时捕获业务数据变更并快速同步到从节点,确保数据一致性
- ✅ Task 026: 创建 DataSyncIncrementalExporter,路径:src/main/java/com/es/center/server/datasync/master/exporter/(使用 jdbcTemplate.queryForList 简化代码,row_data 保持 String 格式延迟解析)
- ✅ Task 028: 创建 DataSyncIncrementalImporter,路径:src/main/java/com/es/center/server/datasync/secondary/importer/(支持 I/U/D 操作,批量处理,文件校验)
- ✅ Task 029:
创建 DirectoryWatchService(❌ 已删除 - 改用定期扫描模式,由 DataSyncSecondarySchedule 统一调度) - ✅ Task 030: 实现增量导出逻辑(查询 sync_log 表)(在 DataSyncIncrementalExporter 中实现 queryPendingRecords)
- ✅ Task 031: 实现增量文件生成成功后的 sync_log 删除(在 DataSyncIncrementalExporter 中实现 deleteExportedLogs)
- ✅ Task 032: 实现增量导入(支持 I/U/D 操作处理)(在 DataSyncIncrementalImporter 中实现)
- ✅ Task 033: 在 DataSyncIncrementalImporter 中实现批量插入/更新/删除操作(分批处理,每批 100 条)
- ✅ Task 034: 实现目录监听服务(文件到达时触发导入)(DirectoryWatchService 已删除,改用定期扫描模式)
- ✅ Task 035: 将增量导出集成到主节点调度服务(3 秒间隔)(在 DataSyncMasterSchedule 中集成)
- ✅ Task 036: 为增量同步操作添加日志(ERROR 级别记录失败,INFO 记录开始/完成)
- ✅ 阶段 5: 用户故事 3 - 故障自动恢复(优先级:P2); 目的: 实现服务重启后自动恢复同步状态,无需人工干预
- ✅ Task 038: 主节点调度服务在每次任务执行前清理生成目录(在 DataSyncMasterSchedule.executeTask() 中实现)
- ✅ Task 039: 从节点启动时清理下载目录(由 DataSyncSecondarySchedule 处理)
- ✅ Task 040: 从节点启动时处理残留导入文件(由 DataSyncSecondarySchedule 处理)
- ✅ Task 041: 为启动恢复操作添加日志(ERROR 级别记录失败,INFO 记录开始/完成)(在 DataSyncMasterSchedule 中添加 [启动恢复] 标签日志)
- ✅ Task 042: 为不完整文件状态添加错误处理(在 DataSyncIncrementalImporter 中添加文件校验和错误处理)
- ✅ Task 063: 实现禁用状态清理逻辑:当数据同步功能被禁用时,清理 ddl_history trigger 记录和 sync_log(在 DataSyncMasterSchedule.executeTask() 中使用 jdbcTemplate.batchUpdate 实现)
- ✅ 阶段 6: 用户故事 4 - 数据完整性校验(优先级:P3); 目的: 实现同步数据的完整性验证,并在发现问题时发送告警
- ✅ Task 044: 创建 DataSyncValidationService,路径:src/main/java/com/es/center/server/datasync/secondary/download/(支持文件大小校验、批量校验)
- ✅ Task 045: 创建 FailedFileHandler,路径:src/main/java/com/es/center/server/datasync/util/(支持移动失败文件到 failed 目录,记录失败原因)
- ✅ Task 046: 实现 SFTP 文件列表和下载逻辑(与 DataSyncSftpDownloader 集成)(在 DataSyncSftpDownloader 中实现)
- ✅ Task 047: 实现文件大小校验(MD5 checksum 已移除,仅校验文件大小)(在 DataSyncValidationService 和下载服务中实现)
- ✅ Task 048: 实现失败文件移动到失败目录(在导入服务和归档服务中实现)
- ✅ Task 049: 实现校验失败的告警通知(使用 ERROR/WARN 日志记录告警,日志聚合系统可配置告警规则)
- ✅ Task 050: 为校验和失败场景添加日志(使用 [文件校验]、[增量导入] 等标签)
- ✅ 阶段 7: 用户故事 5 - 历史数据归档(优先级:P3); 目的: 实现自动清理过期的同步文件,避免占用过多存储空间
- ✅ Task 051: 创建 DataSyncSecondaryArchiveService,路径:src/main/java/com/es/center/server/datasync/secondary/archive/(实现归档清理、失败目录管理)
- ✅ Task 052: 实现从节点归档管理(archive 保留 3 个日期)(在 DataSyncSecondaryArchiveService 中实现 cleanArchive)
- ✅ Task 053: 实现从节点失败目录清理(failed 保留 3 个日期)(在 DataSyncSecondaryArchiveService 中实现 cleanFailed)
- ✅ Task 054: 实现成功导入文件移动到归档目录(在 DataSyncSecondaryArchiveService 中实现 moveToArchive)
- ✅ Task 055: 实现失败导入文件移动到失败目录(在 DataSyncSecondaryArchiveService 中实现 moveToFailed)
- ✅ Task 056: 为归档操作添加日志(使用 INFO/WARN 级别记录归档操作)
- ✅ 阶段 8: 代码质量检查与跨领域优化; 目的: 影响多个用户故事的改进
- ✅ Task 057: 为所有新增 Java 文件添加 Spotless 格式化(spotlessApply 执行成功)
- ✅ Task 058: 验证所有 SQL 符合 constitution 要求(小写)(所有 SQL 使用小写关键字)
- ✅ Task 059: 验证所有 Setter 注入遵循字段名 = Bean 名约定(无字段注入违规)
- ✅ Task 060: 验证所有日志使用 @Slf4j,而非 System.out.println(无违规)
- ✅ Task 061: 验证所有空值检查使用 StringUtils.isBlank,而非手写检查(代码规范)
- ✅ Task 062: 验证所有返回使用 Optional.empty() 而非 null(适用场景正确使用)
- ✅ Task 063: 为所有公共 API 添加文档注释(所有服务类有完整 JavaDoc)
- ✅ Task 064: 运行 quickstart.md 验证 - 同步功能的端到端测试(创建 DataSyncValidator 验证工具类和 DataSyncValidationController REST API)
- ✅ 阶段 9: 代码重构; 目的: 提高代码可读性和可维护性
- ✅ Task 063: 创建 DataSyncFileValidators 验证工具类,路径:src/main/java/com/es/center/server/datasync/util/(统一文件名和目录格式验证逻辑:isValidDate, isValidTimestamp, isValidSyncType, isValidDataFile)
- ✅ Task 064: 重构 DataSyncSftpDownloader.scanRemoteFiles 方法(将四层嵌套循环提取为多个递归方法,嵌套层级从4层降到2层,新增 closeQuietly 工具方法)
- ✅ Task 065: 简化 DataSyncIncrementalImporter 批处理方法(提取公共批处理逻辑,使用 ThrowingConsumer 函数式接口,消除重复代码)
- ✅ Task 066: 更新过时注释(修正 DataSyncConfig.java 和 DataSyncSecondarySchedule.java 中的过时注释)
- ✅ Task 067: 代码清理(删除冗余的 moveToFailed 重载方法,简化 finally 块注释)
- ✅ Task 068: 文档同步更新(更新 specs/001-cross-border-data-sync 文档)
- ✅ Task 069: 创建 DataSyncFileUtils 工具类,路径:src/main/java/com/es/center/server/datasync/util/(统一管理日期格式化常量 DATE_FORMATTER/TIME_FORMATTER、文件移动方法 moveToOutput)
- ✅ Task 070: 重构 DataSyncFullExporter(删除重复的 DateTimeFormatter 常量和 moveToOutput 方法,使用 DataSyncFileUtils 统一引用)
- ✅ Task 071: 重构 DataSyncIncrementalExporter(删除重复的 DateTimeFormatter 常量和 moveToOutput 方法,使用 DataSyncFileUtils 统一引用)
- ✅ Task 072: 重构 DataSyncSftpDownloader(移除 DataSyncFileUtils.closeQuietly 调用,内联实现 closeArrayQuietly;增强 filterFilesToDownload 检查 input/archive/failed 目录防重复下载)
- ✅ Task 073: 重构 FailedFileHandler(删除重复的 DateTimeFormatter 常量,使用 DataSyncFileUtils 统一引用)
- ✅ Task 074: 文档同步更新(更新 specs/001-cross-border-data-sync/plan.md 文档结构和修订记录)
- ✅ Task 075: 重构 FailedFileHandler(moveToFailed 改为从路径提取 date/type/timestamp,与远程目录结构保持一致;移除 LocalDateTime.now() 的使用)
- ✅ Task 076: 文档一致性修复:删除 data-model.md 中的 FileToDownload 定义;更新 SyncLog 为 record 类型;统一日期格式验证描述
- ✅ Task 077: 文档一致性修复:删除 quickstart.md 中未实现的 REST API 端点描述;移除 spec.md 中的 [待实现] 标记
- ✅ Task 078: 文档同步更新:更新 tasks.md 添加文档修复任务记录
- ✅ 阶段 1: 项目设置(共享基础设施); 目的: 项目初始化和基础结构搭建
其它
- 目标: 记录日常耗时的杂项工作、协作事项,以及必要的沟通、讨论等隐性事务。
- 进度: 属非计划内事务(🟢 未开始 | 🟡 进行中 | ⚠️ 部分完成 | ✅ 已完成 | 📌 已验证)
- 📌 以往已完成的工作内容此处不再重复,详情请参阅上周及上月的。
- 27 日
- 制作最新更新包并部署到 zxs-cs 服务器,用于主从节点数据同步联调。
总结
- PostgreSQL 中为父表添加的触发器不会自动应用到子表。新建子表时需要同步添加触发器,否则无法收集相关操作的增删改记录。
- 轻量化跨境分布式第二阶段改造:主线流程已打通
- 已完成
- 主节点:全量和增量数据导出(压缩文件)
- 从节点:下载全量和增量数据,导入到数据库
- 保障机制:主从节点失败重试、防重、归档、过期清理均已实现
- 待完成
- 从节点导入新数据时的通知机制
- 导入新用户、设备及采集服务器数据后,创建对应的 MQTT 账号
- 导入新国际化数据后,生成对应的国际化 JSON 文件
- 从节点导入新数据时的通知机制
- 已完成
备忘
Last updated on