4
\$\begingroup\$

I have a python script that loads a database table and checks an API for the entries found.

I do it like this:

import threading
import constants
import pymysql
import requests
from datetime import datetime
from dateutil.parser import parse
import os
import urllib.request
import shutil
import json
class CheckStreams(threading.Thread):
 def __init__(self, debug):
 threading.Thread.__init__(self)
 self.debug = debug
 self.newGames = False
 self.online = 0
 def get_new_games(self):
 return self.newGames
 def set_new_games(self):
 self.newGames = True
 def get_online(self):
 return self.online
 def set_online(self):
 self.online += 1
 def run(self):
 # Connect to the database
 connect = pymysql.connect(host=constants.HOST,
 user=constants.USERNAME,
 password=constants.PASSWORD,
 db=constants.DATABASE,
 charset='utf8mb4',
 port=constants.PORT,
 cursorclass=pymysql.cursors.DictCursor)
 # Get Streamer Table #
 try:
 with connect.cursor() as cursor:
 # Read all Streamers #
 sql = "SELECT * FROM `ls_streamer` s LEFT JOIN `ls_platforms` p ON s.platform = p.id"
 # Execute Query #
 cursor.execute(sql)
 # Result to streamers Var #
 streamers = cursor.fetchall()
 # Check if we have Streamers at all #
 if cursor.rowcount > 0:
 # Show total Streamers #
 if self.debug: print('Total Streamers: %s' % constants.color('green', cursor.rowcount))
 # Loop through all streamers #
 for streamer in streamers:
 # Output Current Streamer #
 if self.debug: print(
 '\n%s - %s' % (
 streamer["channel_user"], constants.color(streamer["name"], streamer["name"])))
 # Fetch from Platform #
 if streamer["platform"] == 1:
 # Twitch.tv #
 if self.debug: print('Gathering %s' % constants.color('Twitch.tv', 'Twitch.tv'))
 # gatherTwitch()
 self.gatherTwitch(streamer)
 #self.gatherTwitchVods(streamer)
 else:
 # None #
 if self.debug: print(
 'Wrong Platform ID: #%s' % constants.color('red', streamer["platform"]))
 else:
 if self.debug: print('Total Streamers: %s' % constants.color('red', '0'))
 except Exception:
 constants.PrintException()
 finally:
 connect.close()
def gatherTwitch(self, streamer):
 # Connect to the database
 connect = pymysql.connect(host=constants.HOST,
 user=constants.USERNAME,
 password=constants.PASSWORD,
 db=constants.DATABASE,
 charset='utf8mb4',
 port=constants.PORT,
 cursorclass=pymysql.cursors.DictCursor)
 # Internal Vars #
 isOnline = False
 description = None
 language = None
 thumbnail = None
 setOffline = False
 banner = "None"
 logo = "None"
 partner = False
 # Create Stream Url from Vars #
 streamUrl = '%s%s' % (streamer["url"], streamer["channel_id"])
 # Send Request w/ Twitch Headers #
 r = requests.get(
 streamUrl,
 headers=
 {
 "Accept": "application/vnd.twitchtv.v5+json",
 "Client-ID": constants.TWITCH_CLIENT_ID
 }
 )
 # Check Response Code #
 if r.status_code == 200:
 # JSON #
 data = r.json()
 # Check if Stream is valid #
 if "stream" in data.keys():
 # Get Info #
 streamData = data["stream"]
 if type(streamData) is dict:
 # Check if Streaming LoL #
 if streamData["game"].lower() == "league of legends":
 # Streaming LoL - YAY #
 if self.debug: print('%s' % constants.color('green', 'IS STREAMING'))
 # Gather Vars #
 lastSeen = datetime.now()
 if streamer["is_online"] == True:
 lastSeen = streamer["modified"]
 totalStreamed = streamer["total_online"]
 totalStreamed += constants.convertTimeDelta((datetime.now() - lastSeen))
 viewers = streamData["viewers"]
 resolution = streamData["video_height"]
 fps = streamData["average_fps"]
 delay = streamData["delay"]
 started = parse(streamData["created_at"], None, ignoretz=True)
 isOnline = True
 # Channel Data #
 channelData = streamData["channel"]
 if type(channelData) is dict:
 description = channelData["status"]
 language = channelData["language"]
 logo = channelData["logo"]
 banner = channelData["profile_banner"]
 if channelData["partner"] == "true":
 partner = True
 if channelData["partner"] is True:
 partner = True
 else:
 # Output Debug #
 if self.debug: print('Channel Data %s' % constants.color('red', 'NOT FOUND'))
 # Peview Data #
 previewData = streamData["preview"]
 if type(previewData) is dict:
 thumbnail = previewData["medium"]
 else:
 # Output Debug #
 if self.debug: print('Preview Data %s' % constants.color('red', 'NOT FOUND'))
 try:
 with connect.cursor() as cursorStreamer:
 # Got all Vars put to DB #
 sqlUpdate = """
 UPDATE
 `ls_streamer`
 SET
 total_online = %s,
 viewers = %s,
 resolution = %s,
 fps = %s,
 delay = %s,
 description = %s,
 language = %s,
 thumbnail = %s,
 logo = %s,
 banner = %s,
 started = %s,
 is_online = %s,
 modified = %s,
 is_partner = %s
 WHERE
 id = %s
 """
 # Execute Query #
 connect.escape(sqlUpdate)
 cursorStreamer.execute(sqlUpdate, (
 totalStreamed, viewers, resolution, fps, delay, description, language, thumbnail,
 logo, banner, started, isOnline, datetime.now(), partner, streamer["id"]))
 connect.commit()
 self.set_online()
 # Now crawl the Riot API for Live Games for all connected summoners #
 self.getLiveGames(streamer)
 except Exception:
 constants.PrintException()
 finally:
 if self.debug: print('Update ls_streamer: %s' % constants.color('green', 'SUCCESS'))
 else:
 # Set Offline #
 setOffline = True
 # Stream online but not streaming LoL #
 if self.debug: print(
 'Stream is %s but %s streaming LoL' % (
 constants.color('green', 'online'), constants.color('red', 'NOT')))
 else:
 # Set Offline #
 setOffline = True
 # Output Debug #
 if self.debug: print('Streamer is %s' % constants.color('red', 'OFFLINE'))
 else:
 # Set Offline #
 setOffline = True
 # Output Debug #
 if self.debug: print('Streamer is %s' % constants.color('red', 'OFFLINE'))
 else:
 # Set Offline #
 setOffline = True
 # Output Debug #
 if self.debug: print('->%s %s' % ('Response Code', constants.color('red', r.status_code)))
 # Output Url #
 if self.debug: print('Streamer Url: %s' % streamUrl)
 if setOffline == True:
 try:
 with connect.cursor() as cursorStreamerOff:
 # Got all Vars put to DB #
 sqlOffline = """
 UPDATE
 `ls_streamer`
 SET
 is_online = %s
 WHERE
 id = %s
 """
 # Execute Query #
 connect.escape(sqlOffline)
 cursorStreamerOff.execute(sqlOffline, (False, streamer["id"]))
 connect.commit()
 except Exception:
 constants.PrintException()
 # Create Stream Url from Vars #
 streamUrlVod = '%s%s/videos?broadcast_type=archive&limit=100' % (
 streamer["channelurl"], streamer["channel_id"])
 # Send Request w/ Twitch Headers #
 rVod = requests.get(
 streamUrlVod,
 headers=
 {
 "Accept": "application/vnd.twitchtv.v5+json",
 "Client-ID": constants.TWITCH_CLIENT_ID
 }
 )
 # Check Response Code #
 if rVod.status_code == 200:
 # JSON #
 data = rVod.json()
 # Check if Stream is valid #
 if "videos" in data.keys():
 # Get Info #
 videos = data["videos"]
 for video in videos:
 thumb = video["preview"]["medium"]
 video_id = video["_id"]
 length = video["length"]
 created = video["created_at"]
 if video["game"] == "League of Legends" and video["viewable"] == "public":
 # Check if video exists #
 try:
 with connect.cursor() as cursorVod:
 sqlInsert = """
 INSERT INTO
 `ls_vods`
 (streamer_id, thumbnail, video_id, created, `length`, last_check)
 VALUES
 (%s, %s, %s, %s, %s, %s)
 ON DUPLICATE KEY UPDATE
 last_check = %s,
 thumbnail = %s,
 `length` = %s
 """
 cursorVod.execute(sqlInsert,
 (streamer["id"], thumb, video_id, created, length, datetime.now(), datetime.now(), thumb, length))
 connect.commit()
 except Exception:
 constants.PrintException()
 else:
 if self.debug: print('Videos not found in URL: %s' % streamUrl)
 else:
 # Output Debug #
 if self.debug: print('->%s %s' % ('Response Code', constants.color('red', r.status_code)))
 # Output Url #
 if self.debug: print('Streamer Url: %s' % streamUrl)
def getLiveGames(self, streamer):
 # Connect to the database
 connect = pymysql.connect(host=constants.HOST,
 user=constants.USERNAME,
 password=constants.PASSWORD,
 db=constants.DATABASE,
 charset='utf8mb4',
 port=constants.PORT,
 cursorclass=pymysql.cursors.DictCursor)
 # Select all Summoners from Streamer #
 try:
 with connect.cursor() as cursorSummoner:
 sqlSummoner = """
 SELECT * FROM
 `ls_summoner` s
 LEFT JOIN
 `ls_regions` r
 ON
 r.id = s.region_id
 WHERE
 streamer_id = %s
 """
 # Execute Query #
 cursorSummoner.execute(sqlSummoner, (streamer["id"],))
 summoners = cursorSummoner.fetchall()
 if cursorSummoner.rowcount > 0:
 for summoner in summoners:
 # Output which Summoner #
 if self.debug: print('Checking Live Game for Summoner %s-%s' % (
 summoner["short"].upper(), summoner["name"]))
 urlLive = "https://%s.api.riotgames.com/lol/spectator/v3/active-games/by-summoner/%s?api_key=%s" % (
 summoner["long"].lower(), summoner["summoner_id"],
 constants.RIOT_API_KEY)
 r = requests.get(urlLive)
 if self.debug: print("Checking %s" % urlLive)
 # Check Response Code #
 if r.status_code == 200:
 # Output #
 if self.debug: print('->%s Found Live Game for %s-%s' % (
 constants.color('green', 'FOUND'), summoner["short"].upper(), summoner["name"]))
 # JSON #
 game = r.json()
 # Gather Vars #
 map = game["mapId"]
 gameId = game["gameId"]
 gameMode = game["gameMode"]
 gameType = game["gameType"]
 queueId = game["gameQueueConfigId"]
 played = game["gameLength"]
 spell1Id = None
 spell2Id = None
 teamId = None
 champion = None
 runes = None
 masteries = None
 pp = game["participants"]
 if type(pp) is list:
 for part in pp:
 if part["summonerId"] == summoner["summoner_id"]:
 teamId = part["teamId"]
 spell1Id = part["spell1Id"]
 spell2Id = part["spell2Id"]
 champion = part["championId"]
 runes = json.dumps(part["runes"])
 masteries = json.dumps(part["masteries"])
 if self.debug: print('Game %s. Game ID #%s' % (constants.color('green', 'found'), gameId))
 # MySQL #
 try:
 with connect.cursor() as cursorUpdateCurrent:
 # Check Champion in DB #
 sqlChampion = "SELECT * FROM `ls_champions` WHERE id = %s"
 cursorUpdateCurrent.execute(sqlChampion,
 (champion,))
 if cursorUpdateCurrent.rowcount == 0:
 # Champion not in DB TODO: SEND MAIL #
 if self.debug: print("Champion %",
 constants.color('red',
 'NOT IN DB'))
 # Exit current Game Update #
 break
 # Check Map in DB #
 sqlMap = "SELECT * FROM `ls_maps` WHERE id = %s"
 cursorUpdateCurrent.execute(sqlMap,
 (map,))
 if cursorUpdateCurrent.rowcount == 0:
 # Map not in DB TODO: SEND MAIL #
 if self.debug: print("Map %",
 constants.color('red',
 'NOT IN DB'))
 # Exit current Game Update #
 break
 # Check Queue in DB #
 sqlQueue = "SELECT * FROM `ls_queues` WHERE id = %s"
 cursorUpdateCurrent.execute(sqlQueue,
 (queueId,))
 if cursorUpdateCurrent.rowcount == 0:
 # Queue not in DB TODO: SEND MAIL #
 if self.debug: print("Queue %",
 constants.color('red',
 'NOT IN DB'))
 # Exit current Game Update #
 break
 # Check P1Spell1 in DB #
 sqlP1Spell1 = "SELECT * FROM `ls_spells` WHERE id = %s"
 cursorUpdateCurrent.execute(sqlP1Spell1,
 (spell1Id,))
 if cursorUpdateCurrent.rowcount == 0:
 # P1Spell1 not in DB TODO: SEND MAIL #
 if self.debug: print("P1Spell1 %",
 constants.color('red',
 'NOT IN DB'))
 # Exit current Game Update #
 break
 # Check P1Spell2 in DB #
 sqlP1Spell2 = "SELECT * FROM `ls_spells` WHERE id = %s"
 cursorUpdateCurrent.execute(
 sqlP1Spell2,
 (spell2Id,))
 if cursorUpdateCurrent.rowcount == 0:
 # P1Spell2 not in DB TODO: SEND MAIL #
 if self.debug: print("P1Spell2 %",
 constants.color('red',
 'NOT IN DB'))
 # Exit current Game Update #
 break
 # Select Current Match #
 sqlSelectCurrent = " SELECT * FROM `ls_current_match` WHERE summoner_id = %s"
 cursorUpdateCurrent.execute(sqlSelectCurrent, (summoner["id"]))
 if cursorUpdateCurrent.rowcount > 0:
 # Found a Current Match #
 sqlUpdateCurrent = """
 UPDATE
 `ls_current_match`
 SET
 champion_id = %s,
 map_id = %s,
 summoner_id = %s,
 queue_id = %s,
 match_id = %s,
 team = %s,
 length = %s,
 type = %s,
 mode = %s,
 modified = %s,
 p1_spell1_id = %s,
 p1_spell2_id = %s,
 is_playing = %s,
 runes = %s,
 masteries = %s
 WHERE
 summoner_id = %s
 """
 cursorUpdateCurrent.execute(
 sqlUpdateCurrent, (
 champion,
 map,
 summoner["id"],
 queueId,
 gameId,
 teamId,
 played,
 gameType,
 gameMode,
 datetime.now(),
 spell1Id,
 spell2Id,
 True,
 runes,
 masteries, summoner["id"]
 ))
 connect.commit()
 if self.debug: print(
 '%s Current Match in Database\n' % constants.color('yellow',
 'UPDATED'))
 else:
 # No current Match for Summoner Id found #
 sqlNewCurrent = """
 INSERT INTO
 `ls_current_match`
 (champion_id, map_id, summoner_id, queue_id, match_id, team, length, type, mode, modified, p1_spell1_id, p1_spell2_id, is_playing, runes, masteries)
 VALUES
 (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
 """
 cursorUpdateCurrent.execute(sqlNewCurrent, (
 champion,
 map,
 summoner["id"],
 queueId,
 gameId,
 teamId,
 played,
 gameType,
 gameMode,
 datetime.now(),
 spell1Id,
 spell2Id,
 True,
 runes,
 masteries
 ))
 connect.commit()
 if self.debug: print(
 '%s Current Match in Database\n' % constants.color(
 'green', 'INSERTED'))
 except Exception:
 constants.PrintException()
 # TODO: ENTER TO MATCHES DB #
 # RESONS: If Game Ends and new One Starts before its detected in the While Loop, it gets overwritten #
 # Exit if One Playing found #
 break
 else:
 # Response Code not 200 #
 if self.debug: print(
 '->%s %s' % ('Response Code', constants.color('red', r.status_code)))
 # Reset Games in Current Matches #
 try:
 with connect.cursor() as cursorUpdateCurrent:
 # Select Current Match #
 sqlSelectCurrent = "SELECT * FROM `ls_current_match` WHERE summoner_id = %s"
 cursorUpdateCurrent.execute(sqlSelectCurrent, (summoner["id"]))
 if cursorUpdateCurrent.rowcount == 0:
 # Output not found in Current Match -> has never played #
 if self.debug: print(
 'Not found, Summoner %s' % constants.color('yellow', 'NEVER PLAYED'))
 else:
 # Set isPlaying = false #
 sqlSelectCurrent = "UPDATE `ls_current_match` SET is_playing = %s WHERE summoner_id = %s"
 cursorUpdateCurrent.execute(sqlSelectCurrent, (False, summoner["id"]))
 connect.commit()
 if self.debug: print(
 '%s Current Match in Database, set isPlaying = false\n' % constants.color(
 'green', 'UPDATED'))
 # Create new Matchc in MatchHistory #
 sqlSelectCurrentRe = " SELECT * FROM `ls_current_match` WHERE summoner_id = %s"
 cursorUpdateCurrent.execute(sqlSelectCurrentRe, (summoner["id"]))
 cG = cursorUpdateCurrent.fetchone()
 # See if Match in MatchHistory ->should not be the case #
 sqlSelectMatch = "SELECT * FROM `ls_matches` WHERE match_id = %s AND streamer = %s"
 cursorUpdateCurrent.execute(sqlSelectMatch, (cG["match_id"], streamer["id"]))
 if cursorUpdateCurrent.rowcount == 0:
 sqlNewMatch = """
 INSERT INTO
 `ls_matches`
 (streamer, champion, map, match_id, team, lane, role, length, type, win, modified, crawled, summoner, region)
 VALUES
 (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
 """
 cursorUpdateCurrent.execute(
 sqlNewMatch, (
 streamer["id"],
 cG["champion_id"],
 cG["map_id"],
 cG["match_id"],
 cG["team"],
 'NONE',
 'NONE',
 cG["length"],
 cG["type"],
 1,
 datetime.now(),
 False,
 cG["summoner_id"],
 summoner["region_id"]
 ))
 connect.commit()
 else:
 # Output that Match already exists !?? #
 if self.debug: print('Match-ID %s %s' % (
 cG["match_id"], constants.color('yellow', 'already crawled')))
 self.set_new_games()
 except Exception:
 constants.PrintException()
 else:
 # No Summoners for Streamer #
 if self.debug: print("%s found" % constants.color('red', 'NO SUMMONERS'))
 except Exception:
 constants.PrintException()

This works okay-ish, but as the table grows, it takes so much longer for the individual entries to be checked. Currently it takes about 1 second to check the API per entry, and I have around 300 entries. So a whole db check takes ~300 seconds. As the database grows, the checks take longer and longer. But, I don't want the first entry to be updated every 1000+ seconds if I have 1000+ entries.

Is there a way to multi-thread this without spawning a thread for each entry/db-id? Also the table updates frequently (added and deleted rows).

I am coming from a PHP background, so threading isn't anything I have worked with before, and I only have little understanding of it. Currently the CheckStreams function is one of three threads my main class spawns.

toolic
14.7k5 gold badges29 silver badges204 bronze badges
asked Nov 2, 2017 at 15:58
\$\endgroup\$
0

1 Answer 1

1
\$\begingroup\$

Layout

There are a lot of unnecessary blank lines between lines of code:

# Check Response Code #
if rVod.status_code == 200:
 # JSON #
 data = rVod.json()
 # Check if Stream is valid #
 if "videos" in data.keys():
 # Get Info #
 videos = data["videos"]
 for video in videos:
 thumb = video["preview"]["medium"]

It almost looks double-spaced. There is value in being able to see more lines of code on your computer screen; it helps to understand the code better.

Comments

Some of the comments can be deleted to reduce clutter. For example:

# JSON #
data = rVod.json()

It is obvious from the code that you are doing something with JSON, and the single-word comment adds nothing to help understand the code better.

Comments related to future enhancements should also be deleted:

# TODO: ENTER TO MATCHES DB #
# RESONS: If Game Ends and new One Starts before its detected in the While Loop, it gets overwritten #

Keep track of these in another file in your version control system.

Commented-out code should be deleted:

#self.gatherTwitchVods(streamer)

Tools

You could run code development tools to automatically find some style issues with your code.

ruff identifies several unused import lines:

import os
import urllib.request
import shutil

You can have ruff remove them for you using the --fix option.

It also identifies several instances of multiple statements on a single line:

if self.debug: print(

The black program can be used to fix those.

ruff also identifies a couple places where you compare against the boolean True value:

if setOffline == True:

This is simpler:

if setOffline:

try/except

The except statements are many lines away from the try lines.
PEP 8 recommends that you limit the try clause to the absolute minimum amount of code necessary to avoid masking bugs. It is hard to keep track of what line (or lines) are expected to result in the exception.

answered Sep 4 at 11:53
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.