并发任务重复调用的问题

我来用通俗易懂的语言详细解释重复调用任务重复发送问题的原理和解决方法。

问题原理详解

1. 什么是重复调用问题?

想象一下这样的场景:

  • 你有一个工厂,需要定期检查设备状态
  • 你安排了多个工人(定时任务)去检查同一台设备
  • 结果同一台设备被检查了多次,每次检查结果可能不同
  • 导致同一个设备在短时间内发送了多次不同状态的数据

2. 原始代码的问题架构

让我先展示原始的问题代码结构:

[1 tool called]

3. 问题的时间线分析

让我用时间线来说明问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
时间轴:10:46:00 ────────────── 10:46:07 ────────────── 10:46:15
│ │ │
│ │ │
任务1执行 任务2执行 任务3执行
(每1分钟) (每15分钟) (每1分钟)
│ │ │
▼ ▼ ▼
FetchDataConcurrently PushTmnlOnline FetchDataConcurrently
│ │ │
▼ ▼ ▼
处理设备A 调用FetchDataConcurrently 处理设备A
发送在线数据 处理设备A 发送离线数据
onlineFlag: "1" 发送在线数据 onlineFlag: "0"
onlineFlag: "1"

4. 原始问题代码示例

让我展示原始的问题代码:

[1 tool called]

问题原理详解

1. 循环调用问题

原始架构问题

1
2
3
定时任务A (每15分钟) ──┐
├──→ FetchDataConcurrently ──→ 处理所有数据
定时任务B (每1分钟) ──┘

问题

  • 多个定时任务都调用同一个方法 FetchDataConcurrently
  • 没有并发控制,可能同时执行
  • 同一个设备被处理多次

2. 数据不一致问题

时间差导致的问题

1
2
10:46:04 - 任务A执行,检查设备状态 → 在线 → 发送 onlineFlag: "1"
10:46:07 - 任务B执行,检查设备状态 → 离线 → 发送 onlineFlag: "0"

原因

  • 两次检查时间不同
  • Redis中的设备状态可能已经变化
  • 导致同一设备发送不同状态

3. 堆栈错误问题

循环调用导致的问题

1
2
3
4
5
6
7
8
9
FetchDataConcurrently()

service.Datasync().PushTmnlOnlineData()

PushTmnlOnlineData() 内部可能再次调用

FetchDataConcurrently() (如果配置错误)

无限循环...

解决方法详解

方案1:移除重复调用

原理:只保留一个统一的数据处理入口,避免多个任务重复调用。

实现代码

1
2
3
4
5
6
7
// 原始问题代码(已修复)
func (q *qPushTmnlOnline) Handle(ctx context.Context, mqMsg worker.Payload) (err error) {
// 原始代码:err = service.Datasync().FetchDataConcurrently(ctx)
// 修复后:禁用此任务,统一由FetchDataConcurrently处理
g.Log().Info(newCtx, workerName, " TaskPushTmnlOnline 已禁用,统一由 FetchDataConcurrently 处理")
return
}

效果

  • ✅ 消除重复调用
  • ✅ 避免循环调用
  • ✅ 统一数据处理入口

方案2:添加并发锁

原理:使用互斥锁防止同一时间多个任务执行同一个方法。

实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 全局锁,防止重复处理
var (
fetchDataMutex sync.Mutex
lastProcessTime time.Time
processInterval = 30 * time.Second // 最小处理间隔30秒
)

func (s *sDatasync) FetchDataConcurrently(ctx context.Context) (err error) {
// 检查是否正在处理中
if !fetchDataMutex.TryLock() {
g.Log().Infof(ctx, "FetchDataConcurrently 正在处理中,跳过本次调用")
return nil
}
defer fetchDataMutex.Unlock()

// 检查处理间隔
now := time.Now()
if !lastProcessTime.IsZero() && now.Sub(lastProcessTime) < processInterval {
g.Log().Infof(ctx, "FetchDataConcurrently 处理间隔太短,跳过本次调用")
return nil
}
lastProcessTime = now

// 继续处理...
}

效果

  • ✅ 防止并发执行
  • ✅ 控制处理频率
  • ✅ 避免重复处理

方案3:注释其他方法调用

原理:在 FetchDataConcurrently 中只处理终端在线状态,注释掉其他数据处理。

实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 注释掉电锅炉数据推送,只保留终端在线状态处理
// if len(boilerRunDataList) > 0 {
// wg.Add(1)
// go func() {
// defer wg.Done()
// request := &model.PushBoilerRunDataRequest{
// BoilerRunDataList: boilerRunDataList,
// }
// boilerErr = service.Datasync().PushBoilerRunData(ctx, request)
// // ... 其他逻辑
// }()
// }

// 注释掉温湿度数据推送,只保留终端在线状态处理
// temSensorDataList, temSensorErr := s.fetchTemSensorDataConcurrently(ctx, productDeviceMaps)
// ... 其他注释代码

// 只保留终端在线状态处理
tmnlOnlineDataList, tmnlOnlineErr := s.fetchTmnlOnlineDataConcurrently(ctx, productDeviceMaps)
if tmnlOnlineErr != nil {
g.Log().Errorf(ctx, "获取终端在线状态数据失败: %v", tmnlOnlineErr)
} else if len(tmnlOnlineDataList) > 0 {
wg.Add(1)
go func() {
defer wg.Done()
request := &model.PushTmnlOnlineDataRequest{
TmnlOnlineDataList: tmnlOnlineDataList,
}
tmnlOnlineErr = service.Datasync().PushTmnlOnlineData(ctx, request)
// ... 处理逻辑
}()
}

效果

  • ✅ 专注处理终端在线状态
  • ✅ 避免处理其他不必要的数据
  • ✅ 减少系统负载

完整解决方案对比

修复前的问题

1
2
3
4
5
6
7
8
9
时间: 10:46:04
任务: FetchDataConcurrently
结果: 发送 onlineFlag: "1" (在线)

时间: 10:46:07
任务: PushTmnlOnline → FetchDataConcurrently
结果: 发送 onlineFlag: "0" (离线)

问题: 同一设备3秒内发送两次不同状态!

修复后的效果

1
2
3
4
5
6
7
8
9
10
11
12
13
时间: 10:46:04
任务: FetchDataConcurrently (带锁)
结果: 发送 onlineFlag: "1" (在线)

时间: 10:46:07
任务: PushTmnlOnline (已禁用)
结果: 跳过执行,不重复处理

时间: 10:46:34 (30秒后)
任务: FetchDataConcurrently (带锁)
结果: 正常处理,发送最新状态

效果: 同一设备不会重复发送!

总结

问题根源

  1. 架构设计问题:多个任务调用同一个方法
  2. 并发控制缺失:没有防止重复处理的机制
  3. 职责不清晰:数据处理和任务调度耦合

解决方案

  1. 移除重复调用:只保留一个数据处理入口
  2. 添加并发锁:防止同时执行
  3. 控制处理频率:最小30秒间隔
  4. 专注单一功能:只处理终端在线状态

最终效果

  • ✅ 消除重复发送问题
  • ✅ 避免循环调用
  • ✅ 提高系统稳定性
  • ✅ 数据一致性得到保证

这就是重复调用任务重复发送问题的完整原理和解决方法!