"""Flask + Socket.IO app that streams a video source as MJPEG.

The stream can optionally be mirrored, run through a YOLO detector (whose
labels are translated to Arabic and emitted over Socket.IO), and annotated
with MediaPipe hand landmarks. All toggles are driven by HTTP routes that
mutate the single module-level ``VIDEO`` instance.
"""
from ultralytics import YOLO
import time
import numpy as np
import mediapipe as mp
import uvicorn
from socketio import ASGIApp
import cv2
from flask import Flask, render_template, request, Response, session, redirect, url_for
from flask_socketio import SocketIO
from flask_socketio import emit
from flask_cors import CORS
import yt_dlp as youtube_dl

# Detector weights are loaded once at import time and shared by all streams.
model_object_detection = YOLO("bisindov2.pt")

app = Flask(__name__)
socketio = SocketIO(app, cors_allowed_origins="*")
CORS(app)
# NOTE(review): hard-coded session secret; consider loading it from the
# environment before deploying anywhere public.
app.secret_key = 'flask-sockets-builds'

# Set by /stop_process, cleared by /index, polled by VideoStreaming.show().
# It must exist at import time: show() reads it as a global and would raise
# NameError if /video_feed were served before either of those routes ran.
stop_flag = False


@socketio.on('image')
def handle_image(image_data):
    """Log the size of a received ``image`` payload and acknowledge it.

    Args:
        image_data: Payload of the ``image`` Socket.IO event; only its
            ``len()`` is used.
    """
    print("Received image data length:", len(image_data))

    # NOTE(review): socketio.emit() is the server-level emit, which (per the
    # Flask-SocketIO docs) is not scoped to the sender — confirm whether the
    # acknowledgement is meant for every client or only the one that sent it.
    response_data = {'status': 'success'}
    socketio.emit('response_back', response_data)


######################################################
# Detector class name -> Arabic text sent to the client. Labels that are not
# listed here are forwarded untranslated.
classes_translation = {
    "all": "الكل",
    "A": "أ",
    "B": "ب",
    "C": "ج",
    "D": "د",
    "F": "ف",
    "H": "هـ",
    "I": "أنا",
    "J": "جيم",
    "L": "إل",
    "M": "إم",
    "O": "أو",
    "R": "ر",
    "T": "ت",
    "U": "يو",
    "V": "في",
    "W": "دبليو",
    "Z": "زد",
    "additional": "إضافي",
    # Previously mapped to the Arabic word for "school".
    "alcohol": "كحول",
    "allergy": "حساسية",
    "bacon": "لحم المقدد",
    "bag": "حقيبة",
    "barbecue": "شواء",
    "bill": "فاتورة",
    "biscuit": "بسكويت",
    "bitter": "مر",
    "bread": "خبز",
    "burger": "برغر",
    "bye": "وداعاً",
    "cheese": "جبن",
    "chicken": "دجاج",
    "coke": "كوكاكولا",
    "cold": "بارد",
    "cost": "تكلفة",
    "coupon": "كوبون",
    "cup": "كوب",
    "dessert": "حلوى",
    "drink": "شراب",
    "drive": "قيادة",
    "eat": "تناول الطعام",
    "eggs": "بيض",
    "enjoy": "استمتع",
    "fork": "شوكة",
    "french fries": "بطاطس مقلية",
    "fresh": "طازج",
    "hello": "مرحبا",
    "hot": "ساخن",
    "icecream": "آيس كريم",
    "ingredients": "مكونات",
    "juicy": "عصيري",
    "ketchup": "كاتشب",
    "lactose": "لاكتوز",
    "lettuce": "خس",
    "lid": "غطاء",
    "manager": "مدير",
    "menu": "قائمة الطعام",
    "milk": "حليب",
    "mustard": "خردل",
    "napkin": "منديل",
    "no": "لا",
    "order": "طلب",
    "pepper": "فلفل",
    "pickle": "مخلل",
    "pizza": "بيتزا",
    "please": "من فضلك",
    "ready": "جاهز",
    "refill": "إعادة ملء",
    "repeat": "كرر",
    "safe": "آمن",
    "salt": "ملح",
    "sandwich": "ساندويتش",
    "sauce": "صلصة",
    "small": "صغير",
    "soda": "صودا",
    "sorry": "آسف",
    "spicy": "حار",
    "spoon": "ملعقة",
    "straw": "قش",
    "sugar": "سكر",
    "sweet": "حلو",
    "tissues": "مناديل",
    "total": "مجموع",
    "urgent": "عاجل",
    "vegetables": "خضروات",
    "warm": "دافئ",
    "water": "ماء",
    "what": "ماذا",
    "yoghurt": "زبادي",
    "your": "لك",
    "ILoveYou": "أحبك",
    "Halo": "مرحبًا",
}
######################################################


def _mjpeg_part(image):
    """JPEG-encode *image* and wrap it as one multipart ``--frame`` part."""
    jpeg_bytes = cv2.imencode(".jpg", image)[1].tobytes()
    return (
        b'--frame\r\n'
        b'Content-Type: image/jpeg\r\n\r\n' + jpeg_bytes + b'\r\n'
    )


def _streaming_off_part():
    """Build the multipart part shown while the preview is switched off."""
    snap = np.zeros((1000, 1000), np.uint8)
    label = "Streaming Off"
    H, W = snap.shape
    font = cv2.FONT_HERSHEY_PLAIN
    color = (255, 255, 255)
    cv2.putText(snap, label, (W // 2 - 100, H // 2), font, 2, color, 2)
    return _mjpeg_part(snap)


def _open_capture(url):
    """Open a ``cv2.VideoCapture`` for *url*.

    The literal string ``'0'`` selects capture device 0; any other value is
    resolved through yt-dlp and the extracted media URL is opened instead.
    """
    if url == '0':
        return cv2.VideoCapture(0)

    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "format": "best",
        "forceurl": True,
    }
    with youtube_dl.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)
    return cv2.VideoCapture(info["url"])


class VideoStreaming:
    """Holds the stream toggles and produces the MJPEG frame generator."""

    def __init__(self):
        super().__init__()
        print("===== Video Streaming =====")
        self._preview = False
        self._flipH = False
        self._detect = False
        self._model = False
        self._mediaPipe = False
        # Detection threshold in percent; divided by 100 before prediction.
        self._confidence = 75.0
        self.mp_hands = mp.solutions.hands
        self.hands = self.mp_hands.Hands()

    @property
    def confidence(self):
        """Detection confidence threshold, in percent."""
        return self._confidence

    @confidence.setter
    def confidence(self, value):
        # Parse via float first so decimal strings such as "62.5" (a form
        # slider with a fractional step) do not raise ValueError.
        self._confidence = int(float(value))

    @property
    def preview(self):
        """Whether source frames are streamed (else a placeholder is)."""
        return self._preview

    @preview.setter
    def preview(self, value):
        self._preview = bool(value)

    @property
    def flipH(self):
        """Whether frames are mirrored horizontally before processing."""
        return self._flipH

    @flipH.setter
    def flipH(self, value):
        self._flipH = bool(value)

    @property
    def detect(self):
        """Whether the YOLO detector runs on each frame."""
        return self._detect

    @detect.setter
    def detect(self, value):
        self._detect = bool(value)

    @property
    def mediaPipe(self):
        """Whether MediaPipe hand landmarks are drawn on each frame."""
        return self._mediaPipe

    @mediaPipe.setter
    def mediaPipe(self, value):
        self._mediaPipe = bool(value)

    def _detect_and_emit(self, frame):
        """Run the detector on *frame* and emit translated labels.

        Emits one ``label`` Socket.IO event carrying a flat list of
        ``[translated_name, confidence, translated_name, confidence, ...]``.
        """
        frame_yolo = frame.copy()
        results_yolo = model_object_detection.predict(
            frame_yolo, conf=self._confidence / 100
        )
        # NOTE(review): this unpacking expects plot() to return an
        # (image, labels) pair — confirm the installed ultralytics build does
        # so. The annotated image is also never streamed; show() sends the
        # raw frame — confirm that is intended.
        frame_yolo, labels = results_yolo[0].plot()

        list_labels = []
        for label in labels:
            # Each label is "<name> <confidence>"; the name may contain spaces.
            label_name, _, confidence = label.rpartition(" ")
            list_labels.append(classes_translation.get(label_name, label_name))
            list_labels.append(confidence)
        socketio.emit('label', list_labels)

    def _draw_hands(self, frame):
        """Draw MediaPipe hand landmarks onto *frame* in place."""
        drawing = mp.solutions.drawing_utils
        # The frame is converted BGR -> RGB before being handed to MediaPipe.
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = self.hands.process(image)
        if not results.multi_hand_landmarks:
            return
        for hand_landmarks in results.multi_hand_landmarks:
            drawing.draw_landmarks(
                frame,
                hand_landmarks,
                self.mp_hands.HAND_CONNECTIONS,
                landmark_drawing_spec=drawing.DrawingSpec(
                    color=(255, 0, 0), thickness=4, circle_radius=3
                ),
                connection_drawing_spec=drawing.DrawingSpec(
                    color=(255, 255, 255), thickness=2, circle_radius=2
                ),
            )

    def show(self, url):
        """Yield multipart MJPEG parts for the source identified by *url*.

        All toggles are reset to their defaults when iteration starts. While
        the preview is off a "Streaming Off" placeholder is yielded; while it
        is on, frames are read, optionally flipped / detected / annotated,
        and yielded. The generator ends when the global ``stop_flag`` is set
        (checked only while the preview is on) or when a frame cannot be read.

        Args:
            url: ``'0'`` for capture device 0, otherwise a URL resolved
                through yt-dlp.

        Yields:
            bytes: One ``--frame`` multipart section holding a JPEG image.
        """
        print(url)
        self._preview = False
        self._flipH = False
        self._detect = False
        self._mediaPipe = False
        self._confidence = 75.0

        cap = _open_capture(url)
        # The placeholder never changes, so encode it once per stream.
        off_part = _streaming_off_part()

        try:
            while True:
                if not self._preview:
                    yield off_part
                    continue

                if stop_flag:
                    print("Process Stopped")
                    return

                grabbed, frame = cap.read()
                if not grabbed:
                    break

                if self.flipH:
                    frame = cv2.flip(frame, 1)
                if self.detect:
                    self._detect_and_emit(frame)
                if self.mediaPipe:
                    self._draw_hands(frame)

                yield _mjpeg_part(frame)
        finally:
            # Runs on stop, end of stream, and when the generator is closed,
            # so the capture handle is released on every exit path.
            cap.release()


VIDEO = VideoStreaming()


@app.route('/', methods=['GET', 'POST'])
def homepage():
    """Render the landing page."""
    # NOTE(review): the template name is spelled 'hompage.html'; confirm it
    # matches the file in the templates directory.
    return render_template('hompage.html')


@app.route('/index', methods=['GET', 'POST'])
def index():
    """Clear the stop flag; on POST store the submitted URL in the session."""
    print("index")
    global stop_flag
    stop_flag = False
    if request.method == 'POST':
        print("Index post request")
        url = request.form['url']
        print("index: ", url)
        session['url'] = url
        return redirect(url_for('index'))
    return render_template('index.html')


@app.route('/video_feed')
def video_feed():
    """Stream the session's URL as MJPEG, or redirect home if none is set."""
    url = session.get('url', None)
    print("video feed: ", url)
    if url is None:
        return redirect(url_for('homepage'))
    return Response(
        VIDEO.show(url),
        mimetype='multipart/x-mixed-replace; boundary=frame',
    )


# * Button requests
@app.route("/request_preview_switch")
def request_preview_switch():
    """Toggle the preview flag."""
    VIDEO.preview = not VIDEO.preview
    print("*" * 10, VIDEO.preview)
    return "nothing"


@app.route("/request_flipH_switch")
def request_flipH_switch():
    """Toggle horizontal mirroring."""
    VIDEO.flipH = not VIDEO.flipH
    print("*" * 10, VIDEO.flipH)
    return "nothing"


@app.route("/request_run_model_switch")
def request_run_model_switch():
    """Toggle YOLO detection."""
    VIDEO.detect = not VIDEO.detect
    print("*" * 10, VIDEO.detect)
    return "nothing"


@app.route("/request_mediapipe_switch")
def request_mediapipe_switch():
    """Toggle MediaPipe hand-landmark drawing."""
    VIDEO.mediaPipe = not VIDEO.mediaPipe
    print("*" * 10, VIDEO.mediaPipe)
    return "nothing"


@app.route('/update_slider_value', methods=['POST'])
def update_slider_value():
    """Set the detection confidence from the ``sliderValue`` form field."""
    slider_value = request.form['sliderValue']
    VIDEO.confidence = slider_value
    return 'OK'


@app.route('/stop_process')
def stop_process():
    """Set the stop flag that VideoStreaming.show() polls."""
    print("Process stop Request")
    global stop_flag
    stop_flag = True
    return 'Process Stop Request'


@socketio.on('connect')
def test_connect():
    """Log that a Socket.IO client connected."""
    print('Connected')


if __name__ == '__main__':
    socketio.run(app, host="0.0.0.0", allow_unsafe_werkzeug=True, port=7860)