# Source: Hugging Face Space file view of app.py, uploaded by jzou19950715
# (commit aa4bdc8 "Update app.py", verified; 26.2 kB; raw / history / blame).
"""
Blockchain Wallet Analyzer - A tool for analyzing Ethereum wallet contents and NFT holdings.
This module provides a complete implementation of a blockchain wallet analysis tool
with a Gradio web interface. It includes wallet analysis, NFT tracking,
interactive chat capabilities via the OpenAI API,
and now NFT image rendering from OpenSea (with user-provided API key).
Author: Claude
Date: January 2025
"""
from __future__ import annotations
import os
import re
import json
import time
import logging
import asyncio
import base64
from typing import List, Dict, Tuple, Any, Optional, TypeVar
from datetime import datetime
from decimal import Decimal
from dataclasses import dataclass
from pathlib import Path
from io import BytesIO
from PIL import Image
import aiohttp
import openai
import gradio as gr
from tenacity import retry, stop_after_attempt, wait_exponential
# Type variables and aliases shared across the module
T = TypeVar('T')
# A wallet snapshot: the JSON-like dict returned by WalletAnalyzer.get_portfolio_data.
WalletData = Dict[str, Any]
# A chat transcript stored as (user_message, assistant_message) pairs.
ChatHistory = List[Tuple[str, str]]
# Configure logging: records at INFO and above are written both to
# 'blockchain_analyzer.log' (a relative path, so it is created in the current
# working directory) and to a stream handler.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('blockchain_analyzer.log'),
logging.StreamHandler()
]
)
# Module-level logger used by every class and function below.
logger = logging.getLogger(__name__)
class ConfigError(Exception):
    """Signals that the application configuration is missing or invalid."""
class APIError(Exception):
    """Signals that a call to an external API failed."""
class ValidationError(Exception):
    """Signals that user-supplied input failed validation."""
@dataclass
class Config:
    """Application configuration settings.

    Every setting is declared with a default, so the values are readable
    directly on the class (``Config.OPENAI_MODEL``) as well as on an instance.
    """

    # System prompt sent as the first message of every chat completion.
    # The dog emoji was previously stored as mis-decoded bytes; it is now the
    # real character.
    SYSTEM_PROMPT: str = """
You are LOSS DOG 🐕 (Learning & Observing Smart Systems Digital Output Generator),
an adorable blockchain-sniffing puppy!
Your personality:
- Friendly and enthusiastic
- Explain findings in fun, simple ways
Instructions:
- You have access to detailed wallet data in your context
- Use this data to provide specific answers about holdings
- Reference exact numbers and collections when discussing NFTs
- Compare wallets if multiple are available
"""
    # Endpoint that all Etherscan queries are sent to.
    # NOTE(review): confirm this URL is still the endpoint Etherscan serves
    # for the API key tier in use.
    ETHERSCAN_BASE_URL: str = "https://api.etherscan.io/api"
    # Pattern for "0x" followed by exactly 40 hex digits. It is unanchored,
    # so callers decide whether to search, match or full-match with it.
    ETHEREUM_ADDRESS_REGEX: str = r"0x[a-fA-F0-9]{40}"
    # Minimum spacing between two Etherscan requests, in seconds.
    RATE_LIMIT_DELAY: float = 0.2  # 5 requests/sec for free tier
    # Number of attempts made for one Etherscan request.
    MAX_RETRIES: int = 3
    OPENAI_MODEL: str = "gpt-4o-mini"  # GPT model name
    # Upper bound on tokens generated per chat completion.
    MAX_TOKENS: int = 4000
    # Sampling temperature passed to the chat completion.
    TEMPERATURE: float = 0.7
    # Number of most recent chat turns replayed to the model.
    HISTORY_LIMIT: int = 5
class WalletAnalyzer:
    """Analyzes Ethereum wallet contents using Etherscan API.

    The analyzer owns an aiohttp session that only exists while it is used
    as an async context manager::

        async with WalletAnalyzer(api_key) as analyzer:
            data = await analyzer.get_portfolio_data(address)
    """

    def __init__(self, api_key: str):
        """Initialize the analyzer with API key."""
        self.api_key = api_key
        self.base_url = Config.ETHERSCAN_BASE_URL
        # Timestamp (time.time()) of the most recent request; 0 = none yet.
        self.last_request_time = 0
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self) -> "WalletAnalyzer":
        """Create aiohttp session on context manager enter."""
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close aiohttp session on context manager exit."""
        if self.session:
            await self.session.close()
            self.session = None

    # reraise=True makes the last APIError surface to the caller once the
    # attempts are exhausted, instead of an opaque tenacity RetryError.
    @retry(
        stop=stop_after_attempt(Config.MAX_RETRIES),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        reraise=True,
    )
    async def _fetch_data(self, params: Dict[str, str]) -> Dict[str, Any]:
        """Fetch data from Etherscan with retry logic.

        Args:
            params: Query parameters for the request. The API key is added to
                a copy, so the caller's dict is left untouched.

        Returns:
            The decoded JSON payload. For an empty listing (status "0" with
            "No transactions found" or an empty result list) the payload is
            returned with ``result`` set to ``[]`` rather than raising.

        Raises:
            APIError: No session is open, the HTTP status is not 200, the
                request failed at the network level, or Etherscan reported
                an error.
        """
        if not self.session:
            raise APIError("No active session. Use context manager.")
        await self._rate_limit()
        query = {**params, "apikey": self.api_key}
        try:
            async with self.session.get(self.base_url, params=query) as response:
                if response.status != 200:
                    raise APIError(f"API request failed: {response.status}")
                data = await response.json()
        except APIError:
            # Already meaningful; do not re-wrap it as "Unexpected error".
            raise
        except aiohttp.ClientError as e:
            raise APIError(f"Network error: {str(e)}") from e
        except Exception as e:
            raise APIError(f"Unexpected error: {str(e)}") from e

        if data.get("status") == "0":
            err_msg = str(data.get("message") or "Unknown Etherscan error")
            result = data.get("result")
            # An address with no matching transfers is answered with status
            # "0"; that is an empty result, not a failure.
            if "No transactions found" in err_msg or result == []:
                data["result"] = []
                return data
            # The human-readable reason may be carried in "result" as text.
            detail = result if isinstance(result, str) else ""
            if "Max rate limit reached" in err_msg or "Max rate limit reached" in detail:
                raise APIError("Rate limit exceeded")
            if detail and detail != err_msg:
                raise APIError(f"API error: {err_msg} ({detail})")
            raise APIError(f"API error: {err_msg}")
        return data

    async def _rate_limit(self) -> None:
        """Simple rate limit for Etherscan free tier requests.

        Sleeps just long enough to keep at least ``Config.RATE_LIMIT_DELAY``
        seconds between consecutive requests made through this instance.
        """
        now = time.time()
        diff = now - self.last_request_time
        if diff < Config.RATE_LIMIT_DELAY:
            await asyncio.sleep(Config.RATE_LIMIT_DELAY - diff)
        self.last_request_time = time.time()

    @staticmethod
    def _validate_address(address: str) -> bool:
        """Validate Ethereum address format.

        The whole string must be "0x" plus 40 hex digits; input with trailing
        characters is rejected.
        """
        if not isinstance(address, str):
            return False
        return re.fullmatch(Config.ETHEREUM_ADDRESS_REGEX, address) is not None

    async def get_portfolio_data(self, address: str) -> WalletData:
        """
        Get entire portfolio data:
        - ETH balance
        - ERC-20 tokens
        - NFT collections (with contract + token_ids)

        Returns:
            A dict with keys "address", "last_updated" (ISO timestamp),
            "eth_balance" (float), "tokens" and "nft_collections".

        Raises:
            ValidationError: ``address`` is not a well-formed address.
            APIError: An Etherscan request failed.
        """
        if not self._validate_address(address):
            raise ValidationError(f"Invalid Ethereum address: {address}")
        logger.info(f"Fetching portfolio data for {address}")
        eth_balance = await self._get_eth_balance(address)
        token_holdings = await self._get_token_holdings(address)
        nft_collections = await self._get_nft_holdings(address)
        return {
            "address": address,
            "last_updated": datetime.now().isoformat(),
            "eth_balance": float(eth_balance),
            "tokens": token_holdings,
            "nft_collections": nft_collections,
        }

    async def _get_eth_balance(self, address: str) -> Decimal:
        """Get ETH balance for address (the reported result divided by 1e18)."""
        params = {
            "module": "account",
            "action": "balance",
            "address": address,
            "tag": "latest",
        }
        data = await self._fetch_data(params)
        return Decimal(data["result"]) / Decimal("1e18")

    async def _get_token_holdings(self, address: str) -> List[Dict[str, Any]]:
        """
        Retrieve ERC-20 token balances for the address by
        summing inbound/outbound transactions.

        Only tokens whose summed balance is positive are returned, each as
        ``{"name", "symbol", "balance"}``.

        NOTE(review): the balance is derived only from the transfers in the
        single response fetched here; if the API truncates long histories the
        figure is approximate - confirm against the API's paging limits.
        """
        params = {
            "module": "account",
            "action": "tokentx",
            "address": address,
            "sort": "desc",
        }
        data = await self._fetch_data(params)
        wallet = address.lower()
        token_map: Dict[str, Dict[str, Any]] = {}
        for tx in data.get("result") or []:
            caddr = tx["contractAddress"]
            entry = token_map.get(caddr)
            if entry is None:
                entry = token_map[caddr] = {
                    "name": tx["tokenName"],
                    "symbol": tx["tokenSymbol"],
                    # A blank decimals field is treated as 0 decimals.
                    "decimals": int(tx.get("tokenDecimal") or 0),
                    "balance": Decimal(0),
                }
            amount = Decimal(tx["value"]) / (Decimal(10) ** entry["decimals"])
            # Two independent checks: a transfer from the wallet to itself
            # must net to zero rather than count as income.
            if tx["to"].lower() == wallet:
                entry["balance"] += amount
            if tx["from"].lower() == wallet:
                entry["balance"] -= amount
        return [
            {
                "name": v["name"],
                "symbol": v["symbol"],
                "balance": float(v["balance"]),
            }
            for v in token_map.values()
            if v["balance"] > 0
        ]

    async def _get_nft_holdings(self, address: str) -> Dict[str, Any]:
        """
        Retrieve NFT holdings for address.
        We store them in a structure that includes contract addresses + token_ids
        so we can fetch images from OpenSea easily.
        Example structure:
        {
            "collections": [
                {
                    "collection_name": "Bored Ape Yacht Club",
                    "items": [
                        { "contract": "0xbc4ca0e...", "token_id": "1234" },
                        ...
                    ]
                },
                ...
            ]
        }
        """
        params = {
            "module": "account",
            "action": "tokennfttx",
            "address": address,
            # Oldest first: ownership is replayed chronologically below, so a
            # token received and later sent must end up not owned.
            "sort": "asc",
        }
        data = await self._fetch_data(params)
        transfers = data.get("result")
        if not isinstance(transfers, list) or not transfers:
            return {"collections": []}

        wallet = address.lower()
        # "<contract>_<token_id>" -> details of a token currently held.
        ownership: Dict[str, Dict[str, str]] = {}
        for tx in transfers:
            contract = tx["contractAddress"]
            token_id = tx["tokenID"]
            key = f"{contract}_{token_id}"
            if tx["to"].lower() == wallet:
                # Received (a self-transfer also lands here: still owned).
                ownership[key] = {
                    "contract": contract,
                    "collection_name": tx.get("tokenName") or "Unknown Collection",
                    "token_id": token_id,
                }
            elif tx["from"].lower() == wallet:
                # Sent out.
                ownership.pop(key, None)

        # Group what is still owned by collection name.
        coll_map: Dict[str, List[Dict[str, str]]] = {}
        for entry in ownership.values():
            coll_map.setdefault(entry["collection_name"], []).append(
                {"contract": entry["contract"], "token_id": entry["token_id"]}
            )
        return {
            "collections": [
                {"collection_name": c_name, "items": items}
                for c_name, items in coll_map.items()
            ]
        }
#########################
# OPENSEA IMAGE FETCHING
#########################
# URL prefix for OpenSea NFT lookups; fetch_nft_metadata appends
# "/<contract>/nfts/<token_id>" to it.
OPENSEA_API_BASE = "https://api.opensea.io/api/v2/chain/ethereum/contract"
async def fetch_nft_metadata(opensea_key: str, contract: str, token_id: str) -> Dict[str, Any]:
    """
    Fetch NFT metadata (name, image_url) from OpenSea.

    Args:
        opensea_key: API key sent in the X-API-KEY header; when empty the
            request is made without that header.
        contract: Contract address of the NFT.
        token_id: Token identifier within the contract.

    Returns:
        { "name": ..., "image_url": ... } on success, or { "error": ... } for
        every failure (non-200 status, network problem, timeout, invalid
        JSON). This function does not raise for those cases, so callers only
        need to check for the "error" key.
    """
    url = f"{OPENSEA_API_BASE}/{contract}/nfts/{token_id}"
    headers = {"X-API-KEY": opensea_key} if opensea_key else {}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=headers) as response:
                if response.status == 403:
                    return {"error": "403 Forbidden: Invalid or restricted OpenSea API key."}
                if response.status == 404:
                    return {"error": f"404 Not Found: {contract} #{token_id}"}
                if response.status != 200:
                    # e.g. 401 / 429 / 5xx: do not parse an error body as if
                    # it were NFT metadata.
                    return {"error": f"OpenSea request failed with HTTP {response.status}: {contract} #{token_id}"}
                try:
                    data = await response.json()
                except Exception as e:
                    return {"error": f"Invalid JSON from OpenSea: {str(e)}"}
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        return {"error": f"Network error contacting OpenSea: {str(e)}"}

    # parse data; tolerate a missing or null "nft" object and null fields
    nft_obj = (data.get("nft") if isinstance(data, dict) else None) or {}
    name = nft_obj.get("name") or f"NFT #{token_id}"
    image_url = nft_obj.get("image_url") or ""
    return {"name": name, "image_url": image_url}
def convert_image_to_base64(image_bytes: bytes) -> str:
    """
    Re-encode raw image bytes as PNG and wrap them in a base64 data URI.

    Returns the "data:image/png;base64,..." string, or an empty string (after
    logging the error) when the bytes cannot be decoded or re-encoded.
    """
    try:
        decoded = Image.open(BytesIO(image_bytes))
        png_buffer = BytesIO()
        decoded.save(png_buffer, format="PNG")
        encoded = base64.b64encode(png_buffer.getvalue()).decode()
    except Exception as e:
        logger.error(f"Error converting image to base64: {e}")
        return ""
    return f"data:image/png;base64,{encoded}"
class ChatInterface:
    """Handles the chat + wallet + NFT images integration."""

    # Image lookups are capped to keep OpenSea usage low: at most this many
    # collections per wallet, and this many NFTs per collection.
    _MAX_COLLECTIONS = 2
    _MAX_ITEMS_PER_COLLECTION = 2

    def __init__(self, openai_key: str, etherscan_key: str, opensea_key: str):
        """Store the three API keys and start with an empty wallet context."""
        self.openai_key = openai_key
        self.etherscan_key = etherscan_key
        self.opensea_key = opensea_key
        # Wallet address -> portfolio data fetched for it.
        self.context: Dict[str, Any] = {}
        openai.api_key = openai_key

    @staticmethod
    def _validate_api_keys(openai_key: str, etherscan_key: str, opensea_key: str) -> Tuple[bool, str]:
        """
        Validate OpenAI + Etherscan keys with sample calls,
        and check if OpenSea key is non-empty.

        The OpenSea check is local and free, so it runs first and avoids the
        two remote calls when it already fails.

        Returns:
            (is_valid, human-readable message).
        """
        if not (opensea_key or "").strip():
            return False, "OpenSea API key cannot be empty."
        try:
            # Check OpenAI with a one-token completion
            client = openai.OpenAI(api_key=openai_key)
            client.chat.completions.create(
                model=Config.OPENAI_MODEL,
                messages=[{"role": "user", "content": "test"}],
                max_tokens=1,
            )

            # Check Etherscan with a query that needs no wallet address
            async def check_etherscan():
                async with WalletAnalyzer(etherscan_key) as analyzer:
                    params = {"module": "stats", "action": "ethsupply"}
                    await analyzer._fetch_data(params)

            asyncio.run(check_etherscan())
            return True, "All API keys are valid!"
        except Exception as e:
            return False, f"API key validation failed: {str(e)}"

    def _format_context_message(self) -> str:
        """Format the wallet data in the system prompt for GPT context.

        Returns an empty string when no wallet has been analyzed yet.
        """
        if not self.context:
            return ""
        lines = ["Current Wallet Data:\n"]
        for addr, data in self.context.items():
            lines.append(f"Wallet {addr[:8]}...{addr[-6:]}")
            lines.append(f" ETH Balance: {data['eth_balance']:.4f}")
            lines.append(f" ERC-20 Tokens: {len(data['tokens'])}")
            # NFT aggregator
            coll_struct = data["nft_collections"]
            if "collections" in coll_struct:
                lines.append(" NFT Collections:")
                for col in coll_struct["collections"]:
                    lines.append(f" * {col['collection_name']}: {len(col['items'])} NFT(s)")
        return "\n".join(lines)

    async def _collect_nft_images(
        self,
        nft_data: Dict[str, Any],
        message: str,
        history: ChatHistory
    ) -> List[str]:
        """Fetch and base64-encode a capped sample of the wallet's NFT images.

        One chat entry is appended to ``history`` per image found. A failure
        on one NFT is logged and skipped so the remaining NFTs are still
        processed.
        """
        images: List[str] = []
        collections = nft_data.get("collections") or []
        if not collections:
            return images
        # One HTTP session is shared by all image downloads.
        async with aiohttp.ClientSession() as session:
            for col in collections[:self._MAX_COLLECTIONS]:
                for nft_obj in col["items"][:self._MAX_ITEMS_PER_COLLECTION]:
                    contract = nft_obj["contract"]
                    token_id = nft_obj["token_id"]
                    # 1) fetch metadata from OpenSea
                    try:
                        meta = await fetch_nft_metadata(self.opensea_key, contract, token_id)
                    except Exception as e:
                        logger.warning(f"Failed to fetch NFT: {e}")
                        continue
                    if "error" in meta:
                        logger.warning(f"Failed to fetch NFT: {meta['error']}")
                        continue
                    image_url = meta["image_url"]
                    if not image_url:
                        logger.info(f"No image for NFT {contract}#{token_id}")
                        continue
                    # 2) Download image & convert to base64
                    try:
                        async with session.get(image_url) as resp:
                            if resp.status != 200:
                                logger.warning(f"Image fetch failed: {resp.status}")
                                continue
                            raw_img = await resp.read()
                    except Exception as e:
                        logger.error(f"Error fetching NFT image: {e}")
                        continue
                    img_b64 = convert_image_to_base64(raw_img)
                    if img_b64:
                        images.append(img_b64)
                        # Also reflect in chat
                        found_msg = f"Found NFT: {meta['name']} (Contract {contract[:8]}...{contract[-6:]}, ID {token_id})"
                        history.append((message, found_msg))
        return images

    async def process_message(
        self,
        message: str,
        history: Optional[ChatHistory] = None
    ) -> Tuple[ChatHistory, Dict[str, Any], List[str]]:
        """
        1) Detect Ethereum address
        2) Fetch wallet data from Etherscan
        3) For each NFT, fetch from OpenSea + convert to base64
        4) Return image data + chat response

        Returns:
            (updated chat history, wallet context, base64 image data URIs).
            Failures are reported as chat entries instead of being raised;
            when the OpenAI call fails the image list is empty.
        """
        if history is None:
            history = []
        if not message or not message.strip():
            return history, self.context, []

        # Attempt address detection. The lookahead rejects longer hex strings
        # (for example a 64-digit transaction hash) whose first 40 digits
        # would otherwise be taken for an address.
        match = re.search(Config.ETHEREUM_ADDRESS_REGEX + r"(?![a-fA-F0-9])", message)
        nft_images_base64: List[str] = []
        if match:
            eth_address = match.group(0)
            partial_info = f"Analyzing {eth_address}..."
            history.append((message, partial_info))
            try:
                # Grab wallet data
                async with WalletAnalyzer(self.etherscan_key) as analyzer:
                    wallet_data = await analyzer.get_portfolio_data(eth_address)
                self.context[eth_address] = wallet_data

                # Summarize
                nft_data = wallet_data["nft_collections"]
                total_nft_count = sum(
                    len(col["items"]) for col in nft_data.get("collections", [])
                )
                lines = [
                    f"📊 Summary for {eth_address[:8]}...{eth_address[-6:]}",
                    f"ETH: {wallet_data['eth_balance']:.4f}",
                    f"Tokens: {len(wallet_data['tokens'])}",
                    f"NFTs: {total_nft_count}",
                ]
                history.append((message, "\n".join(lines)))

                # Fetch NFT images (limited sample, see the class constants)
                nft_images_base64 = await self._collect_nft_images(nft_data, message, history)
            except Exception as e:
                err_msg = f"Error analyzing wallet: {str(e)}"
                logger.error(err_msg)
                history.append((message, err_msg))

        # Generate response from GPT
        try:
            context_str = self._format_context_message()
            # Convert local chat history -> OpenAI format
            chat_messages: List[Dict[str, str]] = [
                {"role": "system", "content": Config.SYSTEM_PROMPT}
            ]
            if context_str:
                # No wallet analyzed yet -> no empty system message is sent.
                chat_messages.append({"role": "system", "content": context_str})
            for umsg, amsg in history[-Config.HISTORY_LIMIT:]:
                # A turn may have an empty side; skip it rather than send
                # empty content.
                if umsg:
                    chat_messages.append({"role": "user", "content": umsg})
                if amsg:
                    chat_messages.append({"role": "assistant", "content": amsg})
            chat_messages.append({"role": "user", "content": message})

            openai.api_key = self.openai_key
            client = openai.OpenAI(api_key=self.openai_key)
            # The client call is synchronous; run it in the default executor
            # so the event loop is not blocked while waiting for the reply.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                lambda: client.chat.completions.create(
                    model=Config.OPENAI_MODEL,
                    messages=chat_messages,
                    temperature=Config.TEMPERATURE,
                    max_tokens=Config.MAX_TOKENS,
                ),
            )
            final_answer = response.choices[0].message.content
            history.append((message, final_answer))
            # Return new chat, new context, and the collected base64 images
            return history, self.context, nft_images_base64
        except Exception as e:
            logger.error(f"OpenAI Error: {e}")
            err_resp = f"OpenAI error: {str(e)}"
            history.append((message, err_resp))
            return history, self.context, []

    def clear_context(self) -> Tuple[Dict[str, Any], List[Tuple[str, str]]]:
        """Clear wallet context + chat.

        Returns an empty context dict and an empty chat history.
        """
        self.context = {}
        return {}, []
class GradioInterface:
    """Manages the Gradio UI for Hugging Face Space, with NFT images top-right."""

    def __init__(self):
        """Build the UI; the chat backend is created once keys are validated."""
        self.chat_interface: Optional[ChatInterface] = None
        self.demo = self._create_interface()

    def _create_interface(self) -> gr.Blocks:
        """Create the Blocks layout and wire up all event handlers.

        NOTE(review): the nesting of rows and columns below was reconstructed
        from a flattened copy of the source - confirm it against the intended
        layout.
        """
        with gr.Blocks(theme=gr.themes.Soft()) as demo:
            gr.Markdown("""
# 🐕 LOSS DOG: Blockchain Wallet Analyzer (NFT Images at Top-Right)
**Instructions**:
- Enter your **OpenAI**, **Etherscan**, and **OpenSea** API keys below.
- Validate them, then chat with your Ethereum wallet address.
- NFT images (if any) will appear as **base64** images in the top-right gallery.
""")
            with gr.Row():
                openai_key = gr.Textbox(
                    label="OpenAI API Key",
                    type="password",
                    placeholder="Enter your OpenAI API key..."
                )
                etherscan_key = gr.Textbox(
                    label="Etherscan API Key",
                    type="password",
                    placeholder="Enter your Etherscan API key..."
                )
                opensea_key = gr.Textbox(
                    label="OpenSea API Key",
                    type="password",
                    placeholder="Enter your OpenSea API key..."
                )
            validate_status = gr.Textbox(label="Validation Status", interactive=False)
            validate_btn = gr.Button("Validate API Keys")
            with gr.Row():
                # Main Chat Column
                with gr.Column(scale=2):
                    chatbot = gr.Chatbot(
                        label="Chat History",
                        height=420,
                        value=[]
                    )
                    with gr.Row():
                        # Chat stays disabled until keys are validated. This
                        # is set at construction time rather than by assigning
                        # the attribute afterwards.
                        msg_input = gr.Textbox(
                            label="Message",
                            placeholder="Enter an Ethereum address or question...",
                            interactive=False
                        )
                        send_btn = gr.Button("Send", interactive=False)
                # NFT Gallery on Top-Right
                with gr.Column(scale=1):
                    nft_gallery = gr.Gallery(
                        label="NFT Images (Top-Right)",
                        columns=2,
                        show_label=True
                    )
                    # Then wallet context below
                    wallet_context = gr.JSON(
                        label="Active Wallet Context",
                        value={}
                    )
                    clear_btn = gr.Button("Clear Context")

            # Validation function
            def validate_keys(openai_k: str, etherscan_k: str, opensea_k: str):
                """Validate the keys; on success create the chat backend and enable chat."""
                is_valid, message = ChatInterface._validate_api_keys(openai_k, etherscan_k, opensea_k)
                if is_valid:
                    self.chat_interface = ChatInterface(openai_k, etherscan_k, opensea_k)
                    return (
                        f"✅ {message}",
                        gr.update(interactive=True),
                        gr.update(interactive=True)
                    )
                return (
                    f"❌ {message}",
                    gr.update(interactive=False),
                    gr.update(interactive=False)
                )

            validate_btn.click(
                fn=validate_keys,
                inputs=[openai_key, etherscan_key, opensea_key],
                outputs=[validate_status, msg_input, send_btn]
            )

            # Clear context
            def clear_all():
                """Reset the wallet context panel and the chat history."""
                if self.chat_interface:
                    return self.chat_interface.clear_context()
                return {}, []

            clear_btn.click(
                fn=clear_all,
                inputs=[],
                outputs=[wallet_context, chatbot]
            )

            # Async callback to handle chat
            async def handle_message(
                message: str,
                chat_hist: List[Tuple[str, str]],
                context: Dict[str, Any]
            ) -> Tuple[List[Tuple[str, str]], Dict[str, Any], List[str]]:
                """Process user message. Return updated chat, context, base64 images."""
                if not self.chat_interface:
                    return [], {}, []
                try:
                    new_hist, new_ctx, nft_imgs_b64 = await self.chat_interface.process_message(message, chat_hist)
                    return new_hist, new_ctx, nft_imgs_b64
                except Exception as e:
                    logger.error(f"Error in handle_message: {e}")
                    # The history may arrive empty/None; never fail while
                    # reporting a failure.
                    chat_hist = list(chat_hist or [])
                    chat_hist.append((message, f"Error: {str(e)}"))
                    return chat_hist, context, []

            # Pressing Enter in the textbox and clicking Send do the same
            # thing: run the chat handler, then clear the message box.
            for trigger in (msg_input.submit, send_btn.click):
                trigger(
                    fn=handle_message,
                    inputs=[msg_input, chatbot, wallet_context],
                    outputs=[chatbot, wallet_context, nft_gallery]
                ).then(
                    lambda: gr.update(value=""),
                    None,
                    [msg_input]
                )
        return demo

    def launch(self):
        """Enable request queuing and start the Gradio server."""
        self.demo.queue()
        self.demo.launch()
def main():
    """
    Entry point for the Hugging Face Space: log the start-up, then build
    and launch the Gradio interface.
    """
    logger.info("Launching LOSS DOG with NFT image display (top-right).")
    GradioInterface().launch()


if __name__ == "__main__":
    main()