I'm currently writing a Python application that hides a message in an audio file (WAV only for now). The hiding itself is quick and works without any problems, but I want to find the best place for the message by calculating entropy: the higher the entropy of a fragment, the smaller the chance that the change can be spotted with the naked eye. The problem is that this takes a really long time (approx. 70 minutes for a normal WAV file). It currently calculates the entropy every 8 bytes to make it faster (this was only done to fit the first version of the reading script; eventually I want to do it for every byte, but for now the execution time is limiting me). I use LSB replacement for the hiding, with start and end markers that are plain strings.
Here is the hiding code:
from wavreader import WAVFile as reader
from collections import Counter
from math import log
import time as t
import sys
# Carrier file to read, file to write, and the text to embed.
fpath, newname = "dubfx_nl.wav", "testresult.wav"
hmsg = "some sample test string to hide"

# Delimiters written before and after the payload; each tag is repeated 4 times.
start_marker = "".join(["STARTINGTRANSMISSION"] * 4)
end_marker = "".join(["ENDINGTRANSMISSION"] * 4)

# Index of the first sample to modify; stays 0 unless the entropy search moves it.
start_bit = 0
def hideBit(sampleNo, character, counter, higher_limit, flag=False):
    """Write one bit of `character` into the LSB of sample `sampleNo`.

    The bit to write is selected by the module-level `bitcount` (7 down to 0,
    i.e. most significant bit first); the sample is modified in place in the
    module-level `audiofile.Data`.

    Args:
        sampleNo: index of the sample to modify.
        character: integer code of the character being hidden.
        counter: index of the current character within its string.
        higher_limit: index of the last character of that string.
        flag: "string finished" indicator, returned unchanged until the last
            bit of the last character has been written.

    Returns:
        The pair (counter, flag) to use for the next call.
    """
    global bitcount
    global audiofile

    # Replace the sample's LSB with the selected bit of the message character.
    message_bit = (character >> bitcount) & 0x01
    cleared_sample = (audiofile.Data[sampleNo] >> 1) << 1
    audiofile.Data[sampleNo] = cleared_sample | message_bit

    if bitcount > 0:
        # Still inside the current character.
        bitcount -= 1
        return counter, flag

    # The character is complete: rewind the bit selector for the next one.
    bitcount = 7
    if counter < higher_limit:
        return counter + 1, flag
    # That was the last character of the string.
    return 0, True
def entropyfast(s):
    """Return the Shannon entropy of `s` in bits per symbol.

    Args:
        s: a sized iterable of hashable symbols (string, list, array slice...).

    Returns:
        The negated sum of p * log2(p) over the relative frequency p of every
        distinct symbol; 0 for an empty sequence.
    """
    # The length is loop-invariant: compute it once instead of twice per symbol.
    total = float(len(s))
    return -sum(
        (count / total) * log(count / total, 2)
        for count in Counter(s).values()
    )
# ANSI escape sequences used to colour the console output.
W = '\033[0m'   # white (normal)
R = '\033[31m'  # red
G = '\033[32m'  # green
O = '\033[33m'  # orange

print("\n+==========================================================+")
sys.stdout.write(O + "Reading file..." + W)
sys.stdout.flush()
audiofile = reader(fpath)
audiofile.Data.flags.writeable = True
#data = np.reshape(audiofile.Data, len(audiofile.Data)*audiofile.NumChannels)
print(G + "\t[DONE]" + W)

sys.stdout.write(O + "Calculating data..." + W)
sys.stdout.flush()
# One message bit is stored per 16-bit sample, so the capacity in bits is the
# number of samples in the data chunk (its byte size divided by two), capped
# by what was actually read from the file.
avail_bits = min(len(audiofile.Data), audiofile.Subchunk2Size // 2)
required_bits = (len(hmsg) + len(start_marker) + len(end_marker)) * 8
print(G + "\t[DONE]" + W)

if required_bits > avail_bits:
    print(R + "\nERROR! Message is too long!" + W)
    print("Available bits:\t" + str(avail_bits))
    print("Required bits: \t" + str(required_bits))
else:
    print("\nUse entropy detection coding? y/n:\n" + R + "[WARNING]" + W
          + " This will significantly extend the execution time!")
    ent_answer = raw_input("Answer: ")
    ent_time = None
    # `x == ("y" or "Y")` only ever compares against "y"; test membership.
    if ent_answer in ("y", "Y"):
        ent_time = t.clock()
        perc = 0
        ent = 0
        # Step by 8 samples: the reader script assembles characters from
        # groups of 8 consecutive samples counted from sample 0, so a message
        # starting at any other offset could never be decoded.  The "+ 1"
        # keeps the last window that still fits.
        for sampleStart in xrange(0, avail_bits - required_bits + 1, 8):
            e = entropyfast(audiofile.Data[sampleStart:sampleStart + required_bits])
            p = 100 * sampleStart / float(avail_bits)
            if (p - perc >= 0.1) or perc == 0 or perc == 100:
                perc = p
                sys.stdout.write("\r" + O + "Calculating best place for message: "
                                 + G + format(perc, ".1f") + O + "%" + W)
                sys.stdout.flush()
            if e > ent:
                ent = e
                start_bit = sampleStart
        # The percentage must be a string here; str + float raises TypeError.
        sys.stdout.write("\r" + O + "Calculating best place for message: "
                         + G + "100.0" + O + "%" + W)
        print("\nBest place found at: " + str(start_bit) + " sample!")
    elif ent_answer not in ("n", "N"):
        print(R + "\nUnknown answer!" + W)
        sys.exit()

    # Flags:
    sm_done = False   # start marker written
    msg_done = False  # message written
    fn_done = False   # end marker written
    bitcount = 7
    charcount = 0
    markercount = 0
    sys.stdout.write(O + "\nHiding your message..." + W + "\t")
    sys.stdout.flush()
    before = t.clock()
    for sampleNo in xrange(start_bit, start_bit + required_bits):
        if not sm_done:
            markercount, sm_done = hideBit(sampleNo, ord(start_marker[markercount]), markercount, len(start_marker) - 1)
        elif not msg_done:
            charcount, msg_done = hideBit(sampleNo, ord(hmsg[charcount]), charcount, len(hmsg) - 1)
        elif not fn_done:
            markercount, fn_done = hideBit(sampleNo, ord(end_marker[markercount]), markercount, len(end_marker) - 1)
    print(G + "[DONE]" + W)
    after = t.clock()
    sys.stdout.write(O + "Saving result..." + W + "\t")
    sys.stdout.flush()
    audiofile.SaveTo(newname)
    print(G + "[DONE]" + W)
    print(G + "\nMessage hidden successfuly!" + W)
    if ent_time is not None:
        print(G + "Entropy calculation took: " + str(before - ent_time) + " seconds!" + W)
    print(G + "Hiding took " + str(after - before) + " seconds!" + W)
print("+==========================================================+\n")
And here is the reader code:
from wavreader import WAVFile as reader
import numpy as np
import sys
# Path of the WAV file that is scanned for a hidden message.
fpath = "testresult.wav"
class hiddenMessageReader:
    """Recover a text hidden in the least significant bits of a WAV file.

    Characters are rebuilt from groups of 8 consecutive samples (most
    significant bit first).  The text found between the start marker and the
    end marker is accumulated in `self.msg`.
    """

    def __init__(self, fname):
        """Load `fname` with the WAV reader and reset the decoding state."""
        self.sm_done = False   # start marker found
        self.msg_done = False  # end marker found, message complete
        self.bitcount = 0      # bits collected so far for the current character
        self.markercount = 0   # number of marker characters currently matched
        self.msg = ""
        self.char_obt = 0x00   # character being assembled
        self.start_marker = "STARTINGTRANSMISSION"*4
        self.end_marker = "ENDINGTRANSMISSION"*4
        self.marker_tmp = ""   # pending characters that are a prefix of the marker
        self.audio = reader(fname)

    def __ReadBit(self, sample, marker, flag, msg_bool=False):
        """Consume the LSB of sample number `sample`.

        Args:
            sample: index into `self.audio.Data`.
            marker: marker string currently being searched for.
            flag: current "marker found" state; returned unchanged unless the
                marker is completed by this bit.
            msg_bool: when true, characters that turn out not to belong to
                the marker are appended to `self.msg`.

        Returns:
            True once `marker` has been read completely, otherwise `flag`.
        """
        self.char_obt = self.char_obt | (int(self.audio.Data[sample]) & 0x01)
        if self.bitcount < 7:
            # Character not complete yet: make room for the next bit.
            self.bitcount = self.bitcount + 1
            self.char_obt = self.char_obt << 1
            return flag

        # Eighth bit received: a full character is available.
        self.marker_tmp = self.marker_tmp + chr(self.char_obt)
        self.char_obt = 0x00
        self.bitcount = 0

        # Keep only the longest tail of the pending characters that can still
        # be the beginning of the marker.  Characters pushed out belong to the
        # message; simply discarding the pending prefix (and never re-testing
        # the current character) loses text and can miss the marker entirely.
        while self.marker_tmp and not marker.startswith(self.marker_tmp):
            if msg_bool:
                self.msg = self.msg + self.marker_tmp[0]
            self.marker_tmp = self.marker_tmp[1:]
        self.markercount = len(self.marker_tmp)

        if self.marker_tmp == marker:
            self.markercount = 0
            self.marker_tmp = ""
            flag = True
        return flag

    def ReadMsg(self):
        """Scan the samples for the start marker, the message and the end marker.

        Progress is written to standard output.  The decoded text ends up in
        `self.msg`; the method itself always returns None.
        """
        if self.audio is not None:
            data_len = len(self.audio.Data)
            perc = 0
            for bit in xrange(0, data_len):
                if not self.sm_done:
                    self.sm_done = self.__ReadBit(bit, self.start_marker, self.sm_done)
                elif not self.msg_done:
                    self.msg_done = self.__ReadBit(bit, self.end_marker, self.msg_done, True)
                else:
                    return None
                p = 100*bit/float(data_len)
                if p-perc >= 0.1 or perc == 100:
                    perc = p
                    sys.stdout.write("\r" + O + "Scanned through: " + G
                                     + format(perc, ".1f") + O + "% of file..." + W)
        return None
# ANSI escape sequences used to colour the console output.
W = '\033[0m'   # white (normal)
R = '\033[31m'  # red
G = '\033[32m'  # green
O = '\033[33m'  # orange

print("\n+==========================================================+")
sys.stdout.write(O + 'Reading file...' + W)
sys.stdout.flush()
msgReader = hiddenMessageReader(fpath)
print(G + '\t[DONE]' + W)
sys.stdout.write(O + "Reading messages...\t" + W)
sys.stdout.flush()
msgReader.ReadMsg()
print(G + "[DONE]" + W)
# Only report a message when the end marker was actually found; otherwise
# `msg` holds whatever followed a start marker up to the end of the file.
if msgReader.msg_done and msgReader.msg != "":
    print(G + "\nMessage successfuly read!\n" + W + "\"" + msgReader.msg + "\"")
else:
    print(R + "\nNo messages hidden in this file!" + W)
print("+==========================================================+\n")
I also attach file reader code for wav files, although it works fine (for me):
import numpy as np
import struct
class WAVFile:
    """Reader/writer for WAV files with a plain 44-byte header and 16-bit samples.

    The header fields are exposed as attributes named after the RIFF fields
    (`ChunkID`, `NumChannels`, `SampleRate`, ...).  `Data` holds everything
    that follows the header as an array of unsigned 16-bit integers.
    """

    def __init__(self, filename):
        """Parse the header of `filename` and load its sample data.

        Raises:
            ValueError: if the file does not use 16 bits per sample or if the
                chunk following the format chunk is not the "data" chunk.
        """
        # The `with` statement already closes the file; no try/finally needed.
        with open(filename, "rb") as f:
            self.ChunkID = f.read(4)
            self.ChunkSize = struct.unpack_from("<I", f.read(4))[0]
            self.Format = f.read(4)
            self.Subchunk1ID = f.read(4)
            self.Subchunk1Size = struct.unpack_from("<I", f.read(4))[0]
            self.AudioFormat = struct.unpack_from("<H", f.read(2))[0]
            self.NumChannels = struct.unpack_from("<H", f.read(2))[0]
            self.SampleRate = struct.unpack_from("<I", f.read(4))[0]
            self.ByteRate = struct.unpack_from("<I", f.read(4))[0]
            self.BlockAlign = struct.unpack_from("<H", f.read(2))[0]
            self.BitsPerSample = struct.unpack_from("<H", f.read(2))[0]
            # Rebinding `self` to None does not cancel construction: the caller
            # would receive a half-initialised object.  Fail loudly instead.
            if self.BitsPerSample != 16:
                raise ValueError("BitsPerSample value not supported!")
            self.Subchunk2ID = f.read(4)
            # Bytes literal: the file is opened in binary mode.
            if self.Subchunk2ID != b"data":
                raise ValueError("Format not supported!")
            self.Subchunk2Size = struct.unpack_from("<I", f.read(4))[0]
            self.Data = np.fromfile(f, dtype=np.dtype(np.uint16), count=-1)

    def GetData(self):
        """Return the tuple (samples, sample rate)."""
        return (self.Data, self.SampleRate)

    def SaveTo(self, filename):
        """Write the header fields and the (possibly modified) samples to `filename`."""
        with open(filename, "wb") as f:
            f.write(self.ChunkID)
            f.write(struct.pack("<I", self.ChunkSize))
            f.write(self.Format)
            f.write(self.Subchunk1ID)
            f.write(struct.pack("<I", self.Subchunk1Size))
            f.write(struct.pack("<H", self.AudioFormat))
            f.write(struct.pack("<H", self.NumChannels))
            f.write(struct.pack("<I", self.SampleRate))
            f.write(struct.pack("<I", self.ByteRate))
            f.write(struct.pack("<H", self.BlockAlign))
            f.write(struct.pack("<H", self.BitsPerSample))
            f.write(self.Subchunk2ID)
            f.write(struct.pack("<I", self.Subchunk2Size))
            self.Data.tofile(f)
I know that it may not be too pretty, but this is my first project in Python and I'm just trying out some concepts and stuff. I would really appreciate any tips on how to make it better.
-
2\$\begingroup\$ Have you tried to profile ( docs.python.org/2/library/profile.html ) the code? Where most of the time is being spent? \$\endgroup\$Roman Susi– Roman Susi2017年12月25日 18:21:18 +00:00Commented Dec 25, 2017 at 18:21
1 Answer 1
If this is your first project, why choose an outdated version of Python to start with? Python 3 was released about 10 years ago, and yet you chose to use Python 2. Anyway, there is not much you need to change to make your code run in Python 3, and not much more to keep it Python 2 compatible as well. The differences that impact your code are:
- `print` is no longer a keyword but a function;
- `xrange` is now `range` (and the old `range` disappeared);
- `raw_input` is now `input` (and the old `input` disappeared).
Anyway, kudos for using the last two for your first project.
If you want to make your code compatible with both Python 2 and Python 3, you will need to add
from __future__ import print_function
try:
    # Python 2: rebind the Python 2 functions to their Python 3 names.
    range = xrange
    input = raw_input
except NameError:
    # Python 3: the Python 2 names do not exist and nothing needs renaming.
    pass
at the top of your files.
Now, to start tidying things, you could put the constants you wrote in both files (colors and markers) into a common file (say `constants.py`) that you could use like a namespace (`import constants as C` and use `C.WHITE` or `C.START_MARKER`).
In the end, it doesn't matter much, as I ended up putting everything in a single file (so back to global constants), but if you want to keep it as 3 separate files, make them 4 instead.
Another thing is to avoid naming these constants with single-letter names. Why would you need
# ANSI colour escape sequences (the backslash of the octal escape is restored).
W = '\033[0m'   # white (normal)
R = '\033[31m'  # red
G = '\033[32m'  # green
O = '\033[33m'  # orange
where
# ANSI colour escape sequences (the backslash of the octal escape is restored).
WHITE = '\033[0m'
RED = '\033[31m'
GREEN = '\033[32m'
ORANGE = '\033[33m'
would convey the same information both here and everywhere else.
And, in general, you’ve got a lot of PEP8 violations. Especially on variable names.
As regards the code itself, the first thing you should avoid is this strange `try: ... finally:` within your `with open():` blocks. Stop. It is already the role of the `with` statement to handle your resources appropriately; you don't need to add this layer on your own.
Speaking of `with`, you use the following construct:
- print an orange message of a pending operation;
- perform the operation;
- print a green DONE message.
a few times; you could easily convert that into a context manager to avoid repeating yourself:
@contextmanager
def pending_message(message):
    """Print `message` in orange, run the `with` body, then print a green [DONE].

    The [DONE] marker is only printed when the body finishes without raising.
    """
    # Function-scope import so the block does not depend on a module-level one.
    import sys

    print(ORANGE, message, WHITE, sep='', end='')
    # print() only accepts `flush=` from Python 3.3 on; flushing the stream
    # explicitly also works on Python 2 with `print_function` enabled.
    sys.stdout.flush()
    yield
    print(GREEN, '\t[DONE]', WHITE, sep='')
Usage being
# The pending text is printed on entry and the [DONE] marker on exit.
with pending_message('Reading file...'):
    audiofile = WAVFile(filepath)
with pending_message('Reading messages...'):
    message = audiofile.read_message(START_MARKER, STOP_MARKER)
Another thing to note is how you use a helper function, together with either a class with attributes or some global variables, to perform similar operations on the 8 bits of a byte, possibly with some tidying at the end... This is way too convoluted and error-prone. You could either use an inner `for` loop to iterate over the 8 bits or use the modulo (`%`) operator to check whether you have reached the 8th bit.
In general, your helper functions add complexity and don't help the reader (us) and thus the maintainer (you).
I would also advise creating methods in the `WAVFile` class to manipulate `self.data` rather than directly manipulating `audiofile.data` from outside. This feels neater.
Lastly, since you are using `numpy`, you should really try to use its full potential of vectorized operations rather than looping in Python. As a rule of thumb, you should avoid `for` loops as hard as you can when you import `numpy`.
Reshaping your data or using a sliding window could be useful in this case. I didn't go as far as rewriting everything to use numpy's potential, but `entropy_fast` was an easy target for this task.
Proposed improvements:
from __future__ import print_function
import time
import struct
from contextlib import contextmanager
import numpy as np
try:
    # Python 2: rebind the Python 2 functions to their Python 3 names.
    range = xrange
    input = raw_input
except NameError:
    # Python 3: the Python 2 names do not exist and nothing needs renaming.
    pass
# ANSI colour escape sequences (the backslash of the octal escape is restored).
WHITE = '\033[0m'
RED = '\033[31m'
GREEN = '\033[32m'
ORANGE = '\033[33m'

# Delimiters written before and after the hidden payload.
START_MARKER = 'STARTINGTRANSMISSION' * 4
STOP_MARKER = 'ENDINGTRANSMISSION' * 4
class WAVError(Exception):
    """Error raised by WAVFile when a file has an unsupported layout."""
class WAVFile(object):
    """WAV file (44-byte header, 16-bit samples) whose sample LSBs carry a message.

    Header fields are exposed as snake_case attributes; `data` holds everything
    after the header as an array of unsigned 16-bit integers.  Hidden text is
    stored one bit per sample, most significant bit of each character first.
    """

    def __init__(self, filename):
        """Parse the header of `filename` and load its samples.

        Raises:
            WAVError: if the file does not use 16 bits per sample or if the
                chunk following the format chunk is not the "data" chunk.
        """
        with open(filename, "rb") as f:
            self.chunk_id = f.read(4)
            self.chunk_size, = struct.unpack("<I", f.read(4))
            self.format = f.read(4)
            self.subchunk_1_id = f.read(4)
            (
                self.subchunk_1_size,
                self.audio_format,
                self.num_channels,
                self.sample_rate,
                self.byte_rate,
                self.block_align,
                self.bits_per_sample,
            ) = struct.unpack("<IHHIIHH", f.read(20))
            if self.bits_per_sample != 16:
                raise WAVError("BitsPerSample value not supported!")
            self.subchunk_2_id = f.read(4)
            if self.subchunk_2_id != b"data":
                raise WAVError("Format not supported!")
            self.subchunk_2_size, = struct.unpack("<I", f.read(4))
            self.data = np.fromfile(f, dtype=np.dtype(np.uint16), count=-1)

    def save_to(self, filename):
        """Write the header fields and the current samples to `filename`."""
        with open(filename, "wb") as f:
            f.write(self.chunk_id)
            f.write(struct.pack("<I", self.chunk_size))
            f.write(self.format)
            f.write(self.subchunk_1_id)
            f.write(struct.pack("<IHHIIHH",
                                self.subchunk_1_size,
                                self.audio_format,
                                self.num_channels,
                                self.sample_rate,
                                self.byte_rate,
                                self.block_align,
                                self.bits_per_sample))
            f.write(self.subchunk_2_id)
            f.write(struct.pack("<I", self.subchunk_2_size))
            self.data.tofile(f)

    def select_entropy(self, required_bits):
        """Return the start sample of the highest-entropy window of `required_bits` samples.

        Only starts that are multiples of 8 are considered, because
        `read_message` rebuilds characters from groups of 8 samples counted
        from sample 0 and cannot decode a message placed at any other offset.

        Raises:
            ValueError: if the file holds fewer than `required_bits` samples.
        """
        last_start = len(self.data) - required_bits
        if last_start < 0:
            raise ValueError(
                'Cannot place {} bits in {} samples.'.format(
                    required_bits, len(self.data)))
        # "+ 1" so that the window ending exactly at the last sample is tried.
        _, start_bit = max(
            (entropy_fast(self.data[sample:sample + required_bits]), sample)
            for sample in range(0, last_start + 1, 8)
        )
        return start_bit

    def hide_message(self, message, start):
        """Store `message` in the sample LSBs, beginning at sample `start`.

        Eight samples are used per character (low 8 bits of its code point,
        most significant bit first).

        Raises:
            IndexError: if the message does not fit between `start` and the
                end of the data; nothing is modified in that case.
        """
        required = len(message) * 8
        # Check up front so that a failure cannot leave a half-written message.
        if start < 0 or start + required > len(self.data):
            raise IndexError(
                'Message needs samples {}..{} but only {} are available.'.format(
                    start, start + required - 1, len(self.data)))
        self.data.flags.writeable = True
        sample = start
        for character in message:
            code = ord(character)
            for bitcount in range(7, -1, -1):
                bit = (code >> bitcount) & 0x01
                self.data[sample] = ((self.data[sample] >> 1) << 1) | bit
                sample += 1

    def read_message(self, start_marker, stop_marker):
        """Return the text hidden between `start_marker` and `stop_marker`.

        Returns:
            The decoded text, or None when either marker is missing.
        """
        # Pack every group of 8 LSBs into one character code (MSB first, the
        # order used by hide_message); a trailing incomplete group is ignored.
        lsb = (self.data & 1).astype(np.uint8)
        usable = len(lsb) - len(lsb) % 8
        text = ''.join(map(chr, np.packbits(lsb[:usable]).tolist()))

        # Plain substring search finds the markers even when the payload ends
        # with characters that are themselves a prefix of the stop marker.
        begin = text.find(start_marker)
        if begin < 0:
            return None
        begin += len(start_marker)
        end = text.find(stop_marker, begin)
        if end < 0:
            return None
        return text[begin:end]
def entropy_fast(s):
    """Return the Shannon entropy, in bits, of the values contained in `s`."""
    occurrences = np.unique(s, return_counts=True)[1]
    probabilities = occurrences / float(len(s))
    weighted_logs = probabilities * np.log2(probabilities)
    return -np.sum(weighted_logs)
@contextmanager
def pending_message(message):
    """Print `message` in orange, run the `with` body, then print a green [DONE].

    The [DONE] marker is only printed when the body finishes without raising.
    """
    # Function-scope import: the file's import block does not provide `sys`.
    import sys

    print(ORANGE, message, WHITE, sep='', end='')
    # print() only accepts `flush=` from Python 3.3 on; flushing the stream
    # explicitly also works on Python 2 with `print_function` enabled.
    sys.stdout.flush()
    yield
    print(GREEN, '\t[DONE]', WHITE, sep='')
def encode_message(message, filepath="dubfx_nl.wav", new_name="testresult.wav"):
    """Hide `message`, wrapped in the start/stop markers, inside a WAV file.

    Args:
        message: text to hide.
        filepath: WAV file used as carrier.
        new_name: path of the WAV file written with the hidden message.

    Raises:
        ValueError: if the carrier has fewer samples than bits to hide.
    """
    # time.clock() no longer exists on Python 3.8+; use perf_counter when the
    # running interpreter provides it and fall back to clock otherwise.
    timer = getattr(time, 'perf_counter', None) or time.clock

    message_to_hide = START_MARKER + message + STOP_MARKER
    required_bits = len(message_to_hide) * 8

    print("\n+==========================================================+")
    with pending_message('Reading file...'):
        audiofile = WAVFile(filepath)

    # One bit is hidden per sample, so the capacity is the number of samples,
    # not the byte size stored in the header.
    available_bits = len(audiofile.data)
    if required_bits > available_bits:
        raise ValueError('Message is too long. {} bits required but only {} available.'.format(required_bits, available_bits))

    print("\nUse entropy detection coding? y/n:")
    print(RED, "[WARNING]", WHITE, " This will significantly extend the execution time!", sep='')
    if input("Answer: ").lower() not in ('n', 'no'):
        entropy_time = timer()
        start_bit = audiofile.select_entropy(required_bits)
        entropy_time = timer() - entropy_time
        print("Best place for message found at: {} sample!".format(start_bit))
        print(GREEN, "Entropy calculation took: ", entropy_time, " seconds!", WHITE, sep='')
    else:
        start_bit = 0

    with pending_message('\nHiding your message...'):
        hide_time = timer()
        audiofile.hide_message(message_to_hide, start_bit)
        hide_time = timer() - hide_time

    with pending_message('Saving result...'):
        audiofile.save_to(new_name)

    print(GREEN)
    print("Message hidden successfuly!")
    print("Hiding took", hide_time, "seconds!", WHITE)
    print("+==========================================================+")
def decode_message(filepath="testresult.wav"):
    """Load `filepath`, look for a hidden message and print the outcome."""
    frame = "+==========================================================+"
    print("\n" + frame)

    with pending_message('Reading file...'):
        audiofile = WAVFile(filepath)
    with pending_message('Reading messages...'):
        message = audiofile.read_message(START_MARKER, STOP_MARKER)

    if not message:
        # Nothing found (or an empty payload).
        print(RED, '\nNo message hidden in this file!', WHITE)
    else:
        print(GREEN)
        print('Message successfuly read!', WHITE)
        print(message)

    print(frame + "\n")
Explore related questions
See similar questions with these tags.