The first article in this NER series, 命名实体识别NER探索(1), covered data collection and cleaning, and the automatic labeling that converts the raw text into a format usable for deep learning.
This article converts the text into id encodings and augments the data by concatenating sentences in several combinations.
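The augmentation is simply sentence concatenation: besides every single sentence, every pair and every triple of adjacent sentences is also added as a training sample. Below is a toy sketch of the idea (not the project's actual code; the two-column layout is only for illustration, the real data carries more feature columns):

```python
# Toy sketch of the augmentation: each sentence is a list of per-feature id lists,
# here just [char_ids, tag_ids] for illustration.
sentences = [
    [[1, 2], [0, 0]],        # sentence 1
    [[3, 4, 5], [0, 1, 1]],  # sentence 2
    [[6], [0]],              # sentence 3
]

# adjacent pairs: concatenate each feature column of two neighbouring sentences
two = [[a[k] + b[k] for k in range(len(a))]
       for a, b in zip(sentences, sentences[1:])]
# adjacent triples
three = [[a[k] + b[k] + c[k] for k in range(len(a))]
         for a, b, c in zip(sentences, sentences[1:], sentences[2:])]

augmented = sentences + two + three
print(len(augmented))  # 3 + 2 + 1 = 6 training samples
```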
The code of data_utils.py is as follows (example):
```python
# encoding = utf8
import re
import math
import codecs
import random
import os
import pickle

import numpy as np
import pandas as pd
import jieba
from tqdm import tqdm

jieba.initialize()


def get_data(name='train'):
    '''
    Read every prepared file under data/prepare/<name>, convert all columns to ids
    and augment the data by concatenating adjacent sentences in different ways.
    :param name: sub-directory holding the data ('train', 'test' or 'task')
    :return: nothing; the result is pickled to data/prepare/<name>.pkl
    '''
    with open('data/Prepare/dict.pkl', 'rb') as f:
        map_dict = pickle.load(f)

    def item2id(data, w2i):
        '''
        Convert a sequence of items to ids.
        :param data: the items to convert
        :param w2i: the item-to-id mapping
        :return: the id of each known item, or the id of 'UNK' for unknown items
        '''
        return [w2i[x] if x in w2i else w2i['UNK'] for x in data]

    results = []
    root = os.path.join('data/prepare/', name)
    files = list(os.listdir(root))
    fileindex = -1
    file_index = []
    for file in tqdm(files):
        result = []
        path = os.path.join(root, file)
        try:
            samples = pd.read_csv(path, sep=',')
        except UnicodeDecodeError:
            # fall back to GBK-encoded files
            samples = pd.read_csv(path, sep=',', encoding='gbk')
        except Exception as e:
            print(e)
            continue
        num_samples = len(samples)
        fileindex += num_samples
        file_index.append(fileindex)  # record the cumulative end index of each file

        # sentences are separated by 'sep' rows, e.g. sep_index = [-1, 20, 40, 50]
        sep_index = [-1] + samples[samples['word'] == 'sep'].index.tolist() + [num_samples]

        # -------- split into sentences and convert every column to ids --------
        for i in range(len(sep_index) - 1):
            start = sep_index[i] + 1
            end = sep_index[i + 1]
            data = []
            for feature in samples.columns:
                try:
                    data.append(item2id(list(samples[feature])[start:end], map_dict[feature][1]))
                except Exception:
                    # print the offending column for debugging
                    print(item2id(list(samples[feature])[start:end], map_dict[feature][1]))
            result.append(data)

        # -------- data augmentation --------
        # concatenate sentences in different ways (single sentence, +1 neighbour,
        # +2 neighbours) so the model also learns from longer contexts
        if name == 'task':
            results.extend(result)
        else:
            two = []
            for i in range(len(result) - 1):
                first = result[i]
                second = result[i + 1]
                two.append([first[k] + second[k] for k in range(len(first))])
            three = []
            for i in range(len(result) - 2):
                first = result[i]
                second = result[i + 1]
                third = result[i + 2]
                three.append([first[k] + second[k] + third[k] for k in range(len(first))])
            # use extend rather than append here
            results.extend(result + two + three)

    with open('data/prepare/' + name + '.pkl', 'wb') as f:
        pickle.dump(results, f)


def create_dico(item_list):
    """
    Create a dictionary of items from a list of list of items.
    """
    assert type(item_list) is list
    dico = {}
    for items in item_list:
        for item in items:
            if item not in dico:
                dico[item] = 1
            else:
                dico[item] += 1
    return dico


def create_mapping(dico):
    """
    Create a mapping (item to ID / ID to item) from a dictionary.
    Items are ordered by decreasing frequency.
    """
    sorted_items = sorted(dico.items(), key=lambda x: (-x[1], x[0]))
    id_to_item = {i: v[0] for i, v in enumerate(sorted_items)}
    item_to_id = {v: k for k, v in id_to_item.items()}
    return item_to_id, id_to_item


def zero_digits(s):
    """
    Replace every digit in a string by a zero.
    """
    return re.sub(r'\d', '0', s)


def iob2(tags):
    """
    Check that tags have a valid IOB format.
    Tags in IOB1 format are converted to IOB2.
    """
    for i, tag in enumerate(tags):
        if tag == 'O':
            continue
        split = tag.split('-')
        if len(split) != 2 or split[0] not in ['I', 'B']:
            return False
        if split[0] == 'B':
            continue
        elif i == 0 or tags[i - 1] == 'O':  # conversion IOB1 to IOB2
            tags[i] = 'B' + tag[1:]
        elif tags[i - 1][1:] == tag[1:]:
            continue
        else:  # conversion IOB1 to IOB2
            tags[i] = 'B' + tag[1:]
    return True


def iob_iobes(tags):
    """
    IOB -> IOBES
    """
    new_tags = []
    for i, tag in enumerate(tags):
        if tag == 'O':
            new_tags.append(tag)
        elif tag.split('-')[0] == 'B':
            if i + 1 != len(tags) and tags[i + 1].split('-')[0] == 'I':
                new_tags.append(tag)
            else:
                new_tags.append(tag.replace('B-', 'S-'))
        elif tag.split('-')[0] == 'I':
            if i + 1 < len(tags) and tags[i + 1].split('-')[0] == 'I':
                new_tags.append(tag)
            else:
                new_tags.append(tag.replace('I-', 'E-'))
        else:
            raise Exception('Invalid IOB format!')
    return new_tags


def iobes_iob(tags):
    """
    IOBES -> IOB
    """
    new_tags = []
    for i, tag in enumerate(tags):
        if tag.split('-')[0] == 'B':
            new_tags.append(tag)
        elif tag.split('-')[0] == 'I':
            new_tags.append(tag)
        elif tag.split('-')[0] == 'S':
            new_tags.append(tag.replace('S-', 'B-'))
        elif tag.split('-')[0] == 'E':
            new_tags.append(tag.replace('E-', 'I-'))
        elif tag.split('-')[0] == 'O':
            new_tags.append(tag)
        else:
            raise Exception('Invalid format!')
    return new_tags


def insert_singletons(words, singletons, p=0.5):
    """
    Replace singletons by the unknown word with a probability p.
    """
    new_words = []
    for word in words:
        if word in singletons and np.random.uniform() < p:
            new_words.append(0)
        else:
            new_words.append(word)
    return new_words


def get_seg_features(string):
    """
    Segment text with jieba.
    The features follow a BIES scheme: 0 marks a single-character word,
    1/2/3 mark the begin/inside/end of a multi-character word.
    """
    seg_feature = []
    for word in jieba.cut(string):
        if len(word) == 1:
            seg_feature.append(0)
        else:
            tmp = [2] * len(word)
            tmp[0] = 1
            tmp[-1] = 3
            seg_feature.extend(tmp)
    return seg_feature


def create_input(data):
    """
    Take sentence data and return an input for
    the training or the evaluation function.
    """
    inputs = list()
    inputs.append(data['chars'])
    inputs.append(data['segs'])
    inputs.append(data['tags'])
    return inputs


def load_word2vec(emb_path, id_to_word, word_dim, old_weights):
    """
    Load word embeddings from a pre-trained file.
    The embedding size must match word_dim.
    """
    new_weights = old_weights
    print('Loading pretrained embeddings from {}...'.format(emb_path))
    pre_trained = {}
    emb_invalid = 0
    for i, line in enumerate(codecs.open(emb_path, 'r', 'utf-8')):
        line = line.rstrip().split()
        if len(line) == word_dim + 1:
            pre_trained[line[0]] = np.array(
                [float(x) for x in line[1:]]
            ).astype(np.float32)
        else:
            emb_invalid += 1
    if emb_invalid > 0:
        print('WARNING: %i invalid lines' % emb_invalid)
    c_found = 0
    c_lower = 0
    c_zeros = 0
    n_words = len(id_to_word)
    # Lookup table initialization
    for i in range(n_words):
        word = id_to_word[i]
        if word in pre_trained:
            new_weights[i] = pre_trained[word]
            c_found += 1
        elif word.lower() in pre_trained:
            new_weights[i] = pre_trained[word.lower()]
            c_lower += 1
        elif re.sub(r'\d', '0', word.lower()) in pre_trained:
            new_weights[i] = pre_trained[re.sub(r'\d', '0', word.lower())]
            c_zeros += 1
    print('Loaded %i pretrained embeddings.' % len(pre_trained))
    print('%i / %i (%.4f%%) words have been initialized with '
          'pretrained embeddings.' % (
              c_found + c_lower + c_zeros, n_words,
              100. * (c_found + c_lower + c_zeros) / n_words))
    print('%i found directly, %i after lowercasing, '
          '%i after lowercasing + zero.' % (c_found, c_lower, c_zeros))
    return new_weights


def full_to_half(s):
    """
    Convert full-width characters to half-width ones.
    """
    n = []
    for char in s:
        num = ord(char)
        if num == 0x3000:
            num = 32
        elif 0xFF01 <= num <= 0xFF5E:
            num -= 0xfee0
        char = chr(num)
        n.append(char)
    return ''.join(n)


def cut_to_sentence(text):
    """
    Cut text into sentences.
    """
    sentence = []
    sentences = []
    len_p = len(text)
    pre_cut = False
    for idx, word in enumerate(text):
        sentence.append(word)
        cut = False
        if pre_cut:
            cut = True
            pre_cut = False
        if word in u"!?\n":
            cut = True
            if len_p > idx + 1:
                if text[idx + 1] in ".\"\'?!":
                    cut = False
                    pre_cut = True
        if cut:
            sentences.append(sentence)
            sentence = []
    if sentence:
        sentences.append("".join(list(sentence)))
    return sentences


def replace_html(s):
    """
    Replace common HTML entities and special characters.
    """
    s = s.replace('&quot;', '"')
    s = s.replace('&amp;', '&')
    s = s.replace('&lt;', '<')
    s = s.replace('&gt;', '>')
    s = s.replace('&nbsp;', ' ')
    s = s.replace('&ldquo;', '')
    s = s.replace('&rdquo;', '')
    s = s.replace('&mdash;', '')
    s = s.replace('\xa0', ' ')
    return s


def get_dict(path):
    with open(path, 'rb') as f:
        dictionary = pickle.load(f)
    return dictionary


def input_from_line(line, char_to_id):
    """
    Take sentence data and return an input for
    the training or the evaluation function.
    """
    line = full_to_half(line)
    line = replace_html(line)
    inputs = list()
    inputs.append([line])
    line = line.replace(" ", "$")  # the result of replace() must be assigned back
    inputs.append([[char_to_id[char] if char in char_to_id else char_to_id["<UNK>"]
                    for char in line]])
    inputs.append([get_seg_features(line)])
    inputs.append([[]])
    return inputs


class BatchManager(object):

    def __init__(self, batch_size, name='train'):
        with open('data/prepare/' + name + '.pkl', 'rb') as f:
            data = pickle.load(f)
        self.batch_data = self.sort_and_pad(data, batch_size, name)
        self.len_data = len(self.batch_data)

    def sort_and_pad(self, data, batch_size, name):
        # total number of batches
        num_batch = int(math.ceil(len(data) / batch_size))
        # sort by sentence length so each batch needs little padding
        sorted_data = sorted(data, key=lambda x: len(x[0]))
        batch_data = list()
        for i in range(num_batch):
            batch_data.append(self.pad_data(
                sorted_data[i * int(batch_size):(i + 1) * int(batch_size)], name))
        return batch_data

    @staticmethod
    def pad_data(data, name):
        if name != 'task':
            chars = []
            targets = []
            bounds = []
            flags = []
            radicals = []
            pinyins = []
            max_length = max([len(sentence[0]) for sentence in data])
            for line in data:
                char, target, bound, flag, radical, pinyin = line
                padding = [0] * (max_length - len(char))
                chars.append(char + padding)
                targets.append(target + padding)
                bounds.append(bound + padding)
                flags.append(flag + padding)
                radicals.append(radical + padding)
                pinyins.append(pinyin + padding)
            return [chars, targets, bounds, flags, radicals, pinyins]
        else:
            chars = []
            bounds = []
            flags = []
            radicals = []
            pinyins = []
            max_length = max([len(sentence[0]) for sentence in data])
            for line in data:
                char, bound, flag, radical, pinyin = line
                padding = [0] * (max_length - len(char))
                chars.append(char + padding)
                bounds.append(bound + padding)
                flags.append(flag + padding)
                radicals.append(radical + padding)
                pinyins.append(pinyin + padding)
            return [chars, bounds, flags, radicals, pinyins]

    def iter_batch(self, shuffle=False):
        if shuffle:
            random.shuffle(self.batch_data)
        for idx in range(self.len_data):
            yield self.batch_data[idx]


if __name__ == '__main__':
    get_data('train')
    get_data('test')
```

Running the script produces train.pkl and test.pkl; the console output is as follows:
```
Building prefix dict from the default dictionary ...
Loading model from cache C:\Users\lenovo\AppData\Local\Temp\jieba.cache
Loading model cost 1.349 seconds.
Prefix dict has been built successfully.
100%|██████████| 290/290 [11:42<00:00, 2.42s/it]
100%|██████████| 73/73 [02:37<00:00, 2.16s/it]

Process finished with exit code 0
```
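To confirm the pickles were written correctly, they can be loaded back and inspected. This is a minimal sketch of assumed usage (not part of the original script); the column layout follows the pad_data code above:

```python
# Minimal sanity check of the generated pickle (assumed usage).
import pickle

with open('data/prepare/train.pkl', 'rb') as f:
    train_data = pickle.load(f)

print(len(train_data))               # number of samples after augmentation
sample = train_data[0]
print(len(sample))                   # feature columns per sample (e.g. char/tag/bound/flag/radical/pinyin)
print([len(col) for col in sample])  # every column has the same length: the sentence length in characters
```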
Taking 1.txtoriginal.txt as an example, the original text reads:

```
...... 描述:两端已恢复,请回单。 ......
```

The processed result is as follows:
```
......
:  O
N  B-ORG
........
中  I-ORG
心  I-ORG
-  O
融  O
.......
```
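Finally, once train.pkl and test.pkl exist, the BatchManager defined above can serve padded batches to the training loop. A minimal usage sketch (the batch size and the unpacking order are assumptions based on pad_data; this snippet is not from the original post):

```python
# Minimal usage sketch for BatchManager (batch_size is an arbitrary choice here).
from data_utils import BatchManager

train_manager = BatchManager(batch_size=32, name='train')
for batch in train_manager.iter_batch(shuffle=True):
    # for 'train'/'test' data, pad_data returns six padded id matrices
    chars, targets, bounds, flags, radicals, pinyins = batch
    # ...feed the batch into the model here...
    break  # only inspect the first batch in this sketch
```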