1
\$\begingroup\$

I made some changes in my code from the previous post.

The changes that I made:

  • I put all the functions to the class
  • All the global arrays I moved them to class too
  • Created PrivateException
  • I made property for search_name

I could do it with a different approach but I decided to do it step by step.

My idea was:

To create class inheritance with parent class instagramData and there to create classmethods the functions check_availability(cls, session, url), login(cls, username, password, session, url) and fetch_url(cls, session, url) which I can call to the child class InstagramPv and doing the rest (extraction links, download and save) but I stayed in the first plan.

First approach


import requests
import os
import time
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from multiprocessing.dummy import Pool
import urllib.parse
import re
from concurrent.futures import ThreadPoolExecutor
chromedriver_path = None
class PrivateException(Exception):
 pass
class InstagramPV:
 def __init__(self, username, password, folder, search_name):
 """
 :param username: username
 :param password: password
 :param folder: folder name
 :param search_name: the name what will search
 """
 self.username = username
 self.password = password
 self.folder = folder
 self.http_base = requests.Session()
 self._search_name = search_name
 self.links = []
 self.pictures = []
 self.videos = []
 self.url = "https://www.instagram.com/{name}/"
 if chromedriver_path is not None:
 self.driver = webdriver.Chrome(chromedriver_path)
 else:
 self.driver = webdriver.Chrome()
 @property
 def name(self):
 """To avoid any errors, with regex find the url and taking the name <search_name>"""
 find_name = "".join(re.findall(r"(?P<url>https?://[^\s]+)", self._search_name))
 if find_name.startswith("https"):
 self._search_name = urllib.parse.urlparse(find_name).path.split("/")[1]
 return self._search_name
 else:
 return self._search_name
 def __enter__(self):
 return self
 def check_availability(self):
 search = self.http_base.get(self.url.format(name=self.name), params={"__a": 1})
 search.raise_for_status()
 load_and_check = search.json()
 privacy = load_and_check.get("graphql").get("user").get("is_private")
 followed_by_viewer = load_and_check.get("graphql").get("user").get("followed_by_viewer")
 if privacy and not followed_by_viewer:
 raise PrivateException("[!] Account is private")
 def control(self):
 """
 Create the folder name and raises an error if already exists
 """
 if not os.path.exists(self.folder):
 os.mkdir(self.folder)
 else:
 raise FileExistsError("[*] Already Exists This Folder")
 def login(self):
 """Login To Instagram"""
 self.driver.get("https://www.instagram.com/accounts/login")
 time.sleep(3)
 self.driver.find_element_by_name('username').send_keys(self.username)
 self.driver.find_element_by_name('password').send_keys(self.password)
 submit = self.driver.find_element_by_tag_name('form')
 submit.submit()
 time.sleep(3)
 """Check For Invalid Credentials"""
 try:
 var_error = self.driver.find_element_by_class_name("eiCW-").text
 raise ValueError("[!] Invalid Credentials")
 except NoSuchElementException:
 pass
 try:
 """Close Notifications"""
 self.driver.find_element_by_xpath('//button[text()="Not Now"]').click()
 except NoSuchElementException:
 pass
 """Taking cookies"""
 cookies = self.driver.get_cookies()
 for cookie in cookies:
 c = {cookie["name"]: cookie["value"]}
 self.http_base.cookies.update(c)
 """Check for availability"""
 self.check_availability()
 self.driver.get(self.url.format(name=self.name))
 return self.scroll_down()
 def _get_href(self):
 elements = self.driver.find_elements_by_xpath("//a[@href]")
 for elem in elements:
 urls = elem.get_attribute("href")
 if "p" in urls.split("/"):
 self.links.append(urls)
 def scroll_down(self):
 """Taking hrefs while scrolling down"""
 end_scroll = []
 while True:
 self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
 time.sleep(2)
 self._get_href()
 time.sleep(2)
 new_height = self.driver.execute_script("return document.body.scrollHeight")
 end_scroll.append(new_height)
 if end_scroll.count(end_scroll[-1]) > 4:
 self.extraction_url()
 break
 def extraction_url(self):
 """Gathering Images and Videos Using ThreadPoolExecutor and pass to function <fetch_url> """
 links = list(set(self.links))
 print("[!] Ready for video - images".title())
 print(f"[*] extracting {len(links)} posts , please wait...".title())
 new_links = [urllib.parse.urljoin(link, "?__a=1") for link in links]
 with ThreadPoolExecutor(max_workers=8) as executor:
 [executor.submit(self.fetch_url, link) for link in new_links]
 def fetch_url(self, url):
 """
 This function extracts images and videos
 :param url: Taking the url
 """
 logging_page_id = self.http_base.get(url.split()[0]).json()
 try:
 """Taking Gallery Photos or Videos"""
 for log_pages in logging_page_id['graphql']['shortcode_media']['edge_sidecar_to_children']['edges']:
 video = log_pages["node"]["is_video"]
 if video:
 video_url = log_pages["node"]["video_url"]
 self.videos.append(video_url)
 else:
 image = log_pages["node"]["display_url"]
 self.pictures.append(image)
 except KeyError:
 """Unique photo or Video"""
 image = logging_page_id['graphql']['shortcode_media']['display_url']
 self.pictures.append(image)
 if logging_page_id['graphql']['shortcode_media']["is_video"]:
 videos = logging_page_id['graphql']['shortcode_media']["video_url"]
 self.videos.append(videos)
 def download_video(self, new_videos):
 """
 Saving the content of video in the file
 """
 number = new_videos[0]
 link = new_videos[1]
 with open(os.path.join(self.folder, f"Video{number}.mp4"), "wb") as f:
 content_of_video = InstagramPV.content_of_url(link, self.http_base)
 f.write(content_of_video)
 def images_download(self, new_pictures):
 """Saving the content of picture in the file"""
 number = new_pictures[0]
 link = new_pictures[1]
 with open(os.path.join(self.folder, f"Image{number}.jpg"), "wb") as f:
 content_of_picture = InstagramPV.content_of_url(link, self.http_base)
 f.write(content_of_picture)
 def downloading_video_images(self):
 """Using multiprocessing for Saving Images and Videos"""
 print("[*] ready for saving images and videos!".title())
 picture_data = enumerate(list(set(self.pictures)))
 video_data = enumerate(list(set(self.videos)))
 pool = Pool(8)
 pool.map(self.images_download, picture_data)
 pool.map(self.download_video, video_data)
 print("[+] Done")
 def __exit__(self, exc_type, exc_val, exc_tb):
 self.http_base.close()
 self.driver.close()
 @staticmethod
 def content_of_url(url, req):
 data = req.get(url)
 return data.content
def main():
 USERNAME = ""
 PASSWORD = ""
 NAME = ""
 FOLDER = ""
 with InstagramPV(USERNAME, PASSWORD, FOLDER, NAME) as pv:
 pv.control()
 pv.login()
 pv.downloading_video_images()
if __name__ == '__main__':
 main()

Second approach

chromedriver_path = None
class PrivateException(Exception):
 pass
class InstagramData:
 def __init__(self, search_name):
 """
 :param search_name: The Profile that will search
 """
 self._search_name = search_name
 self.links = []
 self.videos = []
 self.pictures = []
 @property
 def name(self):
 """To avoid any errors, with regex find the url and taking the name <search_name>"""
 find_name = "".join(re.findall(r"(?P<url>https?://[^\s]+)", self._search_name))
 if find_name.startswith("https"):
 self._search_name = urllib.parse.urlparse(find_name).path.split("/")[1]
 return self._search_name
 else:
 return self._search_name
 @classmethod
 def check_availability(cls, session, url):
 """
 Check availability of the profile If its private and status code
 :param session: session <self.http_base> requests.session
 :param url: the url
 :return:
 """
 search = session.get(url, params={"__a": 1})
 search.raise_for_status()
 load_and_check = search.json()
 privacy = load_and_check.get("graphql").get("user").get("is_private")
 followed_by_viewer = load_and_check.get("graphql").get("user").get("followed_by_viewer")
 if privacy and not followed_by_viewer:
 raise PrivateException("[!] Account is private")
 @classmethod
 def login_and_scrape(cls, username, password, session, url):
 """
 Login tO instagram, checking availability and taking links
 :param username: the username
 :param password: the password
 :param session: session <self.http_base> requests.session
 :param url: The URL
 :return: The links that we collect from scroll down
 """
 if chromedriver_path is not None:
 driver = webdriver.Chrome(chromedriver_path)
 else:
 driver = webdriver.Chrome()
 driver.get("https://www.instagram.com/accounts/login")
 time.sleep(3)
 driver.find_element_by_name('username').send_keys(username)
 driver.find_element_by_name('password').send_keys(password)
 submit = driver.find_element_by_tag_name('form')
 submit.submit()
 time.sleep(8)
 """Check For Invalid Credentials"""
 try:
 var_error = driver.find_element_by_class_name("eiCW-").text
 raise ValueError("[!] Invalid Credentials")
 except NoSuchElementException:
 pass
 try:
 """Close Notifications"""
 driver.find_element_by_xpath('//button[text()="Not Now"]').click()
 except NoSuchElementException:
 pass
 """Getting cookies and pass it to session parameter"""
 cookies = driver.get_cookies()
 for cookie in cookies:
 c = {cookie["name"]: cookie["value"]}
 session.cookies.update(c)
 """Checking the availability"""
 InstagramData.check_availability(session, url)
 driver.get(url)
 """Scrolling down and taking the href"""
 new_links = []
 end_scroll = []
 while True:
 driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
 time.sleep(2)
 for href in cls.get_href(driver):
 new_links.append(href)
 time.sleep(2)
 new_height = driver.execute_script("return document.body.scrollHeight")
 end_scroll.append(new_height)
 if end_scroll.count(end_scroll[-1]) > 4:
 driver.close()
 return new_links
 @staticmethod
 def get_href(driver):
 elements = driver.find_elements_by_xpath("//a[@href]")
 for elem in elements:
 urls = elem.get_attribute("href")
 if "p" in urls.split("/"):
 yield urls
 def fetch_url(self, session, url):
 """
 Collect the images, videos and appending on self.pictures, self.videos
 :param session: Session of <self.http_base>
 :param url: url
 :return:
 """
 logging_page_id = session.get(url.split()[0]).json()
 try:
 """Taking Gallery Photos or Videos"""
 for log_pages in logging_page_id['graphql']['shortcode_media']['edge_sidecar_to_children']['edges']:
 video = log_pages["node"]["is_video"]
 if video:
 video_url = log_pages["node"]["video_url"]
 self.videos.append(video_url)
 else:
 image = log_pages["node"]["display_url"]
 self.pictures.append(image)
 except KeyError:
 """Unique photo or Video"""
 image = logging_page_id['graphql']['shortcode_media']['display_url']
 self.pictures.append(image)
 if logging_page_id['graphql']['shortcode_media']["is_video"]:
 video = logging_page_id['graphql']['shortcode_media']["video_url"]
 self.videos.append(video)
class InstagramPV(InstagramData):
 def __init__(self, username, password, search_name, folder):
 super(InstagramPV, self).__init__(search_name)
 self.username = username
 self.password = password
 self.folder = folder
 self.http_base = requests.Session()
 self.url = "https://www.instagram.com/{name}/"
 def __enter__(self):
 return self
 def control(self):
 """
 Create the folder name and raises an error if already exists
 """
 if not os.path.exists(self.folder):
 os.mkdir(self.folder)
 else:
 raise FileExistsError("[*] Already Exists This Folder")
 def extraction_url(self):
 """Gathering Images and Videos Using ThreadPoolExecutor """
 links = list(
 set(InstagramData.login_and_scrape(self.username, self.password, self.http_base,
 self.url.format(name=self.name))))
 print("[!] Ready for video - images".title())
 print(f"[*] extracting {len(links)} posts , please wait...".title())
 new_links = [urllib.parse.urljoin(link, "?__a=1") for link in links]
 with ThreadPoolExecutor(max_workers=8) as executor:
 [executor.submit(self.fetch_url(self.http_base, link)) for link in new_links]
 def download_video(self, new_videos):
 """
 Saving the content of video in the file
 """
 number = new_videos[0]
 link = new_videos[1]
 with open(os.path.join(self.folder, f"Video{number}.mp4"), "wb") as f:
 content_of_video = InstagramPV.content_of_url(link, self.http_base)
 f.write(content_of_video)
 def images_download(self, new_pictures):
 """Saving the content of picture in the file"""
 number = new_pictures[0]
 link = new_pictures[1]
 with open(os.path.join(self.folder, f"Image{number}.jpg"), "wb") as f:
 content_of_picture = InstagramPV.content_of_url(link, self.http_base)
 f.write(content_of_picture)
 def downloading_video_images(self):
 self.control()
 self.extraction_url()
 """Using multiprocessing for Saving Images and Videos"""
 print("[*] ready for saving images and videos!".title())
 picture_data = enumerate(list(set(self.pictures)))
 video_data = enumerate(list(set(self.videos)))
 pool = Pool(8)
 pool.map(self.images_download, picture_data)
 pool.map(self.download_video, video_data)
 print("[+] Done")
 @staticmethod
 def content_of_url(url, req):
 data = req.get(url)
 return data.content
 def __exit__(self, exc_type, exc_val, exc_tb):
 self.http_base.close()
def main():
 USERNAME = ""
 PASSWORD = ""
 NAME = ""
 FOLDER = ""
 with InstagramPV(USERNAME, PASSWORD, NAME, FOLDER) as pv:
 pv.downloading_video_images()
if __name__ == '__main__':
 main()

My previous posts:

  1. Instagram scraper Posts (Videos and Photos)

  2. Scraping Instagram with selenium, extract URLs, download posts

  3. Web scraping using selenium, multiprocessing, InstagramBot

Jamal
35.2k13 gold badges134 silver badges238 bronze badges
asked Mar 17, 2020 at 20:01
\$\endgroup\$
1
  • \$\begingroup\$ I realize that your earlier posts probably contain this, but a short paragraph on what the code does might be helpful. You might also want to delete one of the tags and add the comparative review tag. \$\endgroup\$ Commented Mar 18, 2020 at 18:39

1 Answer 1

1
\$\begingroup\$

Type hints

def __init__(self, username, password, folder, search_name):

can (probably) be

def __init__(self, username: str, password: str, folder: Path, search_name: str):

Also, since these lists are initialized without a direct reference to the args, they should be type-declared:

 self.links: List[str] = []
 self.pictures: List[str] = []
 self.videos: List[str] = []

Paths

Note that I suggest the use of Path. Read about it here:

https://docs.python.org/3/library/pathlib.html

Then later on, you can use it like so:

 self.folder.mkdir(exist_ok=True)

One line, no existence checks necessary. Also, this:

os.path.join(self.folder, f"Image{number}.jpg"

can be easier:

self.folder / f'Image{number}.jpg'

Combined cookie update

I think that

 """Taking cookies"""
 cookies = self.driver.get_cookies()
 for cookie in cookies:
 c = {cookie["name"]: cookie["value"]}
 self.http_base.cookies.update(c)

can be

# Taking cookies
cookies = {
 cookie['name']: cookie['value']
 for cookie in self.driver.get_cookies()
}
self.http_base.cookies.update(cookies)

Quote style

... is inconsistent in places like this:

 if logging_page_id['graphql']['shortcode_media']["is_video"]:

So pick one or the other and stick with it.

Use a generator

scroll_down can become a generator and gain some efficiency:

  • Use a Counter class instance rather than calling end_scroll.count(), which is quite inefficient.
  • Do not maintain an end_scroll list. Rather than appending, yield new_height, which makes the function a generator.

Nomenclature

extraction_url sounds like a noun (i.e. it gets some data for you). That's not actually what it does. Instead, it seems like it submits some links. Call it submit or submit_links (this is a verb, and makes it clear that it's an "action", not a "getter").

Magic numbers

In this:

if end_scroll.count(end_scroll[-1]) > 4:

What is 4? This should be saved to a named constant.

answered Mar 21, 2020 at 16:00
\$\endgroup\$
5
  • \$\begingroup\$ I have two questions. 1) Can i use type hints for every function in the class? 2) In quote style , you mean to put in variable? \$\endgroup\$ Commented Mar 21, 2020 at 21:33
  • \$\begingroup\$ Can i use type hints for every function in the class? - Yes, you should :) \$\endgroup\$ Commented Mar 21, 2020 at 22:31
  • 1
    \$\begingroup\$ In quote style , you mean to put in variable? No, as in choose single or double quotes only. \$\endgroup\$ Commented Mar 21, 2020 at 22:31
  • \$\begingroup\$ For the function scroll_down i didn't make it a generator, instead i tested the expression (condition) the number of posts and the self.links. Should i make it a generator anyway? \$\endgroup\$ Commented Mar 23, 2020 at 15:27
  • \$\begingroup\$ my new question :) \$\endgroup\$ Commented Mar 28, 2020 at 1:41

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.