Skip to content

Commit 914bafc

Browse files
committed
Fixed RPC loader so that it uses a singleton (more efficient with connections) and identified a bug where key checkes would always hist the hybrid master
1 parent d075d26 commit 914bafc

8 files changed

+77
-26
lines changed

api_definition_manager.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,7 @@ func (a *APIDefinitionLoader) LoadDefinitionsFromRPC(orgId string) *[]*APISpec {
522522

523523
apiCollection := store.GetApiDefinitions(orgId, tags)
524524

525-
store.Disconnect()
525+
//store.Disconnect()
526526

527527
if RPC_LoadCount > 0 {
528528
SaveRPCDefinitionsBackup(apiCollection)

api_loader.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -586,11 +586,6 @@ func loadApps(APISpecs *[]*APISpec, Muxer *mux.Router) {
586586
// Only create this once, add other types here as needed, seems wasteful but we can let the GC handle it
587587
redisStore, redisOrgStore, healthStore, rpcAuthStore, rpcOrgStore := prepareStorage()
588588

589-
if config.SlaveOptions.UseRPC {
590-
StartRPCKeepaliveWatcher(rpcAuthStore)
591-
StartRPCKeepaliveWatcher(rpcOrgStore)
592-
}
593-
594589
prepareSortOrder(APISpecs)
595590

596591
chainChannel := make(chan *ChainObject)
@@ -673,4 +668,10 @@ func loadApps(APISpecs *[]*APISpec, Muxer *mux.Router) {
673668
"prefix": "main",
674669
}).Info("Initialised API Definitions")
675670

671+
if config.SlaveOptions.UseRPC {
672+
//log.Warning("TODO: PUT THE KEEPALIVE WATCHER BACK")
673+
StartRPCKeepaliveWatcher(rpcAuthStore)
674+
StartRPCKeepaliveWatcher(rpcOrgStore)
675+
}
676+
676677
}

auth_manager.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@ package main
33
import (
44
"encoding/base64"
55
"encoding/json"
6-
"github.com/TykTechnologies/logrus"
7-
"github.com/nu7hatch/gouuid"
86
"strings"
97
"time"
8+
9+
"github.com/TykTechnologies/logrus"
10+
"github.com/nu7hatch/gouuid"
1011
)
1112

1213
// AuthorisationHandler is used to validate a session key,

handler_success.go

+11-4
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ package main
33
import (
44
"bytes"
55
b64 "encoding/base64"
6-
"github.com/gorilla/context"
7-
"github.com/pmylund/go-cache"
86
"io"
97
"net/http"
108
"runtime/pprof"
119
"strconv"
1210
"strings"
1311
"time"
12+
13+
"github.com/gorilla/context"
14+
"github.com/pmylund/go-cache"
1415
)
1516

1617
// ContextKey is a key type to avoid collisions
@@ -181,11 +182,12 @@ func (t TykMiddleware) CheckSessionAndIdentityForValidKey(key string) (SessionSt
181182
var thisSession SessionState
182183
var found bool
183184

185+
log.Debug("Querying local cache")
184186
// Check in-memory cache
185187
if !config.LocalSessionCache.DisableCacheSessionState {
186188
cachedVal, found := SessionCache.Get(key)
187189
if found {
188-
log.Debug("Key found in local cache")
190+
log.Debug("--> Key found in local cache")
189191
thisSession = cachedVal.(SessionState)
190192
t.ApplyPolicyIfExists(key, &thisSession)
191193
return thisSession, true
@@ -202,10 +204,11 @@ func (t TykMiddleware) CheckSessionAndIdentityForValidKey(key string) (SessionSt
202204

203205
// Check for a policy, if there is a policy, pull it and overwrite the session values
204206
t.ApplyPolicyIfExists(key, &thisSession)
205-
log.Debug("Got key")
207+
log.Debug("--> Got key")
206208
return thisSession, true
207209
}
208210

211+
log.Debug("Querying authstore")
209212
// 2. If not there, get it from the AuthorizationHandler
210213
thisSession, found = t.Spec.AuthManager.IsKeyAuthorised(key)
211214
if found {
@@ -217,6 +220,10 @@ func (t TykMiddleware) CheckSessionAndIdentityForValidKey(key string) (SessionSt
217220

218221
// Check for a policy, if there is a policy, pull it and overwrite the session values
219222
t.ApplyPolicyIfExists(key, &thisSession)
223+
224+
log.Debug("Lifetime is: ", GetLifetime(t.Spec, &thisSession))
225+
// Need to set this in order for the write to work!
226+
thisSession.LastUpdated = time.Now().String()
220227
t.Spec.SessionManager.UpdateSession(key, thisSession, GetLifetime(t.Spec, &thisSession))
221228
}
222229

main.go

+9
Original file line numberDiff line numberDiff line change
@@ -983,23 +983,32 @@ func getCmdArguments() map[string]interface{} {
983983
return arguments
984984
}
985985

986+
var KeepaliveRunning bool
987+
986988
func StartRPCKeepaliveWatcher(engine *RPCStorageHandler) {
989+
if KeepaliveRunning {
990+
return
991+
}
992+
987993
go func() {
988994
log.WithFields(logrus.Fields{
989995
"prefix": "RPC Conn Mgr",
990996
}).Info("[RPC Conn Mgr] Starting keepalive watcher...")
991997
for {
998+
KeepaliveRunning = true
992999
RPCKeepAliveCheck(engine)
9931000
if engine == nil {
9941001
log.WithFields(logrus.Fields{
9951002
"prefix": "RPC Conn Mgr",
9961003
}).Info("No engine, break")
1004+
KeepaliveRunning = false
9971005
break
9981006
}
9991007
if engine.Killed == true {
10001008
log.WithFields(logrus.Fields{
10011009
"prefix": "RPC Conn Mgr",
10021010
}).Debug("[RPC Conn Mgr] this connection killed")
1011+
KeepaliveRunning = false
10031012
break
10041013
}
10051014
}

policy.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ func LoadPoliciesFromRPC(orgId string) map[string]Policy {
196196

197197
rpcPolicies := store.GetPolicies(orgId)
198198

199-
store.Disconnect()
199+
//store.Disconnect()
200200

201201
jErr1 := json.Unmarshal([]byte(rpcPolicies), &dbPolicyList)
202202

rpc_analytics_purger.go

+13-1
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ package main
22

33
import (
44
"encoding/json"
5+
"time"
6+
57
"github.com/TykTechnologies/logrus"
68
"github.com/lonelycode/gorpc"
79
"gopkg.in/vmihailenco/msgpack.v2"
8-
"time"
910
)
1011

1112
// Purger is an interface that will define how the in-memory store will be purged
@@ -31,6 +32,17 @@ func (r *RPCPurger) ReConnect() {
3132

3233
// Connect Connects to RPC
3334
func (r *RPCPurger) Connect() {
35+
if RPCClientIsConnected {
36+
if RPCCLientSingleton != nil {
37+
if RPCFuncClientSingleton != nil {
38+
log.Info("RPC Analytics client using singleton")
39+
r.RPCClient = RPCCLientSingleton
40+
r.Client = RPCFuncClientSingleton
41+
return
42+
}
43+
}
44+
}
45+
3446
log.Info("Connecting to RPC Analytics service")
3547
r.RPCClient = gorpc.NewTCPClient(r.Address)
3648

rpc_storage_handler.go

+33-12
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ var RPCCLientRWMutex sync.RWMutex = sync.RWMutex{}
5858
var RPCClients = map[string]chan int{}
5959

6060
func ClearRPCClients() {
61+
return
6162
log.Info("Found: ", len(RPCClients), " RPC connections, terminating")
6263
for _, c := range RPCClients {
6364

@@ -74,12 +75,14 @@ func ClearRPCClients() {
7475

7576
func RPCKeepAliveCheck(r *RPCStorageHandler) {
7677
// Only run when connected
77-
if r.Connected {
78+
if RPCClientIsConnected && r.cache != nil {
7879
// Make sure the auth back end is still alive
7980
c1 := make(chan string, 1)
8081

8182
go func() {
83+
log.Debug("Getting keyspace check test key")
8284
r.GetKey("0000")
85+
log.Debug("--> done")
8386
c1 <- "1"
8487
close(c1)
8588
}()
@@ -145,31 +148,47 @@ func (r *RPCStorageHandler) checkDisconnect() {
145148

146149
func (r *RPCStorageHandler) ReConnect() {
147150
// Should only be used by reload checker
148-
r.Disconnect()
151+
// r.Disconnect()
149152
r.Connect()
150153
log.Info("Reconnected.")
151154
}
152155

156+
var RPCCLientSingleton *gorpc.Client
157+
var RPCFuncClientSingleton *gorpc.DispatcherClient
158+
var RPCGlobalCache = cache.New(30*time.Second, 15*time.Second)
159+
var RPCClientIsConnected bool
160+
153161
// Connect will establish a connection to the DB
154162
func (r *RPCStorageHandler) Connect() bool {
155-
// We don't want to constantly connect
156-
if r.Connected {
163+
164+
if RPCClientIsConnected {
165+
log.Debug("Using RPC singleton for connection")
166+
r.RPCClient = RPCCLientSingleton
167+
r.Client = RPCFuncClientSingleton
168+
r.cache = RPCGlobalCache
157169
return true
158170
}
159171

172+
// RPC Client is unset
160173
// Set up the cache
161-
r.cache = cache.New(30*time.Second, 15*time.Second)
162-
r.RPCClient = gorpc.NewTCPClient(r.Address)
174+
log.Info("Setting new RPC connection!")
175+
r.cache = RPCGlobalCache
176+
RPCCLientSingleton = gorpc.NewTCPClient(r.Address)
177+
r.RPCClient = RPCCLientSingleton
163178

164179
if log.Level != logrus.DebugLevel {
165180
gorpc.SetErrorLogger(gorpc.NilErrorLogger)
166181
}
167182

168183
r.RPCClient.OnConnect = r.OnConnectFunc
169-
r.RPCClient.Conns = 10
184+
r.RPCClient.Conns = 50
170185
r.RPCClient.Start()
171186
d := GetDispatcher()
172-
r.Client = d.NewFuncClient(r.RPCClient)
187+
188+
if RPCFuncClientSingleton == nil {
189+
RPCFuncClientSingleton = d.NewFuncClient(r.RPCClient)
190+
}
191+
r.Client = RPCFuncClientSingleton
173192
r.Login()
174193

175194
if !r.SuppressRegister {
@@ -181,14 +200,14 @@ func (r *RPCStorageHandler) Connect() bool {
181200
}
182201

183202
func (r *RPCStorageHandler) OnConnectFunc(remoteAddr string, rwc io.ReadWriteCloser) (io.ReadWriteCloser, error) {
184-
r.Connected = true
203+
RPCClientIsConnected = true
185204
return rwc, nil
186205
}
187206

188207
func (r *RPCStorageHandler) Disconnect() bool {
189-
if r.Connected {
208+
if RPCClientIsConnected {
190209
go r.RPCClient.Stop()
191-
r.Connected = false
210+
RPCClientIsConnected = false
192211
RPCCLientRWMutex.Lock()
193212
delete(RPCClients, r.ID)
194213
RPCCLientRWMutex.Unlock()
@@ -301,7 +320,9 @@ func (r *RPCStorageHandler) GetKey(keyName string) (string, error) {
301320

302321
// Check the cache first
303322
if config.SlaveOptions.EnableRPCCache {
323+
log.Debug("Using cache for: ", keyName)
304324
cachedVal, found := r.cache.Get(r.fixKey(keyName))
325+
log.Debug("--> Found? ", found)
305326
if found {
306327
elapsed := time.Since(start)
307328
log.Debug("GetKey took ", elapsed)
@@ -718,7 +739,7 @@ func (r *RPCStorageHandler) StartRPCLoopCheck(orgId string) {
718739
return
719740
}
720741

721-
log.Info("Starting keyspace poller")
742+
log.Info("[RPC] Starting keyspace poller")
722743

723744
for {
724745
r.CheckForKeyspaceChanges(orgId)

0 commit comments

Comments
 (0)