diff --git a/opentelemetry-instrumentation/pyproject.toml b/opentelemetry-instrumentation/pyproject.toml index 4598d444be..c298bdca4b 100644 --- a/opentelemetry-instrumentation/pyproject.toml +++ b/opentelemetry-instrumentation/pyproject.toml @@ -32,6 +32,14 @@ dependencies = [ "packaging >= 18.0", ] +[project.optional-dependencies] +gcs = [ + "google-cloud-storage==2.19.0" +] +magic = [ + "python-magic==0.4.27" +] + [project.scripts] opentelemetry-bootstrap = "opentelemetry.instrumentation.bootstrap:run" opentelemetry-instrument = "opentelemetry.instrumentation.auto_instrumentation:run" diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/README.md b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/README.md new file mode 100644 index 0000000000..bfd0dd0474 --- /dev/null +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/README.md @@ -0,0 +1,123 @@ +# Blob Uploader Library (Experimental) + +The Blob Uploader library provides an experimental way to +"write-aside" large or sensitive payloads to a blob storage +system, while retaining references to the written-aside destination +in the operations backend where telemetry is being written. + +This is particularly intended for the use case of request/response +logging, where typical telemetry backends may be unsuitable for +writing this data, either due to size reasons or due to privacy +reasons. GenAI multi-modal prompt/response logging is a particularly +salient motivation for this feature, though general HTTP request/ +response logging is another situation where this is applicable. + +## Usage: Instrumentation Library + +Instrumentation libraries should provide general hooks for handling +requests/responses (or other large blobs) that should be only +conditionally included in telemetry signals. The hooks should provide +enough context to allow a user of the instrumentation library to +conditionally choose what to do with the content including but not +limited to: dropping, including in the telemetry signal, or writing +to a BlobUploader and retaining a reference to the destination URI. + +For example: + +``` + +class RequestHook(abc.ABC): + + @abc.abstractmethod + def handle_request(self, context, signal, request): + pass + + +class ResponseHook(abc.ABC): + + @abc.abstractmethod: + def handle_response(self, context, signal, response): + pass + + +class FooInstrumentationLibrary: + + def __init__(self, + # ..., + request_hook: Optional[RequestHook]=None, + response_hook: Optional[ResponseHook]=None, + # ...) + + ... +``` + + +## Usage: User of Instrumentation Library + +Users of instrumentation libraries can use the Blob Uploader +libraries to implement relevant request/response hooks. + +For example: + +``` +from opentelemetry.instrumentation._blobupload.api import ( + NOT_PROVIDED, + Blob, + BlobUploaderProvider, + get_blob_uploader, + set_blob_uploader_provider) + + +class MyBlobUploaderRequestHook(RequestHook): + # ... + + def handle_request(self, context, signal, request): + if not self.should_uploader(context): + return + use_case = self.select_use_case(context, signal) + uploader = get_blob_uploader(use_case) + blob = Blob( + request.raw_bytes, + content_type=request.content_type, + labels=self.generate_blob_labels(context, signal, request)) + uri = uploader.upload_async(blob) + if uri == NOT_UPLOADED: + return + signal.attributes[REQUEST_ATTRIBUTE] = uri + + # ... + +class MyBlobUploaderProvider(BlobUploaderProvider): + + def get_blob_uploader(self, use_case=None): + # ... + + +def main(): + set_blob_uploader_provider(MyBlobUploaderProvider()) + instrumentation_libary = FooInstrumentationLibrary( + # ..., + request_hook=MyBlobUploaderRequestHook(), + # ... + ) + # ... + +``` + +## Future Work + +As can be seen from the above usage examples, there is quite a +bit of common boilerplate both for instrumentation libraries (e.g. +defining the set of hook interfaces) and for consumers of those +instrumentation libraries (e.g. implementing variants of those hook +interfaces that make use of the BlobUploader libraries). + +A potential future improvement would be to define a common set of +hook interfaces for this use case that can be be reused across +instrumentation libraries and to provide simple drop-in +implementations of those hooks that make use of BlobUploader. + +Beyond this, boilerplate to define a custom 'BlobUploaderProvider' +could be reduced by expanding the capabilities of the default +provider, so that most common uses are covered with a minimal +set of environment variables (if optional deps are present). diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/__init__.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/__init__.py new file mode 100644 index 0000000000..b0a6f42841 --- /dev/null +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/__init__.py @@ -0,0 +1,13 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/api/__init__.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/api/__init__.py new file mode 100644 index 0000000000..185305b079 --- /dev/null +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/api/__init__.py @@ -0,0 +1,49 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Exposes API methods to callers from the package name.""" + +from opentelemetry.instrumentation._blobupload.api.blob import Blob +from opentelemetry.instrumentation._blobupload.api.blob_uploader import ( + BlobUploader, +) +from opentelemetry.instrumentation._blobupload.api.constants import ( + NOT_UPLOADED, +) +from opentelemetry.instrumentation._blobupload.api.content_type import ( + detect_content_type, +) +from opentelemetry.instrumentation._blobupload.api.labels import ( + generate_labels_for_event, + generate_labels_for_span, + generate_labels_for_span_event, +) +from opentelemetry.instrumentation._blobupload.api.provider import ( + BlobUploaderProvider, + get_blob_uploader, + set_blob_uploader_provider, +) + +__all__ = [ + "Blob", + "BlobUploader", + "NOT_UPLOADED", + "detect_content_type", + "generate_labels_for_event", + "generate_labels_for_span", + "generate_labels_for_span_event", + "BlobUploaderProvider", + "get_blob_uploader", + "set_blob_uploader_provider", +] diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/api/blob.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/api/blob.py new file mode 100644 index 0000000000..9e6f840a17 --- /dev/null +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/api/blob.py @@ -0,0 +1,125 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import base64 +import json +from types import MappingProxyType as _frozendict +from typing import Mapping, Optional + + +class Blob: + """Represents an opaque binary object and associated metadata. + + This object conteptually has the following properties: + + - raw_bytes: the actual data (payload) of the Blob + - content_type: metadata about the content type (e.g. "image/jpeg") + - labels: key/value data that can be used to identify and contextualize + the object such as {"trace_id": "...", "span_id": "...", "filename": ...} + """ + + def __init__( + self, + raw_bytes: bytes, + content_type: Optional[str] = None, + labels: Optional[Mapping[str, str]] = None, + ): + """Initialize the blob with an explicit set of properties. + + Args: + raw_bytes: the required payload + content_type: the MIME type describing the type of data in the payload + labels: additional key/value data about the Blob + """ + self._raw_bytes = raw_bytes + self._content_type = content_type + self._labels = {} + if labels is not None: + if isinstance(labels, dict): + self._labels.update(labels) + else: + for k in labels: + self._labels[k] = labels[k] + + @staticmethod + def from_data_uri(uri: str, labels: Optional[Mapping[str, str]] = None) -> "Blob": + """Instantiate a blob from a 'data:...' URI. + + Args: + uri: A URI in the 'data:' format. Supports a subset of 'data:' URIs + that encode the data with the 'base64' extension and that include + a content type. Should work with any normal 'image/jpeg', 'image/png', + 'application/pdf', 'audio/aac', and many others. DOES NOT SUPPORT + encoding data as percent-encoded text (no "base64"). + + labels: Additional key/value data to include in the constructed Blob. + """ + if not uri.startswith("data:"): + raise ValueError( + 'Invalid "uri"; expected "data:" prefix. Found: "{}"'.format( + uri + ) + ) + if ";base64," not in uri: + raise ValueError( + 'Invalid "uri"; expected ";base64," section. Found: "{}"'.format( + uri + ) + ) + data_prefix_len = len("data:") + after_data_prefix = uri[data_prefix_len:] + if ";" not in after_data_prefix: + raise ValueError( + 'Invalid "uri"; expected ";" in URI. Found: "{}"'.format(uri) + ) + content_type, remaining = after_data_prefix.split(";", 1) + while not remaining.startswith("base64,"): + _, remaining = remaining.split(";", 1) + assert remaining.startswith("base64,") + base64_len = len("base64,") + base64_encoded_content = remaining[base64_len:] + raw_bytes = base64.b64decode(base64_encoded_content) + return Blob(raw_bytes, content_type=content_type, labels=labels) + + @property + def raw_bytes(self) -> bytes: + """Returns the raw bytes (payload) of this Blob.""" + return self._raw_bytes + + @property + def content_type(self) -> Optional[str]: + """Returns the content type (or None) of this Blob.""" + return self._content_type + + @property + def labels(self) -> Mapping[str, str]: + """Returns the key/value metadata of this Blob.""" + return _frozendict(self._labels) + + def __eq__(self, o: Any) -> bool: + return ( + (isinstance(o, Blob)) and + (self.raw_bytes == o.raw_bytes) and + (self.content_type == o.content_type) and + (self.labels == o.labels) + ) + + def __repr__(self) -> str: + params = [repr(self._raw_bytes)] + if self._content_type is not None: + params.append(f"content_type={self._content_type!r}") + if self._labels: + params.append("labels={}".format(json.dumps(self._labels, sort_keys=True))) + params_string = ", ".join(params) + return "Blob({})".format(params_string) diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/api/blob_uploader.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/api/blob_uploader.py new file mode 100644 index 0000000000..8f8a47729a --- /dev/null +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/api/blob_uploader.py @@ -0,0 +1,30 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Defines an interface for performing asynchronous blob uploading.""" + +import abc + +from opentelemetry.instrumentation._blobupload.api.blob import Blob +from opentelemetry.instrumentation._blobupload.api.constants import ( + NOT_UPLOADED, +) + + +class BlobUploader(abc.ABC): + """Pure abstract base class representing a component that does blob uploading.""" + + @abc.abstractmethod + def upload_async(self, blob: Blob) -> str: + return NOT_UPLOADED diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/api/constants.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/api/constants.py new file mode 100644 index 0000000000..2b96051f80 --- /dev/null +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/api/constants.py @@ -0,0 +1,18 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Defines constants that are used by the '_blobupload' package.""" + +# Special constant used to indicate that a BlobUploader did not upload. +NOT_UPLOADED = "/dev/null" diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/api/content_type.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/api/content_type.py new file mode 100644 index 0000000000..bcaf922f33 --- /dev/null +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/api/content_type.py @@ -0,0 +1,43 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""Provides utilities for automatic content-type detection.""" + + +# Helper used to handle the possibility of optional 'magic' dependency +# being unavailable for guessing the MIME type of raw bytes. +class _FallBackModule: + """Class that is shaped like the portion of 'magic' we need.""" + + def from_buffer(self, raw_bytes: bytes, mime: bool = True): + """Fallback, subpar implementation of 'from_buffer'.""" + return "application/octet-stream" + + +# Set up '_module' to either use 'magic' or the fallback. +_module = _FallBackModule() +try: + import magic + + _module = magic +except ImportError: + pass + + +def detect_content_type(raw_bytes: bytes) -> str: + """Attempts to infer the content type of the specified data.""" + if not raw_bytes: + return "application/octet-stream" + return _module.from_buffer(raw_bytes, mime=True) diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/api/labels.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/api/labels.py new file mode 100644 index 0000000000..d3ffc157d8 --- /dev/null +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/api/labels.py @@ -0,0 +1,48 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Provides utilities for providing basic identifying labels for blobs.""" + + +def generate_labels_for_span(trace_id: str, span_id: str) -> dict[str, str]: + """Returns metadata for a span.""" + return {"otel_type": "span", "trace_id": trace_id, "span_id": span_id} + + +def generate_labels_for_event( + trace_id: str, span_id: str, event_name: str +) -> dict[str, str]: + """Returns metadata for an event.""" + result = generate_labels_for_span(trace_id, span_id) + result.update( + { + "otel_type": "event", + "event_name": event_name, + } + ) + return result + + +def generate_labels_for_span_event( + trace_id: str, span_id: str, event_name: str, event_index: int +) -> dict[str, str]: + """Returns metadata for a span event.""" + result = generate_labels_for_event(trace_id, span_id, event_name) + result.update( + { + "otel_type": "span_event", + "event_index": event_index, + } + ) + return result diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/api/provider.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/api/provider.py new file mode 100644 index 0000000000..d09e528e38 --- /dev/null +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/api/provider.py @@ -0,0 +1,81 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import logging +from typing import Optional + +from opentelemetry.instrumentation._blobupload.api.blob import Blob +from opentelemetry.instrumentation._blobupload.api.blob_uploader import ( + BlobUploader, +) +from opentelemetry.instrumentation._blobupload.api.constants import ( + NOT_UPLOADED, +) + +_logger = logging.getLogger(__name__) + + +class _NoOpBlobUploader(BlobUploader): + """Implementation of BlobUploader that does nothing.""" + + def upload_async(self, blob: Blob) -> str: + return NOT_UPLOADED + + +class BlobUploaderProvider(abc.ABC): + """Pure abstract base for configuring how to provide a BlobUploader.""" + + def get_blob_uploader(self, use_case: Optional[str]) -> BlobUploader: + """Returns a BlobUploader for the specified use case. + + Args: + use_case: An optional use case that describes what the uploader is for. This could + name a particular package, class, or instrumentation. It is intended to allow + users to differentiate upload behavior based on the target instrumentation. + + Returns: + A BlobUploader that is appropriate for the use case. + """ + return _NoOpBlobUploader() + + +class _DefaultBlobUploaderProvider(BlobUploaderProvider): + """Default provider used when none has been configured.""" + + def get_blob_uploader(self, use_case: Optional[str]=None) -> BlobUploader: + use_case_formatted = "(None)" + if use_case: + use_case_formatted = use_case + _logger.warning( + "No BlobUploaderProvider configured; returning a no-op for use case \"{}\". Use 'set_blob_uploader_provider()' to configure.".format( + use_case_formatted + ) + ) + return _NoOpBlobUploader() + + +_blob_uploader_provider = _DefaultBlobUploaderProvider() + + +def set_blob_uploader_provider(provider: BlobUploaderProvider) -> BlobUploaderProvider: + """Allows configuring the behavior of 'get_blob_uploader.""" + global _blob_uploader_provider + old_provider = _blob_uploader_provider + _blob_uploader_provider = provider + return old_provider + + +def get_blob_uploader(use_case: Optional[str] = None) -> BlobUploader: + return _blob_uploader_provider.get_blob_uploader(use_case) diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/backend/__init__.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/backend/__init__.py new file mode 100644 index 0000000000..b0a6f42841 --- /dev/null +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/backend/__init__.py @@ -0,0 +1,13 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/backend/google/__init__.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/backend/google/__init__.py new file mode 100644 index 0000000000..b0a6f42841 --- /dev/null +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/backend/google/__init__.py @@ -0,0 +1,13 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/backend/google/gcs/__init__.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/backend/google/gcs/__init__.py new file mode 100644 index 0000000000..1e7c92a9e6 --- /dev/null +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/backend/google/gcs/__init__.py @@ -0,0 +1,21 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from opentelemetry.instrumentation._blobupload.backend.google.gcs._gcs_impl import ( + GcsBlobUploader, +) + +__all__ = [ + "GcsBlobUploader" +] diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/backend/google/gcs/_gcs_client_wrapper.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/backend/google/gcs/_gcs_client_wrapper.py new file mode 100644 index 0000000000..bf531d012b --- /dev/null +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/backend/google/gcs/_gcs_client_wrapper.py @@ -0,0 +1,89 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""Isolates calls to 'google-cloud-storage' dependency, simplifying mocking.""" + + +import logging +from typing import Any, TypeAlias + +_logger = logging.getLogger(__name__) + + +# Whether the Google Cloud Storage library has been initialized. +_gcs_initialized = False + +# Function that returns a Google Cloud Storage Client object. +_gcs_client_factory = None + +# Function that given a URI and client, returns a Google Cloud +# Storage Blob class that can be used to write to a blob. +_gcs_blob_from_uri = None + + +# Type alias for a Google Cloud Storage client. This has to default +# to 'Any' to allow for mocks of the Google Cloud Storage client. It +# is updated at runtime in 'set_gcs_client_factory', though this +# means it is not particularly useful for automatic static type +# checking (it is, however, useful for documenting intended type). +GcsClientType: TypeAlias = Any + + +def set_gcs_client_factory(gcs_client_type, client_factory): + global _gcs_initialized + global _gcs_client_factory + global GcsClientType + if _gcs_initialized: + _logger.warning("Replacing default GCS client factory") + GcsClientType = gcs_client_type + _gcs_client_factory = client_factory + if _gcs_client_factory and _gcs_blob_from_uri: + _gcs_initialized = True + + +def set_gcs_blob_from_uri(blob_from_uri): + global _gcs_initialized + global _gcs_blob_from_uri + if _gcs_initialized: + _logger.warning("Replacing default GCS blob_from_uri method") + _gcs_blob_from_uri = blob_from_uri + if _gcs_client_factory and _gcs_blob_from_uri: + _gcs_initialized = True + + +def is_gcs_initialized(): + return _gcs_initialized + + +def create_gcs_client(): + if _gcs_client_factory is not None: + return _gcs_client_factory() + return None + + +def blob_from_uri(uri, client): + if _gcs_blob_from_uri is not None: + return _gcs_blob_from_uri(uri, client=client) + return None + + +try: + from google.cloud.storage import Client as _GcsClient + from google.cloud.storage.blob import Blob as _GcsBlob + set_gcs_client_factory(_GcsClient, _GcsClient) + set_gcs_blob_from_uri(getattr(_GcsBlob, "from_uri", getattr(_GcsBlob, "from_string"))) + _logger.debug('Found "google-cloud-storage" optional dependency and successfully registered it.') +except ImportError: + _logger.warning('Missing optional "google-cloud-storage" dependency.') diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/backend/google/gcs/_gcs_impl.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/backend/google/gcs/_gcs_impl.py new file mode 100644 index 0000000000..0e79951cef --- /dev/null +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/backend/google/gcs/_gcs_impl.py @@ -0,0 +1,155 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Provides the 'GcsBlobUploader' class.""" + +import io +import logging +import uuid +from typing import Optional, TypeAlias + +from opentelemetry.instrumentation._blobupload.api import Blob, BlobUploader +from opentelemetry.instrumentation._blobupload.backend.google.gcs import ( + _gcs_client_wrapper, +) +from opentelemetry.instrumentation._blobupload.utils import ( + SimpleBlobUploader, + blob_uploader_from_simple_blob_uploader, +) + +_logger = logging.getLogger(__name__) + +GcsClient: TypeAlias = _gcs_client_wrapper.GcsClientType + + +def _path_for_span(trace_id: str, span_id: str) -> str: + if not trace_id or not span_id: + return "" + return "traces/{}/spans/{}".format(trace_id, span_id) + + +def _path_for_event(trace_id: str, span_id: str, event_name: str) -> str: + if not event_name: + return "" + span_path = _path_for_span(trace_id, span_id) + if not span_path: + return "" + return "{}/events/{}".format(span_path, event_name) + + +def _path_for_span_event(trace_id: str, span_id: str, event_index) -> str: + if event_index is None: + return "" + span_path = _path_for_span(trace_id, span_id) + if not span_path: + return "" + return "{}/events/{}".format(span_path, event_index) + + +def _path_segment_from_labels(labels: Mapping[str, str]) -> str: + """Returns a path segment based on blob label metadata. + + This aims to return paths like: + + 'traces/12345/spans/56789' + 'traces/12345/spans/56789/events/0' + 'traces/12345/spans/56789/events/some.event.name' + + ...depending on the particular type of signal source. + + """ + signal_type = labels.get("otel_type") + if not signal_type or signal_type not in ["span", "event", "span_event"]: + return "" + trace_id = labels.get("trace_id") + span_id = labels.get("span_id") + event_name = labels.get("event_name") + event_index = labels.get("event_index") + if signal_type == "span": + return _path_for_span(trace_id, span_id) + elif signal_type == "event": + return _path_for_event(trace_id, span_id, event_name) + elif signal_type == "span_event": + return _path_for_span_event(trace_id, span_id, event_index) + + +class _SimpleGcsBlobUploader(SimpleBlobUploader): + + def __init__(self, prefix: str, client: Optional[GcsClient] = None): + if not prefix: + raise ValueError("Must supply a non-empty prefix.") + if not prefix.startswith("gs://"): + raise ValueError('Invalid prefix; must start with "gs://"; found: "{}".'.format(prefix)) + if not prefix.endswith("/"): + prefix = "{}/".format(prefix) + self._prefix = prefix + self._client = client or _gcs_client_wrapper.create_gcs_client() + + def generate_destination_uri(self, blob: Blob) -> str: + origin_path = _path_segment_from_labels(blob.labels) + if origin_path and not origin_path.endswith("/"): + origin_path = "{}/".format(origin_path) + upload_id = uuid.uuid4().hex + return "{}{}uploads/{}".format(self._prefix, origin_path, upload_id) + + def upload_sync(self, uri: str, blob: Blob): + _logger.debug('Uploading blob: size: {} -> "{}"'.format(len(blob.raw_bytes), uri)) + gcs_blob = _gcs_client_wrapper.blob_from_uri(uri, client=self._client) + gcs_blob.upload_from_file( + io.BytesIO(blob.raw_bytes), + content_type=blob.content_type) + metadata = gcs_blob.metadata or {} + metadata.update(blob.labels) + gcs_blob.metadata = metadata + + + +class GcsBlobUploader(BlobUploader): + """A BlobUploader that writes to Google Cloud Storage.""" + + def __init__(self, prefix: str, client:Optional[GcsClient]=None): + """Intialize the GcsBlobUploader class. + + Args: + - prefix: a string beginning with "gs://" that includes + the Google Cloud Storage bucket to which to write as + well as an optional path prefix to use. + + - client: an optional Google Cloud Storage client. If not + provided, this class will create a Google Cloud Storage + client using the environment (i.e. Application Default + Credentials). Supply your own instance if you'd like to + use non-default configuration (e.g. to use an explicit + credential other than the one in the environment). + + Known Failure Modes: + - Missing 'google-cloud-storage' library dependency. + - Failure to construct the client (e.g. absence of a valid + Google Application Default credential in the enviroment). + """ + if not _gcs_client_wrapper.is_gcs_initialized(): + raise NotImplementedError("GcsBlobUploader implementation unavailable without 'google-cloud-storage' optional dependency.") + simple_uploader = _SimpleGcsBlobUploader(prefix, client) + self._delegate = blob_uploader_from_simple_blob_uploader(simple_uploader) + + def upload_async(self, blob: Blob) -> str: + """Upload the specified blob in the background. + + Generates a URI from the blob, based on the prefix supplied + to the constructor as well as the labels of the Blob (may + also include entropy or other random components). Immediately + returns the "gs://" URI representing where the Blob will be + written, and schedules background uploading of the blob there. + """ + return self._delegate.upload_async(blob) diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/utils/__init__.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/utils/__init__.py new file mode 100644 index 0000000000..d76657b0da --- /dev/null +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/utils/__init__.py @@ -0,0 +1,27 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Exposes API methods to callers from the package name.""" + +from opentelemetry.instrumentation._blobupload.utils.simple_blob_uploader import ( + SimpleBlobUploader, +) +from opentelemetry.instrumentation._blobupload.utils.simple_blob_uploader_adaptor import ( + blob_uploader_from_simple_blob_uploader, +) + +__all__ = [ + "blob_uploader_from_simple_blob_uploader", + "SimpleBlobUploader", +] diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/utils/simple_blob_uploader.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/utils/simple_blob_uploader.py new file mode 100644 index 0000000000..c6506d11dd --- /dev/null +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/utils/simple_blob_uploader.py @@ -0,0 +1,49 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Defines a simple, synchronous interface for providing a backend implementation.""" + +import abc + +from opentelemetry.instrumentation._blobupload.api import Blob + + +class SimpleBlobUploader(abc.ABC): + """Pure abstract base class of a backend implementation that is synchronous.""" + + @abc.abstractmethod + def generate_destination_uri(self, blob: Blob) -> str: + """Generates a URI of where the blob will get written. + + Args: + blob: the blob which will be uploaded. + + Returns: + A new, unique URI that represents the target destination of the data. + """ + raise NotImplementedError("SimpleBlobUploader.generate_destination_uri") + + @abc.abstractmethod + def upload_sync(self, uri: str, blob: Blob): + """Synchronously writes the blob to the specified destination URI. + + Args: + uri: A destination URI that was previously created by the function + 'create_destination_uri' with the same blob. + blob: The blob that should get uploaded. + + Effects: + Attempts to upload/write the Blob to the specified destination URI. + """ + raise NotImplementedError("SimpleBlobUploader.upload_sync") diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/utils/simple_blob_uploader_adaptor.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/utils/simple_blob_uploader_adaptor.py new file mode 100644 index 0000000000..24cd50e763 --- /dev/null +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/utils/simple_blob_uploader_adaptor.py @@ -0,0 +1,138 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Provides 'blob_uploader_from_simple_blob_uploader', a utility for +backend providers to more easily provide a full-fledged BlobUploader +by implementing the simpler 'SimpleBlobUploader' interface. + +The 'blob_uploader_from_simple_blob_uploader' utility takes care of +common machinery such as scheduling, retries, background uploading, +etc. allowing providers of specific BlobUploader backends to supply +a simpler set of synchronous uploading instructions. +""" + +import atexit +import logging +from concurrent.futures import Executor, ThreadPoolExecutor +from typing import Optional + +from opentelemetry.instrumentation._blobupload.api import ( + Blob, + BlobUploader, + detect_content_type, +) +from opentelemetry.instrumentation._blobupload.utils.simple_blob_uploader import ( + SimpleBlobUploader, +) + +_logger = logging.getLogger(__name__) + + +def _with_content_type(blob: Blob) -> Blob: + """Returns a variant of the Blob with the content type auto-detected if needed.""" + if blob.content_type is not None: + return blob + content_type = detect_content_type(blob.raw_bytes) + return Blob(blob.raw_bytes, content_type=content_type, labels=blob.labels) + + +class _UploadAction: + """Represents the work to be done in the background to upload a blob.""" + + def __init__(self, simple_uploader, uri, blob): + self._simple_uploader = simple_uploader + self._uri = uri + self._blob = blob + + def __call__(self): + _logger.debug('Uploading blob to "{}".'.format(self._uri)) + try: + self._simple_uploader.upload_sync(self._uri, self._blob) + except Exception: + _logger.exception('Failed to upload blob to "{}".'.format(self._uri)) + + +def _create_default_executor_no_cleanup(): + """Instantiates an executor subject to configuration.""" + # Potential future enhancement: allow the default executor to be + # configured using environment variables (e.g. to select between + # threads or processes, to choose number of workers, etc.) + # + # It is because of this potential future enhancement, that we + # have moved this logic into a separate function despite it + # being currently logically quite simple. + _logger.debug("Creating thread pool executor") + return ThreadPoolExecutor() + + +def _create_default_executor(): + """Creates an executor and registers appropriate cleanup.""" + result = _create_default_executor_no_cleanup() + def _cleanup(): + result.shutdown() + _logger.debug("Registering cleanup for the pool") + atexit.register(_cleanup) + return result + +# Global default executor so that multiple uses of the adaptor +# do not waste resources creating many duplicative executors. +# Used in the '_get_or_create_default_executor' function below. +_default_executor = None + + +def _get_or_create_default_executor(): + """Return or lazily instantiate a shared default executor.""" + global _default_executor + if _default_executor is None: + _logger.debug("No existing executor found; creating one lazily.") + _default_executor = _create_default_executor() + else: + _logger.debug("Reusing existing executor.") + return _default_executor + + +class _SimpleBlobUploaderAdaptor(BlobUploader): + """Implementation of 'BlobUploader' wrapping a 'SimpleBlobUploader'. + + This implements the core of the function 'blob_uploader_from_simple_blob_uploader'. + """ + + def __init__(self, simple_uploader: SimpleBlobUploader, executor: Optional[Executor] = None): + self._simple_uploader = simple_uploader + self._executor = executor or _get_or_create_default_executor() + + def upload_async(self, blob: Blob) -> str: + full_blob = _with_content_type(blob) + uri = self._simple_uploader.generate_destination_uri(full_blob) + self._do_in_background(_UploadAction(self._simple_uploader, uri, full_blob)) + return uri + + def _do_in_background(self, action: _UploadAction) -> None: + _logger.debug("Scheduling background upload.") + self._executor.submit(action) + + + +def blob_uploader_from_simple_blob_uploader(simple_uploader: SimpleBlobUploader) -> BlobUploader: + """Implements a 'BlobUploader' using the supplied 'SimpleBlobUploader'. + + The purpose of this function is to allow backend implementations/vendors to be able to + implement their logic much more simply, using synchronous uploading interfaces. + + This function takes care of the nitty gritty details necessary to supply an asynchronous + interface on top of the simpler logic supplied by the backend system. + """ + return _SimpleBlobUploaderAdaptor(simple_uploader) + diff --git a/opentelemetry-instrumentation/test-requirements.txt b/opentelemetry-instrumentation/test-requirements.txt index 943a45c8f4..cb4cdc0b98 100644 --- a/opentelemetry-instrumentation/test-requirements.txt +++ b/opentelemetry-instrumentation/test-requirements.txt @@ -5,6 +5,7 @@ packaging==24.0 pluggy==1.5.0 py-cpuinfo==9.0.0 pytest==7.4.4 +python-magic==0.4.27 tomli==2.0.1 typing_extensions==4.12.2 wrapt==1.16.0 diff --git a/opentelemetry-instrumentation/tests/_blobupload/api/test_blob.py b/opentelemetry-instrumentation/tests/_blobupload/api/test_blob.py new file mode 100755 index 0000000000..8fe4487be1 --- /dev/null +++ b/opentelemetry-instrumentation/tests/_blobupload/api/test_blob.py @@ -0,0 +1,109 @@ +#! /usr/bin/env python3 + +if __name__ == "__main__": + import sys + sys.path.append("../../../src") + +import base64 +import logging +import unittest + +from opentelemetry.instrumentation._blobupload.api import Blob + + +class TestBlob(unittest.TestCase): + + def test_construction_with_just_bytes(self): + data = "some string".encode() + blob = Blob(data) + self.assertEqual(blob.raw_bytes, data) + self.assertIsNone(blob.content_type) + self.assertIsNotNone(blob.labels) + self.assertEqual(len(blob.labels), 0) + + def test_construction_with_bytes_and_content_type(self): + data = "some string".encode() + content_type = "text/plain" + blob = Blob(data, content_type=content_type) + self.assertEqual(blob.raw_bytes, data) + self.assertEqual(blob.content_type, content_type) + self.assertIsNotNone(blob.labels) + self.assertEqual(len(blob.labels), 0) + + def test_construction_with_bytes_and_labels(self): + data = "some string".encode() + labels = {"key1": "value1", "key2": "value2"} + blob = Blob(data, labels=labels) + self.assertEqual(blob.raw_bytes, data) + self.assertIsNone(blob.content_type) + self.assert_labels_equal(blob.labels, labels) + + def test_construction_with_all_fields(self): + data = "some string".encode() + content_type = "text/plain" + labels = {"key1": "value1", "key2": "value2"} + blob = Blob(data, content_type=content_type, labels=labels) + self.assertEqual(blob.raw_bytes, data) + self.assertEqual(blob.content_type, content_type) + self.assert_labels_equal(blob.labels, labels) + + def test_from_data_uri_without_labels(self): + data = "some string".encode() + content_type = "text/plain" + encoded_data = base64.b64encode(data).decode() + uri = "data:{};base64,{}".format(content_type, encoded_data) + blob = Blob.from_data_uri(uri) + self.assertEqual(blob.raw_bytes, data) + self.assertEqual(blob.content_type, content_type) + self.assertIsNotNone(blob.labels) + self.assertEqual(len(blob.labels), 0) + + def test_from_data_uri_with_labels(self): + data = "some string".encode() + content_type = "text/plain" + encoded_data = base64.b64encode(data).decode() + uri = "data:{};base64,{}".format(content_type, encoded_data) + labels = {"key1": "value1", "key2": "value2"} + blob = Blob.from_data_uri(uri, labels=labels) + self.assertEqual(blob.raw_bytes, data) + self.assertEqual(blob.content_type, content_type) + self.assert_labels_equal(blob.labels, labels) + + def test_from_data_uri_with_valid_standard_base64(self): + data = "some string".encode() + content_type = "text/plain" + encoded_data = base64.standard_b64encode(data).decode() + uri = "data:{};base64,{}".format(content_type, encoded_data) + blob = Blob.from_data_uri(uri) + self.assertEqual(blob.raw_bytes, data) + self.assertEqual(blob.content_type, content_type) + + def test_from_data_uri_with_valid_websafe_base64(self): + data = "some string".encode() + content_type = "text/plain" + encoded_data = base64.urlsafe_b64encode(data).decode() + uri = "data:{};base64,{}".format(content_type, encoded_data) + blob = Blob.from_data_uri(uri) + self.assertEqual(blob.raw_bytes, data) + self.assertEqual(blob.content_type, content_type) + + def test_from_data_uri_with_non_data_uri_content(self): + with self.assertRaisesRegex(ValueError, 'expected "data:" prefix'): + Blob.from_data_uri("not a valid data uri") + + def test_from_data_uri_with_non_base64_content(self): + with self.assertRaisesRegex(ValueError, 'expected ";base64," section'): + Blob.from_data_uri("data:text/plain,validifpercentencoded") + + def assert_labels_equal(self, a, b): + self.assertEqual(len(a), len(b), msg="Different sizes: {} vs {}; a={}, b={}".format(len(a), len(b), a, b)) + for k in a: + self.assertTrue(k in b, msg="Key {} found in a but not b".format(k)) + va = a[k] + vb = b[k] + self.assertEqual(va, vb, msg="Values for key {} different for a vs b: {} vs {}".format(k, va, vb)) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + unittest.main() diff --git a/opentelemetry-instrumentation/tests/_blobupload/api/test_content_type.py b/opentelemetry-instrumentation/tests/_blobupload/api/test_content_type.py new file mode 100755 index 0000000000..cdcd361763 --- /dev/null +++ b/opentelemetry-instrumentation/tests/_blobupload/api/test_content_type.py @@ -0,0 +1,61 @@ +#! /usr/bin/env python3 + +if __name__ == "__main__": + import sys + sys.path.append("../../../src") + +import io +import logging +import unittest + +from PIL import Image + +from opentelemetry.instrumentation._blobupload.api import detect_content_type + + +def create_test_image(image_format): + """Helper for creating a PIL Image for verifying image format support.""" + test_img = Image.new("RGB", (2, 2)) + output_buffer = io.BytesIO() + test_img.save(output_buffer, image_format) + result = output_buffer.getvalue() + output_buffer.close() + test_img.close() + return result + + +class TestContentType(unittest.TestCase): + + def test_handles_empty_correctly(self): + data = bytes() + content_type = detect_content_type(data) + self.assertEqual(content_type, "application/octet-stream") + + def test_detects_plaintext(self): + data = "this is just regular text" + content_type = detect_content_type(data.encode()) + self.assertEqual(content_type, "text/plain") + + def test_detects_json(self): + data = """{ + "this": { + "contains": "json" + } + }""" + content_type = detect_content_type(data.encode()) + self.assertEqual(content_type, "application/json") + + def test_detects_jpeg(self): + data = create_test_image("jpeg") + content_type = detect_content_type(data) + self.assertEqual(content_type, "image/jpeg") + + def test_detects_png(self): + data = create_test_image("png") + content_type = detect_content_type(data) + self.assertEqual(content_type, "image/png") + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + unittest.main() diff --git a/opentelemetry-instrumentation/tests/_blobupload/api/test_labels.py b/opentelemetry-instrumentation/tests/_blobupload/api/test_labels.py new file mode 100755 index 0000000000..039a7550e5 --- /dev/null +++ b/opentelemetry-instrumentation/tests/_blobupload/api/test_labels.py @@ -0,0 +1,73 @@ +#! /usr/bin/env python3 + +if __name__ == "__main__": + import sys + sys.path.append("../../../src") + +import logging +import unittest + +from opentelemetry.instrumentation._blobupload.api import ( + generate_labels_for_event, + generate_labels_for_span, + generate_labels_for_span_event, +) + + +class TestLabels(unittest.TestCase): + def test_generate_labels_for_span(self): + trace_id = "test-trace-id" + span_id = "test-span-id" + labels = generate_labels_for_span(trace_id=trace_id, span_id=span_id) + self.assertEqual( + labels, + { + "otel_type": "span", + "trace_id": "test-trace-id", + "span_id": "test-span-id", + }, + ) + + def test_generate_labels_for_event(self): + trace_id = "test-trace-id" + span_id = "test-span-id" + event_name = "some-event" + labels = generate_labels_for_event( + trace_id=trace_id, span_id=span_id, event_name=event_name + ) + self.assertEqual( + labels, + { + "otel_type": "event", + "trace_id": "test-trace-id", + "span_id": "test-span-id", + "event_name": "some-event", + }, + ) + + def test_generate_labels_for_span_event(self): + trace_id = "test-trace-id" + span_id = "test-span-id" + event_name = "some-event" + event_index = 2 + labels = generate_labels_for_span_event( + trace_id=trace_id, + span_id=span_id, + event_name=event_name, + event_index=event_index, + ) + self.assertEqual( + labels, + { + "otel_type": "span_event", + "trace_id": "test-trace-id", + "span_id": "test-span-id", + "event_name": "some-event", + "event_index": 2, + }, + ) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + unittest.main() diff --git a/opentelemetry-instrumentation/tests/_blobupload/api/test_provider.py b/opentelemetry-instrumentation/tests/_blobupload/api/test_provider.py new file mode 100755 index 0000000000..ce841f5fde --- /dev/null +++ b/opentelemetry-instrumentation/tests/_blobupload/api/test_provider.py @@ -0,0 +1,67 @@ +#! /usr/bin/env python3 + +if __name__ == "__main__": + import sys + sys.path.append("../../../src") + +import logging +import unittest + +from opentelemetry.instrumentation._blobupload.api import ( + NOT_UPLOADED, + Blob, + BlobUploader, + BlobUploaderProvider, + get_blob_uploader, + set_blob_uploader_provider, +) + + +class TestProvider(unittest.TestCase): + + def test_default_provider(self): + uploader = get_blob_uploader("test") + self.assertIsNotNone(uploader) + blob = Blob(bytes()) + url = uploader.upload_async(blob) + self.assertEqual(url, NOT_UPLOADED) + + def test_custom_provider(self): + + class CustomUploader(BlobUploader): + + def __init__(self, result): + self.captured_blob = None + self.upload_result = result + + def upload_async(self, blob): + self.captured_blob = blob + return self.upload_result + + class CustomProvider(BlobUploaderProvider): + + def __init__(self, uploader): + self.uploader = uploader + self.captured_use_case = None + + def get_blob_uploader(self, use_case): + self.captured_use_case = use_case + return self.uploader + + uploader = CustomUploader("foo") + provider = CustomProvider(uploader) + old_provider = set_blob_uploader_provider(provider) + returned_uploader = get_blob_uploader("test") + self.assertEqual(provider.captured_use_case, "test") + self.assertEqual(returned_uploader, uploader) + blob = Blob(bytes(), content_type="bar") + url = returned_uploader.upload_async(blob) + self.assertEqual(url, "foo") + self.assertEqual(uploader.captured_blob, blob) + unset_provider = set_blob_uploader_provider(old_provider) + self.assertEqual(unset_provider, provider) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + unittest.main() diff --git a/opentelemetry-instrumentation/tests/_blobupload/backend/google/gcs/test_gcs_blob_uploader.py b/opentelemetry-instrumentation/tests/_blobupload/backend/google/gcs/test_gcs_blob_uploader.py new file mode 100755 index 0000000000..9ccb830d13 --- /dev/null +++ b/opentelemetry-instrumentation/tests/_blobupload/backend/google/gcs/test_gcs_blob_uploader.py @@ -0,0 +1,176 @@ +#! /usr/bin/env python3 + +if __name__ == "__main__": + import sys + sys.path.append("../../../../../src") + +import logging +import unittest +from multiprocessing import Queue + +from opentelemetry.instrumentation._blobupload.api import ( + Blob, + BlobUploader, + generate_labels_for_event, + generate_labels_for_span, + generate_labels_for_span_event, +) + +# Internal implementation used for mocking +from opentelemetry.instrumentation._blobupload.backend.google.gcs import ( + GcsBlobUploader, + _gcs_client_wrapper, +) + + +class FakeGcs(object): + + def __init__(self): + self._queue = Queue() + self._storage = {} + self._done = set() + + def reset(self): + self._storage = {} + + def get(self, gcs_blob_id): + while gcs_blob_id not in self._done: + self._queue.get() + return self._storage.get(gcs_blob_id) + + def upload_from_file(self, gcs_blob_id, data, content_type): + b = Blob(data.read(), content_type=content_type) + self._storage[gcs_blob_id] = b + + def update_metadata(self, gcs_blob_id, new_metadata): + old = self._storage[gcs_blob_id] + b = Blob(old.raw_bytes, content_type=old.content_type, labels=new_metadata) + self._storage[gcs_blob_id] = b + self._done.add(gcs_blob_id) + self._queue.put(gcs_blob_id) + + +class FakeGcsBlob(object): + + def __init__(self, gcs_blob_id, fake_gcs): + self._gcs_blob_id = gcs_blob_id + self._fake_gcs = fake_gcs + self._metadata = {} + + def upload_from_file(self, iodata, content_type): + self._fake_gcs.upload_from_file(self._gcs_blob_id, iodata, content_type) + + @property + def metadata(self): + self._metadata + + @metadata.setter + def metadata(self, m): + self._metadata = m + self._fake_gcs.update_metadata(self._gcs_blob_id, self._metadata) + + +def mocked_blob_from_uri(fake_gcs): + def gcs_blob_from_uri(uri, client): + return FakeGcsBlob(uri, fake_gcs) + return gcs_blob_from_uri + + +_gcs_mock = FakeGcs() +_gcs_client_wrapper.set_gcs_client_factory(FakeGcs, lambda: _gcs_mock) +_gcs_client_wrapper.set_gcs_blob_from_uri(mocked_blob_from_uri(_gcs_mock)) + + +def get_from_fake_gcs(gcs_blob_id): + return _gcs_mock.get(gcs_blob_id) + + +class GcsBlobUploaderTestCase(unittest.TestCase): + + def setUp(self): + _gcs_mock.reset() + + def test_constructor_throws_if_prefix_not_uri(self): + with self.assertRaises(ValueError): + GcsBlobUploader("not a valgcs_blob_id URI") + + def test_constructor_throws_if_prefix_not_gs_protocol(self): + with self.assertRaises(ValueError): + GcsBlobUploader("other://foo/bar") + + def test_can_construct_gcs_uploader_with_bucket_uri(self): + uploader = GcsBlobUploader("gs://some-bucket") + self.assertIsNotNone(uploader) + self.assertIsInstance(uploader, BlobUploader) + + def test_can_construct_gcs_uploader_with_bucket_uri_and_trailing_slash(self): + uploader = GcsBlobUploader("gs://some-bucket/") + self.assertIsNotNone(uploader) + self.assertIsInstance(uploader, BlobUploader) + + def test_can_construct_gcs_uploader_with_bucket_and_path_uri(self): + uploader = GcsBlobUploader("gs://some-bucket/some/path") + self.assertIsNotNone(uploader) + self.assertIsInstance(uploader, BlobUploader) + + def test_can_construct_gcs_uploader_with_bucket_and_path_uri_with_trailing_slash(self): + uploader = GcsBlobUploader("gs://some-bucket/some/path/") + self.assertIsNotNone(uploader) + self.assertIsInstance(uploader, BlobUploader) + + def test_uploads_blob_from_span(self): + trace_id = "test-trace-id" + span_id = "test-span-id" + labels = generate_labels_for_span(trace_id, span_id) + blob = Blob("some data".encode(), content_type="text/plain", labels=labels) + uploader = GcsBlobUploader("gs://some-bucket/some/path") + url = uploader.upload_async(blob) + self.assertTrue( + url.startswith("gs://some-bucket/some/path/traces/test-trace-id/spans/test-span-id/uploads/") + ) + uploaded_blob = get_from_fake_gcs(url) + self.assertEqual(blob, uploaded_blob) + + def test_uploads_blob_from_event(self): + trace_id = "test-trace-id" + span_id = "test-span-id" + event_name = "event-name" + labels = generate_labels_for_event(trace_id, span_id, event_name) + blob = Blob("some data".encode(), content_type="text/plain", labels=labels) + uploader = GcsBlobUploader("gs://some-bucket/some/path") + url = uploader.upload_async(blob) + self.assertTrue( + url.startswith("gs://some-bucket/some/path/traces/test-trace-id/spans/test-span-id/events/event-name/uploads/") + ) + uploaded_blob = get_from_fake_gcs(url) + self.assertEqual(blob, uploaded_blob) + + def test_uploads_blob_from_span_event(self): + trace_id = "test-trace-id" + span_id = "test-span-id" + event_name = "event-name" + event_index = 2 + labels = generate_labels_for_span_event(trace_id, span_id, event_name, event_index) + blob = Blob("some data".encode(), content_type="text/plain", labels=labels) + uploader = GcsBlobUploader("gs://some-bucket/some/path") + url = uploader.upload_async(blob) + self.assertTrue( + url.startswith("gs://some-bucket/some/path/traces/test-trace-id/spans/test-span-id/events/2/uploads/") + ) + uploaded_blob = get_from_fake_gcs(url) + self.assertEqual(blob, uploaded_blob) + + def test_uploads_blobs_missing_expected_labels(self): + blob = Blob("some data".encode(), content_type="text/plain") + uploader = GcsBlobUploader("gs://some-bucket/some/path") + url = uploader.upload_async(blob) + self.assertTrue( + url.startswith("gs://some-bucket/some/path/uploads/"), + ) + uploaded_blob = get_from_fake_gcs(url) + self.assertEqual(blob, uploaded_blob) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + unittest.main() diff --git a/opentelemetry-instrumentation/tests/_blobupload/utils/test_simple_blob_uploader_adaptor.py b/opentelemetry-instrumentation/tests/_blobupload/utils/test_simple_blob_uploader_adaptor.py new file mode 100755 index 0000000000..72185a5f34 --- /dev/null +++ b/opentelemetry-instrumentation/tests/_blobupload/utils/test_simple_blob_uploader_adaptor.py @@ -0,0 +1,93 @@ +#! /usr/bin/env python3 + +if __name__ == "__main__": + import sys + sys.path.append("../../../src") + +import logging +import unittest +from multiprocessing import Queue + +from opentelemetry.instrumentation._blobupload.api import Blob, BlobUploader +from opentelemetry.instrumentation._blobupload.utils import ( + SimpleBlobUploader, + blob_uploader_from_simple_blob_uploader, +) + + +class QueueBasedUploader(SimpleBlobUploader): + + def __init__(self, queue): + self._queue = queue + + def generate_destination_uri(self, blob): + return blob.labels["destination_uri"] + + def upload_sync(self, uri, blob): + self._queue.put((uri, blob)) + + +class FailingUploader(SimpleBlobUploader): + + def __init__(self, queue): + self._queue = queue + + def generate_destination_uri(self, blob): + return blob.labels["destination_uri"] + + def upload_sync(self, uri, blob): + try: + raise RuntimeError("something went wrong") + finally: + self._queue.put("done") + + + +class TestBlob(unittest.TestCase): + + def test_simple_blob_uploader_adaptor(self): + queue = Queue() + simple = QueueBasedUploader(queue) + blob = Blob(bytes(), content_type="some-content-type", labels={"destination_uri": "foo"}) + uploader = blob_uploader_from_simple_blob_uploader(simple) + self.assertIsInstance(uploader, BlobUploader) + url = uploader.upload_async(blob) + self.assertEqual(url, "foo") + stored_uri, stored_blob = queue.get() + self.assertEqual(stored_uri, "foo") + self.assertEqual(stored_blob, blob) + self.assertTrue(queue.empty()) + queue.close() + + def test_auto_adds_missing_content_type(self): + queue = Queue() + simple = QueueBasedUploader(queue) + blob = Blob("some plain text".encode(), labels={"destination_uri": "foo"}) + uploader = blob_uploader_from_simple_blob_uploader(simple) + self.assertIsInstance(uploader, BlobUploader) + url = uploader.upload_async(blob) + self.assertEqual(url, "foo") + stored_uri, stored_blob = queue.get() + self.assertEqual(stored_uri, "foo") + self.assertEqual(stored_blob.raw_bytes, blob.raw_bytes) + self.assertEqual(stored_blob.content_type, "text/plain") + self.assertEqual(stored_blob.labels, blob.labels) + self.assertTrue(queue.empty()) + queue.close() + + def test_captures_exceptions_raised(self): + queue = Queue() + simple = FailingUploader(queue) + blob = Blob(bytes(), labels={"destination_uri": "foo"}) + uploader = blob_uploader_from_simple_blob_uploader(simple) + self.assertIsInstance(uploader, BlobUploader) + url = uploader.upload_async(blob) + self.assertEqual(url, "foo") + queue.get() + self.assertTrue(queue.empty()) + queue.close() + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + unittest.main()