diff --git a/hls4ml/backends/vivado/passes/core_templates.py b/hls4ml/backends/vivado/passes/core_templates.py
index 755eb6de4..133229135 100644
--- a/hls4ml/backends/vivado/passes/core_templates.py
+++ b/hls4ml/backends/vivado/passes/core_templates.py
@@ -7,6 +7,7 @@
     BatchNormalization,
     Dense,
     HardActivation,
     LayerNormalization,
+    MultiplierReLU,
     ParametrizedActivation,
     PReLU,
@@ -268,7 +269,7 @@ def format(self, node):
 
 class ParamActivationConfigTemplate(LayerConfigTemplate):
     def __init__(self):
-        super().__init__((ParametrizedActivation, PReLU))
+        super().__init__((ParametrizedActivation, PReLU, MultiplierReLU))
         self.template = param_activ_config_template
 
     def format(self, node):
@@ -381,7 +382,7 @@ def format(self, node):
 
 class PReLUFunctionTemplate(FunctionCallTemplate):
     def __init__(self):
-        super().__init__(PReLU, include_header=activ_include_list)
+        super().__init__((PReLU, MultiplierReLU), include_header=activ_include_list)
         self.template = param_activ_function_template
 
     def format(self, node):
diff --git a/hls4ml/converters/keras_v3/__init__.py b/hls4ml/converters/keras_v3/__init__.py
index 595140df4..484aa61ad 100644
--- a/hls4ml/converters/keras_v3/__init__.py
+++ b/hls4ml/converters/keras_v3/__init__.py
@@ -4,6 +4,7 @@
 from . import hgq2  # noqa: F401
 from . import merge  # noqa: F401
 from . import pooling  # noqa: F401
+from . import pquant  # noqa: F401
 from . import recurrent  # noqa: F401
 
 from ._base import registry as layer_handlers
diff --git a/hls4ml/converters/keras_v3/conv.py b/hls4ml/converters/keras_v3/conv.py
index cff353abf..af79adf7d 100644
--- a/hls4ml/converters/keras_v3/conv.py
+++ b/hls4ml/converters/keras_v3/conv.py
@@ -141,4 +141,7 @@ def handle(
         elif isinstance(layer, BaseConv):
             config['weight_data'] = kernel
 
+        if hasattr(layer, 'quantization_parameters'):
+            config['quantization_parameters'] = layer.quantization_parameters
+
         return config
diff --git a/hls4ml/converters/keras_v3/core.py b/hls4ml/converters/keras_v3/core.py
index 33efcfaa6..28cdd65aa 100644
--- a/hls4ml/converters/keras_v3/core.py
+++ b/hls4ml/converters/keras_v3/core.py
@@ -33,6 +33,10 @@ def handle(
             'n_out': n_out,
             'n_in': n_in,
         }
+
+        if hasattr(layer, 'quantization_parameters'):
+            config['quantization_parameters'] = layer.quantization_parameters
+
         return config
diff --git a/hls4ml/converters/keras_v3/pquant.py b/hls4ml/converters/keras_v3/pquant.py
new file mode 100644
index 000000000..8fdd537d7
--- /dev/null
+++ b/hls4ml/converters/keras_v3/pquant.py
@@ -0,0 +1,127 @@
+import typing
+from collections.abc import Sequence
+
+import numpy as np
+
+from hls4ml.model.types import FixedPrecisionType
+
+from ._base import KerasV3LayerHandler, register
+from .conv import gen_conv_config
+
+if typing.TYPE_CHECKING:
+    import pquant
+    from keras import KerasTensor
+
+
+@register
+class PQuantReLUHandler(KerasV3LayerHandler):
+    handles = ('pquant.core.activations_quantizer.QuantizedReLU',)
+
+    def handle(
+        self,
+        layer: 'pquant.core.activations_quantizer.QuantizedReLU',
+        in_tensors: Sequence['KerasTensor'],
+        out_tensors: Sequence['KerasTensor'],
+    ):
+        config = {}
+        config.update(self.default_config)
+        config['quantization_parameters'] = layer.quantization_parameters
+
+        if (
+            not config['quantization_parameters']['use_high_granularity_quantization']
+            and layer.config['quantization_parameters']['use_relu_multiplier']
+        ):
+            config['class_name'] = 'MultiplierReLU'
+            config['param_data'] = np.array(layer.multiplier)
+            config['activation'] = 'multiplier_relu'
+
+        else:
+            config['class_name'] = 'QActivation'
+            config['activation'] = 'relu'
+
+        return (config,)
+
+
+@register
+class PQuantTanhHandler(KerasV3LayerHandler):
+    handles = ('pquant.core.activations_quantizer.QuantizedTanh',)
+
+    def handle(
+        self,
+        layer: 'pquant.core.activations_quantizer.QuantizedTanh',
+        in_tensors: Sequence['KerasTensor'],
+        out_tensors: Sequence['KerasTensor'],
+    ):
+        config = {}
+        config.update(self.default_config)
+        config['quantization_parameters'] = layer.quantization_parameters
+
+        if not layer.config['quantization_parameters']['use_real_tanh']:
+            config['class_name'] = 'HardActivation'
+            config['slope'] = 0.5  # the default values in QKeras
+            config['shift'] = 0.5
+            # Quartus seems to have trouble if the width is 1.
+            config['slope_prec'] = FixedPrecisionType(width=2, integer=0, signed=False)
+            config['shift_prec'] = FixedPrecisionType(width=2, integer=0, signed=False)
+            config['activation'] = 'hard_tanh'
+
+        else:
+            config['class_name'] = 'QActivation'
+            config['activation'] = 'tanh'
+
+        return (config,)
+
+
+@register
+class PQuantPoolingHandler(KerasV3LayerHandler):
+    handles = ('pquant.core.tf_impl.compressed_layers_tf.QuantizedPooling',)
+
+    def handle(
+        self,
+        layer: 'pquant.core.tf_impl.compressed_layers_tf.QuantizedPooling',
+        in_tensors: Sequence['KerasTensor'],
+        out_tensors: Sequence['KerasTensor'],
+    ):
+        assert len(in_tensors) == 1, f'Layer {layer.name} has more than one input'
+        assert len(out_tensors) == 1, f'Layer {layer.name} has more than one output'
+
+        in_shape: tuple[int, ...] = in_tensors[0].shape[1:]  # type: ignore
+        out_shape: tuple[int, ...] = out_tensors[0].shape[1:]  # type: ignore
+        assert all(isinstance(x, int) for x in in_shape), f'Layer {layer.name} has non-fixed size input: {in_shape}'
+        assert all(isinstance(x, int) for x in out_shape), f'Layer {layer.name} has non-fixed size output: {out_shape}'
+
+        data_format = layer.data_format
+
+        if data_format == 'channels_last':
+            *px_in_shape, _ = in_shape
+        else:
+            _, *px_in_shape = in_shape
+
+        pool_size: tuple[int, ...] = layer.pool_size
+
+        strides = layer.strides
+        padding = layer.padding
+        pooling_config = gen_conv_config(
+            in_shape=in_shape,
+            out_shape=out_shape,
+            ker_px_shape=pool_size,
+            strides=strides,
+            data_format=data_format,
+            padding=padding,
+            name=layer.name,
+        )
+
+        pooling_config['pool_width'] = pooling_config.pop('filt_width')
+        if 'filt_height' in pooling_config:
+            pooling_config['pool_height'] = pooling_config.pop('filt_height')
+        if len(px_in_shape) == 1:
+            # inconsistent pooling1d config key name...
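+            # (hls4ml's Pooling1D config and templates expect n_in/n_out rather than in_width/out_width)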
+            pooling_config['n_in'] = pooling_config['in_width']
+            pooling_config['n_out'] = pooling_config['out_width']
+
+        config = {}
+        config.update(self.default_config)
+        config.update(pooling_config)
+        config['class_name'] = f'AveragePooling{layer.dimensions}D'
+        config['quantization_parameters'] = layer.quantization_parameters
+        return (config,)
diff --git a/hls4ml/converters/pytorch/convolution.py b/hls4ml/converters/pytorch/convolution.py
index f000286c7..8714e4c9b 100644
--- a/hls4ml/converters/pytorch/convolution.py
+++ b/hls4ml/converters/pytorch/convolution.py
@@ -44,6 +44,10 @@ def parse_conv1d_layer(operation, layer_name, input_names, input_shapes, node, c
 
     output_shape = [input_shapes[0][0], layer['n_filt'], layer['out_width']]  # Channel first as default
 
+    # Quantization parameter for PQuant integration
+    if hasattr(class_object, "quantization_parameters"):
+        layer['quantization_parameters'] = class_object.quantization_parameters
+
     return layer, output_shape
 
 
@@ -94,4 +98,8 @@ def parse_conv2d_layer(operation, layer_name, input_names, input_shapes, node, c
 
     output_shape = [input_shapes[0][0], layer['n_filt'], layer['out_height'], layer['out_width']]
 
+    # Quantization parameter for PQuant integration
+    if hasattr(class_object, "quantization_parameters"):
+        layer['quantization_parameters'] = class_object.quantization_parameters
+
     return layer, output_shape
diff --git a/hls4ml/converters/pytorch/core.py b/hls4ml/converters/pytorch/core.py
index 070c26d64..1aa89ae06 100644
--- a/hls4ml/converters/pytorch/core.py
+++ b/hls4ml/converters/pytorch/core.py
@@ -54,6 +54,10 @@ def parse_linear_layer(operation, layer_name, input_names, input_shapes, node, c
     output_shape = input_shapes[0][:]
     output_shape[-1] = layer['n_out']
 
+    # Quantization parameter for PQuant integration
+    if hasattr(class_object, "quantization_parameters"):
+        layer['quantization_parameters'] = class_object.quantization_parameters
+
     return layer, output_shape
diff --git a/hls4ml/converters/pytorch/pquant.py b/hls4ml/converters/pytorch/pquant.py
new file mode 100644
index 000000000..23051dc94
--- /dev/null
+++ b/hls4ml/converters/pytorch/pquant.py
@@ -0,0 +1,69 @@
+from hls4ml.converters.pytorch.core import parse_activation_layer
+from hls4ml.converters.pytorch.pooling import parse_pooling_layer
+from hls4ml.converters.pytorch_to_hls import pytorch_handler
+from hls4ml.model.types import FixedPrecisionType
+
+
+@pytorch_handler('QuantizedActivationTorchWrapper')
+def parse_pquant_activation_layer(operation, layer_name, input_names, input_shapes, node, class_object, data_reader, config):
+
+    layer, output_shape = parse_activation_layer(
+        class_object.activation.__class__.__name__,
+        layer_name,
+        input_names,
+        input_shapes,
+        node,
+        class_object.activation,
+        data_reader,
+        config,
+    )
+    layer['quantization_parameters'] = class_object.activation.quantization_parameters
+
+    if (
+        layer['activation'] == 'quantizedtanh'
+        and not class_object.activation.config['quantization_parameters']['use_real_tanh']
+    ):
+        layer['class_name'] = 'HardActivation'
+        layer['slope'] = 0.5  # the default values in QKeras
+        layer['shift'] = 0.5
+        # Quartus seems to have trouble if the width is 1.
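+        # (width 2, i.e. ufixed<2,0>, still represents the 0.5 slope/shift exactly)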
+        layer['slope_prec'] = FixedPrecisionType(width=2, integer=0, signed=False)
+        layer['shift_prec'] = FixedPrecisionType(width=2, integer=0, signed=False)
+        layer['activation'] = 'hard_tanh'
+
+    elif (
+        layer['activation'] == 'quantizedrelu'
+        and not layer['quantization_parameters']["use_high_granularity_quantization"]
+        and class_object.activation.config['quantization_parameters']['use_relu_multiplier']
+    ):
+        layer['class_name'] = 'MultiplierReLU'
+        layer['param_data'] = class_object.activation.multiplier.numpy()
+        layer['activation'] = 'multiplier_relu'
+
+    else:
+        layer['class_name'] = 'QActivation'
+        activation_map = {
+            'quantizedrelu': 'relu',
+            'quantizedtanh': 'tanh',
+        }
+        layer['activation'] = activation_map.get(layer['activation'], layer['activation'])
+
+    return layer, output_shape
+
+
+@pytorch_handler('QuantizedPooling')
+def parse_pquant_pooling_layer(operation, layer_name, input_names, input_shapes, node, class_object, data_reader, config):
+
+    layer, output_shape = parse_pooling_layer(
+        class_object.pooling.__class__.__name__,
+        layer_name,
+        input_names,
+        input_shapes,
+        node,
+        class_object.pooling,
+        data_reader,
+        config,
+    )
+    layer['quantization_parameters'] = class_object.quantization_parameters
+
+    return layer, output_shape
diff --git a/hls4ml/converters/pytorch_to_hls.py b/hls4ml/converters/pytorch_to_hls.py
index 4bc3fbe85..f521a514c 100644
--- a/hls4ml/converters/pytorch_to_hls.py
+++ b/hls4ml/converters/pytorch_to_hls.py
@@ -352,7 +352,10 @@ def parse_pytorch_model(config, verbose=True):
                 if '.' not in node.target:
                     obj = getattr(model, node.name)
                 else:
-                    obj = getattr(children[node.target.split('.')[0], node.name])
+                    if '_' not in node.name:
+                        obj = getattr(children[node.target.split('.')[0]], node.name)
+                    else:
+                        obj = getattr(children[node.target.split('.')[0]], node.name.split('_')[1])
 
                 input_layer = {}
                 input_layer['name'] = node.name
diff --git a/hls4ml/model/layers.py b/hls4ml/model/layers.py
index f72d3595e..69f631bbe 100644
--- a/hls4ml/model/layers.py
+++ b/hls4ml/model/layers.py
@@ -1011,6 +1011,20 @@ def initialize(self):
         self.add_weights_variable(name='param', var_name='a{index}')
 
 
+class MultiplierReLU(Activation):
+    _expected_attributes = [
+        Attribute('n_in'),
+        WeightAttribute('param'),
+        TypeAttribute('param'),
+    ]
+
+    def initialize(self):
+        super().initialize()
+        self.add_weights_variable(
+            name='param', var_name='m{index}', precision=FixedPrecisionType(width=4, integer=4, signed=True)
+        )
+
+
 class Softmax(Activation):
     def initialize(self):
         super().initialize()
@@ -1770,6 +1784,7 @@ def initialize(self):
     'ThresholdedReLU': ParametrizedActivation,
     'ELU': ParametrizedActivation,
     'PReLU': PReLU,
+    'MultiplierReLU': MultiplierReLU,
     'Softmax': Softmax,
     'TernaryTanh': TernaryTanh,
     'HardActivation': HardActivation,
diff --git a/hls4ml/templates/vivado/nnet_utils/nnet_activation.h b/hls4ml/templates/vivado/nnet_utils/nnet_activation.h
index 1edf9e664..fbeed472e 100644
--- a/hls4ml/templates/vivado/nnet_utils/nnet_activation.h
+++ b/hls4ml/templates/vivado/nnet_utils/nnet_activation.h
@@ -785,6 +785,27 @@ void prelu(data_T data[CONFIG_T::n_in], param_T alpha[CONFIG_T::n_in], res_T res
     }
 }
 
+// *************************************************
+// MultiplierReLU Activation
+// *************************************************
+template <class data_T, class multiplier_T, class res_T, typename CONFIG_T>
+void multiplier_relu(data_T data[CONFIG_T::n_in], multiplier_T mul[1], res_T res[CONFIG_T::n_in]) {
+    #pragma HLS PIPELINE
+
+    data_T datareg;
+    for (int ii = 0; ii < CONFIG_T::n_in; ii++) {
+        datareg = data[ii];
+        if (datareg > 0) {
+            // mul holds a signed power-of-two exponent, so the multiply reduces to a shift
+            if (mul[0] >= 0)
+                res[ii] = datareg << mul[0];
+            else
+                res[ii] = datareg >> (-mul[0]);
+        } else
+            res[ii] = 0;
+    }
+}
+
 // *************************************************
 // Binary TanH Activation
 // *************************************************
diff --git a/hls4ml/utils/config.py b/hls4ml/utils/config.py
index 8e2d9b701..d3eecdb21 100644
--- a/hls4ml/utils/config.py
+++ b/hls4ml/utils/config.py
@@ -1,6 +1,7 @@
 import json
 
 import hls4ml
+from hls4ml.model.types import FixedPrecisionType
 
 
 def create_config(output_dir='my-hls-test', project_name='myproject', backend='Vivado', version='1.0.0', **kwargs):
@@ -111,6 +112,44 @@
         return hls4ml.model.types.IntegerPrecisionType(width=integer, signed=signed)
 
 
+def _get_precision_from_pquant(qconfig, take_max=False):
+
+    precisions = {}
+    variables = ['weight', 'bias', 'act', 'pool']
+    overflow = qconfig['overflow']
+    round_mode = 'RND'
+    for var in variables:
+        if f'k_{var}' not in qconfig:
+            continue
+
+        k = qconfig[f'k_{var}']
+        i = qconfig[f'i_{var}']
+        f = qconfig[f'f_{var}']
+
+        if qconfig["use_high_granularity_quantization"]:
+            if not take_max:
+                assert k.size == 1 and i.size == 1 and f.size == 1, 'Only homogeneous quantizer is supported'
+                k = bool(k.ravel().item())
+                i = int(i.ravel().item())
+                f = int(f.ravel().item())
+            else:
+                k = bool(k.max())
+                i = int(i.max())
+                f = int(f.max())
+        else:
+            k = bool(k)
+            i = int(i)
+            f = int(f)
+
+        k, b, I = k, k + i + f, k + i  # noqa: E741
+        b = max(1, b)
+        pname = var if var == 'weight' or var == 'bias' else 'result'
+        precisions[pname] = FixedPrecisionType(b, I, k, rounding_mode=round_mode, saturation_mode=overflow)
+
+    return precisions
+
+
 def config_from_keras_model(
     model, granularity='model', backend=None, default_precision='fixed<16,6>', default_reuse_factor=1, max_precision=None
 ):
@@ -202,17 +241,25 @@ def make_layer_config(layer):
                 if attr.default is not None:
                     layer_config[attr.config_name] = attr.default
 
-        quantizers = {qname: qclass for qname, qclass in layer.items() if 'quantizer' in qname and qclass is not None}
-        for qname, qclass in quantizers.items():
-            pname = qname.lower().split('_quantizer')[0]
-            if pname == 'activation':
-                pname = 'result'
-            if isinstance(qclass, dict):
-                precision = _get_precision_from_quantizer(qclass)
-            else:
-                precision = qclass.hls_type
-            # TODO In the next version of this function, these should not be exposed to user to tweak
-            layer_config['Precision'][pname] = str(precision)
+        # PQuant quantization
+        if 'quantization_parameters' in layer:
+            precisions = _get_precision_from_pquant(layer['quantization_parameters'])
+            for pname, precision in precisions.items():
+                layer_config['Precision'][pname] = str(precision)
+
+        # QKeras quantization
+        else:
+            quantizers = {qname: qclass for qname, qclass in layer.items() if 'quantizer' in qname and qclass is not None}
+            for qname, qclass in quantizers.items():
+                pname = qname.lower().split('_quantizer')[0]
+                if pname == 'activation':
+                    pname = 'result'
+                if isinstance(qclass, dict):
+                    precision = _get_precision_from_quantizer(qclass)
+                else:
+                    precision = qclass.hls_type
+                # TODO In the next version of this function, these should not be exposed to user to tweak
+                layer_config['Precision'][pname] = str(precision)
 
         if layer['class_name'] in ['GarNet', 'GarNetStack']:
             # Define default precisions for various internal arrays (can be overridden from the config file)
@@ -409,6 +456,11 @@ def make_layer_config(layer):
             if attr.default is not None:
                 layer_config[attr.config_name] = attr.default
 
+        if 'quantization_parameters' in layer:
+            precisions = _get_precision_from_pquant(layer['quantization_parameters'])
+            for pname, precision in precisions.items():
+                layer_config['Precision'][pname] = str(precision)
+
         if layer['class_name'] == 'Input':
             dtype = layer['config']['dtype']
             if dtype.startswith('int') or dtype.startswith('uint'):
diff --git a/hls4ml/utils/torch.py b/hls4ml/utils/torch.py
index bfd2c9f0c..6a213a978 100644
--- a/hls4ml/utils/torch.py
+++ b/hls4ml/utils/torch.py
@@ -23,4 +23,5 @@ def is_leaf_module(self, m, module_qualified_name: str) -> bool:
             or m.__module__.startswith('torch.nn')
             or m.__module__.startswith('torch.ao.nn')
             or m.__module__.startswith('brevitas.nn')
+            or m.__module__.startswith('pquant')
         ) and not isinstance(m, torch.nn.Sequential)
diff --git a/test/pytest/test_pquant.py b/test/pytest/test_pquant.py
new file mode 100644
index 000000000..7690f8bc8
--- /dev/null
+++ b/test/pytest/test_pquant.py
@@ -0,0 +1,518 @@
+import importlib
+from pathlib import Path
+
+import numpy as np
+import pytest
+
+import hls4ml
+
+CONV2D_WIDTH_HEIGHT = 12
+CONV2D_IN_CHANNELS = 4
+CONV2D_OUT_CHANNELS = 8
+CONV1D_OUT_CHANNELS = 4
+CONV_KERNEL_SIZE = 3
+CONV1D_KERNEL_SIZE = 3
+LINEAR_INPUT_UNITS = 48
+BATCH_SIZE = 10
+
+
+# PQuant Functions
+
+
+def set_layer_quantization_attributes(layer, config, CompressedLayerBase, new_layer=None):
+    if isinstance(layer, CompressedLayerBase):
+        if config["quantization_parameters"]["use_high_granularity_quantization"]:
+            i_weight = layer.hgq_weight.quantizer.i.cpu().numpy()
+            f_weight = layer.hgq_weight.quantizer.f.cpu().numpy()
+            quantization_parameters = {
+                "i_weight": i_weight,
+                "f_weight": f_weight,
+                "k_weight": 1.0,
+                "overflow": layer.overflow,
+            }
+            if layer.use_bias:
+                i_bias = layer.hgq_bias.quantizer.i.cpu().numpy()
+                f_bias = layer.hgq_bias.quantizer.f.cpu().numpy()
+
+                quantization_parameters["i_bias"] = i_bias
+                quantization_parameters["f_bias"] = f_bias
+                quantization_parameters["k_bias"] = 1.0
+        else:
+            quantization_parameters = {
+                "i_weight": layer.i_weight.cpu().numpy(),
+                "f_weight": layer.f_weight.cpu().numpy(),
+                "k_weight": 1.0,
+                "i_bias": layer.i_bias.cpu().numpy(),
+                "f_bias": layer.f_bias.cpu().numpy(),
+                "k_bias": 1.0,
+                "overflow": layer.overflow,
+            }
+        quantization_parameters["use_high_granularity_quantization"] = config["quantization_parameters"][
+            "use_high_granularity_quantization"
+        ]
+        if new_layer is not None:
+            new_layer.quantization_parameters = quantization_parameters
+        return quantization_parameters
+
+
+def set_activation_quantization_parameters(layer, config):
+    if config["quantization_parameters"]["use_high_granularity_quantization"]:
+        i = layer.hgq.quantizer.i.cpu().numpy()
+        f = layer.hgq.quantizer.f.cpu().numpy()
+        k = layer.hgq.quantizer.k.cpu().numpy()
+        quantization_parameters = {
+            "i_act": i,
+            "f_act": f,
+            "k_act": k,
+            "overflow": layer.overflow,
+        }
+    else:
+        quantization_parameters = {
+            "i_act": layer.i.cpu().numpy(),
+            "f_act": layer.f.cpu().numpy(),
+            "k_act": layer.k.cpu().numpy(),
+            "overflow": layer.overflow,
+        }
+
+    quantization_parameters["use_high_granularity_quantization"] = config["quantization_parameters"][
+        "use_high_granularity_quantization"
+    ]
+    layer.quantization_parameters = quantization_parameters
+    return quantization_parameters
+
+
+def set_pooling_quantization_parameters(layer, config):
+    if config["quantization_parameters"]["use_high_granularity_quantization"]:
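+        # HGQ keeps the per-tensor integer/fractional/sign bit counts on the layer's hgq quantizer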
+        i = layer.hgq.quantizer.i.cpu().numpy()
+        f = layer.hgq.quantizer.f.cpu().numpy()
+        k = layer.hgq.quantizer.k.cpu().numpy()
+        quantization_parameters = {
+            "i_pool": i,
+            "f_pool": f,
+            "k_pool": k,
+            "overflow": layer.overflow,
+        }
+    else:
+        quantization_parameters = {
+            "i_pool": layer.i.cpu().numpy(),
+            "f_pool": layer.f.cpu().numpy(),
+            "k_pool": 1.0,
+            "overflow": layer.overflow,
+        }
+
+    quantization_parameters["use_high_granularity_quantization"] = config["quantization_parameters"][
+        "use_high_granularity_quantization"
+    ]
+    layer.quantization_parameters = quantization_parameters
+    return quantization_parameters
+
+
+def remove_pruning_from_model_torch(module, config):
+    import torch.nn as nn
+    from pquant.core.activations_quantizer import QuantizedReLU, QuantizedTanh
+    from pquant.core.torch_impl.compressed_layers_torch import (
+        CompressedLayerBase,
+        CompressedLayerConv1d,
+        CompressedLayerConv2d,
+        CompressedLayerLinear,
+        QuantizedPooling,
+    )
+
+    for name, layer in module.named_children():
+        if isinstance(layer, CompressedLayerLinear):
+            if config["pruning_parameters"]["pruning_method"] == "pdp":  # Find better solution later
+                if config["training_parameters"]["pruning_first"]:
+                    weight = layer.pruning_layer.get_hard_mask(layer.weight) * layer.weight
+                    weight, bias = layer.quantize(weight, layer.bias)
+                else:
+                    weight, bias = layer.quantize(layer.weight, layer.bias)
+                    weight = layer.pruning_layer.get_hard_mask(weight) * weight
+            else:
+                weight, bias = layer.prune_and_quantize(layer.weight, layer.bias)
+            out_features = layer.out_features
+            bias_values = bias
+            in_features = layer.in_features
+            bias = True if bias_values is not None else False
+            new_layer = nn.Linear(in_features=in_features, out_features=out_features, bias=bias)
+            set_layer_quantization_attributes(layer, config, CompressedLayerBase, new_layer)
+            setattr(module, name, new_layer)
+            getattr(module, name).weight.data.copy_(weight)
+            if getattr(module, name).bias is not None:
+                getattr(module, name).bias.data.copy_(bias_values.data)
+        elif isinstance(layer, (CompressedLayerConv2d, CompressedLayerConv1d)):
+            if config["pruning_parameters"]["pruning_method"] == "pdp":  # Find better solution later
+                if config["training_parameters"]["pruning_first"]:
+                    weight = layer.pruning_layer.get_hard_mask(layer.weight) * layer.weight
+                    weight, bias = layer.quantize(weight, layer.bias)
+                else:
+                    weight, bias = layer.quantize(layer.weight, layer.bias)
+                    weight = layer.pruning_layer.get_hard_mask(weight) * weight
+            else:
+                weight, bias = layer.prune_and_quantize(layer.weight, layer.bias)
+            bias_values = bias
+            bias = True if bias_values is not None else False
+            conv = nn.Conv2d if isinstance(layer, CompressedLayerConv2d) else nn.Conv1d
+            new_layer = conv(
+                layer.in_channels,
+                layer.out_channels,
+                layer.kernel_size,
+                layer.stride,
+                layer.padding,
+                layer.dilation,
+                layer.groups,
+                bias,
+                layer.padding_mode,
+            )
+            set_layer_quantization_attributes(layer, config, CompressedLayerBase, new_layer)
+            setattr(module, name, new_layer)
+            getattr(module, name).weight.data.copy_(weight)
+            if getattr(module, name).bias is not None:
+                getattr(module, name).bias.data.copy_(bias_values.data)
+        elif isinstance(layer, (QuantizedTanh, QuantizedReLU)):
+            set_activation_quantization_parameters(layer, config)
+        elif isinstance(layer, QuantizedPooling):
+            set_pooling_quantization_parameters(layer, config)
+        else:
+            remove_pruning_from_model_torch(layer, config)
+    return module
+
+
+def remove_pruning_from_model_tf(model, config):
+    import keras
+    from keras.layers import Activation, Conv1D, Conv2D, Dense, DepthwiseConv2D, SeparableConv2D
+    from pquant.core.activations_quantizer import QuantizedReLU, QuantizedTanh
+    from pquant.core.tf_impl.compressed_layers_tf import (
+        CompressedLayerBase,
+        CompressedLayerConv1dKeras,
+        CompressedLayerConv2dKeras,
+        CompressedLayerDenseKeras,
+        CompressedLayerDepthwiseConv2dKeras,
+        CompressedLayerSeparableConv2dKeras,
+        QuantizedPooling,
+        _prune_and_quantize_layer,
+    )
+
+    x = model.layers[0].output
+    for layer in model.layers[1:]:
+        if isinstance(layer, CompressedLayerDepthwiseConv2dKeras):
+            new_layer = DepthwiseConv2D(
+                kernel_size=layer.kernel_size,
+                strides=layer.strides,
+                padding=layer.padding,
+                dilation_rate=layer.dilation_rate,
+                use_bias=layer.use_bias,
+                depthwise_regularizer=layer.depthwise_regularizer,
+                activity_regularizer=layer.activity_regularizer,
+            )
+            set_layer_quantization_attributes(layer, config, CompressedLayerBase, new_layer)
+            x = new_layer(x)
+            use_bias = layer.use_bias
+            weight, bias = _prune_and_quantize_layer(layer, use_bias)
+            new_layer.set_weights([weight, bias] if use_bias else [weight])
+        elif isinstance(layer, CompressedLayerConv2dKeras):
+            new_layer = Conv2D(
+                filters=layer.filters,
+                kernel_size=layer.kernel_size,
+                strides=layer.strides,
+                padding=layer.padding,
+                dilation_rate=layer.dilation_rate,
+                use_bias=layer.use_bias,
+                kernel_regularizer=layer.kernel_regularizer,
+                activity_regularizer=layer.activity_regularizer,
+            )
+            set_layer_quantization_attributes(layer, config, CompressedLayerBase, new_layer)
+            x = new_layer(x)
+            use_bias = layer.use_bias
+            weight, bias = _prune_and_quantize_layer(layer, use_bias)
+            new_layer.set_weights([weight, bias] if use_bias else [weight])
+        elif isinstance(layer, CompressedLayerSeparableConv2dKeras):
+            if not layer.enable_quantization:
+                new_layer = SeparableConv2D(
+                    filters=layer.pointwise_conv.filters,
+                    kernel_size=layer.depthwise_conv.kernel_size,
+                    strides=layer.depthwise_conv.strides,
+                    padding=layer.depthwise_conv.padding,
+                    dilation_rate=layer.depthwise_conv.dilation_rate,
+                    use_bias=layer.pointwise_conv.use_bias,
+                    depthwise_regularizer=layer.depthwise_conv.depthwise_regularizer,
+                    pointwise_regularizer=layer.pointwise_conv.kernel_regularizer,
+                    activity_regularizer=layer.activity_regularizer,
+                )
+                set_layer_quantization_attributes(layer, config, CompressedLayerBase, new_layer)
+                x = new_layer(x)
+                use_bias = layer.pointwise_conv.use_bias
+                depthwise_weight, _ = _prune_and_quantize_layer(layer.depthwise_conv, False)
+                pointwise_weight, bias = _prune_and_quantize_layer(layer.pointwise_conv, layer.pointwise_conv.use_bias)
+                new_layer.set_weights(
+                    [depthwise_weight, pointwise_weight, bias] if use_bias else [depthwise_weight, pointwise_weight]
+                )
+
+            else:
+                new_layer_depthwise = DepthwiseConv2D(
+                    kernel_size=layer.depthwise_conv.kernel_size,
+                    strides=layer.depthwise_conv.strides,
+                    padding=layer.depthwise_conv.padding,
+                    dilation_rate=layer.depthwise_conv.dilation_rate,
+                    data_format=layer.data_format,
+                    use_bias=False,
+                )
+                new_layer_pointwise = Conv2D(
+                    kernel_size=1,
+                    filters=layer.pointwise_conv.filters,
+                    use_bias=layer.pointwise_conv.use_bias,
+                    padding=layer.pointwise_conv.padding,
+                    dilation_rate=layer.pointwise_conv.dilation_rate,
+                    data_format=layer.data_format,
+                    activity_regularizer=layer.pointwise_conv.activity_regularizer,
+                )
+                set_layer_quantization_attributes(layer.depthwise_conv, config, CompressedLayerBase, new_layer_depthwise)
+                set_layer_quantization_attributes(layer.pointwise_conv, config, CompressedLayerBase, new_layer_pointwise)
+                x = new_layer_depthwise(x)
+                depthwise_weight, _ = _prune_and_quantize_layer(layer.depthwise_conv, False)
+                new_layer_depthwise.set_weights([depthwise_weight])
+
+                if layer.enable_quantization:
+                    if layer.use_high_granularity_quantization:
+                        x = layer.hgq(x)
+                    else:
+                        quantizer = Activation(lambda x, q=layer.quantizer, k=1.0, i=layer.i, f=layer.f: q(x, k, i, f))
+                        x = quantizer(x)
+
+                x = new_layer_pointwise(x)
+                use_bias = layer.pointwise_conv.use_bias
+                pointwise_weight, bias = _prune_and_quantize_layer(layer.pointwise_conv, layer.pointwise_conv.use_bias)
+                new_layer_pointwise.set_weights([pointwise_weight, bias] if use_bias else [pointwise_weight])
+        elif isinstance(layer, CompressedLayerConv1dKeras):
+            new_layer = Conv1D(
+                filters=layer.filters,
+                kernel_size=layer.kernel_size,
+                strides=layer.strides,
+                padding=layer.padding,
+                dilation_rate=layer.dilation_rate,
+                use_bias=layer.use_bias,
+                kernel_regularizer=layer.kernel_regularizer,
+                activity_regularizer=layer.activity_regularizer,
+            )
+            set_layer_quantization_attributes(layer, config, CompressedLayerBase, new_layer)
+            x = new_layer(x)
+            use_bias = layer.use_bias
+            weight, bias = _prune_and_quantize_layer(layer, use_bias)
+            new_layer.set_weights([weight, bias] if use_bias else [weight])
+        elif isinstance(layer, CompressedLayerDenseKeras):
+            new_layer = Dense(units=layer.units, use_bias=layer.use_bias, kernel_regularizer=layer.kernel_regularizer)
+            set_layer_quantization_attributes(layer, config, CompressedLayerBase, new_layer)
+            x = new_layer(x)
+            use_bias = new_layer.use_bias
+            weight, bias = _prune_and_quantize_layer(layer, use_bias)
+            new_layer.set_weights([weight, bias] if use_bias else [weight])
+        elif isinstance(layer, (QuantizedTanh, QuantizedReLU)):
+            set_activation_quantization_parameters(layer, config)
+            x = layer(x)
+        elif isinstance(layer, QuantizedPooling):
+            set_pooling_quantization_parameters(layer, config)
+            x = layer(x)
+        else:
+            x = layer(x)
+    replaced_model = keras.Model(inputs=model.inputs, outputs=x)
+    return replaced_model
+
+
+# PyTorch Functions
+
+
+def get_pytorch_model(INPUT_SHAPE):
+    import torch.nn as nn
+
+    class TestModel(nn.Module):
+
+        def __init__(self):
+            super().__init__()
+            self.conv2d = nn.Conv2d(CONV2D_IN_CHANNELS, CONV2D_OUT_CHANNELS, kernel_size=3, stride=2, padding=1, bias=True)
+            self.relu = nn.ReLU()
+            self.conv1d = nn.Conv1d(CONV2D_OUT_CHANNELS, CONV1D_OUT_CHANNELS, kernel_size=3, stride=1, padding=1, bias=True)
+            self.tanh = nn.Tanh()
+            self.avg = nn.AvgPool1d(kernel_size=3, stride=3)
+            self.linear1 = nn.Linear(LINEAR_INPUT_UNITS, 10, bias=True)
+
+        def forward(self, x):
+            x = self.conv2d(x)
+            x = self.relu(x)
+            # keep the batch dimension; the per-sample shape here is (CONV2D_OUT_CHANNELS, 36)
+            x = x.view(-1, CONV2D_OUT_CHANNELS, 36)
+            x = self.conv1d(x)
+            x = self.tanh(x)
+            x = self.avg(x)
+            x = x.view(-1, LINEAR_INPUT_UNITS)
+            x = self.linear1(x)
+            return x
+
+    return TestModel()
+
+
+def pass_through_hls4ml_pytorch(model, INPUT_SHAPE):
+    backend = 'Vitis'
+    default_precision = 'ap_fixed<32, 16>' if backend in ['Vivado', 'Vitis'] else 'ac_fixed<32, 16, true>'
+    hls_config = hls4ml.utils.config_from_pytorch_model(
+        model, input_shape=INPUT_SHAPE, granularity='name', default_precision=default_precision, backend=backend
+    )
+
+    output_dir = str(Path(__file__).parent / 'pytorch_test')
+    hls_model = hls4ml.converters.convert_from_pytorch_model(
+        model, hls_config=hls_config, output_dir=output_dir, backend=backend, io_type='io_parallel', part='xc7a15tcpg236-3'
+    )
+    hls_model.compile()
+
+    return hls_model
+
+
+def create_data_pytorch(INPUT_SHAPE):
+    import torch
+
+    return torch.rand(INPUT_SHAPE)
+
+
+# Keras Functions
+
+
+def get_keras_model(INPUT_SHAPE):
+    from keras.layers import (
+        Activation,
+        AveragePooling1D,
+        Conv1D,
+        Conv2D,
+        Dense,
+        DepthwiseConv2D,
+        Flatten,
+        Input,
+        ReLU,
+        Reshape,
+    )
+    from keras.models import Model
+
+    inputs = Input(shape=INPUT_SHAPE)
+    x = DepthwiseConv2D(CONV_KERNEL_SIZE, strides=(2, 2))(inputs)
+    x = Conv2D(CONV2D_OUT_CHANNELS, CONV_KERNEL_SIZE, strides=(2, 2))(x)
+    x = ReLU()(x)
+    x = Reshape((4, 8))(x)
+    x = Conv1D(filters=CONV1D_OUT_CHANNELS, kernel_size=CONV1D_KERNEL_SIZE, strides=1, padding="same", use_bias=True)(x)
+    x = Activation("tanh")(x)
+    x = AveragePooling1D(2)(x)
+    x = Flatten()(x)
+    x = Dense(10)(x)
+
+    model = Model(inputs=inputs, outputs=x)
+    return model
+
+
+def pass_through_hls4ml_keras(model, INPUT_SHAPE):
+    backend = 'Vitis'
+    default_precision = 'ap_fixed<32, 16>' if backend in ['Vivado', 'Vitis'] else 'ac_fixed<32, 16, true>'
+    hls_config = hls4ml.utils.config_from_keras_model(
+        model, granularity='name', default_precision=default_precision, backend=backend
+    )
+
+    output_dir = str(Path(__file__).parent / 'keras_test')
+    hls_model = hls4ml.converters.convert_from_keras_model(
+        model, hls_config=hls_config, output_dir=output_dir, backend=backend, io_type='io_parallel', part='xc7a15tcpg236-3'
+    )
+    hls_model.compile()
+
+    return hls_model
+
+
+def create_data_keras(INPUT_SHAPE):
+    import keras
+
+    return keras.random.uniform(INPUT_SHAPE)
+
+
+# Configuration Dictionary
+
+
+def framework_config(framework):
+    config = {
+        'pytorch': {
+            'get_model': get_pytorch_model,
+            'remove_pruning': remove_pruning_from_model_torch,
+            'pass_through_hls4ml': pass_through_hls4ml_pytorch,
+            'create_data': create_data_pytorch,
+            'INPUT_SHAPE': (CONV2D_IN_CHANNELS, CONV2D_WIDTH_HEIGHT, CONV2D_WIDTH_HEIGHT),
+        },
+        'keras': {
+            'get_model': get_keras_model,
+            'remove_pruning': remove_pruning_from_model_tf,
+            'pass_through_hls4ml': pass_through_hls4ml_keras,
+            'create_data': create_data_keras,
+            'INPUT_SHAPE': (CONV2D_WIDTH_HEIGHT, CONV2D_WIDTH_HEIGHT, CONV2D_IN_CHANNELS),
+        },
+    }
+    return config[framework]
+
+
+# Act
+
+
+def get_model(framework, config):
+    if framework == 'pytorch':
+        import os
+
+        os.environ["KERAS_BACKEND"] = "torch"  # Needs to be set, some pruning layers as well as the quantizers are Keras
+        pretrain_module = importlib.import_module('pquant.core.torch_impl.compressed_layers_torch')
+    elif framework == 'keras':
+        import os
+
+        os.environ["KERAS_BACKEND"] = "tensorflow"
+        pretrain_module = importlib.import_module('pquant.core.tf_impl.compressed_layers_tf')
+    else:
+        raise ValueError(f"Unsupported framework: {framework}")
+    from pquant import add_compression_layers, get_default_config
+
+    model = config['get_model'](config['INPUT_SHAPE'])
+
+    # prune and quantize the model
+    pquant_config = get_default_config("pdp")
+    pquant_config["pruning_parameters"]["epsilon"] = 1.0
+    PQUANT_SHAPE = (BATCH_SIZE, *(config['INPUT_SHAPE']))
+    model = add_compression_layers(model, pquant_config, PQUANT_SHAPE)
+    pretrain_module.post_pretrain_functions(model, pquant_config)
+    model = config['remove_pruning'](model, pquant_config)
+
+    return model
+
+
+# Assert
+
+
+@pytest.mark.parametrize('framework', ['pytorch', 'keras'])
+def test_pquant(framework):
+
+    # setup
+    config = framework_config(framework)
+    model = get_model(framework, config)
+
+    # pass it through hls4ml
+    hls_model = config['pass_through_hls4ml'](model, config['INPUT_SHAPE'])
+
+    # predict
+    data = config['create_data']((100 * BATCH_SIZE, *(config['INPUT_SHAPE'])))
+    prediction = model(data)
+    if framework == 'pytorch':
+        prediction = prediction.detach()
+    prediction = prediction.numpy().flatten()
+    hls_prediction = hls_model.predict(data.numpy()).flatten()
+
+    np.testing.assert_allclose(hls_prediction, prediction, rtol=0.0, atol=5e-3)
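
Usage sketch: a minimal end-to-end flow for the integration above, assuming the pquant API as exercised in test_pquant.py (get_default_config, add_compression_layers, and a strip step such as remove_pruning_from_model_tf that leaves a quantization_parameters dict on the plain Keras layers); model, input_shape, and the output directory are placeholders, not part of the patch:

    from pquant import add_compression_layers, get_default_config

    import hls4ml

    # wrap an existing Keras model with PQuant pruning/quantization layers, then train
    pquant_config = get_default_config("pdp")
    model = add_compression_layers(model, pquant_config, input_shape)
    # ... train, then strip the wrappers (cf. remove_pruning_from_model_tf in the test)
    # so every remaining layer carries its quantization_parameters ...

    # config_from_keras_model now derives per-layer Precision entries from
    # quantization_parameters via _get_precision_from_pquant
    hls_config = hls4ml.utils.config_from_keras_model(model, granularity='name', backend='Vitis')
    hls_model = hls4ml.converters.convert_from_keras_model(
        model, hls_config=hls_config, output_dir='pquant_prj', backend='Vitis'
    )
    hls_model.compile()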