An Instagram Bot which downloads the posts from profile
I have to mention my previous posts:
- Instagram scraper Posts (Videos and Photos)
- Scraping Instagram with selenium, extract URLs, download posts
My code:
import requests
import os
import time
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from multiprocessing.dummy import Pool
import urllib.parse
import argparse
import re
from concurrent.futures import ThreadPoolExecutor
LINKS = []
PICTURES = []
VIDEO = []
chromedriver_path = None
def check_availability(link, session_base):
"""
This function checks the availability of profile and the status code
:param session_base: The requests session
:param link: link that searching for and includes the profile name
:return: raise Exception if <privacy> is True and <followed_by_viewer> is False
"""
search = session_base.get(urllib.parse.urljoin(link, "?__a=1"))
search.raise_for_status()
load_and_check = search.json()
privacy = load_and_check.get("graphql").get("user").get("is_private")
followed_by_viewer = load_and_check.get("graphql").get("user").get("followed_by_viewer")
if privacy and not followed_by_viewer:
raise Exception("[!] Account is private")
def fetch_url(url, session_base):
"""
This function extracts images and videos
:param session_base: The requests session
:param url: Taking the url of array LINKS
"""
logging_page_id = session_base.get(url.split()[0]).json()
try:
"""Taking Gallery Photos or Videos"""
for log_pages in logging_page_id['graphql']['shortcode_media']['edge_sidecar_to_children']['edges']:
video = log_pages.get("node").get("is_video")
if video:
video_url = log_pages.get("node").get("video_url")
VIDEO.append(video_url)
else:
image = log_pages.get("node").get("display_url")
PICTURES.append(image)
except KeyError:
"""Unique photo or Video"""
image = logging_page_id.get('graphql').get('shortcode_media').get('display_url')
PICTURES.append(image)
if logging_page_id.get('graphql').get('shortcode_media').get("is_video"):
videos = logging_page_id.get('graphql').get('shortcode_media').get("video_url")
VIDEO.append(videos)
class InstagramPV:
def __init__(self, username, password, folder, search_name):
"""
:param username: username
:param password: password
:param folder: folder name
:param search_name: the name what will search
"""
self.username = username
self.password = password
self.folder = folder
self.HttpBase = requests.Session()
"""To avoid any errors, with regex find the url and taking the name <search_name>"""
find_name = "".join(re.findall(r"(?P<url>https?://[^\s]+)", search_name))
if find_name.startswith("https"):
self.search_name = urllib.parse.urlparse(find_name).path.split("/")[1]
else:
self.search_name = search_name
if chromedriver_path is not None:
self.driver = webdriver.Chrome(chromedriver_path)
else:
self.driver = webdriver.Chrome()
def __enter__(self):
return self
def control(self):
"""
Create the folder name and raises an error if already exists
"""
if not os.path.exists(self.folder):
os.mkdir(self.folder)
else:
raise FileExistsError("[*] Already Exists This Folder")
def login(self):
"""Login To Instagram"""
self.driver.get("https://www.instagram.com/accounts/login")
time.sleep(3)
self.driver.find_element_by_name('username').send_keys(self.username)
self.driver.find_element_by_name('password').send_keys(self.password)
submit = self.driver.find_element_by_tag_name('form')
submit.submit()
time.sleep(3)
"""Check For Invalid Credentials"""
try:
var_error = self.driver.find_element_by_class_name("eiCW-").text
raise ValueError("[!] Invalid Credentials")
except NoSuchElementException:
pass
try:
"""Close Notifications"""
self.driver.find_element_by_xpath('//button[text()="Not Now"]').click()
except NoSuchElementException:
pass
time.sleep(2)
"""Taking Cookies and update the self.HttpBase"""
cookies = self.driver.get_cookies()
for cookie in cookies:
c = {cookie["name"]: cookie["value"]}
self.HttpBase.cookies.update(c)
self.driver.get("https://www.instagram.com/{name}/".format(name=self.search_name))
"""Checking for availability"""
check_availability("https://www.instagram.com/{name}/".format(name=self.search_name), self.HttpBase)
return self.scroll_down()
def _get_href(self):
elements = self.driver.find_elements_by_xpath("//a[@href]")
for elem in elements:
urls = elem.get_attribute("href")
if "p" in urls.split("/"):
LINKS.append(urls)
def scroll_down(self):
"""Taking hrefs while scrolling down"""
end_scroll = []
while True:
self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
time.sleep(2)
self._get_href()
time.sleep(2)
new_height = self.driver.execute_script("return document.body.scrollHeight")
end_scroll.append(new_height)
if end_scroll.count(end_scroll[-1]) > 4:
self.extraction_url()
break
def extraction_url(self):
"""Gathering Images and Videos Using ThreadPoolExecutor and pass to function <fetch_url> """
links = list(set(LINKS))
print("[!] Ready for video - images".title())
print(f"[*] extracting {len(links)} posts , please wait...".title())
new_links = [urllib.parse.urljoin(link, "?__a=1") for link in links]
with ThreadPoolExecutor(max_workers=8) as executor:
[executor.submit(fetch_url, link, self.HttpBase) for link in new_links]
def _download_video(self, new_videos):
"""
Saving the content of video in the file
"""
number, link = new_videos
with open(os.path.join(self.folder, f"Video{number}.mp4"), "wb") as f:
content_of_video = InstagramPV.content_of_url(link)
f.write(content_of_video)
def _images_download(self, new_pictures):
"""Saving the content of picture in the file"""
number, link = new_pictures
with open(os.path.join(self.folder, f"Image{number}.jpg"), "wb") as f:
content_of_picture = InstagramPV.content_of_url(link)
f.write(content_of_picture)
def downloading_video_images(self):
"""Using multiprocessing for Saving Images and Videos"""
print("[*] ready for saving images and videos!".title())
new_pictures = list(set(PICTURES))
new_videos = list(set(VIDEO))
picture_data = [i for i in enumerate(new_pictures)]
video_data = [i for i in enumerate(new_videos)]
pool = Pool(8)
pool.map(self._images_download, picture_data)
pool.map(self._download_video, video_data)
print("[+] Done")
def __exit__(self, exc_type, exc_val, exc_tb):
self.HttpBase.close()
self.driver.close()
@staticmethod
def content_of_url(url):
req = requests.get(url)
return req.content
def main():
parser = argparse.ArgumentParser()
parser.add_argument("-u", "--username", help='Username or your email of your account', action="store",
required=True)
parser.add_argument("-p", "--password", help='Password of your account', action="store", required=True)
parser.add_argument("-f", "--filename", help='Filename for storing data', action="store", required=True)
parser.add_argument("-n", "--name", help='Name to search', action="store", required=True)
args = parser.parse_args()
with InstagramPV(args.username, args.password, args.filename, args.name) as pv:
pv.control()
pv.login()
pv.downloading_video_images()
if __name__ == '__main__':
main()
```
1 Answer 1
Requests makes things easy
session_base.get(urllib.parse.urljoin(link, "?__a=1"))
should be
session_base.get(link, params={__a: 1})
Exception types
raise Exception("[!] Account is private")
The use of the base Exception
should be replaced by a custom exception of yours. They're easy to make, and using them makes it so that upstream code can more meaningfully handle exceptions.
Some things need to exist
In these two cases:
video_url = log_pages.get("node").get("video_url")
VIDEO.append(video_url)
image = log_pages.get("node").get("display_url")
PICTURES.append(image)
the dictionary value is mandatory, so you shouldn't get
it; you should use regular bracket indexing. This will allow failures to be caught earlier, instead of leaking None
into your data.
Nomenclature
self.HttpBase = requests.Session()
Member variables should be lower_snake_case, i.e. http_base
.
Don't repeat yourself
"https://www.instagram.com/{name}/".format(name=self.search_name)
should be put in a temporary variable.
Packed-tuple argument?
This:
def _images_download(self, new_pictures):
number, link = new_pictures
is (削除) a little odd (削除ここまで) probably necessary due to your use of map
, so never mind.
List creation
[i for i in enumerate(new_pictures)]
should just be
list(enumerate(new_pictures))
but since you are only iterating through it once, don't even materialize it to a list; simply leave it as
picture_data = enumerate(new_pictures)
Globals
These:
LINKS = []
PICTURES = []
VIDEO = []
are a problem. They're assigned in global scope, and then both written to and read from a class instance. The easy, and vaguely correct, thing to do is to move all of them to members of InstagramPV
. fetch_url
would then need to either:
- return new video and picture lists; or
- move to being a method on
InstagramPV
and populate the members there.
I think I'd vote for the second, although this is bordering on making an uber-class without meaningful separation. One way to split this up is to make a class for Instagram data (links, pictures, videos) and a class for Instagram scraping (session, authentication, etc.); but I could be convinced that there are other sensible approaches.
Session use
Why isn't this:
@staticmethod
def content_of_url(url):
req = requests.get(url)
return req.content
using your session? It's surprising that it does the right thing without a cookie jar.
-
\$\begingroup\$ I owe you many thanks and i will update my post to tell you how much i appreciate you and the community. but I could be convinced that there are other sensible approaches . Do you mean to change all my code and try to find a different way? Can you give me an example? \$\endgroup\$AlexDotis– AlexDotis2020年03月16日 14:07:20 +00:00Commented Mar 16, 2020 at 14:07
-
\$\begingroup\$ Put another way: once this round of review is over and you have a large(r)
InstagramPV
class, that means the obvious problems have largely been addressed, and more abstract problems would need to be approached, in this case object-oriented structure. I'd not worry about that yet. \$\endgroup\$Reinderien– Reinderien2020年03月16日 14:16:24 +00:00Commented Mar 16, 2020 at 14:16 -
1\$\begingroup\$ First of all @Reinderien i thank you. Second , i promise that i will not stop until i will "hear" "well done" from you :). I know that it will be very difficult, but give me a chance to impress you with my progress. It's a challenge for me. My new question \$\endgroup\$AlexDotis– AlexDotis2020年03月17日 20:09:24 +00:00Commented Mar 17, 2020 at 20:09
-
1\$\begingroup\$ Great attitude :) \$\endgroup\$Reinderien– Reinderien2020年03月17日 20:52:30 +00:00Commented Mar 17, 2020 at 20:52
-
1\$\begingroup\$ Packed-tuple argument edited - I don't think you can easily escape the need for a single tuple. \$\endgroup\$Reinderien– Reinderien2020年04月05日 03:18:04 +00:00Commented Apr 5, 2020 at 3:18
Explore related questions
See similar questions with these tags.