# gpt-oss-120b-chatbot / gateway.py
import json
import logging
from typing import Generator, List, Optional

from openai import OpenAI


def request_generation(
api_key: str,
api_base: str,
message: str,
system_prompt: str,
model_name: str,
chat_history: Optional[List[dict]] = None,
temperature: float = 0.3,
max_new_tokens: int = 1024,
    reasoning_effort: str = "off",  # forwarded verbatim; note the OpenAI-hosted Responses API expects "low"/"medium"/"high"
tools: Optional[List[dict]] = None,
tool_choice: Optional[str] = None,
) -> Generator[str, None, None]:
"""
Streams Responses API events. Emits:
- "analysis" sentinel once, then raw reasoning deltas
- "assistantfinal" sentinel once, then visible output deltas
If no visible deltas, emits a tool-call fallback message.
"""
    client = OpenAI(api_key=api_key, base_url=api_base)

    # The system prompt travels via `instructions`, so strip any system
    # turns that may already be present in the incoming chat history.
    input_messages: List[dict] = []
    if chat_history:
        input_messages.extend(m for m in chat_history if m.get("role") != "system")
    input_messages.append({"role": "user", "content": message})
    request_args = {
        "model": model_name,
        "input": input_messages,
        "instructions": system_prompt,
        "temperature": temperature,
        "max_output_tokens": max_new_tokens,
        # `summary` is the current Responses API field name;
        # `generate_summary` is its older spelling, kept alongside it so
        # either server version honors the request.
        "reasoning": {
            "effort": reasoning_effort,
            "generate_summary": "detailed",
            "summary": "detailed",
        },
        "stream": True,
    }
if tools:
request_args["tools"] = tools
if tool_choice:
request_args["tool_choice"] = tool_choice
    # Accumulate raw deltas so the full text is available for debugging.
    raw_reasoning, raw_visible = [], []
    try:
        stream = client.responses.create(**request_args)

        reasoning_started = False   # "analysis" sentinel already emitted
        reasoning_closed = False    # "assistantfinal" sentinel already emitted
        saw_visible_output = False  # any output_text delta observed
        last_tool_name = None
        last_tool_args = None
        buffer = ""                 # coalesces small output_text deltas

        for event in stream:
            et = getattr(event, "type", "")
if et == "response.reasoning_text.delta":
if not reasoning_started:
yield "analysis"
reasoning_started = True
rdelta = getattr(event, "delta", "") or ""
if rdelta:
raw_reasoning.append(rdelta)
yield rdelta
continue
if et == "response.output_text.delta":
if reasoning_started and not reasoning_closed:
yield "assistantfinal"
reasoning_closed = True
saw_visible_output = True
delta = getattr(event, "delta", "") or ""
raw_visible.append(delta)
buffer += delta
if "\n" in buffer or len(buffer) > 150:
yield buffer
buffer = ""
continue
            # Tool/function-call events: remember the most recent name and
            # arguments so the fallback message can report them later.
            if et.startswith("response.tool") or et.startswith("response.function_call"):
                name = getattr(event, "name", None)
                args = getattr(event, "arguments", None)
                if args is None:
                    args = getattr(event, "args", None) or getattr(event, "delta", None) or getattr(event, "data", None)
                if name:
                    last_tool_name = name
                if args is not None:
                    last_tool_args = args
                continue
            # Terminal events: flush buffered text, close the reasoning
            # phase, and fall back to a tool-call notice if the model never
            # produced visible output.
            if et in ("response.completed", "response.error"):
                if buffer:
                    yield buffer
                    buffer = ""
                if reasoning_started and not reasoning_closed:
                    yield "assistantfinal"
                    reasoning_closed = True
                if not saw_visible_output:
                    msg = (
                        "I attempted to call a tool, but tools aren't executed "
                        "in this environment, so no final answer was produced."
                    )
                    if last_tool_name:
                        try:
                            args_text = json.dumps(last_tool_args, ensure_ascii=False, default=str)
                        except Exception:
                            args_text = str(last_tool_args)
                        msg += f"\n\n• Tool requested: **{last_tool_name}**\n• Arguments: `{args_text}`"
                    yield msg
                if et == "response.error":
                    err = getattr(event, "error", None)
                    emsg = getattr(err, "message", "") if err else "Unknown error"
                    yield f"Error: {emsg}"
                break
        # Flush anything still buffered if the stream ended without a
        # terminal event.
        if buffer:
            yield buffer
except Exception as e:
logging.exception("[Gateway] Streaming failed")
yield f"Error: {e}"