"""The CheckExternalLinksBuilder class."""
from __future__ import annotations
import json
import re
import socket
import time
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from html.parser import HTMLParser
from os import path
from queue import PriorityQueue, Queue
from threading import Thread
from typing import TYPE_CHECKING, NamedTuple, cast
from urllib.parse import unquote, urlparse, urlsplit, urlunparse
from docutils import nodes
from requests.exceptions import ConnectionError, HTTPError, SSLError, TooManyRedirects
from sphinx.builders.dummy import DummyBuilder
from sphinx.locale import __
from sphinx.transforms.post_transforms import SphinxPostTransform
from sphinx.util import encode_uri, logging, requests
from sphinx.util.console import darkgray, darkgreen, purple, red, turquoise # type: ignore
from sphinx.util.nodes import get_node_line
if TYPE_CHECKING:
from typing import Any, Callable, Generator, Iterator
from requests import Response
from sphinx.application import Sphinx
from sphinx.config import Config
logger = logging.getLogger(__name__)
uri_re = re.compile('([a-z]+:)?//')  # matches foo:// and // (a protocol-relative URL)
DEFAULT_REQUEST_HEADERS = {
'Accept': 'text/html,application/xhtml+xml;q=0.9,*/*;q=0.8',
}
CHECK_IMMEDIATELY = 0
QUEUE_POLL_SECS = 1
DEFAULT_DELAY = 60.0
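# This builder is normally run from the command line; an illustrative invocation
# (the directory names are placeholders):
#
#     sphinx-build -b linkcheck ./source ./build/linkcheck
#
# Results are written to output.txt and output.json in the output directory, and the
# exit status is set to 1 when broken links are found (see finish() below).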
class CheckExternalLinksBuilder(DummyBuilder):
"""
Checks for broken external links.
"""
name = 'linkcheck'
epilog = __('Look for any errors in the above output or in '
'%(outdir)s/output.txt')
def init(self) -> None:
self.broken_hyperlinks = 0
self.hyperlinks: dict[str, Hyperlink] = {}
# set a timeout for non-responding servers
socket.setdefaulttimeout(5.0)
def finish(self) -> None:
checker = HyperlinkAvailabilityChecker(self.config)
logger.info('')
output_text = path.join(self.outdir, 'output.txt')
output_json = path.join(self.outdir, 'output.json')
with open(output_text, 'w', encoding='utf-8') as self.txt_outfile, \
open(output_json, 'w', encoding='utf-8') as self.json_outfile:
for result in checker.check(self.hyperlinks):
self.process_result(result)
if self.broken_hyperlinks:
self.app.statuscode = 1
def process_result(self, result: CheckResult) -> None:
filename = self.env.doc2path(result.docname, False)
linkstat = {'filename': filename, 'lineno': result.lineno,
'status': result.status, 'code': result.code, 'uri': result.uri,
'info': result.message}
self.write_linkstat(linkstat)
if result.status == 'unchecked':
return
if result.status == 'working' and result.message == 'old':
return
if result.lineno:
logger.info('(%16s: line %4d) ', result.docname, result.lineno, nonl=True)
if result.status == 'ignored':
if result.message:
logger.info(darkgray('-ignored- ') + result.uri + ': ' + result.message)
else:
logger.info(darkgray('-ignored- ') + result.uri)
elif result.status == 'local':
logger.info(darkgray('-local- ') + result.uri)
self.write_entry('local', result.docname, filename, result.lineno, result.uri)
elif result.status == 'working':
logger.info(darkgreen('ok ') + result.uri + result.message)
elif result.status == 'broken':
if self.app.quiet or self.app.warningiserror:
logger.warning(__('broken link: %s (%s)'), result.uri, result.message,
location=(result.docname, result.lineno))
else:
logger.info(red('broken ') + result.uri + red(' - ' + result.message))
self.write_entry('broken', result.docname, filename, result.lineno,
result.uri + ': ' + result.message)
self.broken_hyperlinks += 1
elif result.status == 'redirected':
try:
text, color = {
301: ('permanently', purple),
302: ('with Found', purple),
303: ('with See Other', purple),
307: ('temporarily', turquoise),
308: ('permanently', purple),
}[result.code]
except KeyError:
text, color = ('with unknown code', purple)
linkstat['text'] = text
if self.config.linkcheck_allowed_redirects:
logger.warning('redirect ' + result.uri + ' - ' + text + ' to ' +
result.message, location=(result.docname, result.lineno))
else:
logger.info(color('redirect ') + result.uri +
color(' - ' + text + ' to ' + result.message))
self.write_entry('redirected ' + text, result.docname, filename,
result.lineno, result.uri + ' to ' + result.message)
else:
raise ValueError('Unknown status %s.' % result.status)
def write_linkstat(self, data: dict) -> None:
self.json_outfile.write(json.dumps(data))
self.json_outfile.write('\n')
def write_entry(self, what: str, docname: str, filename: str, line: int,
uri: str) -> None:
self.txt_outfile.write(f'{filename}:{line}: [{what}] {uri}\n')
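# Illustrative record formats written by CheckExternalLinksBuilder above; the values
# themselves depend on the project being checked:
#
#   output.txt  -- one plain-text line per entry, written by write_entry():
#       <filename>:<lineno>: [<status>] <uri and details>
#
#   output.json -- one JSON object per line, written by write_linkstat(), with the
#       keys 'filename', 'lineno', 'status', 'code', 'uri' and 'info'.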
class HyperlinkCollector(SphinxPostTransform):
builders = ('linkcheck',)
default_priority = 800
def run(self, **kwargs: Any) -> None:
builder = cast(CheckExternalLinksBuilder, self.app.builder)
hyperlinks = builder.hyperlinks
docname = self.env.docname
# reference nodes
for refnode in self.document.findall(nodes.reference):
if 'refuri' in refnode:
uri = refnode['refuri']
_add_uri(self.app, uri, refnode, hyperlinks, docname)
# image nodes
for imgnode in self.document.findall(nodes.image):
uri = imgnode['candidates'].get('?')
if uri and '://' in uri:
_add_uri(self.app, uri, imgnode, hyperlinks, docname)
# raw nodes
for rawnode in self.document.findall(nodes.raw):
uri = rawnode.get('source')
if uri and '://' in uri:
_add_uri(self.app, uri, rawnode, hyperlinks, docname)
def _add_uri(app: Sphinx, uri: str, node: nodes.Element,
hyperlinks: dict[str, Hyperlink], docname: str) -> None:
if newuri := app.emit_firstresult('linkcheck-process-uri', uri):
uri = newuri
try:
lineno = get_node_line(node)
except ValueError:
lineno = -1
if uri not in hyperlinks:
hyperlinks[uri] = Hyperlink(uri, docname, app.env.doc2path(docname), lineno)
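# A hedged sketch of a 'linkcheck-process-uri' handler, as it might appear in a
# project's conf.py; the host names and the rewrite rule are hypothetical:
#
#     def replace_staging_host(app, uri):
#         if uri.startswith('https://staging.example.org/'):
#             return uri.replace('staging.example.org', 'www.example.org')
#         return None  # returning None leaves the URI unchanged
#
#     def setup(app):
#         app.connect('linkcheck-process-uri', replace_staging_host)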
class Hyperlink(NamedTuple):
uri: str
docname: str
docpath: str
lineno: int
class HyperlinkAvailabilityChecker:
def __init__(self, config: Config) -> None:
self.config = config
self.rate_limits: dict[str, RateLimit] = {}
self.rqueue: Queue[CheckResult] = Queue()
self.workers: list[Thread] = []
self.wqueue: PriorityQueue[CheckRequest] = PriorityQueue()
self.num_workers: int = config.linkcheck_workers
self.to_ignore: list[re.Pattern[str]] = list(map(re.compile,
self.config.linkcheck_ignore))
def check(self, hyperlinks: dict[str, Hyperlink]) -> Generator[CheckResult, None, None]:
self.invoke_threads()
total_links = 0
for hyperlink in hyperlinks.values():
if self.is_ignored_uri(hyperlink.uri):
yield CheckResult(hyperlink.uri, hyperlink.docname, hyperlink.lineno,
'ignored', '', 0)
else:
self.wqueue.put(CheckRequest(CHECK_IMMEDIATELY, hyperlink), False)
total_links += 1
done = 0
while done < total_links:
yield self.rqueue.get()
done += 1
self.shutdown_threads()
def invoke_threads(self) -> None:
for _i in range(self.num_workers):
thread = HyperlinkAvailabilityCheckWorker(self.config,
self.rqueue, self.wqueue,
self.rate_limits)
thread.start()
self.workers.append(thread)
def shutdown_threads(self) -> None:
self.wqueue.join()
for _worker in self.workers:
self.wqueue.put(CheckRequest(CHECK_IMMEDIATELY, None), False)
def is_ignored_uri(self, uri: str) -> bool:
return any(pat.match(uri) for pat in self.to_ignore)
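# Illustrative conf.py patterns consumed by is_ignored_uri() above; the URLs are
# examples only. Entries are uncompiled regular expressions matched against the
# start of each URI with re.match():
#
#     linkcheck_ignore = [
#         r'https://localhost:\d+/',
#         r'https://internal\.example\.org/',
#     ]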
class CheckRequest(NamedTuple):
next_check: float
hyperlink: Hyperlink | None
class CheckResult(NamedTuple):
uri: str
docname: str
lineno: int
status: str
message: str
code: int
class HyperlinkAvailabilityCheckWorker(Thread):
"""A worker class for checking the availability of hyperlinks."""
def __init__(self, config: Config,
rqueue: Queue[CheckResult],
wqueue: Queue[CheckRequest],
rate_limits: dict[str, RateLimit]) -> None:
self.rate_limits = rate_limits
self.rqueue = rqueue
self.wqueue = wqueue
self.anchors_ignore: list[re.Pattern[str]] = list(
map(re.compile, config.linkcheck_anchors_ignore))
self.anchors_ignore_for_url: list[re.Pattern[str]] = list(
map(re.compile, config.linkcheck_anchors_ignore_for_url))
self.documents_exclude: list[re.Pattern[str]] = list(
map(re.compile, config.linkcheck_exclude_documents))
self.auth = [(re.compile(pattern), auth_info) for pattern, auth_info
in config.linkcheck_auth]
self.timeout: int | float | None = config.linkcheck_timeout
self.request_headers: dict[str, dict[str, str]] = config.linkcheck_request_headers
self.check_anchors: bool = config.linkcheck_anchors
self.allowed_redirects: dict[re.Pattern[str], re.Pattern[str]]
self.allowed_redirects = config.linkcheck_allowed_redirects
self.retries: int = config.linkcheck_retries
self.rate_limit_timeout = config.linkcheck_rate_limit_timeout
self.user_agent = config.user_agent
self.tls_verify = config.tls_verify
self.tls_cacerts = config.tls_cacerts
self._session = requests._Session()
super().__init__(daemon=True)
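    # Illustrative conf.py value feeding self.auth in __init__ above; the pattern and
    # credentials are made up. Each entry pairs a URI regular expression with an auth
    # object accepted by requests (for example a (user, password) tuple):
    #
    #     linkcheck_auth = [
    #         (r'^https://api\.example\.org/', ('api-user', 'api-password')),
    #     ]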
def run(self) -> None:
while True:
next_check, hyperlink = self.wqueue.get()
if hyperlink is None:
                # An empty hyperlink signals the worker to shut down; clean up resources here
self._session.close()
break
uri, docname, _docpath, lineno = hyperlink
if uri is None:
break
netloc = urlsplit(uri).netloc
try:
                # Refresh the rate limit.
                # When there are many links in the queue, all workers may be stuck
                # waiting for responses while the builder keeps queuing, so links may
                # have been queued before rate limits were discovered.
next_check = self.rate_limits[netloc].next_check
except KeyError:
pass
if next_check > time.time():
# Sleep before putting message back in the queue to avoid
# waking up other threads.
time.sleep(QUEUE_POLL_SECS)
self.wqueue.put(CheckRequest(next_check, hyperlink), False)
self.wqueue.task_done()
continue
status, info, code = self._check(docname, uri, hyperlink)
if status == 'rate-limited':
logger.info(darkgray('-rate limited- ') + uri + darkgray(' | sleeping...'))
else:
self.rqueue.put(CheckResult(uri, docname, lineno, status, info, code))
self.wqueue.task_done()
def _check(self, docname: str, uri: str, hyperlink: Hyperlink) -> tuple[str, str, int]:
# check for various conditions without bothering the network
for doc_matcher in self.documents_exclude:
if doc_matcher.match(docname):
info = (
f'{docname} matched {doc_matcher.pattern} from '
'linkcheck_exclude_documents'
)
return 'ignored', info, 0
if len(uri) == 0 or uri.startswith(('#', 'mailto:', 'tel:')):
return 'unchecked', '', 0
if not uri.startswith(('http:', 'https:')):
if uri_re.match(uri):
                # Unsupported URI schemes (e.g. ftp)
return 'unchecked', '', 0
src_dir = path.dirname(hyperlink.docpath)
if path.exists(path.join(src_dir, uri)):
return 'working', '', 0
return 'broken', '', 0
# need to actually check the URI
status, info, code = '', '', 0
for _ in range(self.retries):
status, info, code = self._check_uri(uri, hyperlink)
if status != 'broken':
break
return status, info, code
def _retrieval_methods(self,
check_anchors: bool,
anchor: str) -> Iterator[tuple[Callable, dict]]:
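        # HEAD is attempted first only when the response body is not needed for
        # anchor checking; a streaming GET always remains available as a fallback,
        # since some servers reject or mishandle HEAD requests.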
if not check_anchors or not anchor:
yield self._session.head, {'allow_redirects': True}
yield self._session.get, {'stream': True}
def _check_uri(self, uri: str, hyperlink: Hyperlink) -> tuple[str, str, int]:
req_url, delimiter, anchor = uri.partition('#')
if delimiter and anchor:
for rex in self.anchors_ignore:
if rex.match(anchor):
anchor = ''
break
else:
for rex in self.anchors_ignore_for_url:
if rex.match(req_url):
anchor = ''
break
# handle non-ASCII URIs
try:
req_url.encode('ascii')
except UnicodeError:
req_url = encode_uri(req_url)
# Get auth info, if any
for pattern, auth_info in self.auth: # noqa: B007 (false positive)
if pattern.match(uri):
break
else:
auth_info = None
# update request headers for the URL
headers = _get_request_headers(uri, self.request_headers)
# Linkcheck HTTP request logic:
#
# - Attempt HTTP HEAD before HTTP GET unless page content is required.
# - Follow server-issued HTTP redirects.
# - Respect server-issued HTTP 429 back-offs.
error_message = ''
status_code = -1
response_url = retry_after = ''
for retrieval_method, kwargs in self._retrieval_methods(self.check_anchors, anchor):
try:
with retrieval_method(
url=req_url, auth=auth_info,
headers=headers,
timeout=self.timeout,
**kwargs,
_user_agent=self.user_agent,
_tls_info=(self.tls_verify, self.tls_cacerts),
) as response:
if (self.check_anchors and response.ok and anchor
and not contains_anchor(response, anchor)):
raise Exception(__(f'Anchor {anchor!r} not found'))
# Copy data we need from the (closed) response
status_code = response.status_code
redirect_status_code = response.history[-1].status_code if response.history else None # NoQA: E501
retry_after = response.headers.get('Retry-After')
response_url = f'{response.url}'
response.raise_for_status()
del response
break
except SSLError as err:
# SSL failure; report that the link is broken.
return 'broken', str(err), 0
except (ConnectionError, TooManyRedirects) as err:
# Servers drop the connection on HEAD requests, causing
# ConnectionError.
error_message = str(err)
continue
except HTTPError as err:
error_message = str(err)
# Unauthorised: the reference probably exists
if status_code == 401:
return 'working', 'unauthorized', 0
# Rate limiting; back-off if allowed, or report failure otherwise
if status_code == 429:
if next_check := self.limit_rate(response_url, retry_after):
self.wqueue.put(CheckRequest(next_check, hyperlink), False)
return 'rate-limited', '', 0
return 'broken', error_message, 0
# Don't claim success/failure during server-side outages
if status_code == 503:
return 'ignored', 'service unavailable', 0
# For most HTTP failures, continue attempting alternate retrieval methods
continue
except Exception as err:
# Unhandled exception (intermittent or permanent); report that
# the link is broken.
return 'broken', str(err), 0
else:
# All available retrieval methods have been exhausted; report
# that the link is broken.
return 'broken', error_message, 0
# Success; clear rate limits for the origin
netloc = urlsplit(req_url).netloc
self.rate_limits.pop(netloc, None)
if ((response_url.rstrip('/') == req_url.rstrip('/'))
or _allowed_redirect(req_url, response_url,
self.allowed_redirects)):
return 'working', '', 0
elif redirect_status_code is not None:
return 'redirected', response_url, redirect_status_code
else:
return 'redirected', response_url, 0
def limit_rate(self, response_url: str, retry_after: str) -> float | None:
delay = DEFAULT_DELAY
next_check = None
if retry_after:
try:
                # Integer: number of seconds to wait before the next attempt.
delay = float(retry_after)
except ValueError:
try:
# An HTTP-date: time of next attempt.
until = parsedate_to_datetime(retry_after)
except (TypeError, ValueError):
                    # TypeError: Invalid date format.
                    # ValueError: Invalid date, e.g. Oct 52nd.
pass
else:
next_check = datetime.timestamp(until)
delay = (until - datetime.now(timezone.utc)).total_seconds()
else:
next_check = time.time() + delay
netloc = urlsplit(response_url).netloc
if next_check is None:
max_delay = self.rate_limit_timeout
try:
rate_limit = self.rate_limits[netloc]
except KeyError:
delay = DEFAULT_DELAY
else:
last_wait_time = rate_limit.delay
delay = 2.0 * last_wait_time
if delay > max_delay > last_wait_time:
delay = max_delay
if delay > max_delay:
return None
next_check = time.time() + delay
self.rate_limits[netloc] = RateLimit(delay, next_check)
return next_check
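# Illustrative behaviour of limit_rate() above for different Retry-After values
# (the URL is a placeholder):
#
#     limit_rate('https://example.org/x', '120')
#         -> next check roughly 120 seconds from now
#     limit_rate('https://example.org/x', 'Wed, 21 Oct 2026 07:28:00 GMT')
#         -> next check at that HTTP-date
#     limit_rate('https://example.org/x', None)
#         -> exponential back-off per host, starting at DEFAULT_DELAY and capped by
#            linkcheck_rate_limit_timeout; None is returned once the cap is exceeded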
def _get_request_headers(
uri: str,
request_headers: dict[str, dict[str, str]],
) -> dict[str, str]:
url = urlsplit(uri)
candidates = (f'{url.scheme}://{url.netloc}',
f'{url.scheme}://{url.netloc}/',
uri,
'*')
for u in candidates:
if u in request_headers:
return {**DEFAULT_REQUEST_HEADERS, **request_headers[u]}
return {}
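# Illustrative sketch of how per-URL request headers resolve; the URL and header
# values are hypothetical, only DEFAULT_REQUEST_HEADERS comes from this module:
#
#     _get_request_headers(
#         'https://example.org/page.html',
#         {'https://example.org': {'Accept': 'application/json'}},
#     )
#     # -> {'Accept': 'application/json'}, i.e. the per-URL value overrides the
#     #    default 'Accept' header from DEFAULT_REQUEST_HEADERS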
def contains_anchor(response: Response, anchor: str) -> bool:
"""Determine if an anchor is contained within an HTTP response."""
parser = AnchorCheckParser(unquote(anchor))
    # Read the response in chunks. If we find a matching anchor, we break out of
    # the loop early in the hope of not having to download the whole thing.
for chunk in response.iter_content(chunk_size=4096, decode_unicode=True):
if isinstance(chunk, bytes): # requests failed to decode
chunk = chunk.decode() # manually try to decode it
parser.feed(chunk)
if parser.found:
break
parser.close()
return parser.found
class AnchorCheckParser(HTMLParser):
"""Specialised HTML parser that looks for a specific anchor."""
def __init__(self, search_anchor: str) -> None:
super().__init__()
self.search_anchor = search_anchor
self.found = False
def handle_starttag(self, tag: Any, attrs: Any) -> None:
for key, value in attrs:
if key in ('id', 'name') and value == self.search_anchor:
self.found = True
break
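# Illustrative use of AnchorCheckParser with a hand-written HTML snippet:
#
#     parser = AnchorCheckParser('setup')
#     parser.feed('<html><body><h1 id="setup">Setup</h1></body></html>')
#     parser.found  # -> True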
def _allowed_redirect(url: str, new_url: str,
allowed_redirects: dict[re.Pattern[str], re.Pattern[str]]) -> bool:
return any(
from_url.match(url) and to_url.match(new_url)
for from_url, to_url
in allowed_redirects.items()
)
class RateLimit(NamedTuple):
delay: float
next_check: float
def rewrite_github_anchor(app: Sphinx, uri: str) -> str | None:
    """Rewrite the anchor name of hyperlinks to github.com.

    The hyperlink anchors on github.com are generated dynamically; this rewrites
    them before checking so that they become comparable.
    """
parsed = urlparse(uri)
if parsed.hostname == 'github.com' and parsed.fragment:
prefixed = parsed.fragment.startswith('user-content-')
if not prefixed:
fragment = f'user-content-{parsed.fragment}'
return urlunparse(parsed._replace(fragment=fragment))
return None
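# Illustrative rewrites performed by rewrite_github_anchor above (the repository
# path is made up; note that the handler is currently not connected, see setup()):
#
#     'https://github.com/org/repo#installation'
#         -> 'https://github.com/org/repo#user-content-installation'
#     'https://example.org/page#installation'
#         -> None (the URI is left unchanged)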
def compile_linkcheck_allowed_redirects(app: Sphinx, config: Config) -> None:
    """Compile the patterns in linkcheck_allowed_redirects into regexp objects."""
for url, pattern in list(app.config.linkcheck_allowed_redirects.items()):
try:
app.config.linkcheck_allowed_redirects[re.compile(url)] = re.compile(pattern)
except re.error as exc:
logger.warning(__('Failed to compile regex in linkcheck_allowed_redirects: %r %s'),
exc.pattern, exc.msg)
finally:
# Remove the original regexp-string
app.config.linkcheck_allowed_redirects.pop(url)
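# Illustrative conf.py value consumed by the handler above; the URL patterns are
# examples. A redirect whose source matches the key and whose target matches the
# value is reported as 'working' rather than 'redirected':
#
#     linkcheck_allowed_redirects = {
#         r'https://example\.org/.*': r'https://www\.example\.org/.*',
#     }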
def setup(app: Sphinx) -> dict[str, Any]:
app.add_builder(CheckExternalLinksBuilder)
app.add_post_transform(HyperlinkCollector)
app.add_config_value('linkcheck_ignore', [], False)
app.add_config_value('linkcheck_exclude_documents', [], False)
app.add_config_value('linkcheck_allowed_redirects', {}, False)
app.add_config_value('linkcheck_auth', [], False)
app.add_config_value('linkcheck_request_headers', {}, False)
app.add_config_value('linkcheck_retries', 1, False)
app.add_config_value('linkcheck_timeout', None, False, [int, float])
app.add_config_value('linkcheck_workers', 5, False)
app.add_config_value('linkcheck_anchors', True, False)
# Anchors starting with ! are ignored since they are
# commonly used for dynamic pages
app.add_config_value('linkcheck_anchors_ignore', ['^!'], False)
app.add_config_value('linkcheck_anchors_ignore_for_url', (), False, (tuple, list))
app.add_config_value('linkcheck_rate_limit_timeout', 300.0, False)
app.add_event('linkcheck-process-uri')
app.connect('config-inited', compile_linkcheck_allowed_redirects, priority=800)
# FIXME: Disable URL rewrite handler for github.com temporarily.
# ref: https://github.com/sphinx-doc/sphinx/issues/9435
# app.connect('linkcheck-process-uri', rewrite_github_anchor)
return {
'version': 'builtin',
'parallel_read_safe': True,
'parallel_write_safe': True,
}