py批量存入mysql,pandas一些操作,循环内加线程

tech2022-08-12  136

写在前面的话: 总结一下近期实习学到的内容,还有在做项目中遇到的问题及其解决方式。都是为了学习巩固,有什么不对的地方还希望各位大佬指正出来,不胜感激。

1. 批量存入数据库

与 excute() 方法对应的是 excutemany()

""" :param sql: sql语句,values 部分正常写,insert 最好加上ignore,这样会跳过错误的数据 :param data: tuple; data 必须是元组,而且其内存的数据顺序与values部分对应,这样存的时候才能对应到相应字段 """ # 前面代码省略 cursor.executemany(sql, data) db.commit()

放上一部分存储代码(不全),其中 init 就是控制每次多少量的, tuple(datas_tuple[num:init]) 是每次我放入的数据范围(一定是元组),我每次放入1w条,解决了我本来几十个小时可能还解决不了的问题

还有一个问题是如果有时间戳必须处理掉,还有为 Nan 的值都得填充(直接 fillna('') 即可),不然入不了库

def savesql(datas): """ :param username: :param text: :param text_type: :param pubdate: :param keywords: :return: None """ insertSql = "insert ignore into news(username, text, pubdate, text_type, keywords) " \ "values(%s,%s,%s,%s,%s)" # DB是我单独写的一个读取数据库的文件 DB.update_many(insertSql, datas) # 自己找个excel文件就行 datas = pd.read_excel(file, header=None) length = datas.shape[0] datas_tuple = tuple(tuple(x) for x in datas.values) init = 10000 for num in range(0, length, 10000): savesql(tuple(datas_tuple[num:init])) init += 10000 if init >= length: init = length

2. pandas 的一些操作

2.1 删除某列中长度小于某个值的行

做去重处理以及去掉噪声数据,但是 df[(len(df['column name']) < 2)] 报错

参考链接 根据涉及len(string)的条件表达式从pandas DataFrame中删除行

# dataframe = dataframe[(len(dataframe['text'])>5)] dataframe.dropna(subset=['text'], inplace=True) dataframe = dataframe[dataframe['text'].map(len)>5]

2.2 选取某列等于某个字段的所有行

代码中的 keywords 是我的 header 字段,改成自己的列名就好

df_keywords = df.query('keywords=="top" | keywords=="bottom" | keywords==""')

2.3 统计某个列包含哪些值且统计个数

keyword_count = dict(df['keywords'].value_counts())

3. 在pandas处理数据时手动加多线程

没看懂先看 pymysql单条插入数据和批量插入数据:

先做数据预处理,不要有 NAN from concurrent.futures import ThreadPoolExecutor, as_completed # DB是我自己写的一个文件,就是执行一下sql def run(datas): # datas 是一个元组,可参考上面 DB.update_many('update temp set text=%s where id=%s', datas) df1 = pd.read_excel(file, header=None) length = df1.shape[0] datas_tuple = tuple(tuple(x) for x in df1_insert_sort.values) init = 10000 pool = ThreadPoolExecutor(max_workers=40) task_list = [] # 每次更新1w条数据 for num in range(0, length, 10000): try: one_tuple = tuple(datas_tuple[num:init]) task = pool.submit(run, one_tuple) task_list.append(task) init += 10000 if init >= length: init = length except Exception as e: print('----------------{}----------------'.format(str(e))) for future in as_completed(task_list): data = future.result() print('*************Done************')

具体加快了多少也没测,目前先这么用着……

最新回复(0)