Skip to content

Commit 24a05dc

Browse files
committed
fetchAsync
1 parent c027835 commit 24a05dc

6 files changed

Lines changed: 473 additions & 59 deletions

File tree

packages/cache/composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
"windwalker/test": "^4.0",
2424
"windwalker/filesystem": "^4.0",
2525
"windwalker/crypt": "^4.0",
26+
"windwalker/promise": "^4.0",
2627
"league/flysystem": "^2.0",
2728
"mockery/mockery": "^1.4"
2829
},

packages/cache/src/CachePool.php

Lines changed: 104 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
use Windwalker\Cache\Storage\GroupedStorageInterface;
2222
use Windwalker\Cache\Storage\PhpFileStorage;
2323
use Windwalker\Cache\Storage\StorageInterface;
24+
use Windwalker\Promise\Promise;
2425
use Windwalker\Utilities\Assert\ArgumentsAssert;
2526

2627
/**
@@ -279,16 +280,13 @@ public function get($key, $default = null): mixed
279280
/**
280281
* @inheritDoc
281282
*/
282-
public function set($key, $value, $ttl = null, array $tags = []): bool
283+
public function set($key, $value, $ttl = null): bool
283284
{
284285
$item = $this->getItem($key);
285286

286287
$item->expiresAfter($ttl ?? $this->defaultTtl);
287288
$item->set($value);
288289

289-
if ($item instanceof CacheItem && $tags !== []) {
290-
$item->tags(...$tags);
291-
}
292290

293291
return $this->save($item);
294292
}
@@ -367,7 +365,6 @@ public function has($key): bool
367365
* @param string $key
368366
* @param CacheHandler $handler Invoked to compute the value on cache miss.
369367
* Receives the CacheItem as first argument.
370-
* May call $item->tag('foo', 'bar') to associate tags.
371368
* @param null|int|DateInterval $ttl
372369
* @param float $beta XFetch beta factor.
373370
* 0 = no early expiration.
@@ -451,6 +448,108 @@ public function call(
451448
return $this->fetch($key, $handler, $ttl, 1.0, $lock);
452449
}
453450

451+
/**
452+
* Asynchronous variant of fetch() that wraps the computation in a Promise.
453+
*
454+
* Differences from fetch():
455+
* - The handler may return either a value OR a Promise / thenable.
456+
* When a thenable is returned, the cache will only persist the resolved value
457+
* (NOT the promise object itself).
458+
* - The returned Promise can be combined with Promise::all(), Promise::race(), etc.
459+
*
460+
* Example:
461+
* Promise::all([
462+
* $pool->fetchAsync('a', fn() => fetchRemote('a')), // handler returns Promise
463+
* $pool->fetchAsync('b', fn() => fetchRemote('b')),
464+
* ])->then(fn(array $values) => useValues($values))->wait();
465+
*
466+
* @psalm-param callable(CacheItem): mixed $handler
467+
*/
468+
public function fetchAsync(
469+
string $key,
470+
callable $handler,
471+
DateInterval|int|null $ttl = null,
472+
float $beta = 1.0,
473+
bool $lock = true,
474+
): Promise {
475+
if (!class_exists(Promise::class)) {
476+
throw new \DomainException('Please install `windwalker/promise` to use fetchAsync().');
477+
}
478+
479+
return new Promise(function ($resolve, $reject) use ($key, $handler, $ttl, $beta, $lock) {
480+
$locked = $lock && CacheLock::lock($key, $isNew);
481+
$released = false;
482+
483+
$release = static function () use (&$released, &$locked, $key) {
484+
if ($locked && !$released) {
485+
CacheLock::release($key);
486+
$released = true;
487+
}
488+
};
489+
490+
try {
491+
$item = $this->getItem($key);
492+
493+
// Re-entrant: another stripe in this process already holds the lock.
494+
if ($locked && !$isNew) {
495+
$release();
496+
$resolve($item->get());
497+
498+
return;
499+
}
500+
501+
$isHit = $item->isHit();
502+
503+
if ($isHit && !$this->shouldRecomputeEarly($item, $beta)) {
504+
$release();
505+
$resolve($item->get());
506+
507+
return;
508+
}
509+
510+
$item->expiresAfter($ttl);
511+
512+
$start = microtime(true);
513+
$data = $handler($item);
514+
515+
// Whether $data is a plain value or a Promise/thenable, Promise::resolve()
516+
// normalises it. The .then() chain only fires once the (possibly async)
517+
// value has actually been produced.
518+
Promise::resolve($data)->then(
519+
function ($resolvedData) use ($item, $start, $resolve, $release) {
520+
try {
521+
$ctime = max(1, (int) round(max(0.0, microtime(true) - $start) * 1000));
522+
523+
if (!$resolvedData instanceof CacheItemInterface) {
524+
$item->set($resolvedData);
525+
} else {
526+
$item = $resolvedData;
527+
$resolvedData = $item->get();
528+
}
529+
530+
if ($item instanceof CacheItem) {
531+
$item->setCtime($ctime);
532+
}
533+
534+
$this->save($item);
535+
} finally {
536+
$release();
537+
}
538+
539+
$resolve($resolvedData);
540+
},
541+
function ($reason) use ($release, $reject) {
542+
$release();
543+
$reject($reason);
544+
}
545+
);
546+
} catch (\Throwable $e) {
547+
$release();
548+
$reject($e);
549+
}
550+
});
551+
}
552+
454553
/**
455554
* XFetch probabilistic early-expiration check.
456555
*

packages/cache/src/CachePoolInterface.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
use Psr\Log\LoggerAwareInterface;
1010
use Psr\Log\LoggerInterface;
1111
use Psr\SimpleCache\CacheInterface;
12+
use Windwalker\Promise\Promise;
13+
use Windwalker\Promise\PromiseInterface;
1214

1315
interface CachePoolInterface extends CacheItemPoolInterface, CacheInterface, LoggerAwareInterface
1416
{
@@ -21,6 +23,15 @@ public function fetch(
2123
bool $lock = true,
2224
): mixed;
2325

26+
/** @psalm-param callable(CacheItem): mixed $handler */
27+
public function fetchAsync(
28+
string $key,
29+
callable $handler,
30+
DateInterval|int|null $ttl = null,
31+
float $beta = 1.0,
32+
bool $lock = true,
33+
): PromiseInterface;
34+
2435
public function withLogger(LoggerInterface $logger): static;
2536

2637
public function withGroup(string $group): static;

packages/cache/src/TaggedCachePool.php

Lines changed: 14 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -89,54 +89,23 @@ public function save(CacheItemInterface $item): bool
8989
return true;
9090
}
9191

92-
/** @psalm-param callable(CacheItem): mixed $handler */
93-
public function fetch(
94-
string $key,
95-
callable $handler,
96-
DateInterval|int|null $ttl = null,
97-
float $beta = 1.0,
98-
bool $lock = true,
99-
): mixed {
100-
$locked = $lock && CacheLock::lock($key, $isNew, $this->logger);
101-
102-
try {
103-
$item = $this->getItem($key);
104-
105-
if ($locked && !$isNew) {
106-
return $item->get();
107-
}
108-
109-
$isHit = $item->isHit() && $this->isItemTagsValid($key);
110-
111-
if ($isHit && !$this->shouldRecomputeEarly($item, $beta)) {
112-
return $item->get();
113-
}
114-
115-
$item->expiresAfter($ttl);
116-
117-
$start = microtime(true);
118-
$data = $handler($item);
119-
$ctime = max(1, (int) round(max(0.0, microtime(true) - $start) * 1000));
120-
121-
if (!$data instanceof CacheItemInterface) {
122-
$item->set($data);
123-
} else {
124-
$item = $data;
125-
$data = $item->get();
126-
}
127-
128-
if ($item instanceof CacheItem) {
129-
$item->setCtime($ctime);
130-
}
92+
/**
93+
* Tag-aware variant of set(): persists $value associated with the given $tags.
94+
*
95+
* @param string[] $tags
96+
*/
97+
public function set($key, $value, $ttl = null, array $tags = []): bool
98+
{
99+
$item = $this->getItem($key);
131100

132-
$this->save($item);
101+
$item->expiresAfter($ttl ?? $this->getDefaultTtl());
102+
$item->set($value);
133103

134-
return $data;
135-
} finally {
136-
if ($locked) {
137-
CacheLock::release($key, $this->logger);
138-
}
104+
if ($item instanceof CacheItem && $tags !== []) {
105+
$item->tags(...$tags);
139106
}
107+
108+
return $this->save($item);
140109
}
141110

142111
public function invalidateTags(string ...$tags): bool

0 commit comments

Comments
 (0)