import threading import time import openai import gradio as gr import csv import re import os from datetime import datetime, timedelta from gtts import gTTS from queue import Queue # Set your OpenAI API key openai.api_key = 'YOUR_API_KEY' # Global TTS queue tts_queue = Queue() # Load data from the CSV file when the application starts data = None def load_training_data(): global data if data is None: data = {} with open("docs/your_data.csv", "r", encoding="utf-8-sig") as csvfile: reader = csv.DictReader(csvfile) for row in reader: plate_number = row.get("رقم المركبة", "").strip() company_name = row.get("اسم الشركه", "").strip() date = row.get("تاريخ الدخول", "").strip() if plate_number not in data: data[plate_number] = [] data[plate_number].append(row) if company_name not in data: data[company_name] = [] data[company_name].append(row) if date not in data: data[date] = [] data[date].append(row) def parse_date(date_str): return datetime.strptime(date_str, "%d.%m.%Y") def get_week_range(date_str): current_date = parse_date(date_str) days_to_subtract = (current_date.weekday() + 2) % 7 start_of_week = current_date - timedelta(days=days_to_subtract) end_of_week = start_of_week + timedelta(days=6) return start_of_week, end_of_week def text_to_speech(text): try: speech = gTTS(text=text, lang='ar', slow=False) filename = f"text_to_speech_{int(time.time())}.mp3" filepath = os.path.join("static", filename) speech.save(filepath) return filepath except Exception as e: print("Exception:", str(e)) return None def calculate_weekly_total(date_str): load_training_data() try: input_date = parse_date(date_str) except ValueError: return "Invalid date format. Please enter a date in the format dd.mm.yyyy." start_of_week, end_of_week = get_week_range(date_str) weekly_total = 0 for date_key in data.keys(): try: record_date = parse_date(date_key) if start_of_week <= record_date <= end_of_week: for record in data[date_key]: report = record.get("تقرير نهائي", "") if "شغل" in report: money_values = re.findall(r'شغل\s*(\d+)\s*شيكل', report) else: money_values = re.findall(r'(\d+)\s*شيكل', report) money_values = [int(value) for value in money_values] weekly_total += sum(money_values) except ValueError: continue return weekly_total def calculate_weekly_cash_total(date_str): load_training_data() try: input_date = parse_date(date_str) except ValueError: return "Invalid date format. Please enter a date in the format dd.mm.yyyy." start_of_week, end_of_week = get_week_range(date_str) weekly_cash_total = 0 for date_key in data.keys(): try: record_date = parse_date(date_key) if start_of_week <= record_date <= end_of_week: for record in data[date_key]: plate_number = record.get("رقم المركبة", "") if "كاش" in plate_number: report = record.get("تقرير نهائي", "") money_values = re.findall(r'(\d+)\s*شيكل', report) money_values = [int(value) for value in money_values] weekly_cash_total += sum(money_values) except ValueError: continue return weekly_cash_total def search_partial_matches(input_text): load_training_data() input_text = input_text.strip() matching_records = {} for key in data.keys(): if input_text in key: matching_records[key] = data[key] return matching_records def calculate_total_for_period(start_date_str, end_date_str): load_training_data() try: start_date = parse_date(start_date_str) end_date = parse_date(end_date_str) except ValueError: return "Invalid date format. Please enter dates in the format dd.mm.yyyy." total_amount = 0 for date_key in data.keys(): try: record_date = parse_date(date_key) if start_date <= record_date <= end_date: for record in data[date_key]: report = record.get("تقرير نهائي", "") if "شغل" in report: money_values = re.findall(r'شغل\s*(\d+)\s*شيكل', report) else: money_values = re.findall(r'(\d+)\s*شيكل', report) money_values = [int(value) for value in money_values] total_amount += sum(money_values) except ValueError: continue return total_amount def chatbot(input_text, start_date_str="", end_date_str="", enable_voice=False): if start_date_str and end_date_str: total_for_period = calculate_total_for_period(start_date_str, end_date_str) return (f"Total amount from {start_date_str} to {end_date_str}: {total_for_period} شيكل", "", "", None) else: return original_chatbot(input_text, enable_voice) def original_chatbot(input_text, enable_voice): load_training_data() matching_records = search_partial_matches(input_text) total_money = 0 filtered_records = {} for key, records in matching_records.items(): filtered_records[key] = [info for info in records if "شيكل" in info.get("تقرير نهائي", "")] res_list = [] if filtered_records: responses = [] for key, records in filtered_records.items(): if key == "رقم المركبة": company_name = records[0].get("اسم الشركه", "") res_list.append(f"اسم الشركة هو: {company_name}") for info in records: response = "\n".join([f"{key}: {value}" for key, value in info.items()]) responses.append(response) report = info.get("تقرير نهائي", "") if "شغل" in report: money_values = re.findall(r'شغل\s*(\d+)\s*شيكل', report) else: money_values = re.findall(r'(\d+)\s*شيكل', report) money_values = [int(value) for value in money_values] total_money += sum(money_values) num_records_found = f"Number of records found: {len(responses)}" total_money_str = f"Total Money: {total_money} شيكل" combined_output = f"{num_records_found} - {total_money_str}" response = "\n\n---\n\n".join(responses) res_list.append(f"مجموع الدخل اليومي في هذا اليوم هو: {total_money}") else: combined_output = "No matching entries found in the data." response = "" weekly_total = calculate_weekly_total(input_text) res_list.append(f"مجموع الدخل الأسبوعي هو: {weekly_total}") weekly_cash_total = calculate_weekly_cash_total(input_text) res_list.append(f"مجموع الكاش المقبوض في هذا الاسبوع هو: {weekly_cash_total}") audio_file = None if enable_voice: audio_file = text_to_speech("\n".join(res_list)) return (combined_output, response, f"Weekly Total: {weekly_total} - Weekly Cash Total: {weekly_cash_total}", audio_file) iface = gr.Interface( fn=chatbot, inputs=[ gr.Textbox(lines=2, placeholder="Enter Date or Company Name or Plate Number..."), gr.Textbox(lines=1, placeholder="بحث من تاريخ (dd.mm.yyyy)", label="بحث من تاريخ"), gr.Textbox(lines=1, placeholder="الى تاريخ (dd.mm.yyyy)", label="الى تاريخ"), gr.Checkbox(label="تفعيل الصوت") # Checkbox for enabling voice output ], outputs=[ gr.Textbox(label="مجموع الدخل اليومي"), gr.Textbox(label="عرض التقارير"), gr.Textbox(label="مجموع الدخل الاسبوعي"), gr.Audio(label="Play Voice") # Audio output for playing the voice ], live=False, title="شركه ابناء عرفات", description="بحث حسب اسم الشركه - التاريخ - نمره الشاحنه" ) if __name__ == "__main__": iface.launch(share=True)