|
| 1 | +# Copyright (c) 2021 Software AG, |
| 2 | +# Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, |
| 3 | +# and/or its subsidiaries and/or its affiliates and/or their licensors. |
| 4 | +# Use, reproduction, transfer, publication or disclosure is prohibited except |
| 5 | +# as specifically provided for in your License Agreement with Software AG. |
| 6 | + |
| 7 | +from __future__ import annotations |
| 8 | + |
| 9 | +import json as js |
| 10 | +from typing import List, Set, Generator |
| 11 | +import urllib.parse |
| 12 | +import uuid |
| 13 | + |
| 14 | +from c8y_api._base_api import CumulocityRestApi |
| 15 | +from c8y_api.model._base import SimpleObject, CumulocityResource |
| 16 | +from c8y_api.model._parser import SimpleObjectParser |
| 17 | + |
| 18 | + |
| 19 | +class Subscription(SimpleObject): |
| 20 | + """ Represent a Notification 2.0 subscription within the database. |
| 21 | +
|
| 22 | + Instances of this class are returned by functions of the corresponding |
| 23 | + Subscriptions API. Use this class to create new options. |
| 24 | +
|
| 25 | + See also: https://cumulocity.com/api/#tag/Subscriptions |
| 26 | + """ |
| 27 | + |
| 28 | + class Context(object): |
| 29 | + """Notification context types.""" |
| 30 | + MANAGED_OBJECT = 'mo' |
| 31 | + TENANT = 'tenant' |
| 32 | + |
| 33 | + class ApiFilter(object): |
| 34 | + """Notification API filter types.""" |
| 35 | + ANY = '*' |
| 36 | + ALARMS = 'alarms' |
| 37 | + ALARMS_WITH_CHILDREN = 'alarmsWithChildren' |
| 38 | + EVENTS = 'events' |
| 39 | + EVENTS_WITH_CHILDREN = 'eventsWithChildren' |
| 40 | + MANAGED_OBJECTS = 'managedobjects' |
| 41 | + MEASUREMENTS = 'measurements' |
| 42 | + OPERATIONS = 'operations' |
| 43 | + |
| 44 | + _resource = '/notification2/subscriptions' |
| 45 | + _parser = SimpleObjectParser({ |
| 46 | + 'name': 'subscription', |
| 47 | + 'context': 'context', |
| 48 | + 'fragments': 'fragmentsToCopy'}) |
| 49 | + _accept = 'application/vnd.com.nsn.cumulocity.subscription+json' |
| 50 | + |
| 51 | + def __init__(self, c8y: CumulocityRestApi = None, name: str = None, context: str = None, source_id: str = None, |
| 52 | + api_filter: List[str] = None, type_filter: str = None, |
| 53 | + fragments: List[str] = None): |
| 54 | + """ Create a new Subscription instance. |
| 55 | +
|
| 56 | + Args: |
| 57 | + c8y (CumulocityRestApi): Cumulocity connection reference; needs |
| 58 | + to be set for direct manipulation (create, delete) |
| 59 | + name (str): Subscription name |
| 60 | + context (str): Subscription context. |
| 61 | + source_id (str): Managed object ID the subscription is for. |
| 62 | + api_filter (List[str]): List of APIs/resources to subscribe for. |
| 63 | + type_filter (str): Object type the subscription is for. |
| 64 | +
|
| 65 | + Returns: |
| 66 | + Subscription instance |
| 67 | + """ |
| 68 | + super().__init__(c8y) |
| 69 | + self.name = name |
| 70 | + self.context = context |
| 71 | + self.source_id = source_id |
| 72 | + self.api_filter = api_filter |
| 73 | + self.type_filter = type_filter |
| 74 | + self.fragments = fragments |
| 75 | + |
| 76 | + def _to_json(self, only_updated=False, exclude: Set[str] = None) -> dict: |
| 77 | + json = super()._to_json(only_updated=only_updated, exclude=exclude) |
| 78 | + if self.source_id: |
| 79 | + json['source'] = {'id': self.source_id} |
| 80 | + if self.api_filter or self.type_filter: |
| 81 | + subscription_filter = {'apis': self.api_filter if self.api_filter else None, |
| 82 | + 'typeFilter': self.type_filter if self.type_filter else None} |
| 83 | + json['subscriptionFilter'] = subscription_filter |
| 84 | + return json |
| 85 | + |
| 86 | + @classmethod |
| 87 | + def from_json(cls, json: dict) -> Subscription: |
| 88 | + """Create a Subscription instance from Cumulocity JSON format. |
| 89 | +
|
| 90 | + Caveat: this function is primarily for internal use and does not |
| 91 | + return a full representation of the JSON. It is used for object |
| 92 | + creation and update within Cumulocity. |
| 93 | +
|
| 94 | + Params: |
| 95 | + json (dict): The JSON to parse. |
| 96 | +
|
| 97 | + Returns: |
| 98 | + A Subscription instance. |
| 99 | + """ |
| 100 | + subscription = super()._from_json(json, Subscription()) |
| 101 | + subscription.source_id = json['source']['id'] |
| 102 | + if 'subscriptionFilter' in json: |
| 103 | + if 'apis' in json['subscriptionFilter']: |
| 104 | + subscription.api_filter = json['subscriptionFilter']['apis'] |
| 105 | + if 'typeFilter' in json['subscriptionFilter']: |
| 106 | + subscription.type_filter = json['subscriptionFilter']['typeFilter'] |
| 107 | + return subscription |
| 108 | + |
| 109 | + def create(self) -> Subscription: |
| 110 | + """ Create a new subscription within the database. |
| 111 | +
|
| 112 | + Returns: |
| 113 | + A fresh Subscription instance representing the created |
| 114 | + subscription within the database. |
| 115 | +
|
| 116 | + See also function Subscriptions.create which doesn't parse the result. |
| 117 | + """ |
| 118 | + return self._create() |
| 119 | + |
| 120 | + |
| 121 | +class Subscriptions(CumulocityResource): |
| 122 | + """Provides access to the Notification 2.0 Subscriptions API. |
| 123 | +
|
| 124 | + This class can be used for get, search for, create, and |
| 125 | + delete Notification2 subscriptions within the Cumulocity database. |
| 126 | +
|
| 127 | + See also: https://cumulocity.com/api/#tag/Subscriptions |
| 128 | + https://cumulocity.com/guides/reference/notifications/ |
| 129 | + """ |
| 130 | + |
| 131 | + def __init__(self, c8y: CumulocityRestApi): |
| 132 | + super().__init__(c8y, '/notification2/subscriptions') |
| 133 | + |
| 134 | + def get(self, subscription_id: str) -> Subscription: |
| 135 | + """ Retrieve a specific subscription from the database. |
| 136 | +
|
| 137 | + Args: |
| 138 | + subscription_id (str): Subscription ID |
| 139 | +
|
| 140 | + Returns: |
| 141 | + A Subscription instance |
| 142 | +
|
| 143 | + Raises: |
| 144 | + KeyError if the given ID is not defined within the database |
| 145 | + """ |
| 146 | + subscription = Subscription.from_json(super()._get_object(subscription_id)) |
| 147 | + subscription.c8y = self.c8y # inject c8y connection into instance |
| 148 | + return subscription |
| 149 | + |
| 150 | + def select(self, context: str = None, source: str = None, |
| 151 | + limit: int = None, page_size: int = 1000) -> Generator[Subscription]: |
| 152 | + """ Query the database for subscriptions and iterate over the |
| 153 | + results. |
| 154 | +
|
| 155 | + This function is implemented in a lazy fashion - results will only be |
| 156 | + fetched from the database as long there is a consumer for them. |
| 157 | +
|
| 158 | + All parameters are considered to be filters, limiting the result set |
| 159 | + to objects which meet the filters' specification. Filters can be |
| 160 | + combined (within reason). |
| 161 | +
|
| 162 | + Args: |
| 163 | + context (str): Subscription context. |
| 164 | + source (str): Managed object ID the subscription is for. |
| 165 | + limit (int): Limit the number of results to this number. |
| 166 | + page_size (int): Define the number of objects which are read (and |
| 167 | + parsed in one chunk). This is a performance related setting. |
| 168 | +
|
| 169 | + Returns: |
| 170 | + Generator for Subscription instances |
| 171 | + """ |
| 172 | + base_query = self._build_base_query(context=context, source=source, page_size=page_size) |
| 173 | + return super()._iterate(base_query, limit, Subscription.from_json) |
| 174 | + |
| 175 | + def get_all(self, context: str = None, source: str = None, |
| 176 | + limit: int = None, page_size: int = 1000) -> List[Subscription]: |
| 177 | + """ Query the database for subscriptions and return the results |
| 178 | + as list. |
| 179 | +
|
| 180 | + This function is a greedy version of the `select` function. All |
| 181 | + available results are read immediately and returned as list. |
| 182 | +
|
| 183 | + Returns: |
| 184 | + List of Subscription instances. |
| 185 | + """ |
| 186 | + return list(self.select(context=context, source=source, limit=limit, page_size=page_size)) |
| 187 | + |
| 188 | + def create(self, *subscriptions: Subscription) -> None: |
| 189 | + """ Create subscriptions within the database. |
| 190 | +
|
| 191 | + Args: |
| 192 | + subscriptions (*TenantOption): Collection of Subscription instances |
| 193 | + """ |
| 194 | + super()._create(Subscription.to_full_json, *subscriptions) |
| 195 | + |
| 196 | + def delete_by(self, context: str = None, source: str = None) -> None: |
| 197 | + """ Delete subscriptions within the database. |
| 198 | +
|
| 199 | + Args: |
| 200 | + context (str): Subscription context |
| 201 | + source (str): Managed object ID the subscription is for. |
| 202 | + """ |
| 203 | + base_query = self._build_base_query(context=context, source=source) |
| 204 | + # remove &page_number= from the end |
| 205 | + query = base_query[:base_query.rindex('&')] |
| 206 | + self.c8y.delete(query) |
| 207 | + |
| 208 | + |
| 209 | +class Tokens(CumulocityResource): |
| 210 | + """Provides access to the Notification 2.0 token generation API. |
| 211 | +
|
| 212 | + This class can be used for get, search for, create, and |
| 213 | + delete Notification2 subscriptions within the Cumulocity database. |
| 214 | +
|
| 215 | + See also: https://cumulocity.com/api/#tag/Tokens |
| 216 | + https://cumulocity.com/guides/reference/notifications/ |
| 217 | + """ |
| 218 | + |
| 219 | + _subscriber_uuid = uuid.uuid5(uuid.NAMESPACE_URL, 'https://github.com/SoftwareAG/cumulocity-python-api') |
| 220 | + _default_subscriber = 'c8yapi' + str(_subscriber_uuid).replace('-', '') |
| 221 | + |
| 222 | + def __init__(self, c8y: CumulocityRestApi): |
| 223 | + super().__init__(c8y, '/notification2') |
| 224 | + self.host = urllib.parse.urlparse(c8y.base_url).netloc |
| 225 | + |
| 226 | + def generate(self, subscription: str, expires: int = 60, subscriber: str = None) -> str: |
| 227 | + """Generate a new access token. |
| 228 | +
|
| 229 | + Args: |
| 230 | + subscription (str): Subscription name |
| 231 | + expires (int): Expiration time in minutes |
| 232 | + subscriber (str): Subscriber Id (name) |
| 233 | +
|
| 234 | + Returns: |
| 235 | + JWT access token as string. |
| 236 | + """ |
| 237 | + td_json = self._build_token_definition(subscription, expires, subscriber) |
| 238 | + token_json = self.c8y.post(self.resource + '/token', td_json) |
| 239 | + return token_json['token'] |
| 240 | + |
| 241 | + def renew(self, token: str): |
| 242 | + """Renew a token.""" |
| 243 | + |
| 244 | + def unsubscribe(self, token: str): |
| 245 | + """Invalidate a token and unsubscribe a subscriber. |
| 246 | +
|
| 247 | + Args: |
| 248 | + token (str): Subscribed token |
| 249 | + """ |
| 250 | + result_json = self.c8y.post(self.resource + '/unsubscribe?token=' + token, json={}) |
| 251 | + if not result_json['result'] == 'DONE': |
| 252 | + raise RuntimeError(f"Unexpected response: {js.dumps(result_json)}") |
| 253 | + |
| 254 | + def build_websocket_uri(self, token: str): |
| 255 | + """Build websocket access URL. |
| 256 | +
|
| 257 | + Args: |
| 258 | + token (str): Subscriber access token |
| 259 | +
|
| 260 | + Returns: |
| 261 | + A websocket (wss://) URL to access the subscriber channel. |
| 262 | + """ |
| 263 | + return f'wss://{self.host}/notification2/consumer/?token={token}' |
| 264 | + |
| 265 | + def _build_token_definition(self, subscription: str, expires: int, subscriber: str = None): |
| 266 | + return { |
| 267 | + 'subscriber': subscriber or self._default_subscriber, |
| 268 | + 'subscription' : subscription, |
| 269 | + 'expiresInMinutes' : expires |
| 270 | + } |
0 commit comments