@@ -280,6 +280,8 @@ std::pair<String, Int32> StreamingStoreSourceMultiplexer::getStreamShard() const
280
280
return stream_shard->getStreamShard ();
281
281
}
282
282
283
+ std::atomic<uint32_t > StreamingStoreSourceMultiplexers::multiplexer_id = 0 ;
284
+
283
285
StreamingStoreSourceMultiplexers::StreamingStoreSourceMultiplexers (ContextPtr global_context_, Poco::Logger * log_)
284
286
: global_context(std::move(global_context_)), log(log_)
285
287
{
@@ -310,7 +312,8 @@ StreamingStoreSourceChannelPtr StreamingStoreSourceMultiplexers::createChannel(
310
312
{
311
313
multiplexers.emplace (
312
314
shard,
313
- StreamingStoreSourceMultiplexerPtrs{std::make_shared<StreamingStoreSourceMultiplexer>(0 , stream_shard, global_context, log )});
315
+ StreamingStoreSourceMultiplexerPtrs{
316
+ std::make_shared<StreamingStoreSourceMultiplexer>(getMultiplexerID (), stream_shard, global_context, log )});
314
317
iter = multiplexers.find (shard);
315
318
}
316
319
@@ -342,7 +345,7 @@ StreamingStoreSourceChannelPtr StreamingStoreSourceMultiplexers::createChannel(
342
345
// / If min channels is greater than > 20(default value), create another multiplexer for this shard
343
346
if (min_channels > global_context->getSettingsRef ().max_channels_per_resource_group .value )
344
347
{
345
- best_multiplexer = std::make_shared<StreamingStoreSourceMultiplexer>(iter-> second . size (), stream_shard, global_context, log );
348
+ best_multiplexer = std::make_shared<StreamingStoreSourceMultiplexer>(getMultiplexerID (), stream_shard, global_context, log );
346
349
iter->second .push_back (best_multiplexer);
347
350
}
348
351
@@ -351,7 +354,7 @@ StreamingStoreSourceChannelPtr StreamingStoreSourceMultiplexers::createChannel(
351
354
else
352
355
{
353
356
// / All multiplexers are shutdown
354
- auto multiplexer{std::make_shared<StreamingStoreSourceMultiplexer>(iter-> second . size (), stream_shard, global_context, log )};
357
+ auto multiplexer{std::make_shared<StreamingStoreSourceMultiplexer>(getMultiplexerID (), stream_shard, global_context, log )};
355
358
iter->second .push_back (multiplexer);
356
359
return multiplexer->createChannel (column_names, storage_snapshot, query_context);
357
360
}
@@ -366,7 +369,7 @@ StreamingStoreSourceChannelPtr StreamingStoreSourceMultiplexers::createIndepende
366
369
// / will startup after `StreamingStoreSourceChannel::recover()` and reset recovered sn
367
370
// / The `multiplexer` is cached in created StreamingStoreSourceChannel, we can release this one
368
371
auto multiplexer = std::make_shared<StreamingStoreSourceMultiplexer>(
369
- 0 , std::move (stream_shard), global_context, log , [this ](auto multiplexer_) { attachToSharedGroup (multiplexer_); });
372
+ getMultiplexerID () , std::move (stream_shard), global_context, log , [this ](auto multiplexer_) { attachToSharedGroup (multiplexer_); });
370
373
return multiplexer->createChannel (column_names, storage_snapshot, query_context);
371
374
}
372
375
@@ -379,7 +382,7 @@ StreamingStoreSourceChannelPtr StreamingStoreSourceMultiplexers::createIndepende
379
382
{
380
383
// / The `multiplexer` is cached in created StreamingStoreSourceChannel, we can release this one
381
384
auto multiplexer = std::make_shared<StreamingStoreSourceMultiplexer>(
382
- 0 , std::move (stream_shard), global_context, log , [this ](auto multiplexer_) { attachToSharedGroup (multiplexer_); });
385
+ getMultiplexerID () , std::move (stream_shard), global_context, log , [this ](auto multiplexer_) { attachToSharedGroup (multiplexer_); });
383
386
auto channel = multiplexer->createChannel (column_names, storage_snapshot, query_context);
384
387
multiplexer->resetSequenceNumber (start_sn);
385
388
multiplexer->startup ();
@@ -394,8 +397,17 @@ void StreamingStoreSourceMultiplexers::attachToSharedGroup(StreamingStoreSourceM
394
397
detached_multiplexers.clear ();
395
398
396
399
auto & multiplexer_list = multiplexers[multiplexer->stream_shard ->getShard ()];
397
- for (auto & shared_multiplexer : multiplexer_list)
400
+ for (auto it = multiplexer_list. begin (); it != multiplexer_list. end (); )
398
401
{
402
+ if ((*it)->isShutdown ())
403
+ {
404
+ it = multiplexer_list.erase (it);
405
+ continue ;
406
+ }
407
+
408
+ auto & shared_multiplexer = *it;
409
+ ++it;
410
+
399
411
// / Skip multiplexer that already have too many channels
400
412
if (shared_multiplexer->totalChannels () > global_context->getSettingsRef ().max_channels_per_resource_group .value )
401
413
continue ;
@@ -410,7 +422,6 @@ void StreamingStoreSourceMultiplexers::attachToSharedGroup(StreamingStoreSourceM
410
422
}
411
423
412
424
// / Not detach channels into any existed shared multiplexer, so we reuse it and join in shared groups
413
- multiplexer->id = multiplexer_list.size (); // / it's thread safe
414
425
multiplexer_list.emplace_back (std::move (multiplexer));
415
426
}
416
427
}
0 commit comments