File size: 2,678 Bytes
25a7cd3 c983b19 25a7cd3 e106d7d 25a7cd3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
"""Module for AstraDB database"""
import logging
import os
import astrapy
from dotenv import load_dotenv
# Load environment variables (via python-dotenv) at import time; KnowledgeBase
# reads its AstraDB credentials from os.environ.
load_dotenv()

# Configure the root logger: ERROR and above only, each record stamped with
# time, level and the name of the emitting function.
logging.basicConfig(
    level=logging.ERROR,
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s - %(levelname)s - %(funcName)s - %(message)s",
)
class KnowledgeBase:  # pylint: disable=too-few-public-methods
    """
    AstraDB class for direct collection operations.

    Holds a handle to the ``documents`` collection in ``self.collection`` and
    exposes read-only helpers that query it.
    """

    def __init__(self):
        """
        Initialize AstraDB connection.

        Reads ``ASTRA_DB_APPLICATION_TOKEN`` and ``ASTRA_DB_API_ENDPOINT``
        from the environment.

        Raises:
            KeyError: If either environment variable is not set.
        """
        client = astrapy.DataAPIClient(os.environ["ASTRA_DB_APPLICATION_TOKEN"])
        database = client.get_database(os.environ["ASTRA_DB_API_ENDPOINT"])
        # Explicit lookup of the collection named "documents" (same collection
        # the attribute-style access ``database.documents`` resolves to).
        self.collection = database.get_collection("documents")

    def get_doc_count(self, user_id: str) -> dict:
        """
        Count unique emails and files for a specific user.

        Records are de-duplicated by ``metadata.id``; when several records
        share an id, the type of the first one returned by the query is kept.
        Records lacking ``metadata.id`` or ``metadata.type`` (including
        records whose ``metadata`` is missing or null) are ignored.

        Args:
            user_id (str): The user's email address, matched against
                ``metadata.userId``.

        Returns:
            dict: {"gmail": count, "file": count, "total": count} where
            ``total`` is the number of unique document ids.

        Raises:
            ValueError: If user_id is empty or not a string
            Exception: If database query fails (original error is chained
                as ``__cause__``)
        """
        if not user_id or not isinstance(user_id, str):
            raise ValueError("user_id must be a non-empty string")

        # Get all documents for the user with type gmail or file
        filter_criteria = {
            "metadata.userId": user_id,
            "metadata.type": {"$in": ["gmail", "file"]},
        }
        # Only id and type are needed for counting, so do not transfer the
        # rest of each record.
        projection = {"metadata.id": True, "metadata.type": True}

        # Maps metadata.id -> metadata.type of the first record seen with it.
        unique_docs = {}
        try:
            # Iterate the cursor directly; no need to materialise all rows.
            for doc in self.collection.find(
                    filter=filter_criteria, projection=projection):
                # "metadata" may be absent or explicitly null: treat both as
                # empty instead of failing the whole count.
                metadata = doc.get("metadata") or {}
                doc_id = metadata.get("id")
                doc_type = metadata.get("type")
                if doc_id and doc_type:
                    unique_docs.setdefault(doc_id, doc_type)
        except Exception as e:  # pylint: disable=broad-exception-caught
            logging.error("Failed to get document count for user %s: %s", user_id, e)
            # Keep the documented plain ``Exception`` type for callers, but
            # chain the root cause so the traceback stays useful.
            raise Exception(  # pylint: disable=broad-exception-raised
                f"Database query failed: {e}") from e

        # Count by type
        doc_types = list(unique_docs.values())
        return {
            "gmail": doc_types.count("gmail"),
            "file": doc_types.count("file"),
            "total": len(doc_types),
        }
|