# covid.py — NHC COVID-19 bulletin scraper (uploaded by pkumc via huggingface_hub, rev ae92d51)
# -*- coding: utf-8 -*-
import sys
sys.path.append('/Users/machi/Library/Python/3.8/lib/python/site-packages')
import os
import asyncio
# from pyppeteer import launcher
# # 在导入 launch 之前 把 --enable-automation 禁用 防止监测webdriver
# launcher.AUTOMATION_ARGS.remove("--enable-automation")
from pyppeteer import launch
from bs4 import BeautifulSoup
import re
import time
async def pyppteer_fetchUrl(url):
    """Render *url* in a fresh Chromium instance and return the page HTML.

    The NHC site performs a JavaScript redirect after the initial load, so
    after ``goto`` we wait for one more navigation before reading content.

    Fixes vs. original: the result variable no longer shadows the builtin
    ``str``, and the browser is closed in a ``finally`` so it is not leaked
    when ``goto``/``waitForNavigation`` raises (e.g. on a timeout).
    """
    browser = await launch({'headless': False, 'dumpio': True, 'autoClose': True})
    try:
        page = await browser.newPage()
        await page.goto(url)
        # wait for the post-load JS navigation to settle
        await asyncio.wait([page.waitForNavigation()])
        html = await page.content()
    finally:
        await browser.close()
    return html
def fetchUrl(url):
    """Blocking wrapper: fetch *url*'s rendered HTML via ``pyppteer_fetchUrl``.

    Uses ``asyncio.run`` (3.7+), which creates and tears down a fresh event
    loop per call, instead of the deprecated
    ``get_event_loop().run_until_complete`` pattern.
    """
    return asyncio.run(pyppteer_fetchUrl(url))
def getPageUrl(max_page=5):
    """Yield the NHC epidemic-bulletin list-page URLs.

    Pages run from 1 up to (but excluding) *max_page*, matching the
    original hard-coded ``range(1, 5)``; the count is now a parameter so
    callers can crawl deeper. Page 1 has no numeric suffix on the site.
    """
    base = 'http://www.nhc.gov.cn/xcs/yqtb/list_gzbd'
    for page in range(1, max_page):
        if page == 1:
            yield base + '.shtml'
        else:
            yield '%s_%d.shtml' % (base, page)
def getTitleUrl(html):
    """Yield ``(title, link, date)`` for every bulletin entry on a list page.

    Parses the ``<div class="list"><ul><li>`` items: each ``<li>`` holds an
    ``<a>`` (relative href + title attribute) and a ``<span>`` with the
    publish date. Relative hrefs are made absolute against the NHC host.
    """
    bsobj = BeautifulSoup(html, 'html.parser')
    titleList = bsobj.find('div', attrs={"class": "list"}).ul.find_all("li")
    for item in titleList:
        link = "http://www.nhc.gov.cn" + item.a["href"]
        title = item.a["title"]
        date = item.span.text
        yield title, link, date
def getInfo(pat, s):
    """Return the first capture group of *pat* found in *s*, or ``'0'``.

    The ``'0'`` fallback keeps every CSV column numeric even when a
    bulletin omits a figure (e.g. a day with no reported deaths).
    """
    match = re.search(pat, s)
    return match.group(1) if match else '0'
def getContent(html):
    """Extract the day's case counts from a single bulletin article page.

    Returns a list of six count strings in CSV column order:
    national new confirmed, local new confirmed, new deaths, new cured,
    new asymptomatic, local new asymptomatic — each ``'0'`` when a figure
    is absent. Returns ``[]`` when the article body has no paragraphs
    (preserving the original empty-page behavior).

    Fix vs. original: the ``if cnt:`` guard did not protect the
    ``cnt[1]``/``cnt[4]`` accesses, so articles with fewer than five
    paragraphs raised IndexError; short articles now yield ``'0'`` columns.
    """
    bsobj = BeautifulSoup(html, 'html.parser')
    cnt = bsobj.find('div', attrs={"id": "xw_box"}).find_all("p")
    res = []
    if cnt:
        # paragraph 1: national/local new confirmed and deaths
        s = cnt[0].text
        res.append(getInfo(r'新增确诊病例(\d+)例', s))
        res.append(getInfo(r'本土病例(\d+)例', s))
        res.append(getInfo(r'新增死亡病例(\d+)例', s))
        # paragraph 2: cured/discharged (guard against short articles)
        s = cnt[1].text if len(cnt) > 1 else ''
        res.append(getInfo(r'新增治愈出院病例(\d+)例', s))
        # paragraph 5: asymptomatic cases (guard against short articles)
        s = cnt[4].text if len(cnt) > 4 else ''
        res.append(getInfo(r'新增无症状感染者(\d+)例', s))
        res.append(getInfo(r'本土(\d+)例', s))
    return res
def saveFile(path, filename, content):
    """Write *content* to ``<path>/<filename>.txt`` (UTF-8), creating *path* if needed.

    Fixes vs. original: ``os.makedirs(..., exist_ok=True)`` avoids the
    exists()/makedirs() race, and ``os.path.join`` no longer requires
    *path* to end with a separator (trailing separators still work).
    """
    os.makedirs(path, exist_ok=True)
    with open(os.path.join(path, filename + ".txt"), 'w', encoding='utf-8') as f:
        f.write(content)
if "__main__" == __name__:
# print(getInfo(r'新增死亡病例(\d+)例', '无新增死亡病例。'))
# s = '4月28日0—24时,31个省(自治区、直辖市)和新疆生产建设兵团报告新增确诊病例5659例。其中境外输入病例13例(广东3例,北京2例,上海2例,福建2例,黑龙江1例,浙江1例,广西1例,四川1例),含2例由无症状感染者转为确诊病例(浙江1例,福建1例);本土病例5646例(上海5487例,北京47例,吉林42例,浙江31例,山东7例,广东7例,黑龙江4例,江西4例,内蒙古3例,江苏3例,四川3例,河南2例,辽宁1例,福建1例,湖南1例,广西1例,重庆1例,云南1例),含5125例由无症状感染者转为确诊病例(上海5062例,吉林31例,浙江28例,辽宁1例,山东1例,河南1例,云南1例)。新增死亡病例52例,均为本土病例,在上海;无新增疑似病例。'
# res = re.search( r'新增确诊病例(\d+)例', s)
# print(res.group(1))
#
# res = re.search( r'本土病例.*),', s)
# print(res.group())
#
# res = re.search( r'新增死亡病例\d+例', s)
# print(res.group())
#
# res = re.search( r'新增治愈出院病例\d+例', s)
# print(res.group())
#
with open('/Users/machi/Desktop/covid.csv', 'w') as f:
header = ','.join(['日期', '新增确诊病例', '本土新增确诊病例', '新增死亡病例', '新增治愈出院病例', '新增无症状感染者', '本土新增无症状感染者'])
f.write(header + '\n')
for url in getPageUrl():
print(url)
try:
s =fetchUrl(url)
except:
continue
for title,link,date in getTitleUrl(s):
print(title,link)
# time.sleep(5)
try:
html =fetchUrl(link)
content = getContent(html)
s = ','.join([date] + content)
f.write(s + '\n')
print('%s write finish' % date)
except Exception as e:
print('%s process failed' % date, e)
continue
# break