Sidekiq Style Guide

原文:https://docs.gitlab.com/ee/development/sidekiq_style_guide.html

Sidekiq Style Guide

本文档概述了添加或修改 Sidekiq 工作程序时应遵循的各种准则.

ApplicationWorker

All workers should include ApplicationWorker instead of Sidekiq::Worker, which adds some convenience methods and automatically sets the queue based on the worker’s name.

Dedicated Queues

所有工作程序都应使用自己的队列,该队列将根据工作程序类名称自动设置. 对于名为ProcessSomethingWorker的工作程序,队列名称将为process_something . 如果不确定工人使用什么队列,可以使用SomeWorker.queue找到它. 几乎没有理由使用sidekiq_options queue: :some_queue手动覆盖队列名称.

添加新队列后,运行bin/rake gitlab:sidekiq:all_queues_yml:generate来重新生成app/workers/all_queues.ymlee/app/workers/all_queues.yml以便可以由sidekiq-cluster拾取.

Queue Namespaces

虽然不同的工作人员无法共享队列,但是他们可以共享队列名称空间.

为工作程序定义队列名称空间可以启动 Sidekiq 进程,该进程自动为该工作空间中的所有工作程序处理作业,而无需显式列出其所有队列名称. 例如,如果由sidekiq-cron管理的所有工作人员都使用cronjob队列名称空间,那么我们可以专门针对此类计划的作业启动 Sidekiq 进程. 如果稍后添加使用cronjob命名空间的新工作程序,则 Sidekiq 进程也将自动为该工作程序选择作业(重新启动后),而无需更改任何配置.

可以使用queue_namespace DSL 类方法设置队列名称空间:

  1. class SomeScheduledTaskWorker
  2. include ApplicationWorker
  3. queue_namespace :cronjob
  4. # ...
  5. end

在后台,这会将SomeScheduledTaskWorker.queue设置为cronjob:some_scheduled_task . 常用的名称空间将具有自己的关注模块,可以轻松地将其包含在 worker 类中,并且可以设置队列名称空间以外的其他 Sidekiq 选项. 例如, CronjobQueue设置名称空间,但也禁用重试.

bundle exec sidekiq是可感知名称空间的,当提供名称空间而不是--queue-q )选项中的简单队列名称时,它将自动侦听名称空间中的所有队列(技术上:所有以名称空间名称为前缀的队列) ,或config/sidekiq_queues.yml中的:queues:部分.

请注意,应谨慎执行将工作程序添加到现有命名空间的操作,因为如果没有适当调整可用于处理命名空间的 Sidekiq 进程可用的资源,则额外的作业将占用已经存在的工作程序的资源.

Idempotent Jobs

众所周知,一项作业可能由于多种原因而失败. 例如,网络中断或错误. 为了解决此问题,Sidekiq 具有内置的重试机制,GitLab 中的大多数工作人员默认使用该机制.

期望作业在失败后可以再次运行,而不会给应用程序或用户带来重大副作用,这就是 Sidekiq 鼓励作业具有幂等性和事务性的原因 .

通常,在以下情况下,可以将工人视为等幂的:

  • 它可以使用相同的参数安全地运行多次.
  • 预期应用程序副作用仅发生一次(或第二次运行的副作用无效).

一个很好的例子是缓存过期工作器.

注意:如果队列中已经存在具有相同参数的未启动作业,则为等幂工作器调度的作业将自动进行重复数据删除 .

Ensuring a worker is idempotent

确保使用以下共享示例通过工作程序测试:

  1. include_examples 'an idempotent worker' do
  2. it 'marks the MR as merged' do
  3. # Using subject inside this block will process the job multiple times
  4. subject
  5. expect(merge_request.state).to eq('merged')
  6. end
  7. end

直接使用perform_multiple方法而不是job.perform (此辅助方法将自动包含在 worker 中).

Declaring a worker as idempotent

  1. class IdempotentWorker
  2. include ApplicationWorker
  3. # Declares a worker is idempotent and can
  4. # safely run multiple times.
  5. idempotent!
  6. # ...
  7. end

鼓励只具有idempotent! 即使在另一个类或模块中定义了perform方法,也要在最顶层的 worker 类中调用.

注意:如果工人阶级没有被标记为幂等,那么警察将失败. 如果您不确定自己的工作可以安全地多次运行,请考虑跳过警察.

Deduplication

当队列中有另一个幂函数的作业入队而另一个未启动的作业时,GitLab 会删除第二个作业. 之所以跳过该工作,是因为首先安排的工作将完成相同的工作; 到第二个作业执行时,第一个作业什么也做不了.

例如, AuthorizedProjectsWorker需要一个用户 ID. 当工作程序运行时,它将重新计算用户的授权. 每当操作有可能更改用户的授权时,GitLab 都会计划此作业. 如果将同一用户同时添加到两个项目,则如果第一个作业尚未开始,则可以跳过第二个作业,因为当第一个作业运行时,它将为两个项目创建授权.

GitLab 不会跳过将来计划的作业,因为我们假设在计划执行作业时状态将已更改.

已经提出了更多的重复数据删除策略 . 如果您正在部署的员工可能会从其他策略中受益,请在问题中发表评论.

如果自动重复数据删除会导致某些队列出现问题. 可以通过启用名为disable_<queue name>_deduplication的功能标志来暂时禁用此功能. 例如,要禁用AuthorizedProjectsWorker重复数据删除,我们将启用功能标记disable_authorized_projects_deduplication .

从 ChatOps:

  1. /chatops run feature set disable_authorized_projects_deduplication true

从 rails 控制台:

  1. Feature.enable!(:disable_authorized_projects_deduplication)

Job urgency

作业可以设置一个urgency属性,可以是:high:low:throttled . 这些目标如下:

Urgency 队列调度目标 执行延迟要求
:high 10 秒 1 秒的 p50、10 秒的 p99
:low 1 分钟 最长运行时间为 5 分钟
:throttled None 最长运行时间为 5 分钟

要设置作业的紧急程度,请使用urgency类方法:

  1. class HighUrgencyWorker
  2. include ApplicationWorker
  3. urgency :high
  4. # ...
  5. end

Latency sensitive jobs

如果立即安排大量后台作业,则在作业等待辅助节点可用时,可能会出现作业排队. 这是正常现象,它可以通过系统地处理流量高峰来赋予系统弹性. 但是,某些作业比其他作业对延迟更敏感. 这些工作的示例包括:

  1. 在推送到分支之后更新合并请求的作业.
  2. 在推送到分支之后,该任务会使项目的已知分支的缓存无效.
  3. 更改权限后,用户可以看到重新计算组和项目的作业.
  4. 在状态更改为管道中的作业之后更新 CI 管道状态的作业.

当这些作业被延迟时,用户可能会将延迟视为错误:例如,他们可以推送分支,然后尝试为该分支创建合并请求,但在 UI 中被告知该分支不存在. 我们认为这些工作很urgency :high

做出额外的努力以确保这些作业在计划后的很短时间内启动. 但是,为了确保吞吐量,这些作业还具有非常严格的执行持续时间要求:

  1. 中位作业执行时间应少于 1 秒.
  2. 99%的工作应在 10 秒内完成.

如果一个工作人员不能满足这些期望,那么就不能将其视为urgency :high工作人员:考虑重新设计该工作人员,或在两个不同的工作人员之间拆分工作,其中一个工作urgency :high执行快速的urgency :high代码,另一个工作urgency :low ,它没有执行延迟要求(但也有较低的调度目标).

Changing a queue’s urgency

在 GitLab.com,我们几个跑 Sidekiq 碎片 ,其中每一个代表一个特定类型的工作负载.

更改队列的紧急性或添加新队列时,我们需要考虑新分片上的预期工作量. 请注意,如果我们要更改现有队列,那么也会对旧分片产生影响,但这始终会减少工作量.

为此,我们要计算新分片的总执行时间和 RPS(吞吐量)的预期增长. 我们可以从以下获得这些值:

  • 队列详细信息”仪表板具有队列本身的值. 对于新队列,我们​​可以查找具有类似模式或在类似情况下安排的队列.
  • 碎片详细信息仪表板具有总执行时间和吞吐量(RPS). “分片利用率”面板将显示该分片当前是否有多余的容量.

然后,我们可以计算我们要更改的队列的 RPS *平均运行时间(针对新作业的估算值),以查看新分片期望的 RPS 和执行时间的相对增加:

  1. new_queue_consumption = queue_rps * queue_duration_avg
  2. shard_consumption = shard_rps * shard_duration_avg
  3. (new_queue_consumption / shard_consumption) * 100

如果我们预期增加幅度小于 5% ,则无需采取进一步措施.

否则,请对合并请求 ping @gitlab-org/scalability并要求进行审查.

Jobs with External Dependencies

GitLab 应用程序中的大多数后台作业都与其他 GitLab 服务进行通信. 例如,PostgreSQL,Redis,Gitaly 和对象存储. 这些被视为作业的”内部”依赖性.

但是,某些作业将依赖于外部服务才能成功完成. 一些示例包括:

  1. 调用用户配置的 Web 钩子的作业.
  2. 将应用程序部署到用户配置的 k8s 集群的作业.

这些作业具有”外部依赖性”. 这对于后台处理群集的运行有多种重要的作用:

  1. 大多数外部依赖项(例如 Web 钩子)都不提供 SLO,因此我们不能保证这些作业的执行延迟. 由于我们无法保证执行延迟,因此无法确保吞吐量,因此,在高流量环境中,我们需要确保将具有外部依赖关系的作业与高紧急性作业分开,以确保这些队列上的吞吐量.
  2. Errors in jobs with external dependencies have higher alerting thresholds as there is a likelihood that the cause of the error is external.
  1. class ExternalDependencyWorker
  2. include ApplicationWorker
  3. # Declares that this worker depends on
  4. # third-party, external services in order
  5. # to complete successfully
  6. worker_has_external_dependencies!
  7. # ...
  8. end

注意:请注意,一项工作既不能具有很高的紧迫性,又不能具有外部依赖性.

CPU-bound and Memory-bound Workers

受 CPU 或内存资源限制约束的工作程序应使用worker_resource_boundary方法进行注释.

大多数工作人员倾向于将大部分时间都花在阻止时间上,等待来自 Redis,PostgreSQL 和 Gitaly 等其他服务的网络响应. 由于 Sidekiq 是多线程环境,因此可以高并发地调度这些作业.

但是,有些工人在 Ruby 中花费大量时间在 CPU运行逻辑上. Ruby MRI 不支持真正的多线程-它依赖GIL来极大简化应用程序开发,无论托管该进程的计算机有多少核,一次仅允许一个进程中的一部分 Ruby 代码运行一次. 对于受 IO 约束的工作人员,这不是问题,因为大多数线程在基础库(位于 GIL 之外)中被阻止.

如果许多线程试图同时运行 Ruby 代码,则将导致 GIL 争用,这将减慢所有进程的速度.

在高流量的环境中,知道一个工作人员受 CPU 限制,可以使我们在具有较低并发性的其他队列中运行它. 这样可以确保最佳性能.

同样,如果工作人员使用大量内存,则可以在定制的低并发,高内存队列上运行这些内存.

请注意,受内存限制的工作程序会创建大量的 GC 工作负载,暂停时间为 10-50ms. 这将对工作人员的延迟要求产生影响. 因此, memory限制, urgency :high作业是不允许的,并且将使 CI 失败. 通常,不鼓励受memory限制的工作人员,应考虑处理工作的替代方法.

如果工作程序需要大量的内存和 CPU 时间,则由于上述对高紧急性的内存绑定工作程序的限制,应将其标记为内存绑定.

Declaring a Job as CPU-bound

本示例说明如何将作业声明为受 CPU 约束.

  1. class CPUIntensiveWorker
  2. include ApplicationWorker
  3. # Declares that this worker will perform a lot of
  4. # calculations on-CPU.
  5. worker_resource_boundary :cpu
  6. # ...
  7. end

Determining whether a worker is CPU-bound

我们使用以下方法来确定工作程序是否受 CPU 限制:

  • 在 Sidekiq 结构化 JSON 日志中,汇总工作durationcpu_s字段.
  • duration refers to the total job execution duration, in seconds
  • cpu_s是从Process::CLOCK_THREAD_CPUTIME_ID计数器派生的,它是作业在 CPU 上花费的时间的度量.
  • cpu_s除以duration即可得到在 CPU 上花费的duration百分比.
  • 如果该比例超过 33%,则认为该工作线程受 CPU 限制,因此应进行注释.
  • 请注意,这些值不应用于较小的样本量,而应用于相当大的汇总.

Feature category

所有 Sidekiq 工作人员都必须定义一个已知的特征类别 .

Job weights

某些作业的重量已声明. 仅在默认执行模式下运行 Sidekiq 时才使用此选项-使用sidekiq-cluster不能计算权重.

随着我们朝在 Core 中使用sidekiq-cluster迈进 ,新增加的工作人员无需指定权重. 他们可以简单地使用默认权重 1.

Worker context

版本历史

为了在日志中获得有关工作程序的更多信息,我们ApplicationContext的形式向工作添加元数据 . 在大多数情况下,从请求计划作业时,该上下文已经从请求中扣除并添加到计划的作业中.

运行作业时,将还原计划时处于活动状态的上下文. 这会使上下文传播到正在运行的作业中计划的任何作业.

所有这些意味着在大多数情况下,要将上下文添加到作业中,我们无需执行任何操作.

但是,在某些情况下,计划作业时将不存在任何上下文,或者存在的上下文很可能不正确. 对于这些实例,我们添加了 Rubocop 规则以引起注意并避免日志中的元数据不正确.

与大多数警察一样,有完全正当的理由禁用它们. 在这种情况下,可能来自请求的上下文是正确的. 或者,您可能已经以警察无法接受的方式指定了上下文. 无论如何,请在禁用警察时留下指向将使用哪个上下文的代码注释.

当确实为上下文提供对象时,请确保已预先加载名称空间和项目的路由. 这可以通过使用来完成.with_route上所有定义范围Routable秒.

Cron workers

对于 Cronjob 队列( include CronjobQueue )中的工作人员,将自动清除上下文,即使从请求中安排工作人员时也是如此. 我们这样做是为了避免从 cron worker 安排其他作业时出现不正确的元数据.

Cron 工作人员自己在实例范围内运行,因此它们的作用域不限于应添加到上下文中的用户,名称空间,项目或其他资源.

然而,他们往往安排确实需要方面的其他工作.

这就是为什么需要在工作人员中某处显示上下文的原因. 可以通过在工作器内的某些位置使用以下方法之一来完成此操作:

  1. with_context帮助器中包装用于调度作业的代码:

    1. def perform
    2. deletion_cutoff = Gitlab::CurrentSettings
    3. .deletion_adjourned_period.days.ago.to_date
    4. projects = Project.with_route.with_namespace
    5. .aimed_for_deletion(deletion_cutoff)
    6. projects.find_each(batch_size: 100).with_index do |project, index|
    7. delay = index * INTERVAL
    8. with_context(project: project) do
    9. AdjournedProjectDeletionWorker.perform_in(delay, project.id)
    10. end
    11. end
    12. end
  2. 使用提供上下文的批处理调度方法:

    1. def schedule_projects_in_batch(projects)
    2. ProjectImportScheduleWorker.bulk_perform_async_with_contexts(
    3. projects,
    4. arguments_proc: -> (project) { project.id },
    5. context_proc: -> (project) { { project: project } }
    6. )
    7. end

    或者,在延迟调度时:

    1. diffs.each_batch(of: BATCH_SIZE) do |diffs, index|
    2. DeleteDiffFilesWorker
    3. .bulk_perform_in_with_contexts(index * 5.minutes,
    4. diffs,
    5. arguments_proc: -> (diff) { diff.id },
    6. context_proc: -> (diff) { { project: diff.merge_request.target_project } })
    7. end

Jobs scheduled in bulk

通常,在批量调度作业时,这些作业应具有单独的上下文而不是总体上下文.

如果是这样的话, bulk_perform_async可以通过更换bulk_perform_async_with_context帮手,而不是bulk_perform_in使用bulk_perform_in_with_context .

例如:

  1. ProjectImportScheduleWorker.bulk_perform_async_with_contexts(
  2. projects,
  3. arguments_proc: -> (project) { project.id },
  4. context_proc: -> (project) { { project: project } }
  5. )

第一个参数中可枚举的每个对象分为两个块:

  • arguments_proc ,它需要返回作业需要调度的参数列表.

  • 需要返回带有作业上下文信息的哈希值的context_proc .

Arguments logging

SIDEKIQ_LOG_ARGUMENTS启用,Sidekiq 作业参数将被记录.

默认情况下,记录的唯一参数是数字参数,因为其他类型的参数可能包含敏感信息. 要覆盖此参数,请在工作程序内部使用loggable_arguments并记录要记录的参数的索引. (此处不需要指定数字参数.)

例如:

  1. class MyWorker
  2. include ApplicationWorker
  3. loggable_arguments 1, 3
  4. # object_id will be logged as it's numeric
  5. # string_a will be logged due to the loggable_arguments call
  6. # string_b will be filtered from logs
  7. # string_c will be logged due to the loggable_arguments call
  8. def perform(object_id, string_a, string_b, string_c)
  9. end
  10. end

Tests

与其他任何类一样,每个 Sidekiq 工作者都必须使用 RSpec 进行测试. 这些测试应放在spec/workers .

Sidekiq Compatibility across Updates

请记住,Sidekiq 作业的参数在计划执行时存储在队列中. 在线更新期间,这可能会导致几种可能的情况:

  1. 该应用程序的较旧版本发布作业,该作业由升级的 Sidekiq 节点执行.
  2. 作业在升级之前排队,但在升级之后执行.
  3. 作业由运行较新版本应用程序的节点排队,但在运行较旧版本应用程序的节点上执行.

Changing the arguments for a worker

作业需要在应用程序的连续版本之间向后和向前兼容. 在所有 Rails 和 Sidekiq 节点都具有更新的代码之前,添加或删除参数可能会在部署期间引起问题.

Remove an argument

不要从perform函数中删除参数. . 而是,使用以下方法:

  1. 提供默认值(通常为nil )并使用注释将参数标记为已弃用
  2. 停止在perform_async使用该参数.
  3. 忽略 worker 类中的值,但是直到下一个主要版本才将其删除.

在以下示例中,如果要删除arg2 ,请首先设置nil默认值,然后更新调用ExampleWorker.perform_async位置.

  1. class ExampleWorker
  2. def perform(object_id, arg1, arg2 = nil)
  3. # ...
  4. end
  5. end

Add an argument

有两种方法可以安全地向 Sidekiq 工作者添加新参数:

  1. Set up a multi-step deployment in which the new argument is first added to the worker
  2. 参数哈希用于其他参数. 这也许是最灵活的选择.
Multi-step deployment

这种方法需要多个合并请求,并且在合并其他更改之前,要合并和部署第一个合并请求.

  1. 在初始合并请求中,使用默认值将参数添加到 worker 中:

    1. class ExampleWorker
    2. def perform(object_id, new_arg = nil)
    3. # ...
    4. end
    5. end
  2. 使用新参数合并和部署工作程序.

  3. 在另一个合并请求中,更新ExampleWorker.perform_async调用以使用新参数.
Parameter hash

如果现有工作人员已经利用参数哈希,则此方法将不需要多次部署.

  1. 在 worker 中使用参数散列以实现将来的灵活性:

    1. class ExampleWorker
    2. def perform(object_id, params = {})
    3. # ...
    4. end
    5. end

Removing workers

尽量避免在次要版本和修补程序版本中删除工作人员及其队列.

在联机更新期间,实例可能有待处理的作业,而删除队列可能导致这些作业永远卡住. 如果您无法为这些 Sidekiq 作业编写迁移,请考虑仅在主要版本中删除该工作程序.

Renaming queues

出于同样的原因,遣散工人也很危险,因此在重命名队列时应格外小心.

重命名队列时,请使用sidekiq_queue_migrate帮助程序迁移方法,如本示例所示:

  1. class MigrateTheRenamedSidekiqQueue < ActiveRecord::Migration[5.0]
  2. include Gitlab::Database::MigrationHelpers
  3. DOWNTIME = false
  4. def up
  5. sidekiq_queue_migrate 'old_queue_name', to: 'new_queue_name'
  6. end
  7. def down
  8. sidekiq_queue_migrate 'new_queue_name', to: 'old_queue_name'
  9. end
  10. end