Private GIT

Commit a6773a4b authored by echel0n

Merge branch 'release/v4.0.3'

parents 514e9d84 5e1298f8

Showing with 8719 additions and 0 deletions
@@ -23,6 +23,8 @@ from __future__ import with_statement
import os.path
import sys
sys.path.insert(1, os.path.abspath(os.path.join(os.path.dirname(__file__), 'lib')))
try:
    import requests
except ImportError:
......
# -*- coding: utf-8 -*-
# __
# /__) _ _ _ _ _/ _
# / ( (- (/ (/ (- _) / _)
# /
"""
requests HTTP library
~~~~~~~~~~~~~~~~~~~~~
Requests is an HTTP library, written in Python, for human beings. Basic GET
usage:
>>> import requests
>>> r = requests.get('http://python.org')
>>> r.status_code
200
>>> 'Python is a programming language' in r.content
True
... or POST:
>>> payload = dict(key1='value1', key2='value2')
>>> r = requests.post("http://httpbin.org/post", data=payload)
>>> print(r.text)
{
...
"form": {
"key2": "value2",
"key1": "value1"
},
...
}
The other HTTP methods are supported - see `requests.api`. Full documentation
is at <http://python-requests.org>.
:copyright: (c) 2014 by Kenneth Reitz.
:license: Apache 2.0, see LICENSE for more details.
"""
__title__ = 'requests'
__version__ = '2.3.0'
__build__ = 0x020300
__author__ = 'Kenneth Reitz'
__license__ = 'Apache 2.0'
__copyright__ = 'Copyright 2014 Kenneth Reitz'
# Attempt to enable urllib3's SNI support, if possible
try:
from .packages.urllib3.contrib import pyopenssl
pyopenssl.inject_into_urllib3()
except ImportError:
pass
from . import utils
from .models import Request, Response, PreparedRequest
from .api import request, get, head, post, patch, put, delete, options
from .sessions import session, Session
from .status_codes import codes
from .exceptions import (
RequestException, Timeout, URLRequired,
TooManyRedirects, HTTPError, ConnectionError
)
# Set default logging handler to avoid "No handler found" warnings.
import logging
try: # Python 2.7+
from logging import NullHandler
except ImportError:
class NullHandler(logging.Handler):
def emit(self, record):
pass
logging.getLogger(__name__).addHandler(NullHandler())
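
A quick usage sketch (not part of this commit): because the package only installs a NullHandler, an application that wants to see Requests' internal log output must attach its own handler; the library's records then propagate to whatever the application configures.

import logging
import requests

logging.basicConfig(level=logging.DEBUG)  # root handler; requests' records propagate here
r = requests.get('http://httpbin.org/get')
print(r.status_code)
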
# -*- coding: utf-8 -*-
"""
requests.adapters
~~~~~~~~~~~~~~~~~
This module contains the transport adapters that Requests uses to define
and maintain connections.
"""
import socket
import copy
from .models import Response
from .packages.urllib3.poolmanager import PoolManager, proxy_from_url
from .packages.urllib3.response import HTTPResponse
from .packages.urllib3.util import Timeout as TimeoutSauce
from .compat import urlparse, basestring, urldefrag, unquote
from .utils import (DEFAULT_CA_BUNDLE_PATH, get_encoding_from_headers,
except_on_missing_scheme, get_auth_from_url)
from .structures import CaseInsensitiveDict
from .packages.urllib3.exceptions import MaxRetryError
from .packages.urllib3.exceptions import TimeoutError
from .packages.urllib3.exceptions import SSLError as _SSLError
from .packages.urllib3.exceptions import HTTPError as _HTTPError
from .packages.urllib3.exceptions import ProxyError as _ProxyError
from .cookies import extract_cookies_to_jar
from .exceptions import ConnectionError, Timeout, SSLError, ProxyError
from .auth import _basic_auth_str
DEFAULT_POOLBLOCK = False
DEFAULT_POOLSIZE = 10
DEFAULT_RETRIES = 0
class BaseAdapter(object):
"""The Base Transport Adapter"""
def __init__(self):
super(BaseAdapter, self).__init__()
def send(self):
raise NotImplementedError
def close(self):
raise NotImplementedError
class HTTPAdapter(BaseAdapter):
"""The built-in HTTP Adapter for urllib3.
Provides a general-case interface for Requests sessions to contact HTTP and
HTTPS urls by implementing the Transport Adapter interface. This class will
usually be created by the :class:`Session <Session>` class under the
covers.
:param pool_connections: The number of urllib3 connection pools to cache.
:param pool_maxsize: The maximum number of connections to save in the pool.
:param int max_retries: The maximum number of retries each connection
should attempt. Note, this applies only to failed connections and
timeouts, never to requests where the server returns a response.
:param pool_block: Whether the connection pool should block for connections.
Usage::
>>> import requests
>>> s = requests.Session()
>>> a = requests.adapters.HTTPAdapter(max_retries=3)
>>> s.mount('http://', a)
"""
__attrs__ = ['max_retries', 'config', '_pool_connections', '_pool_maxsize',
'_pool_block']
def __init__(self, pool_connections=DEFAULT_POOLSIZE,
pool_maxsize=DEFAULT_POOLSIZE, max_retries=DEFAULT_RETRIES,
pool_block=DEFAULT_POOLBLOCK):
self.max_retries = max_retries
self.config = {}
self.proxy_manager = {}
super(HTTPAdapter, self).__init__()
self._pool_connections = pool_connections
self._pool_maxsize = pool_maxsize
self._pool_block = pool_block
self.init_poolmanager(pool_connections, pool_maxsize, block=pool_block)
def __getstate__(self):
return dict((attr, getattr(self, attr, None)) for attr in
self.__attrs__)
def __setstate__(self, state):
# Can't handle by adding 'proxy_manager' to self.__attrs__ because
# self.poolmanager uses a lambda function, which isn't pickleable.
self.proxy_manager = {}
self.config = {}
for attr, value in state.items():
setattr(self, attr, value)
self.init_poolmanager(self._pool_connections, self._pool_maxsize,
block=self._pool_block)
def init_poolmanager(self, connections, maxsize, block=DEFAULT_POOLBLOCK):
"""Initializes a urllib3 PoolManager. This method should not be called
from user code, and is only exposed for use when subclassing the
:class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.
:param connections: The number of urllib3 connection pools to cache.
:param maxsize: The maximum number of connections to save in the pool.
:param block: Block when no free connections are available.
"""
# save these values for pickling
self._pool_connections = connections
self._pool_maxsize = maxsize
self._pool_block = block
self.poolmanager = PoolManager(num_pools=connections, maxsize=maxsize,
block=block)
def cert_verify(self, conn, url, verify, cert):
"""Verify a SSL certificate. This method should not be called from user
code, and is only exposed for use when subclassing the
:class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.
:param conn: The urllib3 connection object associated with the cert.
:param url: The requested URL.
:param verify: Whether we should actually verify the certificate.
:param cert: The SSL certificate to verify.
"""
if url.lower().startswith('https') and verify:
cert_loc = None
# Allow self-specified cert location.
if verify is not True:
cert_loc = verify
if not cert_loc:
cert_loc = DEFAULT_CA_BUNDLE_PATH
if not cert_loc:
raise Exception("Could not find a suitable SSL CA certificate bundle.")
conn.cert_reqs = 'CERT_REQUIRED'
conn.ca_certs = cert_loc
else:
conn.cert_reqs = 'CERT_NONE'
conn.ca_certs = None
if cert:
if not isinstance(cert, basestring):
conn.cert_file = cert[0]
conn.key_file = cert[1]
else:
conn.cert_file = cert
def build_response(self, req, resp):
"""Builds a :class:`Response <requests.Response>` object from a urllib3
response. This should not be called from user code, and is only exposed
for use when subclassing the
:class:`HTTPAdapter <requests.adapters.HTTPAdapter>`
:param req: The :class:`PreparedRequest <PreparedRequest>` used to generate the response.
:param resp: The urllib3 response object.
"""
response = Response()
# Fallback to None if there's no status_code, for whatever reason.
response.status_code = getattr(resp, 'status', None)
# Make headers case-insensitive.
response.headers = CaseInsensitiveDict(getattr(resp, 'headers', {}))
# Set encoding.
response.encoding = get_encoding_from_headers(response.headers)
response.raw = resp
response.reason = response.raw.reason
if isinstance(req.url, bytes):
response.url = req.url.decode('utf-8')
else:
response.url = req.url
# Add new cookies from the server.
extract_cookies_to_jar(response.cookies, req, resp)
# Give the Response some context.
response.request = req
response.connection = self
return response
def get_connection(self, url, proxies=None):
"""Returns a urllib3 connection for the given URL. This should not be
called from user code, and is only exposed for use when subclassing the
:class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.
:param url: The URL to connect to.
:param proxies: (optional) A Requests-style dictionary of proxies used on this request.
"""
proxies = proxies or {}
proxy = proxies.get(urlparse(url.lower()).scheme)
if proxy:
except_on_missing_scheme(proxy)
proxy_headers = self.proxy_headers(proxy)
if proxy not in self.proxy_manager:
self.proxy_manager[proxy] = proxy_from_url(
proxy,
proxy_headers=proxy_headers,
num_pools=self._pool_connections,
maxsize=self._pool_maxsize,
block=self._pool_block)
conn = self.proxy_manager[proxy].connection_from_url(url)
else:
# Only scheme should be lower case
parsed = urlparse(url)
url = parsed.geturl()
conn = self.poolmanager.connection_from_url(url)
return conn
def close(self):
"""Disposes of any internal state.
Currently, this just closes the PoolManager, which closes pooled
connections.
"""
self.poolmanager.clear()
def request_url(self, request, proxies):
"""Obtain the url to use when making the final request.
If the message is being sent through an HTTP proxy, the full URL has to
be used. Otherwise, we should only use the path portion of the URL.
This should not be called from user code, and is only exposed for use
when subclassing the
:class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.
:param request: The :class:`PreparedRequest <PreparedRequest>` being sent.
:param proxies: A dictionary of schemes to proxy URLs.
"""
proxies = proxies or {}
scheme = urlparse(request.url).scheme
proxy = proxies.get(scheme)
if proxy and scheme != 'https':
url, _ = urldefrag(request.url)
else:
url = request.path_url
return url
def add_headers(self, request, **kwargs):
"""Add any headers needed by the connection. As of v2.0 this does
nothing by default, but is left for overriding by users that subclass
the :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.
This should not be called from user code, and is only exposed for use
when subclassing the
:class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.
:param request: The :class:`PreparedRequest <PreparedRequest>` to add headers to.
:param kwargs: The keyword arguments from the call to send().
"""
pass
def proxy_headers(self, proxy):
"""Returns a dictionary of the headers to add to any request sent
through a proxy. This works with urllib3 magic to ensure that they are
correctly sent to the proxy, rather than in a tunnelled request if
CONNECT is being used.
This should not be called from user code, and is only exposed for use
when subclassing the
:class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.
:param proxy: The url of the proxy being used for this request.
"""
headers = {}
username, password = get_auth_from_url(proxy)
if username and password:
headers['Proxy-Authorization'] = _basic_auth_str(username,
password)
return headers
def send(self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None):
"""Sends PreparedRequest object. Returns Response object.
:param request: The :class:`PreparedRequest <PreparedRequest>` being sent.
:param stream: (optional) Whether to stream the request content.
:param timeout: (optional) The timeout on the request.
:param verify: (optional) Whether to verify SSL certificates.
:param cert: (optional) Any user-provided SSL certificate to be trusted.
:param proxies: (optional) The proxies dictionary to apply to the request.
"""
conn = self.get_connection(request.url, proxies)
self.cert_verify(conn, request.url, verify, cert)
url = self.request_url(request, proxies)
self.add_headers(request)
chunked = not (request.body is None or 'Content-Length' in request.headers)
timeout = TimeoutSauce(connect=timeout, read=timeout)
try:
if not chunked:
resp = conn.urlopen(
method=request.method,
url=url,
body=request.body,
headers=request.headers,
redirect=False,
assert_same_host=False,
preload_content=False,
decode_content=False,
retries=self.max_retries,
timeout=timeout
)
# No Content-Length header: send the body chunk-by-chunk below.
else:
if hasattr(conn, 'proxy_pool'):
conn = conn.proxy_pool
low_conn = conn._get_conn(timeout=timeout)
try:
low_conn.putrequest(request.method,
url,
skip_accept_encoding=True)
for header, value in request.headers.items():
low_conn.putheader(header, value)
low_conn.endheaders()
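# Write each chunk in HTTP/1.1 chunked framing: hex size, CRLF, data, CRLF;
# the zero-length chunk below terminates the body.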
for i in request.body:
low_conn.send(hex(len(i))[2:].encode('utf-8'))
low_conn.send(b'\r\n')
low_conn.send(i)
low_conn.send(b'\r\n')
low_conn.send(b'0\r\n\r\n')
r = low_conn.getresponse()
resp = HTTPResponse.from_httplib(
r,
pool=conn,
connection=low_conn,
preload_content=False,
decode_content=False
)
except:
# If we hit any problems here, clean up the connection.
# Then, reraise so that we can handle the actual exception.
low_conn.close()
raise
else:
# All is well, return the connection to the pool.
conn._put_conn(low_conn)
except socket.error as sockerr:
raise ConnectionError(sockerr, request=request)
except MaxRetryError as e:
raise ConnectionError(e, request=request)
except _ProxyError as e:
raise ProxyError(e)
except (_SSLError, _HTTPError) as e:
if isinstance(e, _SSLError):
raise SSLError(e, request=request)
elif isinstance(e, TimeoutError):
raise Timeout(e, request=request)
else:
raise
r = self.build_response(request, resp)
if not stream:
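# Reading .content here consumes the body eagerly so the connection
# can be released back to the pool; streaming callers skip this.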
r.content
return r
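
A subclassing sketch, assuming only what the docstrings above state: add_headers() is a no-op hook that send() invokes on every PreparedRequest, so an adapter can stamp headers without reimplementing send(). The header name here is purely illustrative.

import requests
from requests.adapters import HTTPAdapter

class TaggedAdapter(HTTPAdapter):
    def add_headers(self, request, **kwargs):
        # send() calls this just before the request goes on the wire.
        request.headers['X-Client'] = 'example'  # hypothetical header

s = requests.Session()
s.mount('http://', TaggedAdapter(max_retries=3))
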
# -*- coding: utf-8 -*-
"""
requests.api
~~~~~~~~~~~~
This module implements the Requests API.
:copyright: (c) 2012 by Kenneth Reitz.
:license: Apache2, see LICENSE for more details.
"""
from . import sessions
def request(method, url, **kwargs):
"""Constructs and sends a :class:`Request <Request>`.
Returns :class:`Response <Response>` object.
:param method: method for the new :class:`Request` object.
:param url: URL for the new :class:`Request` object.
:param params: (optional) Dictionary or bytes to be sent in the query string for the :class:`Request`.
:param data: (optional) Dictionary, bytes, or file-like object to send in the body of the :class:`Request`.
:param headers: (optional) Dictionary of HTTP Headers to send with the :class:`Request`.
:param cookies: (optional) Dict or CookieJar object to send with the :class:`Request`.
:param files: (optional) Dictionary of 'name': file-like-objects (or {'name': ('filename', fileobj)}) for multipart encoding upload.
:param auth: (optional) Auth tuple to enable Basic/Digest/Custom HTTP Auth.
:param timeout: (optional) Float describing the timeout of the request in seconds.
:param allow_redirects: (optional) Boolean. Set to True if POST/PUT/DELETE redirect following is allowed.
:param proxies: (optional) Dictionary mapping protocol to the URL of the proxy.
:param verify: (optional) if ``True``, the SSL cert will be verified. A CA_BUNDLE path can also be provided.
:param stream: (optional) if ``False``, the response content will be immediately downloaded.
:param cert: (optional) if String, path to ssl client cert file (.pem). If Tuple, ('cert', 'key') pair.
Usage::
>>> import requests
>>> req = requests.request('GET', 'http://httpbin.org/get')
<Response [200]>
"""
session = sessions.Session()
return session.request(method=method, url=url, **kwargs)
def get(url, **kwargs):
"""Sends a GET request. Returns :class:`Response` object.
:param url: URL for the new :class:`Request` object.
:param \*\*kwargs: Optional arguments that ``request`` takes.
"""
kwargs.setdefault('allow_redirects', True)
return request('get', url, **kwargs)
def options(url, **kwargs):
"""Sends a OPTIONS request. Returns :class:`Response` object.
:param url: URL for the new :class:`Request` object.
:param \*\*kwargs: Optional arguments that ``request`` takes.
"""
kwargs.setdefault('allow_redirects', True)
return request('options', url, **kwargs)
def head(url, **kwargs):
"""Sends a HEAD request. Returns :class:`Response` object.
:param url: URL for the new :class:`Request` object.
:param \*\*kwargs: Optional arguments that ``request`` takes.
"""
kwargs.setdefault('allow_redirects', False)
return request('head', url, **kwargs)
def post(url, data=None, **kwargs):
"""Sends a POST request. Returns :class:`Response` object.
:param url: URL for the new :class:`Request` object.
:param data: (optional) Dictionary, bytes, or file-like object to send in the body of the :class:`Request`.
:param \*\*kwargs: Optional arguments that ``request`` takes.
"""
return request('post', url, data=data, **kwargs)
def put(url, data=None, **kwargs):
"""Sends a PUT request. Returns :class:`Response` object.
:param url: URL for the new :class:`Request` object.
:param data: (optional) Dictionary, bytes, or file-like object to send in the body of the :class:`Request`.
:param \*\*kwargs: Optional arguments that ``request`` takes.
"""
return request('put', url, data=data, **kwargs)
def patch(url, data=None, **kwargs):
"""Sends a PATCH request. Returns :class:`Response` object.
:param url: URL for the new :class:`Request` object.
:param data: (optional) Dictionary, bytes, or file-like object to send in the body of the :class:`Request`.
:param \*\*kwargs: Optional arguments that ``request`` takes.
"""
return request('patch', url, data=data, **kwargs)
def delete(url, **kwargs):
"""Sends a DELETE request. Returns :class:`Response` object.
:param url: URL for the new :class:`Request` object.
:param \*\*kwargs: Optional arguments that ``request`` takes.
"""
return request('delete', url, **kwargs)
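
Since every helper above builds a throwaway Session and forwards to Session.request(), the two calls in this sketch are equivalent; the Session form is preferable when several requests should share cookies and connection pools.

import requests

r1 = requests.get('http://httpbin.org/get', params={'q': 'test'})

s = requests.Session()
r2 = s.request('get', 'http://httpbin.org/get', params={'q': 'test'})
assert r1.status_code == r2.status_code
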
# -*- coding: utf-8 -*-
"""
requests.auth
~~~~~~~~~~~~~
This module contains the authentication handlers for Requests.
"""
import os
import re
import time
import hashlib
from base64 import b64encode
from .compat import urlparse, str
from .cookies import extract_cookies_to_jar
from .utils import parse_dict_header
CONTENT_TYPE_FORM_URLENCODED = 'application/x-www-form-urlencoded'
CONTENT_TYPE_MULTI_PART = 'multipart/form-data'
def _basic_auth_str(username, password):
"""Returns a Basic Auth string."""
return 'Basic ' + b64encode(('%s:%s' % (username, password)).encode('latin1')).strip().decode('latin1')
class AuthBase(object):
"""Base class that all auth implementations derive from"""
def __call__(self, r):
raise NotImplementedError('Auth hooks must be callable.')
class HTTPBasicAuth(AuthBase):
"""Attaches HTTP Basic Authentication to the given Request object."""
def __init__(self, username, password):
self.username = username
self.password = password
def __call__(self, r):
r.headers['Authorization'] = _basic_auth_str(self.username, self.password)
return r
class HTTPProxyAuth(HTTPBasicAuth):
"""Attaches HTTP Proxy Authentication to a given Request object."""
def __call__(self, r):
r.headers['Proxy-Authorization'] = _basic_auth_str(self.username, self.password)
return r
class HTTPDigestAuth(AuthBase):
"""Attaches HTTP Digest Authentication to the given Request object."""
def __init__(self, username, password):
self.username = username
self.password = password
self.last_nonce = ''
self.nonce_count = 0
self.chal = {}
self.pos = None
def build_digest_header(self, method, url):
realm = self.chal['realm']
nonce = self.chal['nonce']
qop = self.chal.get('qop')
algorithm = self.chal.get('algorithm')
opaque = self.chal.get('opaque')
if algorithm is None:
_algorithm = 'MD5'
else:
_algorithm = algorithm.upper()
# lambdas assume digest modules are imported at the top level
hash_utf8 = None
if _algorithm == 'MD5' or _algorithm == 'MD5-SESS':
def md5_utf8(x):
if isinstance(x, str):
x = x.encode('utf-8')
return hashlib.md5(x).hexdigest()
hash_utf8 = md5_utf8
elif _algorithm == 'SHA':
def sha_utf8(x):
if isinstance(x, str):
x = x.encode('utf-8')
return hashlib.sha1(x).hexdigest()
hash_utf8 = sha_utf8
KD = lambda s, d: hash_utf8("%s:%s" % (s, d))
if hash_utf8 is None:
return None
# XXX not implemented yet
entdig = None
p_parsed = urlparse(url)
path = p_parsed.path
if p_parsed.query:
path += '?' + p_parsed.query
A1 = '%s:%s:%s' % (self.username, realm, self.password)
A2 = '%s:%s' % (method, path)
HA1 = hash_utf8(A1)
HA2 = hash_utf8(A2)
if nonce == self.last_nonce:
self.nonce_count += 1
else:
self.nonce_count = 1
ncvalue = '%08x' % self.nonce_count
s = str(self.nonce_count).encode('utf-8')
s += nonce.encode('utf-8')
s += time.ctime().encode('utf-8')
s += os.urandom(8)
cnonce = (hashlib.sha1(s).hexdigest()[:16])
noncebit = "%s:%s:%s:%s:%s" % (nonce, ncvalue, cnonce, qop, HA2)
if _algorithm == 'MD5-SESS':
HA1 = hash_utf8('%s:%s:%s' % (HA1, nonce, cnonce))
if qop is None:
respdig = KD(HA1, "%s:%s" % (nonce, HA2))
elif qop == 'auth' or 'auth' in qop.split(','):
respdig = KD(HA1, noncebit)
else:
# XXX handle auth-int.
return None
self.last_nonce = nonce
# XXX should the partial digests be encoded too?
base = 'username="%s", realm="%s", nonce="%s", uri="%s", ' \
'response="%s"' % (self.username, realm, nonce, path, respdig)
if opaque:
base += ', opaque="%s"' % opaque
if algorithm:
base += ', algorithm="%s"' % algorithm
if entdig:
base += ', digest="%s"' % entdig
if qop:
base += ', qop="auth", nc=%s, cnonce="%s"' % (ncvalue, cnonce)
return 'Digest %s' % (base)
def handle_401(self, r, **kwargs):
"""Takes the given response and tries digest-auth, if needed."""
if self.pos is not None:
# Rewind the file position indicator of the body to where
# it was to resend the request.
r.request.body.seek(self.pos)
num_401_calls = getattr(self, 'num_401_calls', 1)
s_auth = r.headers.get('www-authenticate', '')
if 'digest' in s_auth.lower() and num_401_calls < 2:
setattr(self, 'num_401_calls', num_401_calls + 1)
pat = re.compile(r'digest ', flags=re.IGNORECASE)
self.chal = parse_dict_header(pat.sub('', s_auth, count=1))
# Consume content and release the original connection
# to allow our new request to reuse the same one.
r.content
r.raw.release_conn()
prep = r.request.copy()
extract_cookies_to_jar(prep._cookies, r.request, r.raw)
prep.prepare_cookies(prep._cookies)
prep.headers['Authorization'] = self.build_digest_header(
prep.method, prep.url)
_r = r.connection.send(prep, **kwargs)
_r.history.append(r)
_r.request = prep
return _r
setattr(self, 'num_401_calls', 1)
return r
def __call__(self, r):
# If we have a saved nonce, skip the 401
if self.last_nonce:
r.headers['Authorization'] = self.build_digest_header(r.method, r.url)
try:
self.pos = r.body.tell()
except AttributeError:
pass
r.register_hook('response', self.handle_401)
return r
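
A usage sketch: auth objects are callables applied to the outgoing request, so Basic auth sets the Authorization header up front, while Digest auth registers the handle_401 hook above and answers the server's challenge on a second round trip. The httpbin endpoints are illustrative.

import requests
from requests.auth import HTTPBasicAuth, HTTPDigestAuth

r = requests.get('http://httpbin.org/basic-auth/user/pass',
                 auth=HTTPBasicAuth('user', 'pass'))
r2 = requests.get('http://httpbin.org/digest-auth/auth/user/pass',
                  auth=HTTPDigestAuth('user', 'pass'))
print(r.status_code, r2.status_code)
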
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
certs.py
~~~~~~~~
This module returns the preferred default CA certificate bundle.
If you are packaging Requests, e.g., for a Linux distribution or a managed
environment, you can change the definition of where() to return a separately
packaged CA bundle.
"""
import os.path
def where():
"""Return the preferred certificate bundle."""
# vendored bundle inside Requests
return os.path.join(os.path.dirname(__file__), 'cacert.pem')
if __name__ == '__main__':
print(where())
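
A usage sketch: where() is what DEFAULT_CA_BUNDLE_PATH resolves to, so cert_verify() in adapters.py falls back to this vendored bundle when verify=True; passing a path as verify substitutes a different bundle.

import requests
from requests import certs

print(certs.where())  # path to the vendored cacert.pem
requests.get('https://httpbin.org/get', verify=certs.where())
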
# -*- coding: utf-8 -*-
"""
pythoncompat
"""
from .packages import chardet
import sys
# -------
# Pythons
# -------
# Syntax sugar.
_ver = sys.version_info
#: Python 2.x?
is_py2 = (_ver[0] == 2)
#: Python 3.x?
is_py3 = (_ver[0] == 3)
#: Python 3.0.x
is_py30 = (is_py3 and _ver[1] == 0)
#: Python 3.1.x
is_py31 = (is_py3 and _ver[1] == 1)
#: Python 3.2.x
is_py32 = (is_py3 and _ver[1] == 2)
#: Python 3.3.x
is_py33 = (is_py3 and _ver[1] == 3)
#: Python 3.4.x
is_py34 = (is_py3 and _ver[1] == 4)
#: Python 2.7.x
is_py27 = (is_py2 and _ver[1] == 7)
#: Python 2.6.x
is_py26 = (is_py2 and _ver[1] == 6)
#: Python 2.5.x
is_py25 = (is_py2 and _ver[1] == 5)
#: Python 2.4.x
is_py24 = (is_py2 and _ver[1] == 4) # I'm assuming this is not by choice.
# ---------
# Platforms
# ---------
# Syntax sugar.
_ver = sys.version.lower()
is_pypy = ('pypy' in _ver)
is_jython = ('jython' in _ver)
is_ironpython = ('iron' in _ver)
# Assume CPython, if nothing else.
is_cpython = not any((is_pypy, is_jython, is_ironpython))
# Windows-based system.
is_windows = 'win32' in str(sys.platform).lower()
# Standard Linux 2+ system.
is_linux = ('linux' in str(sys.platform).lower())
is_osx = ('darwin' in str(sys.platform).lower())
is_hpux = ('hpux' in str(sys.platform).lower()) # Complete guess.
is_solaris = ('solar==' in str(sys.platform).lower()) # Complete guess.
try:
import simplejson as json
except ImportError:
import json
# ---------
# Specifics
# ---------
if is_py2:
from urllib import quote, unquote, quote_plus, unquote_plus, urlencode, getproxies, proxy_bypass
from urlparse import urlparse, urlunparse, urljoin, urlsplit, urldefrag
from urllib2 import parse_http_list
import cookielib
from Cookie import Morsel
from StringIO import StringIO
from .packages.urllib3.packages.ordered_dict import OrderedDict
from httplib import IncompleteRead
builtin_str = str
bytes = str
str = unicode
basestring = basestring
numeric_types = (int, long, float)
elif is_py3:
from urllib.parse import urlparse, urlunparse, urljoin, urlsplit, urlencode, quote, unquote, quote_plus, unquote_plus, urldefrag
from urllib.request import parse_http_list, getproxies, proxy_bypass
from http import cookiejar as cookielib
from http.cookies import Morsel
from io import StringIO
from collections import OrderedDict
from http.client import IncompleteRead
builtin_str = str
str = str
bytes = bytes
basestring = (str, bytes)
numeric_types = (int, float)
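
A usage sketch: importing these names from requests.compat instead of the version-specific stdlib modules keeps calling code identical on Python 2 and 3.

from requests.compat import urlparse, urljoin

parts = urlparse('http://httpbin.org/get?x=1')
assert parts.scheme == 'http' and parts.query == 'x=1'
assert urljoin('http://httpbin.org/a/', 'b') == 'http://httpbin.org/a/b'
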
# -*- coding: utf-8 -*-
"""
Compatibility code to be able to use `cookielib.CookieJar` with requests.
requests.utils imports from here, so be careful with imports.
"""
import time
import collections
from .compat import cookielib, urlparse, urlunparse, Morsel
try:
import threading
# grr, pyflakes: this fixes "redefinition of unused 'threading'"
threading
except ImportError:
import dummy_threading as threading
class MockRequest(object):
"""Wraps a `requests.Request` to mimic a `urllib2.Request`.
The code in `cookielib.CookieJar` expects this interface in order to correctly
manage cookie policies, i.e., determine whether a cookie can be set, given the
domains of the request and the cookie.
The original request object is read-only. The client is responsible for collecting
the new headers via `get_new_headers()` and interpreting them appropriately. You
probably want `get_cookie_header`, defined below.
"""
def __init__(self, request):
self._r = request
self._new_headers = {}
self.type = urlparse(self._r.url).scheme
def get_type(self):
return self.type
def get_host(self):
return urlparse(self._r.url).netloc
def get_origin_req_host(self):
return self.get_host()
def get_full_url(self):
# Only return the response's URL if the user hadn't set the Host
# header
if not self._r.headers.get('Host'):
return self._r.url
# If they did set it, retrieve it and reconstruct the expected domain
host = self._r.headers['Host']
parsed = urlparse(self._r.url)
# Reconstruct the URL as we expect it
return urlunparse([
parsed.scheme, host, parsed.path, parsed.params, parsed.query,
parsed.fragment
])
def is_unverifiable(self):
return True
def has_header(self, name):
return name in self._r.headers or name in self._new_headers
def get_header(self, name, default=None):
return self._r.headers.get(name, self._new_headers.get(name, default))
def add_header(self, key, val):
"""cookielib has no legitimate use for this method; add it back if you find one."""
raise NotImplementedError("Cookie headers should be added with add_unredirected_header()")
def add_unredirected_header(self, name, value):
self._new_headers[name] = value
def get_new_headers(self):
return self._new_headers
@property
def unverifiable(self):
return self.is_unverifiable()
@property
def origin_req_host(self):
return self.get_origin_req_host()
@property
def host(self):
return self.get_host()
class MockResponse(object):
"""Wraps a `httplib.HTTPMessage` to mimic a `urllib.addinfourl`.
...what? Basically, expose the parsed HTTP headers from the server response
the way `cookielib` expects to see them.
"""
def __init__(self, headers):
"""Make a MockResponse for `cookielib` to read.
:param headers: a httplib.HTTPMessage or analogous carrying the headers
"""
self._headers = headers
def info(self):
return self._headers
def getheaders(self, name):
    return self._headers.getheaders(name)
def extract_cookies_to_jar(jar, request, response):
"""Extract the cookies from the response into a CookieJar.
:param jar: cookielib.CookieJar (not necessarily a RequestsCookieJar)
:param request: our own requests.Request object
:param response: urllib3.HTTPResponse object
"""
if not (hasattr(response, '_original_response') and
response._original_response):
return
# the _original_response field is the wrapped httplib.HTTPResponse object,
req = MockRequest(request)
# pull out the HTTPMessage with the headers and put it in the mock:
res = MockResponse(response._original_response.msg)
jar.extract_cookies(res, req)
def get_cookie_header(jar, request):
"""Produce an appropriate Cookie header string to be sent with `request`, or None."""
r = MockRequest(request)
jar.add_cookie_header(r)
return r.get_new_headers().get('Cookie')
def remove_cookie_by_name(cookiejar, name, domain=None, path=None):
"""Unsets a cookie by name, by default over all domains and paths.
Wraps CookieJar.clear(), is O(n).
"""
clearables = []
for cookie in cookiejar:
if cookie.name == name:
if domain is None or domain == cookie.domain:
if path is None or path == cookie.path:
clearables.append((cookie.domain, cookie.path, cookie.name))
for domain, path, name in clearables:
cookiejar.clear(domain, path, name)
class CookieConflictError(RuntimeError):
"""There are two cookies that meet the criteria specified in the cookie jar.
Use .get and .set and include domain and path args in order to be more specific."""
class RequestsCookieJar(cookielib.CookieJar, collections.MutableMapping):
"""Compatibility class; is a cookielib.CookieJar, but exposes a dict interface.
This is the CookieJar we create by default for requests and sessions that
don't specify one, since some clients may expect response.cookies and
session.cookies to support dict operations.
Don't use the dict interface internally; it's just for compatibility
with external client code. All `requests` code should work out of the box
with externally provided instances of CookieJar, e.g., LWPCookieJar and
FileCookieJar.
Caution: dictionary operations that are normally O(1) may be O(n).
Unlike a regular CookieJar, this class is pickleable.
"""
def get(self, name, default=None, domain=None, path=None):
"""Dict-like get() that also supports optional domain and path args in
order to resolve naming collisions from using one cookie jar over
multiple domains. Caution: operation is O(n), not O(1)."""
try:
return self._find_no_duplicates(name, domain, path)
except KeyError:
return default
def set(self, name, value, **kwargs):
"""Dict-like set() that also supports optional domain and path args in
order to resolve naming collisions from using one cookie jar over
multiple domains."""
# support client code that unsets cookies by assignment of a None value:
if value is None:
remove_cookie_by_name(self, name, domain=kwargs.get('domain'), path=kwargs.get('path'))
return
if isinstance(value, Morsel):
c = morsel_to_cookie(value)
else:
c = create_cookie(name, value, **kwargs)
self.set_cookie(c)
return c
def iterkeys(self):
"""Dict-like iterkeys() that returns an iterator of names of cookies from the jar.
See itervalues() and iteritems()."""
for cookie in iter(self):
yield cookie.name
def keys(self):
"""Dict-like keys() that returns a list of names of cookies from the jar.
See values() and items()."""
return list(self.iterkeys())
def itervalues(self):
"""Dict-like itervalues() that returns an iterator of values of cookies from the jar.
See iterkeys() and iteritems()."""
for cookie in iter(self):
yield cookie.value
def values(self):
"""Dict-like values() that returns a list of values of cookies from the jar.
See keys() and items()."""
return list(self.itervalues())
def iteritems(self):
"""Dict-like iteritems() that returns an iterator of name-value tuples from the jar.
See iterkeys() and itervalues()."""
for cookie in iter(self):
yield cookie.name, cookie.value
def items(self):
"""Dict-like items() that returns a list of name-value tuples from the jar.
See keys() and values(). Allows client code to call ``dict(RequestsCookieJar)``
and get a vanilla Python dict of name-value pairs."""
return list(self.iteritems())
def list_domains(self):
"""Utility method to list all the domains in the jar."""
domains = []
for cookie in iter(self):
if cookie.domain not in domains:
domains.append(cookie.domain)
return domains
def list_paths(self):
"""Utility method to list all the paths in the jar."""
paths = []
for cookie in iter(self):
if cookie.path not in paths:
paths.append(cookie.path)
return paths
def multiple_domains(self):
"""Returns True if there are multiple domains in the jar.
Returns False otherwise."""
domains = []
for cookie in iter(self):
if cookie.domain is not None and cookie.domain in domains:
return True
domains.append(cookie.domain)
return False # there is only one domain in jar
def get_dict(self, domain=None, path=None):
"""Takes as an argument an optional domain and path and returns a plain old
Python dict of name-value pairs of cookies that meet the requirements."""
dictionary = {}
for cookie in iter(self):
if (domain is None or cookie.domain == domain) and (path is None
or cookie.path == path):
dictionary[cookie.name] = cookie.value
return dictionary
def __getitem__(self, name):
"""Dict-like __getitem__() for compatibility with client code. Throws exception
if there are more than one cookie with name. In that case, use the more
explicit get() method instead. Caution: operation is O(n), not O(1)."""
return self._find_no_duplicates(name)
def __setitem__(self, name, value):
"""Dict-like __setitem__ for compatibility with client code. Throws exception
if there is already a cookie of that name in the jar. In that case, use the more
explicit set() method instead."""
self.set(name, value)
def __delitem__(self, name):
"""Deletes a cookie given a name. Wraps cookielib.CookieJar's remove_cookie_by_name()."""
remove_cookie_by_name(self, name)
def set_cookie(self, cookie, *args, **kwargs):
if hasattr(cookie.value, 'startswith') and cookie.value.startswith('"') and cookie.value.endswith('"'):
cookie.value = cookie.value.replace('\\"', '')
return super(RequestsCookieJar, self).set_cookie(cookie, *args, **kwargs)
def update(self, other):
"""Updates this jar with cookies from another CookieJar or dict-like"""
if isinstance(other, cookielib.CookieJar):
for cookie in other:
self.set_cookie(cookie)
else:
super(RequestsCookieJar, self).update(other)
def _find(self, name, domain=None, path=None):
"""Requests uses this method internally to get cookie values. Takes as args name
and optional domain and path. Returns a cookie.value. If there are conflicting cookies,
_find arbitrarily chooses one. See _find_no_duplicates if you want an exception thrown
if there are conflicting cookies."""
for cookie in iter(self):
if cookie.name == name:
if domain is None or cookie.domain == domain:
if path is None or cookie.path == path:
return cookie.value
raise KeyError('name=%r, domain=%r, path=%r' % (name, domain, path))
def _find_no_duplicates(self, name, domain=None, path=None):
"""__get_item__ and get call _find_no_duplicates -- never used in Requests internally.
Takes as args name and optional domain and path. Returns a cookie.value.
Throws KeyError if cookie is not found and CookieConflictError if there are
multiple cookies that match name and optionally domain and path."""
toReturn = None
for cookie in iter(self):
if cookie.name == name:
if domain is None or cookie.domain == domain:
if path is None or cookie.path == path:
if toReturn is not None: # if there are multiple cookies that meet passed in criteria
raise CookieConflictError('There are multiple cookies with name, %r' % (name))
toReturn = cookie.value # we will eventually return this as long as no cookie conflict
if toReturn:
return toReturn
raise KeyError('name=%r, domain=%r, path=%r' % (name, domain, path))
def __getstate__(self):
"""Unlike a normal CookieJar, this class is pickleable."""
state = self.__dict__.copy()
# remove the unpickleable RLock object
state.pop('_cookies_lock')
return state
def __setstate__(self, state):
"""Unlike a normal CookieJar, this class is pickleable."""
self.__dict__.update(state)
if '_cookies_lock' not in self.__dict__:
self._cookies_lock = threading.RLock()
def copy(self):
"""Return a copy of this RequestsCookieJar."""
new_cj = RequestsCookieJar()
new_cj.update(self)
return new_cj
def create_cookie(name, value, **kwargs):
"""Make a cookie from underspecified parameters.
By default, the pair of `name` and `value` will be set for the domain ''
and sent on every request (this is sometimes called a "supercookie").
"""
result = dict(
version=0,
name=name,
value=value,
port=None,
domain='',
path='/',
secure=False,
expires=None,
discard=True,
comment=None,
comment_url=None,
rest={'HttpOnly': None},
rfc2109=False,)
badargs = set(kwargs) - set(result)
if badargs:
err = 'create_cookie() got unexpected keyword arguments: %s'
raise TypeError(err % list(badargs))
result.update(kwargs)
result['port_specified'] = bool(result['port'])
result['domain_specified'] = bool(result['domain'])
result['domain_initial_dot'] = result['domain'].startswith('.')
result['path_specified'] = bool(result['path'])
return cookielib.Cookie(**result)
def morsel_to_cookie(morsel):
"""Convert a Morsel object into a Cookie containing the one k/v pair."""
expires = None
if morsel['max-age']:
    # Morsel values are strings; coerce before the time arithmetic.
    expires = time.time() + int(morsel['max-age'])
elif morsel['expires']:
time_template = '%a, %d-%b-%Y %H:%M:%S GMT'
expires = time.mktime(
time.strptime(morsel['expires'], time_template)) - time.timezone
return create_cookie(
comment=morsel['comment'],
comment_url=bool(morsel['comment']),
discard=False,
domain=morsel['domain'],
expires=expires,
name=morsel.key,
path=morsel['path'],
port=None,
rest={'HttpOnly': morsel['httponly']},
rfc2109=False,
secure=bool(morsel['secure']),
value=morsel.value,
version=morsel['version'] or 0,
)
def cookiejar_from_dict(cookie_dict, cookiejar=None, overwrite=True):
"""Returns a CookieJar from a key/value dictionary.
:param cookie_dict: Dict of key/values to insert into CookieJar.
:param cookiejar: (optional) A cookiejar to add the cookies to.
:param overwrite: (optional) If False, will not replace cookies
already in the jar with new ones.
"""
if cookiejar is None:
cookiejar = RequestsCookieJar()
if cookie_dict is not None:
names_from_jar = [cookie.name for cookie in cookiejar]
for name in cookie_dict:
if overwrite or (name not in names_from_jar):
cookiejar.set_cookie(create_cookie(name, cookie_dict[name]))
return cookiejar
def merge_cookies(cookiejar, cookies):
"""Add cookies to cookiejar and returns a merged CookieJar.
:param cookiejar: CookieJar object to add the cookies to.
:param cookies: Dictionary or CookieJar object to be added.
"""
if not isinstance(cookiejar, cookielib.CookieJar):
raise ValueError('You can only merge into CookieJar')
if isinstance(cookies, dict):
cookiejar = cookiejar_from_dict(
cookies, cookiejar=cookiejar, overwrite=False)
elif isinstance(cookies, cookielib.CookieJar):
try:
cookiejar.update(cookies)
except AttributeError:
for cookie_in_jar in cookies:
cookiejar.set_cookie(cookie_in_jar)
return cookiejar
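
A usage sketch of the dict-like jar defined above; get() and set() take the optional domain and path arguments so names that collide across domains stay distinguishable.

from requests.cookies import RequestsCookieJar

jar = RequestsCookieJar()
jar.set('token', 'abc', domain='httpbin.org', path='/cookies')
print(jar['token'])                           # dict-style lookup, O(n)
print(jar.get('token', domain='httpbin.org'))
print(jar.get_dict(domain='httpbin.org'))     # {'token': 'abc'}
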
# -*- coding: utf-8 -*-
"""
requests.exceptions
~~~~~~~~~~~~~~~~~~~
This module contains the set of Requests' exceptions.
"""
from .packages.urllib3.exceptions import HTTPError as BaseHTTPError
class RequestException(IOError):
"""There was an ambiguous exception that occurred while handling your
request."""
def __init__(self, *args, **kwargs):
"""
Initialize RequestException with `request` and `response` objects.
"""
response = kwargs.pop('response', None)
self.response = response
self.request = kwargs.pop('request', None)
if (response is not None and not self.request and
hasattr(response, 'request')):
self.request = self.response.request
super(RequestException, self).__init__(*args, **kwargs)
class HTTPError(RequestException):
"""An HTTP error occurred."""
class ConnectionError(RequestException):
"""A Connection error occurred."""
class ProxyError(ConnectionError):
"""A proxy error occurred."""
class SSLError(ConnectionError):
"""An SSL error occurred."""
class Timeout(RequestException):
"""The request timed out."""
class URLRequired(RequestException):
"""A valid URL is required to make a request."""
class TooManyRedirects(RequestException):
"""Too many redirects."""
class MissingSchema(RequestException, ValueError):
"""The URL schema (e.g. http or https) is missing."""
class InvalidSchema(RequestException, ValueError):
"""See defaults.py for valid schemas."""
class InvalidURL(RequestException, ValueError):
""" The URL provided was somehow invalid. """
class ChunkedEncodingError(RequestException):
"""The server declared chunked encoding but sent an invalid chunk."""
class ContentDecodingError(RequestException, BaseHTTPError):
"""Failed to decode response content"""
# -*- coding: utf-8 -*-
"""
requests.hooks
~~~~~~~~~~~~~~
This module provides the capabilities for the Requests hooks system.
Available hooks:
``response``:
The response generated from a Request.
"""
HOOKS = ['response']
def default_hooks():
hooks = {}
for event in HOOKS:
hooks[event] = []
return hooks
# TODO: response is the only one
def dispatch_hook(key, hooks, hook_data, **kwargs):
"""Dispatches a hook dictionary on a given piece of data."""
hooks = hooks or dict()
if key in hooks:
hooks = hooks.get(key)
if hasattr(hooks, '__call__'):
hooks = [hooks]
for hook in hooks:
_hook_data = hook(hook_data, **kwargs)
if _hook_data is not None:
hook_data = _hook_data
return hook_data
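
A usage sketch: 'response' is the only hook event, and dispatch_hook() lets a hook replace the data by returning a non-None value; returning None keeps the original Response.

import requests

def log_status(r, **kwargs):
    print(r.status_code, r.url)   # returning None leaves the Response unchanged

requests.get('http://httpbin.org/get', hooks={'response': log_status})
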
If you are planning to submit a pull request to requests with any changes to
this library, do not go any further. These are independent libraries which we
vendor into requests. Any changes necessary to these libraries must be made in
them and submitted as separate pull requests to those libraries.
urllib3 pull requests go here: https://github.com/shazow/urllib3
chardet pull requests go here: https://github.com/chardet/chardet
from __future__ import absolute_import
from . import urllib3
######################## BEGIN LICENSE BLOCK ########################
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
# 02110-1301 USA
######################### END LICENSE BLOCK #########################
__version__ = "2.2.1"
from sys import version_info
def detect(aBuf):
if ((version_info < (3, 0) and isinstance(aBuf, unicode)) or
(version_info >= (3, 0) and not isinstance(aBuf, bytes))):
raise ValueError('Expected a bytes object, not a unicode object')
from . import universaldetector
u = universaldetector.UniversalDetector()
u.reset()
u.feed(aBuf)
u.close()
return u.result
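
A usage sketch for the vendored detector: detect() requires bytes and returns a dict with the guessed encoding and a confidence score; Requests consults it for Response.apparent_encoding.

from requests.packages import chardet

print(chardet.detect(b'\xe4\xbd\xa0\xe5\xa5\xbd'))
# e.g. {'confidence': 0.93..., 'encoding': 'utf-8'}
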
######################## BEGIN LICENSE BLOCK ########################
# The Original Code is Mozilla Communicator client code.
#
# The Initial Developer of the Original Code is
# Netscape Communications Corporation.
# Portions created by the Initial Developer are Copyright (C) 1998
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# Mark Pilgrim - port to Python
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
# 02110-1301 USA
######################### END LICENSE BLOCK #########################
from .mbcharsetprober import MultiByteCharSetProber
from .codingstatemachine import CodingStateMachine
from .chardistribution import Big5DistributionAnalysis
from .mbcssm import Big5SMModel
class Big5Prober(MultiByteCharSetProber):
def __init__(self):
MultiByteCharSetProber.__init__(self)
self._mCodingSM = CodingStateMachine(Big5SMModel)
self._mDistributionAnalyzer = Big5DistributionAnalysis()
self.reset()
def get_charset_name(self):
return "Big5"
#!/usr/bin/env python
"""
Script which takes one or more file paths and reports on their detected
encodings
Example::
% chardetect somefile someotherfile
somefile: windows-1252 with confidence 0.5
someotherfile: ascii with confidence 1.0
If no paths are provided, it takes its input from stdin.
"""
from io import open
from sys import argv, stdin
from chardet.universaldetector import UniversalDetector
def description_of(file, name='stdin'):
"""Return a string describing the probable encoding of a file."""
u = UniversalDetector()
for line in file:
u.feed(line)
u.close()
result = u.result
if result['encoding']:
return '%s: %s with confidence %s' % (name,
result['encoding'],
result['confidence'])
else:
return '%s: no result' % name
def main():
if len(argv) <= 1:
print(description_of(stdin))
else:
for path in argv[1:]:
with open(path, 'rb') as f:
print(description_of(f, path))
if __name__ == '__main__':
main()
######################## BEGIN LICENSE BLOCK ########################
# The Original Code is Mozilla Communicator client code.
#
# The Initial Developer of the Original Code is
# Netscape Communications Corporation.
# Portions created by the Initial Developer are Copyright (C) 1998
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# Mark Pilgrim - port to Python
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
# 02110-1301 USA
######################### END LICENSE BLOCK #########################
from .euctwfreq import (EUCTWCharToFreqOrder, EUCTW_TABLE_SIZE,
EUCTW_TYPICAL_DISTRIBUTION_RATIO)
from .euckrfreq import (EUCKRCharToFreqOrder, EUCKR_TABLE_SIZE,
EUCKR_TYPICAL_DISTRIBUTION_RATIO)
from .gb2312freq import (GB2312CharToFreqOrder, GB2312_TABLE_SIZE,
GB2312_TYPICAL_DISTRIBUTION_RATIO)
from .big5freq import (Big5CharToFreqOrder, BIG5_TABLE_SIZE,
BIG5_TYPICAL_DISTRIBUTION_RATIO)
from .jisfreq import (JISCharToFreqOrder, JIS_TABLE_SIZE,
JIS_TYPICAL_DISTRIBUTION_RATIO)
from .compat import wrap_ord
ENOUGH_DATA_THRESHOLD = 1024
SURE_YES = 0.99
SURE_NO = 0.01
MINIMUM_DATA_THRESHOLD = 3
class CharDistributionAnalysis:
def __init__(self):
# Mapping table to get frequency order from char order (get from
# GetOrder())
self._mCharToFreqOrder = None
self._mTableSize = None # Size of above table
# This is a constant value which varies from language to language,
# used in calculating confidence. See
# http://www.mozilla.org/projects/intl/UniversalCharsetDetection.html
# for further detail.
self._mTypicalDistributionRatio = None
self.reset()
def reset(self):
"""reset analyser, clear any state"""
# If this flag is set to True, detection is done and conclusion has
# been made
self._mDone = False
self._mTotalChars = 0 # Total characters encountered
# The number of characters whose frequency order is less than 512
self._mFreqChars = 0
def feed(self, aBuf, aCharLen):
"""feed a character with known length"""
if aCharLen == 2:
# we only care about 2-byte characters in our distribution analysis
order = self.get_order(aBuf)
else:
order = -1
if order >= 0:
self._mTotalChars += 1
# order is valid
if order < self._mTableSize:
if 512 > self._mCharToFreqOrder[order]:
self._mFreqChars += 1
def get_confidence(self):
"""return confidence based on existing data"""
# if we didn't receive any character in our consideration range,
# return negative answer
if self._mTotalChars <= 0 or self._mFreqChars <= MINIMUM_DATA_THRESHOLD:
return SURE_NO
if self._mTotalChars != self._mFreqChars:
r = (self._mFreqChars / ((self._mTotalChars - self._mFreqChars)
* self._mTypicalDistributionRatio))
if r < SURE_YES:
return r
# normalize confidence (we don't want to be 100% sure)
return SURE_YES
def got_enough_data(self):
# It is not necessary to receive all data to draw conclusion.
# For charset detection, a certain amount of data is enough
return self._mTotalChars > ENOUGH_DATA_THRESHOLD
def get_order(self, aBuf):
# We do not handle characters based on the original encoding string,
# but convert this encoding string to a number, here called order.
# This allows multiple encodings of a language to share one frequency
# table.
return -1
class EUCTWDistributionAnalysis(CharDistributionAnalysis):
def __init__(self):
CharDistributionAnalysis.__init__(self)
self._mCharToFreqOrder = EUCTWCharToFreqOrder
self._mTableSize = EUCTW_TABLE_SIZE
self._mTypicalDistributionRatio = EUCTW_TYPICAL_DISTRIBUTION_RATIO
def get_order(self, aBuf):
# for euc-TW encoding, we are interested
# first byte range: 0xc4 -- 0xfe
# second byte range: 0xa1 -- 0xfe
# no validation needed here. State machine has done that
first_char = wrap_ord(aBuf[0])
if first_char >= 0xC4:
return 94 * (first_char - 0xC4) + wrap_ord(aBuf[1]) - 0xA1
else:
return -1
class EUCKRDistributionAnalysis(CharDistributionAnalysis):
def __init__(self):
CharDistributionAnalysis.__init__(self)
self._mCharToFreqOrder = EUCKRCharToFreqOrder
self._mTableSize = EUCKR_TABLE_SIZE
self._mTypicalDistributionRatio = EUCKR_TYPICAL_DISTRIBUTION_RATIO
def get_order(self, aBuf):
# for euc-KR encoding, we are interested
# first byte range: 0xb0 -- 0xfe
# second byte range: 0xa1 -- 0xfe
# no validation needed here. State machine has done that
first_char = wrap_ord(aBuf[0])
if first_char >= 0xB0:
return 94 * (first_char - 0xB0) + wrap_ord(aBuf[1]) - 0xA1
else:
return -1
class GB2312DistributionAnalysis(CharDistributionAnalysis):
def __init__(self):
CharDistributionAnalysis.__init__(self)
self._mCharToFreqOrder = GB2312CharToFreqOrder
self._mTableSize = GB2312_TABLE_SIZE
self._mTypicalDistributionRatio = GB2312_TYPICAL_DISTRIBUTION_RATIO
def get_order(self, aBuf):
# for GB2312 encoding, we are interested
# first byte range: 0xb0 -- 0xfe
# second byte range: 0xa1 -- 0xfe
# no validation needed here. State machine has done that
first_char, second_char = wrap_ord(aBuf[0]), wrap_ord(aBuf[1])
if (first_char >= 0xB0) and (second_char >= 0xA1):
return 94 * (first_char - 0xB0) + second_char - 0xA1
else:
return -1
class Big5DistributionAnalysis(CharDistributionAnalysis):
def __init__(self):
CharDistributionAnalysis.__init__(self)
self._mCharToFreqOrder = Big5CharToFreqOrder
self._mTableSize = BIG5_TABLE_SIZE
self._mTypicalDistributionRatio = BIG5_TYPICAL_DISTRIBUTION_RATIO
def get_order(self, aBuf):
# for big5 encoding, we are interested
# first byte range: 0xa4 -- 0xfe
# second byte range: 0x40 -- 0x7e , 0xa1 -- 0xfe
# no validation needed here. State machine has done that
first_char, second_char = wrap_ord(aBuf[0]), wrap_ord(aBuf[1])
if first_char >= 0xA4:
if second_char >= 0xA1:
return 157 * (first_char - 0xA4) + second_char - 0xA1 + 63
else:
return 157 * (first_char - 0xA4) + second_char - 0x40
else:
return -1
class SJISDistributionAnalysis(CharDistributionAnalysis):
def __init__(self):
CharDistributionAnalysis.__init__(self)
self._mCharToFreqOrder = JISCharToFreqOrder
self._mTableSize = JIS_TABLE_SIZE
self._mTypicalDistributionRatio = JIS_TYPICAL_DISTRIBUTION_RATIO
def get_order(self, aBuf):
# for sjis encoding, we are interested
# first byte range: 0x81 -- 0x9f , 0xe0 -- 0xfe
# second byte range: 0x40 -- 0x7e, 0x81 -- 0xfe
# no validation needed here. State machine has done that
first_char, second_char = wrap_ord(aBuf[0]), wrap_ord(aBuf[1])
if (first_char >= 0x81) and (first_char <= 0x9F):
order = 188 * (first_char - 0x81)
elif (first_char >= 0xE0) and (first_char <= 0xEF):
order = 188 * (first_char - 0xE0 + 31)
else:
return -1
order = order + second_char - 0x40
if second_char > 0x7F:
order = -1
return order
class EUCJPDistributionAnalysis(CharDistributionAnalysis):
def __init__(self):
CharDistributionAnalysis.__init__(self)
self._mCharToFreqOrder = JISCharToFreqOrder
self._mTableSize = JIS_TABLE_SIZE
self._mTypicalDistributionRatio = JIS_TYPICAL_DISTRIBUTION_RATIO
def get_order(self, aBuf):
# for euc-JP encoding, we are interested
# first byte range: 0xa0 -- 0xfe
# second byte range: 0xa1 -- 0xfe
# no validation needed here. State machine has done that
char = wrap_ord(aBuf[0])
if char >= 0xA0:
return 94 * (char - 0xA1) + wrap_ord(aBuf[1]) - 0xA1
else:
return -1
######################## BEGIN LICENSE BLOCK ########################
# The Original Code is Mozilla Communicator client code.
#
# The Initial Developer of the Original Code is
# Netscape Communications Corporation.
# Portions created by the Initial Developer are Copyright (C) 1998
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# Mark Pilgrim - port to Python
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
# 02110-1301 USA
######################### END LICENSE BLOCK #########################
from . import constants
import sys
from .charsetprober import CharSetProber
class CharSetGroupProber(CharSetProber):
def __init__(self):
CharSetProber.__init__(self)
self._mActiveNum = 0
self._mProbers = []
self._mBestGuessProber = None
def reset(self):
CharSetProber.reset(self)
self._mActiveNum = 0
for prober in self._mProbers:
if prober:
prober.reset()
prober.active = True
self._mActiveNum += 1
self._mBestGuessProber = None
def get_charset_name(self):
if not self._mBestGuessProber:
self.get_confidence()
if not self._mBestGuessProber:
return None
# self._mBestGuessProber = self._mProbers[0]
return self._mBestGuessProber.get_charset_name()
def feed(self, aBuf):
for prober in self._mProbers:
if not prober:
continue
if not prober.active:
continue
st = prober.feed(aBuf)
if not st:
continue
if st == constants.eFoundIt:
self._mBestGuessProber = prober
return self.get_state()
elif st == constants.eNotMe:
prober.active = False
self._mActiveNum -= 1
if self._mActiveNum <= 0:
self._mState = constants.eNotMe
return self.get_state()
return self.get_state()
def get_confidence(self):
st = self.get_state()
if st == constants.eFoundIt:
return 0.99
elif st == constants.eNotMe:
return 0.01
bestConf = 0.0
self._mBestGuessProber = None
for prober in self._mProbers:
if not prober:
continue
if not prober.active:
if constants._debug:
sys.stderr.write(prober.get_charset_name()
+ ' not active\n')
continue
cf = prober.get_confidence()
if constants._debug:
sys.stderr.write('%s confidence = %s\n' %
(prober.get_charset_name(), cf))
if bestConf < cf:
bestConf = cf
self._mBestGuessProber = prober
if not self._mBestGuessProber:
return 0.0
return bestConf
# else:
# self._mBestGuessProber = self._mProbers[0]
# return self._mBestGuessProber.get_confidence()