2
\$\begingroup\$

I have an array of integer labels, for instance array([0, 1, 1, 2, 2, 2, 3, 3, 1, 4, 4, 1, 1, 0, 0, -1, -1]). The array can be large (> 100k elements) and rarely consists of more than 5 or 6 distinct integers, plus -1 for unlabeled entries. In the example above, the unique labels are 0, 1, 2, 3, 4. The labels are always consecutive and 0-indexed.

I have this code snippet that looks at the transition from one label to another, and it could use some love for an efficient version:

from itertools import groupby

import numpy as np
from numpy.typing import NDArray

def compute_transition_matrix(
    labels: NDArray[int],  # 1D, shape (n,)
    n_clusters: int,  # number of unique possible labels
) -> NDArray[float]:
    """Compute observed transition."""
    labels = [s for s, _ in groupby(labels)]
    T = np.zeros(shape=(n_clusters, n_clusters))
    for i, j in zip(labels, labels[1:]):
        if i == -1 or j == -1:
            continue  # ignore unlabeled
        T[i, j] += 1
    return T

It can be run with:

compute_transition_matrix(np.random.randint(-1, 5, size=100000), 5)

Any idea how to get rid of both the for loop over groupby and the for loop over zip?

asked Nov 23, 2022 at 22:26
\$\endgroup\$

1 Answer

2
+50
\$\begingroup\$

Given an input length of n, I haven't been able to imagine a way to avoid looping over something of length n. You're counting things, so you have to look at them all. Certainly it's possible to parallelize this some, but I don't know how to get this kind of parallelization from numpy.
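The Python-level loop itself can at least be pushed down into numpy's compiled code, although that is vectorization rather than parallelism and it still touches all n elements. A rough sketch, assuming labels contains only -1 and values in [0, n_clusters); the name transition_matrix_vectorized is made up for illustration:

```python
import numpy as np


def transition_matrix_vectorized(labels, n_clusters):
    """Count label-to-label transitions without a Python-level loop."""
    labels = np.asarray(labels, dtype=np.intp)
    src, dst = labels[:-1], labels[1:]  # views, not copies
    # Keep only real changes between two labeled samples. This mirrors the
    # groupby de-duplication and the -1 skip in the question's code.
    keep = (src != dst) & (src != -1) & (dst != -1)
    # Encode each (i, j) pair as the flat index i * n_clusters + j and count.
    flat = src[keep] * n_clusters + dst[keep]
    counts = np.bincount(flat, minlength=n_clusters * n_clusters)
    return counts.reshape(n_clusters, n_clusters).astype(float)


labels = [0, 1, 1, 2, 2, 2, 3, 3, 1, 4, 4, 1, 1, 0, 0, -1, -1]
T = transition_matrix_vectorized(labels, 5)
```

Note that this allocates a few n-sized temporaries (the mask and the flat index array), so it trades memory for speed.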

That said, I do see some room for performance improvement. You've already got an n-sized object (labels: NDArray[int], # 1D, shape (n,)), so you should (and can) avoid building any other n-sized object. Also, you're assuming that n_clusters will be small (~6), so T won't be very big, and you might choose a more convenient structure than a numpy array to build it. Maybe something like this:

from collections import Counter
from typing import Iterable, Mapping, Tuple

from numpy.typing import NDArray

def bicounter_to_matrix(
    counts: Mapping[Tuple[int, int], int]
) -> NDArray[float]:
    ...

def compute_transition_matrix(
    labels: Iterable[int],
    n_clusters: int,
) -> NDArray[float]:
    """Compute observed transition."""
    # Count every adjacent (previous, next) pair in a single pass.
    transitions = Counter(zip(labels, labels[1:]))
    return bicounter_to_matrix(transitions)

notes:

  • I skipped groupby. All it's doing is suppressing the diagonal of your transition matrix. Performing all those equality tests is probably no cheaper than incrementing the counters, so just skip it and zero out the diagonal at the end if it matters. (Reifying the grouped labels as a list in memory as you have is probably the biggest performance problem in your version.)
  • I skipped the skipping of the -1 values. Again, I'm assuming it's cheaper to just handle everything homogeneously and drop the extra information at the end than to fill the loop with if clauses.
  • Possibly the Counter class will ingest the iterable less efficiently than your array-and-loop combo; the only way to know is to check.
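  • I haven't run the above code through MyPy. I suspect it would complain about using _[1:] on something I only specified as Iterable. I think the correct, maximally abstract way would be Counter(itertools.pairwise(labels)) on Python 3.10+, or, on older versions, itertools.tee with the second iterator advanced by one element before zipping (zipping the two tee outputs directly would pair each label with itself). Whether you gain or lose performance with that abstraction, I don't know.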
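To make the notes above concrete, here is one way the elided bicounter_to_matrix could be filled in. This is only a sketch: passing n_clusters to the helper is my assumption (the stub above takes just the counts), and labels is assumed to support slicing, as a numpy array or list does.

```python
from collections import Counter

import numpy as np


def bicounter_to_matrix(counts, n_clusters):
    """Turn {(i, j): count} into a dense matrix (hypothetical helper body)."""
    T = np.zeros((n_clusters, n_clusters))
    for (i, j), c in counts.items():
        # Drop the extra information at the end: unlabeled (-1) pairs and
        # the diagonal that groupby suppressed in the original code.
        if i == -1 or j == -1 or i == j:
            continue
        T[i, j] = c
    return T


def compute_transition_matrix(labels, n_clusters):
    """Count adjacent label pairs with Counter, then clean up once."""
    transitions = Counter(zip(labels, labels[1:]))
    return bicounter_to_matrix(transitions, n_clusters)


labels = np.array([0, 1, 1, 2, 2, 2, 3, 3, 1, 4, 4, 1, 1, 0, 0, -1, -1])
T = compute_transition_matrix(labels, 5)
```

The cleanup loop runs over at most (n_clusters + 1)² distinct pairs, so its cost does not depend on n. Timing this against the original with timeit on realistically sized data is the way to settle the performance question.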
answered Dec 1, 2022 at 20:45
\$\endgroup\$
