@@ -76,6 +76,8 @@ type serviceCache struct {
76
76
exportNamespace * utils.SyncMap [string , * utils.SyncSet [string ]]
77
77
// exportServices 某个服务对部分命名空间全部可见 exportNamespace -> svcName -> model.Service
78
78
exportServices * utils.SyncMap [string , * utils.SyncMap [string , * model.Service ]]
79
+
80
+ subCtx * eventhub.SubscribtionContext
79
81
}
80
82
81
83
// NewServiceCache 返回一个serviceCache
@@ -98,14 +100,18 @@ func (sc *serviceCache) Initialize(opt map[string]interface{}) error {
98
100
sc .cl5Names = utils .NewSyncMap [string , * model.Service ]()
99
101
sc .pendingServices = utils .NewSyncMap [string , struct {}]()
100
102
sc .namespaceServiceCnt = utils .NewSyncMap [string , * model.NamespaceServiceCount ]()
101
- sc .revisionWorker = newRevisionWorker (sc , sc .instCache .(* instanceCache ), opt )
102
103
sc .exportNamespace = utils .NewSyncMap [string , * utils.SyncSet [string ]]()
103
104
sc .exportServices = utils .NewSyncMap [string , * utils.SyncMap [string , * model.Service ]]()
104
-
105
105
ctx , cancel := context .WithCancel (context .Background ())
106
106
sc .cancel = cancel
107
+ sc .revisionWorker = newRevisionWorker (sc , sc .instCache .(* instanceCache ), opt )
107
108
// 先启动revision计算协程
108
109
go sc .revisionWorker .revisionWorker (ctx )
110
+ subCtx , err := eventhub .SubscribeWithFunc (eventhub .CacheNamespaceEventTopic , sc .handleNamespaceChange )
111
+ if err != nil {
112
+ return err
113
+ }
114
+ sc .subCtx = subCtx
109
115
if opt == nil {
110
116
return nil
111
117
}
@@ -119,6 +125,9 @@ func (sc *serviceCache) Close() error {
119
125
if err := sc .BaseCache .Close (); err != nil {
120
126
return err
121
127
}
128
+ if sc .subCtx != nil {
129
+ sc .subCtx .Cancel ()
130
+ }
122
131
if sc .cancel != nil {
123
132
sc .cancel ()
124
133
}
@@ -413,27 +422,30 @@ func (sc *serviceCache) setServices(services map[string]*model.Service) (map[str
413
422
if service .IsAlias () {
414
423
aliases = append (aliases , service )
415
424
}
425
+ oldVal , exist := sc .ids .Load (service .ID )
426
+ if oldVal != nil {
427
+ service .OldExportTo = oldVal .ExportTo
428
+ }
416
429
417
430
spaceName := service .Namespace
418
431
changeNs [spaceName ] = struct {}{}
419
432
// 发现有删除操作
420
433
if ! service .Valid {
421
434
sc .removeServices (service )
422
- sc .revisionWorker . Notify (service .ID , false )
435
+ sc .notifyRevisionWorker (service .ID , false )
423
436
del ++
424
437
svcCount --
425
438
continue
426
439
}
427
440
428
441
update ++
429
- _ , exist := sc .ids .Load (service .ID )
430
442
if ! exist {
431
443
svcCount ++
432
444
}
433
445
434
446
sc .ids .Store (service .ID , service )
435
447
sc .serviceList .addService (service )
436
- sc .revisionWorker . Notify (service .ID , true )
448
+ sc .notifyRevisionWorker (service .ID , true )
437
449
438
450
spaces , ok := sc .names .Load (spaceName )
439
451
if ! ok {
@@ -455,6 +467,7 @@ func (sc *serviceCache) setServices(services map[string]*model.Service) (map[str
455
467
456
468
sc .postProcessServiceAlias (aliases )
457
469
sc .postProcessUpdatedServices (changeNs )
470
+ sc .postProcessServiceExports (services )
458
471
sc .serviceList .reloadRevision ()
459
472
return map [string ]time.Time {
460
473
sc .Name (): time .Unix (lastMtime , 0 ),
@@ -633,7 +646,7 @@ func (sc *serviceCache) GetVisibleServicesInOtherNamespace(svcName, namespace st
633
646
return visibleServices
634
647
}
635
648
636
- func (sc * serviceCache ) postProcessServiceExports (services [ ]* model.Service ) {
649
+ func (sc * serviceCache ) postProcessServiceExports (services map [ string ]* model.Service ) {
637
650
638
651
for i := range services {
639
652
svc := services [i ]
@@ -680,6 +693,14 @@ func (sc *serviceCache) handleNamespaceChange(ctx context.Context, args interfac
680
693
return nil
681
694
}
682
695
696
+ func (sc * serviceCache ) notifyRevisionWorker (serviceID string , valid bool ) {
697
+ revisionWorker := sc .revisionWorker
698
+ if revisionWorker == nil {
699
+ return
700
+ }
701
+ revisionWorker .Notify (serviceID , valid )
702
+ }
703
+
683
704
// GetRevisionWorker
684
705
func (sc * serviceCache ) GetRevisionWorker () types.ServiceRevisionWorker {
685
706
return sc .revisionWorker
0 commit comments