# NOTE: the page this file was captured from showed the banner "Spaces: Runtime error".
## FINAL FILE
# This deploy.py file contains the complete code for the Instagram Reels Analysis Gradio App.
# --- Imports ---
import io  # Import io for handling image bytes
import os
import random
import re
import time
from collections import Counter

import emoji
import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from datasets import Dataset, Features, Value
from instagrapi import Client
from sklearn.metrics import accuracy_score, f1_score
from transformers import (
    pipeline,
    AutoTokenizer,
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments,
    RobertaForSequenceClassification,
    AlbertForSequenceClassification
)
# --- Configuration ---
# Tunable settings shared by the sentiment models and the analysis heuristics.
CONFIG = {
    "max_length": 128,  # tokenizer truncation / padding length
    "batch_size": 16,  # per-device batch size used for training and evaluation
    "learning_rate": 2e-5,
    "num_train_epochs": 3,
    "few_shot_examples": 5,  # per class
    "confidence_threshold": 0.7,  # score above which a single model's verdict is trusted
    "neutral_reanalysis_threshold": 0.33,  # neutral share that triggers keyword re-analysis
}
# --- Global Variables for State Management ---
# Module-level state shared by the Gradio callbacks. (A `global` statement at
# module scope has no effect, so plain assignments are all that is needed;
# the callbacks declare `global` themselves before rebinding these names.)
cl = None  # Instagram client, assigned by the login callback
explore_reels_list = []  # reels kept between the "fetch" and "analyze" steps
sentiment_analyzer_instance = None  # ReelSentimentAnalyzer, created on first analysis
content_classifier_pipeline = None  # zero-shot classification pipeline, created on first analysis
# --- Sentiment Analysis Class ---
class ReelSentimentAnalyzer:
    """Sentiment analyzer for reel captions in English, Hindi and Hinglish.

    English captions are scored by an emotion model and a sentiment model whose
    outputs are combined; Hindi/Hinglish captions are scored by an Indic-BERT
    classifier that can be fine-tuned with `train_hindi_model`.
    """

    # Keywords that flip a low-confidence neutral caption during re-analysis.
    _POS_KEYWORDS_STRONG = frozenset({
        "amazing", "love", "best", "fantastic", "awesome", "superb", "great",
        "अद्भुत", "शानदार", "बहुत अच्छा", "मज़ेदार",
    })
    _NEG_KEYWORDS_STRONG = frozenset({
        "hate", "worst", "bad", "terrible", "awful", "disappointed", "horrible", "cringe",
        "खराब", "बेकार", "बहुत बुरा", "घटिया",
    })

    def __init__(self):
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self._initialize_models()

    @staticmethod
    def _contains_keyword(text, keywords):
        """Return True if any keyword occurs in `text` as a whole word or phrase.

        Lookarounds are used instead of `\\b`: Python's `\\w` does not include
        combining marks, so `\\b` never matches after a Devanagari vowel sign
        and keywords ending in one (e.g. "बिक्री") could never be found.
        """
        return any(
            re.search(rf"(?<!\w){re.escape(kw)}(?!\w)", text) for kw in keywords
        )

    def _initialize_models(self):
        """Initialize and configure all models"""
        print("\nInitializing Sentiment Analysis Models...")
        # English models
        print("Loading English Emotion Model...")
        self.emotion_tokenizer = AutoTokenizer.from_pretrained("finiteautomata/bertweet-base-emotion-analysis")
        self.emotion_model = AutoModelForSequenceClassification.from_pretrained(
            "finiteautomata/bertweet-base-emotion-analysis"
        ).to(self.device)
        print("Loading English Sentiment Model...")
        self.sentiment_tokenizer = AutoTokenizer.from_pretrained("cardiffnlp/twitter-roberta-base-sentiment-latest")
        self.sentiment_model = RobertaForSequenceClassification.from_pretrained(
            "cardiffnlp/twitter-roberta-base-sentiment-latest",
            ignore_mismatched_sizes=True
        ).to(self.device)
        # Hindi/English model (we'll fine-tune this)
        print("Loading Indic-BERT Model for Hindi/Hinglish...")
        self.hindi_tokenizer = AutoTokenizer.from_pretrained("ai4bharat/indic-bert")
        self.hindi_model = AlbertForSequenceClassification.from_pretrained(
            "ai4bharat/indic-bert",
            num_labels=3,
            id2label={0: "negative", 1: "neutral", 2: "positive"},
            label2id={"negative": 0, "neutral": 1, "positive": 2}
        ).to(self.device)
        # Store label2id mapping for easy access
        self.hindi_label2id = self.hindi_model.config.label2id
        print("Models Initialized.")
        # Emotion to sentiment mapping
        self.emotion_map = {
            "joy": "positive", "love": "positive", "happy": "positive",
            "anger": "negative", "sadness": "negative", "fear": "negative",
            "surprise": "neutral", "neutral": "neutral", "disgust": "negative", "shame": "negative"
        }
        # Neutral keywords
        self.neutral_keywords = {
            "ad", "sponsored", "promo", "sale", "discount", "offer", "giveaway",
            "buy", "shop", "link in bio",
            "विज्ञापन", "प्रचार", "ऑफर", "डिस्काउंट", "बिक्री", "लिंक बायो में"
        }

    def train_hindi_model(self, train_data, eval_data=None):
        """
        Fine-tune the Hindi/English model on labeled data

        Args:
            train_data: List of dicts [{"text": "...", "label": "positive/negative/neutral"}]
            eval_data: Optional evaluation data in the same format
        """
        print("\nStarting Hindi model training...")

        def map_labels_to_ids(examples):
            # Map string labels to integer IDs; unknown labels become neutral.
            labels = []
            for label_str in examples["label"]:
                if label_str in self.hindi_label2id:
                    labels.append(self.hindi_label2id[label_str])
                else:
                    print(f"Warning: Unexpected label '{label_str}'. Mapping to neutral.")
                    labels.append(self.hindi_label2id["neutral"])  # Map unknown to neutral
            examples["label"] = labels
            return examples

        def tokenize_function(examples):
            return self.hindi_tokenizer(
                examples["text"],
                padding="max_length",
                truncation=True,
                max_length=CONFIG["max_length"]
            )

        def compute_metrics(p):
            predictions, labels = p
            predictions = np.argmax(predictions, axis=1)
            return {
                "accuracy": accuracy_score(labels, predictions),
                "f1": f1_score(labels, predictions, average="weighted")
            }

        def prepare(records):
            # records -> Dataset with integer labels and tokenized text
            dataset = Dataset.from_pandas(pd.DataFrame(records))
            dataset = dataset.map(map_labels_to_ids, batched=True)
            # Explicitly set the label column to integer type
            dataset = dataset.cast_column("label", Value("int64"))
            return dataset.map(tokenize_function, batched=True)

        tokenized_train = prepare(train_data)
        eval_dataset_processed = prepare(eval_data) if eval_data else None

        # Training arguments - using eval_strategy instead of evaluation_strategy
        training_args = TrainingArguments(
            output_dir="./results",
            eval_strategy="epoch" if eval_data else "no",
            per_device_train_batch_size=CONFIG["batch_size"],
            per_device_eval_batch_size=CONFIG["batch_size"],
            learning_rate=CONFIG["learning_rate"],
            num_train_epochs=CONFIG["num_train_epochs"],
            weight_decay=0.01,
            save_strategy="no",  # Don't save checkpoints during training
            logging_dir='./logs',
            logging_steps=10,
            report_to="none"  # Don't report to external services
        )
        trainer = Trainer(
            model=self.hindi_model,
            args=training_args,
            train_dataset=tokenized_train,
            eval_dataset=eval_dataset_processed,
            compute_metrics=compute_metrics if eval_data else None,
        )
        trainer.train()
        # Training leaves the module in train mode; switch back so dropout is
        # disabled for the inference calls made by _analyze_hindi_content.
        self.hindi_model.eval()
        # Save the fine-tuned model
        print("Saving fine-tuned Hindi model...")
        self.hindi_model.save_pretrained("./fine_tuned_hindi_sentiment")
        self.hindi_tokenizer.save_pretrained("./fine_tuned_hindi_sentiment")
        print("Hindi model training complete.")

    def preprocess_text(self, text):
        """Enhanced text cleaning with multilingual support"""
        if not text:
            return ""
        # Convert emojis to text
        text = emoji.demojize(text, delimiters=(" ", " "))
        # Remove URLs and mentions
        text = re.sub(r"http\S+|@\w+", "", text)
        # Expand common abbreviations (can be extended). The identity entries
        # still normalise the word's case because matching is case-insensitive.
        abbrevs = {
            r"\bomg\b": "oh my god",
            r"\btbh\b": "to be honest",
            r"\bky\b": "kyun",  # Hindi 'why'
            r"\bkb\b": "kab",  # Hindi 'when'
            r"\bkya\b": "kya",  # Hindi 'what'
            r"\bkahan\b": "kahan",  # Hindi 'where'
            r"\bkaisa\b": "kaisa"  # Hindi 'how'
        }
        for pattern, replacement in abbrevs.items():
            text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
        # Remove extra whitespace
        text = re.sub(r"\s+", " ", text).strip()
        return text

    def detect_language(self, text):
        """Return "hi" (Devanagari), "hi-latin" (Hinglish keywords) or "en"."""
        if re.search(r"[\u0900-\u097F]", text):  # Devanagari script (Hindi, Marathi etc.)
            return "hi"
        # Simple check for common Hindi/Hinglish words (can be expanded)
        hinglish_keywords = ["hai", "kyun", "nahi", "kya", "acha", "bas", "yaar", "main"]
        lowered = text.lower()
        if any(re.search(rf"\b{kw}\b", lowered) for kw in hinglish_keywords):
            return "hi-latin"
        # Fallback to English if no strong Hindi/Hinglish indicators
        return "en"

    def analyze_content(self, text):
        """Classify one caption.

        Returns:
            Tuple (label, confidence, details) where label is
            "positive", "neutral" or "negative".
        """
        processed = self.preprocess_text(text)
        if not processed:
            return "neutral", 0.5, {"reason": "empty_text"}
        lang = self.detect_language(processed)
        # Check for neutral keywords first with higher confidence
        if self._contains_keyword(processed.lower(), self.neutral_keywords):
            return "neutral", 0.9, {"reason": "neutral_keyword"}
        try:
            if lang in ("hi", "hi-latin"):
                # Use Hindi model for Hindi/Hinglish
                return self._analyze_hindi_content(processed)
            # Use ensemble for English
            return self._analyze_english_content(processed)
        except Exception as e:
            # Best-effort: a model failure on one caption must not abort the batch.
            print(f"Analysis error for text '{processed[:50]}...': {e}")
            return "neutral", 0.5, {"error": str(e), "original_text": text[:50]}

    def _analyze_hindi_content(self, text):
        """Analyze Hindi content with fine-tuned model"""
        inputs = self.hindi_tokenizer(
            text,
            return_tensors="pt",
            truncation=True,
            padding=True,
            max_length=CONFIG["max_length"]
        ).to(self.device)
        with torch.no_grad():
            outputs = self.hindi_model(**inputs)
        # A single text is passed in, so row 0 holds its class probabilities.
        probs = torch.nn.functional.softmax(outputs.logits, dim=-1)[0]
        pred_idx = torch.argmax(probs, dim=-1).item()
        confidence = probs[pred_idx].item()
        label = self.hindi_model.config.id2label[pred_idx]
        return label, confidence, {"model": "fine-tuned-indic-bert", "lang": "hi"}

    def _analyze_english_content(self, text):
        """Analyze English content with ensemble approach"""
        # Emotion analysis
        emotion_inputs = self.emotion_tokenizer(
            text,
            return_tensors="pt",
            truncation=True,
            max_length=CONFIG["max_length"]
        ).to(self.device)
        with torch.no_grad():
            emotion_outputs = self.emotion_model(**emotion_inputs)
        emotion_probs = torch.nn.functional.softmax(emotion_outputs.logits, dim=-1)[0]
        emotion_pred = torch.argmax(emotion_probs, dim=-1).item()
        emotion_label = self.emotion_model.config.id2label[emotion_pred]
        emotion_score = emotion_probs[emotion_pred].item()
        # Sentiment analysis
        sentiment_inputs = self.sentiment_tokenizer(
            text,
            return_tensors="pt",
            truncation=True,
            max_length=CONFIG["max_length"]
        ).to(self.device)
        with torch.no_grad():
            sentiment_outputs = self.sentiment_model(**sentiment_inputs)
        sentiment_probs = torch.nn.functional.softmax(sentiment_outputs.logits, dim=-1)[0]
        sentiment_pred = torch.argmax(sentiment_probs, dim=-1).item()
        # Class indices are mapped by hand: 0 negative, 1 neutral, 2 positive.
        sentiment_label_mapping = {0: 'negative', 1: 'neutral', 2: 'positive'}
        sentiment_label = sentiment_label_mapping.get(sentiment_pred, 'neutral')  # Default to neutral if mapping fails
        sentiment_score = sentiment_probs[sentiment_pred].item()
        # Combine results
        mapped_emotion = self.emotion_map.get(emotion_label, "neutral")
        threshold = CONFIG["confidence_threshold"]
        if sentiment_score > threshold:
            # Prioritize high-confidence sentiment
            final_label = sentiment_label
            final_confidence = sentiment_score
            reason = "high_sentiment_confidence"
        elif emotion_score > threshold and mapped_emotion != "neutral":
            # Then prioritize high-confidence emotion if not neutral
            final_label = mapped_emotion
            final_confidence = emotion_score
            reason = "high_emotion_confidence"
        elif sentiment_label == mapped_emotion and sentiment_label != "neutral":
            # Both models are unsure but agree on a non-neutral label.
            final_label = sentiment_label
            final_confidence = (sentiment_score + emotion_score) / 2
            reason = "emotion_sentiment_agreement"
        elif sentiment_label != "neutral" and sentiment_score > emotion_score and sentiment_score > 0.4:
            # Use sentiment if somewhat confident
            final_label = sentiment_label
            final_confidence = sentiment_score * 0.9  # Slightly reduce confidence
            reason = "sentiment_slightly_higher"
        elif mapped_emotion != "neutral" and emotion_score > sentiment_score and emotion_score > 0.4:
            # Use emotion if somewhat confident
            final_label = mapped_emotion
            final_confidence = emotion_score * 0.9  # Slightly reduce confidence
            reason = "emotion_slightly_higher"
        else:
            # Default to neutral if no strong signal
            final_label = "neutral"
            final_confidence = 0.6  # Assign a baseline neutral confidence
            reason = "fallback_to_neutral"
        return final_label, final_confidence, {
            "emotion_label": emotion_label,
            "emotion_score": emotion_score,
            "sentiment_label": sentiment_label,
            "sentiment_score": sentiment_score,
            "mapped_emotion": mapped_emotion,
            "model": "ensemble",
            "lang": "en",
            "reason": reason
        }

    def analyze_reels(self, reels, max_to_analyze=100):
        """Analyze the captions of up to `max_to_analyze` reels.

        Returns:
            Tuple (results, detailed_results): a Counter of labels and a list
            with one dict per reel (reel_id, text, label, confidence, details).
        """
        print(f"\n--- Starting Sentiment Analysis ({max_to_analyze} reels) ---")
        results = Counter()
        detailed_results = []
        for i, reel in enumerate(reels[:max_to_analyze], 1):
            caption = getattr(reel, 'caption_text', '') or getattr(reel, 'caption', '') or ''
            print(f"Analyzing sentiment for reel {i}/{max_to_analyze} (ID: {reel.id})...")
            label, confidence, details = self.analyze_content(caption)
            results[label] += 1
            detailed_results.append({
                "reel_id": reel.id,  # Add reel ID
                "text": caption,
                "label": label,
                "confidence": confidence,
                "details": details
            })
        print("\nInitial Sentiment Distribution:", dict(results))
        # Post-analysis neutral reduction if a significant portion is neutral
        total_analyzed = sum(results.values())
        if total_analyzed > 0 and results["neutral"] / total_analyzed > CONFIG["neutral_reanalysis_threshold"]:
            print(f"High neutral count ({results['neutral']}). Attempting to re-analyze...")
            self._reduce_neutrals(results, detailed_results)
            print("Sentiment distribution after re-analysis:", dict(results))
        print("Sentiment Analysis Complete.")
        return results, detailed_results

    def _reduce_neutrals(self, results, detailed_results):
        """Reclassify low-confidence neutral items that contain strong keywords.

        Mutates `results` and the matching entries of `detailed_results` in place.
        """
        neutrals_to_recheck = [
            item for item in detailed_results
            if item["label"] == "neutral" and item["confidence"] < 0.8
        ]
        print(f"Re-checking {len(neutrals_to_recheck)} neutral reels...")
        for item in neutrals_to_recheck:
            text_lower = self.preprocess_text(item["text"]).lower()
            is_strong_pos = self._contains_keyword(text_lower, self._POS_KEYWORDS_STRONG)
            is_strong_neg = self._contains_keyword(text_lower, self._NEG_KEYWORDS_STRONG)
            # Only reclassify when exactly one polarity is present.
            if is_strong_pos == is_strong_neg:
                continue
            new_label = "positive" if is_strong_pos else "negative"
            results["neutral"] -= 1
            results[new_label] += 1
            item.update({
                "label": new_label,
                "confidence": min(0.95, item["confidence"] + 0.3),  # Increase confidence
                "reanalyzed": True,
                "reanalysis_reason": "strong_pos_keywords" if is_strong_pos else "strong_neg_keywords"
            })
def plot_sentiment_pie(results, title="Reels Sentiment Analysis"):
    """
    Creates a pie chart from sentiment analysis results and returns the matplotlib figure.

    Args:
        results: Counter object or dict with 'positive', 'neutral', 'negative' keys
        title: Chart title
    Returns:
        Matplotlib Figure object, or None if no data.
    """
    # (label, count, colour, explode offset) for each slice, in display order
    slices = [
        ("Positive", results.get("positive", 0), "#4CAF50", 0.05),
        ("Neutral", results.get("neutral", 0), "#FFC107", 0),
        ("Negative", results.get("negative", 0), "#F44336", 0.05),
    ]
    if sum(count for _, count, _, _ in slices) == 0:
        return None
    # Empty slices are dropped so the pie carries no zero-width wedges or labels.
    visible = [entry for entry in slices if entry[1] > 0]
    fig, ax = plt.subplots(figsize=(8, 6))
    ax.pie(
        [count for _, count, _, _ in visible],
        explode=[offset for _, _, _, offset in visible],
        labels=[label for label, _, _, _ in visible],
        colors=[colour for _, _, colour, _ in visible],
        autopct='%1.1f%%', shadow=True, startangle=140,
        textprops={'fontsize': 12, 'color': 'black'},
    )
    ax.axis('equal')
    ax.set_title(title, fontsize=16, pad=20)
    fig.tight_layout()
    # Return the figure object
    return fig
# --- Content Analysis Logic ---
# Content categories
# Candidate labels handed to the zero-shot classifier.
content_categories = [
    "news", "meme", "sports", "science", "music", "movie",
    "gym", "comedy", "food", "technology", "travel", "fashion", "art", "business",
]

# Keyword shortcuts tried before the model. Categories are checked in this
# order, so a word listed under two categories (e.g. "artist") resolves to the
# earlier one.
category_keywords = {
    "news": {"news", "update", "breaking", "reported", "headlines"},
    "meme": {"meme", "funny", "lol", "haha", "relatable"},
    "sports": {"sports", "cricket", "football", "match", "game", "team", "score"},
    "science": {"science", "research", "discovery", "experiment", "facts", "theory"},
    "music": {"music", "song", "album", "release", "artist", "beats"},
    "movie": {"movie", "film", "bollywood", "trailer", "series", "actor"},
    "gym": {"gym", "workout", "fitness", "exercise", "training", "bodybuilding"},
    "comedy": {"comedy", "joke", "humor", "standup", "skit", "laugh"},
    "food": {"food", "recipe", "cooking", "eat", "delicious", "restaurant", "kitchen"},
    "technology": {"tech", "phone", "computer", "ai", "gadget", "software", "innovation"},
    "travel": {"travel", "trip", "vacation", "explore", "destination", "adventure"},
    "fashion": {"fashion", "style", "ootd", "outfit", "trends", "clothing"},
    "art": {"art", "artist", "painting", "drawing", "creative", "design"},
    "business": {"business", "startup", "marketing", "money", "finance", "entrepreneur"},
}
def preprocess_text_cat(text):
    """Basic text cleaning for categorization.

    Strips URLs, @mentions and #hashtags, lower-cases the text and collapses
    whitespace. Returns "" for empty or None input.
    """
    if not text:
        return ""
    without_noise = re.sub(r"http\S+|@\w+|#\w+", "", text).lower()
    return re.sub(r"\s+", " ", without_noise).strip()
def classify_reel_content(text):
    """Classify a caption using keywords first, then the zero-shot model.

    Returns:
        Tuple (category, details). `category` is one of `content_categories`
        or "other"; `details["reason"]` records which rule decided.
    """
    processed = preprocess_text_cat(text)
    if not processed or len(processed.split()) < 2:
        return "other", {"reason": "short_text"}
    # First category with a whole-word keyword hit wins.
    for category, keywords in category_keywords.items():
        if any(re.search(rf"\b{re.escape(keyword)}\b", processed) for keyword in keywords):
            return category, {"reason": "keyword_match"}
    model_text = processed[:256]
    # The pipeline is a module-level global that is only read here.
    if content_classifier_pipeline is None:
        # Should not happen if initialized in analyze_reels_gradio or globally
        print("Content classifier pipeline not initialized in classify_reel_content.")
        return "other", {"reason": "classifier_not_initialized"}
    try:
        result = content_classifier_pipeline(model_text, content_categories, multi_label=False)
        top_label = result['labels'][0]
        top_score = result['scores'][0]
    except Exception as e:
        # Best-effort: one failed caption falls back to "other".
        print(f"Error during zero-shot classification for text '{model_text}...': {e}")
        return "other", {"reason": "classification_error"}
    if top_score > 0.5:
        return top_label, {"reason": "model_prediction", "score": top_score}
    return "other", {"reason": "low_model_confidence", "score": top_score}
def plot_category_distribution(counter, title="Reels Content Distribution"):
    """
    Generate pie chart from category counts and returns the matplotlib figure.

    Categories below 2% of the total, and the "other" category itself, are
    merged into a single trailing "Other" slice.

    Args:
        counter: Counter object with category counts.
        title: Chart title.
    Returns:
        Matplotlib Figure object, or None if no data.
    """
    total = sum(counter.values())
    if total == 0:
        return None
    threshold = total * 0.02
    labels = []
    sizes = []
    other_count = 0
    for category, count in counter.most_common():
        if category != "other" and count >= threshold:
            labels.append(category.replace('_', ' ').title())
            sizes.append(count)
        else:
            other_count += count
    if other_count > 0:
        labels.append("Other")
        sizes.append(other_count)
    if not sizes:
        return None
    fig, ax = plt.subplots(figsize=(10, 8))
    colors = plt.cm.viridis(np.linspace(0, 1, len(sizes)))
    ax.pie(
        sizes,
        labels=labels,
        autopct='%1.1f%%',
        startangle=140,
        colors=colors,
        wedgeprops={'edgecolor': 'white', 'linewidth': 1},
        textprops={'fontsize': 11, 'color': 'black'}
    )
    ax.set_title(title, pad=20, fontsize=15)
    ax.axis('equal')
    fig.tight_layout()
    # Return the figure object
    return fig
# --- Gradio-Compatible Functions ---
# Preset Instagram username. It is hard-coded below; only the password is
# looked up from secrets at login time.
USERNAME = "jattman1993"  # Replace with your preset username or fetch from secrets if needed
def login_gradio_auto():
    """Gradio-compatible function for automatic login.

    The password is read from the `password` environment variable first and,
    failing that, from Colab's `userdata` helper when that name is available.

    Returns:
        Tuple (status message, update for the OTP row). The OTP row is made
        visible only when the login attempt asks for two-factor authentication.
    """
    global cl
    password = os.environ.get("password")
    if not password:
        try:
            # Fetch password securely from Colab secrets
            password = userdata.get('password')
        except NameError:
            # The Colab helper was never imported; fall through to the
            # "not found" message instead of surfacing a NameError.
            password = None
        except Exception as e:
            return f"Error accessing password secret: {e}", gr.update(visible=False)  # Hide OTP input on error
    if not password:
        return "Error: Instagram password not found in Colab secrets. Please add it to Colab secrets with the key 'password'.", gr.update(visible=False)  # Hide OTP input
    client = Client()
    try:
        client.login(USERNAME, password)
    except Exception as e:
        error_message = str(e)
        needs_two_factor = (
            "Two factor challenged" in error_message
            or "challenge_required" in error_message
            or "TwoFactorRequired" in type(e).__name__
        )
        if needs_two_factor:
            # Keep the client: submit_otp_gradio refuses to run when `cl` is
            # None, so discarding it here would make the OTP step unreachable.
            cl = client
            return "Login failed: Two-factor authentication required. Please enter the code below.", gr.update(visible=True)
        cl = None  # Ensure cl is None on failure
        # For other errors, hide OTP input and show error message
        return f"Error during login: {error_message}", gr.update(visible=False)
    cl = client
    # Login succeeded: report it and hide the OTP input
    return f"Successfully logged in as {USERNAME}", gr.update(visible=False)
# Function to handle OTP submission (if 2FA was required)
def submit_otp_gradio(otp_code):
    """Gradio-compatible function to submit OTP.

    Args:
        otp_code: Code typed into the OTP textbox.
    Returns:
        Tuple (status message, new OTP textbox value, update for the OTP row).
        The textbox is always cleared; the row stays visible only on failure.
    """
    if cl is None:
        return "Error: Not logged in or client not initialized.", "", gr.update(visible=False)  # Hide OTP input
    try:
        # NOTE(review): confirm that the installed instagrapi Client exposes
        # `two_factor_login`; if it does not, this raises AttributeError and
        # is reported through the failure branch below.
        cl.two_factor_login(otp_code)
    except Exception as e:
        # If OTP fails, keep the OTP input visible for another attempt
        return f"OTP submission failed: {e}. Please try again.", "", gr.update(visible=True)
    # OTP accepted: clear the OTP input and hide the field
    return f"OTP successful. Successfully logged in as {USERNAME}.", "", gr.update(visible=False)
def fetch_reels_gradio():
    """Gradio-compatible function to fetch explore reels.

    Stores the fetched reels in the module-level `explore_reels_list` (reset to
    an empty list on any failure) and returns a status message.
    """
    global explore_reels_list
    explore_reels_list = []  # stays empty unless the fetch below succeeds
    if cl is None:
        return "Error: Not logged in. Please log in first."
    try:
        # Keep at most 100 reels for analysis.
        # NOTE(review): explore_reels() is called with its default arguments;
        # verify how many reels that returns - it may be far fewer than 100.
        fetched_reels = cl.explore_reels()[:100]
    except Exception as e:
        return f"Error fetching explore reels: {e}"
    if not fetched_reels:
        return "Fetched 0 explore reels."
    explore_reels_list = fetched_reels
    return f"Successfully fetched {len(explore_reels_list)} explore reels."
def analyze_reels_gradio(max_to_analyze):
    """Gradio-compatible function to analyze fetched reels and generate plots.

    Args:
        max_to_analyze: Upper bound on the number of fetched reels to analyze
            (the slider value; coerced to int before use).
    Returns:
        Tuple (status text, sentiment figure or None, content figure or None).
    """
    global sentiment_analyzer_instance
    global content_classifier_pipeline
    if not explore_reels_list:
        # Return None for plots if no reels
        return "Error: No reels fetched yet. Please fetch reels first.", None, None
    # A slider may deliver its value as a float, which cannot be used as a
    # slice bound; coerce it and cap it at the number of fetched reels.
    num_reels_to_process = min(int(max_to_analyze), len(explore_reels_list))
    reels_to_analyze = explore_reels_list[:num_reels_to_process]
    if not reels_to_analyze:
        return "Error: No reels available to analyze.", None, None
    # Initialize sentiment analyzer if not already done
    if sentiment_analyzer_instance is None:
        try:
            sentiment_analyzer_instance = ReelSentimentAnalyzer()
            # Optional: Train Hindi model if needed and data is available
            # sample_train_data = [...] # Define your training data
            # sentiment_analyzer_instance.train_hindi_model(sample_train_data)
        except Exception as e:
            return f"Error initializing Sentiment Analyzer: {e}", None, None
    # Initialize content classifier pipeline if not already done
    if content_classifier_pipeline is None:
        try:
            print("Initializing Content Classifier Pipeline...")
            content_classifier_pipeline = pipeline(
                "zero-shot-classification",
                model="facebook/bart-large-mnli",
                device=0 if torch.cuda.is_available() else -1  # Use GPU if available
            )
            print("Content Classifier Pipeline Initialized.")
        except Exception as e:
            return f"Error initializing Content Classifier: {e}", None, None
    analysis_status_messages = []
    sentiment_plot_figure = None
    content_plot_figure = None
    # Perform Sentiment Analysis. A failure is reported in the status text and
    # does not prevent the content categorization below from running.
    try:
        analysis_status_messages.append(f"Starting Sentiment Analysis for {len(reels_to_analyze)} reels...")
        sentiment_results, _ = sentiment_analyzer_instance.analyze_reels(
            reels_to_analyze,
            max_to_analyze=len(reels_to_analyze)  # Pass the actual number being processed
        )
        sentiment_plot_figure = plot_sentiment_pie(
            sentiment_results,
            title=f"Sentiment of {len(reels_to_analyze)} Instagram Reels"
        )
        analysis_status_messages.append("Sentiment Analysis Complete.")
    except Exception as e:
        analysis_status_messages.append(f"Error during Sentiment Analysis: {e}")
        sentiment_plot_figure = None  # Ensure plot is None on error
    # Perform Content Categorization
    try:
        analysis_status_messages.append(f"Starting Content Categorization for {len(reels_to_analyze)} reels...")
        category_counts = Counter()
        print(f"\n⏳ Analyzing content for {len(reels_to_analyze)} reels...")
        for reel in reels_to_analyze:
            caption = getattr(reel, 'caption_text', '') or getattr(reel, 'caption', '') or ''
            # Uses the global classifier pipeline
            category, _ = classify_reel_content(caption)
            category_counts[category] += 1
        print("\n✅ Content Analysis complete!")
        print("\n📊 Category Counts:")
        for category, count in category_counts.most_common():
            print(f"- {category.replace('_', ' ').title()}: {count}")
        content_plot_figure = plot_category_distribution(category_counts)
        analysis_status_messages.append("Content Categorization Complete.")
    except Exception as e:
        analysis_status_messages.append(f"Error during Content Analysis: {e}")
        content_plot_figure = None  # Ensure plot is None on error
    final_status_message = "\n".join(analysis_status_messages)
    # Return the figure objects
    return final_status_message, sentiment_plot_figure, content_plot_figure
# --- Gradio Blocks Interface ---
# Layout and event wiring for the app. The event bindings are made inside the
# Blocks context so they are attached to `demo`.
with gr.Blocks() as demo:
    gr.Markdown("# Instagram Reels Analysis")
    # Login Section
    with gr.Row():
        connect_button = gr.Button("Connect Instagram")
        login_status_output = gr.Label(label="Login Status")
    # OTP Input (initially hidden)
    with gr.Row(visible=False) as otp_row:
        otp_input = gr.Textbox(label="Enter OTP Code")
        otp_submit_button = gr.Button("Submit OTP")
    # Fetch Reels Section
    with gr.Row():
        fetch_button = gr.Button("Fetch Reels")
        fetch_status_output = gr.Label(label="Fetch Status")
    # Analysis Section
    with gr.Row():
        max_reels_input = gr.Slider(minimum=1, maximum=100, value=10, step=1, label="Number of Reels to Analyze")
        analyze_button = gr.Button("Analyze Reels")
        analyze_status_output = gr.Label(label="Analysis Status")
    # Results Section
    with gr.Row():
        # Sentiment Analysis Outputs
        with gr.Column():
            gr.Markdown("## Sentiment Analysis")
            sentiment_plot_output = gr.Plot(label="Sentiment Distribution")
        # Content Analysis Outputs
        with gr.Column():
            gr.Markdown("## Content Analysis")
            content_plot_output = gr.Plot(label="Content Distribution")
    # Link buttons to functions
    connect_button.click(
        fn=login_gradio_auto,
        inputs=None,  # No direct inputs, username is preset
        outputs=[login_status_output, otp_row]
    )
    otp_submit_button.click(
        fn=submit_otp_gradio,
        inputs=otp_input,
        outputs=[login_status_output, otp_input, otp_row]
    )
    fetch_button.click(
        fn=fetch_reels_gradio,
        inputs=None,  # No direct inputs needed for fetching
        outputs=fetch_status_output
    )
    analyze_button.click(
        fn=analyze_reels_gradio,
        inputs=max_reels_input,  # Input is the slider value
        outputs=[analyze_status_output, sentiment_plot_output, content_plot_output]  # Outputs are status and the two plots
    )
# --- Launch the Gradio app ---
if __name__ == "__main__":
    # Runs only when the file is executed as a script (e.g. `python deploy.py`),
    # not when it is imported or merely written to disk from a notebook.
    # The guard previously contained only `pass`, so executing the file built
    # the interface and then exited without serving it.
    demo.launch()
# For running directly in Colab to test before deploying:
# demo.launch(share=True)