import os
import time
import logging
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

from confluent_kafka import KafkaException, TopicPartition, Producer, Consumer
from confluent_kafka.schema_registry.avro import AvroDeserializer, AvroSerializer
from confluent_kafka.serialization import MessageField, SerializationContext

from aitask import handle_message, TooManyRequestsError
from schemaregistry import SchemaClient

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configuration
schema_registry_url = os.getenv("SCHEMA_REGISTRY_URL")
kafka_domain = os.getenv("DOMAIN")
password = os.getenv("PASSWORD")

conf = {
    'bootstrap.servers': f"{kafka_domain}:29092",
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': 'dathuynh',
    'sasl.password': password,
}

# Shutdown flag shared by all consumer threads
shutdown_event = threading.Event()
def avro_deserializer():
    """Build an AvroDeserializer from the registered schema for the scan subject."""
    schema_client = SchemaClient(schema_registry_url, "cybersentinal.avro.scan")
    schema_str = schema_client.get_schema_str()
    if schema_str is None:
        raise RuntimeError("Failed to fetch schema for cybersentinal.avro.scan")
    return AvroDeserializer(schema_client.schema_registry_client, schema_str)


def avro_serializer():
    """Build an AvroSerializer from the registered schema for the scandetail subject."""
    schema_client = SchemaClient(schema_registry_url, "cybersentinal.avro.scandetail")
    schema_str = schema_client.get_schema_str()
    if schema_str is None:
        raise RuntimeError("Failed to fetch schema for cybersentinal.avro.scandetail")
    return AvroSerializer(schema_client.schema_registry_client, schema_str)
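
# Sketch added for illustration (not in the original Space): AvroSerializer and
# AvroDeserializer instances are plain callables that take the payload plus a
# SerializationContext naming the topic and field. handle_message (in aitask)
# presumably encodes its output along these lines; the record and the default
# topic name below are hypothetical.
def _serialize_scan_detail_example(record, topic="cybersentinal.avro.scandetail"):
    serializer = avro_serializer()
    # dict -> Avro-encoded bytes, ready to pass as value= to producer.produce()
    return serializer(record, SerializationContext(topic, MessageField.VALUE))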
def create_consumer(group_id):
    consumer_conf = {
        **conf,
        'group.id': group_id,
        'auto.offset.reset': 'latest',
        'session.timeout.ms': 60000,
        'heartbeat.interval.ms': 3000,
        'enable.auto.commit': False,  # offsets are committed manually after each batch
        'log_level': 4,
    }
    return Consumer(consumer_conf)
def create_producer():
    producer_conf = {
        **conf,
        'linger.ms': 10,  # queue.buffering.max.ms is a legacy alias of linger.ms, so only one is set
        'batch.num.messages': 1000,
    }
    return Producer(producer_conf)


# Shared producer instance used by all worker threads
producer = create_producer()
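
# Sketch added for illustration (not in the original Space): produce() is
# asynchronous, so a delivery callback is the usual way to confirm or log
# per-message delivery; it would be passed as callback= to producer.produce().
def delivery_report(err, msg):
    if err is not None:
        logger.error(f"Delivery failed for {msg.topic()}[{msg.partition()}]: {err}")
    else:
        logger.info(f"Delivered to {msg.topic()}[{msg.partition()}] at offset {msg.offset()}")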
def ensure_producer_connected(producer):
    """Verify broker connectivity by requesting cluster metadata, retrying up to 5 times."""
    retries = 5
    for attempt in range(retries):
        try:
            producer.list_topics(timeout=5)
            break
        except KafkaException:
            if attempt < retries - 1:
                logger.warning(f"Producer connection check failed (attempt {attempt + 1}/{retries}); retrying in 5s...")
                time.sleep(5)
            else:
                logger.error("Max retries reached. Could not establish a producer connection.")
                raise
def decode_message(msg, avro_deserializer, topic):
    """Deserialize a raw Kafka message value; return None (and log) on failure."""
    try:
        byte_message = msg.value()
        return avro_deserializer(byte_message, SerializationContext(topic, MessageField.VALUE))
    except Exception as e:
        logger.error(f"Error decoding message: {e}")
        return None
def kafka_consumer(group_id, topic):
    consumer = create_consumer(group_id)
    consumer.subscribe([topic])
    deserializer = avro_deserializer()
    serializer = avro_serializer()
    logger.info(f"Consumer {group_id} is running. Waiting for messages on topic {topic}...")
    shutdown_timer = threading.Timer(14400, shutdown_event.set)  # shut down after 4 hours
    shutdown_timer.start()
    try:
        with ThreadPoolExecutor(max_workers=10) as executor:
            while not shutdown_event.is_set():
                try:
                    msgs = consumer.consume(num_messages=10, timeout=1.0)
                    if not msgs:
                        continue
                    # Decode each message exactly once and keep a future -> message
                    # mapping so a failure can be traced back to its partition.
                    future_to_msg = {}
                    for msg in msgs:
                        decoded = decode_message(msg, deserializer, topic)
                        if decoded is None:
                            continue
                        future = executor.submit(
                            handle_message, decoded, producer,
                            ensure_producer_connected, serializer
                        )
                        future_to_msg[future] = msg
                    for future in as_completed(future_to_msg):
                        try:
                            future.result()
                        except TooManyRequestsError as e:
                            partition = future_to_msg[future].partition()
                            consumer.pause([TopicPartition(topic, partition)])
                            logger.info(f"Paused partition {partition} due to TooManyRequestsError")
                            handle_retry(consumer, topic, partition, e.retry_after)
                        except Exception as e:
                            logger.error(f"Error processing message: {e}")
                            raise
                    # Manual commit: advance offsets only after the batch is handled.
                    consumer.commit(asynchronous=False)
                except KafkaException as e:
                    logger.error(f"Kafka exception: {e}. Restarting consumer loop...")
                    time.sleep(5)
                except KeyboardInterrupt:
                    logger.info("Consumer interrupted. Exiting...")
                    shutdown_event.set()
    finally:
        shutdown_timer.cancel()
        consumer.close()
def handle_retry(consumer, topic, partition, retry_after):
    """Sleep on the consumer thread for retry_after seconds, then resume the paused partition.

    Note: if retry_after exceeds max.poll.interval.ms (5 minutes by default),
    the broker will evict this consumer from the group.
    """
    time.sleep(retry_after)
    consumer.resume([TopicPartition(topic, partition)])
def start_kafka_consumer_thread(group_id, topic):
    """Run kafka_consumer in a daemon thread so it cannot block interpreter exit."""
    consumer_thread = threading.Thread(target=kafka_consumer, args=(group_id, topic))
    consumer_thread.daemon = True
    consumer_thread.start()
    return consumer_thread
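
# Sketch added for illustration (the original file has no entry point): wiring
# the pieces together. GROUP_ID and TOPIC are hypothetical environment
# variables, not ones the original code reads.
if __name__ == "__main__":
    thread = start_kafka_consumer_thread(
        os.getenv("GROUP_ID", "scan-consumer-group"),
        os.getenv("TOPIC", "scan"),
    )
    try:
        # The consumer runs in a daemon thread; keep the main thread alive until
        # the 4-hour shutdown timer fires or the process is interrupted.
        while thread.is_alive():
            thread.join(timeout=1.0)
    except KeyboardInterrupt:
        shutdown_event.set()
        thread.join()
    producer.flush()  # drain any outstanding produce requests before exiting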