Source code for httpx_html.parse

import asyncio
from urllib.parse import urlparse, urlunparse, urljoin
from concurrent.futures import TimeoutError
from typing import Set, Union, List, MutableMapping, Optional, NewType, TYPE_CHECKING

import http.cookiejar
from pyquery import PyQuery

from lxml.html.clean import Cleaner
import lxml
from lxml import etree
from lxml.html import HtmlElement
from lxml.html import tostring as lxml_html_tostring
from lxml.html.soupparser import fromstring as soup_parse
from parse import search as parse_search
from parse import findall, Result
from w3lib.encoding import html_to_unicode

if TYPE_CHECKING:
    from .session import BaseSession

    _Attrs = MutableMapping
    _Containing = Union[str, List[str]]
    _CookieRender = MutableMapping[str, Union[str, bool, float]]
    _Encoding = NewType('_Encoding', str)
    _Find = Union[List['Element'], 'Element']
    _Html = Union[str, bytes]
    _Links = Set[str]
    _Next = Union['HTML', List[str]]
    _NextSymbol = List[str]
    _Result = Union[List['Result'], 'Result']
    _Text = NewType('_Text', str)
    _Url = NewType('_Url', str)
    _XPath = Union[List[str], List['Element'], str, 'Element']

DEFAULT_ENCODING = 'utf-8'
DEFAULT_URL = 'https://example.org/'
DEFAULT_NEXT_SYMBOL = ['next', 'more', 'older']

cleaner = Cleaner()
cleaner.javascript = True
cleaner.style = True
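
# A hedged sketch of what this Cleaner configuration does; lxml's
# ``Cleaner.clean_html`` is the real API, but the input below is hypothetical,
# for illustration only. With ``javascript = True`` and ``style = True``,
# <script> and <style> content is stripped, which is what the ``clean=True``
# option of ``find()``/``xpath()`` relies on:
#
#     >>> cleaner.clean_html('<div><script>alert(1)</script><p>Hi</p></div>')
#     '<div><p>Hi</p></div>'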


class MaxRetries(Exception):

    def __init__(self, message):
        super().__init__(message)
        self.message = message


class BaseParser:
    '''A basic HTML/Element Parser, for Humans.

    :param element: The element from which to base the parsing upon.
    :param default_encoding: Which encoding to default to.
    :param html: HTML from which to base the parsing upon (optional).
    :param url: The URL from which the HTML originated, used for ``absolute_links``.
    '''

    __slots__ = ('element', 'url', 'skip_anchors', 'default_encoding',
                 '_encoding', '_html', '_lxml', '_pq')

    def __init__(
        self,
        *, element,
        default_encoding: Optional[str] = None,
        html:             Optional['_Html'] = None,
        url:              '_Url',
    ) -> None:
        self.element = element
        self.url = url
        self.skip_anchors = True
        self.default_encoding = default_encoding
        self._encoding = None
        self._html = html.encode(DEFAULT_ENCODING) if isinstance(html, str) else html
        self._lxml = None
        self._pq = None

    @property
    def raw_html(self) -> bytes:
        '''Bytes representation of the HTML content.
        '''
        if self._html:
            return self._html
        else:
            return etree.tostring(self.element, encoding='unicode').strip().encode(self.encoding)

    @raw_html.setter
    def raw_html(self, html: bytes) -> None:
        self._html = html

    @property
    def html(self) -> str:
        '''Unicode representation of the HTML content.
        '''
        if self._html:
            return self.raw_html.decode(self.encoding, errors='replace')
        else:
            return etree.tostring(self.element, encoding='unicode').strip()

    @html.setter
    def html(self, html: str) -> None:
        self._html = html.encode(self.encoding)

    @property
    def encoding(self) -> '_Encoding':
        '''The encoding string to be used, extracted from the HTML and
        :class:`HTMLResponse <HTMLResponse>` headers.
        '''
        # Scan meta tags for charset.
        if not self._encoding and self._html:
            self._encoding = html_to_unicode(self.default_encoding, self._html)[0]
            # Fall back to the default encoding if a strict decode fails.
            try:
                self.raw_html.decode(self.encoding)
            except UnicodeDecodeError:
                self._encoding = self.default_encoding

        return self._encoding if self._encoding else self.default_encoding

    @encoding.setter
    def encoding(self, enc: str) -> None:
        self._encoding = enc
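
    # A hedged sketch of the precedence implemented above: an explicitly set
    # ``encoding`` wins, then the charset w3lib's ``html_to_unicode`` extracts
    # from the document itself (e.g. a <meta charset=...> tag), then
    # ``default_encoding``. Hypothetical usage:
    #
    #     >>> doc = HTML(html=b'<meta charset="utf-8"><p>hola</p>')
    #     >>> doc.encoding
    #     'utf-8'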

    @property
    def pq(self) -> PyQuery:
        '''`PyQuery <https://github.com/gawel/pyquery/>`_ representation
        of the :class:`Element <Element>` or :class:`HTML <HTML>`.
        '''
        if self._pq is None:
            self._pq = PyQuery(self.lxml)

        return self._pq

    @property
    def lxml(self) -> HtmlElement:
        '''`lxml <https://lxml.de>`_ representation of the
        :class:`Element <Element>` or :class:`HTML <HTML>`.
        '''
        if self._lxml is None:
            try:
                self._lxml = soup_parse(self.html, features='html.parser')
            except ValueError:
                self._lxml = lxml.html.fromstring(self.raw_html)

        return self._lxml

    @property
    def text(self) -> '_Text':
        '''The text content of the :class:`Element <Element>` or :class:`HTML <HTML>`.
        '''
        return self.pq.text()

    @property
    def full_text(self) -> '_Text':
        '''The full text content (including links) of the :class:`Element <Element>`
        or :class:`HTML <HTML>`.
        '''
        return self.lxml.text_content()
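
    # The practical difference between the two properties above: ``text`` goes
    # through PyQuery's ``.text()``, which normalizes whitespace between nodes,
    # while ``full_text`` uses lxml's ``.text_content()``, which concatenates
    # text nodes as-is. Note that ``find(..., containing=...)`` matches against
    # ``full_text``.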

    def find(
        self,
        selector:      str = "*",
        *, containing: '_Containing' = None,
        clean:         bool = False,
        first:         bool = False,
        _encoding:     Optional[str] = None
    ) -> '_Find':
        '''Given a CSS Selector, returns a list of :class:`Element <Element>`
        objects or a single one.

        :param selector: CSS Selector to use.
        :param clean: Whether or not to sanitize the found HTML of ``<script>`` and
            ``<style>`` tags.
        :param containing: If specified, only return elements that contain the provided text.
        :param first: Whether or not to return just the first result.
        :param _encoding: The encoding format.

        Example CSS Selectors:

        - ``a``
        - ``a.someClass``
        - ``a#someID``
        - ``a[target=_blank]``

        See W3School's `CSS Selectors Reference
        <https://www.w3schools.com/cssref/css_selectors.asp>`_
        for more details.

        If ``first`` is ``True``, only returns the first :class:`Element <Element>` found.
        '''

        # Convert a single containing into a list.
        if isinstance(containing, str):
            containing = [containing]

        encoding = _encoding or self.encoding
        elements = [
            Element(element=found, url=self.url, default_encoding=encoding)
            for found in self.pq(selector)
        ]

        if containing:
            elements = [
                e for e in elements if any(c.lower() in e.full_text.lower() for c in containing)
            ]
            elements.reverse()

        # Sanitize the found HTML.
        if clean:
            elements_copy = elements.copy()
            elements = []

            for element in elements_copy:
                element.raw_html = lxml_html_tostring(cleaner.clean_html(element.lxml))
                elements.append(element)

        return _get_first_or_list(elements, first)
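
    # A hedged usage sketch for find() (the markup is hypothetical):
    #
    #     >>> doc = HTML(html='<a class="next" href="/page/2">more</a>')
    #     >>> doc.find('a', containing='more', first=True).attrs['href']
    #     '/page/2'
    #
    # With first=False (the default), a list of Element objects is returned.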

    def xpath(
        self,
        selector:  str,
        *, clean:  bool = False,
        first:     bool = False,
        _encoding: Optional[str] = None,
    ) -> '_XPath':
        '''Given an XPath selector, returns a list of :class:`Element <Element>` objects
        or a single one.

        :param selector: XPath Selector to use.
        :param clean: Whether or not to sanitize the found HTML of ``<script>`` and
            ``<style>`` tags.
        :param first: Whether or not to return just the first result.
        :param _encoding: The encoding format.

        If a sub-selector is specified (e.g. ``//a/@href``), a simple list of results is
        returned.

        See W3School's `XPath Examples
        <https://www.w3schools.com/xml/xpath_examples.asp>`_
        for more details.

        If ``first`` is ``True``, only returns the first :class:`Element <Element>` found.
        '''
        selected = self.lxml.xpath(selector)

        elements = [
            Element(element=selection, url=self.url, default_encoding=_encoding or self.encoding)
            if not isinstance(selection, etree._ElementUnicodeResult) else str(selection)
            for selection in selected
        ]  # type: List[Union[str, Element]]

        # Sanitize the found HTML
        if clean:
            elements_copy = elements.copy()
            elements = []

            for element in elements_copy:
                element.raw_html = lxml_html_tostring(cleaner.clean_html(element.lxml))
                elements.append(element)

        return _get_first_or_list(elements, first)
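
    # A hedged usage sketch for xpath() (the markup is hypothetical). An
    # attribute sub-selector yields plain strings, per the str() branch above:
    #
    #     >>> doc = HTML(html='<a href="/a">x</a><a href="/b">y</a>')
    #     >>> doc.xpath('//a/@href')
    #     ['/a', '/b']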

    def search(self, template: str) -> Result:
        '''Search the :class:`Element <Element>` for the given Parse template.

        :param template: The Parse template to use.
        '''

        return parse_search(template, self.html)

    def search_all(self, template: str) -> '_Result':
        '''Search the :class:`Element <Element>` (multiple times) for the given parse
        template.

        :param template: The Parse template to use.
        '''
        return list(findall(template, self.html))
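
    # A hedged sketch of the parse-template syntax both methods accept
    # (see https://pypi.org/project/parse/; the markup is hypothetical):
    #
    #     >>> doc = HTML(html='<title>Page 3 of 10</title>')
    #     >>> doc.search('Page {} of')[0]
    #     '3'
    #     >>> [r[0] for r in doc.search_all('Page {} of')]
    #     ['3']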

    @property
    def links(self) -> '_Links':
        '''All found links on page, in as-is form.
        '''
        def gen():
            for link in self.find('a'):

                try:
                    href = link.attrs['href'].strip()
                    if (href and not (href.startswith('#') and self.skip_anchors)
                            and not href.startswith(('javascript:', 'mailto:'))):
                        yield href
                except KeyError:
                    pass

        return set(gen())

    def _make_absolute(self, link):
        '''Makes a given link absolute.
        '''
        # Parse the link with stdlib.
        parsed = urlparse(link)._asdict()

        # If link is relative, then join it with base_url.
        if not parsed['netloc']:
            return urljoin(self.base_url, link)

        # Link is absolute; if it lacks a scheme, add one from base_url.
        if not parsed['scheme']:
            parsed['scheme'] = urlparse(self.base_url).scheme

            # Reconstruct the URL to incorporate the new scheme.
            parsed = (v for v in parsed.values())
            return urlunparse(parsed)

        # Link is absolute and complete with scheme; nothing to be done here.
        return link
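
    # A hedged sketch of the three branches above, for a page whose base_url
    # is 'https://example.org/sub/' (all inputs hypothetical):
    #
    #     'page2.html'             -> 'https://example.org/sub/page2.html' (relative)
    #     '//cdn.example.com/x.js' -> 'https://cdn.example.com/x.js'       (scheme filled in)
    #     'https://other.org/'     -> unchanged                            (already complete)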

    @property
    def absolute_links(self) -> '_Links':
        '''All found links on page, in absolute form
        (`learn more <https://www.navegabem.com/absolute-or-relative-links.html>`_).
        '''
        def gen():
            for link in self.links:
                yield self._make_absolute(link)

        return set(gen())

    @property
    def base_url(self) -> '_Url':
        '''The base URL for the page. Supports the ``<base>`` tag
        (`learn more <https://www.w3schools.com/tags/tag_base.asp>`_).'''

        # support for <base> tag
        base = self.find('base', first=True)
        if base:
            result = base.attrs.get('href', '').strip()
            if result:
                return result

        # parse the url to separate out the path
        parsed = urlparse(self.url)._asdict()

        # remove any part of the path after the last '/'
        parsed['path'] = '/'.join(parsed['path'].split('/')[:-1]) + '/'

        # reconstruct the url with the modified path
        parsed = (v for v in parsed.values())
        url = urlunparse(parsed)

        return url
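
    # A hedged example: for a document fetched from
    # 'https://example.org/sub/page.html' with no <base> tag, the path is cut
    # at the last '/', giving 'https://example.org/sub/'. If a
    # '<base href="...">' tag is present, its href wins instead.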


class Element(BaseParser):
    '''An element of HTML.

    :param element: The element from which to base the parsing upon.
    :param url: The URL from which the HTML originated, used for ``absolute_links``.
    :param default_encoding: Which encoding to default to.
    '''

    __slots__ = ('tag', 'lineno', '_attrs')

    def __init__(
        self,
        *, element,
        url:              '_Url',
        default_encoding: Optional[str] = None,
    ) -> None:
        super().__init__(element=element, url=url, default_encoding=default_encoding)
        self.element = element
        self.tag = element.tag
        self.lineno = element.sourceline
        self._attrs = None

    def __repr__(self) -> str:
        attrs = [f'{a}={self.attrs[a]!r}' for a in self.attrs]
        return f'<Element {self.element.tag!r} {" ".join(attrs)}>'

    @property
    def attrs(self) -> '_Attrs':
        '''Returns a dictionary of the attributes of the :class:`Element <Element>`
        (`learn more <https://www.w3schools.com/tags/ref_attributes.asp>`_).
        '''
        if self._attrs is None:
            self._attrs = {k: v for k, v in self.element.items()}

            # Split class and rel up, as there are usually many of them.
            for attr in ['class', 'rel']:
                if attr in self._attrs:
                    self._attrs[attr] = tuple(self._attrs[attr].split())

        return self._attrs
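
# A hedged sketch of Element.attrs (the markup is hypothetical): multi-valued
# 'class'/'rel' attributes come back as tuples, everything else as strings:
#
#     >>> doc = HTML(html='<a class="btn next" rel="nofollow" href="/x">go</a>')
#     >>> doc.find('a', first=True).attrs
#     {'class': ('btn', 'next'), 'rel': ('nofollow',), 'href': '/x'}
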
class HTML(BaseParser):
    '''An HTML document, ready for parsing.

    :param url: The URL from which the HTML originated, used for ``absolute_links``.
    :param html: HTML from which to base the parsing upon (optional).
    :param default_encoding: Which encoding to default to.
    '''

    # 'browser' is assigned by render()/arender().
    __slots__ = ('session', 'page', 'next_symbol', 'browser')

    def __init__(
        self,
        *, session: Optional['BaseSession'] = None,
        url: str = DEFAULT_URL,
        html: '_Html',
        default_encoding: Optional[str] = DEFAULT_ENCODING,
        async_: bool = False,
    ) -> None:

        # Convert incoming unicode HTML into bytes.
        if isinstance(html, str):
            html = html.encode(DEFAULT_ENCODING)

        pq = PyQuery(html)
        super().__init__(
            element=pq('html') or pq.wrapAll('<html></html>')('html'),
            html=html,
            url=url,
            default_encoding=default_encoding
        )

        if session:
            self.session = session
        elif async_:
            from .session import AsyncHTMLSession
            self.session = AsyncHTMLSession()
        else:
            from .session import HTMLSession
            self.session = HTMLSession()

        self.page = None
        # Copy, so add_next_symbol() never mutates the module-level default.
        self.next_symbol = list(DEFAULT_NEXT_SYMBOL)

    def __repr__(self) -> str:
        return f'<HTML url={self.url!r}>'
    def next(
        self,
        fetch: bool = False,
        next_symbol: '_NextSymbol' = DEFAULT_NEXT_SYMBOL
    ) -> Optional['_Next']:
        '''Attempts to find the next page, if there is one. If ``fetch`` is
        ``True``, returns the :class:`HTML <HTML>` object of the next page.
        If ``fetch`` is ``False`` (default), simply returns the next URL.
        '''
        def get_next():
            candidates = self.find('a', containing=next_symbol)

            for candidate in candidates:
                if candidate.attrs.get('href'):
                    # Support 'next' rel (e.g. reddit).
                    if 'next' in candidate.attrs.get('rel', []):
                        return candidate.attrs['href']

                    # Support 'next' in classnames.
                    for _class in candidate.attrs.get('class', []):
                        if 'next' in _class:
                            return candidate.attrs['href']

                    if 'page' in candidate.attrs['href']:
                        return candidate.attrs['href']

            try:
                # Resort to the last candidate.
                return candidates[-1].attrs['href']
            except IndexError:
                return None

        __next = get_next()
        if not __next:
            return None

        url = self._make_absolute(__next)

        if fetch:
            return self.session.get(url)
        else:
            return url
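
    # A hedged usage sketch (the URL is hypothetical; fetch=True would perform
    # a real HTTP GET through self.session):
    #
    #     >>> r = session.get('https://example.org/articles')
    #     >>> r.html.next(fetch=False)   # just the next URL, if one is found
    #     'https://example.org/articles?page=2'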
    def __iter__(self):
        current = self

        while True:
            yield current
            try:
                current = current.next(fetch=True, next_symbol=self.next_symbol).html
            except AttributeError:
                break

    def __next__(self):
        return self.next(fetch=True, next_symbol=self.next_symbol).html

    def __aiter__(self):
        return self

    async def __anext__(self):
        url = self.next(fetch=False, next_symbol=self.next_symbol)
        if not url:
            # Signal the end of async iteration instead of returning None.
            raise StopAsyncIteration
        response = await self.session.get(url)
        return response.html

    def add_next_symbol(self, next_symbol):
        self.next_symbol.append(next_symbol)

    async def _async_render(
        self,
        *, url: str,
        script: Optional[str] = None,
        scrolldown,
        sleep: int,
        wait: float,
        reload,
        content: Optional[str],
        timeout: Union[float, int],
        wait_until: Optional[Union[str, List[str]]],
        keep_page: bool,
        cookies: list = [{}],
    ):
        '''Handle page creation and JavaScript rendering. Internal use for the
        render/arender methods.
        '''
        page = None
        try:
            page = await self.browser.newPage()

            # Wait before rendering the page, to prevent timeouts.
            await asyncio.sleep(wait)

            if 'User-Agent' in self.session.headers:
                await page.setUserAgent(self.session.headers['User-Agent'])

            if cookies:
                for cookie in cookies:
                    if cookie:
                        await page.setCookie(cookie)

            options = {'timeout': int(timeout * 1000)}
            if wait_until is not None:
                options['waitUntil'] = wait_until

            # Load the given page (GET request, obviously).
            if reload:
                await page.goto(url, options=options)
            else:
                await page.goto(f'data:text/html,{self.html}', options=options)

            result = None
            if script:
                result = await page.evaluate(script)

            if scrolldown:
                for _ in range(scrolldown):
                    await page._keyboard.down('PageDown')
                    await asyncio.sleep(sleep)
            else:
                await asyncio.sleep(sleep)

            if scrolldown:
                await page._keyboard.up('PageDown')

            # Return the content of the page, JavaScript evaluated.
            content = await page.content()
            if not keep_page:
                await page.close()
                page = None
            return content, result, page

        except TimeoutError:
            # newPage() itself may have failed, in which case there is no page
            # to close.
            if page is not None:
                await page.close()
            return None

    def _convert_cookiejar_to_render(
        self,
        session_cookiejar,
    ) -> '_CookieRender':
        '''Convert a cookie from ``HTMLSession.cookies`` into the dictionary
        format expected by ``browser.newPage().setCookie``, which takes these
        fields:

        * ``name`` (str): **required**
        * ``value`` (str): **required**
        * ``url`` (str)
        * ``domain`` (str)
        * ``path`` (str)
        * ``expires`` (number): Unix time in seconds
        * ``httpOnly`` (bool)
        * ``secure`` (bool)
        * ``sameSite`` (str): ``'Strict'`` or ``'Lax'``
        '''
        cookie_render = {}

        keys = [
            'name',
            'value',
            'url',
            'domain',
            'path',
            'sameSite',
            'expires',
            'httpOnly',
            'secure',
        ]
        for key in keys:
            # Missing or falsy cookie fields are simply skipped.
            value = getattr(session_cookiejar, key, None)
            if value:
                cookie_render[key] = value

        return cookie_render

    def _convert_cookiesjar_to_render(self) -> List['_CookieRender']:
        '''Convert ``HTMLSession.cookies`` for ``browser.newPage().setCookie``.
        '''
        if isinstance(self.session.cookies, http.cookiejar.CookieJar):
            return [self._convert_cookiejar_to_render(c) for c in self.session.cookies]
        return []
    def render(
        self,
        retries: int = 8,
        script: Optional[str] = None,
        wait: float = 0.2,
        scrolldown: Union[bool, int] = False,
        sleep: int = 0,
        reload: bool = True,
        timeout: Union[float, int] = 8.0,
        wait_until: Optional[Union[str, List[str]]] = None,
        keep_page: bool = False,
        cookies: list = [{}],
        send_cookies_session: bool = False,
    ):
        '''Reloads the response in Chromium, and replaces the HTML content with
        an updated version, with JavaScript executed.

        :param retries: The number of times to retry loading the page in Chromium.
        :param script: JavaScript to execute upon page load (optional).
        :param wait: The number of seconds to wait before loading the page,
            preventing timeouts (optional).
        :param scrolldown: Integer, if provided, of how many times to page down.
        :param sleep: Integer, if provided, of how many seconds to sleep after
            the initial render.
        :param reload: If ``False``, content will not be loaded from the browser,
            but will be provided from memory.
        :param timeout: Timeout for rendering, in seconds.
        :param wait_until: When to consider the page loaded. Acceptable values are:
            ``load`` (default), ``domcontentloaded``, ``networkidle0``, ``networkidle2``.
        :param keep_page: If ``True``, allows you to interact with the browser page
            through ``r.html.page``.
        :param send_cookies_session: If ``True``, send the converted
            ``HTMLSession.cookies`` to the browser.
        :param cookies: If not empty, send the given cookies to the browser.

        If ``scrolldown`` is specified, the page will scrolldown the specified
        number of times, after sleeping the specified amount of time
        (e.g. ``scrolldown=10, sleep=1``).

        If just ``sleep`` is provided, the rendering will wait *n* seconds, before
        returning.

        If ``script`` is specified, it will execute the provided JavaScript at
        runtime. Example:

        .. code-block:: python

            script = """
                () => {
                    return {
                        width: document.documentElement.clientWidth,
                        height: document.documentElement.clientHeight,
                        deviceScaleFactor: window.devicePixelRatio,
                    }
                }
            """

        Returns the return value of the executed ``script``, if any is provided:

        .. code-block:: python

            >>> r.html.render(script=script)
            {'width': 800, 'height': 600, 'deviceScaleFactor': 1}

        Warning: the first time you run this method, it will download Chromium
        into your home directory (``~/.pyppeteer``).
        '''
        # Accessing session.browser automatically creates an event loop and browser.
        self.browser = self.session.browser
        content = None

        # Automatically set reload to False, if the example URL is being used.
        if self.url == DEFAULT_URL:
            reload = False

        if send_cookies_session:
            cookies = self._convert_cookiesjar_to_render()

        for _ in range(retries):
            if not content:
                try:
                    content, result, page = self.session.loop.run_until_complete(
                        self._async_render(
                            url=self.url, script=script, sleep=sleep, wait=wait,
                            content=self.html, reload=reload, scrolldown=scrolldown,
                            timeout=timeout, wait_until=wait_until,
                            keep_page=keep_page, cookies=cookies)
                    )
                except TypeError:
                    # _async_render returned None after a TimeoutError; retry.
                    pass
            else:
                break

        if not content:
            raise MaxRetries('Unable to render the page. Try increasing timeout.')

        html = HTML(url=self.url, html=content.encode(DEFAULT_ENCODING),
                    default_encoding=DEFAULT_ENCODING)
        # These classes use __slots__, so copy the re-rendered parser state
        # explicitly instead of going through __dict__.
        self.element = html.element
        self._html = html.raw_html
        self._encoding = None
        self._lxml = None
        self._pq = None
        self.page = page
        return result
    async def arender(
        self,
        retries: int = 8,
        script: Optional[str] = None,
        wait: float = 0.2,
        scrolldown: Union[bool, int] = False,
        sleep: int = 0,
        reload: bool = True,
        timeout: Union[float, int] = 8.0,
        wait_until: Optional[Union[str, List[str]]] = None,
        keep_page: bool = False,
        cookies: list = [{}],
        send_cookies_session: bool = False,
    ):
        '''Async version of render. Takes the same parameters.
        '''
        self.browser = await self.session.browser
        content = None

        # Automatically set reload to False, if the example URL is being used.
        if self.url == DEFAULT_URL:
            reload = False

        if send_cookies_session:
            cookies = self._convert_cookiesjar_to_render()

        for _ in range(retries):
            if not content:
                try:
                    content, result, page = await self._async_render(
                        url=self.url, script=script, sleep=sleep, wait=wait,
                        content=self.html, reload=reload, scrolldown=scrolldown,
                        timeout=timeout, wait_until=wait_until,
                        keep_page=keep_page, cookies=cookies
                    )
                except TypeError:
                    # _async_render returned None after a TimeoutError; retry.
                    pass
            else:
                break

        if not content:
            raise MaxRetries('Unable to render the page. Try increasing timeout.')

        html = HTML(url=self.url, html=content.encode(DEFAULT_ENCODING),
                    default_encoding=DEFAULT_ENCODING)
        # These classes use __slots__, so copy the re-rendered parser state
        # explicitly instead of going through __dict__.
        self.element = html.element
        self._html = html.raw_html
        self._encoding = None
        self._lxml = None
        self._pq = None
        self.page = page
        return result
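
# A hedged end-to-end sketch for render()/arender(); the URL is hypothetical,
# ``from httpx_html import HTMLSession`` assumes the package re-exports it, and
# the first call downloads Chromium into ~/.pyppeteer:
#
#     from httpx_html import HTMLSession
#
#     session = HTMLSession()
#     r = session.get('https://example.org/')
#     r.html.render(sleep=1, scrolldown=2)   # re-render with JavaScript executed
#     print(r.html.find('title', first=True).text)
#
# The async flavor mirrors this with AsyncHTMLSession and ``await r.html.arender()``.
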
def _get_first_or_list(lst, first=False):
    if first:
        try:
            return lst[0]
        except IndexError:
            return None
    else:
        return lst