Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import os | |
| import pandas as pd | |
| import pdfplumber | |
| import logging | |
| import re | |
| from datetime import datetime | |
# Logging setup: every record goes both to a log file and to the console.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        # The messages logged by this module contain Vietnamese text, so the
        # file encoding is pinned to UTF-8 instead of relying on the
        # platform's default encoding.
        logging.FileHandler("transaction_app.log", encoding="utf-8"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
# Fixed column headers of the output template, in output order.
TABLE_HEADERS = [
    "Reference - Code",
    "GL Posting Date",
    "Date",
    "Settlement Date",
    "Security Name",
    "Currency",
    "Quantity",
    "Price",
    "Transaction Amount",
    "Commission",
]
# Number normalisation
def standardize_number(value):
    """Convert a numeric value or numeric-looking text to ``float``.

    Separator handling for text input:

    * both ``,`` and ``.`` present: whichever occurs last is the decimal
      mark, the other one is a grouping separator
      (``"1,234.56"`` and ``"1.234,56"`` both give ``1234.56``);
    * only commas, in strict groups of three digits (``"20,000"``): the
      commas are thousands separators (``20000.0``);
    * only commas otherwise (``"1234,56"``): the comma is the decimal mark.

    Any character other than digits, ``.``, ``,`` and ``-`` (spaces, currency
    symbols, ...) is discarded before parsing.

    Args:
        value: A number, a string, or a missing value (``None``/NaN).

    Returns:
        The parsed ``float``, or ``None`` when the value is missing, blank,
        or cannot be parsed (a warning is logged in the last case).
    """
    if value is None:
        return None
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        # Already numeric: only NaN counts as missing; a real 0 stays 0.0.
        return None if pd.isna(value) else float(value)
    if isinstance(value, str):
        if not value.strip():
            return None
    elif pd.isna(value):
        return None

    text = re.sub(r"[^\d.,-]", "", str(value))
    if "," in text:
        if "." in text:
            if text.rfind(",") > text.rfind("."):
                # European style: 1.234,56
                text = text.replace(".", "").replace(",", ".")
            else:
                # English style: 1,234.56
                text = text.replace(",", "")
        elif re.fullmatch(r"-?\d{1,3}(?:,\d{3})+", text):
            # Thousands grouping without a decimal part: 20,000
            text = text.replace(",", "")
        else:
            # A lone comma that is not a grouping separator: 1234,56
            text = text.replace(",", ".")
    try:
        return float(text)
    except ValueError:
        logger.warning(f"Không thể chuyển đổi giá trị thành số: {text}")
        return None
# Date normalisation
def standardize_date(date_str, formats=("%d/%m/%Y", "%d-%b-%y")):
    """Normalise a date to the ``MM/DD/YYYY`` text form.

    Args:
        date_str: The date as text, a ``datetime`` instance, or a missing
            value (``None``, empty string, NaN/NaT).
        formats: ``strptime`` formats tried in order for text input. The
            defaults accept ``25/12/2023`` and ``25-Dec-23``. Note that
            ``%b`` month names are matched using the current locale.

    Returns:
        The date formatted as ``MM/DD/YYYY``, or ``None`` when the value is
        missing or matches none of ``formats`` (a warning is logged in the
        latter case).
    """
    if not date_str or pd.isna(date_str):
        return None
    if isinstance(date_str, datetime):
        # Already a date object: no parsing needed.
        return date_str.strftime("%m/%d/%Y")

    # Coerce to text and trim, so non-string cells and stray whitespace or
    # line breaks do not raise or cause a spurious mismatch.
    text = str(date_str).strip()
    for fmt in formats:
        try:
            return datetime.strptime(text, fmt).strftime("%m/%d/%Y")
        except ValueError:
            continue
    logger.warning(f"Không thể chuẩn hóa ngày: {date_str}")
    return None
# Table extraction from the PDF statement
def extract_tables_from_pdf(file_path, max_pages=28):
    """Extract one table per page from a PDF and stack them into one frame.

    For each processed page, ``page.extract_table()`` is called; its first
    row is used as the column header and the remaining rows as data. A
    ``Page`` column holding the 1-based page number is added to every
    per-page frame before they are concatenated.

    Args:
        file_path: Path of the PDF file to read.
        max_pages: Number of leading pages to process. Defaults to 28, the
            limit that used to be hard-coded; ``None`` processes all pages.

    Returns:
        The concatenated ``DataFrame``, or ``None`` when no page yields a
        table with at least one data row or when extraction fails (the
        error is logged with its traceback).
    """
    frames = []
    try:
        with pdfplumber.open(file_path) as pdf:
            for page_number, page in enumerate(pdf.pages[:max_pages], start=1):
                table = page.extract_table()
                if not table or len(table) < 2:
                    continue  # no table, or a header row without data
                # Collapse line breaks / repeated spaces inside header cells
                # so a wrapped header such as "Balance in\nTrust" still
                # matches the column names looked up elsewhere in this file.
                header = [
                    " ".join(cell.split()) if isinstance(cell, str) else cell
                    for cell in table[0]
                ]
                df = pd.DataFrame(table[1:], columns=header)
                df["Page"] = page_number  # keep track of the source page
                frames.append(df)
        logger.info(f"Trích xuất thành công {len(frames)} bảng từ PDF")
        return pd.concat(frames, ignore_index=True) if frames else None
    except Exception as e:
        # Boundary handler: callers treat None as "could not extract".
        logger.exception(f"Lỗi trích xuất PDF: {e}")
        return None
# Trade details extraction from the Description column
def parse_description(description):
    """Extract the trade details embedded in a transaction description.

    The expected shape is ``<Bought|Sold> <quantity> <security>@<CCY><price>``,
    for instance ``"Sold 20,000 AUDIX@AUD0.6150"`` (illustrative example;
    check it against a real statement). Commas in the quantity and in the
    price are treated as thousands separators. Runs of whitespace, including
    the line breaks found in multi-line PDF cells, are collapsed to single
    spaces before matching.

    Args:
        description: The description text, or a missing value.

    Returns:
        A ``(security_name, currency, quantity, price)`` tuple with the
        quantity and price as ``float``. All four items are ``None`` when
        the description is missing or does not match (a warning is logged
        in the latter case).
    """
    if not description or pd.isna(description):
        return None, None, None, None

    text = " ".join(str(description).split())
    pattern = r"(?:Bought|Sold)\s+(\d[\d,]*)\s+(.+?)@([A-Z]{3})(\d[\d,]*\.\d+)"
    match = re.search(pattern, text)
    if match:
        # Both numeric groups start with a digit, so once the grouping
        # commas are removed float() cannot fail here.
        quantity = float(match.group(1).replace(",", ""))
        security_name = match.group(2).strip()
        currency = match.group(3)
        price = float(match.group(4).replace(",", ""))
        return security_name, currency, quantity, price
    logger.warning(f"Không thể trích xuất thông tin từ Description: {description}")
    return None, None, None, None
# Settlement date lookup
def _first_matching_row(transactions, prefix, pattern, require_balance=False):
    """Return the first row whose Reference starts with *prefix* and whose
    Description matches the compiled *pattern*, or ``None`` if there is none.

    With ``require_balance`` set, the row must also have a non-missing
    "Balance in Trust" value.
    """
    for _, row in transactions.iterrows():
        if not str(row.get("Reference", "")).startswith(prefix):
            continue
        if not pattern.search(str(row.get("Description", ""))):
            continue
        if require_balance and not pd.notna(row.get("Balance in Trust")):
            continue
        return row
    return None


def find_settlement_date(transactions, ref_code, trans_type):
    """Find the settlement date of a trade among the statement rows.

    * ``"TSF"`` (sale): the date of the first ``PY...`` row whose description
      contains ``Amount paid TFR to TRUST (<ref_code>)`` and that has a
      "Balance in Trust" value.
    * ``"TPF"`` (purchase): first locate the ``RC...`` row whose description
      mentions ``ref_code``, then take the date of the first ``WC...`` row
      whose description matches ``Withdrawal from TRUST ... (<RC reference>)``
      and that has a "Balance in Trust" value.

    Reference codes are matched literally and case-insensitively; they are
    escaped before being embedded in a regular expression, so codes that
    contain regex metacharacters neither raise nor match unrelated text.

    Args:
        transactions: DataFrame of statement rows; the "Reference",
            "Description", "Date" and "Balance in Trust" columns are read.
        ref_code: Reference code of the trade.
        trans_type: ``"TSF"`` or ``"TPF"``; any other value yields ``None``.

    Returns:
        The settlement date as returned by ``standardize_date``, or ``None``
        when no matching row exists.
    """
    escaped_ref = re.escape(str(ref_code))

    if trans_type == "TSF":  # sale
        payment = re.compile(
            rf"Amount paid TFR to TRUST \({escaped_ref}\)", re.IGNORECASE
        )
        row = _first_matching_row(transactions, "PY", payment, require_balance=True)
    elif trans_type == "TPF":  # purchase
        # Step 1: find the RC reference linked to this trade.
        rc_row = _first_matching_row(
            transactions, "RC", re.compile(escaped_ref, re.IGNORECASE)
        )
        rc_code = rc_row["Reference"] if rc_row is not None else None
        if not rc_code:
            return None
        # Step 2: find the withdrawal that quotes that RC reference.
        withdrawal = re.compile(
            rf"Withdrawal from TRUST.*\({re.escape(str(rc_code))}\)", re.IGNORECASE
        )
        row = _first_matching_row(transactions, "WC", withdrawal, require_balance=True)
    else:
        return None

    if row is None:
        return None
    return standardize_date(row.get("Date"))
# Transaction processing
def process_transactions(transactions):
    """Build the output template rows from the raw statement rows.

    Only rows whose "Reference" starts with ``TPF`` (purchase) or ``TSF``
    (sale) are considered. A row is skipped when its description cannot be
    fully parsed or its date cannot be normalised.

    Args:
        transactions: DataFrame of statement rows, or ``None``.

    Returns:
        A DataFrame with the ``TABLE_HEADERS`` columns; it has no rows when
        the input is ``None``/empty or nothing qualifies.
    """
    if transactions is None or transactions.empty:
        return pd.DataFrame(columns=TABLE_HEADERS)

    records = []
    for _, txn in transactions.iterrows():
        reference = str(txn.get("Reference", ""))
        kind = next(
            (prefix for prefix in ("TPF", "TSF") if reference.startswith(prefix)),
            None,
        )
        if kind is None:
            continue  # neither a purchase nor a sale

        name, ccy, qty, unit_price = parse_description(txn.get("Description"))
        if not (name and ccy and qty and unit_price):
            continue  # description did not yield all four details

        trade_date = standardize_date(txn.get("Date"))
        if not trade_date:
            continue

        settled_on = find_settlement_date(transactions, reference, kind)

        # qty and unit_price are both known to be truthy at this point.
        gross = qty * unit_price

        # Commission: gap between the row's "Balance in Trust" and the gross amount.
        trust_balance = standardize_number(txn.get("Balance in Trust"))
        fee = abs(trust_balance - gross) if trust_balance and gross else None

        records.append({
            "Reference - Code": reference,
            "GL Posting Date": trade_date,
            "Date": trade_date,
            "Settlement Date": settled_on,
            "Security Name": name,
            "Currency": ccy,
            # Sales are reported with a negative quantity.
            "Quantity": -qty if kind == "TSF" else qty,
            "Price": unit_price,
            "Transaction Amount": gross,
            "Commission": fee,
        })

    return pd.DataFrame(records, columns=TABLE_HEADERS)
# Uploaded file processing
def process_file(file):
    """Validate an uploaded file and turn it into a table of transactions.

    Args:
        file: The upload as handed over by the file input: either a path
            (``str`` / ``os.PathLike``) or an object exposing the path as
            ``.name``. NOTE(review): which of the two Gradio passes depends
            on its version and on the ``gr.File`` ``type`` setting; confirm
            against the deployed version. ``None`` means nothing was uploaded.

    Returns:
        A ``(table, status)`` tuple. ``table`` is ``None`` when no file was
        given, the extension is not ``.pdf`` (case-insensitive), or no data
        could be extracted; ``status`` is the message to show to the user.
    """
    if file is None:
        return None, "No file uploaded"

    # Accept a plain path as well as a file-like wrapper carrying `.name`.
    if isinstance(file, (str, os.PathLike)):
        file_path = os.fspath(file)
    else:
        file_path = file.name

    file_ext = os.path.splitext(file_path)[1].lower()
    if file_ext != '.pdf':
        return None, "Only PDF files are supported"

    transactions = extract_tables_from_pdf(file_path)
    if transactions is None:
        return None, "Could not extract data from PDF"

    processed_data = process_transactions(transactions)
    return processed_data, "Processing completed"
# Table generation (handler of the "Extract Transactions" button)
def generate_table(file, current_table):
    """Process an upload and append its rows to the table built so far.

    Args:
        file: The uploaded file, forwarded to ``process_file``.
        current_table: The DataFrame accumulated so far (may be ``None`` or
            empty).

    Returns:
        A ``(table, status, table)`` tuple; the table appears twice because
        the handler feeds two outputs. When the upload yields no rows,
        ``current_table`` is returned unchanged.
    """
    new_rows, status = process_file(file)

    # Nothing extracted: keep showing what we already have.
    if new_rows is None or new_rows.empty:
        return current_table, status, current_table

    has_existing = current_table is not None and not current_table.empty
    if has_existing:
        merged = pd.concat([current_table, new_rows], ignore_index=True)
    else:
        merged = new_rows
    return merged, status, merged
# CSV export and reset (handler of the "Export to CSV & Reset" button)
def export_and_reset(current_table):
    """Write the accumulated table to a CSV file and clear the table.

    The handler is wired to four outputs (download file, displayed table,
    export status, stored table), so every branch returns exactly four
    values; the "nothing to export" branch used to return only three.

    Args:
        current_table: The accumulated DataFrame (may be ``None`` or empty).

    Returns:
        A ``(file, table, status, table)`` tuple. ``file`` is ``None`` when
        there is nothing to export, otherwise a ``gr.File`` pointing at
        ``extracted_transactions.csv``; both table items are a fresh empty
        DataFrame with the ``TABLE_HEADERS`` columns.
    """
    reset_table = pd.DataFrame(columns=TABLE_HEADERS)
    if current_table is None or current_table.empty:
        return None, reset_table, "No data to export", reset_table

    csv_file = "extracted_transactions.csv"
    current_table.to_csv(csv_file, index=False)
    return gr.File(value=csv_file), reset_table, "Export successful", reset_table
# Gradio interface
with gr.Blocks(title="Transaction Extractor") as app:
    gr.Markdown("# Transaction Extractor")

    with gr.Column():
        # Extraction controls
        file_input = gr.File(label="Upload Bank Statement (PDF)")
        extract_btn = gr.Button("Extract Transactions")
        status_output = gr.Textbox(label="Status")
        table_output = gr.Dataframe(headers=TABLE_HEADERS, label="Extracted Transactions")

        # Export controls
        export_btn = gr.Button("Export to CSV & Reset")
        csv_output = gr.File(label="Download CSV")
        export_status = gr.Textbox(label="Export Status")

    # Holds the accumulated table that the handlers receive and hand back.
    table_state = gr.State(value=pd.DataFrame(columns=TABLE_HEADERS))

    # Extract: show the merged table and status, and store the merged table.
    extract_btn.click(
        generate_table,
        inputs=[file_input, table_state],
        outputs=[table_output, status_output, table_state],
    )

    # Export: offer the CSV, clear the displayed table, report, reset the state.
    export_btn.click(
        export_and_reset,
        inputs=table_state,
        outputs=[csv_output, table_output, export_status, table_state],
    )

app.launch(share=True)