8
\$\begingroup\$

i'm currently making an application using python to hide a message in an audio file (WAV only for now). The hiding itself is done pretty quickly and without any problems, but i want to calculate the best place for it by calculating entropy. The purpose is that the higher entropy of the fragment, the less chance of detecting it by bare eye. Problem is that it takes really long time (70mins approx for normal wav). It currently calculates entropy every 8 bytes to make it faster (this was just because of fitting it to the first version of reading script, want to make it for every byte at the end, but for now the execution time is limiting me). I use LSB replacement to do this with start and end markers being strings.

Here is the hiding code:

from wavreader import WAVFile as reader
from collections import Counter
from math import log
import time as t
import sys
fpath = "dubfx_nl.wav"
newname = "testresult.wav"
hmsg = "some sample test string to hide"
start_marker = "STARTINGTRANSMISSION"*4
end_marker = "ENDINGTRANSMISSION"*4
start_bit = 0
def hideBit(sampleNo, character, counter, higher_limit, flag=False):
 global bitcount
 global audiofile
 audiofile.Data[sampleNo] = ((audiofile.Data[sampleNo] >> 1) << 1) | (character >> bitcount) & 0x01 # Podmiana LSB na bit znaku wiadomosci
 if bitcount > 0 :
 bitcount = bitcount - 1
 elif counter < higher_limit:
 bitcount = 7
 counter = counter + 1
 else:
 counter = 0
 bitcount = 7
 flag = True
 return counter, flag
def entropyfast(s):
 return -sum(i/float(len(s)) * log(i/float(len(s)),2) for i in Counter(s).values())
W = '033円[0m' # white (normal)
R = '033円[31m' # red
G = '033円[32m' # green
O = '033円[33m' # orange
print "\n+==========================================================+"
print O+"Reading file..."+W,
sys.stdout.flush()
audiofile = reader(fpath)
audiofile.Data.flags.writeable = True
#data = np.reshape(audiofile.Data, len(audiofile.Data)*audiofile.NumChannels)
print G+"\t[DONE]"+W
print O+"Calculating data..."+W,
sys.stdout.flush()
avail_bits = audiofile.Subchunk2Size
required_bits = (len(hmsg) + len(start_marker) + len(end_marker))*8
print G+"\t[DONE]"+W
if( required_bits > avail_bits/8):
 print R+"\nERROR! Message is too long!"+W
 print "Available BYTES:\t", avail_bits/8
 print "Required BYTES: \t", required_bits
else:
 print "\nUse entropy detection coding? y/n:\n"+R+"[WARNING]"+W+" This will significantly extend the execution time!"
 ent_answer = raw_input("Answer: ")
 ent_time = None
 if ent_answer == ("y" or "Y"):
 ent_time = t.clock()
 perc = 0
 ent = 0
 data_len = len(audiofile.Data)
 for sampleStart in xrange(0,data_len-required_bits):
 e = entropyfast(audiofile.Data[sampleStart:sampleStart+required_bits])
 p = 100*sampleStart/float(data_len)
 print "\r",
 if (p - perc >= 0.1) or perc == 0 or perc == 100:
 perc = p
 print O+"Calculating best place for message: "+G+format(perc,".1f")+O+"%"+W,
 if e > ent:
 ent = e
 start_bit = sampleStart
 print "\r",
 print O+"Calculating best place for message: "+G+100.0+O+"%"+W,
 print "\nBest place found at: "+str(start_bit)+" sample!"
 elif not ent_answer == ("n" or "N"):
 print R+"\nUnknown answer!"+W
 exit()
 # Flags:
 sm_done = False #Starting Markers Done
 msg_done = False #Message Done
 fn_done = False
 bitcount = 7
 charcount = 0
 markercount = 0
 print O+"\nHiding your message..."+W+"\t",
 sys.stdout.flush()
 before = t.clock()
 for sampleNo in xrange(start_bit, start_bit+required_bits):
 if not sm_done :
 markercount, sm_done = hideBit(sampleNo, ord(start_marker[markercount]), markercount, len(start_marker)-1)
 elif not msg_done:
 charcount, msg_done = hideBit(sampleNo, ord(hmsg[charcount]), charcount, len(hmsg)-1)
 elif not fn_done:
 markercount, fn_done = hideBit(sampleNo, ord(end_marker[markercount]), markercount, len(end_marker)-1)
 print G+"[DONE]"+W
 after = t.clock()
 print O+"Saving result..."+W+"\t",
 audiofile.SaveTo(newname)
 print G+"[DONE]"+W
 print G+"\nMessage hidden successfuly!"+W
 if not ent_time == None:
 print G+"Entropy calculation took: "+str(before-ent_time)+" seconds!"+W
 print G+"Hiding took "+ str(after-before) +" seconds!"+W
print "+==========================================================+\n"

And here is the reader code:

from wavreader import WAVFile as reader
import numpy as np
import sys
fpath = "testresult.wav"
class hiddenMessageReader:
 def __init__(self, fname):
 self.sm_done = False #Starting Markers Done
 self.msg_done = False #Message Done
 self.bitcount = 0
 self.markercount = 0
 self.msg = ""
 self.char_obt = 0x00
 self.start_marker = "STARTINGTRANSMISSION"*4
 self.end_marker = "ENDINGTRANSMISSION"*4
 self.marker_tmp = ""
 self.audio = reader(fname)
 def __ReadBit(self, sample, marker, flag, msg_bool=False):
 self.char_obt = self.char_obt | (self.audio.Data[sample] & 0x01)
 if self.bitcount < 7 :
 self.bitcount = self.bitcount + 1
 self.char_obt = self.char_obt << 1
 elif chr(self.char_obt) == marker[self.markercount]:
 self.marker_tmp = self.marker_tmp + chr(self.char_obt)
 self.char_obt = 0x00
 self.bitcount = 0
 self.markercount = self.markercount + 1
 if self.marker_tmp == marker:
 self.markercount = 0
 self.marker_tmp = ""
 flag = True
 else:
 if msg_bool:
 self.msg = self.msg + chr(self.char_obt)
 self.marker_tmp = ""
 self.markercount = 0
 self.char_obt = 0x00
 self.bitcount = 0
 return flag
 def ReadMsg(self):
 if self.audio != None:
 data_len = len(self.audio.Data)
 perc = 0
 for bit in xrange(0, data_len):
 if not self.sm_done:
 self.sm_done = self.__ReadBit(bit, self.start_marker, self.sm_done)
 elif not self.msg_done:
 self.msg_done = self.__ReadBit(bit, self.end_marker, self.msg_done, True)
 else:
 return None
 p = 100*bit/float(data_len)
 if p-perc >= 0.1 or perc == 100:
 perc = p
 print "\r",
 print O+"Scanned through: "+G+format(perc,".1f")+O+"% of file..."+W,
 return None
W = '033円[0m' # white (normal)
R = '033円[31m' # red
G = '033円[32m' # green
O = '033円[33m' # orange
print "\n+==========================================================+"
print O+'Reading file...'+W,
sys.stdout.flush()
msgReader = hiddenMessageReader(fpath)
print G+'\t[DONE]'+W
print O+"Reading messages...\t"+W,
sys.stdout.flush()
msgReader.ReadMsg()
print G+"[DONE]"+W
if msgReader.msg != "":
 print G+"\nMessage successfuly read!\n"+W+"\""+msgReader.msg+"\""
else:
 print R+"\nNo messages hidden in this file!"+W
print "+==========================================================+\n"

I also attach file reader code for wav files, although it works fine (for me):

import numpy as np
import struct
class WAVFile:
 def __init__(self, filename):
 with open(filename,"rb") as f:
 try:
 self.ChunkID = f.read(4)
 self.ChunkSize = struct.unpack_from("<I",f.read(4))[0]
 self.Format = f.read(4)
 self.Subchunk1ID = f.read(4)
 self.Subchunk1Size = struct.unpack_from("<I",f.read(4))[0]
 self.AudioFormat = struct.unpack_from("<H",f.read(2))[0]
 self.NumChannels = struct.unpack_from("<H",f.read(2))[0]
 self.SampleRate = struct.unpack_from("<I",f.read(4))[0]
 self.ByteRate = struct.unpack_from("<I",f.read(4))[0]
 self.BlockAlign = struct.unpack_from("<H",f.read(2))[0]
 self.BitsPerSample = struct.unpack_from("<H",f.read(2))[0]
 if self.BitsPerSample != 16:
 print "BitsPerSample value not supported!"
 self = None
 return None
 self.Subchunk2ID = f.read(4)
 if self.Subchunk2ID != "data" :
 print "Format not supported!"
 self = None
 return None
 self.Subchunk2Size = struct.unpack_from("<I",f.read(4))[0]
 self.Data = np.fromfile(f, dtype=np.dtype(np.uint16), count=-1)
 finally:
 f.close()
 def GetData(self):
 return (self.Data, self.SampleRate)
 def SaveTo(self, filename):
 with open(filename,"wb") as f:
 try:
 f.write(self.ChunkID)
 f.write(struct.pack("<I",self.ChunkSize))
 f.write(self.Format)
 f.write(self.Subchunk1ID)
 f.write(struct.pack("<I",self.Subchunk1Size))
 f.write(struct.pack("<H",self.AudioFormat))
 f.write(struct.pack("<H",self.NumChannels))
 f.write(struct.pack("<I",self.SampleRate))
 f.write(struct.pack("<I",self.ByteRate))
 f.write(struct.pack("<H",self.BlockAlign))
 f.write(struct.pack("<H",self.BitsPerSample))
 f.write(self.Subchunk2ID)
 f.write(struct.pack("<I",self.Subchunk2Size))
 self.Data.tofile(f)
 finally:
 f.close()

I know that it may not be too pretty, but this is my first project in python and i'm just trying some concepts and stuff. I would really appreciate for any tips to how to make it better.

200_success
145k22 gold badges190 silver badges478 bronze badges
asked Dec 25, 2017 at 15:16
\$\endgroup\$
1
  • 2
    \$\begingroup\$ Have you tried to profile ( docs.python.org/2/library/profile.html ) the code? Where most of the time is being spent? \$\endgroup\$ Commented Dec 25, 2017 at 18:21

1 Answer 1

4
\$\begingroup\$

If it is your first project, why choose an outdated version of Python to start with. Python 3 release was about 10 years ago and yet you chose to use Python 2. Anyway, there is not much you need to make your code run in Python 3, and not much more to make it Python 2 compatible as well. The differences that impact your code are:

  • print is no longer a keyword but a function;
  • xrange is now range (and the old range disapeared);
  • raw_input is now input (and the old input disapeared).

Anyway, kuddos for using the last two for your first project.

If you want to make your code compatible with both Python 2 and Python 3, you will need to add

from __future__ import print_function
try:
 # Rename Python 2 function in their Python 3 counterparts
 range = xrange
 input = raw_input
except NameError:
 # We are already using Python 3
 pass

at the top of your files.


Now, to start tidying things, you could put the constants you wrote in both files (colors and markers) into a common file (say constants.py) that you could use like a namespace (import constant as C and use C.WHITE or C.START_MARKER).

In the end, it doesn't matter much as I ended putting everything in a single file (so back to the global constants) but if you want to keep it as 3 separate files, make them 4 instead.

An other thing is to avoid naming these constant with single letter names. Why would you need

W = '033円[0m' # white (normal)
R = '033円[31m' # red
G = '033円[32m' # green
O = '033円[33m' # orange

where

WHITE = '033円[0m'
RED = '033円[31m'
GREEN = '033円[32m'
ORANGE = '033円[33m'

would convey the same information both here and everywhere else.

And, in general, you’ve got a lot of PEP8 violations. Especially on variable names.


As regard to the code itself. The first thing that you should avoid, is this strange try: ... finally: within your with open(): blocks. Stop. This is already the role of the with statement to handle your resourses appropriatelly, you don't need to add this layer on your own.

Speaking of with, you use the construct:

  • print an orange message of a pending operation;
  • perform the operation;
  • print a green DONE message.

a few time, you could convert that to a context manager easily to avoid repeating yourself:

@contextmanager
def pending_message(message):
 print(ORANGE, message, WHITE, sep='', end='', flush=True)
 yield
 print(GREEN, '\t[DONE]', WHITE, sep='')

Usage being

with pending_message('Reading file...'):
 audiofile = WAVFile(filepath)
with pending_message('Reading messages...'):
 message = audiofile.read_message(START_MARKER, STOP_MARKER)

An other thing to note is how you use and helper function and either a class with attributes or some global variables to perform similar operations the 8 bits of a byte and possibly some tidying at the end... This is way too convoluted and error prone. You could either use an inner for loop to iterate over the 8 bits or use the modulo (%) operator to check if you reached the 8th bit.

In general, your helper functions add complexity and doesn't help the reader (us) and thus the maintainer (you).

I would also advise to create methods in the WAVFile class to manipulate self.data rather than directly manipulating audiofile.data from outside. This feels neater.


Lastly, since you are using numpy you should really try to use its full potential of vectorized operations rather than looping in Python. As a rule of thumb, you should avoid for loops as hard as you can when you import numpy.

Reshaping your data, or using sliding window could be usefull in this case. I didn't get into the process of rewriting everything using numpy potential, but entropy_fast was an easy target for this task.


Proposed improvements:

from __future__ import print_function
import time
import struct
from contextlib import contextmanager
import numpy as np
try:
 # Rename Python 2 function in their Python 3 counterparts
 range = xrange
 input = raw_input
except NameError:
 # We are already using Python 3
 pass
WHITE = '033円[0m'
RED = '033円[31m'
GREEN = '033円[32m'
ORANGE = '033円[33m'
START_MARKER = 'STARTINGTRANSMISSION' * 4
STOP_MARKER = 'ENDINGTRANSMISSION' * 4
class WAVError(Exception):
 pass
class WAVFile(object):
 def __init__(self, filename):
 with open(filename,"rb") as f:
 self.chunk_id = f.read(4)
 self.chunk_size, = struct.unpack("<I", f.read(4))
 self.format = f.read(4)
 self.subchunk_1_id = f.read(4)
 (
 self.subchunk_1_size,
 self.audio_format,
 self.num_channels,
 self.sample_rate,
 self.byte_rate,
 self.block_align,
 self.bits_per_sample,
 ) = struct.unpack("<IHHIIHH", f.read(20))
 if self.bits_per_sample != 16:
 raise WAVError("BitsPerSample value not supported!")
 self.subchunk_2_id = f.read(4)
 if self.subchunk_2_id != b"data" :
 raise WAVError("Format not supported!")
 self.subchunk_2_size, = struct.unpack("<I", f.read(4))
 self.data = np.fromfile(f, dtype=np.dtype(np.uint16), count=-1)
 def save_to(self, filename):
 with open(filename,"wb") as f:
 f.write(self.chunk_id)
 f.write(struct.pack("<I", self.chunk_size))
 f.write(self.format)
 f.write(self.subchunk_1_id)
 f.write(struct.pack("<IHHIIHH",
 self.subchunk_1_size,
 self.audio_format,
 self.num_channels,
 self.sample_rate,
 self.byte_rate,
 self.block_align,
 self.bits_per_sample))
 f.write(self.subchunk_2_id)
 f.write(struct.pack("<I", self.subchunk_2_size))
 self.data.tofile(f)
 def select_entropy(self, required_bits):
 _, start_bit = max(
 (entropy_fast(self.data[sample:sample + required_bits]), sample)
 for sample in range(len(self.data) - required_bits)
 )
 return start_bit
 def hide_message(self, message, start):
 self.data.flags.writeable = True
 sample = start
 for character in message:
 character = ord(character)
 for bitcount in range(7, -1, -1):
 self.data[sample] = ((self.data[sample] >> 1) << 1) | (character >> bitcount) & 0x01
 sample += 1
 def read_message(self, start_marker, stop_marker):
 decoded_message = []
 marker = start_marker
 decoding_message = False
 marker_scanner = ""
 char_obtained = 0x00
 for bit, audio_sample in enumerate(self.data, 1):
 char_obtained |= audio_sample & 0x01
 if bit % 8:
 char_obtained <<= 1
 else:
 marker_scanner += chr(char_obtained)
 if marker.startswith(marker_scanner):
 if len(marker_scanner) == len(marker):
 if decoding_message:
 return ''.join(decoded_message)
 else:
 marker = stop_marker
 else:
 if decoding_message:
 decoded_message.append(marker_scanner)
 marker_scanner = ""
 char_obtained = 0x00
def entropy_fast(s):
 _, counts = np.unique(s, return_counts=True)
 normed_counts = counts / float(len(s))
 return -np.sum(normed_counts * np.log2(normed_counts))
@contextmanager
def pending_message(message):
 print(ORANGE, message, WHITE, sep='', end='', flush=True)
 yield
 print(GREEN, '\t[DONE]', WHITE, sep='')
def encode_message(message, filepath="dubfx_nl.wav", new_name="testresult.wav"):
 message_to_hide = START_MARKER + message + STOP_MARKER
 required_bits = len(message_to_hide) * 8
 print("\n+==========================================================+")
 with pending_message('Reading file...'):
 audiofile = WAVFile(filepath)
 if required_bits > audiofile.subchunk_2_size:
 raise ValueError('Message is too long. {} bits required but only {} available.'.format(required_bits, audiofile.subchunk_2_size))
 print("\nUse entropy detection coding? y/n:")
 print(RED, "[WARNING]", WHITE, " This will significantly extend the execution time!", sep='')
 if input("Answer: ").lower() not in ('n', 'no'):
 entropy_time = time.clock()
 start_bit = audiofile.select_entropy(required_bits)
 entropy_time = time.clock() - entropy_time
 print("Best place for message found at: {} sample!".format(start_bit))
 print(GREEN, "Entropy calculation took: ", entropy_time, " seconds!", WHITE, sep='')
 else:
 start_bit = 0
 with pending_message('\nHiding your message...'):
 hide_time = time.clock()
 audiofile.hide_message(message_to_hide, start_bit)
 hide_time = time.clock() - hide_time
 with pending_message('Saving result...'):
 audiofile.save_to(new_name)
 print(GREEN)
 print("Message hidden successfuly!")
 print("Hiding took", hide_time, "seconds!", WHITE)
 print("+==========================================================+")
def decode_message(filepath="testresult.wav"):
 print("\n+==========================================================+")
 with pending_message('Reading file...'):
 audiofile = WAVFile(filepath)
 with pending_message('Reading messages...'):
 message = audiofile.read_message(START_MARKER, STOP_MARKER)
 if message:
 print(GREEN)
 print('Message successfuly read!', WHITE)
 print(message)
 else:
 print(RED, '\nNo message hidden in this file!', WHITE)
 print("+==========================================================+\n")
answered Dec 26, 2017 at 10:23
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.