Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
125 changes: 0 additions & 125 deletions lightapi/python/cffi_src/openssl_build.py

This file was deleted.

4 changes: 0 additions & 4 deletions lightapi/python/scripts/ci/setup_lint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,3 @@ set -ex
python3 -m pip install -r "$(git rev-parse --show-toplevel)/linters/python/lint_requirements.txt"
python3 -m pip install -r requirements.txt
python3 -m pip install -r dev_requirements.txt

# build bindings as part of lint setup to avoid import-error
python3 -m cffi_src.openssl_build
mv _openssl* symbollightapi/bindings
34 changes: 0 additions & 34 deletions lightapi/python/setup.py

This file was deleted.

1 change: 0 additions & 1 deletion lightapi/python/symbollightapi/bindings/.gitignore

This file was deleted.

Empty file.
7 changes: 0 additions & 7 deletions lightapi/python/symbollightapi/bindings/openssl.py

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from collections import namedtuple

from cryptography import x509
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
from symbolchain.CryptoTypes import PublicKey
from zenlog import log

from ..bindings.openssl import lib
from .CertificateUtils import try_parse_certificate, verify_self_signed
CertificateInfo = namedtuple('CertificateInfo', ['subject', 'public_key'])


class CatapultCertificateProcessor:
Expand All @@ -26,51 +30,103 @@ def certificate(self, depth):

return self.certificate_infos[depth]

def verify(self, preverified, certificate_store_context):
"""Verifies the current certificate in certificate_store_context given preverified result."""

# reject all certificate chains that are not composed of two certificates
chain = lib.X509_STORE_CTX_get0_chain(certificate_store_context)
chain_size = lib.sk_X509_num(chain)
if 2 != chain_size:
log.warning(f'rejecting certificate chain with size {chain_size}')
return False

certificate = lib.X509_STORE_CTX_get_current_cert(certificate_store_context)
if not certificate:
raise RuntimeError('rejecting certificate chain with no active certificate')
def verify_der_chain(self, chain_der):
"""Verifies a DER-encoded certificate chain without OpenSSL verify callback."""

if preverified:
if self._push(certificate):
return True
self.certificate_infos = []

lib.X509_STORE_CTX_set_error(certificate_store_context, lib.X509_V_ERR_APPLICATION_VERIFICATION)
if 2 != len(chain_der):
log.warning(f'rejecting certificate chain with size {len(chain_der)}')
return False

error_code = lib.X509_STORE_CTX_get_error(certificate_store_context)
return self.verify_unverified_root(certificate, error_code)

def verify_unverified_root(self, certificate, error_code):
if self.certificate_infos:
log.warning('rejecting certificate chain with unverified non-root certificate')
certificates = self._load_certificates_from_der(chain_der)
if certificates is None:
return False

# only verify self signed certificates
if lib.X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN != error_code:
log.warning(f'rejecting certificate chain with unverified unexpected error {error_code}')
root_and_leaf = self._find_root_and_leaf(certificates)
if root_and_leaf is None:
return False

if not verify_self_signed(certificate):
log.warning('rejecting certificate chain with improperly self-signed root certificate')
(root, leaf) = root_and_leaf
root_info = self._try_parse_certificate_from_cryptography(root)
leaf_info = self._try_parse_certificate_from_cryptography(leaf)
if not root_info or not leaf_info:
log.warning('rejecting certificate chain due to certificate parse failure')
return False

self.certificate_infos.append(root_info)
self.certificate_infos.append(leaf_info)
return True

def _push(self, certificate):
certificate_info = try_parse_certificate(certificate)
if not certificate_info:
log.warning('rejecting certificate chain due to certificate parse failure')
def try_extract_public_key_from_der(self, certificate_der):
"""Tries to extract Symbol public key from a single DER certificate."""

try:
certificate = x509.load_der_x509_certificate(certificate_der)
except Exception: # pylint: disable=broad-except
return None

certificate_info = self._try_parse_certificate_from_cryptography(certificate)
return certificate_info.public_key if certificate_info else None

@staticmethod
def _load_certificates_from_der(chain_der):
"""Loads x509 certificate objects from DER-encoded bytes. Returns None on parse failure."""
certificates = []
for der in chain_der:
try:
certificates.append(x509.load_der_x509_certificate(der))
except Exception: # pylint: disable=broad-except
log.warning('rejecting certificate chain due to certificate parse failure')
return None

return certificates

def _find_root_and_leaf(self, certificates):
"""Finds and validates the root and leaf certificates. Returns (root, leaf) or None on failure."""

root = None
leaf = None
for certificate in certificates:
if self._is_self_signed(certificate):
root = certificate
else:
leaf = certificate

if not root:
log.warning('rejecting certificate chain with no self-signed root certificate')
return None

if not leaf:
log.warning('rejecting certificate chain where both certificates are self-signed')
return None

if not self._is_signed_by(leaf, root):
log.warning('rejecting certificate chain where leaf certificate is not signed by root certificate')
return None

return (root, leaf)

@staticmethod
def _is_self_signed(certificate):
if certificate.issuer != certificate.subject:
return False

self.certificate_infos.append(certificate_info)
return True
return CatapultCertificateProcessor._is_signed_by(certificate, certificate)

@staticmethod
def _is_signed_by(certificate, signer):
try:
signer.public_key().verify(certificate.signature, certificate.tbs_certificate_bytes)
return True
except Exception: # pylint: disable=broad-except
return False

@staticmethod
def _try_parse_certificate_from_cryptography(certificate):
public_key = certificate.public_key()
if not isinstance(public_key, Ed25519PublicKey):
return None

subject = certificate.subject.rfc4514_string()
return CertificateInfo(subject, PublicKey(public_key.public_bytes_raw()))
66 changes: 0 additions & 66 deletions lightapi/python/symbollightapi/connector/CertificateUtils.py

This file was deleted.

Loading
Loading