11import { ClusterAdapter , ClusterMessage , MessageType } from "./cluster-adapter" ;
22import { decode , encode } from "notepack.io" ;
3- import { hasBinary , parseNumSubResponse , sumValues } from "./util" ;
3+ import { hasBinary , PUBSUB , SPUBLISH , SSUBSCRIBE , SUNSUBSCRIBE } from "./util" ;
44import debugModule from "debug" ;
55
66const debug = debugModule ( "socket.io-redis" ) ;
77
8- const RETURN_BUFFERS = true ;
9-
108export interface ShardedRedisAdapterOptions {
119 /**
1210 * The prefix for the Redis Pub/Sub channels.
@@ -78,25 +76,21 @@ class ShardedRedisAdapter extends ClusterAdapter {
7876
7977 const handler = ( message , channel ) => this . onRawMessage ( message , channel ) ;
8078
81- this . subClient . sSubscribe ( this . channel , handler , RETURN_BUFFERS ) ;
82- this . subClient . sSubscribe ( this . responseChannel , handler , RETURN_BUFFERS ) ;
79+ SSUBSCRIBE ( this . subClient , this . channel , handler ) ;
80+ SSUBSCRIBE ( this . subClient , this . responseChannel , handler ) ;
8381
8482 if ( this . opts . subscriptionMode === "dynamic" ) {
8583 this . on ( "create-room" , ( room ) => {
8684 const isPublicRoom = ! this . sids . has ( room ) ;
8785 if ( isPublicRoom ) {
88- this . subClient . sSubscribe (
89- this . dynamicChannel ( room ) ,
90- handler ,
91- RETURN_BUFFERS
92- ) ;
86+ SSUBSCRIBE ( this . subClient , this . dynamicChannel ( room ) , handler ) ;
9387 }
9488 } ) ;
9589
9690 this . on ( "delete-room" , ( room ) => {
9791 const isPublicRoom = ! this . sids . has ( room ) ;
9892 if ( isPublicRoom ) {
99- this . subClient . sUnsubscribe ( this . dynamicChannel ( room ) ) ;
93+ SUNSUBSCRIBE ( this . subClient , this . dynamicChannel ( room ) ) ;
10094 }
10195 } ) ;
10296 }
@@ -114,13 +108,13 @@ class ShardedRedisAdapter extends ClusterAdapter {
114108 } ) ;
115109 }
116110
117- return this . subClient . sUnsubscribe ( channels ) ;
111+ return SUNSUBSCRIBE ( this . subClient , channels ) ;
118112 }
119113
120114 override publishMessage ( message ) {
121115 const channel = this . computeChannel ( message ) ;
122116 debug ( "publishing message of type %s to %s" , message . type , channel ) ;
123- this . pubClient . sPublish ( channel , this . encode ( message ) ) ;
117+ SPUBLISH ( this . pubClient , channel , this . encode ( message ) ) ;
124118
125119 return Promise . resolve ( "" ) ;
126120 }
@@ -147,7 +141,8 @@ class ShardedRedisAdapter extends ClusterAdapter {
147141 override publishResponse ( requesterUid , response ) {
148142 debug ( "publishing response of type %s to %s" , response . type , requesterUid ) ;
149143
150- this . pubClient . sPublish (
144+ SPUBLISH (
145+ this . pubClient ,
151146 `${ this . channel } ${ requesterUid } #` ,
152147 this . encode ( response )
153148 ) ;
@@ -189,21 +184,6 @@ class ShardedRedisAdapter extends ClusterAdapter {
189184 }
190185
191186 override serverCount ( ) : Promise < number > {
192- if (
193- this . pubClient . constructor . name === "Cluster" ||
194- this . pubClient . isCluster
195- ) {
196- return Promise . all (
197- this . pubClient . nodes ( ) . map ( ( node ) => {
198- return node
199- . sendCommand ( [ "PUBSUB" , "SHARDNUMSUB" , this . channel ] )
200- . then ( parseNumSubResponse ) ;
201- } )
202- ) . then ( sumValues ) ;
203- } else {
204- return this . pubClient
205- . sendCommand ( [ "PUBSUB" , "SHARDNUMSUB" , this . channel ] )
206- . then ( parseNumSubResponse ) ;
207- }
187+ return PUBSUB ( this . pubClient , "SHARDNUMSUB" , this . channel ) ;
208188 }
209189}
0 commit comments