Drone原码分析(二)任务调度执行流程

drone原码分析(二)任务调度执行流程

注册runner

  • main入口启动文件: cmd/drone-server/main.go

  • 1、注册启动配置: 这里有很多服务参数启动注册,这里只关注runner的部分,进入InitializeApplication后就能看到runner

    1
    2
    3
    4
    5
    6
    app, err := InitializeApplication(config)
    if err != nil {
    logger := logrus.WithError(err)
    logger.Fatalln("main: cannot initialize server")
    }

  • 2、runner配置

    1
    runner := provideRunner(buildManager, secretService, registryService, config2)
  • 3、应用配置中注入runner

    1
    mainApplication := newApplication(cronScheduler, reaper, datadog, runner, serverServer, userStore)

drone提供多种runner:docker、k8s等。runner仓库地址: https://github.com/drone-runners

  • 4、main入口文件中启动server

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    g := errgroup.Group{}
    g.Go(func() error {
    logrus.WithFields(
    logrus.Fields{
    "proto": config.Server.Proto,
    "host": config.Server.Host,
    "port": config.Server.Port,
    "url": config.Server.Addr,
    "acme": config.Server.Acme,
    },
    ).Infoln("starting the http server")
    return app.server.ListenAndServe(ctx)
    })


  • 5、启动runner: 如果单独配置了runner则不会启动这里的,比如配置了drone k8s runner则会使用k8s runner

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    // launches the build runner in a goroutine. If the local
    // runner is disabled (because nomad or kubernetes is enabled)
    // then the goroutine exits immediately without error.
    g.Go(func() (err error) {
    if app.runner == nil {
    return nil
    }
    logrus.WithField("threads", config.Runner.Capacity).
    Infoln("main: starting the local build runner")
    return app.runner.Start(ctx, config.Runner.Capacity)
    })


  • runner扩展帮助项目: https://github.com/drone/runner-go

  • k8s runner: https://github.com/drone-runners/drone-runner-kube

  • k8s runner为例: 文件command/command.go 注册runner/ 启动runner/

    • 1、注册配置
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      // Command parses the command line arguments and then executes a
      // subcommand program.
      func Command() {
      app := kingpin.New("drone", "drone kubernetes runner")
      registerCompile(app)
      registerExec(app)
      daemon.Register(app)

      kingpin.Version(version)
      kingpin.MustParse(app.Parse(os.Args[1:]))
      }
    • 2、启动runner
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      // Register the daemon command.
      func Register(app *kingpin.Application) {
      c := new(daemonCommand)

      cmd := app.Command("daemon", "starts the runner daemon").
      Default().
      Action(c.run)

      cmd.Arg("envfile", "load the environment variable file").
      Default("").
      StringVar(&c.envfile)
      }

  • 3、command/daemon/daemon.go中有定义runner任务拉取和server启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var g errgroup.Group
server := server.Server{
Addr: config.Server.Port,
Handler: router.New(tracer, hook, router.Config{
Username: config.Dashboard.Username,
Password: config.Dashboard.Password,
Realm: config.Dashboard.Realm,
}),
}

logrus.WithField("addr", config.Server.Port).
Infoln("starting the server")

g.Go(func() error {
return server.ListenAndServe(ctx)
})


1
2
3
4
5
6
7
8
9
10
11
g.Go(func() error {
logrus.WithField("capacity", config.Runner.Capacity).
WithField("endpoint", config.Client.Address).
WithField("kind", resource.Kind).
WithField("type", resource.Type).
Infoln("polling the remote server")

poller.Poll(ctx, config.Runner.Capacity)
return nil
})

  • 总结:
    • 如果不配置runner,默认server中会启动一个go协成进行任务拉取、调度执行,默认以docker方式运行。
    • 若单独定义了runner,如k8s则不会使用默认的go协成。会通过定义的runner是自行维护调度worker。
    • 用户可以基于runner-go自行实现runner
    • runner所需要做的事情:与server交互通过令牌注册,watch server pendding的任务,任务、平台等参数符合有拉取到任务后执行调度创建资源、执行任务运行。当然也换有其他许多事项,比如任务实时日志等。可以基于k8s runner和runner-go参考实现自己需要的runner。

任务调度

  • 参见drone原码分析(一)任务创建流程中的任务创建t.sched.Schedule(ctx, stage),当pengding的stage任务出现时,server push到队列中。
  • 接着已经启动的runner,通过poller.Poll(ctx, config.Runner.Capacity) 请求server队列,通过rpc拉取任务。拉取到任务之后就是runner调度执行的逻辑。
  • 这里接着drone项目代码中的逻辑进行分析。其他runner大致相似,都是实现自己的runner去创建不同的资源去执行任务。

runner启动、拉取任务

  • 1、main入口启动runner。 主入口main文件cmd/drone-server/main.go

    1
    2
    3
    4
    5
    6
    7
    8
    g.Go(func() (err error) {
    if app.runner == nil {
    return nil
    }
    logrus.WithField("threads", config.Runner.Capacity).
    Infoln("main: starting the local build runner")
    return app.runner.Start(ctx, config.Runner.Capacity)
    })
  • 2、根据配置参数启动制定数量的runner

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    // Start starts N build runner processes. Each process polls
    // the server for pending builds to execute.
    func (r *Runner) Start(ctx context.Context, n int) error {
    var g errgroup.Group
    for i := 0; i < n; i++ {
    g.Go(func() error {
    return r.start(ctx)
    })
    }
    return g.Wait()
    }

  • 3、守护拉取任务r.poll(ctx)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    func (r *Runner) start(ctx context.Context) error {
    for {
    select {
    case <-ctx.Done():
    return nil
    default:
    // This error is ignored on purpose. The system
    // should not exit the runner on error. The run
    // function logs all errors, which should be enough
    // to surface potential issues to an administrator.
    r.poll(ctx)
    }
    }
    }

  • 4、poll函数拉取任务逻辑

  • 4.1 请求当前runner可以执行的任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    p, err := r.Manager.Request(ctx, &manager.Request{
    Kind: "pipeline",
    Type: "docker",
    OS: r.OS,
    Arch: r.Arch,
    Kernel: r.Kernel,
    Variant: r.Variant,
    Labels: r.Labels,
    })
    if err != nil {
    logger = logger.WithError(err)
    logger.Warnln("runner: cannot get queue item")
    return err
    }
    if p == nil || p.ID == 0 {
    return nil
    }

  • 4.2枷锁确认当前任务

1
2
3
4
5
6
7
8
9
10
11
12
13
_, err = r.Manager.Accept(ctx, p.ID, r.Machine)
if err == db.ErrOptimisticLock {
return nil
} else if err != nil {
logger.WithError(err).
WithFields(
logrus.Fields{
"stage-id": p.ID,
"build-id": p.BuildID,
"repo-id": p.RepoID,
}).Warnln("runner: cannot ack stage")
return err
}
  • 4.3 watch 任务cancel信号, 如果接收到取消信号则关闭任务执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    go func() {
    logger.Debugln("runner: watch for cancel signal")
    done, _ := r.Manager.Watch(ctx, p.BuildID)
    if done {
    cancel()
    logger.Debugln("runner: received cancel signal")
    } else {
    logger.Debugln("runner: done listening for cancel signals")
    }
    }()

  • 4.4 r.Run(ctx, p.ID) 调度创建运行资源

    1
    return r.Run(ctx, p.ID)
  • 5、runner.Run执行逻辑

    1
    2
    3
    func (r *Runner) Run(ctx context.Context, id int64) error {
    ...
    }
  • 5.1 捕获panic

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    defer func() {
    // taking the paranoid approach to recover from
    // a panic that should absolutely never happen.
    if r := recover(); r != nil {
    logger.Errorf("runner: unexpected panic: %s", r)
    debug.PrintStack()
    }
    }()


  • 5.2、请求任务详情

1
2
3
4
5
6
m, err := r.Manager.Details(ctx, id)
if err != nil {
logger.WithError(err).Warnln("runner: cannot get stage details")
return err
}

  • 5.3、请求netrc

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    netrc, err := r.Manager.Netrc(ctx, m.Repo.ID)
    if err != nil {
    logger = logger.WithError(err)
    logger.Warnln("runner: cannot get netrc file")
    return r.handleError(ctx, m.Stage, err)
    }
    if netrc == nil {
    netrc = new(core.Netrc)
    }

  • netrc结构定义。 Netrc 包含自动登录过程使用的登录和初始化信息。

    1
    2
    3
    4
    5
    6
    Netrc struct {
    Machine string `json:"machine"`
    Login string `json:"login"`
    Password string `json:"password"`
    }

  • NetrcService 返回一个有效的 netrc 文件,该文件可用于验证和克隆私有存储库。如果不需要或启用身份验证,则返回 nil Netrc 文件和 nil 错误。

  • 5.4、任务状态判断,如果此时任务取消了,则会退出后续逻辑

    1
    2
    3
    4
    5
    6
    if m.Build.Status == core.StatusKilled || m.Build.Status == core.StatusSkipped {
    logger = logger.WithError(err)
    logger.Infoln("runner: cannot run a canceled build")
    return nil
    }

  • 5.5、构建环境变量参数

1
2
3
4
5
6
7
8
9
environ := combineEnviron(
agentEnviron(r),
buildEnviron(m.Build),
repoEnviron(m.Repo),
stageEnviron(m.Stage),
systemEnviron(m.System),
linkEnviron(m.Repo, m.Build, m.System),
m.Build.Params,
)
  • 5.6、pipeline yaml文件解析、验证。这块代码逻辑和trigger创建任务逻辑有些相似。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    y, err := converter.ConvertString(string(m.Config.Data), converter.Metadata{
    Filename: m.Repo.Config,
    URL: m.Repo.Link,
    Ref: m.Build.Ref,
    })

    if err != nil {
    return err
    }

    y, err = envsubst.Eval(y, func(name string) string {
    env := environ[name]
    if strings.Contains(env, "\n") {
    env = fmt.Sprintf("%q", env)
    }
    return env
    })
    if err != nil {
    return r.handleError(ctx, m.Stage, err)
    }

    manifest, err := yaml.ParseString(y)
    if err != nil {
    logger = logger.WithError(err)
    logger.Warnln("runner: cannot parse yaml")
    return r.handleError(ctx, m.Stage, err)
    }

  • 5.7、pipeline yaml解析stage,找到当前任务stage的yaml配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
var pipeline *yaml.Pipeline
for _, resource := range manifest.Resources {
v, ok := resource.(*yaml.Pipeline)
if !ok {
continue
}
if v.Name == m.Stage.Name {
pipeline = v
break
}
}
if pipeline == nil {
logger = logger.WithError(err)
logger.Errorln("runner: cannot find named pipeline")
return r.handleError(ctx, m.Stage,
errors.New("cannot find named pipeline"),
)
}

logger = logger.WithField("pipeline", pipeline.Name)

err = linter.Lint(pipeline, m.Repo.Trusted)
if err != nil {
logger = logger.WithError(err)
logger.Warnln("runner: yaml lint errors")
return r.handleError(ctx, m.Stage, err)
}

  • 5.8、配置docker registry认证参数
1
2
3
4
5
6
7
8
9
10
secretService := secret.Combine(
secret.Encrypted(),
secret.Static(m.Secrets),
r.Secrets,
)
registryService := registry.Combine(
registry.Static(m.Secrets),
r.Registry,
)

  • 5.9、配置docker启动配置。docker image、volume、workspace、label、docker资源配置额等

    1
    2
    3
    ....
    ir := comp.Compile(pipeline)

  • 5.10、构建stage steps

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    steps := map[string]*core.Step{}
    i := 0
    for _, s := range ir.Steps {
    if s.RunPolicy == engine.RunNever {
    continue
    }
    i++
    dst := &core.Step{
    Number: i,
    Name: s.Metadata.Name,
    StageID: m.Stage.ID,
    Status: core.StatusPending,
    ErrIgnore: s.IgnoreErr,
    }
    steps[dst.Name] = dst
    m.Stage.Steps = append(m.Stage.Steps, dst)
    }

  • 5.11、构建hook,代码比较长

    1
    2
    3
    4
    hooks := &runtime.Hook{
    ....
    }

  • 5.12、new runner: 基于上边组装的hook和docker engine spec参数创建runner

    1
    2
    3
    4
    5
    6
    runner := runtime.New(
    runtime.WithEngine(r.Engine),
    runtime.WithConfig(ir),
    runtime.WithHooks(hooks),
    )

  • 5.13 runner运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

m.Stage.Status = core.StatusRunning
m.Stage.Started = time.Now().Unix()
m.Stage.Machine = r.Machine
err = r.Manager.BeforeAll(ctx, m.Stage)
...
err = runner.Run(timeout)
...

m.Stage.Status = core.StatusPassing
m.Stage.Stopped = time.Now().Unix()
for _, step := range m.Stage.Steps {
if step.Status == core.StatusPending {
step.Status = core.StatusSkipped
}
if step.Status == core.StatusRunning {
step.Status = core.StatusPassing
step.Stopped = time.Now().Unix()
}
}

return r.Manager.AfterAll(ctx, m.Stage)
}

r.Manager.BeforeAll和 r.Manager.AfterAll通知服务端更新任务状态,最对应逻辑处理。
runner.Run(timeout) 真正调用docker api创建容器

  • 6.1、 runner.Run。 这里和5.0区别,5.0是运行执行任务id的任务。而这里是运行runner创建容器运行pipeline并等待完成

    1
    2
    3
    func (r *Runtime) Run(ctx context.Context) error {
    return r.Resume(ctx, 0)
    }
  • 7、(r *Runtime) Resume(ctx context.Context, start int) 代码逻辑

  • 7.1 、延迟执行,函数执行完后清理容器环境现场

    1
    2
    3
    4
    5
    6
    7
    defer func() {
    // note that we use a new context to destroy the
    // environment to ensure it is not in a canceled
    // state.
    r.engine.Destroy(context.Background(), r.config)
    }()

  • 7.2 hook before, 如果有配置则执行,在执行所有步骤之前调用。

    1
    2
    3
    4
    5
    6
    if r.hook.Before != nil {
    state := snapshot(r, nil, nil)
    if err := r.hook.Before(state); err != nil {
    return err
    }
    }
  • 7.3 r.engine.Setup: 创建容器存储、网络

    1
    2
    3
    if err := r.engine.Setup(ctx, r.config); err != nil {
    return err
    }
  • 7.4、如果要在串行模式下执行步骤,并且未定义image依赖项,则 isSerial 返回 true。

  • 判断pipeline容器组有没有docker DependsOn服务,有就需要顺序创建执行,没有则串行创建执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
if isSerial(r.config) {
for i, step := range r.config.Steps {
steps := []*engine.Step{step}
if i < start {
continue
}
select {
case <-ctx.Done():
return ErrCancel
case err := <-r.execAll(steps):
if err != nil {
r.error = err
}
}
}
} else {
err := r.execGraph(ctx)
if err != nil {
return err
}
}
  • 7.4.1 r.execAll(steps) 创建wait group,批量执行多个step,并批量等待

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26

    func (r *Runtime) execAll(group []*engine.Step) <-chan error {
    var g errgroup.Group
    done := make(chan error)

    // if a previous step returned error code 78
    // the pipeline process skips all subsequent
    // pipeline steps.
    if r.error == ErrInterrupt {
    close(done)
    return done
    }

    for _, step := range group {
    step := step
    g.Go(func() error {
    return r.exec(step)
    })
    }

    go func() {
    done <- g.Wait()
    close(done)
    }()
    return done
    }
  • 7.4.1.2、return r.exec(step) 核心代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    func (r *Runtime) exec(step *engine.Step) error {

    # 第一部分 r.hook.BeforeEach,执行前钩子
    ...
    if r.hook.BeforeEach != nil {
    state := snapshot(r, step, nil)
    if err := r.hook.BeforeEach(state); err == ErrSkip {
    return nil
    } else if err != nil {
    return err
    }
    }
    ...
    # 第二部分 r.engine.Create: 拉镜像、配置网络、copy文件、创建容器
    if err := r.engine.Create(ctx, r.config, step); err != nil {
    // TODO(bradrydzewski) refactor duplicate code
    if r.hook.AfterEach != nil {
    r.hook.AfterEach(
    snapshot(r, step, &engine.State{
    ExitCode: 255, Exited: true,
    }),
    )
    }
    return err
    }


    # 第三部分 r.engine.Start,真正的启动容器了,调用docker api启动容器
    if err := r.engine.Start(ctx, r.config, step); err != nil {
    // TODO(bradrydzewski) refactor duplicate code
    if r.hook.AfterEach != nil {
    r.hook.AfterEach(
    snapshot(r, step, &engine.State{
    ExitCode: 255, Exited: true,
    }),
    )
    }
    return err
    }

    # 第四部分 r.engine.Tail,调用docker api taillog,并copy io流实现实时日志输出展示
    rc, err := r.engine.Tail(ctx, r.config, step)
    if err != nil {
    // TODO(bradrydzewski) refactor duplicate code
    if r.hook.AfterEach != nil {
    r.hook.AfterEach(
    snapshot(r, step, &engine.State{
    ExitCode: 255, Exited: true,
    }),
    )
    }
    return err
    }


    # 第五部分 r.engine.Wait:等待容器执行结果,成功或失败
    wait, err := r.engine.Wait(ctx, r.config, step)


    # 第六部分 r.hook.AfterEach: 钩子
    if r.hook.AfterEach != nil {
    state := snapshot(r, step, wait)
    if err := r.hook.AfterEach(state); err != nil {
    return err
    }
    }



    }

    回到7if isSerial(r.config) {的另外一个情况 err := r.execGraph(ctx) 区别是容器有依赖关系,需要先启动被依赖容器后等到完成后再启动自身这种情况的处理

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35

    func (r *Runtime) execGraph(ctx context.Context) error {
    var d dag.Runner
    for _, s := range r.config.Steps {
    step := s
    #### AddVertex 在图形中添加一个函数作为顶点。只有以这种方式添加的函数才会在运行期间执行。

    d.AddVertex(step.Metadata.Name, func() error {
    select {
    case <-ctx.Done():
    return ErrCancel
    default:
    }
    r.mu.Lock()
    skip := r.error == ErrInterrupt
    r.mu.Unlock()
    if skip {
    return nil
    }
    err := r.exec(step) ### 这里和7中的调用容器启动容器逻辑一样
    if err != nil {
    r.mu.Lock()
    r.error = err
    r.mu.Unlock()
    }
    return nil
    })
    }
    for _, s := range r.config.Steps {
    for _, dep := range s.DependsOn {
    d.AddEdge(dep, s.Metadata.Name)
    }
    }
    return d.Run()
    }

到这里位置主要任务调度执行和创建docker container到任务执行逻辑就结束。