3
\$\begingroup\$

I am working on a P2P streaming sim (which I think is correct), however I've discovered a huge bottleneck in my code. During the run of my sim, the function I've included below gets called about 25000 times and takes about 560 seconds from a total of about 580 seconds (I used pycallgraph).

Can anyone identify any way I could speed the execution up, by optimizing the code (I think I won't be able to reduce the times it gets executed)? The basics needed to understand what the function does, is that the incoming messages consist of [sender_sequence nr, sender_id, nr_of_known_nodes_to_sender (or -1 if the sender departs the network), time_to_live], while the mCache entries these messages update, consist of the same data (except for the third field, which cannot be -1, instead if such a message is received the whole entry will be deleted), with the addition of a fifth field containing the update time.

def msg_io(self): # Node messages (not datastream) control
 # filter and sort by main key
 valid_sorted = sorted((el for el in self.incoming if el[3] != 0), key=ig(1))
 self.incoming = []
 # ensure identical keys have highest first element first
 valid_sorted.sort(key=ig(0), reverse=True)
 # group by second element
 grouped = groupby(valid_sorted, ig(1))
 # take first element for each key
 internal = [next(item) for group, item in grouped]
 for j in internal: 
 found = False
 if j[3] > 0 : # We are only interested in non-expired messages
 for i in self.mCache[:]:
 if j[1] == i[1]: # If the current mCache entry corresponds to the current incoming message
 if j[2] == -1: # If the message refers to a node departure
 self.mCache.remove(i) # We remove the node from the mCache
 self.partnerCache = [partner for partner in self.partnerCache if not partner.node_id == j[1]] 
 else: 
 if j[0] > i[0]: # If the message isn't a leftover (i.e. lower sequence number)
 i[:4] = j[:] # We update the relevant mCache entry
 i[4] = turns # We add the current time for the update
 found = True
 else:
 k = j[:]
 #k[3] -= 1
 id_to_node[i[1]].incoming.append(k) # and we forward the message 
 if not j[2] == -1 and not found and (len(self.mCache) < maxNodeCache): # If we didn't have the node in the mCache 
 temp = j[:]
 temp.append(turns) # We insert a timestamp...
 self.mCache.append(temp) # ...and add it to the mCache
 self.internal.remove(j)
Jamal
35.2k13 gold badges134 silver badges238 bronze badges
asked Dec 25, 2012 at 11:07
\$\endgroup\$

3 Answers 3

3
\$\begingroup\$
def msg_io(self): # Node messages (not datastream) control
 # filter and sort by main key
 valid_sorted = sorted((el for el in self.incoming if el[3] != 0), key=ig(1))

Don't use abbreviation like ig. It just makes your code harder to read. Also, don't refer to the objects as magic numbers, like 1,2,3. It makes it really hard to figure out what the pieces refer to.

 self.incoming = []

I'd del self.incoming[:], to clear the list instead. You'll get some marginal speed benefits.

 # ensure identical keys have highest first element first
 valid_sorted.sort(key=ig(0), reverse=True)

Don't do multiple sorts. Instead, use a key like lambda el: (el[1],el[0]) to sort on both criteria at the same time.

 # group by second element
 grouped = groupby(valid_sorted, ig(1))

Yeah, this isn't actually doing what you think it does. For this to work it assumed the data is sorted by the [1] item, but you've sorted it by the [0] item.

If you don't actually need to process items in order sorted by key, I'd avoid the sorting altogether. Instead, just loop over incoming and split it into a dict then loop over the dict.

 # take first element for each key
 internal = [next(item) for group, item in grouped]
 for j in internal: 

I'd iterate over the generator and call next in side the for loop. Also avoid using single letter variable names. It makes it hard to figure out what everything is.

 found = False

Whenever I seed code that assign logic flags, I know there is a better way to do it

 if j[3] > 0 : # We are only interested in non-expired messages
 for i in self.mCache[:]:

You copy the cache. You do need to do this in order to remove elements. But that's expensive.

 if j[1] == i[1]: # If the current mCache entry corresponds to the current incoming message

It's expensive to have to loop over the whole cache to try and find a single element. Probably this should be a dictionary. That's avoid the copy and make deleting more efficient.

 if j[2] == -1: # If the message refers to a node departure
 self.mCache.remove(i) # We remove the node from the mCache
 self.partnerCache = [partner for partner in self.partnerCache if not partner.node_id == j[1]] 
 else: 
 if j[0] > i[0]: # If the message isn't a leftover (i.e. lower sequence number)
 i[:4] = j[:] # We update the relevant mCache entry
 i[4] = turns # We add the current time for the update
 found = True
 else:
 k = j[:]
 #k[3] -= 1

Don't leave dead code in your actual code. That's what source control is for.

 id_to_node[i[1]].incoming.append(k) # and we forward the message 

You are forwarding the message to all the nodes in the cache. That strikes me as rather strange. But perhaps it makes sense for a P2P sim.

 if not j[2] == -1 and not found and (len(self.mCache) < maxNodeCache): # If we didn't have the node in the mCache 
 temp = j[:]
 temp.append(turns) # We insert a timestamp...
 self.mCache.append(temp) # ...and add it to the mCache
 self.internal.remove(j)

Is this supposed to be the internal local variable from earlier? Because its not.

answered Dec 26, 2012 at 4:36
\$\endgroup\$
1
  • \$\begingroup\$ Yup, you got me there! I was in the process of removing the class variable (self.internal) in favor of a local variable, but it seems I copied and pasted midway! \$\endgroup\$ Commented Dec 26, 2012 at 10:02
3
\$\begingroup\$

valid_sorted is created using sorted function, and then it is sorted again. I don't see how the first sort is helping in any way. Maybe I'm wrong..


You create the iterator grouped with the function groupby, but then you turn it into list and then iterate over it again. instead - you can do this:

grouped = groupby(valid_sorted, ig(1))
for group, item in grouped:
 j = next(item)
 ...

That way - you only iterate over it once. :)


The inner loop iterating over self.mCache[:] - I don't see why you can't just iterate over self.mCache and that's it. If its members are lists - it will change weither you use [:] or not.

When you use [:] - you create a copy of the list, and that's why It can be bad for memory and speed.


The remove function can be quite slow, if you can find a better way to implement your algorithm without using this function - it can be faster.

Jamal
35.2k13 gold badges134 silver badges238 bronze badges
answered Dec 25, 2012 at 13:43
\$\endgroup\$
3
  • \$\begingroup\$ First of all, many thank! I can see the point in removing the sorted (I'm already applying some of the changes), however the reasoning behind the grouped() modifications eludes me...aren't we doing the iteration I'm doing via list comprehension, using an explicit for loop? Isn't that more inefficient? \$\endgroup\$ Commented Dec 25, 2012 at 16:22
  • \$\begingroup\$ Another thing that bothers me, is that since I use remove() on the mCache list, I wonder if it is safe to not slice it when I iterate over it, as you suggest. I think I saw a suggestion somewhere, about a reverse iteration, which should be removal safe \$\endgroup\$ Commented Dec 25, 2012 at 16:32
  • \$\begingroup\$ As @winston's commet says - it seems like groupby doesn't do what we thought it does, so I guess it doesn't matter anymore :) About the remove use - I think it is ok to use it like you do, and if you'll reverse it - it will have the same effect in my opinion. \$\endgroup\$ Commented Dec 26, 2012 at 13:33
3
\$\begingroup\$

The solution that I actually implemented, with quite a gain in speed, follows below. In addition, I removed the checks for the third item being zero (placed them in another part of the code), though I don't think the checks had much effect on the delay. The interesting part was the implementation of mCache and the incoming messages using dictionaries, thus getting rid of slow loops. Unfortunately I can think of no proper replacement for the message forwarding loop. As for the gain, on a small test run it clocked 53 seconds instead of 560.

def msg_io(self): # Node messages (not datastream) control
 messages = {}
 for incoming_msg in self.incoming:
 k = incoming_msg[1]
 if k not in messages:
 # First appearance of this second-position element
 messages[k] = incoming_msg
 elif messages[k][0] < incoming_msg[0]:
 # We've seen this second-position element before, but this list has a higher first element than the last one we saw
 messages[k] = incoming_msg
 else:
 # We've already seen this second-position element in a list with an equal or higher first element
 pass
 self.incoming = []
 for j in messages: 
 if j in self.mCache:
 if messages[j][2] == -1:
 del(self.mCache[j])
 for partner in self.partnerCache[:]:
 if partner.node_id == j:
 self.partnerCache.remove(partner)
 else: 
 if messages[j][0] > self.mCache[j][0]: # If the message isn't a leftover (i.e. lower sequence number)
 self.mCache[j][:4] = messages[j][:] # We update the relevant mCache entry
 self.mCache[j][4] = turns # We add the current time for the update
 elif (not messages[j][2] == -1) and (len(self.mCache) < maxNodeCache): # If we didn't have the node in the mCache 
 self.mCache[j] = messages[j][:4]
 self.mCache[j].append(turns)
 for destinations in self.mCache:
 if not destinations == j:
 id_to_node[destinations].incoming.append(messages[j])
 messages = {}
Jamal
35.2k13 gold badges134 silver badges238 bronze badges
answered Dec 28, 2012 at 17:28
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.