@@ -21,6 +21,7 @@ const MAX_PERSIST_INTERVAL = number.parseInt(env.getConf('y-socket-io-server-max
// Tunables read through env.getConf; each numeric setting falls back to the
// default shown when the key is unset or empty. An explicit radix of 10 keeps
// values such as '0x10' or '010' from being parsed in another base.
// NOTE(review): presumably a duration in milliseconds — confirm at the use site.
const REVALIDATE_TIMEOUT = number.parseInt(env.getConf('y-socket-io-server-revalidate-timeout') || '60000', 10)
// True only when the 'y-worker-disabled' setting is exactly the string 'true'.
const WORKER_DISABLED = env.getConf('y-worker-disabled') === 'true'
// NOTE(review): presumably a duration in milliseconds — confirm at the use site.
const DEFAULT_CLEAR_TIMEOUT = number.parseInt(env.getConf('y-socket-io-default-clear-timeout') || '30000', 10)
// Delay, in milliseconds, between two persist-worker health checks (used as a setTimeout delay).
const WORKER_HEALTH_CHECK_INTERVAL = number.parseInt(env.getConf('y-socket-io-worker-health-check-interval') || '5000', 10)
2425
2526process . on ( 'SIGINT' ,  function  ( )  { 
2627  // calling .shutdown allows your process to exit normally 
@@ -146,6 +147,26 @@ export class YSocketIO {
146147   * @readonly  
147148   */ 
148149  awaitingCleanupNamespace  =  new  Map ( ) 
/**
 * Set to true as soon as the current persist worker has delivered any message,
 * and reset to false while an unhealthy worker is being replaced. Persisting
 * through the worker is only attempted while this flag is true.
 * @type {boolean}
 * @private
 */
workerReady = false
/**
 * `Date.now()` timestamp of the most recent successful ping/pong round trip,
 * or null when no ping has succeeded yet.
 * @type {number | null}
 * @private
 */
workerLastHeartbeat = null
/**
 * Promise/resolver pair of the ping that is currently waiting for its 'pong';
 * null while no ping is in flight.
 * @type {{ promise: Promise<boolean>, resolve: (result: boolean) => void } | null}
 * @private
 */
workerHeartbeatContext = null
/**
 * Handle of the pending health-check timer, kept so that destroy() can cancel it.
 * @type {NodeJS.Timeout | null}
 * @private
 */
persistWorkerHealthCheckTimeout = null
149170
150171  /** 
151172   * YSocketIO constructor. 
@@ -169,20 +190,22 @@ export class YSocketIO {
169190   * 
170191   *  It also starts socket connection listeners. 
171192   * @param  {import('../storage.js').AbstractStorage } store 
172-    * @param  {{ redisPrefix?: string, redisUrl?: string, persistWorker?:  import('worker_threads').Worker }= } opts 
193+    * @param  {{ redisPrefix?: string, redisUrl?: string, getPersistWorker?: () =>  import('worker_threads').Worker }= } opts 
173194   * @public  
174195   */ 
175-   async  initialize  ( store ,  {  redisUrl,  redisPrefix =  'y' ,  persistWorker  }  =  { } )  { 
196+   async  initialize  ( store ,  {  redisUrl,  redisPrefix =  'y' ,  getPersistWorker  }  =  { } )  { 
176197    const  {  enableAwareness }  =  this . configuration 
177198    const  [ client ,  subscriber ]  =  await  promise . all ( [ 
178199      api . createApiClient ( store ,  {  redisUrl,  redisPrefix,  enableAwareness } ) , 
179200      createSubscriber ( store ,  {  redisUrl,  redisPrefix,  enableAwareness } ) 
180201    ] ) 
181202    this . client  =  client 
182203    this . subscriber  =  subscriber 
183-     if  ( persistWorker )  { 
184-       this . client . persistWorker  =  persistWorker 
204+     if  ( getPersistWorker )  { 
205+       this . getPersistWorker  =  getPersistWorker 
206+       this . persistWorker  =  getPersistWorker ( ) 
185207      this . registerPersistWorkerResolve ( ) 
208+       this . registerPersistWorkerHealthCheck ( ) 
186209    } 
187210
188211    this . nsp  =  this . io . of ( / ^ \/ y j s \| .* $ / ) 
@@ -518,16 +541,15 @@ export class YSocketIO {
518541          const  doc  =  this . namespaceDocMap . get ( namespace ) ?. ydoc 
519542          logSocketIO ( `trying to persist ${ namespace }  ) 
520543          if  ( ! doc )  return 
521-           if  ( this . client . persistWorker )  { 
544+           if  ( this . persistWorker   &&   this . workerReady )  { 
522545            /** @type  {ReturnType<typeof promiseWithResolvers<void>> } */ 
523546            const  {  promise,  resolve }  =  promiseWithResolvers ( ) 
524-             assert ( this . client ?. persistWorker ) 
525547            this . awaitingPersistMap . set ( namespace ,  {  promise,  resolve } ) 
526548
527549            const  docState  =  Y . encodeStateAsUpdateV2 ( doc ) 
528550            const  buf  =  new  Uint8Array ( new  SharedArrayBuffer ( docState . length ) ) 
529551            buf . set ( docState ) 
530-             this . client . persistWorker . postMessage ( { 
552+             this . persistWorker . postMessage ( { 
531553              room : namespace , 
532554              docstate : buf 
533555            } ) 
@@ -627,6 +649,9 @@ export class YSocketIO {
627649
628650  destroy  ( )  { 
629651    try  { 
652+       if  ( this . persistWorkerHealthCheckTimeout )  { 
653+         clearInterval ( this . persistWorkerHealthCheckTimeout ) 
654+       } 
630655      this . subscriber ?. destroy ( ) 
631656      return  this . client ?. destroy ( ) 
632657    }  catch  ( e )  { 
@@ -635,9 +660,13 @@ export class YSocketIO {
635660  } 
636661
637662  registerPersistWorkerResolve  ( )  { 
638-     if  ( ! this . client ?. persistWorker )  return 
639-     this . client . persistWorker . on ( 'message' ,  ( {  event,  room } )  =>  { 
663+     if  ( ! this . persistWorker )  return 
664+     this . persistWorker . on ( 'message' ,  ( {  event,  room } )  =>  { 
640665      if  ( event  ===  'persisted' )  this . awaitingPersistMap . get ( room ) ?. resolve ( ) 
666+       if  ( event  ===  'pong'  &&  this . workerHeartbeatContext )  { 
667+         this . workerHeartbeatContext . resolve ( true ) 
668+       } 
669+       this . workerReady  =  true 
641670    } ) 
642671  } 
643672
@@ -677,4 +706,58 @@ export class YSocketIO {
677706    this . namespaceDocMap . delete ( namespace ) 
678707    this . namespacePersistentMap . delete ( namespace ) 
679708  } 
709+ 
710+   async  waitUntilWorkerReady  ( )  { 
711+     if  ( ! this . persistWorker  ||  this . workerReady )  return 
712+     /** @type  {ReturnType<typeof promiseWithResolvers<void>> } */ 
713+     const  {  promise,  resolve }  =  promiseWithResolvers ( ) 
714+     const  timer  =  setInterval ( ( )  =>  { 
715+       if  ( ! this . workerReady )  return 
716+       clearInterval ( timer ) 
717+       resolve ( ) 
718+     } ,  100 ) 
719+     await  promise 
720+   } 
721+ 
722+   registerPersistWorkerHealthCheck  ( )  { 
723+     this . persistWorkerHealthCheckTimeout  =  setTimeout ( async  ( )  =>  { 
724+       const  workerHealth  =  await  this . workerHealthCheck ( ) 
725+       if  ( ! workerHealth )  { 
726+         logSocketIO ( 'worker thread is unhealthy, recreating' ) 
727+         assert ( this . getPersistWorker ) 
728+         this . workerReady  =  false 
729+         await  this . persistWorker ?. removeAllListeners ( ) . terminate ( ) 
730+         this . persistWorker  =  this . getPersistWorker ( ) 
731+         this . registerPersistWorkerResolve ( ) 
732+         await  this . waitUntilWorkerReady ( ) 
733+       } 
734+       this . registerPersistWorkerHealthCheck ( ) 
735+     } ,  WORKER_HEALTH_CHECK_INTERVAL ) 
736+   } 
737+ 
738+   async  workerHealthCheck  ( )  { 
739+     if  ( ! this . persistWorker  ||  this . workerHeartbeatContext )  return  null 
740+     if  ( 
741+       this . workerLastHeartbeat  && 
742+       Date . now ( )  -  this . workerLastHeartbeat  <  WORKER_HEALTH_CHECK_INTERVAL  *  2 
743+     )  { 
744+       return  true 
745+     } 
746+ 
747+     /** @type  {ReturnType<typeof promiseWithResolvers<boolean>> } */ 
748+     const  {  promise : heartbeatPromise ,  resolve }  =  promiseWithResolvers ( ) 
749+     this . workerHeartbeatContext  =  {  promise : heartbeatPromise ,  resolve } 
750+     const  now  =  performance . now ( ) 
751+     this . persistWorker . postMessage ( {  event : 'ping'  } ) 
752+     const  health  =  await  Promise . race ( [ 
753+       heartbeatPromise , 
754+       promise . wait ( 3000 ) . then ( ( )  =>  false ) 
755+     ] ) 
756+     this . workerHeartbeatContext  =  null 
757+     if  ( health )  { 
758+       logSocketIO ( `worker health check: responded in ${ performance . now ( )  -  now }  ) 
759+       this . workerLastHeartbeat  =  Date . now ( ) 
760+     } 
761+     return  health 
762+   } 
680763} 
0 commit comments