diff --git a/.doc_gen/metadata/bedrock-agent_metadata.yaml b/.doc_gen/metadata/bedrock-agent_metadata.yaml index 99495ad4382..5658d12aa00 100644 --- a/.doc_gen/metadata/bedrock-agent_metadata.yaml +++ b/.doc_gen/metadata/bedrock-agent_metadata.yaml @@ -185,6 +185,8 @@ bedrock-agent_PrepareAgent: - python.example_code.bedrock-agent.PrepareAgent services: bedrock-agent: {PrepareAgent} + + bedrock-agent_GettingStartedWithBedrockAgents: title: An end-to-end example showing how to create and invoke &BRA; using an &AWS; SDK title_abbrev: Create and invoke an agent @@ -209,3 +211,225 @@ bedrock-agent_GettingStartedWithBedrockAgents: services: bedrock-agent: {CreateAgent, CreateAgentActionGroup, CreateAgentAlias, DeleteAgent, DeleteAgentAlias, GetAgent, ListAgentActionGroups, ListAgents, ListAgentKnowledgeBases, PrepareAgent} + + + +bedrock-agent_GettingStartedWithBedrockFlows: + title: An end-to-end example showing how to create and invoke an Amazon Bedrock flow using an &AWS; SDK + title_abbrev: Create and invoke a flow + synopsis_list: + - Create an execution role for the flow. + - Create the flow. + - Deploy the fully configured flow. + - Invoke the flow with user-provided prompts. + - Delete all created resources. + category: Scenarios + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/bedrock-agent + excerpts: + - description: Generates a music playlist based on user-specified genre and number of songs. + snippet_tags: + - python.example_code.bedrock-agent-runtime.Scenario_GettingStartedBedrockFlows + - python.example_code.bedrock-agent-runtime.flow_invoke_flow + - python.example_code.bedrock-agent-runtime.run_playlist_flow + - python.example_code.bedrock-agent-runtime.Scenario_GettingStartedBedrockFlows_iam_role + + services: + bedrock-agent: {CreateFlow, CreateFlowAlias, CreateFlowVersion, DeleteFlow, DeleteFlowVersion, DeleteFlowAlias, GetFlow, GetFlowAlias, + GetFlowVersion, PrepareFlow} + bedrock-agent-runtime: {InvokeFlow} + + +bedrock-agent_CreateFlow: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/bedrock-agent + excerpts: + - description: Create an Amazon Bedrock flow. + snippet_tags: + - python.example_code.bedrock-agent.create_flow + services: + bedrock-agent: {CreateFlow} + +bedrock-agent_PrepareFlow: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/bedrock-agent + excerpts: + - description: Prepare an Amazon Bedrock flow. + snippet_tags: + - python.example_code.bedrock-agent.prepare_flow + services: + bedrock-agent: {PrepareFlow} + +bedrock-agent_UpdateFlow: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/bedrock-agent + excerpts: + - description: Update an Amazon Bedrock Flow. + snippet_tags: + - python.example_code.bedrock-agent.update_flow + services: + bedrock-agent: {UpdateFlow} + +bedrock-agent_DeleteFlow: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/bedrock-agent + excerpts: + - description: Delete an Amazon Bedrock flow. + snippet_tags: + - python.example_code.bedrock-agent.delete_flow + services: + bedrock-agent: {DeleteFlow} + +bedrock-agent_GetFlow: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/bedrock-agent + excerpts: + - description: Get an Amazon Bedrock flow. + snippet_tags: + - python.example_code.bedrock-agent.get_flow + services: + bedrock-agent: {GetFlow} + +bedrock-agent_ListFlows: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/bedrock-agent + excerpts: + - description: List Amazon Bedrock flows. + snippet_tags: + - python.example_code.bedrock-agent.list_flows + + services: + bedrock-agent: {ListFlows} + +bedrock-agent_CreateFlowAlias: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/bedrock-agent + excerpts: + - description: Create an alias for an Amazon Bedrock flow. + snippet_tags: + - python.example_code.bedrock-agent.create_flow_alias + + services: + bedrock-agent: {CreateFlowAlias} + +bedrock-agent_UpdateFlowAlias: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/bedrock-agent + excerpts: + - description: Update an alias for an Amazon Bedrock flow. + snippet_tags: + - python.example_code.bedrock-agent.update_flow_alias + + services: + bedrock-agent: {UpdateFlowAlias} + +bedrock-agent_DeleteFlowAlias: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/bedrock-agent + excerpts: + - description: Delete an alias for an Amazon Bedrock flow. + snippet_tags: + - python.example_code.bedrock-agent.delete_flow_alias + + services: + bedrock-agent: {DeleteFlowAlias} + +bedrock-agent_ListFlowAliases: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/bedrock-agent + excerpts: + - description: List the aliases for an Amazon Bedrock flow. + snippet_tags: + - python.example_code.bedrock-agent.list_flow_aliases + + services: + bedrock-agent: {ListFlowAliases} + +bedrock-agent_CreateFlowVersion: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/bedrock-agent + excerpts: + - description: Create a version of an Amazon Bedrock flow. + snippet_tags: + - python.example_code.bedrock-agent.create_flow_version + + services: + bedrock-agent: {CreateFlowVersion} + +bedrock-agent_GetFlowVersion: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/bedrock-agent + excerpts: + - description: Get a version of an Amazon Bedrock flow. + snippet_tags: + - python.example_code.bedrock-agent.get_flow_version + + services: + bedrock-agent: {GetFlowVersion} + +bedrock-agent_DeleteFlowVersion: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/bedrock-agent + excerpts: + - description: Delete a version of an Amazon Bedrock flow. + snippet_tags: + - python.example_code.bedrock-agent.delete_flow_version + + services: + bedrock-agent: {DeleteFlowVersion} + +bedrock-agent_ListFlowVersions: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/bedrock-agent + excerpts: + - description: List the versions of an Amazon Bedrock flow. + snippet_tags: + - python.example_code.bedrock-agent.list_flow_versions + + services: + bedrock-agent: {ListFlowVersions} diff --git a/python/example_code/bedrock-agent/README.md b/python/example_code/bedrock-agent/README.md index e5a04608892..6287c7e0a66 100644 --- a/python/example_code/bedrock-agent/README.md +++ b/python/example_code/bedrock-agent/README.md @@ -48,6 +48,23 @@ Code excerpts that show you how to call individual service functions. - [ListAgentKnowledgeBases](bedrock_agent_wrapper.py#L237) - [ListAgents](bedrock_agent_wrapper.py#L185) - [PrepareAgent](bedrock_agent_wrapper.py#L266) +- [InvokeFlow](flows/run_flow.py#L23) +- [CreateFlow](flows/flow.py#L18) +- [PrepareFlow](flows/flow.py#L58) +- [UpdateFlow](flows/flow.py#L112) +- [DeleteFlow](flows/flow.py#L156) +- [GetFlow](flows/flow.py#L192) +- [ListFlows](flows/flow.py#L229) +- [CreateFlowVersion](flows/flow_version.py#L18) +- [GetFlowVersion](flows/flow_version.py#L54) +- [DeleteFlowVersion](flows/flow_version.py#L91) +- [ListFlowVersions](flows/flow_version.py#L128) +- [CreateFlowAlias](flows/flow_alias.py#L15) +- [UpdateFlowAlias](flows/flow_alias.py#L55) +- [DeleteFlowAlias](flows/flow_alias.py#L98) +- [ListFlowAliases](flows/flow_alias.py#L132) + + ### Scenarios @@ -55,6 +72,7 @@ Code examples that show you how to accomplish a specific task by calling multipl functions within the same service. - [Create and invoke an agent](scenario_get_started_with_agents.py) +- [Create and invoke a flow](flows/playlist_flow.py) @@ -95,6 +113,37 @@ python scenario_get_started_with_agents.py + +#### Create and invoke a flow + +Shows how to create a simple flow that generates music playlists. +The flow includes a prompt node that generates a playlist for a chosen genre +and number of songs. The example creates the nodes and permissions +for the flow. + +Start the example by running the following at a command prompt: + +``` +python flows/playlist_flow.py +``` +When prompted, enter the genre of music and the number of songs you want +in the playlist. +Optionally, the script can delete the resources that it creates. If you want to use the flow later, such as in the Amazon Bedrock console, enter `n` when the script prompts you to delete resources. Note that you will then need to manually delete the resources. + + + +#### List flows + +Shows how to List Amazon Bedrock flows, versions of a flow, and aliases of a flow. + +Start the example by running the following at a command prompt: + +``` +python flows/list_flows.py +``` +The example first lists the flows in the current AWS Region. It +then prompts for a flow ID, which you can get from the list of flows. Finally, the example lists the flow versions and flow aliases for the flow ID that you entered. + ### Tests ⚠ Running tests might result in charges to your AWS account. diff --git a/python/example_code/bedrock-agent/flows/flow-conversation.py b/python/example_code/bedrock-agent/flows/flow-conversation.py new file mode 100644 index 00000000000..d8e08fbad63 --- /dev/null +++ b/python/example_code/bedrock-agent/flows/flow-conversation.py @@ -0,0 +1,182 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +# snippet-start:[python.example_code.bedrock-agent-runtime.flow_conversation.complete] + + +""" +Shows how to run an Amazon Bedrock flow with InvokeFlow and handle muli-turn interaction +for a single conversation. +For more information, see https://docs.aws.amazon.com/bedrock/latest/userguide/flows-multi-turn-invocation.html. + +""" +import logging +import boto3 +import botocore + +import botocore.exceptions + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def invoke_flow(client, flow_id, flow_alias_id, input_data, execution_id): + """ + Invoke an Amazon Bedrock flow and handle the response stream. + + Args: + client: Boto3 client for Amazon Bedrock agent runtime. + flow_id: The ID of the flow to invoke. + flow_alias_id: The alias ID of the flow. + input_data: Input data for the flow. + execution_id: Execution ID for continuing a flow. Use the value None on first run. + + Returns: + Dict containing flow_complete status, input_required info, and execution_id + """ + + response = None + request_params = None + + if execution_id is None: + # Don't pass execution ID for first run. + request_params = { + "flowIdentifier": flow_id, + "flowAliasIdentifier": flow_alias_id, + "inputs": [input_data], + "enableTrace": True + } + else: + request_params = { + "flowIdentifier": flow_id, + "flowAliasIdentifier": flow_alias_id, + "executionId": execution_id, + "inputs": [input_data], + "enableTrace": True + } + + response = client.invoke_flow(**request_params) + + if "executionId" not in request_params: + execution_id = response['executionId'] + + input_required = None + flow_status = "" + + # Process the streaming response. + for event in response['responseStream']: + + # Check if flow is complete. + if 'flowCompletionEvent' in event: + flow_status = event['flowCompletionEvent']['completionReason'] + + # Check if more input us needed from user. + elif 'flowMultiTurnInputRequestEvent' in event: + input_required = event + + # Print the model output. + elif 'flowOutputEvent' in event: + print(event['flowOutputEvent']['content']['document']) + + # Log trace events. + elif 'flowTraceEvent' in event: + logger.info("Flow trace: %s", event['flowTraceEvent']) + + return { + "flow_status": flow_status, + "input_required": input_required, + "execution_id": execution_id + } + + +def converse_with_flow(bedrock_agent_client, flow_id, flow_alias_id): + """ + Run a conversation with the supplied flow. + + Args: + bedrock_agent_client: Boto3 client for Amazon Bedrock agent runtime. + flow_id: The ID of the flow to run. + flow_alias_id: The alias ID of the flow. + + """ + + flow_execution_id = None + finished = False + + # Get the intial prompt from the user. + user_input = input("Enter input: ") + + # Use prompt to create input data. + flow_input_data = { + "content": { + "document": user_input + }, + "nodeName": "FlowInputNode", + "nodeOutputName": "document" + } + + try: + while not finished: + # Invoke the flow until successfully finished. + + result = invoke_flow( + bedrock_agent_client, flow_id, flow_alias_id, flow_input_data, flow_execution_id) + + status = result['flow_status'] + flow_execution_id = result['execution_id'] + more_input = result['input_required'] + if status == "INPUT_REQUIRED": + # The flow needs more information from the user. + logger.info("The flow %s requires more input", flow_id) + user_input = input( + more_input['flowMultiTurnInputRequestEvent']['content']['document'] + ": ") + flow_input_data = { + "content": { + "document": user_input + }, + "nodeName": more_input['flowMultiTurnInputRequestEvent']['nodeName'], + "nodeInputName": "agentInputText" + + } + elif status == "SUCCESS": + # The flow completed successfully. + finished = True + logger.info("The flow %s successfully completed.", flow_id) + + except botocore.exceptions.ClientError as e: + print(f"Client error: {str(e)}") + logger.error("Client error: %s", {str(e)}) + + except Exception as e: + print(f"An error occurred: {str(e)}") + logger.error("An error occurred: %s", {str(e)}) + logger.error("Error type: %s", {type(e)}) + + +def main(): + """ + Main entry point for the script. + """ + + # Replace these with your actual flow ID and flow alias ID. + FLOW_ID = 'YOUR_FLOW_ID' + FLOW_ALIAS_ID = 'YOUR_FLOW_ALIAS_ID' + + logger.info("Starting conversation with FLOW: %s ID: %s", + FLOW_ID, FLOW_ALIAS_ID) + + # Get the Bedrock agent runtime client. + session = boto3.Session(profile_name='default') + bedrock_agent_client = session.client('bedrock-agent-runtime') + + # Start the conversation. + converse_with_flow(bedrock_agent_client, FLOW_ID, FLOW_ALIAS_ID) + + logger.info("Conversation with FLOW: %s ID: %s finished", + FLOW_ID, FLOW_ALIAS_ID) + + +if __name__ == "__main__": + main() + + # snippet-end:[python.example_code.bedrock-agent-runtime.flow_conversation.complete] diff --git a/python/example_code/bedrock-agent/flows/flow.py b/python/example_code/bedrock-agent/flows/flow.py new file mode 100644 index 00000000000..2e4c0cda778 --- /dev/null +++ b/python/example_code/bedrock-agent/flows/flow.py @@ -0,0 +1,274 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +""" +Shows how to use the AWS SDK for Python (Boto3) with the Amazon Bedrock Agents Runtime +to manage an Amazon Bedrock flow. +""" + +import logging +from time import sleep +from botocore.exceptions import ClientError + + +logging.basicConfig( + level=logging.INFO +) +logger = logging.getLogger(__name__) + +# snippet-start:[python.example_code.bedrock-agent.create_flow] +def create_flow(client, flow_name, flow_description, role_arn, flow_def): + """ + Creates an Amazon Bedrock flow. + + Args: + client: Amazon Bedrock agent boto3 client. + flow_name (str): The name for the new flow. + role_arn (str): The ARN for the IAM role that use flow uses. + flow_def (json): The JSON definition of the flow that you want to create. + + Returns: + dict: The response from CreateFlow. + """ + try: + + logger.info("Creating flow: %s.", flow_name) + + response = client.create_flow( + name=flow_name, + description=flow_description, + executionRoleArn=role_arn, + definition=flow_def + ) + + logger.info("Successfully created flow: %s. ID: %s", + flow_name, + {response['id']}) + + return response + + except ClientError as e: + logger.exception("Client error creating flow: %s", {str(e)}) + raise + + except Exception as e: + logger.exception("Unexepcted error creating flow: %s", {str(e)}) + raise +# snippet-end:[python.example_code.bedrock-agent.create_flow] + +# snippet-start:[python.example_code.bedrock-agent.prepare_flow] +def prepare_flow(client, flow_id): + """ + Prepares an Amazon Bedrock Flow. + + Args: + client: Amazon Bedrock agent boto3 client. + flow_id (str): The identifier of the flow that you want to prepare. + + Returns: + str: The status of the flow preparation + """ + try: + + # Prepare the flow. + logger.info("Preparing flow ID: %s", + flow_id) + + response = client.prepare_flow( + flowIdentifier=flow_id + ) + + status = response.get('status') + + while status == "Preparing": + logger.info("Preparing flow ID: %s. Status %s", + flow_id, status) + + sleep(5) + response = client.get_flow( + flowIdentifier=flow_id + ) + status = response.get('status') + print(f"Flow Status: {status}") + + if status == "Prepared": + logger.info("Finished preparing flow ID: %s. Status %s", + flow_id, status) + else: + logger.warning("flow ID: %s not prepared. Status %s", + flow_id, status) + + return status + + except ClientError as e: + logger.exception("Client error preparing flow: %s", {str(e)}) + raise + + except Exception as e: + logger.exception("Unexepcted error preparing flow: %s", {str(e)}) + raise +# snippet-end:[python.example_code.bedrock-agent.prepare_flow] + + +# snippet-start:[python.example_code.bedrock-agent.update_flow] +def update_flow(client, flow_id, flow_name, flow_description, role_arn, flow_def): + """ + Updates an Amazon Bedrock flow. + + Args: + client: bedrock agent boto3 client. + flow_id (str): The ID for the flow that you want to update. + flow_name (str): The name for the flow. + role_arn (str): The ARN for the IAM role that use flow uses. + flow_def (json): The JSON definition of the flow that you want to create. + + Returns: + dict: Flow information if successful. + """ + try: + + logger.info("Updating flow: %s.", flow_id) + + response = client.update_flow( + flowIdentifier=flow_id, + name=flow_name, + description=flow_description, + executionRoleArn=role_arn, + definition=flow_def + ) + + logger.info("Successfully updated flow: %s. ID: %s", + flow_name, + {response['id']}) + + return response + + except ClientError as e: + logger.exception("Client error updating flow: %s", {str(e)}) + raise + + except Exception as e: + logger.exception("Unexepcted error updating flow: %s", {str(e)}) + raise +# snippet-end:[python.example_code.bedrock-agent.update_flow] + + +# snippet-start:[python.example_code.bedrock-agent.delete_flow] +def delete_flow(client, flow_id): + """ + Deletes an Amazon Bedrock flow. + + Args: + client: Amazon Bedrock agent boto3 client. + flow_id (str): The identifier of the flow that you want to delete. + + Returns: + dict: The response from the DeleteFLow operation. + """ + try: + + logger.info("Deleting flow ID: %s.", + flow_id) + + # Call DeleteFlow operation + response = client.delete_flow( + flowIdentifier=flow_id, + skipResourceInUseCheck=True + ) + + logger.info("Finished deleting flow ID: %s", flow_id) + + return response + + except ClientError as e: + logger.exception("Client error deleting flow: %s", {str(e)}) + raise + + except Exception as e: + logger.exception("Unexepcted error deleting flow: %s", {str(e)}) + raise + +# snippet-end:[python.example_code.bedrock-agent.delete_flow] + +# snippet-start:[python.example_code.bedrock-agent.get_flow] +def get_flow(client, flow_id): + """ + Gets an Amazon Bedrock flow. + + Args: + client: bedrock agent boto3 client. + flow_id (str): The identifier of the flow that you want to get. + + Returns: + dict: The response from the GetFlow operation. + """ + try: + + logger.info("Getting flow ID: %s.", + flow_id) + + # Call GetFlow operation. + response = client.get_flow( + flowIdentifier=flow_id + ) + + logger.info("Retrieved flow ID: %s. Name: %s", flow_id, + response['name']) + + return response + + except ClientError as e: + logger.exception("Client error getting flow: %s", {str(e)}) + raise + + except Exception as e: + logger.exception("Unexepcted error getting flow: %s", {str(e)}) + raise + +# snippet-end:[python.example_code.bedrock-agent.get_flow] + +# snippet-start:[python.example_code.bedrock-agent.list_flows] +def list_flows(client): + """ + Lists versions of an Amazon Bedrock flow. + + Args: + client: Amazon Bedrock agent boto3 client. + flow_id (str): The identifier of the flow. + + Returns: + Nothing. + """ + try: + finished = False + + logger.info("Listing flows:") + + response = client.list_flows(maxResults=10) + + while finished is False: + + for flow in response['flowSummaries']: + print(f"ID: {flow['id']}") + print(f"Name: {flow['name']}") + print( + f"Description: {flow.get('description', 'No description')}") + print(f"Latest version: {flow['version']}") + print(f"Status: {flow['status']}\n") + + if 'nextToken' in response: + next_token = response['nextToken'] + response = client.list_flows(maxResults=10, + nextToken=next_token) + else: + finished = True + + logging.info("Successfully listed flows.") + + + except ClientError as e: + logging.exception("Client error listing flow versions: %s", str(e)) + raise + except Exception as e: + logging.exception("Unexpected error listing flow versions: %s", str(e)) + raise +# snippet-end:[python.example_code.bedrock-agent.list_flows] diff --git a/python/example_code/bedrock-agent/flows/flow_alias.py b/python/example_code/bedrock-agent/flows/flow_alias.py new file mode 100644 index 00000000000..90f22b4a269 --- /dev/null +++ b/python/example_code/bedrock-agent/flows/flow_alias.py @@ -0,0 +1,182 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +""" +Shows how to use the AWS SDK for Python (Boto3) with the Amazon Bedrock Agents Runtime +to use an alias with an Amazon Bedrock flow. +""" +import logging + +from botocore.exceptions import ClientError + +# Create a logger instance. +logger = logging.getLogger(__name__) + + +# snippet-start:[python.example_code.bedrock-agent.create_flow_alias] +def create_flow_alias(client, flow_id, flow_version, name, description): + """ + Creates an alias for an Amazon Bedrock flow. + + Args: + client: bedrock agent boto3 client. + flow_id (str): The identifier of the flow. + + Returns: + str: The ID for the flow alias. + """ + + try: + logger.info("Creating flow alias for flow: %s.", flow_id) + + response = client.create_flow_alias( + flowIdentifier=flow_id, + name=name, + description=description, + routingConfiguration=[ + { + "flowVersion": flow_version + } + ] + ) + logger.info("Successfully created flow alias for %s.", flow_id) + + return response['id'] + + except ClientError as e: + logging.exception("Client error creating alias for flow: %s - %s", + flow_id, str(e)) + raise + except Exception as e: + logging.exception("Unexpected error creating alias for flow : %s - %s", + flow_id, str(e)) + raise +# snippet-end:[python.example_code.bedrock-agent.create_flow_alias] + +# snippet-start:[python.example_code.bedrock-agent.update_flow_alias] +def update_flow_alias(client, flow_id, alias_id, flow_version, name, description): + """ + Updates an alias for an Amazon Bedrock flow. + + Args: + client: bedrock agent boto3 client. + flow_id (str): The identifier of the flow. + + Returns: + str: The response from UpdateFlowAlias. + """ + + try: + logger.info("Updating flow alias %s for flow: %s.", alias_id, flow_id) + + response = client.update_flow_alias( + aliasIdentifier=alias_id, + flowIdentifier=flow_id, + name=name, + description=description, + routingConfiguration=[ + { + "flowVersion": flow_version + } + ] + ) + logger.info("Successfully updated flow alias %s for %s.", alias_id, flow_id) + + return response + + except ClientError as e: + logging.exception("Client error updating alias %s for flow: %s - %s", + alias_id, flow_id, str(e)) + raise + except Exception as e: + logging.exception("Unexpected error updating alias %s for flow : %s - %s", + alias_id, flow_id, str(e)) + raise +# snippet-end:[python.example_code.bedrock-agent.update_flow_alias] + + + +# snippet-start:[python.example_code.bedrock-agent.delete_flow_alias] +def delete_flow_alias(client, flow_id, flow_alias_id): + """ + Deletes an Amazon Bedrock flow alias. + + Args: + client: bedrock agent boto3 client. + flow_id (str): The identifier of the flow. + + Returns: + dict: The response from the call to DetectFLowAlias + """ + try: + + logger.info("Deleting flow alias %s for flow: %s.", flow_alias_id, flow_id) + + # Delete the flow alias. + response = client.delete_flow_alias( + aliasIdentifier=flow_alias_id, + flowIdentifier=flow_id + ) + + logging.info("Successfully deleted flow version for %s.", flow_id) + return response + + except ClientError as e: + logging.exception("Client error deleting flow version: %s", str(e)) + raise + except Exception as e: + logging.exception("Unexpected deleting flow version: %s", str(e)) + raise +# snippet-end:[python.example_code.bedrock-agent.delete_flow_alias] + + +# snippet-start:[python.example_code.bedrock-agent.list_flow_aliases] +def list_flow_aliases(client, flow_id): + """ + Lists the aliases of an Amazon Bedrock flow. + + Args: + client: bedrock agent boto3 client. + flow_id (str): The identifier of the flow. + + Returns: + dict: The response from ListFlowAliases. + """ + try: + + finished = False + + logger.info("Listing flow aliases for flow: %s.", flow_id) + + print(f"Aliases for flow: {flow_id}") + + response = client.list_flow_aliases( + flowIdentifier=flow_id, + maxResults=10) + + while finished is False: + + for alias in response['flowAliasSummaries']: + print(f"Alias Name: {alias['name']}") + print(f"ID: {alias['id']}") + print(f"Description: {alias.get('description', 'No description')}\n") + + if 'nextToken' in response: + next_token = response['nextToken'] + response = client.list_flow_aliases(maxResults=10, + nextToken=next_token) + else: + finished = True + + logging.info("Successfully listed flow aliases for flow %s.", + flow_id) + + return response + + except ClientError as e: + logging.exception("Client error listing flow aliases: %s", str(e)) + raise + except Exception as e: + logging.exception("Unexpected error listing flow aliases: %s", str(e)) + raise +# snippet-end:[python.example_code.bedrock-agent.list_flow_aliases] + diff --git a/python/example_code/bedrock-agent/flows/flow_version.py b/python/example_code/bedrock-agent/flows/flow_version.py new file mode 100644 index 00000000000..688a18437ca --- /dev/null +++ b/python/example_code/bedrock-agent/flows/flow_version.py @@ -0,0 +1,177 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +""" +Shows how to use the AWS SDK for Python (Boto3) with the Amazon Bedrock Agents Runtime +to manage versions of an Amazon Bedrock flow. +""" +import logging + +from botocore.exceptions import ClientError + +# Create a logger instance +logging.basicConfig( + level=logging.INFO +) +logger = logging.getLogger(__name__) + + +# snippet-start:[python.example_code.bedrock-agent.create_flow_version] +def create_flow_version(client, flow_id, description): + """ + Creates a version of an Amazon Bedrock flow. + + Args: + client: Amazon Bedrock agent boto3 client. + flow_id (str): The identifier of the flow. + description (str) : A description for the flow. + + Returns: + str: The version for the flow. + """ + try: + + logger.info("Creating flow version for flow: %s.", flow_id) + + # Call CreateFlowVersion operation + response = client.create_flow_version( + flowIdentifier=flow_id, + description=description + ) + + logging.info("Successfully created flow version %s for flow %s.", + response['version'], flow_id) + + return response['version'] + + except ClientError as e: + logging.exception("Client error creating flow: %s", str(e)) + raise + except Exception as e: + logging.exception("Unexpected error creating flow : %s", str(e)) + raise +# snippet-end:[python.example_code.bedrock-agent.create_flow_version] + +# snippet-start:[python.example_code.bedrock-agent.get_flow_version] +def get_flow_version(client, flow_id, flow_version): + """ + Gets information about a version of an Amazon Bedrock flow. + + Args: + client: Amazon Bedrock agent boto3 client. + flow_id (str): The identifier of the flow. + flow_version (str): The flow version of the flow. + + Returns: + dict: The response from the call to GetFlowVersion. + """ + try: + + logger.info("Deleting flow version for flow: %s.", flow_id) + + # Call GetFlowVersion operation + response = client.get_flow_version( + flowIdentifier=flow_id, + flowVersion=flow_version + ) + + logging.info("Successfully got flow version %s information for flow %s.", + flow_version, + flow_id) + + return response + + except ClientError as e: + logging.exception("Client error getting flow version: %s", str(e)) + raise + except Exception as e: + logging.exception("Unexpected error getting flow version: %s", str(e)) + raise +# snippet-end:[python.example_code.bedrock-agent.get_flow_version] + +# snippet-start:[python.example_code.bedrock-agent.delete_flow_version] +def delete_flow_version(client, flow_id, flow_version): + """ + Deletes a version of an Amazon Bedrock flow. + + Args: + client: Amazon Bedrock agent boto3 client. + flow_id (str): The identifier of the flow. + + Returns: + dict: The response from DeleteFlowVersion. + """ + try: + + logger.info("Deleting flow version %s for flow: %s.",flow_version, flow_id) + + # Call DeleteFlowVersion operation + response = client.delete_flow_version( + flowIdentifier=flow_id, + flowVersion=flow_version + ) + + logging.info("Successfully deleted flow version %s for %s.", + flow_version, + flow_id) + return response + + except ClientError as e: + logging.exception("Client error deleting flow version: %s ", str(e)) + raise + except Exception as e: + logging.exception("Unexpected deleting flow version: %s", str(e)) + raise + +# snippet-end:[python.example_code.bedrock-agent.delete_flow_version] + + +# snippet-start:[python.example_code.bedrock-agent.list_flow_versions] +def list_flow_versions(client, flow_id): + """ + Lists the versions of an Amazon Bedrock flow. + + Args: + client: Amazon bedrock agent boto3 client. + flow_id (str): The identifier of the flow. + + Returns: + dict: The response from ListFlowVersions. + """ + try: + + finished = False + + logger.info("Listing flow versions for flow: %s.", flow_id) + + response = client.list_flow_versions( + flowIdentifier=flow_id, + maxResults=10) + + while finished is False: + + print(f"Versions for flow:{flow_id}") + for version in response['flowVersionSummaries']: + print(f"Version: {version['version']}") + print(f"Status: {version['status']}\n") + + if 'nextToken' in response: + next_token = response['nextToken'] + response = client.list_flow_versions(maxResults=10, + nextToken=next_token) + else: + finished = True + + + logging.info("Successfully listed flow versions for flow %s.", + flow_id) + + return response + + except ClientError as e: + logging.exception("Client error listing flow versions: %s", str(e)) + raise + except Exception as e: + logging.exception("Unexpected error listing flow versions: %s", str(e)) + raise +# snippet-end:[python.example_code.bedrock-agent.list_flow_versions] + diff --git a/python/example_code/bedrock-agent/flows/list_flows.py b/python/example_code/bedrock-agent/flows/list_flows.py new file mode 100644 index 00000000000..85e7d3b8db0 --- /dev/null +++ b/python/example_code/bedrock-agent/flows/list_flows.py @@ -0,0 +1,57 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Amazon Bedrock Flow lists + +Shows how to list Amazon Bedrock flows, flow versions in a flow, +and flow aliases in a flow. + +""" + +import logging +import boto3 + +from botocore.exceptions import ClientError + + +from flow import list_flows +from flow_version import list_flow_versions +from flow_alias import list_flow_aliases + + +def main(): + """ + List the Amazon Bedrock flows, flow versions in a flow, + and flow aliases in a flow. The call to List_flows shows + the IDs that you can enter for the flow ID. + + Note: + Requires valid AWS credentials in the default profile. + """ + + try: + + session = boto3.Session(profile_name='default') + bedrock_agent_client = session.client('bedrock-agent') + + print("Listing flows") + list_flows(bedrock_agent_client) + flow_id = input("Enter Flow ID: ") + print(f"Listing flow versions for flow {flow_id}") + list_flow_versions(bedrock_agent_client, flow_id) + print(f"Listing flow aliases for flow {flow_id}") + list_flow_aliases(bedrock_agent_client, flow_id) + + except ClientError as e: + logging.exception("Client error running example: %s", str(e)) + + except Exception as e: + print(f"Fatal error: {str(e)}") + + finally: + print ("Done") + +if __name__ == "__main__": + main() + diff --git a/python/example_code/bedrock-agent/flows/playlist_flow.py b/python/example_code/bedrock-agent/flows/playlist_flow.py new file mode 100644 index 00000000000..47a1dd93416 --- /dev/null +++ b/python/example_code/bedrock-agent/flows/playlist_flow.py @@ -0,0 +1,424 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Amazon Bedrock Flow Generator for Music Playlists + +This module implements functionality to create, run, and manage an Amazon Bedrock flow +that generates music playlists. The flow consists of three nodes: +- Input node: Accepts genre and number of songs as input +- Prompt node: Uses a foundation model to generate a playlist +- Output node: Returns the generated playlist + +The flow takes a music genre and desired number of songs as input and returns +a generated playlist using the specified foundation model. + +The script will: +1. Create necessary IAM roles and permissions +2. Create and configure the Bedrock flow +3. Run the flow with user-provided genre and song count +4. Optionally delete the flow and cleanup resources + +""" + +# snippet-start:[python.example_code.bedrock-agent-runtime.Scenario_GettingStartedBedrockFlows] +from datetime import datetime +import logging +import boto3 + +from botocore.exceptions import ClientError + +from roles import create_flow_role, delete_flow_role, update_role_policy +from flow import create_flow, prepare_flow, delete_flow +from run_flow import run_playlist_flow +from flow_version import create_flow_version, delete_flow_version +from flow_alias import create_flow_alias, delete_flow_alias + +logging.basicConfig( + level=logging.INFO +) +logger = logging.getLogger(__name__) + +# snippet-start:[python.example_code.bedrock-agent-runtime.create_input_node] +def create_input_node(name): + """ + Creates an input node configuration for an Amazon Bedrock flow. + + The input node serves as the entry point for the flow and defines + the initial document structure that will be passed to subsequent nodes. + + Args: + name (str): The name of the input node. + + Returns: + dict: The input node configuration. + + """ + return { + "type": "Input", + "name": name, + "outputs": [ + { + "name": "document", + "type": "Object" + } + ] + } + +# snippet-end:[python.example_code.bedrock-agent-runtime.create_input_node] + +# snippet-start:[python.example_code.bedrock-agent-runtime.create_prompt_node] +def create_prompt_node(name, model_id): + """ + Creates a prompt node configuration for a Bedrock flow that generates music playlists. + + The prompt node defines an inline prompt template that creates a music playlist based on + a specified genre and number of songs. The prompt uses two variables that are mapped from + the input JSON object: + - {{genre}}: The genre of music to create a playlist for + - {{number}}: The number of songs to include in the playlist + + Args: + name (str): The name of the prompt node. + model_id (str): The identifier of the foundation model to use for the prompt. + + Returns: + dict: The prompt node. + + """ + + return { + "type": "Prompt", + "name": name, + "configuration": { + "prompt": { + "sourceConfiguration": { + "inline": { + "modelId": model_id, + "templateType": "TEXT", + "inferenceConfiguration": { + "text": { + "temperature": 0.8 + } + }, + "templateConfiguration": { + "text": { + "text": "Make me a {{genre}} playlist consisting of the following number of songs: {{number}}." + } + } + } + } + } + }, + "inputs": [ + { + "name": "genre", + "type": "String", + "expression": "$.data.genre" + }, + { + "name": "number", + "type": "Number", + "expression": "$.data.number" + } + ], + "outputs": [ + { + "name": "modelCompletion", + "type": "String" + } + ] + } + +# snippet-end:[python.example_code.bedrock-agent-runtime.create_prompt_node] + +# snippet-start:[python.example_code.bedrock-agent-runtime.create_output_node] +def create_output_node(name): + """ + Creates an output node configuration for a Bedrock flow. + + The output node validates that the output from the last node is a string + and returns it unmodified. The input name must be "document". + + Args: + name (str): The name of the output node. + + Returns: + dict: The output node configuration containing the output node: + + """ + + return { + "type": "Output", + "name": name, + "inputs": [ + { + "name": "document", + "type": "String", + "expression": "$.data" + } + ] + } + +# snippet-end:[python.example_code.bedrock-agent-runtime.create_output_node] + + +# snippet-start:[python.example_code.bedrock-agent-runtime.create_playlist_flow] + +def create_playlist_flow(client, flow_name, flow_description, role_arn, prompt_model_id): + """ + Creates the playlist generator flow. + Args: + client: bedrock agent boto3 client. + role_arn (str): Name for the new IAM role. + prompt_model_id (str): The id of the model to use in the prompt node. + Returns: + dict: The response from the create_flow operation. + """ + + input_node = create_input_node("FlowInput") + prompt_node = create_prompt_node("MakePlaylist", prompt_model_id) + output_node = create_output_node("FlowOutput") + + # Create connections between the nodes + connections = [] + + # First, create connections between the output of the flow + # input node and each input of the prompt node. + for prompt_node_input in prompt_node["inputs"]: + connections.append( + { + "name": "_".join([input_node["name"], prompt_node["name"], + prompt_node_input["name"]]), + "source": input_node["name"], + "target": prompt_node["name"], + "type": "Data", + "configuration": { + "data": { + "sourceOutput": input_node["outputs"][0]["name"], + "targetInput": prompt_node_input["name"] + } + } + } + ) + + # Then, create a connection between the output of the prompt node and the input of the flow output node + connections.append( + { + "name": "_".join([prompt_node["name"], output_node["name"]]), + "source": prompt_node["name"], + "target": output_node["name"], + "type": "Data", + "configuration": { + "data": { + "sourceOutput": prompt_node["outputs"][0]["name"], + "targetInput": output_node["inputs"][0]["name"] + } + } + } + ) + + flow_def = { + "nodes": [input_node, prompt_node, output_node], + "connections": connections + } + + # Create the flow. + + response = create_flow( + client, flow_name, flow_description, role_arn, flow_def) + + return response + +# snippet-end:[python.example_code.bedrock-agent-runtime.create_playlist_flow] + + +# snippet-start:[python.example_code.bedrock-agent-runtime.create_get_model_arn] +def get_model_arn(client, model_id): + """ + Gets the Amazon Resource Name (ARN) for a model. + Args: + client (str): Amazon Bedrock boto3 client. + model_id (str): The id of the model. + Returns: + str: The ARN of the model. + """ + + try: + # Call GetFoundationModelDetails operation + response = client.get_foundation_model(modelIdentifier=model_id) + + # Extract model ARN from the response + model_arn = response['modelDetails']['modelArn'] + + return model_arn + + except ClientError as e: + logger.exception("Client error getting model ARN: %s", {str(e)}) + raise + + except Exception as e: + logger.exception("Unexpected error getting model ARN: %s", {str(e)}) + raise +# snippet-end:[python.example_code.bedrock-agent-runtime.create_get_model_arn] + + +# snippet-start:[python.example_code.bedrock-agent-runtime.flow_prepare_version_alias] +def prepare_flow_version_and_alias(bedrock_agent_client, + flow_id): + """ + Prepares the flow and then creates a flow version and flow alias. + Args: + bedrock_agent_client: Amazon Bedrock Agent boto3 client. + flowd_id (str): The ID of the flow that you want to prepare. + Returns: The flow_version and flow_alias. + + """ + + status = prepare_flow(bedrock_agent_client, flow_id) + + flow_version = None + flow_alias = None + + if status == 'Prepared': + + # Create the flow version and alias. + flow_version = create_flow_version(bedrock_agent_client, + flow_id, + f"flow version for flow {flow_id}.") + + flow_alias = create_flow_alias(bedrock_agent_client, + flow_id, + flow_version, + "latest", + f"Alias for flow {flow_id}, version {flow_version}") + + return flow_version, flow_alias + +# snippet-end:[python.example_code.bedrock-agent-runtime.flow_prepare_version_alias] + +# snippet-start:[python.example_code.bedrock-agent-runtime.flow_delete_resources] + +def delete_role_resources(bedrock_agent_client, + iam_client, + role_name, + flow_id, + flow_version, + flow_alias): + """ + Deletes the flow, flow alias, flow version, and IAM roles. + Args: + bedrock_agent_client: Amazon Bedrock Agent boto3 client. + iam_client: Amazon IAM boto3 client. + role_name (str): The name of the IAM role. + flow_id (str): The id of the flow. + flow_version (str): The version of the flow. + flow_alias (str): The alias of the flow. + """ + + if flow_id is not None: + if flow_alias is not None: + delete_flow_alias(bedrock_agent_client, flow_id, flow_alias) + if flow_version is not None: + delete_flow_version(bedrock_agent_client, + flow_id, flow_version) + delete_flow(bedrock_agent_client, flow_id) + + if role_name is not None: + delete_flow_role(iam_client, role_name) + +# snippet-end:[python.example_code.bedrock-agent-runtime.flow_delete_resources] + + +def main(): + """ + Creates, runs, and optionally deletes a Bedrock flow for generating music playlists. + + Note: + Requires valid AWS credentials in the default profile + """ + + delete_choice = "y" + try: + + # Get various boto3 clients. + session = boto3.Session(profile_name='default') + bedrock_agent_runtime_client = session.client('bedrock-agent-runtime') + bedrock_agent_client = session.client('bedrock-agent') + bedrock_client = session.client('bedrock') + iam_client = session.client('iam') + + role_name = None + flow_id = None + flow_version = None + flow_alias = None + + #Change the model as needed. + prompt_model_id = "amazon.nova-pro-v1:0" + + # Base the flow name on the current date and time + current_time = datetime.now() + timestamp = current_time.strftime("%Y-%m-%d-%H-%M-%S") + flow_name = f"FlowPlayList_{timestamp}" + flow_description = "A flow to generate a music playlist." + + # Create a role for the flow. + role_name = f"BedrockFlowRole-{flow_name}" + role = create_flow_role(iam_client, role_name) + role_arn = role['Arn'] + + # Create the flow. + response = create_playlist_flow( + bedrock_agent_client, flow_name, flow_description, role_arn, prompt_model_id) + flow_id = response.get('id') + + if flow_id: + # Update accessible resources in the role. + model_arn = get_model_arn(bedrock_client, prompt_model_id) + update_role_policy(iam_client, role_name, [ + response.get('arn'), model_arn]) + + # Prepare the flow and flow version. + flow_version, flow_alias = prepare_flow_version_and_alias( + bedrock_agent_client, flow_id) + + # Run the flow. + if flow_version and flow_alias: + run_playlist_flow(bedrock_agent_runtime_client, + flow_id, flow_alias) + + delete_choice = input("Delete flow? y or n : ").lower() + + + else: + print("Couldn't run. Deleting flow and role.") + delete_flow(bedrock_agent_client, flow_id) + delete_flow_role(iam_client, role_name) + else: + print("Couldn't create flow.") + + + except Exception as e: + print(f"Fatal error: {str(e)}") + + finally: + if delete_choice == 'y': + delete_role_resources(bedrock_agent_client, + iam_client, + role_name, + flow_id, + flow_version, + flow_alias) + else: + print("Flow not deleted. ") + print(f"\tFlow ID: {flow_id}") + print(f"\tFlow version: {flow_version}") + print(f"\tFlow alias: {flow_alias}") + print(f"\tRole ARN: {role_arn}") + + print("Done!") + +if __name__ == "__main__": + main() + +# snippet-end:[python.example_code.bedrock-agent-runtime.Scenario_GettingStartedBedrockFlows] diff --git a/python/example_code/bedrock-agent/flows/roles.py b/python/example_code/bedrock-agent/flows/roles.py new file mode 100644 index 00000000000..558e1167b5c --- /dev/null +++ b/python/example_code/bedrock-agent/flows/roles.py @@ -0,0 +1,159 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +IAM Role Management for Amazon Bedrock Flows + +This module provides functionality to create, update, and delete IAM roles specifically +configured for Amazon Bedrock flows. It handles the complete lifecycle of IAM roles +including trust relationships, inline policies, and permissions management. +""" +import logging +import json +from botocore.exceptions import ClientError + +logging.basicConfig( + level=logging.INFO +) +logger = logging.getLogger(__name__) + +# snippet-start:[python.example_code.bedrock-agent-runtime.Scenario_GettingStartedBedrockFlows_iam_role] + +def create_flow_role(client, role_name): + """ + Creates an IAM role for Amazon Bedrock with permissions to run a flow. + + Args: + role_name (str): Name for the new IAM role. + Returns: + str: The role Amazon Resource Name. + """ + + + # Trust relationship policy - allows Amazon Bedrock service to assume this role. + trust_policy = { + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Principal": { + "Service": "bedrock.amazonaws.com" + }, + "Action": "sts:AssumeRole" + }] + } + + # Basic inline policy for for running a flow. + + resources = "*" + + bedrock_policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "bedrock:InvokeModel", + "bedrock:Retrieve", + "bedrock:RetrieveAndGenerate" + ], + # Using * as placeholder - Later you update with specific ARNs. + "Resource": resources + } + ] + } + + + + try: + # Create the IAM role with trust policy + logging.info("Creating role: %s",role_name) + role = client.create_role( + RoleName=role_name, + AssumeRolePolicyDocument=json.dumps(trust_policy), + Description="Role for Amazon Bedrock operations" + ) + + # Attach inline policy to the role + print("Attaching inline policy") + client.put_role_policy( + RoleName=role_name, + PolicyName=f"{role_name}-policy", + PolicyDocument=json.dumps(bedrock_policy) + ) + + logging.info("Create Role ARN: %s", role['Role']['Arn']) + return role['Role'] + + except ClientError as e: + logging.warning("Error creating role: %s", str(e)) + raise + except Exception as e: + logging.warning("Unexpected error: %s", str(e)) + raise + + +def update_role_policy(client, role_name, resource_arns): + """ + Updates an IAM role's inline policy with specific resource ARNs. + + Args: + role_name (str): Name of the existing role. + resource_arns (list): List of resource ARNs to allow access to. + """ + + + updated_policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "bedrock:GetFlow", + "bedrock:InvokeModel", + "bedrock:Retrieve", + "bedrock:RetrieveAndGenerate" + ], + "Resource": resource_arns + } + ] + } + + try: + client.put_role_policy( + RoleName=role_name, + PolicyName=f"{role_name}-policy", + PolicyDocument=json.dumps(updated_policy) + ) + logging.info("Updated policy for role: %s",role_name) + + except ClientError as e: + logging.warning("Error updating role policy: %s", str(e)) + raise + + +def delete_flow_role(client, role_name): + """ + Deletes an IAM role. + + Args: + role_name (str): Name of the role to delete. + """ + + + + try: + # Detach and delete inline policies + policies = client.list_role_policies(RoleName=role_name)['PolicyNames'] + for policy_name in policies: + client.delete_role_policy(RoleName=role_name, PolicyName=policy_name) + + # Delete the role + client.delete_role(RoleName=role_name) + logging.info("Deleted role: %s", role_name) + + + except ClientError as e: + logging.info("Error Deleting role: %s", str(e)) + raise + +# snippet-end:[python.example_code.bedrock-agent-runtime.Scenario_GettingStartedBedrockFlows_iam_role] \ No newline at end of file diff --git a/python/example_code/bedrock-agent/flows/run_flow.py b/python/example_code/bedrock-agent/flows/run_flow.py new file mode 100644 index 00000000000..766673fba47 --- /dev/null +++ b/python/example_code/bedrock-agent/flows/run_flow.py @@ -0,0 +1,137 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Amazon Bedrock Flow Runner for Playlist Generation + +This module provides functionality to execute an existing Amazon Bedrock flow that generates +a music playlist based on user-specified genre and number of songs. It handles flow invocation, +response streaming, and error handling. + +The module interacts with a pre-configured Bedrock flow that expects: + - Input: JSON document containing genre and number of songs + - Output: Generated playlist as formatted text + +""" + +import logging +from botocore.exceptions import ClientError + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# snippet-start:[python.example_code.bedrock-agent-runtime.flow_invoke_flow] +def invoke_flow(client, flow_id, flow_alias_id, input_data): + """ + Invoke an Amazon Bedrock flow and handle the response stream. + + Args: + client: Boto3 client for Amazon Bedrock agent runtime. + flow_id: The ID of the flow to invoke. + flow_alias_id: The alias ID of the flow. + input_data: Input data for the flow. + + Returns: + Dict containing flow status and flow output. + """ + + response = None + request_params = None + + request_params = { + "flowIdentifier": flow_id, + "flowAliasIdentifier": flow_alias_id, + "inputs": [input_data], + "enableTrace": True + } + + + response = client.invoke_flow(**request_params) + + flow_status = "" + output= "" + + # Process the streaming response + for event in response['responseStream']: + + # Check if flow is complete. + if 'flowCompletionEvent' in event: + flow_status = event['flowCompletionEvent']['completionReason'] + + # Save the model output. + elif 'flowOutputEvent' in event: + output = event['flowOutputEvent']['content']['document'] + logger.info("Output : %s", output) + + # Log trace events. + elif 'flowTraceEvent' in event: + logger.info("Flow trace: %s", event['flowTraceEvent']) + + return { + "flow_status": flow_status, + "output": output + + } + + +# snippet-end:[python.example_code.bedrock-agent-runtime.flow_invoke_flow] + +# snippet-start:[python.example_code.bedrock-agent-runtime.run_playlist_flow] + +def run_playlist_flow(bedrock_agent_client, flow_id, flow_alias_id): + """ + Runs the playlist generator flow. + + Args: + bedrock_agent_client: Boto3 client for Amazon Bedrock agent runtime. + flow_id: The ID of the flow to run. + flow_alias_id: The alias ID of the flow. + + """ + + + print ("Welcome to the playlist generator flow.") + # Get the initial prompt from the user. + genre = input("Enter genre: ") + number_of_songs = int(input("Enter number of songs: ")) + + + # Use prompt to create input data for the input node. + flow_input_data = { + "content": { + "document": { + "genre" : genre, + "number" : number_of_songs + } + }, + "nodeName": "FlowInput", + "nodeOutputName": "document" + } + + try: + + result = invoke_flow( + bedrock_agent_client, flow_id, flow_alias_id, flow_input_data) + + status = result['flow_status'] + + if status == "SUCCESS": + # The flow completed successfully. + logger.info("The flow %s successfully completed.", flow_id) + print(result['output']) + else: + logger.warning("Flow status: %s",status) + + except ClientError as e: + print(f"Client error: {str(e)}") + logger.error("Client error: %s", {str(e)}) + raise + + except Exception as e: + logger.error("An error occurred: %s", {str(e)}) + logger.error("Error type: %s", {type(e)}) + raise + +# snippet-end:[python.example_code.bedrock-agent-runtime.run_playlist_flow] + + diff --git a/python/example_code/bedrock-agent/test/conftest.py b/python/example_code/bedrock-agent/test/conftest.py index 95ed28e81a4..929ede44594 100644 --- a/python/example_code/bedrock-agent/test/conftest.py +++ b/python/example_code/bedrock-agent/test/conftest.py @@ -26,3 +26,33 @@ class FakeData: INSTRUCTION = "A fake instruction with a minimum of 40 characters" TIMESTAMP = "1970-01-01T00:00:00Z" VERSION = "1.234.5" + + +class FakeFlowData: + FLOW_NAME = "Fake_flow" + FLOW_DESCRIPTION = "Playlist creator flow" + FLOW_ID = "XXXXXXXXXX" + FLOW_VERSION = "DRAFT" + ROLE_ARN = f"arn:aws:iam::123456789012:role/BedrockFlowRole-{FLOW_NAME}" + FLOW_ARN = f"arn:aws:bedrock:us-east-1:123456789012:flow/{FLOW_ID}" + FLOW_DEFINITION = {} + CREATED_AT = "2025-03-29T21:34:43.048609+00:00" + UPDATED_AT = "2025-03-30T21:34:43.048609+00:00" + + VERSION_NAME = "Fake_flow_alias" + VERSION_DESCRIPTION = "Playlist creator flow version" + VERSION_ID = "XXXXXXXXXX" + FLOW_VERSION = "1" + VERSION_ARN = f"arn:aws:bedrock:us-east-1:123456789012:flow/{FLOW_ID}/version/{VERSION_ID}" + + ALIAS_NAME = "Fake_flow_alias" + ALIAS_DESCRIPTION = "Playlist creator flow alias" + FLOW_ID = "XXXXXXXXXX" + ALIAS_ID = "XXXXXXXXXX" + ALIAS_ARN = f"arn:aws:bedrock:us-east-1:123456789012:flow/{FLOW_ID}/alias/{ALIAS_ID}" + ROUTING_CONFIG = [ + { + "flowVersion": FLOW_VERSION + } + ] + SESSION_ID = "XXXXXXXXXX" \ No newline at end of file diff --git a/python/example_code/bedrock-agent/test/test_Invoke_flow.py b/python/example_code/bedrock-agent/test/test_Invoke_flow.py new file mode 100644 index 00000000000..f7be51642f4 --- /dev/null +++ b/python/example_code/bedrock-agent/test/test_Invoke_flow.py @@ -0,0 +1,60 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Unit tests for flows.py. +""" + + +import boto3 +from botocore.exceptions import ClientError +import pytest + +from conftest import FakeFlowData as Fake + +from flows import run_flow + + + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_invoke_flow(make_stubber, error_code): + bedrock_agent_runtime_client = boto3.client( + service_name="bedrock-agent-runtime") + bedrock_agent_runtime_stubber = make_stubber(bedrock_agent_runtime_client) + + flow_input_data = { + "content": { + "document": { + "genre" : "pop", + "number" : "3" + } + }, + "nodeName": "FlowInput", + "nodeOutputName": "document" + } + + expected_params = { + "flowIdentifier": Fake.FLOW_ID, + "flowAliasIdentifier": Fake.ALIAS_ID, + "inputs": [ flow_input_data], + "enableTrace": True + } + + response = {"responseStream": {}} + + bedrock_agent_runtime_stubber.stub_invoke_flow( + expected_params, response, error_code=error_code + ) + + if error_code is None: + response = run_flow.invoke_flow( + bedrock_agent_runtime_client, Fake.FLOW_ID, Fake.ALIAS_ID, flow_input_data + ) + assert response is not None + + else: + with pytest.raises(ClientError) as exc_info: + run_flow.invoke_flow(bedrock_agent_runtime_client, Fake.FLOW_ID, Fake.ALIAS_ID, flow_input_data) + assert exc_info.value.response["Error"]["Code"] == error_code + + diff --git a/python/example_code/bedrock-agent/test/test_flow.py b/python/example_code/bedrock-agent/test/test_flow.py new file mode 100644 index 00000000000..83613961338 --- /dev/null +++ b/python/example_code/bedrock-agent/test/test_flow.py @@ -0,0 +1,272 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Unit tests for flows.py. +""" + + +import boto3 +from botocore.exceptions import ClientError +import pytest + +from conftest import FakeFlowData as Fake + +from flows import flow + + +FLOW_NAME = "Fake_flow" +FLOW_DESCRIPTION = "Playlist creator flow" +FLOW_ID = "XXXXXXXXXX" +FLOW_VERSION = "DRAFT" +ROLE_ARN = f"arn:aws:iam::123456789012:role/BedrockFlowRole-{FLOW_NAME}" +FLOW_ARN = f"arn:aws:bedrock:us-east-1:123456789012:flow/{FLOW_ID}" +FLOW_DEFINITION = {} +CREATED_AT = "2025-03-29T21:34:43.048609+00:00" +UPDATED_AT = "2025-03-30T21:34:43.048609+00:00" + + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_create_flow(make_stubber, error_code): + bedrock_agent_client = boto3.client("bedrock-agent") + bedrock_agent_stubber = make_stubber(bedrock_agent_client) + + expected_params = { + "name": Fake.FLOW_NAME, + "description": Fake.FLOW_DESCRIPTION, + "executionRoleArn": Fake.ROLE_ARN, + "definition": Fake.FLOW_DEFINITION + } + + response = { + "arn": Fake.FLOW_ARN, + "createdAt": Fake.CREATED_AT, + "definition": Fake.FLOW_DEFINITION, + "description": Fake.FLOW_DESCRIPTION, + "executionRoleArn": Fake.ROLE_ARN, + "id": Fake.FLOW_ID, + "name": Fake.FLOW_NAME, + "status": "NotPrepared", + "updatedAt": Fake.UPDATED_AT, + "version": "DRAFT" + } + + bedrock_agent_stubber.stub_create_flow( + expected_params, response, error_code=error_code + ) + + if error_code is None: + call_response = flow.create_flow( + bedrock_agent_client, Fake.FLOW_NAME, Fake.FLOW_DESCRIPTION, Fake.ROLE_ARN, Fake.FLOW_DEFINITION + ) + assert call_response["status"] == "NotPrepared" + + else: + with pytest.raises(ClientError) as exc_info: + flow.create_flow(bedrock_agent_client, Fake.FLOW_NAME, + Fake.FLOW_DESCRIPTION, Fake.ROLE_ARN, Fake.FLOW_DEFINITION) + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_update_flow(make_stubber, error_code): + bedrock_agent_client = boto3.client("bedrock-agent") + bedrock_agent_stubber = make_stubber(bedrock_agent_client) + + expected_params = { + "flowIdentifier": Fake.FLOW_ID, + "name": Fake.FLOW_NAME, + "description": Fake.FLOW_DESCRIPTION, + "executionRoleArn": Fake.ROLE_ARN, + "definition": Fake.FLOW_DEFINITION + } + + response = { + "arn": Fake.FLOW_ARN, + "createdAt": Fake.CREATED_AT, + "definition": Fake.FLOW_DEFINITION, + "description": Fake.FLOW_DESCRIPTION, + "executionRoleArn": Fake.ROLE_ARN, + "id": Fake.FLOW_ID, + "name": Fake.FLOW_NAME, + "status": "NotPrepared", + "updatedAt": Fake.UPDATED_AT, + "version": "DRAFT" + } + + bedrock_agent_stubber.stub_update_flow( + expected_params, response, error_code=error_code + ) + + if error_code is None: + call_response = flow.update_flow( + bedrock_agent_client, Fake.FLOW_ID, Fake.FLOW_NAME, Fake.FLOW_DESCRIPTION, Fake.ROLE_ARN, Fake.FLOW_DEFINITION + ) + assert call_response["id"] == Fake.FLOW_ID + + else: + with pytest.raises(ClientError) as exc_info: + flow.update_flow(bedrock_agent_client, Fake.FLOW_ID, Fake.FLOW_NAME, + Fake.FLOW_DESCRIPTION, Fake.ROLE_ARN, Fake.FLOW_DEFINITION) + assert exc_info.value.response["Error"]["Code"] == error_code + + + + + + + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_prepare_flow(make_stubber, error_code): + bedrock_agent_client = boto3.client("bedrock-agent") + bedrock_agent_stubber = make_stubber(bedrock_agent_client) + + expected_params = { + "flowIdentifier": Fake.FLOW_ID, + } + + if error_code is None: + + # First stub - Flow starts preparing + bedrock_agent_stubber.stub_prepare_flow( + expected_params, + { + "id": Fake.FLOW_ID, + "status": "Preparing" + } + ) + + # Second stub - Get flow status for prepared flow. + bedrock_agent_stubber.stub_get_flow( + expected_params, + { + "arn": Fake.FLOW_ARN, + "createdAt": Fake.CREATED_AT, + "definition": Fake.FLOW_DEFINITION, + "description": Fake.FLOW_DESCRIPTION, + "executionRoleArn": Fake.ROLE_ARN, + "id": Fake.FLOW_ID, + "name": Fake.FLOW_NAME, + "status": "Prepared", + "updatedAt": Fake.UPDATED_AT, + "version": "DRAFT" + }) + + call_response = flow.prepare_flow(bedrock_agent_client, FLOW_ID) + assert call_response == "Prepared" + + else: + bedrock_agent_stubber.stub_prepare_flow( + expected_params, + {"id": Fake.FLOW_ID}, + error_code=error_code + ) + with pytest.raises(ClientError) as exc_info: + flow.prepare_flow(bedrock_agent_client, FLOW_ID) + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_get_flow(make_stubber, error_code): + bedrock_agent_client = boto3.client("bedrock-agent") + bedrock_agent_stubber = make_stubber(bedrock_agent_client) + + expected_params = { + "flowIdentifier": Fake.FLOW_ID + } + + response = { + "arn": Fake.FLOW_ARN, + "createdAt": Fake.CREATED_AT, + "definition": Fake.FLOW_DEFINITION, + "description": Fake.FLOW_DESCRIPTION, + "executionRoleArn": Fake.ROLE_ARN, + "id": Fake.FLOW_ID, + "name": Fake.FLOW_NAME, + "status": "NotPrepared", + "updatedAt": Fake.UPDATED_AT, + "version": "DRAFT" + } + + bedrock_agent_stubber.stub_get_flow( + expected_params, response, error_code=error_code + ) + + if error_code is None: + call_response = flow.get_flow( + bedrock_agent_client, Fake.FLOW_ID) + + assert call_response["status"] == "NotPrepared" + + else: + with pytest.raises(ClientError) as exc_info: + flow.get_flow(bedrock_agent_client, Fake.FLOW_ID) + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_delete_flow(make_stubber, error_code): + bedrock_agent_client = boto3.client("bedrock-agent") + bedrock_agent_stubber = make_stubber(bedrock_agent_client) + + expected_params = { + "flowIdentifier": Fake.FLOW_ID, + "skipResourceInUseCheck": True + } + + response = { + "id": Fake.FLOW_ID + } + + bedrock_agent_stubber.stub_delete_flow( + expected_params, response, error_code=error_code + ) + + if error_code is None: + call_response = flow.delete_flow( + bedrock_agent_client, Fake.FLOW_ID) + + assert call_response["id"] == Fake.FLOW_ID + + else: + with pytest.raises(ClientError) as exc_info: + flow.delete_flow(bedrock_agent_client, Fake.FLOW_ID) + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_list_flows(make_stubber, error_code): + bedrock_agent_client = boto3.client("bedrock-agent") + bedrock_agent_stubber = make_stubber(bedrock_agent_client) + + expected_params = {"maxResults":10} + + response = { + "flowSummaries": [ + { + "arn": Fake.FLOW_ARN, + "createdAt": Fake.CREATED_AT, + "description": Fake.FLOW_DESCRIPTION, + "name": Fake.FLOW_NAME, + "id": Fake.FLOW_ID, + "status": "Prepared", + "updatedAt": Fake.UPDATED_AT, + "version": "DRAFT" + } + ] + } + + bedrock_agent_stubber.stub_list_flows( + expected_params, response, error_code=error_code + ) + + if error_code is None: + call_response = flow.list_flows( + bedrock_agent_client) + + assert call_response is None + + else: + with pytest.raises(ClientError) as exc_info: + flow.list_flows(bedrock_agent_client) + assert exc_info.value.response["Error"]["Code"] == error_code diff --git a/python/example_code/bedrock-agent/test/test_flow_alias.py b/python/example_code/bedrock-agent/test/test_flow_alias.py new file mode 100644 index 00000000000..4995e370a00 --- /dev/null +++ b/python/example_code/bedrock-agent/test/test_flow_alias.py @@ -0,0 +1,178 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Unit tests for flow_alias.py. +""" + +from conftest import FakeFlowData as Fake + +import boto3 +from botocore.exceptions import ClientError +import pytest + +from flows import flow_alias + + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_create_flow_alias(make_stubber, error_code): + bedrock_agent_client = boto3.client("bedrock-agent") + bedrock_agent_stubber = make_stubber(bedrock_agent_client) + + expected_params = { + "flowIdentifier" : Fake.FLOW_ID, + "name" : Fake.ALIAS_NAME, + "description" : Fake.ALIAS_DESCRIPTION, + "routingConfiguration" : Fake.ROUTING_CONFIG + } + + response = { + "arn": Fake.ALIAS_ARN, + "createdAt": Fake.CREATED_AT, + "description": Fake.ALIAS_DESCRIPTION, + "flowId": Fake.FLOW_ID, + "id": Fake.ALIAS_ID, + "name": Fake.ALIAS_DESCRIPTION, + "routingConfiguration": [ + { + "flowVersion": Fake.FLOW_VERSION + } + ], + "updatedAt": Fake.UPDATED_AT +} + + bedrock_agent_stubber.stub_create_flow_alias( + expected_params, response, error_code=error_code + ) + + if error_code is None: + alias_id = flow_alias.create_flow_alias( + bedrock_agent_client, Fake.FLOW_ID, Fake.FLOW_VERSION, Fake.ALIAS_NAME, Fake.ALIAS_DESCRIPTION + ) + assert alias_id == Fake.ALIAS_ID + + else: + with pytest.raises(ClientError) as exc_info: + flow_alias.create_flow_alias(bedrock_agent_client, Fake.FLOW_ID, Fake.FLOW_VERSION, Fake.ALIAS_NAME, Fake.ALIAS_DESCRIPTION) + assert exc_info.value.response["Error"]["Code"] == error_code + + + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_update_flow_alias(make_stubber, error_code): + bedrock_agent_client = boto3.client("bedrock-agent") + bedrock_agent_stubber = make_stubber(bedrock_agent_client) + + expected_params = { + "aliasIdentifier" : Fake.ALIAS_ID, + "flowIdentifier" : Fake.FLOW_ID, + "name" : Fake.ALIAS_NAME, + "description" : Fake.ALIAS_DESCRIPTION, + "routingConfiguration" : Fake.ROUTING_CONFIG + } + + response = { + "arn": Fake.ALIAS_ARN, + "createdAt": Fake.CREATED_AT, + "description": Fake.ALIAS_DESCRIPTION, + "flowId": Fake.FLOW_ID, + "id": Fake.ALIAS_ID, + "name": Fake.ALIAS_DESCRIPTION, + "routingConfiguration": [ + { + "flowVersion": Fake.FLOW_VERSION + } + ], + "updatedAt": Fake.UPDATED_AT +} + + bedrock_agent_stubber.stub_update_flow_alias( + expected_params, response, error_code=error_code + ) + + if error_code is None: + response = flow_alias.update_flow_alias( + bedrock_agent_client,Fake.ALIAS_ID, Fake.FLOW_ID, + Fake.FLOW_VERSION, Fake.ALIAS_NAME, Fake.ALIAS_DESCRIPTION + ) + assert response["id"] == Fake.ALIAS_ID + + else: + with pytest.raises(ClientError) as exc_info: + flow_alias.update_flow_alias(bedrock_agent_client,Fake.ALIAS_ID, Fake.FLOW_ID, + Fake.FLOW_VERSION, Fake.ALIAS_NAME, Fake.ALIAS_DESCRIPTION) + assert exc_info.value.response["Error"]["Code"] == error_code + + + + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_delete_flow_alias(make_stubber, error_code): + bedrock_agent_client = boto3.client("bedrock-agent") + bedrock_agent_stubber = make_stubber(bedrock_agent_client) + + expected_params = { + "aliasIdentifier" : Fake.ALIAS_ID, + "flowIdentifier": Fake.FLOW_ID + } + + response = { + "flowId" : Fake.FLOW_ID, + "id": Fake.ALIAS_ID + } + + bedrock_agent_stubber.stub_delete_flow_alias( + expected_params, response, error_code=error_code + ) + + if error_code is None: + call_response = flow_alias.delete_flow_alias( + bedrock_agent_client, Fake.FLOW_ID, Fake.ALIAS_ID) + + assert call_response["id"] == Fake.FLOW_ID + + else: + with pytest.raises(ClientError) as exc_info: + flow_alias.delete_flow_alias(bedrock_agent_client, Fake.FLOW_ID, Fake.ALIAS_ID) + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_list_flow_aliases(make_stubber, error_code): + bedrock_agent_client = boto3.client("bedrock-agent") + bedrock_agent_stubber = make_stubber(bedrock_agent_client) + + expected_params = { + "flowIdentifier": Fake.FLOW_ID, + "maxResults": 10 + } + + response = { + "flowAliasSummaries": [ + { + "arn": Fake.ALIAS_ARN, + "createdAt": Fake.CREATED_AT, + "description": Fake.ALIAS_DESCRIPTION, + "id": Fake.ALIAS_ID, + "flowId" : Fake.FLOW_ID, + "name": Fake.ALIAS_NAME, + "routingConfiguration": Fake.ROUTING_CONFIG, + "updatedAt": Fake.UPDATED_AT + } + ] +} + + bedrock_agent_stubber.stub_list_flow_aliases( + expected_params, response, error_code=error_code + ) + + if error_code is None: + call_response = flow_alias.list_flow_aliases( + bedrock_agent_client, Fake.FLOW_ID) + + assert call_response is not None + + else: + with pytest.raises(ClientError) as exc_info: + flow_alias.list_flow_aliases(bedrock_agent_client, Fake.FLOW_ID) + assert exc_info.value.response["Error"]["Code"] == error_code \ No newline at end of file diff --git a/python/example_code/bedrock-agent/test/test_flow_conversation.py b/python/example_code/bedrock-agent/test/test_flow_conversation.py new file mode 100644 index 00000000000..fd619675009 --- /dev/null +++ b/python/example_code/bedrock-agent/test/test_flow_conversation.py @@ -0,0 +1,31 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +import pytest +import subprocess +import sys + +files_under_test = [ + "flows/flow-conversation.py" +] + +@pytest.mark.integ +@pytest.mark.parametrize("file", files_under_test) +def test_flow_conversation(file): + # Simulate user input - each string represents one input() call + # If you're using the docs at https://docs.aws.amazon.com/bedrock/latest/userguide/flows-multi-turn-invocation.html, + # "Create a playlist\n 3\n pop, castles\n" should work with Antropic Haiku. + test_input = "Hello\n" + + result = subprocess.run( + [sys.executable, file], + input=test_input, + capture_output=True, + text=True, + ) + + print(f"STDOUT: {result.stdout}") # For debugging + print(f"STDERR: {result.stderr}") # For debugging + + assert result.stdout != "" + assert result.returncode == 0 diff --git a/python/example_code/bedrock-agent/test/test_flow_version.py b/python/example_code/bedrock-agent/test/test_flow_version.py new file mode 100644 index 00000000000..72cb10b0f74 --- /dev/null +++ b/python/example_code/bedrock-agent/test/test_flow_version.py @@ -0,0 +1,129 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Unit tests for flow_version.py. +""" + +from conftest import FakeFlowData as Fake + +import boto3 +from botocore.exceptions import ClientError +import pytest + +from flows import flow_version + + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_create_flow_version(make_stubber, error_code): + bedrock_agent_client = boto3.client("bedrock-agent") + bedrock_agent_stubber = make_stubber(bedrock_agent_client) + + create_flow_version_expected_params = { + "flowIdentifier": Fake.FLOW_ID, + "description": Fake.VERSION_DESCRIPTION + } + + get_flow_version_expected_params = { + "flowIdentifier": Fake.FLOW_ID, + "version": Fake.FLOW_VERSION + } + + response = { + "arn": Fake.VERSION_ARN, + "createdAt": Fake.CREATED_AT, + "definition": {}, + "description": Fake.VERSION_DESCRIPTION, + "executionRoleArn": Fake.ROLE_ARN, + "id": Fake.VERSION_ID, + "name": Fake.VERSION_NAME, + "status": "NotPrepared", + "version": Fake.FLOW_VERSION + } + + # First stub - Start creating the flow version. + bedrock_agent_stubber.stub_create_flow_version( + create_flow_version_expected_params, + response, + error_code=error_code) + + if error_code is None: + version = flow_version.create_flow_version( + bedrock_agent_client, Fake.FLOW_ID, Fake.VERSION_DESCRIPTION) + assert version == Fake.FLOW_VERSION + else: + with pytest.raises(ClientError) as exc_info: + flow_version.create_flow_version( + bedrock_agent_client, Fake.FLOW_ID, Fake.VERSION_DESCRIPTION) + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_delete_flow_version(make_stubber, error_code): + bedrock_agent_client = boto3.client("bedrock-agent") + bedrock_agent_stubber = make_stubber(bedrock_agent_client) + + expected_params = { + "flowIdentifier": Fake.FLOW_ID, + "flowVersion": Fake.FLOW_VERSION + } + + response = { + "id": Fake.FLOW_ID, + "version": Fake.FLOW_VERSION + } + + bedrock_agent_stubber.stub_delete_flow_version( + expected_params, response, error_code=error_code + ) + + if error_code is None: + call_response = flow_version.delete_flow_version( + bedrock_agent_client, Fake.FLOW_ID, Fake.FLOW_VERSION) + + assert call_response["id"] == Fake.FLOW_ID + assert call_response["version"] == Fake.FLOW_VERSION + + else: + with pytest.raises(ClientError) as exc_info: + flow_version.delete_flow_version( + bedrock_agent_client, Fake.FLOW_ID, Fake.FLOW_VERSION) + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_list_flow_versions(make_stubber, error_code): + bedrock_agent_client = boto3.client("bedrock-agent") + bedrock_agent_stubber = make_stubber(bedrock_agent_client) + + expected_params = { + "flowIdentifier": Fake.FLOW_ID, + "maxResults": 10 + } + + response = { + "flowVersionSummaries": [ + { + "arn": Fake.VERSION_ARN, + "createdAt": Fake.CREATED_AT, + "id": Fake.VERSION_ID, + "status": "Prepared", + "version": Fake.FLOW_VERSION + } + ] + } + + bedrock_agent_stubber.stub_list_flow_versions( + expected_params, response, error_code=error_code + ) + + if error_code is None: + call_response = flow_version.list_flow_versions( + bedrock_agent_client, Fake.FLOW_ID) + + assert call_response is not None + + else: + with pytest.raises(ClientError) as exc_info: + flow_version.list_flow_versions(bedrock_agent_client, Fake.FLOW_ID) + assert exc_info.value.response["Error"]["Code"] == error_code diff --git a/python/example_code/bedrock-agent/test/test_playlist_flow.py b/python/example_code/bedrock-agent/test/test_playlist_flow.py new file mode 100644 index 00000000000..940ebcc00b3 --- /dev/null +++ b/python/example_code/bedrock-agent/test/test_playlist_flow.py @@ -0,0 +1,31 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +import pytest +import subprocess +import sys + +files_under_test = [ + "flows/playlist_flow.py" +] + +@pytest.mark.integ +@pytest.mark.parametrize("file", files_under_test) +def test_playlist_flow(file): + # Simulate user input - each string represents one input() call + # Creates a playlist of 3 pop songs. + test_input = "pop\n3\ny" + + result = subprocess.run( + [sys.executable, file], + input=test_input, + capture_output=True, + text=True, + ) + + print(f"STDOUT: {result.stdout}") # For debugging + print(f"STDERR: {result.stderr}") # For debugging + + assert result.stdout != "" + assert result.returncode == 0 + diff --git a/python/test_tools/bedrock_agent_stubber.py b/python/test_tools/bedrock_agent_stubber.py index f1635a4c406..c64eb874ed4 100644 --- a/python/test_tools/bedrock_agent_stubber.py +++ b/python/test_tools/bedrock_agent_stubber.py @@ -83,3 +83,70 @@ def stub_prepare_agent(self, expected_params, response, error_code=None): self._stub_bifurcator( "prepare_agent", expected_params, response, error_code=error_code ) + def stub_create_flow(self, expected_params, response, error_code=None): + self._stub_bifurcator( + "create_flow", expected_params, response, error_code=error_code + ) + + def stub_get_flow(self, expected_params, response, error_code=None): + self._stub_bifurcator( + "get_flow", expected_params, response, error_code=error_code + ) + + + def stub_prepare_flow(self, expected_params, response, error_code=None): + self._stub_bifurcator( + "prepare_flow", expected_params, response, error_code=error_code + ) + + def stub_update_flow(self, expected_params, response, error_code=None): + self._stub_bifurcator( + "update_flow", expected_params, response, error_code=error_code + ) + + def stub_list_flows(self, expected_params, response, error_code=None): + self._stub_bifurcator( + "list_flows", expected_params, response, error_code=error_code + ) + + def stub_delete_flow(self, expected_params, response, error_code=None): + self._stub_bifurcator( + "delete_flow", expected_params, response, error_code=error_code + ) + def stub_create_flow_alias(self, expected_params, response, error_code=None): + self._stub_bifurcator( + "create_flow_alias", expected_params, response, error_code=error_code + ) + def stub_update_flow_alias(self, expected_params, response, error_code=None): + self._stub_bifurcator( + "update_flow_alias", expected_params, response, error_code=error_code + ) + + def stub_list_flow_aliases(self, expected_params, response, error_code=None): + self._stub_bifurcator( + "list_flow_aliases", expected_params, response, error_code=error_code + ) + + def stub_delete_flow_alias(self, expected_params, response, error_code=None): + self._stub_bifurcator( + "delete_flow_alias", expected_params, response, error_code=error_code + ) + + def stub_create_flow_version(self, expected_params, response, error_code=None): + self._stub_bifurcator( + "create_flow_version", expected_params, response, error_code=error_code + ) + def stub_get_flow_version(self, expected_params, response, error_code=None): + self._stub_bifurcator( + "get_flow_version", expected_params, response, error_code=error_code + ) + + def stub_delete_flow_version(self, expected_params, response, error_code=None): + self._stub_bifurcator( + "delete_flow_version", expected_params, response, error_code=error_code + ) + + def stub_list_flow_versions(self, expected_params, response, error_code=None): + self._stub_bifurcator( + "list_flow_versions", expected_params, response, error_code=error_code + )