sagooiot插件的定时任务实现

为什么需要定时任务

​ 以电网接口为例,系统需要向目标接口发送数据,对方要求数据需要定时上报,这是就需要后端执行定时任务,每到定时任务时间自动执行发送数据任务;还有的情况是,系统需要定时向某第三方api请求数据,或者需要定时执行系统上的功能。

在插件中的定时任务的实现

首先定时任务整个流程如下:

task层中的具体工作流程:

定时任务执行的文件代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package task

import (
"context"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/gconv"
"huadian-dcsc/module/dcscpush/service"
"sagooiot/pkg/worker"
)

// ScheduledSyncUserList 是一个指向 worker.Scheduled 类型的指针,用于表示用户列表同步的定时任务。
// 初始时创建一个新的 worker.Scheduled 实例。
var ScheduledSyncUserList = new(worker.Scheduled)

// ScheduledSyncUserListRun 函数用于注册用户列表同步任务。
// 调用 worker.RegisterProcess 方法将 SyncUserList 注册到任务处理中,
// 并将返回的定时任务实例赋值给 ScheduledSyncUserList。
func ScheduledSyncUserListRun() {
ScheduledSyncUserList = worker.RegisterProcess(SyncUserList)
}

// SyncUserList 同步用户列表,是一个指向 qSyncUserList 类型的指针,代表用户列表同步任务。
var SyncUserList = &qSyncUserList{}

// qSyncUserList 结构体实现了任务处理所需的方法,用于处理用户列表同步任务。
// 嵌入 worker.BaseProcess 结构体,继承其基础功能。
type qSyncUserList struct {
worker.BaseProcess
}
// GetTopic 主题
// GetTopic 方法用于获取同步用户列表任务的主题,该主题(TaskSyncUserList)用于标识此同步用户列表任务,
// 在消息队列或任务调度等场景中,可通过该主题来识别并处理该任务。
func (q *qSyncUserList) GetTopic() string {
return TaskSyncUserList
}

// Handle 处理消息
func (q *qSyncUserList) Handle(ctx context.Context, mqMsg worker.Payload) (err error) {
newCtx := context.Background()
workerName := gconv.String(mqMsg.Payload)
g.Log().Info(newCtx, workerName, " Start TaskSyncUserList")
err = service.Datasync().SyncSysUser(ctx)
if err != nil {
return
}
return
}

task.go的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package task

import (
"context"
_ "huadian-dcsc/module/dcscpush/logic/datasync"
)

func Run() (err error) {
//启动定时任务处理服务
ScheduledSyncUserListRun()

err = ScheduledSyncUserList.Cron(context.Background(), TaskSyncUserList, "0 0 2 * * ?", []byte("ctwing"))
if err != nil {
return
}

}

sagooiot中work包的定时任务处理

work_proces.go 中 Cron 表达式定时执行任务的实现机制

整体架构概述

在 sagooiot-professional 项目中,work_proces.go 文件通过与 worker.go 配合,实现了一套基于 Cron 表达式的定时任务调度系统。该系统采用了分布式设计,使用 Redis 作为存储媒介,并结合了工作池、信号量等机制确保任务执行的高效性和稳定性。

Cron 表达式定时任务的核心实现流程

1. 任务注册机制

Scheduled 结构体的 Cron 方法是用户注册定时任务的入口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Cron 采用定时任务的方式执行任务
func (s *Scheduled) Cron(ctx context.Context, topic, cronExpr string, data []byte) (err error) {
s.topic = topic
err = s.w.Cron(
WithRunUuid(topic),
WithRunGroup(topic),
WithRunExpr(cronExpr), // 设置 Cron 表达式
WithRunPayload(data), // 传递参数
)
if err != nil {
g.Log().Debug(ctx, "Run Cron TaskWorker %s Error: %v", topic, err)
}
return
}

其中,WithRunExpr(cronExpr) 是关键参数,用于传递 Cron 表达式,它是一个函数选项模式的实现:

1
2
3
4
5
6
// WithRunExpr Cron表达式, 最小单位1分钟, 参见gorhill/cronexpr
func WithRunExpr(s string) func(*RunOptions) {
return func(options *RunOptions) {
getRunOptionsOrSetDefault(options).expr = s
}
}

2. Worker 中的 Cron 任务处理

Worker 结构体的 Cron 方法是定时任务的核心实现,它完成以下工作:

  1. 参数校验:验证 Worker 是否已初始化、Cron 表达式是否为空、任务负载是否过大
  2. 计算下一次执行时间:通过 getNext 函数解析 Cron 表达式并计算下次执行时间
  3. 任务创建与存储:创建 periodTask 对象并存储到 Redis 中
  4. 任务更新处理:检查并处理已有任务的表达式变更情况
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// Cron 设置周期性任务
func (wk *Worker) Cron(options ...func(*RunOptions)) (err error) {
// 检查Worker是否初始化
if wk == nil {
return fmt.Errorf("工作器未初始化")
}

// 解析选项
ops := getRunOptionsOrSetDefault(nil)
for _, f := range options {
f(ops)
}

// 参数校验
if ops.uid == "" {
return errors.Unwrap(ErrUuidNil)
}

if ops.expr == "" {
return fmt.Errorf("Cron表达式不能为空")
}

// 计算下一次执行时间
var next int64
now := time.Now().Unix()
next, err = getNext(ops.expr, now)
if err != nil {
alog.M(alog.WorkerModule).Errorf(context.Background(), "Cron表达式 '%s' 无效: %v", ops.expr, err)
return errors.Unwrap(ErrExprInvalid)
}

// 创建周期任务
t := periodTask{
Expr: ops.expr,
Group: strings.Join([]string{ops.group, "cron"}, "."),
Uid: ops.uid,
Payload: ops.payload,
Next: next,
MaxRetry: ops.maxRetry,
Timeout: ops.timeout,
}

// 保存任务到Redis
_, err = wk.redis.HSet(ctx, wk.ops.redisPeriodKey, ops.uid, t.String()).Result()
// ...
}

3. Cron 表达式解析与执行时间计算

getNext 函数负责解析 Cron 表达式并计算下一次执行时间:

1
2
3
4
5
6
7
8
9
10
11
// getNext 计算下一次执行时间
func getNext(expr string, timestamp int64) (next int64, err error) {
t := time.Unix(timestamp, 0)
specParser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)
sched, err := specParser.Parse(expr)
if err != nil {
return
}
next = sched.Next(t).Unix()
return
}

该函数使用了 Go 的 cron 包,支持秒、分、时、日、月、星期和描述符等多种时间单位的表达式解析。

4. 定时任务的扫描与执行

系统通过 Workerscan 方法定期扫描并处理到期的定时任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// scan 扫描并处理任务队列 - 优化版本
func (wk *Worker) scan() {
// 创建带超时的上下文
// ...

// 获取分布式锁
// ...

// 从Redis获取所有任务
m, err := wk.redis.HGetAll(fetchCtx, wk.ops.redisPeriodKey).Result()
// ...

now := time.Now().Unix()

// 找出需要执行的任务
tasksToExecute := make([]*periodTask, 0, min(len(m), 100))
for uid, taskData := range m {
// ...
if now >= item.Next {
taskCopy := item
tasksToExecute = append(tasksToExecute, &taskCopy)
}
// ...
}

// 批量执行任务
if len(tasksToExecute) > 0 {
wk.executeTasks(ctx, tasksToExecute, now)
}
}

5. 任务执行与更新

executeTasks 方法采用工作池模式并发执行任务,并在执行完成后计算和更新下一次执行时间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// executeTasks 批量执行任务,提高性能
func (wk *Worker) executeTasks(ctx context.Context, tasks []*periodTask, now int64) {
// 使用工作池模式,限制并发执行的任务数量
maxConcurrent := min(len(tasks), 10) // 最多同时执行10个任务
semaphore := make(chan struct{}, maxConcurrent)
// ...

// 启动任务执行协程
for _, task := range tasks {
wg.Add(1)
go func(t *periodTask) {
defer wg.Done()
// ...

// 执行任务
if err := wk.executeTaskSafe(execCtx, t); err != nil {
alog.M(alog.WorkerModule).Debugf(ctx, "执行任务 %s 失败: %v", t.Uid, err)
}

// 计算下一次执行时间
next, err := getNext(t.Expr, now)
if err != nil {
alog.M(alog.WorkerModule).Debugf(ctx, "计算任务 %s 下一次执行时间失败: %v", t.Uid, err)
return
}

// 更新任务的下一次执行时间
t.Next = next
// ...
}(task)
}
// ...

// 批量更新任务到Redis
wk.batchUpdateTasks(ctx, updateTasks, len(tasks))
}

关键技术特点

  1. 分布式设计:使用 Redis 存储任务信息,支持多实例部署
  2. 分布式锁:使用 nx.Nx 实现分布式锁,确保任务不会被重复执行
  3. 工作池模式:限制并发任务数,防止系统资源耗尽
  4. 错误处理与恢复:多处使用 defer recover() 机制,提高系统稳定性
  5. 超时控制:对各个环节都设置了合理的超时时间,避免任务阻塞
  6. 原子操作:使用原子操作处理计数器等共享资源,确保线程安全
  7. 自适应调整:根据锁获取失败次数动态调整扫描间隔

总结

sagooiot-professional 项目中的 work_proces.go 通过与 worker.go 配合,实现了一套完整的基于 Cron 表达式的定时任务调度系统。该系统采用了分布式设计,结合了工作池模式、分布式锁、错误恢复等多种技术手段,确保了定时任务的高可靠性和高可用性。

整个流程可以概括为:用户通过 Scheduled.Cron 方法注册定时任务 → 系统将任务存储到 Redis 中 → Worker.scan 方法定期扫描到期任务 → 系统并发执行任务并更新下一次执行时间,从而实现了基于 Cron 表达式的定时任务调度功能。