@@ -17,9 +17,12 @@ import chat.rocket.core.model.Room
17
17
import com.squareup.moshi.JsonAdapter
18
18
import kotlinx.coroutines.CoroutineScope
19
19
import kotlinx.coroutines.Dispatchers
20
+ import kotlinx.coroutines.ExperimentalCoroutinesApi
20
21
import kotlinx.coroutines.Job
22
+ import kotlinx.coroutines.ObsoleteCoroutinesApi
21
23
import kotlinx.coroutines.channels.Channel
22
24
import kotlinx.coroutines.channels.SendChannel
25
+ import kotlinx.coroutines.coroutineScope
23
26
import kotlinx.coroutines.delay
24
27
import kotlinx.coroutines.isActive
25
28
import kotlinx.coroutines.launch
@@ -35,6 +38,7 @@ import kotlin.coroutines.CoroutineContext
35
38
36
39
const val PING_INTERVAL = 15000L
37
40
41
+ @ObsoleteCoroutinesApi
38
42
class Socket (
39
43
internal val client : RocketChatClient ,
40
44
internal val roomsChannel : SendChannel <StreamMessage <Room >>,
@@ -71,6 +75,7 @@ class Socket(
71
75
72
76
internal val subscriptionsMap = HashMap <String , (Boolean , String ) - > Unit > ()
73
77
78
+ @ObsoleteCoroutinesApi
74
79
private val connectionContext = newSingleThreadContext(" connection-context" )
75
80
private val reconnectionStrategy = ReconnectionStrategy ()
76
81
private var reconnectJob: Job ? = null
@@ -84,6 +89,7 @@ class Socket(
84
89
socketMessageAdapter = moshi.adapter(SocketMessage ::class .java)
85
90
}
86
91
92
+ @ObsoleteCoroutinesApi
87
93
internal fun connect (resetCounter : Boolean = false) {
88
94
selfDisconnect = false
89
95
// reset id counter
@@ -101,6 +107,7 @@ class Socket(
101
107
socket = httpClient.newWebSocket(request, this )
102
108
}
103
109
110
+ @ObsoleteCoroutinesApi
104
111
internal fun disconnect () {
105
112
when (currentState) {
106
113
State .Disconnected () -> return
@@ -116,6 +123,7 @@ class Socket(
116
123
}
117
124
}
118
125
126
+ @ObsoleteCoroutinesApi
119
127
private fun startReconnection () {
120
128
// Ignore self disconnection
121
129
if (selfDisconnect) {
@@ -142,10 +150,11 @@ class Socket(
142
150
}
143
151
}
144
152
153
+ @ObsoleteCoroutinesApi
145
154
private suspend fun delayReconnection (reconnectInterval : Int ) {
146
155
val seconds = reconnectInterval / 1000
147
156
withContext(connectionContext) {
148
- for (second in 0 .. (seconds - 1 ) ) {
157
+ for (second in 0 until seconds ) {
149
158
if (! coroutineContext.isActive) {
150
159
logger.debug { " Reconnect job inactive, ignoring" }
151
160
return @withContext
@@ -158,6 +167,7 @@ class Socket(
158
167
}
159
168
}
160
169
170
+ @ExperimentalCoroutinesApi
161
171
private fun processIncomingMessage (text : String ) {
162
172
messagesProcessed++
163
173
logger.debug {
@@ -175,7 +185,7 @@ class Socket(
175
185
return
176
186
}
177
187
178
- reschedulePing(message.type )
188
+ reschedulePing()
179
189
180
190
when (currentState) {
181
191
is State .Connecting -> {
@@ -193,6 +203,7 @@ class Socket(
193
203
}
194
204
}
195
205
206
+ @ObsoleteCoroutinesApi
196
207
private fun processConnectionMessage (message : SocketMessage ) {
197
208
when (message.type) {
198
209
MessageType .CONNECTED -> {
@@ -205,6 +216,8 @@ class Socket(
205
216
}
206
217
}
207
218
219
+ @ObsoleteCoroutinesApi
220
+ @ExperimentalCoroutinesApi
208
221
private fun processAuthenticationResponse (message : SocketMessage , text : String ) {
209
222
when (message.type) {
210
223
MessageType .ADDED , MessageType .UPDATED -> {
@@ -224,6 +237,7 @@ class Socket(
224
237
}
225
238
}
226
239
240
+ @ExperimentalCoroutinesApi
227
241
private fun processMessage (message : SocketMessage , text : String ) {
228
242
when (message.type) {
229
243
MessageType .PING -> {
@@ -245,7 +259,7 @@ class Socket(
245
259
socket?.send(message)
246
260
}
247
261
248
- private fun reschedulePing (type : MessageType ) {
262
+ private fun reschedulePing () {
249
263
logger.debug { " Rescheduling ping in $PING_INTERVAL milliseconds" }
250
264
251
265
timeoutJob?.cancel()
@@ -266,20 +280,23 @@ class Socket(
266
280
private suspend fun schedulePingTimeout () {
267
281
val timeout = (PING_INTERVAL * 1.5 ).toLong()
268
282
logger.debug { " Scheduling ping timeout in $timeout milliseconds" }
269
- timeoutJob = launch(parentJob) {
270
- delay(timeout)
271
- if (! isActive) return @launch
272
- when (currentState) {
273
- is State .Disconnected ,
274
- is State .Disconnecting -> logger.warn { " Pong not received, but already disconnected" }
275
- else -> {
276
- logger.warn { " Pong not received" }
277
- socket?.cancel()
283
+ timeoutJob = coroutineScope {
284
+ launch(parentJob) {
285
+ delay(timeout)
286
+ if (! isActive) return @launch
287
+ when (currentState) {
288
+ is State .Disconnected ,
289
+ is State .Disconnecting -> logger.warn { " Pong not received, but already disconnected" }
290
+ else -> {
291
+ logger.warn { " Pong not received" }
292
+ socket?.cancel()
293
+ }
278
294
}
279
295
}
280
296
}
281
297
}
282
298
299
+ @ObsoleteCoroutinesApi
283
300
internal fun setState (newState : State ) {
284
301
if (newState != currentState) {
285
302
logger.debug { " Setting state to: $newState - oldState: $currentState , channels: ${statusChannelList.size} " }
@@ -288,6 +305,7 @@ class Socket(
288
305
}
289
306
}
290
307
308
+ @ObsoleteCoroutinesApi
291
309
private fun sendState (state : State ) {
292
310
launch(connectionContext) {
293
311
for (channel in statusChannelList) {
@@ -306,6 +324,7 @@ class Socket(
306
324
parentJob.cancel()
307
325
}
308
326
327
+ @ExperimentalCoroutinesApi
309
328
override fun onOpen (webSocket : WebSocket , response : Response ? ) {
310
329
readJob = launch {
311
330
for (message in processingChannel!! ) {
@@ -316,6 +335,7 @@ class Socket(
316
335
send(CONNECT_MESSAGE )
317
336
}
318
337
338
+ @ObsoleteCoroutinesApi
319
339
override fun onFailure (webSocket : WebSocket , throwable : Throwable ? , response : Response ? ) {
320
340
logger.warn { " Socket.onFailure(). THROWABLE MESSAGE: ${throwable?.message} - RESPONSE MESSAGE: ${response?.message()} " }
321
341
throwable?.printStackTrace()
@@ -324,18 +344,21 @@ class Socket(
324
344
startReconnection()
325
345
}
326
346
347
+ @ObsoleteCoroutinesApi
327
348
override fun onClosing (webSocket : WebSocket , code : Int , reason : String? ) {
328
349
logger.warn { " Socket.onClosing() called. Received CODE = $code - Received REASON = $reason " }
329
350
setState(State .Disconnecting ())
330
351
startReconnection()
331
352
}
353
+ @ObsoleteCoroutinesApi
332
354
override fun onClosed (webSocket : WebSocket , code : Int , reason : String? ) {
333
355
logger.warn { " Socket.onClosed() called. Received CODE = $code - Received REASON = $reason " }
334
356
setState(State .Disconnected ())
335
357
close()
336
358
startReconnection()
337
359
}
338
360
361
+ @ExperimentalCoroutinesApi
339
362
override fun onMessage (webSocket : WebSocket , text : String? ) {
340
363
logger.warn { " Socket.onMessage(). Received TEXT = $text for processing channel = $processingChannel " }
341
364
text?.let {
@@ -357,6 +380,8 @@ class Socket(
357
380
}
358
381
}
359
382
383
+ @ObsoleteCoroutinesApi
360
384
fun RocketChatClient.connect (resetCounter : Boolean = false) = socket.connect(resetCounter)
361
385
386
+ @ObsoleteCoroutinesApi
362
387
fun RocketChatClient.disconnect () = socket.disconnect()
0 commit comments