import logging
from typing import Any, Dict, List, Optional

from presidio_analyzer import AnalyzerEngine, EntityRecognizer
from presidio_analyzer.nlp_engine import NlpEngineProvider
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities.engine import OperatorConfig
from pydantic import BaseModel, Field, PrivateAttr

from obsei.analyzer.base_analyzer import (
    BaseAnalyzer,
    BaseAnalyzerConfig,
)
from obsei.payload import TextPayload

logger = logging.getLogger(__name__)


class PresidioModelConfig(BaseModel):
    lang_code: Optional[str] = Field("en")
    model_name: Optional[str] = Field("en_core_web_lg")


class PresidioEngineConfig(BaseModel):
    nlp_engine_name: Optional[str] = Field("spacy")
    models: Optional[List[PresidioModelConfig]] = None

    def __init__(self, **data: Any):
        super().__init__(**data)
        if not self.models or len(self.models) == 0:
            self.models = [PresidioModelConfig()]
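
# Example (an illustrative sketch; any installed spaCy model name works in
# place of "en_core_web_sm"). The resulting .dict() matches the
# nlp_configuration format that NlpEngineProvider expects below:
#
#   engine_config = PresidioEngineConfig(
#       nlp_engine_name="spacy",
#       models=[PresidioModelConfig(lang_code="en", model_name="en_core_web_sm")],
#   )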


class PresidioAnonymizerConfig(OperatorConfig, BaseModel):  # type: ignore
    def __init__(self, anonymizer_name: str, params: Optional[Dict[str, Any]] = None):
        # Pass positionally: OperatorConfig's first parameter is operator_name,
        # so the anonymizer_name= keyword would not match it
        super().__init__(anonymizer_name, params)

    class Config:
        arbitrary_types_allowed = True
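
# Example (a minimal sketch; operator names and params come from
# https://microsoft.github.io/presidio/anonymizer/):
#
#   config = PresidioAnonymizerConfig("replace", {"new_value": "<REDACTED>"})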


class PresidioPIIAnalyzerConfig(BaseAnalyzerConfig):
    TYPE: str = "PresidioPII"
    # For more details refer to https://microsoft.github.io/presidio/anonymizer/
    anonymizers_config: Optional[Dict[str, PresidioAnonymizerConfig]] = None
    # For the list of supported entities refer to https://microsoft.github.io/presidio/supported_entities/
    # By default it searches for all supported entities
    entities: Optional[List[str]] = None
    # If True, only detect PII and skip anonymization
    analyze_only: Optional[bool] = False
    # If True, replace the payload's processed_text with the anonymized text
    replace_original_text: Optional[bool] = True
    # Whether to return the analysis decision-process steps in the response
    return_decision_process: Optional[bool] = False
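
# Example (a minimal sketch; entity names come from
# https://microsoft.github.io/presidio/supported_entities/ and the "mask"
# operator params from the anonymizer docs above):
#
#   analyzer_config = PresidioPIIAnalyzerConfig(
#       entities=["EMAIL_ADDRESS", "PHONE_NUMBER"],
#       anonymizers_config={
#           "EMAIL_ADDRESS": PresidioAnonymizerConfig("replace", {"new_value": "<EMAIL>"}),
#           "PHONE_NUMBER": PresidioAnonymizerConfig(
#               "mask", {"masking_char": "*", "chars_to_mask": 4, "from_end": True}
#           ),
#       },
#   )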


class PresidioPIIAnalyzer(BaseAnalyzer):
    _analyzer: AnalyzerEngine = PrivateAttr()
    _anonymizer: AnonymizerEngine = PrivateAttr()

    TYPE: str = "PresidioPII"
    engine_config: Optional[PresidioEngineConfig] = None
    # For the list of supported entities refer to https://microsoft.github.io/presidio/supported_entities/
    # To add custom recognizers refer to https://microsoft.github.io/presidio/analyzer/adding_recognizers/
    entity_recognizers: Optional[List[EntityRecognizer]] = None
    # For more details refer to https://microsoft.github.io/presidio/anonymizer/
    anonymizers_config: Optional[Dict[str, OperatorConfig]] = None

    def __init__(self, **data: Any):
        super().__init__(**data)

        if not self.engine_config:
            self.engine_config = PresidioEngineConfig()

        if not self.engine_config.models or len(self.engine_config.models) == 0:
            self.engine_config.models = [PresidioModelConfig()]

        # For the spacy engine, ensure the models are downloaded and collect
        # the configured languages
        languages = []
        for model_config in self.engine_config.models:
            languages.append(model_config.lang_code)

            # Check SpacyNlpEngine.engine_name
            if (
                self.engine_config.nlp_engine_name == "spacy"
                and model_config.model_name is not None
            ):
                try:
                    spacy_model = __import__(model_config.model_name)
                    spacy_model.load()
                    logger.info(
                        f"Spacy model {model_config.model_name} is already downloaded"
                    )
                except (ImportError, OSError):
                    logger.warning(
                        f"Spacy model {model_config.model_name} is not downloaded"
                    )
                    logger.warning(
                        f"Downloading spacy model {model_config.model_name}, it might take some time"
                    )
                    from spacy.cli import download  # type: ignore

                    download(model_config.model_name)

        # Create NLP engine based on configuration
        provider = NlpEngineProvider(nlp_configuration=self.engine_config.dict())
        nlp_engine = provider.create_engine()

        # Pass the created NLP engine and supported_languages to the AnalyzerEngine
        self._analyzer = AnalyzerEngine(
            nlp_engine=nlp_engine, supported_languages=languages
        )

        # self._analyzer.registry.load_predefined_recognizers()
        if self.entity_recognizers:
            for entity_recognizer in self.entity_recognizers:
                self._analyzer.registry.add_recognizer(entity_recognizer)

        # Initialize the anonymizer
        self._anonymizer = AnonymizerEngine()

    def analyze_input(  # type: ignore[override]
        self,
        source_response_list: List[TextPayload],
        analyzer_config: Optional[PresidioPIIAnalyzerConfig] = None,
        language: Optional[str] = "en",
        **kwargs: Any,
    ) -> List[TextPayload]:
        if analyzer_config is None:
            raise ValueError("analyzer_config can't be None")

        analyzer_output: List[TextPayload] = []

        for batch_responses in self.batchify(source_response_list, self.batch_size):
            for source_response in batch_responses:
                analyzer_result = self._analyzer.analyze(
                    text=source_response.processed_text,
                    entities=analyzer_config.entities,
                    return_decision_process=analyzer_config.return_decision_process,
                    language=language,
                )

                anonymized_result = None
                if not analyzer_config.analyze_only:
                    anonymizers_config = (
                        analyzer_config.anonymizers_config or self.anonymizers_config
                    )

                    if (
                        source_response.processed_text is not None
                        and len(source_response.processed_text) > 0
                    ):
                        anonymized_result = self._anonymizer.anonymize(
                            text=source_response.processed_text,
                            operators=anonymizers_config,
                            analyzer_results=analyzer_result,
                        )

                if (
                    analyzer_config.replace_original_text
                    and anonymized_result is not None
                ):
                    text = anonymized_result.text
                else:
                    text = source_response.processed_text

                segmented_data = {
                    "pii_data": {
                        "analyzer_result": [vars(result) for result in analyzer_result],
                        "anonymized_result": None
                        if not anonymized_result
                        else [vars(item) for item in anonymized_result.items],
                        "anonymized_text": None
                        if not anonymized_result
                        else anonymized_result.text,
                    }
                }
                if source_response.segmented_data:
                    segmented_data = {
                        **segmented_data,
                        **source_response.segmented_data,
                    }

                analyzer_output.append(
                    TextPayload(
                        processed_text=text,
                        meta=source_response.meta,
                        segmented_data=segmented_data,
                        source_name=source_response.source_name,
                    )
                )

        return analyzer_output
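

# Usage sketch (illustrative, not part of the module; relies on the
# "en"/"en_core_web_lg" defaults above, so the spaCy model is downloaded on
# first run, and the sample text is made up):
#
#   analyzer = PresidioPIIAnalyzer()
#   payloads = [TextPayload(processed_text="Reach John Doe at 212-555-0101.")]
#   results = analyzer.analyze_input(
#       source_response_list=payloads,
#       analyzer_config=PresidioPIIAnalyzerConfig(analyze_only=False),
#   )
#   print(results[0].processed_text)              # text with PII anonymized
#   print(results[0].segmented_data["pii_data"])  # raw analyzer/anonymizer output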