Below is a script to scrape data from the National Severe Storms Laboratory ProbSevere dataset. Their data is available at a 2-minute interval and is only available for 24 hours before moving to the archives. The data is GeoJSON, which I convert to a MultiIndex `DataFrame` and save as a parquet file. The script runs once an hour to collect the previous hour's data at a 10-minute interval.

The script is running on a Raspberry Pi and writing the parquet files to an external HDD. I am collecting the data for a machine learning project; eventually I will serve the data from the Pi to my local network. I use `shapely` to convert the GeoJSON geometry into geometric shapes.
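Roughly, that conversion looks like the sketch below (the coordinates are invented for illustration; this step is separate from the hourly script, which stores the raw geometry dicts):

```python
from shapely.geometry import shape

# one "geometry" entry from a ProbSevere GeoJSON feature (coordinates invented)
geojson_geom = {
    "type": "Polygon",
    "coordinates": [[[-97.1, 35.2], [-97.0, 35.2], [-97.0, 35.3], [-97.1, 35.2]]],
}

polygon = shape(geojson_geom)  # shapely Polygon object
print(polygon.centroid, polygon.area)
```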
```python
from datetime import datetime
from typing import Mapping

import pandas as pd
import numpy as np
import requests
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger

NCEP_DATA = "https://mrms.ncep.noaa.gov/data"

scheduler = BlockingScheduler()


def name_to_datetime(names: pd.Series) -> pd.DatetimeIndex:
    return pd.DatetimeIndex(
        names.str.replace("_", "T").str.extract(r"(\d*T\d*).json")[0]
    ).rename("validTime")


def read_mrms(*args: str) -> pd.DataFrame:
    url = "/".join([NCEP_DATA, *args]) + "/?C=M;O=D"
    return pd.read_html(url)[0].dropna()


def read_probsevere() -> pd.DataFrame:
    df = read_mrms("ProbSevere", "PROBSEVERE")
    df.index = name_to_datetime(df.Name)
    return (NCEP_DATA + "/ProbSevere/PROBSEVERE/" + df["Name"]).rename("url")


def get_last_hours_data():
    s = read_probsevere()
    last_hour = datetime.utcnow() - pd.to_timedelta(1, unit="h")
    is_last_hour = (s.index.day == last_hour.day) & (s.index.hour == last_hour.hour)
    is_10_min_interval = (s.index.minute % 10) == 0
    return s[is_last_hour & is_10_min_interval]


def to_dataframe(mrms_files: Mapping[pd.Timestamp, str]) -> pd.DataFrame:
    def generate():
        for vt, url in mrms_files.items():
            features = requests.get(url).json()["features"]
            print(f"data collected for {vt}")
            for feat in features:
                props = feat["properties"]
                props["validTime"] = vt
                props["geometry"] = feat["geometry"]
                yield props

    ps = pd.DataFrame(generate()).set_index(["validTime", "ID"])
    ps["AVG_BEAM_HGT"] = ps["AVG_BEAM_HGT"].str.replace(r"[A-Za-z]", "", regex=True).apply(pd.eval)
    ps[["MAXRC_EMISS", "MAXRC_ICECF"]] = (
        ps[["MAXRC_EMISS", "MAXRC_ICECF"]]
        .stack()
        .str.extract(r"(?:\()([a-z]*)(?:\))")
        .replace({"weak": 1, "moderate": 2, "strong": 3})
        .fillna(0)
        .unstack(-1)
        .droplevel(0, axis=1)
    )
    ps.loc[:, ps.columns != "geometry"] = ps.loc[:, ps.columns != "geometry"].astype(np.float32)
    return ps


@scheduler.scheduled_job(IntervalTrigger(hours=1))
def on_hour():
    template = "/media/external/data/{0}.parquet"
    last = get_last_hours_data()
    df = to_dataframe(last)
    file_name = template.format(datetime.now().strftime("%Y-%m-%d.HR%H"))
    df.to_parquet(file_name)
    print(f"file saved as {file_name}")


if __name__ == "__main__":
    on_hour()
    scheduler.start()
```
- Reinderien (May 31, 2022 at 13:28): Please describe how this process is daemonised on the Pi OS.
- Jason Leaver (May 31, 2022 at 22:13): The OS on my Pi is Ubuntu Server 22.04 LTS, which shipped with Python 3.8. From my main PC I can ssh into the Pi. ubuntu.com/download/raspberry-pi
- Jason Leaver (Jun 1, 2022 at 1:27): I may have misunderstood the question. I'm just calling the script directly to run, `python main.py`. I suppose things could be containerized.
1 Answer
In terms of scheduling and process persistence, it sounds like you "just run it" and hope that it stays running. This seems risky and is more effort than it's worth. Delete `apscheduler` and run the process once an hour using `cron`. The process will start, do its business and save its file, then quit.
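A minimal crontab entry for that might look like the following; the interpreter and paths are hypothetical and depend on your setup:

```
# m h dom mon dow command: run at minute 0 of every hour (paths hypothetical)
0 * * * * /usr/bin/python3 /home/ubuntu/probsevere/main.py >> /var/log/probsevere.log 2>&1
```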
`name_to_datetime` is unnecessarily complex. You don't need to do string operations on this series: instead, issue one call to `to_datetime` with `exact=False` so that you don't need to worry about the surrounding text. This will return another series and not an index, but Pandas is smart enough to make an index upon assignment.
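For instance, a sketch with a made-up file name in the ProbSevere pattern:

```python
import pandas as pd

names = pd.Series(['MRMS_PROBSEVERE_20220611_094000.json'])
# exact=False lets the format match inside the surrounding text
times = pd.to_datetime(names, format='%Y%m%d_%H%M%S', exact=False)
# times[0] == Timestamp('2022年06月11日 09:40:00')
```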
Your `url` creation is also more complex than it needs to be. You already make a URL that you pass to `read_html`; why not re-use that?
I don't think it's a great idea to directly compare the day and hour components of your index. Instead, you can simply subtract the index from `now` and compare to your timedelta.
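A toy illustration with synthetic timestamps:

```python
from datetime import datetime, timedelta

import pandas as pd

now = datetime.utcnow()
s = pd.Series(
    ['recent', 'stale'],
    index=pd.DatetimeIndex([now - timedelta(minutes=30), now - timedelta(hours=2)]),
)
is_last_hour = (datetime.utcnow() - s.index) <= pd.to_timedelta(1, unit='h')
print(s[is_last_hour])  # keeps only the 30-minute-old row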
Rather than taking the `minute` modulo ten to choose a row once every ten minutes, use `resample`; this is exactly what it was written for. Use a frequency string of `10T`.
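For example, on a synthetic 2-minute series:

```python
import pandas as pd

s = pd.Series(
    range(6),
    index=pd.date_range('2022年06月11日 09:00', periods=6, freq='2T'),
)
every_ten = s.resample('10T').first()  # one row per 10-minute bin
```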
I think you have misinterpreted `AVG_BEAM_HGT`. When it says

    '15.65 kft / 4.77 km'

it does NOT mean "please divide 15.65 kilofeet by 4.77 kilometres", and you should NOT use `eval` (especially for untrusted data that have come fresh off the internet!). Instead it means "the average beam height is 15.65 kilofeet, otherwise known as 4.77 kilometres for people who prefer units that are not an affront to all things holy". Drop everything except the 4.77 quantity.
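In other words, something like this sketch:

```python
import pandas as pd

hgt = pd.Series(['15.65 kft / 4.77 km'])
# keep only the kilometre figure; no eval, no division
km = hgt.str.extract(r'([\d.]+)\s*km', expand=False).astype(float)
# km[0] == 4.77
```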
Your `MAXRC` processing needs some love. Why are you paying attention to the "weak"/"moderate"/"strong" descriptors when it gives you the numeric equivalent as well? You should ignore those descriptors and only use the numeric equivalent. After all, the site itself has generated those descriptors based on very simple threshold rules from the data. Don't `stack`/`unstack`, don't `droplevel`; just do a `replace` on that sub-frame.
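Roughly like this (the raw strings below are illustrative of the rate-plus-descriptor format, not copied from live data):

```python
import pandas as pd

maxrc = pd.Series(['4.4%/min (strong)', 'N/A'])
numeric = (
    maxrc.str.extract(r'([\d.]+)%?/min', expand=False)
    .fillna(0)
    .astype(float)
)
# numeric -> [4.4, 0.0]
```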
In your filename formatting, you don't need to call `strftime`: you can pass that format definition directly in the format field of an f-string.
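That is:

```python
from datetime import datetime

# the format spec goes straight into the f-string
file_name = f'/media/external/data/{datetime.now():%Y-%m-%d.HR%H}.parquet'
```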
I must say that you're getting the hang of indices: you've chosen them well for these data (your `validTime` and `ID`).
Your URL suffix `"/?C=M;O=D"` is deeply suspicious. If those are separate URL query parameters, I would expect `&` rather than `;`, which can be done for you by passing those letters into the `params` kwarg of `get`.
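For instance:

```python
import requests

# requests builds the query string ("?C=M&O=D") for you
response = requests.get(
    'https://mrms.ncep.noaa.gov/data/ProbSevere/PROBSEVERE',
    params={'C': 'M', 'O': 'D'},
)
```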
In your call to `read_html`, you should be telling it to skip rows 1 and 2, as those contain a separator and a parent directory, respectively.
Something weird is going on in the `mrms_files` argument to `to_dataframe`. That's not a mapping: that's already a `DataFrame`, and you should not be calling `.items()`, but rather `.iterrows()`.
Suggested
I temporarily switched from parquet to CSV output so that I can inspect the results.
```python
import re
from datetime import datetime

import pandas as pd
from requests import Session

NCEP_DATA = 'https://mrms.ncep.noaa.gov/data'


def name_to_datetime(names: pd.Series) -> pd.Series:
    times = pd.to_datetime(names, format='%Y%m%d_%H%M%S', exact=False)
    return times.rename('validTime')


def read_mrms(session: Session, *args: str) -> pd.DataFrame:
    url = '/'.join((NCEP_DATA, *args))
    with session.get(
        url=url,
        params={'C': 'M', 'O': 'D'},
        headers={'Accept': 'text/html'},
    ) as response:
        response.raise_for_status()
        df, = pd.read_html(
            io=response.text,
            skiprows=[1, 2],  # separator, parent dir
            parse_dates=['Last modified'],
        )
    df = df.dropna()
    df.index = name_to_datetime(df.Name)
    df['url'] = (url + '/') + df.Name
    return df


def read_probsevere(session: Session) -> pd.DataFrame:
    return read_mrms(session, 'ProbSevere', 'PROBSEVERE')


def get_last_hours_data(session: Session) -> pd.DataFrame:
    s = read_probsevere(session)
    one_hour = pd.to_timedelta(1, unit='H')
    is_last_hour = (datetime.utcnow() - s.index) <= one_hour
    s = s[is_last_hour].resample('10T').first()
    return s


def to_dataframe(session: Session, mrms_files: pd.DataFrame) -> pd.DataFrame:
    def generate():
        for valid_time, row in mrms_files.iterrows():
            with session.get(
                url=row.url,
                headers={'Accept': 'application/json'},
            ) as response:
                response.raise_for_status()
                features = response.json()['features']
            print(f'data collected for {valid_time}')
            for feat in features:
                props = feat['properties']
                props['validTime'] = valid_time
                props['geometry'] = feat['geometry']
                yield props

    ps = pd.DataFrame(generate()).set_index(['validTime', 'ID'])

    # e.g. 3.97 kft / 1.21 km
    ps['AVG_BEAM_HGT'] = ps.AVG_BEAM_HGT.str.extract(
        re.compile(
            r'''(?x)    # verbose
            (           # capturing group
                [\d.]+  # float characters, at least one
            )
            \s*km       # consume whitespace, literal 'km'
            '''
        ), expand=False,
    ).astype(float)

    maxrc = ['MAXRC_EMISS', 'MAXRC_ICECF']
    ps[maxrc] = (
        ps[maxrc]
        .replace(
            to_replace=re.compile(
                # e.g. 4.4%/min
                r'''(?x)    # verbose
                ^.*?        # start, lazy consume everything
                (           # capturing group
                    [\d.]+  # float chars, at least one, greedy
                )
                %?          # optional literal percent
                /min        # literal "per minute"
                .*$         # consume everything, end
                '''
            ), value=r'1円', regex=True,
        )
        .replace('N/A', 0)
        .astype(float)
    )

    return ps


def on_hour() -> None:
    with Session() as session:
        last = get_last_hours_data(session)
        df = to_dataframe(session, last)
        # /media/external/data/...parquet actually
        file_name = f'data_{datetime.now():%Y-%m-%d.HR%H}.csv'
        df.to_csv(file_name)
        print(f'file saved as {file_name}')


if __name__ == '__main__':
    on_hour()
```
Output
```
data collected for 2022年06月11日 09:40:00
data collected for 2022年06月11日 09:50:00
data collected for 2022年06月11日 10:00:00
data collected for 2022年06月11日 10:10:00
data collected for 2022年06月11日 10:20:00
data collected for 2022年06月11日 10:30:00
data collected for 2022年06月11日 10:40:00
file saved as data_2022年06月11日.HR08.csv
```
- Jason Leaver (Jun 11, 2022 at 11:31): I've replaced the Pi with a used Intel NUC for a little more processing power and a more familiar architecture. Since initially asking the question I have read about cron and will use it in my deployment to the NUC. This weekend I'll try to connect the dots between what I'm attempting to accomplish with this data and the data from the NOMADS site you previously reviewed. I cannot express my gratitude enough; you are an excellent teacher.
- Jason Leaver (Jun 12, 2022 at 13:40): In regards to the URL suffix, that's just a method of sorting the HTML listing. `"/?C=M;O=D"` is roughly equivalent to `?column=modified&order=descending`.
- Reinderien (Jun 12, 2022 at 14:42): OK, but was that really supposed to be a semicolon?
- Jason Leaver (Jun 12, 2022 at 18:09): I agree it is unusual; it would be more appropriate to use `?C=N&O=A`. This is the HTML element `<a href="?C=N;O=A">Name</a>`.
- Jason Leaver (Jun 12, 2022 at 18:15): On another note, I've been looking into `cron` and have tooled around with some of its usage: `* * * * * echo "Hello world" >> /var/log/cron.log 2>&1`. I've since set up a Docker container and a minikube deployment and service for the API, where I intend to use something along these lines: CronJobs.
. I've since set up a Docker container and minikube deployment & service for the api. Where I intend to use something along these lines. CronJobs \$\endgroup\$Jason Leaver– Jason Leaver2022年06月12日 18:15:46 +00:00Commented Jun 12, 2022 at 18:15