Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
| import boto3 | |
| import json | |
| import time | |
| import zipfile | |
| from io import BytesIO | |
| import uuid | |
| import pprint | |
| import logging | |
| from PIL import Image | |
| import os | |
| import base64 | |
| import re | |
| import requests | |
| #import utilities.re_ranker as re_ranker | |
| import utilities.invoke_models as invoke_models | |
| import streamlit as st | |
| import time as t | |
| import botocore.exceptions | |
| from datetime import datetime, timezone | |
| import botocore | |
| import utilities.ubi_lambda as ubi | |
| # OTEL imports | |
| from opentelemetry import trace | |
| from opentelemetry.sdk.resources import Resource | |
| from opentelemetry.sdk.trace import TracerProvider | |
| from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter | |
| from opentelemetry.sdk.trace.export import BatchSpanProcessor | |
# --- OTEL Setup ---
# Spans created through `tracer` (used by query_) are batched and exported
# over OTLP/HTTP. The collector endpoint can be overridden with the standard
# OTEL_EXPORTER_OTLP_TRACES_ENDPOINT environment variable; the previously
# hard-coded collector address is kept as the fallback so existing
# deployments keep working unchanged.
_DEFAULT_OTLP_TRACES_ENDPOINT = "http://54.201.184.124:4318/v1/traces"
resource = Resource(attributes={"service.name": "bedrock-agent"})
trace.set_tracer_provider(TracerProvider(resource=resource))
tracer = trace.get_tracer_provider().get_tracer("app.tracer")
otlp_exporter = OTLPSpanExporter(
    # `or` also covers an env var that is set but empty.
    endpoint=os.environ.get("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT") or _DEFAULT_OTLP_TRACES_ENDPOINT
)
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)
# Per-session scratch dict for user inputs, created once per Streamlit session.
if "inputs_" not in st.session_state:
    st.session_state.inputs_ = {}

# Parent of the directory that contains this module, built by splitting the
# path on "/" and dropping its last component.
parent_dirname = "/".join(os.path.dirname(__file__).split("/")[:-1])

# NOTE(review): `region` is 'us-east-1', but the agent runtime client below is
# pinned to 'us-west-2'; confirm which region other code expects.
region = 'us-east-1'

# Root logging configuration plus this module's logger.
logging.basicConfig(
    format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Bedrock Agent runtime client in us-west-2, authenticated with the access key
# pair stored in Streamlit secrets.
bedrock_agent_runtime_client = boto3.client(
    'bedrock-agent-runtime',
    aws_access_key_id=st.secrets['user_access_key_us_west_2'],
    aws_secret_access_key=st.secrets['user_secret_key_us_west_2'],
    region_name='us-west-2',
)

# Flags passed to invoke_agent: request trace events and keep the session open.
enable_trace: bool = True
end_session: bool = False
def now_rfc3339():
    """Return the current UTC time as an ISO 8601 string with a ``+00:00`` offset."""
    current_utc = datetime.now(tz=timezone.utc)
    return current_utc.isoformat()
def send_otel_span(span):
    """Forward one span document to the UBI Lambda under index "otel-v1-apm-span-default".

    Best effort: any failure while sending is logged (with traceback) and
    swallowed so that a tracing problem never interrupts the caller.

    Args:
        span: span dict, e.g. as returned by ``convert_to_span``; its
            ``"name"`` key is only used in log messages.
    """
    try:
        status = ubi.send_to_lambda("otel-v1-apm-span-default", span)
    except Exception:
        # Deliberate best-effort boundary: report through the module logger
        # (instead of print) and keep the traceback.
        logger.exception("[OTEL ERROR] could not send span %s", span.get("name"))
        return
    # NOTE(review): assumes send_to_lambda returns an HTTP-style status code
    # where 202 means "accepted" — confirm against utilities.ubi_lambda.
    if status == 202:
        logger.info("Traces sent to Lambda")
    else:
        logger.warning("Lambda did not accept the request")
    logger.info("[OTEL SPAN] %s -> %s", span.get("name"), status)
def convert_to_span(block, trace_id, index):
    """Convert one collected agent trace block into a span document.

    The first recognised key of ``block`` decides the span name and attributes:

    * ``invocationInput`` - name is the invoked ``function`` (default
      ``"invocation"``); attributes are its ``parameters`` as ``{name: value}``;
    * ``observation`` - name is the lower-cased observation ``type``;
      attributes are a copy of ``actionGroupInvocationOutput`` (empty if absent);
    * ``thinking`` - attributes ``{"message": block["thinking"]["content"]}``;
    * ``rationale`` - attributes ``{"message": block["rationale"]}``;
    * anything else - span named ``"step"`` with no attributes.

    Args:
        block: trace block dict.
        trace_id: trace id shared by every span of one agent call.
        index: position of the block in the stream; the synthetic duration is
            ``10 ms * (index + 1)``.

    Returns:
        Span dict whose ``endTime - startTime`` equals ``durationInNanos``.
    """
    from datetime import timedelta  # only this helper needs it

    span_id = uuid.uuid4().hex[:16]
    name = "step"
    attributes = {}
    if "invocationInput" in block:
        invocation = block["invocationInput"]
        name = invocation.get("function", "invocation")
        attributes = {p["name"]: p["value"] for p in invocation.get("parameters", [])}
    elif "observation" in block:
        observation = block["observation"]
        name = observation.get("type", "observation").lower()
        # Copy so the span never aliases the caller's trace block.
        attributes = dict(observation.get("actionGroupInvocationOutput", {}))
    elif "thinking" in block:
        name = "thinking"
        attributes["message"] = block["thinking"].get("content", "")
    elif "rationale" in block:
        name = "rationale"
        attributes["message"] = block["rationale"]

    duration_ns = 10_000_000 * (index + 1)
    # Take a single timestamp and derive the end from the synthetic duration so
    # startTime/endTime agree with durationInNanos (two independent now() calls
    # would report ~0 elapsed time whatever the stated duration).
    start = datetime.now(timezone.utc)
    end = start + timedelta(microseconds=duration_ns // 1000)
    return {
        "traceId": trace_id,
        "spanId": span_id,
        "name": name,
        "startTime": start.isoformat(),
        "endTime": end.isoformat(),
        "durationInNanos": duration_ns,
        "kind": "INTERNAL",
        "status": {"code": "OK"},
        "attributes": attributes,
        "resource": {
            "service.name": "bedrock-agent"
        }
    }
def delete_memory(agent_id='EJVGQW1BH7', agent_alias_id='DEEEEZM2TM'):
    """Call Bedrock ``delete_agent_memory`` for the given agent alias.

    Args:
        agent_id: Bedrock agent id; defaults to the agent ``query_`` invokes.
        agent_alias_id: agent alias id; defaults to the alias ``query_`` invokes.

    Returns:
        The raw ``delete_agent_memory`` response (it was previously assigned
        to an unused local and discarded).
    """
    return bedrock_agent_runtime_client.delete_agent_memory(
        agentAliasId=agent_alias_id,
        agentId=agent_id,
    )
def query_(inputs):
    """Send a shopping query to the Bedrock agent and collect its answer and trace.

    Streams the ``invoke_agent`` events for the current Streamlit session
    (``st.session_state.session_id_``). Every orchestration-trace event that
    yields at least one recognised step is appended to the returned
    ``'source'`` list, and one OpenTelemetry span is emitted for it through
    the module ``tracer``.

    Args:
        inputs: dict; only ``inputs['shopping_query']`` (the user text) is read.

    Returns:
        dict with keys:

        * ``'text'`` - final answer from the FINISH observation (``""`` if none);
        * ``'source'`` - list of collected step dicts, each holding any of
          ``tool``, ``rationale``, ``invocationInput``, ``observation``,
          ``thinking``;
        * ``'last_tool'`` - ``{'name': <last invoked function>,
          'response': <text of the last ACTION_GROUP observation>}``.

    Raises:
        botocore.exceptions.EventStreamError: propagated unchanged if the
            response stream fails.
    """
    agent_response = bedrock_agent_runtime_client.invoke_agent(
        inputText=inputs['shopping_query'],
        agentId='EJVGQW1BH7',
        agentAliasId='DEEEEZM2TM',
        sessionId=st.session_state.session_id_,
        enableTrace=enable_trace,
        endSession=end_session,
    )
    # pprint.pprint() writes to stdout and returns None, so logging its result
    # only ever logged "None"; pformat() returns the formatted text instead.
    logger.info("invoke_agent response: %s", pprint.pformat(agent_response))

    event_stream = agent_response['completion']
    total_context = []
    last_tool = ""
    last_tool_name = ""
    agent_answer = ""

    for event in event_stream:
        if 'trace' not in event:
            continue  # only trace events contribute steps/spans
        trace_body = event['trace']['trace']
        if 'orchestrationTrace' not in trace_body:
            continue
        orchestration_trace = trace_body['orchestrationTrace']

        item = {}
        name = "step"
        attributes = {}

        # The raw model output is inspected twice (tool call / thinking); look it up once.
        raw_response = orchestration_trace.get('modelInvocationOutput', {}).get('rawResponse', {})
        raw_content = raw_response.get('content') or ''

        if '<tool_name>' in raw_content:
            item['tool'] = raw_response
        if 'rationale' in orchestration_trace:
            item['rationale'] = orchestration_trace['rationale']['text']
            name = "rationale"
            attributes["message"] = item['rationale']
        if 'invocationInput' in orchestration_trace:
            invocation = orchestration_trace['invocationInput']['actionGroupInvocationInput']
            item['invocationInput'] = invocation
            last_tool_name = invocation['function']
            name = last_tool_name
            attributes = {p["name"]: p["value"] for p in invocation.get("parameters", [])}
        if 'observation' in orchestration_trace:
            observation = orchestration_trace['observation']
            item['observation'] = observation
            observation_type = observation['type']
            if observation_type == 'ACTION_GROUP':
                last_tool = observation['actionGroupInvocationOutput']['text']
            if observation_type == 'FINISH':
                agent_answer = observation['finalResponse']['text']
            name = observation_type.lower()
            # Copy so later span bookkeeping can never mutate the stored trace.
            attributes = dict(observation.get("actionGroupInvocationOutput", {}))
        if '<thinking>' in raw_content:
            item['thinking'] = raw_response
            name = "thinking"
            attributes["message"] = raw_content

        if item:
            total_context.append(item)
            # One span per recorded step; attribute values that OTEL cannot
            # store are dropped by the SDK.
            with tracer.start_as_current_span(name, attributes=attributes) as span:
                span.set_attribute("trace.source", "agentic-app")

    return {
        'text': agent_answer,
        'source': total_context,
        'last_tool': {'name': last_tool_name, 'response': last_tool},
    }