Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion Command/CronRunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

use Cron\Cron;
use Cron\CronBundle\Cron\CronCommand;
use Cron\CronBundle\Cron\LockedExecutor;
use Cron\CronBundle\Entity\CronJob;
use Cron\CronBundle\Job\ShellJobWrapper;
use Cron\Resolver\ArrayResolver;
Expand All @@ -19,6 +20,8 @@
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Lock\Store\FlockStore;
use Symfony\Component\Process\PhpExecutableFinder;

/**
Expand All @@ -42,7 +45,7 @@ protected function configure(): void
protected function execute(InputInterface $input, OutputInterface $output): int
{
$cron = new Cron();
$cron->setExecutor($this->getContainer()->get('cron.executor'));
$cron->setExecutor($this->createLockedExecutor());
if ($input->getArgument('job')) {
$resolver = $this->getJobResolver($input->getArgument('job'), $input->getParameterOption('--force') !== false, $input->getParameterOption('--schedule_now') !== false);
} else {
Expand Down Expand Up @@ -70,6 +73,22 @@ protected function execute(InputInterface $input, OutputInterface $output): int
return 0;
}

/**
* Creates a job executor with locking mechanism
*/
protected function createLockedExecutor(): \Cron\Executor\Executor
{
// Use the default executor with our custom locking wrapper
$executor = new LockedExecutor();

// Create lock factory using filesystem locks
$lockStore = new FlockStore(sys_get_temp_dir());
$lockFactory = new LockFactory($lockStore);
$executor->setLockFactory($lockFactory);

return $executor;
}

/**
* @throws \InvalidArgumentException
*/
Expand Down
135 changes: 135 additions & 0 deletions Cron/LockedExecutor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
<?php declare(strict_types=1);

namespace Cron\CronBundle\Cron;

use Cron\CronBundle\Job\ShellJobWrapper;
use Cron\Executor\Executor;
use Cron\Job\JobInterface;
use Cron\Report\CronReport;
use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Lock\LockInterface;

/**
* Executor that locks jobs to prevent concurrent execution of the same job.
*/
class LockedExecutor extends Executor
{
private ?LockFactory $lockFactory = null;
private array $locks = [];

/**
* Set the lock factory
*/
public function setLockFactory(LockFactory $lockFactory): void
{
$this->lockFactory = $lockFactory;
}

/**
* Override the startProcesses method to add locking
*/
protected function startProcesses(CronReport $report): void
{
if (!$this->lockFactory) {
// Fall back to parent implementation if no lock factory is set
parent::startProcesses($report);
return;
}

foreach ($this->sets as $set) {
$job = $set->getJob();

// Skip jobs that are already running
if ($this->isJobLocked($job)) {
continue;
}

// Try to acquire a lock for this job
$lock = $this->acquireLock($job);
if (!$lock) {
// Skip if we couldn't get a lock
continue;
}

// Store the lock with the job
$this->locks[spl_object_hash($job)] = $lock;

// Add report and run the job
$report->addJobReport($set->getReport());
$set->run();
}
}

/**
* Check if a job is already locked/running
*/
private function isJobLocked(JobInterface $job): bool
{
if (!$job instanceof ShellJobWrapper || !$job->raw) {
// We can't determine the job ID, so assume it's not locked
return false;
}

// Create a lock without acquiring it to check if someone else has it
$lockName = $this->getLockName($job);
$lock = $this->lockFactory->createLock($lockName);

return !$lock->acquire(false);
}

/**
* Try to acquire a lock for a job
*/
private function acquireLock(JobInterface $job): ?LockInterface
{
if (!$job instanceof ShellJobWrapper || !$job->raw) {
// No need to lock if we can't identify the job
return null;
}

$lockName = $this->getLockName($job);
$lock = $this->lockFactory->createLock($lockName, 3600); // 1 hour TTL to prevent stale locks

if ($lock->acquire(false)) {
return $lock;
}

return null;
}

/**
* Get a unique lock name for the job
*/
private function getLockName(JobInterface $job): string
{
if ($job instanceof ShellJobWrapper && $job->raw) {
return 'cron_job_' . $job->raw->getId();
}

// Fallback if no job ID is available
return 'cron_job_' . md5(serialize($job));
}

/**
* Override isRunning to release locks when jobs finish
*/
public function isRunning(): bool
{
$running = false;

foreach ($this->sets as $set) {
$job = $set->getJob();
$jobHash = spl_object_hash($job);

if ($job->isRunning()) {
$running = true;
} elseif (isset($this->locks[$jobHash])) {
// Job is done, release the lock
$this->locks[$jobHash]->release();
unset($this->locks[$jobHash]);
}
}

return $running;
}
}
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
"symfony/console": "^5.0|^6.0|^7.0",
"symfony/dependency-injection": "^5.0|^6.0|^7.0",
"symfony/http-kernel": "^5.0|^6.0|^7.0",
"symfony/process": "^5.0|^6.0|^7.0"
"symfony/process": "^5.0|^6.0|^7.0",
"symfony/lock": "^5.0|^6.0|^7.0"
},
"autoload": {
"psr-4" : {
Expand Down