Drone原码分析(一)任务创建流程

drone原码分析(一)任务创建流程

任务创建

webhook trigger

  • 调用创建逻辑:当用户通过drone web页面激活项目时,drone会往对应项目仓库创建一个webhook 订阅,对应git scm仓库有git push、merge等event操作会往drone对应的webook地址发送事件消息,drone读取对应项目、对应分支的.drone.yaml,按照配置进行任务创建或忽略。

  • 创建scm hook代码路径: service/hook/hook.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

func (s *service) Create(ctx context.Context, user *core.User, repo *core.Repository) error {
err := s.renew.Renew(ctx, user, false)
if err != nil {
return err
}
ctx = context.WithValue(ctx, scm.TokenKey{}, &scm.Token{
Token: user.Token,
Refresh: user.Refresh,
Expires: time.Unix(user.Expiry, 0),
})
hook := &scm.HookInput{
Name: "drone",
Target: s.addr + "/hook",
Secret: repo.Signer,
Events: scm.HookEvents{
Branch: true,
Deployment: true,
PullRequest: true,
Push: true,
Tag: true,
},
}
return replaceHook(ctx, s.client, repo.Slug, hook)
}

  • 接着调用server启动时配置的scm,创建hook
    1
    2
    3
    4
    5
    6
    7
    8
    9

    func replaceHook(ctx context.Context, client *scm.Client, repo string, hook *scm.HookInput) error {
    if err := deleteHook(ctx, client, repo, hook.Target); err != nil {
    return err
    }
    _, _, err := client.Repositories.CreateHook(ctx, repo, hook)
    return err
    }

drone git-scm 中对gitlab多级路径的项目地址不支持。只支持owner/repo-name这种项目仓库。如果加了group或多级路径是不支持的。

任务创建流程

  • drone api路由文件入口:handler/api/api.go
  • 任务创建trigger具体实现: trigger/trigger.go
    1
    2
    3
    4
    func (t *triggerer) Trigger(ctx context.Context, repo *core.Repository, base *core.Hook) (*core.Build, error) {
    ...
    }

  • 查看该函数的调用,可以具体看到调用创建如无的来源:

  • 这些就是具体触发创建任务的方式: 有webhook、cron定时任务、retry等方式。

Trigger创建任务代码逻辑

  • Trigger函数代码比较长。简单罗列下具体做了什么:

  • 1、定义logger和处理panic恢复

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    logger := logrus.WithFields(
    logrus.Fields{
    "repo": repo.Slug,
    "ref": base.Ref,
    "event": base.Event,
    "commit": base.After,
    },
    )

    logger.Debugln("trigger: received")
    defer func() {
    // taking the paranoid approach to recover from
    // a panic that should absolutely never happen.
    if r := recover(); r != nil {
    logger.Errorf("runner: unexpected panic: %s", r)
    debug.PrintStack()
    }
    }()
  • 2、对git-scm的事件是否忽略处理, 如果commit message或title有[ci skip]/[skip ci]/***no_ci***会跳过不执行任务创建

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    if skipMessage(base) {
    logger.Infoln("trigger: skipping hook. found skip directive")
    return nil, nil
    }
    if base.Event == core.EventPullRequest {
    if repo.IgnorePulls {
    logger.Infoln("trigger: skipping hook. project ignores pull requests")
    return nil, nil
    }
    if repo.IgnoreForks && !strings.EqualFold(base.Fork, repo.Slug) {
    logger.Infoln("trigger: skipping hook. project ignores forks")
    return nil, nil
    }
    }


  • 3、用户、仓库权限判断是否仓库已开启trigger

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    user, err := t.users.Find(ctx, repo.UserID)
    if err != nil {
    logger = logger.WithError(err)
    logger.Warnln("trigger: cannot find repository owner")
    return nil, err
    }

    if user.Active == false {
    logger.Infoln("trigger: skipping hook. repository owner is inactive")
    return nil, nil
    }


  • 4、如果没有commit信息,补充commit信息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    if base.Message == "" && base.After != "" {
    commit, err := t.commits.Find(ctx, user, repo.Slug, base.After)
    if err == nil && commit != nil {
    base.Message = commit.Message
    if base.AuthorEmail == "" {
    base.AuthorEmail = commit.Author.Email
    }
    if base.AuthorName == "" {
    base.AuthorName = commit.Author.Name
    }
    if base.AuthorAvatar == "" {
    base.AuthorAvatar = commit.Author.Avatar
    }
    }
    }
  • 5、获取项目仓库dron.yaml配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    req := &core.ConfigArgs{
    User: user,
    Repo: repo,
    Build: tmpBuild,
    }
    raw, err := t.config.Find(ctx, req)
    if err != nil {
    logger = logger.WithError(err)
    logger.Warnln("trigger: cannot find yaml")
    return nil, err
    }

    raw, err = t.convert.Convert(ctx, &core.ConvertArgs{
    User: user,
    Repo: repo,
    Build: tmpBuild,
    Config: raw,
    })
    if err != nil {
    logger = logger.WithError(err)
    logger.Warnln("trigger: cannot convert yaml")
    return t.createBuildError(ctx, repo, base, err.Error())
    }

    // this code is temporarily in place to detect and convert
    // the legacy yaml configuration file to the new format.
    raw.Data, err = converter.ConvertString(raw.Data, converter.Metadata{
    Filename: repo.Config,
    URL: repo.Link,
    Ref: base.Ref,
    })
    if err != nil {
    logger = logger.WithError(err)
    logger.Warnln("trigger: cannot convert yaml")
    return t.createBuildError(ctx, repo, base, err.Error())
    }

  • 7、解析yaml,获取pipeline 配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    manifest, err := yaml.ParseString(raw.Data)
    if err != nil {
    logger = logger.WithError(err)
    logger.Warnln("trigger: cannot parse yaml")
    return t.createBuildError(ctx, repo, base, err.Error())
    }

    verr := t.validate.Validate(ctx, &core.ValidateArgs{
    User: user,
    Repo: repo,
    Build: tmpBuild,
    Config: raw,
    })
    switch verr {
    case core.ErrValidatorBlock:
    logger.Debugln("trigger: yaml validation error: block pipeline")
    case core.ErrValidatorSkip:
    logger.Debugln("trigger: yaml validation error: skip pipeline")
    return nil, nil
    default:
    if verr != nil {
    logger = logger.WithError(err)
    logger.Warnln("trigger: yaml validation error")
    return t.createBuildError(ctx, repo, base, verr.Error())
    }
    }

    err = linter.Manifest(manifest, repo.Trusted)
    if err != nil {
    logger = logger.WithError(err)
    logger.Warnln("trigger: yaml linting error")
    return t.createBuildError(ctx, repo, base, err.Error())
    }

    verified := true
    if repo.Protected && base.Trigger == core.TriggerHook {
    key := signer.KeyString(repo.Secret)
    val := []byte(raw.Data)
    verified, _ = signer.Verify(val, key)
    }
    // if pipeline validation failed with a block error, the
    // pipeline verification should be set to false, which will
    // force manual review and approval.
    if verr == core.ErrValidatorBlock {
    verified = false
    }

  • 8、yaml过滤匹配,跳过分支、跳过action等判断,最终获取大需要执行的stage保留,获取到匹配的stagematched

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
var matched []*yaml.Pipeline
var dag = dag.New()
for _, document := range manifest.Resources {
pipeline, ok := document.(*yaml.Pipeline)
if !ok {
continue
}
// TODO add repo
// TODO add instance
// TODO add target
// TODO add ref
name := pipeline.Name
if name == "" {
name = "default"
}
node := dag.Add(pipeline.Name, pipeline.DependsOn...)
node.Skip = true

if skipBranch(pipeline, base.Target) {
logger = logger.WithField("pipeline", pipeline.Name)
logger.Infoln("trigger: skipping pipeline, does not match branch")
} else if skipEvent(pipeline, base.Event) {
logger = logger.WithField("pipeline", pipeline.Name)
logger.Infoln("trigger: skipping pipeline, does not match event")
} else if skipAction(pipeline, base.Action) {
logger = logger.WithField("pipeline", pipeline.Name).WithField("action", base.Action)
logger.Infoln("trigger: skipping pipeline, does not match action")
} else if skipRef(pipeline, base.Ref) {
logger = logger.WithField("pipeline", pipeline.Name)
logger.Infoln("trigger: skipping pipeline, does not match ref")
} else if skipRepo(pipeline, repo.Slug) {
logger = logger.WithField("pipeline", pipeline.Name)
logger.Infoln("trigger: skipping pipeline, does not match repo")
} else if skipTarget(pipeline, base.Deployment) {
logger = logger.WithField("pipeline", pipeline.Name)
logger.Infoln("trigger: skipping pipeline, does not match deploy target")
} else if skipCron(pipeline, base.Cron) {
logger = logger.WithField("pipeline", pipeline.Name)
logger.Infoln("trigger: skipping pipeline, does not match cron job")
} else {
matched = append(matched, pipeline)
node.Skip = false
}
}

  • 9、确保有向五环,如果pipeline闭环了删除

    1
    2
    3
    4

    if dag.DetectCycles() {
    return t.createBuildError(ctx, repo, base, "Error: Dependency cycle detected in Pipeline")
    }
  • 10、获取项目仓库的一个递增taskID

    1
    2
    3
    4
    5
    6
    7
    repo, err = t.repos.Increment(ctx, repo)
    if err != nil {
    logger = logger.WithError(err)
    logger.Errorln("trigger: cannot increment build sequence")
    return nil, err
    }

  • 11、按照匹配的stage matched 组装stage参数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    stages := make([]*core.Stage, len(matched))
    for i, match := range matched {
    onSuccess := match.Trigger.Status.Match(core.StatusPassing)
    onFailure := match.Trigger.Status.Match(core.StatusFailing)
    if len(match.Trigger.Status.Include)+len(match.Trigger.Status.Exclude) == 0 {
    onFailure = false
    }

    stage := &core.Stage{
    RepoID: repo.ID,
    Number: i + 1,
    Name: match.Name,
    Kind: match.Kind,
    Type: match.Type,
    OS: match.Platform.OS,
    Arch: match.Platform.Arch,
    Variant: match.Platform.Variant,
    Kernel: match.Platform.Version,
    Limit: match.Concurrency.Limit,
    LimitRepo: int(repo.Throttle),
    Status: core.StatusWaiting,
    DependsOn: match.DependsOn,
    OnSuccess: onSuccess,
    OnFailure: onFailure,
    Labels: match.Node,
    Created: time.Now().Unix(),
    Updated: time.Now().Unix(),
    }
    if stage.Kind == "pipeline" && stage.Type == "" {
    stage.Type = "docker"
    }
    if stage.OS == "" {
    stage.OS = "linux"
    }
    if stage.Arch == "" {
    stage.Arch = "amd64"
    }

    if stage.Name == "" {
    stage.Name = "default"
    }
    if verified == false {
    stage.Status = core.StatusBlocked
    } else if len(stage.DependsOn) == 0 {
    stage.Status = core.StatusPending
    }
    stages[i] = stage
    }

    for _, stage := range stages {
    // here we re-work the dependencies for the stage to
    // account for the fact that some steps may be skipped
    // and may otherwise break the dependency chain.
    stage.DependsOn = dag.Dependencies(stage.Name)

    // if the stage is pending dependencies, but those
    // dependencies are skipped, the stage can be executed
    // immediately.
    if stage.Status == core.StatusWaiting &&
    len(stage.DependsOn) == 0 {
    stage.Status = core.StatusPending
    }
    }

  • 12、创建stage: 入库操作,先吧stage保存起来

    1
    2
    3
    4
    5
    6
    7
    err = t.builds.Create(ctx, build, stages)
    if err != nil {
    logger = logger.WithError(err)
    logger.Errorln("trigger: cannot create build")
    return nil, err
    }

  • 13、给对应scm发送事件状态, 比如gitlab有merge提交,自动触发了创建构建、测试的流程则能在gitlab项目看到对应状态,已经创建、运行、成功等状态。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    err = t.status.Send(ctx, user, &core.StatusInput{
    Repo: repo,
    Build: build,
    })
    if err != nil {
    logger = logger.WithError(err)
    logger.Warnln("trigger: cannot create status")
    }

  • 14、把StatusPending的stage推送到任务队列

1
2
3
4
5
6
7
8
9
10
11
12
for _, stage := range stages {
if stage.Status != core.StatusPending {
continue
}
err = t.sched.Schedule(ctx, stage)
if err != nil {
logger = logger.WithError(err)
logger.Errorln("trigger: cannot enqueue build")
return nil, err
}
}

  • 15、发送webhook,如果是cancel或pull request取消任务创建
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    err = t.hooks.Send(ctx, payload)
    if err != nil {
    logger = logger.WithError(err)
    logger.Warnln("trigger: cannot send webhook")
    }

    if repo.CancelPush && build.Event == core.EventPush ||
    repo.CancelPulls && build.Event == core.EventPullRequest {
    go t.canceler.CancelPending(ctx, repo, build)
    }