Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import json | |
| from typing import List, Dict, Any | |
| from datasets import load_dataset | |
def load_huggingface_faq_data(dataset_name: str = "NebulaByte/E-Commerce_FAQs") -> List[Dict[str, Any]]:
    """
    Fetch FAQ records from the "train" split of a Hugging Face dataset.

    Every record is reduced to its "question" and "answer" values plus the
    optional "category", "question_id" and "faq_url" fields, which default
    to an empty string when the record does not provide them.

    If anything raises while loading or converting the dataset, the error
    is printed and the result of load_faq_data("data/faq_data.csv") is
    returned instead.
    """
    print(f"Loading dataset {dataset_name} from Hugging Face...")
    optional_fields = ("category", "question_id", "faq_url")
    try:
        train_split = load_dataset(dataset_name)["train"]
        entries = [
            {
                # Mandatory fields: a missing key raises and triggers the fallback.
                "question": row["question"],
                "answer": row["answer"],
                # Optional metadata, kept in a fixed order.
                **{field: row.get(field, "") for field in optional_fields},
            }
            for row in train_split
        ]
        print(f"Loaded {len(entries)} FAQ entries from Hugging Face")
        return entries
    except Exception as err:
        print(f"Error loading dataset from Hugging Face: {err}")
        print("Falling back to local data...")
        return load_faq_data("data/faq_data.csv")
def load_faq_data(file_path: str) -> List[Dict[str, Any]]:
    """
    Load FAQ data from a local CSV or JSON file.

    Args:
        file_path: Path to a ".csv" or ".json" file. The extension is
            matched case-insensitively.

    Returns:
        For CSV, one dict per row as produced by
        DataFrame.to_dict('records'); the file is assumed to have
        'question' and 'answer' columns. For JSON, the parsed document.
        If the file cannot be loaded for any reason (missing file, parse
        error, unsupported extension), the error is printed and a small
        built-in sample of two FAQ entries is returned instead.
    """
    print(f"Loading data from {file_path}")
    try:
        # Compare on a lowered copy so "FAQ.CSV" / "faqs.JSON" are accepted.
        lowered_path = file_path.lower()
        if lowered_path.endswith('.csv'):
            df = pd.read_csv(file_path)
            # Assume CSV has 'question' and 'answer' columns
            faqs = df.to_dict('records')
        elif lowered_path.endswith('.json'):
            # JSON text is UTF-8 by specification; do not depend on the
            # platform's locale encoding.
            with open(file_path, 'r', encoding='utf-8') as f:
                faqs = json.load(f)
        else:
            raise ValueError(f"Unsupported file format: {file_path}")
        print(f"Loaded {len(faqs)} FAQ entries")
        return faqs
    except Exception as e:
        # Deliberate best-effort: any failure degrades to the sample data
        # so callers always receive something usable.
        print(f"Error loading data: {e}")
        # Create a minimal sample dataset as fallback
        print("Creating sample dataset as fallback")
        sample_faqs = [
            {"question": "How do I track my order?",
             "answer": "You can track your order by logging into your account and visiting the Order History section."},
            {"question": "How do I reset my password?",
             "answer": "To reset your password, click on the 'Forgot Password' link on the login page."},
        ]
        return sample_faqs
def _clean_text(value: Any) -> str:
    """Return value as a stripped string; None and NaN become ""."""
    if value is None:
        return ""
    # NaN is the only float that is not equal to itself; pandas yields it
    # for empty CSV cells.
    if isinstance(value, float) and value != value:
        return ""
    if isinstance(value, str):
        return value.strip()
    # Other scalars (e.g. numeric answers) are kept as their text form.
    return str(value).strip()


def preprocess_faq(faqs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Preprocess FAQ data: clean text, handle formatting.

    Args:
        faqs: FAQ entries, each expected to carry 'question' and 'answer'.
            Missing, None or NaN values are treated as empty text;
            non-string values are converted with str().

    Returns:
        A new list of shallow-copied entries whose 'question' and 'answer'
        are stripped strings. Entries where either is empty are dropped.
        The input dicts are not modified.
    """
    processed_faqs = []
    for faq in faqs:
        # Work on a copy so the caller's data (including entries that end
        # up discarded) is left untouched.
        cleaned = dict(faq)
        # Basic cleaning - remove extra whitespace
        cleaned['question'] = _clean_text(faq.get('question'))
        cleaned['answer'] = _clean_text(faq.get('answer'))
        # Only include FAQs with both question and answer
        if cleaned['question'] and cleaned['answer']:
            processed_faqs.append(cleaned)
    print(f"After preprocessing: {len(processed_faqs)} valid FAQ entries")
    return processed_faqs