Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions src/azure-cli/azure/cli/command_modules/acs/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# --------------------------------------------------------------------------------------------

import re
from typing import Any, List, TypeVar

from azure.cli.command_modules.acs._client_factory import cf_snapshots, get_msi_client
from azure.cli.core.azclierror import (
Expand All @@ -21,6 +22,69 @@
from azure.core.exceptions import AzureError, HttpResponseError, ServiceRequestError, ServiceResponseError
from msrestazure.azure_exceptions import CloudError

# type variables
ManagedCluster = TypeVar("ManagedCluster")


def format_parameter_name_to_option_name(parameter_name: str) -> str:
"""Convert a name in parameter format to option format.

Underscores ("_") are used to connect the various parts of a parameter name, while hyphens ("-") are used to connect
each part of an option name. Besides, the option name starts with double hyphens ("--").

:return: str
"""
option_name = "--" + parameter_name.replace("_", "-")
return option_name


def safe_list_get(li: List, idx: int, default: Any = None) -> Any:
"""Get an element from a list without raising IndexError.

Attempt to get the element with index idx from a list-like object li, and if the index is invalid (such as out of
range), return default (whose default value is None).

:return: an element of any type
"""
if isinstance(li, list):
try:
return li[idx]
except IndexError:
return default
return None


def safe_lower(obj: Any) -> Any:
"""Return lowercase string if the provided obj is a string, otherwise return the object itself.

:return: Any
"""
if isinstance(obj, str):
return obj.lower()
return obj


def check_is_msi_cluster(mc: ManagedCluster) -> bool:
"""Check `mc` object to determine whether managed identity is enabled.

:return: bool
"""
if mc and mc.identity and mc.identity.type is not None:
identity_type = mc.identity.type.casefold()
if identity_type in ("systemassigned", "userassigned"):
return True
return False


def check_is_private_cluster(mc: ManagedCluster) -> bool:
"""Check `mc` object to determine whether private cluster is enabled.

:return: bool
"""
if mc and mc.api_server_access_profile:
return bool(mc.api_server_access_profile.enable_private_cluster)
return False


# pylint: disable=too-many-return-statements
def map_azure_error_to_cli_error(azure_error):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
from azure.cli.command_modules.acs._client_factory import cf_agent_pools
from azure.cli.command_modules.acs._consts import DecoratorMode
from azure.cli.command_modules.acs._validators import extract_comma_separated_string
from azure.cli.command_modules.acs.decorator import validate_decorator_mode
from azure.cli.core import AzCommandsLoader
from azure.cli.command_modules.acs.base_decorator import BaseAKSContext, BaseAKSModels, BaseAKSParamDict
from azure.cli.core.azclierror import CLIInternalError, InvalidArgumentValueError, RequiredArgumentMissingError
from azure.cli.core.commands import AzCliCommand
from azure.cli.core.profiles import ResourceType
Expand All @@ -23,60 +22,34 @@
AgentPoolsOperations = TypeVar("AgentPoolsOperations")


# pylint: disable=too-many-instance-attributes, too-few-public-methods
class AKSAgentPoolModels:
"""Store the models used in aks_agentpool_add and aks_agentpool_update.
# pylint: disable=too-few-public-methods
class AKSAgentPoolModels(BaseAKSModels):
"""Store the models used in aks agentpool series of commands.

The api version of the class corresponding to a model is determined by resource_type.
"""

def __init__(
self,
cmd: AzCommandsLoader,
resource_type: ResourceType,
):
self.__cmd = cmd
self.resource_type = resource_type
self.AgentPool = self.__cmd.get_models(
"AgentPool",
resource_type=self.resource_type,
operation_group="agent_pools",
)
self.AgentPoolUpgradeSettings = self.__cmd.get_models(
"AgentPoolUpgradeSettings",
resource_type=self.resource_type,
operation_group="agent_pools",
)

# pylint: disable=too-few-public-methods
class AKSAgentPoolParamDict(BaseAKSParamDict):
"""Store the original parameters passed in by aks agentpool series of commands as an internal dictionary.

Only expose the "get" method externally to obtain parameter values, while recording usage.
"""


# pylint: disable=too-many-public-methods
class AKSAgentPoolContext:
class AKSAgentPoolContext(BaseAKSContext):
"""Implement getter functions for all parameters in aks_agentpool_add and aks_agentpool_update.
"""
def __init__(
self,
cmd: AzCliCommand,
raw_parameters: Dict,
raw_parameters: AKSAgentPoolParamDict,
models: AKSAgentPoolModels,
decorator_mode: DecoratorMode,
):
if not isinstance(raw_parameters, dict):
raise CLIInternalError(
"Unexpected raw_parameters object with type '{}'.".format(
type(raw_parameters)
)
)
if not validate_decorator_mode(decorator_mode):
raise CLIInternalError(
"Unexpected decorator_mode '{}' with type '{}'.".format(
decorator_mode, type(decorator_mode)
)
)
self.cmd = cmd
self.raw_param = raw_parameters
self.models = models
self.decorator_mode = decorator_mode
self.intermediates = dict()
super().__init__(cmd, raw_parameters, models, decorator_mode)
self.agentpool = None

# pylint: disable=no-self-use
Expand Down Expand Up @@ -389,7 +362,9 @@ def __init__(
self.client = client
self.models = AKSAgentPoolModels(cmd, resource_type)
# store the context in the process of assemble the AgentPool object
self.context = AKSAgentPoolContext(cmd, raw_parameters, self.models, decorator_mode=DecoratorMode.CREATE)
self.context = AKSAgentPoolContext(
cmd, AKSAgentPoolParamDict(raw_parameters), self.models, decorator_mode=DecoratorMode.CREATE
)

def _ensure_agentpool(self, agentpool: AgentPool) -> None:
"""Internal function to ensure that the incoming `agentpool` object is valid and the same as the attached
Expand Down Expand Up @@ -535,4 +510,6 @@ def __init__(
self.client = client
self.models = AKSAgentPoolModels(cmd, resource_type)
# store the context in the process of assemble the AgentPool object
self.context = AKSAgentPoolContext(cmd, raw_parameters, self.models, decorator_mode=DecoratorMode.UPDATE)
self.context = AKSAgentPoolContext(
cmd, AKSAgentPoolParamDict(raw_parameters), self.models, decorator_mode=DecoratorMode.UPDATE
)
192 changes: 192 additions & 0 deletions src/azure-cli/azure/cli/command_modules/acs/base_decorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from typing import Any

from azure.cli.command_modules.acs._consts import DecoratorMode
from azure.cli.core import AzCommandsLoader
from azure.cli.core.azclierror import CLIInternalError
from azure.cli.core.commands import AzCliCommand
from azure.cli.core.profiles import ResourceType
from knack.log import get_logger

logger = get_logger(__name__)


def validate_decorator_mode(decorator_mode) -> bool:
"""Check if decorator_mode is a value of enum type DecoratorMode.

:return: bool
"""
is_valid_decorator_mode = False
try:
is_valid_decorator_mode = decorator_mode in DecoratorMode
# will raise TypeError in Python >= 3.8
except TypeError:
pass

return is_valid_decorator_mode


class BaseAKSModels:
"""A base class for storing the models used by aks commands.

The api version of the class corresponding to a model is determined by resource_type.
"""
def __init__(
self,
cmd: AzCommandsLoader,
resource_type: ResourceType,
):
self.__cmd = cmd
self.__raw_models = None
self.resource_type = resource_type
self.set_up_models()

@property
def raw_models(self):
if self.__raw_models is None:
self.__raw_models = self.__cmd.get_models(
resource_type=self.resource_type,
operation_group="managed_clusters",
).models
return self.__raw_models

def set_up_models(self):
for model_name, model_class in vars(self.raw_models).items():
if not model_name.startswith('_'):
setattr(self, model_name, model_class)


class BaseAKSParamDict:
"""A base class for storing the original parameters passed in by the aks commands as an internal dictionary.

Only expose the "get" method externally to obtain parameter values, while recording usage.
"""
def __init__(self, param_dict):
if not isinstance(param_dict, dict):
raise CLIInternalError(
"Unexpected param_dict object with type '{}'.".format(
type(param_dict)
)
)
self.__store = param_dict.copy()
self.__count = {}

def __increase(self, key):
self.__count[key] = self.__count.get(key, 0) + 1

def get(self, key):
self.__increase(key)
return self.__store.get(key)

def keys(self):
return self.__store.keys()

def values(self):
return self.__store.values()

def items(self):
return self.__store.items()

def __format_count(self):
untouched_keys = [x for x in self.__store.keys() if x not in self.__count.keys()]
for k in untouched_keys:
self.__count[k] = 0

def print_usage_statistics(self):
self.__format_count()
print("\nParameter usage statistics:")
for k, v in self.__count.items():
print(k, v)
print("Total: {}".format(len(self.__count.keys())))


class BaseAKSContext:
"""A base class for holding raw parameters, models and methods to get and store intermediates that will be used by
the decorators of aks commands.

Note: This is a base class and should not be used directly, you need to implement getter functions in inherited
classes.

Each getter function is responsible for obtaining the corresponding one or more parameter values, and perform
necessary parameter value completion or normalization and validation checks.
"""
def __init__(
self, cmd: AzCliCommand, raw_parameters: BaseAKSParamDict, models: BaseAKSModels, decorator_mode: DecoratorMode
):
if not isinstance(raw_parameters, BaseAKSParamDict):
raise CLIInternalError(
"Unexpected raw_parameters object with type '{}'.".format(
type(raw_parameters)
)
)
if not validate_decorator_mode(decorator_mode):
raise CLIInternalError(
"Unexpected decorator_mode '{}' with type '{}'.".format(
decorator_mode, type(decorator_mode)
)
)
self.cmd = cmd
self.raw_param = raw_parameters
self.models = models
self.decorator_mode = decorator_mode
self.intermediates = dict()

def get_intermediate(self, variable_name: str, default_value: Any = None) -> Any:
"""Get the value of an intermediate by its name.

Get the value from the intermediates dictionary with variable_name as the key. If variable_name does not exist,
default_value will be returned.

:return: Any
"""
if variable_name not in self.intermediates:
logger.debug(
"The intermediate '%s' does not exist. Return default value '%s'.",
variable_name,
default_value,
)
intermediate_value = self.intermediates.get(variable_name, default_value)
return intermediate_value

def set_intermediate(
self, variable_name: str, value: Any, overwrite_exists: bool = False
) -> None:
"""Set the value of an intermediate by its name.

In the case that the intermediate value already exists, if overwrite_exists is enabled, the value will be
overwritten and the log will be output at the debug level, otherwise the value will not be overwritten and
the log will be output at the warning level, which by default will be output to stderr and seen by user.

:return: None
"""
if variable_name in self.intermediates:
if overwrite_exists:
msg = "The intermediate '{}' is overwritten. Original value: '{}', new value: '{}'.".format(
variable_name, self.intermediates.get(variable_name), value
)
logger.debug(msg)
self.intermediates[variable_name] = value
elif self.intermediates.get(variable_name) != value:
msg = "The intermediate '{}' already exists, but overwrite is not enabled. " \
"Original value: '{}', candidate value: '{}'.".format(
variable_name,
self.intermediates.get(variable_name),
value,
)
# warning level log will be output to the console, which may cause confusion to users
logger.warning(msg)
else:
self.intermediates[variable_name] = value

def remove_intermediate(self, variable_name: str) -> None:
"""Remove the value of an intermediate by its name.

No exception will be raised if the intermediate does not exist.

:return: None
"""
self.intermediates.pop(variable_name, None)
Loading