diff --git a/src/viam/app/billing_client.py b/src/viam/app/billing_client.py index b49f1254c..3820842cc 100644 --- a/src/viam/app/billing_client.py +++ b/src/viam/app/billing_client.py @@ -5,6 +5,8 @@ from viam import logging from viam.proto.app.billing import ( BillingServiceStub, + CreateInvoiceAndChargeImmediatelyRequest, + CreateInvoiceAndChargeImmediatelyResponse, GetCurrentMonthUsageRequest, GetCurrentMonthUsageResponse, GetInvoicePdfRequest, @@ -141,3 +143,39 @@ async def get_org_billing_information(self, org_id: str, timeout: Optional[float """ request = GetOrgBillingInformationRequest(org_id=org_id) return await self._billing_client.GetOrgBillingInformation(request, metadata=self._metadata, timeout=timeout) + + async def create_invoice_and_charge_immediately( + self, + org_id_to_charge: str, + amount: float, + description: Optional[str] = None, + org_id_for_branding: Optional[str] = None, + timeout: Optional[float] = None, + ) -> None: + """Create an invoice and charge the organization immediately. + + :: + + await billing_client.create_invoice_and_charge_immediately( + org_id_to_charge="", + amount=10.50, + description="Test invoice", + org_id_for_branding="" + ) + + Args: + org_id_to_charge (str): The organization ID to charge. + amount (float): The amount to charge in cents. + description (Optional[str], optional): A description for the invoice. Defaults to None. + org_id_for_branding (Optional[str], optional): The organization ID to use for branding. Defaults to None. + timeout (Optional[float], optional): The timeout in seconds for the request. Defaults to None. + + For more information, see `Billing Client API `_. + """ + request = CreateInvoiceAndChargeImmediatelyRequest( + org_id_to_charge=org_id_to_charge, + amount_cents=int(amount * 100), # Assuming amount is in dollars, convert to cents + description=description, + org_id_for_branding=org_id_for_branding, + ) + await self._billing_client.CreateInvoiceAndChargeImmediately(request, metadata=self._metadata, timeout=timeout) diff --git a/src/viam/app/data_client.py b/src/viam/app/data_client.py index 66c463607..6c9a2130e 100644 --- a/src/viam/app/data_client.py +++ b/src/viam/app/data_client.py @@ -90,6 +90,8 @@ ListDatasetsByIDsResponse, ListDatasetsByOrganizationIDRequest, ListDatasetsByOrganizationIDResponse, + MergeDatasetsRequest, + MergeDatasetsResponse, RenameDatasetRequest, ) from viam.proto.app.datasync import ( @@ -308,6 +310,8 @@ class DataPipelineRun: """The start time of the data that was processed in the run.""" data_end_time: datetime """The end time of the data that was processed in the run.""" + error_message: str + """The error message, if the run failed.""" @classmethod def from_proto(cls, data_pipeline_run: ProtoDataPipelineRun) -> Self: @@ -318,6 +322,7 @@ def from_proto(cls, data_pipeline_run: ProtoDataPipelineRun) -> Self: end_time=data_pipeline_run.end_time.ToDatetime(), data_start_time=data_pipeline_run.data_start_time.ToDatetime(), data_end_time=data_pipeline_run.data_end_time.ToDatetime(), + error_message=data_pipeline_run.error_message, ) @dataclass @@ -2031,6 +2036,29 @@ async def _list_data_pipeline_runs(self, id: str, page_size: int, page_token: st response: ListDataPipelineRunsResponse = await self._data_pipelines_client.ListDataPipelineRuns(request, metadata=self._metadata) return DataClient.DataPipelineRunsPage.from_proto(response, self, page_size) + async def merge_datasets(self, dataset_ids: List[str], name: str, organization_id: str) -> str: + """Merge multiple datasets into a new dataset. + + :: + + new_dataset_id = await data_client.merge_datasets( + dataset_ids=["", ""], + name="MergedDataset", + organization_id="" + ) + + Args: + dataset_ids (List[str]): The IDs of the datasets to merge. + name (str): The name of the new dataset. + organization_id (str): The ID of the organization where the new dataset is being created. + + Returns: + str: The ID of the newly created merged dataset. + """ + request = MergeDatasetsRequest(dataset_ids=dataset_ids, name=name, organization_id=organization_id) + response: MergeDatasetsResponse = await self._dataset_client.MergeDatasets(request, metadata=self._metadata) + return response.dataset_id + @staticmethod def create_filter( component_name: Optional[str] = None, diff --git a/src/viam/components/camera/camera.py b/src/viam/components/camera/camera.py index 9faeae9d5..c2e51a80c 100644 --- a/src/viam/components/camera/camera.py +++ b/src/viam/components/camera/camera.py @@ -63,7 +63,7 @@ async def get_image( ... @abc.abstractmethod - async def get_images(self, *, timeout: Optional[float] = None, **kwargs) -> Tuple[List[NamedImage], ResponseMetadata]: + async def get_images(self, *, filter_source_names: Optional[List[str]] = None, timeout: Optional[float] = None, **kwargs) -> Tuple[List[NamedImage], ResponseMetadata]: """Get simultaneous images from different imagers, along with associated metadata. This should not be used for getting a time series of images from the same imager. @@ -75,6 +75,10 @@ async def get_images(self, *, timeout: Optional[float] = None, **kwargs) -> Tupl first_image = images[0] timestamp = metadata.captured_at + Args: + filter_source_names (Optional[List[str]]): If provided, only images from imagers with names + in this list will be returned. + Returns: Tuple[List[NamedImage], ResponseMetadata]: A tuple containing two values; the first [0] a list of images returned from the camera system, and the second [1] the metadata associated with this response. diff --git a/src/viam/components/camera/client.py b/src/viam/components/camera/client.py index 45feacf32..d5b84fe65 100644 --- a/src/viam/components/camera/client.py +++ b/src/viam/components/camera/client.py @@ -13,6 +13,7 @@ GetPointCloudRequest, GetPointCloudResponse, GetPropertiesRequest, + Image, ) from viam.resource.rpc_client_base import ReconfigurableResourceRPCClientBase from viam.utils import ValueTypes, dict_to_struct, get_geometries, struct_to_dict @@ -45,16 +46,17 @@ async def get_image( async def get_images( self, + filter_source_names: Optional[List[str]] = None, *, timeout: Optional[float] = None, **kwargs, ) -> Tuple[List[NamedImage], ResponseMetadata]: md = kwargs.get("metadata", self.Metadata()).proto - request = GetImagesRequest(name=self.name) + request = GetImagesRequest(name=self.name, filter_source_names=filter_source_names) response: GetImagesResponse = await self.client.GetImages(request, timeout=timeout, metadata=md) imgs = [] for img_data in response.images: - mime_type = CameraMimeType.from_proto(img_data.format) + mime_type = CameraMimeType.from_string(img_data.mime_type) img = NamedImage(img_data.source_name, img_data.image, mime_type) imgs.append(img) resp_metadata: ResponseMetadata = response.response_metadata diff --git a/src/viam/components/camera/service.py b/src/viam/components/camera/service.py index 936271491..8c589dd47 100644 --- a/src/viam/components/camera/service.py +++ b/src/viam/components/camera/service.py @@ -48,12 +48,12 @@ async def GetImages(self, stream: Stream[GetImagesRequest, GetImagesResponse]) - camera = self.get_resource(name) timeout = stream.deadline.time_remaining() if stream.deadline else None - images, metadata = await camera.get_images(timeout=timeout, metadata=stream.metadata) + images, metadata = await camera.get_images(filter_source_names=list(request.filter_source_names), timeout=timeout, metadata=stream.metadata) img_bytes_lst = [] for img in images: - fmt = img.mime_type.to_proto() + mime_type = img.mime_type.mime_type img_bytes = img.data - img_bytes_lst.append(Image(source_name=name, format=fmt, image=img_bytes)) + img_bytes_lst.append(Image(source_name=name, format=img.mime_type.to_proto(), mime_type=mime_type, image=img_bytes)) response = GetImagesResponse(images=img_bytes_lst, response_metadata=metadata) await stream.send_message(response) diff --git a/tests/mocks/components.py b/tests/mocks/components.py index fd80dc51e..01cfd24e3 100644 --- a/tests/mocks/components.py +++ b/tests/mocks/components.py @@ -368,6 +368,7 @@ def __init__(self, name: str): ts = Timestamp() ts.FromDatetime(datetime(1970, 1, 1)) self.metadata = ResponseMetadata(captured_at=ts) + self.filter_source_names: Optional[List[str]] = None super().__init__(name) async def get_image( @@ -377,7 +378,8 @@ async def get_image( self.timeout = timeout return self.image - async def get_images(self, timeout: Optional[float] = None, **kwargs) -> Tuple[List[NamedImage], ResponseMetadata]: + async def get_images(self, filter_source_names: Optional[List[str]] = None, timeout: Optional[float] = None, **kwargs) -> Tuple[List[NamedImage], ResponseMetadata]: + self.filter_source_names = filter_source_names self.timeout = timeout return [NamedImage(self.name, self.image.data, self.image.mime_type)], self.metadata diff --git a/tests/mocks/services.py b/tests/mocks/services.py index b5fb0ecbc..b8ee00727 100644 --- a/tests/mocks/services.py +++ b/tests/mocks/services.py @@ -111,6 +111,7 @@ ListModulesResponse, ListOrganizationMembersRequest, ListOrganizationMembersResponse, + ListOrganizationsWithAccessToLocationResponse, ListOrganizationsByUserRequest, ListOrganizationsByUserResponse, ListOrganizationsRequest, @@ -183,6 +184,8 @@ UploadModuleFileResponse, ) from viam.proto.app.billing import ( + CreateInvoiceAndChargeImmediatelyRequest, + CreateInvoiceAndChargeImmediatelyResponse, GetCurrentMonthUsageRequest, GetCurrentMonthUsageResponse, GetInvoicePdfRequest, @@ -273,6 +276,8 @@ ListDatasetsByIDsResponse, ListDatasetsByOrganizationIDRequest, ListDatasetsByOrganizationIDResponse, + MergeDatasetsRequest, + MergeDatasetsResponse, RenameDatasetRequest, RenameDatasetResponse, ) @@ -1068,6 +1073,9 @@ class MockDataset(DatasetServiceBase): def __init__(self, create_response: str, datasets_response: Sequence[Dataset]): self.create_response = create_response self.datasets_response = datasets_response + self.merge_dataset_ids: Optional[List[str]] = None + self.merge_name: Optional[str] = None + self.merge_org_id: Optional[str] = None async def CreateDataset(self, stream: Stream[CreateDatasetRequest, CreateDatasetResponse]) -> None: request = await stream.recv_message() @@ -1103,6 +1111,14 @@ async def RenameDataset(self, stream: Stream[RenameDatasetRequest, RenameDataset self.name = request.name await stream.send_message((RenameDatasetResponse())) + async def MergeDatasets(self, stream: Stream[MergeDatasetsRequest, MergeDatasetsResponse]) -> None: + request = await stream.recv_message() + assert request is not None + self.merge_dataset_ids = request.dataset_ids + self.merge_name = request.name + self.merge_org_id = request.organization_id + await stream.send_message(MergeDatasetsResponse(dataset_id="new_merged_dataset_id")) + class MockDataSync(DataSyncServiceBase): def __init__(self, file_upload_response: str): @@ -1263,6 +1279,10 @@ def __init__( self.curr_month_usage = curr_month_usage self.invoices_summary = invoices_summary self.billing_info = billing_info + self.org_id_to_charge: Optional[str] = None + self.amount: Optional[float] = None + self.description: Optional[str] = None + self.org_id_for_branding: Optional[str] = None async def GetCurrentMonthUsage(self, stream: Stream[GetCurrentMonthUsageRequest, GetCurrentMonthUsageResponse]) -> None: request = await stream.recv_message() @@ -1290,6 +1310,17 @@ async def GetOrgBillingInformation(self, stream: Stream[GetOrgBillingInformation self.org_id = request.org_id await stream.send_message(self.billing_info) + async def CreateInvoiceAndChargeImmediately( + self, stream: Stream[CreateInvoiceAndChargeImmediatelyRequest, CreateInvoiceAndChargeImmediatelyResponse] + ) -> None: + request = await stream.recv_message() + assert request is not None + self.org_id_to_charge = request.org_id_to_charge + self.amount = request.amount + self.description = request.description + self.org_id_for_branding = request.org_id_for_branding + await stream.send_message(CreateInvoiceAndChargeImmediatelyResponse()) + class MockApp(UnimplementedAppServiceBase): def __init__( diff --git a/tests/test_billing_client.py b/tests/test_billing_client.py index 2d5c093cc..bf1018132 100644 --- a/tests/test_billing_client.py +++ b/tests/test_billing_client.py @@ -8,6 +8,8 @@ GetInvoicesSummaryResponse, GetOrgBillingInformationResponse, InvoiceSummary, + CreateInvoiceAndChargeImmediatelyRequest, + CreateInvoiceAndChargeImmediatelyResponse, ) from .mocks.services import MockBilling @@ -67,6 +69,11 @@ AUTH_TOKEN = "auth_token" BILLING_SERVICE_METADATA = {"authorization": f"Bearer {AUTH_TOKEN}"} +ORG_ID_TO_CHARGE = "org_id_to_charge" +AMOUNT = 10000 +DESCRIPTION = "test description" +ORG_ID_FOR_BRANDING = "org_id_for_branding" + @pytest.fixture(scope="function") def service() -> MockBilling: @@ -105,3 +112,18 @@ async def test_get_org_billing_information(self, service: MockBilling): org_billing_info = await client.get_org_billing_information(org_id=org_id) assert org_billing_info == ORG_BILLING_INFO assert service.org_id == org_id + + async def test_create_invoice_and_charge_immediately(self, service: MockBilling): + async with ChannelFor([service]) as channel: + client = BillingClient(channel, BILLING_SERVICE_METADATA) + response = await client.create_invoice_and_charge_immediately( + org_id=ORG_ID_TO_CHARGE, + amount=AMOUNT, + description=DESCRIPTION, + org_id_for_branding=ORG_ID_FOR_BRANDING, + ) + assert response == CreateInvoiceAndChargeImmediatelyResponse() + assert service.org_id_to_charge == ORG_ID_TO_CHARGE + assert service.amount == AMOUNT + assert service.description == DESCRIPTION + assert service.org_id_for_branding == ORG_ID_FOR_BRANDING diff --git a/tests/test_camera.py b/tests/test_camera.py index 47943e7d9..bb59e6d24 100644 --- a/tests/test_camera.py +++ b/tests/test_camera.py @@ -24,6 +24,7 @@ GetPropertiesResponse, IntrinsicParameters, RenderFrameRequest, + Image ) from viam.resource.manager import ResourceManager from viam.utils import dict_to_struct, struct_to_dict @@ -93,11 +94,12 @@ async def test_get_image(self, camera: MockCamera, image: ViamImage): assert camera.extra == {"1": 1} async def test_get_images(self, camera: Camera, image: ViamImage, metadata: ResponseMetadata): - imgs, md = await camera.get_images() + imgs, md = await camera.get_images(filter_source_names=["cam1", "cam2"]) assert isinstance(imgs[0], NamedImage) assert imgs[0].name == camera.name assert imgs[0].data == image.data assert md == metadata + assert camera.filter_source_names == ["cam1", "cam2"] async def test_get_point_cloud(self, camera: MockCamera, point_cloud: bytes): pc, _ = await camera.get_point_cloud() @@ -155,13 +157,15 @@ async def test_get_images(self, camera: MockCamera, service: CameraRPCService, m async with ChannelFor([service]) as channel: client = CameraServiceStub(channel) - request = GetImagesRequest(name="camera") + request = GetImagesRequest(name="camera", filter_source_names=["cam1", "cam2"]) response: GetImagesResponse = await client.GetImages(request, timeout=18.1) raw_img = response.images[0] assert raw_img.format == Format.FORMAT_PNG assert raw_img.source_name == camera.name + assert raw_img.mime_type == CameraMimeType.PNG assert response.response_metadata == metadata assert camera.timeout == loose_approx(18.1) + assert camera.filter_source_names == ["cam1", "cam2"] async def test_render_frame(self, camera: MockCamera, service: CameraRPCService, image: ViamImage): assert camera.timeout is None @@ -226,12 +230,13 @@ async def test_get_images(self, camera: MockCamera, service: CameraRPCService, i async with ChannelFor([service]) as channel: client = CameraClient("camera", channel) - imgs, md = await client.get_images(timeout=1.82) + imgs, md = await client.get_images(timeout=1.82, filter_source_names=["cam1", "cam2"]) assert isinstance(imgs[0], NamedImage) assert imgs[0].name == camera.name assert imgs[0].data == image.data assert md == metadata assert camera.timeout == loose_approx(1.82) + assert camera.filter_source_names == ["cam1", "cam2"] async def test_get_point_cloud(self, camera: MockCamera, service: CameraRPCService, point_cloud: bytes): assert camera.timeout is None diff --git a/tests/test_data_pipelines.py b/tests/test_data_pipelines.py index 7c1869f67..882f8d204 100644 --- a/tests/test_data_pipelines.py +++ b/tests/test_data_pipelines.py @@ -23,6 +23,8 @@ TIMESTAMP = datetime.fromtimestamp(0) TIMESTAMP_PROTO = datetime_to_timestamp(TIMESTAMP) +ERROR_MESSAGE = "pipeline failed" + PROTO_DATA_PIPELINE = DataPipeline( id=ID, name=NAME, @@ -48,6 +50,7 @@ end_time=TIMESTAMP_PROTO, data_start_time=TIMESTAMP_PROTO, data_end_time=TIMESTAMP_PROTO, + error_message=ERROR_MESSAGE, ) PROTO_DATA_PIPELINE_RUNS = [PROTO_DATA_PIPELINE_RUN] @@ -59,6 +62,7 @@ end_time=TIMESTAMP, data_start_time=TIMESTAMP, data_end_time=TIMESTAMP, + error_message=ERROR_MESSAGE, ) ] diff --git a/tests/test_dataset.py b/tests/test_dataset.py index 86e0840ac..413104a53 100644 --- a/tests/test_dataset.py +++ b/tests/test_dataset.py @@ -3,7 +3,7 @@ from grpclib.testing import ChannelFor from viam.app.data_client import DataClient -from viam.proto.app.dataset import Dataset +from viam.proto.app.dataset import Dataset, MergeDatasetsRequest, MergeDatasetsResponse from .mocks.services import MockDataset @@ -19,6 +19,11 @@ AUTH_TOKEN = "auth_token" DATA_SERVICE_METADATA = {"authorization": f"Bearer {AUTH_TOKEN}"} +MERGE_DATASET_IDS = ["dataset_1", "dataset_2"] +MERGE_NAME = "merged_dataset" +MERGE_ORG_ID = "merged_org_id" +MERGED_DATASET_ID = "merged_dataset_id" + @pytest.fixture(scope="function") def service() -> MockDataset: @@ -60,3 +65,12 @@ async def test_rename_dataset(self, service: MockDataset): await client.rename_dataset(ID, NAME) assert service.id == ID assert service.name == NAME + + async def test_merge_datasets(self, service: MockDataset): + async with ChannelFor([service]) as channel: + client = DataClient(channel, DATA_SERVICE_METADATA) + merged_id = await client.merge_datasets(MERGE_DATASET_IDS, MERGE_NAME, MERGE_ORG_ID) + assert service.merge_dataset_ids == MERGE_DATASET_IDS + assert service.merge_name == MERGE_NAME + assert service.merge_org_id == MERGE_ORG_ID + assert merged_id == MERGED_DATASET_ID