2
\$\begingroup\$

Python script that can download images and videos of the user, like Gallery with photos or videos. It saves the data in the folder.

How it works:

  • Log in in instragram using selenium and navigate to the profile

  • Check the availability of Instagram profile if it's private or existing

  • Gathering urls from images or videos

  • Using threads and multiprocessing improve execution speed

Usage:

myfile.py -u [email protected] -p mypassword -f myfile -n stackoverjoke

My code:

import requests
import time
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from multiprocessing.dummy import Pool
import urllib.parse
import re
from concurrent.futures import ThreadPoolExecutor
from typing import *
import argparse
chromedriver_path = None
class PrivateException(Exception):
 pass
class InstagramPV:
 def __init__(self, username: str, password: str, folder: Path, search_name: str):
 """
 :param username: username
 :param password: password
 :param folder: folder name
 :param search_name: the name what will search
 """
 self.username = username
 self.password = password
 self.folder = folder
 self.http_base = requests.Session()
 self._search_name = search_name
 self.links: List[str] = []
 self.pictures: List[str] = []
 self.videos: List[str] = []
 self.url: str = 'https://www.instagram.com/{name}/'
 if chromedriver_path is not None:
 self.driver = webdriver.Chrome(chromedriver_path)
 else:
 self.driver = webdriver.Chrome()
 @property
 def name(self) -> str:
 """
 To avoid any errors, with regex find the url and taking the name <search_name>
 :return: The name of the Profile
 """
 find_name = ''.join(re.findall(r'(?P<url>https?://[^\s]+)', self._search_name))
 if find_name.startswith('https'):
 self._search_name = urllib.parse.urlparse(find_name).path.split('/')[1]
 return self._search_name
 def __enter__(self):
 return self
 def __exit__(self, exc_type, exc_val, exc_tb):
 self.http_base.close()
 self.driver.close()
 def check_availability(self) -> None:
 """
 Checking Status code, Taking number of posts, Privacy and followed by viewer
 Raise Error if the Profile is private and not following by viewer
 :return: None
 """
 search = self.http_base.get(self.url.format(name=self.name), params={'__a': 1})
 search.raise_for_status()
 load_and_check = search.json()
 privacy = load_and_check.get('graphql').get('user').get('is_private')
 followed_by_viewer = load_and_check.get('graphql').get('user').get('followed_by_viewer')
 if privacy and not followed_by_viewer:
 raise PrivateException('[!] Account is private')
 def control(self) -> None:
 """
 Create the folder name
 """
 self.folder.mkdir(exist_ok=True)
 def login(self) -> None:
 """Login To Instagram"""
 self.driver.get('https://www.instagram.com/accounts/login')
 WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.TAG_NAME, 'form')))
 self.driver.find_element_by_name('username').send_keys(self.username)
 self.driver.find_element_by_name('password').send_keys(self.password)
 submit = self.driver.find_element_by_tag_name('form')
 submit.submit()
 """Check For Invalid Credentials"""
 try:
 var_error = self.driver.find_element_by_class_name('eiCW-').text
 raise ValueError('[!] Invalid Credentials')
 except NoSuchElementException:
 pass
 try:
 """Close Notifications"""
 notifications = WebDriverWait(self.driver, 20).until(
 EC.presence_of_element_located((By.XPATH, '//button[text()="Not Now"]')))
 notifications.click()
 except NoSuchElementException:
 pass
 """Taking cookies"""
 cookies = {
 cookie['name']: cookie['value']
 for cookie in self.driver.get_cookies()
 }
 self.http_base.cookies.update(cookies)
 """Check for availability"""
 self.check_availability()
 self.driver.get(self.url.format(name=self.name))
 self.submit_links()
 def get_href(self) -> None:
 elements = self.driver.find_elements_by_xpath('//a[@href]')
 for elem in elements:
 urls = elem.get_attribute('href')
 if 'p' in urls.split('/'):
 self.links.append(urls)
 def located(self) -> bool:
 """
 Become a flag. While this element is displayed keep scrolling down until it isn't
 :return: True if the element is displayed, False if it isn't
 """
 try:
 self.driver.find_element_by_xpath('//*[@class="_4emnV"]').is_displayed()
 return True
 except NoSuchElementException:
 return False
 def scroll_down(self) -> Iterable[bool]:
 '''Taking hrefs while scrolling down'''
 while True:
 flag = self.located()
 self.get_href()
 time.sleep(1)
 self.driver.execute_script('window.scrollTo(0, document.body.scrollHeight);')
 time.sleep(1)
 yield flag
 def submit_links(self) -> None:
 """Gathering Images and Videos and pass to function <fetch_url> Using ThreadPoolExecutor"""
 for displayed_more in self.scroll_down():
 if not displayed_more:
 break
 self.control()
 seen = set()
 links = [link for link in self.links if not (link in seen or seen.add(link))]
 print('[!] Ready for video - images'.title())
 print(f'[*] extracting {len(links)} posts , please wait...'.title())
 new_links = [urllib.parse.urljoin(link, '?__a=1') for link in links]
 with ThreadPoolExecutor(max_workers=8) as executor:
 for link in new_links:
 executor.submit(self.fetch_url, link)
 def fetch_url(self, url: str) -> None:
 """
 This function extracts images and videos
 :param url: Taking the url
 :return None
 """
 logging_page_id = self.http_base.get(url.split()[0]).json()
 try:
 """Taking Gallery Photos or Videos"""
 for log_pages in logging_page_id['graphql']['shortcode_media']['edge_sidecar_to_children']['edges']:
 video = log_pages['node']['is_video']
 if video:
 video_url = log_pages['node']['video_url']
 self.videos.append(video_url)
 else:
 image = log_pages['node']['display_url']
 self.pictures.append(image)
 except KeyError:
 """Unique photo or Video"""
 image = logging_page_id['graphql']['shortcode_media']['display_url']
 self.pictures.append(image)
 if logging_page_id['graphql']['shortcode_media']['is_video']:
 videos = logging_page_id['graphql']['shortcode_media']['video_url']
 self.videos.append(videos)
 def download_video(self, new_videos: Tuple[int, str]) -> None:
 """
 Saving the video content
 :param new_videos: Tuple[int,str]
 :return: None
 """
 number = new_videos[0]
 link = new_videos[1]
 with open(self.folder / f'Video{number}.mp4', 'wb') as f:
 content_of_video = self.http_base.get(link).content
 f.write(content_of_video)
 def images_download(self, new_pictures: Tuple[int, str]) -> None:
 """
 Saving the picture content
 :param new_pictures: Tuple[int, str]
 :return: None
 """
 number = new_pictures[0]
 link = new_pictures[1]
 with open(self.folder / f'Image{number}.jpg', 'wb') as f:
 content_of_picture = self.http_base.get(link).content
 f.write(content_of_picture)
 def downloading_video_images(self) -> None:
 """Using multiprocessing for Saving Images and Videos"""
 print('[*] ready for saving images and videos!'.title())
 picture_data = enumerate(list(set(self.pictures)))
 video_data = enumerate(list(set(self.videos)))
 pool = Pool(8)
 pool.map(self.images_download, picture_data)
 pool.map(self.download_video, video_data)
 print('[+] Done')
def main():
 parser = argparse.ArgumentParser()
 parser.add_argument('-u', '--username', help='Username or your email of your account', action='store',
 required=True)
 parser.add_argument('-p', '--password', help='Password of your account', action='store', required=True)
 parser.add_argument('-f', '--filename', help='Filename for storing data', action='store', required=True)
 parser.add_argument('-n', '--name', help='Name to search or link', action='store', required=True)
 args = parser.parse_args()
 with InstagramPV(args.username, args.password, Path(args.filename), args.name) as pv:
 pv.login()
 pv.downloading_video_images()
if __name__ == '__main__':
 main()

Changes:

1) I changed the behaviour of the function scroll_down - avoiding "bugs" of instagram

2) Added function located

My previous comparative review tag: Instagram Scraping Using Selenium

asked Apr 4, 2020 at 20:15
\$\endgroup\$

1 Answer 1

3
\$\begingroup\$

Global constants

chromedriver_path should be capitalized. Otherwise: I assume that you manually change it from None to some meaningful value for your local system. Try not to do this - instead, accept that path as an environmental variable, in a config file, or as a command-line parameter.

Captain Obvious

This:

 """
 :param username: username
 :param password: password
 :param folder: folder name
 """

is worse than having no comments at all. Fill these out to be meaningful to someone who doesn't know what your script does.

Side-effects

One would expect, looking from the outside, that name simply returns a string - especially since it's marked as a property. It does that, but it also has the side-effect of setting self._search_name (sometimes). There are at least two problems with this:

  • State modification in a getter - this is occasionally useful, i.e. in caching, but that isn't what you're doing here
  • Conditional state modification whose reason isn't obvious - why is it that a member is only set if the URL is HTTPS?

Names

control doesn't seem to control anything; it creates a directory.

get_href is not a getter; it doesn't return anything. It actually would make more sense as a static getter that yields instead of appending to a list; then the caller could simply self.links.extend(self.get_hrefs()).

located

In its current implementation, this makes no sense:

 try:
 self.driver.find_element_by_xpath('//*[@class="_4emnV"]').is_displayed()
 return True
 except NoSuchElementException:
 return False

You call is_displayed and throw its return value away, relying on a no-such-element to determine the return value of your function. Why call is_displayed at all?

scroll_down

You have a while True that doesn't exit on its own. Instead, the outer caller waits for a boolean:

 for displayed_more in self.scroll_down():
 if not displayed_more:
 break

This entire iterable structure all the way up to get_href needs to be re-thought. What you should have is a generator function that, instead of yielding a bool to terminate, yields a URL string, and breaks out of the loop (with a break, not a boolean flag) when the no-such-element condition is met.

Side-effects in comprehensions

This is particularly gruesome:

 seen = set()
 links = [link for link in self.links if not (link in seen or seen.add(link))]

As soon as you have a term of a statement that's being relied upon to modify the iteration, you should expand this out into a normal loop. However, if I understand this correctly, you're simply removing dupes, in which case

links = set(self.links)

If you care deeply about order, then there are other ways to do this that still don't require a custom generator.

Generator materialization

This:

 new_links = [urllib.parse.urljoin(link, '?__a=1') for link in links]

should use parentheses instead of brackets, because you don't need the list in memory - you only need the generator once through.

Variable reuse

Save

logging_page_id['graphql']['shortcode_media']

to a temporary variable for reuse.

Tuples in a function

This:

def download_video(self, new_videos: Tuple[int, str]) -> None:

can simplify its tuple unpacking from

 number = new_videos[0]
 link = new_videos[1]

to

number, link = new_videos

Magic numbers

Pull the 8 from this

Pool(8)

into a constant, for instance

N_PROCESSES = 8
# ...
Pool(N_PROCESSES)

This is more maintainable and self-documenting.

answered Apr 5, 2020 at 1:34
\$\endgroup\$
11
  • \$\begingroup\$ I have tons of questions but i will ask the most "important" for me. 1) In name you mean to check everything even the URL isn't HTTPS? Should i get rid of property and try something else, because my intention is to extract the name and do the rest. 2) In scroll_down i fell into an infinity loop and i don't know if this is an instagram bug, but i saw "wrong" number of posts while the real posts were less. So, i wanted to find something to stop this, like the end of page. I think that the problem comes from scrolling down. \$\endgroup\$ Commented Apr 5, 2020 at 2:57
  • \$\begingroup\$ 3) In Side-effects in comprehensions i tried to remove the duplicate links without changing the order. Otherwise i could use collections.OrderedDict 3) In Tuples in a function in the previous post you mention You're better off accepting number and link as separate arguments. . 4) Magic numbers? I didn't undertand. Sorry for asking so many things but i am learning from your answers. \$\endgroup\$ Commented Apr 5, 2020 at 2:57
  • 1
    \$\begingroup\$ i want to understand the point of view of "gruesome" - Apologies; that's colloquial and not technical. Basically, it's not a good idea to mutate a set on the inside of a list comprehension. \$\endgroup\$ Commented Apr 6, 2020 at 16:02
  • 1
    \$\begingroup\$ May i ask why using booleans to stop is wrong way? - Instead of looping, yielding a flag, outer looping and listening to the flag, yielding the data you actually care about, which is simpler. \$\endgroup\$ Commented Apr 6, 2020 at 16:08
  • 1
    \$\begingroup\$ I think i improved my code. By the way i learned so much these weeks that i didn't learned for months! Thanks to you! my new question \$\endgroup\$ Commented Apr 9, 2020 at 3:12

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.