""" Blockchain Wallet Analyzer - A tool for analyzing wallet contents across multiple blockchain data providers. This module provides a complete implementation of a blockchain wallet analysis tool with a Gradio web interface. It includes Etherscan and Zerion integration for comprehensive wallet analysis, NFT tracking, and interactive chat capabilities using the OpenAI API. Author: Claude Date: January 2025 """ from __future__ import annotations import os import re import json import time import base64 import logging import asyncio from typing import List, Dict, Tuple, Any, Optional, TypeVar, cast from datetime import datetime from decimal import Decimal from dataclasses import dataclass from enum import Enum from pathlib import Path import aiohttp import openai import gradio as gr from tenacity import retry, stop_after_attempt, wait_exponential # Type variables T = TypeVar('T') WalletData = Dict[str, Any] ChatHistory = List[Tuple[str, str]] # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('blockchain_analyzer.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) class ConfigError(Exception): """Raised when there's an error in configuration.""" pass class APIError(Exception): """Raised when there's an error in API calls.""" pass class ValidationError(Exception): """Raised when there's an error in input validation.""" pass @dataclass class Config: """Application configuration settings.""" SYSTEM_PROMPT: str = """ You are LOSS DOG šŸ• (Learning & Observing Smart Systems Digital Output Generator), an adorable blockchain-sniffing puppy! Your personality: - Friendly and enthusiastic - Explain findings in fun, simple ways Instructions: - You have access to detailed wallet data from both Etherscan and Zerion - Use this data to provide specific answers about holdings and portfolio value - Reference exact numbers and collections when discussing assets - Compare wallets if multiple are available - Highlight interesting findings from both data sources """ ETHERSCAN_BASE_URL: str = "https://api.etherscan.io/api" ZERION_BASE_URL: str = "https://api.zerion.io/v1" ETHEREUM_ADDRESS_REGEX: str = r"0x[a-fA-F0-9]{40}" RATE_LIMIT_DELAY: float = 0.2 MAX_RETRIES: int = 3 OPENAI_MODEL: str = "gpt-4o-mini" MAX_TOKENS: int = 4000 TEMPERATURE: float = 0.7 HISTORY_LIMIT: int = 5 @classmethod def load(cls, config_path: str | Path) -> Config: """Load configuration from a JSON file.""" try: with open(config_path) as f: config_data = json.load(f) return cls(**config_data) except Exception as e: logger.error(f"Error loading config: {e}") return cls() class ZerionAPI: """Handles interactions with the Zerion API.""" def __init__(self, api_key: str): """Initialize Zerion API client.""" self.api_key = api_key self.base_url = Config.ZERION_BASE_URL self.session: Optional[aiohttp.ClientSession] = None async def __aenter__(self) -> 'ZerionAPI': """Create aiohttp session on context manager enter.""" auth_str = f"{self.api_key}:" auth_bytes = auth_str.encode('ascii') auth_b64 = base64.b64encode(auth_bytes).decode('ascii') self.session = aiohttp.ClientSession(headers={ 'accept': 'application/json', 'authorization': f'Basic {auth_b64}' }) return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: """Close aiohttp session on context manager exit.""" if self.session: await self.session.close() self.session = None @retry(stop=stop_after_attempt(Config.MAX_RETRIES), wait=wait_exponential(multiplier=1, min=4, max=10)) async def get_portfolio_data(self, address: str) -> Dict[str, Any]: """Get complete portfolio data from Zerion.""" if not self.session: raise APIError("No active session. Use context manager.") try: # Get wallet positions positions_data = await self._fetch_positions(address) # Get NFTs nfts_data = await self._fetch_nfts(address) return { "wallet": address, "last_updated": datetime.now().isoformat(), "positions": positions_data, "nfts": nfts_data } except Exception as e: raise APIError(f"Zerion API error: {str(e)}") async def _fetch_positions(self, address: str) -> Dict[str, Any]: """Fetch positions data from Zerion.""" async with self.session.get( f"{self.base_url}/wallets/{address}/positions" ) as response: if response.status != 200: raise APIError(f"Zerion API request failed: {response.status}") data = await response.json() return self._process_positions(data) async def _fetch_nfts(self, address: str) -> Dict[str, Any]: """Fetch NFT data from Zerion.""" async with self.session.get( f"{self.base_url}/wallets/{address}/nfts" ) as response: if response.status != 200: raise APIError(f"Zerion API request failed: {response.status}") data = await response.json() return self._process_nfts(data) def _process_positions(self, data: Dict[str, Any]) -> Dict[str, Any]: """Process Zerion positions data.""" positions = data.get("data", []) total_value_usd = Decimal(0) assets = [] chains = set() for position in positions: attributes = position.get("attributes", {}) value_usd = Decimal(str(attributes.get("value", 0))) chain = attributes.get("chain", "unknown") total_value_usd += value_usd chains.add(chain) if value_usd > 0: asset = { "name": attributes.get("name", "Unknown"), "symbol": attributes.get("symbol", "???"), "quantity": float(attributes.get("quantity", 0)), "value_usd": float(value_usd), "chain": chain, "type": attributes.get("type", "unknown"), "price_usd": float(attributes.get("price", 0)) } assets.append(asset) return { "total_value_usd": float(total_value_usd), "assets": sorted(assets, key=lambda x: x["value_usd"], reverse=True), "chains": list(chains), "asset_count": len(assets) } def _process_nfts(self, data: Dict[str, Any]) -> Dict[str, Any]: """Process Zerion NFT data.""" nfts = data.get("data", []) collections = {} for nft in nfts: attributes = nft.get("attributes", {}) collection_name = attributes.get("collection_name", "Unknown Collection") if collection_name not in collections: collections[collection_name] = { "count": 0, "total_value_usd": 0, "items": [] } collections[collection_name]["count"] += 1 if attributes.get("value_usd"): collections[collection_name]["total_value_usd"] += float(attributes["value_usd"]) collections[collection_name]["items"].append({ "token_id": attributes.get("token_id"), "name": attributes.get("name", "Unnamed NFT"), "value_usd": float(attributes.get("value_usd", 0)), "chain": attributes.get("chain", "unknown") }) return { "collections": collections, "total_nfts": sum(c["count"] for c in collections.values()), "total_value_usd": sum(c["total_value_usd"] for c in collections.values()) } class WalletAnalyzer: """Analyzes Ethereum wallet contents using Etherscan API.""" def __init__(self, api_key: str): """Initialize the analyzer with API key.""" self.api_key = api_key self.base_url = Config.ETHERSCAN_BASE_URL self.last_request_time = 0 self.session: Optional[aiohttp.ClientSession] = None async def __aenter__(self) -> WalletAnalyzer: """Create aiohttp session on context manager enter.""" self.session = aiohttp.ClientSession() return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: """Close aiohttp session on context manager exit.""" if self.session: await self.session.close() self.session = None @retry( stop=stop_after_attempt(Config.MAX_RETRIES), wait=wait_exponential(multiplier=1, min=4, max=10) ) async def _fetch_data(self, params: Dict[str, str]) -> Dict[str, Any]: """Fetch data from Etherscan API with retry logic.""" if not self.session: raise APIError("No active session. Use context manager.") await self._rate_limit() params["apikey"] = self.api_key try: async with self.session.get(self.base_url, params=params) as response: if response.status != 200: raise APIError(f"API request failed: {response.status}") data = await response.json() if data["status"] == "0": error_msg = data.get('message', 'Unknown error') if "Max rate limit reached" in error_msg: raise APIError("Rate limit exceeded") raise APIError(f"API error: {error_msg}") return data except aiohttp.ClientError as e: raise APIError(f"Network error: {str(e)}") except Exception as e: raise APIError(f"Unexpected error: {str(e)}") async def _rate_limit(self) -> None: """Implement rate limiting for Etherscan API.""" current_time = time.time() time_passed = current_time - self.last_request_time if time_passed < Config.RATE_LIMIT_DELAY: await asyncio.sleep(Config.RATE_LIMIT_DELAY - time_passed) self.last_request_time = time.time() @staticmethod def _validate_address(address: str) -> bool: """Validate Ethereum address format.""" return bool(re.match(Config.ETHEREUM_ADDRESS_REGEX, address)) async def get_portfolio_data(self, address: str) -> WalletData: """Get complete portfolio including ETH, tokens, and NFTs.""" if not self._validate_address(address): raise ValidationError(f"Invalid Ethereum address: {address}") logger.info(f"Fetching portfolio data for {address}") # Get ETH balance eth_balance = await self._get_eth_balance(address) # Get token data token_holdings = await self._get_token_holdings(address) # Get NFT data nft_collections = await self._get_nft_holdings(address) return { "address": address, "last_updated": datetime.now().isoformat(), "eth_balance": float(eth_balance), "tokens": token_holdings, "nft_collections": nft_collections } async def _get_eth_balance(self, address: str) -> Decimal: """Get ETH balance for address.""" params = { "module": "account", "action": "balance", "address": address, "tag": "latest" } data = await self._fetch_data(params) return Decimal(data["result"]) / Decimal("1000000000000000000") async def _get_token_holdings(self, address: str) -> List[Dict[str, Any]]: """Get token holdings for address.""" params = { "module": "account", "action": "tokentx", "address": address, "sort": "desc" } data = await self._fetch_data(params) token_holdings: Dict[str, Dict[str, Any]] = {} for tx in data.get("result", []): contract = tx["contractAddress"] if contract not in token_holdings: token_holdings[contract] = { "name": tx["tokenName"], "symbol": tx["tokenSymbol"], "decimals": int(tx["tokenDecimal"]), "balance": Decimal(0) } amount = Decimal(tx["value"]) / Decimal(10 ** int(tx["tokenDecimal"])) if tx["to"].lower() == address.lower(): token_holdings[contract]["balance"] += amount elif tx["from"].lower() == address.lower(): token_holdings[contract]["balance"] -= amount return [ { "name": data["name"], "symbol": data["symbol"], "balance": float(data["balance"]) } for data in token_holdings.values() if data["balance"] > 0 ] async def _get_nft_holdings(self, address: str) -> Dict[str, Dict[str, Any]]: """Get NFT holdings for address.""" params = { "module": "account", "action": "tokennfttx", "address": address, "sort": "desc" } data = await self._fetch_data(params) nft_holdings: Dict[str, Dict[str, Any]] = {} collections: Dict[str, List[str]] = {} for tx in data.get("result", []): collection_name = tx.get("tokenName", "Unknown Collection") token_id = tx["tokenID"] key = f"{tx['contractAddress']}_{token_id}" if tx["to"].lower() == address.lower(): nft_holdings[key] = { "collection": collection_name, "token_id": token_id, "contract": tx["contractAddress"], "acquired_time": tx["timeStamp"] } if collection_name not in collections: collections[collection_name] = [] collections[collection_name].append(token_id) elif tx["from"].lower() == address.lower(): nft_holdings.pop(key, None) if collection_name in collections and token_id in collections[collection_name]: collections[collection_name].remove(token_id) return { name: { "count": len(tokens), "token_ids": tokens } for name, tokens in collections.items() if tokens # Only include collections with tokens } class ChatInterface: """Handles chat interaction using OpenAI API.""" def __init__(self, openai_key: str, etherscan_key: str, zerion_key: str): """Initialize chat interface with API keys.""" self.openai_key = openai_key self.etherscan_key = etherscan_key self.zerion_key = zerion_key self.context: Dict[str, Any] = {} self.zerion_context: Dict[str, Any] = {} openai.api_key = openai_key @staticmethod def _validate_api_keys(openai_key: str, etherscan_key: str, zerion_key: str) -> Tuple[bool, str]: """Validate API keys.""" try: # Validate OpenAI key client = openai.OpenAI(api_key=openai_key) client.chat.completions.create( model=Config.OPENAI_MODEL, messages=[{"role": "user", "content": "test"}], max_tokens=1 ) # Validate Etherscan key async def validate_etherscan(): async with WalletAnalyzer(etherscan_key) as analyzer: params = {"module": "stats", "action": "ethsupply"} await analyzer._fetch_data(params) # Validate Zerion key async def validate_zerion(): async with ZerionAPI(zerion_key) as client: # Test with a known valid address test_address = "0x0000000000000000000000000000000000000000" await client._fetch_positions(test_address) asyncio.run(validate_etherscan()) asyncio.run(validate_zerion()) return True, "All API keys are valid! šŸŽ‰" except Exception as e: return False, f"API key validation failed: {str(e)}" def _format_context_message(self) -> str: """Format wallet data as context message.""" if not self.context and not self.zerion_context: return "" context_msg = ["Current Wallet Data:\n"] # Format Etherscan data for addr, data in self.context.items(): context_msg.extend([ f"\nEtherscan Data for {addr[:8]}...{addr[-6:]}:", f"- ETH Balance: {data['eth_balance']:.4f} ETH", f"- Tokens: {len(data['tokens'])}" ]) if data['tokens']: context_msg.append(" Token Holdings:") for token in data['tokens']: context_msg.append( f" * {token['name']} ({token['symbol']}): {token['balance']}" ) if data['nft_collections']: context_msg.append(" NFT Collections:") for name, info in data['nft_collections'].items(): context_msg.append(f" * {name}: {info['count']} NFTs") if info['count'] <= 5: context_msg.append( f" Token IDs: {', '.join(map(str, info['token_ids']))}" ) # Format Zerion data for addr, data in self.zerion_context.items(): context_msg.extend([ f"\nZerion Data for {addr[:8]}...{addr[-6:]}:", f"- Total Portfolio Value: ${data['positions']['total_value_usd']:,.2f}", f"- Active Chains: {', '.join(data['positions']['chains'])}", f"- Total Assets: {data['positions']['asset_count']}" ]) if data['positions']['assets']: context_msg.append(" Top Assets by Value:") for asset in data['positions']['assets'][:5]: context_msg.append( f" * {asset['name']} ({asset['symbol']}): ${asset['value_usd']:,.2f}" ) if data['nfts']['collections']: context_msg.append(" NFT Collections:") for name, info in list(data['nfts']['collections'].items())[:5]: context_msg.append( f" * {name}: {info['count']} NFTs, Value: ${info['total_value_usd']:,.2f}" ) return "\n".join(context_msg) async def process_message( self, message: str, history: Optional[ChatHistory] = None ) -> Tuple[ChatHistory, Dict[str, Any], Dict[str, Any], str]: """Process user message and generate response.""" if not message.strip(): return history or [], self.context, self.zerion_context, "" history = history or [] # Check for Ethereum address match = re.search(Config.ETHEREUM_ADDRESS_REGEX, message) if match: try: address = match.group(0) summary = [] # Fetch Etherscan data async with WalletAnalyzer(self.etherscan_key) as analyzer: etherscan_data = await analyzer.get_portfolio_data(address) self.context[address] = etherscan_data summary.extend([ f"šŸ“Š Etherscan Data for {address[:8]}...{address[-6:]}", f"šŸ’Ž ETH Balance: {etherscan_data['eth_balance']:.4f} ETH", f"šŸŖ™ Tokens: {len(etherscan_data['tokens'])} different tokens" ]) total_nfts = sum( coll['count'] for coll in etherscan_data['nft_collections'].values() ) summary.append( f"šŸŽØ NFTs: {total_nfts} NFTs in {len(etherscan_data['nft_collections'])} collections" ) # Fetch Zerion data async with ZerionAPI(self.zerion_key) as zerion: zerion_data = await zerion.get_portfolio_data(address) self.zerion_context[address] = zerion_data summary.extend([ f"\nšŸ’° Zerion Portfolio Data:", f"šŸ“ˆ Total Value: ${zerion_data['positions']['total_value_usd']:,.2f}", f"ā›“ļø Active on {len(zerion_data['positions']['chains'])} chains", f"šŸ–¼ļø NFT Collections: {zerion_data['nfts']['total_nfts']} NFTs worth ${zerion_data['nfts']['total_value_usd']:,.2f}" ]) bot_message = "\n".join(summary) history.append((message, bot_message)) return history, self.context, self.zerion_context, "" except Exception as e: logger.error(f"Error analyzing wallet: {e}") error_message = f"Error analyzing wallet: {str(e)}" history.append((message, error_message)) return history, self.context, self.zerion_context, "" # Generate response using OpenAI try: # Format context message context_msg = self._format_context_message() # Convert history to OpenAI format chat_history = [] for user_msg, assistant_msg in history[-Config.HISTORY_LIMIT:]: chat_history.extend([ {"role": "user", "content": user_msg}, {"role": "assistant", "content": assistant_msg} ]) # Create OpenAI client and generate response client = openai.OpenAI(api_key=self.openai_key) response = client.chat.completions.create( model=Config.OPENAI_MODEL, messages=[ {"role": "system", "content": Config.SYSTEM_PROMPT}, {"role": "system", "content": context_msg}, *chat_history, {"role": "user", "content": message} ], temperature=Config.TEMPERATURE, max_tokens=Config.MAX_TOKENS ) bot_message = response.choices[0].message.content history.append((message, bot_message)) return history, self.context, self.zerion_context, "" except Exception as e: logger.error(f"Error generating response: {e}") error_message = f"Error generating response: {str(e)}" history.append((message, error_message)) return history, self.context, self.zerion_context, "" def clear_context(self) -> Tuple[Dict[str, Any], Dict[str, Any], List[Tuple[str, str]]]: """Clear the wallet context and chat history.""" self.context = {} self.zerion_context = {} return {}, {}, [] class GradioInterface: """Handles Gradio web interface setup and interactions.""" def __init__(self): """Initialize Gradio interface.""" self.chat_interface: Optional[ChatInterface] = None self.demo = self._create_interface() def _create_interface(self) -> gr.Blocks: """Create and configure Gradio interface.""" with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown(""" # šŸ• LOSS DOG: Multi-Chain Wallet Analyzer Welcome to LOSS DOG - Your friendly blockchain analysis companion! - Enter your API keys below to get started - Input an Ethereum wallet address to analyze - Get comprehensive data from Etherscan and Zerion - Chat about your wallet contents with context! """) # API Keys Section with gr.Row(): with gr.Column(): openai_key = gr.Textbox( label="OpenAI API Key", type="password", placeholder="Enter your OpenAI API key..." ) with gr.Column(): etherscan_key = gr.Textbox( label="Etherscan API Key", type="password", placeholder="Enter your Etherscan API key..." ) with gr.Column(): zerion_key = gr.Textbox( label="Zerion API Key", type="password", placeholder="Enter your Zerion API key..." ) validation_status = gr.Textbox( label="Validation Status", interactive=False ) validate_btn = gr.Button("Validate API Keys", variant="primary") # Main Interface with gr.Row(): # Chat Area (Center) with gr.Column(scale=2): chatbot = gr.Chatbot( label="Chat History", height=500, value=[] ) with gr.Row(): msg_input = gr.Textbox( label="Message", placeholder="Enter wallet address or ask about holdings...", show_label=True ) send_btn = gr.Button("Send", variant="primary") # Right Sidebars with gr.Column(scale=1): # Etherscan Data gr.Markdown("### šŸ“Š Etherscan Data") etherscan_context = gr.JSON( label="Wallet Data", show_label=True, value={} ) # Zerion Data gr.Markdown("### šŸ’° Zerion Data") zerion_context = gr.JSON( label="Portfolio Data", show_label=True, value={} ) clear_btn = gr.Button("Clear All Data", variant="secondary") # Initial States msg_input.interactive = False send_btn.interactive = False # Event Handlers def validate_keys( openai_k: str, etherscan_k: str, zerion_k: str ) -> Tuple[str, gr.update, gr.update]: """Validate API keys and initialize chat interface.""" try: is_valid, message = ChatInterface._validate_api_keys( openai_k, etherscan_k, zerion_k ) if is_valid: self.chat_interface = ChatInterface(openai_k, etherscan_k, zerion_k) return ( "āœ… API keys validated! You can start chatting now.", gr.update(interactive=True), gr.update(interactive=True) ) return ( f"āŒ Validation failed: {message}", gr.update(interactive=False), gr.update(interactive=False) ) except Exception as e: logger.error(f"Key validation error: {e}") return ( f"āŒ Error during validation: {str(e)}", gr.update(interactive=False), gr.update(interactive=False) ) async def handle_message( message: str, chat_history: List[Tuple[str, str]], eth_context: Dict[str, Any], zer_context: Dict[str, Any] ) -> Tuple[str, List[Tuple[str, str]], Dict[str, Any], Dict[str, Any]]: """Handle incoming messages.""" if not self.chat_interface: return "", [], {}, {} try: history, new_eth, new_zer, _ = await self.chat_interface.process_message( message, chat_history ) return "", history, new_eth, new_zer except Exception as e: logger.error(f"Message handling error: {e}") if chat_history is None: chat_history = [] chat_history.append((message, f"Error: {str(e)}")) return "", chat_history, eth_context, zer_context def clear_all_data( ) -> Tuple[Dict[str, Any], Dict[str, Any], List[Tuple[str, str]]]: """Clear all contexts and chat history.""" if self.chat_interface: return self.chat_interface.clear_context() return {}, {}, [] # Connect Event Handlers validate_btn.click( fn=validate_keys, inputs=[openai_key, etherscan_key, zerion_key], outputs=[validation_status, msg_input, send_btn] ) clear_btn.click( fn=clear_all_data, inputs=[], outputs=[etherscan_context, zerion_context, chatbot] ) # Message Handling msg_input.submit( fn=handle_message, inputs=[ msg_input, chatbot, etherscan_context, zerion_context ], outputs=[ msg_input, chatbot, etherscan_context, zerion_context ] ) send_btn.click( fn=handle_message, inputs=[ msg_input, chatbot, etherscan_context, zerion_context ], outputs=[ msg_input, chatbot, etherscan_context, zerion_context ] ) return demo def launch(self, **kwargs): """Launch the Gradio interface.""" self.demo.queue() self.demo.launch(**kwargs) def main(): """Main entry point for the application.""" try: # Load configuration config = Config.load("config.json") # Initialize and launch interface interface = GradioInterface() interface.launch( server_name="0.0.0.0", server_port=7860, share=True ) except Exception as e: logger.error(f"Application startup failed: {e}") raise if __name__ == "__main__": main()