This script records audio from the computer's output and passes it to a C program that fingerprints it and queries the Gracenote database. If it identifies the audio then it offers options to look it up on Discogs. I made this because I wanted a way to identify tracks in DJ mixes that didn't involve using Shazam on my phone.
Running the script requires setting up audio devices using Soundflower and installing SwitchAudioSource to programmatically switch device (details here).
I am particularly interested in comments on the design. I think the most significant problem is that the main()
function is overly dense. I decided not to use a more object-oriented style because I wasn't sure of the benefit over using a few functions. If I were to do more with either Discogs or Gracenote I would probably use the Python API clients they both provide, so rather than recreate a partial implementation of them I just used several functions.
Would an OO-style have been preferable here?
The functions could easily be split into modules as they have quite distinct tasks. Does the length of this script justify breaking it into separate modules?
This is also the first time I have written something using .rc files and command-line flags (most of my experience is JS in the browser) so I am interested in views on whether I have used them sensibly.
#!/usr/bin/python
import wave
import time
import subprocess
import os.path
import json
import argparse
import webbrowser
import sys
import requests
import keyring
import pyaudio
from rauth import OAuth1Service
# ---------------- Config -----------------
CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 2
RATE = 44100
RECORD_SECONDS = 6
SAVE_PATH = os.path.expanduser("~") + "/Music/recordings/"
WAVE_OUTPUT_FILENAME = "temp_{}.wav".format(int(time.time()))
COMPLETE_NAME = os.path.join(SAVE_PATH, WAVE_OUTPUT_FILENAME)
CONFIG_PATH = os.path.expanduser("~") + "/.identifyaudiorc"
def load_user_config(path):
user_config = {}
try:
with open(path, "r") as f:
for line in f:
split = line.split(" ")
user_config[str(split[0])] = str(split[1]).strip()
except:
raise IOError("Unable to read config file")
return user_config
config = load_user_config(CONFIG_PATH)
# ---------------- Setup ------------------
class GracenoteError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
p = pyaudio.PyAudio()
# ---------------- Discogs ----------------
discogs = OAuth1Service(
name="discogs",
consumer_key=config["DISCOGS_CONSUMER_KEY"],
consumer_secret=config["DISCOGS_CONSUMER_SECRET"],
request_token_url=config["DISCOGS_REQUEST_TOKEN_URL"],
access_token_url=config["DISCOGS_ACCESS_TOKEN_URL"],
authorize_url=config["DISCOGS_AUTHORIZE_URL"],
base_url=config["DISCOGS_BASE_URL"])
def discogs_get_master(artist_name, album_name):
# TODO: sometimes comes up with nothing when it should find something
payload = {
"key": config["DISCOGS_KEY"],
"secret": config["DISCOGS_SECRET"],
"artist": artist_name,
"release_title": album_name,
"type": "master"
}
r = requests.get("https://api.discogs.com/database/search", params=payload)
result = r.json()["results"]
if len(result):
return r.json()["results"][0]
else:
raise RuntimeError("No Discogs master found")
def discogs_get_release(master_id):
release = requests.get("https://api.discogs.com/masters/"+str(master_id)+"/versions",
params={"per_page": 1}
)
return release.json()["versions"][0]
def discogs_get_oauth_session():
access_token = keyring.get_password("system", "access_token")
access_token_secret = keyring.get_password("system", "access_token_secret")
if access_token and access_token_secret:
session = discogs.get_session((access_token, access_token_secret))
else:
request_token, request_token_secret = discogs.get_request_token()
authorize_url = discogs.get_authorize_url(request_token)
webbrowser.open(authorize_url, new=2, autoraise=True)
log("To enable Discogs access, visit this URL in your browser: "+authorize_url)
oauth_verifier = raw_input("Enter key: ")
session = discogs.get_auth_session(request_token, request_token_secret,
method="POST",
data={"oauth_verifier": oauth_verifier})
keyring.set_password("system", "access_token", session.access_token)
keyring.set_password("system", "access_token_secret", session.access_token_secret)
return session
def discogs_add_wantlist(session, username, release_id):
r = session.put("https://api.discogs.com/users/"+username+"/wants/"+str(release_id),
header_auth=True,
headers={
"content-type": "application/json",
"user-agent": "identify-audio-v0.2"})
return r.status_code
# ----------- Audio devices ---------------
def find_device(device_sought, device_list):
for device in device_list:
if device["name"] == device_sought:
return device["index"]
raise KeyError("Device {} not found.".format(device_sought))
def get_device_list():
num_devices = p.get_device_count()
device_list = [p.get_device_info_by_index(i) for i in range(0, num_devices)]
return device_list
def get_soundflower_index():
device_list = get_device_list()
soundflower_index = find_device("Soundflower (2ch)", device_list)
return soundflower_index
def get_current_output():
device_list = get_device_list()
try:
find_device("USB Audio Device", device_list)
return "USB Audio Device"
except:
return "Built-in Output"
def get_multi_device(output):
if output == "USB Audio Device":
return "Multi-Output Device (USB)"
else:
return "Multi-Output Device (Built-in)"
# ---------- Recording to file ------------
def record_audio(device_index, format, channels, rate, chunk, record_seconds):
stream = p.open(format=format,
channels=channels,
rate=rate,
input=True,
input_device_index=device_index,
frames_per_buffer=chunk)
log("Recording for {} seconds...".format(record_seconds))
frames = []
for i in range(0, int(rate / chunk * record_seconds)):
data = stream.read(chunk)
frames.append(data)
stream.stop_stream()
stream.close()
return frames
def write_file(frames, path, format, channels, rate):
wf = wave.open(path, "wb")
wf.setnchannels(channels)
wf.setsampwidth(p.get_sample_size(format))
wf.setframerate(rate)
wf.writeframes(b"".join(frames))
wf.close()
return path
# ----------- Gracenote -------------------
def query_gracenote(sound_path):
# TODO - handle double quotes in the output
out = subprocess.check_output([config["APP_PATH"], sound_path])
result = json.loads(out)
try:
error = result["error"]
except:
return result
raise GracenoteError(error)
def log(statement):
if not args["quiet"]:
print statement
# ----------- Main ------------------------
def main():
output = get_current_output()
multi_out = get_multi_device(output)
FNULL = open(os.devnull, "w")
if subprocess.call(["SwitchAudioSource", "-s", multi_out], stdout=FNULL, stderr=FNULL) == 0:
length = RECORD_SECONDS
match = False
attempts = 0
while not match and attempts <= 2:
input_audio = record_audio(get_soundflower_index(), FORMAT, CHANNELS, RATE, CHUNK, length)
try:
write_file(input_audio, COMPLETE_NAME, FORMAT, CHANNELS, RATE)
except IOError:
log("Error writing the sound file.")
resp = query_gracenote(COMPLETE_NAME)
if resp["result"] is None:
log("The track was not identified.")
length += 3
attempts += 1
if attempts <= 2:
log("Retrying...")
else:
match = True
if match:
print json.dumps(resp["result"], indent=4, separators=("", " - "), ensure_ascii=False).encode("utf8")
if args["discogs"] or args["want"]:
try:
master = discogs_get_master(resp["result"]["artist"], resp["result"]["album"])
except RuntimeError as e:
log(e)
else:
url = "https://discogs.com" + master["uri"]
log("Find online: " + url)
if args["open"]:
webbrowser.open(url, new=2, autoraise=True)
want_add = None
if not args["want"] and not args["open"]:
want_add = raw_input("Add this to your Discogs wantlist? y/n/o (to open in browser): ")
if want_add == "o" or args["open"]:
webbrowser.open(url, new=2, autoraise=True)
want_add = raw_input("Add this to your Discogs wantlist? y/n: ")
if want_add == "y" or args["want"]:
release = discogs_get_release(master["id"])
session = discogs_get_oauth_session()
status = discogs_add_wantlist(session, config["DISCOGS_USERNAME"], release["id"])
if status == 201:
log("Added '{}' to your Discogs wantlist".format(release["title"]))
else:
log("Error code {} adding the release to your Discogs wantlist".format(status))
else:
raise RuntimeError("Couldn't switch to multi-output device.")
p.terminate()
os.remove(COMPLETE_NAME)
if subprocess.call(["SwitchAudioSource", "-s", output], stdout=FNULL, stderr=FNULL) == 0:
return
else:
raise RuntimeError("Couldn't switch back to output.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Identify currently playing audio")
parser.add_argument("--discogs", "-d", action="store_true")
parser.add_argument("--want", "-w", action="store_true")
parser.add_argument("--open", "-o", action="store_true")
parser.add_argument("--quiet", "-q", action="store_true")
parser.add_argument("--verbose", "-v", action="store_true")
args = vars(parser.parse_args())
if args["verbose"]:
main()
else:
try:
main()
except GracenoteError as e:
print "Gracenote error: "+e
sys.exit(1)
except Exception as e:
print e
sys.exit(1)
1 Answer 1
To OO or to not to OO
Your program is fairly small and well organized and easy to read. I don't see any reason at this point to make it "more OO". If in the future you write more programs like this you'll want to organize the code into libraries to re-use common code. But at this point what you have is perfectly fine.
Now on to some stylistic comments...
Global variables
You have two kinds of global variables:
- constants like
RECORD_LENGTH
,CHUNK
- values which invoke code and could vary from
run to run, e.g.
WAVE_OUTPUT_FILENAME
,CONFIG_PATH
- values which are initialized to objects, e.g.
p
anddiscogs
First of all as globals the p
and discogs
variables
should have ALL CAPS names (and p
should be given a
more descriptive name -- it is even used?) This will
make it obvious to the reader that they are globals.
Secondly, I would move the initialization of
the path name globals to main
:
def main():
user_home = os.path.expanduser("~")
config_path = os.path.join(user_home, ".identifyaudiorc")
save_path = os.path.join(user_home, "/Music/recordings/")
complete_name = os.path.join(save_path, "temp_{}.wav".format(int(time.time())))
...
In fact, none of these values need to be global since
main
passes them to whatever function needs them - and that's
a good aspect of your code!
As an enhancement, consider adding command line options to set these paths - I think being able to set the location of the recordings directory would be convenient.
load_user_config()
What you have is fine, but be aware that there are tons of python libraries to load (and save) configuration files. Just google, e.g., "python load config file".
long if-blocks
main
has a very long spanning if
-block, and to make the
logic more apparent I would organize it like this:
def main():
...
if not subprocess.call(["SwitchAudioSource", ...):
raise RuntimeError("Couldn't switch to multi-output device.")
...
Now the reader can easily see what happens when the call to SwitchAudioSource fails
without having to search for the matching else
clause.
Another long if
-block is the if match:
statement. It would be more
readable if it was written:
if not match:
return
print json.dumps(resp["re...
In fact, now it is obvious that you aren't doing anything if there isn't a match. Perhaps you want something like:
if not match:
raise RuntimeError("Unable to read audio.")
...