I have an array of integers labels
, for instance array([0, 1, 1, 2, 2, 2, 3, 3, 1, 4, 4, 1, 1, 0, 0, -1, -1])
. The array can be large (> 100k elements) and rarely consist of more than 5/6 different integers + (-1
). In the example above, the unique integers are 0, 1, 2, 3, 4
. The integers are always consecutive and 0-indexed.
I have this code snippet that looks at the transition from one label to another, and it could use some love for an efficient version:
from itertools import groupby
import numpy as np
from numpy.typing import NDArray
def compute_transition_matrix(
labels: NDArray[int], # 1D, shape (n,)
n_clusters: int, # number of unique possible labels
) -> NDArray[float]:
"""Compute observed transition."""
labels = [s for s, _ in groupby(labels)]
T = np.zeros(shape=(n_clusters, n_clusters))
for i, j in zip(labels, labels[1:]):
if i == -1 or j == -1:
continue # ignore unlabeled
T[i, j] += 1
return T
It can be run with:
compute_transition_matrix(np.random.randint(-1, 5, size=100000), 5)
Any idea on how to get rid of the for loop on groupby
and on the for loop on the zip
?
1 Answer 1
Given an input length of n, I haven't been able to imagine a way to avoid looping over something of length n. You're counting things, so you have to look at them all. Certainly it's possible to parallelize this some, but I don't know how to get this kind of parallelization from numpy.
That said, I do see some room for performance improvement. You've already got an n-sized object (labels: NDArray[int], # 1D, shape (n,)
), so you should (and can) avoid building anything else other n-sized. Also, you're assuming that n_clusters
will be small (~6), so T won't be very big, so you might choose to use a more convenient structure to build it than an numpy array. Maybe something like this:
from collections import Counter
def bicounter_to_matrix(
counts: Mapping[Tuple[int, int], int]
) -> NDArray[float]:
...
def compute_transition_matrix(
labels: Iterable[int]
n_clusters: int,
) -> NDArray[float]:
"""Compute observed transition."""
transitions = Counter(zip(labels, labels[1:]))
return bicounter_to_matrix(transitions)
notes:
- I skipped
groupby
. All it's doing is suppressing the diagonal of your transition matrix. Performing all those equality tests is probably no cheaper than incrementing the counters, so just skip it and zero out the diagonal at the end if it matters. (Reifying the grouped labels as alist
in memory as you have is probably the biggest performance problem in your version.) - I skipped the skipping of the
-1
values. Again, I'm assuming it's cheaper to just handle everything homogeneously and drop extra information at then end than to fill the loop with if clauses. - Possibly the
Counter
class will ingest the iterable less efficiently than your array-and-loop combo; the only way to know is to check. - I haven't run the above code through MyPy. I suspect it would complain about using
_[1:]
on something I only specified asIterable
. I think the correct, maximally abstract, way would beCounter(zip(*itertools.tee(labels)))
, whether you gain or loose performance with that abstraction I don't know.