
Commit 81eaea5

michaelsafyan, xrmx, and aabmass authored
Add support for async and streaming responses in the Google GenAI instrumentation (#3298)
* Begin instrumentation of GenAI SDK.
* Snapshot current state.
* Created minimal tests and got first test to pass.
* Added test for span attributes.
* Ensure that token counts work.
* Add more tests.
* Make it easy to turn off instrumentation for streaming and async to allow for rapid iteration.
* Add licenses and fill out main README.rst.
* Add a changelog file.
* Fill out 'requirements.txt' and 'README.rst' for the manual instrumentation example.
* Add missing exporter dependency for the manual instrumentation example.
* Fill out rest of the zero-code example.
* Add minimal tests for async, streaming cases.
* Update sync test to use indirection on top of 'client.models.generate_content' to simplify test reuse.
* Fix ruff check issues.
* Add subproject to top-level project build mechanism.
* Simplify invocation of pylint.
* Fix 'make test' command and lint issues.
* Add '.dev' suffix to version per feedback on pull request #3256.
* Fix README.rst files for the examples.
* Add specific versions for the examples.
* Revamp 'make test' to not require local 'tox.ini' configuration.
* Extend separators per review comment.
  Co-authored-by: Riccardo Magliocchetti <[email protected]>
* Fix version conflict caused by non-hermetic requirements.
* Fix typo on the comment line.
* Add test for the use of the 'vertex_ai' system, and improve how this system is determined.
* Factor out testing logic to enable sharing with the async code.
* Addressed minor lint issues.
* Make it clearer that nonstreaming_base is a helper module that is not invoked directly.
* Integrate feedback from related pull request #3268.
* Update workflows with 'tox -e generate-workflows'.
* Improve data model and add some rudimentary type checking.
* Accept only 'true' for a true value to align with other code.
* Update the scope name used.
* Add **kwargs to patched methods to prevent future breakage due to the addition of future keyword arguments.
* Remove redundant list conversion in call to "sorted".
  Co-authored-by: Aaron Abbott <[email protected]>
* Reformat with 'tox -e ruff'.
* Fix failing lint workflow.
* Fix failing lint workflow.
* Exclude Google GenAI instrumentation from the bootstrap code for now.
* Minor improvements to the tooling shell files.
* Fix typo flagged by codespell spellchecker.
* Increase alignment with broader repo practices.
* Add more TODOs and documentation to clarify the intended work scope.
* Remove unneeded accessor from OTelWrapper.
* Add more comments to the tests.
* Reformat with ruff.
* Change 'desireable' to 'desirable' per codespell spellchecker.
* Make tests pass without pythonpath.
* Fix new lint errors showing up after change.
* Revert "Fix new lint errors showing up after change".
  This reverts commit 567adc6; pylint ignore instead.
* Add TODO item required/requested from code review.
  Co-authored-by: Aaron Abbott <[email protected]>
* Simplify changelog per PR feedback.
* Remove square brackets from model name in span name per PR feedback.
* Checkpoint current state.
* Misc test cleanup. Now that scripts are invoked solely through pytest via tox, remove main functions and hash bang lines.
* Improve quality of event logging.
* Implement streaming support in RequestsMocker, get tests passing again.
* Add test with multiple responses.
* Remove support for async and streaming from TODOs, since this is now addressed.
* Increase testing coverage for streaming.
* Reformat with ruff.
* Add minor version bump with changelog.
* Change TODOs to bulleted list.
* Update per PR feedback.
  Co-authored-by: Aaron Abbott <[email protected]>
* Restructure streaming async logic to begin execution earlier.
* Reformat with ruff.
* Disable pylint check for catching broad exception. Should be allowed given exception is re-raised.
* Simplify async streaming solution per PR comment.

---------

Co-authored-by: Riccardo Magliocchetti <[email protected]>
Co-authored-by: Aaron Abbott <[email protected]>
1 parent c6c0162 commit 81eaea5

File tree

12 files changed: +293 −245 lines changed


instrumentation-genai/opentelemetry-instrumentation-google-genai/CHANGELOG.md

+3
@@ -7,5 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## Unreleased
 
+- Add support for async and streaming.
+  ([#3298](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3298))
+
 Create an initial version of Open Telemetry instrumentation for github.com/googleapis/python-genai.
 ([#3256](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3256))

instrumentation-genai/opentelemetry-instrumentation-google-genai/TODOS.md

+11-12
@@ -4,18 +4,17 @@
 
 Here are some TODO items required to achieve stability for this package:
 
-1. Add support for streaming interfaces
-2. Add support for async interfaces
-3. Add more span-level attributes for request configuration
-4. Add more span-level attributes for response information
-5. Verify and correct formatting of events:
-   - Including the 'role' field for message events
-   - Including tool invocation information
-6. Emit events for safety ratings when they block responses
-7. Additional cleanup/improvement tasks such as:
-   - Adoption of 'wrapt' instead of 'functools.wraps'
-   - Bolstering test coverage
-8. Migrate tests to use VCR.py
+- Add more span-level attributes for request configuration
+- Add more span-level attributes for response information
+- Verify and correct formatting of events:
+  - Including the 'role' field for message events
+  - Including tool invocation information
+- Emit events for safety ratings when they block responses
+- Additional cleanup/improvement tasks such as:
+  - Adoption of 'wrapt' instead of 'functools.wraps'
+  - Bolstering test coverage
+- Migrate tests to use VCR.py
+
 ## Future
 
 Beyond the above TODOs, it would also be desirable to extend the
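The TODO list keeps "Adoption of 'wrapt' instead of 'functools.wraps'" as future work; the commit itself also adds `**kwargs` to the patched methods so future keyword arguments pass through. A minimal sketch of the current `functools.wraps` approach, using a hypothetical stand-in function (not the real SDK method):

```python
import functools


def instrument(wrapped_func):
    """Wrap a function, preserving its name/docstring via functools.wraps."""

    @functools.wraps(wrapped_func)
    def instrumented(*args, **kwargs):
        # **kwargs pass-through: keyword arguments added to the SDK in
        # the future reach the wrapped function unchanged.
        return wrapped_func(*args, **kwargs)

    return instrumented


def generate_content(model, contents, **kwargs):  # hypothetical stand-in
    return f"{model}:{contents}"


patched = instrument(generate_content)
assert patched.__name__ == "generate_content"  # identity preserved
assert patched("gemini-2.0-flash", "hi", future_flag=True) == "gemini-2.0-flash:hi"
```

One limitation that motivates the 'wrapt' TODO: `functools.wraps` copies metadata but does not preserve the full signature for introspection tools, which `wrapt` handles transparently.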

instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/generate_content.py

+36-32
@@ -45,15 +45,11 @@
 _logger = logging.getLogger(__name__)
 
 
-# Constant used for the value of 'gen_ai.operation.name".
-_GENERATE_CONTENT_OP_NAME = "generate_content"
-
 # Constant used to make the absence of content more understandable.
 _CONTENT_ELIDED = "<elided>"
 
-# Enable these after these cases are fully vetted and tested
-_INSTRUMENT_STREAMING = False
-_INSTRUMENT_ASYNC = False
+# Constant used for the value of 'gen_ai.operation.name".
+_GENERATE_CONTENT_OP_NAME = "generate_content"
 
 
 class _MethodsSnapshot:
@@ -220,7 +216,9 @@ def __init__(
         self._response_index = 0
         self._candidate_index = 0
 
-    def start_span_as_current_span(self, model_name, function_name):
+    def start_span_as_current_span(
+        self, model_name, function_name, end_on_exit=True
+    ):
         return self._otel_wrapper.start_as_current_span(
             f"{_GENERATE_CONTENT_OP_NAME} {model_name}",
             start_time=self._start_time,
@@ -230,6 +228,7 @@ def start_span_as_current_span(self, model_name, function_name):
                 gen_ai_attributes.GEN_AI_REQUEST_MODEL: self._genai_request_model,
                 gen_ai_attributes.GEN_AI_OPERATION_NAME: _GENERATE_CONTENT_OP_NAME,
             },
+            end_on_exit=end_on_exit,
         )
 
     def process_request(
@@ -543,9 +542,6 @@ def _create_instrumented_generate_content_stream(
     snapshot: _MethodsSnapshot, otel_wrapper: OTelWrapper
 ):
     wrapped_func = snapshot.generate_content_stream
-    if not _INSTRUMENT_STREAMING:
-        # TODO: remove once this case has been fully tested
-        return wrapped_func
 
     @functools.wraps(wrapped_func)
     def instrumented_generate_content_stream(
@@ -586,9 +582,6 @@ def _create_instrumented_async_generate_content(
     snapshot: _MethodsSnapshot, otel_wrapper: OTelWrapper
 ):
     wrapped_func = snapshot.async_generate_content
-    if not _INSTRUMENT_ASYNC:
-        # TODO: remove once this case has been fully tested
-        return wrapped_func
 
     @functools.wraps(wrapped_func)
     async def instrumented_generate_content(
@@ -630,9 +623,6 @@ def _create_instrumented_async_generate_content_stream(  # pyright: ignore
     snapshot: _MethodsSnapshot, otel_wrapper: OTelWrapper
 ):
     wrapped_func = snapshot.async_generate_content_stream
-    if not _INSTRUMENT_ASYNC or not _INSTRUMENT_STREAMING:
-        # TODO: remove once this case has been fully tested
-        return wrapped_func
 
     @functools.wraps(wrapped_func)
     async def instrumented_generate_content_stream(
@@ -647,24 +637,38 @@ async def instrumented_generate_content_stream(
             self, otel_wrapper, model
         )
         with helper.start_span_as_current_span(
-            model, "google.genai.AsyncModels.generate_content_stream"
-        ):
+            model,
+            "google.genai.AsyncModels.generate_content_stream",
+            end_on_exit=False,
+        ) as span:
             helper.process_request(contents, config)
-            try:
-                async for response in await wrapped_func(
-                    self,
-                    model=model,
-                    contents=contents,
-                    config=config,
-                    **kwargs,
-                ):  # pyright: ignore
-                    helper.process_response(response)
-                    yield response  # pyright: ignore
-            except Exception as error:
-                helper.process_error(error)
+        try:
+            response_async_generator = await wrapped_func(
+                self,
+                model=model,
+                contents=contents,
+                config=config,
+                **kwargs,
+            )
+        except Exception as error:  # pylint: disable=broad-exception-caught
+            helper.process_error(error)
+            helper.finalize_processing()
+            with trace.use_span(span, end_on_exit=True):
                 raise
-            finally:
-                helper.finalize_processing()
+
+        async def _response_async_generator_wrapper():
+            with trace.use_span(span, end_on_exit=True):
+                try:
+                    async for response in response_async_generator:
+                        helper.process_response(response)
+                        yield response
+                except Exception as error:
+                    helper.process_error(error)
+                    raise
+                finally:
+                    helper.finalize_processing()
+
+        return _response_async_generator_wrapper()
 
     return instrumented_generate_content_stream
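The restructured async-streaming path starts the span with `end_on_exit=False` and later re-enters it via `trace.use_span(span, end_on_exit=True)` inside the returned generator, so the span stays open until the caller finishes (or abandons) the stream. A stdlib-only sketch of that lifetime pattern, with a toy `ToySpan` class and a simplified `use_span` standing in for the OpenTelemetry API (names are illustrative, and a synchronous generator is used for brevity):

```python
import contextlib


class ToySpan:
    """Toy stand-in for an OpenTelemetry span; records when it ends."""

    def __init__(self):
        self.ended = False

    def end(self):
        self.ended = True


@contextlib.contextmanager
def use_span(span, end_on_exit=False):
    # Rough analogue of trace.use_span: run a block "under" an existing
    # span and optionally end the span when the block exits.
    try:
        yield span
    finally:
        if end_on_exit:
            span.end()


def instrumented_stream(chunks):
    span = ToySpan()
    with use_span(span, end_on_exit=False):
        pass  # request processing happens here; the span is NOT ended
    def generator():
        # The span ends only when the consumer exhausts or abandons the
        # stream, mirroring end_on_exit=True in the wrapper generator.
        with use_span(span, end_on_exit=True):
            for chunk in chunks:
                yield chunk
    return span, generator()


span, stream = instrumented_stream(["a", "b"])
assert not span.ended              # span still open after the call returns
assert list(stream) == ["a", "b"]  # consuming the stream...
assert span.ended                  # ...ends the span
```

This is why the commit message notes the restructure "begins execution earlier": the request is issued before the generator is handed back, while span closure is deferred to consumption time.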

instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/version.py

+1-1
@@ -17,4 +17,4 @@
 # This version should stay below "1.0" until the fundamentals
 # in "TODOS.md" have been addressed. Please revisit the TODOs
 # listed there before bumping to a stable version.
-__version__ = "0.0.1.dev"
+__version__ = "0.0.2.dev"

instrumentation-genai/opentelemetry-instrumentation-google-genai/tests/common/otel_mocker.py

+10
@@ -179,6 +179,16 @@ def get_event_named(self, event_name):
             return event
         return None
 
+    def get_events_named(self, event_name):
+        result = []
+        for event in self.get_finished_logs():
+            event_name_attr = event.attributes.get("event.name")
+            if event_name_attr is None:
+                continue
+            if event_name_attr == event_name:
+                result.append(event)
+        return result
+
     def assert_has_event_named(self, name):
         event = self.get_event_named(name)
         finished_logs = self.get_finished_logs()
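The new `get_events_named` helper collects every finished log record whose `event.name` attribute matches, where the existing `get_event_named` returned only the first — needed now that a streamed call emits multiple choice events. The same filtering logic, sketched with plain dicts standing in for OTel log records:

```python
def get_events_named(finished_logs, event_name):
    # Keep every record whose "event.name" attribute matches; records
    # lacking the attribute are skipped, as in the mocker helper.
    result = []
    for log in finished_logs:
        if log.get("attributes", {}).get("event.name") == event_name:
            result.append(log)
    return result


logs = [
    {"attributes": {"event.name": "gen_ai.user.message"}},
    {"attributes": {"event.name": "gen_ai.choice", "index": 0}},
    {"attributes": {"event.name": "gen_ai.choice", "index": 1}},
    {"attributes": {}},
]
assert len(get_events_named(logs, "gen_ai.choice")) == 2  # one per streamed chunk
```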

instrumentation-genai/opentelemetry-instrumentation-google-genai/tests/common/requests_mocker.py

+63-1
@@ -37,6 +37,7 @@
 import http.client
 import io
 import json
+from typing import Optional
 
 import requests
 import requests.sessions
@@ -81,7 +82,7 @@ def response(self):
 
 
 def _return_error_status(
-    args: RequestsCallArgs, status_code: int, reason: str = None
+    args: RequestsCallArgs, status_code: int, reason: Optional[str] = None
 ):
     result = requests.Response()
     result.url = args.request.url
@@ -123,6 +124,35 @@ def generate_response_from_dict(args):
     raise ValueError(f"Unsupported response type: {type(response)}")
 
 
+def _to_stream_response_generator(response_generators):
+    if len(response_generators) == 1:
+        return response_generators[0]
+
+    def combined_generator(args):
+        first_response = response_generators[0](args)
+        if first_response.status_code != 200:
+            return first_response
+        result = requests.Response()
+        result.status_code = 200
+        result.headers["content-type"] = "application/json"
+        result.encoding = "utf-8"
+        result.headers["transfer-encoding"] = "chunked"
+        contents = []
+        for generator in response_generators:
+            response = generator(args)
+            if response.status_code != 200:
+                continue
+            response_json = response.json()
+            response_json_str = json.dumps(response_json)
+            contents.append(f"data: {response_json_str}")
+        contents_str = "\r\n".join(contents)
+        full_contents = f"{contents_str}\r\n\r\n"
+        result.raw = io.BytesIO(full_contents.encode())
+        return result
+
+    return combined_generator
+
+
 class RequestsMocker:
     def __init__(self):
         self._original_send = requests.sessions.Session.send
@@ -159,6 +189,38 @@ def _do_send(
         session: requests.sessions.Session,
         request: requests.PreparedRequest,
         **kwargs,
+    ):
+        stream = kwargs.get("stream", False)
+        if not stream:
+            return self._do_send_non_streaming(session, request, **kwargs)
+        return self._do_send_streaming(session, request, **kwargs)
+
+    def _do_send_streaming(
+        self,
+        session: requests.sessions.Session,
+        request: requests.PreparedRequest,
+        **kwargs,
+    ):
+        args = RequestsCallArgs(session, request, **kwargs)
+        response_generators = []
+        for matcher, response_generator in self._handlers:
+            if matcher is None:
+                response_generators.append(response_generator)
+            elif matcher(args):
+                response_generators.append(response_generator)
+        if not response_generators:
+            response_generators.append(_return_404)
+        response_generator = _to_stream_response_generator(response_generators)
+        call = RequestsCall(args, response_generator)
+        result = call.response
+        self._calls.append(call)
+        return result
+
+    def _do_send_non_streaming(
+        self,
+        session: requests.sessions.Session,
+        request: requests.PreparedRequest,
+        **kwargs,
     ):
         args = RequestsCallArgs(session, request, **kwargs)
         response_generator = self._lookup_response_generator(args)
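The streaming mocker concatenates each matching JSON response into a chunked body of `data: <json>` lines joined by CRLF and terminated by a blank line, which is the wire shape a server-sent-events-style streaming client consumes. A small round-trip sketch of that encoding (the parser here is a simplified assumption for illustration, not the SDK's actual one):

```python
import json


def to_stream_body(payloads):
    # Mirrors _to_stream_response_generator: one "data: <json>" line per
    # payload, CRLF-joined, terminated by a trailing blank line.
    lines = [f"data: {json.dumps(p)}" for p in payloads]
    return ("\r\n".join(lines) + "\r\n\r\n").encode()


def parse_stream_body(raw):
    # Simplified reader for the shape above: one JSON object per
    # "data:" line (assumption; real clients parse incrementally).
    return [
        json.loads(line[len("data: "):])
        for line in raw.decode().split("\r\n")
        if line.startswith("data: ")
    ]


chunks = [
    {"candidates": [{"content": {"parts": [{"text": "Hel"}]}}]},
    {"candidates": [{"content": {"parts": [{"text": "lo"}]}}]},
]
body = to_stream_body(chunks)
assert body.endswith(b"\r\n\r\n")
assert parse_stream_body(body) == chunks
```

Wrapping the encoded bytes in `io.BytesIO` and assigning them to `Response.raw`, as the mocker does, is what lets `requests` iterate the body as a stream.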

instrumentation-genai/opentelemetry-instrumentation-google-genai/tests/generate_content/nonstreaming_base.py

+4-38
@@ -17,31 +17,7 @@
 import unittest
 
 from ..common.base import TestCase
-
-
-def create_valid_response(
-    response_text="The model response", input_tokens=10, output_tokens=20
-):
-    return {
-        "modelVersion": "gemini-2.0-flash-test123",
-        "usageMetadata": {
-            "promptTokenCount": input_tokens,
-            "candidatesTokenCount": output_tokens,
-            "totalTokenCount": input_tokens + output_tokens,
-        },
-        "candidates": [
-            {
-                "content": {
-                    "role": "model",
-                    "parts": [
-                        {
-                            "text": response_text,
-                        }
-                    ],
-                }
-            }
-        ],
-    }
+from .util import create_valid_response
 
 
 class NonStreamingTestCase(TestCase):
@@ -56,22 +32,12 @@ def setUp(self):  # pylint: disable=invalid-name
     def generate_content(self, *args, **kwargs):
         raise NotImplementedError("Must implement 'generate_content'.")
 
+    @property
     def expected_function_name(self):
         raise NotImplementedError("Must implement 'expected_function_name'.")
 
-    def configure_valid_response(
-        self,
-        response_text="The model_response",
-        input_tokens=10,
-        output_tokens=20,
-    ):
-        self.requests.add_response(
-            create_valid_response(
-                response_text=response_text,
-                input_tokens=input_tokens,
-                output_tokens=output_tokens,
-            )
-        )
+    def configure_valid_response(self, *args, **kwargs):
+        self.requests.add_response(create_valid_response(*args, **kwargs))
 
     def test_instrumentation_does_not_break_core_functionality(self):
         self.configure_valid_response(response_text="Yep, it works!")
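This test refactor replaces an inlined copy of `create_valid_response` (which had drifted: `"The model_response"` vs `"The model response"`) with a shared helper and a pure `*args`/`**kwargs` pass-through, so defaults live in exactly one place. A sketch of the shared-helper shape, with the payload reconstructed from the removed copy and the surrounding test harness reduced to a plain list:

```python
def create_valid_response(
    response_text="The model response", input_tokens=10, output_tokens=20
):
    # Payload shape taken from the removed copy in nonstreaming_base.py.
    return {
        "modelVersion": "gemini-2.0-flash-test123",
        "usageMetadata": {
            "promptTokenCount": input_tokens,
            "candidatesTokenCount": output_tokens,
            "totalTokenCount": input_tokens + output_tokens,
        },
        "candidates": [
            {"content": {"role": "model", "parts": [{"text": response_text}]}}
        ],
    }


def configure_valid_response(responses, *args, **kwargs):
    # Pure pass-through: defaults are defined once in the helper, so the
    # caller and helper can no longer drift apart.
    responses.append(create_valid_response(*args, **kwargs))


responses = []
configure_valid_response(responses, response_text="Yep, it works!")
assert responses[0]["candidates"][0]["content"]["parts"][0]["text"] == "Yep, it works!"
assert responses[0]["usageMetadata"]["totalTokenCount"] == 30
```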
