from flask import Flask, render_template, request, jsonify, Response from dotenv import load_dotenv import requests from datetime import datetime import os import json import openai import numpy as np import pickle from pathlib import Path import umap import plotly.express as px import plotly.graph_objects as go import pandas as pd from sklearn.cluster import DBSCAN from sklearn.preprocessing import StandardScaler import time import queue import threading import hdbscan from sklearn.neighbors import NearestNeighbors import traceback load_dotenv() app = Flask(__name__) # Get API keys from environment variables SERPAPI_API_KEY = os.getenv('SERPAPI_API_KEY') OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') MAX_PATENTS = 3000 # Increased from 2000 to 5000 for better coverage MIN_PATENTS_FOR_GAPS = 3000 # Minimum patents needed for reliable gap detection CACHE_FILE = 'patent_embeddings_cache.pkl' # Global progress queue for SSE updates progress_queue = queue.Queue() if not SERPAPI_API_KEY: raise ValueError("SERPAPI_API_KEY environment variable is not set") if not OPENAI_API_KEY: raise ValueError("OPENAI_API_KEY environment variable is not set") # Initialize OpenAI API key openai.api_key = OPENAI_API_KEY def load_cache(): """Load cached embeddings from file""" try: if os.path.exists(CACHE_FILE): with open(CACHE_FILE, 'rb') as f: return pickle.load(f) except Exception as e: print(f"Error loading cache: {e}") return {} def save_cache(cache): """Save embeddings cache to file""" try: with open(CACHE_FILE, 'wb') as f: pickle.dump(cache, f) except Exception as e: print(f"Error saving cache: {e}") def get_embedding(text, cache): """Get embedding for text, using cache if available""" if not text or text.strip() == "": return None if text in cache: return cache[text] try: response = openai.Embedding.create( model="text-embedding-3-small", input=text ) embedding = response['data'][0]['embedding'] if embedding: # Only cache if we got a valid embedding cache[text] = embedding save_cache(cache) # Save cache after each new embedding return embedding except Exception as e: print(f"Error getting embedding: {e}") return None def search_patents(keywords, page_size=100): """ Search patents using SerpApi's Google Patents API with pagination and generate embeddings """ # Load existing cache embedding_cache = load_cache() all_patents = [] page = 1 total_processed = 0 while len(all_patents) < MAX_PATENTS: update_progress('search', 'processing', f'Fetching page {page} of patents...') # SerpApi Google Patents API endpoint api_url = "https://serpapi.com/search" params = { "engine": "google_patents", "q": keywords, "api_key": SERPAPI_API_KEY, "num": page_size, "start": (page - 1) * page_size } try: response = requests.get(api_url, params=params) response_data = response.json() if "error" in response_data: print(f"API returned error: {response_data['error']}") break patents_data = response_data.get('organic_results', []) if not patents_data: print(f"No more patents found on page {page}") break for idx, patent in enumerate(patents_data): if len(all_patents) >= MAX_PATENTS: break # Format filing date filing_date = patent.get('filing_date', '') filing_year = 'N/A' if filing_date: try: filing_year = datetime.strptime(filing_date, '%Y-%m-%d').year except ValueError: pass # Get assignee assignee = patent.get('assignee', 'N/A') if isinstance(assignee, list) and assignee: assignee = assignee[0] # Format title and abstract for embedding title = patent.get('title', '').strip() abstract = patent.get('snippet', '').strip() combined_text = f"{title}\n{abstract}".strip() # Get embedding for combined text total_processed += 1 if total_processed % 10 == 0: # Update progress every 10 patents update_progress('embedding', 'processing', f'Processing patent {total_processed} of {MAX_PATENTS}...') embedding = get_embedding(combined_text, embedding_cache) formatted_patent = { 'title': title, 'assignee': assignee, 'filing_year': filing_year, 'abstract': abstract, 'link': patent.get('patent_link', '') or patent.get('link', ''), 'embedding': embedding } all_patents.append(formatted_patent) print(f"Retrieved {len(patents_data)} patents from page {page}") # Check if there are more pages if not response_data.get('serpapi_pagination', {}).get('next'): break page += 1 except Exception as e: print(f"Error searching patents: {e}") break # Save final cache state save_cache(embedding_cache) print(f"Total patents retrieved and embedded: {len(all_patents)}") return all_patents def analyze_patent_group(patents, group_type, label, max_retries=3): """Analyze patent groups using ChatGPT""" # Get titles and date range titles = "; ".join(patents['title'].tolist()[:3]) years = f"{patents['year'].min()}-{patents['year'].max()}" prompts = { 'cluster': ( f"Patents: {titles}. Years: {years}\nSummarize in 2-3 sentences.", "Describe the key aspects." ), 'transitional': ( f"Patents: {titles}. Years: {years}\nSummarize in 2-3 sentences.", "Describe the key aspects." ), 'innovation_subcluster': ( f"Patents: {titles}. Years: {years}\nSummarize in 2-3 sentences.", "Describe the key aspects." ) } base_prompt = prompts[group_type][0] retry_count = 0 while retry_count < max_retries: try: response = openai.ChatCompletion.create( model="gpt-3.5-turbo", messages=[ {"role": "system", "content": prompts[group_type][1]}, {"role": "user", "content": base_prompt} ], max_tokens=150, temperature=0.7 ) return response.choices[0]['message']['content'] except Exception as e: retry_count += 1 if retry_count < max_retries: time.sleep(2 ** (retry_count - 1)) else: return "Analysis failed." def create_3d_visualization(patents): """ Create a 3D visualization of patent embeddings using UMAP and Plotly """ # Initialize variables for tracking different point types df = pd.DataFrame(patents) df['point_type'] = 'cluster' # Default type for all points transitional_areas = [] # Initialize empty list for transitional areas if not patents: return None update_progress('clustering', 'processing', 'Extracting embeddings...') # Extract embeddings and metadata embeddings = [] metadata = [] for patent in patents: if patent['embedding'] is not None: embeddings.append(patent['embedding']) abstract = patent['abstract'] if len(abstract) > 200: abstract = abstract[:200] + "..." metadata.append({ 'title': patent['title'], 'assignee': patent['assignee'], 'year': patent['filing_year'], 'abstract': abstract, 'link': patent['link'] }) if not embeddings: return None # Check if we have enough patents for reliable gap detection if len(embeddings) < MIN_PATENTS_FOR_GAPS: print(f"\nWarning: Dataset size ({len(embeddings)} patents) is below recommended minimum ({MIN_PATENTS_FOR_GAPS})") print("Underexplored area detection may be less reliable with smaller datasets") print("Consider:") print("1. Broadening your search terms") print("2. Including more patent categories") print("3. Expanding the time range") # Convert embeddings to numpy array embeddings_array = np.array(embeddings) update_progress('clustering', 'processing', 'Applying UMAP dimensionality reduction...') # Apply UMAP dimensionality reduction reducer = umap.UMAP(n_components=3, random_state=42) embedding_3d = reducer.fit_transform(embeddings_array) update_progress('clustering', 'processing', 'Performing DBSCAN clustering...') # Create DataFrame for plotting df = pd.DataFrame(metadata) df['x'] = embedding_3d[:, 0] df['y'] = embedding_3d[:, 1] df['z'] = embedding_3d[:, 2] # --- Improved HDBSCAN clustering logic for sparse region detection --- scaler = StandardScaler() scaled_embeddings = scaler.fit_transform(embedding_3d) n_points = len(scaled_embeddings) update_progress('clustering', 'processing', f'Analyzing {n_points} patents for clustering...') # Dynamically set max_clusters and target_noise based on number of patents if n_points < 100: max_clusters = 4 max_retries = 2 target_noise_ratio = 0.08 elif n_points < 500: max_clusters = 6 max_retries = 3 target_noise_ratio = 0.06 elif n_points < 1000: max_clusters = 8 max_retries = 4 target_noise_ratio = 0.05 else: max_clusters = 15 # Increased from 12 to force more granular clustering max_retries = 8 # More retries to find optimal clustering target_noise_ratio = 0.03 # Keep low noise ratio # Even more aggressive cluster parameters for large datasets if n_points >= 1000: min_cluster_size = max(5, int(n_points * 0.015)) # Further reduced to 1.5% for large datasets min_samples = max(3, int(min_cluster_size * 0.95)) # Increased to 0.95 for even stricter formation else: min_cluster_size = max(5, int(n_points * 0.02)) # 2% for smaller datasets min_samples = max(3, int(min_cluster_size * 0.9)) # 0.9 ratio for smaller datasets target_noise = int(n_points * target_noise_ratio) print(f"Initial HDBSCAN: min_cluster_size={min_cluster_size}, min_samples={min_samples}, max_clusters={max_clusters}, max_retries={max_retries}, target_noise={target_noise}") retry = 0 clusters = None n_clusters = 0 n_noise = 0 best_result = None best_score = float('-inf') while retry < max_retries: hdb = hdbscan.HDBSCAN( min_cluster_size=min_cluster_size, min_samples=min_samples, cluster_selection_epsilon=0.03, # Reduced further to force even tighter clusters cluster_selection_method='eom', metric='euclidean', prediction_data=True ) clusters = hdb.fit_predict(scaled_embeddings) n_clusters = len(set(clusters)) - (1 if -1 in clusters else 0) n_noise = list(clusters).count(-1) noise_ratio = n_noise / len(clusters) avg_cluster_size = (len(clusters) - n_noise) / n_clusters if n_clusters > 0 else float('inf') print(f"\nClustering Statistics (try {retry+1}):") print(f"Number of clusters: {n_clusters}") print(f"Number of patents in sparse regions: {n_noise}") print(f"Total number of patents: {len(clusters)}") print(f"Noise ratio: {noise_ratio:.2%}") print(f"Average cluster size: {avg_cluster_size:.1f} patents") update_progress('clustering', 'processing', f'Optimizing clusters (attempt {retry + 1}/{max_retries}): ' + f'Found {n_clusters} clusters with avg size {avg_cluster_size:.1f} patents') # Calculate a score for this clustering result # Penalize both too many and too few clusters, and reward good noise ratio score = -abs(n_clusters - max_clusters) + \ -abs(noise_ratio - target_noise_ratio) * 10 + \ -abs(avg_cluster_size - (n_points / max_clusters)) / 10 if score > best_score: best_score = score best_result = (clusters, n_clusters, n_noise, noise_ratio, avg_cluster_size) # Adjust parameters based on results if n_clusters > max_clusters: print("Too many clusters, increasing parameters more aggressively...") min_cluster_size = int(min_cluster_size * 1.5) # More aggressive increase min_samples = int(min_samples * 1.4) elif n_clusters == 1 and avg_cluster_size > len(clusters) * 0.8: print("Single dominant cluster detected, adjusting for better separation...") min_cluster_size = max(5, int(min_cluster_size * 0.6)) # More aggressive decrease min_samples = max(3, int(min_samples * 0.6)) elif n_noise < target_noise * 0.5: print("Too few noise points, adjusting parameters...") min_cluster_size = int(min_cluster_size * 1.2) min_samples = max(3, int(min_samples * 0.8)) elif n_clusters < max_clusters * 0.5: print("Too few clusters, decreasing parameters...") min_cluster_size = max(5, int(min_cluster_size * 0.8)) min_samples = max(3, int(min_samples * 0.7)) else: print("Acceptable clustering found.") break retry += 1 # Use the best result if we didn't find an acceptable one if retry == max_retries and best_result is not None: print("Using best clustering result found...") clusters, n_clusters, n_noise, noise_ratio, avg_cluster_size = best_result df['cluster'] = clusters # --- First gather all existing clusters and their sizes --- cluster_info = [] for label in set(clusters): if label != -1: # Skip noise points cluster_mask = clusters == label cluster_patents = df[cluster_mask] if len(cluster_patents) > 0: cluster_info.append((label, len(cluster_patents), cluster_patents)) # Sort clusters by size in descending order cluster_info.sort(key=lambda x: x[1], reverse=True) print("\nCluster Size Distribution:") for i, (label, size, _) in enumerate(cluster_info): print(f"Cluster {i} (originally {label}): {size} patents") # Create mapping for new cluster IDs cluster_id_map = {old_label: i for i, (old_label, _, _) in enumerate(cluster_info)} # Update cluster IDs in DataFrame new_clusters = clusters.copy() for old_label, new_label in cluster_id_map.items(): new_clusters[clusters == old_label] = new_label df['cluster'] = new_clusters update_progress('clustering', 'processing', 'Identifying technology clusters and underexplored areas...') # --- Initialize point types --- df['point_type'] = 'unassigned' # Start with all points unassigned cluster_insights = [] # Initialize insights list # First handle clustered points total_clusters = len(cluster_info) for new_id, (_, size, cluster_patents) in enumerate(cluster_info): update_progress('clustering', 'processing', f'Analyzing cluster {new_id + 1} of {total_clusters} ({size} patents)...') description = analyze_patent_group(cluster_patents, 'cluster', new_id) df.loc[cluster_patents.index, 'point_type'] = 'cluster' # Mark clustered points cluster_insights.append({ 'type': 'cluster', 'id': int(new_id) + 1, # Store as 1-based ID 'size': size, 'label': f"Cluster {new_id + 1}", 'description': description }) # --- Improved two-stage density analysis for noise points --- noise_mask = df['cluster'] == -1 noise_points = scaled_embeddings[noise_mask] noise_indices = df[noise_mask].index dense_noise_indices = [] # Initialize empty list for dense noise points if len(noise_points) >= 3: update_progress('clustering', 'processing', f'Analyzing {len(noise_points)} potential underexplored areas...') print(f"\nStructural Analysis for Underexplored Area Detection:") # Initialize sparse indices true_sparse_indices = [] # Stage 1: Calculate local and global density metrics n_neighbors = min(max(5, int(len(noise_points) * 0.05)), 15) print(f"Using {n_neighbors} nearest neighbors for density calculation") # Calculate local density for noise points nbrs_local = NearestNeighbors(n_neighbors=n_neighbors, metric='euclidean').fit(noise_points) local_distances, local_indices = nbrs_local.kneighbors(noise_points) local_densities = 1 / (np.mean(local_distances, axis=1) + 1e-6) # Add small epsilon to avoid division by zero # Calculate distances to cluster centers and their densities cluster_centers = [] cluster_densities = [] # Store density of each cluster for label in set(clusters) - {-1}: cluster_mask = clusters == label cluster_points = scaled_embeddings[cluster_mask] center = np.mean(cluster_points, axis=0) cluster_centers.append(center) # Calculate cluster density using its member points if len(cluster_points) > 1: nbrs_cluster = NearestNeighbors(n_neighbors=min(5, len(cluster_points))).fit(cluster_points) cluster_dists, _ = nbrs_cluster.kneighbors(cluster_points) cluster_density = 1 / (np.mean(cluster_dists) + 1e-6) else: cluster_density = 0 cluster_densities.append(cluster_density) cluster_centers = np.array(cluster_centers) cluster_densities = np.array(cluster_densities) if len(cluster_centers) > 0: # Calculate distances and density ratios to nearest clusters nbrs_clusters = NearestNeighbors(n_neighbors=1, metric='euclidean').fit(cluster_centers) cluster_distances, nearest_cluster_indices = nbrs_clusters.kneighbors(noise_points) cluster_distances = cluster_distances.flatten() # Get density of nearest cluster for each point nearest_cluster_densities = cluster_densities[nearest_cluster_indices.flatten()] # Calculate density ratios (local density / nearest cluster density) density_ratios = local_densities / (nearest_cluster_densities + 1e-6) print("\nDensity Analysis Statistics:") print(f"Mean local density: {np.mean(local_densities):.3f}") print(f"Mean cluster density: {np.mean(cluster_densities):.3f}") print(f"Mean density ratio: {np.mean(density_ratios):.3f}") # Identify structural gaps using multiple criteria with more sensitive thresholds # 1. Density Isolation: Points with very low density compared to clusters # 2. Spatial Isolation: Points far from both clusters and other noise points # 3. Structural Stability: Points whose local neighborhood is also sparse # Calculate isolation scores with more balanced thresholds density_isolation = density_ratios < np.percentile(density_ratios, 65) # More balanced threshold spatial_isolation = cluster_distances > np.percentile(cluster_distances, 50) # Median distance threshold # Calculate structural stability with more balanced criteria structural_stability = np.zeros(len(noise_points), dtype=bool) for i, neighbors in enumerate(local_indices): neighbor_densities = local_densities[neighbors] # Point is stable if its neighborhood is relatively sparse structural_stability[i] = np.mean(neighbor_densities) < np.percentile(local_densities, 50) # Use median # Use more balanced criteria - only need to meet any 1 of 3 criteria initially candidate_sparse_indices = [ idx for i, idx in enumerate(noise_indices) if sum([density_isolation[i], spatial_isolation[i], structural_stability[i]]) >= 1 # Only need 1 out of 3 criteria ] # Start by assuming all non-candidate points are dense noise dense_noise_indices = [idx for idx in noise_indices if idx not in candidate_sparse_indices] # Now calculate distances between candidates and dense noise points with more sensitive threshold min_distance_threshold = np.percentile(cluster_distances, 40) # More sensitive threshold # Filter candidates based on distance from dense noise regions if len(candidate_sparse_indices) > 0 and len(dense_noise_indices) > 0: dense_noise_points = scaled_embeddings[dense_noise_indices] true_sparse_indices = [] for idx in candidate_sparse_indices: point = scaled_embeddings[idx].reshape(1, -1) distances_to_dense = NearestNeighbors(n_neighbors=1).fit(dense_noise_points).kneighbors(point)[0][0] if distances_to_dense > min_distance_threshold: true_sparse_indices.append(idx) # Update dense_noise_indices to include rejected candidates rejected_indices = [idx for idx in candidate_sparse_indices if idx not in true_sparse_indices] dense_noise_indices.extend(rejected_indices) else: true_sparse_indices = candidate_sparse_indices else: # Fallback using only local density analysis density_threshold = np.percentile(local_densities, 25) # Bottom 25% sparsest points true_sparse_indices = [idx for i, idx in enumerate(noise_indices) if local_densities[i] < density_threshold] dense_noise_indices = [idx for idx in noise_indices if idx not in true_sparse_indices] print(f"\nFinal Classification:") print(f"True underexplored areas identified: {len(true_sparse_indices)}") print(f"Transitional areas identified: {len(dense_noise_indices)}") if len(true_sparse_indices) > 0: print(f"Underexplored area ratio: {len(true_sparse_indices)/len(noise_points):.2%}") print("\nUnderexplored Area Criteria Used:") print("1. Density Isolation: Significantly lower density than nearest cluster") print("2. Spatial Isolation: Far from both clusters and other points") print("3. Structural Stability: Forms stable sparse regions with neighbors") # Update point types in DataFrame for sparse points and dense noise for idx in true_sparse_indices: df.at[idx, 'point_type'] = 'sparse' for idx in dense_noise_indices: df.at[idx, 'point_type'] = 'dense_noise' # --- Handle dense noise points as transitional areas --- transitional_areas = [] # Store transitional areas for sorting if len(dense_noise_indices) >= 3: update_progress('clustering', 'processing', f'Analyzing {len(dense_noise_indices)} potential transitional areas...') print("\nAnalyzing dense noise points as transitional areas...") dense_noise_points = scaled_embeddings[dense_noise_indices] # Use HDBSCAN to find subgroups within transitional areas min_size = max(3, len(dense_noise_points) // 10) print(f"Attempting to identify transitional area subgroups with min_size={min_size}") hdb_dense = hdbscan.HDBSCAN( min_cluster_size=min_size, min_samples=max(2, min_size // 2), cluster_selection_epsilon=0.3, cluster_selection_method='leaf' ) dense_labels = hdb_dense.fit_predict(dense_noise_points) # Count potential transitional areas unique_dense_labels = set(dense_labels) - {-1} n_transitional = len(unique_dense_labels) print(f"Found {n_transitional} distinct transitional areas") # First get all transitional points, including scattered ones all_transitional_points = {} # Count sizes first label_sizes = {} for label in dense_labels: if label != -1: label_sizes[label] = label_sizes.get(label, 0) + 1 # Then collect points with their pre-calculated sizes for i, label in enumerate(dense_labels): idx = dense_noise_indices[i] if label != -1: # Regular transitional area if label not in all_transitional_points: all_transitional_points[label] = {'indices': [], 'size': label_sizes[label]} all_transitional_points[label]['indices'].append(idx) else: # Scattered points label_key = 'scattered' if label_key not in all_transitional_points: all_transitional_points[label_key] = {'indices': [], 'size': 0} all_transitional_points[label_key]['indices'].append(idx) all_transitional_points[label_key]['size'] += 1 # Sort transitional areas by size and create insights # Filter out areas that are too small and sort by size min_area_size = 3 # Minimum size for a valid transitional area valid_areas = [(k, v) for k, v in all_transitional_points.items() if k != 'scattered' and v['size'] >= min_area_size] sorted_areas = sorted(valid_areas, key=lambda x: x[1]['size'], reverse=True) # Add regular transitional areas to insights total_areas = len(sorted_areas) for area_idx, (label, area_info) in enumerate(sorted_areas): update_progress('clustering', 'processing', f'Analyzing transitional area {area_idx + 1} of {total_areas} ({area_info["size"]} patents)...') area_patents = df.iloc[area_info['indices']] description = analyze_patent_group(area_patents, 'transitional', label) area_number = area_idx + 1 # 1-based numbering for display # Create label without duplicate size info area_label = f"Transitional Area {area_number}" transitional_areas.append({ 'label': area_label, 'indices': area_info['indices'], 'size': area_info['size'], 'patents': area_patents, 'description': description }) area_insight = { 'type': 'transitional', 'id': area_idx + 1, # Store as 1-based ID 'size': area_info['size'], 'label': f"{area_label} ({area_info['size']} patents)", 'description': description } cluster_insights.append(area_insight) # Handle scattered points by analyzing them individually if 'scattered' in all_transitional_points: scattered_indices = all_transitional_points['scattered']['indices'] if len(scattered_indices) > 0: print(f"\nAnalyzing {len(scattered_indices)} scattered points...") scattered_points = scaled_embeddings[scattered_indices] # Calculate distances to nearest cluster and transitional area distances_to_clusters = [] distances_to_transitional = [] print("\nDistance analysis for each scattered point:") point_counter = 0 # First calculate all distances for point in scattered_points: point = point.reshape(1, -1) # Distance to nearest cluster if len(cluster_centers) > 0: dist_cluster = NearestNeighbors(n_neighbors=1).fit(cluster_centers).kneighbors(point)[0][0][0] else: dist_cluster = float('inf') # Distance to nearest transitional area (excluding scattered points) if len(dense_noise_points) > 0: # Get only the transitional area points (excluding scattered points) transitional_points = [] for i, point_idx in enumerate(dense_noise_indices): if point_idx not in scattered_indices: transitional_points.append(dense_noise_points[i]) if transitional_points: transitional_points = np.array(transitional_points) nbrs_trans = NearestNeighbors(n_neighbors=1).fit(transitional_points) dist_trans = nbrs_trans.kneighbors(point.reshape(1, -1))[0][0][0] else: dist_trans = float('inf') else: dist_trans = float('inf') # Store distances for ratio calculation distances_to_clusters.append(dist_cluster) distances_to_transitional.append(dist_trans) total_classified_as_gaps = 0 total_classified_as_transitional = 0 # Use more aggressive thresholds for scattered points cluster_distance_threshold = np.percentile(distances_to_clusters, 35) # Even more lenient transitional_distance_threshold = np.percentile(distances_to_transitional, 35) # Even more lenient print(f"\nClassification thresholds:") print(f"- Cluster distance threshold: {cluster_distance_threshold:.3f}") print(f"- Transitional distance threshold: {transitional_distance_threshold:.3f}") # Classify scattered points for idx, (dist_c, dist_t) in zip(scattered_indices, zip(distances_to_clusters, distances_to_transitional)): # 1. Check absolute distances with more lenient thresholds cluster_dist_threshold = np.percentile(distances_to_clusters, 60) # Use 60th percentile trans_dist_threshold = np.percentile(distances_to_transitional, 60) # Use 60th percentile # Point is isolated if it's farther than median distance from both clusters and transitional areas is_isolated = (dist_c > cluster_dist_threshold or dist_t > trans_dist_threshold) # 2. Calculate isolation based on absolute difference rather than ratio isolation_diff = dist_t - dist_c # Positive means farther from transitional areas is_relatively_isolated = isolation_diff > 0 # Any positive difference counts # 3. Simplified region formation check nearby_transitional = sum(1 for d in distances_to_transitional if d < trans_dist_threshold) nearby_clusters = sum(1 for d in distances_to_clusters if d < cluster_dist_threshold) # Point forms new region if it has any cluster neighbors forms_new_region = nearby_clusters > 0 # Classification decision and immediate DataFrame update # More lenient classification - if the point is isolated OR relatively isolated, mark as gap if is_isolated or is_relatively_isolated: true_sparse_indices.append(idx) df.at[idx, 'point_type'] = 'sparse' # Immediately update DataFrame total_classified_as_gaps += 1 else: dense_noise_indices.append(idx) df.at[idx, 'point_type'] = 'dense_noise' # Immediately update DataFrame total_classified_as_transitional += 1 print(f"\nFinal classification summary for scattered points:") print(f"- Total scattered points: {len(scattered_indices)}") print(f"- Classified as underexplored areas: {total_classified_as_gaps}") print(f"- Classified as transitional: {total_classified_as_transitional}") if total_classified_as_gaps == 0: print("\nWarning: No scattered points were classified as underexplored areas!") print("Possible reasons:") print("1. Distance thresholds may be too high") print("2. Relative distance ratio may be too strict") print("3. Nearby points criterion may be too restrictive") if total_classified_as_transitional > 0: # Create a transitional area for scattered points scattered_transitional_patents = df.iloc[dense_noise_indices[-total_classified_as_transitional:]] description = analyze_patent_group(scattered_transitional_patents, 'transitional', 'scattered') area_number = len(transitional_areas) + 1 # 1-based numbering for display # Add to transitional areas area_label = f"Transitional Area {area_number}" transitional_areas.append({ 'label': area_label, 'indices': dense_noise_indices[-total_classified_as_transitional:], 'size': total_classified_as_transitional, 'patents': scattered_transitional_patents, 'description': description }) # Add to insights area_insight = { 'type': 'transitional', 'id': -1, # Special ID for scattered points 'size': total_classified_as_transitional, 'label': f"{area_label} ({total_classified_as_transitional} patents)", 'description': description } cluster_insights.append(area_insight) print(f"\nFinal classification summary for scattered points:") print(f"True underexplored areas identified: {len(true_sparse_indices)}") print(f"Transitional areas identified: {len(dense_noise_indices)}") if len(true_sparse_indices) > 0: print(f"Underexplored area ratio: {len(true_sparse_indices)/len(noise_points):.2%}") print("\nUnderexplored Area Criteria Used:") print("1. Density Isolation: Significantly lower density than nearest cluster") print("2. Spatial Isolation: Far from both clusters and other points") print("3. Structural Stability: Forms stable sparse regions with neighbors") # Update point types in DataFrame for sparse points and dense noise for idx in true_sparse_indices: df.at[idx, 'point_type'] = 'sparse' for idx in dense_noise_indices: df.at[idx, 'point_type'] = 'dense_noise' # --- Analyze underexplored areas --- if len(true_sparse_indices) > 0: update_progress('clustering', 'processing', f'Analyzing {len(true_sparse_indices)} potential underexplored areas...') print(f"\nProcessing {len(true_sparse_indices)} underexplored areas...") sparse_patents = df.iloc[true_sparse_indices] sparse_points = scaled_embeddings[true_sparse_indices] # Ensure points are marked as sparse in the DataFrame df.loc[true_sparse_indices, 'point_type'] = 'sparse' # More lenient subclustering parameters for underexplored areas min_subcluster_size = max(2, min(5, len(true_sparse_indices) // 10)) # More lenient minimum size sparse_clusterer = hdbscan.HDBSCAN( min_cluster_size=min_subcluster_size, min_samples=1, # Most lenient possible cluster_selection_epsilon=0.8, # Even more lenient cluster_selection_method='leaf', # Changed to leaf for finer subcluster detection metric='euclidean' ) sparse_labels = sparse_clusterer.fit_predict(sparse_points) # Collect innovation subclusters for sorting innovation_subclusters = [] for label in set(sparse_labels): subcluster_mask = sparse_labels == label subcluster_patents = sparse_patents[subcluster_mask] subcluster_size = len(subcluster_patents) # Accept all subclusters, even single points description = analyze_patent_group(subcluster_patents, 'innovation_subcluster', label) innovation_subclusters.append({ 'label': label, 'size': subcluster_size, 'patents': subcluster_patents, 'description': description }) # Sort innovation subclusters by size in descending order innovation_subclusters.sort(key=lambda x: x['size'], reverse=True) # Add sorted innovation subclusters to insights total_subclusters = len(innovation_subclusters) for idx, subcluster in enumerate(innovation_subclusters): update_progress('clustering', 'processing', f'Analyzing underexplored area opportunity {idx + 1} of {total_subclusters} ({subcluster["size"]} patents)...') cluster_insights.append({ 'type': 'innovation_subcluster', 'id': idx + 1, # Store as 1-based ID 'size': subcluster['size'], 'label': f"Underexplored Area {idx + 1}", 'description': subcluster['description'] }) else: cluster_insights.append({ 'type': 'innovation_subcluster', 'id': -1, 'size': 0, 'label': 'No Underexplored Areas', 'description': 'No significant underexplored areas were detected in this technology space.' }) update_progress('visualization', 'processing', 'Creating interactive plot...') # Create Plotly figure with clusters # Ensure all points are properly categorized unassigned_mask = df['point_type'] == 'unassigned' if any(unassigned_mask): print(f"Warning: {sum(unassigned_mask)} points remain unassigned") df.loc[unassigned_mask, 'point_type'] = 'cluster' # Default unassigned to clusters # Separate points into three categories: clusters, underexplored areas, and dense noise cluster_mask = df['point_type'] == 'cluster' innovation_gaps_mask = df['point_type'] == 'sparse' dense_noise_mask = df['point_type'] == 'dense_noise' # Create hover text for all points hover_text = [] # Create mapping for underexplored area points to their numbers innovation_gap_map = {} # Map underexplored areas using the analyzed subclusters to ensure consistent numbering if len(true_sparse_indices) > 0: for idx, subcluster in enumerate(innovation_subclusters, 1): for patent in subcluster['patents'].index: innovation_gap_map[patent] = idx # Create mapping for transitional areas transitional_area_map = {} for area_idx, area in enumerate(transitional_areas): for idx in area['indices']: transitional_area_map[idx] = {'number': area_idx + 1} # Generate hover text for each point for idx, row in df.iterrows(): point_info = "" if row['point_type'] == 'sparse': gap_number = innovation_gap_map.get(idx) if gap_number: point_info = f"
Region: Underexplored Area {gap_number}" else: point_info = "
Region: Potential Innovation Area" elif row['point_type'] == 'dense_noise': area_info = transitional_area_map.get(idx) if area_info: point_info = f"
Region: Transitional Area {area_info['number']}" else: # This is a scattered transitional point point_info = f"
Region: Transitional Area {len(transitional_areas)} (Scattered)" else: point_info = f"
Cluster: {int(row['cluster']) + 1}" # Cluster IDs are still 0-based in the DataFrame text = ( f"{row['title']}

" f"By: {row['assignee']} ({row['year']})
" f"{point_info}

" f"Abstract:
{row['abstract']}" ) hover_text.append(text) # Create three separate traces: clusters, underexplored areas, and dense noise points cluster_trace = go.Scatter3d( x=df[cluster_mask]['x'], y=df[cluster_mask]['y'], z=df[cluster_mask]['z'], mode='markers', marker=dict( size=6, color=clusters[cluster_mask] + 1, # Add 1 to shift cluster numbers from 0-based to 1-based colorscale='Viridis', opacity=0.5, showscale=True, colorbar=dict( title="Clusters", ticktext=[f"Cluster {i+1}" for i in range(n_clusters)], # Custom tick labels tickvals=list(range(1, n_clusters + 1)), # Values to match the 1-based cluster numbers tickmode="array", tickfont=dict(size=10), titlefont=dict(size=10) ) ), text=[hover_text[i] for i in range(len(hover_text)) if cluster_mask[i]], hoverinfo='text', name='Clusters', hoverlabel=dict( bgcolor="white", font_size=12, font_family="Arial", align="left" ), customdata=[df['link'].tolist()[i] for i in range(len(df)) if cluster_mask[i]] ) innovation_gaps_trace = go.Scatter3d( x=df[innovation_gaps_mask]['x'], y=df[innovation_gaps_mask]['y'], z=df[innovation_gaps_mask]['z'], mode='markers', marker=dict( size=6, # Same size as other points color='rgb(255, 0, 0)', # Pure bright red symbol='diamond', opacity=1.0, # Full opacity for visibility line=dict( color='white', width=1 # Thinner border to match other points ) ), text=[hover_text[i] for i in range(len(hover_text)) if innovation_gaps_mask[i]], hoverinfo='text', name='Underexplored Areas', hoverlabel=dict( bgcolor="white", font_size=12, font_family="Arial", align="left" ), customdata=[df['link'].tolist()[i] for i in range(len(df)) if innovation_gaps_mask[i]] ) dense_noise_trace = go.Scatter3d( x=df[dense_noise_mask]['x'], y=df[dense_noise_mask]['y'], z=df[dense_noise_mask]['z'], mode='markers', marker=dict( size=6, # Same size as other points color='rgb(255, 165, 0)', # Orange for transitional areas symbol='circle', opacity=0.7, # Less opacity to make gaps more visible line=dict( color='white', width=1 # Thin border ) ), text=[hover_text[i] for i in range(len(hover_text)) if dense_noise_mask[i]], hoverinfo='text', name='Transitional Areas', hoverlabel=dict( bgcolor="white", font_size=12, font_family="Arial", align="left" ), customdata=[df['link'].tolist()[i] for i in range(len(df)) if dense_noise_mask[i]] ) fig = go.Figure(data=[cluster_trace, innovation_gaps_trace, dense_noise_trace]) # Update layout fig.update_layout( title="Patent Technology Landscape", scene=dict( xaxis_title="UMAP 1", yaxis_title="UMAP 2", zaxis_title="UMAP 3", camera=dict( up=dict(x=0, y=0, z=1), center=dict(x=0, y=0, z=0), eye=dict(x=1.8, y=1.8, z=1.8) # Slightly further out for better overview ), aspectmode='cube' # Force equal scaling ), margin=dict(l=0, r=0, b=0, t=30), showlegend=True, template="plotly_dark", hoverlabel_align='left', hoverdistance=100, hovermode='closest', legend=dict( yanchor="top", y=0.99, xanchor="left", x=0.01, bgcolor="rgba(0,0,0,0.7)", # Darker background for better contrast font=dict( color="white", size=12 ), itemsizing='constant' # Keep legend marker sizes consistent ) ) # Configure hover behavior fig.update_traces( hovertemplate='%{text}', hoverlabel=dict( bgcolor="rgba(0,0,0,0.8)", font_size=12, font_family="Arial" ) ) update_progress('visualization', 'processing', 'Finalizing visualization...') return { 'plot': fig.to_json(), 'insights': cluster_insights } def analyze_innovation_opportunities(cluster_insights): """ Analyze relationships between different areas to identify potential innovation opportunities. Returns focused analysis of three key innovation gaps between existing technology areas. """ # Extract cluster numbers and validate cluster_nums = set() transitional_nums = set() underexplored_nums = set() # Parse and validate cluster numbers with explicit error checking for insight in cluster_insights: area_type = insight.get('type', '') area_id = insight.get('id', -1) if area_id < 0 and area_type != 'cluster': continue if area_type == 'cluster': cluster_nums.add(area_id) elif area_type == 'transitional': transitional_nums.add(area_id) elif area_type == 'innovation_subcluster': if area_id >= 1: # Skip the "No underexplored areas" entry underexplored_nums.add(area_id) # Format areas list with validation def format_area_list(area_nums): return f"Areas {', '.join(str(n) for n in sorted(area_nums))}" if area_nums else "None identified" # Only generate analysis if we have areas to analyze if not any([cluster_nums, transitional_nums, underexplored_nums]): return "No distinct areas found. Try broadening search terms or increasing patent count." # Create descriptions list descriptions = [] for insight in cluster_insights: if insight.get('description'): area_type = insight.get('type', '') area_id = int(insight.get('id', -1)) # 1-based IDs if area_type == 'cluster': desc = f"C{area_id}:{insight['description']}" elif area_type == 'transitional': desc = f"T{area_id}:{insight['description']}" elif area_type == 'innovation_subcluster' and insight['id'] >= 1: desc = f"U{area_id}:{insight['description']}" else: continue descriptions.append(desc) # Format descriptions as a string with newlines descriptions_text = '\n'.join(descriptions) prompt = f"""Available Areas: Clusters: {format_area_list(cluster_nums)} Transitional Areas: {format_area_list(transitional_nums)} Underexplored Areas: {format_area_list(underexplored_nums)} Area Descriptions: {descriptions_text} Analyze the most promising innovation opportunities. For each opportunity: 1. Identify two technologically complementary areas (e.g. "Cluster 1 + Transitional Area 2") 2. Focus on specific technical capabilities that could be combined 3. Aim for practical, near-term innovations Provide 3 opportunities, formatted as: Opportunity N: [Area 1] + [Area 2] - Gap: Specific technical capability missing between these areas - Solution: Concrete technical approach using existing methods - Impact: Clear technical or market advantage gained Prioritize: - Technical feasibility over speculative concepts - Cross-domain applications with clear synergies - Opportunities that build on existing technology strengths""" # Get analysis from LLM response = generate_analysis(prompt, cluster_insights) return response def update_progress(step, status='processing', message=None): """Update progress through the progress queue""" data = { 'step': step, 'status': status } if message: data['message'] = message progress_queue.put(data) def validate_area_references(analysis_text, cluster_insights): """Validate that all area references in the analysis are valid and match their descriptions.""" import re from difflib import SequenceMatcher # Create maps of area descriptions area_descriptions = {} for insight in cluster_insights: if insight.get('description'): area_type = insight.get('type', '') area_id = int(insight.get('id', -1)) # IDs are already 1-based area_descriptions[f"{area_type}_{area_id}"] = insight['description'].lower() def check_context_similarity(area_ref, context, area_type): # Get the referenced area's description key = f"{area_type}_{area_ref}" if key not in area_descriptions: return False, f"Area {area_ref} does not exist" return True, None return True, None def find_references_with_context(text, pattern, label): matches = [] for match in re.finditer(pattern, text): start = max(0, match.start() - 200) end = min(len(text), match.end() + 200) context = text[start:end] matches.append((match.group(1), context)) return matches patterns = [ (r'(?:Cluster|cluster) (\d+)(?!\d)', 'cluster'), (r'(?:Transitional|transitional) [Aa]rea (\d+)(?!\d)', 'transitional'), (r'(?:Underexplored|underexplored) [Aa]rea (\d+)(?!\d)', 'innovation_subcluster') ] # Check each type of reference for pattern, area_type in patterns: refs = find_references_with_context(analysis_text, pattern, area_type) for ref, context in refs: ref_num = int(ref) valid, message = check_context_similarity(ref_num, context, area_type) if not valid: return False, message return True, "All area references are valid and match their descriptions" def generate_analysis(prompt, cluster_insights): """Generate an analysis of innovation opportunities using OpenAI's API""" try: # Count the number of each type of area from cluster_insights cluster_count = sum(1 for x in cluster_insights if x['type'] == 'cluster') transitional_count = sum(1 for x in cluster_insights if x['type'] == 'transitional') underexplored_count = sum(1 for x in cluster_insights if x['type'] == 'innovation_subcluster' and x['id'] >= 0) # Minimal system message system_message = """Expert patent analyst specializing in technology landscapes and innovation opportunities. Guidelines: 1. Reference only valid areas with correct type and number 2. Focus on specific technical aspects and capabilities 3. Consider both direct applications and cross-domain potential 4. Identify concrete opportunities and practical approaches 5. Ground analysis in technical feasibility""" response = openai.ChatCompletion.create( model="gpt-3.5-turbo", messages=[ {"role": "system", "content": system_message}, {"role": "user", "content": prompt} ], temperature=0.7, max_tokens=1000 ) analysis = response.choices[0].message.content # Validate the generated analysis is_valid, message = validate_area_references(analysis, cluster_insights) if not is_valid: # Retry with minimal error context messages = [ {"role": "system", "content": system_message}, {"role": "user", "content": prompt}, {"role": "system", "content": "Fix invalid areas."}, {"role": "assistant", "content": analysis} ] chat_completion = openai.ChatCompletion.create( model="gpt-3.5-turbo", messages=messages, temperature=0.7, max_tokens=1000 ) analysis = chat_completion.choices[0].message.content # Final validation is_valid, _ = validate_area_references(analysis, cluster_insights) if not is_valid: analysis = "Error: Invalid analysis. Try again." return analysis except Exception as e: print(f"Error generating analysis: {e}") return "Unable to generate innovation analysis at this time." @app.route('/') def home(): return render_template('index.html') @app.route('/progress') def get_progress(): """Server-sent events endpoint for progress updates""" def generate(): connection_active = True while connection_active: try: data = progress_queue.get(timeout=10) # Reduced timeout for more responsive updates if data == 'DONE': yield f"data: {json.dumps({'step': 'complete', 'status': 'done'})}\n\n" connection_active = False else: yield f"data: {json.dumps(data)}\n\n" except queue.Empty: # Send a keep-alive message yield f"data: {json.dumps({'step': 'alive', 'status': 'processing'})}\n\n" continue # Ensure the data is sent immediately if hasattr(generate, 'flush'): generate.flush() return Response(generate(), mimetype='text/event-stream', headers={ 'Cache-Control': 'no-cache, no-transform', 'Connection': 'keep-alive', 'Content-Type': 'text/event-stream', 'X-Accel-Buffering': 'no' # Disable buffering for nginx }) @app.route('/search', methods=['POST']) def search(): keywords = request.form.get('keywords', '') if not keywords: return jsonify({'error': 'Please enter search keywords'}) print(f"\nProcessing search request for keywords: {keywords}") try: # Clear any existing progress updates while not progress_queue.empty(): progress_queue.get_nowait() # Initial progress update update_progress('search', 'processing', 'Starting patent search...') patents = search_patents(keywords) if not patents: update_progress('search', 'error', 'No patents found') progress_queue.put('DONE') return jsonify({'error': 'No patents found or an error occurred'}) # Generate embeddings progress is handled in search_patents function # Start visualization processing update_progress('visualization', 'Creating visualization...') viz_data = create_3d_visualization(patents) if not viz_data: progress_queue.put('DONE') return jsonify({'error': 'Error creating visualization'}) # Final progress update update_progress('complete', 'Analysis complete!') progress_queue.put('DONE') # Generate innovation analysis from insights innovation_analysis = analyze_innovation_opportunities(viz_data['insights']) return jsonify({ 'visualization': viz_data['plot'], 'insights': viz_data['insights'], 'innovationAnalysis': innovation_analysis }) except Exception as e: print(f"Error processing request: {e}") traceback.print_exc() progress_queue.put('DONE') return jsonify({'error': str(e)}) if __name__ == '__main__': app.run(host='0.0.0.0', port=7860)