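"""Data quality analysis for the raw JSON datasets under data/raw.

Computes per-dataset text statistics (lengths, word counts, character classes),
writes a JSON report under reports/, and saves summary plots. A minimal usage
sketch (data/raw and reports/ are this script's default locations):

    analyzer = DataQualityAnalyzer("data/raw")
    analyzer.analyze_all_datasets()
    analyzer.generate_report()
    analyzer.plot_metrics()
"""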
import json
import logging
import re
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Any

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DataQualityAnalyzer:
    def __init__(self, data_dir: str = "data/raw"):
        self.data_dir = Path(data_dir)
        self.stats = defaultdict(dict)
        # Raw abstract lengths per dataset; used by plot_metrics() for histograms
        self.abstract_lengths = defaultdict(list)

    def load_dataset(self, file_path: Path) -> List[Dict]:
        """Load a dataset from a JSON file."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            logger.error(f"Error loading {file_path}: {str(e)}")
            return []

    def analyze_text_quality(self, text: str) -> Dict[str, Any]:
        """Analyze quality metrics for a text."""
        if not text:
            return {
                "length": 0,
                "word_count": 0,
                "avg_word_length": 0,
                "has_numbers": False,
                "has_special_chars": False
            }
        words = text.split()
        return {
            "length": len(text),
            "word_count": len(words),
            "avg_word_length": sum(len(w) for w in words) / len(words) if words else 0,
            "has_numbers": bool(re.search(r'\d', text)),
            "has_special_chars": bool(re.search(r'[^a-zA-Z0-9\s.,!?-]', text))
        }
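
    # Illustrative example (hypothetical input, not taken from the datasets):
    # analyze_text_quality("GPT-4 scores 86.4% on MMLU") reports has_numbers=True
    # and has_special_chars=True (the '%' falls outside the allowed character
    # class), alongside length, word_count, and avg_word_length.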

    def analyze_dataset(self, dataset_name: str, data: List[Dict]):
        """Analyze a single dataset."""
        if not data:
            logger.warning(f"No data found in {dataset_name}")
            return

        # Basic stats
        self.stats[dataset_name]["total_samples"] = len(data)

        # Text quality metrics
        title_metrics = []
        abstract_metrics = []
        for item in data:
            if "title" in item:
                title_metrics.append(self.analyze_text_quality(item["title"]))
            if "abstract" in item:
                metrics = self.analyze_text_quality(item["abstract"])
                abstract_metrics.append(metrics)
                # Keep the raw lengths for the per-dataset histograms
                self.abstract_lengths[dataset_name].append(metrics["length"])

        # Aggregate metrics
        if title_metrics:
            self.stats[dataset_name]["title"] = {
                "avg_length": sum(m["length"] for m in title_metrics) / len(title_metrics),
                "avg_word_count": sum(m["word_count"] for m in title_metrics) / len(title_metrics),
                "avg_word_length": sum(m["avg_word_length"] for m in title_metrics) / len(title_metrics),
                "has_numbers_ratio": sum(1 for m in title_metrics if m["has_numbers"]) / len(title_metrics),
                "has_special_chars_ratio": sum(1 for m in title_metrics if m["has_special_chars"]) / len(title_metrics)
            }
        if abstract_metrics:
            self.stats[dataset_name]["abstract"] = {
                "avg_length": sum(m["length"] for m in abstract_metrics) / len(abstract_metrics),
                "avg_word_count": sum(m["word_count"] for m in abstract_metrics) / len(abstract_metrics),
                "avg_word_length": sum(m["avg_word_length"] for m in abstract_metrics) / len(abstract_metrics),
                "has_numbers_ratio": sum(1 for m in abstract_metrics if m["has_numbers"]) / len(abstract_metrics),
                "has_special_chars_ratio": sum(1 for m in abstract_metrics if m["has_special_chars"]) / len(abstract_metrics)
            }

        # Field presence
        fields = set()
        for item in data:
            fields.update(item.keys())
        self.stats[dataset_name]["fields"] = list(fields)

        # Year distribution (if available); cast to plain str/int so the
        # report stays JSON-serializable
        if "year" in fields:
            years = [item["year"] for item in data if "year" in item]
            year_counts = pd.Series(years).value_counts()
            self.stats[dataset_name]["year_distribution"] = {str(y): int(c) for y, c in year_counts.items()}

    def analyze_all_datasets(self):
        """Analyze all datasets in the data directory."""
        for file_path in self.data_dir.glob("*.json"):
            dataset_name = file_path.stem
            logger.info(f"Analyzing dataset: {dataset_name}")
            data = self.load_dataset(file_path)
            self.analyze_dataset(dataset_name, data)

    def generate_report(self):
        """Generate a comprehensive report."""
        report = {
            "summary": {},
            "datasets": self.stats
        }

        # Overall summary
        total_samples = sum(stats["total_samples"] for stats in self.stats.values())
        report["summary"]["total_samples"] = total_samples
        report["summary"]["total_datasets"] = len(self.stats)

        # Save report
        report_file = self.data_dir.parent / "reports" / "data_quality_report.json"
        report_file.parent.mkdir(parents=True, exist_ok=True)
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
        logger.info(f"Quality report saved to {report_file}")
        return report
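
    # Rough shape of the saved report (values depend on the input data):
    #   {"summary": {"total_samples": ..., "total_datasets": ...},
    #    "datasets": {"<dataset>": {"total_samples": ..., "title": {...},
    #                               "abstract": {...}, "fields": [...],
    #                               "year_distribution": {...}}}}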

    def plot_metrics(self):
        """Generate plots for key metrics."""
        plots_dir = self.data_dir.parent / "reports" / "plots"
        plots_dir.mkdir(parents=True, exist_ok=True)

        # Sample distribution
        plt.figure(figsize=(10, 6))
        samples = {name: stats["total_samples"] for name, stats in self.stats.items()}
        plt.bar(list(samples.keys()), list(samples.values()))
        plt.xticks(rotation=45)
        plt.title("Sample Distribution Across Datasets")
        plt.tight_layout()
        plt.savefig(plots_dir / "sample_distribution.png")
        plt.close()

        # Abstract length distribution, using the raw per-sample lengths
        # collected in analyze_dataset (self.stats only holds averages)
        for dataset_name, lengths in self.abstract_lengths.items():
            if not lengths:
                continue
            plt.figure(figsize=(10, 6))
            plt.hist(lengths, bins=50)
            plt.title(f"Abstract Length Distribution - {dataset_name}")
            plt.xlabel("Length")
            plt.ylabel("Count")
            plt.tight_layout()
            plt.savefig(plots_dir / f"abstract_length_{dataset_name}.png")
            plt.close()

def main():
    analyzer = DataQualityAnalyzer()
    analyzer.analyze_all_datasets()
    report = analyzer.generate_report()
    analyzer.plot_metrics()

    # Print summary
    print("\nData Quality Summary:")
    print(f"Total samples: {report['summary']['total_samples']}")
    print(f"Total datasets: {report['summary']['total_datasets']}")
    print("\nPer Dataset Summary:")
    for dataset_name, stats in report["datasets"].items():
        print(f"\n{dataset_name}:")
        print(f"  Samples: {stats['total_samples']}")
        if "abstract" in stats:
            print(f"  Avg abstract length: {stats['abstract']['avg_length']:.1f}")
            print(f"  Avg words per abstract: {stats['abstract']['avg_word_count']:.1f}")


if __name__ == "__main__":
    main()