I have a big (50k × 150k) matrix of co-occurrence counts of nouns (rows) and adjectives (columns). Since most nouns don't co-occur with most adjectives, the matrix is very sparse (>99.9% zeros), so I'm using the CSR format. For every pair of nouns I need to calculate the Jensen-Shannon divergence to assess how similar their co-occurrence distributions over all adjectives are. I have sixteen cores available, so I run the calculations in parallel, with each core processing every sixteenth noun.
It's not going very fast, and I'm wondering whether there's a faster way to do this.
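For reference, the quantity I'm computing for two probability distributions P and Q is JSD(P, Q) = 0.5 * KL(P || M) + 0.5 * KL(Q || M), where M = 0.5 * (P + Q) and KL is the Kullback-Leibler divergence; that's what the JSD helper below computes via scipy.stats.entropy.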
from __future__ import division
from multiprocessing import Pool
from scipy.stats import entropy
import codecs
import os
import scipy
import scipy.sparse
# --- data & parameters--- #
frequenciesAdjectives = codecs.open('frequencies.txt', 'r', 'utf-8')
results = codecs.open('results.txt', 'w', 'utf-8')
temporaryFilesPath = "/temp/"
cores = 16
# --- functions --- #
# calculate the Jensen-Shannon divergence of two probability vectors
def JSD(p, q):
    m = 0.5 * (p + q)
    jsd = 0.5 * (entropy(p, m) + entropy(q, m))
    return jsd
# calculate the JSD of every cores-th noun (offset n) with every later noun, write the results to a temp file, and return the file path
def getJSDs(n):
    # open temporary file
    filename = "temp" + str(n).zfill(2)
    fullpath = temporaryFilesPath + filename
    temp = codecs.open(fullpath, "w", "utf-8")
    # shortcut write function
    tempwrite = temp.write
    # calculate JSD for each noun pair and write to the temporary file
    for index, noun1 in enumerate(nouns[n::cores]):
        index = index * cores + n
        first = sparseMatrix.getrow(index).toarray()[0]
        for index2, noun2 in enumerate(nouns[index:]):
            index2 += index
            second = sparseMatrix.getrow(index2).toarray()[0]
            divergence = JSD(first, second)
            tempwrite(u"{noun1}\t{noun2}\t{divergence}\n".format(**locals()))
    temp.close()
    return fullpath
# --- processing --- #
# set up parameter variables for sparse matrix
nouns = []
adjectiveIDs = {} # column id of each adjective in matrix
frequencies = [] # non-zero entries in matrix
positions = [] # column id of frequency in corresponding position in "frequencies"
indices = [0] # frequencies[indices[i]:indices[i+1]] = non-zero entries of row i of matrix
# ignore file header
frequenciesAdjectives.readline()
# incrementally get sparse matrix parameters (turning frequencies into probabilities)
for line in frequenciesAdjectives:
    line = line.strip().lower().split("\t")
    noun = line[0]
    nouns.append(noun)
    adjectiveList = [pair.split(" ") for pair in line[2:]]
    total = sum(int(frequency) for _, frequency in adjectiveList)
    for pair in adjectiveList:
        adjective, frequency = pair
        probability = int(frequency) / total
        position = adjectiveIDs.setdefault(adjective, len(adjectiveIDs))
        frequencies.append(probability)
        positions.append(position)
    indices.append(len(frequencies))
# turn lists into arrays
frequencies = scipy.array(frequencies)
positions = scipy.array(positions)
indices = scipy.array(indices)
# create sparse matrix from parameter arrays and delete arrays
sparseMatrix = scipy.sparse.csr_matrix((frequencies, positions, indices), shape=(len(nouns), len(adjectiveIDs)))
del frequencies, positions, indices, adjectiveIDs
# calculate JSDs in parallel and get list of temporary files
pool = Pool()
tempFiles = pool.map(getJSDs, range(cores))
pool.close()
pool.join()
# shortcut results.write and write header
resultswrite = results.write
resultswrite(u"noun1\tnoun2\tjensenShannonDivergence\n")
# combine temporary files into the results file and delete them
for path in tempFiles:
    tempfile = codecs.open(path, "r", "utf-8")
    for line in tempfile:
        resultswrite(line)
    tempfile.close()
    os.remove(path)
The format of the file from which I read in the frequencies is this:
"noun\tnounFrequency\tadjective1\tcooccurrenceFrequency1\tadjective2\tcooccurrenceFrequency2\n"
I can't use a dense matrix, because that uses up all of the RAM. I doubt I can make the entropy calculation run faster than SciPy's entropy function; I tried a plain for loop with numba, but it didn't come out faster, and I don't know any C. I don't really know parallel computing, so I suspect my way of using pool.map isn't optimal. Would it maybe be faster if I copied the global matrix variable into a local variable for each worker function?
1 Answer
I rewrote the entire thing; it's now two orders of magnitude (500x or so!) faster.
Key changes include:
- Using a list of tuples (noun, adjectiveDictionary) instead of the sparse matrix and dropping the zero elements completely
- Using math.log instead of scipy.log (huge difference!)
- Switching from Python 2 to Python 3
- Not dividing the tasks up manually, and instead using a generator and the queue that comes with imap_unordered, for which it was crucial to find the right chunksize (with billions of tiny noun-pair jobs, a large chunksize is needed to amortize the inter-process communication overhead)
#!/usr/bin/env python3
from multiprocessing import Pool
from collections import Counter
from math import log2
# from numba import jit
# --- data & parameters--- #
frequenciesAdjectives = open('/home/christian/results/gender/frequenciesAdjectivesGivenNouns_UK.txt', 'r')
results = open('/home/christian/results/gender/JensenShannonDivergences_ukWaC.txt', 'w')
cores = 16
# --- functions --- #
# calculates the Jensen-Shannon divergence from a tuple of two nouns and their associated adjective probabilities in dictionaries p and q
def JSD(nounTuple):
    noun1, noun2, p, q = nounTuple
    jsd = 0.0
    m = p + q
    for key in m:
        m_key = 0.5 * m[key]
        if key in p:
            p_key = p[key]
            jsd += 0.5 * p_key * log2(p_key / m_key)
        if key in q:
            q_key = q[key]
            jsd += 0.5 * q_key * log2(q_key / m_key)
    return noun1, noun2, jsd

# generate a (noun, noun2, adjectives, adjectives2)-tuple for every noun pair
def jobGenerator(tuples):
    for index, (noun, adjectives) in enumerate(tuples):
        for noun2, adjectives2 in tuples[index:]:
            yield noun, noun2, adjectives, adjectives2
# --- processing --- #
# ignore header
frequenciesAdjectives.readline()
# make list of tuples of nouns and dictionaries containing their preceding adjective frequencies
nounAdjectives = []
for line in frequenciesAdjectives:
    adjectives = Counter()
    line = line.strip().lower().split("\t")
    noun = line[0]
    adjectiveList = [pair.split(" ") for pair in line[2:]]
    frequencySum = sum(int(frequency) for _, frequency in adjectiveList)
    for adjective, frequency in adjectiveList:
        probability = int(frequency) / frequencySum
        adjectives[adjective] = probability
    nounAdjectives.append((noun, adjectives))
# make generator of (noun, noun2, adjectives, adjectives2)-tuples
jobs = jobGenerator(nounAdjectives)
# shortcut results.write and write header
resultswrite = results.write
resultswrite(u"noun1\tnoun2\tjensenShannonDivergence")
# calculate JSDs in parallel and write to file
pool = Pool(cores)
for noun1, noun2, jsd in pool.imap_unordered(JSD, jobs, chunksize=500000):
    resultswrite(u"\n{noun1}\t{noun2}\t{jsd}".format_map(locals()))
pool.close()
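As a quick sanity check of the dictionary-based JSD function above (a minimal sketch with made-up toy Counters, not part of the pipeline), you can verify that identical distributions give a divergence of 0 and that the result is symmetric:
# minimal sanity check for the dictionary-based JSD above (toy data, purely illustrative)
from collections import Counter
p = Counter({"big": 0.5, "old": 0.5})
q = Counter({"big": 0.25, "loud": 0.75})
# identical distributions should give a divergence of exactly 0
assert JSD(("dog", "dog", p, p))[2] == 0.0
# the JSD is symmetric, so swapping the arguments should not change the value
_, _, forward = JSD(("dog", "cat", p, q))
_, _, backward = JSD(("cat", "dog", q, p))
assert abs(forward - backward) < 1e-12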