|
|
|
import os, re, json |
|
import numpy as np |
|
import pandas as pd |
|
import gradio as gr |
|
import faiss |
|
import torch |
|
from typing import List |
|
from sentence_transformers import SentenceTransformer, CrossEncoder |
|
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM |
|
|
|
|
|
|
|
|
|
|
|
# --- Model identifiers -------------------------------------------------------
# The primary generator checkpoint can be overridden through the environment;
# the fallback is used when the primary checkpoint cannot be loaded.
FLAN_PRIMARY = os.environ.get("FLAN_PRIMARY", "google/flan-t5-large")
FLAN_FALLBACK = "google/flan-t5-base"
EMBED_NAME = "sentence-transformers/all-mpnet-base-v2"
RERANK_NAME = "cross-encoder/stsb-roberta-base"

# --- Generation / retrieval settings -----------------------------------------
# Number of sampled generations per request (environment-overridable).
NUM_SLOGAN_SAMPLES = int(os.environ.get("NUM_SLOGAN_SAMPLES", "16"))
# Per-model index folders live under "vector_store" next to this file.
INDEX_ROOT = os.path.join(os.path.dirname(__file__), "vector_store")
DEFAULT_MODEL_FOR_INDEX = EMBED_NAME

# Run on the GPU when one is available, otherwise on the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# --- Lazily initialised model singletons (filled in by _ensure_models) --------
_GEN_TOK = _GEN_MODEL = _EMBED_MODEL = _RERANKER = None
|
|
|
def _ensure_models():
    """Populate the module-level model singletons on first use.

    The embedding model and the reranker are created only when their slot is
    still ``None``. The generator is loaded from ``FLAN_PRIMARY``; if that
    raises, a warning is printed and ``FLAN_FALLBACK`` is loaded instead (an
    error from the fallback load propagates to the caller).
    """
    global _GEN_TOK, _GEN_MODEL, _EMBED_MODEL, _RERANKER

    def _load_generator(checkpoint):
        # Tokenizer first, then the seq2seq model moved onto DEVICE.
        tokenizer = AutoTokenizer.from_pretrained(checkpoint)
        seq2seq = AutoModelForSeq2SeqLM.from_pretrained(checkpoint)
        return tokenizer, seq2seq.to(DEVICE)

    if _EMBED_MODEL is None:
        _EMBED_MODEL = SentenceTransformer(EMBED_NAME)
    if _RERANKER is None:
        _RERANKER = CrossEncoder(RERANK_NAME)

    if _GEN_MODEL is not None:
        return

    try:
        _GEN_TOK, _GEN_MODEL = _load_generator(FLAN_PRIMARY)
        print(f"[INFO] Loaded generator: {FLAN_PRIMARY}")
    except Exception as e:
        print(f"[WARN] Failed to load {FLAN_PRIMARY}. Falling back to {FLAN_FALLBACK}. Error: {e}")
        _GEN_TOK, _GEN_MODEL = _load_generator(FLAN_FALLBACK)
        print(f"[INFO] Loaded generator: {FLAN_FALLBACK}")
|
|
|
|
|
|
|
|
|
# Maps a sanitised model key to its loaded (FAISS index, metadata frame) pair.
_INDEX_CACHE = dict()
|
|
|
def _model_key(name: str) -> str:
    """Turn a model id into a folder-safe key by replacing every "/" with "_"."""
    return "_".join(name.split("/"))
|
|
|
def _format_for_e5(texts, as_query=False):
    """Prefix each text with "query: " (when ``as_query``) or "passage: ".

    Every item is converted with ``str`` before the prefix is prepended.
    """
    tag = "passage: "
    if as_query:
        tag = "query: "
    return [tag + str(item) for item in texts]
|
|
|
def _load_index_for_model(model_name: str = DEFAULT_MODEL_FOR_INDEX):
    """Load (and cache) the FAISS index and metadata frame for ``model_name``.

    The files ``index.faiss`` and ``meta.parquet`` are looked up under
    ``INDEX_ROOT/<model key>``. When either file is missing, a three-row demo
    frame is embedded on the fly and served from an in-memory inner-product
    index instead.

    Args:
        model_name: Embedding model id the index belongs to.

    Returns:
        A ``(index, meta_df)`` tuple; the same tuple is returned on later
        calls for the same model.
    """
    mkey = _model_key(model_name)
    cached = _INDEX_CACHE.get(mkey)
    if cached is not None:
        return cached

    base = os.path.join(INDEX_ROOT, mkey)
    idx_path = os.path.join(base, "index.faiss")
    meta_path = os.path.join(base, "meta.parquet")

    if os.path.exists(idx_path) and os.path.exists(meta_path):
        index = faiss.read_index(idx_path)
        meta_df = pd.read_parquet(meta_path)
        # Search results are used as row positions into meta_df, so a size
        # mismatch would surface later as wrong or missing rows.
        if index.ntotal != len(meta_df):
            print(
                f"[WARN] Index for {model_name} has {index.ntotal} vectors "
                f"but metadata has {len(meta_df)} rows."
            )
        _INDEX_CACHE[mkey] = (index, meta_df)
        return _INDEX_CACHE[mkey]

    print(f"[WARN] Missing index for {model_name}. Using tiny demo in-memory index.")
    demo_df = pd.DataFrame({
        "name": ["HowDidIDo", "Museotainment", "Movitr"],
        "tagline": ["Online evaluation platform", "PacMan & Louvre meet", "Crowdsourced video translation"],
        "description": [
            "Public speaking, Presentation skills and interview practice",
            "Interactive AR museum tours",
            "Video translation with voice and subtitles",
        ],
    })

    # Reuse the already-loaded embedder when it is the requested model instead
    # of instantiating a second copy of the same network.
    if model_name == EMBED_NAME and _EMBED_MODEL is not None:
        model = _EMBED_MODEL
    else:
        model = SentenceTransformer(model_name)

    passages = demo_df["description"].tolist()
    # Mirror the query-side prefixing done in recommend() for e5 models.
    if model_name.startswith("intfloat/e5"):
        passages = _format_for_e5(passages, as_query=False)

    vecs = np.asarray(model.encode(passages, normalize_embeddings=True), dtype=np.float32)
    index = faiss.IndexFlatIP(vecs.shape[1])
    index.add(vecs)
    _INDEX_CACHE[mkey] = (index, demo_df)
    return _INDEX_CACHE[mkey]
|
|
|
|
|
|
|
|
|
def recommend(query_text: str, model_name: str = DEFAULT_MODEL_FOR_INDEX, top_k: int = 3) -> pd.DataFrame:
    """Return the ``top_k`` metadata rows most similar to ``query_text``.

    Args:
        query_text: Free-text description to search for.
        model_name: Model id selecting which index to search.
        top_k: Maximum number of rows to return.

    Returns:
        A copy of the matching metadata rows with an added ``score`` column,
        restricted to whichever of ``row_id``, ``name``, ``tagline``,
        ``description`` and ``score`` are present. The frame is empty when
        ``top_k`` is not positive or the index holds no vectors.
    """
    _ensure_models()
    index, meta = _load_index_for_model(model_name)

    wanted = ("row_id", "name", "tagline", "description", "score")

    # Never ask for more hits than exist: FAISS pads missing results with
    # label -1, which iloc would silently resolve to the last row.
    k = min(int(top_k), int(index.ntotal), len(meta))
    if k <= 0:
        empty = meta.iloc[0:0].copy()
        empty["score"] = np.asarray([], dtype=np.float32)
        return empty[[c for c in wanted if c in empty.columns]]

    if model_name.startswith("intfloat/e5"):
        q_inp = _format_for_e5([query_text], as_query=True)
    else:
        q_inp = [query_text]

    # NOTE(review): the query is always embedded with _EMBED_MODEL (EMBED_NAME),
    # even when model_name selects a different index - confirm that callers
    # only pass indexes built with that same model.
    q_vec = np.asarray(_EMBED_MODEL.encode(q_inp, normalize_embeddings=True), dtype=np.float32)

    scores, idxs = index.search(q_vec, k)
    scores, idxs = scores[0], idxs[0]

    # Defensive: drop any padded (-1) labels that still came back.
    valid = idxs >= 0
    scores, idxs = scores[valid], idxs[valid]

    out = meta.iloc[idxs].copy()
    out["score"] = scores
    return out[[c for c in wanted if c in out.columns]]
|
|
|
|
|
|
|
|
|
# Regexes matched against a whole candidate; any match rejects it.
BLOCK_PATTERNS = [
    r"^[A-Z][a-z]+ [A-Z][a-z]+ (Platform|Solution|System|Application|Marketplace)$",
    r"^[A-Z][a-z]+ [A-Z][a-z]+$",
    r"^[A-Z][a-z]+$",
]

# Lower-case phrases; a candidate containing any of them is rejected.
HARD_BLOCK_WORDS = {
    "platform", "solution", "system", "application", "marketplace",
    "ai-powered", "ai powered", "empower", "empowering",
    "artificial intelligence", "machine learning",
    "augmented reality", "virtual reality",
}

# Words that lower a candidate's score.
GENERIC_WORDS = {
    "app", "assistant", "smart", "ai", "ml", "ar", "vr",
    "decentralized", "blockchain",
}

# Words that raise a candidate's score.
MARKETING_VERBS = {
    "build", "grow", "simplify", "discover", "create", "connect",
    "transform", "unlock", "boost", "learn", "move", "clarify",
}
BENEFIT_WORDS = {
    "faster", "smarter", "easier", "better", "safer", "clearer",
    "stronger", "together", "confidently", "simply", "instantly",
}

# Known slogans (lower-case) that must not be returned verbatim.
GOOD_SLOGANS_TO_AVOID_DUP = {
    "smarter care, faster decisions",
    "checkout built for small brands",
    "less guessing. more healing.",
    "built to grow with your cart.",
    "stand tall. feel better.",
    "train your brain to win.",
    "your body. your algorithm.",
    "play smarter. grow brighter.",
    "style that thinks with you.",
}
|
|
|
def _tokens(s: str) -> List[str]:
    """Lower-case ``s`` and return its alphanumeric runs of three or more characters."""
    lowered = s.lower()
    return [hit.group(0) for hit in re.finditer(r"[a-z0-9]{3,}", lowered)]
|
|
|
def _jaccard(a: List[str], b: List[str]) -> float:
    """Jaccard similarity of the two token collections; 0.0 if either is empty."""
    left = set(a)
    right = set(b)
    if not (left and right):
        return 0.0
    shared = left.intersection(right)
    combined = left.union(right)
    return len(shared) / len(combined)
|
|
|
def _titlecase_soft(s: str) -> str:
    """Capitalize each whitespace-separated word, leaving all-uppercase words as they are.

    Words are re-joined with single spaces.
    """
    return " ".join(
        word if word.isupper() else word.capitalize()
        for word in s.split()
    )
|
|
|
def _is_blocked_slogan(s: str) -> bool:
    """Decide whether a candidate slogan must be rejected.

    A candidate is blocked when it is empty or whitespace-only, matches one of
    ``BLOCK_PATTERNS``, contains any phrase from ``HARD_BLOCK_WORDS``
    (case-insensitive substring test), or equals an entry of
    ``GOOD_SLOGANS_TO_AVOID_DUP`` once leading/trailing punctuation is ignored.

    Args:
        s: Candidate slogan text.

    Returns:
        True if the candidate should be discarded.
    """
    if not s or not s.strip():
        return True
    s_strip = s.strip()

    if any(re.match(pat, s_strip) for pat in BLOCK_PATTERNS):
        return True

    s_low = s_strip.lower()
    if any(w in s_low for w in HARD_BLOCK_WORDS):
        return True

    def _trim_edges(text: str) -> str:
        # Same edge trimming that _clean_slogan applies to candidates.
        return re.sub(r"^\W+|\W+$", "", text)

    # Several known slogans end with "." while cleaned candidates have their
    # trailing punctuation removed; compare both sides in trimmed form so the
    # duplicate check can actually match.
    known = {_trim_edges(g.lower()) for g in GOOD_SLOGANS_TO_AVOID_DUP}
    return _trim_edges(s_low) in known
|
|
|
def _generic_penalty(s: str) -> float:
    """Return a penalty in [0, 1] for generic buzzwords in ``s``.

    Each distinct entry of ``GENERIC_WORDS`` that occurs as a whole word adds
    0.25, capped at 1.0. Matching is done on whole words so that short entries
    such as "ar" or "ai" are not triggered by "smarter" or "train".
    """
    words = set(re.findall(r"[a-z0-9]+", s.lower()))
    hits = len(words & GENERIC_WORDS)
    return min(1.0, 0.25 * hits)
|
|
|
def _for_penalty(s: str) -> float:
    """Return 0.3 when ``s`` contains the standalone word "for" (any case), else 0.0."""
    if re.search(r"\bfor\b", s.lower()) is None:
        return 0.0
    return 0.3
|
|
|
def _neighbor_context(neighbors_df: pd.DataFrame) -> str:
    """Format the taglines of the first three neighbor rows as a bullet list.

    Only taglines whose stripped length is between 5 and 70 characters are
    kept; each becomes a "- <tagline>" line. Returns "" when the frame is
    ``None`` or empty.
    """
    if neighbors_df is None or neighbors_df.empty:
        return ""
    taglines = (
        str(rec.get("tagline", "")).strip()
        for _, rec in neighbors_df.head(3).iterrows()
    )
    return "\n".join(f"- {tag}" for tag in taglines if 5 <= len(tag) <= 70)
|
|
|
def _copies_neighbor(s: str, neighbors_df: pd.DataFrame) -> bool:
    """Check whether candidate ``s`` is too close to a retrieved neighbor tagline.

    Two checks are applied:
      1. Lexical, against every neighbor tagline: case-insensitive equality or
         token Jaccard similarity of at least 0.7.
      2. Semantic, against the taglines of the first three rows: cosine
         similarity of at least 0.85 between embeddings.

    The semantic check is best-effort; if embedding fails a warning is printed
    and the candidate is treated as not copying.

    Args:
        s: Candidate slogan.
        neighbors_df: Retrieved rows; may be ``None`` or empty.

    Returns:
        True when the candidate should be rejected as a near-copy.
    """
    if neighbors_df is None or neighbors_df.empty:
        return False
    if "tagline" not in neighbors_df.columns:
        return False

    taglines = [str(v).strip() for v in neighbors_df["tagline"]]

    s_low = s.lower()
    s_toks = _tokens(s_low)
    for t in taglines:
        if not t:
            continue
        t_low = t.lower()
        if s_low == t_low or _jaccard(s_toks, _tokens(t_low)) >= 0.7:
            return True

    # Only the first three rows take part in the embedding comparison.
    semantic_refs = [t for t in taglines[:3] if t]
    if not semantic_refs:
        return False

    try:
        # One batched call; rows are unit-normalised so a dot product is the
        # cosine similarity.
        vecs = np.asarray(
            _EMBED_MODEL.encode([s] + semantic_refs, normalize_embeddings=True)
        )
        sims = vecs[1:] @ vecs[0]
        return bool(np.any(sims >= 0.85))
    except Exception as exc:
        print(f"[WARN] Neighbor similarity check failed; keeping candidate. Error: {exc}")
        return False
|
|
|
def _clean_slogan(text: str, max_words: int = 8) -> str:
    """Normalise one raw generated line into a slogan candidate.

    Steps: keep only the first line, remove straight and curly quote
    characters, collapse whitespace, trim non-word characters from both ends,
    and truncate to ``max_words`` words.

    Args:
        text: Raw model output (may be empty or ``None``).
        max_words: Maximum number of words to keep.

    Returns:
        The cleaned slogan, or "" when nothing usable remains.
    """
    if not text:
        return ""
    text = text.strip().split("\n")[0]
    # Straight double quote plus curly double/single quotes, written as
    # escapes so the pattern survives re-encoding of this file.
    text = re.sub("[\"\u201c\u201d\u2018\u2019]", "", text)
    text = re.sub(r"\s+", " ", text).strip()
    text = re.sub(r"^\W+|\W+$", "", text)
    words = text.split()
    if len(words) > max_words:
        # Truncation can end on a word with punctuation attached; trim it so
        # the result does not finish with a dangling comma or similar.
        text = re.sub(r"\W+$", "", " ".join(words[:max_words]))
    return text
|
|
|
def _max_neighbor_similarity(cands, neighbors_df):
    """Return, per candidate, its highest cosine similarity to the first three neighbor taglines."""
    no_penalty = [0.0] * len(cands)
    if neighbors_df is None or neighbors_df.empty or "tagline" not in neighbors_df.columns:
        return no_penalty
    taglines = [t for t in (str(v).strip() for v in neighbors_df["tagline"].head(3)) if t]
    if not taglines:
        return no_penalty

    neighbor_vecs = np.asarray(_EMBED_MODEL.encode(taglines, normalize_embeddings=True))
    try:
        # One batched call for all candidates instead of one call per candidate.
        cand_vecs = np.asarray(_EMBED_MODEL.encode(list(cands), normalize_embeddings=True))
        return [float(v) for v in (cand_vecs @ neighbor_vecs.T).max(axis=1)]
    except Exception as exc:
        # Best-effort signal: scoring continues without the neighbor penalty.
        print(f"[WARN] Neighbor similarity scoring failed; penalty skipped. Error: {exc}")
        return no_penalty


def _score_candidates(query: str, cands: List[str], neighbors_df: pd.DataFrame) -> List[tuple]:
    """Score slogan candidates against the query.

    The score is a weighted sum of: reranker relevance (0.55), brevity around
    five words (0.20), marketing/benefit vocabulary (0.15) and low token
    overlap with the query (0.03), minus penalties for generic words (0.07),
    the word "for" (0.03) and similarity to neighbor taglines (0.10).

    Args:
        query: The user's description.
        cands: Candidate slogans.
        neighbors_df: Retrieved rows whose taglines should not be imitated;
            may be ``None`` or empty.

    Returns:
        A list of ``(candidate, score)`` tuples in the order of ``cands``;
        empty when ``cands`` is empty.
    """
    if not cands:
        return []

    # NOTE(review): dividing by 5 assumes the reranker emits scores on a 0-5
    # scale - confirm against the model's actual output range.
    ce_scores = np.asarray(_RERANKER.predict([(query, s) for s in cands]), dtype=np.float32) / 5.0
    q_toks = _tokens(query)
    n_pens = _max_neighbor_similarity(cands, neighbors_df)

    results = []
    for i, s in enumerate(cands):
        brevity = 1.0 - min(1.0, abs(len(s.split()) - 5) / 5.0)

        # Tokenise on letters so "Smarter." or "Grow," still count as hits.
        word_set = set(re.findall(r"[a-z]+", s.lower()))
        m_hits = len(word_set & MARKETING_VERBS)
        b_hits = len(word_set & BENEFIT_WORDS)
        marketing = min(1.0, 0.2 * m_hits + 0.2 * b_hits)

        g_pen = _generic_penalty(s)
        f_pen = _for_penalty(s)
        anti_copy = 1.0 - _jaccard(q_toks, _tokens(s))

        score = (
            0.55 * float(ce_scores[i])
            + 0.20 * brevity
            + 0.15 * marketing
            + 0.03 * anti_copy
            - 0.07 * g_pen
            - 0.03 * f_pen
            - 0.10 * n_pens[i]
        )
        results.append((s, float(score)))
    return results
|
|
|
def generate_slogan(query_text: str, neighbors_df: pd.DataFrame = None, n_samples: int = NUM_SLOGAN_SAMPLES) -> str:
    """Generate one slogan for ``query_text``.

    A few-shot prompt (optionally extended with neighbor taglines) is sampled
    ``n_samples`` times. Each output line is cleaned, filtered (length, block
    rules, similarity to neighbors), title-cased and scored; the best-scoring
    candidate is returned. If no candidate survives, the cleaned first raw
    sample is returned instead.

    Args:
        query_text: The startup description.
        neighbors_df: Retrieved similar rows; may be ``None``.
        n_samples: Number of sequences to sample; values below 1 are treated
            as 1.

    Returns:
        The selected slogan text.
    """
    _ensure_models()
    ctx = _neighbor_context(neighbors_df)
    prompt = (
        "You are a creative brand copywriter. Write short, original, memorable startup slogans (max 8 words).\n"
        "Forbidden words: app, assistant, platform, solution, system, marketplace, AI, machine learning, augmented reality, virtual reality, decentralized, empower.\n"
        "Focus on clear benefits and vivid verbs. Do not copy the description. Return ONLY a list, one slogan per line.\n\n"
        "Good Examples:\n"
        "Description: AI assistant for doctors to prioritize patient cases\n"
        "Slogan: Less Guessing. More Healing.\n\n"
        "Description: Payments for small online stores\n"
        "Slogan: Built to Grow with Your Cart.\n\n"
        "Description: Neurotech headset to boost focus\n"
        "Slogan: Train Your Brain to Win.\n\n"
        "Description: Interior design suggestions with AI\n"
        "Slogan: Style That Thinks With You.\n\n"
        "Bad Examples (avoid these): Innovative AI Platform / Smart App for Everyone / Empowering Small Businesses\n\n"
    )
    if ctx:
        prompt += f"Similar taglines (style only):\n{ctx}\n\n"
    prompt += f"Description: {query_text}\nSlogans:"

    input_ids = _GEN_TOK(prompt, return_tensors="pt").input_ids.to(DEVICE)
    outputs = _GEN_MODEL.generate(
        input_ids,
        max_new_tokens=24,
        do_sample=True,
        top_k=60,
        top_p=0.92,
        temperature=1.2,
        # At least one sequence must be requested.
        num_return_sequences=max(1, int(n_samples)),
        repetition_penalty=1.08,
    )
    raw_cands = [_GEN_TOK.decode(o, skip_special_tokens=True) for o in outputs]
    fallback = _clean_slogan(raw_cands[0])

    seen = set()
    cand_set = set()
    for txt in raw_cands:
        for line in txt.split("\n"):
            s = _clean_slogan(line)
            # Skip repeats up front: the neighbor check below embeds text and
            # there is no point in paying for it twice for the same line.
            if not s or s in seen:
                continue
            seen.add(s)
            # No upper bound check needed: _clean_slogan already caps at 8 words.
            if len(s.split()) < 2:
                continue
            if _is_blocked_slogan(s) or _copies_neighbor(s, neighbors_df):
                continue
            cand_set.add(_titlecase_soft(s))

    if not cand_set:
        return fallback

    # Sorted so that candidate order (and tie-breaking) is deterministic.
    scored = _score_candidates(query_text, sorted(cand_set), neighbors_df)
    if not scored:
        return fallback

    # max() keeps the first of equally scored candidates.
    return max(scored, key=lambda pair: pair[1])[0]
|
|
|
|
|
|
|
|
|
# Sample descriptions offered as one-click inputs in the UI.
EXAMPLES = [
    "AI coach for improving public speaking skills",
    "Augmented reality app for interactive museum tours",
    "Voice-controlled task manager for remote teams",
    "Machine learning system for predicting crop yields",
    "Platform for AI-assisted interior design suggestions",
]
|
|
|
def pipeline(user_input: str):
    """Run retrieval and slogan generation for one UI request.

    Args:
        user_input: The startup description typed by the user.

    Returns:
        A ``(table, slogan)`` tuple. ``table`` has the columns ``name``,
        ``tagline``, ``description`` and ``score`` and contains the retrieved
        rows followed by a "Synthetic Example" row carrying the generated
        slogan. For blank input an empty table and "" are returned.
    """
    out_cols = ["name", "tagline", "description", "score"]

    # Nothing to search or generate for; skip the model calls entirely.
    if not user_input or not user_input.strip():
        return pd.DataFrame(columns=out_cols), ""

    recs = recommend(user_input, model_name=DEFAULT_MODEL_FOR_INDEX, top_k=3)
    slogan = generate_slogan(user_input, neighbors_df=recs, n_samples=NUM_SLOGAN_SAMPLES)

    table = recs.reset_index(drop=True)
    for col in ("name", "tagline", "description"):
        if col not in table.columns:
            table[col] = ""
    if "score" not in table.columns:
        table["score"] = np.nan

    # Build the output from plain records rather than enlarging via .loc with
    # a dict whose keys may not match the frame's columns.
    rows = table[out_cols].to_dict("records")
    rows.append({
        "name": "Synthetic Example",
        "tagline": slogan,
        "description": user_input,
        "score": np.nan,
    })
    return pd.DataFrame(rows, columns=out_cols), slogan
|
|
|
# Gradio UI: a description box with examples on the left, results on the right.
# The dash in the titles is written as an escape so it survives re-encoding.
with gr.Blocks(title="SloganAI \u2014 Recommendations + Slogan Generator") as demo:
    gr.Markdown("## SloganAI \u2014 Top-3 Recommendations + A High-Quality Generated Slogan\nEnter a startup idea, click **Submit**, or try an example.")
    with gr.Row():
        with gr.Column(scale=1):
            inp = gr.Textbox(label="Enter a startup description", lines=3, placeholder="e.g., AI coach for improving public speaking skills")
            ex = gr.Examples(EXAMPLES, inputs=inp, label="One-click examples")
            btn = gr.Button("Submit", variant="primary")
        with gr.Column(scale=2):
            out_df = gr.Dataframe(headers=["Name","Tagline","Description","Score"], label="Top 3 + Generated")
            out_sg = gr.Textbox(label="Generated Slogan", interactive=False)

    # Submit runs the full pipeline and fills both output widgets.
    btn.click(fn=pipeline, inputs=inp, outputs=[out_df, out_sg])


if __name__ == "__main__":
    # Load the models before launching so the first request does not pay for it.
    _ensure_models()
    demo.queue().launch()
|
|