Spaces:
Sleeping
Sleeping
Commit
·
bcd9850
1
Parent(s):
06a4ed9
feat: connect to vertex ai
Browse files
public-prediction/kafka_consumer.py
CHANGED
|
@@ -4,13 +4,13 @@ from kafka import KafkaConsumer
|
|
| 4 |
from get_gpt_answer import GetGPTAnswer
|
| 5 |
from typing import List
|
| 6 |
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
|
|
|
| 7 |
|
| 8 |
|
| 9 |
def get_gpt_responses(data: dict[str, any], gpt_helper: GetGPTAnswer):
|
| 10 |
-
|
| 11 |
-
|
| 12 |
-
data["gpt35_answer"] = "This is gpt35 answer"
|
| 13 |
-
data["gpt4_answer"] = "This is gpt4 answer"
|
| 14 |
return data
|
| 15 |
|
| 16 |
|
|
@@ -23,6 +23,22 @@ def process_batch(batch: List[dict[str, any]], batch_size: int, job_application_
|
|
| 23 |
|
| 24 |
print("Batch ready with gpt responses", results)
|
| 25 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 26 |
|
| 27 |
def consume_messages():
|
| 28 |
consumer = KafkaConsumer(
|
|
|
|
| 4 |
from get_gpt_answer import GetGPTAnswer
|
| 5 |
from typing import List
|
| 6 |
from concurrent.futures import ThreadPoolExecutor
|
| 7 |
+
from predict_custom_model import predict_custom_trained_model
|
| 8 |
+
from google.protobuf.json_format import MessageToDict
|
| 9 |
|
| 10 |
|
| 11 |
def get_gpt_responses(data: dict[str, any], gpt_helper: GetGPTAnswer):
|
| 12 |
+
data["gpt35_answer"] = gpt_helper.generate_gpt35_answer(data["question"])
|
| 13 |
+
data["gpt4_answer"] = gpt_helper.generate_gpt4_answer(data["question"])
|
|
|
|
|
|
|
| 14 |
return data
|
| 15 |
|
| 16 |
|
|
|
|
| 23 |
|
| 24 |
print("Batch ready with gpt responses", results)
|
| 25 |
|
| 26 |
+
predictions = predict_custom_trained_model(
|
| 27 |
+
instances=results, project=os.environ.get("PROJECT_ID"), endpoint_id=os.environ.get("ENDPOINT_ID"))
|
| 28 |
+
|
| 29 |
+
results = []
|
| 30 |
+
for prediction in predictions:
|
| 31 |
+
result_dict = {}
|
| 32 |
+
for key, value in prediction._pb.items():
|
| 33 |
+
# Ensure that 'value' is a protobuf message
|
| 34 |
+
if hasattr(value, 'DESCRIPTOR'):
|
| 35 |
+
result_dict[key] = MessageToDict(value)
|
| 36 |
+
else:
|
| 37 |
+
print(f"Item {key} is not a convertible protobuf message.")
|
| 38 |
+
results.append(result_dict)
|
| 39 |
+
|
| 40 |
+
print({"result": results})
|
| 41 |
+
|
| 42 |
|
| 43 |
def consume_messages():
|
| 44 |
consumer = KafkaConsumer(
|
public-prediction/predict_custom_model.py
ADDED
|
@@ -0,0 +1,40 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
from typing import Dict, List, Union
|
| 2 |
+
from google.cloud import aiplatform
|
| 3 |
+
from google.protobuf import json_format
|
| 4 |
+
from google.protobuf.struct_pb2 import Value
|
| 5 |
+
|
| 6 |
+
|
| 7 |
+
def predict_custom_trained_model(
|
| 8 |
+
project: str,
|
| 9 |
+
endpoint_id: str,
|
| 10 |
+
instances: Union[Dict, List[Dict]],
|
| 11 |
+
location: str = "us-central1",
|
| 12 |
+
api_endpoint: str = "us-central1-aiplatform.googleapis.com",
|
| 13 |
+
):
|
| 14 |
+
"""
|
| 15 |
+
`instances` can be either single instance of type dict or a list
|
| 16 |
+
of instances.
|
| 17 |
+
"""
|
| 18 |
+
# The AI Platform services require regional API endpoints.
|
| 19 |
+
client_options = {"api_endpoint": api_endpoint}
|
| 20 |
+
# Initialize client that will be used to create and send requests.
|
| 21 |
+
# This client only needs to be created once, and can be reused for multiple requests.
|
| 22 |
+
client = aiplatform.gapic.PredictionServiceClient(
|
| 23 |
+
client_options=client_options)
|
| 24 |
+
# The format of each instance should conform to the deployed model's prediction input schema.
|
| 25 |
+
instances = instances if isinstance(instances, list) else [instances]
|
| 26 |
+
instances = [
|
| 27 |
+
json_format.ParseDict(instance_dict, Value()) for instance_dict in instances
|
| 28 |
+
]
|
| 29 |
+
parameters_dict = {}
|
| 30 |
+
parameters = json_format.ParseDict(parameters_dict, Value())
|
| 31 |
+
endpoint = client.endpoint_path(
|
| 32 |
+
project=project, location=location, endpoint=endpoint_id
|
| 33 |
+
)
|
| 34 |
+
response = client.predict(
|
| 35 |
+
endpoint=endpoint, instances=instances, parameters=parameters
|
| 36 |
+
)
|
| 37 |
+
# The predictions are a google.protobuf.Value representation of the model's predictions.
|
| 38 |
+
predictions = response.predictions
|
| 39 |
+
|
| 40 |
+
return predictions
|