Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TASK: Prevent multiple catchup runs #4751

Closed
Closed
3 changes: 3 additions & 0 deletions Neos.ContentRepository.Core/Classes/ContentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
use Neos\ContentRepository\Core\Projection\Workspace\WorkspaceFinder;
use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface;
use Neos\ContentRepositoryRegistry\Service\CatchUpDeduplicationQueue;
use Neos\EventStore\CatchUp\CatchUp;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Model\Event\EventMetadata;
Expand Down Expand Up @@ -79,6 +80,7 @@ public function __construct(
private readonly ContentDimensionSourceInterface $contentDimensionSource,
private readonly UserIdProviderInterface $userIdProvider,
private readonly ClockInterface $clock,
private readonly CatchUpDeduplicationQueue $catchUpDeduplicationQueue,
) {
}

Expand Down Expand Up @@ -191,6 +193,7 @@ public function catchUpProjection(string $projectionClassName, CatchUpOptions $o
}
$catchUp->run($eventStream);
$catchUpHook?->onAfterCatchUp();
$this->catchUpDeduplicationQueue->releaseCatchUpLock($projectionClassName);
}

public function setUp(): SetupResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@

use Neos\ContentRepository\Core\CommandHandler\CommandResult;
use Neos\ContentRepository\Core\CommandHandler\PendingProjections;
use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface;
use Neos\ContentRepository\Core\Projection\Projections;
use Neos\ContentRepository\Core\Projection\WithMarkStaleInterface;
use Neos\ContentRepositoryRegistry\Service\CatchUpDeduplicationQueue;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Exception\ConcurrencyException;
use Neos\EventStore\Model\Event;
use Neos\EventStore\Model\Event\EventId;
use Neos\EventStore\Model\Event\EventMetadata;
use Neos\EventStore\Model\Events;
use Neos\EventStore\Model\EventStore\CommitResult;

/**
* Internal service to persist {@see EventInterface} with the proper normalization, and triggering the
Expand All @@ -27,7 +26,7 @@ final class EventPersister
{
public function __construct(
private readonly EventStoreInterface $eventStore,
private readonly ProjectionCatchUpTriggerInterface $projectionCatchUpTrigger,
private readonly CatchUpDeduplicationQueue $catchUpDeduplicationQueue,
private readonly EventNormalizer $eventNormalizer,
private readonly Projections $projections,
) {
Expand Down Expand Up @@ -67,7 +66,7 @@ public function publishEvents(EventsToPublish $eventsToPublish): CommandResult
$projection->markStale();
}
}
$this->projectionCatchUpTrigger->triggerCatchUp($pendingProjections->projections);
$this->catchUpDeduplicationQueue->requestCatchUp($pendingProjections->projections);

// The CommandResult can be used to block until projections are up to date.
return new CommandResult($pendingProjections, $commitResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
use Neos\ContentRepository\Core\Feature\WorkspaceCommandHandler;
use Neos\ContentRepository\Core\Infrastructure\Property\PropertyConverter;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface;
use Neos\ContentRepository\Core\Projection\ProjectionsAndCatchUpHooks;
use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface;
use Neos\ContentRepositoryRegistry\Service\CatchUpDeduplicationQueue;
use Neos\EventStore\EventStoreInterface;
use Psr\Clock\ClockInterface;
use Symfony\Component\Serializer\Serializer;
Expand All @@ -52,7 +52,7 @@ public function __construct(
ContentDimensionSourceInterface $contentDimensionSource,
Serializer $propertySerializer,
ProjectionsAndCatchUpHooksFactory $projectionsAndCatchUpHooksFactory,
private readonly ProjectionCatchUpTriggerInterface $projectionCatchUpTrigger,
private readonly CatchUpDeduplicationQueue $catchUpDeduplicationQueue,
private readonly UserIdProviderInterface $userIdProvider,
private readonly ClockInterface $clock,
) {
Expand Down Expand Up @@ -100,6 +100,7 @@ public function getOrBuild(): ContentRepository
$this->projectionFactoryDependencies->contentDimensionSource,
$this->userIdProvider,
$this->clock,
$this->catchUpDeduplicationQueue
);
}
return $this->contentRepository;
Expand Down Expand Up @@ -164,7 +165,7 @@ private function buildEventPersister(): EventPersister
if (!$this->eventPersister) {
$this->eventPersister = new EventPersister(
$this->projectionFactoryDependencies->eventStore,
$this->projectionCatchUpTrigger,
$this->catchUpDeduplicationQueue,
$this->projectionFactoryDependencies->eventNormalizer,
$this->projectionsAndCatchUpHooks->projections,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,8 @@ public function getIterator(): \Traversable
{
yield from $this->projections;
}
public function isEmpty(): bool
{
return count($this->projections) === 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

namespace Neos\ContentRepositoryRegistry\Command;

use Neos\Cache\Frontend\VariableFrontend;
use Neos\ContentRepository\Core\Projection\CatchUpOptions;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
Expand All @@ -22,7 +23,7 @@ class SubprocessProjectionCatchUpCommandController extends CommandController
* @Flow\Inject
* @var ContentRepositoryRegistry
*/
protected $contentRepositoryRegistry;
protected ContentRepositoryRegistry $contentRepositoryRegistry;

/**
* @param string $contentRepositoryIdentifier
Expand All @@ -31,7 +32,8 @@ class SubprocessProjectionCatchUpCommandController extends CommandController
*/
public function catchupCommand(string $contentRepositoryIdentifier, string $projectionClassName): void
{
$contentRepository = $this->contentRepositoryRegistry->get(ContentRepositoryId::fromString($contentRepositoryIdentifier));
$contentRepositoryId = ContentRepositoryId::fromString($contentRepositoryIdentifier);
$contentRepository = $this->contentRepositoryRegistry->get($contentRepositoryId);
$contentRepository->catchUpProjection($projectionClassName, CatchUpOptions::create());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

namespace Neos\ContentRepositoryRegistry;

use Neos\Cache\Frontend\FrontendInterface;
use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\Dimension\ContentDimensionSourceInterface;
use Neos\ContentRepository\Core\Factory\ContentRepositoryFactory;
Expand All @@ -14,7 +15,6 @@
use Neos\ContentRepository\Core\Projection\CatchUpHookFactoryInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentSubgraphInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\Node;
use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface;
use Neos\ContentRepository\Core\Projection\ProjectionFactoryInterface;
use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface;
use Neos\ContentRepositoryRegistry\Exception\ContentRepositoryNotFoundException;
Expand All @@ -25,6 +25,7 @@
use Neos\ContentRepositoryRegistry\Factory\NodeTypeManager\NodeTypeManagerFactoryInterface;
use Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\ProjectionCatchUpTriggerFactoryInterface;
use Neos\ContentRepositoryRegistry\Factory\UserIdProvider\UserIdProviderFactoryInterface;
use Neos\ContentRepositoryRegistry\Service\CatchUpDeduplicationQueue;
use Neos\EventStore\EventStoreInterface;
use Neos\Flow\Annotations as Flow;
use Neos\Flow\ObjectManagement\ObjectManagerInterface;
Expand Down Expand Up @@ -139,7 +140,7 @@ private function buildFactory(ContentRepositoryId $contentRepositoryId): Content
$this->buildContentDimensionSource($contentRepositoryId, $contentRepositorySettings),
$this->buildPropertySerializer($contentRepositoryId, $contentRepositorySettings),
$this->buildProjectionsFactory($contentRepositoryId, $contentRepositorySettings),
$this->buildProjectionCatchUpTrigger($contentRepositoryId, $contentRepositorySettings),
$this->buildCatchUpDeduplicationQueue($contentRepositoryId, $contentRepositorySettings),
$this->buildUserIdProvider($contentRepositoryId, $contentRepositorySettings),
$clock,
);
Expand Down Expand Up @@ -228,14 +229,25 @@ private function buildProjectionsFactory(ContentRepositoryId $contentRepositoryI
}

/** @param array<string, mixed> $contentRepositorySettings */
private function buildProjectionCatchUpTrigger(ContentRepositoryId $contentRepositoryId, array $contentRepositorySettings): ProjectionCatchUpTriggerInterface
private function buildCatchUpDeduplicationQueue(ContentRepositoryId $contentRepositoryId, array $contentRepositorySettings): CatchUpDeduplicationQueue
{
isset($contentRepositorySettings['projectionCatchUpTrigger']['factoryObjectName']) || throw InvalidConfigurationException::fromMessage('Content repository "%s" does not have projectionCatchUpTrigger.factoryObjectName configured.', $contentRepositoryId->value);
$projectionCatchUpTriggerFactory = $this->objectManager->get($contentRepositorySettings['projectionCatchUpTrigger']['factoryObjectName']);
if (!$projectionCatchUpTriggerFactory instanceof ProjectionCatchUpTriggerFactoryInterface) {
throw InvalidConfigurationException::fromMessage('projectionCatchUpTrigger.factoryObjectName for content repository "%s" is not an instance of %s but %s.', $contentRepositoryId->value, ProjectionCatchUpTriggerFactoryInterface::class, get_debug_type($projectionCatchUpTriggerFactory));
}
return $projectionCatchUpTriggerFactory->build($contentRepositoryId, $contentRepositorySettings['projectionCatchUpTrigger']['options'] ?? []);
$projectionCatchUpTrigger = $projectionCatchUpTriggerFactory->build($contentRepositoryId, $contentRepositorySettings['projectionCatchUpTrigger']['options'] ?? []);

$catchUpStateCache = $this->objectManager->get('Neos.ContentRepositoryRegistry:CacheCatchUpStates');
if (!$catchUpStateCache instanceof FrontendInterface) {
throw InvalidConfigurationException::fromMessage('The virtual object "Neos.ContentRepositoryRegistry:CacheCatchUpStates" must provide a Cache Frontend, but is "%s".', get_debug_type($catchUpStateCache));
}

return new CatchUpDeduplicationQueue(
$contentRepositoryId,
$catchUpStateCache,
$projectionCatchUpTrigger
);
}

/** @param array<string, mixed> $contentRepositorySettings */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
<?php
namespace Neos\ContentRepositoryRegistry\Service;

use Neos\Cache\Frontend\FrontendInterface;
use Neos\ContentRepository\Core\Factory\ContentRepositoryId;
use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface;
use Neos\ContentRepository\Core\Projection\Projections;

/**
*
*/
final readonly class CatchUpDeduplicationQueue
{
public function __construct(
private ContentRepositoryId $contentRepositoryId,
private FrontendInterface $catchUpLock,
private ProjectionCatchUpTriggerInterface $projectionCatchUpTrigger
) {}

public function requestCatchUp(Projections $projections): void
{
$queuedProjections = $this->triggerCatchUpAndReturnQueued($projections);
$attempts = 0;
/** @phpstan-ignore-next-line */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just out of curiosity: What was PHPStan complaining about?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the new Projections::emptyis in an internal class if you have suggestions I am happy, we could make Projections not internal or I could not use it, but it seems very convenient.

while ($queuedProjections->isEmpty() === false) {
usleep(random_int(100, 100 + $attempts) + ($attempts * $attempts * 10));
$queuedProjections = $this->retryQueued($queuedProjections);
$attempts++;
}
}

/**
* @param class-string $projectionClassName
* @return void
*/
public function releaseCatchUpLock(string $projectionClassName): void
{
$this->setStopped($projectionClassName);
}

private function triggerCatchUpAndReturnQueued(Projections $projections): Projections
{
$passToCatchUp = [];
$queuedProjections = [];
foreach ($projections as $projection) {
if (!$this->isRunning($projection::class)) {
$this->run($projection::class);
// We are about to start a catchUp and can therefore discard any queue that exists right now, apparently someone else is waiting for it.
$this->dequeue($projection::class);
$passToCatchUp[] = $projection;
continue;
}

if (!$this->isQueued($projection::class)) {
$this->queue($projection::class);
$queuedProjections[] = $projection;
}
}

$this->projectionCatchUpTrigger->triggerCatchUp(Projections::fromArray($passToCatchUp));

return Projections::fromArray($queuedProjections);
}

private function retryQueued(Projections $queuedProjections): Projections
{
$passToCatchUp = [];
$stillQueuedProjections = [];
foreach ($queuedProjections as $projection) {
if (!$this->isQueued($projection::class)) {
// was dequeued, we can drop it
continue;
}

if (!$this->isRunning($projection::class)) {
$this->run($projection::class);
// We are about to start a catchUp and can therefore discard any queue that exists right now, apparently someone else is waiting for it.
$this->dequeue($projection::class);
$passToCatchUp[] = $projection;
continue;
}

$stillQueuedProjections[] = $projection;
}

$this->projectionCatchUpTrigger->triggerCatchUp(Projections::fromArray($passToCatchUp));

return Projections::fromArray($stillQueuedProjections);
}

/**
* @param class-string $projectionClassName
* @return bool
*/
private function isRunning(string $projectionClassName): bool
{
return $this->catchUpLock->has($this->cacheKeyRunning($projectionClassName));
}

/**
* @param class-string $projectionClassName
* @return void
*/
private function run(string $projectionClassName): void
{
$this->catchUpLock->set($this->cacheKeyRunning($projectionClassName), 1);
}

/**
* @param class-string $projectionClassName
* @return void
*/
private function setStopped(string $projectionClassName): void
{
$this->catchUpLock->remove($this->cacheKeyRunning($projectionClassName));
}

/**
* @param class-string $projectionClassName
* @return bool
*/
private function isQueued(string $projectionClassName): bool
{
return $this->catchUpLock->has($this->cacheKeyQueued($projectionClassName));
}

/**
* @param class-string $projectionClassName
* @return void
*/
private function queue(string $projectionClassName): void
{
$this->catchUpLock->set($this->cacheKeyQueued($projectionClassName), 1);
}

/**
* @param class-string $projectionClassName
* @return void
*/
private function dequeue(string $projectionClassName): void
{
$this->catchUpLock->remove($this->cacheKeyQueued($projectionClassName));
}

/**
* @param class-string $projectionClassName
* @return string
*/
private function cacheKeyPrefix(string $projectionClassName): string
{
return $this->contentRepositoryId->value . '_' . md5($projectionClassName);
}

/**
* @param class-string $projectionClassName
* @return string
*/
private function cacheKeyRunning(string $projectionClassName): string
{
return $this->cacheKeyPrefix($projectionClassName) . '_RUNNING';
}

/**
* @param class-string $projectionClassName
* @return string
*/
private function cacheKeyQueued(string $projectionClassName): string
{
return $this->cacheKeyPrefix($projectionClassName) . '_QUEUED';
}
}
6 changes: 6 additions & 0 deletions Neos.ContentRepositoryRegistry/Configuration/Caches.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,9 @@ Neos_ContentGraph_DoctrineDbalAdapter_ProcessedEvents:
backend: Neos\Cache\Backend\FileBackend
backendOptions:
defaultLifetime: 400

Neos_ContentRepositoryRegistry_CatchUpStates:
frontend: Neos\Cache\Frontend\VariableFrontend
backend: Neos\Cache\Backend\FileBackend
backendOptions:
defaultLifetime: 30
8 changes: 8 additions & 0 deletions Neos.ContentRepositoryRegistry/Configuration/Objects.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,11 @@ Neos\ContentRepository\Core\Projection\NodeHiddenState\NodeHiddenStateProjection
value: Neos\ContentRepository\Core\Projection\NodeHiddenState\NodeHiddenStateProjectionFactory
2:
object: 'Neos\ContentRepository\Core\Infrastructure\DbalClientInterface'

'Neos.ContentRepositoryRegistry:CacheCatchUpStates':
className: Neos\Cache\Frontend\VariableFrontend
factoryObjectName: Neos\Flow\Cache\CacheManager
factoryMethodName: getCache
arguments:
1:
value: Neos_ContentRepositoryRegistry_CatchUpStates