jzou19950715 commited on
Commit
946cce5
·
verified ·
1 Parent(s): 221069a

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +486 -452
app.py CHANGED
@@ -2,16 +2,14 @@
2
  Blockchain Wallet Analyzer - A tool for analyzing Ethereum wallet contents and NFT holdings.
3
 
4
  This module provides a complete implementation of a blockchain wallet analysis tool
5
- with a Gradio web interface. It includes wallet analysis, NFT tracking,
6
- interactive chat capabilities using the OpenAI API,
7
- and NFT image rendering from OpenSea via PIL objects.
8
 
9
  Author: Claude
10
  Date: January 2025
11
  """
12
 
13
  from __future__ import annotations
14
-
15
  import os
16
  import re
17
  import json
@@ -19,10 +17,10 @@ import time
19
  import logging
20
  import asyncio
21
  import base64
22
- from typing import List, Dict, Tuple, Any, Optional, TypeVar
23
  from datetime import datetime
24
  from decimal import Decimal
25
- from dataclasses import dataclass
26
  from pathlib import Path
27
  from io import BytesIO
28
 
@@ -36,6 +34,7 @@ from tenacity import retry, stop_after_attempt, wait_exponential
36
  T = TypeVar('T')
37
  WalletData = Dict[str, Any]
38
  ChatHistory = List[Tuple[str, str]]
 
39
 
40
  # Configure logging
41
  logging.basicConfig(
@@ -48,45 +47,99 @@ logging.basicConfig(
48
  )
49
  logger = logging.getLogger(__name__)
50
 
51
- class ConfigError(Exception):
52
- """Raised when there's an error in configuration."""
 
 
 
 
 
 
 
 
53
  pass
54
 
55
- class APIError(Exception):
56
- """Raised when there's an error in API calls."""
57
  pass
58
 
59
- class ValidationError(Exception):
60
- """Raised when there's an error in input validation."""
61
  pass
62
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
63
  @dataclass
64
  class Config:
65
- """Application configuration settings."""
 
 
66
  SYSTEM_PROMPT: str = """
67
  You are LOSS DOG 🐕 (Learning & Observing Smart Systems Digital Output Generator),
68
- an adorable blockchain-sniffing puppy!
69
- Your personality:
70
- - Friendly and enthusiastic
71
- - Explain findings in fun, simple ways
72
- - Provide NFT images from OpenSea when possible
73
-
74
- Instructions:
75
- - You have access to detailed wallet data in your context
76
- - Use this data to provide specific answers about holdings
77
- - Reference exact numbers and collections when discussing NFTs
78
- - Compare wallets if multiple are available
 
 
 
79
  """
 
 
80
  ETHERSCAN_BASE_URL: str = "https://api.etherscan.io/api"
81
- ETHEREUM_ADDRESS_REGEX: str = r"0x[a-fA-F0-9]{40}"
 
 
 
82
  RATE_LIMIT_DELAY: float = 0.2 # 5 requests per second
83
  MAX_RETRIES: int = 3
84
- OPENAI_MODEL: str = "gpt-4o-mini" # Example updated model
85
- MAX_TOKENS: int = 4000
86
- TEMPERATURE: float = 0.7
87
- HISTORY_LIMIT: int = 5
88
- #### Part 2: Wallet Analyzer (Etherscan)
89
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
90
  class WalletAnalyzer:
91
  """Analyzes Ethereum wallet contents using Etherscan API."""
92
 
@@ -95,9 +148,12 @@ class WalletAnalyzer:
95
  self.base_url = Config.ETHERSCAN_BASE_URL
96
  self.session: Optional[aiohttp.ClientSession] = None
97
  self.last_request_time = 0
 
98
 
99
  async def __aenter__(self) -> WalletAnalyzer:
100
- self.session = aiohttp.ClientSession()
 
 
101
  return self
102
 
103
  async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
@@ -105,83 +161,108 @@ class WalletAnalyzer:
105
  await self.session.close()
106
  self.session = None
107
 
 
 
 
 
 
 
 
 
 
108
  @retry(
109
  stop=stop_after_attempt(Config.MAX_RETRIES),
110
- wait=wait_exponential(multiplier=1, min=4, max=10)
 
 
 
 
111
  )
112
  async def _fetch_data(self, params: Dict[str, str]) -> Dict[str, Any]:
113
- """Fetch data from Etherscan with retry logic."""
114
  if not self.session:
115
  raise APIError("No active session. Use context manager.")
116
-
117
  await self._rate_limit()
118
  params["apikey"] = self.api_key
 
119
  try:
120
  async with self.session.get(self.base_url, params=params) as response:
 
 
121
  if response.status != 200:
122
  raise APIError(f"Etherscan request failed: {response.status}")
123
- data = await response.json()
 
 
 
 
 
124
  if data.get("status") == "0":
125
- err = data.get("message", "Unknown Etherscan error")
126
- if "Max rate limit reached" in err:
127
- raise APIError("Etherscan rate limit exceeded")
128
- raise APIError(f"Etherscan error: {err}")
 
 
129
  return data
 
 
 
130
  except aiohttp.ClientError as e:
131
  raise APIError(f"Network error: {str(e)}")
132
  except Exception as e:
133
  raise APIError(f"Unexpected error: {str(e)}")
134
 
135
- async def _rate_limit(self) -> None:
136
- """Simple rate limiting for Etherscan free tier."""
137
- now = time.time()
138
- diff = now - self.last_request_time
139
- if diff < Config.RATE_LIMIT_DELAY:
140
- await asyncio.sleep(Config.RATE_LIMIT_DELAY - diff)
141
- self.last_request_time = time.time()
142
-
143
  @staticmethod
144
  def _validate_address(address: str) -> bool:
145
  """Validate Ethereum address format."""
146
  return bool(re.match(Config.ETHEREUM_ADDRESS_REGEX, address))
147
 
148
- async def get_portfolio_data(self, address: str) -> WalletData:
149
  """
150
- Return a dictionary with:
151
- - address
152
- - last_updated
153
- - eth_balance
154
- - tokens (list of ERC-20)
155
- - nft_collections (with contract & token_id)
156
  """
157
  if not self._validate_address(address):
158
  raise ValidationError(f"Invalid Ethereum address: {address}")
159
 
160
- logger.info(f"Fetching portfolio data for {address}")
161
- eth_balance = await self._get_eth_balance(address)
162
- tokens = await self._get_token_holdings(address)
163
- nft_colls = await self._get_nft_holdings(address)
164
-
165
- return {
166
- "address": address,
167
- "last_updated": datetime.now().isoformat(),
168
- "eth_balance": float(eth_balance),
169
- "tokens": tokens,
170
- "nft_collections": nft_colls
171
- }
 
 
 
 
172
 
173
  async def _get_eth_balance(self, address: str) -> Decimal:
 
174
  params = {
175
  "module": "account",
176
  "action": "balance",
177
  "address": address,
178
  "tag": "latest"
179
  }
180
- data = await self._fetch_data(params)
181
- return Decimal(data["result"]) / Decimal("1e18")
 
 
 
 
182
 
183
  async def _get_token_holdings(self, address: str) -> List[Dict[str, Any]]:
184
- """Fetch ERC-20 tokens for address."""
185
  params = {
186
  "module": "account",
187
  "action": "tokentx",
@@ -192,33 +273,39 @@ class WalletAnalyzer:
192
 
193
  token_map: Dict[str, Dict[str, Any]] = {}
194
  for tx in data.get("result", []):
195
- contract = tx["contractAddress"]
196
- if contract not in token_map:
197
- token_map[contract] = {
198
- "name": tx["tokenName"],
199
- "symbol": tx["tokenSymbol"],
200
- "decimals": int(tx["tokenDecimal"]),
201
- "balance": Decimal(0)
202
- }
203
- amount = Decimal(tx["value"]) / Decimal(10 ** token_map[contract]["decimals"])
204
- if tx["to"].lower() == address.lower():
205
- token_map[contract]["balance"] += amount
206
- elif tx["from"].lower() == address.lower():
207
- token_map[contract]["balance"] -= amount
208
-
 
 
 
 
 
 
209
  return [
210
  {
211
- "name": v["name"],
212
- "symbol": v["symbol"],
213
- "balance": float(v["balance"])
214
  }
215
- for v in token_map.values() if v["balance"] > 0
 
216
  ]
217
-
218
- async def _get_nft_holdings(self, address: str) -> Dict[str, Any]:
219
  """
220
- Return { "collections": [ { "collection_name": str, "items": [{ contract, token_id }, ...] }, ... ] }
221
- so we can fetch images from OpenSea by contract + token_id.
222
  """
223
  params = {
224
  "module": "account",
@@ -226,412 +313,359 @@ class WalletAnalyzer:
226
  "address": address,
227
  "sort": "desc"
228
  }
229
- data = await self._fetch_data(params)
 
 
 
 
230
 
231
- if data.get("status") != "1" or "result" not in data:
232
- return {"collections": []}
233
-
234
- # track final ownership
235
- ownership_map = {} # key = (contract_token), value = {contract, token_id, coll_name}
236
- for tx in data["result"]:
237
- contract = tx["contractAddress"]
238
- coll_name = tx.get("tokenName", "Unknown Collection")
239
- token_id = tx["tokenID"]
240
- key = f"{contract}_{token_id}"
241
-
242
- if tx["to"].lower() == address.lower():
243
- ownership_map[key] = {
244
- "contract": contract,
245
- "token_id": token_id,
246
- "collection_name": coll_name
247
- }
248
- elif tx["from"].lower() == address.lower():
249
- if key in ownership_map:
250
- ownership_map.pop(key, None)
251
-
252
- # group by collection_name
253
- coll_dict: Dict[str, List[Dict[str, str]]] = {}
254
- for item in ownership_map.values():
255
- c_name = item["collection_name"]
256
- if c_name not in coll_dict:
257
- coll_dict[c_name] = []
258
- coll_dict[c_name].append({
259
- "contract": item["contract"],
260
- "token_id": item["token_id"]
261
- })
262
-
263
- collections_out = []
264
- for cname, items in coll_dict.items():
265
- collections_out.append({
266
- "collection_name": cname,
267
- "items": items
268
- })
269
-
270
- return { "collections": collections_out }
271
- #### Part 3: OpenSea & PIL Conversion
272
-
273
- async def fetch_nft_metadata(opensea_key: str, contract: str, token_id: str) -> Dict[str, Any]:
274
- """
275
- Fetch NFT metadata (including image_url) from OpenSea v2.
276
- Returns { "name": str, "image_url": str } or { "error": str }
277
- """
278
- url = f"https://api.opensea.io/api/v2/chain/ethereum/contract/{contract}/nfts/{token_id}"
279
- headers = {"X-API-KEY": opensea_key} if opensea_key else {}
280
- async with aiohttp.ClientSession() as session:
281
- async with session.get(url, headers=headers) as resp:
282
- if resp.status == 403:
283
- return {"error": "403 Forbidden: OpenSea API key issue"}
284
- if resp.status == 404:
285
- return {"error": f"404 Not Found: {contract} #{token_id}"}
286
- try:
287
- data = await resp.json()
288
- except Exception as e:
289
- return {"error": f"OpenSea JSON parse error: {str(e)}"}
290
 
291
- nft_obj = data.get("nft", {})
292
- name = nft_obj.get("name", f"NFT #{token_id}")
293
- image_url = nft_obj.get("image_url", "")
294
- return {"name": name, "image_url": image_url}
295
 
296
- async def fetch_image_as_pil(url: str) -> Optional[Image.Image]:
297
- """
298
- Download an image from a URL and return as a PIL Image.
299
- """
300
- if not url:
301
- return None
302
- async with aiohttp.ClientSession() as session:
303
- async with session.get(url) as resp:
304
- if resp.status != 200:
305
- return None
306
- raw_bytes = await resp.read()
307
- try:
308
- return Image.open(BytesIO(raw_bytes))
309
- except Exception as e:
310
- logger.error(f"Error converting to PIL: {e}")
311
- return None
312
- #### Part 4: ChatInterface
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
313
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
314
  class ChatInterface:
315
- """Handles chat logic with Etherscan (wallet data), OpenSea (NFT images), and OpenAI (chat)."""
316
 
317
  def __init__(self, openai_key: str, etherscan_key: str, opensea_key: str):
318
  self.openai_key = openai_key
319
  self.etherscan_key = etherscan_key
320
  self.opensea_key = opensea_key
321
- self.context: Dict[str, Any] = {}
 
322
  openai.api_key = openai_key
323
 
324
- @staticmethod
325
- def _validate_api_keys(openai_key: str, etherscan_key: str, opensea_key: str) -> Tuple[bool, str]:
326
- """
327
- Validate all keys. We'll do a minimal check for OpenSea (non-empty).
328
- """
329
- try:
330
- # Check OpenAI
331
- client = openai.OpenAI(api_key=openai_key)
332
- client.chat.completions.create(
333
- model=Config.OPENAI_MODEL,
334
- messages=[{"role": "user", "content": "test"}],
335
- max_tokens=1
336
- )
337
-
338
- # Check Etherscan
339
- async def check_etherscan():
340
- async with WalletAnalyzer(etherscan_key) as analyzer:
341
- params = {"module": "stats", "action": "ethsupply"}
342
- await analyzer._fetch_data(params)
343
- asyncio.run(check_etherscan())
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
344
 
345
- # Check OpenSea
346
- if not opensea_key.strip():
347
- return False, "OpenSea API key is empty!"
348
-
349
- return True, "All API keys validated!"
350
- except Exception as e:
351
- return False, f"API key validation failed: {str(e)}"
352
-
353
- def _format_context_message(self) -> str:
354
- """Format wallet data for GPT system prompt."""
355
- lines = []
356
- if not self.context:
357
- return ""
358
- lines.append("Current Wallet Data:")
359
- for addr, wdata in self.context.items():
360
- lines.append(f"Wallet {addr[:8]}...{addr[-6:]}:")
361
- lines.append(f" ETH Balance: {wdata['eth_balance']:.4f}")
362
- lines.append(f" # of Tokens: {len(wdata['tokens'])}")
363
- # NFT aggregator
364
- nft_data = wdata["nft_collections"]
365
- if "collections" in nft_data:
366
- lines.append(" NFT Collections:")
367
- for c in nft_data["collections"]:
368
- lines.append(f" - {c['collection_name']}: {len(c['items'])} NFT(s)")
369
  return "\n".join(lines)
370
 
371
  async def process_message(
372
  self,
373
  message: str,
374
- history: Optional[ChatHistory] = None
375
- ) -> Tuple[ChatHistory, Dict[str, Any], List[Image.Image]]:
376
- """
377
- 1) Detect Ethereum address
378
- 2) Etherscan for wallet data
379
- 3) For each NFT, fetch from OpenSea, convert to PIL
380
- 4) Return images + chat response
381
- """
382
  if history is None:
383
  history = []
384
-
385
  if not message.strip():
386
- return history, self.context, []
387
 
388
- match = re.search(Config.ETHEREUM_ADDRESS_REGEX, message)
389
  nft_images: List[Image.Image] = []
390
-
 
 
391
  if match:
392
  eth_address = match.group(0)
393
- # partial
394
- partial_text = f"Analyzing {eth_address}..."
395
- history.append((message, partial_text))
 
396
 
397
  try:
398
- # Etherscan
399
  async with WalletAnalyzer(self.etherscan_key) as analyzer:
400
- wallet_data = await analyzer.get_portfolio_data(eth_address)
401
- self.context[eth_address] = wallet_data
402
-
403
- # Summaries
404
- lines = [
405
- f"📊 Summary for {eth_address[:8]}...{eth_address[-6:]}",
406
- f"ETH: {wallet_data['eth_balance']:.4f}",
407
- f"Tokens: {len(wallet_data['tokens'])}"
408
- ]
409
- nft_info = wallet_data["nft_collections"]
410
- total_nfts = 0
411
- if "collections" in nft_info:
412
- for c in nft_info["collections"]:
413
- total_nfts += len(c["items"])
414
- lines.append(f"NFTs: {total_nfts}")
415
- # Add to chat
416
- history.append((message, "\n".join(lines)))
417
-
418
- # If we want to fetch images for, say, first 2 collections, 2 items each
419
- if "collections" in nft_info:
420
- for coll in nft_info["collections"][:2]:
421
- for item in coll["items"][:2]:
422
- # 1) fetch metadata
423
- meta = await fetch_nft_metadata(self.opensea_key, item["contract"], item["token_id"])
424
- if "error" in meta:
425
- logger.warning(f"OpenSea metadata error: {meta['error']}")
426
- continue
427
- image_url = meta["image_url"]
428
- if not image_url:
429
- logger.info(f"No image for {meta['name']}")
430
- continue
431
- # 2) Download + convert to PIL
432
- pil_img = await fetch_image_as_pil(image_url)
433
- if pil_img:
434
- nft_images.append(pil_img)
435
- # Also mention in chat
436
- found_msg = f"Found NFT image: {meta['name']} (contract {item['contract'][:8]}...{item['contract'][-6:]}, id={item['token_id']})"
437
- history.append((message, found_msg))
438
 
439
  except Exception as e:
440
- err = f"Error analyzing {eth_address}: {str(e)}"
441
- logger.error(err)
442
- history.append((message, err))
 
443
 
444
- # Now do an OpenAI chat
445
  try:
446
- context_str = self._format_context_message()
447
- # Convert chat to OpenAI format
448
- limit = Config.HISTORY_LIMIT
449
- short_hist = history[-limit:]
450
- openai_msgs = []
451
- for usr, ans in short_hist:
452
- openai_msgs.append({"role": "user", "content": usr})
453
- openai_msgs.append({"role": "assistant", "content": ans})
454
-
455
- # openai
456
- openai.api_key = self.openai_key
 
 
 
 
457
  client = openai.OpenAI(api_key=self.openai_key)
458
- resp = client.chat.completions.create(
459
  model=Config.OPENAI_MODEL,
460
- messages=[
461
- {"role": "system", "content": Config.SYSTEM_PROMPT},
462
- {"role": "system", "content": context_str},
463
- *openai_msgs,
464
- {"role": "user", "content": message}
465
- ],
466
  temperature=Config.TEMPERATURE,
467
  max_tokens=Config.MAX_TOKENS
468
  )
469
- final_msg = resp.choices[0].message.content
470
- history.append((message, final_msg))
471
 
472
- return history, self.context, nft_images
 
 
 
473
 
474
  except Exception as e:
475
- logger.error(f"OpenAI error: {e}")
476
- err_chat = f"OpenAI error: {str(e)}"
477
- history.append((message, err_chat))
478
- return history, self.context, []
479
 
480
  def clear_context(self) -> Tuple[Dict[str, Any], List[Tuple[str, str]]]:
481
- """Clear the wallet context and chat."""
482
- self.context = {}
483
  return {}, []
484
- #### Part 5: Gradio UI & Launch
485
-
486
- class GradioInterface:
487
- """Manages the Gradio UI for Hugging Face Space, with top-right NFT gallery as PIL images."""
488
-
489
- def __init__(self):
490
- self.chat_interface: Optional[ChatInterface] = None
491
- self.demo = self._create_interface()
492
-
493
- def _create_interface(self) -> gr.Blocks:
494
- with gr.Blocks(theme=gr.themes.Soft()) as demo:
495
- gr.Markdown("""
496
- # 🐕 LOSS DOG: Blockchain Wallet Analyzer (NFT Images, top-right)
497
-
498
- - Enter your **OpenAI**, **Etherscan**, **OpenSea** keys below
499
- - Validate, then chat with your Ethereum address
500
- - NFT images will appear as **PIL** objects in the top-right
501
- """)
502
-
503
- with gr.Row():
504
- openai_key = gr.Textbox(
505
- label="OpenAI API Key",
506
- type="password",
507
- placeholder="Enter your OpenAI API key..."
508
- )
509
- etherscan_key = gr.Textbox(
510
- label="Etherscan API Key",
511
- type="password",
512
- placeholder="Enter your Etherscan API key..."
513
- )
514
- opensea_key = gr.Textbox(
515
- label="OpenSea API Key",
516
- type="password",
517
- placeholder="Enter your OpenSea API key..."
518
- )
519
-
520
- validation_status = gr.Textbox(label="Validation Status", interactive=False)
521
- validate_btn = gr.Button("Validate API Keys", variant="primary")
522
-
523
- with gr.Row():
524
- # Main Chat Column
525
- with gr.Column(scale=2):
526
- chatbot = gr.Chatbot(label="Chat History", height=420, value=[])
527
- with gr.Row():
528
- msg_input = gr.Textbox(
529
- label="Message",
530
- placeholder="Enter an ETH address or question..."
531
- )
532
- send_btn = gr.Button("Send", variant="primary")
533
- # Top-Right: NFT Gallery (PIL images)
534
- with gr.Column(scale=1):
535
- nft_gallery = gr.Gallery(
536
- label="NFT Images (Top-Right)",
537
- columns=2
538
- )
539
- wallet_context = gr.JSON(
540
- label="Active Wallet Context",
541
- value={}
542
- )
543
- clear_btn = gr.Button("Clear Context")
544
-
545
- msg_input.interactive = False
546
- send_btn.interactive = False
547
-
548
- def validate_keys(openai_k: str, etherscan_k: str, opensea_k: str) -> Tuple[str, gr.update, gr.update]:
549
- """Validate user-provided keys; enable chat if all pass."""
550
- is_valid, msg = ChatInterface._validate_api_keys(openai_k, etherscan_k, opensea_k)
551
- if is_valid:
552
- self.chat_interface = ChatInterface(openai_k, etherscan_k, opensea_k)
553
- return (
554
- f"✅ {msg}",
555
- gr.update(interactive=True),
556
- gr.update(interactive=True)
557
- )
558
- else:
559
- return (
560
- f"❌ {msg}",
561
- gr.update(interactive=False),
562
- gr.update(interactive=False)
563
- )
564
-
565
- validate_btn.click(
566
- fn=validate_keys,
567
- inputs=[openai_key, etherscan_key, opensea_key],
568
- outputs=[validation_status, msg_input, send_btn]
569
- )
570
-
571
- def clear_all():
572
- """Clear wallet context and chat."""
573
- if self.chat_interface:
574
- return self.chat_interface.clear_context()
575
- return {}, []
576
-
577
- clear_btn.click(
578
- fn=clear_all,
579
- inputs=[],
580
- outputs=[wallet_context, chatbot]
581
- )
582
-
583
- async def handle_message(
584
- message: str,
585
- chat_hist: List[Tuple[str, str]],
586
- context: Dict[str, Any]
587
- ) -> Tuple[List[Tuple[str, str]], Dict[str, Any], List[Image.Image]]:
588
- """Process user input, return updated chat, context, list of PIL images."""
589
- if not self.chat_interface:
590
- return [], {}, []
591
-
592
- try:
593
- new_history, new_context, images = await self.chat_interface.process_message(message, chat_hist)
594
- return new_history, new_context, images
595
- except Exception as e:
596
- logger.error(f"Error in handle_message: {e}")
597
- if chat_hist is None:
598
- chat_hist = []
599
- chat_hist.append((message, f"Error: {str(e)}"))
600
- return chat_hist, context, []
601
-
602
- # Chat flow
603
- msg_input.submit(
604
- fn=handle_message,
605
- inputs=[msg_input, chatbot, wallet_context],
606
- outputs=[chatbot, wallet_context, nft_gallery]
607
- ).then(
608
- lambda: gr.update(value=""),
609
- None,
610
- [msg_input]
611
- )
612
-
613
- send_btn.click(
614
- fn=handle_message,
615
- inputs=[msg_input, chatbot, wallet_context],
616
- outputs=[chatbot, wallet_context, nft_gallery]
617
- ).then(
618
- lambda: gr.update(value=""),
619
- None,
620
- [msg_input]
621
- )
622
-
623
- return demo
624
 
625
- def launch(self):
626
- self.demo.queue()
627
- self.demo.launch()
 
 
628
 
629
  def main():
630
- """Main entry point for Hugging Face Space."""
631
- logger.info("Launching LOSS DOG with PIL-based NFT images (top-right).")
632
- interface = GradioInterface()
633
- interface.launch()
634
 
635
  if __name__ == "__main__":
636
- main()
637
-
 
2
  Blockchain Wallet Analyzer - A tool for analyzing Ethereum wallet contents and NFT holdings.
3
 
4
  This module provides a complete implementation of a blockchain wallet analysis tool
5
+ with a Gradio web interface. It includes improved NFT tracking, natural AI communication,
6
+ and enhanced error handling.
 
7
 
8
  Author: Claude
9
  Date: January 2025
10
  """
11
 
12
  from __future__ import annotations
 
13
  import os
14
  import re
15
  import json
 
17
  import logging
18
  import asyncio
19
  import base64
20
+ from typing import List, Dict, Tuple, Any, Optional, TypeVar, Set
21
  from datetime import datetime
22
  from decimal import Decimal
23
+ from dataclasses import dataclass, field
24
  from pathlib import Path
25
  from io import BytesIO
26
 
 
34
  T = TypeVar('T')
35
  WalletData = Dict[str, Any]
36
  ChatHistory = List[Tuple[str, str]]
37
+ NFTIdentifier = Tuple[str, str] # (contract, token_id)
38
 
39
  # Configure logging
40
  logging.basicConfig(
 
47
  )
48
  logger = logging.getLogger(__name__)
49
 
50
+ class BlockchainError(Exception):
51
+ """Base exception for blockchain-related errors."""
52
+ pass
53
+
54
+ class ConfigError(BlockchainError):
55
+ """Configuration error."""
56
+ pass
57
+
58
+ class APIError(BlockchainError):
59
+ """API call error."""
60
  pass
61
 
62
+ class ValidationError(BlockchainError):
63
+ """Input validation error."""
64
  pass
65
 
66
+ class NFTError(BlockchainError):
67
+ """NFT processing error."""
68
  pass
69
 
70
+ @dataclass
71
+ class NFTMetadata:
72
+ """Structure for NFT metadata."""
73
+ contract: str
74
+ token_id: str
75
+ name: str
76
+ collection_name: str
77
+ image_url: Optional[str] = None
78
+ description: Optional[str] = None
79
+ traits: List[Dict[str, Any]] = field(default_factory=list)
80
+ last_updated: datetime = field(default_factory=datetime.now)
81
+
82
+ def to_dict(self) -> Dict[str, Any]:
83
+ """Convert to dictionary format."""
84
+ return {
85
+ "contract": self.contract,
86
+ "token_id": self.token_id,
87
+ "name": self.name,
88
+ "collection_name": self.collection_name,
89
+ "image_url": self.image_url,
90
+ "description": self.description,
91
+ "traits": self.traits,
92
+ "last_updated": self.last_updated.isoformat()
93
+ }
94
+
95
  @dataclass
96
  class Config:
97
+ """Enhanced application configuration."""
98
+
99
+ # Improved system prompt for more natural responses
100
  SYSTEM_PROMPT: str = """
101
  You are LOSS DOG 🐕 (Learning & Observing Smart Systems Digital Output Generator),
102
+ a friendly blockchain analysis assistant!
103
+
104
+ Your conversation style:
105
+ - Maintain natural dialogue flow
106
+ - Avoid repetitive responses
107
+ - Use context from previous messages
108
+ - Track which NFTs have been discussed
109
+ - Provide insights when relevant
110
+
111
+ When analyzing wallets:
112
+ - Reference specific NFTs by collection and ID
113
+ - Highlight interesting patterns
114
+ - Compare wallets if multiple available
115
+ - Monitor NFT rarity and traits
116
  """
117
+
118
+ # API Configuration
119
  ETHERSCAN_BASE_URL: str = "https://api.etherscan.io/api"
120
+ OPENSEA_BASE_URL: str = "https://api.opensea.io/api/v2"
121
+ ETHEREUM_ADDRESS_REGEX: str = r"^0x[a-fA-F0-9]{40}$"
122
+
123
+ # Rate Limiting & Retries
124
  RATE_LIMIT_DELAY: float = 0.2 # 5 requests per second
125
  MAX_RETRIES: int = 3
126
+ RETRY_BASE_DELAY: float = 1.0
127
+ RETRY_MAX_DELAY: float = 10.0
 
 
 
128
 
129
+ # API Timeouts
130
+ REQUEST_TIMEOUT: float = 30.0
131
+ NFT_FETCH_TIMEOUT: int = 10
132
+
133
+ # NFT Processing
134
+ MAX_NFTS_PER_COLLECTION: int = 50
135
+ NFT_BATCH_SIZE: int = 5
136
+ MAX_IMAGE_SIZE: Tuple[int, int] = (800, 800)
137
+
138
+ # OpenAI Settings
139
+ OPENAI_MODEL: str = "gpt-4o-mini"
140
+ MAX_TOKENS: int = 8000
141
+ TEMPERATURE: float = 0.7
142
+ HISTORY_LIMIT: int = 10
143
  class WalletAnalyzer:
144
  """Analyzes Ethereum wallet contents using Etherscan API."""
145
 
 
148
  self.base_url = Config.ETHERSCAN_BASE_URL
149
  self.session: Optional[aiohttp.ClientSession] = None
150
  self.last_request_time = 0
151
+ self._rate_limit_semaphore = asyncio.Semaphore(1)
152
 
153
  async def __aenter__(self) -> WalletAnalyzer:
154
+ self.session = aiohttp.ClientSession(
155
+ timeout=aiohttp.ClientTimeout(total=Config.REQUEST_TIMEOUT)
156
+ )
157
  return self
158
 
159
  async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
 
161
  await self.session.close()
162
  self.session = None
163
 
164
+ async def _rate_limit(self) -> None:
165
+ """Thread-safe rate limiting."""
166
+ async with self._rate_limit_semaphore:
167
+ now = time.time()
168
+ time_since_last = now - self.last_request_time
169
+ if time_since_last < Config.RATE_LIMIT_DELAY:
170
+ await asyncio.sleep(Config.RATE_LIMIT_DELAY - time_since_last)
171
+ self.last_request_time = time.time()
172
+
173
  @retry(
174
  stop=stop_after_attempt(Config.MAX_RETRIES),
175
+ wait=wait_exponential(
176
+ multiplier=Config.RETRY_BASE_DELAY,
177
+ min=Config.RETRY_BASE_DELAY,
178
+ max=Config.RETRY_MAX_DELAY
179
+ )
180
  )
181
  async def _fetch_data(self, params: Dict[str, str]) -> Dict[str, Any]:
182
+ """Fetch data from Etherscan with improved error handling."""
183
  if not self.session:
184
  raise APIError("No active session. Use context manager.")
185
+
186
  await self._rate_limit()
187
  params["apikey"] = self.api_key
188
+
189
  try:
190
  async with self.session.get(self.base_url, params=params) as response:
191
+ if response.status == 429:
192
+ raise APIError("Rate limit exceeded")
193
  if response.status != 200:
194
  raise APIError(f"Etherscan request failed: {response.status}")
195
+
196
+ try:
197
+ data = await response.json()
198
+ except json.JSONDecodeError as e:
199
+ raise APIError(f"Invalid JSON response: {e}")
200
+
201
  if data.get("status") == "0":
202
+ message = data.get("message", "Unknown error")
203
+ result = data.get("result", "")
204
+ if "Max rate limit reached" in message:
205
+ raise APIError(f"Rate limit exceeded: {result}")
206
+ raise APIError(f"Etherscan API error: {message}")
207
+
208
  return data
209
+
210
+ except asyncio.TimeoutError:
211
+ raise APIError("Request timed out")
212
  except aiohttp.ClientError as e:
213
  raise APIError(f"Network error: {str(e)}")
214
  except Exception as e:
215
  raise APIError(f"Unexpected error: {str(e)}")
216
 
 
 
 
 
 
 
 
 
217
  @staticmethod
218
  def _validate_address(address: str) -> bool:
219
  """Validate Ethereum address format."""
220
  return bool(re.match(Config.ETHEREUM_ADDRESS_REGEX, address))
221
 
222
+ async def get_wallet_data(self, address: str) -> WalletData:
223
  """
224
+ Get comprehensive wallet data including:
225
+ - ETH balance
226
+ - ERC-20 tokens
227
+ - NFT collections
 
 
228
  """
229
  if not self._validate_address(address):
230
  raise ValidationError(f"Invalid Ethereum address: {address}")
231
 
232
+ logger.info(f"Fetching data for wallet: {address}")
233
+ try:
234
+ eth_balance = await self._get_eth_balance(address)
235
+ tokens = await self._get_token_holdings(address)
236
+ nfts = await self._get_nft_holdings(address)
237
+
238
+ return {
239
+ "address": address,
240
+ "last_updated": datetime.now().isoformat(),
241
+ "eth_balance": float(eth_balance),
242
+ "tokens": tokens,
243
+ "nft_collections": nfts
244
+ }
245
+ except Exception as e:
246
+ logger.error(f"Error fetching wallet data: {e}")
247
+ raise
248
 
249
  async def _get_eth_balance(self, address: str) -> Decimal:
250
+ """Get ETH balance for address."""
251
  params = {
252
  "module": "account",
253
  "action": "balance",
254
  "address": address,
255
  "tag": "latest"
256
  }
257
+ try:
258
+ data = await self._fetch_data(params)
259
+ balance_wei = Decimal(data["result"])
260
+ return balance_wei / Decimal("1e18") # Convert Wei to ETH
261
+ except (KeyError, ValueError) as e:
262
+ raise APIError(f"Error parsing ETH balance: {e}")
263
 
264
  async def _get_token_holdings(self, address: str) -> List[Dict[str, Any]]:
265
+ """Get ERC-20 token holdings with improved accuracy."""
266
  params = {
267
  "module": "account",
268
  "action": "tokentx",
 
273
 
274
  token_map: Dict[str, Dict[str, Any]] = {}
275
  for tx in data.get("result", []):
276
+ try:
277
+ contract = tx["contractAddress"].lower()
278
+ if contract not in token_map:
279
+ token_map[contract] = {
280
+ "name": tx["tokenName"],
281
+ "symbol": tx["tokenSymbol"],
282
+ "decimals": int(tx["tokenDecimal"]),
283
+ "balance": Decimal(0)
284
+ }
285
+
286
+ amount = Decimal(tx["value"]) / Decimal(10 ** token_map[contract]["decimals"])
287
+ if tx["to"].lower() == address.lower():
288
+ token_map[contract]["balance"] += amount
289
+ elif tx["from"].lower() == address.lower():
290
+ token_map[contract]["balance"] -= amount
291
+ except (KeyError, ValueError, decimal.InvalidOperation) as e:
292
+ logger.warning(f"Error processing token transaction: {e}")
293
+ continue
294
+
295
+ # Filter tokens with non-zero balances
296
  return [
297
  {
298
+ "name": token["name"],
299
+ "symbol": token["symbol"],
300
+ "balance": float(token["balance"])
301
  }
302
+ for token in token_map.values()
303
+ if token["balance"] > 0
304
  ]
305
+ async def _get_nft_holdings(self, address: str) -> Dict[str, Any]:
 
306
  """
307
+ Get NFT holdings grouped by collection.
308
+ Returns: {"collections": [{"collection_name": str, "items": [{"contract", "token_id"}, ...]}]}
309
  """
310
  params = {
311
  "module": "account",
 
313
  "address": address,
314
  "sort": "desc"
315
  }
316
+
317
+ try:
318
+ data = await self._fetch_data(params)
319
+ if data.get("status") != "1" or "result" not in data:
320
+ return {"collections": []}
321
 
322
+ # Track NFT ownership by collection
323
+ collections: Dict[str, List[Dict[str, str]]] = {}
324
+ ownership: Dict[str, Dict[str, Any]] = {} # contract_tokenId -> ownership info
325
+
326
+ for tx in data["result"]:
327
+ contract = tx["contractAddress"].lower()
328
+ token_id = tx["tokenID"]
329
+ key = f"{contract}_{token_id}"
330
+
331
+ # Get collection name, fallback with contract if needed
332
+ collection_name = tx.get("tokenName", "Unknown Collection")
333
+ if collection_name == "Unknown Collection" and contract in collections:
334
+ collection_name = next(iter(collections[contract]))
335
+
336
+ # Track ownership changes
337
+ if tx["to"].lower() == address.lower():
338
+ ownership[key] = {
339
+ "contract": contract,
340
+ "token_id": token_id,
341
+ "collection_name": collection_name
342
+ }
343
+ elif tx["from"].lower() == address.lower():
344
+ ownership.pop(key, None)
345
+
346
+ # Group owned NFTs by collection
347
+ for nft in ownership.values():
348
+ coll_name = nft["collection_name"]
349
+ if coll_name not in collections:
350
+ collections[coll_name] = []
351
+ collections[coll_name].append({
352
+ "contract": nft["contract"],
353
+ "token_id": nft["token_id"]
354
+ })
355
+
356
+ return {
357
+ "collections": [
358
+ {
359
+ "collection_name": name,
360
+ "items": items[:Config.MAX_NFTS_PER_COLLECTION]
361
+ }
362
+ for name, items in collections.items()
363
+ ]
364
+ }
365
+
366
+ except Exception as e:
367
+ logger.error(f"Error fetching NFT holdings: {e}")
368
+ raise APIError(f"Failed to fetch NFT holdings: {str(e)}")
 
 
 
 
 
 
 
 
 
 
 
 
369
 
 
 
 
 
370
 
371
+ class NFTProcessor:
372
+ """Handles NFT metadata and image processing."""
373
+
374
+ def __init__(self, opensea_key: str):
375
+ self.opensea_key = opensea_key
376
+ self._session: Optional[aiohttp.ClientSession] = None
377
+ self.last_request_time = 0
378
+ self._rate_limit_semaphore = asyncio.Semaphore(1)
379
+ self._metadata_cache: Dict[str, NFTMetadata] = {}
380
+ self._image_cache: Dict[str, Image.Image] = {}
381
+
382
+ async def __aenter__(self) -> 'NFTProcessor':
383
+ self._session = aiohttp.ClientSession(
384
+ timeout=aiohttp.ClientTimeout(total=Config.NFT_FETCH_TIMEOUT)
385
+ )
386
+ return self
387
+
388
+ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
389
+ if self._session:
390
+ await self._session.close()
391
+ self._session = None
392
+
393
+ async def _rate_limit(self) -> None:
394
+ """Thread-safe rate limiting."""
395
+ async with self._rate_limit_semaphore:
396
+ now = time.time()
397
+ time_since_last = now - self.last_request_time
398
+ if time_since_last < Config.RATE_LIMIT_DELAY:
399
+ await asyncio.sleep(Config.RATE_LIMIT_DELAY - time_since_last)
400
+ self.last_request_time = time.time()
401
+
402
+ def _get_cache_key(self, contract: str, token_id: str) -> str:
403
+ """Generate cache key for NFT."""
404
+ return f"{contract.lower()}_{token_id}"
405
 
406
+ @retry(
407
+ stop=stop_after_attempt(Config.MAX_RETRIES),
408
+ wait=wait_exponential(
409
+ multiplier=Config.RETRY_BASE_DELAY,
410
+ min=Config.RETRY_BASE_DELAY,
411
+ max=Config.RETRY_MAX_DELAY
412
+ )
413
+ )
414
+ async def fetch_nft_metadata(
415
+ self,
416
+ contract: str,
417
+ token_id: str,
418
+ force_refresh: bool = False
419
+ ) -> NFTMetadata:
420
+ """Fetch NFT metadata from OpenSea with caching."""
421
+ if not self._session:
422
+ raise NFTError("No active session")
423
+
424
+ cache_key = self._get_cache_key(contract, token_id)
425
+
426
+ # Check cache unless force refresh
427
+ if not force_refresh and cache_key in self._metadata_cache:
428
+ return self._metadata_cache[cache_key]
429
+
430
+ await self._rate_limit()
431
+
432
+ url = f"{Config.OPENSEA_BASE_URL}/chain/ethereum/contract/{contract}/nfts/{token_id}"
433
+ headers = {"X-API-KEY": self.opensea_key}
434
+
435
+ try:
436
+ async with self._session.get(url, headers=headers) as resp:
437
+ if resp.status == 429:
438
+ raise APIError("OpenSea rate limit exceeded")
439
+ if resp.status != 200:
440
+ raise APIError(f"OpenSea API error: {resp.status}")
441
+
442
+ data = await resp.json()
443
+ nft_data = data.get("nft", {})
444
+
445
+ metadata = NFTMetadata(
446
+ contract=contract,
447
+ token_id=token_id,
448
+ name=nft_data.get("name", f"NFT #{token_id}"),
449
+ collection_name=nft_data.get("collection", {}).get("name", "Unknown"),
450
+ image_url=nft_data.get("image_url"),
451
+ description=nft_data.get("description"),
452
+ traits=nft_data.get("traits", [])
453
+ )
454
+
455
+ # Cache the result
456
+ self._metadata_cache[cache_key] = metadata
457
+ return metadata
458
+
459
+ except asyncio.TimeoutError:
460
+ raise NFTError(f"Timeout fetching NFT {contract} #{token_id}")
461
+ except Exception as e:
462
+ raise NFTError(f"Error fetching NFT metadata: {str(e)}")
463
+
464
+ async def fetch_image(self, image_url: str) -> Optional[Image.Image]:
465
+ """Download and process NFT image with caching."""
466
+ if not image_url or not self._session:
467
+ return None
468
+
469
+ # Check image cache
470
+ if image_url in self._image_cache:
471
+ return self._image_cache[image_url]
472
+
473
+ try:
474
+ async with self._session.get(
475
+ image_url,
476
+ timeout=aiohttp.ClientTimeout(total=Config.NFT_FETCH_TIMEOUT)
477
+ ) as resp:
478
+ if resp.status != 200:
479
+ return None
480
+ data = await resp.read()
481
+
482
+ img = Image.open(BytesIO(data))
483
+
484
+ # Convert to RGB/RGBA if needed
485
+ if img.mode not in ('RGB', 'RGBA'):
486
+ img = img.convert('RGBA')
487
+
488
+ # Resize if needed
489
+ if (img.size[0] > Config.MAX_IMAGE_SIZE[0] or
490
+ img.size[1] > Config.MAX_IMAGE_SIZE[1]):
491
+ img.thumbnail(Config.MAX_IMAGE_SIZE)
492
+
493
+ # Cache the processed image
494
+ self._image_cache[image_url] = img
495
+ return img
496
+
497
+ except Exception as e:
498
+ logger.error(f"Error processing image: {e}")
499
+ return None
500
  class ChatInterface:
501
+ """Manages chat interactions and context."""
502
 
503
  def __init__(self, openai_key: str, etherscan_key: str, opensea_key: str):
504
  self.openai_key = openai_key
505
  self.etherscan_key = etherscan_key
506
  self.opensea_key = opensea_key
507
+ self.context = ChatContext()
508
+ self.nft_processor = None
509
  openai.api_key = openai_key
510
 
511
+ async def _ensure_nft_processor(self) -> NFTProcessor:
512
+ """Initialize NFT processor if needed."""
513
+ if self.nft_processor is None:
514
+ self.nft_processor = NFTProcessor(self.opensea_key)
515
+ await self.nft_processor.__aenter__()
516
+ return self.nft_processor
517
+
518
+ def _format_context(self) -> str:
519
+ """Format wallet data for AI context."""
520
+ if not self.context.wallet_data:
521
+ return "No wallet data available."
522
+
523
+ lines = ["Current Wallet Analysis:"]
524
+ for addr, data in self.context.wallet_data.items():
525
+ lines.append(f"\nWallet {addr[:8]}...{addr[-6:]}:")
526
+ lines.append(f"ETH Balance: {data['eth_balance']:.4f} ETH")
527
+
528
+ if data['tokens']:
529
+ lines.append("\nToken Holdings:")
530
+ for token in sorted(
531
+ data['tokens'],
532
+ key=lambda x: x['balance'],
533
+ reverse=True
534
+ )[:10]: # Show top 10 tokens
535
+ lines.append(f"- {token['symbol']}: {token['balance']:.4f}")
536
+
537
+ nft_data = data.get("nft_collections", {})
538
+ if "collections" in nft_data and nft_data["collections"]:
539
+ lines.append("\nNFT Collections:")
540
+ for coll in nft_data["collections"]:
541
+ coll_name = coll["collection_name"]
542
+ total_items = len(coll["items"])
543
+ discussed = sum(1 for item in coll["items"]
544
+ if self.context.is_nft_discussed(
545
+ item["contract"],
546
+ item["token_id"]
547
+ ))
548
+ lines.append(f"\n{coll_name}:")
549
+ lines.append(f"- Total Items: {total_items}")
550
+ lines.append(f"- Discussed: {discussed}/{total_items}")
551
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
552
  return "\n".join(lines)
553
 
554
  async def process_message(
555
  self,
556
  message: str,
557
+ history: Optional[List[Tuple[str, str]]] = None
558
+ ) -> Tuple[List[Tuple[str, str]], Dict[str, Any], List[Image.Image]]:
559
+ """Process user message and return chat updates."""
 
 
 
 
 
560
  if history is None:
561
  history = []
562
+
563
  if not message.strip():
564
+ return history, self.context.wallet_data, []
565
 
 
566
  nft_images: List[Image.Image] = []
567
+
568
+ # Check for Ethereum address
569
+ match = re.search(Config.ETHEREUM_ADDRESS_REGEX, message)
570
  if match:
571
  eth_address = match.group(0)
572
+ history.append((
573
+ message,
574
+ f"Analyzing wallet {eth_address[:8]}...{eth_address[-6:]}..."
575
+ ))
576
 
577
  try:
578
+ # Fetch wallet data
579
  async with WalletAnalyzer(self.etherscan_key) as analyzer:
580
+ wallet_data = await analyzer.get_wallet_data(eth_address)
581
+ self.context.wallet_data[eth_address] = wallet_data
582
+
583
+ # Process NFTs
584
+ nft_processor = await self._ensure_nft_processor()
585
+ nft_info = wallet_data.get("nft_collections", {})
586
+
587
+ if "collections" in nft_info and nft_info["collections"]:
588
+ for collection in nft_info["collections"]:
589
+ for i in range(0, len(collection["items"]), Config.NFT_BATCH_SIZE):
590
+ batch = collection["items"][i:i + Config.NFT_BATCH_SIZE]
591
+ metadata_tasks = [
592
+ nft_processor.fetch_nft_metadata(
593
+ item["contract"],
594
+ item["token_id"]
595
+ )
596
+ for item in batch
597
+ ]
598
+ metadata_results = await asyncio.gather(
599
+ *metadata_tasks,
600
+ return_exceptions=True
601
+ )
602
+
603
+ for meta in metadata_results:
604
+ if isinstance(meta, Exception):
605
+ continue
606
+ if meta.image_url:
607
+ img = await nft_processor.fetch_image(meta.image_url)
608
+ if img:
609
+ nft_images.append(img)
 
 
 
 
 
 
 
 
610
 
611
  except Exception as e:
612
+ error_msg = f"Error analyzing wallet: {str(e)}"
613
+ logger.error(error_msg)
614
+ history.append((message, error_msg))
615
+ return history, self.context.wallet_data, []
616
 
617
+ # Process with OpenAI
618
  try:
619
+ messages = [
620
+ {"role": "system", "content": Config.SYSTEM_PROMPT},
621
+ {"role": "system", "content": self._format_context()}
622
+ ]
623
+
624
+ # Add recent history
625
+ recent_history = history[-Config.HISTORY_LIMIT:]
626
+ for usr, bot in recent_history:
627
+ messages.extend([
628
+ {"role": "user", "content": usr},
629
+ {"role": "assistant", "content": bot}
630
+ ])
631
+
632
+ messages.append({"role": "user", "content": message})
633
+
634
  client = openai.OpenAI(api_key=self.openai_key)
635
+ response = client.chat.completions.create(
636
  model=Config.OPENAI_MODEL,
637
+ messages=messages,
 
 
 
 
 
638
  temperature=Config.TEMPERATURE,
639
  max_tokens=Config.MAX_TOKENS
640
  )
 
 
641
 
642
+ ai_message = response.choices[0].message.content
643
+ history.append((message, ai_message))
644
+
645
+ return history, self.context.wallet_data, nft_images
646
 
647
  except Exception as e:
648
+ error_msg = f"Chat error: {str(e)}"
649
+ logger.error(error_msg)
650
+ history.append((message, error_msg))
651
+ return history, self.context.wallet_data, []
652
 
653
  def clear_context(self) -> Tuple[Dict[str, Any], List[Tuple[str, str]]]:
654
+ """Clear chat context and history."""
655
+ self.context = ChatContext()
656
  return {}, []
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
657
 
658
+ async def cleanup(self):
659
+ """Cleanup resources."""
660
+ if self.nft_processor:
661
+ await self.nft_processor.__aexit__(None, None, None)
662
+ self.nft_processor = None
663
 
664
  def main():
665
+ """Main entry point."""
666
+ demo = create_gradio_interface()
667
+ demo.queue()
668
+ demo.launch()
669
 
670
  if __name__ == "__main__":
671
+ main()