为什么需要定时任务
以电网接口为例,系统需要向目标接口发送数据,对方要求数据需要定时上报,这是就需要后端执行定时任务,每到定时任务时间自动执行发送数据任务;还有的情况是,系统需要定时向某第三方api请求数据,或者需要定时执行系统上的功能。
在插件中的定时任务的实现
首先定时任务整个流程如下:

task层中的具体工作流程:

定时任务执行的文件代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| package task
import ( "context" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/util/gconv" "huadian-dcsc/module/dcscpush/service" "sagooiot/pkg/worker" )
var ScheduledSyncUserList = new(worker.Scheduled)
func ScheduledSyncUserListRun() { ScheduledSyncUserList = worker.RegisterProcess(SyncUserList) }
var SyncUserList = &qSyncUserList{}
type qSyncUserList struct { worker.BaseProcess }
func (q *qSyncUserList) GetTopic() string { return TaskSyncUserList }
func (q *qSyncUserList) Handle(ctx context.Context, mqMsg worker.Payload) (err error) { newCtx := context.Background() workerName := gconv.String(mqMsg.Payload) g.Log().Info(newCtx, workerName, " Start TaskSyncUserList") err = service.Datasync().SyncSysUser(ctx) if err != nil { return } return }
|
task.go的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| package task
import ( "context" _ "huadian-dcsc/module/dcscpush/logic/datasync" )
func Run() (err error) { ScheduledSyncUserListRun()
err = ScheduledSyncUserList.Cron(context.Background(), TaskSyncUserList, "0 0 2 * * ?", []byte("ctwing")) if err != nil { return }
}
|
sagooiot中work包的定时任务处理
work_proces.go 中 Cron 表达式定时执行任务的实现机制
整体架构概述
在 sagooiot-professional 项目中,work_proces.go 文件通过与 worker.go 配合,实现了一套基于 Cron 表达式的定时任务调度系统。该系统采用了分布式设计,使用 Redis 作为存储媒介,并结合了工作池、信号量等机制确保任务执行的高效性和稳定性。
Cron 表达式定时任务的核心实现流程
1. 任务注册机制
Scheduled 结构体的 Cron 方法是用户注册定时任务的入口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| func (s *Scheduled) Cron(ctx context.Context, topic, cronExpr string, data []byte) (err error) { s.topic = topic err = s.w.Cron( WithRunUuid(topic), WithRunGroup(topic), WithRunExpr(cronExpr), WithRunPayload(data), ) if err != nil { g.Log().Debug(ctx, "Run Cron TaskWorker %s Error: %v", topic, err) } return }
|
其中,WithRunExpr(cronExpr) 是关键参数,用于传递 Cron 表达式,它是一个函数选项模式的实现:
1 2 3 4 5 6
| func WithRunExpr(s string) func(*RunOptions) { return func(options *RunOptions) { getRunOptionsOrSetDefault(options).expr = s } }
|
2. Worker 中的 Cron 任务处理
Worker 结构体的 Cron 方法是定时任务的核心实现,它完成以下工作:
- 参数校验:验证 Worker 是否已初始化、Cron 表达式是否为空、任务负载是否过大
- 计算下一次执行时间:通过
getNext 函数解析 Cron 表达式并计算下次执行时间
- 任务创建与存储:创建
periodTask 对象并存储到 Redis 中
- 任务更新处理:检查并处理已有任务的表达式变更情况
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| func (wk *Worker) Cron(options ...func(*RunOptions)) (err error) { if wk == nil { return fmt.Errorf("工作器未初始化") }
ops := getRunOptionsOrSetDefault(nil) for _, f := range options { f(ops) }
if ops.uid == "" { return errors.Unwrap(ErrUuidNil) }
if ops.expr == "" { return fmt.Errorf("Cron表达式不能为空") }
var next int64 now := time.Now().Unix() next, err = getNext(ops.expr, now) if err != nil { alog.M(alog.WorkerModule).Errorf(context.Background(), "Cron表达式 '%s' 无效: %v", ops.expr, err) return errors.Unwrap(ErrExprInvalid) }
t := periodTask{ Expr: ops.expr, Group: strings.Join([]string{ops.group, "cron"}, "."), Uid: ops.uid, Payload: ops.payload, Next: next, MaxRetry: ops.maxRetry, Timeout: ops.timeout, }
_, err = wk.redis.HSet(ctx, wk.ops.redisPeriodKey, ops.uid, t.String()).Result() }
|
3. Cron 表达式解析与执行时间计算
getNext 函数负责解析 Cron 表达式并计算下一次执行时间:
1 2 3 4 5 6 7 8 9 10 11
| func getNext(expr string, timestamp int64) (next int64, err error) { t := time.Unix(timestamp, 0) specParser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor) sched, err := specParser.Parse(expr) if err != nil { return } next = sched.Next(t).Unix() return }
|
该函数使用了 Go 的 cron 包,支持秒、分、时、日、月、星期和描述符等多种时间单位的表达式解析。
4. 定时任务的扫描与执行
系统通过 Worker 的 scan 方法定期扫描并处理到期的定时任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| func (wk *Worker) scan() {
m, err := wk.redis.HGetAll(fetchCtx, wk.ops.redisPeriodKey).Result()
now := time.Now().Unix()
tasksToExecute := make([]*periodTask, 0, min(len(m), 100)) for uid, taskData := range m { if now >= item.Next { taskCopy := item tasksToExecute = append(tasksToExecute, &taskCopy) } }
if len(tasksToExecute) > 0 { wk.executeTasks(ctx, tasksToExecute, now) } }
|
5. 任务执行与更新
executeTasks 方法采用工作池模式并发执行任务,并在执行完成后计算和更新下一次执行时间:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| func (wk *Worker) executeTasks(ctx context.Context, tasks []*periodTask, now int64) { maxConcurrent := min(len(tasks), 10) semaphore := make(chan struct{}, maxConcurrent)
for _, task := range tasks { wg.Add(1) go func(t *periodTask) { defer wg.Done()
if err := wk.executeTaskSafe(execCtx, t); err != nil { alog.M(alog.WorkerModule).Debugf(ctx, "执行任务 %s 失败: %v", t.Uid, err) }
next, err := getNext(t.Expr, now) if err != nil { alog.M(alog.WorkerModule).Debugf(ctx, "计算任务 %s 下一次执行时间失败: %v", t.Uid, err) return }
t.Next = next }(task) }
wk.batchUpdateTasks(ctx, updateTasks, len(tasks)) }
|
关键技术特点
- 分布式设计:使用 Redis 存储任务信息,支持多实例部署
- 分布式锁:使用
nx.Nx 实现分布式锁,确保任务不会被重复执行
- 工作池模式:限制并发任务数,防止系统资源耗尽
- 错误处理与恢复:多处使用
defer recover() 机制,提高系统稳定性
- 超时控制:对各个环节都设置了合理的超时时间,避免任务阻塞
- 原子操作:使用原子操作处理计数器等共享资源,确保线程安全
- 自适应调整:根据锁获取失败次数动态调整扫描间隔
总结
sagooiot-professional 项目中的 work_proces.go 通过与 worker.go 配合,实现了一套完整的基于 Cron 表达式的定时任务调度系统。该系统采用了分布式设计,结合了工作池模式、分布式锁、错误恢复等多种技术手段,确保了定时任务的高可靠性和高可用性。
整个流程可以概括为:用户通过 Scheduled.Cron 方法注册定时任务 → 系统将任务存储到 Redis 中 → Worker.scan 方法定期扫描到期任务 → 系统并发执行任务并更新下一次执行时间,从而实现了基于 Cron 表达式的定时任务调度功能。