@@ -306,6 +306,13 @@ class CacheSender::Impl
306306 std::scoped_lock lkResp (mSenderMutex );
307307 mReadyResponses .emplace (
308308 llmRequest.mRequestId , Response{std::addressof (llmRequest), std::move (promise)});
309+ // if the request is already in the pending queue, submit a send request to ready queue
310+ auto it = mPendingRequests .find (llmRequest.mRequestId );
311+ if (it != mPendingRequests .end ())
312+ {
313+ mReadyPendingRequests .push (std::move (it->second ));
314+ mPendingRequests .erase (it);
315+ }
309316 }
310317 std::unique_lock lkCond (mCondMutex );
311318 mAnyReady = true ;
@@ -353,6 +360,17 @@ class CacheSender::Impl
353360
354361 [[nodiscard]] RequestInfo recvRequestInfo ()
355362 {
363+ // if there is a pending request in the ready queue, respond to it first
364+ {
365+ std::scoped_lock lk (mSenderMutex );
366+ if (!mReadyPendingRequests .empty ())
367+ {
368+ auto info = std::move (mReadyPendingRequests .front ());
369+ mReadyPendingRequests .pop ();
370+ return info;
371+ }
372+ }
373+
356374 auto * agentConnectionManager = dynamic_cast <executor::kv_cache::AgentConnectionManager*>(mManager );
357375 bool isAgent = agentConnectionManager != nullptr ;
358376
@@ -619,14 +637,14 @@ class CacheSender::Impl
619637 {
620638 break ;
621639 }
640+ auto const & requestInfo = recvRequestInfo ();
641+ auto reqId = requestInfo.getRequestId ();
622642 if (!mReadyResponses .empty ())
623643 {
624- auto const & requestInfo = recvRequestInfo ();
625644 if (mTerminate || !mManager ->isRunning ())
626645 {
627646 return ;
628647 }
629- auto reqId = requestInfo.getRequestId ();
630648
631649 {
632650 std::scoped_lock lk (mSenderMutex );
@@ -638,26 +656,11 @@ class CacheSender::Impl
638656 mRemainSendCount [reqId] = getCounterpartsCount (reqId);
639657 }
640658 }
641- auto it = getCurrentResponse ( );
659+ auto it = getReadyResponse (requestInfo );
642660 if (it != mReadyResponses .end ())
643661 {
644662 sendResponse (it);
645663 }
646- else
647- {
648- auto it = getCurrentResponse ();
649- while (it == mReadyResponses .end ())
650- {
651- std::unique_lock lk (mCondMutex );
652- mSenderCv .wait (lk, [this ]() { return (mAnyReady || mTerminate ); });
653- if (mTerminate )
654- {
655- break ;
656- }
657- it = getCurrentResponse ();
658- }
659- sendResponse (it);
660- }
661664 }
662665 }
663666 catch (std::exception const & err)
@@ -692,6 +695,7 @@ class CacheSender::Impl
692695 {
693696 std::scoped_lock lkResp (mSenderMutex );
694697 mReadyResponses .erase (it);
698+ mPendingRequests .erase (it->first );
695699 }
696700 if (mReadyResponses .empty ())
697701 {
@@ -705,10 +709,29 @@ class CacheSender::Impl
705709 return mCurrentRequest .value ();
706710 }
707711
708- [[nodiscard]] std::map<RequestIdType, Response>::iterator getCurrentResponse ( )
712+ [[nodiscard]] std::map<RequestIdType, Response>::iterator getReadyResponse (RequestInfo const & requestInfo )
709713 {
710714 std::scoped_lock lk (mSenderMutex );
711- return mReadyResponses .find (getCurrentRequestId ());
715+ auto reqId = requestInfo.getRequestId ();
716+ auto it = mReadyResponses .find (reqId);
717+ if (it != mReadyResponses .end ())
718+ {
719+ return it;
720+ }
721+ else
722+ {
723+ // If a request is received but response is not ready, stash it in the pending map to send it later
724+ TLLM_LOG_INFO (" No ready response found for request %zu" , reqId);
725+ mPendingRequests [reqId] = requestInfo;
726+ }
727+ return mReadyResponses .end ();
728+ }
729+
730+ bool checkContextRequestReady (LlmRequest const & llmRequest)
731+ {
732+ std::scoped_lock lk (mSenderMutex );
733+ auto it = mPendingRequests .find (llmRequest.mRequestId );
734+ return it != mPendingRequests .end ();
712735 }
713736
714737private:
@@ -723,6 +746,8 @@ class CacheSender::Impl
723746 AsyncSendResource mAsyncSendResource ;
724747 std::vector<std::future<void >> mAsyncSendFutures ;
725748 int mDeviceId {-1 };
749+ std::unordered_map<LlmRequest::RequestIdType, RequestInfo> mPendingRequests ;
750+ std::queue<RequestInfo> mReadyPendingRequests ;
726751
727752 executor::kv_cache::ConnectionManager* mManager ;
728753 std::map<LlmRequest::RequestIdType, TransferSession> mRequestToSession ;
@@ -1189,6 +1214,11 @@ void CacheSender::sendReadySignal(LlmRequest::RequestIdType requestId, bool isRe
11891214 mImpl ->sendReadySignal (requestId, isReady);
11901215}
11911216
1217+ void CacheSender::checkContextRequestReady (LlmRequest const & llmRequest)
1218+ {
1219+ return mImpl ->checkContextRequestReady (llmRequest);
1220+ }
1221+
11921222CacheReceiver::CacheReceiver (executor::kv_cache::ConnectionManager* manager,
11931223 executor::kv_cache::CacheState selfCacheState, SizeType32 selfIndex, std::unique_ptr<BaseCacheFormatter> formatter)
11941224 : mImpl {std::unique_ptr<Impl, ImplDeleter>(new Impl (manager, selfCacheState, selfIndex, std::move (formatter)))}
0 commit comments