A PostgreSQL-based job queue transporter for NestJS microservices, providing reliable background job processing with TypeORM integration.
- 🚀 PostgreSQL-backed job queue - Leverages PostgreSQL for reliable job storage and processing
- 🔄 Configurable processing - Set custom intervals, batch sizes, and timeouts per job type
- 🛡️ Built-in error handling - Graceful error handling with custom error handlers
- 📦 TypeORM integration - Seamless integration with existing TypeORM entities
- 🎯 NestJS microservice support - Works as a NestJS microservice transporter
- 🧹 Graceful shutdown - Clean shutdown with job cleanup and database connection closing
npm install nestjs-typeorm-pg-queueimport { Entity, Column } from 'typeorm';
import { JobQueueBaseEntity } from 'nestjs-typeorm-pg-queue';
@Entity('example_jobs')
export class ExampleJobEntity extends JobQueueBaseEntity {
@Column()
taskName: string;
@Column('jsonb', { nullable: true })
payload: any;
}import { Injectable } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';
@Injectable()
export class ExampleJobHandler {
@MessagePattern('example_jobs')
async handleJob(data: any) {
console.log('Processing job:', data);
// Your job processing logic here
return { success: true };
}
}import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions } from '@nestjs/microservices';
import { PgTransporterClient } from 'nestjs-typeorm-pg-queue';
import { DataSource } from 'typeorm';
// Configure your database connection
const dataSource = new DataSource({
type: 'postgres',
host: 'localhost',
port: 5432,
username: 'your_username',
password: 'your_password',
database: 'your_database',
entities: [ExampleJobEntity],
synchronize: true, // Only for development
});
// Define job processing configuration
const topics = new Map([
[ExampleJobEntity, {
frequent: 1000, // Check every 1 second
amount: 5, // Process up to 5 jobs at once
constraint: {}, // Additional where conditions
timeout: 30000 // Job timeout in ms
}]
]);
async function bootstrap() {
await dataSource.initialize();
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
strategy: PgTransporterClient.connect(dataSource.manager)
.addTopics(topics)
.addConfig({ timeout: 60000 })
.errorHandler((error) => {
console.error('Job processing error:', error);
})
.connect(),
},
);
await app.listen();
console.log('🎯 Microservice is listening and processing jobs...');
}
bootstrap().catch(console.error);Each job type can be configured with the following options:
frequent: How often to check for new jobs (in milliseconds)amount: Maximum number of jobs to process in a single batchconstraint: Additional WHERE conditions for job selectiontimeout: Job processing timeout (in milliseconds)
timeout: Global timeout for job processing
Extend JobQueueBaseEntity for your job entities. It provides:
id: Primary keystatus: Job status trackingcreatedAt: Job creation timestampupdatedAt: Last update timestamp- Built-in status management
See the /example directory for a complete working example including:
- Docker Compose setup with PostgreSQL
- Job entity definition
- Job handler implementation
- Job seeding utilities
- Graceful shutdown handling
- Start PostgreSQL:
cd example
docker-compose up -d- Run the example:
npm run build
./start-example.sh# Build the project
npm run build
# Run tests
npm test- Node.js >= 16
- PostgreSQL >= 12
- NestJS >= 10
- TypeORM >= 0.3
ISC
Contributions are welcome! Please feel free to submit a Pull Request.
If you encounter any issues, please file them on the GitHub Issues page.