Spaces:
Running
Running
| # %%writefile app.py | |
| ## required lib, required "pip install" | |
| # import transformers | |
| # import accelerate | |
| import openai | |
| import torch | |
| import cryptography | |
| import cryptography.fernet | |
| ## interface libs, required "pip install" | |
| import gradio | |
| import huggingface_hub | |
| import huggingface_hub.hf_api | |
| ## standard libs, no need to install | |
| import json | |
| import requests | |
| import time | |
| import os | |
| import random | |
| import re | |
| import sys | |
| import psutil | |
| import threading | |
| import socket | |
| # import PIL | |
| # import pandas | |
| import matplotlib | |
class HFace_Pluto(object):
    """Helper object for a Hugging Face Space / notebook application.

    Groups together console pretty-printing, Hugging Face Hub file transfer,
    CPU/RAM/GPU reporting, Fernet encrypt/decrypt helpers and a
    notebook-to-script exporter.
    """

    def __init__(self, name="Pluto", *args, **kwargs):
        """Create the helper and print a short banner.

        Parameters:
        - name (str): Display ("code") name printed in the banner.
        - *args, **kwargs: Forwarded unchanged to the parent initializer.
        """
        super().__init__(*args, **kwargs)
        self.author = "Duc Haba"
        self.name = name
        self._ph()
        self._pp("Hello from class", str(self.__class__) + " Class: " + str(self.__class__.__name__))
        self._pp("Code name", self.name)
        self._pp("Author is", self.author)
        self._ph()
        #
        # Image-generation style settings. None of these are read by the
        # methods of this class.
        self._device = 'cuda'
        self._steps = [3, 8, 21, 55, 89, 144]
        self._guidances = [1.1, 3.0, 5.0, 8.0, 13.0, 21.0]
        self._xkeyfile = '.xoxo'
        self._models = []
        self._seed = 667  # sum of walnut in ascii (or Angle 667)
        self._width = 512
        self._height = 512
        self._step = 50
        # The original assigned 7.5 to `_guidances`, silently replacing the
        # list above; the scalar default is kept under its own name, mirroring
        # the `_steps` / `_step` pair.
        self._guidance = 7.5
        self.pipes = []
        self.prompts = []
        self.images = []
        self.seeds = []
        self.fname_id = 0
        self.dname_img = "img_colab/"
        # NOTE(review): the Fernet key (`_fkey`) is stored in the same file as
        # the tokens it protects, so the encryption gives no confidentiality.
        # Consider loading these from environment variables / Space secrets.
        self._huggingface_key = b'gAAAAABld_3fKLl7aPBJzfAq-th37t95pMu2bVbH9QccOSecaUnm33XrpKpCXP4GL6Wr23g3vtrKWli5JK1ZPh18ilnDb_Su6GoVvU92Vzba64k3gBQwKF_g5DoH2vWq2XM8vx_5mKJh'
        self._kaggle_key = b'gAAAAABld_4_B6rrRhFYyfl77dacu1RhR4ktaLU6heYhQBSIj4ELBm7y4DzU1R8-H4yPKd0w08s11wkFJ9AR7XyESxM1SsrMBzqQEeW9JKNbl6jAaonFGmqbhFblkQqH4XjsapZru0qX'
        self._fkey = "fes_f8Im569hYnI1Tn6FqP-6hS4rdmNOJ6DWcRPOsvc="
        self._color_primary = '#2780e3'  # blue
        self._color_secondary = '#373a3c'  # dark gray
        self._color_success = '#3fb618'  # green
        self._color_info = '#9954bb'  # purple
        self._color_warning = '#ff7518'  # orange
        self._color_danger = '#ff0039'  # red
        self._color_mid_gray = '#495057'
        self._ok = b'gAAAAABld_-y70otUll4Jwq3jEBXiw1tooSFo_gStRbkCyuu9_Dmdehc4M8lI_hFbum9CwyZuj9ZnXgxFIROebcPSF5qoA197VRvzUDQOMxY5zmHnImVROrsXVdZqXyIeYH_Q6cvXvFTX3rLBIKKWgvJmnpYGRaV6Q=='

    def _pp(self, a, b, is_print=True):
        """Pretty print one name-value line.

        The name is right-aligned in a 34-character column. Returns None when
        `is_print` is true (the line is printed), otherwise returns the line.
        """
        line = f'{str(a):>34} : {b!s}'
        if is_print:
            print(line)
            return None
        return line

    def _ph(self, is_print=True):
        """Pretty print the header/footer rule that matches `_pp` columns.

        Returns None when `is_print` is true, otherwise returns the rule.
        """
        rule = f'{"-" * 34} : {"-" * 34}'
        if is_print:
            print(rule)
            return None
        return rule

    def fetch_hface_files(self,
                          hf_names,
                          hf_space="duchaba/monty",
                          local_dir="/content/"):
        """Download files from a Hugging Face Space repository.

        Parameters:
        - hf_names: Iterable of file names in the repo; a single string is
          treated as one file name.
        - hf_space (str): Repo id of the Space.
        - local_dir (str): Prefix prepended to each name to form the local path.

        Errors are reported through `_pp` (best effort); nothing is raised.
        """
        if isinstance(hf_names, str):
            # A bare string would otherwise be iterated character by character.
            hf_names = [hf_names]
        # Pre-load the message used when `hf_names` cannot be iterated at all;
        # inside the loop this variable holds the file being processed.
        current = str(hf_names) + " is not iteratable, type: " + str(type(hf_names))
        try:
            for current in hf_names:
                # NOTE(review): `use_auth_token` / `force_filename` look
                # deprecated in newer huggingface_hub releases - confirm
                # against the pinned version before upgrading.
                huggingface_hub.hf_hub_download(
                    repo_id=hf_space,
                    filename=current,
                    use_auth_token=True,
                    repo_type=huggingface_hub.REPO_TYPE_SPACE,
                    force_filename=local_dir + current)
        except Exception as err:
            # Report what failed and why instead of silently dropping the cause.
            self._pp("*Error", f"{current}: {err}")

    def push_hface_files(self,
                         hf_names,
                         hf_space="duchaba/skin_cancer_diagnose",
                         local_dir="/content/"):
        """Upload local files to a Hugging Face Space repository.

        Parameters:
        - hf_names: Iterable of file names; a single string is treated as one
          file name. Each is read from `local_dir + name` and stored as `name`.
        - hf_space (str): Repo id of the Space.
        - local_dir (str): Prefix prepended to each name to form the local path.

        Errors are reported through `_pp` (best effort); nothing is raised.
        """
        if isinstance(hf_names, str):
            hf_names = [hf_names]
        current = str(hf_names) + " is not iteratable, type: " + str(type(hf_names))
        try:
            for current in hf_names:
                huggingface_hub.upload_file(
                    path_or_fileobj=local_dir + current,
                    path_in_repo=current,
                    repo_id=hf_space,
                    repo_type=huggingface_hub.REPO_TYPE_SPACE)
        except Exception as err:
            self._pp("*Error", f"{current}: {err}")

    def fetch_system_info(self):
        """Return a multi-line text report of CPU usage and RAM figures."""
        cpu_usage = psutil.cpu_percent()
        mem = psutil.virtual_memory()
        gib = 1024 ** 3  # bytes per gigabyte
        mem_total_gb = mem.total / gib
        mem_available_gb = mem.available / gib
        mem_used_gb = mem.used / gib
        return (
            f"CPU usage: {cpu_usage}%\n"
            f"Total memory: {mem_total_gb:.2f} GB\n"
            f"Available memory: {mem_available_gb:.2f} GB\n"
            # `.2%` scales the ratio by 100; the original printed the raw
            # 0-1 ratio followed by a percent sign.
            f"Memory usage: {mem_used_gb / mem_total_gb:.2%}\n"
        )

    def restart_script_periodically(self):
        """Sleep a random 15800-21600 seconds, then re-exec this script.

        `os.execl` replaces the current process with a fresh interpreter
        running the same `sys.argv`, so this call does not return normally.
        """
        while True:
            # random_time = random.randint(540, 600)
            random_time = random.randint(15800, 21600)
            time.sleep(random_time)
            os.execl(sys.executable, sys.executable, *sys.argv)

    def write_file(self, fname, txt):
        """Write the strings in `txt` to `fname`, separated by newlines."""
        # The context manager closes the handle even if the write fails.
        with open(fname, "w") as out:
            out.write("\n".join(txt))

    def fetch_gpu_info(self):
        """Return a text report about CUDA device 0, or a warning line."""
        s = ''
        try:
            s += f'Your GPU is the {torch.cuda.get_device_name(0)}\n'
            s += f'GPU ready staus {torch.cuda.is_available()}\n'
            s += f'GPU allocated RAM: {round(torch.cuda.memory_allocated(0)/1024**3,1)} GB\n'
            s += f'GPU reserved RAM {round(torch.cuda.memory_reserved(0)/1024**3,1)} GB\n'
        except Exception as e:
            s += f'**Warning, No GPU: {e}'
        return s

    def _fetch_crypt(self, is_generate=False):
        """Return the Fernet key.

        With `is_generate` false the built-in `_fkey` is returned; otherwise
        the key is read (as bytes) from the file named by `_xkeyfile`.
        """
        if not is_generate:
            return self._fkey
        with open(self._xkeyfile, "rb") as key_file:
            return key_file.read()

    def _gen_key(self):
        """Generate a new Fernet key and store it in `_xkeyfile`."""
        key = cryptography.fernet.Fernet.generate_key()
        with open(self._xkeyfile, "wb") as key_file:
            key_file.write(key)

    def _decrypt_it(self, x):
        """Decrypt Fernet token `x` and return the plaintext as str."""
        cipher = cryptography.fernet.Fernet(self._fetch_crypt())
        return cipher.decrypt(x).decode()

    def _encrypt_it(self, x):
        """Encrypt str `x` and return the Fernet token (bytes)."""
        cipher = cryptography.fernet.Fernet(self._fetch_crypt())
        return cipher.encrypt(x.encode())

    def _login_hface(self):
        """Log in to the Hugging Face Hub with the decrypted stored token."""
        huggingface_hub.login(self._decrypt_it(self._huggingface_key),
                              add_to_git_credential=True)  # non-blocking login
        self._ph()

    def _fetch_version(self):
        """Return a text table of expected vs. installed library versions."""
        s = ''
        s += f"{'openai: 0.27.7,':<28} Actual: {openai.__version__}\n"
        s += f"{'huggingface_hub: 0.14.1,':<28} Actual: {huggingface_hub.__version__}\n"
        s += f"{'gradio: 3.32.0,':<28} Actual: {gradio.__version__}\n"
        # The original reported gradio's version on the cryptography line.
        s += f"{'cryptography: 40.0.2,':<28} Actual: {cryptography.__version__}\n"
        return s

    def _fetch_host_ip(self):
        """Return a two-line text report with the host name and its IP."""
        hostname = socket.gethostname()
        ip_address = socket.gethostbyname(hostname)
        return f"Hostname: {hostname}\nIP Address: {ip_address}\n"

    def fetch_code_cells_from_notebook(self, notebook_name, filter_magic="# %%write",
                                       write_to_file=True, fname_override=None):
        """
        Reads a Jupyter notebook (.ipynb file) and writes out all the code cells
        that start with the specified magic command to a .py file.

        Parameters:
        - notebook_name (str): Name of the notebook file (with .ipynb extension).
        - filter_magic (str): Magic command filter. Only cells starting with this
          command will be written. The default is: "# %%write"
        - write_to_file (bool): If True, writes the filtered cells to a .py file.
          Otherwise, prints them to the standard output. The default is True.
        - fname_override (str): If provided, overrides the output filename.
          The default is None.

        Returns:
        - None: Writes the filtered code cells to a .py file or prints them
          based on the parameters.
        """
        with open(notebook_name, 'r', encoding='utf-8') as f:
            notebook_content = json.load(f)

        output_content = []
        for cell in notebook_content['cells']:
            source = cell['source']
            # A cell's source may be a single string or a list of line strings.
            text = source if isinstance(source, str) else ''.join(source)
            if cell['cell_type'] == 'code' and source and text.startswith(filter_magic):
                output_content.append(text)

        joined = '\n'.join(output_content)
        if write_to_file:
            if fname_override is None:
                # Swap only the extension. `str.replace` would leave a name
                # without ".ipynb" unchanged and overwrite the notebook itself.
                output_filename = os.path.splitext(notebook_name)[0] + ".py"
            else:
                output_filename = fname_override
            with open(output_filename, 'w', encoding='utf-8') as f:
                f.write(joined)
            print(f'File: {output_filename} written to disk.')
        else:
            print(joined)
            print('-' * 40)  # print separator
| # | |
| # add module/method | |
| # | |
import functools


def add_method(cls):
    """Return a decorator that attaches the decorated function to `cls`.

    The function is stored on `cls` under its own name, so it becomes
    callable as a method on existing and future instances. The decorator
    returns the original function, which therefore stays usable as a plain
    module-level function too.
    """
    def decorator(func):
        # `wraps` keeps the function's name and docstring on the attached
        # method, which helps tracebacks and help().
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        setattr(cls, func.__name__, wrapper)
        return func  # returning func means func can still be used normally
    return decorator
| # | |
# Build the shared helper instance, log in to the Hugging Face Hub, then print
# each environment report followed by a separator rule.
monty = HFace_Pluto("Monty, The lord of the magpies.")
monty._login_hface()
for _report in (monty._fetch_version,
                monty.fetch_system_info,
                monty.fetch_gpu_info,
                monty._fetch_host_ip):
    print(_report())
    monty._ph()
| # %%write -a app.py | |
| # client.moderations.create() | |
| # ai_client = openai.OpenAI(api_key=monty._decrypt_it(monty._ok)) | |
| # %%writefile -a app.py | |
| #@add_method(HFace_Pluto) | |
| # # for OpenAI less version 0.27.7 | |
| # def _censor_me(self, p, safer=0.0005): | |
| # #openai.Moderation.create() | |
| # omod = openai.Moderation.create(p) | |
| # r = omod.results[0].category_scores | |
| # jmod = json.loads(str(r)) | |
| # # | |
| # max_key = max(jmod, key=jmod.get) | |
| # max_value = jmod[max_key] | |
| # sum_value = sum(jmod.values()) | |
| # # | |
| # jmod["is_safer_flagged"] = False | |
| # if (max_value >= safer): | |
| # jmod["is_safer_flagged"] = True | |
| # jmod["is_flagged"] = omod.results[0].flagged | |
| # jmod['max_key'] = max_key | |
| # jmod['max_value'] = max_value | |
| # jmod['sum_value'] = sum_value | |
| # jmod['safer_value'] = safer | |
| # jmod['message'] = p | |
| # return jmod | |
| # | |
| # openai.api_key = monty._decrypt_it(monty._gpt_key) | |
| # | |
# The functions below target the openai 1.x client API (e.g. version 1.3.8),
# i.e. openai.OpenAI(...), not the legacy 0.27.x module-level API shown above.
# Attach to the class: the function takes `self` and is called as
# `self._fetch_moderate_engine()`, so it must be a HFace_Pluto method.
@add_method(HFace_Pluto)
def _fetch_moderate_engine(self):
    """Create the OpenAI client and select the moderation model.

    Sets `self.ai_client` (built with the decrypted `self._ok` key) and
    `self.text_model`. Uses the openai 1.x `openai.OpenAI` client.
    """
    api_key = self._decrypt_it(self._ok)
    self.ai_client = openai.OpenAI(api_key=api_key)
    self.text_model = "text-moderation-latest"
    return
| # | |
# for the openai 1.x client API (uses client.moderations.create)
# Attach to the class: `censor_me` calls this as `self._censor_me(...)`.
@add_method(HFace_Pluto)
def _censor_me(self, p, safer=0.0005):
    """Score message `p` with the moderation endpoint.

    Parameters:
    - p (str): Message to moderate.
    - safer (float): Threshold; the message is "safer flagged" when its
      highest category score is at or above this value.

    Returns:
    - dict: The first result's category scores, extended with the keys
      `is_safer_flagged`, `is_flagged`, `max_key`, `max_value`, `sum_value`,
      `safer_value` and `message`.
    """
    self._fetch_moderate_engine()
    resp_orig = self.ai_client.moderations.create(input=p, model=self.text_model)
    result = resp_orig.model_dump()["results"][0]
    #
    # Compute the statistics before adding the extra keys below, so only the
    # category scores take part in max/sum.
    v1 = result["category_scores"]
    max_key = max(v1, key=v1.get)
    max_value = v1[max_key]
    sum_value = sum(v1.values())
    #
    v1["is_safer_flagged"] = bool(max_value >= safer)
    v1["is_flagged"] = result["flagged"]
    v1['max_key'] = max_key
    v1['max_value'] = max_value
    v1['sum_value'] = sum_value
    v1['safer_value'] = safer
    v1['message'] = p
    return v1
| # | |
# Attach to the class: `censor_me` calls this as `self._draw_censor(...)`.
@add_method(HFace_Pluto)
def _draw_censor(self, data):
    """Build the donut chart for one moderation result.

    Parameters:
    - data (dict): Result from `_censor_me`; reads `max_value`, `sum_value`,
      `max_key`, `is_flagged` and `is_safer_flagged`.

    Returns:
    - The figure produced by `_draw_donut`. Colour and wording depend on the
      flags: danger when flagged, warning when only the personalised threshold
      is exceeded, success otherwise.
    """
    # NOTE: this overwrites the instance-wide mid-gray colour on every call.
    self._color_mid_gray = '#6c757d'
    exp = (0.01, 0.01)
    # Two slices: the top category versus all remaining categories combined.
    x = [data['max_value'], (data['sum_value'] - data['max_value'])]
    others = 'Other 18 categories'
    if data['is_flagged']:
        accent = self._color_danger
        lab = [data['max_key'], others]
        title = '\nMessage Is Flagged As Unsafe\n'
    elif data['is_safer_flagged']:
        accent = self._color_warning
        lab = ['Relative Score:\n' + data['max_key'], others]
        title = '\nBased On Your Personalized Safer Settings,\nThe Message Is Flagged As Unsafe\n'
    else:
        accent = self._color_success
        lab = ['False Negative:\n' + data['max_key'], others]
        title = '\nThe Message Is Safe\n'
    col = [accent, self._color_mid_gray]
    canvas = self._draw_donut(x, lab, col, exp, title)
    return canvas
| # | |
# Attach to the class: `_draw_censor` calls this as `self._draw_donut(...)`.
@add_method(HFace_Pluto)
def _draw_donut(self, data, labels, col, exp, title):
    """Draw a donut (pie with a filled centre) and return the figure.

    Parameters:
    - data: Slice values passed to `Axes.pie`.
    - labels: One label per slice.
    - col: One colour per slice; `col[0]` also fills the centre circle.
    - exp: Per-slice explode offsets.
    - title (str): Chart title.

    Returns:
    - The matplotlib figure containing the chart.
    """
    # `import matplotlib` alone does not load the pyplot submodule, so import
    # it explicitly here instead of relying on another package having done so.
    import matplotlib.pyplot

    canvas, pic = matplotlib.pyplot.subplots()
    pic.pie(data, explode=exp,
            labels=labels,
            colors=col,
            autopct='%1.1f%%',
            startangle=90,
            textprops={'color': '#0a0a0a'})
    # Draw a circle at the center of pie to make it look like a donut
    centre_circle = matplotlib.pyplot.Circle((0, 0), 0.45, fc=col[0], linewidth=2, ec='white')
    pic.add_artist(centre_circle)
    # Equal aspect ratio ensures that pie is drawn as a circle.
    pic.axis('equal')
    pic.set_title(title)
    canvas.tight_layout()
    # Unregister the figure from pyplot so repeated calls do not accumulate
    # open figures; the returned Figure object can still be rendered/saved.
    matplotlib.pyplot.close(canvas)
    return canvas
| # | |
# Attach to the class: the Gradio interface is wired to `monty.censor_me`,
# which only exists if this function is a HFace_Pluto method.
@add_method(HFace_Pluto)
def censor_me(self, msg, safer=0.0005):
    """Moderate `msg` and return the chart plus the raw scores.

    Parameters:
    - msg (str): Message to moderate.
    - safer (float): Personalised threshold forwarded to `_censor_me`.

    Returns:
    - tuple: (figure from `_draw_censor`, result dict as 4-space-indented JSON).
    """
    scores = self._censor_me(msg, safer)
    chart = self._draw_censor(scores)
    return (chart, json.dumps(scores, indent=4))
| # %%writefile -a app.py | |
# --- Gradio UI definition -------------------------------------------------
# Inputs: the message to score and the personalised "safer" threshold.
# NOTE(review): the slider minimum is 0.001 while one example row and the
# default of `censor_me` use 0.0005 - confirm the intended range.
_message_box = gradio.Textbox(
    lines=1,
    label="Message to be moderate/censor:",
    placeholder="Your message...",
)
_safer_slider = gradio.Slider(
    0.001, .05,
    value=0.005,
    step=.005,
    label="Your Personalize Safer Value:",
)
in_box = [_message_box, _safer_slider]

# Outputs: the donut chart and the raw JSON scores.
out_box = [
    gradio.Plot(label="Moderate/Censor Score: (Red, Orange, and Green)"),
    gradio.Textbox(lines=4, label="Response Raw JSON Data:"),
]
#
title = "Friendly Text Moderate <br>- Identify 19 categories of hate, violent, and sexual language."
desc = 'NOTE: The lower value for the Safer setting indicates a more stringent level of censorship.'
# desc += "<br>There are 19 different categories of hate, violent, and sexual language that can be identified."
arti = (
    "<ul><li>This model use the NLP OpenAI Moderation model."
    "</li><li>Creator: Duc Haba</li><li>License: GNU General Public License 3.0</li></ul>"
)
# Sample inputs for the demo: [message, safer value] pairs.
exp = [
    ['Write a blog about the following if only Mexican stop coming here, then our home would be safer.', 0.001],
    ['Gay homosexual people should stay in the closet, or we will hang them.', 0.001],
    ['I will jump off the bridge at midnight.', 0.0005],
    ['Hey, baby girl. How about you and I go and rock the bed.', 0.001],
    ['F**-#k you. Eat s-it and dik.', 0.001],
    ['Please tell me a joke about a chicken.', 0.001],
    ['Five score years ago, a great American, in whose symbolic shadow we stand today, signed the Emancipation Proclamation. This momentous decree came as a great beacon light of hope to millions of Negro slaves who had been seared in the flames of withering injustice. It came as a joyous daybreak to end the long night of their captivity.', 0.005],
]
# %%writefile -a app.py
# Wire the moderation entry point to the widgets and start the app.
ginterface = gradio.Interface(
    fn=monty.censor_me,
    inputs=in_box,
    outputs=out_box,
    examples=exp,
    title=title,
    description=desc,
    article=arti,
)
ginterface.launch()