Skip to content

Commit 86d4bdb

Browse files
author
Johannes Duesing
committed
Removed old implementation of server, added a RequestHandler that implements business logic. New implementation is now working fine, but the tests for the RequestHandler still need to be implemented.
1 parent d5826d0 commit 86d4bdb

File tree

7 files changed

+282
-84
lines changed

7 files changed

+282
-84
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package de.upb.cs.swt.delphi.instanceregistry
22

3-
class Configuration( //Server configuration
4-
val bindHost: String = "0.0.0.0",
5-
val bindPort: Int = 8087,
6-
val recoveryFileName : String = "dump.temp",
7-
)
3+
class Configuration( ) {
4+
val bindHost: String = "0.0.0.0"
5+
val bindPort: Int = 8087
6+
val recoveryFileName : String = "dump.temp"
7+
}
8+
9+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package de.upb.cs.swt.delphi.instanceregistry
2+
3+
import akka.actor.ActorSystem
4+
import akka.stream.ActorMaterializer
5+
6+
import scala.concurrent.ExecutionContext
7+
8+
object Registry extends AppLogging{
9+
implicit val system : ActorSystem = ActorSystem("delphi-registry")
10+
implicit val materializer : ActorMaterializer = ActorMaterializer()
11+
implicit val ec : ExecutionContext = system.dispatcher
12+
13+
val configuration = new Configuration()
14+
val requestHandler = new RequestHandler(configuration)
15+
16+
def main(args: Array[String]): Unit = {
17+
requestHandler.initialize()
18+
log.info("Starting server ...")
19+
Server.startServer(configuration.bindHost, configuration.bindPort)
20+
log.info("Shutting down ...")
21+
requestHandler.shutdown()
22+
system.terminate()
23+
}
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package de.upb.cs.swt.delphi.instanceregistry
2+
3+
import de.upb.cs.swt.delphi.instanceregistry.daos.{DynamicInstanceDAO, InstanceDAO}
4+
import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.Instance
5+
import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.InstanceEnums.ComponentType
6+
7+
import scala.util.{Failure, Success, Try}
8+
9+
class RequestHandler (configuration: Configuration) extends AppLogging {
10+
11+
implicit val system = Registry.system
12+
13+
private val instanceDao : InstanceDAO = new DynamicInstanceDAO(configuration)
14+
15+
def initialize() : Unit = {
16+
log.info("Initializing request handler...")
17+
instanceDao.initialize()
18+
//Add default ES instance
19+
registerNewInstance(Instance(None, "elasticsearch://localhost", 9200, "Default ElasticSearch Instance", ComponentType.ElasticSearch))
20+
log.info("Done initializing request handler.")
21+
}
22+
23+
def shutdown() : Unit = {
24+
instanceDao.shutdown()
25+
}
26+
27+
def registerNewInstance(instance : Instance) : Try[Long] = {
28+
val newID = if(instanceDao.getAllInstances().isEmpty){
29+
0L
30+
} else {
31+
(instanceDao.getAllInstances().map(i => i.id.getOrElse(0L)) max) + 1L
32+
}
33+
34+
log.info(s"Assigned new id $newID to registering instance with name ${instance.name}.")
35+
36+
val newInstance = Instance(id = Some(newID), name = instance.name, host = instance.host,
37+
portNumber = instance.portNumber, componentType = instance.componentType)
38+
39+
instanceDao.addInstance(newInstance) match {
40+
case Success(_) => Success(newID)
41+
case Failure(x) => Failure(x)
42+
}
43+
}
44+
45+
def removeInstance(instanceId : Long) : Try[Unit] = {
46+
if(!instanceDao.hasInstance(instanceId)){
47+
Failure(new RuntimeException(s"Cannot remove instance with id $instanceId, that id is not known to the server."))
48+
} else {
49+
instanceDao.removeInstance(instanceId)
50+
}
51+
}
52+
53+
def getAllInstancesOfType(compType : ComponentType) : List[Instance] = {
54+
instanceDao.getInstancesOfType(compType)
55+
}
56+
57+
def getNumberOfInstances(compType : ComponentType) : Int = {
58+
instanceDao.getAllInstances().count(i => i.componentType == compType)
59+
}
60+
61+
def getMatchingInstanceOfType(compType : ComponentType ) : Try[Instance] = {
62+
log.info(s"Trying to match to instance of type $compType ...")
63+
getNumberOfInstances(compType) match {
64+
case 0 =>
65+
log.error(s"Cannot match to any instance of type $compType, no such instance present.")
66+
Failure(new RuntimeException(s"Cannot match to any instance of type $compType, no instance present."))
67+
case 1 =>
68+
val instance : Instance = instanceDao.getInstancesOfType(compType).head
69+
log.info(s"Only one instance of that type present, matching to instance with id ${instance.id.get}.")
70+
Success(instance)
71+
case x =>
72+
log.info(s"Found $x instances of type $compType.")
73+
74+
//First try: Match to instance with most consecutive positive matching results
75+
var maxConsecutivePositiveResults = 0
76+
var instanceToMatch : Instance = null
77+
78+
for(instance <- instanceDao.getInstancesOfType(compType)){
79+
if(countConsecutivePositiveMatchingResults(instance.id.get) > maxConsecutivePositiveResults){
80+
maxConsecutivePositiveResults = countConsecutivePositiveMatchingResults(instance.id.get)
81+
instanceToMatch = instance
82+
}
83+
}
84+
85+
if(instanceToMatch != null){
86+
log.info(s"Matching to instance with id ${instanceToMatch.id}, as it has $maxConsecutivePositiveResults positive results in a row.")
87+
Success(instanceToMatch)
88+
} else {
89+
//Second try: Match to instance with most positive matching results
90+
var maxPositiveResults = 0
91+
92+
for(instance <- instanceDao.getInstancesOfType(compType)){
93+
val noOfPositiveResults : Int = instanceDao.getMatchingResultsFor(instance.id.get).get.count(i => i)
94+
if( noOfPositiveResults > maxPositiveResults){
95+
maxPositiveResults = noOfPositiveResults
96+
instanceToMatch = instance
97+
}
98+
}
99+
100+
if(instanceToMatch != null){
101+
log.info(s"Matching to instance with id ${instanceToMatch.id}, as it has $maxPositiveResults positive results.")
102+
Success(instanceToMatch)
103+
} else {
104+
//All instances are equally good (or bad), match to any of them
105+
instanceToMatch = instanceDao.getInstancesOfType(compType).head
106+
log.info(s"Matching to instance with id ${instanceToMatch.id}, no differences between instances have been found.")
107+
Success(instanceToMatch)
108+
}
109+
}
110+
}
111+
112+
}
113+
114+
def applyMatchingResult(id : Long, result : Boolean) : Try[Unit] = {
115+
if(!instanceDao.hasInstance(id)){
116+
Failure(new RuntimeException(s"Cannot apply matching result to instance with id $id, that id is not known to the server"))
117+
} else {
118+
instanceDao.addMatchingResult(id, result)
119+
}
120+
}
121+
122+
private def countConsecutivePositiveMatchingResults(id : Long) : Int = {
123+
if(!instanceDao.hasInstance(id) || instanceDao.getMatchingResultsFor(id).get.isEmpty){
124+
0
125+
} else {
126+
val matchingResults = instanceDao.getMatchingResultsFor(id).get
127+
var count = 0
128+
129+
for (index <- matchingResults.size to 1){
130+
if(matchingResults(index - 1)){
131+
count += 1
132+
} else {
133+
return count
134+
}
135+
}
136+
count
137+
}
138+
139+
}
140+
}
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,30 @@
11
package de.upb.cs.swt.delphi.instanceregistry
22

3-
import java.util.concurrent.TimeUnit
43

54
import akka.actor.ActorSystem
65
import akka.http.scaladsl.server
76
import akka.http.scaladsl.model.{HttpResponse, StatusCodes}
87
import akka.http.scaladsl.server.HttpApp
9-
import akka.http.scaladsl.unmarshalling.Unmarshal
108
import akka.stream.ActorMaterializer
11-
import akka.util.Timeout
129
import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.InstanceEnums.ComponentType
1310
import io.swagger.client.model.{Instance, JsonSupport}
1411

15-
import scala.collection.mutable
16-
import scala.concurrent.{Await, ExecutionContext}
17-
import scala.concurrent.duration.Duration
12+
import scala.concurrent.ExecutionContext
13+
import spray.json._
14+
15+
import scala.util.{Failure, Success}
1816

1917

2018
/**
2119
* Web server configuration for Instance Registry API.
2220
*/
2321
object Server extends HttpApp with JsonSupport with AppLogging {
2422

25-
//Default ES instance for testing
26-
private val instances = mutable.HashSet (Instance(Some(0), "elasticsearch://localhost", 9200, "Default ElasticSearch Instance", ComponentType.ElasticSearch))
27-
28-
implicit val system : ActorSystem = ActorSystem("delphi-registry")
23+
implicit val system : ActorSystem = Registry.system
2924
implicit val materializer : ActorMaterializer = ActorMaterializer()
3025
implicit val ec : ExecutionContext = system.dispatcher
31-
implicit val timeout : Timeout = Timeout(5, TimeUnit.SECONDS)
26+
27+
private val handler : RequestHandler = Registry.requestHandler
3228

3329
override def routes : server.Route =
3430
path("register") {entity(as[String]) { jsonString => addInstance(jsonString) }} ~
@@ -43,106 +39,102 @@ object Server extends HttpApp with JsonSupport with AppLogging {
4339
post
4440
{
4541
log.debug(s"POST /register has been called, parameter is: $InstanceString")
46-
Await.result(Unmarshal(InstanceString).to[Instance] map {paramInstance =>
47-
val name = paramInstance.name
48-
val newID : Long = {
49-
if(instances.isEmpty){
50-
0L
51-
}
52-
else{
53-
(instances map( instance => instance.id.get) max) + 1L
54-
}
55-
}
5642

57-
val instanceToRegister = Instance(id = Some(newID), host = paramInstance.host, portNumber = paramInstance.portNumber, name = paramInstance.name, componentType = paramInstance.componentType)
58-
59-
instances += instanceToRegister
60-
log.info(s"Instance with name $name registered, ID $newID assigned.")
61-
62-
complete {newID.toString()}
63-
} recover {case ex =>
64-
log.warning(s"Failed to read registering instance, exception: $ex")
65-
complete(HttpResponse(StatusCodes.InternalServerError, entity = "Failed to unmarshal parameter."))
66-
}, Duration.Inf)
43+
try {
44+
val paramInstance : Instance = InstanceString.parseJson.convertTo[Instance](instanceFormat)
45+
handler.registerNewInstance(paramInstance) match {
46+
case Success(id) => complete{id.toString}
47+
case Failure(_) => complete(HttpResponse(StatusCodes.InternalServerError, entity = "An internal server error occurred."))
48+
}
49+
} catch {
50+
case dx : DeserializationException =>
51+
log.error(dx, "Deserialization exception")
52+
complete(HttpResponse(StatusCodes.BadRequest, entity = s"Could not deserialize parameter instance with message ${dx.getMessage}."))
53+
case _ : Exception => complete(HttpResponse(StatusCodes.InternalServerError, entity = "An internal server error occurred."))
54+
}
6755
}
6856
}
6957

7058
def deleteInstance() : server.Route = parameters('Id.as[Long]){ Id =>
7159
post {
7260
log.debug(s"POST /deregister?Id=$Id has been called")
7361

74-
val instanceToRemove = instances find(instance => instance.id.get == Id)
75-
76-
if(instanceToRemove.isEmpty){
77-
log.warning(s"Cannot remove instance with id $Id, that id is not present on the server")
78-
complete{HttpResponse(StatusCodes.NotFound, entity = s"Id $Id not present on the server")}
79-
}
80-
else{
81-
instances remove instanceToRemove.get
82-
log.info(s"Successfully removed instance with id $Id")
83-
complete {s"Successfully removed instance with id $Id"}
62+
handler.removeInstance(Id) match {
63+
case Success(_) =>
64+
log.info(s"Successfully removed instance with id $Id")
65+
complete {s"Successfully removed instance with id $Id"}
66+
case Failure(x) =>
67+
log.error(x, s"Cannot remove instance with id $Id, that id is not known to the server.")
68+
complete{HttpResponse(StatusCodes.NotFound, entity = s"Id $Id not known to the server")}
8469
}
8570
}
8671
}
72+
8773
def fetchInstancesOfType () : server.Route = parameters('ComponentType.as[String]) { compTypeString =>
8874
get {
8975
log.debug(s"GET /instances?ComponentType=$compTypeString has been called")
76+
9077
val compType : ComponentType = ComponentType.values.find(v => v.toString == compTypeString).orNull
91-
val matchingInstancesList = List() ++ instances filter {instance => instance.componentType == compType}
9278

93-
complete {matchingInstancesList}
79+
if(compType != null) {
80+
complete{handler.getAllInstancesOfType(compType)}
81+
} else {
82+
log.error(s"Failed to deserialize parameter string $compTypeString to ComponentType.")
83+
complete(HttpResponse(StatusCodes.BadRequest, entity = s"Could not deserialize parameter string $compTypeString to ComponentType"))
84+
}
9485
}
9586
}
9687

9788
def numberOfInstances() : server.Route = parameters('ComponentType.as[String]) { compTypeString =>
9889
get {
9990
log.debug(s"GET /numberOfInstances?ComponentType=$compTypeString has been called")
10091
val compType : ComponentType = ComponentType.values.find(v => v.toString == compTypeString).orNull
101-
val count : Int = instances count {instance => instance.componentType == compType}
102-
complete{count.toString()}
92+
93+
if(compType != null) {
94+
complete{handler.getNumberOfInstances(compType).toString()}
95+
} else {
96+
log.error(s"Failed to deserialize parameter string $compTypeString to ComponentType.")
97+
complete(HttpResponse(StatusCodes.BadRequest, entity = s"Could not deserialize parameter string $compTypeString to ComponentType"))
98+
}
10399
}
104100
}
105101

106102
def getMatchingInstance() : server.Route = parameters('ComponentType.as[String]){ compTypeString =>
107103
get{
108104
log.debug(s"GET /matchingInstance?ComponentType=$compTypeString has been called")
105+
109106
val compType : ComponentType = ComponentType.values.find(v => v.toString == compTypeString).orNull
110107
log.info(s"Looking for instance of type $compType ...")
111-
val matchingInstances = instances filter {instance => instance.componentType == compType}
112-
if(matchingInstances.isEmpty){
113-
log.warning(s"Could not find matching instance for type $compType .")
114-
complete(HttpResponse(StatusCodes.NotFound, entity = s"Could not find matching instance for type $compType"))
115-
}
116-
else {
117-
val matchedInstance = matchingInstances.iterator.next()
118-
log.info(s"Matched to $matchedInstance.")
119-
complete(matchedInstance)
120-
}
121108

109+
if(compType != null){
110+
handler.getMatchingInstanceOfType(compType) match {
111+
case Success(matchedInstance) =>
112+
log.info(s"Matched to $matchedInstance.")
113+
complete(matchedInstance)
114+
case Failure(x) =>
115+
log.warning(s"Could not find matching instance for type $compType, message was ${x.getMessage}.")
116+
complete(HttpResponse(StatusCodes.NotFound, entity = s"Could not find matching instance for type $compType"))
117+
}
118+
} else {
119+
log.error(s"Failed to deserialize parameter string $compTypeString to ComponentType.")
120+
complete(HttpResponse(StatusCodes.BadRequest, entity = s"Could not deserialize parameter string $compTypeString to ComponentType"))
121+
}
122122
}
123123
}
124124

125-
def matchInstance() : server.Route = parameters('Id.as[Long], 'MatchingSuccessful.as[Boolean]){ (Id, MatchingResult) =>
125+
def matchInstance() : server.Route = parameters('Id.as[Long], 'MatchingSuccessful.as[Boolean]){ (id, matchingResult) =>
126126
post {
127-
//TODO: Need to keep track of matching, maybe remove instances if not reachable!
128-
log.debug(s"POST /matchingResult?Id=$Id&MatchingSuccessful=$MatchingResult has been called")
129-
if(MatchingResult){
130-
log.info(s"Instance with Id $Id was successfully matched.")
131-
}
132-
else{
133-
log.warning(s"A client was not able to reach matched instance with Id $Id !")
127+
log.debug(s"POST /matchingResult?Id=$id&MatchingSuccessful=$matchingResult has been called")
128+
129+
handler.applyMatchingResult(id, matchingResult) match {
130+
case Success(_) => complete{s"Matching result $matchingResult processed."}
131+
case Failure(x) =>
132+
log.warning(s"Could not process matching result, exception was: ${x.getMessage}")
133+
complete(HttpResponse(StatusCodes.NotFound, entity = s"Could not process matching result, id $id was not found."))
134134
}
135-
complete {s"Matching result $MatchingResult processed."}
136135
}
137136
}
138137

139-
def main(args: Array[String]): Unit = {
140-
val configuration = new Configuration()
141-
Server.startServer(configuration.bindHost, configuration.bindPort)
142-
system.terminate()
143-
}
144-
145-
146138
}
147139

148140

0 commit comments

Comments
 (0)