Предыстория: Word2vec необходимо рутинизировать в pyspark, но загрузка предварительно обученных векторов слов является большой проблемой, поэтому его необходимо сначала загрузить в HDFS, а затем получить с помощью кода. После исследования я обнаружил, что, хотя у pyspark есть собственный метод word2vec, он, похоже, не может загрузить предварительно обученный вектор слов txt.
Поэтому общие этапы следует разделить на два этапа:
1. Получите векторный файл слов из hdfs.
2. Выполните сегментацию слов + обработку векторизации данных в кадре данных pyspark.
Существует множество векторных файлов слов с открытым исходным кодом, которые по сути представляют собой текстовые документы в форме ключ-значение. Возьмем в качестве примера вектор слов Tencent AI Lab.
(https://ai.tencent.com/ailab/nlp/en/embedding.html)
Сначала вам нужно загрузить текстовый файл вектора слов в HDFS, а затем использовать искровой файл в коде для распространения файла каждому работнику:
from pyspark.sql import SparkSession
from pyspark import SparkFiles
# Отправьте вектор слов HDFS каждому работнику.
sparkContext = spark.sparkContext
sparkContext.addPyFile("hdfs://******/tencent-ailab-embedding-zh-d100-v0.2.0-s.txt")
# Метод использования файлов: Точно так же, как «/***/***» при локальном использовании файлов.
SparkFiles.get("tencent-ailab-embedding-zh-d100-v0.2.0-s.txt")
Этот этап требует много времени, главным образом, из-за этапа отправки векторов слов каждому работнику. Если файл вектора слов большой, это может занять больше времени.
После того, как предварительно обученные векторы слов будут отправлены каждому работнику, следующим шагом будет сегментация данных и получение векторов слов. Функция udf используется для выполнения вышеуказанных операций:
import pyspark.sql.functions as f
# Определите сегментацию слов и векторизованную UDF
@f.udf(StringType())
def generate_embedding(title, subtitle=None):
cut_title = jieba.lcut(title.lower())
if subtitle is None:
cut_sentence = cut_title
else:
cut_subtitle = jieba.lcut(title.lower())
cut_sentence = cut_title + cut_subtitle
res_embed = []
for word in cut_sentence:
# Выберите здесь не обрабатывать незарегистрированные слова. Вместо этого вы также можете использовать unk
try:
res_embed.append(model.get_vector(word))
except:
pass
# Выполните avg_pooling для векторов слов
if len(res_embed)==0:
avg_vectors = np.zeros(100)
else:
res_embed_arr = np.array(res_embed)
avg_vectors = res_embed_arr.mean(axis=(0))
avg_vectors = np.round(avg_vectors,decimals=6)
# Конвертировать в необходимый формат
tmp = []
for j in avg_vectors:
tmp.append(str(j))
output = ','.join(tmp)
return output
Здесь возникнет проблема, если вам понадобится использовать пользовательский словарь jieba.,почему я здесьpysparkреализовано наjieba.load_userdict()
Следовательно, необходим способ загрузить его только один раз на каждого рабочего.
Сначала отправьте пользовательский словарь каждому работнику в основном методе:
# Доставьте словарь HDFS каждому работнику.
sparkContext.addPyFile("hdfs://xxxxxxx/word_dict.txt")
Затем вudfДобавьте в первую строкуjieba.dt.initialized
Определите, нужно ли загружать словарь:
if not jieba.dt.initialized:
jieba.load_userdict(SparkFiles.get("word_dict.txt"))
Это прекрасно решает эту проблему~
ссылка: