Spaces:
Runtime error
Runtime error
# === Soochow University, Department of Data Science — 2025 LINE Bot ===
| import base64 | |
| import logging | |
| import os | |
| import tempfile | |
| from io import BytesIO | |
| from google import genai | |
| from google.genai import types | |
| from PIL import Image | |
| import markdown | |
| from bs4 import BeautifulSoup | |
| from flask import Flask, abort, request, send_from_directory | |
| from linebot.v3 import WebhookHandler | |
| from linebot.v3.exceptions import InvalidSignatureError | |
| from linebot.v3.messaging import ( | |
| ApiClient, | |
| Configuration, | |
| ImageMessage, | |
| MessagingApi, | |
| MessagingApiBlob, | |
| ReplyMessageRequest, | |
| TextMessage, | |
| ) | |
| from linebot.v3.webhooks import ImageMessageContent, MessageEvent, TextMessageContent | |
| from openai import OpenAI | |
| # === ๅๅงๅ Google Gemini === | |
| GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY") | |
| google_client = genai.Client(api_key=GOOGLE_API_KEY) | |
| # === ๅๅงๅOpenAIๆจกๅ === | |
| OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") | |
| client = OpenAI(api_key=OPENAI_API_KEY) | |
| text_system_prompt = "ไฝ ๆฏไธๅไธญๆ็AIๅฉๆ๏ผ่ซ็จ็น้ซไธญๆๅ็ญ" | |
# === Initial settings ===
# Images are written to, and served from, a dedicated sub-directory of the
# system temp dir. Using the temp dir itself would let the image route hand out
# any unrelated file that happens to live there, and would make the makedirs
# call below a no-op.
static_tmp_path = os.path.join(tempfile.gettempdir(), "linebot_images")
os.makedirs(static_tmp_path, exist_ok=True)

# Public host name used to build image URLs, e.g. "your-space-name.hf.space".
base_url = os.getenv("SPACE_HOST")
# === Flask application setup ===
app = Flask(__name__)

# Root logging: timestamp, level and message, at INFO and above.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
app.logger.setLevel(logging.INFO)

# LINE channel credentials are read from the environment.
channel_secret = os.getenv("YOUR_CHANNEL_SECRET")
channel_access_token = os.getenv("YOUR_CHANNEL_ACCESS_TOKEN")

# Messaging API configuration and the webhook dispatcher.
configuration = Configuration(access_token=channel_access_token)
handler = WebhookHandler(channel_secret)
# === AI query wrapper ===
def query(payload):
    """Ask Gemini to answer *payload* and return the text of its response.

    The module-level system prompt is placed in front of the payload, joined
    by a separator, and the combined string is sent as a single content item
    to the ``gemini-2.0-flash`` model.
    """
    combined_prompt = f"{text_system_prompt}๏ผ{payload}"
    result = google_client.models.generate_content(
        model="gemini-2.0-flash",
        contents=combined_prompt,
    )
    return result.text
# === Static image route ===
@app.route("/images/<filename>")
def serve_image(filename):
    """Serve a previously saved image file from ``static_tmp_path``.

    The message handlers in this file build public image URLs of the form
    ``https://<host>/images/<filename>``, so this view has to be registered
    under that path for those URLs to resolve.

    Args:
        filename: Name of the file inside ``static_tmp_path``.

    Returns:
        The Flask response produced by ``send_from_directory``.
    """
    return send_from_directory(static_tmp_path, filename)
# === LINE webhook endpoints ===
# NOTE(review): no route registration for this view is visible in this part of
# the file — confirm it is registered with the Flask app (presumably as the
# index page).
def home():
    """Return a small dict that identifies this server."""
    banner = {"message": "Line Webhook Server"}
    return banner
# NOTE(review): no route registration for this view is visible in this part of
# the file — confirm it is registered for POST on the path configured as the
# channel's webhook URL.
def callback():
    """Validate an incoming LINE webhook request and dispatch its events.

    Returns:
        The string ``"OK"`` once the events have been handled.

    Aborts with HTTP 400 when the ``X-Line-Signature`` header is missing or
    does not match the request body.
    """
    signature = request.headers.get("X-Line-Signature")
    if not signature:
        # A missing header yields None; reject it here as a client error
        # rather than passing None on to the signature validator.
        app.logger.warning("Missing X-Line-Signature header.")
        abort(400)

    body = request.get_data(as_text=True)
    app.logger.info(f"Request body: {body}")
    try:
        handler.handle(body, signature)
    except InvalidSignatureError:
        app.logger.warning("Invalid signature. Please check channel credentials.")
        abort(400)
    return "OK"
# === Handle text messages ===
def _generate_image_url(prompt):
    """Generate an image for *prompt* with Gemini and return its public URL.

    The first image part found in the response is saved under
    ``static_tmp_path`` with a unique file name. Returns ``None`` when the
    response contains no image part.
    """
    import uuid  # local import: keeps this block self-contained

    response = google_client.models.generate_content(
        model="gemini-2.0-flash-exp-image-generation",
        contents=prompt,
        config=types.GenerateContentConfig(
            response_modalities=['TEXT', 'IMAGE']
        ),
    )
    for part in response.candidates[0].content.parts:
        if part.inline_data is None:
            continue
        image = Image.open(BytesIO(part.inline_data.data))
        # A unique name per request: a fixed name would let concurrent
        # requests overwrite each other's image and would always produce the
        # same URL.
        filename = f"{uuid.uuid4().hex}.png"
        image.save(os.path.join(static_tmp_path, filename))
        return f"https://{base_url}/images/{filename}"
    return None


@handler.add(MessageEvent, message=TextMessageContent)
def handle_text_message(event):
    """Reply to a text message.

    Messages starting with ``"AI "`` are treated as image-generation prompts:
    the rest of the text is sent to Gemini and the generated image is sent
    back. Any other text is answered through ``query`` and returned as plain
    text with the Markdown markup stripped.

    Exactly one reply is sent per event, because a reply token is only valid
    for a single reply request.
    """
    user_input = event.message.text.strip()

    if user_input.startswith("AI "):
        prompt = user_input[3:].strip()
        try:
            image_url = _generate_image_url(prompt)
        except Exception as e:
            app.logger.error(f"Gemini API error: {e}")
            image_url = None

        if image_url is None:
            # Covers both a failed generation and a response without any
            # image part, so the user always gets an answer.
            # The text reads: "Sorry, an error occurred while generating the image."
            messages = [TextMessage(text="抱歉，生成圖片時發生錯誤。")]
        else:
            messages = [
                ImageMessage(
                    original_content_url=image_url,
                    preview_image_url=image_url,
                )
            ]
    else:
        response = query(event.message.text)
        # The model answers in Markdown; render it to HTML and keep only the
        # text so the chat message carries no markup.
        html_msg = markdown.markdown(response)
        soup = BeautifulSoup(html_msg, "html.parser")
        messages = [TextMessage(text=soup.get_text())]

    with ApiClient(configuration) as api_client:
        line_bot_api = MessagingApi(api_client)
        line_bot_api.reply_message(
            ReplyMessageRequest(
                reply_token=event.reply_token,
                messages=messages,
            )
        )
# === Handle image messages ===
def _sniff_image_type(data):
    """Return ``(mime_type, file_suffix)`` for *data* based on its leading bytes.

    Only the PNG signature is checked explicitly; anything else is treated as
    JPEG, which matches the ``.jpg`` suffix this handler used for every file.
    """
    if bytes(data[:8]) == b"\x89PNG\r\n\x1a\n":
        return "image/png", ".png"
    return "image/jpeg", ".jpg"


@handler.add(MessageEvent, message=ImageMessageContent)
def handle_image_message(event):
    """Describe an image sent by the user and echo the image back.

    The image is downloaded from LINE, saved under ``static_tmp_path`` so it
    can be served back through a public URL, and passed to the OpenAI model as
    a base64 data URI with a request to describe it in Traditional Chinese.
    The reply contains the image followed by the description.
    """
    # === Step 1: download the image content from LINE ===
    with ApiClient(configuration) as api_client:
        blob_api = MessagingApiBlob(api_client)
        content = blob_api.get_message_content(message_id=event.message.id)

    # === Steps 2-3: encode as base64 and build the data URI for OpenAI ===
    # The declared MIME type and the saved file suffix both follow the actual
    # bytes instead of being hard-coded to two different formats.
    mime_type, suffix = _sniff_image_type(content)
    base64_string = base64.b64encode(content).decode("utf-8")
    data_uri = f"data:{mime_type};base64,{base64_string}"
    # Log only the size and type: the data URI embeds the whole image and
    # would flood the log with the user's picture.
    app.logger.info(f"Received image: {len(content)} bytes ({mime_type})")

    # === Step 4: store the image locally so it can be served back ===
    with tempfile.NamedTemporaryFile(
        dir=static_tmp_path, suffix=suffix, delete=False
    ) as tf:
        tf.write(content)
    filename = os.path.basename(tf.name)
    image_url = f"https://{base_url}/images/{filename}"
    app.logger.info(f"Image URL: {image_url}")

    # === Ask the model to describe the image ===
    response = client.responses.create(
        model="gpt-4.1-nano",
        input=[
            {
                "role": "user",
                "content": [
                    {
                        "type": "input_text",
                        "text": "describe the image in traditional chinese",
                    },
                    {
                        "type": "input_image",
                        "image_url": data_uri,
                    },
                ],
            }
        ],
    )
    app.logger.info(response.output_text)

    # === Reply with the image and its description ===
    with ApiClient(configuration) as api_client:
        line_bot_api = MessagingApi(api_client)
        line_bot_api.reply_message(
            ReplyMessageRequest(
                reply_token=event.reply_token,
                messages=[
                    ImageMessage(
                        original_content_url=image_url, preview_image_url=image_url
                    ),
                    TextMessage(text=response.output_text),
                ],
            )
        )