import logging from typing import Optional from flask import Flask, jsonify, request, abort, render_template import json import random import string import datetime from webscout import WEBS from functools import wraps import requests import os app = Flask(__name__) TIMEOUT = 10 PROXY = None PRICING_PLANS = { 'free': { 'name': 'Free Plan', 'price': '$0/month', 'rate_limit': 1000 }, 'pro': { 'name': 'Pro Plan', 'price': 'Coming Soon', 'rate_limit': None # Unlimited } } # Define the directories for API keys API_KEYS_DIRECTORY = os.path.join(os.getcwd(), 'static', 'data') FREE_API_KEYS_DIRECTORY = os.path.join(API_KEYS_DIRECTORY, 'free') PAID_API_KEYS_DIRECTORY = os.path.join(API_KEYS_DIRECTORY, 'paid') # Ensure directories exist os.makedirs(FREE_API_KEYS_DIRECTORY, exist_ok=True) os.makedirs(PAID_API_KEYS_DIRECTORY, exist_ok=True) # Function to generate a new API key and save it to a file def generate_api_key(): current_date = datetime.datetime.now().strftime("%Y%m%d") username = ''.join(random.choices(string.ascii_lowercase + string.digits, k=4)) # Generate a 4-character username api_key = 'HUAI' + username + current_date + ''.join(random.choices(string.ascii_uppercase + string.digits, k=5)) # Determine the directory based on the plan api_keys_directory = FREE_API_KEYS_DIRECTORY if request.form.get('plan') == 'free' else PAID_API_KEYS_DIRECTORY # Save the API key to a file with open(os.path.join(api_keys_directory, username + '.txt'), 'w') as file: file.write(api_key) return api_key # Middleware to require an API key for each request and enforce rate limits for free plan # Middleware to require an API key for each request and enforce rate limits for free plan def require_api_key(view_function): @wraps(view_function) def decorated_function(*args, **kwargs): # Check if the API key is provided in the headers api_key = request.headers.get('HUSI') # If not provided in headers, check query parameters if not api_key: api_key = request.args.get('HUAI') if not api_key: abort(401) # Unauthorized if not validate_api_key(api_key): abort(401) # Unauthorized # Check if it's a free plan and enforce rate limit if request.form.get('plan') == 'free': username = api_key[4:8] # Extract username from API key if not check_rate_limit(username): abort(429) # Too Many Requests return view_function(*args, **kwargs) return decorated_function # Function to validate an API key by checking if it matches any file in the directory def validate_api_key(api_key): api_keys_directory = FREE_API_KEYS_DIRECTORY if api_key.startswith('HUAI') else PAID_API_KEYS_DIRECTORY for filename in os.listdir(api_keys_directory): filepath = os.path.join(api_keys_directory, filename) with open(filepath, 'r') as file: if file.read().strip() == api_key: return True return False # Function to check rate limit for free plan def check_rate_limit(username): today_date = datetime.datetime.now().strftime("%Y%m%d") # Count the number of requests made by the user today requests_count = sum(1 for _ in os.listdir(FREE_API_KEYS_DIRECTORY) if _.startswith(username + '_' + today_date)) return requests_count # Routes with API key requirement @app.route('/api/usage', methods=['GET']) @require_api_key def get_api_usage(): # Extract username from API key api_key = request.headers.get('HUSI') or request.args.get('HUAI') username = api_key[4:8] # Get the usage count for the user usage_count = check_rate_limit(username) return jsonify({'username': username, 'usage_count': usage_count}) @app.route('/api/search', methods=['GET']) @require_api_key def search_text(): query = request.args.get('q', '') max_results = request.args.get('max_results', 10, type=int) timelimit = request.args.get('timelimit', None) safesearch = request.args.get('safesearch', 'moderate') region = request.args.get('region', 'wt-wt') WEBS_instance = WEBS() results = [] with WEBS() as webs: for result in enumerate(WEBS_instance.text(query, max_results=max_results, timelimit=timelimit, safesearch=safesearch, region=region)): results.append(result) return jsonify({'results': results}) @app.route('/api/images', methods=['GET']) @require_api_key def search_images(): query = request.args.get('q', '') max_results = request.args.get('max_results', 10, type=int) safesearch = request.args.get('safesearch', 'moderate') region = request.args.get('region', 'wt-wt') WEBS_instance = WEBS() results = [] with WEBS() as webs: for result in enumerate(WEBS_instance.images(query, max_results=max_results, safesearch=safesearch, region=region)): results.append(result) return jsonify({'results': results}) @app.route('/api/videos', methods=['GET']) @require_api_key def search_videos(): query = request.args.get('q', '') max_results = request.args.get('max_results', 10, type=int) safesearch = request.args.get('safesearch', 'moderate') region = request.args.get('region', 'wt-wt') timelimit = request.args.get('timelimit', None) resolution = request.args.get('resolution', None) duration = request.args.get('duration', None) WEBS_instance = WEBS() results = [] with WEBS() as webs: for result in enumerate(WEBS_instance.videos(query, max_results=max_results, safesearch=safesearch, region=region, timelimit=timelimit, resolution=resolution, duration=duration)): results.append(result) return jsonify({'results': results}) @app.route('/api/news', methods=['GET']) @require_api_key def search_news(): query = request.args.get('q', '') max_results = request.args.get('max_results', 10, type=int) safesearch = request.args.get('safesearch', 'moderate') region = request.args.get('region', 'wt-wt') timelimit = request.args.get('timelimit', None) WEBS_instance = WEBS() results = [] with WEBS() as webs: for result in enumerate(WEBS_instance.news(query, max_results=max_results, safesearch=safesearch, region=region, timelimit=timelimit)): results.append(result) return jsonify({'results': results}) @app.route('/api/maps', methods=['GET']) @require_api_key def search_maps(): query = request.args.get('q', '') place = request.args.get('place', None) max_results = request.args.get('max_results', 10, type=int) WEBS_instance = WEBS() results = [] with WEBS() as webs: for result in enumerate(WEBS_instance.maps(query, place=place, max_results=max_results)): results.append(result) return jsonify({'results': results}) @app.route('/api/translate', methods=['GET']) @require_api_key def translate_text(): query = request.args.get('q', '') to_lang = request.args.get('to', 'en') WEBS_instance = WEBS() with WEBS() as webs: translation = enumerate(WEBS_instance.translate(query, to=to_lang)) return jsonify({'translation': translation}) @app.route('/api/suggestions', methods=['GET']) @require_api_key def search_suggestions(): query = request.args.get('q', '') if not query: return jsonify({'error': 'Query parameter missing'}) results = [] try: with WEBS() as webs: for result in webs.suggestions(query): results.append(result) except Exception as e: return jsonify({'error': str(e)}), 500 return jsonify({'results': results}) @app.route('/api/health', methods=['GET']) @require_api_key def health_check(): return jsonify({'status': 'working'}) import json from flask import jsonify, request, abort @app.route('/pricing', methods=['GET']) def pricing(): return render_template('pricing.html', plans=PRICING_PLANS) @app.route('/generate_key', methods=['GET', 'POST']) def generate_key(): if request.method == 'POST': # Generate a new API key with a random username (userid) api_key = generate_api_key() return jsonify({'api_key': api_key}), 201 else: # Render the form for GET requests return render_template('index.html', plans=PRICING_PLANS) if __name__ == '__main__': app.run(debug=True)