Drone原码分析(二)任务调度执行流程
Drone原码分析(二)任务调度执行流程
Alexdrone原码分析(二)任务调度执行流程
注册runner
main入口启动文件:
cmd/drone-server/main.go
1、注册启动配置: 这里有很多服务参数启动注册,这里只关注runner的部分,进入
InitializeApplication
后就能看到runner1
2
3
4
5
6app, err := InitializeApplication(config)
if err != nil {
logger := logrus.WithError(err)
logger.Fatalln("main: cannot initialize server")
}2、runner配置
1
runner := provideRunner(buildManager, secretService, registryService, config2)
3、应用配置中注入runner
1
mainApplication := newApplication(cronScheduler, reaper, datadog, runner, serverServer, userStore)
drone提供多种runner:docker、k8s等。runner仓库地址: https://github.com/drone-runners
4、main入口文件中启动server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15g := errgroup.Group{}
g.Go(func() error {
logrus.WithFields(
logrus.Fields{
"proto": config.Server.Proto,
"host": config.Server.Host,
"port": config.Server.Port,
"url": config.Server.Addr,
"acme": config.Server.Acme,
},
).Infoln("starting the http server")
return app.server.ListenAndServe(ctx)
})5、启动runner: 如果单独配置了runner则不会启动这里的,比如配置了drone k8s runner则会使用k8s runner
1
2
3
4
5
6
7
8
9
10
11
12
13// launches the build runner in a goroutine. If the local
// runner is disabled (because nomad or kubernetes is enabled)
// then the goroutine exits immediately without error.
g.Go(func() (err error) {
if app.runner == nil {
return nil
}
logrus.WithField("threads", config.Runner.Capacity).
Infoln("main: starting the local build runner")
return app.runner.Start(ctx, config.Runner.Capacity)
})runner扩展帮助项目: https://github.com/drone/runner-go
k8s runner: https://github.com/drone-runners/drone-runner-kube
k8s runner为例: 文件
command/command.go
注册runner/ 启动runner/- 1、注册配置
1
2
3
4
5
6
7
8
9
10
11// Command parses the command line arguments and then executes a
// subcommand program.
func Command() {
app := kingpin.New("drone", "drone kubernetes runner")
registerCompile(app)
registerExec(app)
daemon.Register(app)
kingpin.Version(version)
kingpin.MustParse(app.Parse(os.Args[1:]))
} - 2、启动runner
1
2
3
4
5
6
7
8
9
10
11
12
13// Register the daemon command.
func Register(app *kingpin.Application) {
c := new(daemonCommand)
cmd := app.Command("daemon", "starts the runner daemon").
Default().
Action(c.run)
cmd.Arg("envfile", "load the environment variable file").
Default("").
StringVar(&c.envfile)
}
- 1、注册配置
3、
command/daemon/daemon.go
中有定义runner任务拉取和server启动
1 | var g errgroup.Group |
1 | g.Go(func() error { |
- 总结:
- 如果不配置runner,默认server中会启动一个go协成进行任务拉取、调度执行,默认以docker方式运行。
- 若单独定义了runner,如k8s则不会使用默认的go协成。会通过定义的runner是自行维护调度worker。
- 用户可以基于runner-go自行实现runner
- runner所需要做的事情:与server交互通过令牌注册,watch server pendding的任务,任务、平台等参数符合有拉取到任务后执行调度创建资源、执行任务运行。当然也换有其他许多事项,比如任务实时日志等。可以基于k8s runner和runner-go参考实现自己需要的runner。
任务调度
- 参见drone原码分析(一)任务创建流程中的任务创建
t.sched.Schedule(ctx, stage)
,当pengding的stage任务出现时,server push到队列中。 - 接着已经启动的runner,通过
poller.Poll(ctx, config.Runner.Capacity)
请求server队列,通过rpc拉取任务。拉取到任务之后就是runner调度执行的逻辑。 - 这里接着drone项目代码中的逻辑进行分析。其他runner大致相似,都是实现自己的runner去创建不同的资源去执行任务。
runner启动、拉取任务
1、main入口启动runner。 主入口main文件
cmd/drone-server/main.go
1
2
3
4
5
6
7
8g.Go(func() (err error) {
if app.runner == nil {
return nil
}
logrus.WithField("threads", config.Runner.Capacity).
Infoln("main: starting the local build runner")
return app.runner.Start(ctx, config.Runner.Capacity)
})2、根据配置参数启动制定数量的runner
1
2
3
4
5
6
7
8
9
10
11
12// Start starts N build runner processes. Each process polls
// the server for pending builds to execute.
func (r *Runner) Start(ctx context.Context, n int) error {
var g errgroup.Group
for i := 0; i < n; i++ {
g.Go(func() error {
return r.start(ctx)
})
}
return g.Wait()
}3、守护拉取任务
r.poll(ctx)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15func (r *Runner) start(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
default:
// This error is ignored on purpose. The system
// should not exit the runner on error. The run
// function logs all errors, which should be enough
// to surface potential issues to an administrator.
r.poll(ctx)
}
}
}4、
poll
函数拉取任务逻辑4.1 请求当前runner可以执行的任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18p, err := r.Manager.Request(ctx, &manager.Request{
Kind: "pipeline",
Type: "docker",
OS: r.OS,
Arch: r.Arch,
Kernel: r.Kernel,
Variant: r.Variant,
Labels: r.Labels,
})
if err != nil {
logger = logger.WithError(err)
logger.Warnln("runner: cannot get queue item")
return err
}
if p == nil || p.ID == 0 {
return nil
}4.2枷锁确认当前任务
1 | _, err = r.Manager.Accept(ctx, p.ID, r.Machine) |
4.3 watch 任务cancel信号, 如果接收到取消信号则关闭任务执行
1
2
3
4
5
6
7
8
9
10
11go func() {
logger.Debugln("runner: watch for cancel signal")
done, _ := r.Manager.Watch(ctx, p.BuildID)
if done {
cancel()
logger.Debugln("runner: received cancel signal")
} else {
logger.Debugln("runner: done listening for cancel signals")
}
}()4.4
r.Run(ctx, p.ID)
调度创建运行资源1
return r.Run(ctx, p.ID)
5、runner.Run执行逻辑
1
2
3func (r *Runner) Run(ctx context.Context, id int64) error {
...
}5.1 捕获panic
1
2
3
4
5
6
7
8
9
10defer func() {
// taking the paranoid approach to recover from
// a panic that should absolutely never happen.
if r := recover(); r != nil {
logger.Errorf("runner: unexpected panic: %s", r)
debug.PrintStack()
}
}()5.2、请求任务详情
1 | m, err := r.Manager.Details(ctx, id) |
5.3、请求netrc
1
2
3
4
5
6
7
8
9
10netrc, err := r.Manager.Netrc(ctx, m.Repo.ID)
if err != nil {
logger = logger.WithError(err)
logger.Warnln("runner: cannot get netrc file")
return r.handleError(ctx, m.Stage, err)
}
if netrc == nil {
netrc = new(core.Netrc)
}netrc结构定义。 Netrc 包含自动登录过程使用的登录和初始化信息。
1
2
3
4
5
6Netrc struct {
Machine string `json:"machine"`
Login string `json:"login"`
Password string `json:"password"`
}NetrcService 返回一个有效的 netrc 文件,该文件可用于验证和克隆私有存储库。如果不需要或启用身份验证,则返回 nil Netrc 文件和 nil 错误。
5.4、任务状态判断,如果此时任务取消了,则会退出后续逻辑
1
2
3
4
5
6if m.Build.Status == core.StatusKilled || m.Build.Status == core.StatusSkipped {
logger = logger.WithError(err)
logger.Infoln("runner: cannot run a canceled build")
return nil
}5.5、构建环境变量参数
1 | environ := combineEnviron( |
- 5.6、pipeline yaml文件解析、验证。这块代码逻辑和trigger创建任务逻辑有些相似。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28y, err := converter.ConvertString(string(m.Config.Data), converter.Metadata{
Filename: m.Repo.Config,
URL: m.Repo.Link,
Ref: m.Build.Ref,
})
if err != nil {
return err
}
y, err = envsubst.Eval(y, func(name string) string {
env := environ[name]
if strings.Contains(env, "\n") {
env = fmt.Sprintf("%q", env)
}
return env
})
if err != nil {
return r.handleError(ctx, m.Stage, err)
}
manifest, err := yaml.ParseString(y)
if err != nil {
logger = logger.WithError(err)
logger.Warnln("runner: cannot parse yaml")
return r.handleError(ctx, m.Stage, err)
} - 5.7、pipeline yaml解析stage,找到当前任务stage的yaml配置
1 | var pipeline *yaml.Pipeline |
- 5.8、配置docker registry认证参数
1 | secretService := secret.Combine( |
5.9、配置docker启动配置。docker image、volume、workspace、label、docker资源配置额等
1
2
3....
ir := comp.Compile(pipeline)5.10、构建stage steps
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18steps := map[string]*core.Step{}
i := 0
for _, s := range ir.Steps {
if s.RunPolicy == engine.RunNever {
continue
}
i++
dst := &core.Step{
Number: i,
Name: s.Metadata.Name,
StageID: m.Stage.ID,
Status: core.StatusPending,
ErrIgnore: s.IgnoreErr,
}
steps[dst.Name] = dst
m.Stage.Steps = append(m.Stage.Steps, dst)
}5.11、构建hook,代码比较长
1
2
3
4hooks := &runtime.Hook{
....
}5.12、new runner: 基于上边组装的hook和docker engine spec参数创建runner
1
2
3
4
5
6runner := runtime.New(
runtime.WithEngine(r.Engine),
runtime.WithConfig(ir),
runtime.WithHooks(hooks),
)5.13 runner运行
1 |
|
r.Manager.BeforeAll和 r.Manager.AfterAll通知服务端更新任务状态,最对应逻辑处理。
runner.Run(timeout) 真正调用docker api创建容器
6.1、 runner.Run。 这里和5.0区别,5.0是运行执行任务id的任务。而这里是运行runner创建容器运行pipeline并等待完成
1
2
3func (r *Runtime) Run(ctx context.Context) error {
return r.Resume(ctx, 0)
}7、
(r *Runtime) Resume(ctx context.Context, start int)
代码逻辑7.1 、延迟执行,函数执行完后清理容器环境现场
1
2
3
4
5
6
7defer func() {
// note that we use a new context to destroy the
// environment to ensure it is not in a canceled
// state.
r.engine.Destroy(context.Background(), r.config)
}()7.2 hook before, 如果有配置则执行,在执行所有步骤之前调用。
1
2
3
4
5
6if r.hook.Before != nil {
state := snapshot(r, nil, nil)
if err := r.hook.Before(state); err != nil {
return err
}
}7.3
r.engine.Setup
: 创建容器存储、网络1
2
3if err := r.engine.Setup(ctx, r.config); err != nil {
return err
}7.4、如果要在串行模式下执行步骤,并且未定义image依赖项,则 isSerial 返回 true。
判断pipeline容器组有没有docker
DependsOn
服务,有就需要顺序创建执行,没有则串行创建执行
1 | if isSerial(r.config) { |
7.4.1
r.execAll(steps)
创建wait group,批量执行多个step,并批量等待1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (r *Runtime) execAll(group []*engine.Step) <-chan error {
var g errgroup.Group
done := make(chan error)
// if a previous step returned error code 78
// the pipeline process skips all subsequent
// pipeline steps.
if r.error == ErrInterrupt {
close(done)
return done
}
for _, step := range group {
step := step
g.Go(func() error {
return r.exec(step)
})
}
go func() {
done <- g.Wait()
close(done)
}()
return done
}7.4.1.2、
return r.exec(step)
核心代码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71func (r *Runtime) exec(step *engine.Step) error {
# 第一部分 r.hook.BeforeEach,执行前钩子
...
if r.hook.BeforeEach != nil {
state := snapshot(r, step, nil)
if err := r.hook.BeforeEach(state); err == ErrSkip {
return nil
} else if err != nil {
return err
}
}
...
# 第二部分 r.engine.Create: 拉镜像、配置网络、copy文件、创建容器
if err := r.engine.Create(ctx, r.config, step); err != nil {
// TODO(bradrydzewski) refactor duplicate code
if r.hook.AfterEach != nil {
r.hook.AfterEach(
snapshot(r, step, &engine.State{
ExitCode: 255, Exited: true,
}),
)
}
return err
}
# 第三部分 r.engine.Start,真正的启动容器了,调用docker api启动容器
if err := r.engine.Start(ctx, r.config, step); err != nil {
// TODO(bradrydzewski) refactor duplicate code
if r.hook.AfterEach != nil {
r.hook.AfterEach(
snapshot(r, step, &engine.State{
ExitCode: 255, Exited: true,
}),
)
}
return err
}
# 第四部分 r.engine.Tail,调用docker api taillog,并copy io流实现实时日志输出展示
rc, err := r.engine.Tail(ctx, r.config, step)
if err != nil {
// TODO(bradrydzewski) refactor duplicate code
if r.hook.AfterEach != nil {
r.hook.AfterEach(
snapshot(r, step, &engine.State{
ExitCode: 255, Exited: true,
}),
)
}
return err
}
# 第五部分 r.engine.Wait:等待容器执行结果,成功或失败
wait, err := r.engine.Wait(ctx, r.config, step)
# 第六部分 r.hook.AfterEach: 钩子
if r.hook.AfterEach != nil {
state := snapshot(r, step, wait)
if err := r.hook.AfterEach(state); err != nil {
return err
}
}
}回到7
if isSerial(r.config) {
的另外一个情况err := r.execGraph(ctx)
区别是容器有依赖关系,需要先启动被依赖容器后等到完成后再启动自身这种情况的处理1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (r *Runtime) execGraph(ctx context.Context) error {
var d dag.Runner
for _, s := range r.config.Steps {
step := s
#### AddVertex 在图形中添加一个函数作为顶点。只有以这种方式添加的函数才会在运行期间执行。
d.AddVertex(step.Metadata.Name, func() error {
select {
case <-ctx.Done():
return ErrCancel
default:
}
r.mu.Lock()
skip := r.error == ErrInterrupt
r.mu.Unlock()
if skip {
return nil
}
err := r.exec(step) ### 这里和7中的调用容器启动容器逻辑一样
if err != nil {
r.mu.Lock()
r.error = err
r.mu.Unlock()
}
return nil
})
}
for _, s := range r.config.Steps {
for _, dep := range s.DependsOn {
d.AddEdge(dep, s.Metadata.Name)
}
}
return d.Run()
}
到这里位置主要任务调度执行和创建docker container到任务执行逻辑就结束。