import json
import os
import warnings
from io import BytesIO
from urllib.parse import urlsplit
from zipfile import ZipFile
from requests.adapters import HTTPAdapter
from requests_cache import NEVER_EXPIRE, CachedSession
from .exceptions import UnknownLatestVersion
# Number of characters to strip from a "file://" URI to obtain a local
# filesystem path. On Windows one extra character is skipped so the path
# starts at the drive letter.
if os.name == 'nt':
    FILE_URI_OFFSET = 8  # "file:///C|/tmp"
else:
    FILE_URI_OFFSET = 7  # "file:///tmp"

# The version identifier to prefer when no "master" version exists.
DEFAULT_MINOR_VERSION = '1.1'

# Silence spurious unclosed-socket warnings from cached HTTPS responses.
# https://requests-cache.readthedocs.io/en/stable/user_guide/troubleshooting.html#common-error-messages
# https://docs.python.org/3/library/socket.html#constants
warnings.filterwarnings(
    'ignore',
    category=ResourceWarning,
    message=r"^unclosed <ssl\.SSLSocket fd=\d+, family=AddressFamily\.AF_INET6?, type=SocketKind\.SOCK_STREAM, ",
)

# Shared in-memory-cached HTTP session with retries and a configurable
# connection pool, used by the module's URL-fetching helpers.
# https://2.python-requests.org/projects/3/api/#requests.adapters.HTTPAdapter
# https://urllib3.readthedocs.io/en/latest/advanced-usage.html#customizing-pool-behavior
adapter = HTTPAdapter(max_retries=3, pool_maxsize=int(os.getenv('REQUESTS_POOL_MAXSIZE', 10)))
session = CachedSession(backend='memory', expire_after=os.getenv('REQUESTS_CACHE_EXPIRE_AFTER', NEVER_EXPIRE))
session.mount('https://', adapter)
session.mount('http://', adapter)
def json_dump(data, io):
    """
    Dump ``data`` as JSON to the file-like object ``io``.

    Uses 2-space indentation and leaves non-ASCII characters unescaped.
    """
    json.dump(data, io, ensure_ascii=False, indent=2)
def get_latest_version(versions):
    """
    Return the latest version from a list of versions of the same extension.

    Preference order: the only version, if there is exactly one; the version
    whose identifier is "master"; the version whose identifier is the default
    minor version; otherwise, the version with the most recent date.

    :raises UnknownLatestVersion: if the latest version of the extension can't be determined
    """
    if len(versions) == 1:
        return versions[0]

    version_numbers = {version.version: version for version in versions}
    if 'master' in version_numbers:
        return version_numbers['master']
    if DEFAULT_MINOR_VERSION in version_numbers:
        return version_numbers[DEFAULT_MINOR_VERSION]

    # Fall back to the most recently dated version, if any versions are dated.
    dated = [version for version in versions if version.date]
    if dated:
        return max(dated, key=lambda version: version.date)

    raise UnknownLatestVersion
def _resolve(data_or_url):
parsed = urlsplit(data_or_url)
if parsed.scheme:
if parsed.scheme == 'file':
with open(data_or_url[FILE_URI_OFFSET:]) as f:
return f.read()
response = session.get(data_or_url)
response.raise_for_status()
return response.text
return data_or_url
def _resolve_zip(url, root=''):
    """
    Return a :class:`~zipfile.ZipFile` for ``url``.

    If ``url`` is a ``file://`` URI ending in ``.zip``, open that archive. If
    it is a ``file://`` URI to a directory, build an in-memory archive of the
    files under ``root`` within that directory, skipping ``__pycache__``
    directories; note that all entries are flattened into a top-level ``zip/``
    member regardless of their depth on disk. Otherwise, download the archive
    at ``url`` (following redirects) via the shared session.
    """
    parsed = urlsplit(url)
    if parsed.scheme == 'file':
        path = url[FILE_URI_OFFSET:]
        if url.endswith('.zip'):
            with open(path, 'rb') as f:
                io = BytesIO(f.read())
        else:
            io = BytesIO()
            with ZipFile(io, 'w') as zipfile:
                zipfile.write(path, arcname='zip/')
                for dirpath, dirnames, filenames in os.walk(os.path.join(path, root)):
                    # Prune in place (slice assignment) so os.walk skips
                    # __pycache__, without mutating the list while iterating it.
                    dirnames[:] = [d for d in dirnames if d != '__pycache__']
                    for file in sorted(filenames):
                        zipfile.write(os.path.join(dirpath, file), arcname=f'zip/{file}')
    else:
        response = session.get(url, allow_redirects=True)
        response.raise_for_status()
        io = BytesIO(response.content)
    return ZipFile(io)