Notification-Manager 原码分析(三)消息接收、发送流程分析
Notification-Manager 原码分析(三)消息接收、发送流程分析
Alexnotification-manager 原码分析(三)消息接收、发送流程分析
main 启动文件解析
- 文件路径:
cmd/notification-manager/main.go
main主流程
1、相关flag参数解析
1
kingpin.Parse()
2、日志级别设置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout))
if *logfmt == logFormatJson {
logger = log.NewJSONLogger(log.NewSyncWriter(os.Stdout))
}
switch *logLevel {
case logLevelDebug:
logger = level.NewFilter(logger, level.AllowDebug())
case logLevelInfo:
logger = level.NewFilter(logger, level.AllowInfo())
case logLevelWarn:
logger = level.NewFilter(logger, level.AllowWarn())
case logLevelError:
logger = level.NewFilter(logger, level.AllowError())
default:
_, _ = fmt.Fprintf(os.Stderr, "log level %v unknown, %v are possible values", *logLevel, logLevels)
return 1
}
logger = log.With(logger, "ts", log.DefaultTimestamp)
logger = log.With(logger, "caller", log.DefaultCaller)
_ = level.Info(logger).Log("msg", "Starting notification manager...", "addr", *listenAddress, "timeout", *webhookTimeout)3、初始化controller并传递context和logger,这里主要是相关k8s资源使用的controller
1
2
3
4
5
6
7
8// Setup notification manager controller
var err error
ctlCtx, cancelCtl := context.WithCancel(context.Background())
defer cancelCtl()
if ctl, err = controller.New(ctlCtx, logger); err != nil {
_ = level.Error(logger).Log("msg", "Failed to create notification manager controller")
return -1
}4、创建infomer watch k8s 资源,这里包含自定义的CRD和template配置及相关secret
1
2
3
4
5// Sync notification manager config
if err := ctl.Run(); err != nil {
_ = level.Error(logger).Log("msg", "Failed to create sync notification manager controller")
return -1
}5、创建存储处理器,管理alert数据,系统默认实现基于memory的channel队列,实现了
push
/pull
/colose
方法1
alerts := store.NewAlertStore(*storeType)
6、注册启动http server,
wh.New
追进去是具体的http路由注册,消息接收的部分再单独解析1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21// webhook 请求路由注册
// Setup webhook to receive alert/notification msg
webhook := wh.New(
logger,
ctl,
alerts,
&wh.Options{
ListenAddress: *listenAddress,
WebhookTimeout: *webhookTimeout,
WorkerTimeout: *wkrTimeout,
})
ctxHttp, cancelHttp := context.WithCancel(context.Background())
defer cancelHttp()
srvCh := make(chan error, 1)
go func() {
// 启动http服务, 如果退出会往chanel写入close,下边channel会接收处理http服务关闭逻辑
srvCh <- webhook.Run(ctxHttp)
}()7、消息调度: 启动异步任务,轮训告警消息队列请求alert消息,如果拿到就走消息发送流程逻辑
1
2
3
4
5
6
7dispCh := make(chan error, 1)
// 告警、通知消息调度处理
disp := dispatcher.New(logger, ctl, alerts, *webhookTimeout, *wkrTimeout, *wkrQueue)
go func() {
// 调度器运行: 循环pull消息,拿到就走发送逻辑
dispCh <- disp.Run()
}()8、监听系统信号和上边创建的chanel,处理异常和关闭事件
1 | termCh := make(chan os.Signal, 1) |
整体看启动入口逻辑并不复杂。核心两块:一块接收api消息,和另外一块对接收的消息异步队列去处理送消息。
消息接收
http路由:
- 基于main入口
webhook := wh.New(
追进去可以看到对外提供的api和处理方法1
2
3
4
5
6
7
8
9
10
11h.router.Get("/receivers", h.handler.ListReceivers)
h.router.Get("/configs", h.handler.ListConfigs)
h.router.Get("/receiverWithConfig", h.handler.ListReceiverWithConfig)
h.router.Post("/api/v2/alerts", h.handler.Alert)
h.router.Post("/api/v2/verify", h.handler.Verify)
h.router.Post("/api/v2/notifications", h.handler.Notification)
h.router.Get("/metrics", h.handler.ServeMetrics)
h.router.Get("/-/reload", h.handler.ServeReload)
h.router.Get("/-/ready", h.handler.ServeHealthCheck)
h.router.Get("/-/live", h.handler.ServeReadinessCheck)
h.router.Get("/status", h.handler.ServeStatus)
- 基于main入口
接收alert消息Api
h.router.Post("/api/v2/alerts", h.handler.Alert)
就是对外主要提供消息发送的webhook Api了,查看实现
消息接收h.handler.Alert实现内容
1、读取request body体,
1
2
3
4
5
6b, err := io.ReadAll(r.Body)
if err != nil {
fmt.Println(err)
return
}2、数据解析、绑定
1
2
3
4
5data := template.Data{}
if err := utils.JsonDecode(r.Body, &data); err != nil {
h.handle(w, &response{http.StatusBadRequest, err.Error()})
return
}3、接收alert、加工和push队列操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
cluster := h.notifierCtl.GetCluster()
for _, alert := range data.Alerts {
if v := alert.Labels["cluster"]; v == "" {
// 追加cluster label操作,如果有就忽略,没有尝试从集群配置中读取并设置上
alert.Labels["cluster"] = cluster
}
alert.ID = utils.Hash(alert)
// 通过Provider push 推送alert到store中,至此消息接收的洛基就完成了。
// dispatcher中轮训d.alerts.Pull拉取数据进行告警发送
if err := h.alerts.Push(alert); err != nil {
_ = level.Error(h.logger).Log("msg", "push alert error", "error", err.Error())
}
}4、看下push操作:
- memProvider系统实现了一个channel,main入口的时候已经初始化了,这里直接调用push就往channel写入alert数据进去了
1
2
3
4
5
6
7
8
9
10
11
12func (p *memProvider) Push(alert *template.Alert) error {
ctx, cancel := context.WithTimeout(context.Background(), *pushTimeout)
defer cancel()
select {
case p.ch <- alert:
return nil
case <-ctx.Done():
return utils.Error("Time out")
}
}
- memProvider系统实现了一个channel,main入口的时候已经初始化了,这里直接调用push就往channel写入alert数据进去了
5、返回成功接收提示信息
1
h.handle(w, &response{http.StatusOK, "Notification request accepted"})
至此消息接收就完成了。逻辑比较简单,接下来看下如何调度推送消息到reciver的。
消息发送流程逻辑
- main入口文件
dispCh <- disp.Run()
创建异步任务开始执行alert消息队列的调度和处理,追进去看下逻辑。
disp.Run()消息调度处理逻辑
Run函数代码
1
2
3
4
5
6
7
8
9
10
11
12func (d *Dispatcher) Run() error {
for {
// err is not nil means the store had closed, dispatcher should process remaining alerts, then exit.
if alerts, err := d.alerts.Pull(d.notifierCtl.GetBatchMaxSize(), d.notifierCtl.GetBatchMaxWait()); err == nil {
go d.processAlerts(alerts)
} else {
d.processAlerts(alerts)
return nil
}
}
}1、轮询拉取alert数据
d.alerts.Pull
, 追进去能看到实际就是一个chanel等待alert消息,如果有就返回到当前方法了1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22func (p *memProvider) Pull(batchSize int, batchWait time.Duration) ([]*template.Alert, error) {
ctx, cancel := context.WithTimeout(context.Background(), batchWait)
defer cancel()
var as []*template.Alert
for {
select {
case <-ctx.Done():
return as, nil
case alert := <-p.ch: // 读channel,对应/api/v2/alerts hannder中的push,通信就是通过channel。一端写,一端读
if alert == nil {
return as, utils.Error("Store closed")
}
as = append(as, alert)
if len(as) >= batchSize {
return as, nil
}
// 拿到消息后返回
}
}
}2、
processAlerts
拿到alert后开始处理发送
processAlerts 处理alert消息逻辑
- 1、打印日志信息,并且起了一个channel异步等待任务结束,并打印处理时间
1 | _ = level.Debug(d.l).Log("msg", "Dispatcher: Begins to process alerts...", "alerts", len(alerts)) |
2、设置超时时间,main启动入口new的时候设置了,默认5s
1
2
3
4
5
6d.seq = d.seq + 1
start := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), d.wkrTimeout)
ctx = context.WithValue(ctx, "seq", d.seq)
defer cancel()3、异步处理,并阻塞等待结果,到此处理大的逻辑结束。
1 | stopCh := make(chan struct{}) |
- 3.1、调度worker处理alert消息内容流程
go d.worker(ctx, alerts, stopCh)
- 3.1.1、worker方法代码。
- 这里添加了多个stage,对应README.md流程中的stage: silence-> router->filter-> aggregation->notify
- 添加完stage之后调用
pipeline.Exec
执行对应stage的Exec
- 3.1.1、worker方法代码。
1 | func (d *Dispatcher) worker(ctx context.Context, data interface{}, stopCh chan struct{}) { |
- 4、alert消息处理stage
pipeline.Exec(ctx, d.l, data)
1 | // Exec implements the Stage interface. |
ctx, data, err = s.Exec(ctx, l, data)
这里调用pipeline 对应的stage具体实现Exec方法, 顺序silence-> router->filter-> aggregation->notify每个stage实现内容:
- silence: 如果有静默规则匹配则跳过告警消息; 否则数据追加最终返回
- router: router的label匹配,租户和接收者匹配,匹配的receiver复制告警消息到receiver,给后续发送使用
- filter: 消息过滤,如果处于静默的消息、标签匹配的这里会过滤掉
- aggregation: 聚合
- notify: 最终具体的消息发送处理逻辑
notify stage Exec稍微展开看下, 代码位置:
pkg/notify/notify.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25...
for k, v := range input {
receiver := k
ds := v
// 定义可以借鉴,直接就获取到了消息类型的数据结构和controller,后边就可以直接调用对应的controller方法,nice
nf, err := factories[receiver.GetType()](l, receiver, s.notifierCtl)
if err != nil {
e := err
group.Add(func(stopCh chan interface{}) {
stopCh <- e
})
continue
}
// 直接调用对应告警消息类型的实现去发送消息了,类型、controller都做了区分,直接调用对应实现逻辑了。
nf.SetSentSuccessfulHandler(&handler)
for _, d := range ds {
alert := d
group.Add(func(stopCh chan interface{}) {
// 这里就调实际各自类型receiver接收类型的发送方法了,开始发送消息
stopCh <- nf.Notify(ctx, alert)
})
}
}
...官方默认实现了如下的notify, 对应里面就是具体发送消息的逻辑,比如email、feishu等。不同的发送渠道参数不一样,所以receiver中对应的接收渠道参数也率有差异
- 最终反回发送状态和结果。至此发送消息逻辑就结束了。返回到一直循环的方法,不停循环一直监听alert channel,有数据后发送。