@@ -219,20 +219,41 @@ class L2CAP_PDU:
219219 See Bluetooth spec @ Vol 3, Part A - 3 DATA PACKET FORMAT
220220 '''
221221
222- @staticmethod
223- def from_bytes (data : bytes ) -> L2CAP_PDU :
222+ @classmethod
223+ def from_bytes (cls , data : bytes ) -> L2CAP_PDU :
224224 # Check parameters
225225 if len (data ) < 4 :
226226 raise InvalidPacketError ('not enough data for L2CAP header' )
227227
228- _ , l2cap_pdu_cid = struct .unpack_from ('<HH' , data , 0 )
229- l2cap_pdu_payload = data [4 : ]
228+ length , l2cap_pdu_cid = struct .unpack_from ('<HH' , data , 0 )
229+ l2cap_pdu_payload = data [4 : 4 + length ]
230230
231- return L2CAP_PDU (l2cap_pdu_cid , l2cap_pdu_payload )
231+ return cls (l2cap_pdu_cid , l2cap_pdu_payload )
232232
233233 def __bytes__ (self ) -> bytes :
234- header = struct .pack ('<HH' , len (self .payload ), self .cid )
235- return header + self .payload
234+ return self .to_bytes (with_fcs = False )
235+
236+ @classmethod
237+ def crc16 (self , data : bytes ) -> int :
238+ crc = 0x0000
239+ for byte in data :
240+ crc ^= byte
241+ for _ in range (8 ):
242+ if (crc & 0x0001 ) > 0 :
243+ crc = (crc >> 1 ) ^ 0xA001
244+ else :
245+ crc = crc >> 1
246+ return crc
247+
248+ def to_bytes (self , with_fcs : bool = False ) -> bytes :
249+ length = len (self .payload )
250+ if with_fcs :
251+ length += 2
252+ header = struct .pack ('<HH' , length , self .cid )
253+ body = header + self .payload
254+ if with_fcs :
255+ body += struct .pack ('<H' , self .crc16 (body ))
256+ return body
236257
237258 def __init__ (self , cid : int , payload : bytes ) -> None :
238259 self .cid = cid
@@ -1120,6 +1141,7 @@ def __init__(
11201141 self .connection_result = None
11211142 self .disconnection_result = None
11221143 self .sink = None
1144+ self .fcs_enabled = False
11231145 self .spec = spec
11241146 if spec .mode == TransmissionMode .BASIC :
11251147 self .processor = BaseProcessor (self )
@@ -1138,12 +1160,17 @@ def write(self, sdu: bytes) -> None:
11381160 def send_pdu (self , pdu : Union [SupportsBytes , bytes ]) -> None :
11391161 if self .state != self .State .OPEN :
11401162 raise InvalidStateError ('channel not open' )
1141- self .manager .send_pdu (self .connection , self .destination_cid , pdu )
1163+ self .manager .send_pdu (
1164+ self .connection , self .destination_cid , pdu , self .fcs_enabled
1165+ )
11421166
11431167 def send_control_frame (self , frame : L2CAP_Control_Frame ) -> None :
11441168 self .manager .send_control_frame (self .connection , self .signaling_cid , frame )
11451169
11461170 def on_pdu (self , pdu : bytes ) -> None :
1171+ if self .fcs_enabled :
1172+ # Drop FCS.
1173+ pdu = pdu [:- 2 ]
11471174 self .processor .on_pdu (pdu )
11481175
11491176 def on_sdu (self , sdu : bytes ) -> None :
@@ -1308,7 +1335,12 @@ def on_configure_request(self, request: L2CAP_Configure_Request) -> None:
13081335 peer_mps ,
13091336 ) = struct .unpack_from ('<BBBHHH' , option [1 ])
13101337 logger .debug (
1311- 'Peer requests Retransmission or Flow Control: mode=%s, tx_window_size=%s, retransmission_timeout=%s, monitor_timeout=%s, mps=%s' ,
1338+ 'Peer requests Retransmission or Flow Control: mode=%s,'
1339+ ' tx_window_size=%s,'
1340+ ' max_retransmission=%s,'
1341+ ' retransmission_timeout=%s,'
1342+ ' monitor_timeout=%s,'
1343+ ' mps=%s' ,
13121344 TransmissionMode (mode ).name ,
13131345 peer_tx_window_size ,
13141346 peer_max_retransmission ,
@@ -1330,8 +1362,7 @@ def on_configure_request(self, request: L2CAP_Configure_Request) -> None:
13301362 else :
13311363 logger .error ("Enhanced Retransmission Mode is not enabled" )
13321364 result = L2CAP_Configure_Response .Result .FAILURE_REJECTED
1333- replied_options .clear ()
1334- replied_options .append (option )
1365+ replied_options = [option ]
13351366 break
13361367 else :
13371368 logger .error (
@@ -1340,8 +1371,23 @@ def on_configure_request(self, request: L2CAP_Configure_Request) -> None:
13401371 result = (
13411372 L2CAP_Configure_Response .Result .FAILURE_UNACCEPTABLE_PARAMETERS
13421373 )
1343- replied_options .clear ()
1374+ replied_options = [option ]
1375+ break
1376+ elif option [0 ] == L2CAP_Configure_Request .ParameterType .FCS :
1377+ enabled = option [1 ][0 ] != 0
1378+ logger .debug ("Peer requests FCS: %s" , enabled )
1379+ if (
1380+ L2CAP_Information_Request .ExtendedFeatures .FCS_OPTION
1381+ in self .manager .extended_features
1382+ ):
1383+ self .fcs_enabled = enabled
13441384 replied_options .append (option )
1385+ else :
1386+ logger .error ("Frame Check Sequence is not supported" )
1387+ result = (
1388+ L2CAP_Configure_Response .Result .FAILURE_UNACCEPTABLE_PARAMETERS
1389+ )
1390+ replied_options = [option ]
13451391 break
13461392 else :
13471393 logger .debug (
@@ -1350,8 +1396,7 @@ def on_configure_request(self, request: L2CAP_Configure_Request) -> None:
13501396 option [1 ].hex (),
13511397 )
13521398 result = L2CAP_Configure_Response .Result .FAILURE_UNKNOWN_OPTIONS
1353- replied_options .clear ()
1354- replied_options .append (option )
1399+ replied_options = [option ]
13551400 break
13561401
13571402 self .send_control_frame (
@@ -2061,15 +2106,21 @@ def on_disconnection(self, connection_handle: int, _reason: int) -> None:
20612106 if connection_handle in self .identifiers :
20622107 del self .identifiers [connection_handle ]
20632108
2064- def send_pdu (self , connection , cid : int , pdu : Union [SupportsBytes , bytes ]) -> None :
2109+ def send_pdu (
2110+ self ,
2111+ connection ,
2112+ cid : int ,
2113+ pdu : Union [SupportsBytes , bytes ],
2114+ with_fcs : bool = False ,
2115+ ) -> None :
20652116 pdu_str = pdu .hex () if isinstance (pdu , bytes ) else str (pdu )
20662117 pdu_bytes = bytes (pdu )
20672118 logger .debug (
20682119 f'{ color (">>> Sending L2CAP PDU" , "blue" )} '
20692120 f'on connection [0x{ connection .handle :04X} ] (CID={ cid } ) '
20702121 f'{ connection .peer_address } : { len (pdu_bytes )} bytes, { pdu_str } '
20712122 )
2072- self .host .send_l2cap_pdu (connection .handle , cid , pdu_bytes )
2123+ self .host .send_l2cap_pdu (connection .handle , cid , pdu_bytes , with_fcs )
20732124
20742125 def on_pdu (self , connection : Connection , cid : int , pdu : bytes ) -> None :
20752126 if cid in (L2CAP_SIGNALING_CID , L2CAP_LE_SIGNALING_CID ):
0 commit comments