The 911 site I was scraping before no longer has any data, so I recently switched to another site. Unfortunately, this one only provides the weather condition, temperature, wind scale, and wind direction, so it has less content than 911 did.
import requests
from bs4 import BeautifulSoup
from dateutil.relativedelta import relativedelta
from datetime import datetime
import time


class weather_data:
    def __init__(self, city, start_year, end_year, start_month=1, end_month=12):
        """
        :param city: full pinyin of the city to crawl
        :param start_year: first year to crawl
        :param end_year: last year to crawl
        :param start_month: first month to crawl
        :param end_month: last month to crawl
        """
        self.city = city
        self.start_time = datetime.strptime(f"{start_year}-{start_month}", '%Y-%m')
        self.end_time = datetime.strptime(f"{end_year}-{end_month}", '%Y-%m')

    def _get_original_html(self):
        """Fetch the monthly history page."""
        # Old source: f"https://tianqi.911cha.com/{self.city}/{self.start_time.year}-{self.start_time.month}.html"
        # tianqihoubao expects the month zero-padded to two digits.
        url = (f"http://www.tianqihoubao.com/lishi/{self.city}/month/"
               f"{self.start_time.year}{self.start_time.month:02d}.html")
        print(url)
        header = {"User-Agent": ""}  # fill in the User-Agent from your own browser
        response = requests.get(url, headers=header)
        return response.content

    def _parse_data(self):
        # Parse one month per call. Each table row is one day; the weather,
        # temperature and wind cells hold "day / night" values separated by '/',
        # and a wind value looks like "<direction> <scale>".
        def split_wind(text):
            # "direction scale" -> (direction, scale); blank cells give ('', '')
            parts = text.split()
            if len(parts) >= 2:
                return parts[0], parts[1]
            return (parts[0], '') if parts else ('', '')

        soup = BeautifulSoup(self.html, "html.parser")
        data = {}
        for n, tr in enumerate(soup.find_all("tr")):
            if n == 0:  # skip the table header
                continue
            Time = tr.find('a').get_text().strip()
            td_list = tr.find_all('td')
            weather = [s.strip() for s in td_list[1].get_text().split('/')]
            temperature = [s.strip() for s in td_list[2].get_text().split('/')]
            wind = [s.strip() for s in td_list[3].get_text().split('/')]
            day_direction, day_scale = split_wind(wind[0])
            night_direction, night_scale = split_wind(wind[1]) if len(wind) > 1 else ('', '')
            data[Time] = {
                'Day': {'weather': weather[0],
                        'temperature': temperature[0],
                        'wind_scale': day_scale,
                        'wind_direction': day_direction},
                'Night': {'weather': weather[1] if len(weather) > 1 else '',
                          'temperature': temperature[1] if len(temperature) > 1 else '',
                          'wind_scale': night_scale,
                          'wind_direction': night_direction}}
        return data

    def main(self):
        while self.start_time <= self.end_time:
            self.html = self._get_original_html()
            data = self._parse_data()
            self.start_time += relativedelta(months=1)
            with open(f'weather_dict_{self.city}.txt', 'a', encoding='UTF-8') as f:
                f.write(str(data) + '\n')
            print(f"{self.start_time}, sleeping from {time.strftime('%Y-%m-%d %H:%M:%S')}")
            time.sleep(30)


if __name__ == "__main__":
    T = weather_data(city="tianjin", start_year=2018, end_year=2020,
                     start_month=5, end_month=8)
    T.main()
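The script appends one Python dict literal per line to weather_dict_tianjin.txt (one line per month), so the saved data can be loaded later without re-crawling. The snippet below is only a sketch of one way to do that: it assumes pandas is installed (the crawler itself does not use it) and parses each saved line with ast.literal_eval into a flat table.

import ast
import pandas as pd  # assumption: pandas is available, used only to build the final table

rows = []
with open('weather_dict_tianjin.txt', encoding='UTF-8') as f:
    for line in f:
        if not line.strip():
            continue
        month = ast.literal_eval(line.strip())     # one dict per line, one month per dict
        for date, day_night in month.items():
            for period, record in day_night.items():   # period is 'Day' or 'Night'
                rows.append({'date': date, 'period': period, **record})

df = pd.DataFrame(rows)
print(df.head())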