jzou19950715 commited on
Commit
fa6db5e
·
verified ·
1 Parent(s): 180265f

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +441 -748
app.py CHANGED
@@ -2,170 +2,16 @@
2
  Blockchain Wallet Analyzer - A tool for analyzing Ethereum wallet contents and NFT holdings.
3
 
4
  This module provides a complete implementation of a blockchain wallet analysis tool
5
- with a Gradio web interface. It includes improved NFT tracking, natural AI communication,
6
- and enhanced error handling.
 
7
 
8
  Author: Claude
9
  Date: January 2025
10
  """
11
 
12
  from __future__ import annotations
13
- import os
14
- import re
15
- import json
16
- import time
17
- import logging
18
- import asyncio
19
- import base64
20
- from typing import List, Dict, Tuple, Any, Optional, TypeVar, Set
21
- from datetime import datetime
22
- from decimal import Decimal
23
- from dataclasses import dataclass, field
24
- from pathlib import Path
25
- from io import BytesIO
26
-
27
- import aiohttp
28
- import openai
29
- import gradio as gr
30
- from PIL import Image
31
- from tenacity import retry, stop_after_attempt, wait_exponential
32
-
33
- # Type variables
34
- T = TypeVar('T')
35
- WalletData = Dict[str, Any]
36
- ChatHistory = List[Tuple[str, str]]
37
- NFTIdentifier = Tuple[str, str] # (contract, token_id)
38
-
39
- # Configure logging
40
- logging.basicConfig(
41
- level=logging.INFO,
42
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
43
- handlers=[
44
- logging.FileHandler('blockchain_analyzer.log'),
45
- logging.StreamHandler()
46
- ]
47
- )
48
- logger = logging.getLogger(__name__)
49
-
50
- @dataclass
51
- class ChatContext:
52
- """Track chat context and discussed NFTs."""
53
- wallet_data: Dict[str, Any] = field(default_factory=dict)
54
- discussed_nfts: Set[str] = field(default_factory=set)
55
- last_collection: str = ""
56
-
57
- def mark_nft_discussed(self, contract: str, token_id: str) -> None:
58
- """Track which NFTs have been discussed."""
59
- self.discussed_nfts.add(f"{contract}_{token_id}")
60
-
61
- def is_nft_discussed(self, contract: str, token_id: str) -> bool:
62
- """Check if NFT has been discussed."""
63
- return f"{contract}_{token_id}" in self.discussed_nfts
64
-
65
- class BlockchainError(Exception):
66
- """Base exception for blockchain-related errors."""
67
- pass
68
-
69
- class ConfigError(BlockchainError):
70
- """Configuration error."""
71
- pass
72
-
73
- class APIError(BlockchainError):
74
- """API call error."""
75
- pass
76
-
77
- class ValidationError(BlockchainError):
78
- """Input validation error."""
79
- pass
80
-
81
- class NFTError(BlockchainError):
82
- """NFT processing error."""
83
- pass
84
-
85
- @dataclass
86
- class NFTMetadata:
87
- """Structure for NFT metadata."""
88
- contract: str
89
- token_id: str
90
- name: str
91
- collection_name: str
92
- image_url: Optional[str] = None
93
- description: Optional[str] = None
94
- traits: List[Dict[str, Any]] = field(default_factory=list)
95
- last_updated: datetime = field(default_factory=datetime.now)
96
-
97
- def to_dict(self) -> Dict[str, Any]:
98
- """Convert to dictionary format."""
99
- return {
100
- "contract": self.contract,
101
- "token_id": self.token_id,
102
- "name": self.name,
103
- "collection_name": self.collection_name,
104
- "image_url": self.image_url,
105
- "description": self.description,
106
- "traits": self.traits,
107
- "last_updated": self.last_updated.isoformat()
108
- }
109
-
110
- @dataclass
111
- class Config:
112
- """Enhanced application configuration."""
113
-
114
- # Improved system prompt for more natural responses
115
- SYSTEM_PROMPT: str = """
116
- You are LOSS DOG 🐕 (Learning & Observing Smart Systems Digital Output Generator),
117
- a friendly blockchain analysis assistant!
118
-
119
- Your conversation style:
120
- - Maintain natural dialogue flow
121
- - Avoid repetitive responses
122
- - Use context from previous messages
123
- - Track which NFTs have been discussed
124
- - Provide insights when relevant
125
-
126
- When analyzing wallets:
127
- - Reference specific NFTs by collection and ID
128
- - Highlight interesting patterns
129
- - Compare wallets if multiple available
130
- - Monitor NFT rarity and traits
131
- """
132
-
133
- # API Configuration
134
- ETHERSCAN_BASE_URL: str = "https://api.etherscan.io/api"
135
- OPENSEA_BASE_URL: str = "https://api.opensea.io/api/v2"
136
- ETHEREUM_ADDRESS_REGEX: str = r"^0x[a-fA-F0-9]{40}$"
137
-
138
- # Rate Limiting & Retries
139
- RATE_LIMIT_DELAY: float = 0.2 # 5 requests per second
140
- MAX_RETRIES: int = 3
141
- RETRY_BASE_DELAY: float = 1.0
142
- RETRY_MAX_DELAY: float = 10.0
143
 
144
- # API Timeouts
145
- REQUEST_TIMEOUT: float = 30.0
146
- NFT_FETCH_TIMEOUT: int = 10
147
-
148
- # NFT Processing
149
- MAX_NFTS_PER_COLLECTION: int = 50
150
- NFT_BATCH_SIZE: int = 5
151
- MAX_IMAGE_SIZE: Tuple[int, int] = (800, 800)
152
-
153
- # OpenAI Settings
154
- OPENAI_MODEL: str = "gpt-4-0125-preview"
155
- MAX_TOKENS: int = 8000
156
- TEMPERATURE: float = 0.7
157
- HISTORY_LIMIT: int = 10"""
158
- Blockchain Wallet Analyzer - A tool for analyzing Ethereum wallet contents and NFT holdings.
159
-
160
- This module provides a complete implementation of a blockchain wallet analysis tool
161
- with a Gradio web interface. It includes improved NFT tracking, natural AI communication,
162
- and enhanced error handling.
163
-
164
- Author: Claude
165
- Date: January 2025
166
- """
167
-
168
- from __future__ import annotations
169
  import os
170
  import re
171
  import json
@@ -173,10 +19,10 @@ import time
173
  import logging
174
  import asyncio
175
  import base64
176
- from typing import List, Dict, Tuple, Any, Optional, TypeVar, Set
177
  from datetime import datetime
178
  from decimal import Decimal
179
- from dataclasses import dataclass, field
180
  from pathlib import Path
181
  from io import BytesIO
182
 
@@ -190,7 +36,6 @@ from tenacity import retry, stop_after_attempt, wait_exponential
190
  T = TypeVar('T')
191
  WalletData = Dict[str, Any]
192
  ChatHistory = List[Tuple[str, str]]
193
- NFTIdentifier = Tuple[str, str] # (contract, token_id)
194
 
195
  # Configure logging
196
  logging.basicConfig(
@@ -203,99 +48,45 @@ logging.basicConfig(
203
  )
204
  logger = logging.getLogger(__name__)
205
 
206
- class BlockchainError(Exception):
207
- """Base exception for blockchain-related errors."""
208
- pass
209
-
210
- class ConfigError(BlockchainError):
211
- """Configuration error."""
212
- pass
213
-
214
- class APIError(BlockchainError):
215
- """API call error."""
216
  pass
217
 
218
- class ValidationError(BlockchainError):
219
- """Input validation error."""
220
  pass
221
 
222
- class NFTError(BlockchainError):
223
- """NFT processing error."""
224
  pass
225
 
226
- @dataclass
227
- class NFTMetadata:
228
- """Structure for NFT metadata."""
229
- contract: str
230
- token_id: str
231
- name: str
232
- collection_name: str
233
- image_url: Optional[str] = None
234
- description: Optional[str] = None
235
- traits: List[Dict[str, Any]] = field(default_factory=list)
236
- last_updated: datetime = field(default_factory=datetime.now)
237
-
238
- def to_dict(self) -> Dict[str, Any]:
239
- """Convert to dictionary format."""
240
- return {
241
- "contract": self.contract,
242
- "token_id": self.token_id,
243
- "name": self.name,
244
- "collection_name": self.collection_name,
245
- "image_url": self.image_url,
246
- "description": self.description,
247
- "traits": self.traits,
248
- "last_updated": self.last_updated.isoformat()
249
- }
250
-
251
  @dataclass
252
  class Config:
253
- """Enhanced application configuration."""
254
-
255
- # Improved system prompt for more natural responses
256
  SYSTEM_PROMPT: str = """
257
  You are LOSS DOG 🐕 (Learning & Observing Smart Systems Digital Output Generator),
258
- a friendly blockchain analysis assistant!
259
-
260
- Your conversation style:
261
- - Maintain natural dialogue flow
262
- - Avoid repetitive responses
263
- - Use context from previous messages
264
- - Track which NFTs have been discussed
265
- - Provide insights when relevant
266
-
267
- When analyzing wallets:
268
- - Reference specific NFTs by collection and ID
269
- - Highlight interesting patterns
270
- - Compare wallets if multiple available
271
- - Monitor NFT rarity and traits
272
  """
273
-
274
- # API Configuration
275
  ETHERSCAN_BASE_URL: str = "https://api.etherscan.io/api"
276
- OPENSEA_BASE_URL: str = "https://api.opensea.io/api/v2"
277
- ETHEREUM_ADDRESS_REGEX: str = r"^0x[a-fA-F0-9]{40}$"
278
-
279
- # Rate Limiting & Retries
280
  RATE_LIMIT_DELAY: float = 0.2 # 5 requests per second
281
  MAX_RETRIES: int = 3
282
- RETRY_BASE_DELAY: float = 1.0
283
- RETRY_MAX_DELAY: float = 10.0
284
-
285
- # API Timeouts
286
- REQUEST_TIMEOUT: float = 30.0
287
- NFT_FETCH_TIMEOUT: int = 10
288
-
289
- # NFT Processing
290
- MAX_NFTS_PER_COLLECTION: int = 50
291
- NFT_BATCH_SIZE: int = 5
292
- MAX_IMAGE_SIZE: Tuple[int, int] = (800, 800)
293
-
294
- # OpenAI Settings
295
- OPENAI_MODEL: str = "gpt-4-0125-preview"
296
- MAX_TOKENS: int = 8000
297
  TEMPERATURE: float = 0.7
298
- HISTORY_LIMIT: int = 10
 
 
299
  class WalletAnalyzer:
300
  """Analyzes Ethereum wallet contents using Etherscan API."""
301
 
@@ -304,12 +95,9 @@ class WalletAnalyzer:
304
  self.base_url = Config.ETHERSCAN_BASE_URL
305
  self.session: Optional[aiohttp.ClientSession] = None
306
  self.last_request_time = 0
307
- self._rate_limit_semaphore = asyncio.Semaphore(1)
308
 
309
  async def __aenter__(self) -> WalletAnalyzer:
310
- self.session = aiohttp.ClientSession(
311
- timeout=aiohttp.ClientTimeout(total=Config.REQUEST_TIMEOUT)
312
- )
313
  return self
314
 
315
  async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
@@ -317,108 +105,83 @@ class WalletAnalyzer:
317
  await self.session.close()
318
  self.session = None
319
 
320
- async def _rate_limit(self) -> None:
321
- """Thread-safe rate limiting."""
322
- async with self._rate_limit_semaphore:
323
- now = time.time()
324
- time_since_last = now - self.last_request_time
325
- if time_since_last < Config.RATE_LIMIT_DELAY:
326
- await asyncio.sleep(Config.RATE_LIMIT_DELAY - time_since_last)
327
- self.last_request_time = time.time()
328
-
329
  @retry(
330
  stop=stop_after_attempt(Config.MAX_RETRIES),
331
- wait=wait_exponential(
332
- multiplier=Config.RETRY_BASE_DELAY,
333
- min=Config.RETRY_BASE_DELAY,
334
- max=Config.RETRY_MAX_DELAY
335
- )
336
  )
337
  async def _fetch_data(self, params: Dict[str, str]) -> Dict[str, Any]:
338
- """Fetch data from Etherscan with improved error handling."""
339
  if not self.session:
340
  raise APIError("No active session. Use context manager.")
341
-
342
  await self._rate_limit()
343
  params["apikey"] = self.api_key
344
-
345
  try:
346
  async with self.session.get(self.base_url, params=params) as response:
347
- if response.status == 429:
348
- raise APIError("Rate limit exceeded")
349
  if response.status != 200:
350
  raise APIError(f"Etherscan request failed: {response.status}")
351
-
352
- try:
353
- data = await response.json()
354
- except json.JSONDecodeError as e:
355
- raise APIError(f"Invalid JSON response: {e}")
356
-
357
  if data.get("status") == "0":
358
- message = data.get("message", "Unknown error")
359
- result = data.get("result", "")
360
- if "Max rate limit reached" in message:
361
- raise APIError(f"Rate limit exceeded: {result}")
362
- raise APIError(f"Etherscan API error: {message}")
363
-
364
  return data
365
-
366
- except asyncio.TimeoutError:
367
- raise APIError("Request timed out")
368
  except aiohttp.ClientError as e:
369
  raise APIError(f"Network error: {str(e)}")
370
  except Exception as e:
371
  raise APIError(f"Unexpected error: {str(e)}")
372
 
 
 
 
 
 
 
 
 
373
  @staticmethod
374
  def _validate_address(address: str) -> bool:
375
  """Validate Ethereum address format."""
376
  return bool(re.match(Config.ETHEREUM_ADDRESS_REGEX, address))
377
 
378
- async def get_wallet_data(self, address: str) -> WalletData:
379
  """
380
- Get comprehensive wallet data including:
381
- - ETH balance
382
- - ERC-20 tokens
383
- - NFT collections
 
 
384
  """
385
  if not self._validate_address(address):
386
  raise ValidationError(f"Invalid Ethereum address: {address}")
387
 
388
- logger.info(f"Fetching data for wallet: {address}")
389
- try:
390
- eth_balance = await self._get_eth_balance(address)
391
- tokens = await self._get_token_holdings(address)
392
- nfts = await self._get_nft_holdings(address)
393
-
394
- return {
395
- "address": address,
396
- "last_updated": datetime.now().isoformat(),
397
- "eth_balance": float(eth_balance),
398
- "tokens": tokens,
399
- "nft_collections": nfts
400
- }
401
- except Exception as e:
402
- logger.error(f"Error fetching wallet data: {e}")
403
- raise
404
 
405
  async def _get_eth_balance(self, address: str) -> Decimal:
406
- """Get ETH balance for address."""
407
  params = {
408
  "module": "account",
409
  "action": "balance",
410
  "address": address,
411
  "tag": "latest"
412
  }
413
- try:
414
- data = await self._fetch_data(params)
415
- balance_wei = Decimal(data["result"])
416
- return balance_wei / Decimal("1e18") # Convert Wei to ETH
417
- except (KeyError, ValueError) as e:
418
- raise APIError(f"Error parsing ETH balance: {e}")
419
 
420
  async def _get_token_holdings(self, address: str) -> List[Dict[str, Any]]:
421
- """Get ERC-20 token holdings with improved accuracy."""
422
  params = {
423
  "module": "account",
424
  "action": "tokentx",
@@ -429,39 +192,33 @@ class WalletAnalyzer:
429
 
430
  token_map: Dict[str, Dict[str, Any]] = {}
431
  for tx in data.get("result", []):
432
- try:
433
- contract = tx["contractAddress"].lower()
434
- if contract not in token_map:
435
- token_map[contract] = {
436
- "name": tx["tokenName"],
437
- "symbol": tx["tokenSymbol"],
438
- "decimals": int(tx["tokenDecimal"]),
439
- "balance": Decimal(0)
440
- }
441
-
442
- amount = Decimal(tx["value"]) / Decimal(10 ** token_map[contract]["decimals"])
443
- if tx["to"].lower() == address.lower():
444
- token_map[contract]["balance"] += amount
445
- elif tx["from"].lower() == address.lower():
446
- token_map[contract]["balance"] -= amount
447
- except (KeyError, ValueError, decimal.InvalidOperation) as e:
448
- logger.warning(f"Error processing token transaction: {e}")
449
- continue
450
-
451
- # Filter tokens with non-zero balances
452
  return [
453
  {
454
- "name": token["name"],
455
- "symbol": token["symbol"],
456
- "balance": float(token["balance"])
457
  }
458
- for token in token_map.values()
459
- if token["balance"] > 0
460
  ]
461
- async def _get_nft_holdings(self, address: str) -> Dict[str, Any]:
 
462
  """
463
- Get NFT holdings grouped by collection.
464
- Returns: {"collections": [{"collection_name": str, "items": [{"contract", "token_id"}, ...]}]}
465
  """
466
  params = {
467
  "module": "account",
@@ -469,475 +226,411 @@ async def _get_nft_holdings(self, address: str) -> Dict[str, Any]:
469
  "address": address,
470
  "sort": "desc"
471
  }
472
-
473
- try:
474
- data = await self._fetch_data(params)
475
- if data.get("status") != "1" or "result" not in data:
476
- return {"collections": []}
477
-
478
- # Track NFT ownership by collection
479
- collections: Dict[str, List[Dict[str, str]]] = {}
480
- ownership: Dict[str, Dict[str, Any]] = {} # contract_tokenId -> ownership info
481
-
482
- for tx in data["result"]:
483
- contract = tx["contractAddress"].lower()
484
- token_id = tx["tokenID"]
485
- key = f"{contract}_{token_id}"
486
-
487
- # Get collection name, fallback with contract if needed
488
- collection_name = tx.get("tokenName", "Unknown Collection")
489
- if collection_name == "Unknown Collection" and contract in collections:
490
- collection_name = next(iter(collections[contract]))
491
-
492
- # Track ownership changes
493
- if tx["to"].lower() == address.lower():
494
- ownership[key] = {
495
- "contract": contract,
496
- "token_id": token_id,
497
- "collection_name": collection_name
498
- }
499
- elif tx["from"].lower() == address.lower():
500
- ownership.pop(key, None)
501
-
502
- # Group owned NFTs by collection
503
- for nft in ownership.values():
504
- coll_name = nft["collection_name"]
505
- if coll_name not in collections:
506
- collections[coll_name] = []
507
- collections[coll_name].append({
508
- "contract": nft["contract"],
509
- "token_id": nft["token_id"]
510
- })
511
-
512
- return {
513
- "collections": [
514
- {
515
- "collection_name": name,
516
- "items": items[:Config.MAX_NFTS_PER_COLLECTION]
517
- }
518
- for name, items in collections.items()
519
- ]
520
- }
521
-
522
- except Exception as e:
523
- logger.error(f"Error fetching NFT holdings: {e}")
524
- raise APIError(f"Failed to fetch NFT holdings: {str(e)}")
525
-
526
 
527
- class NFTProcessor:
528
- """Handles NFT metadata and image processing."""
529
-
530
- def __init__(self, opensea_key: str):
531
- self.opensea_key = opensea_key
532
- self._session: Optional[aiohttp.ClientSession] = None
533
- self.last_request_time = 0
534
- self._rate_limit_semaphore = asyncio.Semaphore(1)
535
- self._metadata_cache: Dict[str, NFTMetadata] = {}
536
- self._image_cache: Dict[str, Image.Image] = {}
537
-
538
- async def __aenter__(self) -> 'NFTProcessor':
539
- self._session = aiohttp.ClientSession(
540
- timeout=aiohttp.ClientTimeout(total=Config.NFT_FETCH_TIMEOUT)
541
- )
542
- return self
543
-
544
- async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
545
- if self._session:
546
- await self._session.close()
547
- self._session = None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
548
 
549
- async def _rate_limit(self) -> None:
550
- """Thread-safe rate limiting."""
551
- async with self._rate_limit_semaphore:
552
- now = time.time()
553
- time_since_last = now - self.last_request_time
554
- if time_since_last < Config.RATE_LIMIT_DELAY:
555
- await asyncio.sleep(Config.RATE_LIMIT_DELAY - time_since_last)
556
- self.last_request_time = time.time()
557
-
558
- def _get_cache_key(self, contract: str, token_id: str) -> str:
559
- """Generate cache key for NFT."""
560
- return f"{contract.lower()}_{token_id}"
561
 
562
- @retry(
563
- stop=stop_after_attempt(Config.MAX_RETRIES),
564
- wait=wait_exponential(
565
- multiplier=Config.RETRY_BASE_DELAY,
566
- min=Config.RETRY_BASE_DELAY,
567
- max=Config.RETRY_MAX_DELAY
568
- )
569
- )
570
- async def fetch_nft_metadata(
571
- self,
572
- contract: str,
573
- token_id: str,
574
- force_refresh: bool = False
575
- ) -> NFTMetadata:
576
- """Fetch NFT metadata from OpenSea with caching."""
577
- if not self._session:
578
- raise NFTError("No active session")
579
-
580
- cache_key = self._get_cache_key(contract, token_id)
581
-
582
- # Check cache unless force refresh
583
- if not force_refresh and cache_key in self._metadata_cache:
584
- return self._metadata_cache[cache_key]
585
-
586
- await self._rate_limit()
587
-
588
- url = f"{Config.OPENSEA_BASE_URL}/chain/ethereum/contract/{contract}/nfts/{token_id}"
589
- headers = {"X-API-KEY": self.opensea_key}
590
-
591
- try:
592
- async with self._session.get(url, headers=headers) as resp:
593
- if resp.status == 429:
594
- raise APIError("OpenSea rate limit exceeded")
595
- if resp.status != 200:
596
- raise APIError(f"OpenSea API error: {resp.status}")
597
-
598
- data = await resp.json()
599
- nft_data = data.get("nft", {})
600
-
601
- metadata = NFTMetadata(
602
- contract=contract,
603
- token_id=token_id,
604
- name=nft_data.get("name", f"NFT #{token_id}"),
605
- collection_name=nft_data.get("collection", {}).get("name", "Unknown"),
606
- image_url=nft_data.get("image_url"),
607
- description=nft_data.get("description"),
608
- traits=nft_data.get("traits", [])
609
- )
610
-
611
- # Cache the result
612
- self._metadata_cache[cache_key] = metadata
613
- return metadata
614
-
615
- except asyncio.TimeoutError:
616
- raise NFTError(f"Timeout fetching NFT {contract} #{token_id}")
617
- except Exception as e:
618
- raise NFTError(f"Error fetching NFT metadata: {str(e)}")
619
 
620
- async def fetch_image(self, image_url: str) -> Optional[Image.Image]:
621
- """Download and process NFT image with caching."""
622
- if not image_url or not self._session:
623
- return None
624
-
625
- # Check image cache
626
- if image_url in self._image_cache:
627
- return self._image_cache[image_url]
628
-
629
- try:
630
- async with self._session.get(
631
- image_url,
632
- timeout=aiohttp.ClientTimeout(total=Config.NFT_FETCH_TIMEOUT)
633
- ) as resp:
634
- if resp.status != 200:
635
- return None
636
- data = await resp.read()
637
-
638
- img = Image.open(BytesIO(data))
639
-
640
- # Convert to RGB/RGBA if needed
641
- if img.mode not in ('RGB', 'RGBA'):
642
- img = img.convert('RGBA')
643
-
644
- # Resize if needed
645
- if (img.size[0] > Config.MAX_IMAGE_SIZE[0] or
646
- img.size[1] > Config.MAX_IMAGE_SIZE[1]):
647
- img.thumbnail(Config.MAX_IMAGE_SIZE)
648
-
649
- # Cache the processed image
650
- self._image_cache[image_url] = img
651
- return img
652
-
653
- except Exception as e:
654
- logger.error(f"Error processing image: {e}")
655
- return None
656
  class ChatInterface:
657
- """Manages chat interactions and context."""
658
 
659
  def __init__(self, openai_key: str, etherscan_key: str, opensea_key: str):
660
  self.openai_key = openai_key
661
  self.etherscan_key = etherscan_key
662
  self.opensea_key = opensea_key
663
- self.context = ChatContext()
664
- self.nft_processor = None
665
  openai.api_key = openai_key
666
 
667
- async def _ensure_nft_processor(self) -> NFTProcessor:
668
- """Initialize NFT processor if needed."""
669
- if self.nft_processor is None:
670
- self.nft_processor = NFTProcessor(self.opensea_key)
671
- await self.nft_processor.__aenter__()
672
- return self.nft_processor
673
-
674
- def _format_context(self) -> str:
675
- """Format wallet data for AI context."""
676
- if not self.context.wallet_data:
677
- return "No wallet data available."
678
-
679
- lines = ["Current Wallet Analysis:"]
680
- for addr, data in self.context.wallet_data.items():
681
- lines.append(f"\nWallet {addr[:8]}...{addr[-6:]}:")
682
- lines.append(f"ETH Balance: {data['eth_balance']:.4f} ETH")
683
-
684
- if data['tokens']:
685
- lines.append("\nToken Holdings:")
686
- for token in sorted(
687
- data['tokens'],
688
- key=lambda x: x['balance'],
689
- reverse=True
690
- )[:10]: # Show top 10 tokens
691
- lines.append(f"- {token['symbol']}: {token['balance']:.4f}")
692
-
693
- nft_data = data.get("nft_collections", {})
694
- if "collections" in nft_data and nft_data["collections"]:
695
- lines.append("\nNFT Collections:")
696
- for coll in nft_data["collections"]:
697
- coll_name = coll["collection_name"]
698
- total_items = len(coll["items"])
699
- discussed = sum(1 for item in coll["items"]
700
- if self.context.is_nft_discussed(
701
- item["contract"],
702
- item["token_id"]
703
- ))
704
- lines.append(f"\n{coll_name}:")
705
- lines.append(f"- Total Items: {total_items}")
706
- lines.append(f"- Discussed: {discussed}/{total_items}")
707
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
708
  return "\n".join(lines)
709
 
710
  async def process_message(
711
  self,
712
  message: str,
713
- history: Optional[List[Tuple[str, str]]] = None
714
- ) -> Tuple[List[Tuple[str, str]], Dict[str, Any], List[Image.Image]]:
715
- """Process user message and return chat updates."""
 
 
 
 
 
716
  if history is None:
717
  history = []
718
-
719
  if not message.strip():
720
- return history, self.context.wallet_data, []
721
 
722
- nft_images: List[Image.Image] = []
723
-
724
- # Check for Ethereum address
725
  match = re.search(Config.ETHEREUM_ADDRESS_REGEX, message)
 
 
726
  if match:
727
  eth_address = match.group(0)
728
- history.append((
729
- message,
730
- f"Analyzing wallet {eth_address[:8]}...{eth_address[-6:]}..."
731
- ))
732
 
733
  try:
734
- # Fetch wallet data
735
  async with WalletAnalyzer(self.etherscan_key) as analyzer:
736
- wallet_data = await analyzer.get_wallet_data(eth_address)
737
- self.context.wallet_data[eth_address] = wallet_data
738
-
739
- # Process NFTs
740
- nft_processor = await self._ensure_nft_processor()
741
- nft_info = wallet_data.get("nft_collections", {})
742
-
743
- if "collections" in nft_info and nft_info["collections"]:
744
- for collection in nft_info["collections"]:
745
- for i in range(0, len(collection["items"]), Config.NFT_BATCH_SIZE):
746
- batch = collection["items"][i:i + Config.NFT_BATCH_SIZE]
747
- metadata_tasks = [
748
- nft_processor.fetch_nft_metadata(
749
- item["contract"],
750
- item["token_id"]
751
- )
752
- for item in batch
753
- ]
754
- metadata_results = await asyncio.gather(
755
- *metadata_tasks,
756
- return_exceptions=True
757
- )
758
-
759
- for meta in metadata_results:
760
- if isinstance(meta, Exception):
761
- continue
762
- if meta.image_url:
763
- img = await nft_processor.fetch_image(meta.image_url)
764
- if img:
765
- nft_images.append(img)
 
 
 
 
 
 
 
 
766
 
767
  except Exception as e:
768
- error_msg = f"Error analyzing wallet: {str(e)}"
769
- logger.error(error_msg)
770
- history.append((message, error_msg))
771
- return history, self.context.wallet_data, []
772
 
773
- # Process with OpenAI
774
  try:
775
- messages = [
776
- {"role": "system", "content": Config.SYSTEM_PROMPT},
777
- {"role": "system", "content": self._format_context()}
778
- ]
779
-
780
- # Add recent history
781
- recent_history = history[-Config.HISTORY_LIMIT:]
782
- for usr, bot in recent_history:
783
- messages.extend([
784
- {"role": "user", "content": usr},
785
- {"role": "assistant", "content": bot}
786
- ])
787
-
788
- messages.append({"role": "user", "content": message})
789
-
790
  client = openai.OpenAI(api_key=self.openai_key)
791
- response = client.chat.completions.create(
792
  model=Config.OPENAI_MODEL,
793
- messages=messages,
 
 
 
 
 
794
  temperature=Config.TEMPERATURE,
795
  max_tokens=Config.MAX_TOKENS
796
  )
 
 
797
 
798
- ai_message = response.choices[0].message.content
799
- history.append((message, ai_message))
800
-
801
- return history, self.context.wallet_data, nft_images
802
 
803
  except Exception as e:
804
- error_msg = f"Chat error: {str(e)}"
805
- logger.error(error_msg)
806
- history.append((message, error_msg))
807
- return history, self.context.wallet_data, []
808
 
809
  def clear_context(self) -> Tuple[Dict[str, Any], List[Tuple[str, str]]]:
810
- """Clear chat context and history."""
811
- self.context = ChatContext()
812
  return {}, []
 
813
 
814
- async def cleanup(self):
815
- """Cleanup resources."""
816
- if self.nft_processor:
817
- await self.nft_processor.__aexit__(None, None, None)
818
- self.nft_processor = None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
819
 
820
- def create_gradio_interface() -> gr.Blocks:
821
- """Create Gradio web interface."""
822
- chat_interface = None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
823
 
824
- with gr.Blocks(theme=gr.themes.Soft()) as demo:
825
- gr.Markdown("""
826
- # 🐕 LOSS DOG: Blockchain Wallet Analyzer
827
-
828
- Enter your API keys and chat with your friendly blockchain-sniffing puppy!
829
- - Paste an Ethereum address to analyze
830
- - Chat about the wallet contents
831
- - View NFT images in the gallery
832
- """)
833
-
834
- with gr.Row():
835
- openai_key = gr.Textbox(
836
- label="OpenAI API Key",
837
- type="password",
838
- placeholder="sk-..."
839
  )
840
- etherscan_key = gr.Textbox(
841
- label="Etherscan API Key",
842
- type="password",
843
- placeholder="Enter Etherscan key..."
 
 
 
 
 
 
 
844
  )
845
- opensea_key = gr.Textbox(
846
- label="OpenSea API Key",
847
- type="password",
848
- placeholder="Enter OpenSea key..."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
849
  )
850
 
851
- validation_status = gr.Textbox(label="Status", interactive=False)
852
- validate_btn = gr.Button("Validate Keys", variant="primary")
 
 
 
 
 
 
 
853
 
854
- with gr.Row():
855
- with gr.Column(scale=2):
856
- chatbot = gr.Chatbot(
857
- label="Chat with LOSS DOG 🐕",
858
- height=500,
859
- bubble_full_width=False
860
- )
861
- with gr.Row():
862
- msg_input = gr.Textbox(
863
- label="Message",
864
- placeholder="Enter ETH address or ask about the wallet...",
865
- scale=8
866
- )
867
- send_btn = gr.Button("Send", scale=1, variant="primary")
868
- clear_btn = gr.Button("Clear", scale=1)
869
-
870
- with gr.Column(scale=1):
871
- nft_gallery = gr.Gallery(
872
- label="NFT Gallery",
873
- columns=2,
874
- height=400
875
- )
876
- wallet_data = gr.JSON(label="Wallet Data")
877
 
878
- def init_interface(openai_k: str, etherscan_k: str, opensea_k: str) -> str:
879
- """Initialize chat interface with API keys."""
880
- nonlocal chat_interface
881
- try:
882
- chat_interface = ChatInterface(openai_k, etherscan_k, opensea_k)
883
- return "✅ API keys validated successfully!"
884
- except Exception as e:
885
- return f"❌ Error: {str(e)}"
886
-
887
- async def process_message(
888
- message: str,
889
- history: List[Tuple[str, str]]
890
- ) -> Tuple[List[Tuple[str, str]], Dict[str, Any], List[Image.Image]]:
891
- """Process user message."""
892
- if not chat_interface:
893
- return [], {}, []
894
- return await chat_interface.process_message(message, history)
895
-
896
- def clear_chat():
897
- """Clear chat and context."""
898
- if chat_interface:
899
- return chat_interface.clear_context()
900
- return {}, []
901
-
902
- # Wire up the interface
903
- validate_btn.click(
904
- init_interface,
905
- inputs=[openai_key, etherscan_key, opensea_key],
906
- outputs=[validation_status]
907
- )
908
-
909
- msg_input.submit(
910
- process_message,
911
- inputs=[msg_input, chatbot],
912
- outputs=[chatbot, wallet_data, nft_gallery]
913
- ).then(
914
- lambda: gr.update(value=""),
915
- None,
916
- [msg_input]
917
- )
918
-
919
- send_btn.click(
920
- process_message,
921
- inputs=[msg_input, chatbot],
922
- outputs=[chatbot, wallet_data, nft_gallery]
923
- ).then(
924
- lambda: gr.update(value=""),
925
- None,
926
- [msg_input]
927
- )
928
-
929
- clear_btn.click(
930
- clear_chat,
931
- outputs=[wallet_data, chatbot]
932
- )
933
-
934
- return demo
935
 
936
  def main():
937
- """Main entry point."""
938
- demo = create_gradio_interface()
939
- demo.queue()
940
- demo.launch()
941
 
942
  if __name__ == "__main__":
943
  main()
 
2
  Blockchain Wallet Analyzer - A tool for analyzing Ethereum wallet contents and NFT holdings.
3
 
4
  This module provides a complete implementation of a blockchain wallet analysis tool
5
+ with a Gradio web interface. It includes wallet analysis, NFT tracking,
6
+ interactive chat capabilities using the OpenAI API,
7
+ and NFT image rendering from OpenSea via PIL objects.
8
 
9
  Author: Claude
10
  Date: January 2025
11
  """
12
 
13
  from __future__ import annotations
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
14
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15
  import os
16
  import re
17
  import json
 
19
  import logging
20
  import asyncio
21
  import base64
22
+ from typing import List, Dict, Tuple, Any, Optional, TypeVar
23
  from datetime import datetime
24
  from decimal import Decimal
25
+ from dataclasses import dataclass
26
  from pathlib import Path
27
  from io import BytesIO
28
 
 
36
  T = TypeVar('T')
37
  WalletData = Dict[str, Any]
38
  ChatHistory = List[Tuple[str, str]]
 
39
 
40
  # Configure logging
41
  logging.basicConfig(
 
48
  )
49
  logger = logging.getLogger(__name__)
50
 
51
+ class ConfigError(Exception):
52
+ """Raised when there's an error in configuration."""
 
 
 
 
 
 
 
 
53
  pass
54
 
55
+ class APIError(Exception):
56
+ """Raised when there's an error in API calls."""
57
  pass
58
 
59
+ class ValidationError(Exception):
60
+ """Raised when there's an error in input validation."""
61
  pass
62
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
63
  @dataclass
64
  class Config:
65
+ """Application configuration settings."""
 
 
66
  SYSTEM_PROMPT: str = """
67
  You are LOSS DOG 🐕 (Learning & Observing Smart Systems Digital Output Generator),
68
+ an adorable blockchain-sniffing puppy!
69
+ Your personality:
70
+ - Friendly and enthusiastic
71
+ - Explain findings in fun, simple ways
72
+ - Provide NFT images from OpenSea when possible
73
+
74
+ Instructions:
75
+ - You have access to detailed wallet data in your context
76
+ - Use this data to provide specific answers about holdings
77
+ - Reference exact numbers and collections when discussing NFTs
78
+ - Compare wallets if multiple are available
 
 
 
79
  """
 
 
80
  ETHERSCAN_BASE_URL: str = "https://api.etherscan.io/api"
81
+ ETHEREUM_ADDRESS_REGEX: str = r"0x[a-fA-F0-9]{40}"
 
 
 
82
  RATE_LIMIT_DELAY: float = 0.2 # 5 requests per second
83
  MAX_RETRIES: int = 3
84
+ OPENAI_MODEL: str = "gpt-4o-mini" # Example updated model
85
+ MAX_TOKENS: int = 4000
 
 
 
 
 
 
 
 
 
 
 
 
 
86
  TEMPERATURE: float = 0.7
87
+ HISTORY_LIMIT: int = 5
88
+ #### Part 2: Wallet Analyzer (Etherscan)
89
+
90
  class WalletAnalyzer:
91
  """Analyzes Ethereum wallet contents using Etherscan API."""
92
 
 
95
  self.base_url = Config.ETHERSCAN_BASE_URL
96
  self.session: Optional[aiohttp.ClientSession] = None
97
  self.last_request_time = 0
 
98
 
99
  async def __aenter__(self) -> WalletAnalyzer:
100
+ self.session = aiohttp.ClientSession()
 
 
101
  return self
102
 
103
  async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
 
105
  await self.session.close()
106
  self.session = None
107
 
 
 
 
 
 
 
 
 
 
108
  @retry(
109
  stop=stop_after_attempt(Config.MAX_RETRIES),
110
+ wait=wait_exponential(multiplier=1, min=4, max=10)
 
 
 
 
111
  )
112
  async def _fetch_data(self, params: Dict[str, str]) -> Dict[str, Any]:
113
+ """Fetch data from Etherscan with retry logic."""
114
  if not self.session:
115
  raise APIError("No active session. Use context manager.")
116
+
117
  await self._rate_limit()
118
  params["apikey"] = self.api_key
 
119
  try:
120
  async with self.session.get(self.base_url, params=params) as response:
 
 
121
  if response.status != 200:
122
  raise APIError(f"Etherscan request failed: {response.status}")
123
+ data = await response.json()
 
 
 
 
 
124
  if data.get("status") == "0":
125
+ err = data.get("message", "Unknown Etherscan error")
126
+ if "Max rate limit reached" in err:
127
+ raise APIError("Etherscan rate limit exceeded")
128
+ raise APIError(f"Etherscan error: {err}")
 
 
129
  return data
 
 
 
130
  except aiohttp.ClientError as e:
131
  raise APIError(f"Network error: {str(e)}")
132
  except Exception as e:
133
  raise APIError(f"Unexpected error: {str(e)}")
134
 
135
+ async def _rate_limit(self) -> None:
136
+ """Simple rate limiting for Etherscan free tier."""
137
+ now = time.time()
138
+ diff = now - self.last_request_time
139
+ if diff < Config.RATE_LIMIT_DELAY:
140
+ await asyncio.sleep(Config.RATE_LIMIT_DELAY - diff)
141
+ self.last_request_time = time.time()
142
+
143
  @staticmethod
144
  def _validate_address(address: str) -> bool:
145
  """Validate Ethereum address format."""
146
  return bool(re.match(Config.ETHEREUM_ADDRESS_REGEX, address))
147
 
148
+ async def get_portfolio_data(self, address: str) -> WalletData:
149
  """
150
+ Return a dictionary with:
151
+ - address
152
+ - last_updated
153
+ - eth_balance
154
+ - tokens (list of ERC-20)
155
+ - nft_collections (with contract & token_id)
156
  """
157
  if not self._validate_address(address):
158
  raise ValidationError(f"Invalid Ethereum address: {address}")
159
 
160
+ logger.info(f"Fetching portfolio data for {address}")
161
+ eth_balance = await self._get_eth_balance(address)
162
+ tokens = await self._get_token_holdings(address)
163
+ nft_colls = await self._get_nft_holdings(address)
164
+
165
+ return {
166
+ "address": address,
167
+ "last_updated": datetime.now().isoformat(),
168
+ "eth_balance": float(eth_balance),
169
+ "tokens": tokens,
170
+ "nft_collections": nft_colls
171
+ }
 
 
 
 
172
 
173
  async def _get_eth_balance(self, address: str) -> Decimal:
 
174
  params = {
175
  "module": "account",
176
  "action": "balance",
177
  "address": address,
178
  "tag": "latest"
179
  }
180
+ data = await self._fetch_data(params)
181
+ return Decimal(data["result"]) / Decimal("1e18")
 
 
 
 
182
 
183
  async def _get_token_holdings(self, address: str) -> List[Dict[str, Any]]:
184
+ """Fetch ERC-20 tokens for address."""
185
  params = {
186
  "module": "account",
187
  "action": "tokentx",
 
192
 
193
  token_map: Dict[str, Dict[str, Any]] = {}
194
  for tx in data.get("result", []):
195
+ contract = tx["contractAddress"]
196
+ if contract not in token_map:
197
+ token_map[contract] = {
198
+ "name": tx["tokenName"],
199
+ "symbol": tx["tokenSymbol"],
200
+ "decimals": int(tx["tokenDecimal"]),
201
+ "balance": Decimal(0)
202
+ }
203
+ amount = Decimal(tx["value"]) / Decimal(10 ** token_map[contract]["decimals"])
204
+ if tx["to"].lower() == address.lower():
205
+ token_map[contract]["balance"] += amount
206
+ elif tx["from"].lower() == address.lower():
207
+ token_map[contract]["balance"] -= amount
208
+
 
 
 
 
 
 
209
  return [
210
  {
211
+ "name": v["name"],
212
+ "symbol": v["symbol"],
213
+ "balance": float(v["balance"])
214
  }
215
+ for v in token_map.values() if v["balance"] > 0
 
216
  ]
217
+
218
+ async def _get_nft_holdings(self, address: str) -> Dict[str, Any]:
219
  """
220
+ Return { "collections": [ { "collection_name": str, "items": [{ contract, token_id }, ...] }, ... ] }
221
+ so we can fetch images from OpenSea by contract + token_id.
222
  """
223
  params = {
224
  "module": "account",
 
226
  "address": address,
227
  "sort": "desc"
228
  }
229
+ data = await self._fetch_data(params)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
230
 
231
+ if data.get("status") != "1" or "result" not in data:
232
+ return {"collections": []}
233
+
234
+ # track final ownership
235
+ ownership_map = {} # key = (contract_token), value = {contract, token_id, coll_name}
236
+ for tx in data["result"]:
237
+ contract = tx["contractAddress"]
238
+ coll_name = tx.get("tokenName", "Unknown Collection")
239
+ token_id = tx["tokenID"]
240
+ key = f"{contract}_{token_id}"
241
+
242
+ if tx["to"].lower() == address.lower():
243
+ ownership_map[key] = {
244
+ "contract": contract,
245
+ "token_id": token_id,
246
+ "collection_name": coll_name
247
+ }
248
+ elif tx["from"].lower() == address.lower():
249
+ if key in ownership_map:
250
+ ownership_map.pop(key, None)
251
+
252
+ # group by collection_name
253
+ coll_dict: Dict[str, List[Dict[str, str]]] = {}
254
+ for item in ownership_map.values():
255
+ c_name = item["collection_name"]
256
+ if c_name not in coll_dict:
257
+ coll_dict[c_name] = []
258
+ coll_dict[c_name].append({
259
+ "contract": item["contract"],
260
+ "token_id": item["token_id"]
261
+ })
262
+
263
+ collections_out = []
264
+ for cname, items in coll_dict.items():
265
+ collections_out.append({
266
+ "collection_name": cname,
267
+ "items": items
268
+ })
269
+
270
+ return { "collections": collections_out }
271
+ #### Part 3: OpenSea & PIL Conversion
272
+
273
+ async def fetch_nft_metadata(opensea_key: str, contract: str, token_id: str) -> Dict[str, Any]:
274
+ """
275
+ Fetch NFT metadata (including image_url) from OpenSea v2.
276
+ Returns { "name": str, "image_url": str } or { "error": str }
277
+ """
278
+ url = f"https://api.opensea.io/api/v2/chain/ethereum/contract/{contract}/nfts/{token_id}"
279
+ headers = {"X-API-KEY": opensea_key} if opensea_key else {}
280
+ async with aiohttp.ClientSession() as session:
281
+ async with session.get(url, headers=headers) as resp:
282
+ if resp.status == 403:
283
+ return {"error": "403 Forbidden: OpenSea API key issue"}
284
+ if resp.status == 404:
285
+ return {"error": f"404 Not Found: {contract} #{token_id}"}
286
+ try:
287
+ data = await resp.json()
288
+ except Exception as e:
289
+ return {"error": f"OpenSea JSON parse error: {str(e)}"}
290
 
291
+ nft_obj = data.get("nft", {})
292
+ name = nft_obj.get("name", f"NFT #{token_id}")
293
+ image_url = nft_obj.get("image_url", "")
294
+ return {"name": name, "image_url": image_url}
 
 
 
 
 
 
 
 
295
 
296
+ async def fetch_image_as_pil(url: str) -> Optional[Image.Image]:
297
+ """
298
+ Download an image from a URL and return as a PIL Image.
299
+ """
300
+ if not url:
301
+ return None
302
+ async with aiohttp.ClientSession() as session:
303
+ async with session.get(url) as resp:
304
+ if resp.status != 200:
305
+ return None
306
+ raw_bytes = await resp.read()
307
+ try:
308
+ return Image.open(BytesIO(raw_bytes))
309
+ except Exception as e:
310
+ logger.error(f"Error converting to PIL: {e}")
311
+ return None
312
+ #### Part 4: ChatInterface
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
313
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
314
  class ChatInterface:
315
+ """Handles chat logic with Etherscan (wallet data), OpenSea (NFT images), and OpenAI (chat)."""
316
 
317
  def __init__(self, openai_key: str, etherscan_key: str, opensea_key: str):
318
  self.openai_key = openai_key
319
  self.etherscan_key = etherscan_key
320
  self.opensea_key = opensea_key
321
+ self.context: Dict[str, Any] = {}
 
322
  openai.api_key = openai_key
323
 
324
+ @staticmethod
325
+ def _validate_api_keys(openai_key: str, etherscan_key: str, opensea_key: str) -> Tuple[bool, str]:
326
+ """
327
+ Validate all keys. We'll do a minimal check for OpenSea (non-empty).
328
+ """
329
+ try:
330
+ # Check OpenAI
331
+ client = openai.OpenAI(api_key=openai_key)
332
+ client.chat.completions.create(
333
+ model=Config.OPENAI_MODEL,
334
+ messages=[{"role": "user", "content": "test"}],
335
+ max_tokens=1
336
+ )
337
+
338
+ # Check Etherscan
339
+ async def check_etherscan():
340
+ async with WalletAnalyzer(etherscan_key) as analyzer:
341
+ params = {"module": "stats", "action": "ethsupply"}
342
+ await analyzer._fetch_data(params)
343
+ asyncio.run(check_etherscan())
344
+
345
+ # Check OpenSea
346
+ if not opensea_key.strip():
347
+ return False, "OpenSea API key is empty!"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
348
 
349
+ return True, "All API keys validated!"
350
+ except Exception as e:
351
+ return False, f"API key validation failed: {str(e)}"
352
+
353
+ def _format_context_message(self) -> str:
354
+ """Format wallet data for GPT system prompt."""
355
+ lines = []
356
+ if not self.context:
357
+ return ""
358
+ lines.append("Current Wallet Data:")
359
+ for addr, wdata in self.context.items():
360
+ lines.append(f"Wallet {addr[:8]}...{addr[-6:]}:")
361
+ lines.append(f" ETH Balance: {wdata['eth_balance']:.4f}")
362
+ lines.append(f" # of Tokens: {len(wdata['tokens'])}")
363
+ # NFT aggregator
364
+ nft_data = wdata["nft_collections"]
365
+ if "collections" in nft_data:
366
+ lines.append(" NFT Collections:")
367
+ for c in nft_data["collections"]:
368
+ lines.append(f" - {c['collection_name']}: {len(c['items'])} NFT(s)")
369
  return "\n".join(lines)
370
 
371
  async def process_message(
372
  self,
373
  message: str,
374
+ history: Optional[ChatHistory] = None
375
+ ) -> Tuple[ChatHistory, Dict[str, Any], List[Image.Image]]:
376
+ """
377
+ 1) Detect Ethereum address
378
+ 2) Etherscan for wallet data
379
+ 3) For each NFT, fetch from OpenSea, convert to PIL
380
+ 4) Return images + chat response
381
+ """
382
  if history is None:
383
  history = []
384
+
385
  if not message.strip():
386
+ return history, self.context, []
387
 
 
 
 
388
  match = re.search(Config.ETHEREUM_ADDRESS_REGEX, message)
389
+ nft_images: List[Image.Image] = []
390
+
391
  if match:
392
  eth_address = match.group(0)
393
+ # partial
394
+ partial_text = f"Analyzing {eth_address}..."
395
+ history.append((message, partial_text))
 
396
 
397
  try:
398
+ # Etherscan
399
  async with WalletAnalyzer(self.etherscan_key) as analyzer:
400
+ wallet_data = await analyzer.get_portfolio_data(eth_address)
401
+ self.context[eth_address] = wallet_data
402
+
403
+ # Summaries
404
+ lines = [
405
+ f"📊 Summary for {eth_address[:8]}...{eth_address[-6:]}",
406
+ f"ETH: {wallet_data['eth_balance']:.4f}",
407
+ f"Tokens: {len(wallet_data['tokens'])}"
408
+ ]
409
+ nft_info = wallet_data["nft_collections"]
410
+ total_nfts = 0
411
+ if "collections" in nft_info:
412
+ for c in nft_info["collections"]:
413
+ total_nfts += len(c["items"])
414
+ lines.append(f"NFTs: {total_nfts}")
415
+ # Add to chat
416
+ history.append((message, "\n".join(lines)))
417
+
418
+ # If we want to fetch images for, say, first 2 collections, 2 items each
419
+ if "collections" in nft_info:
420
+ for coll in nft_info["collections"][:2]:
421
+ for item in coll["items"][:2]:
422
+ # 1) fetch metadata
423
+ meta = await fetch_nft_metadata(self.opensea_key, item["contract"], item["token_id"])
424
+ if "error" in meta:
425
+ logger.warning(f"OpenSea metadata error: {meta['error']}")
426
+ continue
427
+ image_url = meta["image_url"]
428
+ if not image_url:
429
+ logger.info(f"No image for {meta['name']}")
430
+ continue
431
+ # 2) Download + convert to PIL
432
+ pil_img = await fetch_image_as_pil(image_url)
433
+ if pil_img:
434
+ nft_images.append(pil_img)
435
+ # Also mention in chat
436
+ found_msg = f"Found NFT image: {meta['name']} (contract {item['contract'][:8]}...{item['contract'][-6:]}, id={item['token_id']})"
437
+ history.append((message, found_msg))
438
 
439
  except Exception as e:
440
+ err = f"Error analyzing {eth_address}: {str(e)}"
441
+ logger.error(err)
442
+ history.append((message, err))
 
443
 
444
+ # Now do an OpenAI chat
445
  try:
446
+ context_str = self._format_context_message()
447
+ # Convert chat to OpenAI format
448
+ limit = Config.HISTORY_LIMIT
449
+ short_hist = history[-limit:]
450
+ openai_msgs = []
451
+ for usr, ans in short_hist:
452
+ openai_msgs.append({"role": "user", "content": usr})
453
+ openai_msgs.append({"role": "assistant", "content": ans})
454
+
455
+ # openai
456
+ openai.api_key = self.openai_key
 
 
 
 
457
  client = openai.OpenAI(api_key=self.openai_key)
458
+ resp = client.chat.completions.create(
459
  model=Config.OPENAI_MODEL,
460
+ messages=[
461
+ {"role": "system", "content": Config.SYSTEM_PROMPT},
462
+ {"role": "system", "content": context_str},
463
+ *openai_msgs,
464
+ {"role": "user", "content": message}
465
+ ],
466
  temperature=Config.TEMPERATURE,
467
  max_tokens=Config.MAX_TOKENS
468
  )
469
+ final_msg = resp.choices[0].message.content
470
+ history.append((message, final_msg))
471
 
472
+ return history, self.context, nft_images
 
 
 
473
 
474
  except Exception as e:
475
+ logger.error(f"OpenAI error: {e}")
476
+ err_chat = f"OpenAI error: {str(e)}"
477
+ history.append((message, err_chat))
478
+ return history, self.context, []
479
 
480
  def clear_context(self) -> Tuple[Dict[str, Any], List[Tuple[str, str]]]:
481
+ """Clear the wallet context and chat."""
482
+ self.context = {}
483
  return {}, []
484
+ #### Part 5: Gradio UI & Launch
485
 
486
+ class GradioInterface:
487
+ """Manages the Gradio UI for Hugging Face Space, with top-right NFT gallery as PIL images."""
488
+
489
+ def __init__(self):
490
+ self.chat_interface: Optional[ChatInterface] = None
491
+ self.demo = self._create_interface()
492
+
493
+ def _create_interface(self) -> gr.Blocks:
494
+ with gr.Blocks(theme=gr.themes.Soft()) as demo:
495
+ gr.Markdown("""
496
+ # 🐕 LOSS DOG: Blockchain Wallet Analyzer (NFT Images, top-right)
497
+
498
+ - Enter your **OpenAI**, **Etherscan**, **OpenSea** keys below
499
+ - Validate, then chat with your Ethereum address
500
+ - NFT images will appear as **PIL** objects in the top-right
501
+ """)
502
+
503
+ with gr.Row():
504
+ openai_key = gr.Textbox(
505
+ label="OpenAI API Key",
506
+ type="password",
507
+ placeholder="Enter your OpenAI API key..."
508
+ )
509
+ etherscan_key = gr.Textbox(
510
+ label="Etherscan API Key",
511
+ type="password",
512
+ placeholder="Enter your Etherscan API key..."
513
+ )
514
+ opensea_key = gr.Textbox(
515
+ label="OpenSea API Key",
516
+ type="password",
517
+ placeholder="Enter your OpenSea API key..."
518
+ )
519
 
520
+ validation_status = gr.Textbox(label="Validation Status", interactive=False)
521
+ validate_btn = gr.Button("Validate API Keys", variant="primary")
522
+
523
+ with gr.Row():
524
+ # Main Chat Column
525
+ with gr.Column(scale=2):
526
+ chatbot = gr.Chatbot(label="Chat History", height=420, value=[])
527
+ with gr.Row():
528
+ msg_input = gr.Textbox(
529
+ label="Message",
530
+ placeholder="Enter an ETH address or question..."
531
+ )
532
+ send_btn = gr.Button("Send", variant="primary")
533
+ # Top-Right: NFT Gallery (PIL images)
534
+ with gr.Column(scale=1):
535
+ nft_gallery = gr.Gallery(
536
+ label="NFT Images (Top-Right)",
537
+ columns=2
538
+ )
539
+ wallet_context = gr.JSON(
540
+ label="Active Wallet Context",
541
+ value={}
542
+ )
543
+ clear_btn = gr.Button("Clear Context")
544
+
545
+ msg_input.interactive = False
546
+ send_btn.interactive = False
547
+
548
+ def validate_keys(openai_k: str, etherscan_k: str, opensea_k: str) -> Tuple[str, gr.update, gr.update]:
549
+ """Validate user-provided keys; enable chat if all pass."""
550
+ is_valid, msg = ChatInterface._validate_api_keys(openai_k, etherscan_k, opensea_k)
551
+ if is_valid:
552
+ self.chat_interface = ChatInterface(openai_k, etherscan_k, opensea_k)
553
+ return (
554
+ f"✅ {msg}",
555
+ gr.update(interactive=True),
556
+ gr.update(interactive=True)
557
+ )
558
+ else:
559
+ return (
560
+ f"❌ {msg}",
561
+ gr.update(interactive=False),
562
+ gr.update(interactive=False)
563
+ )
564
 
565
+ validate_btn.click(
566
+ fn=validate_keys,
567
+ inputs=[openai_key, etherscan_key, opensea_key],
568
+ outputs=[validation_status, msg_input, send_btn]
 
 
 
 
 
 
 
 
 
 
 
569
  )
570
+
571
+ def clear_all():
572
+ """Clear wallet context and chat."""
573
+ if self.chat_interface:
574
+ return self.chat_interface.clear_context()
575
+ return {}, []
576
+
577
+ clear_btn.click(
578
+ fn=clear_all,
579
+ inputs=[],
580
+ outputs=[wallet_context, chatbot]
581
  )
582
+
583
+ async def handle_message(
584
+ message: str,
585
+ chat_hist: List[Tuple[str, str]],
586
+ context: Dict[str, Any]
587
+ ) -> Tuple[List[Tuple[str, str]], Dict[str, Any], List[Image.Image]]:
588
+ """Process user input, return updated chat, context, list of PIL images."""
589
+ if not self.chat_interface:
590
+ return [], {}, []
591
+
592
+ try:
593
+ new_history, new_context, images = await self.chat_interface.process_message(message, chat_hist)
594
+ return new_history, new_context, images
595
+ except Exception as e:
596
+ logger.error(f"Error in handle_message: {e}")
597
+ if chat_hist is None:
598
+ chat_hist = []
599
+ chat_hist.append((message, f"Error: {str(e)}"))
600
+ return chat_hist, context, []
601
+
602
+ # Chat flow
603
+ msg_input.submit(
604
+ fn=handle_message,
605
+ inputs=[msg_input, chatbot, wallet_context],
606
+ outputs=[chatbot, wallet_context, nft_gallery]
607
+ ).then(
608
+ lambda: gr.update(value=""),
609
+ None,
610
+ [msg_input]
611
  )
612
 
613
+ send_btn.click(
614
+ fn=handle_message,
615
+ inputs=[msg_input, chatbot, wallet_context],
616
+ outputs=[chatbot, wallet_context, nft_gallery]
617
+ ).then(
618
+ lambda: gr.update(value=""),
619
+ None,
620
+ [msg_input]
621
+ )
622
 
623
+ return demo
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
624
 
625
+ def launch(self):
626
+ self.demo.queue()
627
+ self.demo.launch()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
628
 
629
  def main():
630
+ """Main entry point for Hugging Face Space."""
631
+ logger.info("Launching LOSS DOG with PIL-based NFT images (top-right).")
632
+ interface = GradioInterface()
633
+ interface.launch()
634
 
635
  if __name__ == "__main__":
636
  main()