2
\$\begingroup\$

I have a sniffer tool which populates my DB with data found after parsing the packets and filling them in. Since each row in my table deals with one packet, I had to make a procedure that provides the correlation between packets. I fill that using the stream_index column.

+--------------+------------------+------+-----+-------------------+----------------+
| Field | Type | Null | Key | Default | Extra |
+--------------+------------------+------+-----+-------------------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| src_ip | int(10) unsigned | YES | | NULL | |
| dst_ip | int(10) unsigned | YES | | NULL | |
| src_port | int(10) | YES | | NULL | |
| dst_port | int(10) | YES | | NULL | |
| data | int(11) | YES | | NULL | |
| Created | datetime | NO | MUL | CURRENT_TIMESTAMP | |
| smalicious | int(10) | YES | | NULL | |
| sgeoLoc | int(11) | YES | | NULL | |
| seq | int(10) unsigned | YES | | NULL | |
| app_id | int(11) | YES | | NULL | |
| tcp_flag | varchar(25) | YES | | NULL | |
| dgeoLoc | int(11) | YES | | NULL | |
| dmalicious | int(10) | YES | | NULL | |
| conn_state | int(11) | YES | | NULL | |
| src_mac | varchar(20) | YES | | NULL | |
| dst_mac | varchar(20) | YES | | NULL | |
| seq_num | varchar(25) | YES | | NULL | |
| ack_num | varchar(25) | YES | | NULL | |
| protocol | int(3) | YES | | NULL | |
| app | varchar(300) | YES | | NULL | |
| app_sub | varchar(300) | YES | | NULL | |
| source_user | text | YES | | NULL | |
| stream_index | int(11) | YES | | NULL | |
+--------------+------------------+------+-----+-------------------+----------------+

And here is my code that fills up the stream_index data inside the Database:

import mysql.connector
import re
import AuthDB
import random

def fetch_stream():
    cnx = mysql.connector.connect(**AuthDB.config)
    cursor = cnx.cursor(buffered=True)
    query_get = "select * from user_activity_load where protocol = 6 and tcp_flag = 'SYN' and stream_index is NULL or protocol = 17 and stream_index is NULL limit 100"
    cursor.execute(query_get)
    row_data = cursor.fetchone()
    while row_data is not None:
        print(row_data)
        ip1 = row_data[1]
        ip2 = row_data[2]
        port1 = row_data[3]
        port2 = row_data[4]
        seq = row_data[17]
        seq_num = ' '.join(seq.split(' ')[:2])
        protocol = row_data[19]
        if protocol == 6:
            get_tcp_stream(ip1, ip2, port1, port2, seq_num)
        elif protocol == 17:
            get_udp_stream(ip1, ip2, port1, port2)
        row_data = cursor.fetchone()
    cursor.close()
    cnx.close()

def get_tcp_stream(ip1,ip2,port1,port2,seq):
    id_list = []
    cnx = mysql.connector.connect(**AuthDB.config)
    cursor = cnx.cursor(buffered=True)
    query_get = "select * from user_activity_load where src_ip ='"+str(ip1)+"' and dst_ip = '"+str(ip2)+"' and src_port = "+str(port1)+" and dst_port = "+str(port2)+" and seq_num like '"+seq+"%' or dst_ip ='"+str(ip1)+"' and src_ip = '"+str(ip2)+"' and dst_port = "+str(port1)+" and src_port = "+str(port2)+" and ack_num like '"+seq+"%'"
    cursor.execute(query_get)
    row_data = cursor.fetchone()
    while row_data is not None:
        print(row_data)
        id_list.append(row_data[0])
        row_data = cursor.fetchone()
    update_index(id_list)
    cursor.close()
    cnx.close()

def get_udp_stream(ip1,ip2,port1,port2):
    id_list = []
    cnx = mysql.connector.connect(**AuthDB.config)
    cursor = cnx.cursor(buffered=True)
    query_get = "select * from user_activity_load where src_ip ='"+str(ip1)+"' and dst_ip = '"+str(ip2)+"' and src_port = "+str(port1)+" and dst_port = "+str(port2)+" or dst_ip ='"+str(ip1)+"' and src_ip = '"+str(ip2)+"' and dst_port = "+str(port1)+" and src_port = "+str(port2)
    print(query_get)
    cursor.execute(query_get)
    row_data = cursor.fetchone()
    while row_data is not None:
        print(row_data)
        id_list.append(row_data[0])
        row_data = cursor.fetchone()
    update_index(id_list)
    cursor.close()
    cnx.close()

def update_index(id_list):
    index = random.randint(1, 100000000)
    cnx = mysql.connector.connect(**AuthDB.config)
    cursor = cnx.cursor(buffered=True)
    for row_id in id_list:
        query_insert = "update user_activity_load set stream_index = "+str(index)+" where id = "+str(row_id)
        cursor.execute(query_insert)
    cnx.commit()
    cursor.close()
    cnx.close()

fetch_stream()

There's separate code that fills out the rest of the values in the table, but that is not my concern right now. This works fine for both TCP and UDP packets, but I think the quality of the code can be improved.

I'd kindly ask you to review it and shed light on how its quality can be improved to make it better, faster and more optimised. Thanks in advance.

asked Apr 3, 2019 at 8:30
\$\endgroup\$

1 Answer

1
\$\begingroup\$

First things first, always have an entry point with your Python code -

if __name__ == "__main__":
    fetch_stream()

This allows documentation tools to import the module and auto-generate documents without executing your code, and it makes it obvious to other developers where the code starts running.
Next, I see lots of code duplication (DRY: Don't Repeat Yourself). It's important to extract the repeated parts into their own separate functions. For instance, you open and close the database connection several times, which is expensive. Let's extract that out, and make the db an injectable parameter:

def open_database():
    return mysql.connector.connect(**AuthDB.config)

def fetch_stream(db):
    cursor = db.cursor(buffered=True)
    # ....

if __name__ == "__main__":
    db = open_database()
    fetch_stream(db)
    db.close()
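
A minimal sketch of the same injection pattern, written so that it runs without a MySQL server: Python's built-in sqlite3 module stands in for mysql.connector here (an assumption made only for the demo), and the packets table and the create_demo_table() and count_packets() helpers are hypothetical. contextlib.closing() makes sure the connection and each cursor are closed even if a query raises:

```python
import sqlite3
from contextlib import closing

def open_database():
    """Stand-in for mysql.connector.connect(**AuthDB.config)."""
    return sqlite3.connect(":memory:")

def create_demo_table(db):
    # Hypothetical, trimmed-down table used only for this demo.
    db.execute("CREATE TABLE packets (id INTEGER PRIMARY KEY, protocol INTEGER)")
    db.executemany("INSERT INTO packets (protocol) VALUES (?)", [(6,), (17,), (6,)])
    db.commit()

def count_packets(db, protocol):
    # The connection is injected; only the cursor is created and closed here.
    with closing(db.cursor()) as cursor:
        cursor.execute("SELECT COUNT(*) FROM packets WHERE protocol = ?", (protocol,))
        return cursor.fetchone()[0]

# One connection is opened once and shared by every function that needs it.
with closing(open_database()) as db:
    create_demo_table(db)
    tcp_count = count_packets(db, 6)
    udp_count = count_packets(db, 17)

print(tcp_count, udp_count)  # 2 1
```

With mysql.connector the shape stays the same, but the parameter placeholder is %s rather than sqlite3's ?.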

However, looking across all the functions, we can see that the cursor creation is the same, and we also see the same pattern - create cursor, run query, get results - yet you have code which manipulates the data one database row at a time. This too is an expensive process, because you can tie up resources holding your query results while you perform repeated fetchone() operations instead of a single fetchall() operation.

Unfortunately my suggestions will now break your existing code structure - but let's continue. We extract the database query process into a faster method (in Python 2, map returns a list automatically; in Python 3 we need to wrap the map in list()):

def execute_query(db, query):
    cursor = db.cursor(buffered=True)
    cursor.execute(query)
    # The DB-API method is fetchall(), not fetch_all()
    results = cursor.fetchall()
    col_names = [column[0] for column in cursor.description]
    rows_dict = list(map(lambda row: dict(zip(col_names, row)), results))
    cursor.close()
    return rows_dict

We now have a function which will grab the data quickly and return an object which is easily iterated. This would result in a change to your fetch_stream() function, into something like this:
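
As a runnable illustration of the same idea, here is a sketch that again uses the built-in sqlite3 module as a stand-in for MySQL (an assumption, so the example works without a server). The name rows_as_dicts() and its params argument are hypothetical additions: passing values as parameters, rather than concatenating them into the SQL string as the original get_tcp_stream() does, lets the driver handle quoting.

```python
import sqlite3

def rows_as_dicts(db, query, params=()):
    """Run a query on a DB-API connection and return every row as a dict."""
    cursor = db.cursor()
    try:
        cursor.execute(query, params)
        # cursor.description holds one tuple per column; the name comes first.
        col_names = [column[0] for column in cursor.description]
        return [dict(zip(col_names, row)) for row in cursor.fetchall()]
    finally:
        cursor.close()

# Stand-in database: a trimmed-down user_activity_load table held in memory.
demo_db = sqlite3.connect(":memory:")
demo_db.execute(
    "CREATE TABLE user_activity_load "
    "(id INTEGER PRIMARY KEY, protocol INTEGER, stream_index INTEGER)"
)
demo_db.executemany(
    "INSERT INTO user_activity_load (protocol, stream_index) VALUES (?, ?)",
    [(6, None), (17, None), (6, 42)],
)
demo_db.commit()

demo_rows = rows_as_dicts(
    demo_db,
    "SELECT id, protocol FROM user_activity_load "
    "WHERE stream_index IS NULL AND protocol = ?",
    (6,),
)
print(demo_rows)  # [{'id': 1, 'protocol': 6}]
demo_db.close()
```

Each row can then be read by column name (row['protocol']) instead of by position (row_data[19]), so reordering the table's columns no longer breaks the code.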

if __name__ == "__main__":
    db = open_database()
    query = "SELECT * FROM user_activity_load WHERE protocol = 6 AND tcp_flag = 'SYN' AND stream_index IS NULL OR protocol = 17 AND stream_index IS NULL LIMIT 100"
    all_user_activity_load = execute_query(db, query)

It is important to bring the query out of the function, because having "magic numbers" and SQL queries inside functions violates the Open/Closed Principle: your code should be open for extension but closed for modification. Database structures will always change, and if you want to modify the query to get different columns, you'd have to open up the code, edit it, then run it again to see if it produced your expected outcome, correct?
That means you're always modifying the code, and you might make a mistake and push it to production, etc. We have S.O.L.I.D as a way of writing code that avoids common pitfalls. So, you can see we've moved the query out of the function and made it an injectable parameter of execute_query(). Personally, I'd keep the queries in an .ini file and load them based on section heading. That way the code can be deployed as read-only, but other IT people can edit the .ini file to modify the query if the database changes.
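
A minimal sketch of the .ini idea using the standard-library configparser module. The section name pending_streams, the key name query and the load_query() helper are hypothetical, and the file content is inlined as a string only so the example is self-contained; a real deployment would call parser.read() on a file path instead.

```python
import configparser

# Inlined stand-in for a queries.ini file (hypothetical name and layout).
INI_TEXT = """
[pending_streams]
query = SELECT * FROM user_activity_load WHERE protocol = 6 AND tcp_flag = 'SYN' AND stream_index IS NULL OR protocol = 17 AND stream_index IS NULL LIMIT 100
"""

def load_query(parser, section):
    """Return the SQL text stored under the given section heading."""
    return parser[section]["query"]

# interpolation=None keeps '%' characters literal, which matters once a query
# contains LIKE patterns or %s placeholders.
parser = configparser.ConfigParser(interpolation=None)
parser.read_string(INI_TEXT)

query = load_query(parser, "pending_streams")
print(query)
```

Asking for a section that does not exist raises KeyError, so a typo in the section name fails loudly at startup rather than running the wrong query.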

Of course, the fetch_stream() function is now broken completely. Looking at that function, it iterates through the initial results, and separates TCP and UDP based on protocol. Given we now have names for each field in the dictionary from the execute_query() function, we can easily filter the original results into a new list:

def filter_by_protocol(results, protocol_number):
    return [x for x in results if x['protocol'] == protocol_number]

So you'd filter those based on protocol == 6 for the TCP etc. with a set of lines like:

all_user_activity_load = execute_query(db, query)
tcp_results = filter_by_protocol(all_user_activity_load, 6)
udp_results = filter_by_protocol(all_user_activity_load, 17)
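
If more protocols are added later, a possible alternative is to split the rows in a single pass rather than filtering the full list once per protocol. This is only a sketch: group_by_protocol() and the sample rows are hypothetical, while 6 and 17 are the protocol numbers already used in the query.

```python
from collections import defaultdict

TCP = 6   # IANA protocol number for TCP
UDP = 17  # IANA protocol number for UDP

def group_by_protocol(rows):
    """Split row dicts into lists keyed by their 'protocol' value in one pass."""
    grouped = defaultdict(list)
    for row in rows:
        grouped[row["protocol"]].append(row)
    return grouped

# Made-up rows in the shape returned by a dict-per-row query helper.
sample_rows = [
    {"id": 1, "protocol": TCP},
    {"id": 2, "protocol": UDP},
    {"id": 3, "protocol": TCP},
]

grouped = group_by_protocol(sample_rows)
tcp_ids = [row["id"] for row in grouped[TCP]]
udp_ids = [row["id"] for row in grouped[UDP]]
print(tcp_ids, udp_ids)  # [1, 3] [2]
```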

Now we can throw those lists at both get_tcp_stream(tcp_results) and get_udp_stream(udp_results) - but digging into the code for those functions, if I'm reading your intention correctly, it appears we're going back to the database to pull the exact same information which we've already got? Is that right? If so, we don't need those functions; we only need to clean up the current tcp_results and udp_results with the sequence number for the update_index() function. You should end up with a main something like this:

if __name__ == "__main__":
    db = open_database()
    query = "SELECT * FROM user_activity_load WHERE protocol = 6 AND tcp_flag = 'SYN' AND stream_index IS NULL OR protocol = 17 AND stream_index IS NULL LIMIT 100"
    all_user_activity_load = execute_query(db, query)
    tcp_results = filter_by_protocol(all_user_activity_load, 6)
    udp_results = filter_by_protocol(all_user_activity_load, 17)
    update_index(filter_id_from_data(tcp_results))
    update_index(filter_id_from_data(udp_results))
    db.close()

I'll leave that as an exercise for you, but it should be quite straightforward. I hope this small review helps somewhat, with an introduction to one of the SOLID concepts (which you can learn to improve your coding), and suggestions to remove code duplication (DRY).
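
For reference, one possible shape for the helper, assuming filter_id_from_data() only needs to pull the id column out of each row dict. The set_stream_index() function is a hypothetical variant of update_index() that takes the connection and the index value as parameters and issues the updates on one cursor with a single commit; sqlite3 is again only a stand-in so the sketch runs without a MySQL server.

```python
import sqlite3

def filter_id_from_data(rows):
    """Return the id of every row dict, in order."""
    return [row["id"] for row in rows]

def set_stream_index(db, id_list, stream_index):
    """Label every listed row with the same stream_index, then commit once."""
    cursor = db.cursor()
    try:
        cursor.executemany(
            "UPDATE user_activity_load SET stream_index = ? WHERE id = ?",
            [(stream_index, row_id) for row_id in id_list],
        )
        db.commit()
    finally:
        cursor.close()

# Stand-in table with three unlabelled packets.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE user_activity_load (id INTEGER PRIMARY KEY, stream_index INTEGER)")
db.executemany("INSERT INTO user_activity_load (stream_index) VALUES (?)", [(None,), (None,), (None,)])

rows = [{"id": 1}, {"id": 3}]
set_stream_index(db, filter_id_from_data(rows), 7)

updated = db.execute("SELECT id, stream_index FROM user_activity_load ORDER BY id").fetchall()
print(updated)  # [(1, 7), (2, None), (3, 7)]
db.close()
```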
Good luck!

answered Apr 4, 2019 at 6:12
\$\endgroup\$
3
  • \$\begingroup\$ The DB query comes from stackoverflow.com/questions/6923930/… - it has a modified column name depending on the database type should you have any issues in using that function. \$\endgroup\$ Commented Apr 4, 2019 at 6:15
  • \$\begingroup\$ This is honestly one of the best reviews anybody has ever done for my code, and I thank you for that. I will surely implement whatever you've told, and these being the best practices employed by professional developers, will also help in sharpening my skills. Thanking you again _/\_ \$\endgroup\$ Commented Apr 4, 2019 at 6:49
  • \$\begingroup\$ Also, the get_tcp_stream and get_udp_stream take my row-data as parameters and fetch all the rows that have the same data, almost, in order to update the stream_index throughout the fetched rows after calling the get_tcp/udp_stream. This is done in order to label the tcp/udp communication happening between two endpoints. Because fetch_stream returns a single row that points to a single packet being transmitted between the endpoints and get_tcp_stream updates all these packets/rows returned and labels them as one communication \$\endgroup\$ Commented Apr 4, 2019 at 8:02
