diff --git a/Command/CronRunCommand.php b/Command/CronRunCommand.php index 78b66f9..0797ed7 100644 --- a/Command/CronRunCommand.php +++ b/Command/CronRunCommand.php @@ -11,6 +11,7 @@ use Cron\Cron; use Cron\CronBundle\Cron\CronCommand; +use Cron\CronBundle\Cron\LockedExecutor; use Cron\CronBundle\Entity\CronJob; use Cron\CronBundle\Job\ShellJobWrapper; use Cron\Resolver\ArrayResolver; @@ -19,6 +20,8 @@ use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; +use Symfony\Component\Lock\LockFactory; +use Symfony\Component\Lock\Store\FlockStore; use Symfony\Component\Process\PhpExecutableFinder; /** @@ -42,7 +45,7 @@ protected function configure(): void protected function execute(InputInterface $input, OutputInterface $output): int { $cron = new Cron(); - $cron->setExecutor($this->getContainer()->get('cron.executor')); + $cron->setExecutor($this->createLockedExecutor()); if ($input->getArgument('job')) { $resolver = $this->getJobResolver($input->getArgument('job'), $input->getParameterOption('--force') !== false, $input->getParameterOption('--schedule_now') !== false); } else { @@ -70,6 +73,22 @@ protected function execute(InputInterface $input, OutputInterface $output): int return 0; } + /** + * Creates a job executor with locking mechanism + */ + protected function createLockedExecutor(): \Cron\Executor\Executor + { + // Use the default executor with our custom locking wrapper + $executor = new LockedExecutor(); + + // Create lock factory using filesystem locks + $lockStore = new FlockStore(sys_get_temp_dir()); + $lockFactory = new LockFactory($lockStore); + $executor->setLockFactory($lockFactory); + + return $executor; + } + /** * @throws \InvalidArgumentException */ diff --git a/Cron/LockedExecutor.php b/Cron/LockedExecutor.php new file mode 100644 index 0000000..f04a3b9 --- /dev/null +++ b/Cron/LockedExecutor.php @@ -0,0 +1,135 @@ +lockFactory = $lockFactory; + } + + /** + * Override the startProcesses method to add locking + */ + protected function startProcesses(CronReport $report): void + { + if (!$this->lockFactory) { + // Fall back to parent implementation if no lock factory is set + parent::startProcesses($report); + return; + } + + foreach ($this->sets as $set) { + $job = $set->getJob(); + + // Skip jobs that are already running + if ($this->isJobLocked($job)) { + continue; + } + + // Try to acquire a lock for this job + $lock = $this->acquireLock($job); + if (!$lock) { + // Skip if we couldn't get a lock + continue; + } + + // Store the lock with the job + $this->locks[spl_object_hash($job)] = $lock; + + // Add report and run the job + $report->addJobReport($set->getReport()); + $set->run(); + } + } + + /** + * Check if a job is already locked/running + */ + private function isJobLocked(JobInterface $job): bool + { + if (!$job instanceof ShellJobWrapper || !$job->raw) { + // We can't determine the job ID, so assume it's not locked + return false; + } + + // Create a lock without acquiring it to check if someone else has it + $lockName = $this->getLockName($job); + $lock = $this->lockFactory->createLock($lockName); + + return !$lock->acquire(false); + } + + /** + * Try to acquire a lock for a job + */ + private function acquireLock(JobInterface $job): ?LockInterface + { + if (!$job instanceof ShellJobWrapper || !$job->raw) { + // No need to lock if we can't identify the job + return null; + } + + $lockName = $this->getLockName($job); + $lock = $this->lockFactory->createLock($lockName, 3600); // 1 hour TTL to prevent stale locks + + if ($lock->acquire(false)) { + return $lock; + } + + return null; + } + + /** + * Get a unique lock name for the job + */ + private function getLockName(JobInterface $job): string + { + if ($job instanceof ShellJobWrapper && $job->raw) { + return 'cron_job_' . $job->raw->getId(); + } + + // Fallback if no job ID is available + return 'cron_job_' . md5(serialize($job)); + } + + /** + * Override isRunning to release locks when jobs finish + */ + public function isRunning(): bool + { + $running = false; + + foreach ($this->sets as $set) { + $job = $set->getJob(); + $jobHash = spl_object_hash($job); + + if ($job->isRunning()) { + $running = true; + } elseif (isset($this->locks[$jobHash])) { + // Job is done, release the lock + $this->locks[$jobHash]->release(); + unset($this->locks[$jobHash]); + } + } + + return $running; + } +} \ No newline at end of file diff --git a/composer.json b/composer.json index 9ac8728..0b2096b 100644 --- a/composer.json +++ b/composer.json @@ -22,7 +22,8 @@ "symfony/console": "^5.0|^6.0|^7.0", "symfony/dependency-injection": "^5.0|^6.0|^7.0", "symfony/http-kernel": "^5.0|^6.0|^7.0", - "symfony/process": "^5.0|^6.0|^7.0" + "symfony/process": "^5.0|^6.0|^7.0", + "symfony/lock": "^5.0|^6.0|^7.0" }, "autoload": { "psr-4" : {