|
import os |
|
import tempfile |
|
import requests |
|
import gradio as gr |
|
from fastapi import FastAPI, Form |
|
from instagrapi import Client |
|
from PIL import Image |
|
import uvicorn |
|
|
|
|
|
# FastAPI application object; the HTTP routes in this module are registered on it.
app = FastAPI()

# Single instagrapi client shared by every login and upload performed by this module.
cl = Client()
|
|
|
|
|
# Instagram credentials, read once at import time.
# The dedicated IG_USERNAME / IG_PASSWORD variables take precedence. The generic
# USERNAME / PASSWORD names are kept as a fallback so existing deployments keep
# working, but USERNAME is commonly pre-set by the operating system (e.g. to the
# local account name on Windows), so relying on it alone can silently pick up
# the wrong value.
IG_USERNAME = os.getenv("IG_USERNAME") or os.getenv("USERNAME")
IG_PASSWORD = os.getenv("IG_PASSWORD") or os.getenv("PASSWORD")

# In-memory buffer of recent log messages (plain strings, oldest first).
logs = []
|
|
|
def log(message: str):
    """Print *message* and record it in the module-level ``logs`` buffer.

    After the message is appended, the oldest entry is discarded if the
    buffer holds more than 50 messages.
    """
    print(message)
    logs.append(message)

    # Nothing to trim while the buffer is still within its 50-entry budget.
    if len(logs) <= 50:
        return
    del logs[0]
|
|
|
def login():
    """Log the shared instagrapi client in with the configured credentials.

    Uses the module-level ``IG_USERNAME`` / ``IG_PASSWORD`` values.

    Raises:
        Exception: whatever ``cl.login`` raised; it is logged first and then
            re-raised unchanged so the caller can decide how to report it.
    """
    try:
        cl.login(IG_USERNAME, IG_PASSWORD)
    except Exception as e:
        log(f"⚠️ Login failed: {e}")
        raise
    else:
        # Logged outside the ``try`` so a failure while logging is not
        # misreported as a failed login.
        log(f"✅ Logged into Instagram as {IG_USERNAME}")
|
|
|
def fix_google_drive_url(url: str) -> str:
    """Turn a Google Drive share link into a direct-download URL.

    Recognised forms (anything else is returned unchanged):

    * ``.../d/<id>/...`` and ``.../d/<id>?...`` -- the file id is the path
      segment after ``/d/``; a query string or fragment is never treated as
      part of the id.
    * ``.../open?id=<id>`` -- legacy share links.

    Args:
        url: Any URL; only those containing ``drive.google.com`` are inspected.

    Returns:
        ``https://drive.google.com/uc?export=download&id=<id>`` when a file id
        could be extracted, otherwise *url* as given.
    """
    import re

    if "drive.google.com" not in url:
        return url

    # The id ends at the next "/", "?" or "#", so ".../d/<id>?usp=sharing"
    # does not leak the query string into the download URL.
    match = re.search(r"/d/([^/?#]+)", url)
    if match is None:
        match = re.search(r"/open\?(?:[^#]*&)?id=([^&#]+)", url)
    if match is None:
        return url

    return f"https://drive.google.com/uc?export=download&id={match.group(1)}"
|
|
|
# Upper bound, in seconds, on connecting to / reading from the image host so a
# stalled server cannot block an upload request indefinitely.
_DOWNLOAD_TIMEOUT_SECONDS = 30


def process_upload(photo_url: str, caption: str):
    """Download the image at *photo_url* and post it to Instagram.

    Steps: log in if the shared client has no ``user_id`` yet, normalise
    Google Drive links, download the file to a temporary ``.jpg`` path, check
    that Pillow accepts it as an image, then upload it with *caption*.

    Args:
        photo_url: URL of the image (Google Drive share links are rewritten).
        caption: Caption passed to the upload call.

    Returns:
        A dict with ``"status"`` (``"success"`` or ``"error"``) and a
        human-readable ``"message"``. Exceptions are not propagated: any
        failure is logged and reported as an ``"error"`` result.
    """
    tmp_path = None
    try:
        if not cl.user_id:
            login()

        photo_url = fix_google_drive_url(photo_url)
        log(f"📥 Downloading from {photo_url}")

        # The context manager releases the streamed connection on every exit
        # path, including the early return for a non-200 response.
        with requests.get(
            photo_url, stream=True, timeout=_DOWNLOAD_TIMEOUT_SECONDS
        ) as response:
            if response.status_code != 200:
                msg = f"Invalid image URL: {response.status_code}"
                log("❌ " + msg)
                return {"status": "error", "message": msg}

            with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp_file:
                tmp_path = tmp_file.name
                for chunk in response.iter_content(chunk_size=65536):
                    tmp_file.write(chunk)

        try:
            # Close the image handle right after verification so the temp
            # file can be removed afterwards on every platform.
            with Image.open(tmp_path) as image:
                image.verify()
        except Exception:
            msg = "Downloaded file is not a valid image."
            log("❌ " + msg)
            return {"status": "error", "message": msg}

        cl.photo_upload(tmp_path, caption)
        log("✅ Photo uploaded successfully with caption: " + caption)
        return {"status": "success", "message": "Photo uploaded successfully!"}

    except Exception as e:
        log("⚠️ Exception: " + str(e))
        return {"status": "error", "message": str(e)}

    finally:
        # The temp file is created with delete=False, so it must be removed
        # explicitly or every request leaves a file behind.
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                # Best-effort cleanup: a leftover temp file must not turn a
                # finished upload into an error.
                pass
|
|
|
|
|
|
|
|
|
@app.post("/upload")
def upload_post(photo_url: str = Form(...), caption: str = Form(...)):
    """Handle ``POST /upload``.

    Both form fields are required; they are forwarded to ``process_upload``
    and its result dict is returned as the response body.
    """
    outcome = process_upload(photo_url, caption)
    return outcome
|
|
|
@app.get("/")
def root():
    """Handle ``GET /``: return a fixed payload saying the service is running."""
    return {"status": "ok", "message": "Instagram uploader is running 🚀"}
|
|
|
|
|
|
|
|
|
def gradio_upload(photo_url, caption):
    """Run an upload for the Gradio UI.

    Returns a 3-tuple ``(status, message, log_text)`` where the first two come
    from the ``process_upload`` result and ``log_text`` is the current
    contents of ``logs`` joined one message per line.
    """
    outcome = process_upload(photo_url, caption)
    status = outcome["status"]
    message = outcome["message"]
    log_text = "\n".join(logs)
    return status, message, log_text
|
|
|
# Gradio front end: two inputs, an upload button, and three read-only outputs
# fed by ``gradio_upload``.
with gr.Blocks() as demo:
    gr.Markdown("## 📸 Instagram Auto Uploader")

    with gr.Row():
        photo_url_in = gr.Textbox(
            label="Photo URL",
            placeholder="Paste image link (Google Drive supported)",
        )
        caption_in = gr.Textbox(label="Caption", placeholder="Enter your caption")
    # NOTE(review): the original indentation was lost, so it is unclear whether
    # the button was meant to sit inside the row above; it is placed directly
    # below the row here -- confirm the intended layout.
    upload_btn = gr.Button("🚀 Upload to Instagram")

    status_out = gr.Textbox(label="Status")
    message_out = gr.Textbox(label="Message")
    logs_out = gr.Textbox(label="Logs (last 50)", lines=15)

    # The handler's 3-tuple maps positionally onto the three output boxes.
    upload_btn.click(
        fn=gradio_upload,
        inputs=[photo_url_in, caption_in],
        outputs=[status_out, message_out, logs_out],
    )
|
|
|
|
|
if __name__ == "__main__":
    # Run as a script: start the Gradio UI, bound to all interfaces on port 7860.
    # NOTE(review): only ``demo`` is launched here; nothing in this entry point
    # serves the FastAPI ``app`` routes -- confirm they are served separately.
    launch_options = {"server_name": "0.0.0.0", "server_port": 7860}
    demo.launch(**launch_options)
|
|