Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import cv2 | |
| import tempfile | |
| import time | |
| import os | |
| if 'video_processed' not in st.session_state: | |
| st.session_state.video_processed = False | |
| class Video_View: | |
| def __init__(self, app, model): | |
| self.app = app | |
| self.model = model | |
| def toggle_video_processing(self): | |
| """Toggle video processing state.""" | |
| st.session_state.video_processed = False | |
| def show(self): | |
| # Top navigation | |
| col1_back, col2_back = st.columns([0.2, 0.8]) | |
| with col1_back: | |
| if st.button("Back", key='video_back', icon=':material/arrow_back:', type='primary'): | |
| self.app.change_page("Main") | |
| st.markdown("<h1 style='text-align: center;'>π§ Video Detection</h1>", unsafe_allow_html=True) | |
| st.divider() | |
| uploaded_file = st.file_uploader("Upload a video", type=["mp4", "avi", "mov"],on_change=self.toggle_video_processing) | |
| if not st.session_state.video_processed: | |
| if uploaded_file is not None: | |
| # Save to temp file | |
| tfile = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") | |
| tfile.write(uploaded_file.read()) | |
| cap = cv2.VideoCapture(tfile.name) | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) # Total number of frames | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| out_path = os.path.join(tempfile.gettempdir(), "predicted_video.mp4") | |
| out = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'XVID'), fps, (width, height)) | |
| frame_count = 0 # To track the number of processed frames | |
| with st.spinner("Processing video... β³"): | |
| progress_bar = st.progress(0) # Create a progress bar | |
| while cap.isOpened(): | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| # Run YOLO model on the frame | |
| results = self.model(frame)[0] | |
| for result in results.boxes.data.tolist(): | |
| x1, y1, x2, y2, score, _ = result | |
| color = (0, 0, 255) if score > 0.5 else (0, 255, 0) | |
| label = f"{score:.2f}" | |
| cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), color, 1) | |
| cv2.putText(frame, label, (int(x1), int(y1)-10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 1) | |
| out.write(frame) | |
| # Update the progress bar | |
| frame_count += 1 | |
| progress_percentage = frame_count / total_frames # Calculate the percentage | |
| progress_bar.progress(progress_percentage) # Update the progress bar | |
| cap.release() | |
| out.release() | |
| cv2.destroyAllWindows() | |
| st.session_state.video_processed = True | |
| st.success("β Detection complete!") | |
| # Read and display the video | |
| with open(out_path, 'rb') as video_file: | |
| video_bytes = video_file.read() | |
| st.video(uploaded_file, loop=True, autoplay=True, muted=False) | |
| st.download_button("π₯ Download Predicted Video", video_bytes, file_name="predicted_video.mp4", | |
| mime="video/mp4") |