|
import threading
|
|
import time
|
|
import openai
|
|
import gradio as gr
|
|
import csv
|
|
import re
|
|
import os
|
|
from datetime import datetime, timedelta
|
|
from gtts import gTTS
|
|
from queue import Queue
|
|
|
|
|
|
# Prefer the OPENAI_API_KEY environment variable so that a real key never has
# to be written into the source; the literal placeholder remains the fallback
# and keeps the previous behavior when the variable is not set.
openai.api_key = os.environ.get("OPENAI_API_KEY", 'YOUR_API_KEY')

# Queue created at import time. Nothing in the visible part of this file puts
# items on it or reads from it.
tts_queue = Queue()

# Lazily built lookup table of CSV rows; stays None until
# load_training_data() has read the file successfully.
data = None
|
|
|
|
def load_training_data():
    """Load ``docs/your_data.csv`` into the module-level ``data`` index once.

    Every CSV row (a dict keyed by the header names) is filed in one flat
    dictionary under up to three keys: its plate number, its company name
    and its entry date, each stripped of surrounding whitespace. The file is
    read only while ``data`` is ``None``; later calls return immediately.

    Raises:
        OSError: if the CSV file cannot be opened. ``data`` is left as
            ``None`` in that case, so a later call retries the load.
    """
    global data
    if data is not None:
        return

    # Build into a local first: the global is assigned only after the whole
    # file was read, so a failed or partial load is never cached as "loaded".
    index = {}
    # newline="" is what the csv module asks for so that line breaks inside
    # quoted fields are handled by the reader itself.
    with open("docs/your_data.csv", "r", encoding="utf-8-sig", newline="") as csvfile:
        for row in csv.DictReader(csvfile):
            # DictReader stores None for fields missing from a short row, so
            # the value is normalised with ``or ""`` before stripping.
            keys = [
                (row.get(column) or "").strip()
                for column in ("رقم المركبة", "اسم الشركه", "تاريخ الدخول")
            ]
            # dict.fromkeys drops repeated keys while keeping their order, so
            # a row whose fields coincide is listed only once under that key.
            for key in dict.fromkeys(keys):
                index.setdefault(key, []).append(row)
    data = index
|
|
|
|
def parse_date(date_str):
    """Parse a ``dd.mm.yyyy`` string into a ``datetime`` (time part is midnight).

    Surrounding whitespace is ignored, because callers in this file pass
    text on without trimming it and ``strptime`` rejects any stray space or
    newline.

    Args:
        date_str: Text such as ``"05.03.2024"``.

    Raises:
        ValueError: if the text is not a valid ``dd.mm.yyyy`` date.
    """
    return datetime.strptime(date_str.strip(), "%d.%m.%Y")
|
|
|
|
def get_week_range(date_str):
    """Return the first and last day of the week containing ``date_str``.

    Weeks run from Saturday to Friday.

    Args:
        date_str: A date accepted by ``parse_date`` (``dd.mm.yyyy``).

    Returns:
        A ``(start_of_week, end_of_week)`` tuple of ``datetime`` objects,
        six days apart.

    Raises:
        ValueError: propagated from ``parse_date`` for an invalid date.
    """
    anchor = parse_date(date_str)
    # weekday() is 0 for Monday and 5 for Saturday, so (weekday + 2) % 7 is
    # the number of days elapsed since the most recent Saturday.
    since_saturday = timedelta(days=(anchor.weekday() + 2) % 7)
    week_start = anchor - since_saturday
    return week_start, week_start + timedelta(days=6)
|
|
|
|
def text_to_speech(text):
    """Synthesize Arabic speech for ``text`` and save it as an MP3 in ``static/``.

    Args:
        text: The text to speak.

    Returns:
        The path of the written file, or ``None`` when synthesis or saving
        fails (the error is printed, not raised).
    """
    try:
        speech = gTTS(text=text, lang='ar', slow=False)
        # A nanosecond timestamp keeps two requests made within the same
        # second from writing to the same file name.
        filename = f"text_to_speech_{time.time_ns()}.mp3"
        # Create the output directory on demand; without it the save below
        # fails on a fresh checkout and the voice feature silently yields None.
        os.makedirs("static", exist_ok=True)
        filepath = os.path.join("static", filename)
        speech.save(filepath)
        return filepath
    except Exception as e:
        # Deliberately best-effort: a failure here only disables the audio
        # output, it must not break the textual answer.
        print("Exception:", str(e))
        return None
|
|
|
|
def calculate_weekly_total(date_str):
    """Sum the shekel amounts reported in the week that contains ``date_str``.

    The week boundaries come from ``get_week_range``. Keys of the loaded
    ``data`` index that do not parse as ``dd.mm.yyyy`` dates (plate numbers,
    company names) are skipped.

    For each record the "تقرير نهائي" field is scanned: when it mentions
    "شغل", only numbers written as "شغل <n> شيكل" count; otherwise every
    "<n> شيكل" amount counts.

    Args:
        date_str: Any date inside the wanted week, formatted ``dd.mm.yyyy``.

    Returns:
        The integer total, or an error message string when ``date_str`` is
        not a valid date.
    """
    load_training_data()
    # get_week_range parses the date itself, so its ValueError doubles as the
    # input validation and the date is parsed only once.
    try:
        start_of_week, end_of_week = get_week_range(date_str)
    except ValueError:
        return "Invalid date format. Please enter a date in the format dd.mm.yyyy."

    weekly_total = 0
    for date_key, records in data.items():
        # Only the parse can legitimately fail here; keep the try that narrow.
        try:
            record_date = parse_date(date_key)
        except ValueError:
            continue
        if not start_of_week <= record_date <= end_of_week:
            continue
        for record in records:
            # csv.DictReader stores None for fields missing from a short row;
            # ``or ""`` keeps the substring test below from raising TypeError.
            report = record.get("تقرير نهائي") or ""
            if "شغل" in report:
                money_values = re.findall(r'شغل\s*(\d+)\s*شيكل', report)
            else:
                money_values = re.findall(r'(\d+)\s*شيكل', report)
            weekly_total += sum(int(value) for value in money_values)
    return weekly_total
|
|
|
|
def calculate_weekly_cash_total(date_str):
    """Sum the shekel amounts of cash records in the week containing ``date_str``.

    A record counts as cash when its "رقم المركبة" field contains "كاش".
    For those records every "<n> شيكل" amount in "تقرير نهائي" is added.
    Keys of the loaded ``data`` index that do not parse as ``dd.mm.yyyy``
    dates are skipped.

    Args:
        date_str: Any date inside the wanted week, formatted ``dd.mm.yyyy``.

    Returns:
        The integer total, or an error message string when ``date_str`` is
        not a valid date.
    """
    load_training_data()
    # get_week_range parses the date itself, so its ValueError doubles as the
    # input validation and the date is parsed only once.
    try:
        start_of_week, end_of_week = get_week_range(date_str)
    except ValueError:
        return "Invalid date format. Please enter a date in the format dd.mm.yyyy."

    weekly_cash_total = 0
    for date_key, records in data.items():
        # Only the parse can legitimately fail here; keep the try that narrow.
        try:
            record_date = parse_date(date_key)
        except ValueError:
            continue
        if not start_of_week <= record_date <= end_of_week:
            continue
        for record in records:
            # csv.DictReader stores None for fields missing from a short row;
            # ``or ""`` keeps the substring tests from raising TypeError.
            plate_number = record.get("رقم المركبة") or ""
            if "كاش" not in plate_number:
                continue
            report = record.get("تقرير نهائي") or ""
            money_values = re.findall(r'(\d+)\s*شيكل', report)
            weekly_cash_total += sum(int(value) for value in money_values)
    return weekly_cash_total
|
|
|
|
def search_partial_matches(input_text):
    """Return every entry of ``data`` whose key contains ``input_text``.

    The search text is stripped of surrounding whitespace first and then
    matched as a plain, case-sensitive substring of each key.

    Args:
        input_text: Text to look for inside the index keys.

    Returns:
        A new dict mapping each matching key to its list of records.
    """
    load_training_data()
    needle = input_text.strip()
    return {key: rows for key, rows in data.items() if needle in key}
|
|
|
|
def calculate_total_for_period(start_date_str, end_date_str):
    """Sum the shekel amounts reported between two dates, both inclusive.

    Keys of the loaded ``data`` index that do not parse as ``dd.mm.yyyy``
    dates (plate numbers, company names) are skipped.

    For each record the "تقرير نهائي" field is scanned: when it mentions
    "شغل", only numbers written as "شغل <n> شيكل" count; otherwise every
    "<n> شيكل" amount counts.

    Args:
        start_date_str: First day of the period, formatted ``dd.mm.yyyy``.
        end_date_str: Last day of the period, formatted ``dd.mm.yyyy``.

    Returns:
        The integer total, or an error message string when either date is
        not valid.
    """
    load_training_data()
    try:
        start_date = parse_date(start_date_str)
        end_date = parse_date(end_date_str)
    except ValueError:
        return "Invalid date format. Please enter dates in the format dd.mm.yyyy."

    total_amount = 0
    for date_key, records in data.items():
        # Only the parse can legitimately fail here; keep the try that narrow.
        try:
            record_date = parse_date(date_key)
        except ValueError:
            continue
        if not start_date <= record_date <= end_date:
            continue
        for record in records:
            # csv.DictReader stores None for fields missing from a short row;
            # ``or ""`` keeps the substring test below from raising TypeError.
            report = record.get("تقرير نهائي") or ""
            if "شغل" in report:
                money_values = re.findall(r'شغل\s*(\d+)\s*شيكل', report)
            else:
                money_values = re.findall(r'(\d+)\s*شيكل', report)
            total_amount += sum(int(value) for value in money_values)
    return total_amount
|
|
|
|
def chatbot(input_text, start_date_str="", end_date_str="", enable_voice=False):
    """Entry point for the UI: period total when both dates are given, search otherwise.

    Args:
        input_text: Free-text search (date, company name or plate number).
        start_date_str: Optional first day of a period, ``dd.mm.yyyy``.
        end_date_str: Optional last day of a period, ``dd.mm.yyyy``.
        enable_voice: Whether the search answer should also be spoken.

    Returns:
        A 4-tuple ``(summary, reports, weekly_summary, audio_path)``. For a
        period query only the first element is filled in.
    """
    # Treat whitespace-only boxes as empty so they do not trigger a period
    # query that can only fail with a date-format error.
    start_date_str = (start_date_str or "").strip()
    end_date_str = (end_date_str or "").strip()
    if not (start_date_str and end_date_str):
        return original_chatbot(input_text, enable_voice)

    total_for_period = calculate_total_for_period(start_date_str, end_date_str)
    if isinstance(total_for_period, str):
        # A string result is the invalid-date message; show it as is instead
        # of wrapping it in the "Total amount ... شيكل" sentence.
        return (total_for_period, "", "", None)
    return (f"Total amount from {start_date_str} to {end_date_str}: {total_for_period} شيكل", "", "", None)
|
|
|
|
def original_chatbot(input_text, enable_voice):
    """Answer a free-text search with matching records and income totals.

    Keys of the loaded index that contain ``input_text`` are looked up; only
    records whose "تقرير نهائي" mentions "شيكل" are reported. The weekly
    totals are computed by passing ``input_text`` to the weekly helpers, so
    they are numbers only when the text is a ``dd.mm.yyyy`` date; otherwise
    those helpers return their error message, which is shown as is.

    Args:
        input_text: Date, company name or plate number (or part of one).
        enable_voice: When true, the spoken summary is synthesized as well.

    Returns:
        A 4-tuple ``(summary, reports, weekly_summary, audio_path)`` where
        ``audio_path`` is ``None`` unless voice was requested and succeeded.
    """
    load_training_data()
    matching_records = search_partial_matches(input_text)

    total_money = 0
    responses = []
    res_list = []  # sentences that make up the spoken summary

    if matching_records:
        # The same row is indexed under its plate, company and date keys, so
        # a search can reach it through several keys; remember the rows that
        # were already reported to avoid counting their money twice.
        seen_rows = set()
        for key, records in matching_records.items():
            # ``or ""``: csv.DictReader stores None for fields missing from a
            # short row, which would break the substring test.
            paid_records = [
                info for info in records
                if "شيكل" in (info.get("تقرير نهائي") or "")
            ]

            # Announce the company when the matched key is a plate number.
            # The key holds a field *value*, so it has to be compared with the
            # record's plate field, not with the column header.
            if (
                key
                and paid_records
                and (paid_records[0].get("رقم المركبة") or "").strip() == key
            ):
                company_name = paid_records[0].get("اسم الشركه") or ""
                res_list.append(f"اسم الشركة هو: {company_name}")

            for info in paid_records:
                if id(info) in seen_rows:
                    continue
                seen_rows.add(id(info))

                responses.append(
                    "\n".join(f"{field}: {value}" for field, value in info.items())
                )
                report = info.get("تقرير نهائي") or ""
                if "شغل" in report:
                    money_values = re.findall(r'شغل\s*(\d+)\s*شيكل', report)
                else:
                    money_values = re.findall(r'(\d+)\s*شيكل', report)
                total_money += sum(int(value) for value in money_values)

        num_records_found = f"Number of records found: {len(responses)}"
        total_money_str = f"Total Money: {total_money} شيكل"
        combined_output = f"{num_records_found} - {total_money_str}"
        response = "\n\n---\n\n".join(responses)
        res_list.append(f"مجموع الدخل اليومي في هذا اليوم هو: {total_money}")
    else:
        combined_output = "No matching entries found in the data."
        response = ""

    weekly_total = calculate_weekly_total(input_text)
    res_list.append(f"مجموع الدخل الأسبوعي هو: {weekly_total}")

    weekly_cash_total = calculate_weekly_cash_total(input_text)
    res_list.append(f"مجموع الكاش المقبوض في هذا الاسبوع هو: {weekly_cash_total}")

    audio_file = None
    if enable_voice:
        audio_file = text_to_speech("\n".join(res_list))

    return (combined_output, response, f"Weekly Total: {weekly_total} - Weekly Cash Total: {weekly_cash_total}", audio_file)
|
|
|
|
# Gradio front end for chatbot(). The four input components are listed in the
# same order as chatbot's parameters (search text, start date, end date, voice
# toggle) and the four outputs in the order of the 4-tuple it returns.
iface = gr.Interface(
    fn=chatbot,
    inputs=[
        # Free-text search box.
        gr.Textbox(lines=2, placeholder="Enter Date or Company Name or Plate Number..."),
        # "Search from date" - start of the optional period query.
        gr.Textbox(lines=1, placeholder="بحث من تاريخ (dd.mm.yyyy)", label="بحث من تاريخ"),
        # "To date" - end of the optional period query.
        gr.Textbox(lines=1, placeholder="الى تاريخ (dd.mm.yyyy)", label="الى تاريخ"),
        # "Enable voice" toggle.
        gr.Checkbox(label="تفعيل الصوت")
    ],
    outputs=[
        # Labelled "daily income total"; receives the summary line.
        gr.Textbox(label="مجموع الدخل اليومي"),
        # Labelled "reports view"; receives the matching records.
        gr.Textbox(label="عرض التقارير"),
        # Labelled "weekly income total"; receives the weekly summary line.
        gr.Textbox(label="مجموع الدخل الاسبوعي"),
        # Plays the MP3 path returned by chatbot(), if any.
        gr.Audio(label="Play Voice")
    ],
    live=False,
    title="شركه ابناء عرفات",
    description="بحث حسب اسم الشركه - التاريخ - نمره الشاحنه"
)

if __name__ == "__main__":
    # NOTE(review): share=True presumably asks Gradio for a publicly reachable
    # link in addition to the local server - confirm that exposing this data
    # outside the local network is intended.
    iface.launch(share=True)
|
|
|