import gradio as gr
import librosa
import numpy as np
import torch
from transformers import WhisperProcessor, WhisperForConditionalGeneration
from simple_salesforce import Salesforce
import os
from datetime import datetime, timezone
import logging
import webrtcvad
# Set up logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# Salesforce credentials
SF_USERNAME = os.getenv("SF_USERNAME")
SF_PASSWORD = os.getenv("SF_PASSWORD")
SF_SECURITY_TOKEN = os.getenv("SF_SECURITY_TOKEN")
SF_INSTANCE_URL = os.getenv("SF_INSTANCE_URL", "https://login.salesforce.com")
# Initialize Salesforce
sf = None
try:
if all([SF_USERNAME, SF_PASSWORD, SF_SECURITY_TOKEN]):
sf = Salesforce(
username=SF_USERNAME,
password=SF_PASSWORD,
security_token=SF_SECURITY_TOKEN,
instance_url=SF_INSTANCE_URL
)
logger.info("Connected to Salesforce")
else:
logger.warning("Salesforce credentials missing; skipping integration")
except Exception as e:
logger.error(f"Salesforce connection failed: {str(e)}")
# Load Whisper model for speech-to-text
whisper_processor = WhisperProcessor.from_pretrained("openai/whisper-tiny")
whisper_model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-tiny")
whisper_model.config.forced_decoder_ids = whisper_processor.get_decoder_prompt_ids(language="english", task="transcribe")
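# Note: forced_decoder_ids pins the model to English transcription. Recent
# transformers releases deprecate this attribute in favor of passing
# language="english" and task="transcribe" directly to generate().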
# Initialize VAD
vad = webrtcvad.Vad(mode=2) # Moderate mode for balanced voice detection
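# webrtcvad only accepts mono 16-bit PCM at 8/16/32/48 kHz in 10/20/30 ms
# frames; the 30 ms framing in extract_health_features below satisfies this.
# Modes range from 0 (least aggressive) to 3 (most aggressive filtering).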
def extract_health_features(audio, sr):
"""Extract health-related audio features."""
try:
# Normalize audio
audio = audio / np.max(np.abs(audio)) if np.max(np.abs(audio)) != 0 else audio
# Voice Activity Detection
frame_duration = 30 # ms
frame_samples = int(sr * frame_duration / 1000)
frames = [audio[i:i + frame_samples] for i in range(0, len(audio), frame_samples)]
voiced_frames = [
frame for frame in frames
            if len(frame) == frame_samples and vad.is_speech((np.clip(frame, -1.0, 1.0) * 32767).astype(np.int16).tobytes(), sr)  # scale to int16 without overflow
]
if not voiced_frames:
raise ValueError("No voiced segments detected")
voiced_audio = np.concatenate(voiced_frames)
# Pitch (F0)
pitches, magnitudes = librosa.piptrack(y=voiced_audio, sr=sr, fmin=50, fmax=500)
valid_pitches = [p for p in pitches[magnitudes > 0] if p > 0]
pitch = np.mean(valid_pitches) if valid_pitches else 0
jitter = np.std(valid_pitches) / pitch if pitch and valid_pitches else 0
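        # Note: these are coefficient-of-variation proxies. True jitter and
        # shimmer are cycle-to-cycle measures (e.g., as computed by Praat), so
        # these values are only loosely comparable to published thresholds.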
        # Shimmer (amplitude variation) and energy from a single RMS pass
        amplitudes = librosa.feature.rms(y=voiced_audio, frame_length=2048, hop_length=512)[0]
        shimmer = np.std(amplitudes) / np.mean(amplitudes) if np.mean(amplitudes) else 0
        energy = np.mean(amplitudes)
return {
"pitch": pitch,
"jitter": jitter * 100, # Convert to percentage
"shimmer": shimmer * 100, # Convert to percentage
"energy": energy
}
except Exception as e:
logger.error(f"Feature extraction failed: {str(e)}")
raise
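# Quick local check (hypothetical file path, assumes a 16 kHz-compatible clip):
#   audio, sr = librosa.load("sample.wav", sr=16000)
#   print(extract_health_features(audio, sr))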
def transcribe_audio(audio):
"""Transcribe audio to text using Whisper."""
try:
inputs = whisper_processor(audio, sampling_rate=16000, return_tensors="pt")
with torch.no_grad():
generated_ids = whisper_model.generate(inputs["input_features"])
transcription = whisper_processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
logger.info(f"Transcription: {transcription}")
return transcription
except Exception as e:
logger.error(f"Transcription failed: {str(e)}")
return ""
def analyze_symptoms(text):
"""Mock symptom-to-disease analysis (placeholder for symptom-2-disease-net)."""
text = text.lower()
feedback = []
if "cough" in text or "difficulty breathing" in text:
feedback.append("Symptoms like cough or difficulty breathing may indicate a respiratory condition, such as bronchitis or asthma. Consult a doctor.")
if "stressed" in text or "stress" in text or "fatigue" in text:
feedback.append("Reported stress or fatigue may suggest conditions like anxiety or chronic fatigue syndrome. Seek medical advice.")
if not feedback:
feedback.append("No specific conditions detected from reported symptoms.")
return "\n".join(feedback)
def analyze_voice(audio_file=None, audio_data=None):
"""Analyze voice for health indicators."""
try:
# Use provided audio file or in-memory audio data
if audio_file and os.path.exists(audio_file):
audio, sr = librosa.load(audio_file, sr=16000)
elif audio_data is not None:
audio = audio_data
sr = 16000
else:
raise ValueError("No audio input provided")
if len(audio) < sr:
raise ValueError("Audio too short (minimum 1 second)")
# Extract voice features
features = extract_health_features(audio, sr)
# Transcribe audio for symptom analysis
transcription = transcribe_audio(audio)
symptom_feedback = analyze_symptoms(transcription) if transcription else "No transcription available for symptom analysis."
# Analyze voice features for health indicators
feedback = []
respiratory_score = features["jitter"]
mental_health_score = features["shimmer"]
        # Rule-based analysis (heuristic thresholds, loosely informed by voice pathology literature)
if respiratory_score > 1.0:
feedback.append(f"Elevated jitter ({respiratory_score:.2f}%) suggests potential respiratory issues, such as vocal cord irregularities or breathing difficulties. Consult a doctor.")
if mental_health_score > 5.0:
feedback.append(f"Elevated shimmer ({mental_health_score:.2f}%) suggests potential stress or emotional strain, which may affect vocal stability. Consider consulting a healthcare provider.")
if features["energy"] < 0.01:
feedback.append(f"Low vocal energy ({features['energy']:.4f}) may indicate fatigue or reduced vocal effort, potentially linked to physical or mental exhaustion.")
if not feedback:
feedback.append("No significant health indicators detected from voice features.")
# Combine voice and symptom feedback
feedback.append("\n**Symptom Analysis (from transcription)**:")
feedback.append(symptom_feedback)
feedback.append("\n**Voice Analysis Details**:")
feedback.append(f"Pitch: {features['pitch']:.2f} Hz (average fundamental frequency)")
feedback.append(f"Jitter: {respiratory_score:.2f}% (pitch variation, higher values may indicate respiratory issues)")
feedback.append(f"Shimmer: {mental_health_score:.2f}% (amplitude variation, higher values may indicate stress)")
feedback.append(f"Energy: {features['energy']:.4f} (vocal intensity, lower values may indicate fatigue)")
feedback.append(f"Transcription: {transcription if transcription else 'None'}")
feedback.append("\n**Disclaimer**: This is a preliminary analysis, not a medical diagnosis. Always consult a healthcare provider for professional evaluation.")
feedback_str = "\n".join(feedback)
# Store in Salesforce
if sf and audio_file:
store_in_salesforce(audio_file, feedback_str, respiratory_score, mental_health_score, features, transcription)
return feedback_str
except Exception as e:
logger.error(f"Audio processing failed: {str(e)}")
return f"Error: {str(e)}"
def store_in_salesforce(audio_file, feedback, respiratory_score, mental_health_score, features, transcription):
"""Store results in Salesforce."""
try:
sf.HealthAssessment__c.create({
"AssessmentDate__c": datetime.utcnow().isoformat(),
"Feedback__c": feedback,
"RespiratoryScore__c": float(respiratory_score),
"MentalHealthScore__c": float(mental_health_score),
"AudioFileName__c": os.path.basename(audio_file) if audio_file else "in_memory_audio",
"Pitch__c": float(features["pitch"]),
"Jitter__c": float(features["jitter"]),
"Shimmer__c": float(features["shimmer"]),
"Energy__c": float(features["energy"]),
"Transcription__c": transcription
})
logger.info("Stored in Salesforce")
except Exception as e:
logger.error(f"Salesforce storage failed: {str(e)}")
def test_with_sample_audio():
"""Test with dummy audio simulating a user's voice saying 'I have a cough and feel stressed'."""
logger.info("Starting test with in-memory audio simulation")
# Generate synthetic audio: 150 Hz base frequency with variations to mimic a stressed voice with cough
sr = 16000
t = np.linspace(0, 2, 2 * sr)
freq_mod = 150 + 25 * np.sin(2 * np.pi * 0.5 * t) # Increased jitter for respiratory hint
amplitude_mod = 0.5 + 0.25 * np.sin(2 * np.pi * 0.3 * t) # Increased shimmer for stress hint
noise = 0.05 * np.random.normal(0, 1, len(t)) # Moderate noise
dummy_audio = amplitude_mod * np.sin(2 * np.pi * freq_mod * t) + noise
# Ensure dummy_audio is a 1D NumPy array
dummy_audio = np.asarray(dummy_audio, dtype=np.float32).flatten()
if not isinstance(dummy_audio, np.ndarray) or dummy_audio.ndim != 1:
logger.error(f"Invalid dummy_audio: type={type(dummy_audio)}, shape={dummy_audio.shape if hasattr(dummy_audio, 'shape') else 'N/A'}")
raise ValueError("Generated audio is not a 1D NumPy array")
logger.info(f"Dummy audio shape: {dummy_audio.shape}, type: {type(dummy_audio)}, dtype: {dummy_audio.dtype}")
return analyze_voice(audio_data=dummy_audio)
# Gradio interface
iface = gr.Interface(
fn=analyze_voice,
inputs=gr.Audio(type="filepath", label="Record/Upload Voice (WAV, MP3, FLAC, 1+ sec)"),
outputs=gr.Textbox(label="Health Assessment Results"),
title="Voice Health Analyzer",
    description="Analyze your voice for preliminary health insights. Supports WAV, MP3, and FLAC; transcription is configured for English. Minimum 1 second of audio."
)
if __name__ == "__main__":
logger.info("Starting Voice Health Analyzer at 10:31 AM IST, June 23, 2025")
print(test_with_sample_audio())
iface.launch(server_name="0.0.0.0", server_port=7860) |