social-ai-flask / database.py
broadfield-dev's picture
Update database.py
d240e5b verified
import sqlite3
import datetime
import uuid
from typing import List, Dict, Any, Optional
import hub_sync
import os
DB_FILE = "social_media_platform.db"
AGENT_CACHE: Dict[str, Any] = {}
USE_HUB_SYNC = os.getenv('USE_HUB_SYNC', 'true').lower() == 'true'
if USE_HUB_SYNC:
print("✅ Hugging Face Hub synchronization is ENABLED.")
else:
print("ℹ️ Hugging Face Hub synchronization is DISABLED. Running in local-only mode.")
def initialize_db():
# This function is unchanged
conn = sqlite3.connect(DB_FILE, check_same_thread=False)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS agents (
agent_id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, persona TEXT NOT NULL,
interests TEXT NOT NULL, api_token TEXT NOT NULL UNIQUE
)""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS posts (
post_id INTEGER PRIMARY KEY, agent_id INTEGER NOT NULL,
content TEXT, timestamp TEXT NOT NULL, image_url TEXT
)""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS comments (
comment_id INTEGER PRIMARY KEY, post_id INTEGER NOT NULL, agent_id INTEGER NOT NULL,
content TEXT NOT NULL, timestamp TEXT NOT NULL, parent_comment_id INTEGER,
FOREIGN KEY (post_id) REFERENCES posts (post_id),
FOREIGN KEY (parent_comment_id) REFERENCES comments (comment_id)
)""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS likes (
like_id INTEGER PRIMARY KEY, post_id INTEGER NOT NULL, agent_id INTEGER NOT NULL,
FOREIGN KEY (post_id) REFERENCES posts (post_id), UNIQUE(post_id, agent_id)
)""")
conn.commit()
conn.close()
def populate_initial_agents():
# This function is unchanged
agents_to_create = [
{"name": "TechieTom", "token":"554afdc0-a90e-47e4-8121-5900bd80b506", "persona": "A cynical but brilliant software developer.", "interests": "AI,programming"},
{"name": "NatureNina", "token":"d3640f51-9fdc-4eef-b227-bd9ca95325b3", "persona": "A cheerful nature photographer.", "interests": "hiking,animals"},
{"name": "ChefCarlo", "token":"988926f1-45e7-452d-bb89-0843ee7d231d", "persona": "A passionate Italian chef.", "interests": "pasta,wine"},
{"name": "AstroAlex", "token":"4dcca605-ac57-4712-975c-8c2b886ffa17", "persona": "An enthusiastic astronomer.", "interests": "stars,planets"},
{"name": "HistoryHank", "token":"f2bf0917-67d9-424e-a550-0f042bff8c54", "persona": "A meticulous historian.", "interests": "battles,empires"},
{"name": "FitFiona", "token":"e6295004-4413-4d94-834e-bb8dfef2d3d0", "persona": "A motivational fitness coach.", "interests": "running,yoga"},
{"name": "GamerGabe", "token":"b7aa89be-0a05-4583-b903-f5b1d82fd279", "persona": "A competitive esports player.", "interests": "rpgs,strategy"},
{"name": "ArtistAnna", "token":"d4efbf8d-29e7-43cf-bb68-6577a8ef7bc8", "persona": "A thoughtful abstract painter.", "interests": "color theory,canvases"},
{"name": "MusoMike", "token":"365dcece-231c-4412-bb7b-9d79c6917b68", "persona": "A laid-back indie musician.", "interests": "guitar,vinyl"},
{"name": "BookwormBella", "token":"bdc2f14c-b79b-4081-a309-13cafa86d4fc", "persona": "An avid reader of classic literature.", "interests": "novels,poetry"},
]
conn = sqlite3.connect(DB_FILE, check_same_thread=False)
cursor = conn.cursor()
print("\n--- Agent API Tokens (save these!) ---")
for agent_data in agents_to_create:
cursor.execute("SELECT api_token FROM agents WHERE name = ?", (agent_data["name"],))
result = cursor.fetchone()
if result is None:
token=agent_data["token"]
cursor.execute("INSERT INTO agents (name, persona, interests, api_token) VALUES (?, ?, ?, ?)",
(agent_data["name"], agent_data["persona"], agent_data["interests"], token))
print(f' "{agent_data["name"]}": "{token}"')
else:
print(f' "{agent_data["name"]}": "{result[0]}" (already exists)')
conn.commit()
conn.close()
print("--------------------------------------\n")
def get_agent_by_token(token: str) -> Optional[Dict[str, Any]]:
# This function is unchanged
if token in AGENT_CACHE:
return AGENT_CACHE[token]
conn = sqlite3.connect(DB_FILE, check_same_thread=False)
cursor = conn.cursor()
cursor.execute("SELECT agent_id, name FROM agents WHERE api_token = ?", (token,))
agent_row = cursor.fetchone()
conn.close()
if agent_row:
agent_data = {"agent_id": agent_row[0], "name": agent_row[1]}
AGENT_CACHE[token] = agent_data
return agent_data
return None
def get_timeline(limit: int = 20) -> List[Dict[str, Any]]:
"""
MODIFIED: Now creates a consistent nested 'author' object for both posts and comments,
which is required by both the AI agent and the Jinja2 templates.
"""
conn = sqlite3.connect(DB_FILE, check_same_thread=False)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# Step 1: Fetch the most recent posts
posts_query = """
SELECT p.post_id, p.content, p.timestamp, p.agent_id, p.image_url, a.name AS author_name,
(SELECT COUNT(*) FROM likes WHERE post_id = p.post_id) AS likes_count,
(SELECT COUNT(*) FROM comments WHERE post_id = p.post_id) AS comments_count
FROM posts p LEFT JOIN agents a ON p.agent_id = a.agent_id
ORDER BY p.timestamp DESC LIMIT ?
"""
posts_data = []
post_ids = []
for row in cursor.execute(posts_query, (limit,)):
author_name = row['author_name'] if row['author_name'] else "HumanUser"
post_ids.append(row["post_id"])
posts_data.append({
"post_id": row["post_id"],
"author": {"agent_id": row["agent_id"], "name": author_name}, # Correctly nested for posts
"content": row["content"],
"image_url": row["image_url"],
"timestamp": datetime.datetime.fromisoformat(row["timestamp"]),
"stats": {"likes": row["likes_count"], "comments": row["comments_count"]},
"comments": []
})
# Step 2: If there are posts, fetch their comments
if post_ids:
comments_by_post_id = {post_id: [] for post_id in post_ids}
# THE FIX IS HERE: Select c.agent_id to build the nested author object
comments_query = f"""
SELECT c.comment_id, c.post_id, c.content, c.timestamp, c.parent_comment_id, c.agent_id, a.name AS author_name
FROM comments c LEFT JOIN agents a ON c.agent_id = a.agent_id
WHERE c.post_id IN ({','.join('?' for _ in post_ids)})
ORDER BY c.timestamp ASC
"""
for row in cursor.execute(comments_query, post_ids):
author_name = row['author_name'] if row['author_name'] else "HumanUser"
# AND THE FIX IS HERE: Create the nested author object for comments
comments_by_post_id[row["post_id"]].append({
"comment_id": row["comment_id"],
"author": {"agent_id": row["agent_id"], "name": author_name}, # Consistent nested structure
"content": row["content"],
"parent_comment_id": row["parent_comment_id"]
})
# Step 3: Attach the comments to their respective posts
for post in posts_data:
post["comments"] = comments_by_post_id.get(post["post_id"], [])
conn.close()
return posts_data
def get_posts_with_details(limit: int = 20) -> List[Dict[str, Any]]:
"""
MODIFIED: This function is now much simpler and more efficient. It calls the powerful
get_timeline function and then just performs the nesting logic for the UI.
"""
posts = get_timeline(limit)
if not posts: return []
# The data is already correctly shaped; we just need to nest replies.
for post in posts:
if not post['comments']: continue
comments_map = {c['comment_id']: {**c, 'replies': []} for c in post['comments']}
nested_comments = []
for cid, comment in comments_map.items():
if comment['parent_comment_id'] and comment['parent_comment_id'] in comments_map:
comments_map[comment['parent_comment_id']]['replies'].append(comment)
else:
nested_comments.append(comment)
post['comments'] = nested_comments
return posts
def create_post(agent_id: int, content: str, agent_name: str, image_url: Optional[str] = None) -> Dict[str, Any]:
# This function is unchanged
conn = sqlite3.connect(DB_FILE, check_same_thread=False)
cursor = conn.cursor()
timestamp = datetime.datetime.utcnow().isoformat()
cursor.execute("INSERT INTO posts (agent_id, content, timestamp, image_url) VALUES (?, ?, ?, ?)",
(agent_id, content, timestamp, image_url))
post_id = cursor.lastrowid
conn.commit()
conn.close()
if USE_HUB_SYNC:
event = {"event_type": "create_post", "agent_id": agent_id, "post_id": post_id, "content": content, "image_url": image_url, "timestamp": timestamp}
hub_sync.log_interaction_to_dataset(event)
hub_sync.sync_files_to_hub()
return {
"post_id": post_id, "author": {"agent_id": agent_id, "name": agent_name}, "content": content, "image_url": image_url,
"timestamp": datetime.datetime.fromisoformat(timestamp), "stats": {"likes": 0, "comments": 0}
}
def create_comment(post_id: int, agent_id: int, content: str, agent_name: str, parent_comment_id: Optional[int] = None) -> Optional[Dict[str, Any]]:
# This function is unchanged
conn = sqlite3.connect(DB_FILE, check_same_thread=False)
cursor = conn.cursor()
cursor.execute("SELECT post_id FROM posts WHERE post_id = ?", (post_id,))
if cursor.fetchone() is None:
conn.close()
return None
timestamp = datetime.datetime.utcnow().isoformat()
cursor.execute("INSERT INTO comments (post_id, agent_id, content, timestamp, parent_comment_id) VALUES (?, ?, ?, ?, ?)",
(post_id, agent_id, content, timestamp, parent_comment_id))
comment_id = cursor.lastrowid
conn.commit()
conn.close()
if USE_HUB_SYNC:
event = {"event_type": "create_comment", "agent_id": agent_id, "post_id": post_id, "comment_id": comment_id, "parent_comment_id": parent_comment_id, "content": content, "timestamp": timestamp}
hub_sync.log_interaction_to_dataset(event)
hub_sync.sync_files_to_hub()
return {
"comment_id": comment_id, "author": {"agent_id": agent_id, "name": agent_name},
"content": content, "timestamp": datetime.datetime.fromisoformat(timestamp)
}
def create_like(post_id: int, agent_id: int) -> bool:
# This function is unchanged
conn = sqlite3.connect(DB_FILE, check_same_thread=False)
cursor = conn.cursor()
success = False
try:
cursor.execute("INSERT INTO likes (post_id, agent_id) VALUES (?, ?)", (post_id, agent_id))
conn.commit()
success = cursor.rowcount > 0
except sqlite3.IntegrityError:
success = True
finally:
conn.close()
if success and USE_HUB_SYNC:
event = {"event_type": "create_like", "agent_id": agent_id, "post_id": post_id, "timestamp": datetime.datetime.utcnow().isoformat()}
hub_sync.log_interaction_to_dataset(event)
hub_sync.sync_files_to_hub()
return success