#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

from typing import Dict, List

from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai import BaseOpenAILLMService

from loguru import logger

try:
    from openpipe import AsyncOpenAI as OpenPipeAI, AsyncStream
    from openai.types.chat import (ChatCompletionMessageParam, ChatCompletionChunk)
except ModuleNotFoundError as e:
    logger.error(f"Exception: {e}")
    logger.error(
        "In order to use OpenPipe, you need to `pip install pipecat-ai[openpipe]`. Also, set `OPENPIPE_API_KEY` and `OPENAI_API_KEY` environment variables.")
    raise Exception(f"Missing module: {e}")


class OpenPipeLLMService(BaseOpenAILLMService):
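    """LLM service that routes OpenAI-style chat completions through OpenPipe.

    OpenPipe wraps the OpenAI client so that each chat completion request is
    logged to OpenPipe (optionally annotated with `tags`) for observability and
    fine-tuning data collection.

    Example (a sketch; assumes `OPENAI_API_KEY` and `OPENPIPE_API_KEY` are set
    in the environment, as noted in the import error message above):

        llm = OpenPipeLLMService(
            model="gpt-4o",
            tags={"conversation_id": "1234"},
        )
    """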

    def __init__(
            self,
            *,
            model: str = "gpt-4o",
            api_key: str | None = None,
            base_url: str | None = None,
            openpipe_api_key: str | None = None,
            openpipe_base_url: str = "https://app.openpipe.ai/api/v1",
            tags: Dict[str, str] | None = None,
            **kwargs):
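        # openpipe_api_key and openpipe_base_url are handed to the base class;
        # they travel through **kwargs to create_client() below, where the
        # OpenPipe-wrapped client is constructed.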
        super().__init__(
            model=model,
            api_key=api_key,
            base_url=base_url,
            openpipe_api_key=openpipe_api_key,
            openpipe_base_url=openpipe_base_url,
            **kwargs)
        self._tags = tags

    def create_client(self, api_key=None, base_url=None, **kwargs):
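        """Build an AsyncOpenAI client wrapped by OpenPipe.

        The OpenPipe credentials arrive via **kwargs (forwarded from __init__)
        and are passed to the client through the `openpipe` options dict.
        """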
        openpipe_api_key = kwargs.get("openpipe_api_key") or ""
        openpipe_base_url = kwargs.get("openpipe_base_url") or ""
        client = OpenPipeAI(
            api_key=api_key,
            base_url=base_url,
            openpipe={
                "api_key": openpipe_api_key,
                "base_url": openpipe_base_url
            }
        )
        return client

    async def get_chat_completions(
            self,
            context: OpenAILLMContext,
            messages: List[ChatCompletionMessageParam]) -> AsyncStream[ChatCompletionChunk]:
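        """Request a streaming chat completion, tagging the request so OpenPipe logs it."""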
        chunks = await self._client.chat.completions.create(
            model=self._model,
            stream=True,
            messages=messages,
            openpipe={
                "tags": self._tags,
                "log_request": True
            }
        )
        return chunks