D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
bin
/
X11
/
Filename :
deb-janitor
back
Copy
#!/usr/bin/python3
# Copyright (c) 2020 Jelmer Vernooij <jelmer@debian.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# See file /usr/share/common-licenses/GPL-3 for more details.
#
"""
Command-line interface for the Debian Janitor.

See https://janitor.debian.net/
"""

import argparse
import json
import sys
from urllib.request import urlopen, Request
from urllib.error import HTTPError

import devscripts

DEFAULT_API_URL = 'https://janitor.debian.net/api/'
USER_AGENT = 'devscripts janitor cli (%s)' % (devscripts.version)
# Seconds to wait on any single HTTP request before giving up.
DEFAULT_URLLIB_TIMEOUT = 30


class MissingDiffError(Exception):
    """There is no diff for the specified package/suite combination."""


class NoSuchSource(Exception):
    """There is no source package known with the specified name."""


def _endpoint_url(api_url, suite, source, action):
    """Build the URL of a per-package API endpoint.

    Trailing slashes on ``api_url`` are normalised so that a base URL given
    without one does not get fused with the suite name.
    """
    return '%s/%s/pkg/%s/%s' % (api_url.rstrip('/'), suite, source, action)


def _not_found_reason(err):
    """Extract a human-readable reason from the body of an HTTPError.

    The body is parsed as a JSON object with a ``reason`` key; if it is not
    shaped like that, the raw body text (or the error's own string form when
    the body is empty) is returned instead of raising a secondary exception.
    """
    body = err.read()
    try:
        return json.loads(body)['reason']
    except (ValueError, KeyError, TypeError):
        return body.decode(errors='replace') or str(err)


def _get_json_url(http_url: str, timeout: int = DEFAULT_URLLIB_TIMEOUT):
    """Fetch ``http_url`` with a GET request and return the decoded JSON."""
    headers = {'User-Agent': USER_AGENT, 'Accept': 'application/json'}
    with urlopen(Request(http_url, headers=headers), timeout=timeout) as resp:
        http_contents = resp.read()
    return json.loads(http_contents)


def schedule(source, suite, api_url=DEFAULT_API_URL,
             timeout=DEFAULT_URLLIB_TIMEOUT):
    """Schedule a new run for a package.

    Args:
      source: the source package name
      suite: the suite to schedule for
      api_url: base URL of the API
      timeout: seconds to wait for the HTTP request
    Returns:
      a tuple of the ``estimated_duration_seconds``, ``queue_position`` and
      ``queue_wait_time`` values from the JSON response
    Raises:
      NoSuchSource: if the server answered with a 404
    """
    url = _endpoint_url(api_url, suite, source, 'schedule')
    headers = {'User-Agent': USER_AGENT}
    req = Request(url, headers=headers, method='POST')
    try:
        with urlopen(req, timeout=timeout) as resp:
            result = json.load(resp)
    except HTTPError as err:
        if err.code == 404:
            raise NoSuchSource(_not_found_reason(err)) from err
        raise
    estimated_duration = result['estimated_duration_seconds']
    queue_position = result['queue_position']
    queue_wait_time = result['queue_wait_time']
    return (estimated_duration, queue_position, queue_wait_time)


def diff(source, suite, api_url=DEFAULT_API_URL,
         timeout=DEFAULT_URLLIB_TIMEOUT):
    """Retrieve the source diff for a package/suite.

    Args:
      source: the source package name
      suite: the suite to retrieve
      api_url: base URL of the API
      timeout: seconds to wait for the HTTP request
    Returns:
      the diff as a bytestring
    Raises:
      MissingDiffError: If the diff was missing
        (source not valid, suite not valid, no runs yet, etc)
    """
    url = _endpoint_url(api_url, suite, source, 'diff')
    headers = {'User-Agent': USER_AGENT, 'Accept': 'text/plain'}
    req = Request(url, headers=headers)
    try:
        with urlopen(req, timeout=timeout) as resp:
            return resp.read()
    except HTTPError as err:
        if err.code == 404:
            raise MissingDiffError(err.read().decode()) from err
        raise


def main(argv):
    """Handle command-line arguments.

    Args:
      argv: the argument list, without the program name
    Returns:
      the process exit status: 0 on success, 1 on any failure or when no
      sub-command was given
    """
    parser = argparse.ArgumentParser('janitor')
    parser.add_argument(
        '--api-url', type=str, help='API endpoint to talk to',
        default=DEFAULT_API_URL)
    subparsers = parser.add_subparsers(
        help='sub-command help', dest='subcommand')
    schedule_parser = subparsers.add_parser('schedule')
    schedule_parser.add_argument('source')
    schedule_parser.add_argument('suite')
    diff_parser = subparsers.add_parser('diff')
    diff_parser.add_argument('source', help='Source package name')
    diff_parser.add_argument('suite')
    args = parser.parse_args(argv)

    if args.subcommand == 'schedule':
        try:
            (est_duration, pos, wait_time) = schedule(
                args.source, args.suite, api_url=args.api_url)
        except NoSuchSource as err:
            sys.stderr.write('%s\n' % err.args[0])
            return 1
        except OSError as err:
            # HTTPError, URLError and socket timeouts all derive from
            # OSError; report them instead of dumping a traceback.
            sys.stderr.write(
                'Request to %s failed: %s\n' % (args.api_url, err))
            return 1
        print(
            'Scheduled. Estimated duration: %.2fs, '
            'queue position: %d (wait time: %.2f)' % (
                est_duration, pos, wait_time))
        return 0

    if args.subcommand == 'diff':
        try:
            data = diff(args.source, args.suite, api_url=args.api_url)
        except MissingDiffError as err:
            sys.stderr.write('%s\n' % err.args[0])
            return 1
        except OSError as err:
            sys.stderr.write(
                'Request to %s failed: %s\n' % (args.api_url, err))
            return 1
        # The diff is raw bytes, so bypass the text layer of stdout.
        sys.stdout.buffer.write(data)
        sys.stdout.flush()
        return 0

    parser.print_usage()
    return 1


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))