Skip to content

Commit

Permalink
feat: package bullmq
Browse files Browse the repository at this point in the history
  • Loading branch information
gqc committed Jan 6, 2025
1 parent 7c45ca2 commit 8a5d42d
Show file tree
Hide file tree
Showing 22 changed files with 626 additions and 0 deletions.
15 changes: 15 additions & 0 deletions packages/bull/test/fixtures/base-app/src/task/limit.task.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { App, Inject, sleep } from '@midwayjs/core';
import { Processor, Application } from '../../../../../src';

@Processor('limit',2,{},{limiter: { max: 3, duration: 1000 }})
export class QueueTask {
@App()
app: Application;

@Inject()
logger;

async execute(params) {
await sleep(3*1000)
}
}
1 change: 1 addition & 0 deletions packages/bullmq/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Change Log
9 changes: 9 additions & 0 deletions packages/bullmq/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# @midwayjs/bullmq

this is a sub package for midway.

Document: [https://midwayjs.org](https://midwayjs.org)

## License

[MIT]((https://github.com/midwayjs/midway/blob/master/LICENSE))
17 changes: 17 additions & 0 deletions packages/bullmq/index.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { QueueOptions, WorkerOptions } from 'bullmq';
export * from './dist/index';
export { Job } from 'bullmq';


Check failure on line 5 in packages/bullmq/index.d.ts

View workflow job for this annotation

GitHub Actions / lintAndTestLegacy (lts/*, ubuntu-latest)

Delete `⏎`
declare module '@midwayjs/core/dist/interface' {
// eslint-disable-next-line

interface MidwayConfig {
bullmq?: {
defaultQueueNWorkerOptions?: QueueOptions & WorkerOptions;
defaultConcurrency?: number;
clearRepeatJobWhenStart?: boolean;
contextLoggerFormat?: (info: any) => string;
};
}
}

Check failure on line 17 in packages/bullmq/index.d.ts

View workflow job for this annotation

GitHub Actions / lintAndTestLegacy (lts/*, ubuntu-latest)

Insert `⏎`

Check failure on line 17 in packages/bullmq/index.d.ts

View workflow job for this annotation

GitHub Actions / lintAndTestLegacy (lts/*, ubuntu-latest)

Newline required at end of file but not found
8 changes: 8 additions & 0 deletions packages/bullmq/jest.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module.exports = {
preset: 'ts-jest',
testEnvironment: 'node',
testPathIgnorePatterns: ['<rootDir>/test/fixtures'],
coveragePathIgnorePatterns: ['<rootDir>/test/'],
setupFilesAfterEnv: ['./jest.setup.js'],
coverageProvider: 'v8',
};
2 changes: 2 additions & 0 deletions packages/bullmq/jest.setup.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
process.env.MIDWAY_TS_MODE = 'true';
jest.setTimeout(30000);
37 changes: 37 additions & 0 deletions packages/bullmq/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
{
"name": "@midwayjs/bullmq",
"version": "1.0.0",
"description": "midway component for bullmq",
"main": "dist/index.js",
"typings": "index.d.ts",
"scripts": {
"build": "tsc",
"test": "node --require=ts-node/register ../../node_modules/.bin/jest --runInBand",
"cov": "node --require=ts-node/register ../../node_modules/.bin/jest --runInBand --coverage --forceExit"
},
"keywords": [
"midway",
"IoC",
"task",
"bullmq",
"plugin"
],
"author": "guo qicong",
"files": [
"dist/**/*.js",
"dist/**/*.d.ts",
"index.d.ts"
],
"license": "MIT",
"devDependencies": {
"@midwayjs/core": "^3.19.0",
"@midwayjs/mock": "^3.19.2"
},
"dependencies": {
"bullmq": "5.34.6"
},
"engines": {
"node": ">=12"
},
"repository": "https://github.com/midwayjs/midway.git"
}
27 changes: 27 additions & 0 deletions packages/bullmq/src/config/config.default.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
export const bullmq = {
defaultQueueNWorkerOptions: {
prefix: '{midway-bullmq}',
connection: {
host: '127.0.0.1',
port: 6379,
},
defaultJobOptions: {
removeOnComplete: 3,
removeOnFail: 10,
},
},
defaultConcurrency: 1,
clearRepeatJobWhenStart: true,
contextLoggerFormat: info => {
const { jobId, from } = info.ctx;
return `${info.timestamp} ${info.LEVEL} ${info.pid} [${jobId} ${from.name}] ${info.message}`;
},
};

export const midwayLogger = {
clients: {
bullLogger: {
fileLogName: 'midway-bull.log',
},
},
};

Check failure on line 27 in packages/bullmq/src/config/config.default.ts

View workflow job for this annotation

GitHub Actions / lintAndTestLegacy (lts/*, ubuntu-latest)

Insert `⏎`

Check failure on line 27 in packages/bullmq/src/config/config.default.ts

View workflow job for this annotation

GitHub Actions / lintAndTestLegacy (lts/*, ubuntu-latest)

Newline required at end of file but not found
44 changes: 44 additions & 0 deletions packages/bullmq/src/configuration.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import {
Configuration,
Init,
Inject,
MidwayDecoratorService,
} from '@midwayjs/core';
import * as DefaultConfig from './config/config.default';
import { BullMQFramework } from './framework';
import { BULLMQ_QUEUE_KEY } from './constants';

@Configuration({
namespace: 'bullmq',
importConfigs: [
{
default: DefaultConfig,
},
],
})
export class BullConfiguration {
@Inject()
framework: BullMQFramework;

@Inject()
decoratorService: MidwayDecoratorService;

@Init()
async init() {
this.decoratorService.registerPropertyHandler(
BULLMQ_QUEUE_KEY,
(
propertyName,
meta: {
queueName: string;
}
) => {
return this.framework.getQueue(meta.queueName);
}
);
}

async onReady() {
this.framework.loadConfig();
}
}

Check failure on line 44 in packages/bullmq/src/configuration.ts

View workflow job for this annotation

GitHub Actions / lintAndTestLegacy (lts/*, ubuntu-latest)

Insert `⏎`

Check failure on line 44 in packages/bullmq/src/configuration.ts

View workflow job for this annotation

GitHub Actions / lintAndTestLegacy (lts/*, ubuntu-latest)

Newline required at end of file but not found
3 changes: 3 additions & 0 deletions packages/bullmq/src/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// task
export const BULLMQ_QUEUE_KEY = 'bullmq:queue';
export const BULLMQ_PROCESSOR_KEY = 'bullmq:processor';

Check failure on line 3 in packages/bullmq/src/constants.ts

View workflow job for this annotation

GitHub Actions / lintAndTestLegacy (lts/*, ubuntu-latest)

Insert `⏎`

Check failure on line 3 in packages/bullmq/src/constants.ts

View workflow job for this annotation

GitHub Actions / lintAndTestLegacy (lts/*, ubuntu-latest)

Newline required at end of file but not found
59 changes: 59 additions & 0 deletions packages/bullmq/src/decorator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import {
createCustomPropertyDecorator,
Provide,
saveClassMetadata,
saveModule,
Scope,
ScopeEnum,
} from '@midwayjs/core';
import { QueueNWorkerOptions } from './interface';
import { BULLMQ_PROCESSOR_KEY, BULLMQ_QUEUE_KEY } from './constants';
import { JobsOptions } from 'bullmq';

export function Processor(
queueName: string,
jobOptions?: JobsOptions,
queueNWorkerOptions?: QueueNWorkerOptions
): ClassDecorator;
export function Processor(
queueName: string,
concurrency?: number,
jobOptions?: JobsOptions,
queueNWorkerOptions?: QueueNWorkerOptions
): ClassDecorator;
export function Processor(
queueName: string,
concurrency?: number | JobsOptions,
jobOptions?: JobsOptions | QueueNWorkerOptions,
// bullmq 新引入了 worker 作为执行任务的实例,一个队列 queue 和 worker 中 connection, prefix 必须一致才能正常执行
// 目前 QueueOptions WorkerOptions 没有不一致的同名参数,考虑到从 @midwayjs/bull 迁移方便,Processor 中把他们合并
queueNWorkerOptions?: JobsOptions | QueueNWorkerOptions
): ClassDecorator {
return function (target: any) {
if (typeof concurrency !== 'number') {
queueNWorkerOptions = { ...jobOptions };
jobOptions = { ...concurrency };
// 这里不能设置成 1 defaultConcurrency 就不生效了
concurrency = undefined;
}
saveModule(BULLMQ_PROCESSOR_KEY, target);
saveClassMetadata(
BULLMQ_PROCESSOR_KEY,
{
queueName,
concurrency,
jobOptions,
queueNWorkerOptions,
},
target
);
Provide()(target);
Scope(ScopeEnum.Request)(target);
};
}

export function InjectQueue(queueName: string): PropertyDecorator {
return createCustomPropertyDecorator(BULLMQ_QUEUE_KEY, {
queueName,
});
}

Check failure on line 59 in packages/bullmq/src/decorator.ts

View workflow job for this annotation

GitHub Actions / lintAndTestLegacy (lts/*, ubuntu-latest)

Insert `⏎`
Loading

0 comments on commit 8a5d42d

Please sign in to comment.