3
\$\begingroup\$

I wrote a simple Zoopla real estate scraper to practise what I have learned so far about Python, Requests, BeautifulSoup and web scraping fundamentals in general. Looking at my code, I feel there must be a better and more elegant way to write it, but as a beginner I don't know it yet. So I would like the experienced people here at Stack Exchange to review my code and suggest improvements.

import requests
import json
import csv
import time
from bs4 import BeautifulSoup as bs
class ZooplaScraper:
    """Collect Zoopla for-sale listings for London and save them as CSV.

    Each results page carries its listing data as JSON inside the
    ``<script id="__NEXT_DATA__">`` tag; ``parse`` reads that JSON rather
    than the rendered HTML.
    """

    def __init__(self):
        # Per-instance accumulator. A class-level list would be shared by
        # every scraper object, so a second instance would start out holding
        # the first one's rows.
        self.results = []

    def fetch(self, url):
        """GET ``url``, print the URL and status code, and return the response."""
        print(f'HTTP GET request to URL: {url}', end='')
        res = requests.get(url)
        print(f' | Status code: {res.status_code}')
        return res

    def parse(self, html):
        """Append one flattened dict per listing found in ``html``.

        Raises IndexError when the page has no ``__NEXT_DATA__`` script tag
        and KeyError when the embedded JSON lacks an expected key.
        """
        content = bs(html, 'html.parser')
        content_array = content.select('script[id="__NEXT_DATA__"]')
        content_dict = json.loads(content_array[0].string)
        content_details = content_dict['props']['initialProps']['pageProps']['regularListingsFormatted']
        for listing in content_details:
            self.results.append({
                'listing_id': listing['listingId'],
                'name_title': listing['title'],
                'names': listing['branch']['name'],
                'addresses': listing['address'],
                'agent': 'https://zoopla.co.uk' + listing['branch']['branchDetailsUri'],
                'phone_no': listing['branch']['phone'],
                'picture': listing['image']['src'],
                'prices': listing['price'],
                'Listed_on': listing['publishedOn'],
                'listing_detail_link': 'https://zoopla.co.uk' + listing['listingUris']['detail'],
            })

    def to_csv(self, filename='zoopla.csv'):
        """Write the collected rows to ``filename`` with a header row.

        Nothing is written when no rows have been collected, because the
        column names are taken from the first row.
        """
        if not self.results:
            print('No results to store')
            return
        # newline='' lets the csv module control line endings itself;
        # without it, extra blank lines appear on Windows.
        with open(filename, 'w', newline='') as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=self.results[0].keys())
            writer.writeheader()
            writer.writerows(self.results)
        print(f'Stored results to "{filename}"')

    def run(self):
        """Fetch and parse result pages 1 to 4, then write the CSV file."""
        for page in range(1, 5):
            url = 'https://www.zoopla.co.uk/for-sale/property/london/?page_size=25&q=London&radius=0&results_sort=newest_listings&pn='
            url += str(page)
            res = self.fetch(url)
            self.parse(res.text)
            time.sleep(2)  # pause between consecutive page requests
        self.to_csv()


if __name__ == '__main__':
    scraper = ZooplaScraper()
    scraper.run()

Basically, what this scraper mostly does is JSON parsing. The problem was that all the data on the website comes from JavaScript inside a script tag, so I have to select that tag, pass its contents to json.loads(), and walk the resulting dict to find the right key-value pairs.

Reinderien
70.9k5 gold badges76 silver badges256 bronze badges
asked May 19, 2021 at 20:55
\$\endgroup\$
1
  • 1
    \$\begingroup\$ Your code is incorrectly indented, particularly the class methods. \$\endgroup\$ Commented May 20, 2021 at 15:32

1 Answer 1

1
\$\begingroup\$
  • Make a Session instead of issuing individual Requests get; this promotes explicit connection pooling, cookie sharing etc.
  • There's no need for your current prints. If you find them to be of very high value, convert them into real logging calls
  • Pre-define your script tag loading via a SoupStrainer
  • Use urljoin and centralize your root URL definition
  • Do not keep results as a member; it's the result of a method call
  • Do not represent results as a list; it can be an iterator so that results can be depaginated and streamed to disk while keeping memory occupation relatively low
  • Parametrize your fetch function to represent the actual parameters on the web call
  • Consider using PEP484 type hints
  • Your open is missing newline=''

Suggested

from functools import partial
from typing import Any, Dict, Iterable, List
import json
import csv
from urllib.parse import urljoin
from bs4 import BeautifulSoup, SoupStrainer
from requests import Session
JSON = Dict[str, Any]


class ZooplaScraper:
    """Stream Zoopla for-sale listings page by page and export them to CSV.

    Listings are produced lazily by ``run`` so they can be written to disk
    as they arrive instead of being accumulated in memory.
    """

    ROOT = 'https://zoopla.co.uk'
    # Wrapped in staticmethod so that access through the class or an
    # instance always yields the plain partial. functools.partial emits a
    # FutureWarning when used as a method from Python 3.13 and is documented
    # to become a method descriptor, at which point an unwrapped partial
    # would receive the instance as an extra positional argument.
    from_root = staticmethod(partial(urljoin, ROOT))

    def __init__(self):
        # One session for all requests: connection pooling and shared cookies.
        self.session = Session()
        # Only the __NEXT_DATA__ script tag is parsed out of each page.
        strainer = SoupStrainer('script', id='__NEXT_DATA__')
        self.load_script = partial(
            BeautifulSoup, features='html.parser', parse_only=strainer,
        )

    def fetch(
        self, query: str = 'London', radius: int = 0,
        sort: str = 'newest_listings', page: int = 1,
    ) -> str:
        """Return the HTML of one search-results page.

        ``raise_for_status`` is called on the response, so an HTTP error
        status raises instead of returning an error page.
        """
        with self.session.get(
            self.from_root(f'for-sale/property/{query.lower()}/'),
            params={
                'page_size': 25,
                'q': query,
                'radius': radius,
                'results_sort': sort,
                'pn': page,
            },
        ) as resp:
            resp.raise_for_status()
            return resp.text

    def load(self, html: str) -> List[JSON]:
        """Return the raw listing dicts embedded in a results page."""
        script = self.load_script(html)
        data = json.loads(script.string)
        return data['props']['initialProps']['pageProps']['regularListingsFormatted']

    @classmethod
    def serialise(cls, listings: Iterable[JSON]) -> Iterable[JSON]:
        """Lazily convert the site's listing dicts to flat CSV-ready rows."""
        for listing in listings:
            yield {
                'listing_id': listing['listingId'],
                'name_title': listing['title'],
                'names': listing['branch']['name'],
                'addresses': listing['address'],
                'agent': cls.from_root(listing['branch']['branchDetailsUri']),
                'phone_no': listing['branch']['phone'],
                'picture': listing['image']['src'],
                'prices': listing['price'],
                'listed_on': listing['publishedOn'],
                'listing_detail_link': cls.from_root(listing['listingUris']['detail']),
            }

    def run(
        self,
        query: str = 'London', radius: int = 0, sort: str = 'newest_listings',
        max_pages: int = 4,
    ) -> Iterable[JSON]:
        """Yield serialised listings from result pages 1 to ``max_pages``."""
        for page in range(1, max_pages + 1):
            yield from self.serialise(
                self.load(
                    self.fetch(query, radius, sort, page)
                )
            )

    @staticmethod
    def to_csv(results: Iterable[JSON], filename: str = 'zoopla.csv') -> None:
        """Write ``results`` to ``filename`` with a header row.

        Any iterable is accepted, not only an iterator. The column names
        come from the first row; if there are no rows the file is left
        empty because no header can be derived.
        """
        rows = iter(results)  # next() needs an iterator; a list is not one
        with open(filename, 'w', newline='') as csv_file:
            first = next(rows, None)
            if first is None:
                return
            writer = csv.DictWriter(csv_file, fieldnames=first.keys())
            writer.writeheader()
            writer.writerow(first)
            writer.writerows(rows)
if __name__ == '__main__':
    # Script entry point: build the lazy listing stream and write it to the
    # default CSV file.
    zoopla = ZooplaScraper()
    listings = zoopla.run()
    zoopla.to_csv(listings)

Experimental

This is an experimental, alternate implementation that:

  • Streams the HTTP response and does not need the entire response content to complete
  • Streams the parsed HTML elements and does not need the entire document tree to complete
  • Streams the JSON body and does not need the entire dictionary tree to complete

It is somewhat iterator-heavy, and built more as a proof of concept to demonstrate that this is possible. Advantages include that worst-case memory usage should be reduced, and that BeautifulSoup is no longer needed. Disadvantages include that a new dependency, JsonSlicer, is needed; and this might introduce subtle HTTP inefficiencies from connections that are reset before complete response transmission.

import csv
import logging
from functools import partial
from html.parser import HTMLParser
from typing import Any, Dict, Iterable, Tuple, Optional
from urllib.parse import urljoin
from jsonslicer import JsonSlicer
from requests import Session, Response
JSON = Dict[str, Any]


class StreamParser(HTMLParser):
    """File-like, iterable view of the ``__NEXT_DATA__`` script in a response.

    The response body is fed to the HTML parser chunk by chunk. Text found
    inside ``<script id="__NEXT_DATA__">`` is queued and handed out through
    ``read``, which is the interface JsonSlicer consumes. Use the object as
    a context manager: entering starts the chunk iterator and leaving closes
    the HTTP response.
    """

    def __init__(self, resp: Response):
        try:
            resp.raise_for_status()  # If the response failed, it can't be parsed
        except Exception:
            # The caller never receives an object to use in a ``with``
            # block, so __exit__ cannot run; release the response here.
            resp.close()
            raise
        self.resp = resp      # Keep the response so we can stream from it
        self.in_tag = False   # Parser state: if we're in the script tag
        self.done = False     # Whether we've finished the script tag
        self.queue = []       # Queue of text element chunks in the script
        super().__init__()    # Initialize the base parser

    def __enter__(self):
        # Start the data chunk iterator
        self.chunks = self.data_chunks()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # When we're done, tell the HTTP response stream to close
        self.resp.close()

    def data_chunks(self) -> Iterable[str]:
        """Yield the script's text pieces as the response is parsed."""
        # Stream in arbitrary-sized chunks from the response
        for chunk in self.resp.iter_content(
            chunk_size=None,      # Get whatever chunks are sent our way
            decode_unicode=True,  # Needed for HTMLParser compatibility
        ):
            logging.debug(
                '%d-character chunk: %s...%s',
                len(chunk), chunk[:10], chunk[-10:],
            )
            # Feed this chunk to the parser, which will in turn call our
            # handle methods and populate the queue
            self.feed(chunk)
            yield from self.queue
            self.queue.clear()
            # We only care about one tag. Once that's parsed, we're done
            # iterating
            if self.done:
                break

    def read(self, n: Optional[int] = -1) -> str:
        """Return the next queued text piece, or ``''`` at end of stream.

        ``n`` is not used as a size limit; it only has to be a non-negative
        number, because read-to-end is not supported.
        """
        # Will be called by JsonSlicer. We only support partial reads for
        # efficiency's sake; we do not build up our own buffer string.
        if n is None or n < 0:
            raise NotImplementedError('Read-to-end not supported')
        try:
            return next(self.chunks)
        except StopIteration:
            return ''  # end of stream

    def handle_starttag(
        self, tag: str, attrs: Iterable[Tuple[str, Optional[str]]],
    ) -> None:
        # Attribute values are None for attributes written without a value
        self.in_tag = tag == 'script' and any(
            k == 'id' and v == '__NEXT_DATA__' for k, v in attrs
        )

    def handle_data(self, data: str) -> None:
        if self.in_tag:
            self.queue.append(data)

    def handle_endtag(self, tag: str) -> None:
        if self.in_tag:
            self.in_tag = False
            self.done = True

    def __iter__(self) -> Iterable[JSON]:
        # Iterating over this object produces individual listing
        # dictionaries. We delegate to the JsonSlicer iterator; it in turn
        # invokes read(), which uses our data_chunks iterator.
        return JsonSlicer(file=self, path_prefix=(
            'props', 'initialProps', 'pageProps', 'regularListingsFormatted', None,
        ))
class ZooplaScraper:
    """Stream Zoopla for-sale listings through ``StreamParser`` into CSV rows."""

    ROOT = 'https://zoopla.co.uk'
    # Wrapped in staticmethod so that access through the class or an
    # instance always yields the plain partial. functools.partial emits a
    # FutureWarning when used as a method from Python 3.13 and is documented
    # to become a method descriptor, at which point an unwrapped partial
    # would receive the instance as an extra positional argument.
    from_root = staticmethod(partial(urljoin, ROOT))

    def __init__(self):
        self.session = Session()

    def fetch(
        self, query: str = 'London', radius: int = 0,
        sort: str = 'newest_listings', page: int = 1,
    ) -> StreamParser:
        """Request one results page with a streamed body and wrap it."""
        resp = self.session.get(
            self.from_root(f'for-sale/property/{query.lower()}/'),
            params={
                'page_size': 25,
                'q': query,
                'radius': radius,
                'results_sort': sort,
                'pn': page,
            },
            stream=True,
        )
        return StreamParser(resp)

    @classmethod
    def serialise(cls, listing: JSON) -> JSON:
        """Convert the site's representation of a listing dict to our own."""
        return {
            'listing_id': listing['listingId'],
            'name_title': listing['title'],
            'names': listing['branch']['name'],
            'addresses': listing['address'],
            'agent': cls.from_root(listing['branch']['branchDetailsUri']),
            'phone_no': listing['branch']['phone'],
            'picture': listing['image']['src'],
            'prices': listing['price'],
            'listed_on': listing['publishedOn'],
            'listing_detail_link': cls.from_root(listing['listingUris']['detail']),
        }

    def run(
        self,
        query: str = 'London', radius: int = 0, sort: str = 'newest_listings',
        max_pages: int = 4,
    ) -> Iterable[JSON]:
        """Yield serialised listings from result pages 1 to ``max_pages``."""
        for page in range(1, max_pages + 1):
            # Reset per page so an empty page logs 0 rather than raising on
            # an unbound name or repeating the previous page's count.
            n_listings = 0
            with self.fetch(query, radius, sort, page) as stream:
                # start=1 makes the final loop value the number of listings
                for n_listings, data in enumerate(stream, start=1):
                    yield self.serialise(data)
            logging.info('Page %d: %d listings', page, n_listings)

    @staticmethod
    def to_csv(results: Iterable[JSON], filename: str = 'zoopla.csv') -> None:
        """Write ``results`` to ``filename`` with a header row.

        Any iterable is accepted, not only an iterator. The column names
        come from the first row; if there are no rows the file is left
        empty because no header can be derived.
        """
        rows = iter(results)  # next() needs an iterator; a list is not one
        with open(filename, 'w', newline='') as csv_file:
            first = next(rows, None)
            if first is not None:
                writer = csv.DictWriter(csv_file, fieldnames=first.keys())
                writer.writeheader()
                writer.writerow(first)
                writer.writerows(rows)
        logging.info('Write to %s complete', filename)
if __name__ == '__main__':
    # Root logger at INFO; this also surfaces messages from urllib3.
    # Switch the level to DEBUG for more verbosity.
    logging.basicConfig(level=logging.INFO)
    zoopla = ZooplaScraper()
    listings = zoopla.run()
    zoopla.to_csv(listings)
answered May 20, 2021 at 16:29
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.