[eggjs/egg]【请教】bull bee-queue队列中process如何在多个work上执行一次

2025-11-04 823 views
6

app.js文件代码

class AppBootHook {
  constructor (app) {
    this.app = app
  }

  /**
   * 应用已经启动完毕
   * @return {Promise<void>}
   */
  async didReady () {
    const ctx = await this.app.createAnonymousContext()
    const queue = this.app.bull.get('q1')
    ctx.getLogger('build').info('队列监听' + JSON.stringify(this.app))
    queue.process(async job => {
      return await ctx.service.buildIos.runJob(job)
    })

    // 监听任务
    queue.on('completed', (job, result) => {
      ctx.getLogger('build').info('任务已完成,任务ID: %s', job.id)
    })
    queue.on('failed', (job, err) => {
      ctx.getLogger('build').error('任务已失败,任务ID: %s', job.id)
    })
  }
}

module.exports = AppBootHook

问题: 默认queue.process是同时执行一个任务,npm run dev下正常,但是npm run start后会在多个work里面执行,导致队列process同时执行多个。 看了issue中又2个朋友有类型情况,请问如何在一个work上执行或者同时只有一个proccess有效? (备注:agent通知这个不行,因为无法获取到队列进度,还是会导致并行)

回答

3
2496
6

封装为一个 egg-schedule 的策略,触发的时候丢给 worker 执行,执行完后通知回来,再调度下一个。

2

要看下这个库是不是分布式的

3

多进程研发模式增强 https://eggjs.org/zh-cn/advanced/cluster-client.html 这个可以解决我的问题吗?

2

难道不能吧任务放在单个work上执行,并返回结果? https://github.com/OptimalBits/bull/issues/488 agent和app传递消息可以实现单个work上执行,代码贴在下面,但是我担心如果work意外退出了不传递结果消息到agent,那队列就暂停了

@atian25 你好,你的思路我想了一下,但是有个问题,定时最小间隔是1分钟,会出现队列执行延迟,有更好的方法吗?

0
// agent.js
class AgentBootHook {
  constructor (agent) {
    this.agent = agent
  }

  async didReady () {
    const ctx = await this.agent.createAnonymousContext()

    const queue = this.agent.bull.get('q1')
    queue.process((job) => {
      // 监听app通知处理结果,收到消息后返回成功
      this.agent.getLogger('build').info(`Job process with id ${job.id} and data ${JSON.stringify(job.data)}`)

      // 通知一个app起来干活
      this.agent.messenger.sendRandom('build-ios', {job})

      // 监听任务处理结果
      return new Promise((resolve, reject) => {
        this.agent.messenger.on('build-ios',  (data) => {
          resolve(data)
        })
      })
    })

    queue.on('completed', (job, result) => {
      this.agent.getLogger('build').info(`Job completed with id ${job.id} result ${JSON.stringify(result)}`)
    })
  }
}

module.exports = AgentBootHook
// app.js
class AppBootHook {
  constructor (app) {
    this.app = app
  }

  async didReady () {
    const ctx = await this.app.createAnonymousContext()

    this.app.messenger.on('build', async (data) => {
      const { job } = data
      const result = await ctx.service.buil.runJob(job)

      this.app.messenger.sendToAgent('build', result)
    })
  }
}

module.exports = AppBootHook
7

我那个指的是自定义一个 ScheduleStrategy

https://github.com/eggjs/egg-schedule#schedule-type