Dive into Deep Learning (PyTorch Edition) Code Annotations - 52: Word2Vec


Contents

Description · Environment Setup · Notes on This Section · Code

Description

The code in this post comes from the open-source project Dive into Deep Learning (PyTorch Edition). Based on my own understanding while studying, I have added extensive comments to the code so that the principle and purpose of each function are easier to follow.

Environment Setup

Environment: Python 3.8; Platform: Windows 10; IDE: PyCharm
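For reference, here is a minimal sketch of an environment check based only on what the script below imports (it is not the book's official setup; d2lzh_pytorch is the utility module shipped with the Dive-into-DL-PyTorch repository and is found via sys.path.append("..") rather than installed from PyPI):

# Environment sanity check (a sketch; only verifies that the third-party imports used below resolve)
import sys
import torch
import numpy as np

print(sys.version)        # this post was written against Python 3.8
print(torch.__version__)
print(np.__version__)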

Notes on This Section

This section corresponds to Section 10.3 of the book. Its purpose is the implementation of word2vec. Because this section is relatively complex, the code carries a comparatively large number of comments.
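For orientation (my own summary of the technique, not text copied from the book): the code trains the skip-gram model with negative sampling. For a center word $w_c$, a context word $w_o$ and $K$ sampled noise words $w_{n_1},\dots,w_{n_K}$, the per-pair loss minimized below is

$$-\log \sigma\!\left(\mathbf{u}_o^\top \mathbf{v}_c\right) \;-\; \sum_{k=1}^{K} \log \sigma\!\left(-\mathbf{u}_{n_k}^\top \mathbf{v}_c\right),$$

where $\mathbf{v}$ and $\mathbf{u}$ are the center-word and context-word embeddings (the two nn.Embedding layers net[0] and net[1] in the code) and $\sigma$ is the sigmoid function.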

Code

# Book link: https://tangshusen.me/Dive-into-DL-PyTorch/#/
# 10.3 Implementation of word2vec
# Annotated by: 黄文俊
# E-mail: hurri_cane@qq.com

import collections
import math
import random
import sys
import time
import os
import numpy as np
import torch
from torch import nn
import torch.utils.data as Data

sys.path.append("..")
import d2lzh_pytorch as d2l
print(torch.__version__)

# Read the dataset
assert 'ptb.train.txt' in os.listdir("F:/PyCharm/Learning_pytorch/data/ptb")

with open('F:/PyCharm/Learning_pytorch/data/ptb/ptb.train.txt', 'r') as f:
    lines = f.readlines()
    # st is short for sentence
    raw_dataset = [st.split() for st in lines]

print('# sentences: %d' % len(raw_dataset))

# For the first 3 sentences of the dataset, print the number of tokens and the first 5 tokens.
# In this dataset the end-of-sentence marker is "<eos>", rare words are all replaced by "<unk>",
# and numbers are replaced by "N".
for st in raw_dataset[:3]:
    print('# tokens:', len(st), st[:5])

# Build the word index
# To keep the computation simple, we only keep words that appear at least 5 times in the dataset.
# tk is short for token
counter = collections.Counter([tk for st in raw_dataset for tk in st])
counter = dict(filter(lambda x: x[1] >= 5, counter.items()))

# Then map the words to integer indices.
idx_to_token = [tk for tk, _ in counter.items()]
token_to_idx = {tk: idx for idx, tk in enumerate(idx_to_token)}
dataset = [[token_to_idx[tk] for tk in st if tk in token_to_idx]
           for st in raw_dataset]
num_tokens = sum([len(st) for st in dataset])
print('# tokens: %d' % num_tokens)


# Subsampling: randomly discard high-frequency words with high probability
def discard(idx):
    return random.uniform(0, 1) < 1 - math.sqrt(
        1e-4 / counter[idx_to_token[idx]] * num_tokens)


subsampled_dataset = [[tk for tk in st if not discard(tk)] for st in dataset]
print('# tokens: %d' % sum([len(st) for st in subsampled_dataset]))


# Compare how often a word (e.g. the high-frequency word 'the' and the low-frequency word 'join')
# appears in the dataset before and after subsampling
def compare_counts(token):
    return '# %s: before=%d, after=%d' % (token, sum(
        [st.count(token_to_idx[token]) for st in dataset]), sum(
        [st.count(token_to_idx[token]) for st in subsampled_dataset]))


print(compare_counts('the'))
print(compare_counts('join'))


# Extract center words and context words
def get_centers_and_contexts(dataset, max_window_size):
    centers, contexts = [], []
    for st in dataset:
        if len(st) < 2:  # a sentence needs at least 2 words to form a "center word - context word" pair
            continue
        centers += st
        for center_i in range(len(st)):
            window_size = random.randint(1, max_window_size)
            indices = list(range(max(0, center_i - window_size),
                                 min(len(st), center_i + 1 + window_size)))
            # indices holds the indices of the center word plus its context words
            indices.remove(center_i)  # exclude the center word from its own context words
            contexts.append([st[idx] for idx in indices])
    return centers, contexts


# Create an artificial dataset containing two sentences of 7 and 3 words respectively.
# With a maximum context window of 2, print every center word and its context words.
tiny_dataset = [list(range(7)), list(range(7, 10))]
print('dataset', tiny_dataset)
for center, context in zip(*get_centers_and_contexts(tiny_dataset, 2)):
    print('center', center, 'has contexts', context)

# In the experiment, we set the maximum context window size to 5.
# Extract all center words and their context words from the dataset.
all_centers, all_contexts = get_centers_and_contexts(subsampled_dataset, 5)


# Negative sampling: for each pair of center word and context word,
# randomly sample K noise words (K=5 in the experiment)
def get_negatives(all_contexts, sampling_weights, K):
    all_negatives, neg_candidates, i = [], [], 0
    population = list(range(len(sampling_weights)))
    for contexts in all_contexts:
        negatives = []
        while len(negatives) < len(contexts) * K:
            if i == len(neg_candidates):
                # Randomly draw k word indices as noise-word candidates according to
                # each word's weight (sampling_weights).
                # For efficiency, k can be set somewhat larger.
                i, neg_candidates = 0, random.choices(
                    population, sampling_weights, k=int(1e5))
                '''
                random.choices(population, weights=None, k=10)
                population: the pool to draw from.
                weights: relative weights.
                k: number of draws.
                Draws k items from the pool according to the given weights and returns a list.
                The weights work as follows:
                e.g. with weights=[1,2,3,4,5], the first member has probability P=1/(1+2+3+4+5)=1/15.
                '''
            neg, i = neg_candidates[i], i + 1
            # A noise word must not be a context word
            if neg not in set(contexts):
                negatives.append(neg)
        all_negatives.append(negatives)
    return all_negatives


# sampling_weights stores each word's frequency raised to the power 0.75
sampling_weights = [counter[w]**0.75 for w in idx_to_token]
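# (Added note, not from the book: raising the counts to the 0.75 power flattens the
#  unigram distribution, so rare words are drawn as noise words relatively more often
#  than their raw frequency would suggest. With hypothetical counts [100, 10, 1] the
#  weights become roughly [31.6, 5.6, 1.0].)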
all_negatives = get_negatives(all_contexts, sampling_weights, 5)


# Read the data
# From the dataset we take all center words all_centers, plus each center word's
# context words all_contexts and noise words all_negatives
class MyDataset(torch.utils.data.Dataset):
    def __init__(self, centers, contexts, negatives):
        assert len(centers) == len(contexts) == len(negatives)
        self.centers = centers
        self.contexts = contexts
        self.negatives = negatives

    def __getitem__(self, index):
        return (self.centers[index], self.contexts[index], self.negatives[index])

    def __len__(self):
        return len(self.centers)


# Mini-batch collation function
def batchify(data):
    """Used as the collate_fn argument of DataLoader: the input is a list of length batch_size,
    and every element of the list is the result of calling __getitem__ on the Dataset.
    """
    max_len = max(len(c) + len(n) for _, c, n in data)
    centers, contexts_negatives, masks, labels = [], [], [], []
    for center, context, negative in data:
        cur_len = len(context) + len(negative)
        centers += [center]
        contexts_negatives += [context + negative + [0] * (max_len - cur_len)]
        masks += [[1] * cur_len + [0] * (max_len - cur_len)]
        labels += [[1] * len(context) + [0] * (max_len - len(context))]
    return (torch.tensor(centers).view(-1, 1), torch.tensor(contexts_negatives),
            torch.tensor(masks), torch.tensor(labels))


batch_size = 512
num_workers = 0 if sys.platform.startswith('win32') else 12
dataset = MyDataset(all_centers, all_contexts, all_negatives)
data_iter = Data.DataLoader(dataset, batch_size, shuffle=True,
                            collate_fn=batchify,
                            num_workers=num_workers)
for batch in data_iter:
    for name, data in zip(['centers', 'contexts_negatives', 'masks', 'labels'], batch):
        print(name, 'shape:', data.shape)
    break


# Skip-gram model: implemented with embedding layers and mini-batch multiplication
def skip_gram(center, contexts_and_negatives, embed_v, embed_u):
    v = embed_v(center)
    u = embed_u(contexts_and_negatives)
    pred = torch.bmm(v, u.permute(0, 2, 1))  # permute reorders the tensor's dimensions
    return pred


# Binary cross-entropy loss function
class SigmoidBinaryCrossEntropyLoss(nn.Module):
    def __init__(self):  # none mean sum
        super(SigmoidBinaryCrossEntropyLoss, self).__init__()

    def forward(self, inputs, targets, mask=None):
        """
        input – Tensor shape: (batch_size, len)
        target – Tensor of the same shape as input
        """
        inputs, targets, mask = inputs.float(), targets.float(), mask.float()
        res = nn.functional.binary_cross_entropy_with_logits(inputs, targets,
                                                             reduction="none", weight=mask)
        return res.mean(dim=1)  # average over each row


loss = SigmoidBinaryCrossEntropyLoss()

# Initialize model parameters
embed_size = 100
net = nn.Sequential(
    nn.Embedding(num_embeddings=len(idx_to_token), embedding_dim=embed_size),
    nn.Embedding(num_embeddings=len(idx_to_token), embedding_dim=embed_size)
)


# Define the training function
def train(net, lr, num_epochs):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print("train on", device)
    net = net.to(device)
    optimizer = torch.optim.Adam(net.parameters(), lr=lr)
    for epoch in range(num_epochs):
        start, l_sum, n = time.time(), 0.0, 0
        for batch in data_iter:
            center, context_negative, mask, label = [d.to(device) for d in batch]

            pred = skip_gram(center, context_negative, net[0], net[1])

            # Use the mask variable to keep the padded entries from affecting the loss
            l = (loss(pred.view(label.shape), label, mask) *
                 mask.shape[1] / mask.float().sum(dim=1)).mean()  # average loss of one batch
            optimizer.zero_grad()
            l.backward()
            optimizer.step()
            l_sum += l.cpu().item()
            n += 1
        print('epoch %d, loss %.2f, time %.2fs'
              % (epoch + 1, l_sum / n, time.time() - start))


train(net, 0.01, 10)
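# (Added sketch, not part of the original script: persist the trained center-word
#  embedding for later reuse; the file name below is an arbitrary example, and
#  torch.load('ptb_skip_gram_embedding.pt') would read it back.)
torch.save(net[0].weight.data.cpu(), 'ptb_skip_gram_embedding.pt')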
# Apply the word embedding model
def get_similar_tokens(query_token, k, embed):
    W = embed.weight.data
    x = W[token_to_idx[query_token]]
    # The 1e-9 is added for numerical stability
    cos = torch.matmul(W, x) / (torch.sum(W * W, dim=1) * torch.sum(x * x) + 1e-9).sqrt()
    _, topk = torch.topk(cos, k=k+1)  # topk holds the indices of the k+1 largest values in cos
    topk = topk.cpu().numpy()
    for i in topk[1:]:  # skip the query word itself
        print('cosine sim=%.3f: %s' % (cos[i], (idx_to_token[i])))


# Return the words semantically closest to the word "chip"
get_similar_tokens('chip', 3, net[0])

print("*" * 50)
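As a small optional extension (my own sketch, not from the book), the same trained center-word embedding net[0] can also score the similarity of two specific words. Both words must be in the vocabulary; 'chip' and 'the' are used below only because they already appear in the script above.

# Added sketch: cosine similarity between two specific words using the trained
# center-word embedding net[0]. Both words must be present in token_to_idx.
def cosine_similarity(token_a, token_b, embed):
    W = embed.weight.data
    a, b = W[token_to_idx[token_a]], W[token_to_idx[token_b]]
    # the 1e-9 mirrors the numerical-stability term in get_similar_tokens above
    return (torch.dot(a, b) / (a.norm() * b.norm() + 1e-9)).item()

print(cosine_similarity('chip', 'the', net[0]))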