|
"""
|
|
Enhanced Tag Storage System for Tag Collector Game
|
|
|
|
This module provides functions for efficient storage and retrieval of tag data and game state,
|
|
reducing reliance on Streamlit's session state for better performance.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import time
|
|
import streamlit as st
|
|
from typing import Dict, Any, Set, Optional, Tuple, List
|
|
|
|
|
|
# Default file names for each persisted data set. The save/load functions in
# this module pass them to get_storage_path(), which joins them onto the
# storage directory.
DEFAULT_TAGS_FILE = "tag_collection.json"
DEFAULT_MOSAIC_DATA_FILE = "mosaic_data.json"
DEFAULT_GAME_STATE_FILE = "game_state.json"
DEFAULT_LIBRARY_STATE_FILE = "library_state.json"
DEFAULT_ESSENCE_STATE_FILE = "essence_state.json"
# Written into the "version" field of the tag, game-state, library and essence
# save files.
TAG_STORAGE_VERSION = "1.2.0"
|
|
|
|
def ensure_storage_directory(directory: str = "storage") -> str:
    """
    Ensure the storage directory exists and return its path

    Args:
        directory: Path of the directory to create if it is missing

    Returns:
        The same directory path that was passed in

    Raises:
        OSError: If the directory cannot be created (for example when the
            path already exists as a regular file)
    """
    # exist_ok=True makes the call idempotent and removes the window between a
    # separate existence check and the creation, during which another process
    # (or Streamlit rerun) could create the directory and make makedirs fail.
    os.makedirs(directory, exist_ok=True)
    return directory
|
|
|
|
def get_storage_path(filename: str, directory: str = "storage") -> str:
    """
    Build the path of a storage file inside the storage directory

    The directory is created first (through ensure_storage_directory) so the
    returned path can be opened for writing straight away.

    Args:
        filename: Name of the file inside the storage directory
        directory: Storage directory to use

    Returns:
        The directory joined with the file name
    """
    return os.path.join(ensure_storage_directory(directory), filename)
|
|
|
|
def save_tag_collection(
    collected_tags: Dict[str, Dict[str, Any]],
    tag_history: Optional[List[Dict[str, Any]]] = None,
    processed_tags: Optional[Set[str]] = None,
    filename: str = DEFAULT_TAGS_FILE
) -> bool:
    """
    Save the tag collection and related data to a JSON file

    The data is serialized to a string before the target file is opened, so a
    serialization error leaves the previously saved file untouched.

    Args:
        collected_tags: Dictionary of collected tags
        tag_history: List of recent tag discoveries (stored as [] when None)
        processed_tags: Set of processed tags for mosaic visualization; the
            "processed_tags" key is only written when this is not None
        filename: Name of the file to save to

    Returns:
        Boolean indicating if the save was successful (errors are printed and
        reported as False, never raised)
    """
    try:
        filepath = get_storage_path(filename)

        save_data = {
            "version": TAG_STORAGE_VERSION,
            "timestamp": time.time(),
            "save_date": time.strftime("%Y-%m-%d %H:%M:%S"),
            "collected_tags": collected_tags,
            "tag_history": tag_history if tag_history is not None else []
        }

        # JSON has no set type, so the set is stored as a list.
        if processed_tags is not None:
            save_data["processed_tags"] = list(processed_tags)

        # Serialize BEFORE opening the file: mode 'w' truncates immediately, so
        # a failure inside json.dump would wipe the existing save.
        payload = json.dumps(save_data, indent=2, ensure_ascii=False)

        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(payload)

        print(f"Saved {len(collected_tags)} tags to {filepath}")
        return True

    except Exception as e:
        print(f"Error saving tag collection: {str(e)}")
        return False
|
|
|
|
def load_tag_collection(filename: str = DEFAULT_TAGS_FILE) -> Tuple[Optional[Dict[str, Any]], Optional[List[Dict[str, Any]]], Optional[Set[str]]]:
    """
    Read the tag collection back from its JSON file

    Args:
        filename: Name of the file to load from

    Returns:
        Tuple of (collected_tags, tag_history, processed_tags). Keys missing
        from the file fall back to an empty dict / list / set. When the file
        does not exist or cannot be read, (None, None, None) is returned.
    """
    try:
        filepath = get_storage_path(filename)

        # A missing file is reported and treated as "nothing to load".
        if not os.path.exists(filepath):
            print(f"No tag collection file found at {filepath}")
            return None, None, None

        with open(filepath, 'r', encoding='utf-8') as source:
            stored = json.load(source)

        collected_tags = stored.get("collected_tags", {})
        loaded = (
            collected_tags,
            stored.get("tag_history", []),
            # Stored as a list in JSON; callers work with a set.
            set(stored.get("processed_tags", [])),
        )

        print(f"Loaded {len(collected_tags)} tags from {filepath}")
        return loaded

    except Exception as e:
        print(f"Error loading tag collection: {str(e)}")
        return None, None, None
|
|
|
|
def add_tag_to_collection(
    collected_tags: Dict[str, Dict[str, Any]],
    tag: str,
    rarity: str,
    category: str,
    tag_history: List[Dict[str, Any]],
    is_update_only: bool = False
) -> Tuple[Dict[str, Dict[str, Any]], List[Dict[str, Any]], bool, int]:
    """
    Add or update a tag in the collection

    The inputs are never modified: the collection and history are copied, and
    the entry of an existing tag is copied before its count is changed.

    Args:
        collected_tags: Dictionary of collected tags
        tag: The tag name to add
        rarity: Tag rarity
        category: Tag category
        tag_history: Tag history list (None or empty is treated as [])
        is_update_only: Only update existing tags, don't add new ones

    Returns:
        Tuple of (updated_collection, updated_history, is_new_tag, currency_earned).
        currency_earned is always 0 here; a history record is appended only
        for newly added tags.
    """
    currency_earned = 0
    timestamp = time.strftime("%H:%M:%S")

    updated_collection = collected_tags.copy()
    updated_history = tag_history.copy() if tag_history else []

    if tag not in updated_collection:
        if is_update_only:
            # Unknown tag and additions are disabled: nothing changes.
            return updated_collection, updated_history, False, 0

        updated_collection[tag] = {
            "count": 1,
            "rarity": rarity,
            "category": category,
            "discovery_time": timestamp,
            "ever_sacrificed": False,
            "is_unique": True
        }

        updated_history.append({
            "tag": tag,
            "rarity": rarity,
            "value": currency_earned,
            "time": timestamp,
            "is_new": True
        })

        return updated_collection, updated_history, True, currency_earned

    # dict.copy() above is shallow, so the entry is still the caller's object;
    # copy it before changing the count so the input collection stays intact.
    entry = dict(updated_collection[tag])
    entry["count"] = entry.get("count", 0) + 1

    # A tag whose count had dropped to 0 becomes unique again when re-acquired.
    if entry["count"] == 1 and not entry.get("is_unique", False):
        entry["is_unique"] = True

    updated_collection[tag] = entry

    return updated_collection, updated_history, False, currency_earned
|
|
|
|
def mark_tag_sacrificed(
    collected_tags: Dict[str, Dict[str, Any]],
    tag: str,
    quantity: int = 1
) -> Dict[str, Dict[str, Any]]:
    """
    Mark a tag as sacrificed

    The input collection is not modified: both the collection and the entry of
    the sacrificed tag are copied before being changed.

    Args:
        collected_tags: Dictionary of collected tags
        tag: The tag name to sacrifice
        quantity: How many to sacrifice

    Returns:
        Updated collection dictionary. When the tag is not in the collection
        the original dictionary object is returned unchanged.
    """
    if tag not in collected_tags:
        return collected_tags

    updated_collection = collected_tags.copy()

    # The copy above is shallow; copy the entry too so the caller's data is
    # not changed behind its back.
    entry = dict(updated_collection[tag])
    entry["ever_sacrificed"] = True

    # The count never goes below zero, even when more are sacrificed than held.
    entry["count"] = max(0, entry.get("count", 0) - quantity)

    if entry["count"] == 0:
        entry["is_unique"] = False

    updated_collection[tag] = entry

    return updated_collection
|
|
|
|
def save_game_state(
    st_state,
    filename: str = DEFAULT_GAME_STATE_FILE
) -> bool:
    """
    Save the full game state to a JSON file

    Attributes are read from the session state with getattr(), so attributes
    that are missing fall back to the defaults listed below. Set-valued
    attributes are written as lists because JSON has no set type. The data is
    serialized before the file is opened, so a serialization error leaves the
    previously saved file untouched.

    Args:
        st_state: Streamlit session state
        filename: Name of the file to save to

    Returns:
        Boolean indicating if the save was successful (errors are printed and
        reported as False, never raised)
    """
    try:
        filepath = get_storage_path(filename)

        game_data = {
            "version": TAG_STORAGE_VERSION,
            "timestamp": time.time(),
            "save_date": time.strftime("%Y-%m-%d %H:%M:%S"),
            "threshold": getattr(st_state, 'threshold', 0.50),
            "tag_currency": getattr(st_state, 'tag_currency', 0),
            "enkephalin": getattr(st_state, 'enkephalin', 0),
            "purchased_upgrades": getattr(st_state, 'purchased_upgrades', []),
            "achievements": list(getattr(st_state, 'achievements', set())),
            "game_stats": getattr(st_state, 'game_stats', {}),
            "tag_power_bonus": getattr(st_state, 'tag_power_bonus', 0),
            "coin_multiplier": getattr(st_state, 'coin_multiplier', 1.0),
            "unlocked_combinations": list(getattr(st_state, 'unlocked_combinations', set())),
            "combination_bonuses": getattr(st_state, 'combination_bonuses', {"threshold_reduction": 0, "coin_bonus": 0}),
            "sacrificed_tags": getattr(st_state, 'sacrificed_tags', {})
        }

        # mastered_categories is optional; it is only written when present.
        if hasattr(st_state, 'mastered_categories'):
            if isinstance(st_state.mastered_categories, set):
                game_data["mastered_categories"] = list(st_state.mastered_categories)
            else:
                game_data["mastered_categories"] = st_state.mastered_categories

        # Serialize BEFORE opening the file: mode 'w' truncates immediately, so
        # a failure inside json.dump (e.g. a stray set inside game_stats) would
        # destroy the existing save.
        payload = json.dumps(game_data, indent=2)

        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(payload)

        print(f"Saved game state to {filepath}")
        return True

    except Exception as e:
        print(f"Error saving game state: {str(e)}")
        return False
|
|
|
|
def load_game_state(
    st_state,
    filename: str = DEFAULT_GAME_STATE_FILE
) -> bool:
    """
    Restore the game state from a JSON file into the session state

    Every known field is assigned on the session state; fields missing from
    the file receive their default value. "achievements" and
    "unlocked_combinations" are converted back to sets, and so is
    "mastered_categories" when it was stored as a list.

    Args:
        st_state: Streamlit session state to update
        filename: Name of the file to load from

    Returns:
        True when the file was read and applied, False when it is missing or
        an error occurred (the error is printed)
    """
    try:
        filepath = get_storage_path(filename)

        if not os.path.exists(filepath):
            print(f"No game state file found at {filepath}")
            return False

        with open(filepath, 'r', encoding='utf-8') as source:
            game_data = json.load(source)

        stored = game_data.get

        st_state.threshold = stored("threshold", 0.50)
        st_state.tag_currency = stored("tag_currency", 0)
        st_state.enkephalin = stored("enkephalin", 0)
        st_state.purchased_upgrades = stored("purchased_upgrades", [])
        st_state.achievements = set(stored("achievements", []))
        st_state.game_stats = stored("game_stats", {})
        st_state.tag_power_bonus = stored("tag_power_bonus", 0)
        st_state.coin_multiplier = stored("coin_multiplier", 1.0)
        st_state.unlocked_combinations = set(stored("unlocked_combinations", []))
        st_state.combination_bonuses = stored("combination_bonuses", {"threshold_reduction": 0, "coin_bonus": 0})
        st_state.sacrificed_tags = stored("sacrificed_tags", {})

        # Optional field: left untouched on the session when not in the file.
        if "mastered_categories" in game_data:
            mastered = game_data["mastered_categories"]
            st_state.mastered_categories = set(mastered) if isinstance(mastered, list) else mastered

        print(f"Loaded game state from {filepath}")
        return True

    except Exception as e:
        print(f"Error loading game state: {str(e)}")
        return False
|
|
|
|
def save_mosaic_data(
    processed_tags: Set[str],
    filled_cells: Set[Tuple[int, int]],
    highlighted_tags: List[Tuple[str, int, int, str]],
    mosaic_name: str = "main",
    filename: str = DEFAULT_MOSAIC_DATA_FILE
) -> bool:
    """
    Save mosaic-specific data to a JSON file

    The file holds one entry per mosaic name. The existing file is read first
    and only the entry for mosaic_name is replaced, so other mosaics stored in
    the same file are kept. The merged data is serialized before the file is
    reopened for writing, so a serialization error cannot wipe those entries.

    Args:
        processed_tags: Set of tags that have been processed by the mosaic
        filled_cells: Set of grid coordinates that have been filled
        highlighted_tags: List of highlighted tags with their positions
        mosaic_name: Name of the mosaic
        filename: Name of the file to save to

    Returns:
        Boolean indicating if the save was successful (errors, including an
        unreadable existing file, are printed and reported as False)
    """
    try:
        filepath = get_storage_path(filename)

        existing_data = {}
        if os.path.exists(filepath):
            with open(filepath, 'r', encoding='utf-8') as f:
                existing_data = json.load(f)

        # Sets and tuples are stored as lists because JSON has neither type.
        mosaic_data = {
            "processed_tags": list(processed_tags),
            "filled_cells": [list(cell) for cell in filled_cells],
            "highlighted_tags": highlighted_tags,
            "last_updated": time.strftime("%Y-%m-%d %H:%M:%S")
        }

        existing_data[mosaic_name] = mosaic_data

        # Serialize BEFORE opening the file: mode 'w' truncates immediately, so
        # a failure inside json.dump would lose every mosaic in the file.
        payload = json.dumps(existing_data, indent=2)

        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(payload)

        print(f"Saved mosaic data for {mosaic_name} with {len(processed_tags)} processed tags")
        return True

    except Exception as e:
        print(f"Error saving mosaic data: {str(e)}")
        return False
|
|
|
|
def load_mosaic_data(
    mosaic_name: str = "main",
    filename: str = DEFAULT_MOSAIC_DATA_FILE
) -> Tuple[Optional[Set[str]], Optional[Set[Tuple[int, int]]], Optional[List[Tuple[str, int, int, str]]]]:
    """
    Read the stored data of one mosaic from the mosaic JSON file

    Args:
        mosaic_name: Name of the mosaic to load
        filename: Name of the file to load from

    Returns:
        Tuple of (processed_tags, filled_cells, highlighted_tags). The first
        two are rebuilt as sets (cells as tuples); highlighted_tags is
        returned as read from the file. (None, None, None) is returned when
        the file or the mosaic entry is missing, or when loading fails.
    """
    try:
        filepath = get_storage_path(filename)

        if not os.path.exists(filepath):
            print(f"No mosaic data file found at {filepath}")
            return None, None, None

        with open(filepath, 'r', encoding='utf-8') as source:
            stored = json.load(source)

        # The file maps mosaic names to their individual data.
        if mosaic_name not in stored:
            print(f"No data found for mosaic {mosaic_name}")
            return None, None, None

        entry = stored[mosaic_name]

        processed_tags = set(entry.get("processed_tags", []))
        # Cells were written as two-element lists; turn them back into tuples.
        filled_cells = set(map(tuple, entry.get("filled_cells", [])))
        highlighted_tags = entry.get("highlighted_tags", [])

        print(f"Loaded mosaic data for {mosaic_name} with {len(processed_tags)} processed tags")
        return processed_tags, filled_cells, highlighted_tags

    except Exception as e:
        print(f"Error loading mosaic data: {str(e)}")
        return None, None, None
|
|
|
|
def update_tag_storage_from_session(st_state) -> bool:
    """
    Write the tag collection and the game state of the session to storage

    Args:
        st_state: Streamlit session state object

    Returns:
        True only when both the tag collection and the game state were saved.
        Both saves are always attempted, even if the first one fails.
    """
    try:
        # The mosaic (and its processed_tags) is optional on the session;
        # None means "do not write processed tags".
        mosaic = getattr(st_state, 'tag_mosaic', None)
        processed_tags = getattr(mosaic, 'processed_tags', None)

        tag_save_success = save_tag_collection(
            getattr(st_state, 'collected_tags', {}),
            getattr(st_state, 'tag_history', []),
            processed_tags,
        )
        game_save_success = save_game_state(st_state)

        return tag_save_success and game_save_success

    except Exception as e:
        print(f"Error updating storage from session: {str(e)}")
        return False
|
|
|
|
def update_session_from_tag_storage(st_state) -> bool:
    """
    Fill the session state from the stored tag collection and game state

    Args:
        st_state: Streamlit session state object

    Returns:
        The result of loading the game state. False is also returned when
        neither a tag collection nor a game state file is available, or when
        an error occurred.
    """
    try:
        collected_tags, tag_history, processed_tags = load_tag_collection()

        if collected_tags is not None:
            st_state.collected_tags = collected_tags

            if tag_history is not None:
                st_state.tag_history = tag_history

            # Processed tags live on the mosaic object, which may not exist.
            if processed_tags is not None and hasattr(st_state, 'tag_mosaic'):
                st_state.tag_mosaic.processed_tags = processed_tags
        elif not os.path.exists(get_storage_path(DEFAULT_GAME_STATE_FILE)):
            # No tag collection and no game state file: nothing to load.
            return False

        return load_game_state(st_state)

    except Exception as e:
        print(f"Error updating session from storage: {str(e)}")
        return False
|
|
|
|
def ensure_tag_tracking_fields(st_state) -> None:
    """
    Make sure all tags in the collection have the required tracking fields

    Missing "ever_sacrificed" is set to False and missing "is_unique" is set
    to whether the tag's count is above zero. Existing values are never
    overwritten. Entries without a "count" are treated as count 0 instead of
    raising, and entries that are not dictionaries are skipped, so one
    malformed entry cannot abort the caller.

    Args:
        st_state: Streamlit session state object; its collected_tags entries
            are updated in place. Nothing happens when the attribute is
            missing or empty.
    """
    collected_tags = getattr(st_state, 'collected_tags', None)
    if not collected_tags:
        return

    for info in collected_tags.values():
        if not isinstance(info, dict):
            continue

        info.setdefault("ever_sacrificed", False)

        if "is_unique" not in info:
            # "or 0" also covers an explicit None count.
            info["is_unique"] = (info.get("count") or 0) > 0
|
|
|
|
def periodic_save(st_state, interval_minutes: int = 5) -> None:
    """
    Save the game when enough time has passed since the last save

    The time of the last save is kept on the session state as
    last_tag_save_time. It is initialised to the current time on first use
    and only advanced when the save reports success.

    Args:
        st_state: Streamlit session state object
        interval_minutes: How often to save (in minutes)
    """
    if not hasattr(st_state, 'last_tag_save_time'):
        st_state.last_tag_save_time = time.time()

    current_time = time.time()
    elapsed_seconds = current_time - st_state.last_tag_save_time
    elapsed_minutes = elapsed_seconds / 60

    # save_game is only attempted once the interval has elapsed.
    if elapsed_minutes >= interval_minutes and save_game(st_state):
        st_state.last_tag_save_time = current_time
        print(f"Performed periodic save after {elapsed_minutes:.1f} minutes")
|
|
|
|
|
|
|
|
def save_library_state(
    library_state: Optional[Dict[str, Any]] = None,
    session_state = None,
    filename: str = DEFAULT_LIBRARY_STATE_FILE
) -> bool:
    """
    Save the library state to a JSON file

    When no library_state dictionary is given, one is assembled from the
    session state, using defaults for attributes the session does not have.
    Version and timestamp fields are added, and every discovered tag is given
    a "category" and "discovery_count" if it lacks them. The data is
    serialized before the file is opened, so a serialization error leaves the
    previously saved file untouched.

    Args:
        library_state: Library state data dictionary (optional)
        session_state: Streamlit session state to extract from (optional)
        filename: Name of the file to save to

    Returns:
        Boolean indicating if the save was successful; False when neither
        argument supplies a state or when an error occurred
    """
    try:
        filepath = get_storage_path(filename)

        if library_state is None and session_state is not None:
            library_state = {
                "discovered_tags": getattr(session_state, 'discovered_tags', {}),
                "library_exploration_history": getattr(session_state, 'library_exploration_history', []),
                "expedition_results": getattr(session_state, 'expedition_results', []),
                "library_growth": getattr(session_state, 'library_growth', {
                    "total_discoveries": 0,
                    "last_discovery_time": time.time()
                }),
                "library_upgrades": getattr(session_state, 'library_upgrades', {
                    "speed": 1,
                    "capacity": 1,
                    "rarity": 1
                }),
                "last_expedition_time": getattr(session_state, 'last_expedition_time', 0)
            }

            # JSON has no set type, so a set of tiers is stored as a list.
            if hasattr(session_state, 'explored_library_tiers'):
                if isinstance(session_state.explored_library_tiers, set):
                    library_state["explored_library_tiers"] = list(session_state.explored_library_tiers)
                else:
                    library_state["explored_library_tiers"] = session_state.explored_library_tiers

        if library_state is None:
            print("No library state to save.")
            return False

        library_state["version"] = TAG_STORAGE_VERSION
        library_state["timestamp"] = time.time()
        library_state["save_date"] = time.strftime("%Y-%m-%d %H:%M:%S")

        # Fill in metadata fields that older entries may not have.
        if "discovered_tags" in library_state:
            for tag, info in library_state["discovered_tags"].items():
                if "category" not in info:
                    info["category"] = "unknown"

                if "discovery_count" not in info:
                    info["discovery_count"] = 1

        # Serialize BEFORE opening the file: mode 'w' truncates immediately, so
        # a failure inside json.dump would destroy the existing save.
        payload = json.dumps(library_state, indent=2, ensure_ascii=False)

        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(payload)

        print(f"Saved library state to {filepath}")
        return True

    except Exception as e:
        print(f"Error saving library state: {str(e)}")
        return False
|
|
|
|
def load_library_state(
    session_state = None,
    filename: str = DEFAULT_LIBRARY_STATE_FILE
) -> Dict[str, Any]:
    """
    Read the library state from its JSON file

    When a session state is supplied, every library field present in the file
    is copied onto it; fields absent from the file leave the session
    untouched. "explored_library_tiers" is turned back into a set when it was
    stored as a list.

    Args:
        session_state: Streamlit session state to update (optional)
        filename: Name of the file to load from

    Returns:
        Dictionary of loaded library state or None if loading fails
    """
    # Fields copied verbatim from the file onto the session, in this order.
    plain_fields = (
        "discovered_tags",
        "library_exploration_history",
        "expedition_results",
        "library_growth",
        "library_upgrades",
        "last_expedition_time",
    )

    try:
        filepath = get_storage_path(filename)

        if not os.path.exists(filepath):
            print(f"No library state file found at {filepath}")
            return None

        with open(filepath, 'r', encoding='utf-8') as source:
            library_state = json.load(source)

        if session_state is not None:
            for field in plain_fields:
                if field in library_state:
                    setattr(session_state, field, library_state[field])

            if "explored_library_tiers" in library_state:
                tiers = library_state["explored_library_tiers"]
                session_state.explored_library_tiers = set(tiers) if isinstance(tiers, list) else tiers

        print(f"Loaded library state from {filepath}")
        return library_state

    except Exception as e:
        print(f"Error loading library state: {str(e)}")
        return None
|
|
|
|
def add_discovered_tag(
    tag: str,
    rarity: str,
    session_state,
    library_floor: Optional[str] = None,
    category: str = "unknown"
) -> bool:
    """
    Add a tag to the discovered tags with enriched metadata

    A new tag gets a fresh record; a known tag has its discovery count and
    last-seen time updated. When no category is given, it is looked up first
    in session_state.metadata['tag_to_category'] and then, if still unknown,
    in session_state.tag_rarity_metadata. The library state is saved to disk
    after every call.

    Args:
        tag: The tag name
        rarity: The tag rarity level
        session_state: Streamlit session state; discovered_tags and
            explored_library_tiers are created on it when missing
        library_floor: The library floor where it was discovered (optional)
        category: The tag category (optional)

    Returns:
        Boolean indicating if it's a new discovery
    """
    if not hasattr(session_state, 'discovered_tags'):
        session_state.discovered_tags = {}

    if not hasattr(session_state, 'explored_library_tiers'):
        session_state.explored_library_tiers = set()

    is_new = tag not in session_state.discovered_tags

    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")

    if category == "unknown":
        # First source: the tag -> category mapping in the metadata.
        metadata = getattr(session_state, 'metadata', None)
        if metadata and 'tag_to_category' in metadata:
            if tag in metadata['tag_to_category']:
                category = metadata['tag_to_category'][tag]

        # Second source: tried whenever the first did not resolve the tag,
        # not only when the metadata mapping is absent altogether.
        if category == "unknown":
            rarity_metadata = getattr(session_state, 'tag_rarity_metadata', None)
            if rarity_metadata and tag in rarity_metadata:
                tag_info = rarity_metadata[tag]
                if isinstance(tag_info, dict) and "category" in tag_info:
                    category = tag_info["category"]

    if is_new:
        session_state.discovered_tags[tag] = {
            "rarity": rarity,
            "discovery_time": timestamp,
            "category": category,
            "discovery_count": 1,
            "last_seen": timestamp
        }

        if library_floor:
            session_state.discovered_tags[tag]["library_floor"] = library_floor

            session_state.explored_library_tiers.add(library_floor)
    else:
        tag_info = session_state.discovered_tags[tag]
        tag_info["discovery_count"] = tag_info.get("discovery_count", 1) + 1
        tag_info["last_seen"] = timestamp

        # Only upgrade an unknown category; a known one is never overwritten.
        if category != "unknown" and tag_info.get("category", "unknown") == "unknown":
            tag_info["category"] = category

        if library_floor:
            tag_info["library_floor"] = library_floor

            session_state.explored_library_tiers.add(library_floor)

    save_library_state(session_state=session_state)

    return is_new
|
|
|
|
def purchase_library_upgrade(
    upgrade_type: str,
    session_state,
    calculate_cost_func = None
) -> bool:
    """
    Buy the next level of a library upgrade with tag currency

    Without a cost function the price is base cost * level * 1.5 (truncated
    to an int), with base costs of 50 / 100 / 150 for speed / capacity /
    rarity and 100 for any other type. On success the currency is deducted,
    the level is raised by one, "currency_spent" in game_stats is increased
    when game_stats exists, and both the library state and the game state are
    saved.

    Args:
        upgrade_type: The type of upgrade ("speed", "capacity", or "rarity")
        session_state: Streamlit session state
        calculate_cost_func: Function to calculate the cost (optional); called
            with the upgrade type and the current level

    Returns:
        Boolean indicating if the purchase was successful
    """
    if not hasattr(session_state, 'library_upgrades'):
        session_state.library_upgrades = {"speed": 1, "capacity": 1, "rarity": 1}

    current_level = session_state.library_upgrades.get(upgrade_type, 1)

    if calculate_cost_func:
        cost = calculate_cost_func(upgrade_type, current_level)
    else:
        default_base_costs = {"speed": 50, "capacity": 100, "rarity": 150}
        base_cost = default_base_costs.get(upgrade_type, 100)
        cost = int(base_cost * (current_level * 1.5))

    # A session without any currency attribute cannot afford anything.
    if not hasattr(session_state, 'tag_currency') or session_state.tag_currency < cost:
        print(f"Cannot afford {upgrade_type} upgrade: needs {cost}, has {getattr(session_state, 'tag_currency', 0)}")
        return False

    session_state.tag_currency -= cost
    session_state.library_upgrades[upgrade_type] = current_level + 1

    if hasattr(session_state, 'game_stats'):
        stats = session_state.game_stats
        if "currency_spent" not in stats:
            stats["currency_spent"] = 0
        stats["currency_spent"] += cost

    # Persist both files: the upgrade level and the reduced currency.
    save_library_state(session_state=session_state)
    save_game_state(session_state)

    print(f"Purchased {upgrade_type} upgrade to level {current_level + 1} for {cost}")
    return True
|
|
|
|
def save_essence_state(
    essence_state: Optional[Dict[str, Any]] = None,
    session_state = None,
    filename: str = DEFAULT_ESSENCE_STATE_FILE
) -> bool:
    """
    Save the essence generator state to a JSON file

    When no essence_state dictionary is given, one is assembled from the
    session state. Essence paths are stored as bare file names. The name
    reduction is applied to copies, so neither the dictionary passed in nor
    the live entries on the session state are modified by saving. The data is
    serialized before the file is opened, so a serialization error leaves the
    previously saved file untouched.

    Args:
        essence_state: Essence state data dictionary (optional)
        session_state: Streamlit session state to extract from (optional)
        filename: Name of the file to save to

    Returns:
        Boolean indicating if the save was successful; False when neither
        argument supplies a state or when an error occurred
    """
    try:
        filepath = get_storage_path(filename)

        if essence_state is None and session_state is not None:
            essence_state = {
                "generated_essences": getattr(session_state, 'generated_essences', {}),
                "essence_custom_settings": getattr(session_state, 'essence_custom_settings', {}),
                "manual_tags": getattr(session_state, 'manual_tags', {})
            }

        if essence_state is None:
            print("No essence state to save.")
            return False

        # Work on a shallow copy so the caller's dictionary is left as it was.
        essence_state = dict(essence_state)
        essence_state["version"] = TAG_STORAGE_VERSION
        essence_state["timestamp"] = time.time()
        essence_state["save_date"] = time.strftime("%Y-%m-%d %H:%M:%S")

        if "generated_essences" in essence_state:
            # Build new entries instead of editing info["path"] in place: the
            # entries are the same objects the session state holds, and
            # shortening their paths there would leave the running session
            # with bare file names.
            essence_state["generated_essences"] = {
                tag: (
                    {**info, "path": os.path.basename(info["path"])}
                    if isinstance(info, dict) and "path" in info
                    else info
                )
                for tag, info in essence_state["generated_essences"].items()
            }

        # Serialize BEFORE opening the file: mode 'w' truncates immediately, so
        # a failure inside json.dump would destroy the existing save.
        payload = json.dumps(essence_state, indent=2, ensure_ascii=False)

        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(payload)

        print(f"Saved essence state to {filepath}")
        return True

    except Exception as e:
        print(f"Error saving essence state: {str(e)}")
        return False
|
|
|
|
def load_essence_state(
    session_state = None,
    filename: str = DEFAULT_ESSENCE_STATE_FILE
) -> Dict[str, Any]:
    """
    Read the essence generator state from its JSON file

    When a session state is supplied, the fields present in the file are
    copied onto it. Relative essence paths are expanded to
    <parent of this module's directory>/game_data/essences/<file name> before
    the essences are assigned.

    Args:
        session_state: Streamlit session state to update (optional)
        filename: Name of the file to load from

    Returns:
        Dictionary of loaded essence state or None if loading fails
    """
    try:
        filepath = get_storage_path(filename)

        if not os.path.exists(filepath):
            print(f"No essence state file found at {filepath}")
            return None

        with open(filepath, 'r', encoding='utf-8') as source:
            essence_state = json.load(source)

        if session_state is not None:
            if "generated_essences" in essence_state:
                generated_essences = essence_state["generated_essences"]

                module_dir = os.path.dirname(os.path.abspath(__file__))
                project_root = os.path.dirname(module_dir)
                essence_dir = os.path.join(project_root, "game_data", "essences")

                # Absolute paths are kept as they are.
                for info in generated_essences.values():
                    if "path" in info and not os.path.isabs(info["path"]):
                        info["path"] = os.path.join(essence_dir, os.path.basename(info["path"]))

                session_state.generated_essences = generated_essences

            for field in ("essence_custom_settings", "manual_tags"):
                if field in essence_state:
                    setattr(session_state, field, essence_state[field])

        print(f"Loaded essence state from {filepath}")
        return essence_state

    except Exception as e:
        print(f"Error loading essence state: {str(e)}")
        return None
|
|
|
|
def save_game(st_state) -> bool:
    """
    Save every game component: tags, game state, library state, essence state

    All three saves are attempted regardless of earlier failures, and the
    outcome of each is printed.

    Args:
        st_state: Streamlit session state object

    Returns:
        The result of the tag/game-state save only; library and essence
        failures are reported in the printed message but do not change the
        return value. False when an unexpected error occurred.
    """
    try:
        # Tags need their tracking fields before they are written out.
        ensure_tag_tracking_fields(st_state)

        tag_success = update_tag_storage_from_session(st_state)
        library_success = save_library_state(session_state=st_state)
        essence_success = save_essence_state(session_state=st_state)

        if all((tag_success, library_success, essence_success)):
            print("Successfully saved all game components")
        else:
            print(f"Partial save - Tags: {tag_success}, Library: {library_success}, Essence: {essence_success}")

        return tag_success
    except Exception as e:
        print(f"Error in full game save: {str(e)}")
        return False
|
|
|
|
def load_game(st_state) -> bool:
    """
    Load every game component: tags, game state, library state, essence state

    All three loads are attempted regardless of earlier failures, and the
    outcome of each is printed. After a successful tag load the tracking
    fields of the loaded tags are completed.

    Args:
        st_state: Streamlit session state object

    Returns:
        True when at least one component was loaded, False when none was or
        when an unexpected error occurred
    """
    try:
        tag_success = update_session_from_tag_storage(st_state)

        # The loaders return the loaded dictionary, or None on failure.
        library_success = load_library_state(st_state) is not None
        essence_success = load_essence_state(st_state) is not None

        if tag_success:
            ensure_tag_tracking_fields(st_state)

        if all((tag_success, library_success, essence_success)):
            print("Successfully loaded all game components")
        else:
            print(f"Partial load - Tags: {tag_success}, Library: {library_success}, Essence: {essence_success}")

        return any((tag_success, library_success, essence_success))
    except Exception as e:
        print(f"Error in full game load: {str(e)}")
        return False
|
|
|
|
|
|
# When the module is executed as a script it only prints this notice; nothing
# else runs from here.
if __name__ == "__main__":
    print("Enhanced tag storage module loaded. Run directly for testing.")