Python script that can download images and videos of the user, like Gallery with photos or videos. It saves the data in the folder.
How it works:
Log in in instragram using selenium and navigate to the profile
Check the availability of Instagram profile if it's private or existing
Gathering urls from images or videos
Using threads and multiprocessing improve execution speed
My code:
from pathlib import Path
import requests
import time
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from multiprocessing.dummy import Pool
import urllib.parse
import re
from concurrent.futures import ThreadPoolExecutor
from typing import *
chromedriver_path = None
class PrivateException(Exception):
pass
class InstagramPV:
def __init__(self, username: str, password: str, folder: Path, search_name: str):
"""
:param username: username
:param password: password
:param folder: folder name
:param search_name: the name what will search
"""
self.username = username
self.password = password
self.folder = folder
self.http_base = requests.Session()
self._search_name = search_name
self.links: List[str] = []
self.pictures: List[str] = []
self.videos: List[str] = []
self.url: str = 'https://www.instagram.com/{name}/'
self.posts: int = 0
if chromedriver_path is not None:
self.driver = webdriver.Chrome(chromedriver_path)
else:
self.driver = webdriver.Chrome()
@property
def name(self) -> str:
"""
To avoid any errors, with regex find the url and taking the name <search_name>
:return: The name of the Profile
"""
find_name = ''.join(re.findall(r'(?P<url>https?://[^\s]+)', self._search_name))
if find_name.startswith('https'):
self._search_name = urllib.parse.urlparse(find_name).path.split('/')[1]
return self._search_name
else:
return self._search_name
def __enter__(self):
return self
def check_availability(self) -> None:
"""
Checking Status code, Taking number of posts, Privacy and followed by viewer
Raise Error if the Profile is private and not following by viewer
:return: None
"""
search = self.http_base.get(self.url.format(name=self.name), params={'__a': 1})
search.raise_for_status()
load_and_check = search.json()
self.posts = load_and_check.get('graphql').get('user').get('edge_owner_to_timeline_media').get('count')
privacy = load_and_check.get('graphql').get('user').get('is_private')
followed_by_viewer = load_and_check.get('graphql').get('user').get('followed_by_viewer')
if privacy and not followed_by_viewer:
raise PrivateException('[!] Account is private')
def control(self) -> None:
"""
Create the folder name
"""
self.folder.mkdir(exist_ok=True)
def login(self) -> None:
"""Login To Instagram"""
self.driver.get('https://www.instagram.com/accounts/login')
WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.TAG_NAME, 'form')))
self.driver.find_element_by_name('username').send_keys(self.username)
self.driver.find_element_by_name('password').send_keys(self.password)
submit = self.driver.find_element_by_tag_name('form')
submit.submit()
"""Check For Invalid Credentials"""
try:
var_error = self.driver.find_element_by_class_name('eiCW-').text
raise ValueError('[!] Invalid Credentials')
except NoSuchElementException:
pass
try:
"""Close Notifications"""
notifications = WebDriverWait(self.driver, 20).until(
EC.presence_of_element_located((By.XPATH, '//button[text()="Not Now"]')))
notifications.click()
except NoSuchElementException:
pass
"""Taking cookies"""
cookies = {
cookie['name']: cookie['value']
for cookie in self.driver.get_cookies()
}
self.http_base.cookies.update(cookies)
"""Check for availability"""
self.check_availability()
self.driver.get(self.url.format(name=self.name))
return self.scroll_down()
def get_href(self) -> None:
elements = self.driver.find_elements_by_xpath('//a[@href]')
for elem in elements:
urls = elem.get_attribute('href')
if 'p' in urls.split('/'):
self.links.append(urls)
def scroll_down(self) -> None:
"""Taking hrefs while scrolling down"""
while len(list(set(self.links))) < self.posts:
self.get_href()
time.sleep(1)
self.driver.execute_script('window.scrollTo(0, document.body.scrollHeight);')
time.sleep(1)
return self.submit_links()
def submit_links(self) -> None:
"""Gathering Images and Videos and pass to function <fetch_url> Using ThreadPoolExecutor"""
self.control()
links = list(set(self.links))
print('[!] Ready for video - images'.title())
print(f'[*] extracting {len(links)} posts , please wait...'.title())
new_links = [urllib.parse.urljoin(link, '?__a=1') for link in links]
with ThreadPoolExecutor(max_workers=8) as executor:
[executor.submit(self.fetch_url, link) for link in new_links]
def fetch_url(self, url: str) -> None:
"""
This function extracts images and videos
:param url: Taking the url
:return None
"""
logging_page_id = self.http_base.get(url.split()[0]).json()
try:
"""Taking Gallery Photos or Videos"""
for log_pages in logging_page_id['graphql']['shortcode_media']['edge_sidecar_to_children']['edges']:
video = log_pages['node']['is_video']
if video:
video_url = log_pages['node']['video_url']
self.videos.append(video_url)
else:
image = log_pages['node']['display_url']
self.pictures.append(image)
except KeyError:
"""Unique photo or Video"""
image = logging_page_id['graphql']['shortcode_media']['display_url']
self.pictures.append(image)
if logging_page_id['graphql']['shortcode_media']['is_video']:
videos = logging_page_id['graphql']['shortcode_media']['video_url']
self.videos.append(videos)
def download_video(self, new_videos: Tuple[int, str]) -> None:
"""
Saving the video content
:param new_videos: Tuple[int,str]
:return: None
"""
number = new_videos[0]
link = new_videos[1]
with open(self.folder / f'Video{number}.mp4', 'wb') as f:
content_of_video = InstagramPV.content_of_url(self.http_base.get(link))
f.write(content_of_video)
def images_download(self, new_pictures: Tuple[int, str]) -> None:
"""
Saving the picture content
:param new_pictures: Tuple[int, str]
:return: None
"""
number = new_pictures[0]
link = new_pictures[1]
with open(self.folder / f'Image{number}.jpg', 'wb') as f:
content_of_picture = InstagramPV.content_of_url(self.http_base.get(link))
f.write(content_of_picture)
def downloading_video_images(self) -> None:
"""Using multiprocessing for Saving Images and Videos"""
print('[*] ready for saving images and videos!'.title())
picture_data = enumerate(list(set(self.pictures)))
video_data = enumerate(list(set(self.videos)))
pool = Pool(8)
pool.map(self.images_download, picture_data)
pool.map(self.download_video, video_data)
print('[+] Done')
def __exit__(self, exc_type, exc_val, exc_tb):
self.http_base.close()
self.driver.close()
@staticmethod
def content_of_url(req: [requests.sessions.Session, requests.models.Response]) -> bytes:
"""
:param req: requests.sessions.Session, requests.models.Response
:return: Content of Url
"""
return req.content
def main():
USERNAME = ''
PASSWORD = ''
NAME = ''
FOLDER = Path('')
with InstagramPV(USERNAME, PASSWORD, FOLDER, NAME) as pv:
pv.login()
pv.downloading_video_images()
if __name__ == '__main__':
main()
My previous comparative review tag: Instagram Bot, selenium, web scraping
1 Answer 1
Duplicated statements in an if-block
if find_name.startswith('https'):
self._search_name = urllib.parse.urlparse(find_name).path.split('/')[1]
return self._search_name
else:
return self._search_name
should just be
if find_name.startswith('https'):
self._search_name = urllib.parse.urlparse(find_name).path.split('/')[1]
return self._search_name
Type hint difference
You say this has no return:
def login(self) -> None:
But then you do one anyway?
return self.scroll_down()
This is repeated in scroll_down()
itself.
List comprehensions as loops
I find this:
with ThreadPoolExecutor(max_workers=8) as executor:
[executor.submit(self.fetch_url, link) for link in new_links]
to be unnecessary. It's more legible to have a simple for
-loop than to construct a list and throw it away.
Method order
For sane legibility, it's better to put __exit__
directly after __enter__
in the class.
content_of_url
This method:
@staticmethod
def content_of_url(req: [requests.sessions.Session, requests.models.Response]) -> bytes:
"""
:param req: requests.sessions.Session, requests.models.Response
:return: Content of Url
"""
return req.content
doesn't do anything useful enough to deserve being a dedicated method. Even if it did, the type hint for req
seems wrong; it should just be a Response
. I'm not sure why the Session
is mentioned.
Local variables
USERNAME = ''
PASSWORD = ''
NAME = ''
FOLDER = Path('')
should be lowercase, now that they're in function scope.
-
\$\begingroup\$ Thanks!! About the type hints, should just get rid of return or it's returning something that i didnt notice? Also, in The content_of_url, if i dont mention the Session is raising me a warning. Should i put it in the class too or to do something else? \$\endgroup\$AlexDotis– AlexDotis2020年03月28日 15:36:00 +00:00Commented Mar 28, 2020 at 15:36
-
\$\begingroup\$ _ should just get rid of return_ - yes, since the bottom of that stack does not return anything. if i dont mention the Session is raising me a warning - what warning? \$\endgroup\$Reinderien– Reinderien2020年03月28日 15:46:07 +00:00Commented Mar 28, 2020 at 15:46
-
\$\begingroup\$ Strange. It was raising me a warning at self.http_base.get(link) but now nothing. I dont understand. Its ok now \$\endgroup\$AlexDotis– AlexDotis2020年03月28日 16:10:56 +00:00Commented Mar 28, 2020 at 16:10
-
\$\begingroup\$ Should i post my next question ? \$\endgroup\$AlexDotis– AlexDotis2020年03月31日 02:00:17 +00:00Commented Mar 31, 2020 at 2:00
-
\$\begingroup\$ Since you've accepted an answer on this one, I would say yes. \$\endgroup\$Reinderien– Reinderien2020年03月31日 02:33:33 +00:00Commented Mar 31, 2020 at 2:33
Explore related questions
See similar questions with these tags.