jzou19950715's picture
Update app.py
a35a442 verified
"""
Blockchain Wallet Analyzer - A tool for analyzing Ethereum wallet contents and NFT holdings.
This module provides a complete implementation of a blockchain wallet analysis tool
with a Gradio web interface. It includes wallet analysis, NFT tracking, and
interactive chat capabilities using the OpenAI API.
Author: Claude
Date: January 2025
"""
from __future__ import annotations
import os
import re
import json
import time
import logging
import asyncio
from typing import List, Dict, Tuple, Any, Optional, TypeVar, cast
from datetime import datetime
from decimal import Decimal
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
import aiohttp
import openai
import gradio as gr
from tenacity import retry, stop_after_attempt, wait_exponential
# Type variables
T = TypeVar('T')
WalletData = Dict[str, Any]
ChatHistory = List[Tuple[str, str]]
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('blockchain_analyzer.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class ConfigError(Exception):
"""Raised when there's an error in configuration."""
pass
class APIError(Exception):
"""Raised when there's an error in API calls."""
pass
class ValidationError(Exception):
"""Raised when there's an error in input validation."""
pass
@dataclass
class Config:
"""Application configuration settings."""
SYSTEM_PROMPT: str = """
You are LOSS DOG πŸ• (Learning & Observing Smart Systems Digital Output Generator),
an adorable blockchain-sniffing puppy!
Your personality:
- Friendly and enthusiastic
- Explain findings in fun, simple ways
Instructions:
- You have access to detailed wallet data in your context
- Use this data to provide specific answers about holdings
- Reference exact numbers and collections when discussing NFTs
- Compare wallets if multiple are available
"""
ETHERSCAN_BASE_URL: str = "https://api.etherscan.io/api"
ETHEREUM_ADDRESS_REGEX: str = r"0x[a-fA-F0-9]{40}"
RATE_LIMIT_DELAY: float = 0.2 # 5 requests per second max for free tier
MAX_RETRIES: int = 3
OPENAI_MODEL: str = "gpt-4o-mini" # Updated to correct model name
MAX_TOKENS: int = 4000
TEMPERATURE: float = 0.7
HISTORY_LIMIT: int = 5
@classmethod
def load(cls, config_path: str | Path) -> Config:
"""Load configuration from a JSON file."""
try:
with open(config_path) as f:
config_data = json.load(f)
return cls(**config_data)
except Exception as e:
logger.error(f"Error loading config: {e}")
return cls()
class WalletAnalyzer:
"""Analyzes Ethereum wallet contents using Etherscan API."""
def __init__(self, api_key: str):
"""Initialize the analyzer with API key."""
self.api_key = api_key
self.base_url = Config.ETHERSCAN_BASE_URL
self.last_request_time = 0
self.session: Optional[aiohttp.ClientSession] = None
async def __aenter__(self) -> WalletAnalyzer:
"""Create aiohttp session on context manager enter."""
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
"""Close aiohttp session on context manager exit."""
if self.session:
await self.session.close()
self.session = None
@retry(
stop=stop_after_attempt(Config.MAX_RETRIES),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def _fetch_data(self, params: Dict[str, str]) -> Dict[str, Any]:
"""Fetch data from Etherscan API with retry logic."""
if not self.session:
raise APIError("No active session. Use context manager.")
await self._rate_limit()
params["apikey"] = self.api_key
try:
async with self.session.get(self.base_url, params=params) as response:
if response.status != 200:
raise APIError(f"API request failed: {response.status}")
data = await response.json()
if data["status"] == "0":
error_msg = data.get('message', 'Unknown error')
if "Max rate limit reached" in error_msg:
raise APIError("Rate limit exceeded")
raise APIError(f"API error: {error_msg}")
return data
except aiohttp.ClientError as e:
raise APIError(f"Network error: {str(e)}")
except Exception as e:
raise APIError(f"Unexpected error: {str(e)}")
async def _rate_limit(self) -> None:
"""Implement rate limiting for Etherscan API."""
current_time = time.time()
time_passed = current_time - self.last_request_time
if time_passed < Config.RATE_LIMIT_DELAY:
await asyncio.sleep(Config.RATE_LIMIT_DELAY - time_passed)
self.last_request_time = time.time()
@staticmethod
def _validate_address(address: str) -> bool:
"""Validate Ethereum address format."""
return bool(re.match(Config.ETHEREUM_ADDRESS_REGEX, address))
async def get_portfolio_data(self, address: str) -> WalletData:
"""Get complete portfolio including ETH, tokens, and NFTs."""
if not self._validate_address(address):
raise ValidationError(f"Invalid Ethereum address: {address}")
logger.info(f"Fetching portfolio data for {address}")
# Get ETH balance
eth_balance = await self._get_eth_balance(address)
# Get token data
token_holdings = await self._get_token_holdings(address)
# Get NFT data
nft_collections = await self._get_nft_holdings(address)
return {
"address": address,
"last_updated": datetime.now().isoformat(),
"eth_balance": float(eth_balance),
"tokens": token_holdings,
"nft_collections": nft_collections
}
async def _get_eth_balance(self, address: str) -> Decimal:
"""Get ETH balance for address."""
params = {
"module": "account",
"action": "balance",
"address": address,
"tag": "latest"
}
data = await self._fetch_data(params)
return Decimal(data["result"]) / Decimal("1000000000000000000")
async def _get_token_holdings(self, address: str) -> List[Dict[str, Any]]:
"""Get token holdings for address."""
params = {
"module": "account",
"action": "tokentx",
"address": address,
"sort": "desc"
}
data = await self._fetch_data(params)
token_holdings: Dict[str, Dict[str, Any]] = {}
for tx in data.get("result", []):
contract = tx["contractAddress"]
if contract not in token_holdings:
token_holdings[contract] = {
"name": tx["tokenName"],
"symbol": tx["tokenSymbol"],
"decimals": int(tx["tokenDecimal"]),
"balance": Decimal(0)
}
amount = Decimal(tx["value"]) / Decimal(10 ** int(tx["tokenDecimal"]))
if tx["to"].lower() == address.lower():
token_holdings[contract]["balance"] += amount
elif tx["from"].lower() == address.lower():
token_holdings[contract]["balance"] -= amount
return [
{
"name": data["name"],
"symbol": data["symbol"],
"balance": float(data["balance"])
}
for data in token_holdings.values()
if data["balance"] > 0
]
async def _get_nft_holdings(self, address: str) -> Dict[str, Dict[str, Any]]:
"""Get NFT holdings for address."""
params = {
"module": "account",
"action": "tokennfttx",
"address": address,
"sort": "desc"
}
data = await self._fetch_data(params)
nft_holdings: Dict[str, Dict[str, Any]] = {}
collections: Dict[str, List[str]] = {}
for tx in data.get("result", []):
collection_name = tx.get("tokenName", "Unknown Collection")
token_id = tx["tokenID"]
key = f"{tx['contractAddress']}_{token_id}"
if tx["to"].lower() == address.lower():
nft_holdings[key] = {
"collection": collection_name,
"token_id": token_id,
"contract": tx["contractAddress"],
"acquired_time": tx["timeStamp"]
}
if collection_name not in collections:
collections[collection_name] = []
collections[collection_name].append(token_id)
elif tx["from"].lower() == address.lower():
nft_holdings.pop(key, None)
if collection_name in collections and token_id in collections[collection_name]:
collections[collection_name].remove(token_id)
return {
name: {
"count": len(tokens),
"token_ids": tokens
}
for name, tokens in collections.items()
if tokens # Only include collections with tokens
}
class ChatInterface:
"""Handles chat interaction using OpenAI API."""
def __init__(self, openai_key: str, etherscan_key: str):
"""Initialize chat interface with API keys."""
self.openai_key = openai_key
self.etherscan_key = etherscan_key
self.context: Dict[str, Any] = {}
openai.api_key = openai_key
@staticmethod
def _validate_api_keys(openai_key: str, etherscan_key: str) -> Tuple[bool, str]:
"""Validate both API keys."""
try:
# Validate OpenAI key
client = openai.OpenAI(api_key=openai_key)
client.chat.completions.create(
model=Config.OPENAI_MODEL,
messages=[{"role": "user", "content": "test"}],
max_tokens=1
)
# Validate Etherscan key
async def validate_etherscan():
async with WalletAnalyzer(etherscan_key) as analyzer:
params = {"module": "stats", "action": "ethsupply"}
await analyzer._fetch_data(params)
asyncio.run(validate_etherscan())
return True, "Both API keys are valid! πŸŽ‰"
except Exception as e:
return False, f"API key validation failed: {str(e)}"
def _format_context_message(self) -> str:
"""Format wallet data as context message."""
if not self.context:
return ""
context_msg = ["Current Wallet Data:\n"]
for addr, data in self.context.items():
context_msg.extend([
f"Wallet {addr[:8]}...{addr[-6:]}:",
f"- ETH Balance: {data['eth_balance']:.4f} ETH",
f"- Tokens: {len(data['tokens'])}"
])
if data['tokens']:
context_msg.append(" Token Holdings:")
for token in data['tokens']:
context_msg.append(
f" * {token['name']} ({token['symbol']}): {token['balance']}"
)
if data['nft_collections']:
context_msg.append(" NFT Collections:")
for name, info in data['nft_collections'].items():
context_msg.append(f" * {name}: {info['count']} NFTs")
if info['count'] <= 5:
context_msg.append(
f" Token IDs: {', '.join(map(str, info['token_ids']))}"
)
return "\n".join(context_msg)
async def process_message(
self,
message: str,
history: Optional[ChatHistory] = None
) -> Tuple[ChatHistory, Dict[str, Any], str]:
"""Process user message and generate response."""
if not message.strip():
return history or [], self.context, ""
history = history or []
# Check for Ethereum address
match = re.search(Config.ETHEREUM_ADDRESS_REGEX, message)
if match:
try:
address = match.group(0)
async with WalletAnalyzer(self.etherscan_key) as analyzer:
wallet_data = await analyzer.get_portfolio_data(address)
self.context[address] = wallet_data
summary = [
f"πŸ“Š Portfolio Summary for {address[:8]}...{address[-6:]}",
f"πŸ’Ž ETH Balance: {wallet_data['eth_balance']:.4f} ETH",
f"πŸͺ™ Tokens: {len(wallet_data['tokens'])} different tokens"
]
total_nfts = sum(
coll['count'] for coll in wallet_data['nft_collections'].values())
summary.append(
f"🎨 NFTs: {total_nfts} NFTs in {len(wallet_data['nft_collections'])} collections"
)
bot_message = "\n".join(summary)
history.append((message, bot_message))
return history, self.context, ""
except Exception as e:
logger.error(f"Error analyzing wallet: {e}")
error_message = f"Error analyzing wallet: {str(e)}"
history.append((message, error_message))
return history, self.context, ""
# Generate response using OpenAI
try:
# Format context message
context_msg = self._format_context_message()
# Convert history to OpenAI format
chat_history = []
for user_msg, assistant_msg in history[-Config.HISTORY_LIMIT:]:
chat_history.extend([
{"role": "user", "content": user_msg},
{"role": "assistant", "content": assistant_msg}
])
# Create OpenAI client and generate response
client = openai.OpenAI(api_key=self.openai_key)
response = client.chat.completions.create(
model=Config.OPENAI_MODEL,
messages=[
{"role": "system", "content": Config.SYSTEM_PROMPT},
{"role": "system", "content": context_msg},
*chat_history,
{"role": "user", "content": message}
],
temperature=Config.TEMPERATURE,
max_tokens=Config.MAX_TOKENS
)
bot_message = response.choices[0].message.content
history.append((message, bot_message))
return history, self.context, ""
except Exception as e:
logger.error(f"Error generating response: {e}")
error_message = f"Error generating response: {str(e)}"
history.append((message, error_message))
return history, self.context, ""
def clear_context(self) -> Tuple[Dict[str, Any], List[Tuple[str, str]]]:
"""Clear the wallet context and chat history."""
self.context = {}
return {}, []
class GradioInterface:
"""Handles Gradio web interface setup and interactions."""
def __init__(self):
"""Initialize Gradio interface."""
self.chat_interface: Optional[ChatInterface] = None
self.demo = self._create_interface()
def _create_interface(self) -> gr.Blocks:
"""Create and configure Gradio interface."""
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("""
# πŸ• LOSS DOG: Blockchain Wallet Analyzer
Welcome to LOSS DOG - Your friendly blockchain analysis companion!
- Enter your API keys below to get started
- Input an Ethereum wallet address to analyze
- Chat about your wallet contents with context!
""")
# API Keys Section
with gr.Row():
with gr.Column():
openai_key = gr.Textbox(
label="OpenAI API Key",
type="password",
placeholder="Enter your OpenAI API key..."
)
with gr.Column():
etherscan_key = gr.Textbox(
label="Etherscan API Key",
type="password",
placeholder="Enter your Etherscan API key..."
)
validation_status = gr.Textbox(
label="Validation Status",
interactive=False
)
validate_btn = gr.Button("Validate API Keys", variant="primary")
# Main Interface
with gr.Row():
# Chat Area (Left Side)
with gr.Column(scale=2):
chatbot = gr.Chatbot(
label="Chat History",
height=500,
value=[]
)
with gr.Row():
msg_input = gr.Textbox(
label="Message",
placeholder="Enter wallet address or ask about holdings...",
show_label=True
)
send_btn = gr.Button("Send", variant="primary")
# Context Sidebar (Right Side)
with gr.Column(scale=1):
wallet_context = gr.JSON(
label="Active Wallet Context",
show_label=True,
value={}
)
clear_btn = gr.Button("Clear Context", variant="secondary")
# Initial States
msg_input.interactive = False
send_btn.interactive = False
# Event Handlers
def validate_keys(openai_k: str, etherscan_k: str) -> Tuple[str, gr.update, gr.update]:
"""Validate API keys and initialize chat interface."""
is_valid, message = ChatInterface._validate_api_keys(openai_k, etherscan_k)
if is_valid:
self.chat_interface = ChatInterface(openai_k, etherscan_k)
return (
"βœ… API keys validated! You can start chatting now.",
gr.update(interactive=True),
gr.update(interactive=True)
)
return (
f"❌ Validation failed: {message}",
gr.update(interactive=False),
gr.update(interactive=False)
)
async def handle_message(
message: str,
chat_history: List[Tuple[str, str]],
context: Dict[str, Any]
) -> Tuple[List[Tuple[str, str]], Dict[str, Any]]:
"""Handle incoming messages."""
if not self.chat_interface:
return [], {}
try:
history, new_context, _ = await self.chat_interface.process_message(
message,
chat_history
)
return history, new_context
except Exception as e:
logger.error(f"Error handling message: {e}")
if chat_history is None:
chat_history = []
chat_history.append((message, f"Error: {str(e)}"))
return chat_history, context
# Connect Event Handlers
validate_btn.click(
fn=validate_keys,
inputs=[openai_key, etherscan_key],
outputs=[validation_status, msg_input, send_btn]
)
if self.chat_interface:
clear_btn.click(
fn=self.chat_interface.clear_context,
inputs=[],
outputs=[wallet_context, chatbot]
)
# Message Handling
msg_input.submit(
fn=handle_message,
inputs=[msg_input, chatbot, wallet_context],
outputs=[chatbot, wallet_context]
).then(
lambda: gr.update(value=""),
None,
[msg_input]
)
send_btn.click(
fn=handle_message,
inputs=[msg_input, chatbot, wallet_context],
outputs=[chatbot, wallet_context]
).then(
lambda: gr.update(value=""),
None,
[msg_input]
)
return demo
def launch(self, **kwargs):
"""Launch the Gradio interface."""
self.demo.queue()
self.demo.launch(**kwargs)
def main():
"""Main entry point for the application."""
try:
# Load configuration
config = Config.load("config.json")
# Initialize and launch interface
interface = GradioInterface()
interface.launch(
server_name="0.0.0.0",
server_port=7860,
share=True
)
except Exception as e:
logger.error(f"Application startup failed: {e}")
raise
if __name__ == "__main__":
main()