# Spaces: Sleeping (page-header residue from the hosting site; not part of the program)
import io
import os
import re
import shutil
from datetime import datetime
from pathlib import Path

import docx
import PyPDF2
import pytesseract
import uvicorn
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import JSONResponse
from langchain.schema import HumanMessage
# Hugging Face GPT or LLM model for content-based name generation
from langchain_openai import ChatOpenAI
from PIL import Image
from transformers import pipeline, CLIPProcessor, CLIPModel
# ASGI application instance; the uvicorn entry point at the bottom of this
# file refers to it by the import string "main:app".
app = FastAPI()
# Upload configuration.
# UPLOAD_FOLDER is a path relative to the process working directory.
UPLOAD_FOLDER = 'uploads'
# Lower-case extensions (without the dot) that uploads may carry.
ALLOWED_EXTENSIONS = {'pdf', 'docx', 'txt'}
# 16 MB. NOTE(review): nothing in the visible code enforces this limit -
# confirm whether a size check is expected somewhere.
MAX_CONTENT_LENGTH = 16 * 1024 * 1024

# exist_ok avoids the check-then-create race when several workers start at once.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# Load your OpenAI API key from environment variables
openai_api_key = os.getenv("OPENAI_API_KEY")

# Fail fast at import time. An empty value is treated like a missing one,
# because it would otherwise only surface later as an authentication error
# on the first request.
if not openai_api_key:
    raise ValueError("API key not found. Please set your OPENAI_API_KEY environment variable.")

# Chat model used to propose file names from document content.
llm = ChatOpenAI(
    model_name="gpt-4o-mini",
    temperature=0,  # no sampling randomness, so names are as repeatable as possible
    openai_api_key=openai_api_key,
)
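# TODO: load the CLIP model and processor here (as `clip_model` / `clip_processor`);
# extract_features_from_image() refers to both names but nothing in this file defines them.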
# Function to generate a more appropriate name based on content
def generate_name_based_on_content(text, industry):
    """Ask the chat model for a file name describing *text*.

    Args:
        text: Extracted document text; only the first 200 characters are
            sent to the model.
        industry: Industry label interpolated into the prompt.

    Returns:
        The model's reply with surrounding whitespace stripped. The reply is
        NOT sanitised here, so callers that use it in a file-system path
        must clean it themselves.
    """
    prompt = f"Generate a meaningful file name for the following content: {text[:200]} based {industry}"
    # A chat model takes messages and returns a message object, not a str;
    # the reply text lives in `.content`.
    response = llm.invoke([HumanMessage(content=prompt)])
    content = response.content
    if not isinstance(content, str):
        # Some providers return structured content blocks; fall back to
        # their string form rather than failing on .strip().
        content = str(content)
    return content.strip()
# Allowed file extensions check
def allowed_file(filename):
    """Return True if *filename* has an extension listed in ALLOWED_EXTENSIONS.

    The extension is whatever follows the last dot, compared
    case-insensitively. A missing (None) or empty filename is rejected
    instead of raising.
    """
    if not filename:
        return False
    _, dot, extension = filename.rpartition('.')
    return bool(dot) and extension.lower() in ALLOWED_EXTENSIONS
# Function to extract text from PDF
def extract_text_from_pdf(pdf_path):
    """Return the text of every page of the PDF at *pdf_path*.

    Pages that yield no text are skipped. The remaining page texts are
    joined with newlines so that the last word of one page does not fuse
    with the first word of the next. Returns "" when no page has text.
    """
    with open(pdf_path, 'rb') as file:
        reader = PyPDF2.PdfReader(file)
        page_texts = (page.extract_text() for page in reader.pages)
        # Must be consumed while the file is still open: pages are read lazily.
        return "\n".join(page_text for page_text in page_texts if page_text)
# Function to extract text from DOCX
def extract_text_from_docx(docx_path):
    """Return the paragraph text of the DOCX file at *docx_path*.

    Empty paragraphs are skipped and the rest are joined with newlines, so
    adjacent paragraphs no longer run together into a single word. Returns
    "" when the document has no paragraph text.
    """
    doc = docx.Document(docx_path)
    return "\n".join(para.text for para in doc.paragraphs if para.text)
# Function to extract text from images
def extract_text_from_image(image_path):
    """Run OCR on the image at *image_path* and return the recognised text."""
    # The context manager closes the underlying file handle once OCR is done.
    with Image.open(image_path) as image:
        return pytesseract.image_to_string(image)
# Function to extract image features
def extract_features_from_image(image_path):
    """Return the CLIP image features for the image at *image_path*.

    NOTE(review): this relies on module-level ``clip_processor`` and
    ``clip_model`` objects, which are not defined anywhere in the visible
    file. Calling it raises NameError until they are loaded - TODO confirm
    where they are meant to come from.
    """
    # Pre-process inside the `with` so pixel data is read before the file
    # handle is closed.
    with Image.open(image_path) as image:
        inputs = clip_processor(images=image, return_tensors="pt")
    return clip_model.get_image_features(**inputs)
# Function to process files
def _safe_path_component(value, fallback="Untitled", max_length=60):
    """Reduce *value* to a string that is safe to use as one path component."""
    # Anything other than word characters, spaces, dots and dashes becomes
    # "_", which removes path separators. Stripping leading/trailing dots
    # rules out "." and "..".
    cleaned = re.sub(r"[^\w .-]+", "_", str(value)).strip(" ._")
    return cleaned[:max_length] or fallback


def process_files(files, industry):
    """Store each allowed upload in its own content-named directory.

    For every entry in *files* that is truthy and passes ``allowed_file``:
    the upload is written under UPLOAD_FOLDER, its text is extracted
    according to its extension, a name is requested from the LLM (or
    'Untitled' when no text was found), and the file is moved to
    ``UPLOAD_FOLDER/<industry>_<name>_<timestamp>/``.

    Args:
        files: Iterable of upload objects exposing ``.filename`` and a
            binary file-like ``.file``.
        industry: Label used as the directory-name prefix.

    Returns:
        List of the directory names (relative to UPLOAD_FOLDER) that were
        created, in input order.
    """
    directories = []
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    # `industry` is client-supplied and ends up in a path, so sanitise it once.
    safe_industry = _safe_path_component(industry)

    for file in files:
        if not (file and allowed_file(file.filename)):
            continue

        # The client controls the filename: keep only its last path segment
        # (for either separator style) so it cannot point outside UPLOAD_FOLDER.
        base_name = os.path.basename(file.filename.replace("\\", "/"))
        stem, _, raw_extension = base_name.rpartition(".")
        extension = raw_extension.lower()
        filename = f"{_safe_path_component(stem, fallback='upload')}.{raw_extension}"

        file_path = os.path.join(UPLOAD_FOLDER, filename)
        with open(file_path, "wb") as buffer:
            # Stream in chunks instead of loading the whole upload into memory.
            shutil.copyfileobj(file.file, buffer)

        # Dispatch on the lower-cased extension so "REPORT.PDF" is handled
        # the same way allowed_file() accepted it.
        text = ""
        if extension == 'pdf':
            text = extract_text_from_pdf(file_path)
        elif extension == 'docx':
            text = extract_text_from_docx(file_path)
        elif extension == 'txt':
            with open(file_path, encoding="utf-8", errors="replace") as text_file:
                text = text_file.read()
        elif extension in ('png', 'jpg', 'jpeg'):
            # Only reachable if image extensions are added to ALLOWED_EXTENSIONS.
            text = extract_text_from_image(file_path)

        # Generate name based on LLM and include timestamp for uniqueness.
        # The LLM reply is free text, so it is sanitised before use in a path.
        content_name = generate_name_based_on_content(text, industry) if text else 'Untitled'
        directory_name = f"{safe_industry}_{_safe_path_component(content_name)}_{timestamp}"
        new_dir = os.path.join(UPLOAD_FOLDER, directory_name)
        os.makedirs(new_dir, exist_ok=True)

        # Rename and move the file to the new directory
        new_file_path = os.path.join(new_dir, f"{directory_name}_{filename}")
        shutil.move(file_path, new_file_path)
        directories.append(directory_name)

    return directories
# NOTE(review): the handler previously had no route decorator, so the app
# exposed no endpoint at all. "/upload" is an assumed path - confirm it
# matches what clients call.
@app.post("/upload")
async def upload_files(industry: str = Form(...), files: list[UploadFile] = File(...)):
    """Accept uploaded files plus an industry label and organise them on disk.

    Args:
        industry: Form field used to prefix the generated directory names.
        files: One or more uploaded files.

    Returns:
        A JSON response. Status 400 with a ``message`` when the industry or
        the file list is empty; otherwise a success ``message`` and the
        list of created ``directories``.
    """
    if not industry:
        return JSONResponse(content={"message": "Industry is required."}, status_code=400)
    if not files:
        return JSONResponse(content={"message": "No files selected."}, status_code=400)

    # process_files does blocking disk and network work; run it in the
    # thread pool so the event loop stays free for other requests.
    directories = await run_in_threadpool(process_files, files, industry)
    return JSONResponse(content={"message": "Files successfully uploaded and organized.", "directories": directories})
# Script entry point: serve the app object named by "main:app" on all
# interfaces, port 7860, with auto-reload enabled.
if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=7860,
        reload=True,
    )