6
6
7
7
import abc
8
8
import asyncio
9
+ import logging
9
10
import os
10
- from asyncio import Task
11
+ from asyncio import Condition , Task
11
12
from threading import Lock
12
13
from typing import (
13
14
Any ,
45
46
R = TypeVar ("R" )
46
47
T = TypeVar ("T" )
47
48
49
+ COH_LOG = logging .getLogger ("coherence" )
50
+
48
51
49
52
@no_type_check
50
53
def _pre_call_cache (func ):
@@ -58,6 +61,9 @@ async def inner_async(self, *args, **kwargs):
58
61
if not self .active :
59
62
raise Exception ("Cache [{}] has been {}." .format (self .name , "released" if self .released else "destroyed" ))
60
63
64
+ # noinspection PyProtectedMember
65
+ await self ._session ._wait_for_active ()
66
+
61
67
return await func (self , * args , ** kwargs )
62
68
63
69
if asyncio .iscoroutinefunction (func ):
@@ -499,6 +505,7 @@ def __init__(self, cache_name: str, session: Session, serializer: Serializer):
499
505
self ._internal_emitter : EventEmitter = EventEmitter ()
500
506
self ._destroyed : bool = False
501
507
self ._released : bool = False
508
+ self ._session : Session = session
502
509
from .event import _MapEventsManager
503
510
504
511
self ._setup_event_handlers ()
@@ -743,16 +750,18 @@ def _setup_event_handlers(self) -> None:
743
750
# noinspection PyProtectedMember
744
751
def on_destroyed (name : str ) -> None :
745
752
if name == cache_name :
746
- this ._events_manager ._close ()
747
- this ._destroyed = True
748
- emitter .emit (MapLifecycleEvent .DESTROYED .value , name )
753
+ if not this .destroyed :
754
+ this ._events_manager ._close ()
755
+ this ._destroyed = True
756
+ emitter .emit (MapLifecycleEvent .DESTROYED .value , name )
749
757
750
758
# noinspection PyProtectedMember
751
759
def on_released (name : str ) -> None :
752
760
if name == cache_name :
753
- this ._events_manager ._close ()
754
- this ._released = True
755
- emitter .emit (MapLifecycleEvent .RELEASED .value , name )
761
+ if not this .released :
762
+ this ._events_manager ._close ()
763
+ this ._released = True
764
+ emitter .emit (MapLifecycleEvent .RELEASED .value , name )
756
765
757
766
def on_truncated (name : str ) -> None :
758
767
if name == cache_name :
@@ -922,7 +931,7 @@ def __init__(
922
931
corresponding `ConfigurableCacheFactory` on the server.
923
932
:param request_timeout_seconds: Defines the request timeout, in `seconds`, that will be applied to each
924
933
remote call. If not explicitly set, this defaults to :func:`coherence.client.Options.DEFAULT_REQUEST_TIMEOUT`.
925
- See also See also :func:`coherence.client.Options.ENV_REQUEST_TIMEOUT`
934
+ See also :func:`coherence.client.Options.ENV_REQUEST_TIMEOUT`
926
935
:param ser_format: The serialization format. Currently, this is always `json`
927
936
:param channel_options: The `gRPC` `ChannelOptions`. See
928
937
https://grpc.github.io/grpc/python/glossary.html#term-channel_arguments and
@@ -941,7 +950,8 @@ def __init__(
941
950
try :
942
951
time_out = float (timeout )
943
952
except ValueError :
944
- print (f"The value of { Options .ENV_REQUEST_TIMEOUT } cannot be converted to a float" )
953
+ COH_LOG .warning ("The timeout value of [%s] cannot be converted to a float" , Options .ENV_REQUEST_TIMEOUT )
954
+
945
955
self ._request_timeout_seconds = time_out
946
956
else :
947
957
self ._request_timeout_seconds = request_timeout_seconds
@@ -1080,6 +1090,8 @@ def __init__(self, session_options: Optional[Options] = None):
1080
1090
:param session_options: the provided :func:`coherence.client.Options`
1081
1091
"""
1082
1092
self ._closed : bool = False
1093
+ self ._active = False
1094
+ self ._active_condition : Condition = Condition ()
1083
1095
self ._caches : dict [str , NamedCache [Any , Any ]] = dict ()
1084
1096
self ._lock : Lock = Lock ()
1085
1097
if session_options is not None :
@@ -1121,6 +1133,13 @@ def __init__(self, session_options: Optional[Options] = None):
1121
1133
watch_task : Task [None ] = asyncio .create_task (watch_channel_state (self ))
1122
1134
self ._tasks .add (watch_task )
1123
1135
self ._emitter : EventEmitter = EventEmitter ()
1136
+ self ._channel .get_state (True ) # trigger connect
1137
+
1138
+ @staticmethod
1139
+ async def create (session_options : Optional [Options ] = None ) -> Session :
1140
+ session : Session = Session (session_options )
1141
+ await session ._set_active (False )
1142
+ return session
1124
1143
1125
1144
# noinspection PyTypeHints
1126
1145
@_pre_call_session
@@ -1236,6 +1255,30 @@ async def get_map(self, name: str, ser_format: str = DEFAULT_FORMAT) -> "NamedMa
1236
1255
self ._caches .update ({name : c })
1237
1256
return c
1238
1257
1258
+ def is_active (self ) -> bool :
1259
+ """
1260
+ Returns
1261
+ :return:
1262
+ """
1263
+ return self ._active
1264
+
1265
+ async def _set_active (self , active : bool ) -> None :
1266
+ self ._active = active
1267
+ if self ._active :
1268
+ if not self ._active_condition .locked ():
1269
+ await self ._active_condition .acquire ()
1270
+ self ._active_condition .notify_all ()
1271
+ self ._active_condition .release ()
1272
+ else :
1273
+ await self ._active_condition .acquire ()
1274
+
1275
+ async def _wait_for_active (self ) -> None :
1276
+ if not self .is_active ():
1277
+ timeout : float = self ._session_options .request_timeout_seconds
1278
+ COH_LOG .debug ("Waiting for session to become active; timeout=[%s seconds]" , timeout )
1279
+ async with asyncio .timeout (timeout ):
1280
+ await self ._active_condition .wait ()
1281
+
1239
1282
# noinspection PyUnresolvedReferences
1240
1283
async def close (self ) -> None :
1241
1284
"""
@@ -1248,6 +1291,10 @@ async def close(self) -> None:
1248
1291
task .cancel ()
1249
1292
self ._tasks .clear ()
1250
1293
1294
+ caches_copy : dict [str , NamedCache [Any , Any ]] = self ._caches .copy ()
1295
+ for cache in caches_copy .values ():
1296
+ await cache .destroy ()
1297
+
1251
1298
await self ._channel .close () # TODO: consider grace period?
1252
1299
1253
1300
def _setup_event_handlers (self , client : NamedCacheClient [K , V ]) -> None :
@@ -1342,28 +1389,37 @@ async def watch_channel_state(session: Session) -> None:
1342
1389
emitter : EventEmitter = session ._emitter
1343
1390
channel : grpc .aio .Channel = session .channel
1344
1391
first_connect : bool = True
1345
- connected : bool = False
1392
+ last_state : grpc . ChannelConnectivity = grpc . ChannelConnectivity . IDLE
1346
1393
1347
1394
try :
1348
1395
while True :
1349
1396
state : grpc .ChannelConnectivity = channel .get_state (False )
1397
+ COH_LOG .debug ("New Channel State: transitioning from [%s] to [%s]" , last_state , state )
1350
1398
match state :
1351
1399
case grpc .ChannelConnectivity .SHUTDOWN :
1352
- continue # nothing to do
1400
+ COH_LOG .info ("Session to [%s] terminated" , session .options .address )
1401
+ await session ._set_active (False )
1353
1402
case grpc .ChannelConnectivity .READY :
1354
- if not first_connect and not connected :
1403
+ if not first_connect and not session .is_active ():
1404
+ COH_LOG .info ("Session re-established to [%s]" , session .options .address )
1405
+
1355
1406
await emitter .emit_async (SessionLifecycleEvent .RECONNECTED .value )
1356
- connected = True
1357
- elif first_connect and not connected :
1407
+ await session ._set_active (True )
1408
+ elif first_connect and not session .is_active ():
1409
+ COH_LOG .info ("Session established to [%s]" , session .options .address )
1410
+
1358
1411
first_connect = False
1359
- connected = True
1360
1412
await emitter .emit_async (SessionLifecycleEvent .CONNECTED .value )
1413
+ await session ._set_active (True )
1361
1414
case _:
1362
- if connected :
1415
+ if session .is_active ():
1416
+ COH_LOG .warning ("Session to [%s] disconnected; will attempt reconnect" , session .options .address )
1417
+
1363
1418
await emitter .emit_async (SessionLifecycleEvent .DISCONNECTED .value )
1364
- connected = False
1419
+ await session . _set_active ( False )
1365
1420
1366
- await channel .wait_for_state_change (state )
1421
+ COH_LOG .debug ("Waiting for state change from [%s]" , state )
1422
+ await channel .wait_for_state_change (channel .get_state (True ))
1367
1423
except asyncio .CancelledError :
1368
1424
return
1369
1425
0 commit comments