Skip to content

Commit a4143cf

Browse files
committed
Improved link error handling. Fixes #387.
1 parent 3d645ae commit a4143cf

File tree

1 file changed

+115
-104
lines changed

1 file changed

+115
-104
lines changed

RNS/Link.py

+115-104
Original file line numberDiff line numberDiff line change
@@ -392,19 +392,20 @@ def rtt_packet(self, packet):
392392
try:
393393
measured_rtt = time.time() - self.request_time
394394
plaintext = self.decrypt(packet.data)
395-
rtt = umsgpack.unpackb(plaintext)
396-
self.rtt = max(measured_rtt, rtt)
397-
self.status = Link.ACTIVE
398-
self.activated_at = time.time()
395+
if plaintext != None:
396+
rtt = umsgpack.unpackb(plaintext)
397+
self.rtt = max(measured_rtt, rtt)
398+
self.status = Link.ACTIVE
399+
self.activated_at = time.time()
399400

400-
if self.rtt != None and self.establishment_cost != None and self.rtt > 0 and self.establishment_cost > 0:
401-
self.establishment_rate = self.establishment_cost/self.rtt
401+
if self.rtt != None and self.establishment_cost != None and self.rtt > 0 and self.establishment_cost > 0:
402+
self.establishment_rate = self.establishment_cost/self.rtt
402403

403-
try:
404-
if self.owner.callbacks.link_established != None:
405-
self.owner.callbacks.link_established(self)
406-
except Exception as e:
407-
RNS.log("Error occurred in external link establishment callback. The contained exception was: "+str(e), RNS.LOG_ERROR)
404+
try:
405+
if self.owner.callbacks.link_established != None:
406+
self.owner.callbacks.link_established(self)
407+
except Exception as e:
408+
RNS.log("Error occurred in external link establishment callback. The contained exception was: "+str(e), RNS.LOG_ERROR)
408409

409410
except Exception as e:
410411
RNS.log("Error occurred while processing RTT packet, tearing down link. The contained exception was: "+str(e), RNS.LOG_ERROR)
@@ -748,65 +749,68 @@ def receive(self, packet):
748749
should_query = False
749750
if packet.context == RNS.Packet.NONE:
750751
plaintext = self.decrypt(packet.data)
751-
if self.callbacks.packet != None:
752-
thread = threading.Thread(target=self.callbacks.packet, args=(plaintext, packet))
753-
thread.daemon = True
754-
thread.start()
755-
756-
if self.destination.proof_strategy == RNS.Destination.PROVE_ALL:
757-
packet.prove()
758-
should_query = True
752+
if plaintext != None:
753+
if self.callbacks.packet != None:
754+
thread = threading.Thread(target=self.callbacks.packet, args=(plaintext, packet))
755+
thread.daemon = True
756+
thread.start()
757+
758+
if self.destination.proof_strategy == RNS.Destination.PROVE_ALL:
759+
packet.prove()
760+
should_query = True
759761

760-
elif self.destination.proof_strategy == RNS.Destination.PROVE_APP:
761-
if self.destination.callbacks.proof_requested:
762-
try:
763-
if self.destination.callbacks.proof_requested(packet):
764-
packet.prove()
765-
should_query = True
766-
except Exception as e:
767-
RNS.log("Error while executing proof request callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
762+
elif self.destination.proof_strategy == RNS.Destination.PROVE_APP:
763+
if self.destination.callbacks.proof_requested:
764+
try:
765+
if self.destination.callbacks.proof_requested(packet):
766+
packet.prove()
767+
should_query = True
768+
except Exception as e:
769+
RNS.log("Error while executing proof request callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
768770

769-
self.__update_phy_stats(packet, query_shared=should_query)
771+
self.__update_phy_stats(packet, query_shared=should_query)
770772

771773
elif packet.context == RNS.Packet.LINKIDENTIFY:
772774
plaintext = self.decrypt(packet.data)
773-
774-
if not self.initiator and len(plaintext) == RNS.Identity.KEYSIZE//8 + RNS.Identity.SIGLENGTH//8:
775-
public_key = plaintext[:RNS.Identity.KEYSIZE//8]
776-
signed_data = self.link_id+public_key
777-
signature = plaintext[RNS.Identity.KEYSIZE//8:RNS.Identity.KEYSIZE//8+RNS.Identity.SIGLENGTH//8]
778-
identity = RNS.Identity(create_keys=False)
779-
identity.load_public_key(public_key)
780-
781-
if identity.validate(signature, signed_data):
782-
self.__remote_identity = identity
783-
if self.callbacks.remote_identified != None:
784-
try:
785-
self.callbacks.remote_identified(self, self.__remote_identity)
786-
except Exception as e:
787-
RNS.log("Error while executing remote identified callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
788-
789-
self.__update_phy_stats(packet, query_shared=True)
775+
if plaintext != None:
776+
if not self.initiator and len(plaintext) == RNS.Identity.KEYSIZE//8 + RNS.Identity.SIGLENGTH//8:
777+
public_key = plaintext[:RNS.Identity.KEYSIZE//8]
778+
signed_data = self.link_id+public_key
779+
signature = plaintext[RNS.Identity.KEYSIZE//8:RNS.Identity.KEYSIZE//8+RNS.Identity.SIGLENGTH//8]
780+
identity = RNS.Identity(create_keys=False)
781+
identity.load_public_key(public_key)
782+
783+
if identity.validate(signature, signed_data):
784+
self.__remote_identity = identity
785+
if self.callbacks.remote_identified != None:
786+
try:
787+
self.callbacks.remote_identified(self, self.__remote_identity)
788+
except Exception as e:
789+
RNS.log("Error while executing remote identified callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
790+
791+
self.__update_phy_stats(packet, query_shared=True)
790792

791793
elif packet.context == RNS.Packet.REQUEST:
792794
try:
793795
request_id = packet.getTruncatedHash()
794796
packed_request = self.decrypt(packet.data)
795-
unpacked_request = umsgpack.unpackb(packed_request)
796-
self.handle_request(request_id, unpacked_request)
797-
self.__update_phy_stats(packet, query_shared=True)
797+
if packed_request != None:
798+
unpacked_request = umsgpack.unpackb(packed_request)
799+
self.handle_request(request_id, unpacked_request)
800+
self.__update_phy_stats(packet, query_shared=True)
798801
except Exception as e:
799802
RNS.log("Error occurred while handling request. The contained exception was: "+str(e), RNS.LOG_ERROR)
800803

801804
elif packet.context == RNS.Packet.RESPONSE:
802805
try:
803806
packed_response = self.decrypt(packet.data)
804-
unpacked_response = umsgpack.unpackb(packed_response)
805-
request_id = unpacked_response[0]
806-
response_data = unpacked_response[1]
807-
transfer_size = len(umsgpack.packb(response_data))-2
808-
self.handle_response(request_id, response_data, transfer_size, transfer_size)
809-
self.__update_phy_stats(packet, query_shared=True)
807+
if packed_response != None:
808+
unpacked_response = umsgpack.unpackb(packed_response)
809+
request_id = unpacked_response[0]
810+
response_data = unpacked_response[1]
811+
transfer_size = len(umsgpack.packb(response_data))-2
812+
self.handle_response(request_id, response_data, transfer_size, transfer_size)
813+
self.__update_phy_stats(packet, query_shared=True)
810814
except Exception as e:
811815
RNS.log("Error occurred while handling response. The contained exception was: "+str(e), RNS.LOG_ERROR)
812816

@@ -821,63 +825,67 @@ def receive(self, packet):
821825

822826
elif packet.context == RNS.Packet.RESOURCE_ADV:
823827
packet.plaintext = self.decrypt(packet.data)
824-
self.__update_phy_stats(packet, query_shared=True)
828+
if packet.plaintext != None:
829+
self.__update_phy_stats(packet, query_shared=True)
825830

826-
if RNS.ResourceAdvertisement.is_request(packet):
827-
RNS.Resource.accept(packet, callback=self.request_resource_concluded)
828-
elif RNS.ResourceAdvertisement.is_response(packet):
829-
request_id = RNS.ResourceAdvertisement.read_request_id(packet)
830-
for pending_request in self.pending_requests:
831-
if pending_request.request_id == request_id:
832-
RNS.Resource.accept(packet, callback=self.response_resource_concluded, progress_callback=pending_request.response_resource_progress, request_id = request_id)
833-
pending_request.response_size = RNS.ResourceAdvertisement.read_size(packet)
834-
pending_request.response_transfer_size = RNS.ResourceAdvertisement.read_transfer_size(packet)
835-
pending_request.started_at = time.time()
836-
elif self.resource_strategy == Link.ACCEPT_NONE:
837-
pass
838-
elif self.resource_strategy == Link.ACCEPT_APP:
839-
if self.callbacks.resource != None:
840-
try:
841-
resource_advertisement = RNS.ResourceAdvertisement.unpack(packet.plaintext)
842-
resource_advertisement.link = self
843-
if self.callbacks.resource(resource_advertisement):
844-
RNS.Resource.accept(packet, self.callbacks.resource_concluded)
845-
except Exception as e:
846-
RNS.log("Error while executing resource accept callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
847-
elif self.resource_strategy == Link.ACCEPT_ALL:
848-
RNS.Resource.accept(packet, self.callbacks.resource_concluded)
831+
if RNS.ResourceAdvertisement.is_request(packet):
832+
RNS.Resource.accept(packet, callback=self.request_resource_concluded)
833+
elif RNS.ResourceAdvertisement.is_response(packet):
834+
request_id = RNS.ResourceAdvertisement.read_request_id(packet)
835+
for pending_request in self.pending_requests:
836+
if pending_request.request_id == request_id:
837+
RNS.Resource.accept(packet, callback=self.response_resource_concluded, progress_callback=pending_request.response_resource_progress, request_id = request_id)
838+
pending_request.response_size = RNS.ResourceAdvertisement.read_size(packet)
839+
pending_request.response_transfer_size = RNS.ResourceAdvertisement.read_transfer_size(packet)
840+
pending_request.started_at = time.time()
841+
elif self.resource_strategy == Link.ACCEPT_NONE:
842+
pass
843+
elif self.resource_strategy == Link.ACCEPT_APP:
844+
if self.callbacks.resource != None:
845+
try:
846+
resource_advertisement = RNS.ResourceAdvertisement.unpack(packet.plaintext)
847+
resource_advertisement.link = self
848+
if self.callbacks.resource(resource_advertisement):
849+
RNS.Resource.accept(packet, self.callbacks.resource_concluded)
850+
except Exception as e:
851+
RNS.log("Error while executing resource accept callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
852+
elif self.resource_strategy == Link.ACCEPT_ALL:
853+
RNS.Resource.accept(packet, self.callbacks.resource_concluded)
849854

850855
elif packet.context == RNS.Packet.RESOURCE_REQ:
851856
plaintext = self.decrypt(packet.data)
852-
self.__update_phy_stats(packet, query_shared=True)
853-
if ord(plaintext[:1]) == RNS.Resource.HASHMAP_IS_EXHAUSTED:
854-
resource_hash = plaintext[1+RNS.Resource.MAPHASH_LEN:RNS.Identity.HASHLENGTH//8+1+RNS.Resource.MAPHASH_LEN]
855-
else:
856-
resource_hash = plaintext[1:RNS.Identity.HASHLENGTH//8+1]
857-
858-
for resource in self.outgoing_resources:
859-
if resource.hash == resource_hash:
860-
# We need to check that this request has not been
861-
# received before in order to avoid sequencing errors.
862-
if not packet.packet_hash in resource.req_hashlist:
863-
resource.req_hashlist.append(packet.packet_hash)
864-
resource.request(plaintext)
857+
if plaintext != None:
858+
self.__update_phy_stats(packet, query_shared=True)
859+
if ord(plaintext[:1]) == RNS.Resource.HASHMAP_IS_EXHAUSTED:
860+
resource_hash = plaintext[1+RNS.Resource.MAPHASH_LEN:RNS.Identity.HASHLENGTH//8+1+RNS.Resource.MAPHASH_LEN]
861+
else:
862+
resource_hash = plaintext[1:RNS.Identity.HASHLENGTH//8+1]
863+
864+
for resource in self.outgoing_resources:
865+
if resource.hash == resource_hash:
866+
# We need to check that this request has not been
867+
# received before in order to avoid sequencing errors.
868+
if not packet.packet_hash in resource.req_hashlist:
869+
resource.req_hashlist.append(packet.packet_hash)
870+
resource.request(plaintext)
865871

866872
elif packet.context == RNS.Packet.RESOURCE_HMU:
867873
plaintext = self.decrypt(packet.data)
868-
self.__update_phy_stats(packet, query_shared=True)
869-
resource_hash = plaintext[:RNS.Identity.HASHLENGTH//8]
870-
for resource in self.incoming_resources:
871-
if resource_hash == resource.hash:
872-
resource.hashmap_update_packet(plaintext)
874+
if plaintext != None:
875+
self.__update_phy_stats(packet, query_shared=True)
876+
resource_hash = plaintext[:RNS.Identity.HASHLENGTH//8]
877+
for resource in self.incoming_resources:
878+
if resource_hash == resource.hash:
879+
resource.hashmap_update_packet(plaintext)
873880

874881
elif packet.context == RNS.Packet.RESOURCE_ICL:
875882
plaintext = self.decrypt(packet.data)
876-
self.__update_phy_stats(packet)
877-
resource_hash = plaintext[:RNS.Identity.HASHLENGTH//8]
878-
for resource in self.incoming_resources:
879-
if resource_hash == resource.hash:
880-
resource.cancel()
883+
if plaintext != None:
884+
self.__update_phy_stats(packet)
885+
resource_hash = plaintext[:RNS.Identity.HASHLENGTH//8]
886+
for resource in self.incoming_resources:
887+
if resource_hash == resource.hash:
888+
resource.cancel()
881889

882890
elif packet.context == RNS.Packet.KEEPALIVE:
883891
if not self.initiator and packet.data == bytes([0xFF]):
@@ -909,13 +917,15 @@ def receive(self, packet):
909917
# else:
910918
# packet.prove()
911919
# plaintext = self.decrypt(packet.data)
912-
# self._channel._receive(plaintext)
920+
# if plaintext != None:
921+
# self._channel._receive(plaintext)
913922
############################################
914923

915924
packet.prove()
916925
plaintext = self.decrypt(packet.data)
917-
self.__update_phy_stats(packet)
918-
self._channel._receive(plaintext)
926+
if plaintext != None:
927+
self.__update_phy_stats(packet)
928+
self._channel._receive(plaintext)
919929

920930
elif packet.packet_type == RNS.Packet.PROOF:
921931
if packet.context == RNS.Packet.RESOURCE_PRF:
@@ -953,6 +963,7 @@ def decrypt(self, ciphertext):
953963

954964
except Exception as e:
955965
RNS.log("Decryption failed on link "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
966+
return None
956967

957968

958969
def sign(self, message):

0 commit comments

Comments
 (0)