命名实体识别NER探索(2)

tech2024-07-05  75

系列文章目录

命名实体识别NER探索(1) https://duanzhihua.blog.csdn.net/article/details/108338970

文章目录

- 系列文章目录
- 前言
- 编写编码转换、特征拼接函数
- 程序运行结果
- 简化版的自动标注
  - 构建一个命名实体识别词典
  - 原始数据集
  - 自动标注
- 总结


前言

在NER系列第一篇《命名实体识别NER探索(1)》中,讲解了数据的采集及清洗,以及如何通过自动标注将文本转化为深度学习所需的格式。


本文将文本转换为编码,并且将数据进行各种形式的拼接,进行数据增强。

编写编码转换、特征拼接函数

data_utils.py代码如下(示例):

# encoding = utf8
"""Data utilities for the NER pipeline.

Covers feature-to-id encoding with sentence concatenation (data
augmentation), tag-scheme conversion (IOB / IOBES), text normalisation,
pre-trained embedding loading and padded mini-batch construction.
"""
import codecs
import math
import os
import pickle
import random
import re

import jieba
import numpy as np
import pandas as pd
from tqdm import tqdm

jieba.initialize()


def _item2id(data, w2i):
    """Map every item of ``data`` to its id.

    :param data: items waiting to be converted
    :param w2i: item -> id mapping
    :return: list of ids; an item missing from ``w2i`` gets ``w2i['UNK']``
    """
    return [w2i[x] if x in w2i else w2i['UNK'] for x in data]


def _read_samples(path):
    """Read one comma-separated sample file into a DataFrame.

    If the default decoding fails, the file is re-read as UTF-8 with
    undecodable bytes dropped.
    """
    try:
        return pd.read_csv(path, sep=',')
    except UnicodeDecodeError:
        # ``read_csv`` has no ``errors`` keyword, so the lenient decoding is
        # done by ``open`` and the text handle is passed in instead.
        with open(path, encoding='utf-8', errors='ignore') as handle:
            return pd.read_csv(handle, sep=',')


def _concat_windows(sentences, width):
    """Concatenate every run of ``width`` consecutive sentences.

    Each sentence is a list of per-feature id lists; the result keeps that
    layout, with the features of the joined sentences appended in order.
    """
    windows = []
    for start in range(len(sentences) - width + 1):
        group = sentences[start:start + width]
        windows.append([
            [token for sentence in group for token in sentence[k]]
            for k in range(len(group[0]))
        ])
    return windows


def get_data(name='train'):
    """Encode all sample files of one data set and pickle the result.

    Every file under ``data/prepare/<name>`` is read, split into sentences at
    rows whose ``word`` column equals ``'sep'``, and each feature column is
    converted to ids. Unless ``name`` is ``'task'``, the sentences of a file
    are additionally concatenated in runs of two and three (data
    augmentation). The result is written to ``data/prepare/<name>.pkl``.

    :param name: sub-directory of ``data/prepare`` holding the sample files
    :return: None
    """
    # NOTE(review): this path uses 'Prepare' while every other path uses
    # 'prepare'; on a case-sensitive file system only one can exist - confirm.
    with open('data/Prepare/dict.pkl', 'rb') as f:
        map_dict = pickle.load(f)

    results = []
    root = os.path.join('data/prepare/', name)
    files = list(os.listdir(root))
    for file in tqdm(files):
        path = os.path.join(root, file)
        try:
            samples = _read_samples(path)
        except Exception as e:
            # Best effort: report the unreadable file and skip it, rather
            # than silently re-processing the previous file's samples.
            print(e)
            continue
        num_samples = len(samples)
        # Row positions delimiting the sentences, e.g. -1, 20, 40, 50.
        sep_index = ([-1]
                     + samples[samples['word'] == 'sep'].index.tolist()
                     + [num_samples])
        # Convert each column to a list once per file, not once per sentence.
        columns = [(feature, list(samples[feature]))
                   for feature in samples.columns]

        # ------------- split into sentences and convert to ids -------------
        result = []
        for i in range(len(sep_index) - 1):
            start = sep_index[i] + 1
            end = sep_index[i + 1]
            # presumably map_dict[feature] is (id_to_item, item_to_id);
            # index 1 is used as the item -> id mapping - verify against the
            # code that writes dict.pkl. A failure here propagates: dropping
            # a feature would misalign the per-sentence feature lists.
            result.append([
                _item2id(values[start:end], map_dict[feature][1])
                for feature, values in columns
            ])

        # ------------------------- data augmentation ------------------------
        if name == 'task':
            results.extend(result)
        else:
            two = _concat_windows(result, 2)
            three = _concat_windows(result, 3)
            results.extend(result + two + three)

    with open('data/prepare/' + name + '.pkl', 'wb') as f:
        pickle.dump(results, f)


def create_dico(item_list):
    """
    Create a dictionary of items from a list of list of items.

    :param item_list: list of iterables of hashable items
    :return: dict mapping each item to its number of occurrences
    """
    assert type(item_list) is list
    dico = {}
    for items in item_list:
        for item in items:
            dico[item] = dico.get(item, 0) + 1
    return dico


def create_mapping(dico):
    """
    Create a mapping (item to ID / ID to item) from a dictionary.
    Items are ordered by decreasing frequency, ties broken by item order.

    :param dico: dict mapping item -> frequency
    :return: tuple ``(item_to_id, id_to_item)``
    """
    sorted_items = sorted(dico.items(), key=lambda x: (-x[1], x[0]))
    id_to_item = {i: v[0] for i, v in enumerate(sorted_items)}
    item_to_id = {v: k for k, v in id_to_item.items()}
    return item_to_id, id_to_item


def zero_digits(s):
    """
    Replace every digit in a string by a zero.
    """
    return re.sub(r'\d', '0', s)


def iob2(tags):
    """
    Check that tags have a valid IOB format.
    Tags in IOB1 format are converted to IOB2 in place.

    :param tags: list of tag strings; modified in place
    :return: False as soon as a malformed tag is met, True otherwise
    """
    for i, tag in enumerate(tags):
        if tag == 'O':
            continue
        split = tag.split('-')
        if len(split) != 2 or split[0] not in ['I', 'B']:
            return False
        if split[0] == 'B':
            continue
        elif i == 0 or tags[i - 1] == 'O':  # conversion IOB1 to IOB2
            tags[i] = 'B' + tag[1:]
        elif tags[i - 1][1:] == tag[1:]:
            continue
        else:  # conversion IOB1 to IOB2
            tags[i] = 'B' + tag[1:]
    return True


def iob_iobes(tags):
    """
    IOB -> IOBES

    :param tags: list of IOB tags
    :return: new list of IOBES tags
    :raises Exception: if a tag is neither 'O', 'B-*' nor 'I-*'
    """
    new_tags = []
    for i, tag in enumerate(tags):
        if tag == 'O':
            new_tags.append(tag)
        elif tag.split('-')[0] == 'B':
            if i + 1 != len(tags) and \
                    tags[i + 1].split('-')[0] == 'I':
                new_tags.append(tag)
            else:
                # A 'B' not followed by an 'I' is a single-token entity.
                new_tags.append(tag.replace('B-', 'S-'))
        elif tag.split('-')[0] == 'I':
            if i + 1 < len(tags) and \
                    tags[i + 1].split('-')[0] == 'I':
                new_tags.append(tag)
            else:
                # An 'I' not followed by another 'I' ends the entity.
                new_tags.append(tag.replace('I-', 'E-'))
        else:
            raise Exception('Invalid IOB format!')
    return new_tags


def iobes_iob(tags):
    """
    IOBES -> IOB

    :param tags: list of IOBES tags
    :return: new list of IOB tags
    :raises Exception: if a tag has an unknown prefix
    """
    new_tags = []
    for tag in tags:
        prefix = tag.split('-')[0]
        if prefix == 'B':
            new_tags.append(tag)
        elif prefix == 'I':
            new_tags.append(tag)
        elif prefix == 'S':
            new_tags.append(tag.replace('S-', 'B-'))
        elif prefix == 'E':
            new_tags.append(tag.replace('E-', 'I-'))
        elif prefix == 'O':
            new_tags.append(tag)
        else:
            raise Exception('Invalid format!')
    return new_tags


def insert_singletons(words, singletons, p=0.5):
    """
    Replace singletons by the unknown word (id 0) with a probability p.

    :param words: sequence of word ids
    :param singletons: container of ids that are candidates for replacement
    :param p: replacement probability
    :return: new list of word ids
    """
    new_words = []
    for word in words:
        if word in singletons and np.random.uniform() < p:
            new_words.append(0)
        else:
            new_words.append(word)
    return new_words


def get_seg_features(string):
    """
    Segment text with jieba and encode word boundaries per character:
    0 for a single-character word, otherwise 1 for the first character,
    2 for inner characters and 3 for the last character.

    :param string: text to segment
    :return: list of ints, one per character
    """
    seg_feature = []
    for word in jieba.cut(string):
        if len(word) == 1:
            seg_feature.append(0)
        else:
            tmp = [2] * len(word)
            tmp[0] = 1
            tmp[-1] = 3
            seg_feature.extend(tmp)
    return seg_feature


def create_input(data):
    """
    Take sentence data and return an input for
    the training or the evaluation function.

    :param data: mapping with the keys 'chars', 'segs' and 'tags'
    :return: list ``[chars, segs, tags]``
    """
    inputs = list()
    inputs.append(data['chars'])
    inputs.append(data["segs"])
    inputs.append(data['tags'])
    return inputs


def load_word2vec(emb_path, id_to_word, word_dim, old_weights):
    """
    Load word embedding from pre-trained file.
    The embedding size must match ``word_dim``; other lines are skipped.

    :param emb_path: path of the UTF-8 text file, one "word v1 ... vN" per line
    :param id_to_word: mapping from id (0..n-1) to word
    :param word_dim: expected embedding dimension
    :param old_weights: weight table indexed by word id; updated in place
    :return: the same object as ``old_weights``, with matched rows replaced
    """
    new_weights = old_weights
    print('Loading pretrained embeddings from {}...'.format(emb_path))
    pre_trained = {}
    emb_invalid = 0
    with codecs.open(emb_path, 'r', 'utf-8') as emb_file:
        for line in emb_file:
            line = line.rstrip().split()
            if len(line) == word_dim + 1:
                pre_trained[line[0]] = np.array(
                    [float(x) for x in line[1:]]
                ).astype(np.float32)
            else:
                emb_invalid += 1
    if emb_invalid > 0:
        print('WARNING: %i invalid lines' % emb_invalid)
    c_found = 0
    c_lower = 0
    c_zeros = 0
    n_words = len(id_to_word)
    # Lookup table initialization: exact match first, then lowercased,
    # then lowercased with every digit replaced by zero.
    for i in range(n_words):
        word = id_to_word[i]
        lowered = word.lower()
        if word in pre_trained:
            new_weights[i] = pre_trained[word]
            c_found += 1
        elif lowered in pre_trained:
            new_weights[i] = pre_trained[lowered]
            c_lower += 1
        else:
            zeroed = re.sub(r'\d', '0', lowered)
            if zeroed in pre_trained:
                new_weights[i] = pre_trained[zeroed]
                c_zeros += 1
    matched = c_found + c_lower + c_zeros
    # Avoid a ZeroDivisionError when the vocabulary is empty.
    coverage = 100. * matched / n_words if n_words else 0.
    print('Loaded %i pretrained embeddings.' % len(pre_trained))
    print('%i / %i (%.4f%%) words have been initialized with '
          'pretrained embeddings.' % (matched, n_words, coverage))
    print('%i found directly, %i after lowercasing, '
          '%i after lowercasing + zero.' % (
              c_found, c_lower, c_zeros
          ))
    return new_weights


def full_to_half(s):
    """
    Convert full-width character to half-width one
    """
    n = []
    for char in s:
        num = ord(char)
        if num == 0x3000:  # ideographic space -> ASCII space
            num = 32
        elif 0xFF01 <= num <= 0xFF5E:  # full-width ASCII variants
            num -= 0xfee0
        char = chr(num)
        n.append(char)
    return ''.join(n)


def cut_to_sentence(text):
    """
    Cut text to sentences.

    A sentence ends after one of ``!?`` or a newline; when the next character
    is one of ``."'?!`` the cut is postponed until after that character.

    :param text: string to split
    :return: list of sentence strings (terminators included)
    """
    sentence = []
    sentences = []
    len_p = len(text)
    pre_cut = False
    for idx, word in enumerate(text):
        sentence.append(word)
        cut = False
        if pre_cut:
            cut = True
            pre_cut = False
        if word in u"!?\n":
            cut = True
            if len_p > idx + 1:
                if text[idx + 1] in ".\"\'?!":
                    cut = False
                    pre_cut = True
        if cut:
            # Join here too, so every element is a string like the last one.
            sentences.append("".join(sentence))
            sentence = []
    if sentence:
        sentences.append("".join(sentence))
    return sentences


def replace_html(s):
    """Replace a fixed set of HTML entities (and U+00A0) in ``s``."""
    # Order matters: it is kept exactly as the sequential replacements were.
    replacements = (
        ('&quot;', '"'),
        ('&amp;', '&'),
        ('&lt;', '<'),
        ('&gt;', '>'),
        ('&nbsp;', ' '),
        ("&ldquo;", ""),
        ("&rdquo;", ""),
        ("&mdash;", ""),
        ("\xa0", " "),
    )
    for entity, plain in replacements:
        s = s.replace(entity, plain)
    return s


def get_dict(path):
    """Load and return the pickled object stored at ``path``.

    NOTE: unpickling executes arbitrary code; only load trusted files.
    """
    with open(path, 'rb') as f:
        mapping = pickle.load(f)
    return mapping


def input_from_line(line, char_to_id):
    """
    Take sentence data and return an input for
    the training or the evaluation function.

    :param line: raw sentence
    :param char_to_id: character -> id mapping containing the key "<UNK>"
    :return: ``[[text], [char ids], [segmentation features], [[]]]``
    """
    line = full_to_half(line)
    line = replace_html(line)
    inputs = list()
    inputs.append([line])
    # str.replace returns a new string; the result used to be discarded.
    line = line.replace(" ", "$")
    inputs.append([[char_to_id[char] if char in char_to_id else char_to_id["<UNK>"]
                    for char in line]])
    inputs.append([get_seg_features(line)])
    inputs.append([[]])
    return inputs


class BatchManager(object):
    """Load a pickled data set and serve it as padded mini-batches.

    Sentences are sorted by length before batching, so that every batch only
    needs padding up to its own longest sentence.
    """

    def __init__(self, batch_size, name='train'):
        """
        :param batch_size: number of sentences per batch
        :param name: data set name; ``data/prepare/<name>.pkl`` is loaded
        """
        with open('data/prepare/' + name + '.pkl', 'rb') as f:
            data = pickle.load(f)
        self.batch_data = self.sort_and_pad(data, batch_size, name)
        self.len_data = len(self.batch_data)

    def sort_and_pad(self, data, batch_size, name):
        """Sort sentences by length, cut them into batches and pad each one.

        :return: list of padded batches (see ``pad_data``)
        """
        # Total number of batches.
        num_batch = int(math.ceil(len(data) / batch_size))
        # Sort by sentence length.
        sorted_data = sorted(data, key=lambda x: len(x[0]))
        size = int(batch_size)
        return [
            self.pad_data(sorted_data[i * size:(i + 1) * size], name)
            for i in range(num_batch)
        ]

    @staticmethod
    def pad_data(data, name):
        """Right-pad every feature of every sentence with zeros.

        :param data: non-empty list of sentences, each a list of feature lists
        :param name: ``'task'`` sentences carry 5 features (no targets), all
            other data sets carry 6
        :return: one list per feature, in the input feature order, each
            holding the padded rows of the batch
        :raises ValueError: if a sentence has an unexpected number of features
        """
        expected = 5 if name == 'task' else 6
        max_length = max(len(sentence[0]) for sentence in data)
        padded = [[] for _ in range(expected)]
        for line in data:
            if len(line) != expected:
                raise ValueError(
                    'expected %d features per sentence, got %d'
                    % (expected, len(line)))
            padding = [0] * (max_length - len(line[0]))
            for column, feature in zip(padded, line):
                column.append(feature + padding)
        return padded

    def iter_batch(self, shuffle=False):
        """Yield the batches in order, optionally shuffling them first.

        Shuffling reorders ``self.batch_data`` in place.
        """
        if shuffle:
            random.shuffle(self.batch_data)
        for idx in range(self.len_data):
            yield self.batch_data[idx]


if __name__ == '__main__':
    get_data('train')
    get_data('test')

程序运行结果

运行结果如下,生成train.pkl及test.pkl :

Building prefix dict from the default dictionary ...
Loading model from cache C:\Users\lenovo\AppData\Local\Temp\jieba.cache
Loading model cost 1.349 seconds.
Prefix dict has been built successfully.
100%|██████████| 290/290 [11:42<00:00, 2.42s/it]
100%|██████████| 73/73 [02:37<00:00, 2.16s/it]

Process finished with exit code 0

简化版的自动标注

构建一个命名实体识别词典

.......,AT
无,AT
端口,REG
UP,SYM
.....

原始数据集

1.txtoriginal.txt

...... 描述:两端已恢复,请回单。 ......

自动标注

# encoding=utf8
"""Character-level automatic labelling of the raw text files.

Every line of every ``*txtoriginal.txt*`` file is segmented with jieba
(after loading a user dictionary of entity words), and each character is
written as ``<char> <tag>`` to the dev, test or train output file.
"""
import csv
import os

import jieba
import jieba.posseg as pseg

c_root = os.path.split(os.getcwd())[0] + os.sep + "ner_input_source_data" + os.sep

# Part-of-speech tags that are treated as entity types.
biaoji = {'ORG', 'AM', 'FR', 'TE', 'AT', 'DE', 'SYM', 'CH', 'REG', 'Noun', 'name'}
# Sentence-ending punctuation: a blank line is written after these.
# NOTE(review): the original listed '?' and '!' twice; possibly full-width
# variants were intended and lost - confirm.
fuhao = {'。', '?', '!'}


def _load_user_dictionary(path):
    """Register every two-column row of the CSV at ``path`` as a jieba word."""
    with open(path, 'r', encoding='utf8') as dict_file:
        for row in csv.reader(dict_file):
            if len(row) == 2:
                jieba.add_word(row[0].strip(), tag=row[1].strip())
                # NOTE(review): called without tune=True, as before; per the
                # jieba docs the frequency is then only computed - confirm.
                jieba.suggest_freq(row[0].strip())


def _split_name(split_num):
    """Pick the output split for the ``split_num``-th input line.

    Out of every 15 lines, remainders 0-1 go to dev, 2-3 to test and the
    rest to train.
    """
    remainder = split_num % 15
    if remainder < 2:
        return 'dev'
    if remainder < 4:
        return 'test'
    return 'train'


def _write_labels(key, value, out):
    """Write one ``<char> <tag>`` line per character of the word ``key``.

    Words whose tag ``value`` is not an entity type are labelled 'O', with
    whitespace characters skipped and a blank line after sentence-ending
    punctuation. Entity words get 'B-<tag>' on the first character and
    'I-<tag>' on the following ones.
    """
    token = key.strip()
    if value not in biaoji:
        for achar in token:
            if not achar.strip():
                continue
            if achar in fuhao:
                out.write(achar + " " + "O" + "\n" + "\n")
            else:
                out.write(achar + " " + "O" + "\n")
        return
    tag = value.strip()
    for position, char in enumerate(token):
        prefix = 'B-' if position == 0 else 'I-'
        out.write(char + ' ' + prefix + tag + '\n')


with open("../ner_output_result_data/noc_ner.val", 'w', encoding='utf8') as dev, \
        open("../ner_output_result_data/noc_ner.train", 'w', encoding='utf8') as train, \
        open("../ner_output_result_data/noc_ner.test", 'w', encoding='utf8') as test:
    outputs = {'dev': dev, 'test': test, 'train': train}
    _load_user_dictionary("../dictionary/NOC_CSA_ORDER_DICT.csv")

    split_num = 0
    for file in os.listdir(c_root):
        if "txtoriginal.txt" not in file:
            continue
        with open(c_root + file, 'r', encoding='utf8') as fp:
            for line in fp:
                split_num += 1
                # The split depends only on the line number: pick it once.
                out = outputs[_split_name(split_num)]
                for key, value in pseg.cut(line):
                    if value.strip() and key.strip():
                        _write_labels(key, value, out)

print("字符级别数据自动打标签执行OK! 结果保存在ner_output_result_data目录!")

运行结果如下

......
: O
N B-ORG
........
中 I-ORG
心 I-ORG
- O
融 O
.......

总结

以上就是今天要讲的内容,本文简单介绍了NER输入文本特征编码转换及特征拼接, 生成训练集、测试集pkl文件。 同时,也讲解了简化版的自动标注实现方法。 段智华 认证博客专家 Spark AI 企业级AI技术 本人从事大数据人工智能开发和运维工作十余年,码龄5年,深入研究Spark源码,参与王家林大咖主编出版Spark+AI系列图书5本,清华大学出版社最新出版2本新书《Spark大数据商业实战三部曲:内核解密|商业案例|性能调优》第二版、《企业级AI技术内幕:深度学习框架开发+机器学习案例实战+Alluxio解密》,《企业级AI技术内幕》新书分为盘古人工智能框架开发专题篇、机器学习案例实战篇、分布式内存管理系统Alluxio解密篇。Spark新书第二版以数据智能为灵魂,包括内核解密篇,商业案例篇,性能调优篇和Spark+AI解密篇。从2015年开始撰写博文,累计原创1059篇,博客阅读量达155万次
最新回复(0)