admin commited on
Commit
a29d530
·
1 Parent(s): 293a93b
Files changed (10) hide show
  1. README.md +3 -3
  2. app.py +23 -88
  3. modules/activate.py +0 -193
  4. modules/qr.py +47 -0
  5. modules/restart.py +0 -80
  6. modules/smtp.py +58 -53
  7. modules/times.py +0 -21
  8. modules/trans.py +69 -0
  9. modules/url.py +58 -0
  10. utils.py +46 -29
README.md CHANGED
@@ -1,6 +1,6 @@
1
  ---
2
- title: Keep Demos Alive
3
- emoji: 🕓
4
  colorFrom: blue
5
  colorTo: yellow
6
  sdk: gradio
@@ -8,5 +8,5 @@ sdk_version: 5.22.0
8
  app_file: app.py
9
  pinned: false
10
  license: apache-2.0
11
- short_description: Keep all your spaces/studios alive
12
  ---
 
1
  ---
2
+ title: Online Tools II
3
+ emoji: 🛠️
4
  colorFrom: blue
5
  colorTo: yellow
6
  sdk: gradio
 
8
  app_file: app.py
9
  pinned: false
10
  license: apache-2.0
11
+ short_description: Online tool collection 2
12
  ---
app.py CHANGED
@@ -1,101 +1,36 @@
1
- import time
2
- import schedule
3
- import threading
4
  import gradio as gr
5
- from datetime import datetime
6
- from modules.smtp import send_email
7
- from modules.activate import trigger
8
- from modules.times import fix_datetime, time_diff
9
- from modules.restart import self_restart, test_hf_restart, test_ms_restart
10
- from utils import START_TIME, PERIOD, DELAY
11
 
 
 
 
 
 
 
 
12
 
13
- def run_schedule():
14
- global START_TIME
15
- START_TIME = datetime.now()
16
- trigger()
17
- while True:
18
- schedule.run_pending()
19
- time.sleep(DELAY)
20
 
21
-
22
- def monitor(period=PERIOD):
23
- print(f"Monitor is on and triggered every {period}h...")
24
- schedule.every(47).hours.do(self_restart)
25
- schedule.every(int(period)).hours.do(trigger)
26
- threading.Thread(target=run_schedule, daemon=True).start()
27
-
28
-
29
- # UI func
30
- def tasklst():
31
- status = "Success"
32
- logs = None
33
- try:
34
- logs = f"\nHas been running for {time_diff(START_TIME, datetime.now())}\n"
35
- jobs = schedule.get_jobs()
36
- for job in jobs:
37
- prev_run = ""
38
- last_run = fix_datetime(job.last_run)
39
- if last_run:
40
- prev_run = f"last run: {last_run}, "
41
-
42
- next_run = fix_datetime(job.next_run)
43
- logs += f"\nEvery {job.interval}h do {job.job_func.__name__} ({prev_run}next run: {next_run})\n"
44
-
45
- except Exception as e:
46
- status = f"{e}"
47
-
48
- return status, logs
49
 
50
 
51
  if __name__ == "__main__":
52
- monitor()
53
  with gr.Blocks() as demo:
54
- gr.Markdown("# Keep Spaces/Studios Alive")
55
- with gr.Row():
56
- with gr.Column():
57
- task_btn = gr.Button("See current task status")
58
- once_btn = gr.Button("Trigger once manually")
59
- smtp_btn = gr.Button("SMTP test")
60
- with gr.Tab("HF token test"):
61
- hf_txt = gr.Textbox(
62
- label="Restart a space with permissions",
63
- placeholder="username/repo",
64
- value="Genius-Society/online_tools",
65
- )
66
- with gr.Row():
67
- clhf_btn = gr.Button("Clear")
68
- hf_btn = gr.Button("Restart to test token validity")
69
 
70
- with gr.Tab("MS cookie test"):
71
- ms_txt = gr.Textbox(
72
- label="Restart a studio with permissions",
73
- placeholder="username/repo",
74
- value="Genius-Society/online_tools",
75
- )
76
- with gr.Row():
77
- clms_btn = gr.Button("Clear")
78
- ms_btn = gr.Button("Restart to test cookie validity")
79
 
80
- with gr.Column():
81
- status_bar = gr.Textbox(label="Status", show_copy_button=True)
82
- gr.Markdown("Logs")
83
- logs_bar = gr.Markdown(container=True, show_copy_button=True)
84
 
85
- task_btn.click(fn=tasklst, outputs=[status_bar, logs_bar])
86
- once_btn.click(fn=trigger, outputs=[status_bar, logs_bar])
87
- smtp_btn.click(fn=send_email, outputs=[status_bar, logs_bar])
88
- hf_btn.click(
89
- fn=test_hf_restart,
90
- inputs=[hf_txt],
91
- outputs=[status_bar, logs_bar],
92
- )
93
- clhf_btn.click(fn=lambda: None, outputs=hf_txt)
94
- ms_btn.click(
95
- fn=test_ms_restart,
96
- inputs=[ms_txt],
97
- outputs=[status_bar, logs_bar],
98
- )
99
- clms_btn.click(fn=lambda: None, outputs=ms_txt)
100
 
101
  demo.launch()
 
 
 
 
1
  import gradio as gr
2
+ from modules.qr import qrcode
3
+ from modules.smtp import smtp_tester
4
+ from modules.trans import translator
5
+ from modules.url import url_shortner
6
+ from utils import EN_US
 
7
 
8
+ ZH2EN = {
9
+ "# 在线工具合集2": "# Online Tools Collection II",
10
+ "二维码生成": "QR Code",
11
+ "SMTP 测试": "SMTP Test",
12
+ "翻译器": "Translator",
13
+ "短链接生成": "URL Shortner",
14
+ }
15
 
 
 
 
 
 
 
 
16
 
17
+ def _L(zh_txt: str):
18
+ return ZH2EN[zh_txt] if EN_US else zh_txt
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
19
 
20
 
21
  if __name__ == "__main__":
 
22
  with gr.Blocks() as demo:
23
+ gr.Markdown(_L("# 在线工具合集2"))
24
+ with gr.Tab(_L("二维码生成")):
25
+ qrcode()
 
 
 
 
 
 
 
 
 
 
 
 
26
 
27
+ with gr.Tab(_L("SMTP 测试")):
28
+ smtp_tester()
 
 
 
 
 
 
 
29
 
30
+ with gr.Tab(_L("翻译器")):
31
+ translator()
 
 
32
 
33
+ with gr.Tab(_L("短链接生成")):
34
+ url_shortner()
 
 
 
 
 
 
 
 
 
 
 
 
 
35
 
36
  demo.launch()
modules/activate.py DELETED
@@ -1,193 +0,0 @@
1
- import json
2
- import time
3
- import requests
4
- import threading
5
- from tqdm import tqdm
6
- from datetime import datetime
7
- from huggingface_hub import HfApi
8
- from modules.smtp import send_email
9
- from modules.times import fix_datetime
10
- from modules.restart import restart_repos
11
- from utils import HF_DOMAIN, HEADER, MS_HEADER, TIMEOUT, MS_DOMAIN, DELAY, USERS
12
-
13
-
14
- def get_space_status(repo_id: str):
15
- response: list = requests.get(
16
- url=f"{HF_DOMAIN}/api/spaces/semantic-search",
17
- params={"q": repo_id, "includeNonRunning": "true"},
18
- headers=HEADER,
19
- timeout=TIMEOUT,
20
- ).json()
21
-
22
- if response:
23
- for repo in response:
24
- if repo["id"] == repo_id:
25
- return repo["runtime"]["stage"]
26
-
27
- return "ERROR"
28
-
29
-
30
- def get_spaces(username: str):
31
- sleepings, errors = [], []
32
- try:
33
- spaces = HfApi().list_spaces(author=username)
34
- for space in spaces:
35
- status = get_space_status(space.id)
36
- if status == "SLEEPING":
37
- space_id = space.id.replace("/", "-").replace("_", "-").lower()
38
- if space.sdk == "gradio":
39
- sleepings.append(f"https://{space_id}.hf.space")
40
- else:
41
- sleepings.append(f"https://{space_id}.static.hf.space")
42
-
43
- elif "ERROR" in status:
44
- errors.append(f"{HF_DOMAIN}/spaces/{space.id}")
45
-
46
- return sleepings, errors
47
-
48
- except Exception as e:
49
- print(f"An error occurred in the request: {e}")
50
-
51
- return [], []
52
-
53
-
54
- def activate_space(url: str):
55
- try:
56
- response = requests.get(url, headers=HEADER, timeout=TIMEOUT)
57
- response.raise_for_status()
58
-
59
- except Exception as e:
60
- print(e)
61
-
62
-
63
- def check_ms_login():
64
- try:
65
- response = requests.get(
66
- "https://www.modelscope.cn/api/v1/users/login/info",
67
- headers=MS_HEADER,
68
- timeout=TIMEOUT,
69
- )
70
- response.raise_for_status()
71
-
72
- except Exception as e:
73
- send_email(f"ModelScope cookie 失效: {e}")
74
-
75
-
76
- def get_studios(username: str):
77
- try:
78
- response = requests.put(
79
- f"{MS_DOMAIN}/api/v1/studios/{username}/list",
80
- data=json.dumps(
81
- {
82
- "PageNumber": 1,
83
- "PageSize": 1000,
84
- "Name": "",
85
- "SortBy": "gmt_modified",
86
- "Order": "desc",
87
- }
88
- ),
89
- headers=MS_HEADER,
90
- timeout=TIMEOUT,
91
- )
92
- response.raise_for_status()
93
- spaces: list = response.json()["Data"]["Studios"]
94
- if spaces:
95
- studios, errors = [], []
96
- for space in spaces:
97
- status = space["Status"]
98
- if status == "Expired":
99
- studios.append(f"{username}/{space['Name']}")
100
- elif status == "Failed":
101
- errors.append(f"{MS_DOMAIN}/studios/{username}/{space['Name']}")
102
-
103
- return studios, errors
104
-
105
- except requests.exceptions.Timeout as e:
106
- print(f"Timeout: {e}, retrying...")
107
- time.sleep(DELAY)
108
- return get_studios(username)
109
-
110
- except Exception as e:
111
- print(f"Requesting error: {e}")
112
-
113
- return [], []
114
-
115
-
116
- def activate_studio(repo: str, holding_delay=5):
117
- repo_page = f"{MS_DOMAIN}/studios/{repo}"
118
- status_api = f"{MS_DOMAIN}/api/v1/studio/{repo}/status"
119
- start_expired_api = f"{MS_DOMAIN}/api/v1/studio/{repo}/start_expired"
120
- try:
121
- response = requests.put(start_expired_api, headers=MS_HEADER, timeout=TIMEOUT)
122
- response.raise_for_status()
123
- while (
124
- requests.get(status_api, headers=MS_HEADER, timeout=TIMEOUT).json()["Data"][
125
- "Status"
126
- ]
127
- != "Running"
128
- ):
129
- requests.get(repo_page, headers=MS_HEADER, timeout=TIMEOUT)
130
- time.sleep(holding_delay)
131
-
132
- except requests.exceptions.Timeout as e:
133
- print(f"Failed to activate {repo}: {e}, retrying...")
134
- activate_studio(repo)
135
-
136
- except Exception as e:
137
- print(e)
138
-
139
-
140
- # UI func
141
- def trigger(users=USERS):
142
- status = "Success"
143
- logs = None
144
- try:
145
- spaces, studios, failures = [], [], []
146
- usernames = users.split(";")
147
- for user in tqdm(usernames, desc="Collecting spaces"):
148
- username = user.strip()
149
- if username:
150
- sleeps, errors = get_spaces(username)
151
- spaces += sleeps
152
- failures += errors
153
- time.sleep(DELAY)
154
-
155
- for space in tqdm(spaces, desc="Activating spaces"):
156
- activate_space(space)
157
- time.sleep(DELAY)
158
-
159
- check_ms_login()
160
- for user in tqdm(usernames, desc="Collecting studios"):
161
- username = user.strip()
162
- if username:
163
- sleeps, errors = get_studios(username)
164
- studios += sleeps
165
- failures += errors
166
- time.sleep(DELAY)
167
-
168
- for studio in tqdm(studios, desc="Activating studios"):
169
- threading.Thread(
170
- target=activate_studio, args=(studio,), daemon=True
171
- ).start()
172
- time.sleep(DELAY)
173
-
174
- logs = (
175
- "\n".join(spaces + studios)
176
- + f"\n[{fix_datetime(datetime.now())}] Activation complete!\n"
177
- )
178
- print(logs)
179
- content = ""
180
- for failure in failures:
181
- errepo: str = failure
182
- errepo = errepo.replace(HF_DOMAIN, "").replace(MS_DOMAIN, "")
183
- content += f"<br><a href='{failure}'>{errepo[1:]}</a><br>"
184
-
185
- if content:
186
- send_email(content)
187
-
188
- restart_repos()
189
-
190
- except Exception as e:
191
- status = f"{e}"
192
-
193
- return status, logs
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
modules/qr.py ADDED
@@ -0,0 +1,47 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import gradio as gr
2
+ from utils import download_file, EN_US, API_QR, TMP_DIR
3
+
4
+ ZH2EN = {
5
+ "二维码输出尺寸": "Image size",
6
+ "输入文本": "Input text",
7
+ "输出二维码": "QR code",
8
+ "输入文字在线生成二维码": "Enter text to generate a QR code.",
9
+ "状态栏": "Status",
10
+ }
11
+
12
+
13
+ def _L(zh_txt: str):
14
+ return ZH2EN[zh_txt] if EN_US else zh_txt
15
+
16
+
17
+ def infer(img_size: int, input_txt: str):
18
+ status = "Success"
19
+ img = None
20
+ try:
21
+ if (not input_txt) or input_txt == "0":
22
+ raise ValueError("Please input valid text!")
23
+
24
+ img = download_file(
25
+ f"{API_QR}/?size={img_size}x{img_size}&data={input_txt}",
26
+ f"{TMP_DIR}/qrcode.jpg",
27
+ )
28
+
29
+ except Exception as e:
30
+ status = f"{e}"
31
+
32
+ return status, img
33
+
34
+
35
+ def qrcode():
36
+ return gr.Interface(
37
+ fn=infer,
38
+ inputs=[
39
+ gr.Slider(35, 1000, 217, label=_L("二维码输出尺寸")),
40
+ gr.Textbox(label=_L("输入文本"), placeholder=_L("输入文字在线生成二维码")),
41
+ ],
42
+ outputs=[
43
+ gr.Textbox(label=_L("状态栏"), show_copy_button=True),
44
+ gr.Image(label=_L("输出二维码"), show_share_button=False),
45
+ ],
46
+ flagging_mode="never",
47
+ )
modules/restart.py DELETED
@@ -1,80 +0,0 @@
1
- import requests
2
- from huggingface_hub import HfApi
3
- from modules.smtp import send_email
4
- from utils import HF_DOMAIN, MS_DOMAIN, HF_TK, MS_HEADER, TIMEOUT, REPOS
5
-
6
- HF_API = HfApi(token=HF_TK)
7
-
8
-
9
- def restart_private_studio(repo: str):
10
- response = requests.put(
11
- f"{MS_DOMAIN}/api/v1/studio/{repo}/reset_restart",
12
- headers=MS_HEADER,
13
- timeout=TIMEOUT,
14
- )
15
- response.raise_for_status()
16
- return f"\n{response.text}\n"
17
-
18
-
19
- def restart_repos():
20
- if REPOS:
21
- repos = REPOS.split(";")
22
- for repo in repos:
23
- private_repo = repo.strip()
24
- print(f"Restarting {private_repo} space & studio...")
25
- restart_private_studio(private_repo)
26
- HF_API.restart_space(private_repo)
27
-
28
-
29
- def self_restart(me="kakamond/keeps_alive"):
30
- try:
31
- spaces = HF_API.list_spaces(author=me.split("/")[0])
32
- for space in spaces:
33
- if space.id != me and not space.disabled:
34
- HF_API.restart_space(space.id)
35
-
36
- HF_API.restart_space(me)
37
-
38
- except Exception as e:
39
- send_email(f"Failed to self-restart: {e}")
40
-
41
-
42
- # UI func
43
- def test_hf_restart(repo: str):
44
- status = "Success"
45
- logs = None
46
- try:
47
- if not repo:
48
- raise ValueError("Empty repo!")
49
-
50
- if f"{HF_DOMAIN}/spaces/" in repo:
51
- repo = repo.split("/spaces/")[1].split("/")[:2]
52
- repo = "/".join(repo)
53
-
54
- logs = f"\n{HF_API.restart_space(repo)}\n"
55
- logs += f"\nSuccessfully restart {HF_DOMAIN}/spaces/{repo}\n"
56
-
57
- except Exception as e:
58
- status = f"{e}"
59
-
60
- return status, logs
61
-
62
-
63
- def test_ms_restart(repo: str):
64
- status = "Success"
65
- logs = None
66
- try:
67
- if not repo:
68
- raise ValueError("Empty repo!")
69
-
70
- if f"{MS_DOMAIN}/studios/" in repo:
71
- repo = repo.split("/studios/")[1].split("/")[:2]
72
- repo = "/".join(repo)
73
-
74
- logs = restart_private_studio(repo)
75
- logs += f"\nSuccessfully restart {MS_DOMAIN}/studios/{repo}\n"
76
-
77
- except Exception as e:
78
- status = f"{e}"
79
-
80
- return status, logs
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
modules/smtp.py CHANGED
@@ -1,61 +1,66 @@
1
  import requests
2
- from utils import EMAIL, SMTP, TAG, HOST, PORT
3
-
4
-
5
- def email_api(
6
- content: str,
7
- title="Runtime error detected",
8
- header="Please fix following repo(s):",
9
- email=EMAIL,
10
- password=SMTP,
11
- target=TAG,
12
- host=HOST,
13
- port=PORT,
14
- ):
15
- body = f"""
16
- <html>
17
- <body>
18
- <h2>{header}</h2>
19
- {content}
20
- </body>
21
- </html>
22
- """
23
- response = requests.get(
24
- "http://api.mmp.cc/api/mail",
25
- params={
26
- "host": host,
27
- "Port": port,
28
- "key": password, # apikey
29
- "email": email, # from
30
- "mail": target, # to
31
- "title": title, # subject
32
- "name": "Keep Spaces Alive", # nickname
33
- "text": body, # content
34
- },
35
- )
36
- if response.status_code == 200:
37
- result: dict = response.json()
38
- if result.get("status") == "success":
39
- return "Email sent successfully!"
40
- else:
41
- raise ConnectionError(f"Failed to send email: {result.get('message')}")
42
 
43
- else:
44
- raise ConnectionError(
45
- f"Request failed with status code: {response.status_code}"
46
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47
 
48
 
49
- # UI func
50
- def send_email(content="Test SMTP"):
51
- status = "Success"
52
- logs = None
53
  try:
54
- status = email_api(content)
55
- logs = f"\nEmail content: {content}\n"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
56
 
57
  except Exception as e:
58
- status = f"{e}"
59
- print(status)
60
 
61
- return status, logs
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import requests
2
+ import gradio as gr
3
+ from utils import API_SMTP, EN_US
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4
 
5
+ ZH2EN = {
6
+ "收信人邮箱": "To email",
7
+ "标题": "Title",
8
+ "测试标题": "Test title",
9
+ "正文": "Content",
10
+ "SMTP 在线测试工具": "SMTP online tester",
11
+ "发信人昵称": "Sender name",
12
+ "测试昵称": "Test nickname",
13
+ "发信人邮箱": "From email",
14
+ "应用密钥": "API key",
15
+ "SMTP 服务器": "SMTP host",
16
+ "端口": "Port",
17
+ "发送状态": "Status",
18
+ }
19
+
20
+
21
+ def _L(zh_txt: str):
22
+ return ZH2EN[zh_txt] if EN_US else zh_txt
23
 
24
 
25
+ def infer(target, title, content, name, email, password, host, port):
 
 
 
26
  try:
27
+ response = requests.get(
28
+ API_SMTP,
29
+ params={
30
+ "host": host,
31
+ "Port": port,
32
+ "key": password, # apikey
33
+ "email": email, # from
34
+ "mail": target, # to
35
+ "title": title, # subject
36
+ "name": name, # nickname
37
+ "text": content, # content
38
+ },
39
+ )
40
+ if response.status_code == 200:
41
+ result: dict = response.json()
42
+ return result.get("status")
43
+
44
+ else:
45
+ raise ConnectionError(f"{response.status_code}")
46
 
47
  except Exception as e:
48
+ return f"{e}"
 
49
 
50
+
51
+ def smtp_tester():
52
+ return gr.Interface(
53
+ fn=infer,
54
+ inputs=[
55
+ gr.Textbox(label=_L("收信人邮箱"), placeholder="Recipient"),
56
+ gr.Textbox(label=_L("标题"), value=_L("测试标题")),
57
+ gr.TextArea(label=_L("正文"), value=_L("SMTP 在线测试工具")),
58
+ gr.Textbox(label=_L("发信人昵称"), value=_L("测试昵称")),
59
+ gr.Textbox(label=_L("发信人邮箱"), placeholder="Sender"),
60
+ gr.Textbox(label=_L("应用密钥"), placeholder="SMTP password"),
61
+ gr.Textbox(label=_L("SMTP 服务器"), value="smtp.163.com"),
62
+ gr.Slider(label=_L("端口"), minimum=0, maximum=65535, step=1, value=25),
63
+ ],
64
+ outputs=gr.TextArea(label=_L("发送状态"), show_copy_button=True),
65
+ flagging_mode="never",
66
+ )
modules/times.py DELETED
@@ -1,21 +0,0 @@
1
- from datetime import datetime
2
- from zoneinfo import ZoneInfo
3
- from tzlocal import get_localzone
4
-
5
-
6
- def fix_datetime(naive_time: datetime, target_tz=ZoneInfo("Asia/Shanghai")):
7
- if not naive_time:
8
- return None
9
-
10
- local_tz = get_localzone()
11
- aware_local = naive_time.replace(tzinfo=local_tz)
12
- return aware_local.astimezone(target_tz).strftime("%Y-%m-%d %H:%M:%S")
13
-
14
-
15
- def time_diff(time1: datetime, time2: datetime):
16
- t_diff = time2 - time1
17
- days = t_diff.days
18
- hours = t_diff.seconds // 3600
19
- minutes = (t_diff.seconds % 3600) // 60
20
- seconds = t_diff.seconds % 60
21
- return f"{days} d {hours} h {minutes} m {seconds} s"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
modules/trans.py ADDED
@@ -0,0 +1,69 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ import requests
3
+ import gradio as gr
4
+ from utils import API_TRANS, KEY_TRANS, EN_US
5
+
6
+ ZH2EN = {
7
+ "输入文本区域": "Input text area",
8
+ "在这里输入文本...": "Type the text here...",
9
+ "模式": "Mode",
10
+ "翻译结果": "Translation results",
11
+ "状态栏": "Status",
12
+ }
13
+
14
+
15
+ def _L(zh_txt: str):
16
+ return ZH2EN[zh_txt] if EN_US else zh_txt
17
+
18
+
19
+ def infer(source, direction):
20
+ status = "Success"
21
+ result = None
22
+ try:
23
+ if not source or not direction:
24
+ raise ValueError("请输入有效文本!")
25
+
26
+ response = requests.request(
27
+ "POST",
28
+ API_TRANS,
29
+ data=json.dumps(
30
+ {
31
+ "source": source,
32
+ "trans_type": direction,
33
+ "request_id": "demo",
34
+ "detect": True,
35
+ }
36
+ ),
37
+ headers={
38
+ "content-type": "application/json",
39
+ "x-authorization": f"token {KEY_TRANS}",
40
+ },
41
+ )
42
+
43
+ result = json.loads(response.text)["target"]
44
+
45
+ except Exception as e:
46
+ status = f"{e}"
47
+
48
+ return status, result
49
+
50
+
51
+ def translator():
52
+ return gr.Interface(
53
+ fn=infer,
54
+ inputs=[
55
+ gr.TextArea(label=_L("输入文本区域"), placeholder=_L("在这里输入文本...")),
56
+ gr.Textbox(label=_L("模式"), value="auto2en"),
57
+ ],
58
+ outputs=[
59
+ gr.Textbox(label=_L("状态栏"), show_copy_button=True),
60
+ gr.TextArea(label=_L("翻译结果"), show_copy_button=True),
61
+ ],
62
+ flagging_mode="never",
63
+ examples=[
64
+ ["这是最好的翻译服务。", "auto2ja"],
65
+ ["これは最高の翻訳サービスです。", "auto2en"],
66
+ ["This is the best translation service.", "auto2zh"],
67
+ ],
68
+ cache_examples=False,
69
+ )
modules/url.py ADDED
@@ -0,0 +1,58 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ import requests
3
+ import gradio as gr
4
+ from utils import is_valid_url, HEADER, EN_US, API_URL
5
+
6
+ ZH2EN = {
7
+ "输入长链接": "Input a long URL",
8
+ "输出短链接": "Output short URL",
9
+ "预览短链接": "Preview short URL",
10
+ "将长链接转换为短的、易于共享的链接": "Convert long urls into short, easy-to-share links",
11
+ "状态栏": "Status",
12
+ }
13
+
14
+
15
+ def _L(zh_txt: str):
16
+ return ZH2EN[zh_txt] if EN_US else zh_txt
17
+
18
+
19
+ # outer func
20
+ def infer(longUrl: str):
21
+ status = "Success"
22
+ shortUrl = preview = None
23
+ try:
24
+ response = requests.post(API_URL, json={"url": longUrl}, headers=HEADER)
25
+ if response.status_code == 200:
26
+ shortUrl = json.loads(response.text)["shortUrl"]
27
+ else:
28
+ raise ConnectionError(response.text)
29
+
30
+ if is_valid_url(shortUrl):
31
+ preview = f'<a href="{shortUrl}" target="_blank">{shortUrl}</a>'
32
+
33
+ except Exception as e:
34
+ status = f"{e}"
35
+
36
+ return status, shortUrl, preview
37
+
38
+
39
+ def url_shortner():
40
+ return gr.Interface(
41
+ fn=infer,
42
+ inputs=gr.Textbox(
43
+ label=_L("输入长链接"),
44
+ placeholder=_L("将长链接转换为短的、易于共享的链接"),
45
+ ),
46
+ outputs=[
47
+ gr.Textbox(label=_L("状态栏"), show_copy_button=True),
48
+ gr.Textbox(label=_L("输出短链接"), show_copy_button=True),
49
+ gr.HTML(
50
+ container=True,
51
+ show_label=True,
52
+ label=_L("预览短链接"),
53
+ ),
54
+ ],
55
+ flagging_mode="never",
56
+ examples=["https://www.bing.com", "https://www.baidu.com"],
57
+ cache_examples=False,
58
+ )
utils.py CHANGED
@@ -1,33 +1,50 @@
1
  import os
 
 
 
2
 
3
- PERIOD = os.getenv("period")
4
- USERS = os.getenv("users")
5
- REPOS = os.getenv("repos")
6
- EMAIL = os.getenv("email")
7
- SMTP = os.getenv("smtp")
8
- HF_TK = os.getenv("hftk")
9
- MS_CK = os.getenv("msck")
10
- TAG = os.getenv("target")
11
- if not TAG:
12
- TAG = EMAIL
13
-
14
- HOST = os.getenv("host")
15
- PORT = os.getenv("port")
16
- if not (HOST and PORT):
17
- HOST = "smtp.qq.com"
18
- PORT = 587
19
- else:
20
- PORT = int(PORT)
21
-
22
- DELAY = 1
23
- TIMEOUT = None
24
- START_TIME = None
25
- HF_DOMAIN = "https://huggingface.co"
26
- MS_DOMAIN = "https://www.modelscope.cn"
27
  HEADER = {
28
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537"
29
- }
30
- MS_HEADER = {
31
- "User-Agent": HEADER["User-Agent"],
32
- "Cookie": MS_CK,
33
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import os
2
+ import re
3
+ import shutil
4
+ import requests
5
 
6
+ EN_US = os.getenv("LANG") != "zh_CN.UTF-8"
7
+ API_QR = os.getenv("api_qr")
8
+ API_URL = os.getenv("api_url")
9
+ API_SMTP = os.getenv("api_smtp")
10
+ API_TRANS = os.getenv("api_caiyun")
11
+ KEY_TRANS = os.getenv("apikey_caiyun")
12
+ if not (API_SMTP and API_TRANS and KEY_TRANS):
13
+ print("请检查环境变量")
14
+ exit()
15
+
16
+
17
+ TMP_DIR = "./__pycache__"
 
 
 
 
 
 
 
 
 
 
 
 
18
  HEADER = {
19
+ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36 Edg/132.0.0.0",
 
 
 
 
20
  }
21
+
22
+
23
+ def clean_dir(dir_path: str):
24
+ if os.path.exists(dir_path):
25
+ shutil.rmtree(dir_path)
26
+
27
+ os.makedirs(dir_path)
28
+
29
+
30
+ def is_valid_url(url):
31
+ # 定义 URL 的正则表达式
32
+ pattern = re.compile(
33
+ r"^(https?://)?" # 协议(http 或 https,可选)
34
+ r"([a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}" # 域名
35
+ r"(:\d+)?" # 端口号(可选)
36
+ r"(/[^ ]*)?$" # 路径(可选)
37
+ )
38
+ # 使用正则表达式匹配 URL
39
+ return bool(pattern.match(url))
40
+
41
+
42
+ def download_file(url, local_filename):
43
+ clean_dir(os.path.dirname(local_filename))
44
+ response = requests.get(url, stream=True)
45
+ response.raise_for_status()
46
+ with open(local_filename, "wb") as f:
47
+ for chunk in response.iter_content(chunk_size=8192):
48
+ f.write(chunk)
49
+
50
+ return local_filename