Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions tools/shoestring/shoestring/internal/CertificateFactory.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import ipaddress
import os
import shutil
import tempfile
Expand Down Expand Up @@ -79,6 +80,7 @@ def _prepare_ca_certificate(self, ca_cn):
'serial = serial.dat',
f'private_key = {self.ca_key_path}',
'certificate = ca.crt.pem',
'copy_extensions = copy',
'policy = policy_catapult',
'',
'[policy_catapult]',
Expand All @@ -90,10 +92,11 @@ def _prepare_ca_certificate(self, ca_cn):
'x509_extensions = x509_v3',
'',
'[dn]',
f'CN = {ca_cn}'
f'CN = {ca_cn}',
'',
'[x509_v3]',
'basicConstraints = critical,CA:TRUE',
'keyUsage = critical,keyCertSign,cRLSign',
'subjectKeyIdentifier = hash',
'authorityKeyIdentifier = keyid:always,issuer'
]))
Expand Down Expand Up @@ -137,21 +140,40 @@ def generate_node_certificate(self, node_cn, days=375, start_date=None):
if not node_cn:
raise RuntimeError('Node common name cannot be empty')

def _alt_name_entry(common_name):
try:
ipaddress.ip_address(common_name)
return f'IP.1 = {common_name}'
except ValueError:
return f'DNS.1 = {common_name}'

# prepare node config
with open('node.cnf', 'wt', encoding='utf8') as outfile:
outfile.write('\n'.join([
'[req]',
'prompt = no',
'distinguished_name = dn',
'x509_extensions = x509_v3',
'req_extensions = req_v3',
'',
'[dn]',
f'CN = {node_cn}',
'',
'[x509_v3]',
'basicConstraints = CA:FALSE',
'[req_v3]',
'basicConstraints = critical,CA:FALSE',
'keyUsage = critical,digitalSignature',
'extendedKeyUsage = serverAuth,clientAuth',
'subjectAltName = @alt_names',
'',
'[x509_v3_node]',
'basicConstraints = critical,CA:FALSE',
'keyUsage = critical,digitalSignature',
'extendedKeyUsage = serverAuth,clientAuth',
'subjectKeyIdentifier = hash',
'authorityKeyIdentifier = keyid,issuer'
'authorityKeyIdentifier = keyid,issuer',
'subjectAltName = @alt_names',
'',
'[alt_names]',
_alt_name_entry(node_cn)
]))

# prepare node certificate signing request
Expand All @@ -173,11 +195,13 @@ def generate_node_certificate(self, node_cn, days=375, start_date=None):
self.openssl_executor.dispatch(self._add_ca_password([
'ca',
'-config', 'ca.cnf',
'-extfile', 'node.cnf',
'-extensions', 'x509_v3_node',
'-days', str(days),
'-notext',
'-batch',
'-in', 'node.csr.pem',
'-out', 'node.crt.pem'
'-out', 'node.crt.pem',
] + ([] if not start_date else ['-startdate', start_date.strftime('%y%m%d%H%M%SZ')])))

@staticmethod
Expand Down
59 changes: 52 additions & 7 deletions tools/shoestring/tests/internal/test_CertificateFactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@


class CertificateFactoryTest(unittest.TestCase):
# pylint: disable=too-many-public-methods

# region common utils

@staticmethod
Expand Down Expand Up @@ -142,6 +144,23 @@ def _assert_certificate_duration(self, x509_output, test_start_time, expected_da

self.assertEqual(expected_days, (cert_end_time - cert_start_time).days)

def _assert_certificate_is_x509v3(self, x509_output):
self.assertIn('X509v3 extensions', x509_output)

def _assert_node_certificate_tls_extensions(self, x509_output, expected_san):
self.assertRegex(x509_output, r'X509v3 Basic Constraints: critical\n\s*CA:FALSE')
self.assertRegex(x509_output, r'X509v3 Key Usage: critical\n\s*Digital Signature')
self.assertRegex(x509_output, r'X509v3 Extended Key Usage:\s*\n\s*TLS Web Server Authentication, TLS Web Client Authentication')
self.assertRegex(x509_output, rf'X509v3 Subject Alternative Name:\s*\n\s*{expected_san}')

def _assert_certificate_is_strict_x509v3(self, certificate_path, ca_certificate_path):
self._create_executor().dispatch([
'verify',
'-x509_strict',
'-CAfile', ca_certificate_path,
certificate_path
])

def _assert_can_generate_ca_certificate(self, additional_args, expected_duration_days):
# Arrange: certificate has second resolution, so clear microseconds for assert below to work
test_start_time = datetime.datetime.utcnow().replace(microsecond=0)
Expand Down Expand Up @@ -171,16 +190,28 @@ def _assert_can_generate_ca_certificate(self, additional_args, expected_duration
# - check start and expiry times
self._assert_certificate_duration(x509_output, test_start_time, expected_duration_days)

# - verify certificate is properly self signed
self._create_executor().dispatch(['verify', '-CAfile', ca_certificate_path, ca_certificate_path])
# - verify certificate is properly self signed and strict x509v3
self._assert_certificate_is_strict_x509v3(ca_certificate_path, ca_certificate_path)

# - check certificate is x509v3
self._assert_certificate_is_x509v3(x509_output)

def test_can_generate_ca_certificate(self):
self._assert_can_generate_ca_certificate({}, 20 * 365)

def test_can_generate_ca_certificate_with_custom_duration(self):
self._assert_can_generate_ca_certificate({'days': 1000}, 1000)

def _assert_can_generate_node_certificate(self, should_generate_certificate_chain, additional_args, expected_values):
def _assert_can_generate_node_certificate(
self,
should_generate_certificate_chain,
additional_args,
expected_values,
node_common_name='my NODE common name',
expected_san='DNS:my NODE common name'
):
# pylint: disable=too-many-arguments, too-many-positional-arguments, too-many-locals

# Arrange: certificate has second resolution, so clear microseconds for assert below to work
future_start_delay_days = expected_values.get('delay_days', 0)
test_start_time = datetime.datetime.utcnow().replace(microsecond=0) + datetime.timedelta(future_start_delay_days)
Expand All @@ -202,7 +233,7 @@ def _assert_can_generate_node_certificate(self, should_generate_certificate_chai
factory.generate_ca_certificate('my CA common name')

# Act:
factory.generate_node_certificate('my NODE common name', **additional_args)
factory.generate_node_certificate(node_common_name, **additional_args)
if should_generate_certificate_chain:
factory.create_node_certificate_chain()

Expand All @@ -222,7 +253,7 @@ def _assert_can_generate_node_certificate(self, should_generate_certificate_chai
], False))

# - check issuer and subject common names are correct
self._assert_certificate_issuer_and_subject(x509_output, 'my CA common name', 'my NODE common name')
self._assert_certificate_issuer_and_subject(x509_output, 'my CA common name', node_common_name)

# - check start and expiry times
self._assert_certificate_duration(
Expand All @@ -231,9 +262,15 @@ def _assert_can_generate_node_certificate(self, should_generate_certificate_chai
expected_values.get('duration', 375),
future_start_delay_days)

# - verify certificate is properly signed by CA (only if node start date is not in future)
# - verify certificate is properly signed by CA and strict x509v3 (only if node start date is not in future)
if not future_start_delay_days:
self._create_executor().dispatch(['verify', '-CAfile', ca_certificate_path, node_certificate_path])
self._assert_certificate_is_strict_x509v3(node_certificate_path, ca_certificate_path)

# - check certificate is x509v3
self._assert_certificate_is_x509v3(x509_output)

# - check certificate has strict TLS extensions
self._assert_node_certificate_tls_extensions(x509_output, expected_san)

def test_can_generate_node_certificate(self):
self._assert_can_generate_node_certificate(False, {}, {})
Expand All @@ -247,6 +284,14 @@ def test_can_generate_node_certificate_with_start_date(self):
def test_can_generate_node_certificate_chain(self):
self._assert_can_generate_node_certificate(True, {}, {})

def test_can_generate_node_certificate_with_ip_subject_alt_name(self):
self._assert_can_generate_node_certificate(
False,
{},
{},
'127.0.0.1',
'IP Address:127.0.0.1')

def test_cannot_generate_ca_certificate_without_common_name(self):
# Arrange:
with tempfile.TemporaryDirectory() as certificate_directory:
Expand Down