sign_final / app.py
zolodickk's picture
Update app.py
daa1383 verified
raw
history blame
10.3 kB
from ultralytics import YOLO
import time
import numpy as np
import mediapipe as mp
import uvicorn
from socketio import ASGIApp
import cv2
from flask import Flask, render_template, request, Response, session, redirect, url_for
from flask import Flask, render_template
from flask_socketio import SocketIO
from flask_socketio import emit
from flask_cors import CORS
from flask_socketio import SocketIO
import yt_dlp as youtube_dl
import uvicorn
model_object_detection = YOLO("bisindov2.pt")
app = Flask(__name__)
socketio = SocketIO(app, cors_allowed_origins="*")
CORS(app)
app.secret_key = 'flask-sockets-builds'
@socketio.on('image')
def handle_image(image_data):
# Process the received image data
# Here, you can save the image, perform analysis, etc.
# For demonstration purposes, let's just print the length of the image data
print("Received image data length:", len(image_data))
# Send a response back to the client
# You can send any data you want back to the client
response_data = {'status': 'success'}
socketio.emit('response_back', response_data)
######################################################
classes_translation = {
"all": "الكل",
"A": "أ",
"B": "ب",
"C": "ج",
"D": "د",
"F": "ف",
"H": "هـ",
"I": "أنا",
"J": "جيم",
"L": "إل",
"M": "إم",
"O": "أو",
"R": "ر",
"T": "ت",
"U": "يو",
"V": "في",
"W": "دبليو",
"Z": "زد",
"additional": "إضافي",
"alcohol": "مدرسة",
"allergy": "حساسية",
"bacon": "لحم المقدد",
"bag": "حقيبة",
"barbecue": "شواء",
"bill": "فاتورة",
"biscuit": "بسكويت",
"bitter": "مر",
"bread": "خبز",
"burger": "برغر",
"bye": "وداعاً",
"cheese": "جبن",
"chicken": "دجاج",
"coke": "كوكاكولا",
"cold": "بارد",
"cost": "تكلفة",
"coupon": "كوبون",
"cup": "كوب",
"dessert": "حلوى",
"drink": "شراب",
"drive": "قيادة",
"eat": "تناول الطعام",
"eggs": "بيض",
"enjoy": "استمتع",
"fork": "شوكة",
"french fries": "بطاطس مقلية",
"fresh": "طازج",
"hello": "مرحبا",
"hot": "ساخن",
"icecream": "آيس كريم",
"ingredients": "مكونات",
"juicy": "عصيري",
"ketchup": "كاتشب",
"lactose": "لاكتوز",
"lettuce": "خس",
"lid": "غطاء",
"manager": "مدير",
"menu": "قائمة الطعام",
"milk": "حليب",
"mustard": "خردل",
"napkin": "منديل",
"no": "لا",
"order": "طلب",
"pepper": "فلفل",
"pickle": "مخلل",
"pizza": "بيتزا",
"please": "من فضلك",
"ready": "جاهز",
"refill": "إعادة ملء",
"repeat": "كرر",
"safe": "آمن",
"salt": "ملح",
"sandwich": "ساندويتش",
"sauce": "صلصة",
"small": "صغير",
"soda": "صودا",
"sorry": "آسف",
"spicy": "حار",
"spoon": "ملعقة",
"straw": "قش",
"sugar": "سكر",
"sweet": "حلو",
"tissues": "مناديل",
"total": "مجموع",
"urgent": "عاجل",
"vegetables": "خضروات",
"warm": "دافئ",
"water": "ماء",
"what": "ماذا",
"yoghurt": "زبادي",
"your": "لك",
"ILoveYou":"أحبك",
"Halo":"مرحبًا"
}
######################################################
class VideoStreaming(object):
def __init__(self):
super(VideoStreaming, self).__init__()
print ("===== Video Streaming =====")
self._preview = False
self._flipH = False
self._detect = False
self._model = False
self._mediaPipe = False
self._confidence = 75.0
self.mp_hands = mp.solutions.hands
self.hands = self.mp_hands.Hands()
@property
def confidence(self):
return self._confidence
@confidence.setter
def confidence(self, value):
self._confidence = int(value)
@property
def preview(self):
return self._preview
@preview.setter
def preview(self, value):
self._preview = bool(value)
@property
def flipH(self):
return self._flipH
@flipH.setter
def flipH(self, value):
self._flipH = bool(value)
@property
def detect(self):
return self._detect
@detect.setter
def detect(self, value):
self._detect = bool(value)
@property
def mediaPipe(self):
return self._mediaPipe
@mediaPipe.setter
def mediaPipe(self, value):
self._mediaPipe = bool(value)
def show(self, url):
print(url)
self._preview = False
self._flipH = False
self._detect = False
self._mediaPipe = False
self._confidence = 75.0
ydl_opts = {
"quiet": True,
"no_warnings": True,
"format": "best",
"forceurl": True,
}
if url == '0':
cap = cv2.VideoCapture(0)
else:
ydl = youtube_dl.YoutubeDL(ydl_opts)
info = ydl.extract_info(url, download=False)
url = info["url"]
cap = cv2.VideoCapture(url)
while True:
if self._preview:
if stop_flag:
print("Process Stopped")
return
grabbed, frame = cap.read()
if not grabbed:
break
if self.flipH:
frame = cv2.flip(frame, 1)
if self.detect:
frame_yolo = frame.copy()
results_yolo = model_object_detection.predict(frame_yolo, conf=self._confidence / 100)
frame_yolo, labels = results_yolo[0].plot()
list_labels = []
# labels_confidences
for label in labels:
confidence = label.split(" ")[-1]
label_name = " ".join(label.split(" ")[:-1])
# Translate the label if it exists in the translation dictionary
translated_label = classes_translation.get(label_name, label_name)
list_labels.append(translated_label)
list_labels.append(confidence)
socketio.emit('label', list_labels)
if self.mediaPipe:
# Convert the image to RGB for processing with MediaPipe
image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
results = self.hands.process(image)
if results.multi_hand_landmarks:
for hand_landmarks in results.multi_hand_landmarks:
mp.solutions.drawing_utils.draw_landmarks(
frame,
hand_landmarks,
self.mp_hands.HAND_CONNECTIONS,
landmark_drawing_spec=mp.solutions.drawing_utils.DrawingSpec(color=(255, 0, 0), thickness=4, circle_radius=3),
connection_drawing_spec=mp.solutions.drawing_utils.DrawingSpec(color=(255, 255, 255), thickness=2, circle_radius=2),
)
frame = cv2.imencode(".jpg", frame)[1].tobytes()
yield (
b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n'
)
else:
snap = np.zeros((
1000,
1000
), np.uint8)
label = "Streaming Off"
H, W = snap.shape
font = cv2.FONT_HERSHEY_PLAIN
color = (255, 255, 255)
cv2.putText(snap, label, (W//2 - 100, H//2),
font, 2, color, 2)
frame = cv2.imencode(".jpg", snap)[1].tobytes()
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
# check_settings()
VIDEO = VideoStreaming()
@app.route('/', methods=['GET', 'POST'])
def homepage():
return render_template('hompage.html')
@app.route('/index', methods=['GET', 'POST'])
def index():
print("index")
global stop_flag
stop_flag = False
if request.method == 'POST':
print("Index post request")
url = request.form['url']
print("index: ", url)
session['url'] = url
return redirect(url_for('index'))
return render_template('index.html')
@app.route('/video_feed')
def video_feed():
url = session.get('url', None)
print("video feed: ", url)
if url is None:
return redirect(url_for('homepage'))
print("video feed: ", url)
return Response(VIDEO.show(url), mimetype='multipart/x-mixed-replace; boundary=frame')
# * Button requests
@app.route("/request_preview_switch")
def request_preview_switch():
VIDEO.preview = not VIDEO.preview
print("*"*10, VIDEO.preview)
return "nothing"
@app.route("/request_flipH_switch")
def request_flipH_switch():
VIDEO.flipH = not VIDEO.flipH
print("*"*10, VIDEO.flipH)
return "nothing"
@app.route("/request_run_model_switch")
def request_run_model_switch():
VIDEO.detect = not VIDEO.detect
print("*"*10, VIDEO.detect)
return "nothing"
@app.route("/request_mediapipe_switch")
def request_mediapipe_switch():
VIDEO.mediaPipe = not VIDEO.mediaPipe
print("*"*10, VIDEO.mediaPipe)
return "nothing"
@app.route('/update_slider_value', methods=['POST'])
def update_slider_value():
slider_value = request.form['sliderValue']
VIDEO.confidence = slider_value
return 'OK'
@app.route('/stop_process')
def stop_process():
print("Process stop Request")
global stop_flag
stop_flag = True
return 'Process Stop Request'
@socketio.on('connect')
def test_connect():
print('Connected')
#emit('message', data, broadcast=True)
if __name__ == '__main__':
socketio.run(app, host="0.0.0.0", allow_unsafe_werkzeug=True,port=7860)