gofr操作的相关关键字

一、代码中关键关键字 / 组件的作用详解

以下是基于 GoFrame 并发代码中核心关键字和组件的详细说明,结合代码场景解释其作用:

1. 协程与并发基础

  • go

    Go 语言的协程启动关键字,用于创建轻量级线程(goroutine)。

    在代码中,

    1
    go worker(...)
    1
    go resultProcessor(...)

    等语句启动了多个并行执行的协程,实现任务的并发处理。

    相比操作系统线程,协程由 Go runtime 管理,内存占用极低(初始栈仅 2KB),可同时启动数万级协程。

2. 通道(数据通信)

  • **chan**用于协程间安全通信的数据结构,实现 “通过通信共享内存,而非通过共享内存通信”。
    • 代码中 taskChan := make(chan DataTask, 100) 创建了带缓冲的通道,缓冲大小 100 表示最多可暂存 100 个任务,避免协程阻塞。
    • 方向限定符 <-chan(只读)和 chan<-(只写)用于限制通道操作权限,增强代码安全性(如 worker 函数的 taskChan <-chan DataTask 表示只能从该通道接收数据)。
  • 通道操作 <-
    • 发送:taskChan <- task 将任务写入通道,供工作协程读取;
    • 接收:task, ok := <-taskChan 从通道读取任务,ok 用于判断通道是否已关闭。

3. 同步与等待

  • sync.WaitGroup

    用于等待一组协程完成的同步工具,核心方法:

    • wg.Add(1):注册一个待完成的协程;

    • wg.Done():通知 WaitGroup 某个协程已完成(通常在 defer 中调用);

    • wg.Wait()
      
      1
      2
      3

      阻塞当前协程,直到所有注册的协程都调用

      Done()
      1
      2
      3

      在代码中,

      sync.WaitGroup
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24

      用于等待所有工作协程处理完任务后再退出程序。

      #### 4. 资源管理与清理

      - `defer`

      用于延迟执行函数调用,确保在当前函数退出时(无论正常返回还是 panic)执行清理逻辑。

      - 代码中 `defer wg.Done()` 确保工作协程退出前通知 WaitGroup;
      - `defer ticker.Stop()` 确保定时器在结果处理器退出时停止,避免资源泄漏。

      #### 5. 上下文与退出控制

      - `context.Context`



      用于传递请求生命周期的上下文信息(如超时、取消信号),实现协程间的优雅退出。

      - `ctx := gctx.New()` 创建基础上下文;

      - ```
      <-ctx.Done()
      监听取消信号,当上下文被取消(如程序退出)时,所有依赖该上下文的协程可及时退出。 在代码中,
      1
      select
      语句结合
      1
      case <-ctx.Done()
      实现了协程的安全退出逻辑。

6. 流程控制

  • select

    用于同时监听多个通道操作,当其中一个通道可操作时执行对应逻辑,常用于协程中的多条件等待。

    代码中

    1
    select

    结合通道接收和

    1
    ctx.Done()

    实现了 “要么处理任务,要么响应退出信号” 的逻辑。

7. GoFrame 框架组件

  • gtimer

    GoFrame 的定时器组件,用于周期性执行任务。代码中

    1
    gtimer.New(...)

    定期输出任务处理进度。

  • g.Log

    GoFrame 的日志组件,用于并发安全的日志输出,避免多协程打印日志时的混乱。

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package main

import (
"context"
"fmt"
"time"

"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/gtimer"
"github.com/gogf/gf/v2/util/gconv"
"sync"
)

// 数据处理任务结构体
type DataTask struct {
ID int
Data string
Result string
}

func main() {
ctx := gctx.New()

// 1. 创建通道
taskChan := make(chan DataTask, 100) // 任务通道
resultChan := make(chan DataTask, 100) // 结果通道
doneChan := make(chan struct{}) // 完成信号通道

// 2. 启动多个工作协程
const workerCount = 3
var wg sync.WaitGroup

// 启动工作协程
for i := 0; i < workerCount; i++ {
wg.Add(1)
go worker(ctx, i, taskChan, resultChan, &wg)
}

// 3. 启动结果处理协程
go resultProcessor(ctx, resultChan, doneChan)

// 4. 生成任务
go generateTasks(ctx, taskChan)

// 5. 等待所有任务完成并退出
<-doneChan
g.Log().Info(ctx, "所有任务处理完成")

// 等待工作协程结束
wg.Wait()
g.Log().Info(ctx, "所有工作协程已退出")

// 关闭所有通道
close(taskChan)
close(resultChan)
close(doneChan)
}

// 工作协程:处理任务
func worker(ctx context.Context, workerID int, taskChan <-chan DataTask, resultChan chan<- DataTask, wg *sync.WaitGroup) {
// defer确保在函数退出时通知WaitGroup
defer wg.Done()
g.Log().Infof(ctx, "工作协程 %d 启动", workerID)

// 退出时的清理工作
defer func() {
g.Log().Infof(ctx, "工作协程 %d 退出", workerID)
}()

for {
select {
case task, ok := <-taskChan:
if !ok {
// 通道已关闭,退出
return
}

g.Log().Infof(ctx, "工作协程 %d 开始处理任务 %d: %s", workerID, task.ID, task.Data)

// 模拟处理耗时
time.Sleep(time.Millisecond * 500)

// 处理数据
task.Result = fmt.Sprintf("processed: %s (by worker %d)", task.Data, workerID)

// 发送结果
resultChan <- task

case <-ctx.Done():
// 上下文取消,退出
g.Log().Infof(ctx, "工作协程 %d 收到退出信号", workerID)
return
}
}
}

// 结果处理协程:处理并汇总结果
func resultProcessor(ctx context.Context, resultChan <-chan DataTask, doneChan chan<- struct{}) {
resultCount := 0
totalTasks := 10 // 预期总任务数

// 使用定时器定期输出进度
ticker := gtimer.New(time.Second*2, func(ctx context.Context) {
g.Log().Infof(ctx, "处理进度: %d/%d", resultCount, totalTasks)
})
defer ticker.Stop() // 确保定时器在函数退出时停止

for {
select {
case result, ok := <-resultChan:
if !ok {
return
}

resultCount++
g.Log().Infof(ctx, "收到任务 %d 结果: %s", result.ID, result.Result)

// 所有任务处理完成
if resultCount >= totalTasks {
g.Log().Infof(ctx, "所有 %d 个任务已处理完毕", totalTasks)
doneChan <- struct{}{}
return
}

case <-ctx.Done():
g.Log().Info(ctx, "结果处理器收到退出信号")
return
}
}
}

// 生成任务
func generateTasks(ctx context.Context, taskChan chan<- DataTask) {
defer g.Log().Info(ctx, "任务生成器退出")

// 生成10个任务
for i := 0; i < 10; i++ {
task := DataTask{
ID: i + 1,
Data: "原始数据 " + gconv.String(i+1),
}
taskChan <- task
g.Log().Infof(ctx, "生成任务 %d", task.ID)

// 间隔一段时间生成任务
time.Sleep(time.Millisecond * 200)
}
}