Я хочу загрузить подготовленный emebdding для каждого исполнителя, а не для каждого раздела.

import gensim

from gensim.models.fasttext import FastText as FT_gensim
import numpy as np

class Loader(object):
    cache = {}
    emb_dic = {}
    count = 0
    def __init__(self, filename):
        print("|-------------------------------------|")
        print ("Welcome to Loader class in python")
        print("|-------------------------------------|")
        self.fn = filename

    @property
    def fasttext(self):
        if Loader.count == 1:
                print("already loaded")
        if self.fn not in Loader.cache:
            Loader.cache[self.fn] =  FT_gensim.load_fasttext_format(self.fn)
            Loader.count = Loader.count + 1
        return Loader.cache[self.fn]


    def map(self, word):
        if word not in self.fasttext:
            Loader.emb_dic[word] = np.random.uniform(low = 0.0, high = 1.0, size = 300)
            return Loader.emb_dic[word]
        return self.fasttext[word]

я называю этот класс как:

inputRaw =(inputFile, 3).map(lambda line: (line.split("\t")[0], line.split("\t")[1])).map(Loader(modelpath).map)
  • Я путаюсь, сколько раз будет загружен файл modelpath? Я хочу, чтобы один раз был загружен на каждого исполнителя и использовался всеми его ядрами. Мой ответ на этот вопрос - путь к модели будет загружен 3 раза (= количество разделов). Если мой ответ правильный, недостаток такого моделирования связан с размером пути к файлу модели. Предположим, этот файл 10 ГБ, и предположим, у меня есть 200 разделов. Таким образом, в этом случае нам потребуется 10 * 200 ГБ = 2000 с огромным (Это решение может работать только с небольшим количеством разделов.)

Предположим, у меня есть rdd =(id, sentence) =[(id1, u'patina californian'), (id2, u'virgil american'), (id3', u'frensh'), (id4, u'american')]

и я хочу суммировать векторы встраиваемых слов для каждого предложения:

def test(document):
    print("document is = {}".format(document))
    documentWords = document.split(" ")
    features = np.zeros(300)
    for word in documentWords:
        features = np.add(features, Loader(modelpath).fasttext[word])
    return features

def calltest(inputRawSource):

    my_rdd = inputRawSource.map(lambda line: (line[0], test(line[1]))).cache()
    return my_rdd

В этом случае, сколько раз будет загружен файл modelpath? Обратите внимание, что я установил для свойства spark.executor.instances значение 3

0