Update check_links.py #77
Open
tools/check_links.py (267 changes: 154 additions & 113 deletions)
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
FlyPython Link Checker Tool
Periodically checks the validity of all external links in README files
"""

import re
import json
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


class LinkChecker:
    def __init__(self, timeout: int = 10, max_workers: int = 10):
        self.session = self._create_session()
        self.timeout = timeout
        self.max_workers = max_workers
        self.results = {
            'working': [],
            'broken': [],
            'redirect': [],
            'timeout': [],
            'unknown': []
        }
        self.processed_urls = set()

    def _create_session(self) -> requests.Session:
        """Create a requests session with retry strategy and headers"""
        session = requests.Session()

        # Configure retry strategy
        retry_strategy = Retry(
            total=2,
            backoff_factor=0.5,
            status_forcelist=[429, 500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount('http://', adapter)
        session.mount('https://', adapter)

        # Set user agent
        session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

        return session

    def extract_links_from_file(self, filename: str) -> List[Dict]:
        """Extract all external links from a markdown file"""
        filepath = Path(filename)

        if not filepath.exists():
            print(f"File not found: {filename}")
            return []

        try:
            content = filepath.read_text(encoding='utf-8')
        except Exception as e:
            print(f"Failed to read {filename}: {e}")
            return []

        links = []

        # Extract markdown links [text](url)
        markdown_links = re.findall(r'\[([^\]]*)\]\(([^)]+)\)', content)
        for text, url in markdown_links:
            if url.startswith('http'):
                links.append({
                    'text': text,
                    'url': url,
                    'file': str(filepath),
                    'type': 'markdown'
                })

        # Extract plain URLs
        plain_urls = re.findall(r'https?://[^\s\])\}]+', content)
        seen = {link['url'] for link in links}

        for url in plain_urls:
            if url not in seen:
                links.append({
                    'text': url,
                    'url': url,
                    'file': str(filepath),
                    'type': 'plain'
                })
                seen.add(url)

        return links

    def check_link(self, link: Dict) -> Dict:
        """Check the status of a single link"""
        url = link['url']

        if url in self.processed_urls:
            return link

        self.processed_urls.add(url)

        try:
            # Try HEAD request first (faster)
            response = self.session.head(url, timeout=self.timeout, allow_redirects=True)
            return self._process_response(link, response)

        except requests.exceptions.Timeout:
            link['status'] = 'timeout'
            link['error'] = 'Request timeout'
            self.results['timeout'].append(link)
            return link

        except requests.exceptions.RequestException as e:
            # Fall back to GET request for servers that don't support HEAD
            try:
                response = self.session.get(url, timeout=self.timeout)
                return self._process_response(link, response)
            except requests.exceptions.RequestException:
                link['status'] = 'unknown'
                link['error'] = str(e)
                self.results['unknown'].append(link)
                return link

    def _process_response(self, link: Dict, response: requests.Response) -> Dict:
        """Process HTTP response and categorize link"""
        status_code = response.status_code

        if status_code == 200:
            link['status'] = 'working'
            self.results['working'].append(link)
        elif 300 <= status_code < 400:
            link['status'] = 'redirect'
            link['final_url'] = response.url
            self.results['redirect'].append(link)
        else:
            link['status'] = 'broken'
            self.results['broken'].append(link)

        link['status_code'] = status_code
        return link

    def check_all_links(self, links: List[Dict]) -> None:
        """Concurrently check all links"""
        print(f"Checking {len(links)} links...\n")

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {executor.submit(self.check_link, link): link for link in links}

            for i, future in enumerate(as_completed(futures), 1):
                link = futures[future]
                try:
                    result = future.result()
                    status = result.get('status', 'unknown').upper()
                    print(f"[{i}/{len(links)}] {status}: {result['url']}")
                except Exception as e:
                    print(f"Error checking {link['url']}: {e}")

    def generate_report(self, output_dir: str = 'reports') -> None:
        """Generate and save detailed report"""
        total = sum(len(links) for links in self.results.values())

        report = f"""
{'='*60}
Link Check Report
{'='*60}
Total Links: {total}
✓ Working: {len(self.results['working'])}
→ Redirects: {len(self.results['redirect'])}
✗ Broken: {len(self.results['broken'])}
⏱ Timeouts: {len(self.results['timeout'])}
? Unknown: {len(self.results['unknown'])}
{'='*60}
"""
        print(report)

        # Save detailed results
        output_path = Path(output_dir)
        output_path.mkdir(exist_ok=True)

        results_file = output_path / 'link_check_results.json'
        with open(results_file, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, ensure_ascii=False, indent=2)

        print(f"Detailed results saved to: {results_file}")

    def deduplicate_links(self, links: List[Dict]) -> List[Dict]:
        """Remove duplicate links by URL"""
        seen = set()
        unique = []
        for link in links:
            if link['url'] not in seen:
                unique.append(link)
                seen.add(link['url'])
        return unique


def main():
    files = ['../README.md', '../README_cn.md']

    # Extract links
    checker = LinkChecker(timeout=10, max_workers=10)
    all_links = []

    for filename in files:
        print(f"Extracting links from {filename}...")
        links = checker.extract_links_from_file(filename)
        if links:
            all_links.extend(links)
            print(f"Found {len(links)} links\n")

    if not all_links:
        print("No links found!")
        return

    # Deduplicate and check
    unique_links = checker.deduplicate_links(all_links)
    print(f"Checking {len(unique_links)} unique links\n")

    checker.check_all_links(unique_links)
    checker.generate_report()


if __name__ == '__main__':
    main()
```
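For reference, a minimal sketch of driving the updated class directly rather than through `main()`. The import path and the sample link dict below are illustrative assumptions, not part of the PR:

```python
# Hypothetical usage sketch: assumes check_links.py is importable
# (e.g. run from the tools/ directory); the sample link below is
# illustrative, not taken from the repository's READMEs.
from check_links import LinkChecker

checker = LinkChecker(timeout=5, max_workers=4)

# A link dict in the same shape extract_links_from_file() produces.
sample_links = [{
    'text': 'Python',
    'url': 'https://www.python.org',
    'file': 'README.md',
    'type': 'markdown',
}]

checker.check_all_links(sample_links)  # prints one status line per link
checker.generate_report('reports')     # prints summary, writes reports/link_check_results.json
```

Note that the `'../README.md'` paths in `main()` are relative, so the script appears intended to be invoked from inside the `tools/` directory.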