adgw
/

Joblib
quality_classifier_pl / main_parquet.py
adgw's picture
fix
aa2da0e verified
# -*- coding: utf-8 -*-
"""
Skrypt do masowego przetwarzania plik贸w parquet w celu klasyfikacji jako艣ci tekstu.
Ten modu艂 jest przeznaczony do wydajnej analizy du偶ych zbior贸w danych.
Skanuje folder wej艣ciowy w poszukiwaniu plik贸w .parquet, przetwarza ka偶dy z nich
r贸wnolegle z u偶yciem wielu proces贸w (`multiprocessing`), a nast臋pnie zapisuje
wyniki do nowego pliku w folderze wyj艣ciowym, zachowuj膮c oryginaln膮 struktur臋
danych i dodaj膮c wyniki klasyfikacji.
"""
import os
import glob
import time
import pickle
import joblib
import pandas as pd
import json
import numpy as np
from tqdm import tqdm
from typing import List
from text_analyzer.analyzer import TextAnalyzer
from text_analyzer import constants
# --- 艁adowanie modeli i konfiguracja ---
with open('models/scaler.pkl', 'rb') as f:
scaler = pickle.load(f)
classifier = joblib.load("models/model.joblib")
text_analyzer = TextAnalyzer()
batch_size = 10
class NumpyJSONEncoder(json.JSONEncoder):
"""
Specjalny enkoder JSON do obs艂ugi typ贸w danych z NumPy,
kt贸re nie s膮 domy艣lnie serializowalne.
"""
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
if isinstance(obj, np.floating):
return float(obj)
if isinstance(obj, np.ndarray):
return obj.tolist()
return super(NumpyJSONEncoder, self).default(obj)
# --- Definicje funkcji ---
def predict_batch(texts: List[str], analyzer: TextAnalyzer, scaler_model, classifier_model) -> List[tuple[str | None, float | None]]:
"""
Przetwarza ca艂膮 list臋 tekst贸w wsadowo i zwraca list臋 predykcji.
"""
all_features = []
# Krok 1: Ekstrakcja cech dla wszystkich tekst贸w
feature_generator = analyzer.analyze_batch(texts, batch_size=batch_size)
for features_dict in tqdm(feature_generator, total=len(texts), desc="Analiza cech"):
ordered_features = [features_dict.get(fname, 0.0) for fname in constants.COLUMN_ORDER]
all_features.append(ordered_features)
if not all_features:
return []
# Krok 2: Przygotowanie i skalowanie wszystkich wektor贸w naraz
features_df = pd.DataFrame(all_features, columns=constants.COLUMN_ORDER)
features_scaled = scaler_model.transform(features_df)
# Krok 3: Predykcja dla ca艂ej paczki
pred_probas = classifier_model.predict_proba(features_scaled)
# Krok 4: Przetworzenie wynik贸w
results = []
labels = ["LOW", "MEDIUM", "HIGH"]
for single_pred_proba in pred_probas:
category_prob = {
label: prob
for label, prob in zip(labels, single_pred_proba)
}
# Sortujemy, aby znale藕膰 kategori臋 z najwy偶szym prawdopodobie艅stwem
sorted_category_prob = sorted(category_prob.items(), key=lambda item: item[1], reverse=True)
most_probable_category, confidence = sorted_category_prob[0]
results.append((most_probable_category, round(float(confidence) * 100, 2)))
return results
def process_parquet_file(input_file: str, output_file: str):
"""
Orkiestruje proces przetwarzania pojedynczego pliku .parquet wsadowo.
Wczytuje plik, przetwarza kolumn臋 'text', a nast臋pnie dopisuje
wynikowe kolumny 'quality_ai' i 'confidence' do nowego pliku Parquet.
"""
try:
# Krok 1: Wczytaj ca艂y plik Parquet do ramki danych pandas
df = pd.read_parquet(input_file)
except Exception as e:
print(f"Nie uda艂o si臋 wczyta膰 pliku {input_file}. B艂膮d: {e}")
return
# Sprawdzenie, czy kolumna 'text' istnieje
if 'text' not in df.columns:
print(f"B艂膮d: W pliku {input_file} brakuje wymaganej kolumny 'text'.")
return
# Krok 2: Przygotuj dane do przetwarzania wsadowego
texts_to_process = df['text'].tolist()
print(f"Wczytano {len(texts_to_process)} wierszy. Rozpoczynam przetwarzanie wsadowe...")
# Krok 3: Wywo艂aj funkcj臋 wsadow膮 (ta cz臋艣膰 pozostaje bez zmian)
# Zak艂adamy, 偶e predict_batch zwraca list臋 tuple: [(kategoria, pewno艣膰), ...]
results = predict_batch(texts_to_process, text_analyzer, scaler, classifier)
# Krok 4: Dodaj wyniki jako nowe kolumny do ramki danych
categories = [res[0] for res in results]
confidences = [res[1] for res in results]
df['quality_ai'] = categories
df['confidence'] = confidences
# Krok 5: Zapisz zmodyfikowan膮 ramk臋 danych do nowego pliku Parquet
try:
df.to_parquet(output_file, index=False)
print(df.head(10))
print(f"Pomy艣lnie zapisano przetworzone dane do pliku {output_file}")
except Exception as e:
print(f"Nie uda艂o si臋 zapisa膰 pliku {output_file}. B艂膮d: {e}")
# --- G艂贸wny blok wykonawczy ---
if __name__ == '__main__':
print("Inicjalizacja skryptu przetwarzania wsadowego...")
INPUT_FOLDER = 'input_parquet'
OUTPUT_FOLDER = 'output'
os.makedirs(OUTPUT_FOLDER, exist_ok=True)
# Skanowanie plik贸w
parquet_files = glob.glob(os.path.join(INPUT_FOLDER, '*.parquet'))
for file_path in parquet_files:
start_time = time.time()
output_file = os.path.join(OUTPUT_FOLDER, os.path.basename(file_path))
if os.path.exists(output_file):
print(f"POMIJAM - plik ju偶 istnieje: {output_file}")
continue
print(f"\n--- Przetwarzanie pliku: {file_path} ---")
process_parquet_file(file_path, output_file)
end_time = time.time()
print(f"Processing time: {end_time - start_time:.4f} seconds")
print("\nWszystkie pliki zosta艂y przetworzone!")