Spaces:
Runtime error
Runtime error
| import os | |
| import logging | |
| from confluent_kafka.schema_registry import SchemaRegistryClient | |
| from confluent_kafka.schema_registry.error import SchemaRegistryError | |
| class SchemaClient: | |
| def __init__(self, schema_url, schema_subject_name): | |
| self.schema_url = schema_url | |
| self.schema_subject_name = schema_subject_name | |
| self.schema_registry_client = SchemaRegistryClient({"url": self.schema_url}) | |
| def get_schema_str(self): | |
| try: | |
| schema_version = self.schema_registry_client.get_latest_version( | |
| self.schema_subject_name | |
| ) | |
| schema_id = schema_version.schema_id | |
| schema = self.schema_registry_client.get_schema(schema_id) | |
| logging.info( | |
| f"Schema ID for {self.schema_subject_name}: {schema_id}" | |
| ) | |
| return schema.schema_str | |
| except SchemaRegistryError as e: | |
| logging.error(f"Error fetching schema: {e}") | |
| return None | |
| def set_compatibility(self, compatibility_level): | |
| try: | |
| self.schema_registry_client.set_compatibility( | |
| self.schema_subject_name, compatibility_level | |
| ) | |
| logging.info(f"Compatibility level set to {compatibility_level}") | |
| except SchemaRegistryError as e: | |
| logging.error(e) | |
| exit(1) |