Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 73 additions & 3 deletions src/databricks/labs/blueprint/tui.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import logging
import os
import re
from collections.abc import Callable
from typing import Any
Expand Down Expand Up @@ -45,20 +46,88 @@ def choice_from_dict(self, text: str, choices: dict[str, Any], *, sort: bool = T
key = self.choice(text, list(choices.keys()), sort=sort)
return choices[key]

def choice(self, text: str, choices: list[Any], *, max_attempts: int = 10, sort: bool = True) -> str:
"""Use to select a value from a list
@staticmethod
def _clear_screen():
"""Clear terminal screen cross-platform"""
os.system("cls" if os.name == "nt" else "clear")

def _display_page_and_get_input(
self, text: str, page_choices: list[Any], current_page: int, total_pages: int, start: int, end: int
) -> str:
"""Display current page and get user input"""
display = [f"\033[1m[{start + i}]\033[0m \033[36m{choice}\033[0m" for i, choice in enumerate(page_choices)]
numbered = "\n".join(display)

nav_info = []
if current_page > 0:
nav_info.append("\033[1m\033[36m'p' for previous\033[0m")
if current_page < total_pages - 1:
nav_info.append("\033[1m\033[36m'n' for next\033[0m")
nav_text = f" ({', '.join(nav_info)})" if nav_info else ""

page_info = f"\033[1m\033[36mPage {current_page + 1} of {total_pages}\033[0m"
prompt = (
f"\033[1m{text}\033[0m ({page_info}){nav_text}\n{numbered}\nEnter a number between {start} and {end - 1}"
)

return self.question(prompt, valid_number=False)

def _paginate_choices(self, text: str, choices: list[Any], *, max_attempts: int = 10, page_size: int = 10) -> str:
"""Handle paginated choice selection for large lists"""
total_pages = (len(choices) + page_size - 1) // page_size
current_page = 0
attempt = 0

while attempt < max_attempts:
start = current_page * page_size
end = min(start + page_size, len(choices))
page_choices = choices[start:end]

user_input = self._display_page_and_get_input(text, page_choices, current_page, total_pages, start, end)

if user_input.lower() == "p" and current_page > 0:
current_page -= 1
self._clear_screen()
continue

if user_input.lower() == "n" and current_page < total_pages - 1:
current_page += 1
self._clear_screen()
continue

try:
res = int(user_input)
if start <= res < end:
return choices[res]
print(f"\033[31m[ERROR] Out of range: {res}\033[0m\n")
except ValueError:
print(f"\033[31m[ERROR] Invalid input: {user_input}\033[0m\n")

attempt += 1

raise ValueError(f"Max attempts ({max_attempts}) exceeded")

def choice(
self, text: str, choices: list[Any], *, max_attempts: int = 10, sort: bool = True, page_size: int = 10
) -> str:
"""Use to select a value from a list with automatic pagination for large lists

:param text: str:
:param choices: list[Any]:
:param *:
:param max_attempts: int: (Default value = 10)
:param sort: bool: (Default value = True)

:param page_size: int: (Default value = 10)
"""
if sort:
choices = sorted(choices, key=str.casefold)

if len(choices) > page_size:
return self._paginate_choices(text, choices, max_attempts=max_attempts, page_size=page_size)

numbered = "\n".join(f"\033[1m[{i}]\033[0m \033[36m{v}\033[0m" for i, v in enumerate(choices))
prompt = f"\033[1m{text}\033[0m\n{numbered}\nEnter a number between 0 and {len(choices) - 1}"

attempt = 0
while attempt < max_attempts:
attempt += 1
Expand All @@ -67,6 +136,7 @@ def choice(self, text: str, choices: list[Any], *, max_attempts: int = 10, sort:
print(f"\033[31m[ERROR] Out of range: {res}\033[0m\n")
continue
return choices[res]

msg = f"cannot get answer within {max_attempts} attempt"
raise ValueError(msg)

Expand Down
25 changes: 25 additions & 0 deletions tests/unit/test_tui.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,28 @@ def test_extend_prompts():
# Test that new question is still not mocked in the original prompts
with pytest.raises(ValueError, match="not mocked: new_question"):
prompts.question("new_question")


def test_choice_uses_pagination(mocker):
"""Test that choice() uses pagination when list > 10 items"""
prompts = Prompts()
# Test that pagination is show and input n will change to page 2 and select option 11
mocker.patch("builtins.input", side_effect=["n", "12"])

choices = [
"a",
"b",
"c",
"d",
"e",
"f",
"g",
"h",
"i",
"j",
"k",
"l",
"m",
]
res = prompts.choice("foo", choices)
assert "m" == res
Loading