|
import logging |
|
from typing import Optional |
|
from flask import Flask, jsonify, request, abort, render_template |
|
import json |
|
import random |
|
import string |
|
import datetime |
|
from webscout import WEBS |
|
from functools import wraps |
|
import requests |
|
import os |
|
|
|
app = Flask(__name__) |
|
|
|
TIMEOUT = 10 |
|
PROXY = None |
|
|
|
PRICING_PLANS = { |
|
'free': { |
|
'name': 'Free Plan', |
|
'price': '$0/month', |
|
'rate_limit': 1000 |
|
}, |
|
'pro': { |
|
'name': 'Pro Plan', |
|
'price': 'Coming Soon', |
|
'rate_limit': None |
|
} |
|
} |
|
|
|
|
|
API_KEYS_DIRECTORY = os.path.join(os.getcwd(), 'static', 'data') |
|
FREE_API_KEYS_DIRECTORY = os.path.join(API_KEYS_DIRECTORY, 'free') |
|
PAID_API_KEYS_DIRECTORY = os.path.join(API_KEYS_DIRECTORY, 'paid') |
|
|
|
|
|
os.makedirs(FREE_API_KEYS_DIRECTORY, exist_ok=True) |
|
os.makedirs(PAID_API_KEYS_DIRECTORY, exist_ok=True) |
|
|
|
|
|
def generate_api_key(): |
|
current_date = datetime.datetime.now().strftime("%Y%m%d") |
|
username = ''.join(random.choices(string.ascii_lowercase + string.digits, k=4)) |
|
api_key = 'HUAI' + username + current_date + ''.join(random.choices(string.ascii_uppercase + string.digits, k=5)) |
|
|
|
|
|
api_keys_directory = FREE_API_KEYS_DIRECTORY if request.form.get('plan') == 'free' else PAID_API_KEYS_DIRECTORY |
|
|
|
|
|
with open(os.path.join(api_keys_directory, username + '.txt'), 'w') as file: |
|
file.write(api_key) |
|
return api_key |
|
|
|
|
|
|
|
def require_api_key(view_function): |
|
@wraps(view_function) |
|
def decorated_function(*args, **kwargs): |
|
|
|
api_key = request.headers.get('HUSI') |
|
|
|
|
|
if not api_key: |
|
api_key = request.args.get('HUAI') |
|
|
|
if not api_key: |
|
abort(401) |
|
|
|
if not validate_api_key(api_key): |
|
abort(401) |
|
|
|
|
|
if request.form.get('plan') == 'free': |
|
username = api_key[4:8] |
|
if not check_rate_limit(username): |
|
abort(429) |
|
|
|
return view_function(*args, **kwargs) |
|
return decorated_function |
|
|
|
|
|
|
|
def validate_api_key(api_key): |
|
api_keys_directory = FREE_API_KEYS_DIRECTORY if api_key.startswith('HUAI') else PAID_API_KEYS_DIRECTORY |
|
for filename in os.listdir(api_keys_directory): |
|
filepath = os.path.join(api_keys_directory, filename) |
|
with open(filepath, 'r') as file: |
|
if file.read().strip() == api_key: |
|
return True |
|
return False |
|
|
|
|
|
|
|
def check_rate_limit(username): |
|
today_date = datetime.datetime.now().strftime("%Y%m%d") |
|
|
|
requests_count = sum(1 for _ in os.listdir(FREE_API_KEYS_DIRECTORY) if _.startswith(username + '_' + today_date)) |
|
return requests_count |
|
|
|
|
|
@app.route('/api/usage', methods=['GET']) |
|
@require_api_key |
|
def get_api_usage(): |
|
|
|
api_key = request.headers.get('HUSI') or request.args.get('HUAI') |
|
username = api_key[4:8] |
|
|
|
usage_count = check_rate_limit(username) |
|
return jsonify({'username': username, 'usage_count': usage_count}) |
|
|
|
@app.route('/api/search', methods=['GET']) |
|
@require_api_key |
|
def search_text(): |
|
|
|
query = request.args.get('q', '') |
|
max_results = request.args.get('max_results', 10, type=int) |
|
timelimit = request.args.get('timelimit', None) |
|
safesearch = request.args.get('safesearch', 'moderate') |
|
region = request.args.get('region', 'wt-wt') |
|
WEBS_instance = WEBS() |
|
results = [] |
|
with WEBS() as webs: |
|
for result in enumerate(WEBS_instance.text(query, max_results=max_results, timelimit=timelimit, safesearch=safesearch, region=region)): |
|
results.append(result) |
|
return jsonify({'results': results}) |
|
|
|
@app.route('/api/images', methods=['GET']) |
|
@require_api_key |
|
def search_images(): |
|
query = request.args.get('q', '') |
|
max_results = request.args.get('max_results', 10, type=int) |
|
safesearch = request.args.get('safesearch', 'moderate') |
|
region = request.args.get('region', 'wt-wt') |
|
WEBS_instance = WEBS() |
|
results = [] |
|
with WEBS() as webs: |
|
for result in enumerate(WEBS_instance.images(query, max_results=max_results, safesearch=safesearch, region=region)): |
|
results.append(result) |
|
return jsonify({'results': results}) |
|
|
|
@app.route('/api/videos', methods=['GET']) |
|
@require_api_key |
|
def search_videos(): |
|
query = request.args.get('q', '') |
|
max_results = request.args.get('max_results', 10, type=int) |
|
safesearch = request.args.get('safesearch', 'moderate') |
|
region = request.args.get('region', 'wt-wt') |
|
timelimit = request.args.get('timelimit', None) |
|
resolution = request.args.get('resolution', None) |
|
duration = request.args.get('duration', None) |
|
WEBS_instance = WEBS() |
|
results = [] |
|
with WEBS() as webs: |
|
for result in enumerate(WEBS_instance.videos(query, max_results=max_results, safesearch=safesearch, region=region, timelimit=timelimit, resolution=resolution, duration=duration)): |
|
results.append(result) |
|
return jsonify({'results': results}) |
|
|
|
@app.route('/api/news', methods=['GET']) |
|
@require_api_key |
|
def search_news(): |
|
query = request.args.get('q', '') |
|
max_results = request.args.get('max_results', 10, type=int) |
|
safesearch = request.args.get('safesearch', 'moderate') |
|
region = request.args.get('region', 'wt-wt') |
|
timelimit = request.args.get('timelimit', None) |
|
WEBS_instance = WEBS() |
|
results = [] |
|
with WEBS() as webs: |
|
for result in enumerate(WEBS_instance.news(query, max_results=max_results, safesearch=safesearch, region=region, timelimit=timelimit)): |
|
results.append(result) |
|
return jsonify({'results': results}) |
|
|
|
@app.route('/api/maps', methods=['GET']) |
|
@require_api_key |
|
def search_maps(): |
|
query = request.args.get('q', '') |
|
place = request.args.get('place', None) |
|
max_results = request.args.get('max_results', 10, type=int) |
|
WEBS_instance = WEBS() |
|
results = [] |
|
with WEBS() as webs: |
|
for result in enumerate(WEBS_instance.maps(query, place=place, max_results=max_results)): |
|
results.append(result) |
|
return jsonify({'results': results}) |
|
|
|
@app.route('/api/translate', methods=['GET']) |
|
@require_api_key |
|
def translate_text(): |
|
query = request.args.get('q', '') |
|
to_lang = request.args.get('to', 'en') |
|
WEBS_instance = WEBS() |
|
with WEBS() as webs: |
|
translation = enumerate(WEBS_instance.translate(query, to=to_lang)) |
|
return jsonify({'translation': translation}) |
|
|
|
@app.route('/api/suggestions', methods=['GET']) |
|
@require_api_key |
|
def search_suggestions(): |
|
query = request.args.get('q', '') |
|
if not query: |
|
return jsonify({'error': 'Query parameter missing'}) |
|
results = [] |
|
try: |
|
with WEBS() as webs: |
|
for result in webs.suggestions(query): |
|
results.append(result) |
|
except Exception as e: |
|
return jsonify({'error': str(e)}), 500 |
|
return jsonify({'results': results}) |
|
|
|
@app.route('/api/health', methods=['GET']) |
|
@require_api_key |
|
def health_check(): |
|
return jsonify({'status': 'working'}) |
|
|
|
import json |
|
from flask import jsonify, request, abort |
|
|
|
@app.route('/pricing', methods=['GET']) |
|
def pricing(): |
|
return render_template('pricing.html', plans=PRICING_PLANS) |
|
|
|
|
|
@app.route('/generate_key', methods=['GET', 'POST']) |
|
def generate_key(): |
|
if request.method == 'POST': |
|
|
|
api_key = generate_api_key() |
|
|
|
return jsonify({'api_key': api_key}), 201 |
|
else: |
|
|
|
return render_template('index.html', plans=PRICING_PLANS) |
|
|
|
if __name__ == '__main__': |
|
app.run(debug=True) |
|
|
|
|