Skip to content

Commit 0ace090

Browse files
committed
Recconct socket && worker ping
Signed-off-by: elestrias <[email protected]>
1 parent ad026fd commit 0ace090

File tree

9 files changed

+65
-4
lines changed

9 files changed

+65
-4
lines changed

core/api/rpc/wsc.cpp

+21-1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ namespace fc::api::rpc {
5656
}));
5757
}
5858
socket.handshake(host, target, ec);
59+
client_data = ClientData{host, port, target, token};
5960
if (ec) {
6061
return ec;
6162
}
@@ -87,10 +88,11 @@ namespace fc::api::rpc {
8788
}
8889
}
8990
chans.clear();
91+
reconnect(3, std::chrono::seconds(5));
9092
}
9193

9294
void Client::_flush() {
93-
if (!writing && !write_queue.empty()) {
95+
if (!writing && !write_queue.empty() && !reconnecting){
9496
auto &[id, buffer] = write_queue.front();
9597
writing = true;
9698
socket.async_write(boost::asio::buffer(buffer.data(), buffer.size()),
@@ -185,4 +187,22 @@ namespace fc::api::rpc {
185187
}
186188
}
187189
}
190+
191+
void Client::reconnect(int counter, std::chrono::milliseconds wait) {
192+
if(reconnecting.exchange(true)) return;
193+
logger_->info("Starting reconnect to {}:{}", client_data.host, client_data.port);
194+
for(int i = 0; i < counter; i++){
195+
std::this_thread::sleep_for(wait*(i+1));
196+
auto res = connect(client_data.host,
197+
client_data.port,
198+
client_data.target,
199+
client_data.token);
200+
if(!res.has_error()) {
201+
break;
202+
}
203+
}
204+
reconnecting.store(false);
205+
logger_->info("Reconnect to {}:{} was successful", client_data.host, client_data.port);
206+
_flush();
207+
}
188208
} // namespace fc::api::rpc

core/api/rpc/wsc.hpp

+13
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,18 @@ namespace fc::api::rpc {
5757
void setup(A &api) {
5858
visit(api, [&](auto &m) { _setup(*this, m); });
5959
}
60+
struct ClientData {
61+
ClientData(std::string host,
62+
std::string port,
63+
std::string target,
64+
std::string token)
65+
: host(host), port(port), target(target), token(token){};
66+
ClientData() = default;
67+
68+
std::string host, port, target, token;
69+
}client_data;
70+
71+
void reconnect(int counter, std::chrono::milliseconds wait);
6072

6173
private:
6274
std::thread thread;
@@ -72,6 +84,7 @@ namespace fc::api::rpc {
7284
std::map<uint64_t, ChanCb> chans;
7385
std::queue<std::pair<uint64_t, Bytes>> write_queue;
7486
bool writing{false};
87+
std::atomic<bool> reconnecting;
7588

7689
template <typename M>
7790
void _setup(Client &c, M &m);

core/sector_storage/impl/local_worker.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -904,4 +904,7 @@ namespace fc::sector_storage {
904904

905905
return call_id;
906906
}
907+
void LocalWorker::ping(std::function<void(const bool &resp)> cb) {
908+
cb(true);
909+
}
907910
} // namespace fc::sector_storage

core/sector_storage/impl/local_worker.hpp

+2
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ namespace fc::sector_storage {
9999
outcome::result<std::vector<primitives::StoragePath>> getAccessiblePaths()
100100
override;
101101

102+
void ping(std::function<void(const bool &resp)> cb) override;
103+
102104
private:
103105
template <typename W, typename R>
104106
outcome::result<CallId> asyncCall(const SectorRef &sector,

core/sector_storage/impl/remote_worker.cpp

+6
Original file line numberDiff line numberDiff line change
@@ -254,4 +254,10 @@ namespace fc::sector_storage {
254254
AcquireMode mode) {
255255
return api_.Fetch(sector, file_type, path_type, mode);
256256
}
257+
258+
void RemoteWorker::ping(std::function<void(const bool &resp)> cb) {
259+
api_.Version([=](auto res){
260+
cb(!res.has_error());
261+
});
262+
}
257263
} // namespace fc::sector_storage

core/sector_storage/impl/remote_worker.hpp

+2
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ namespace fc::sector_storage {
3939
const SectorRef &sector,
4040
const PreCommit1Output &pre_commit_1_output) override;
4141

42+
void ping(std::function<void(const bool &resp)> cb) override;
43+
4244
outcome::result<CallId> sealCommit1(const SectorRef &sector,
4345
const SealRandomness &ticket,
4446
const InteractiveRandomness &seed,

core/sector_storage/impl/scheduler_impl.cpp

+15-3
Original file line numberDiff line numberDiff line change
@@ -196,10 +196,22 @@ namespace fc::sector_storage {
196196
return SchedulerErrors::kCannotSelectWorker;
197197
}
198198

199-
WorkerID wid = acceptable[0];
200-
199+
std::promise<WorkerID> wid_promise;
200+
std::future<WorkerID> wid_future = wid_promise.get_future();
201+
auto done = std::make_shared<std::atomic_bool>();
202+
for (const auto &cur : acceptable) {
203+
workers_[cur]->worker->ping([&wid_promise, done, cur](const bool &resp) {
204+
if (resp && !done->exchange(true)) {
205+
wid_promise.set_value(cur);
206+
}
207+
});
208+
}
209+
auto status = wid_future.wait_for(std::chrono::seconds(5));
210+
if(status == std::future_status::timeout){
211+
return false;
212+
}
213+
WorkerID wid = wid_future.get();
201214
assignWorker(wid, workers_[wid], request);
202-
203215
return true;
204216
}
205217

core/sector_storage/worker.hpp

+2
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,8 @@ namespace fc::sector_storage {
142142

143143
virtual outcome::result<std::vector<primitives::StoragePath>>
144144
getAccessiblePaths() = 0;
145+
146+
virtual void ping(std::function<void(const bool &resp)> cb) = 0;
145147
};
146148

147149
enum class CallErrorCode : uint64_t {

test/testutil/mocks/sector_storage/worker_mock.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -101,5 +101,6 @@ namespace fc::sector_storage {
101101
gsl::span<const UnpaddedPieceSize>,
102102
const UnpaddedPieceSize &,
103103
int));
104+
MOCK_METHOD1(ping, void(std::function<void(const bool &)>));
104105
};
105106
} // namespace fc::sector_storage

0 commit comments

Comments
 (0)