Skip to content

Build wrapper definitions by parsing cli help output #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# CLI Wrapper

![Codecov](https://img.shields.io/codecov/c/github/orstensemantics/cli_wrapper)
![PyPI - License](https://img.shields.io/pypi/l/cli_wrapper)
![PyPI - Version](https://img.shields.io/pypi/v/cli_wrapper)


CLI Wrapper uses subprocess to wrap external CLI tools and present an interface that looks more like a python class. CLI
commands become methods on the class, positional arguments and flags become args and kwargs respectively. It
supports input validation and output parsing.
Expand Down Expand Up @@ -34,16 +39,20 @@ kubectl = CLIWrapper("kubectl")
# by default, this will translate to `kubectl get pods --namespace default`, and it will return the text output
kubectl.get("pods", namespace="default")
# you can refine this by defining the command explicitly:
kubectl._update_command("get", default_flags={"output": "json"}, parse=["json", "dotted_dict"])
kubectl.update_command_("get", default_flags={"output": "json"}, parse=["json", "dotted_dict"])
# (the trailing '_' is to avoid collisions with a cli command)
a = kubectl.get("pods", namespace="kube-system")
print(a.items[0].metadata.name) # prints a pod name


# you can do your own parsing:
def skip_lists(result):
if result["kind"] == "List":
return result["items"]
return result
kubectl._update_command("get", default_flags={"output": "json"}, parse=["json", skip_lists, "dotted_dict"])


kubectl.update_command_("get", default_flags={"output": "json"}, parse=["json", skip_lists, "dotted_dict"])
a = kubectl.get("pods", namespace="kube-system")
assert isinstance(a, list)
a = kubectl.get("pods", a[0].metadata.name, namespace="kube-system")
Expand Down Expand Up @@ -88,11 +97,12 @@ pip install dotted_dict # for dotted_dict support shown above
- We can already do this by putting a function in the parse list, but it would be nice to make this serializable
- [ ] Custom error handling
- [ ] Nested wrappers (e.g., `helm.repos.list()` instead of `helm.repos('list')`)]
- currently doing helm.repos_list() from help parser
- [ ] Tool to create configuration dictionaries by parsing help output recursively
- [ ] golang flag style help/usage
- [x] golang flag style help/usage
- [ ] argparse style
- [ ] Configuration dictionaries for common tools
- [ ] kubectl
- [ ] helm
- [x] kubectl
- [x] helm
- [ ] docker
- [ ] cilium
- [x] cilium
58 changes: 43 additions & 15 deletions src/cli_wrapper/cli_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def validate_pod_name(name):
import logging
import os
import subprocess
from copy import copy
from itertools import chain

from attrs import define, field
Expand All @@ -59,7 +60,6 @@ def validate_pod_name(name):
from .validators import validators, Validator

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


@define
Expand Down Expand Up @@ -120,6 +120,14 @@ def transform(self, name, value, **kwargs):
)


def cli_command_converter(value: str | list[str]):
if value is None:
return []
if isinstance(value, str):
return [value]
return value


def arg_converter(value: dict):
"""
Convert the value of the argument to a string
Expand All @@ -128,6 +136,8 @@ def arg_converter(value: dict):
"""
value = value.copy()
for k, v in value.items():
if isinstance(v, str):
v = {"validator": v}
if isinstance(v, dict):
if "literal_name" not in v:
v["literal_name"] = k
Expand All @@ -144,7 +154,7 @@ class Command: # pylint: disable=too-many-instance-attributes
Command represents a command to be run with the cli_wrapper
"""

cli_command: str
cli_command: list[str] | str = field(converter=cli_command_converter)
default_flags: dict = {}
args: dict[str | int, any] = field(factory=dict, converter=arg_converter)
parse: Parser = field(converter=Parser, default=None)
Expand All @@ -163,8 +173,12 @@ def from_dict(cls, command_dict, **kwargs):
command_dict = command_dict.copy()
if "args" in command_dict:
for k, v in command_dict["args"].items():
if "literal_name" not in v:
v["literal_name"] = k
if isinstance(v, dict):
if "literal_name" not in v:
v["literal_name"] = k
if isinstance(v, Argument):
if v.literal_name is None:
v.literal_name = k
if "cli_command" not in command_dict:
command_dict["cli_command"] = kwargs.pop("cli_command", None)
return Command(
Expand Down Expand Up @@ -196,26 +210,28 @@ def validate_args(self, *args, **kwargs):
if isinstance(name, int):
name += 1 # let's call positional arg 0, "Argument 1"
if isinstance(v, str):
raise ValueError(f"Value '{arg}' is invalid for command {self.cli_command} arg {name}: {v}")
raise ValueError(
f"Value '{arg}' is invalid for command {' '.join(self.cli_command)} arg {name}: {v}"
)
if not v:
raise ValueError(f"Value '{arg}' is invalid for command {self.cli_command} arg {name}")
raise ValueError(f"Value '{arg}' is invalid for command {' '.join(self.cli_command)} arg {name}")

def build_args(self, *args, **kwargs):
positional = [self.cli_command]
positional = copy(self.cli_command) if self.cli_command is not None else []
params = []
for arg, value in chain(
enumerate(args), kwargs.items(), [(k, v) for k, v in self.default_flags.items() if k not in kwargs]
):
logger.debug(f"arg: {arg}, value: {value}")
if arg in self.args:
arg = self.args[arg].literal_name if self.args[arg].literal_name is not None else arg
arg, value = self.args[arg].transform(arg, value)
literal_arg = self.args[arg].literal_name if self.args[arg].literal_name is not None else arg
arg, value = self.args[arg].transform(literal_arg, value)
else:
arg, value = transformers.get(self.default_transformer)(arg, value)
logger.debug(f"after: arg: {arg}, value: {value}")
if isinstance(arg, str):
prefix = self.long_prefix if len(arg) > 1 else self.short_prefix
if value is not None:
if value is not None and not isinstance(value, bool):
if self.arg_separator != " ":
params.append(f"{prefix}{arg}{self.arg_separator}{value}")
else:
Expand All @@ -224,7 +240,6 @@ def build_args(self, *args, **kwargs):
params.append(f"{prefix}{arg}")
else:
positional.append(value)
logger.debug(positional + params)
result = positional + params
logger.debug(result)
return result
Expand Down Expand Up @@ -260,11 +275,10 @@ def _get_command(self, command: str):
long_prefix=self.long_prefix,
arg_separator=self.arg_separator,
)
logger.error(c.parse.__dict__)
return c
return self.commands[command]

def _update_command( # pylint: disable=too-many-arguments
def update_command_( # pylint: disable=too-many-arguments
self,
command: str,
*,
Expand Down Expand Up @@ -316,7 +330,7 @@ async def _run_async(self, command: str, *args, **kwargs):
command_obj.validate_args(*args, **kwargs)
command_args = [self.path] + list(command_obj.build_args(*args, **kwargs))
env = os.environ.copy().update(self.env if self.env is not None else {})
logger.error(f"Running command: {', '.join(command_args)}")
logger.debug(f"Running command: {', '.join(command_args)}")
proc = await asyncio.subprocess.create_subprocess_exec( # pylint: disable=no-member
*command_args,
stdout=asyncio.subprocess.PIPE,
Expand All @@ -329,7 +343,7 @@ async def _run_async(self, command: str, *args, **kwargs):
raise RuntimeError(f"Command {command} failed with error: {stderr.decode()}")
return command_obj.parse(stdout.decode())

def __getattr__(self, item):
def __getattr__(self, item, *args, **kwargs):
"""
get the command from the cli_wrapper
:param item: the command to be run
Expand All @@ -339,6 +353,9 @@ def __getattr__(self, item):
return lambda *args, **kwargs: self._run_async(item, *args, **kwargs)
return lambda *args, **kwargs: self._run(item, *args, **kwargs)

def __call__(self, *args, **kwargs):
return (self.__getattr__(None))(*args, **kwargs)

@classmethod
def from_dict(cls, cliwrapper_dict):
"""
Expand All @@ -348,12 +365,19 @@ def from_dict(cls, cliwrapper_dict):
"""
cliwrapper_dict = cliwrapper_dict.copy()
commands = {}
command_config = {
"arg_separator": cliwrapper_dict.get("arg_separator", "="),
"default_transformer": cliwrapper_dict.get("default_transformer", "snake2kebab"),
"short_prefix": cliwrapper_dict.get("short_prefix", "-"),
"long_prefix": cliwrapper_dict.get("long_prefix", "--"),
}
for command, config in cliwrapper_dict.pop("commands", {}).items():
if isinstance(config, str):
config = {"cli_command": config}
else:
if "cli_command" not in config:
config["cli_command"] = command
config = command_config | config
commands[command] = Command.from_dict(config)

return CLIWrapper(
Expand All @@ -372,4 +396,8 @@ def to_dict(self):
"commands": {k: v.to_dict() for k, v in self.commands.items()},
"trusting": self.trusting,
"async_": self.async_,
"default_transformer": self.default_transformer,
"short_prefix": self.short_prefix,
"long_prefix": self.long_prefix,
"arg_separator": self.arg_separator,
}
4 changes: 4 additions & 0 deletions src/cli_wrapper/pre_packaged/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Prepackaged CLI Wrapper Configurations

This directory contains configurations for some CLI tools. These aren't well tested yet. They cover every command
(they are generated by the help_parser tool elsewhere in the repo) but I haven't set up tests for them yet.
20 changes: 20 additions & 0 deletions src/cli_wrapper/pre_packaged/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from json import loads
from pathlib import Path

from cli_wrapper import CLIWrapper


def get_wrapper(name, status=None):
if status is None:
status = ["stable", "beta"]

Check warning on line 9 in src/cli_wrapper/pre_packaged/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/cli_wrapper/pre_packaged/__init__.py#L9

Added line #L9 was not covered by tests
if isinstance(status, str):
status = [status]
wrapper_config = None

Check warning on line 12 in src/cli_wrapper/pre_packaged/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/cli_wrapper/pre_packaged/__init__.py#L11-L12

Added lines #L11 - L12 were not covered by tests
for d in status:
path = Path(__file__).parent / d / f"{name}.json"

Check warning on line 14 in src/cli_wrapper/pre_packaged/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/cli_wrapper/pre_packaged/__init__.py#L14

Added line #L14 was not covered by tests
if path.exists():
with open(path, "r", encoding="utf-8") as f:
wrapper_config = loads(f.read())

Check warning on line 17 in src/cli_wrapper/pre_packaged/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/cli_wrapper/pre_packaged/__init__.py#L16-L17

Added lines #L16 - L17 were not covered by tests
if wrapper_config is None:
raise ValueError(f"Wrapper {name} not found")
return CLIWrapper.from_dict(wrapper_config)

Check warning on line 20 in src/cli_wrapper/pre_packaged/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/cli_wrapper/pre_packaged/__init__.py#L19-L20

Added lines #L19 - L20 were not covered by tests
Loading