Skip to content

Commit

Permalink
fix: #4295 #4293 #4299 #4294 (#4290)
Browse files Browse the repository at this point in the history
* fix: test lint

* fix: some issue

* fix: #4299

* fix: #4294

* fix: use addJob instead of runJob

* fix: bullmq method
  • Loading branch information
czy88840616 authored Mar 2, 2025
1 parent 68d91e2 commit 441fa88
Show file tree
Hide file tree
Showing 13 changed files with 154 additions and 58 deletions.
11 changes: 3 additions & 8 deletions packages/bull-board/src/board.middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
IMidwayContext,
NextFunction,
Config,
Init,
Inject,
Provide,
Scope,
Expand Down Expand Up @@ -61,12 +62,9 @@ export class BoardMiddleware

private basePath: string;
private serverAdapter: MidwayAdapter;
private inited = false;

protected init() {
// 如果使用 @Init 在服务器上会在 egg-cluster willReady 和 serverReady 之间执行
// 这个时候 queue 不确保完全被初始化,serverAdapter 可能并没有 register 所有队列
// 所以会出现 bull-board 刷新时不时缺少队列,改成在调用时,判断 this.inited 执行
@Init()
protected async init() {
let framework: BullFramework | BullMQFramework =
this.frameworkService.getFramework('bull') as BullFramework;
if (!framework) {
Expand Down Expand Up @@ -99,7 +97,6 @@ export class BoardMiddleware
});
this.serverAdapter.setBasePath(this.basePath);
this.bullBoardManager.setBullBoard(bullBoard);
this.inited = true;
}

resolve(app: IMidwayApplication) {
Expand All @@ -109,7 +106,6 @@ export class BoardMiddleware
if (pathname.indexOf(this.basePath) === -1) {
return next();
}
if (!this.inited) this.init();
const routePath: string = pathname.replace(this.basePath, '') || '/';

let content;
Expand Down Expand Up @@ -153,7 +149,6 @@ export class BoardMiddleware
if (pathname.indexOf(this.basePath) === -1) {
return next();
}
if (!this.inited) this.init();

const routePath: string = pathname.replace(this.basePath, '') || '/';

Expand Down
24 changes: 21 additions & 3 deletions packages/bull/src/framework.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ export class BullQueue extends Bull implements IQueue<Job> {
super(queueName, queueOptions);
}

public async addJobToQueue(data: any, options?: JobOptions): Promise<Job> {
return this.add(data || {}, options) as unknown as Job;
}

/**
* @deprecated use addJobToQueue instead
*/
public async runJob(data: any, options?: JobOptions): Promise<Job> {
return this.add(data || {}, options) as unknown as Job;
}
Expand Down Expand Up @@ -97,7 +104,7 @@ export class BullFramework
}
await this.addProcessor(mod, options.queueName, options.concurrency);
if (options.jobOptions?.repeat) {
await this.runJob(options.queueName, {}, options.jobOptions);
await this.addJobToQueue(options.queueName, {}, options.jobOptions);
}
}
}
Expand Down Expand Up @@ -182,17 +189,28 @@ export class BullFramework
});
}

public async runJob(
public async addJobToQueue(
queueName: string,
jobData: any,
options?: JobOptions
): Promise<Job | undefined> {
const queue = this.queueMap.get(queueName);
if (queue) {
return await queue.runJob(jobData, options);
return await queue.addJobToQueue(jobData, options);
}
}

/**
* @deprecated use addJob instead
*/
public async runJob(
queueName: string,
jobData: any,
options?: JobOptions
): Promise<Job | undefined> {
return this.addJobToQueue(queueName, jobData, options);
}

public async getJob(queueName: string, jobName: string): Promise<Job> {
const queue = this.queueMap.get(queueName);
if (queue) {
Expand Down
8 changes: 8 additions & 0 deletions packages/bull/src/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,20 @@ export interface IProcessor {
}

export interface IQueue<Job> {
/**
* @deprecated use addJobToQueue instead
*/
runJob(data: Record<string, any>, options?: unknown): Promise<Job>;
addJobToQueue(data: Record<string, any>, options?: unknown): Promise<Job>;
getJob(name: string): Promise<Job>;
getQueueName(): string;
}

export interface IQueueManager<Queue extends IQueue<Job>, Job> {
addJobToQueue(queueName: string, jobData: any, options?: unknown): Promise<Job|undefined>;
/**
* @deprecated use addJobToQueue instead
*/
runJob(queueName: string, jobData: any, options?: unknown): Promise<Job|undefined>;
getJob(queueName: string, jobName: string): Promise<Job>;
createQueue(queueName: string, queueOptions?: unknown): Queue;
Expand Down
41 changes: 38 additions & 3 deletions packages/bullmq/src/framework.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ export class BullMQQueue extends Queue {
super(queueName, queueOptions);
}

// runJob 与 @midwayjs/bull 保持一致,如果想要使用 jobName 则可以直接调用 queue.add
public async runJob(data: any, options?: JobsOptions): Promise<Job> {
public async addJobToQueue(data: any, options?: JobsOptions): Promise<Job> {
const { repeat, ...OtherOptions } = options ?? {};
if (repeat) {
return this.upsertJobScheduler(this.name, repeat, {
Expand All @@ -52,6 +51,14 @@ export class BullMQQueue extends Queue {
return this.add('jobName', data || {}, options);
}

/**
* @deprecated use addJobToQueue instead
*/
// runJob 与 @midwayjs/bull 保持一致,如果想要使用 jobName 则可以直接调用 queue.add
public async runJob(data: any, options?: JobsOptions): Promise<Job> {
return this.addJobToQueue(data, options);
}

public getQueueName(): string {
return this.queueName;
}
Expand Down Expand Up @@ -183,7 +190,10 @@ export class BullMQFramework extends BaseFramework<Application, Context, any> {
await this.addProcessor(mod, options.queueName, options.workerOptions);
if (repeat) {
// add repeatable job
await this.getQueue(options.queueName)?.runJob({}, options.jobOptions);
await this.getQueue(options.queueName)?.addJobToQueue(
{},
options.jobOptions
);
}
}
}
Expand Down Expand Up @@ -338,6 +348,31 @@ export class BullMQFramework extends BaseFramework<Application, Context, any> {
);
}

/**
* Add a job to the queue
*/
public async addJobToQueue(
queueName: string,
jobData: any,
options?: JobsOptions
): Promise<Job | undefined> {
const queue = this.queueMap.get(queueName);
if (queue) {
return await queue.addJobToQueue(jobData, options);
}
}

/**
* @deprecated use addJobToQueue instead
*/
public async runJob(
queueName: string,
jobData: any,
options?: JobsOptions
): Promise<Job | undefined> {
return this.addJobToQueue(queueName, jobData, options);
}

/**
* Create a flow producer, if producerName is provided, it will be store.
*/
Expand Down
4 changes: 4 additions & 0 deletions packages/busboy/src/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ export interface UploadOptions extends BusboyConfig {
* Mime type white list
*/
mimeTypeWhiteList?: Record<string, string | string[]> | ((ctx: IMidwayContext<any>) => string | string[]);
/**
* Whether to allow fields duplication, default is `false`, only for `file` and `stream` mode
*/
allowFieldsDuplication?: boolean;
}

export interface UploadFileInfo {
Expand Down
26 changes: 21 additions & 5 deletions packages/busboy/src/middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,8 @@ export class UploadMiddleware implements IMiddleware<any, any> {
ctxOrReq
) {
let isStreamResolve = false;
const { mode, tmpdir } = uploadConfig;
const { mode, tmpdir, allowFieldsDuplication } =
uploadConfig as UploadOptions;
const { files = [], fields = [] } = await new Promise<any>(
(resolveP, reject) => {
const bb = busboy({
Expand Down Expand Up @@ -305,10 +306,25 @@ export class UploadMiddleware implements IMiddleware<any, any> {
);

ctxOrReq.files = files;
ctxOrReq.fields = fields.reduce((accumulator, current) => {
accumulator[current.name] = current.value;
return accumulator;
}, {});
if (allowFieldsDuplication) {
// 如果重复,则使用数组
ctxOrReq.fields = fields.reduce((accumulator, current) => {
if (accumulator[current.name]) {
if (!Array.isArray(accumulator[current.name])) {
accumulator[current.name] = [accumulator[current.name]];
}
accumulator[current.name].push(current.value);
} else {
accumulator[current.name] = current.value;
}
return accumulator;
}, {});
} else {
ctxOrReq.fields = fields.reduce((accumulator, current) => {
accumulator[current.name] = current.value;
return accumulator;
}, {});
}
}

private async processAsyncIterator(
Expand Down
4 changes: 2 additions & 2 deletions packages/busboy/test/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ describe('/test/index.test.ts', () => {
const fsWriteStream = createWriteStream(fileName);
const fieldName = files[0].fieldName;

await new Promise(resolve => {
await new Promise<void>(resolve => {
fsWriteStream.on('close', resolve);
files[0].data.pipe(fsWriteStream);
});
Expand Down Expand Up @@ -174,7 +174,7 @@ describe('/test/index.test.ts', () => {
const fileName = join(tmpdir(), Date.now() + '_' + ctx.files[0].filename);
const fsWriteStream = createWriteStream(fileName);

await new Promise(resolve => {
await new Promise<void>(resolve => {
fsWriteStream.on('close', resolve);
ctx.files[0].data.pipe(fsWriteStream);
});
Expand Down
10 changes: 7 additions & 3 deletions packages/core/src/common/applicationManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,13 @@ export class MidwayApplicationManager {
namespaces?: Array<string | FrameworkType>
): IMidwayApplication[] {
if (!namespaces) {
return Array.from(this.globalFrameworkMap.values()).map(framework => {
return framework.getApplication();
});
return Array.from(this.globalFrameworkMap.values())
.map(framework => {
return framework.getApplication();
})
.filter(app => {
return !!app;
});
} else {
return namespaces
.map(namespace => {
Expand Down
8 changes: 4 additions & 4 deletions packages/cron/src/framework.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ export class CronFramework extends BaseFramework<Application, Context, any> {
} else {
const options = getClassMetadata(CRON_JOB_KEY, name) as {
jobOptions?: CronJobParameters;
name?: string;
jobName?: string;
};
jobName = options.name || getProviderUUId(name);
jobName = options.jobName || getProviderUUId(name);
jobOptions = extend(
true,
{},
Expand Down Expand Up @@ -137,9 +137,9 @@ export class CronFramework extends BaseFramework<Application, Context, any> {
} else {
const options = getClassMetadata(CRON_JOB_KEY, name) as {
jobOptions?: CronJobParameters;
name?: string;
jobName?: string;
};
return options.name || getProviderUUId(name);
return options.jobName || getProviderUUId(name);
}
}
}
27 changes: 26 additions & 1 deletion packages/cron/test/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { createApp, close } from '@midwayjs/mock';
import { createApp, close, createLightApp } from '@midwayjs/mock';
import { join } from 'path';
import { sleep } from '@midwayjs/core';
import * as cron from '../src';
import { IJob, Job } from '../src';

describe(`/test/index.test.ts`, () => {
it('test job with decorator and start', async () => {
Expand All @@ -17,4 +18,28 @@ describe(`/test/index.test.ts`, () => {
await sleep(5 * 1000);
await close(app);
});

it('should test get job name with string and class', async () => {
@Job('syncJob', {
cronTime: '*/2 * * * * *', // 每隔 2s 执行
})
class DataSyncCheckerJob implements IJob {
async onTick() {
console.log('syncJob');
}
}
const app = await createLightApp({
imports: [
cron
],
preloadModules: [
DataSyncCheckerJob
]
});

const framework = app.getFramework() as cron.Framework;

expect(framework.getJob(DataSyncCheckerJob)).toBeTruthy();
expect(framework.getJob('syncJob')).toBeTruthy();
});
});
10 changes: 0 additions & 10 deletions packages/validate/src/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,6 @@ export class ValidateConfiguration {
await container.getAsync(ParseFloatPipe);
await container.getAsync(DecoratorValidPipe);

this.decoratorService.registerParameterHandler(
VALID_KEY,
({ parameterIndex, originParamType, originArgs, metadata }) => {
if (!metadata.schema) {
metadata.schema = this.validateService.getSchema(originParamType);
}
return originArgs[parameterIndex];
}
);

// register web param default pipe
this.decoratorService.registerParameterPipes(WEB_ROUTER_PARAM_KEY, [
ValidationPipe,
Expand Down
Loading

0 comments on commit 441fa88

Please sign in to comment.