#!/usr/bin/env python3
"""
Enhanced Series Mosaics Integration for Tag Collector Game
This module provides improved integration between series-based mosaics,
the enhanced tag storage system, and the tag collector game.
"""
import os
import json
import io
import time
import streamlit as st
from PIL import Image
from game_constants import RARITY_LEVELS, TAG_CURRENCY_NAME, ENKEPHALIN_CURRENCY_NAME, ENKEPHALIN_ICON
# Import the improved tag mosaic implementation
from tag_mosaic import RevealMosaic
# Default paths
DEFAULT_MOSAICS_DIR = "mosaics"
DEFAULT_TEMPLATES_DIR = "mosaics/templates"
SERIES_COMPLETION_MILESTONES = [25, 50, 75, 100] # Percentage milestones
BASE_ENKEPHALIN_REWARDS = { # Base rewards for each milestone
25: 100, # 25% completion
50: 500, # 50% completion
75: 1000, # 75% completion
100: 5000 # 100% completion
}
# Define collection size tiers
COLLECTION_SIZE_MULTIPLIERS = {
"small": 1.0, # 1-20 tags
"medium": 1.5, # 21-50 tags
"large": 2.0, # 51-100 tags
"huge": 3.0, # 101-200 tags
"massive": 5.0, # 201+ tags
"all_tags": 10.0
}
def get_collection_size_tier(total_tags):
"""
Determine the size tier of a collection based on total tag count
Args:
total_tags: Total number of tags in the collection
Returns:
String representing the size tier
"""
if total_tags <= 20:
return "small"
elif total_tags <= 50:
return "medium"
elif total_tags <= 100:
return "large"
elif total_tags <= 200:
return "huge"
elif total_tags <= 1000:
return "massive"
else:
return "all_tags"
def calculate_enkephalin_reward(percentage, total_tags):
"""
Calculate the enkephalin reward for a specific milestone
Args:
percentage: Completion percentage milestone (25, 50, 75, 100)
total_tags: Total number of tags in the collection
Returns:
Integer amount of enkephalin to reward
"""
# Get the base reward for this percentage
base_reward = BASE_ENKEPHALIN_REWARDS.get(percentage, 0)
# Get the collection size multiplier
size_tier = get_collection_size_tier(total_tags)
multiplier = COLLECTION_SIZE_MULTIPLIERS.get(size_tier, 1.0)
# Calculate the final reward (round to nearest integer)
final_reward = round(base_reward * multiplier)
return max(1, final_reward) # Ensure at least 1 enkephalin is rewarded
def check_and_award_milestone_rewards(mosaic_name, filtered_tags, total_tags):
"""
Check if new milestones have been reached and award enkephalin rewards
Args:
mosaic_name: Unique identifier for this mosaic/collection
filtered_tags: Dictionary with the tags that belong to this collection
total_tags: Total number of tags in this collection
Returns:
Tuple of (awarded_milestone, reward_amount) or (None, 0) if no reward
"""
import streamlit as st
# Skip if no tags or invalid total
if not filtered_tags or total_tags <= 0:
return None, 0
# Calculate current completion percentage
current_tags = len(filtered_tags)
current_percentage = (current_tags / total_tags) * 100
# Round down to the nearest whole percentage
current_percentage = int(current_percentage)
# Get the milestones already awarded for this collection
milestone_key = f"collection_milestones_{mosaic_name}"
if milestone_key not in st.session_state:
st.session_state[milestone_key] = []
awarded_milestones = st.session_state[milestone_key]
# Check each milestone
for milestone in SERIES_COMPLETION_MILESTONES:
# If we've reached this milestone and haven't awarded it yet
if current_percentage >= milestone and milestone not in awarded_milestones:
# Calculate reward
reward = calculate_enkephalin_reward(milestone, total_tags)
# Add to awarded milestones
awarded_milestones.append(milestone)
st.session_state[milestone_key] = awarded_milestones
# Give the reward
if not hasattr(st.session_state, 'enkephalin'):
st.session_state.enkephalin = 0
# Update enkephalin - using direct assignment to ensure it changes
st.session_state.enkephalin += reward
# Track in game stats if available
if hasattr(st.session_state, 'game_stats'):
if 'enkephalin_generated' in st.session_state.game_stats:
st.session_state.game_stats['enkephalin_generated'] += reward
else:
st.session_state.game_stats['enkephalin_generated'] = reward
# Save game state after awarding reward
try:
import tag_storage
tag_storage.save_game(st.session_state)
except (ImportError, Exception) as e:
print(f"Error saving game after enkephalin award: {str(e)}")
# Return the awarded milestone and reward
return milestone, reward
# No new milestone reached
return None, 0
def ensure_directories():
"""Ensure all required directories exist"""
# Create mosaics directory if it doesn't exist
if not os.path.exists(DEFAULT_MOSAICS_DIR):
os.makedirs(DEFAULT_MOSAICS_DIR)
# Create templates directory if it doesn't exist
if not os.path.exists(DEFAULT_TEMPLATES_DIR):
os.makedirs(DEFAULT_TEMPLATES_DIR)
def load_series_data(regenerate=False):
"""
Load series data for mosaics from model directory
Args:
regenerate: Force regeneration of series data even if file exists
Returns:
Dictionary with series information and mosaic configs
"""
# Load existing series data
try:
with open('series_groups.json', 'r', encoding='utf-8') as f:
series_data = json.load(f)
print(f"Loaded series data from {'series_groups.json'}")
return series_data
except Exception as e:
print(f"Error loading series data: {str(e)}")
# Return a default empty structure instead of None
return {
"stats": {
"total_tags": 0,
"series_tags": 0,
"regular_tags": 0,
"unique_series": 0,
"total_characters": 0
},
"series": {},
"mosaic_configs": []
}
def filter_collected_tags_by_series(collected_tags, series_data, series_name):
"""
Filter collected tags to only include tags from a specific series
Args:
collected_tags: Dictionary of collected tags from session state
series_data: Series data from load_series_data()
series_name: Name of the series to filter for
Returns:
Dictionary with only the tags belonging to the specified series
"""
if 'series' not in series_data or series_name not in series_data['series']:
return {}
# Get tags for this series
series_tags = set(series_data['series'][series_name]['tags'])
# Filter collected tags to only include this series
filtered_tags = {}
for tag, info in collected_tags.items():
if tag in series_tags:
filtered_tags[tag] = info
return filtered_tags
def create_filtered_collections(collected_tags, series_data):
"""
Create filtered tag collections for each series
Args:
collected_tags: Main collected_tags dictionary
series_data: Series data from load_series_data()
Returns:
Dictionary mapping series keys to filtered tag dictionaries
"""
filtered_collections = {}
# Handle case where series_data is None or doesn't have 'series' key
if not series_data or 'series' not in series_data:
return filtered_collections
# Create a filtered collection for each series
for series_name, series_info in series_data['series'].items():
# Skip series with too few tags
if series_info['tag_count'] <= 1: # Skip series with just 1 tag
continue
# Create a series key (safe for filenames)
series_key = series_name.replace(" ", "_").replace(":", "").lower()
# Filter tags for this series
filtered_tags = filter_collected_tags_by_series(collected_tags, series_data, series_name)
# Add to return dictionary
filtered_collections[series_key] = {
"series_name": series_name,
"filtered_tags": filtered_tags,
"tag_count": len(filtered_tags),
"total_tags": series_info['tag_count']
}
return filtered_collections
def render_tag_reveal_feature(st, series_name, filtered_tags, series_data, revealed_tags_key):
"""
Render a feature that allows users to pay to reveal missing tags
Args:
st: Streamlit instance
series_name: Name of the series
filtered_tags: The tags the user has already collected for this series
series_data: The full series data loaded from the JSON file
revealed_tags_key: The session key for storing revealed tags
"""
# Skip if series data is invalid
if 'series' not in series_data or series_name not in series_data['series']:
return
# Get all tags for this series
all_series_tags = set(series_data['series'][series_name]['tags'])
# Calculate missing tags
collected_tag_names = set(filtered_tags.keys())
missing_tags = all_series_tags - collected_tag_names
# If no missing tags, don't show the feature
if not missing_tags:
st.success("Congratulations! You have collected all tags in this series!")
return
# Check if we already have a revealed tags session state for this series
if revealed_tags_key not in st.session_state:
st.session_state[revealed_tags_key] = set()
# Get previously revealed tags
revealed_tags = st.session_state[revealed_tags_key]
# Calculate truly unknown tags (not collected and not revealed)
unknown_tags = missing_tags - revealed_tags
# Show tag reveal section
with st.expander("📋 Missing Tags Explorer", expanded=True):
# Explain the feature
st.write("""
Use this feature to explore missing tags in this collection.
You won't actually collect these tags, but you'll know what to look for in images!
""")
# Show stats
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Tags Collected", len(collected_tag_names))
with col2:
st.metric("Tags Revealed", len(revealed_tags))
with col3:
st.metric("Tags Unknown", len(unknown_tags))
# Cost calculation - only use enkephalin
enkephalin_cost = 2 # Base cost in Enkephalin
# Adjust cost based on rarity
if len(unknown_tags) <= 5:
# Very few tags remaining - more expensive
enkephalin_cost = 10
elif len(unknown_tags) <= 15:
# Getting close to completion
enkephalin_cost = 5
# Show reveal option
st.write("### Reveal Missing Tags")
# Get current resources
current_enkephalin = st.session_state.enkephalin if hasattr(st.session_state, 'enkephalin') else 0
# Create centered column for the payment option
col1, col2, col3 = st.columns([1, 2, 1])
with col2:
st.write(f"**Cost: {enkephalin_cost} {ENKEPHALIN_ICON} {ENKEPHALIN_CURRENCY_NAME}**")
can_afford_enkephalin = current_enkephalin >= enkephalin_cost
if st.button("Reveal Tag", disabled=not can_afford_enkephalin or not unknown_tags):
if can_afford_enkephalin and unknown_tags:
# Pick a random tag to reveal
import random
tag_to_reveal = random.choice(list(unknown_tags))
# Add to revealed tags
revealed_tags.add(tag_to_reveal)
st.session_state[revealed_tags_key] = revealed_tags
# Deduct enkephalin
st.session_state.enkephalin -= enkephalin_cost
if hasattr(st.session_state.game_stats, 'enkephalin_spent'):
st.session_state.game_stats['enkephalin_spent'] += enkephalin_cost
else:
st.session_state.game_stats['enkephalin_spent'] = enkephalin_cost
# Save game state after purchase
try:
import tag_storage
tag_storage.save_game(st.session_state)
except (ImportError, Exception):
pass
# Show success message
st.success(f"Revealed new tag: **{tag_to_reveal}**")
# Force a rerun to update the UI
st.rerun()
# Show revealed tags
if revealed_tags:
st.write("### Revealed Tags")
st.write("These tags are part of this collection but you haven't collected them yet:")
# Create columns for tags
cols = st.columns(3)
for i, tag in enumerate(sorted(revealed_tags)):
col_idx = i % 3
with cols[col_idx]:
st.write(f"- {tag}")
# Add a button to reset revealed tags
if st.button("Reset Revealed Tags"):
st.session_state[revealed_tags_key] = set()
st.success("Reset successful! All revealed tags have been forgotten.")
st.rerun()
def display_series_mosaic(mosaic_name, mosaic_title, filtered_tags, total_tags=None, width=1024, height=1024):
"""
Display a mosaic for a specific tag collection or series
Args:
mosaic_name: Unique identifier for this mosaic
mosaic_title: Display title for the mosaic
filtered_tags: Dictionary with the tags that belong to this collection
total_tags: Total number of tags in this collection (if None, uses len(filtered_tags))
width: Width of the mosaic in pixels (not used in this implementation)
height: Height of the mosaic in pixels (not used in this implementation)
"""
import streamlit as st
if total_tags is None:
# Use the number of filtered tags as the total or set minimum
total_tags = max(len(filtered_tags), 20) # At least 20 tags
# Create a container for the mosaic
with st.container():
st.subheader(f"🧩 {mosaic_title} Mosaic")
# Initialize the mosaic if not already in session state
mosaic_key = f"{mosaic_name}_mosaic"
if mosaic_key not in st.session_state:
# Create the reveal mosaic
st.session_state[mosaic_key] = RevealMosaic(
total_tags=total_tags,
mosaic_name=mosaic_name
)
# Get the mosaic
mosaic = st.session_state[mosaic_key]
# Show collection stats and progress
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Discovered Tags", len(filtered_tags))
with col2:
st.metric("Total Tags", total_tags)
with col3:
completion = (len(filtered_tags) / total_tags) * 100 if total_tags > 0 else 0
st.metric("Completion", f"{completion:.1f}%")
# Always display milestone tracker at the top
display_milestone_tracker(mosaic_name, filtered_tags, total_tags)
# Add update button
update_requested = st.button("🔄 Update Mosaic", key=f"update_btn_{mosaic_name}")
# Display the manually update message
if not update_requested:
st.info("Click the 'Update Mosaic' button to process new tag discoveries and update the image.")
# Update the mosaic with the filtered tags only if requested
newly_revealed = 0
if update_requested and filtered_tags:
# Show processing spinner
with st.spinner("Processing tag discoveries and updating mosaic..."):
# Get optional metadata if available
metadata = st.session_state.model.dataset if hasattr(st.session_state, 'model') else None
newly_revealed = mosaic.update_with_tags(filtered_tags, metadata, force_update=True)
# Check for milestone rewards after updating
milestone, reward = check_and_award_milestone_rewards(mosaic_name, filtered_tags, total_tags)
# Show appropriate messages based on update results
if milestone is not None:
# Show milestone achievement message with celebration
st.balloons()
st.success(f"🏆 MILESTONE ACHIEVED! {milestone}% Completion of {mosaic_title}!")
st.success(f"Rewarded with {reward} {ENKEPHALIN_ICON} {ENKEPHALIN_CURRENCY_NAME}!")
# Force update of UI to show new enkephalin amount
st.rerun()
elif newly_revealed > 0:
st.success(f"Successfully updated! Revealed {newly_revealed} new pixels.")
else:
st.info("No new pixels to reveal.")
# Get mosaic stats
stats = mosaic.get_stats()
# Show completion stats
col1, col2 = st.columns(2)
with col1:
st.write(f"**Completion:** {stats['completion_percentage']:.2f}%")
st.write(f"**Pixels Revealed:** {stats['revealed_pixels']} / {stats['total_pixels']}")
with col2:
st.write(f"**Status:** {stats['completion_pattern']}")
if newly_revealed > 0:
st.write(f"**Newly Revealed:** {newly_revealed} pixels")
# Display the mosaic image
mosaic_img = mosaic.get_image()
# Convert to bytes for display
img_bytes = io.BytesIO()
mosaic_img.save(img_bytes, format='PNG')
img_bytes.seek(0)
# Show the mosaic image
st.image(img_bytes, caption=f"Your {mosaic_title} Mosaic - Each discovery reveals more of the hidden image",
use_container_width=True)
# Only show newly added tags
if mosaic.highlighted_tags:
with st.expander("Recently Added Tags", expanded=False):
for tag, _, _, rarity in mosaic.highlighted_tags:
color = RARITY_LEVELS.get(rarity, {}).get("color", "#AAAAAA")
st.markdown(
f"{tag}",
unsafe_allow_html=True
)
def display_milestone_tracker(mosaic_name, filtered_tags, total_tags):
"""
Display a visual tracker for collection milestones
Args:
mosaic_name: Unique identifier for this mosaic/collection
filtered_tags: Dictionary with the tags that belong to this collection
total_tags: Total number of tags in this collection
"""
import streamlit as st
# Skip if no tags or invalid total
if not filtered_tags or total_tags <= 0:
return
# Get the milestones already awarded for this collection
milestone_key = f"collection_milestones_{mosaic_name}"
if milestone_key not in st.session_state:
st.session_state[milestone_key] = []
awarded_milestones = st.session_state[milestone_key]
# Calculate current completion percentage
current_tags = len(filtered_tags)
current_percentage = (current_tags / total_tags) * 100
# Prepare milestone data with rewards
milestone_data = []
for milestone in SERIES_COMPLETION_MILESTONES:
reward = calculate_enkephalin_reward(milestone, total_tags)
milestone_data.append({
"percentage": milestone,
"reward": reward,
"achieved": milestone in awarded_milestones,
"is_next": milestone not in awarded_milestones and current_percentage < milestone
})
# Sort by percentage
milestone_data.sort(key=lambda x: x["percentage"])
# Display milestone expander
with st.expander("Collection Milestones", expanded=len(awarded_milestones) > 0):
st.write("Complete collection milestones to earn Enkephalin rewards:")
# Display progress bar
st.progress(min(1.0, current_percentage / 100), text=f"Current Progress: {current_percentage:.1f}%")
# Display milestone columns
cols = st.columns(len(milestone_data))
for i, milestone in enumerate(milestone_data):
with cols[i]:
# Format with emoji based on status
if milestone["achieved"]:
st.markdown(f"### ✅ {milestone['percentage']}%")
st.markdown(f"Reward: **{milestone['reward']}** {ENKEPHALIN_ICON}")
st.markdown("**COMPLETED!**")
elif milestone["is_next"]:
# This is the next milestone to achieve
percentage_needed = milestone["percentage"] - current_percentage
tags_needed = int((milestone["percentage"] / 100 * total_tags) - current_tags)
st.markdown(f"### 🎯 {milestone['percentage']}%")
st.markdown(f"Reward: **{milestone['reward']}** {ENKEPHALIN_ICON}")
st.markdown(f"Need: **{tags_needed}** more tags")
else:
st.markdown(f"### 🔜 {milestone['percentage']}%")
st.markdown(f"Reward: **{milestone['reward']}** {ENKEPHALIN_ICON}")
st.markdown("Coming up!")
# Display collection size tier info
size_tier = get_collection_size_tier(total_tags)
tier_multiplier = COLLECTION_SIZE_MULTIPLIERS.get(size_tier, 1.0)
st.markdown("---")
st.markdown(f"**Collection Size Tier:** {size_tier.capitalize()} Collection")
st.markdown(f"**Reward Multiplier:** {tier_multiplier}x")
def display_series_mosaics():
"""
Display mosaics for each series found in the metadata.
Uses the improved tag storage system for better integration.
"""
import streamlit as st
# Ensure all directories exist
ensure_directories()
# Check if we have collected tags
if not hasattr(st.session_state, 'collected_tags'):
st.info("Start scanning images to collect tags!")
return
# Load series data from model directory
series_data = load_series_data()
# Handle case where no series were found
if not series_data or 'series' not in series_data or not series_data['series']:
st.warning("Series data not found. Make sure series_groups.json exists in the model directory.")
return
# Create filtered collections for each series
filtered_collections = create_filtered_collections(st.session_state.collected_tags, series_data)
# Create the mosaic configs with additional data
mosaic_configs = []
# Add all series configurations
for config in series_data.get("mosaic_configs", []):
# Skip the main collection - it's handled by the main game
if config["name"] == "main":
continue
# Add the session state key to the config
series_key = config["name"]
config_with_session_key = config.copy()
# Add discovered tags info if this series is in our filtered collections
if series_key in filtered_collections:
config_with_session_key["discovered_tags"] = filtered_collections[series_key]["tag_count"]
config_with_session_key["filtered_tags"] = filtered_collections[series_key]["filtered_tags"]
else:
# For series not in filtered collections, set discovered tags to 0
config_with_session_key["discovered_tags"] = 0
config_with_session_key["filtered_tags"] = {}
mosaic_configs.append(config_with_session_key)
# Display the collection dropdown and selected mosaic
st.subheader("🎮 Series Collections")
# Organize collection options in two ways:
# 1. Collections with discovered tags, sorted by most discovered first
collections_with_tags = [c for c in mosaic_configs if c.get("discovered_tags", 0) > 0]
collections_with_tags.sort(key=lambda x: x.get("discovered_tags", 0), reverse=True)
# 2. Collections with no tags yet
collections_without_tags = [c for c in mosaic_configs if c.get("discovered_tags", 0) == 0]
# Create a selection menu
collection_options = []
# First add collections with tags
if collections_with_tags:
collection_options.append("--- Collections with discoveries ---")
for config in collections_with_tags:
collection_options.append(f"{config['title']} ({config.get('discovered_tags', 0)}/{config['total_tags']} tags)")
# Then add collections without tags
if collections_without_tags:
collection_options.append("--- Collections to discover ---")
for config in collections_without_tags:
collection_options.append(f"{config['title']} (0/{config['total_tags']} tags)")
# Get the index of the currently selected collection
if 'selected_collection_index' not in st.session_state:
# Default to first real collection (index 1 if there are collections with tags)
st.session_state.selected_collection_index = 1 if collections_with_tags else len(collections_with_tags) + 1
# Create the collection dropdown
if collection_options: # Only create dropdown if we have options
selected_option = st.selectbox(
"Select Collection:",
options=collection_options,
index=min(st.session_state.selected_collection_index, len(collection_options)-1)
)
# Update the selected collection index
st.session_state.selected_collection_index = collection_options.index(selected_option)
# Skip displaying if a separator is selected
if "---" in selected_option:
st.info("Please select a collection from the dropdown.")
return
# Find the corresponding config for the selected collection
selected_config = None
selected_series_name = None
for config in mosaic_configs:
option_text = f"{config['title']} ({config.get('discovered_tags', 0)}/{config['total_tags']} tags)"
if option_text == selected_option:
selected_config = config
# Extract series name from config title
if config['name'] == 'miku_collection':
selected_series_name = 'vocaloid' # Special case for Miku
elif 'series_name' in config:
selected_series_name = config['series_name']
else:
selected_series_name = config['name'].replace('_', ' ')
break
# Display the selected collection
if selected_config:
# Create a container for this collection
with st.container():
# Get the filtered tags for this collection
filtered_tags = selected_config.get("filtered_tags", {})
# Get mosaic name and total tags
mosaic_name = selected_config["name"]
total_tags = selected_config["total_tags"]
# Get dimensions from config or use defaults
width = selected_config.get("width", 1536)
height = selected_config.get("height", 2040)
# Display the mosaic
display_series_mosaic(
mosaic_name=mosaic_name,
mosaic_title=selected_config["title"],
filtered_tags=filtered_tags,
total_tags=total_tags
)
# Show tag reveal feature if we have series_data
if selected_series_name:
revealed_tags_key = f"revealed_tags_{mosaic_name}"
render_tag_reveal_feature(
st,
selected_series_name,
filtered_tags,
series_data,
revealed_tags_key
)
# Show all collected tags for this series
if filtered_tags:
with st.expander(f"All Collected {selected_config['title']} Tags", expanded=False):
# Group by rarity
by_rarity = {}
for tag, info in filtered_tags.items():
rarity = info.get("rarity", "Unknown")
if rarity not in by_rarity:
by_rarity[rarity] = []
by_rarity[rarity].append(tag)
# Display by rarity
for rarity in RARITY_LEVELS.keys():
if rarity in by_rarity:
color = RARITY_LEVELS[rarity]["color"]
st.markdown(f"### {rarity}", unsafe_allow_html=True)
# Create columns for tags
cols = st.columns(3)
for i, tag in enumerate(sorted(by_rarity[rarity])):
col_idx = i % 3
with cols[col_idx]:
st.write(f"- {tag}")
else:
st.info("No series collections found. Make sure series_groups.json exists in the model directory.")