JW系统Kafka数据同步接口文档.md 17 KB

JW(教务)系统Kafka数据同步接口文档

一、概述

本文档整理了从Kafka获取的与JW(教务)系统相关的数据同步逻辑,以及相应的接口响应JSON格式。

二、Kafka消息架构

2.1 消息主题(Topic)

Topic名称 常量定义 说明
eventBus OLD_SYNC_TOPIC 第三方对接主题,用于与旧系统(包括教务系统)数据同步
sync_data_topic SYNC_DATA_TOPIC 一卡通内部操作双向同步主题
sync_to_cloud TO_CLOUD_TOPIC 云平台和本地部署同步主题

2.2 消息发送者(Sender)

发送者代码 常量名称 说明
002 TRAIN 教务系统
003 HR 人事系统
005 YKT 一卡通系统

2.3 事件类型(EventType) - 教务系统相关

事件类型代码 常量名称 说明
00200001 TRAIN_CLASS_ADD 班级增加
00200002 TRAIN_CLASS_EDIT 班级修改
00200003 TRAIN_CLASS_DEL 班级删除
00200004 TRAINEE_ADD 学员增加
00200005 TRAINEE_EDIT 学员修改
00200006 TRAINEE_DEL 学员删除
00500001 CONSUME_RECORD 消费记录(推送给教务)

三、Kafka消息格式

3.1 标准Kafka消息结构

{
  "header": {
    "eventId": "事件唯一标识",
    "sender": "发送者代码(002-教务, 003-人事, 005-一卡通)",
    "eventType": "事件类型代码",
    "tenantId": "租户ID"
  },
  "body": {
    // 具体业务数据
  }
}

四、从Kafka接收的教务系统数据接口

4.1 教务系统 → 云端数据同步

处理类: TrainEventStrategyImpl
Kafka Consumer: KafkaCloudConsumer.kafkaEventBusHandler()
Topic: eventBus
Sender: 002

4.1.1 班级同步接口 (增加/修改/删除)

事件类型:

  • TRAIN_CLASS_ADD (00200001) - 班级增加
  • TRAIN_CLASS_EDIT (00200002) - 班级修改
  • TRAIN_CLASS_DEL (00200003) - 班级删除

调用服务:

  • 增加/修改: remoteKafkaSyncService.syncTrainClass()
  • 删除: remoteKafkaSyncService.syncDelTrainClass()

数据处理:

  • 增加/修改: TrainUtils.getSycClass()
  • 删除: TrainUtils.getSycDeleteClass()

说明: 通过 del_flag 字段区分操作类型

  • del_flag = 0 或无此字段: 增加或修改(根据班级ID是否存在判断)
  • del_flag = 1: 删除

Kafka消息Body格式 (增加/修改):

{
  "id": "班级ID",
  "name": "班级名称",
  "year": 2025,
  "xq": "学期(0-上学期, 1-下学期)",
  "bmStarttime": "报名开始时间",
  "bdTime": "报到时间",
  "kbTime": "开班时间(开课时间)",
  "byTime": "毕业时间(结业时间)",
  "studentNum": 100,
  "noPayAllow": "是否允许未缴费报到(0-不允许, 1-允许)",
  "tenantId": "租户ID"
}

字段说明 (增加/修改):

字段名 类型 必填 说明 示例 映射到内部字段
id String 班级ID "CLASS001" dept_id
name String 班级名称 "2025春季计算机培训班" dept_name
year Integer 年份 2025 year
xq String 学期(0-上学期, 1-下学期) "0" semester(转换为"上学期"或"下学期")
bmStarttime String 报名开始时间/缴费开始时间 "2025-01-15 08:00:00" payBegin
bdTime String 报到时间 "2025-02-20 09:00:00" checkDate
kbTime String 开班时间(开课时间) "2025-02-21 08:00:00" beginDate
byTime String 毕业时间(结业时间) "2025-07-20 17:00:00" endDate, payEnd
studentNum Integer 计划人数(默认100) 100 planCount
noPayAllow String 是否允许未缴费报到(0-不允许, 1-允许) "0" payCheck
tenantId String 租户ID(无则使用默认) "000000" tenantId

固定值 (增加/修改):

  • chooseRoom: "0" (是否自主选房)
  • canEat: "1" (是否就餐)
  • operatorId: KAFKA_SYNC_ADMIN常量值

Kafka消息Body格式 (删除):

{
  "id": "班级ID",
  "del_flag": "1",
  "tenantId": "租户ID"
}

字段说明 (删除):

字段名 类型 必填 说明 示例 映射到内部字段
id String 班级ID "CLASS001" dept_id
del_flag String 删除标志: "1"表示删除 "1" delFlag
tenantId String 租户ID(无则使用默认) "000000" tenantId

业务逻辑:

  1. 增加/修改: 根据班级ID查询,存在则更新,不存在则新增
  2. 删除: 根据班级ID标记删除
  3. 班级结构: 校本部 → 年份(2025年) → 学期(上学期/下学期) → 班级名称

4.1.2 学员同步接口 (增加/修改/删除)

事件类型:

  • TRAINEE_ADD (00200004) - 学员增加
  • TRAINEE_EDIT (00200005) - 学员修改
  • TRAINEE_DEL (00200006) - 学员删除

调用服务:

  • 增加/修改: remoteKafkaSyncService.syncTrainee()
  • 删除: remoteKafkaSyncService.syncDelTrainee()

数据处理:

  • 增加/修改: TrainUtils.getSyncTrainee()
  • 删除: TrainUtils.getSyncDeleteTrainee()

说明: 通过事件类型区分操作,删除时数据结构不同

Kafka消息Body格式 (增加/修改):

{
  "student": {
    "id": "学员ID",
    "name": "学员姓名",
    "sex": "性别(1-男, 2-女)",
    "phone": "联系电话",
    "idCard": "身份证号",
    "currentClassId": "当前班级ID"
  },
  "trainClassStudent": {
    "studentId": "学员ID",
    "classId": "班级ID",
    "status": "学员状态(1-已报名, 2-已报到)"
  },
  "tenantId": "租户ID"
}

字段说明 (增加/修改):

student对象:

字段名 类型 必填 说明 示例 映射到内部字段
id String 学员ID "STU001" userId
name String 学员姓名 "张三" realName
sex String 性别(1-男, 2-女) "1" sex
phone String 联系电话 "13800138000" phone
idCard String 身份证号 "110101199001011234" idNumber
currentClassId String 当前班级ID "CLASS001" deptId

trainClassStudent对象:

字段名 类型 必填 说明 示例 映射到内部字段
studentId String 学员ID "STU001" userId
classId String 班级ID "CLASS001" deptId
status String 学员状态 "1" delFlag(1或2为"0"表示有效, 其他为"2"表示删除)

固定值 (增加/修改):

  • category: CATEGORY_TRAINEE常量值(学员类别)
  • postCode: TRAINEE_CODE常量值(学员岗位编码)
  • operatorId: KAFKA_SYNC_ADMIN常量值

Kafka消息Body格式 (删除):

{
  "id": "学员ID",
  "classId": "班级ID",
  "del_flag": "1",
  "tenantId": "租户ID"
}

字段说明 (删除):

字段名 类型 必填 说明 示例 映射到内部字段
id String 学员ID "STU001" userId
classId String 班级ID "CLASS001" deptId
del_flag String 删除标志: "1"表示删除 "1" -
tenantId String 租户ID(无则使用默认) "000000" tenantId

业务逻辑:

  1. 增加/修改: 根据学员ID和班级ID建立关联关系
  2. 删除: 根据学员ID和班级ID删除关联关系
  3. 状态处理: status为"1"(已报名)或"2"(已报到)时,delFlag="0"(有效);其他状态delFlag="2"(删除)
  4. 租户校验: 无tenantId的学员数据将被忽略

五、向教务系统推送的数据接口

5.1 消费记录推送 → 教务系统

处理类: BaseBusiness.sendConsumeToKafka()
Kafka Producer: kafkaNormalProducer.sendKafkaMessage()
Topic: eventBus
Sender: 005(一卡通系统)
EventType: CONSUME_RECORD (00500001)

5.1.1 接口说明

当消费记录上传完成后,系统会将就餐打卡信息推送到Kafka,教务系统消费此消息实现就餐打卡功能。

API接口:

POST /v1/Consumes/Consume/kafka/{date}
POST /v1/Consumes/Consume/kafka/{beginDate}/{endDate}

5.1.2 推送数据格式 (YcPushConsumeInfoVo)

Kafka消息Body格式:

{
  "recordId": "消费记录ID",
  "userId": "人员ID",
  "userNumb": "学号",
  "xm": "姓名",
  "deptId": "部门ID",
  "deptName": "部门名称",
  "roomId": "消费地点ID",
  "roomName": "地点名称",
  "cardNo": "卡流水号",
  "factoryFixId": "物理卡号",
  "consumeValue": "消费金额",
  "cardValue": "卡余额",
  "consumeDate": "消费时间",
  "mealTypeId": "餐类ID",
  "mealName": "餐类名称",
  "termNo": "机号",
  "termName": "机器名称",
  "category": "身份类别(0-系统内置 1-教师 2-学生 3-家长)",
  "otherSysId": "其他业务系统人员ID(教务或人事的人员ID)",
  "classId": "班级ID",
  "termRecordID": "机器流水号",
  "posRecordState": "消费记录标识",
  "tenantId": "租户ID"
}

5.1.3 字段说明

字段名 类型 说明 示例
recordId String 消费记录ID "123456"
userId String 人员ID "789"
userNumb String 学号 "2024001"
xm String 姓名 "张三"
deptId String 部门ID "100"
deptName String 部门名称 "计算机学院"
roomId String 消费地点ID ""
roomName String 地点名称 ""
cardNo String 卡流水号 "35193"
factoryFixId String 物理卡号 "3656457030"
consumeValue String 消费金额 "15.50"
cardValue String 卡余额 "184.50"
consumeDate String 消费时间 "2025-02-19 19:32:30"
mealTypeId String 餐类ID "1"
mealName String 餐类名称 "晚餐"
termNo String 机号 "100"
termName String 机器名称 "食堂1号窗口"
category String 身份类别 "2" (学生)
otherSysId String 教务系统人员ID "JW2024001"
classId String 班级ID "CLASS001"
termRecordID Long 机器流水号 47309
posRecordState Integer 消费记录标识 364

5.1.4 餐类名称映射

mealNameMap:
  "1" -> "早餐"
  "2" -> "午餐"
  "3" -> "晚餐"
  其他 -> "未知餐类"

六、Kafka消费者处理逻辑

6.1 云端消费者 (KafkaCloudConsumer)

条件: locationFlag = 'cloud'

6.1.1 eventBus主题监听

@KafkaListener(topics = KafkaTopicConstants.OLD_SYNC_TOPIC, groupId = "old-to-cloud-group")
public void kafkaEventBusHandler(ConsumerRecord<String, String> record)

处理逻辑:

  1. 检查offset避免重复消费
  2. 解析Kafka消息
  3. 判断sender,如果不是005或006则处理业务
  4. 根据sender获取对应的事件策略处理器
  5. 执行事件处理
  6. 记录消息消费状态

6.1.2 sync_to_cloud主题监听

@KafkaListener(topics = KafkaTopicConstants.TO_CLOUD_TOPIC, groupId = "local-to-cloud-group")
public void kafkaToCloudHandler(ConsumerRecord<String, String> record)

6.2 本地消费者 (KafkaLocalConsumer)

条件: locationFlag = 'local'

@KafkaListener(topics = SYNC_DATA_TOPIC, groupId = "YTK_${spring.system.tenantId}")
public void cloudOperationSync(ConsumerRecord<String, String> record)

处理逻辑:

  1. 检查offset避免重复消费
  2. 解析Kafka消息并验证tenantId
  3. 根据sender获取事件策略处理器
  4. 执行事件处理
  5. 记录消息发送记录

七、事件策略处理器

7.1 教务系统事件处理 (TrainEventStrategyImpl)

Service名称: 002
实现接口: IYktEventStrategy

@Service(EventSenderConstants.TRAIN)  // "002"
public class TrainEventStrategyImpl implements IYktEventStrategy

处理的事件类型:

  • TRAIN_CLASS_ADD (00200001) → syncTrainClass() - 班级增加/修改
  • TRAIN_CLASS_EDIT (00200002) → syncTrainClass() - 班级增加/修改
  • TRAIN_CLASS_DEL (00200003) → syncDelTrainClass() - 班级删除
  • TRAINEE_ADD (00200004) → syncTrainee() - 学员增加/修改
  • TRAINEE_EDIT (00200005) → syncTrainee() - 学员增加/修改
  • TRAINEE_DEL (00200006) → syncDelTrainee() - 学员删除

说明:

  • 班级增加和修改使用相同接口,通过班级ID判断是新增还是更新
  • 学员增加和修改使用相同接口,通过学员ID判断是新增还是更新
  • 删除操作使用单独的接口

7.2 人事系统事件处理 (TeacherEventStrategyImpl)

Service名称: 003
实现接口: IYktEventStrategy

@Service(EventSenderConstants.HR)  // "003"
public class TeacherEventStrategyImpl implements IYktEventStrategy

处理的事件类型:

  • DEPT_ADD (00300001) → syncTeacherDept()
  • DEPT_EDIT (00300002) → syncTeacherDept()
  • DEPT_DEL (00300003) → syncDelTeacherDept()
  • TEACHER_ADD (00300004) → syncTeacher()
  • TEACHER_EDIT (00300005) → syncTeacher()
  • TEACHER_DEL (00300006) → syncDelTeacher()

7.3 消费系统事件处理 (ConsumeEventStrategyImpl)

Service名称: 120
实现接口: IYktEventStrategy

@Service(EventSenderConstants.CONSUME)  // "120"
public class ConsumeEventStrategyImpl implements IYktEventStrategy

处理的事件类型:

  • CONSUME (12000001) → remoteConsumeService.dealKafkaConsumeData()

八、业务流程

8.1 教务系统同步数据到云端

教务系统 → Kafka(eventBus) → KafkaCloudConsumer 
  → TrainEventStrategyImpl → RemoteKafkaSyncService 
  → 同步到云端数据库

8.2 消费记录推送到教务系统

消费机上传消费记录 → ConsumeController.uploadRecord()
  → ConsumeBusiness.postOrderAsync()
  → BaseBusiness.completeUploadRecord()
  → BaseBusiness.sendConsumeToKafka()
  → Kafka(eventBus) → 教务系统消费

8.3 手动推送消费记录到教务系统

API调用: POST /v1/Consumes/Consume/kafka/{date}
  → ConsumeController.consumeKafka()
  → BaseBusiness.sendToJwKafkaTest()
  → 查询未发送的消费记录
  → BaseBusiness.sendConsumeToKafka()
  → Kafka(eventBus) → 教务系统消费

九、数据转换示例

9.1 消费记录转换为教务系统格式

输入数据 (ConsumptionBo):

{
  "recordId": 123456,
  "userId": 789,
  "userNumb": "2024001",
  "realName": "张三",
  "deptName": "计算机学院",
  "cardNo": 35193,
  "factoryId": 3656457030,
  "consumeMoney": 15.50,
  "balance": 184.50,
  "consumeDate": "2025-02-19 19:32:30",
  "mealType": 3,
  "termNo": 100,
  "termName": "食堂1号窗口",
  "termRecordId": 47309,
  "recordStatus": 364
}

输出数据 (YcPushConsumeInfoVo):

{
  "recordId": "123456",
  "userId": "789",
  "userNumb": "2024001",
  "xm": "张三",
  "deptId": "100",
  "deptName": "计算机学院",
  "roomId": "",
  "roomName": "",
  "cardNo": "35193",
  "factoryFixId": "3656457030",
  "consumeValue": "15.50",
  "cardValue": "184.50",
  "consumeDate": "2025-02-19 19:32:30",
  "mealTypeId": "3",
  "mealName": "晚餐",
  "termNo": "100",
  "termName": "食堂1号窗口",
  "category": "2",
  "otherSysId": "JW2024001",
  "classId": "CLASS001",
  "termRecordID": 47309,
  "posRecordState": 364
}

十、相关配置

10.1 Kafka Topic配置

# 应用配置
locationFlag: cloud  # 或 local

# 租户配置
spring:
  system:
    tenantId: "000000"

10.2 常量定义位置

常量类 路径
KafkaTopicConstants ruoyi-common-message/kafka/constant/KafkaTopicConstants.java
EventSenderConstants ruoyi-common-message/kafka/constant/EventSenderConstants.java
EventTypeConstants ruoyi-common-message/kafka/constant/EventTypeConstants.java

十一、错误处理

11.1 消息消费记录

系统会记录每条Kafka消息的消费状态:

  • Y: 消费成功
  • N: 消费失败

记录表: RemoteSendMessageRecordBo

11.2 Offset管理

使用IXfOffsetService管理消费offset,防止重复消费:

  • 记录每条消息的offset、topic、groupId、partition
  • 消费前判断是否已处理过

十二、注意事项

  1. 消息去重: 通过offset管理机制确保消息不被重复消费
  2. 异步处理: 消费记录推送采用异步任务提交,不阻塞主流程
  3. 租户隔离: 消息中包含tenantId,确保多租户数据隔离
  4. 异常处理: 消息处理失败时会记录失败状态,便于后续排查
  5. 数据格式: 推送到教务系统的数据全部转换为String类型,确保兼容性

十三、测试接口

13.1 推送指定日期消费记录

POST /v1/Consumes/Consume/kafka/2025-02-19

13.2 推送日期范围消费记录

POST /v1/Consumes/Consume/kafka/2025-02-19/2025-02-20

响应:

{
  "code": 200,
  "msg": "操作成功",
  "data": null
}

文档版本: v1.0
生成日期: 2025-04-09
维护人员: 系统开发团队