jzou19950715 commited on
Commit
c166129
·
verified ·
1 Parent(s): e025681

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +355 -922
app.py CHANGED
@@ -1,12 +1,10 @@
1
  """
2
- Enhanced Blockchain Wallet Analyzer with Persistent State Management
3
- and Improved NFT Interactions
4
 
5
- This version includes:
6
- - Persistent state management for wallet data
7
- - Enhanced NFT tracking and caching
8
- - Improved chat context management
9
- - Better error handling and recovery
10
 
11
  Author: Claude
12
  Date: January 2025
@@ -20,28 +18,24 @@ import json
20
  import time
21
  import logging
22
  import asyncio
23
- import pickle
24
  import base64
25
- from typing import List, Dict, Tuple, Any, Optional, TypeVar, Set
26
- from datetime import datetime, timedelta
27
  from decimal import Decimal
28
- from dataclasses import dataclass, asdict
29
  from pathlib import Path
30
  from io import BytesIO
31
- from threading import Lock
32
 
33
  import aiohttp
34
  import openai
35
  import gradio as gr
36
  from PIL import Image
37
  from tenacity import retry, stop_after_attempt, wait_exponential
38
- from cachetools import TTLCache, LRUCache
39
 
40
  # Type variables
41
  T = TypeVar('T')
42
  WalletData = Dict[str, Any]
43
  ChatHistory = List[Tuple[str, str]]
44
- NFTData = Dict[str, Any]
45
 
46
  # Configure logging
47
  logging.basicConfig(
@@ -54,7 +48,6 @@ logging.basicConfig(
54
  )
55
  logger = logging.getLogger(__name__)
56
 
57
- # Custom Exceptions
58
  class ConfigError(Exception):
59
  """Raised when there's an error in configuration."""
60
  pass
@@ -67,10 +60,6 @@ class ValidationError(Exception):
67
  """Raised when there's an error in input validation."""
68
  pass
69
 
70
- class CacheError(Exception):
71
- """Raised when there's an error with the cache."""
72
- pass
73
-
74
  @dataclass
75
  class Config:
76
  """Application configuration settings."""
@@ -80,170 +69,32 @@ class Config:
80
  Your personality:
81
  - Friendly and enthusiastic
82
  - Explain findings in fun, simple ways
83
- - Track and remember NFTs across conversations
84
- - Maintain context about wallets and collections
85
 
86
  Instructions:
87
- - Use cached wallet data when available
88
- - Reference specific NFTs by name and ID
89
- - Compare wallets and collections over time
90
- - Highlight interesting patterns or changes
91
  """
92
  ETHERSCAN_BASE_URL: str = "https://api.etherscan.io/api"
93
  ETHEREUM_ADDRESS_REGEX: str = r"0x[a-fA-F0-9]{40}"
94
  RATE_LIMIT_DELAY: float = 0.2 # 5 requests per second
95
  MAX_RETRIES: int = 3
96
- OPENAI_MODEL: str = "gpt-4o-mini" # Updated model
97
  MAX_TOKENS: int = 4000
98
  TEMPERATURE: float = 0.7
99
- HISTORY_LIMIT: int = 10 # Increased history
100
- CACHE_DIR: str = ".blockchain_cache"
101
- NFT_CACHE_TTL: int = 3600 # 1 hour
102
- WALLET_CACHE_TTL: int = 1800 # 30 minutes
103
- MAX_CHAT_SESSIONS: int = 50
104
- MAX_NFTS_PER_COLLECTION: int = 5 # Increased from 2
105
-
106
- @dataclass
107
- class NFTMetadata:
108
- """Stores NFT metadata with cache management."""
109
- contract: str
110
- token_id: str
111
- name: str
112
- image_url: str
113
- collection_name: str
114
- last_updated: datetime
115
- image_data: Optional[bytes] = None
116
-
117
- def to_dict(self) -> Dict[str, Any]:
118
- """Convert to dictionary for serialization."""
119
- return {
120
- "contract": self.contract,
121
- "token_id": self.token_id,
122
- "name": self.name,
123
- "image_url": self.image_url,
124
- "collection_name": self.collection_name,
125
- "last_updated": self.last_updated.isoformat(),
126
- "image_data": base64.b64encode(self.image_data).decode() if self.image_data else None
127
- }
128
-
129
- @classmethod
130
- def from_dict(cls, data: Dict[str, Any]) -> 'NFTMetadata':
131
- """Create from dictionary after deserialization."""
132
- if data.get("image_data"):
133
- data["image_data"] = base64.b64decode(data["image_data"])
134
- data["last_updated"] = datetime.fromisoformat(data["last_updated"])
135
- return cls(**data)
136
-
137
- class StateManager:
138
- """Manages persistent state for the application."""
139
-
140
- def __init__(self, cache_dir: str = Config.CACHE_DIR):
141
- self.cache_dir = Path(cache_dir)
142
- self.cache_dir.mkdir(exist_ok=True)
143
-
144
- # In-memory caches with TTL
145
- self.nft_cache = TTLCache(maxsize=1000, ttl=Config.NFT_CACHE_TTL)
146
- self.wallet_cache = TTLCache(maxsize=100, ttl=Config.WALLET_CACHE_TTL)
147
- self.chat_history_cache = LRUCache(maxsize=Config.MAX_CHAT_SESSIONS)
148
-
149
- # Thread safety for cache operations
150
- self._lock = Lock()
151
-
152
- # Load existing cache data
153
- self._load_cache()
154
-
155
- def _load_cache(self):
156
- """Load cached data from disk with error handling."""
157
- try:
158
- # Load NFT cache
159
- nft_cache_file = self.cache_dir / "nft_cache.pkl"
160
- if nft_cache_file.exists():
161
- with open(nft_cache_file, "rb") as f:
162
- cached_data = pickle.load(f)
163
- now = datetime.now()
164
- for key, value in cached_data.items():
165
- if isinstance(value, NFTMetadata):
166
- if now - value.last_updated < timedelta(hours=1):
167
- self.nft_cache[key] = value
168
-
169
- # Load wallet cache
170
- wallet_cache_file = self.cache_dir / "wallet_cache.pkl"
171
- if wallet_cache_file.exists():
172
- with open(wallet_cache_file, "rb") as f:
173
- cached_data = pickle.load(f)
174
- now = datetime.now()
175
- for key, value in cached_data.items():
176
- if now - value.get("last_updated", now) < timedelta(minutes=30):
177
- self.wallet_cache[key] = value
178
- except Exception as e:
179
- logger.error(f"Error loading cache: {e}")
180
- # Continue with empty cache if load fails
181
-
182
- def save_cache(self):
183
- """Save current cache to disk with error handling."""
184
- try:
185
- with self._lock:
186
- # Save NFT cache
187
- nft_cache_file = self.cache_dir / "nft_cache.pkl"
188
- with open(nft_cache_file, "wb") as f:
189
- pickle.dump(dict(self.nft_cache), f)
190
-
191
- # Save wallet cache
192
- wallet_cache_file = self.cache_dir / "wallet_cache.pkl"
193
- with open(wallet_cache_file, "wb") as f:
194
- pickle.dump(dict(self.wallet_cache), f)
195
- except Exception as e:
196
- logger.error(f"Error saving cache: {e}")
197
- raise CacheError(f"Failed to save cache: {str(e)}")
198
-
199
- def get_nft_metadata(self, contract: str, token_id: str) -> Optional[NFTMetadata]:
200
- """Get NFT metadata from cache if available."""
201
- cache_key = f"{contract}_{token_id}"
202
- return self.nft_cache.get(cache_key)
203
-
204
- def store_nft_metadata(self, metadata: NFTMetadata):
205
- """Store NFT metadata in cache with immediate disk sync."""
206
- cache_key = f"{metadata.contract}_{metadata.token_id}"
207
- with self._lock:
208
- self.nft_cache[cache_key] = metadata
209
- self.save_cache()
210
-
211
- def get_wallet_data(self, address: str) -> Optional[WalletData]:
212
- """Get wallet data from cache if available."""
213
- return self.wallet_cache.get(address.lower())
214
-
215
- def store_wallet_data(self, address: str, data: WalletData):
216
- """Store wallet data in cache with timestamp."""
217
- with self._lock:
218
- data["last_updated"] = datetime.now()
219
- self.wallet_cache[address.lower()] = data
220
- self.save_cache()
221
-
222
- def get_chat_history(self, session_id: str) -> ChatHistory:
223
- """Get chat history for a session."""
224
- return self.chat_history_cache.get(session_id, [])
225
-
226
- def store_chat_history(self, session_id: str, history: ChatHistory):
227
- """Store chat history for a session."""
228
- with self._lock:
229
- self.chat_history_cache[session_id] = history
230
-
231
- def clear_session(self, session_id: str):
232
- """Clear all data for a specific session."""
233
- with self._lock:
234
- if session_id in self.chat_history_cache:
235
- del self.chat_history_cache[session_id]
236
 
237
  class WalletAnalyzer:
238
- """Analyzes Ethereum wallet contents with improved state management."""
239
 
240
- def __init__(self, api_key: str, state_manager: StateManager):
241
  self.api_key = api_key
242
- self.state_manager = state_manager
243
  self.base_url = Config.ETHERSCAN_BASE_URL
244
  self.session: Optional[aiohttp.ClientSession] = None
245
  self.last_request_time = 0
246
- self.active_collections: Set[str] = set()
247
 
248
  async def __aenter__(self) -> WalletAnalyzer:
249
  self.session = aiohttp.ClientSession()
@@ -259,125 +110,78 @@ class WalletAnalyzer:
259
  wait=wait_exponential(multiplier=1, min=4, max=10)
260
  )
261
  async def _fetch_data(self, params: Dict[str, str]) -> Dict[str, Any]:
262
- """Fetch data from Etherscan with enhanced error handling."""
263
  if not self.session:
264
  raise APIError("No active session. Use context manager.")
265
 
266
  await self._rate_limit()
267
  params["apikey"] = self.api_key
268
-
269
  try:
270
  async with self.session.get(self.base_url, params=params) as response:
271
- if response.status == 429: # Too Many Requests
272
- raise APIError("Rate limit exceeded. Please wait.")
273
  if response.status != 200:
274
  raise APIError(f"Etherscan request failed: {response.status}")
275
-
276
  data = await response.json()
277
  if data.get("status") == "0":
278
- err_msg = data.get("message", "Unknown Etherscan error")
279
- if "Max rate limit reached" in err_msg:
280
  raise APIError("Etherscan rate limit exceeded")
281
- if "Invalid API Key" in err_msg:
282
- raise ConfigError("Invalid Etherscan API key")
283
- raise APIError(f"Etherscan error: {err_msg}")
284
-
285
  return data
286
-
287
  except aiohttp.ClientError as e:
288
  raise APIError(f"Network error: {str(e)}")
289
- except json.JSONDecodeError as e:
290
- raise APIError(f"Invalid JSON response: {str(e)}")
291
  except Exception as e:
292
  raise APIError(f"Unexpected error: {str(e)}")
293
 
294
  async def _rate_limit(self) -> None:
295
- """Enhanced rate limiting with dynamic backoff."""
296
  now = time.time()
297
  diff = now - self.last_request_time
298
  if diff < Config.RATE_LIMIT_DELAY:
299
- delay = Config.RATE_LIMIT_DELAY - diff
300
- logger.debug(f"Rate limiting: waiting {delay:.2f}s")
301
- await asyncio.sleep(delay)
302
  self.last_request_time = time.time()
303
 
304
  @staticmethod
305
  def _validate_address(address: str) -> bool:
306
- """Validate Ethereum address format with checksum."""
307
- if not re.match(Config.ETHEREUM_ADDRESS_REGEX, address):
308
- return False
309
- # Additional validation could be added here
310
- return True
311
 
312
  async def get_portfolio_data(self, address: str) -> WalletData:
313
  """
314
- Get comprehensive wallet portfolio data with caching.
315
-
316
- Returns a dictionary with:
317
  - address
318
  - last_updated
319
  - eth_balance
320
  - tokens (list of ERC-20)
321
  - nft_collections (with contract & token_id)
322
- - analytics (additional metrics)
323
  """
324
  if not self._validate_address(address):
325
  raise ValidationError(f"Invalid Ethereum address: {address}")
326
 
327
- # Check cache first
328
- cached_data = self.state_manager.get_wallet_data(address)
329
- if cached_data:
330
- logger.info(f"Using cached data for {address}")
331
- return cached_data
332
 
333
- logger.info(f"Fetching fresh portfolio data for {address}")
334
-
335
- try:
336
- # Parallel fetching of data
337
- eth_balance, tokens, nft_colls = await asyncio.gather(
338
- self._get_eth_balance(address),
339
- self._get_token_holdings(address),
340
- self._get_nft_holdings(address)
341
- )
342
-
343
- # Calculate additional analytics
344
- analytics = self._calculate_analytics(eth_balance, tokens, nft_colls)
345
-
346
- portfolio_data = {
347
- "address": address,
348
- "last_updated": datetime.now().isoformat(),
349
- "eth_balance": float(eth_balance),
350
- "tokens": tokens,
351
- "nft_collections": nft_colls,
352
- "analytics": analytics
353
- }
354
-
355
- # Cache the results
356
- self.state_manager.store_wallet_data(address, portfolio_data)
357
-
358
- return portfolio_data
359
-
360
- except Exception as e:
361
- logger.error(f"Error fetching portfolio data: {e}")
362
- raise
363
 
364
  async def _get_eth_balance(self, address: str) -> Decimal:
365
- """Get ETH balance with proper decimal handling."""
366
  params = {
367
  "module": "account",
368
  "action": "balance",
369
  "address": address,
370
  "tag": "latest"
371
  }
372
- try:
373
- data = await self._fetch_data(params)
374
- balance_wei = Decimal(data["result"])
375
- return balance_wei / Decimal("1e18") # Convert to ETH
376
- except (KeyError, ValueError) as e:
377
- raise APIError(f"Invalid balance data: {str(e)}")
378
 
379
  async def _get_token_holdings(self, address: str) -> List[Dict[str, Any]]:
380
- """Get ERC-20 tokens with improved tracking."""
381
  params = {
382
  "module": "account",
383
  "action": "tokentx",
@@ -387,52 +191,34 @@ class WalletAnalyzer:
387
  data = await self._fetch_data(params)
388
 
389
  token_map: Dict[str, Dict[str, Any]] = {}
390
- try:
391
- for tx in data.get("result", []):
392
- contract = tx["contractAddress"]
393
- if contract not in token_map:
394
- token_map[contract] = {
395
- "name": tx["tokenName"],
396
- "symbol": tx["tokenSymbol"],
397
- "decimals": int(tx["tokenDecimal"]),
398
- "balance": Decimal(0),
399
- "last_tx": datetime.fromtimestamp(int(tx["timeStamp"]))
400
- }
401
-
402
- amount = Decimal(tx["value"]) / Decimal(10 ** token_map[contract]["decimals"])
403
- if tx["to"].lower() == address.lower():
404
- token_map[contract]["balance"] += amount
405
- else:
406
- token_map[contract]["balance"] -= amount
407
-
408
- # Filter and format results
409
- return [
410
- {
411
- "name": v["name"],
412
- "symbol": v["symbol"],
413
- "balance": float(v["balance"]),
414
- "last_activity": v["last_tx"].isoformat()
415
  }
416
- for v in token_map.values()
417
- if v["balance"] > 0
418
- ]
419
-
420
- except Exception as e:
421
- logger.error(f"Error processing token data: {e}")
422
- raise APIError(f"Token processing error: {str(e)}")
 
 
 
 
 
 
 
423
 
424
  async def _get_nft_holdings(self, address: str) -> Dict[str, Any]:
425
  """
426
- Enhanced NFT holdings tracking with collection stats.
427
- Returns {
428
- "collections": [
429
- {
430
- "collection_name": str,
431
- "items": [{"contract": str, "token_id": str}],
432
- "stats": {"count": int, "first_acquired": datetime}
433
- }
434
- ]
435
- }
436
  """
437
  params = {
438
  "module": "account",
@@ -445,718 +231,375 @@ class WalletAnalyzer:
445
  if data.get("status") != "1" or "result" not in data:
446
  return {"collections": []}
447
 
448
- try:
449
- # Track ownership and collection stats
450
- ownership_map = {} # (contract, token_id) -> ownership_data
451
- collection_stats = {} # collection_name -> stats
452
-
453
- for tx in data["result"]:
454
- contract = tx["contractAddress"]
455
- coll_name = tx.get("tokenName", "Unknown Collection")
456
- token_id = tx["tokenID"]
457
- timestamp = datetime.fromtimestamp(int(tx["timeStamp"]))
458
- key = (contract, token_id)
459
-
460
- # Update collection stats
461
- if coll_name not in collection_stats:
462
- collection_stats[coll_name] = {
463
- "count": 0,
464
- "first_acquired": timestamp,
465
- "last_activity": timestamp
466
- }
467
-
468
- if tx["to"].lower() == address.lower():
469
- ownership_map[key] = {
470
- "contract": contract,
471
- "token_id": token_id,
472
- "collection_name": coll_name,
473
- "acquired": timestamp
474
- }
475
- collection_stats[coll_name]["count"] += 1
476
- if timestamp < collection_stats[coll_name]["first_acquired"]:
477
- collection_stats[coll_name]["first_acquired"] = timestamp
478
- elif tx["from"].lower() == address.lower():
479
- if key in ownership_map:
480
- del ownership_map[key]
481
- collection_stats[coll_name]["count"] -= 1
482
-
483
- collection_stats[coll_name]["last_activity"] = timestamp
484
-
485
- # Format output
486
- collections_out = []
487
- for cname, stats in collection_stats.items():
488
- if stats["count"] > 0:
489
- items = [
490
- {"contract": data["contract"], "token_id": data["token_id"]}
491
- for data in ownership_map.values()
492
- if data["collection_name"] == cname
493
- ]
494
- collections_out.append({
495
- "collection_name": cname,
496
- "items": items,
497
- "stats": {
498
- "count": stats["count"],
499
- "first_acquired": stats["first_acquired"].isoformat(),
500
- "last_activity": stats["last_activity"].isoformat()
501
- }
502
- })
503
-
504
- return {"collections": collections_out}
505
 
506
- except Exception as e:
507
- logger.error(f"Error processing NFT data: {e}")
508
- raise APIError(f"NFT processing error: {str(e)}")
 
509
 
510
- def _calculate_analytics(
511
- self,
512
- eth_balance: Decimal,
513
- tokens: List[Dict[str, Any]],
514
- nft_collections: Dict[str, Any]
515
- ) -> Dict[str, Any]:
516
- """Calculate additional portfolio analytics."""
517
- total_nfts = sum(
518
- len(c["items"])
519
- for c in nft_collections.get("collections", [])
520
- )
521
-
522
- token_count = len(tokens)
523
- collection_count = len(nft_collections.get("collections", []))
524
-
525
- return {
526
- "total_assets": {
527
- "eth": float(eth_balance),
528
- "tokens": token_count,
529
- "nfts": total_nfts,
530
- "collections": collection_count
531
- },
532
- "last_calculated": datetime.now().isoformat()
533
- }
534
 
535
- class NFTProcessor:
536
- """Enhanced NFT processing with caching and rate limiting."""
537
 
538
- def __init__(self, opensea_key: str, state_manager: StateManager):
 
 
539
  self.opensea_key = opensea_key
540
- self.state_manager = state_manager
541
- self.session: Optional[aiohttp.ClientSession] = None
542
- self.last_request_time = 0
543
- self.rate_limit_delay = 0.25 # 4 requests per second for OpenSea
544
- self._lock = Lock()
545
-
546
- async def __aenter__(self) -> 'NFTProcessor':
547
- self.session = aiohttp.ClientSession()
548
- return self
549
-
550
- async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
551
- if self.session:
552
- await self.session.close()
553
- self.session = None
554
-
555
- async def _rate_limit(self) -> None:
556
- """OpenSea-specific rate limiting."""
557
- now = time.time()
558
- async with self._lock:
559
- diff = now - self.last_request_time
560
- if diff < self.rate_limit_delay:
561
- await asyncio.sleep(self.rate_limit_delay - diff)
562
- self.last_request_time = time.time()
563
 
564
- @retry(
565
- stop=stop_after_attempt(3),
566
- wait=wait_exponential(multiplier=1, min=4, max=10)
567
- )
568
- async def fetch_nft_metadata(
569
- self,
570
- contract: str,
571
- token_id: str,
572
- collection_name: str
573
- ) -> Optional[NFTMetadata]:
574
  """
575
- Fetch NFT metadata from OpenSea with caching and retry logic.
576
- Returns cached data if available and fresh.
577
  """
578
- # Check cache first
579
- cached = self.state_manager.get_nft_metadata(contract, token_id)
580
- if cached and (datetime.now() - cached.last_updated < timedelta(hours=1)):
581
- logger.debug(f"Using cached metadata for {contract}_{token_id}")
582
- return cached
583
-
584
- if not self.session:
585
- raise APIError("No active session. Use context manager.")
586
-
587
- await self._rate_limit()
588
-
589
- url = f"https://api.opensea.io/api/v2/chain/ethereum/contract/{contract}/nfts/{token_id}"
590
- headers = {"X-API-KEY": self.opensea_key} if self.opensea_key else {}
591
-
592
  try:
593
- async with self.session.get(url, headers=headers) as resp:
594
- if resp.status == 403:
595
- raise APIError("OpenSea API key invalid or expired")
596
- if resp.status == 404:
597
- logger.warning(f"NFT not found: {contract} #{token_id}")
598
- return None
599
- if resp.status == 429:
600
- raise APIError("OpenSea rate limit exceeded")
601
- if resp.status != 200:
602
- raise APIError(f"OpenSea API error: {resp.status}")
603
-
604
- data = await resp.json()
605
-
606
- nft_obj = data.get("nft", {})
607
- if not nft_obj:
608
- logger.warning(f"Empty NFT data for {contract} #{token_id}")
609
- return None
610
-
611
- # Create metadata object
612
- metadata = NFTMetadata(
613
- contract=contract,
614
- token_id=token_id,
615
- name=nft_obj.get("name", f"NFT #{token_id}"),
616
- image_url=nft_obj.get("image_url", ""),
617
- collection_name=collection_name,
618
- last_updated=datetime.now()
619
  )
620
-
621
- # Fetch and store image if available
622
- if metadata.image_url:
623
- image_data = await self._fetch_and_process_image(metadata.image_url)
624
- metadata.image_data = image_data
625
-
626
- # Cache the metadata
627
- self.state_manager.store_nft_metadata(metadata)
628
-
629
- return metadata
630
-
631
- except Exception as e:
632
- logger.error(f"Error fetching NFT metadata: {e}")
633
- return None
634
 
635
- async def _fetch_and_process_image(self, url: str) -> Optional[bytes]:
636
- """
637
- Fetch and process NFT image with enhanced error handling.
638
- Returns image bytes suitable for PIL conversion.
639
- """
640
- if not url:
641
- return None
642
-
643
- try:
644
- async with self.session.get(url) as resp:
645
- if resp.status != 200:
646
- logger.warning(f"Failed to fetch image: {url} ({resp.status})")
647
- return None
648
-
649
- content_type = resp.headers.get("content-type", "")
650
- if not content_type.startswith(("image/", "application/octet-stream")):
651
- logger.warning(f"Invalid content type: {content_type}")
652
- return None
653
-
654
- raw_bytes = await resp.read()
655
-
656
- # Process with PIL to validate and optimize
657
- try:
658
- img = Image.open(BytesIO(raw_bytes))
659
-
660
- # Convert to RGB if necessary
661
- if img.mode not in ("RGB", "RGBA"):
662
- img = img.convert("RGB")
663
-
664
- # Resize if too large while maintaining aspect ratio
665
- max_size = (800, 800)
666
- if img.size[0] > max_size[0] or img.size[1] > max_size[1]:
667
- img.thumbnail(max_size, Image.Resampling.LANCZOS)
668
-
669
- # Save as PNG
670
- output = BytesIO()
671
- img.save(output, format="PNG", optimize=True)
672
- return output.getvalue()
673
-
674
- except Exception as e:
675
- logger.error(f"Error processing image: {e}")
676
- return None
677
-
678
- except Exception as e:
679
- logger.error(f"Error fetching image: {e}")
680
- return None
681
-
682
- async def batch_process_nfts(
683
- self,
684
- nft_collections: List[Dict[str, Any]],
685
- max_per_collection: int = Config.MAX_NFTS_PER_COLLECTION
686
- ) -> List[NFTMetadata]:
687
- """
688
- Process multiple NFTs in parallel with rate limiting.
689
- Returns list of successfully processed NFT metadata.
690
- """
691
- all_tasks = []
692
- processed_nfts = []
693
-
694
- async with self: # Use context manager for session
695
- for collection in nft_collections:
696
- coll_name = collection["collection_name"]
697
- items = collection["items"][:max_per_collection] # Limit items per collection
698
-
699
- # Create tasks for parallel processing
700
- tasks = [
701
- self.fetch_nft_metadata(
702
- item["contract"],
703
- item["token_id"],
704
- coll_name
705
- )
706
- for item in items
707
- ]
708
-
709
- all_tasks.extend(tasks)
710
-
711
- # Process in batches to respect rate limits
712
- batch_size = 4 # Process 4 at a time
713
- for i in range(0, len(tasks), batch_size):
714
- batch = tasks[i:i + batch_size]
715
- results = await asyncio.gather(*batch, return_exceptions=True)
716
-
717
- # Filter successful results
718
- for result in results:
719
- if isinstance(result, NFTMetadata):
720
- processed_nfts.append(result)
721
- elif isinstance(result, Exception):
722
- logger.error(f"Error processing NFT: {result}")
723
-
724
- return processed_nfts
725
 
726
- def get_cached_images(self, nft_list: List[NFTMetadata]) -> List[Image.Image]:
727
- """Convert cached image data to PIL Images."""
728
- images = []
729
- for nft in nft_list:
730
- if nft.image_data:
731
- try:
732
- img = Image.open(BytesIO(nft.image_data))
733
- images.append(img)
734
- except Exception as e:
735
- logger.error(f"Error converting image data: {e}")
736
- return images
737
 
738
- class ChatInterface:
739
- """Enhanced chat interface with persistent state and improved NFT handling."""
740
-
741
- def __init__(self, openai_key: str, etherscan_key: str, opensea_key: str):
742
- self.openai_key = openai_key
743
- self.etherscan_key = etherscan_key
744
- self.opensea_key = opensea_key
745
- self.state_manager = StateManager()
746
- self.nft_processor = None
747
- self.current_session_id = str(int(time.time()))
748
-
749
- # Track conversation context
750
- self.active_wallets: Set[str] = set()
751
- self.active_nfts: Set[str] = set() # contract_tokenid
752
- self.mentioned_collections: Set[str] = set()
753
-
754
- # Initialize OpenAI client
755
- openai.api_key = openai_key
756
- self.client = openai.OpenAI(api_key=openai_key)
757
 
758
  def _format_context_message(self) -> str:
759
- """Format enhanced context for GPT prompt."""
760
  lines = []
761
-
762
- # Active wallets section
763
- if self.active_wallets:
764
- lines.append("\nActive Wallets:")
765
- for addr in self.active_wallets:
766
- wallet_data = self.state_manager.get_wallet_data(addr)
767
- if wallet_data:
768
- lines.append(f"\nWallet {addr[:8]}...{addr[-6:]}:")
769
- lines.append(f"- ETH Balance: {wallet_data['eth_balance']:.4f}")
770
- lines.append(f"- Token Count: {len(wallet_data['tokens'])}")
771
-
772
- # Token details
773
- if wallet_data['tokens']:
774
- lines.append("- Notable Tokens:")
775
- for token in wallet_data['tokens'][:3]: # Top 3 tokens
776
- lines.append(f" * {token['name']} ({token['symbol']}): {token['balance']}")
777
-
778
- # NFT Collections
779
- nft_data = wallet_data["nft_collections"]
780
- if "collections" in nft_data and nft_data["collections"]:
781
- lines.append("- NFT Collections:")
782
- for coll in nft_data["collections"]:
783
- count = len(coll["items"])
784
- lines.append(f" * {coll['collection_name']}: {count} NFT(s)")
785
- if "stats" in coll:
786
- lines.append(f" First acquired: {coll['stats']['first_acquired']}")
787
-
788
- # Active NFTs section
789
- if self.active_nfts:
790
- lines.append("\nRecently Discussed NFTs:")
791
- for nft_key in list(self.active_nfts)[-5:]: # Last 5 NFTs
792
- contract, token_id = nft_key.split("_")
793
- nft_meta = self.state_manager.get_nft_metadata(contract, token_id)
794
- if nft_meta:
795
- lines.append(f"- {nft_meta.name} from {nft_meta.collection_name}")
796
-
797
  return "\n".join(lines)
798
 
799
- def _prepare_chat_messages(
800
- self,
801
- user_message: str,
802
- history: List[Tuple[str, str]]
803
- ) -> List[Dict[str, str]]:
804
- """Prepare messages for OpenAI chat completion."""
805
- messages = [
806
- {
807
- "role": "system",
808
- "content": Config.SYSTEM_PROMPT
809
- },
810
- {
811
- "role": "system",
812
- "content": self._format_context_message()
813
- }
814
- ]
815
-
816
- # Add relevant history with context
817
- limit = Config.HISTORY_LIMIT
818
- for usr, ast in history[-limit:]:
819
- messages.append({"role": "user", "content": usr})
820
- messages.append({"role": "assistant", "content": ast})
821
-
822
- # Add current message
823
- messages.append({"role": "user", "content": user_message})
824
-
825
- return messages
826
-
827
  async def process_message(
828
  self,
829
  message: str,
830
  history: Optional[ChatHistory] = None
831
  ) -> Tuple[ChatHistory, Dict[str, Any], List[Image.Image]]:
832
  """
833
- Enhanced message processing with better context management:
834
- 1) Track wallet addresses and NFTs mentioned
835
- 2) Maintain conversation context
836
- 3) Use cached data when appropriate
837
- 4) Return relevant NFT images
838
  """
839
- history = self.state_manager.get_chat_history(self.current_session_id) if history is None else history
840
-
 
841
  if not message.strip():
842
- return history, {}, []
843
 
844
- try:
845
- # Initialize NFT processor if needed
846
- if self.nft_processor is None:
847
- self.nft_processor = NFTProcessor(self.opensea_key, self.state_manager)
 
 
 
 
848
 
849
- # Look for Ethereum addresses
850
- eth_addresses = re.findall(Config.ETHEREUM_ADDRESS_REGEX, message)
851
- nft_images: List[Image.Image] = []
852
-
853
- # Process new addresses
854
- for eth_address in eth_addresses:
855
- if eth_address not in self.active_wallets:
856
- # Add to tracking
857
- self.active_wallets.add(eth_address)
858
-
859
- # Fetch wallet data
860
- try:
861
- partial_msg = f"📊 Analyzing {eth_address[:8]}...{eth_address[-6:]}"
862
- history.append((message, partial_msg))
863
-
864
- async with WalletAnalyzer(self.etherscan_key, self.state_manager) as analyzer:
865
- wallet_data = await analyzer.get_portfolio_data(eth_address)
866
-
867
- # Process NFTs for new wallet
868
- if "collections" in wallet_data["nft_collections"]:
869
- collections = wallet_data["nft_collections"]["collections"]
870
- if collections:
871
- # Process NFTs in parallel
872
- processed_nfts = await self.nft_processor.batch_process_nfts(
873
- collections,
874
- max_per_collection=Config.MAX_NFTS_PER_COLLECTION
875
- )
876
-
877
- # Update tracking
878
- for nft in processed_nfts:
879
- self.active_nfts.add(f"{nft.contract}_{nft.token_id}")
880
- self.mentioned_collections.add(nft.collection_name)
881
-
882
- # Get images for display
883
- nft_images.extend(
884
- self.nft_processor.get_cached_images(processed_nfts)
885
- )
886
-
887
- # Generate wallet summary
888
- summary = [
889
- f"📊 Summary for {eth_address[:8]}...{eth_address[-6:]}",
890
- f"ETH: {wallet_data['eth_balance']:.4f}",
891
- f"Tokens: {len(wallet_data['tokens'])}",
892
- f"NFTs: {sum(len(c['items']) for c in wallet_data['nft_collections'].get('collections', []))}",
893
- "\nNotable Collections:"
894
- ]
895
-
896
- for coll in wallet_data["nft_collections"].get("collections", [])[:3]:
897
- summary.append(f"- {coll['collection_name']}: {len(coll['items'])} items")
898
-
899
- history.append((message, "\n".join(summary)))
900
-
901
- except Exception as e:
902
- err_msg = f"Error analyzing {eth_address}: {str(e)}"
903
- logger.error(err_msg)
904
- history.append((message, err_msg))
905
-
906
- # Prepare and send chat completion
907
  try:
908
- messages = self._prepare_chat_messages(message, history)
909
-
910
- response = await openai.chat.completions.acreate(
911
- model=Config.OPENAI_MODEL,
912
- messages=messages,
913
- temperature=Config.TEMPERATURE,
914
- max_tokens=Config.MAX_TOKENS
915
- )
916
-
917
- ai_message = response.choices[0].message.content
918
- history.append((message, ai_message))
919
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
920
  except Exception as e:
921
- logger.error(f"OpenAI error: {e}")
922
- error_msg = f"Chat error: {str(e)}"
923
- history.append((message, error_msg))
924
 
925
- # Update chat history in state manager
926
- self.state_manager.store_chat_history(self.current_session_id, history)
927
-
928
- # Prepare context for response
929
- context = {
930
- "active_wallets": list(self.active_wallets),
931
- "active_nfts": list(self.active_nfts),
932
- "mentioned_collections": list(self.mentioned_collections)
933
- }
934
-
935
- return history, context, nft_images
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
936
 
937
  except Exception as e:
938
- logger.error(f"Error in process_message: {e}")
939
- error_msg = f"Error processing message: {str(e)}"
940
- history.append((message, error_msg))
941
- return history, {}, []
942
 
943
  def clear_context(self) -> Tuple[Dict[str, Any], List[Tuple[str, str]]]:
944
- """Clear all conversation context and tracking."""
945
- self.active_wallets.clear()
946
- self.active_nfts.clear()
947
- self.mentioned_collections.clear()
948
- self.state_manager.clear_session(self.current_session_id)
949
  return {}, []
 
950
 
951
  class GradioInterface:
952
- """Enhanced Gradio UI with improved state management and user experience."""
953
 
954
  def __init__(self):
955
  self.chat_interface: Optional[ChatInterface] = None
956
  self.demo = self._create_interface()
957
 
958
  def _create_interface(self) -> gr.Blocks:
959
- """Create enhanced Gradio interface with better state visualization."""
960
-
961
- with gr.Blocks(
962
- theme=gr.themes.Soft(),
963
- css="""
964
- .container { max-width: 1200px; margin: auto; }
965
- .status-box { padding: 10px; border-radius: 5px; }
966
- .success { background-color: #d4edda; color: #155724; }
967
- .error { background-color: #f8d7da; color: #721c24; }
968
- """
969
- ) as demo:
970
  gr.Markdown("""
971
- # 🐕 LOSS DOG: Enhanced Blockchain Wallet Analyzer
972
 
973
- ### Features:
974
- - **Smart Context**: Maintains conversation memory about wallets and NFTs
975
- - **Caching**: Faster responses for previously seen data
976
- - **Enhanced NFT Display**: View NFT images with metadata
977
- - **Intelligent Analysis**: Tracks patterns across wallets
978
  """)
979
 
980
  with gr.Row():
981
- with gr.Column(scale=1):
982
- # API Configuration
983
- with gr.Group(label="API Configuration"):
984
- openai_key = gr.Textbox(
985
- label="OpenAI API Key",
986
- type="password",
987
- placeholder="Enter your OpenAI API key...",
988
- container=False
989
- )
990
- etherscan_key = gr.Textbox(
991
- label="Etherscan API Key",
992
- type="password",
993
- placeholder="Enter your Etherscan API key...",
994
- container=False
995
- )
996
- opensea_key = gr.Textbox(
997
- label="OpenSea API Key",
998
- type="password",
999
- placeholder="Enter your OpenSea API key...",
1000
- container=False
1001
- )
1002
-
1003
- # Validation Status
1004
- validation_status = gr.Textbox(
1005
- label="Status",
1006
- interactive=False,
1007
- container=True
1008
- )
1009
- validate_btn = gr.Button(
1010
- "🔑 Validate API Keys",
1011
- variant="primary",
1012
- size="sm"
1013
- )
1014
 
1015
  with gr.Row():
1016
- # Main Chat Area
1017
  with gr.Column(scale=2):
1018
- chatbot = gr.Chatbot(
1019
- label="Conversation",
1020
- height=500,
1021
- value=[],
1022
- elem_classes="chatbot"
1023
- )
1024
-
1025
  with gr.Row():
1026
  msg_input = gr.Textbox(
1027
  label="Message",
1028
- placeholder="Enter an ETH address or ask about NFTs...",
1029
- scale=4
1030
- )
1031
- send_btn = gr.Button(
1032
- "🚀 Send",
1033
- variant="primary",
1034
- scale=1
1035
- )
1036
- clear_btn = gr.Button(
1037
- "🔄 Clear",
1038
- variant="secondary",
1039
- scale=1
1040
  )
1041
-
1042
- # Right Column: NFTs and Context
1043
  with gr.Column(scale=1):
1044
- # NFT Gallery
1045
  nft_gallery = gr.Gallery(
1046
- label="📸 NFT Gallery",
1047
- columns=2,
1048
- height=300,
1049
- object_fit="contain"
1050
  )
1051
-
1052
- # Context Display
1053
- with gr.Accordion("🔍 Active Context", open=True):
1054
- wallet_context = gr.JSON(
1055
- label="Tracked Data",
1056
- value={}
1057
- )
1058
-
1059
- context_info = gr.Markdown("""
1060
- - **Wallets**: Currently tracked addresses
1061
- - **NFTs**: Recently discussed tokens
1062
- - **Collections**: Active collections
1063
- """)
1064
-
1065
- # Footer
1066
- gr.Markdown("""
1067
- ---
1068
- ### 📝 Usage Tips:
1069
- 1. Enter your API keys and validate
1070
- 2. Type an Ethereum address to analyze
1071
- 3. Ask about specific NFTs or collections
1072
- 4. Images appear in the gallery automatically
1073
- """)
1074
 
1075
- # Initialize state
1076
  msg_input.interactive = False
1077
  send_btn.interactive = False
1078
 
1079
- def validate_keys(
1080
- openai_k: str,
1081
- etherscan_k: str,
1082
- opensea_k: str
1083
- ) -> Tuple[str, gr.update, gr.update]:
1084
- """Validate API keys with enhanced feedback."""
1085
- try:
1086
- is_valid, msg = ChatInterface._validate_api_keys(
1087
- openai_k, etherscan_k, opensea_k
1088
  )
1089
- if is_valid:
1090
- self.chat_interface = ChatInterface(
1091
- openai_k, etherscan_k, opensea_k
1092
- )
1093
- return (
1094
- "✅ All API keys validated successfully!",
1095
- gr.update(interactive=True),
1096
- gr.update(interactive=True)
1097
- )
1098
- else:
1099
- return (
1100
- f"❌ Validation failed: {msg}",
1101
- gr.update(interactive=False),
1102
- gr.update(interactive=False)
1103
- )
1104
- except Exception as e:
1105
  return (
1106
- f"❌ Error during validation: {str(e)}",
1107
  gr.update(interactive=False),
1108
  gr.update(interactive=False)
1109
  )
1110
 
1111
- def clear_all() -> Tuple[Dict[str, Any], List[Tuple[str, str]], List[Image.Image]]:
1112
- """Clear all state with visual feedback."""
 
 
 
 
 
 
1113
  if self.chat_interface:
1114
- context, history = self.chat_interface.clear_context()
1115
- return context, history, []
1116
- return {}, [], []
 
 
 
 
 
1117
 
1118
  async def handle_message(
1119
  message: str,
1120
  chat_hist: List[Tuple[str, str]],
1121
  context: Dict[str, Any]
1122
  ) -> Tuple[List[Tuple[str, str]], Dict[str, Any], List[Image.Image]]:
1123
- """Process messages with enhanced error handling."""
1124
  if not self.chat_interface:
1125
  return [], {}, []
1126
 
1127
  try:
1128
- # Show typing indicator
1129
- yield [*chat_hist, (message, "🤔 Processing...")], context, []
1130
-
1131
- # Process message
1132
- new_history, new_context, images = await self.chat_interface.process_message(
1133
- message,
1134
- chat_hist
1135
- )
1136
-
1137
  return new_history, new_context, images
1138
-
1139
  except Exception as e:
1140
  logger.error(f"Error in handle_message: {e}")
1141
- error_msg = f"🔴 Error: {str(e)}"
1142
  if chat_hist is None:
1143
  chat_hist = []
1144
- return [*chat_hist, (message, error_msg)], context, []
 
1145
 
1146
- # Wire up events
1147
- validate_btn.click(
1148
- fn=validate_keys,
1149
- inputs=[openai_key, etherscan_key, opensea_key],
1150
- outputs=[validation_status, msg_input, send_btn]
1151
- )
1152
-
1153
- clear_btn.click(
1154
- fn=clear_all,
1155
- inputs=[],
1156
- outputs=[wallet_context, chatbot, nft_gallery]
1157
- )
1158
-
1159
- # Message handling
1160
  msg_input.submit(
1161
  fn=handle_message,
1162
  inputs=[msg_input, chatbot, wallet_context],
@@ -1179,25 +622,15 @@ class GradioInterface:
1179
 
1180
  return demo
1181
 
1182
- def launch(self, **kwargs):
1183
- """Launch the interface with configurable options."""
1184
- self.demo.queue(concurrency_count=3)
1185
- self.demo.launch(**kwargs)
1186
 
1187
  def main():
1188
- """Enhanced main entry point with error handling."""
1189
- try:
1190
- logger.info("Launching Enhanced LOSS DOG Blockchain Analyzer")
1191
- interface = GradioInterface()
1192
- interface.launch(
1193
- share=False,
1194
- server_name="0.0.0.0",
1195
- server_port=7860,
1196
- show_error=True
1197
- )
1198
- except Exception as e:
1199
- logger.error(f"Failed to launch interface: {e}")
1200
- raise
1201
 
1202
  if __name__ == "__main__":
1203
  main()
 
1
  """
2
+ Blockchain Wallet Analyzer - A tool for analyzing Ethereum wallet contents and NFT holdings.
 
3
 
4
+ This module provides a complete implementation of a blockchain wallet analysis tool
5
+ with a Gradio web interface. It includes wallet analysis, NFT tracking,
6
+ interactive chat capabilities using the OpenAI API,
7
+ and NFT image rendering from OpenSea via PIL objects.
 
8
 
9
  Author: Claude
10
  Date: January 2025
 
18
  import time
19
  import logging
20
  import asyncio
 
21
  import base64
22
+ from typing import List, Dict, Tuple, Any, Optional, TypeVar
23
+ from datetime import datetime
24
  from decimal import Decimal
25
+ from dataclasses import dataclass
26
  from pathlib import Path
27
  from io import BytesIO
 
28
 
29
  import aiohttp
30
  import openai
31
  import gradio as gr
32
  from PIL import Image
33
  from tenacity import retry, stop_after_attempt, wait_exponential
 
34
 
35
  # Type variables
36
  T = TypeVar('T')
37
  WalletData = Dict[str, Any]
38
  ChatHistory = List[Tuple[str, str]]
 
39
 
40
  # Configure logging
41
  logging.basicConfig(
 
48
  )
49
  logger = logging.getLogger(__name__)
50
 
 
51
  class ConfigError(Exception):
52
  """Raised when there's an error in configuration."""
53
  pass
 
60
  """Raised when there's an error in input validation."""
61
  pass
62
 
 
 
 
 
63
  @dataclass
64
  class Config:
65
  """Application configuration settings."""
 
69
  Your personality:
70
  - Friendly and enthusiastic
71
  - Explain findings in fun, simple ways
72
+ - Provide NFT images from OpenSea when possible
 
73
 
74
  Instructions:
75
+ - You have access to detailed wallet data in your context
76
+ - Use this data to provide specific answers about holdings
77
+ - Reference exact numbers and collections when discussing NFTs
78
+ - Compare wallets if multiple are available
79
  """
80
  ETHERSCAN_BASE_URL: str = "https://api.etherscan.io/api"
81
  ETHEREUM_ADDRESS_REGEX: str = r"0x[a-fA-F0-9]{40}"
82
  RATE_LIMIT_DELAY: float = 0.2 # 5 requests per second
83
  MAX_RETRIES: int = 3
84
+ OPENAI_MODEL: str = "gpt-4o-mini" # Example updated model
85
  MAX_TOKENS: int = 4000
86
  TEMPERATURE: float = 0.7
87
+ HISTORY_LIMIT: int = 5
88
+ #### Part 2: Wallet Analyzer (Etherscan)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
89
 
90
  class WalletAnalyzer:
91
+ """Analyzes Ethereum wallet contents using Etherscan API."""
92
 
93
+ def __init__(self, api_key: str):
94
  self.api_key = api_key
 
95
  self.base_url = Config.ETHERSCAN_BASE_URL
96
  self.session: Optional[aiohttp.ClientSession] = None
97
  self.last_request_time = 0
 
98
 
99
  async def __aenter__(self) -> WalletAnalyzer:
100
  self.session = aiohttp.ClientSession()
 
110
  wait=wait_exponential(multiplier=1, min=4, max=10)
111
  )
112
  async def _fetch_data(self, params: Dict[str, str]) -> Dict[str, Any]:
113
+ """Fetch data from Etherscan with retry logic."""
114
  if not self.session:
115
  raise APIError("No active session. Use context manager.")
116
 
117
  await self._rate_limit()
118
  params["apikey"] = self.api_key
 
119
  try:
120
  async with self.session.get(self.base_url, params=params) as response:
 
 
121
  if response.status != 200:
122
  raise APIError(f"Etherscan request failed: {response.status}")
 
123
  data = await response.json()
124
  if data.get("status") == "0":
125
+ err = data.get("message", "Unknown Etherscan error")
126
+ if "Max rate limit reached" in err:
127
  raise APIError("Etherscan rate limit exceeded")
128
+ raise APIError(f"Etherscan error: {err}")
 
 
 
129
  return data
 
130
  except aiohttp.ClientError as e:
131
  raise APIError(f"Network error: {str(e)}")
 
 
132
  except Exception as e:
133
  raise APIError(f"Unexpected error: {str(e)}")
134
 
135
  async def _rate_limit(self) -> None:
136
+ """Simple rate limiting for Etherscan free tier."""
137
  now = time.time()
138
  diff = now - self.last_request_time
139
  if diff < Config.RATE_LIMIT_DELAY:
140
+ await asyncio.sleep(Config.RATE_LIMIT_DELAY - diff)
 
 
141
  self.last_request_time = time.time()
142
 
143
  @staticmethod
144
  def _validate_address(address: str) -> bool:
145
+ """Validate Ethereum address format."""
146
+ return bool(re.match(Config.ETHEREUM_ADDRESS_REGEX, address))
 
 
 
147
 
148
  async def get_portfolio_data(self, address: str) -> WalletData:
149
  """
150
+ Return a dictionary with:
 
 
151
  - address
152
  - last_updated
153
  - eth_balance
154
  - tokens (list of ERC-20)
155
  - nft_collections (with contract & token_id)
 
156
  """
157
  if not self._validate_address(address):
158
  raise ValidationError(f"Invalid Ethereum address: {address}")
159
 
160
+ logger.info(f"Fetching portfolio data for {address}")
161
+ eth_balance = await self._get_eth_balance(address)
162
+ tokens = await self._get_token_holdings(address)
163
+ nft_colls = await self._get_nft_holdings(address)
 
164
 
165
+ return {
166
+ "address": address,
167
+ "last_updated": datetime.now().isoformat(),
168
+ "eth_balance": float(eth_balance),
169
+ "tokens": tokens,
170
+ "nft_collections": nft_colls
171
+ }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
172
 
173
  async def _get_eth_balance(self, address: str) -> Decimal:
 
174
  params = {
175
  "module": "account",
176
  "action": "balance",
177
  "address": address,
178
  "tag": "latest"
179
  }
180
+ data = await self._fetch_data(params)
181
+ return Decimal(data["result"]) / Decimal("1e18")
 
 
 
 
182
 
183
  async def _get_token_holdings(self, address: str) -> List[Dict[str, Any]]:
184
+ """Fetch ERC-20 tokens for address."""
185
  params = {
186
  "module": "account",
187
  "action": "tokentx",
 
191
  data = await self._fetch_data(params)
192
 
193
  token_map: Dict[str, Dict[str, Any]] = {}
194
+ for tx in data.get("result", []):
195
+ contract = tx["contractAddress"]
196
+ if contract not in token_map:
197
+ token_map[contract] = {
198
+ "name": tx["tokenName"],
199
+ "symbol": tx["tokenSymbol"],
200
+ "decimals": int(tx["tokenDecimal"]),
201
+ "balance": Decimal(0)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
202
  }
203
+ amount = Decimal(tx["value"]) / Decimal(10 ** token_map[contract]["decimals"])
204
+ if tx["to"].lower() == address.lower():
205
+ token_map[contract]["balance"] += amount
206
+ elif tx["from"].lower() == address.lower():
207
+ token_map[contract]["balance"] -= amount
208
+
209
+ return [
210
+ {
211
+ "name": v["name"],
212
+ "symbol": v["symbol"],
213
+ "balance": float(v["balance"])
214
+ }
215
+ for v in token_map.values() if v["balance"] > 0
216
+ ]
217
 
218
  async def _get_nft_holdings(self, address: str) -> Dict[str, Any]:
219
  """
220
+ Return { "collections": [ { "collection_name": str, "items": [{ contract, token_id }, ...] }, ... ] }
221
+ so we can fetch images from OpenSea by contract + token_id.
 
 
 
 
 
 
 
 
222
  """
223
  params = {
224
  "module": "account",
 
231
  if data.get("status") != "1" or "result" not in data:
232
  return {"collections": []}
233
 
234
+ # track final ownership
235
+ ownership_map = {} # key = (contract_token), value = {contract, token_id, coll_name}
236
+ for tx in data["result"]:
237
+ contract = tx["contractAddress"]
238
+ coll_name = tx.get("tokenName", "Unknown Collection")
239
+ token_id = tx["tokenID"]
240
+ key = f"{contract}_{token_id}"
241
+
242
+ if tx["to"].lower() == address.lower():
243
+ ownership_map[key] = {
244
+ "contract": contract,
245
+ "token_id": token_id,
246
+ "collection_name": coll_name
247
+ }
248
+ elif tx["from"].lower() == address.lower():
249
+ if key in ownership_map:
250
+ ownership_map.pop(key, None)
251
+
252
+ # group by collection_name
253
+ coll_dict: Dict[str, List[Dict[str, str]]] = {}
254
+ for item in ownership_map.values():
255
+ c_name = item["collection_name"]
256
+ if c_name not in coll_dict:
257
+ coll_dict[c_name] = []
258
+ coll_dict[c_name].append({
259
+ "contract": item["contract"],
260
+ "token_id": item["token_id"]
261
+ })
262
+
263
+ collections_out = []
264
+ for cname, items in coll_dict.items():
265
+ collections_out.append({
266
+ "collection_name": cname,
267
+ "items": items
268
+ })
269
+
270
+ return { "collections": collections_out }
271
+ #### Part 3: OpenSea & PIL Conversion
272
+
273
+ async def fetch_nft_metadata(opensea_key: str, contract: str, token_id: str) -> Dict[str, Any]:
274
+ """
275
+ Fetch NFT metadata (including image_url) from OpenSea v2.
276
+ Returns { "name": str, "image_url": str } or { "error": str }
277
+ """
278
+ url = f"https://api.opensea.io/api/v2/chain/ethereum/contract/{contract}/nfts/{token_id}"
279
+ headers = {"X-API-KEY": opensea_key} if opensea_key else {}
280
+ async with aiohttp.ClientSession() as session:
281
+ async with session.get(url, headers=headers) as resp:
282
+ if resp.status == 403:
283
+ return {"error": "403 Forbidden: OpenSea API key issue"}
284
+ if resp.status == 404:
285
+ return {"error": f"404 Not Found: {contract} #{token_id}"}
286
+ try:
287
+ data = await resp.json()
288
+ except Exception as e:
289
+ return {"error": f"OpenSea JSON parse error: {str(e)}"}
 
290
 
291
+ nft_obj = data.get("nft", {})
292
+ name = nft_obj.get("name", f"NFT #{token_id}")
293
+ image_url = nft_obj.get("image_url", "")
294
+ return {"name": name, "image_url": image_url}
295
 
296
+ async def fetch_image_as_pil(url: str) -> Optional[Image.Image]:
297
+ """
298
+ Download an image from a URL and return as a PIL Image.
299
+ """
300
+ if not url:
301
+ return None
302
+ async with aiohttp.ClientSession() as session:
303
+ async with session.get(url) as resp:
304
+ if resp.status != 200:
305
+ return None
306
+ raw_bytes = await resp.read()
307
+ try:
308
+ return Image.open(BytesIO(raw_bytes))
309
+ except Exception as e:
310
+ logger.error(f"Error converting to PIL: {e}")
311
+ return None
312
+ #### Part 4: ChatInterface
 
 
 
 
 
 
 
313
 
314
+ class ChatInterface:
315
+ """Handles chat logic with Etherscan (wallet data), OpenSea (NFT images), and OpenAI (chat)."""
316
 
317
+ def __init__(self, openai_key: str, etherscan_key: str, opensea_key: str):
318
+ self.openai_key = openai_key
319
+ self.etherscan_key = etherscan_key
320
  self.opensea_key = opensea_key
321
+ self.context: Dict[str, Any] = {}
322
+ openai.api_key = openai_key
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
323
 
324
+ @staticmethod
325
+ def _validate_api_keys(openai_key: str, etherscan_key: str, opensea_key: str) -> Tuple[bool, str]:
 
 
 
 
 
 
 
 
326
  """
327
+ Validate all keys. We'll do a minimal check for OpenSea (non-empty).
 
328
  """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
329
  try:
330
+ # Check OpenAI
331
+ client = openai.OpenAI(api_key=openai_key)
332
+ client.chat.completions.create(
333
+ model=Config.OPENAI_MODEL,
334
+ messages=[{"role": "user", "content": "test"}],
335
+ max_tokens=1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
336
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
337
 
338
+ # Check Etherscan
339
+ async def check_etherscan():
340
+ async with WalletAnalyzer(etherscan_key) as analyzer:
341
+ params = {"module": "stats", "action": "ethsupply"}
342
+ await analyzer._fetch_data(params)
343
+ asyncio.run(check_etherscan())
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
344
 
345
+ # Check OpenSea
346
+ if not opensea_key.strip():
347
+ return False, "OpenSea API key is empty!"
 
 
 
 
 
 
 
 
348
 
349
+ return True, "All API keys validated!"
350
+ except Exception as e:
351
+ return False, f"API key validation failed: {str(e)}"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
352
 
353
  def _format_context_message(self) -> str:
354
+ """Format wallet data for GPT system prompt."""
355
  lines = []
356
+ if not self.context:
357
+ return ""
358
+ lines.append("Current Wallet Data:")
359
+ for addr, wdata in self.context.items():
360
+ lines.append(f"Wallet {addr[:8]}...{addr[-6:]}:")
361
+ lines.append(f" ETH Balance: {wdata['eth_balance']:.4f}")
362
+ lines.append(f" # of Tokens: {len(wdata['tokens'])}")
363
+ # NFT aggregator
364
+ nft_data = wdata["nft_collections"]
365
+ if "collections" in nft_data:
366
+ lines.append(" NFT Collections:")
367
+ for c in nft_data["collections"]:
368
+ lines.append(f" - {c['collection_name']}: {len(c['items'])} NFT(s)")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
369
  return "\n".join(lines)
370
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
371
  async def process_message(
372
  self,
373
  message: str,
374
  history: Optional[ChatHistory] = None
375
  ) -> Tuple[ChatHistory, Dict[str, Any], List[Image.Image]]:
376
  """
377
+ 1) Detect Ethereum address
378
+ 2) Etherscan for wallet data
379
+ 3) For each NFT, fetch from OpenSea, convert to PIL
380
+ 4) Return images + chat response
 
381
  """
382
+ if history is None:
383
+ history = []
384
+
385
  if not message.strip():
386
+ return history, self.context, []
387
 
388
+ match = re.search(Config.ETHEREUM_ADDRESS_REGEX, message)
389
+ nft_images: List[Image.Image] = []
390
+
391
+ if match:
392
+ eth_address = match.group(0)
393
+ # partial
394
+ partial_text = f"Analyzing {eth_address}..."
395
+ history.append((message, partial_text))
396
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
397
  try:
398
+ # Etherscan
399
+ async with WalletAnalyzer(self.etherscan_key) as analyzer:
400
+ wallet_data = await analyzer.get_portfolio_data(eth_address)
401
+ self.context[eth_address] = wallet_data
402
+
403
+ # Summaries
404
+ lines = [
405
+ f"📊 Summary for {eth_address[:8]}...{eth_address[-6:]}",
406
+ f"ETH: {wallet_data['eth_balance']:.4f}",
407
+ f"Tokens: {len(wallet_data['tokens'])}"
408
+ ]
409
+ nft_info = wallet_data["nft_collections"]
410
+ total_nfts = 0
411
+ if "collections" in nft_info:
412
+ for c in nft_info["collections"]:
413
+ total_nfts += len(c["items"])
414
+ lines.append(f"NFTs: {total_nfts}")
415
+ # Add to chat
416
+ history.append((message, "\n".join(lines)))
417
+
418
+ # If we want to fetch images for, say, first 2 collections, 2 items each
419
+ if "collections" in nft_info:
420
+ for coll in nft_info["collections"][:2]:
421
+ for item in coll["items"][:2]:
422
+ # 1) fetch metadata
423
+ meta = await fetch_nft_metadata(self.opensea_key, item["contract"], item["token_id"])
424
+ if "error" in meta:
425
+ logger.warning(f"OpenSea metadata error: {meta['error']}")
426
+ continue
427
+ image_url = meta["image_url"]
428
+ if not image_url:
429
+ logger.info(f"No image for {meta['name']}")
430
+ continue
431
+ # 2) Download + convert to PIL
432
+ pil_img = await fetch_image_as_pil(image_url)
433
+ if pil_img:
434
+ nft_images.append(pil_img)
435
+ # Also mention in chat
436
+ found_msg = f"Found NFT image: {meta['name']} (contract {item['contract'][:8]}...{item['contract'][-6:]}, id={item['token_id']})"
437
+ history.append((message, found_msg))
438
+
439
  except Exception as e:
440
+ err = f"Error analyzing {eth_address}: {str(e)}"
441
+ logger.error(err)
442
+ history.append((message, err))
443
 
444
+ # Now do an OpenAI chat
445
+ try:
446
+ context_str = self._format_context_message()
447
+ # Convert chat to OpenAI format
448
+ limit = Config.HISTORY_LIMIT
449
+ short_hist = history[-limit:]
450
+ openai_msgs = []
451
+ for usr, ans in short_hist:
452
+ openai_msgs.append({"role": "user", "content": usr})
453
+ openai_msgs.append({"role": "assistant", "content": ans})
454
+
455
+ # openai
456
+ openai.api_key = self.openai_key
457
+ client = openai.OpenAI(api_key=self.openai_key)
458
+ resp = client.chat.completions.create(
459
+ model=Config.OPENAI_MODEL,
460
+ messages=[
461
+ {"role": "system", "content": Config.SYSTEM_PROMPT},
462
+ {"role": "system", "content": context_str},
463
+ *openai_msgs,
464
+ {"role": "user", "content": message}
465
+ ],
466
+ temperature=Config.TEMPERATURE,
467
+ max_tokens=Config.MAX_TOKENS
468
+ )
469
+ final_msg = resp.choices[0].message.content
470
+ history.append((message, final_msg))
471
+
472
+ return history, self.context, nft_images
473
 
474
  except Exception as e:
475
+ logger.error(f"OpenAI error: {e}")
476
+ err_chat = f"OpenAI error: {str(e)}"
477
+ history.append((message, err_chat))
478
+ return history, self.context, []
479
 
480
  def clear_context(self) -> Tuple[Dict[str, Any], List[Tuple[str, str]]]:
481
+ """Clear the wallet context and chat."""
482
+ self.context = {}
 
 
 
483
  return {}, []
484
+ #### Part 5: Gradio UI & Launch
485
 
486
  class GradioInterface:
487
+ """Manages the Gradio UI for Hugging Face Space, with top-right NFT gallery as PIL images."""
488
 
489
  def __init__(self):
490
  self.chat_interface: Optional[ChatInterface] = None
491
  self.demo = self._create_interface()
492
 
493
  def _create_interface(self) -> gr.Blocks:
494
+ with gr.Blocks(theme=gr.themes.Soft()) as demo:
 
 
 
 
 
 
 
 
 
 
495
  gr.Markdown("""
496
+ # 🐕 LOSS DOG: Blockchain Wallet Analyzer (NFT Images, top-right)
497
 
498
+ - Enter your **OpenAI**, **Etherscan**, **OpenSea** keys below
499
+ - Validate, then chat with your Ethereum address
500
+ - NFT images will appear as **PIL** objects in the top-right
 
 
501
  """)
502
 
503
  with gr.Row():
504
+ openai_key = gr.Textbox(
505
+ label="OpenAI API Key",
506
+ type="password",
507
+ placeholder="Enter your OpenAI API key..."
508
+ )
509
+ etherscan_key = gr.Textbox(
510
+ label="Etherscan API Key",
511
+ type="password",
512
+ placeholder="Enter your Etherscan API key..."
513
+ )
514
+ opensea_key = gr.Textbox(
515
+ label="OpenSea API Key",
516
+ type="password",
517
+ placeholder="Enter your OpenSea API key..."
518
+ )
519
+
520
+ validation_status = gr.Textbox(label="Validation Status", interactive=False)
521
+ validate_btn = gr.Button("Validate API Keys", variant="primary")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
522
 
523
  with gr.Row():
524
+ # Main Chat Column
525
  with gr.Column(scale=2):
526
+ chatbot = gr.Chatbot(label="Chat History", height=420, value=[])
 
 
 
 
 
 
527
  with gr.Row():
528
  msg_input = gr.Textbox(
529
  label="Message",
530
+ placeholder="Enter an ETH address or question..."
 
 
 
 
 
 
 
 
 
 
 
531
  )
532
+ send_btn = gr.Button("Send", variant="primary")
533
+ # Top-Right: NFT Gallery (PIL images)
534
  with gr.Column(scale=1):
 
535
  nft_gallery = gr.Gallery(
536
+ label="NFT Images (Top-Right)",
537
+ columns=2
 
 
538
  )
539
+ wallet_context = gr.JSON(
540
+ label="Active Wallet Context",
541
+ value={}
542
+ )
543
+ clear_btn = gr.Button("Clear Context")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
544
 
 
545
  msg_input.interactive = False
546
  send_btn.interactive = False
547
 
548
+ def validate_keys(openai_k: str, etherscan_k: str, opensea_k: str) -> Tuple[str, gr.update, gr.update]:
549
+ """Validate user-provided keys; enable chat if all pass."""
550
+ is_valid, msg = ChatInterface._validate_api_keys(openai_k, etherscan_k, opensea_k)
551
+ if is_valid:
552
+ self.chat_interface = ChatInterface(openai_k, etherscan_k, opensea_k)
553
+ return (
554
+ f"✅ {msg}",
555
+ gr.update(interactive=True),
556
+ gr.update(interactive=True)
557
  )
558
+ else:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
559
  return (
560
+ f"❌ {msg}",
561
  gr.update(interactive=False),
562
  gr.update(interactive=False)
563
  )
564
 
565
+ validate_btn.click(
566
+ fn=validate_keys,
567
+ inputs=[openai_key, etherscan_key, opensea_key],
568
+ outputs=[validation_status, msg_input, send_btn]
569
+ )
570
+
571
+ def clear_all():
572
+ """Clear wallet context and chat."""
573
  if self.chat_interface:
574
+ return self.chat_interface.clear_context()
575
+ return {}, []
576
+
577
+ clear_btn.click(
578
+ fn=clear_all,
579
+ inputs=[],
580
+ outputs=[wallet_context, chatbot]
581
+ )
582
 
583
  async def handle_message(
584
  message: str,
585
  chat_hist: List[Tuple[str, str]],
586
  context: Dict[str, Any]
587
  ) -> Tuple[List[Tuple[str, str]], Dict[str, Any], List[Image.Image]]:
588
+ """Process user input, return updated chat, context, list of PIL images."""
589
  if not self.chat_interface:
590
  return [], {}, []
591
 
592
  try:
593
+ new_history, new_context, images = await self.chat_interface.process_message(message, chat_hist)
 
 
 
 
 
 
 
 
594
  return new_history, new_context, images
 
595
  except Exception as e:
596
  logger.error(f"Error in handle_message: {e}")
 
597
  if chat_hist is None:
598
  chat_hist = []
599
+ chat_hist.append((message, f"Error: {str(e)}"))
600
+ return chat_hist, context, []
601
 
602
+ # Chat flow
 
 
 
 
 
 
 
 
 
 
 
 
 
603
  msg_input.submit(
604
  fn=handle_message,
605
  inputs=[msg_input, chatbot, wallet_context],
 
622
 
623
  return demo
624
 
625
+ def launch(self):
626
+ self.demo.queue()
627
+ self.demo.launch()
 
628
 
629
  def main():
630
+ """Main entry point for Hugging Face Space."""
631
+ logger.info("Launching LOSS DOG with PIL-based NFT images (top-right).")
632
+ interface = GradioInterface()
633
+ interface.launch()
 
 
 
 
 
 
 
 
 
634
 
635
  if __name__ == "__main__":
636
  main()