# JW(教务)系统Kafka数据同步接口文档 ## 一、概述 本文档整理了从Kafka获取的与JW(教务)系统相关的数据同步逻辑,以及相应的接口响应JSON格式。 ## 二、Kafka消息架构 ### 2.1 消息主题(Topic) | Topic名称 | 常量定义 | 说明 | |----------|---------|------| | eventBus | `OLD_SYNC_TOPIC` | 第三方对接主题,用于与旧系统(包括教务系统)数据同步 | | sync_data_topic | `SYNC_DATA_TOPIC` | 一卡通内部操作双向同步主题 | | sync_to_cloud | `TO_CLOUD_TOPIC` | 云平台和本地部署同步主题 | ### 2.2 消息发送者(Sender) | 发送者代码 | 常量名称 | 说明 | |----------|---------|------| | 002 | `TRAIN` | 教务系统 | | 003 | `HR` | 人事系统 | | 005 | `YKT` | 一卡通系统 | ### 2.3 事件类型(EventType) - 教务系统相关 | 事件类型代码 | 常量名称 | 说明 | |------------|---------|------| | 00200001 | `TRAIN_CLASS_ADD` | 班级增加 | | 00200002 | `TRAIN_CLASS_EDIT` | 班级修改 | | 00200003 | `TRAIN_CLASS_DEL` | 班级删除 | | 00200004 | `TRAINEE_ADD` | 学员增加 | | 00200005 | `TRAINEE_EDIT` | 学员修改 | | 00200006 | `TRAINEE_DEL` | 学员删除 | | 00500001 | `CONSUME_RECORD` | 消费记录(推送给教务) | ## 三、Kafka消息格式 ### 3.1 标准Kafka消息结构 ```json { "header": { "eventId": "事件唯一标识", "sender": "发送者代码(002-教务, 003-人事, 005-一卡通)", "eventType": "事件类型代码", "tenantId": "租户ID" }, "body": { // 具体业务数据 } } ``` ## 四、从Kafka接收的教务系统数据接口 ### 4.1 教务系统 → 云端数据同步 **处理类**: `TrainEventStrategyImpl` **Kafka Consumer**: `KafkaCloudConsumer.kafkaEventBusHandler()` **Topic**: `eventBus` **Sender**: `002` #### 4.1.1 班级同步接口 (增加/修改/删除) **事件类型**: - `TRAIN_CLASS_ADD` (00200001) - 班级增加 - `TRAIN_CLASS_EDIT` (00200002) - 班级修改 - `TRAIN_CLASS_DEL` (00200003) - 班级删除 **调用服务**: - 增加/修改: `remoteKafkaSyncService.syncTrainClass()` - 删除: `remoteKafkaSyncService.syncDelTrainClass()` **数据处理**: - 增加/修改: `TrainUtils.getSycClass()` - 删除: `TrainUtils.getSycDeleteClass()` **说明**: 通过 `del_flag` 字段区分操作类型 - `del_flag = 0` 或无此字段: 增加或修改(根据班级ID是否存在判断) - `del_flag = 1`: 删除 **Kafka消息Body格式 (增加/修改)**: ```json { "id": "班级ID", "name": "班级名称", "year": 2025, "xq": "学期(0-上学期, 1-下学期)", "bmStarttime": "报名开始时间", "bdTime": "报到时间", "kbTime": "开班时间(开课时间)", "byTime": "毕业时间(结业时间)", "studentNum": 100, "noPayAllow": "是否允许未缴费报到(0-不允许, 1-允许)", "tenantId": "租户ID" } ``` **字段说明 (增加/修改)**: | 字段名 | 类型 | 必填 | 说明 | 示例 | 映射到内部字段 | |-------|------|------|------|------|---------------| | id | String | 是 | 班级ID | "CLASS001" | dept_id | | name | String | 是 | 班级名称 | "2025春季计算机培训班" | dept_name | | year | Integer | 是 | 年份 | 2025 | year | | xq | String | 是 | 学期(0-上学期, 1-下学期) | "0" | semester(转换为"上学期"或"下学期") | | bmStarttime | String | 是 | 报名开始时间/缴费开始时间 | "2025-01-15 08:00:00" | payBegin | | bdTime | String | 是 | 报到时间 | "2025-02-20 09:00:00" | checkDate | | kbTime | String | 是 | 开班时间(开课时间) | "2025-02-21 08:00:00" | beginDate | | byTime | String | 是 | 毕业时间(结业时间) | "2025-07-20 17:00:00" | endDate, payEnd | | studentNum | Integer | 否 | 计划人数(默认100) | 100 | planCount | | noPayAllow | String | 否 | 是否允许未缴费报到(0-不允许, 1-允许) | "0" | payCheck | | tenantId | String | 否 | 租户ID(无则使用默认) | "000000" | tenantId | **固定值 (增加/修改)**: - `chooseRoom`: "0" (是否自主选房) - `canEat`: "1" (是否就餐) - `operatorId`: KAFKA_SYNC_ADMIN常量值 **Kafka消息Body格式 (删除)**: ```json { "id": "班级ID", "del_flag": "1", "tenantId": "租户ID" } ``` **字段说明 (删除)**: | 字段名 | 类型 | 必填 | 说明 | 示例 | 映射到内部字段 | |-------|------|------|------|------|---------------| | id | String | 是 | 班级ID | "CLASS001" | dept_id | | del_flag | String | 是 | 删除标志: "1"表示删除 | "1" | delFlag | | tenantId | String | 否 | 租户ID(无则使用默认) | "000000" | tenantId | **业务逻辑**: 1. **增加/修改**: 根据班级ID查询,存在则更新,不存在则新增 2. **删除**: 根据班级ID标记删除 3. **班级结构**: 校本部 → 年份(2025年) → 学期(上学期/下学期) → 班级名称 #### 4.1.2 学员同步接口 (增加/修改/删除) **事件类型**: - `TRAINEE_ADD` (00200004) - 学员增加 - `TRAINEE_EDIT` (00200005) - 学员修改 - `TRAINEE_DEL` (00200006) - 学员删除 **调用服务**: - 增加/修改: `remoteKafkaSyncService.syncTrainee()` - 删除: `remoteKafkaSyncService.syncDelTrainee()` **数据处理**: - 增加/修改: `TrainUtils.getSyncTrainee()` - 删除: `TrainUtils.getSyncDeleteTrainee()` **说明**: 通过事件类型区分操作,删除时数据结构不同 **Kafka消息Body格式 (增加/修改)**: ```json { "student": { "id": "学员ID", "name": "学员姓名", "sex": "性别(1-男, 2-女)", "phone": "联系电话", "idCard": "身份证号", "currentClassId": "当前班级ID" }, "trainClassStudent": { "studentId": "学员ID", "classId": "班级ID", "status": "学员状态(1-已报名, 2-已报到)" }, "tenantId": "租户ID" } ``` **字段说明 (增加/修改)**: **student对象**: | 字段名 | 类型 | 必填 | 说明 | 示例 | 映射到内部字段 | |-------|------|------|------|------|---------------| | id | String | 是 | 学员ID | "STU001" | userId | | name | String | 是 | 学员姓名 | "张三" | realName | | sex | String | 是 | 性别(1-男, 2-女) | "1" | sex | | phone | String | 是 | 联系电话 | "13800138000" | phone | | idCard | String | 否 | 身份证号 | "110101199001011234" | idNumber | | currentClassId | String | 是 | 当前班级ID | "CLASS001" | deptId | **trainClassStudent对象**: | 字段名 | 类型 | 必填 | 说明 | 示例 | 映射到内部字段 | |-------|------|------|------|------|---------------| | studentId | String | 是 | 学员ID | "STU001" | userId | | classId | String | 是 | 班级ID | "CLASS001" | deptId | | status | String | 是 | 学员状态 | "1" | delFlag(1或2为"0"表示有效, 其他为"2"表示删除) | **固定值 (增加/修改)**: - `category`: CATEGORY_TRAINEE常量值(学员类别) - `postCode`: TRAINEE_CODE常量值(学员岗位编码) - `operatorId`: KAFKA_SYNC_ADMIN常量值 **Kafka消息Body格式 (删除)**: ```json { "id": "学员ID", "classId": "班级ID", "del_flag": "1", "tenantId": "租户ID" } ``` **字段说明 (删除)**: | 字段名 | 类型 | 必填 | 说明 | 示例 | 映射到内部字段 | |-------|------|------|------|------|---------------| | id | String | 是 | 学员ID | "STU001" | userId | | classId | String | 是 | 班级ID | "CLASS001" | deptId | | del_flag | String | 是 | 删除标志: "1"表示删除 | "1" | - | | tenantId | String | 否 | 租户ID(无则使用默认) | "000000" | tenantId | **业务逻辑**: 1. **增加/修改**: 根据学员ID和班级ID建立关联关系 2. **删除**: 根据学员ID和班级ID删除关联关系 3. **状态处理**: status为"1"(已报名)或"2"(已报到)时,delFlag="0"(有效);其他状态delFlag="2"(删除) 4. **租户校验**: 无tenantId的学员数据将被忽略 ## 五、向教务系统推送的数据接口 ### 5.1 消费记录推送 → 教务系统 **处理类**: `BaseBusiness.sendConsumeToKafka()` **Kafka Producer**: `kafkaNormalProducer.sendKafkaMessage()` **Topic**: `eventBus` **Sender**: `005`(一卡通系统) **EventType**: `CONSUME_RECORD` (00500001) #### 5.1.1 接口说明 当消费记录上传完成后,系统会将就餐打卡信息推送到Kafka,教务系统消费此消息实现就餐打卡功能。 **API接口**: ``` POST /v1/Consumes/Consume/kafka/{date} POST /v1/Consumes/Consume/kafka/{beginDate}/{endDate} ``` #### 5.1.2 推送数据格式 (YcPushConsumeInfoVo) **Kafka消息Body格式**: ```json { "recordId": "消费记录ID", "userId": "人员ID", "userNumb": "学号", "xm": "姓名", "deptId": "部门ID", "deptName": "部门名称", "roomId": "消费地点ID", "roomName": "地点名称", "cardNo": "卡流水号", "factoryFixId": "物理卡号", "consumeValue": "消费金额", "cardValue": "卡余额", "consumeDate": "消费时间", "mealTypeId": "餐类ID", "mealName": "餐类名称", "termNo": "机号", "termName": "机器名称", "category": "身份类别(0-系统内置 1-教师 2-学生 3-家长)", "otherSysId": "其他业务系统人员ID(教务或人事的人员ID)", "classId": "班级ID", "termRecordID": "机器流水号", "posRecordState": "消费记录标识", "tenantId": "租户ID" } ``` #### 5.1.3 字段说明 | 字段名 | 类型 | 说明 | 示例 | |-------|------|------|------| | recordId | String | 消费记录ID | "123456" | | userId | String | 人员ID | "789" | | userNumb | String | 学号 | "2024001" | | xm | String | 姓名 | "张三" | | deptId | String | 部门ID | "100" | | deptName | String | 部门名称 | "计算机学院" | | roomId | String | 消费地点ID | "" | | roomName | String | 地点名称 | "" | | cardNo | String | 卡流水号 | "35193" | | factoryFixId | String | 物理卡号 | "3656457030" | | consumeValue | String | 消费金额 | "15.50" | | cardValue | String | 卡余额 | "184.50" | | consumeDate | String | 消费时间 | "2025-02-19 19:32:30" | | mealTypeId | String | 餐类ID | "1" | | mealName | String | 餐类名称 | "晚餐" | | termNo | String | 机号 | "100" | | termName | String | 机器名称 | "食堂1号窗口" | | category | String | 身份类别 | "2" (学生) | | otherSysId | String | 教务系统人员ID | "JW2024001" | | classId | String | 班级ID | "CLASS001" | | termRecordID | Long | 机器流水号 | 47309 | | posRecordState | Integer | 消费记录标识 | 364 | #### 5.1.4 餐类名称映射 ```java mealNameMap: "1" -> "早餐" "2" -> "午餐" "3" -> "晚餐" 其他 -> "未知餐类" ``` ## 六、Kafka消费者处理逻辑 ### 6.1 云端消费者 (KafkaCloudConsumer) **条件**: `locationFlag = 'cloud'` #### 6.1.1 eventBus主题监听 ```java @KafkaListener(topics = KafkaTopicConstants.OLD_SYNC_TOPIC, groupId = "old-to-cloud-group") public void kafkaEventBusHandler(ConsumerRecord record) ``` **处理逻辑**: 1. 检查offset避免重复消费 2. 解析Kafka消息 3. 判断sender,如果不是005或006则处理业务 4. 根据sender获取对应的事件策略处理器 5. 执行事件处理 6. 记录消息消费状态 #### 6.1.2 sync_to_cloud主题监听 ```java @KafkaListener(topics = KafkaTopicConstants.TO_CLOUD_TOPIC, groupId = "local-to-cloud-group") public void kafkaToCloudHandler(ConsumerRecord record) ``` ### 6.2 本地消费者 (KafkaLocalConsumer) **条件**: `locationFlag = 'local'` ```java @KafkaListener(topics = SYNC_DATA_TOPIC, groupId = "YTK_${spring.system.tenantId}") public void cloudOperationSync(ConsumerRecord record) ``` **处理逻辑**: 1. 检查offset避免重复消费 2. 解析Kafka消息并验证tenantId 3. 根据sender获取事件策略处理器 4. 执行事件处理 5. 记录消息发送记录 ## 七、事件策略处理器 ### 7.1 教务系统事件处理 (TrainEventStrategyImpl) **Service名称**: `002` **实现接口**: `IYktEventStrategy` ```java @Service(EventSenderConstants.TRAIN) // "002" public class TrainEventStrategyImpl implements IYktEventStrategy ``` **处理的事件类型**: - TRAIN_CLASS_ADD (00200001) → `syncTrainClass()` - 班级增加/修改 - TRAIN_CLASS_EDIT (00200002) → `syncTrainClass()` - 班级增加/修改 - TRAIN_CLASS_DEL (00200003) → `syncDelTrainClass()` - 班级删除 - TRAINEE_ADD (00200004) → `syncTrainee()` - 学员增加/修改 - TRAINEE_EDIT (00200005) → `syncTrainee()` - 学员增加/修改 - TRAINEE_DEL (00200006) → `syncDelTrainee()` - 学员删除 **说明**: - 班级增加和修改使用相同接口,通过班级ID判断是新增还是更新 - 学员增加和修改使用相同接口,通过学员ID判断是新增还是更新 - 删除操作使用单独的接口 ### 7.2 人事系统事件处理 (TeacherEventStrategyImpl) **Service名称**: `003` **实现接口**: `IYktEventStrategy` ```java @Service(EventSenderConstants.HR) // "003" public class TeacherEventStrategyImpl implements IYktEventStrategy ``` **处理的事件类型**: - DEPT_ADD (00300001) → `syncTeacherDept()` - DEPT_EDIT (00300002) → `syncTeacherDept()` - DEPT_DEL (00300003) → `syncDelTeacherDept()` - TEACHER_ADD (00300004) → `syncTeacher()` - TEACHER_EDIT (00300005) → `syncTeacher()` - TEACHER_DEL (00300006) → `syncDelTeacher()` ### 7.3 消费系统事件处理 (ConsumeEventStrategyImpl) **Service名称**: `120` **实现接口**: `IYktEventStrategy` ```java @Service(EventSenderConstants.CONSUME) // "120" public class ConsumeEventStrategyImpl implements IYktEventStrategy ``` **处理的事件类型**: - CONSUME (12000001) → `remoteConsumeService.dealKafkaConsumeData()` ## 八、业务流程 ### 8.1 教务系统同步数据到云端 ``` 教务系统 → Kafka(eventBus) → KafkaCloudConsumer → TrainEventStrategyImpl → RemoteKafkaSyncService → 同步到云端数据库 ``` ### 8.2 消费记录推送到教务系统 ``` 消费机上传消费记录 → ConsumeController.uploadRecord() → ConsumeBusiness.postOrderAsync() → BaseBusiness.completeUploadRecord() → BaseBusiness.sendConsumeToKafka() → Kafka(eventBus) → 教务系统消费 ``` ### 8.3 手动推送消费记录到教务系统 ``` API调用: POST /v1/Consumes/Consume/kafka/{date} → ConsumeController.consumeKafka() → BaseBusiness.sendToJwKafkaTest() → 查询未发送的消费记录 → BaseBusiness.sendConsumeToKafka() → Kafka(eventBus) → 教务系统消费 ``` ## 九、数据转换示例 ### 9.1 消费记录转换为教务系统格式 **输入数据 (ConsumptionBo)**: ```json { "recordId": 123456, "userId": 789, "userNumb": "2024001", "realName": "张三", "deptName": "计算机学院", "cardNo": 35193, "factoryId": 3656457030, "consumeMoney": 15.50, "balance": 184.50, "consumeDate": "2025-02-19 19:32:30", "mealType": 3, "termNo": 100, "termName": "食堂1号窗口", "termRecordId": 47309, "recordStatus": 364 } ``` **输出数据 (YcPushConsumeInfoVo)**: ```json { "recordId": "123456", "userId": "789", "userNumb": "2024001", "xm": "张三", "deptId": "100", "deptName": "计算机学院", "roomId": "", "roomName": "", "cardNo": "35193", "factoryFixId": "3656457030", "consumeValue": "15.50", "cardValue": "184.50", "consumeDate": "2025-02-19 19:32:30", "mealTypeId": "3", "mealName": "晚餐", "termNo": "100", "termName": "食堂1号窗口", "category": "2", "otherSysId": "JW2024001", "classId": "CLASS001", "termRecordID": 47309, "posRecordState": 364 } ``` ## 十、相关配置 ### 10.1 Kafka Topic配置 ```yaml # 应用配置 locationFlag: cloud # 或 local # 租户配置 spring: system: tenantId: "000000" ``` ### 10.2 常量定义位置 | 常量类 | 路径 | |-------|------| | KafkaTopicConstants | `ruoyi-common-message/kafka/constant/KafkaTopicConstants.java` | | EventSenderConstants | `ruoyi-common-message/kafka/constant/EventSenderConstants.java` | | EventTypeConstants | `ruoyi-common-message/kafka/constant/EventTypeConstants.java` | ## 十一、错误处理 ### 11.1 消息消费记录 系统会记录每条Kafka消息的消费状态: - **Y**: 消费成功 - **N**: 消费失败 记录表: `RemoteSendMessageRecordBo` ### 11.2 Offset管理 使用`IXfOffsetService`管理消费offset,防止重复消费: - 记录每条消息的offset、topic、groupId、partition - 消费前判断是否已处理过 ## 十二、注意事项 1. **消息去重**: 通过offset管理机制确保消息不被重复消费 2. **异步处理**: 消费记录推送采用异步任务提交,不阻塞主流程 3. **租户隔离**: 消息中包含tenantId,确保多租户数据隔离 4. **异常处理**: 消息处理失败时会记录失败状态,便于后续排查 5. **数据格式**: 推送到教务系统的数据全部转换为String类型,确保兼容性 ## 十三、测试接口 ### 13.1 推送指定日期消费记录 ```bash POST /v1/Consumes/Consume/kafka/2025-02-19 ``` ### 13.2 推送日期范围消费记录 ```bash POST /v1/Consumes/Consume/kafka/2025-02-19/2025-02-20 ``` **响应**: ```json { "code": 200, "msg": "操作成功", "data": null } ``` --- **文档版本**: v1.0 **生成日期**: 2025-04-09 **维护人员**: 系统开发团队