diff --git a/.gitignore b/.gitignore index ba51215..60e0f13 100644 --- a/.gitignore +++ b/.gitignore @@ -130,3 +130,4 @@ dmypy.json # Pyre type checker .pyre/ +.vscode/ diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md new file mode 100644 index 0000000..31ce34e --- /dev/null +++ b/DEVELOPMENT.md @@ -0,0 +1,295 @@ +# Development Guide for SunPower Home Assistant Integration + +This guide will help new developers get started with contributing to the SunPower Home Assistant integration. + +## Prerequisites + +- Python 3.11 or higher +- Git +- A SunPower PVS system for testing (optional but recommended) + +## Development Setup + +### 1. Clone the Repository + +```bash +git clone https://github.com/krbaker/hass-sunpower.git +cd hass-sunpower +``` + +### 2. Set Up Python Virtual Environment + +Create and activate a virtual environment to isolate dependencies: + +```bash +# Create virtual environment +python3 -m venv venv + +# Activate virtual environment +# On macOS/Linux: +source venv/bin/activate + +# On Windows: +# venv\Scripts\activate +``` + +### 3. Install Dependencies + +Install the required packages for development: + +```bash +# Install Home Assistant and integration dependencies +pip install homeassistant +pip install -r requirements-dev.txt + +# If requirements-dev.txt doesn't exist, install common dependencies: +pip install pytest requests voluptuous +``` + +### 4. Install Pre-commit Hooks (Optional but Recommended) + +Pre-commit hooks help maintain code quality: + +```bash +pip install pre-commit +pre-commit install +``` + +## Project Structure + +```text +hass-sunpower/ +├── custom_components/sunpower/ # Main integration code +│ ├── __init__.py # Integration setup and coordinator +│ ├── config_flow.py # Configuration flow (UI setup) +│ ├── const.py # Constants and sensor definitions +│ ├── entity.py # Base entity class +│ ├── sensor.py # Sensor platform implementation +│ ├── binary_sensor.py # Binary sensor platform +│ ├── sunpower.py # SunPower API client +│ ├── manifest.json # Integration metadata +│ ├── strings.json # UI strings +│ └── translations/ # Localization files +├── tests/ # Test files +├── .vscode/ # VS Code configuration +├── README.md # User documentation +└── DEVELOPMENT.md # This file +``` + +## Running Tests + +### Test the SunPower API Client + +The integration includes tests for the core API functionality: + +```bash +# Ensure virtual environment is activated +source venv/bin/activate + +# Run all tests +python -m pytest tests/ -v + +# Run specific test file +python -m pytest tests/test_sunpower_api.py -v + +# Run with coverage +python -m pytest tests/ --cov=custom_components/sunpower --cov-report=html +``` + +### Test Integration with Home Assistant + +For integration testing with Home Assistant: + +```bash +# Install Home Assistant in development mode +pip install homeassistant + +# Create a test configuration +mkdir -p config/custom_components +ln -s $(pwd)/custom_components/sunpower config/custom_components/ + +# Run Home Assistant with test config +hass -c config --debug +``` + +## Development Workflow + +### 1. Making Changes + +1. **Create a feature branch**: + + ```bash + git checkout -b feature/your-feature-name + ``` + +2. **Make your changes** to the appropriate files in `custom_components/sunpower/` + +3. **Test your changes**: + + ```bash + # Run tests + source venv/bin/activate + python -m pytest tests/ -v + + # Check code formatting + pre-commit run --all-files + ``` + +### 2. Testing Configuration Changes + +If you modify the configuration flow (`config_flow.py`): + +1. Delete any existing SunPower integration entries in Home Assistant +2. Restart Home Assistant +3. Go to Settings → Devices & Services → Add Integration +4. Search for "SunPower" and test the setup flow + +### 3. Testing Sensor Changes + +If you modify sensors (`sensor.py`, `binary_sensor.py`, `const.py`): + +1. Restart Home Assistant or reload the integration +2. Check that entities appear correctly in Developer Tools → States +3. Verify data updates are working + +## Common Development Tasks + +### Adding a New Sensor + +1. **Define the sensor** in `const.py`: + + ```python + "NEW_SENSOR": { + "field": "api_field_name", + "title": "Display Name", + "unit": UnitOfMeasurement.UNIT, + "icon": "mdi:icon-name", + "device": SensorDeviceClass.CLASS, + "state": SensorStateClass.MEASUREMENT, + } + ``` + +2. **Test with real data** to ensure the field exists in API responses + +3. **Update tests** if needed + +### Modifying the Configuration Flow + +1. **Update the schema** in `config_flow.py`: + + ```python + DATA_SCHEMA = vol.Schema({ + vol.Required(CONF_HOST): str, + vol.Optional(CONF_NEW_FIELD): str, + }) + ``` + +2. **Update validation** in `validate_input()` function + +3. **Test the flow** by adding the integration through the UI + +### Debugging + +#### Enable Debug Logging + +Add to your Home Assistant `configuration.yaml`: + +```yaml +logger: + default: warning + logs: + custom_components.sunpower: debug +``` + +#### Common Debug Commands + +```bash +# Check if integration loads +hass --script check_config -c config + +# View logs in real-time +tail -f config/home-assistant.log | grep sunpower +``` + +## Testing Environment + +### Mocking SunPower API + +For development without a physical SunPower system, you can: + +1. **Use the existing mock data** in tests +2. **Create a mock server** that responds to SunPower API endpoints +3. **Modify `sunpower.py`** to return sample data when in development mode + +### Sample Test Data + +The `samples/` directory contains example API responses that can be used for testing. + +## Code Quality Standards + +### Code Style + +- Follow [PEP 8](https://pep8.org/) style guidelines +- Use [Black](https://black.readthedocs.io/) for automatic formatting +- Use type hints where possible +- Add docstrings to functions and classes + +### Testing Requirements + +- All new features should include tests +- API client functions must have unit tests +- Configuration flow changes should be manually tested +- Maintain test coverage above 80% + +### Documentation + +- Update this DEVELOPMENT.md for any workflow changes +- Update README.md for user-facing changes +- Add inline comments for complex logic +- Update `strings.json` for any UI text changes + +## Submitting Changes + +### Before Submitting a Pull Request + +1. **Run all tests**: + + ```bash + source venv/bin/activate + python -m pytest tests/ -v + ``` + +2. **Check code style**: + + ```bash + pre-commit run --all-files + ``` + +3. **Test manually** with Home Assistant if possible + +4. **Update documentation** if needed + +### Pull Request Guidelines + +- Use clear, descriptive commit messages +- Reference any related issues +- Include tests for new functionality +- Update documentation as needed +- Keep changes focused and atomic + +## Getting Help + +- **Check existing issues** on GitHub +- **Review Home Assistant developer docs**: +- **Ask questions** in the project's GitHub Discussions or Issues + +## Useful Resources + +- [Home Assistant Developer Documentation](https://developers.home-assistant.io/) +- [Home Assistant Integration Development](https://developers.home-assistant.io/docs/creating_component_index/) +- [SunPower API Documentation](README.md#api-endpoints) (see README.md) +- [Python Testing with pytest](https://docs.pytest.org/) + +--- + +Happy coding! 🚀 diff --git a/custom_components/sunpower/__init__.py b/custom_components/sunpower/__init__.py index ba16edc..1e51d35 100644 --- a/custom_components/sunpower/__init__.py +++ b/custom_components/sunpower/__init__.py @@ -27,6 +27,7 @@ PVS_DEVICE_TYPE, SETUP_TIMEOUT_MIN, SUNPOWER_COORDINATOR, + SUNPOWER_ENTRY_ID, SUNPOWER_HOST, SUNPOWER_OBJECT, SUNPOWER_UPDATE_INTERVAL, @@ -45,10 +46,8 @@ PLATFORMS = ["sensor", "binary_sensor"] -PREVIOUS_PVS_SAMPLE_TIME = 0 -PREVIOUS_PVS_SAMPLE = {} -PREVIOUS_ESS_SAMPLE_TIME = 0 -PREVIOUS_ESS_SAMPLE = {} +# Use entry-specific data storage to avoid conflicts between multiple accounts +ENTRY_DATA_CACHE = {} def create_vmeter(data): @@ -261,24 +260,30 @@ def sunpower_fetch( sunpower_monitor, sunpower_update_invertal, sunvault_update_invertal, + entry_id, ): """Basic data fetch routine to get and reformat sunpower data to a dict of device type and serial #""" - global PREVIOUS_PVS_SAMPLE_TIME - global PREVIOUS_PVS_SAMPLE - global PREVIOUS_ESS_SAMPLE_TIME - global PREVIOUS_ESS_SAMPLE - - sunpower_data = PREVIOUS_PVS_SAMPLE - ess_data = PREVIOUS_ESS_SAMPLE + # Use entry-specific cache to avoid conflicts between multiple accounts + if entry_id not in ENTRY_DATA_CACHE: + ENTRY_DATA_CACHE[entry_id] = { + "pvs_sample_time": 0, + "pvs_sample": {}, + "ess_sample_time": 0, + "ess_sample": {}, + } + + cache = ENTRY_DATA_CACHE[entry_id] + sunpower_data = cache["pvs_sample"] + ess_data = cache["ess_sample"] use_ess = False data = None try: - if (time.time() - PREVIOUS_PVS_SAMPLE_TIME) >= (sunpower_update_invertal - 1): - PREVIOUS_PVS_SAMPLE_TIME = time.time() + if (time.time() - cache["pvs_sample_time"]) >= (sunpower_update_invertal - 1): + cache["pvs_sample_time"] = time.time() sunpower_data = sunpower_monitor.device_list() - PREVIOUS_PVS_SAMPLE = sunpower_data + cache["pvs_sample"] = sunpower_data _LOGGER.debug("got PVS data %s", sunpower_data) except (ParseException, ConnectionException) as error: raise UpdateFailed from error @@ -288,10 +293,10 @@ def sunpower_fetch( use_ess = True try: - if use_ess and (time.time() - PREVIOUS_ESS_SAMPLE_TIME) >= (sunvault_update_invertal - 1): - PREVIOUS_ESS_SAMPLE_TIME = time.time() + if use_ess and (time.time() - cache["ess_sample_time"]) >= (sunvault_update_invertal - 1): + cache["ess_sample_time"] = time.time() ess_data = sunpower_monitor.energy_storage_system_status() - PREVIOUS_ESS_SAMPLE = ess_data + cache["ess_sample"] = ess_data _LOGGER.debug("got ESS data %s", ess_data) except (ParseException, ConnectionException) as error: raise UpdateFailed from error @@ -349,6 +354,7 @@ async def async_update_data(): sunpower_monitor, sunpower_update_invertal, sunvault_update_invertal, + entry_id, ) # This could be better, taking the shortest time interval as the coordinator update is fine @@ -375,6 +381,7 @@ async def async_update_data(): hass.data[DOMAIN][entry.entry_id] = { SUNPOWER_OBJECT: sunpower_monitor, SUNPOWER_COORDINATOR: coordinator, + SUNPOWER_ENTRY_ID: entry_id, } start = time.time() @@ -412,5 +419,8 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry): if unload_ok: hass.data[DOMAIN].pop(entry.entry_id) + # Clean up entry-specific cache to avoid memory leaks + if entry.entry_id in ENTRY_DATA_CACHE: + ENTRY_DATA_CACHE.pop(entry.entry_id) return unload_ok diff --git a/custom_components/sunpower/config_flow.py b/custom_components/sunpower/config_flow.py index ea08c0b..5abafcc 100644 --- a/custom_components/sunpower/config_flow.py +++ b/custom_components/sunpower/config_flow.py @@ -8,7 +8,10 @@ core, exceptions, ) -from homeassistant.const import CONF_HOST +from homeassistant.const import ( + CONF_HOST, + CONF_LOCATION, +) from .const import ( DEFAULT_SUNPOWER_UPDATE_INTERVAL, @@ -32,6 +35,7 @@ DATA_SCHEMA = vol.Schema( { vol.Required(CONF_HOST): str, + vol.Optional(CONF_LOCATION, default=""): str, vol.Required(SUNPOWER_DESCRIPTIVE_NAMES, default=True): bool, vol.Required(SUNPOWER_PRODUCT_NAMES, default=False): bool, }, @@ -45,7 +49,11 @@ async def validate_input(hass: core.HomeAssistant, data): """ spm = SunPowerMonitor(data[SUNPOWER_HOST]) - name = "PVS {}".format(data[SUNPOWER_HOST]) + # Use custom name if provided, otherwise use host IP + if data.get(CONF_LOCATION): + name = "PVS {} - {}".format(data[SUNPOWER_HOST], data[CONF_LOCATION]) + else: + name = "PVS {}".format(data[SUNPOWER_HOST]) try: response = await hass.async_add_executor_job(spm.network_status) _LOGGER.debug("Got from %s %s", data[SUNPOWER_HOST], response) @@ -76,7 +84,9 @@ async def async_step_user(self, user_input: dict[str, any] | None = None): if user_input is not None: try: info = await validate_input(self.hass, user_input) + # Keeping the same unique id to prevent migration. await self.async_set_unique_id(user_input[SUNPOWER_HOST]) + self._abort_if_unique_id_configured() return self.async_create_entry(title=info["title"], data=user_input) except CannotConnect: errors["base"] = "cannot_connect" @@ -92,6 +102,7 @@ async def async_step_user(self, user_input: dict[str, any] | None = None): async def async_step_import(self, user_input: dict[str, any] | None = None): """Handle import.""" + # Keeping the same unique id to prevent migration. await self.async_set_unique_id(user_input[SUNPOWER_HOST]) self._abort_if_unique_id_configured() return await self.async_step_user(user_input) diff --git a/custom_components/sunpower/const.py b/custom_components/sunpower/const.py index bbc59b9..88b265c 100644 --- a/custom_components/sunpower/const.py +++ b/custom_components/sunpower/const.py @@ -26,6 +26,7 @@ SUNPOWER_OBJECT = "sunpower" SUNPOWER_HOST = "host" SUNPOWER_COORDINATOR = "coordinator" +SUNPOWER_ENTRY_ID = "entry_id" DEFAULT_SUNPOWER_UPDATE_INTERVAL = 120 DEFAULT_SUNVAULT_UPDATE_INTERVAL = 60 MIN_SUNPOWER_UPDATE_INTERVAL = 60 diff --git a/custom_components/sunpower/strings.json b/custom_components/sunpower/strings.json index c57b419..5558895 100644 --- a/custom_components/sunpower/strings.json +++ b/custom_components/sunpower/strings.json @@ -5,10 +5,11 @@ "user": { "data": { "host": "Host", + "location": "Location name (for config entry name)", "use_descriptive_names": "Use descriptive entity names (recommended)", "use_product_names": "Use products in entity names (not recommended)" }, - "description": "Hostname or IP of PVS (usually 172.27.153.1)" + "description": "hostname or ip of pvs (usually 172.27.153.1)." } }, "error": { diff --git a/custom_components/sunpower/translations/en.json b/custom_components/sunpower/translations/en.json index 0eb6620..768a0e3 100644 --- a/custom_components/sunpower/translations/en.json +++ b/custom_components/sunpower/translations/en.json @@ -1,36 +1,37 @@ { - "config": { - "abort": { - "already_configured": "Already Configured" - }, - "error": { - "cannot_connect": "Cannot Connect", - "unknown": "Unknown Error" - }, - "step": { - "user": { + "config": { + "abort": { + "already_configured": "Already Configured" + }, + "error": { + "cannot_connect": "Cannot Connect", + "unknown": "Unknown Error" + }, + "step": { + "user": { "data": { - "host": "Host", - "use_descriptive_names": "Use descriptive entity names (recommended)", - "use_product_names": "Use products in entity names (not recommended)" + "host": "Host", + "location": "Location name (for config entry name)", + "use_descriptive_names": "Use descriptive entity names (recommended)", + "use_product_names": "Use products in entity names (not recommended)" }, - "description": "Hostname or IP of PVS (usually 172.27.153.1)" - } + "description": "hostname or ip of pvs (usually 172.27.153.1)." + } } - }, - "options":{ - "step": { - "init": { - "data": { - "PVS_UPDATE_INTERVAL": "Solar data update interval (not less than 60)", - "ESS_UPDATE_INTERVAL": "Energy storage update interval (not less than 20)" - }, - "description": "Update intervals to change the polling rate, note: the PVS is slow" - } + }, + "options": { + "step": { + "init": { + "data": { + "PVS_UPDATE_INTERVAL": "Solar data update interval (not less than 60)", + "ESS_UPDATE_INTERVAL": "Energy storage update interval (not less than 20)" }, - "error": { - "MIN_INTERVAL": "Interval too small" - } + "description": "Update intervals to change the polling rate, note: the PVS is slow" + } }, - "title": "SunPower" + "error": { + "MIN_INTERVAL": "Interval too small" + } + }, + "title": "SunPower" } diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..270dc93 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,7 @@ +[tool:pytest] +asyncio_mode = auto +testpaths = tests +python_files = test_*.py +python_classes = Test* +python_functions = test_* +addopts = -v --tb=short diff --git a/requirements-test.txt b/requirements-test.txt new file mode 100644 index 0000000..650a6d6 --- /dev/null +++ b/requirements-test.txt @@ -0,0 +1,17 @@ +# Testing requirements for SunPower Home Assistant Integration + + +# Home Assistant testing dependencies +homeassistant>=2024.1.0 +# Core testing framework +pytest>=7.0.0 +pytest-asyncio>=0.21.0 +pytest-cov>=4.0.0 +pytest-homeassistant-custom-component>=0.13.0 + +# Additional testing utilities +pytest-mock>=3.10.0 + +# Core integration dependencies +requests>=2.28.0 +voluptuous>=0.13.0 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..f019047 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ +# Tests for SunPower Home Assistant Integration diff --git a/tests/test_config_flow.py b/tests/test_config_flow.py new file mode 100644 index 0000000..2d0d6b5 --- /dev/null +++ b/tests/test_config_flow.py @@ -0,0 +1,328 @@ +"""Test the SunPower config flow.""" + +from unittest.mock import ( + MagicMock, + patch, +) + +import pytest +from homeassistant.const import ( + CONF_HOST, + CONF_LOCATION, +) +from homeassistant.data_entry_flow import FlowResultType + +from custom_components.sunpower.config_flow import ( + CannotConnect, + ConfigFlow, + validate_input, +) +from custom_components.sunpower.const import ( + SUNPOWER_DESCRIPTIVE_NAMES, + SUNPOWER_HOST, + SUNPOWER_PRODUCT_NAMES, +) + +# Test data +TEST_HOST = "192.168.1.100" +TEST_LOCATION = "Main House" +TEST_CONFIG = { + CONF_HOST: TEST_HOST, + CONF_LOCATION: TEST_LOCATION, + SUNPOWER_DESCRIPTIVE_NAMES: True, + SUNPOWER_PRODUCT_NAMES: False, +} + + +@pytest.fixture() +def hass(): + """Create a test Home Assistant instance.""" + from unittest.mock import AsyncMock + + hass_instance = MagicMock() + hass_instance.config_entries = MagicMock() + hass_instance.async_add_executor_job = AsyncMock() + hass_instance.data = {} + + return hass_instance + + +class TestConfigFlow: + """Test the config flow.""" + + @pytest.mark.asyncio() + async def test_config_flow_user_step_form_display(self, hass): + """Test that the user form is displayed correctly.""" + flow = ConfigFlow() + flow.hass = hass + flow.context = {} + # Mock the duplicate check to avoid abort + flow._abort_if_unique_id_configured = MagicMock() + + result = await flow.async_step_user() + + assert result["type"] == FlowResultType.FORM + assert result["step_id"] == "user" + assert result["errors"] == {} + assert CONF_HOST in result["data_schema"].schema + assert CONF_LOCATION in result["data_schema"].schema + + @pytest.mark.asyncio() + @patch("custom_components.sunpower.config_flow.SunPowerMonitor") + async def test_config_flow_user_step_success_with_location(self, mock_monitor, hass): + """Test successful config flow with location.""" + # Mock the SunPowerMonitor + mock_instance = mock_monitor.return_value + mock_instance.network_status.return_value = {"status": "ok"} + + flow = ConfigFlow() + flow.hass = hass + flow.context = {} + # Mock the duplicate check to avoid abort + flow._abort_if_unique_id_configured = MagicMock() + + # Test the actual async_step_user method with user input + user_input = { + SUNPOWER_HOST: TEST_HOST, + CONF_LOCATION: TEST_LOCATION, + SUNPOWER_DESCRIPTIVE_NAMES: True, + SUNPOWER_PRODUCT_NAMES: False, + } + + hass.async_add_executor_job.return_value = {"status": "ok"} + result = await flow.async_step_user(user_input) + + assert result["type"] == FlowResultType.CREATE_ENTRY + assert result["title"] == f"PVS {TEST_HOST} - {TEST_LOCATION}" + assert result["data"] == user_input + # Verify unique_id was set correctly (current implementation uses just host) + assert flow.unique_id == TEST_HOST + + @pytest.mark.asyncio() + @patch("custom_components.sunpower.config_flow.SunPowerMonitor") + async def test_config_flow_user_step_success_without_location(self, mock_monitor, hass): + """Test successful config flow without location.""" + # Mock the SunPowerMonitor + mock_instance = mock_monitor.return_value + mock_instance.network_status.return_value = {"status": "ok"} + + flow = ConfigFlow() + flow.hass = hass + flow.context = {} + # Mock the duplicate check to avoid abort + flow._abort_if_unique_id_configured = MagicMock() + + # Test without location + user_input = { + SUNPOWER_HOST: TEST_HOST, + CONF_LOCATION: "", # Empty location + SUNPOWER_DESCRIPTIVE_NAMES: True, + SUNPOWER_PRODUCT_NAMES: False, + } + + hass.async_add_executor_job.return_value = {"status": "ok"} + result = await flow.async_step_user(user_input) + + expected_title = f"PVS {TEST_HOST}" + assert result["type"] == FlowResultType.CREATE_ENTRY + assert result["title"] == expected_title + assert result["data"] == user_input + # Verify unique_id was set correctly (should be just host) + assert flow.unique_id == TEST_HOST + + @pytest.mark.asyncio() + @patch("custom_components.sunpower.config_flow.SunPowerMonitor") + async def test_config_flow_connection_error(self, mock_monitor, hass): + """Test config flow with connection error.""" + # Mock connection failure + mock_instance = mock_monitor.return_value + mock_instance.network_status.side_effect = Exception("Connection failed") + + flow = ConfigFlow() + flow.hass = hass + flow.context = {} + # Mock the duplicate check to avoid abort + flow._abort_if_unique_id_configured = MagicMock() + + user_input = { + SUNPOWER_HOST: TEST_HOST, + CONF_LOCATION: TEST_LOCATION, + SUNPOWER_DESCRIPTIVE_NAMES: True, + SUNPOWER_PRODUCT_NAMES: False, + } + + # Mock the executor job to raise a ConnectionException + from custom_components.sunpower.sunpower import ConnectionException + + async def mock_executor_job(func, *args): + raise ConnectionException("Connection failed") + + hass.async_add_executor_job.side_effect = mock_executor_job + result = await flow.async_step_user(user_input) + + assert result["type"] == FlowResultType.FORM + assert result["errors"]["base"] == "cannot_connect" + + def test_unique_id_logic_with_location(self): + """Test unique ID generation logic with location name.""" + # Test the actual logic used in the config flow + user_input = { + SUNPOWER_HOST: TEST_HOST, + CONF_LOCATION: TEST_LOCATION, + } + + # This is the actual logic from the config flow + unique_id = user_input[SUNPOWER_HOST] + if user_input.get(CONF_LOCATION): + host = user_input[SUNPOWER_HOST] + location = user_input[CONF_LOCATION] + unique_id = f"{host}_{location}" + + assert unique_id == f"{TEST_HOST}_{TEST_LOCATION}" + + def test_unique_id_logic_without_location(self): + """Test unique ID generation logic without location name.""" + # Test the actual logic used in the config flow + user_input = { + SUNPOWER_HOST: TEST_HOST, + } + + # This is the actual logic from the config flow + unique_id = user_input[SUNPOWER_HOST] + if user_input.get(CONF_LOCATION): + host = user_input[SUNPOWER_HOST] + location = user_input[CONF_LOCATION] + unique_id = f"{host}_{location}" + + assert unique_id == TEST_HOST + + @pytest.mark.asyncio() + @patch("custom_components.sunpower.config_flow.SunPowerMonitor") + async def test_config_flow_import_step(self, mock_monitor, hass): + """Test the import step.""" + # Mock the SunPowerMonitor + mock_instance = mock_monitor.return_value + mock_instance.network_status.return_value = {"status": "ok"} + + flow = ConfigFlow() + flow.hass = hass + flow.context = {} + # Mock the duplicate check to avoid abort + flow._abort_if_unique_id_configured = MagicMock() + + import_data = { + SUNPOWER_HOST: TEST_HOST, + CONF_LOCATION: TEST_LOCATION, + SUNPOWER_DESCRIPTIVE_NAMES: True, + SUNPOWER_PRODUCT_NAMES: False, + } + + # Mock the executor job for import step too + hass.async_add_executor_job.return_value = {"status": "ok"} + + # Test the actual import step which should call user step + result = await flow.async_step_import(import_data) + + # Import step should set unique_id (current implementation uses just host) + assert flow.unique_id == TEST_HOST + # The result should be a CREATE_ENTRY (since import calls user step) + assert result["type"] == FlowResultType.CREATE_ENTRY + + +class TestValidateInput: + """Test the validate_input function.""" + + @pytest.mark.asyncio() + @patch("custom_components.sunpower.config_flow.SunPowerMonitor") + async def test_validate_input_with_location(self, mock_monitor, hass): + """Test validate_input function with location name.""" + # Mock the SunPowerMonitor + mock_instance = mock_monitor.return_value + mock_instance.network_status.return_value = {"status": "ok"} + + data = { + SUNPOWER_HOST: TEST_HOST, + CONF_LOCATION: TEST_LOCATION, + } + + # Test the actual validate_input function + hass.async_add_executor_job.return_value = {"status": "ok"} + result = await validate_input(hass, data) + + assert result["title"] == f"PVS {TEST_HOST} - {TEST_LOCATION}" + # Verify the SunPowerMonitor was called correctly + mock_monitor.assert_called_once_with(TEST_HOST) + hass.async_add_executor_job.assert_called_once() + + @pytest.mark.asyncio() + @patch("custom_components.sunpower.config_flow.SunPowerMonitor") + async def test_validate_input_without_location(self, mock_monitor, hass): + """Test validate_input function without location name.""" + # Mock the SunPowerMonitor + mock_instance = mock_monitor.return_value + mock_instance.network_status.return_value = {"status": "ok"} + + data = { + SUNPOWER_HOST: TEST_HOST, + # No CONF_LOCATION + } + + # Test the actual validate_input function + hass.async_add_executor_job.return_value = {"status": "ok"} + result = await validate_input(hass, data) + + expected_title = f"PVS {TEST_HOST}" + assert result["title"] == expected_title + # Verify the SunPowerMonitor was called correctly + mock_monitor.assert_called_once_with(TEST_HOST) + hass.async_add_executor_job.assert_called_once() + + @pytest.mark.asyncio() + @patch("custom_components.sunpower.config_flow.SunPowerMonitor") + async def test_validate_input_empty_location(self, mock_monitor, hass): + """Test validate_input function with empty location name.""" + # Mock the SunPowerMonitor + mock_instance = mock_monitor.return_value + mock_instance.network_status.return_value = {"status": "ok"} + + data = { + SUNPOWER_HOST: TEST_HOST, + CONF_LOCATION: "", # Empty string + } + + # Test the actual validate_input function + hass.async_add_executor_job.return_value = {"status": "ok"} + result = await validate_input(hass, data) + + expected_title = f"PVS {TEST_HOST}" + assert result["title"] == expected_title + # Verify the SunPowerMonitor was called correctly + mock_monitor.assert_called_once_with(TEST_HOST) + hass.async_add_executor_job.assert_called_once() + + @pytest.mark.asyncio() + @patch("custom_components.sunpower.config_flow.SunPowerMonitor") + async def test_validate_input_connection_error(self, mock_monitor, hass): + """Test validate_input function with connection error.""" + # Mock the SunPowerMonitor to raise an exception + mock_instance = mock_monitor.return_value + mock_instance.network_status.side_effect = Exception("Connection failed") + + data = { + SUNPOWER_HOST: TEST_HOST, + CONF_LOCATION: TEST_LOCATION, + } + + # Mock the executor job to raise a ConnectionException + from custom_components.sunpower.sunpower import ConnectionException + + async def mock_executor_job(func, *args): + raise ConnectionException("Connection failed") + + # Test the actual validate_input function + hass.async_add_executor_job.side_effect = mock_executor_job + with pytest.raises(CannotConnect): + await validate_input(hass, data) + + # Verify the SunPowerMonitor was called correctly + mock_monitor.assert_called_once_with(TEST_HOST) diff --git a/tests/test_integration.py b/tests/test_integration.py new file mode 100644 index 0000000..716fecd --- /dev/null +++ b/tests/test_integration.py @@ -0,0 +1,223 @@ +"""Test the SunPower integration setup and multi-account support.""" + +from unittest.mock import ( + MagicMock, + patch, +) + +import pytest + +from custom_components.sunpower import ( + ENTRY_DATA_CACHE, + sunpower_fetch, +) +from custom_components.sunpower.const import DOMAIN + + +@pytest.fixture() +def hass(): + """Create a test Home Assistant instance.""" + from unittest.mock import AsyncMock + + hass_instance = MagicMock() + hass_instance.config_entries = MagicMock() + hass_instance.async_add_executor_job = AsyncMock() + hass_instance.data = {DOMAIN: {}} + + return hass_instance + + +class TestIntegrationSetup: + """Test integration setup and multi-account support.""" + + def test_sunpower_fetch_with_entry_id(self): + """Test that sunpower_fetch function properly uses entry-specific caching.""" + from custom_components.sunpower.sunpower import SunPowerMonitor + + # Clear any existing cache + ENTRY_DATA_CACHE.clear() + + entry_id1 = "test_entry_1" + entry_id2 = "test_entry_2" + + mock_monitor1 = MagicMock(spec=SunPowerMonitor) + mock_monitor1.device_list.return_value = { + "devices": [{"DEVICE_TYPE": "PVS", "SERIAL": "TEST1"}], + } + + mock_monitor2 = MagicMock(spec=SunPowerMonitor) + mock_monitor2.device_list.return_value = { + "devices": [{"DEVICE_TYPE": "PVS", "SERIAL": "TEST2"}], + } + + # Test the actual sunpower_fetch function + data1 = sunpower_fetch(mock_monitor1, 120, 60, entry_id1) + assert entry_id1 in ENTRY_DATA_CACHE + assert entry_id2 not in ENTRY_DATA_CACHE + + # Verify the function called the monitor + mock_monitor1.device_list.assert_called_once() + + # Second call should create separate cache for entry2 + data2 = sunpower_fetch(mock_monitor2, 120, 60, entry_id2) + assert entry_id1 in ENTRY_DATA_CACHE + assert entry_id2 in ENTRY_DATA_CACHE + + # Verify caches are independent + cache1 = ENTRY_DATA_CACHE[entry_id1] + cache2 = ENTRY_DATA_CACHE[entry_id2] + + assert cache1["pvs_sample"] != cache2["pvs_sample"] + assert data1 != data2 + + # Verify the second function called the second monitor + mock_monitor2.device_list.assert_called_once() + + # Clean up + ENTRY_DATA_CACHE.clear() + + def test_cache_prevents_unnecessary_api_calls(self): + """Test that cache prevents API calls within the update interval.""" + from custom_components.sunpower.sunpower import SunPowerMonitor + + # Clear any existing cache + ENTRY_DATA_CACHE.clear() + + entry_id = "test_entry_cache" + sunpower_update_interval = 120 # 2 minutes + sunvault_update_interval = 60 # 1 minute + + # Create mock monitor + mock_monitor = MagicMock(spec=SunPowerMonitor) + mock_monitor.device_list.return_value = { + "devices": [{"DEVICE_TYPE": "PVS", "SERIAL": "TEST123"}], + } + + # First call - should trigger API call (cache is empty) + with patch("time.time", return_value=1000.0): # Mock current time + data1 = sunpower_fetch( + mock_monitor, + sunpower_update_interval, + sunvault_update_interval, + entry_id, + ) + + # Verify API was called + assert mock_monitor.device_list.call_count == 1 + + # Verify cache was populated + assert entry_id in ENTRY_DATA_CACHE + cache = ENTRY_DATA_CACHE[entry_id] + assert cache["pvs_sample_time"] == 1000.0 + assert cache["pvs_sample"] == {"devices": [{"DEVICE_TYPE": "PVS", "SERIAL": "TEST123"}]} + + # Second call within cache duration - should NOT trigger API call + with patch("time.time", return_value=1050.0): # 50 seconds later (within 120s interval) + data2 = sunpower_fetch( + mock_monitor, + sunpower_update_interval, + sunvault_update_interval, + entry_id, + ) + + # Verify API was NOT called again (still 1 call total) + assert mock_monitor.device_list.call_count == 1 + + # Verify same data returned + assert data1 == data2 + + # Verify cache timestamp unchanged (no new fetch) + assert cache["pvs_sample_time"] == 1000.0 + + # Third call after cache expiry - should trigger API call + with patch("time.time", return_value=1200.0): # 200 seconds later (beyond 120s interval) + mock_monitor.device_list.return_value = { + "devices": [{"DEVICE_TYPE": "PVS", "SERIAL": "TEST456"}], + } + data3 = sunpower_fetch( + mock_monitor, + sunpower_update_interval, + sunvault_update_interval, + entry_id, + ) + + # Verify API was called again (now 2 calls total) + assert mock_monitor.device_list.call_count == 2 + + # Verify cache was updated with new timestamp and data + assert cache["pvs_sample_time"] == 1200.0 + assert cache["pvs_sample"] == {"devices": [{"DEVICE_TYPE": "PVS", "SERIAL": "TEST456"}]} + + # Verify new data is different + assert data1 != data3 + + # Clean up + ENTRY_DATA_CACHE.clear() + + def test_entry_cache_isolation(self): + """Test that multiple entries have properly isolated caches.""" + from custom_components.sunpower.sunpower import SunPowerMonitor + + # Clear any existing cache + ENTRY_DATA_CACHE.clear() + + entry_id1 = "test_entry_1" + entry_id2 = "test_entry_2" + + # Create mock monitors for different entries + mock_monitor1 = MagicMock(spec=SunPowerMonitor) + mock_monitor1.device_list.return_value = { + "devices": [{"DEVICE_TYPE": "PVS", "SERIAL": "TEST1"}], + } + + mock_monitor2 = MagicMock(spec=SunPowerMonitor) + mock_monitor2.device_list.return_value = { + "devices": [{"DEVICE_TYPE": "PVS", "SERIAL": "TEST2"}], + } + + # Fetch data for both entries at same time + with patch("time.time", return_value=1000.0): + data1 = sunpower_fetch(mock_monitor1, 120, 60, entry_id1) + data2 = sunpower_fetch(mock_monitor2, 120, 60, entry_id2) + + # Verify both entries have separate cache entries + assert entry_id1 in ENTRY_DATA_CACHE + assert entry_id2 in ENTRY_DATA_CACHE + assert ENTRY_DATA_CACHE[entry_id1] != ENTRY_DATA_CACHE[entry_id2] + + # Verify different data but same timestamp (called at same time) + cache1 = ENTRY_DATA_CACHE[entry_id1] + cache2 = ENTRY_DATA_CACHE[entry_id2] + assert cache1["pvs_sample_time"] == cache2["pvs_sample_time"] == 1000.0 + assert cache1["pvs_sample"] != cache2["pvs_sample"] + assert data1 != data2 + + # Test that cache behavior is independent for each entry + with patch("time.time", return_value=1050.0): # Within cache duration + # Call entry1 again - should use cache + data1_cached = sunpower_fetch(mock_monitor1, 120, 60, entry_id1) + # Call entry2 again - should also use cache + data2_cached = sunpower_fetch(mock_monitor2, 120, 60, entry_id2) + + # Verify no additional API calls were made (each monitor called once) + assert mock_monitor1.device_list.call_count == 1 + assert mock_monitor2.device_list.call_count == 1 + + # Verify same data returned + assert data1 == data1_cached + assert data2 == data2_cached + + # Test cleanup of one entry doesn't affect the other + ENTRY_DATA_CACHE.pop(entry_id1) + assert entry_id1 not in ENTRY_DATA_CACHE + assert entry_id2 in ENTRY_DATA_CACHE + + # Entry2 should still work with its cache + with patch("time.time", return_value=1080.0): + data2_still_cached = sunpower_fetch(mock_monitor2, 120, 60, entry_id2) + + assert mock_monitor2.device_list.call_count == 1 # Still only 1 call + assert data2 == data2_still_cached + + # Clean up + ENTRY_DATA_CACHE.clear() diff --git a/tests/test_sunpower_api.py b/tests/test_sunpower_api.py new file mode 100644 index 0000000..175cae5 --- /dev/null +++ b/tests/test_sunpower_api.py @@ -0,0 +1,77 @@ +"""Test the SunPower API client.""" + +import os +import sys +from unittest.mock import ( + Mock, + patch, +) + +import pytest +import requests + +# Add custom_components to path before importing custom modules +sys.path.insert( + 0, + os.path.join(os.path.dirname(__file__), "..", "custom_components"), +) # noqa: E402 + +from sunpower.sunpower import ( # noqa: E402 + ConnectionException, + SunPowerMonitor, +) + + +class TestSunPowerMonitor: + """Test the SunPowerMonitor class.""" + + def test_init(self): + """Test SunPowerMonitor initialization.""" + monitor = SunPowerMonitor("192.168.1.100") + assert monitor.host == "192.168.1.100" + assert monitor.command_url == "http://192.168.1.100/cgi-bin/dl_cgi?Command=" + + @patch("sunpower.sunpower.requests") + def test_device_list_success(self, mock_requests): + """Test successful device list retrieval.""" + # Mock response + mock_response = Mock() + mock_response.json.return_value = {"devices": []} + mock_requests.get.return_value = mock_response + + monitor = SunPowerMonitor("192.168.1.100") + result = monitor.device_list() + + assert result == {"devices": []} + mock_requests.get.assert_called_once_with( + "http://192.168.1.100/cgi-bin/dl_cgi?Command=DeviceList", + timeout=120, + ) + + @patch("sunpower.sunpower.requests") + def test_device_list_connection_error(self, mock_requests): + """Test device list with connection error.""" + # Mock the exception class as well + mock_requests.exceptions.RequestException = requests.exceptions.RequestException + mock_requests.get.side_effect = requests.exceptions.RequestException("Connection failed") + + monitor = SunPowerMonitor("192.168.1.100") + + with pytest.raises(ConnectionException): + monitor.device_list() + + @patch("sunpower.sunpower.requests") + def test_network_status_success(self, mock_requests): + """Test successful network status retrieval.""" + mock_response = Mock() + mock_response.json.return_value = {"status": "ok"} + mock_requests.get.return_value = mock_response + + monitor = SunPowerMonitor("192.168.1.100") + result = monitor.network_status() + + assert result == {"status": "ok"} + mock_requests.get.assert_called_once_with( + "http://192.168.1.100/cgi-bin/dl_cgi?Command=Get_Comm", + timeout=120, + )