I have a python script that loads a database table and checks an API for the entries found.
I do it like this:
import threading
import constants
import pymysql
import requests
from datetime import datetime
from dateutil.parser import parse
import os
import urllib.request
import shutil
import json
class CheckStreams(threading.Thread):
def __init__(self, debug):
threading.Thread.__init__(self)
self.debug = debug
self.newGames = False
self.online = 0
def get_new_games(self):
return self.newGames
def set_new_games(self):
self.newGames = True
def get_online(self):
return self.online
def set_online(self):
self.online += 1
def run(self):
# Connect to the database
connect = pymysql.connect(host=constants.HOST,
user=constants.USERNAME,
password=constants.PASSWORD,
db=constants.DATABASE,
charset='utf8mb4',
port=constants.PORT,
cursorclass=pymysql.cursors.DictCursor)
# Get Streamer Table #
try:
with connect.cursor() as cursor:
# Read all Streamers #
sql = "SELECT * FROM `ls_streamer` s LEFT JOIN `ls_platforms` p ON s.platform = p.id"
# Execute Query #
cursor.execute(sql)
# Result to streamers Var #
streamers = cursor.fetchall()
# Check if we have Streamers at all #
if cursor.rowcount > 0:
# Show total Streamers #
if self.debug: print('Total Streamers: %s' % constants.color('green', cursor.rowcount))
# Loop through all streamers #
for streamer in streamers:
# Output Current Streamer #
if self.debug: print(
'\n%s - %s' % (
streamer["channel_user"], constants.color(streamer["name"], streamer["name"])))
# Fetch from Platform #
if streamer["platform"] == 1:
# Twitch.tv #
if self.debug: print('Gathering %s' % constants.color('Twitch.tv', 'Twitch.tv'))
# gatherTwitch()
self.gatherTwitch(streamer)
#self.gatherTwitchVods(streamer)
else:
# None #
if self.debug: print(
'Wrong Platform ID: #%s' % constants.color('red', streamer["platform"]))
else:
if self.debug: print('Total Streamers: %s' % constants.color('red', '0'))
except Exception:
constants.PrintException()
finally:
connect.close()
def gatherTwitch(self, streamer):
# Connect to the database
connect = pymysql.connect(host=constants.HOST,
user=constants.USERNAME,
password=constants.PASSWORD,
db=constants.DATABASE,
charset='utf8mb4',
port=constants.PORT,
cursorclass=pymysql.cursors.DictCursor)
# Internal Vars #
isOnline = False
description = None
language = None
thumbnail = None
setOffline = False
banner = "None"
logo = "None"
partner = False
# Create Stream Url from Vars #
streamUrl = '%s%s' % (streamer["url"], streamer["channel_id"])
# Send Request w/ Twitch Headers #
r = requests.get(
streamUrl,
headers=
{
"Accept": "application/vnd.twitchtv.v5+json",
"Client-ID": constants.TWITCH_CLIENT_ID
}
)
# Check Response Code #
if r.status_code == 200:
# JSON #
data = r.json()
# Check if Stream is valid #
if "stream" in data.keys():
# Get Info #
streamData = data["stream"]
if type(streamData) is dict:
# Check if Streaming LoL #
if streamData["game"].lower() == "league of legends":
# Streaming LoL - YAY #
if self.debug: print('%s' % constants.color('green', 'IS STREAMING'))
# Gather Vars #
lastSeen = datetime.now()
if streamer["is_online"] == True:
lastSeen = streamer["modified"]
totalStreamed = streamer["total_online"]
totalStreamed += constants.convertTimeDelta((datetime.now() - lastSeen))
viewers = streamData["viewers"]
resolution = streamData["video_height"]
fps = streamData["average_fps"]
delay = streamData["delay"]
started = parse(streamData["created_at"], None, ignoretz=True)
isOnline = True
# Channel Data #
channelData = streamData["channel"]
if type(channelData) is dict:
description = channelData["status"]
language = channelData["language"]
logo = channelData["logo"]
banner = channelData["profile_banner"]
if channelData["partner"] == "true":
partner = True
if channelData["partner"] is True:
partner = True
else:
# Output Debug #
if self.debug: print('Channel Data %s' % constants.color('red', 'NOT FOUND'))
# Peview Data #
previewData = streamData["preview"]
if type(previewData) is dict:
thumbnail = previewData["medium"]
else:
# Output Debug #
if self.debug: print('Preview Data %s' % constants.color('red', 'NOT FOUND'))
try:
with connect.cursor() as cursorStreamer:
# Got all Vars put to DB #
sqlUpdate = """
UPDATE
`ls_streamer`
SET
total_online = %s,
viewers = %s,
resolution = %s,
fps = %s,
delay = %s,
description = %s,
language = %s,
thumbnail = %s,
logo = %s,
banner = %s,
started = %s,
is_online = %s,
modified = %s,
is_partner = %s
WHERE
id = %s
"""
# Execute Query #
connect.escape(sqlUpdate)
cursorStreamer.execute(sqlUpdate, (
totalStreamed, viewers, resolution, fps, delay, description, language, thumbnail,
logo, banner, started, isOnline, datetime.now(), partner, streamer["id"]))
connect.commit()
self.set_online()
# Now crawl the Riot API for Live Games for all connected summoners #
self.getLiveGames(streamer)
except Exception:
constants.PrintException()
finally:
if self.debug: print('Update ls_streamer: %s' % constants.color('green', 'SUCCESS'))
else:
# Set Offline #
setOffline = True
# Stream online but not streaming LoL #
if self.debug: print(
'Stream is %s but %s streaming LoL' % (
constants.color('green', 'online'), constants.color('red', 'NOT')))
else:
# Set Offline #
setOffline = True
# Output Debug #
if self.debug: print('Streamer is %s' % constants.color('red', 'OFFLINE'))
else:
# Set Offline #
setOffline = True
# Output Debug #
if self.debug: print('Streamer is %s' % constants.color('red', 'OFFLINE'))
else:
# Set Offline #
setOffline = True
# Output Debug #
if self.debug: print('->%s %s' % ('Response Code', constants.color('red', r.status_code)))
# Output Url #
if self.debug: print('Streamer Url: %s' % streamUrl)
if setOffline == True:
try:
with connect.cursor() as cursorStreamerOff:
# Got all Vars put to DB #
sqlOffline = """
UPDATE
`ls_streamer`
SET
is_online = %s
WHERE
id = %s
"""
# Execute Query #
connect.escape(sqlOffline)
cursorStreamerOff.execute(sqlOffline, (False, streamer["id"]))
connect.commit()
except Exception:
constants.PrintException()
# Create Stream Url from Vars #
streamUrlVod = '%s%s/videos?broadcast_type=archive&limit=100' % (
streamer["channelurl"], streamer["channel_id"])
# Send Request w/ Twitch Headers #
rVod = requests.get(
streamUrlVod,
headers=
{
"Accept": "application/vnd.twitchtv.v5+json",
"Client-ID": constants.TWITCH_CLIENT_ID
}
)
# Check Response Code #
if rVod.status_code == 200:
# JSON #
data = rVod.json()
# Check if Stream is valid #
if "videos" in data.keys():
# Get Info #
videos = data["videos"]
for video in videos:
thumb = video["preview"]["medium"]
video_id = video["_id"]
length = video["length"]
created = video["created_at"]
if video["game"] == "League of Legends" and video["viewable"] == "public":
# Check if video exists #
try:
with connect.cursor() as cursorVod:
sqlInsert = """
INSERT INTO
`ls_vods`
(streamer_id, thumbnail, video_id, created, `length`, last_check)
VALUES
(%s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
last_check = %s,
thumbnail = %s,
`length` = %s
"""
cursorVod.execute(sqlInsert,
(streamer["id"], thumb, video_id, created, length, datetime.now(), datetime.now(), thumb, length))
connect.commit()
except Exception:
constants.PrintException()
else:
if self.debug: print('Videos not found in URL: %s' % streamUrl)
else:
# Output Debug #
if self.debug: print('->%s %s' % ('Response Code', constants.color('red', r.status_code)))
# Output Url #
if self.debug: print('Streamer Url: %s' % streamUrl)
def getLiveGames(self, streamer):
# Connect to the database
connect = pymysql.connect(host=constants.HOST,
user=constants.USERNAME,
password=constants.PASSWORD,
db=constants.DATABASE,
charset='utf8mb4',
port=constants.PORT,
cursorclass=pymysql.cursors.DictCursor)
# Select all Summoners from Streamer #
try:
with connect.cursor() as cursorSummoner:
sqlSummoner = """
SELECT * FROM
`ls_summoner` s
LEFT JOIN
`ls_regions` r
ON
r.id = s.region_id
WHERE
streamer_id = %s
"""
# Execute Query #
cursorSummoner.execute(sqlSummoner, (streamer["id"],))
summoners = cursorSummoner.fetchall()
if cursorSummoner.rowcount > 0:
for summoner in summoners:
# Output which Summoner #
if self.debug: print('Checking Live Game for Summoner %s-%s' % (
summoner["short"].upper(), summoner["name"]))
urlLive = "https://%s.api.riotgames.com/lol/spectator/v3/active-games/by-summoner/%s?api_key=%s" % (
summoner["long"].lower(), summoner["summoner_id"],
constants.RIOT_API_KEY)
r = requests.get(urlLive)
if self.debug: print("Checking %s" % urlLive)
# Check Response Code #
if r.status_code == 200:
# Output #
if self.debug: print('->%s Found Live Game for %s-%s' % (
constants.color('green', 'FOUND'), summoner["short"].upper(), summoner["name"]))
# JSON #
game = r.json()
# Gather Vars #
map = game["mapId"]
gameId = game["gameId"]
gameMode = game["gameMode"]
gameType = game["gameType"]
queueId = game["gameQueueConfigId"]
played = game["gameLength"]
spell1Id = None
spell2Id = None
teamId = None
champion = None
runes = None
masteries = None
pp = game["participants"]
if type(pp) is list:
for part in pp:
if part["summonerId"] == summoner["summoner_id"]:
teamId = part["teamId"]
spell1Id = part["spell1Id"]
spell2Id = part["spell2Id"]
champion = part["championId"]
runes = json.dumps(part["runes"])
masteries = json.dumps(part["masteries"])
if self.debug: print('Game %s. Game ID #%s' % (constants.color('green', 'found'), gameId))
# MySQL #
try:
with connect.cursor() as cursorUpdateCurrent:
# Check Champion in DB #
sqlChampion = "SELECT * FROM `ls_champions` WHERE id = %s"
cursorUpdateCurrent.execute(sqlChampion,
(champion,))
if cursorUpdateCurrent.rowcount == 0:
# Champion not in DB TODO: SEND MAIL #
if self.debug: print("Champion %",
constants.color('red',
'NOT IN DB'))
# Exit current Game Update #
break
# Check Map in DB #
sqlMap = "SELECT * FROM `ls_maps` WHERE id = %s"
cursorUpdateCurrent.execute(sqlMap,
(map,))
if cursorUpdateCurrent.rowcount == 0:
# Map not in DB TODO: SEND MAIL #
if self.debug: print("Map %",
constants.color('red',
'NOT IN DB'))
# Exit current Game Update #
break
# Check Queue in DB #
sqlQueue = "SELECT * FROM `ls_queues` WHERE id = %s"
cursorUpdateCurrent.execute(sqlQueue,
(queueId,))
if cursorUpdateCurrent.rowcount == 0:
# Queue not in DB TODO: SEND MAIL #
if self.debug: print("Queue %",
constants.color('red',
'NOT IN DB'))
# Exit current Game Update #
break
# Check P1Spell1 in DB #
sqlP1Spell1 = "SELECT * FROM `ls_spells` WHERE id = %s"
cursorUpdateCurrent.execute(sqlP1Spell1,
(spell1Id,))
if cursorUpdateCurrent.rowcount == 0:
# P1Spell1 not in DB TODO: SEND MAIL #
if self.debug: print("P1Spell1 %",
constants.color('red',
'NOT IN DB'))
# Exit current Game Update #
break
# Check P1Spell2 in DB #
sqlP1Spell2 = "SELECT * FROM `ls_spells` WHERE id = %s"
cursorUpdateCurrent.execute(
sqlP1Spell2,
(spell2Id,))
if cursorUpdateCurrent.rowcount == 0:
# P1Spell2 not in DB TODO: SEND MAIL #
if self.debug: print("P1Spell2 %",
constants.color('red',
'NOT IN DB'))
# Exit current Game Update #
break
# Select Current Match #
sqlSelectCurrent = " SELECT * FROM `ls_current_match` WHERE summoner_id = %s"
cursorUpdateCurrent.execute(sqlSelectCurrent, (summoner["id"]))
if cursorUpdateCurrent.rowcount > 0:
# Found a Current Match #
sqlUpdateCurrent = """
UPDATE
`ls_current_match`
SET
champion_id = %s,
map_id = %s,
summoner_id = %s,
queue_id = %s,
match_id = %s,
team = %s,
length = %s,
type = %s,
mode = %s,
modified = %s,
p1_spell1_id = %s,
p1_spell2_id = %s,
is_playing = %s,
runes = %s,
masteries = %s
WHERE
summoner_id = %s
"""
cursorUpdateCurrent.execute(
sqlUpdateCurrent, (
champion,
map,
summoner["id"],
queueId,
gameId,
teamId,
played,
gameType,
gameMode,
datetime.now(),
spell1Id,
spell2Id,
True,
runes,
masteries, summoner["id"]
))
connect.commit()
if self.debug: print(
'%s Current Match in Database\n' % constants.color('yellow',
'UPDATED'))
else:
# No current Match for Summoner Id found #
sqlNewCurrent = """
INSERT INTO
`ls_current_match`
(champion_id, map_id, summoner_id, queue_id, match_id, team, length, type, mode, modified, p1_spell1_id, p1_spell2_id, is_playing, runes, masteries)
VALUES
(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
cursorUpdateCurrent.execute(sqlNewCurrent, (
champion,
map,
summoner["id"],
queueId,
gameId,
teamId,
played,
gameType,
gameMode,
datetime.now(),
spell1Id,
spell2Id,
True,
runes,
masteries
))
connect.commit()
if self.debug: print(
'%s Current Match in Database\n' % constants.color(
'green', 'INSERTED'))
except Exception:
constants.PrintException()
# TODO: ENTER TO MATCHES DB #
# RESONS: If Game Ends and new One Starts before its detected in the While Loop, it gets overwritten #
# Exit if One Playing found #
break
else:
# Response Code not 200 #
if self.debug: print(
'->%s %s' % ('Response Code', constants.color('red', r.status_code)))
# Reset Games in Current Matches #
try:
with connect.cursor() as cursorUpdateCurrent:
# Select Current Match #
sqlSelectCurrent = "SELECT * FROM `ls_current_match` WHERE summoner_id = %s"
cursorUpdateCurrent.execute(sqlSelectCurrent, (summoner["id"]))
if cursorUpdateCurrent.rowcount == 0:
# Output not found in Current Match -> has never played #
if self.debug: print(
'Not found, Summoner %s' % constants.color('yellow', 'NEVER PLAYED'))
else:
# Set isPlaying = false #
sqlSelectCurrent = "UPDATE `ls_current_match` SET is_playing = %s WHERE summoner_id = %s"
cursorUpdateCurrent.execute(sqlSelectCurrent, (False, summoner["id"]))
connect.commit()
if self.debug: print(
'%s Current Match in Database, set isPlaying = false\n' % constants.color(
'green', 'UPDATED'))
# Create new Matchc in MatchHistory #
sqlSelectCurrentRe = " SELECT * FROM `ls_current_match` WHERE summoner_id = %s"
cursorUpdateCurrent.execute(sqlSelectCurrentRe, (summoner["id"]))
cG = cursorUpdateCurrent.fetchone()
# See if Match in MatchHistory ->should not be the case #
sqlSelectMatch = "SELECT * FROM `ls_matches` WHERE match_id = %s AND streamer = %s"
cursorUpdateCurrent.execute(sqlSelectMatch, (cG["match_id"], streamer["id"]))
if cursorUpdateCurrent.rowcount == 0:
sqlNewMatch = """
INSERT INTO
`ls_matches`
(streamer, champion, map, match_id, team, lane, role, length, type, win, modified, crawled, summoner, region)
VALUES
(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
cursorUpdateCurrent.execute(
sqlNewMatch, (
streamer["id"],
cG["champion_id"],
cG["map_id"],
cG["match_id"],
cG["team"],
'NONE',
'NONE',
cG["length"],
cG["type"],
1,
datetime.now(),
False,
cG["summoner_id"],
summoner["region_id"]
))
connect.commit()
else:
# Output that Match already exists !?? #
if self.debug: print('Match-ID %s %s' % (
cG["match_id"], constants.color('yellow', 'already crawled')))
self.set_new_games()
except Exception:
constants.PrintException()
else:
# No Summoners for Streamer #
if self.debug: print("%s found" % constants.color('red', 'NO SUMMONERS'))
except Exception:
constants.PrintException()
This works okay-ish, but as the table grows, it takes so much longer for the individual entries to be checked. Currently it takes about 1 second to check the API per entry, and I have around 300 entries. So a whole db check takes ~300 seconds. As the database grows, the checks take longer and longer. But, I don't want the first entry to be updated every 1000+ seconds if I have 1000+ entries.
Is there a way to multi-thread this without spawning a thread for each entry/db-id? Also the table updates frequently (added and deleted rows).
I am coming from a PHP background, so threading isn't anything I have worked with before, and I only have little understanding of it. Currently the CheckStreams
function is one of three threads my main class spawns.
1 Answer 1
Layout
There are a lot of unnecessary blank lines between lines of code:
# Check Response Code #
if rVod.status_code == 200:
# JSON #
data = rVod.json()
# Check if Stream is valid #
if "videos" in data.keys():
# Get Info #
videos = data["videos"]
for video in videos:
thumb = video["preview"]["medium"]
It almost looks double-spaced. There is value in being able to see more lines of code on your computer screen; it helps to understand the code better.
Comments
Some of the comments can be deleted to reduce clutter. For example:
# JSON #
data = rVod.json()
It is obvious from the code that you are doing something with JSON, and the single-word comment adds nothing to help understand the code better.
Comments related to future enhancements should also be deleted:
# TODO: ENTER TO MATCHES DB #
# RESONS: If Game Ends and new One Starts before its detected in the While Loop, it gets overwritten #
Keep track of these in another file in your version control system.
Commented-out code should be deleted:
#self.gatherTwitchVods(streamer)
Tools
You could run code development tools to automatically find some style issues with your code.
ruff
identifies several unused import
lines:
import os
import urllib.request
import shutil
You can have ruff
remove them for you using the --fix
option.
It also identifies several instances of multiple statements on a single line:
if self.debug: print(
The black
program can be used to fix those.
ruff
also identifies a couple places where you compare against the boolean True
value:
if setOffline == True:
This is simpler:
if setOffline:
try/except
The except
statements are many lines away from the try
lines.
PEP 8 recommends that you limit the try
clause to the absolute minimum amount
of code necessary to avoid masking bugs. It is hard to keep track of what
line (or lines) are expected to result in the exception.
Explore related questions
See similar questions with these tags.