Skip to content

Commit 379f038

Browse files
Mariana MirandaMariana Miranda
Mariana Miranda
authored and
Mariana Miranda
committed
Updates connection of components
1 parent d940369 commit 379f038

18 files changed

+274
-236
lines changed

files/core_config_file

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
controller: core
2-
core_address: 0.0.0.0:50053
2+
core_address: 0.0.0.0:50051
33
control_type: 1
44
system_limit: 1000000
55
housekeeping_rules_file: ../files/posix_layer_housekeeping_rules_static_op
6-
policies_rules_file: ../files/static_rules_with_time_job
6+
policies_rules_file: ../files/static_rules_with_time_file_job

files/local_config_file

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
controller: local
2-
core_address: 0.0.0.0:50053
3-
local_address: 0.0.0.0:50052
2+
core_address: 0.0.0.0:50051
3+
local_address: 0.0.0.0:50053

files/local_config_file_2

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
controller: local
2+
core_address: 0.0.0.0:50051
3+
local_address: 0.0.0.0:50052

files/posix_layer_housekeeping_rules_static_op

+4-4
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
2 create_channel 2000 posix 2000 write no_op
33
3 create_channel 3000 posix 3000 open no_op
44
4 create_channel 4000 posix 4000 close no_op
5-
5 create_object 1000 1 posix read no_op drl 10000 1073741824
6-
6 create_object 2000 1 posix write no_op drl 10000 1073741824
7-
7 create_object 3000 1 posix open no_op drl 10000 1073741824
8-
8 create_object 4000 1 posix close no_op drl 10000 1073741824
5+
5 create_object 1000 1 posix no_op no_op drl 10000 1073741824
6+
6 create_object 2000 1 posix no_op no_op drl 10000 1073741824
7+
7 create_object 3000 1 posix no_op no_op drl 10000 1073741824
8+
8 create_object 4000 1 posix no_op no_op drl 10000 1073741824

include/cheferd/controller/control_application.hpp

+8
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,14 @@ using namespace std::chrono;
1818

1919
namespace cheferd {
2020

21+
// Detailed information related to a data plane stage.
22+
struct StageInfo {
23+
std::string m_stage_name;
24+
std::string m_stage_env;
25+
std::string m_stage_user;
26+
std::string m_local_address;
27+
};
28+
2129
class ControlApplication {
2230

2331
protected:

include/cheferd/controller/core_control_application.hpp

+6-12
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,6 @@ namespace cheferd {
1515
// Threshold to decide if a new should be enforced.
1616
#define IOPS_THRESHOLD 10
1717

18-
// Detailed information related to a data plane stage.
19-
struct StageInfoCore {
20-
std::string m_stage_name;
21-
std::string m_stage_env;
22-
std::string m_stage_user;
23-
std::string m_local_address;
24-
};
2518

2619
/**
2720
* CoreControlApplication class.
@@ -42,7 +35,8 @@ class CoreControlApplication : public ControlApplication {
4235
std::queue<std::string> local_queue;
4336

4437
// Related to the registration of new data plane stages.
45-
std::queue<std::tuple<std::string, std::pair<std::string, std::string>, std::string>>
38+
std::mutex pending_register_stage_lock_;
39+
std::queue<std::unique_ptr<StageInfo>>
4640
local_to_data_queue_;
4741

4842
// Type of control (e.g., Static, Dynamic, ...).
@@ -56,9 +50,9 @@ class CoreControlApplication : public ControlApplication {
5650
// controller. (e.g., <"0.0.0.0:50052", ["job1+1", "job1+2"]>)
5751
std::unordered_map<std::string, std::vector<std::string>> local_to_stages;
5852

59-
// StageID -> StageInfoCore. Details each stage information.
60-
// (e.g., <"job1+1", StageInfoCore>)
61-
std::unordered_map<std::string, StageInfoCore> stage_info_detailed;
53+
// StageID -> StageInfo. Details each stage information.
54+
// (e.g., <"job1+1", StageInfo>)
55+
std::unordered_map<std::string, std::unique_ptr<StageInfo>> stage_info_detailed;
6256

6357
// AppID -> < LocalControllerID -> [Envs] >. Holds info about the location and of each job
6458
// instance. (e.g., <"job1", <"0.0.0.0:50052", [1,2]>>
@@ -157,7 +151,7 @@ class CoreControlApplication : public ControlApplication {
157151

158152
void operator() () override;
159153

160-
LocalControllerSession* register_local_controller_session (
154+
void register_local_controller_session (
161155
const std::string& local_controller_address);
162156

163157
void register_stage_session (const std::string& local_controller_address,

include/cheferd/controller/local_control_application.hpp

+9-6
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
using controllers_grpc_interface::ACK;
2424
using controllers_grpc_interface::ConnectReply;
2525
using controllers_grpc_interface::ConnectRequest;
26-
using controllers_grpc_interface::StageInfo;
26+
using controllers_grpc_interface::StageInfoConnect;
2727
using controllers_grpc_interface::StageReadyRaw;
2828
using controllers_grpc_interface::StatsGlobalMap;
2929
using grpc::Server;
@@ -55,7 +55,10 @@ class LocalControlApplication : public GlobalToLocal::Service, public ControlApp
5555
std::unordered_map<std::string, std::unique_ptr<DataPlaneSession>> data_sessions_;
5656
std::unordered_map<std::string, std::unique_ptr<DataPlaneSession>> preparing_data_sessions_;
5757

58-
std::unordered_map<int, std::unique_ptr<HandshakeSession>> pending_data_sessions_;
58+
// Related to the registration of new local controller sessions.
59+
std::mutex pending_data_plane_sessions_lock_;
60+
std::queue<std::unique_ptr<HandshakeSession>> pending_data_sessions_;
61+
5962

6063
std::unordered_map<std::string, std::vector<std::pair<int, int>>> operation_to_channel_object;
6164

@@ -81,7 +84,7 @@ class LocalControlApplication : public GlobalToLocal::Service, public ControlApp
8184
* @param index
8285
* @return
8386
*/
84-
PStatus stage_handshake (int index);
87+
void handle_data_plane_sessions ();
8588

8689
/**
8790
* Sleep:
@@ -92,8 +95,8 @@ class LocalControlApplication : public GlobalToLocal::Service, public ControlApp
9295
* CallStageHandshake:
9396
* @return
9497
*/
95-
std::tuple<const std::string, const std::string, const std::string> call_stage_handshake (
96-
const int& index);
98+
std::unique_ptr<StageInfo> call_stage_handshake (
99+
HandshakeSession* handshake_session);
97100

98101
/*
99102
* SubmitHousekeepingRules:
@@ -150,7 +153,7 @@ class LocalControlApplication : public GlobalToLocal::Service, public ControlApp
150153
void operator() () override;
151154

152155
// TODO: FIX this in control application parent class
153-
HandshakeSession* register_stage_session (int index, int i);
156+
void register_stage_session (int socket_t);
154157

155158
void stop_feedback_loop () override;
156159

include/cheferd/networking/core_connection_manager.hpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
using controllers_grpc_interface::ACK;
2929
using controllers_grpc_interface::ConnectReply;
3030
using controllers_grpc_interface::ConnectRequest;
31-
using controllers_grpc_interface::StageInfo;
31+
using controllers_grpc_interface::StageInfoConnect;
3232
using grpc::Server;
3333
using grpc::ServerBuilder;
3434
using grpc::ServerContext;
@@ -68,7 +68,7 @@ class CoreConnectionManager : public LocalToGlobal::Service, public ConnectionMa
6868
ConnectReply* reply) override;
6969

7070
Status ConnectStageToGlobal (ServerContext* context,
71-
const StageInfo* request,
71+
const StageInfoConnect* request,
7272
ConnectReply* reply) override;
7373

7474
public:

include/cheferd/networking/local_interface.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ using controllers_grpc_interface::ConnectReply;
3131
using controllers_grpc_interface::ConnectRequest;
3232
using controllers_grpc_interface::EnforcementOpRules;
3333
using controllers_grpc_interface::EnforcementRules;
34-
using controllers_grpc_interface::StageInfo;
34+
using controllers_grpc_interface::StageInfoConnect;
3535
using controllers_grpc_interface::StageReadyRaw;
3636
using controllers_grpc_interface::StatsGlobalMap;
3737
using grpc::Server;

include/cheferd/session/handshake_session.hpp

+3-4
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ namespace cheferd {
2525
class HandshakeSession {
2626

2727
private:
28-
long session_id_;
28+
long socket_id_;
2929

3030
std::queue<std::string> submission_queue_; // queue that contains the request to submit to the
3131
// data plane
@@ -106,7 +106,7 @@ class HandshakeSession {
106106

107107
/**
108108
* HandshakeSession parameterized constructor.
109-
* @param id Data plane stage session id.
109+
* @param id Data plane stage socket id.
110110
*/
111111
explicit HandshakeSession (long id);
112112

@@ -118,10 +118,9 @@ class HandshakeSession {
118118
/**
119119
* StartSession: begin the enforcement session between the controller and
120120
* the data plane stage.
121-
* @param socket Socket identifier of the communication channel between the
122121
* controller and the data plane stage.
123122
*/
124-
void StartSession (int socket);
123+
void StartSession ();
125124

126125
void RemoveSession ();
127126

protos/controllers_grpc_interface.proto

+3-3
Original file line numberDiff line numberDiff line change
@@ -91,16 +91,16 @@ message StatsGlobal {
9191

9292
service LocalToGlobal{
9393
rpc ConnectLocalToGlobal (ConnectRequest) returns (ConnectReply) {}
94-
rpc ConnectStageToGlobal (StageInfo) returns (ConnectReply) {}
95-
rpc DisconnectStageToGlobal (StageInfo) returns (ConnectReply) {}
94+
rpc ConnectStageToGlobal (StageInfoConnect) returns (ConnectReply) {}
95+
rpc DisconnectStageToGlobal (StageInfoConnect) returns (ConnectReply) {}
9696
}
9797

9898
// The request message containing the user's name.
9999
message ConnectRequest {
100100
string user_address = 1;
101101
}
102102

103-
message StageInfo {
103+
message StageInfoConnect {
104104
string local_address = 1;
105105
string stage_name = 2;
106106
string stage_env = 3;

0 commit comments

Comments
 (0)