12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868 |
- # Copyright 2008 Canonical Ltd.
- # This file is part of launchpadlib.
- #
- # launchpadlib is free software: you can redistribute it and/or modify it
- # under the terms of the GNU Lesser General Public License as published by the
- # Free Software Foundation, version 3 of the License.
- #
- # launchpadlib is distributed in the hope that it will be useful, but WITHOUT
- # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
- # for more details.
- #
- # You should have received a copy of the GNU Lesser General Public License
- # along with launchpadlib. If not, see <http://www.gnu.org/licenses/>.
"""launchpadlib credentials and authentication support."""

# The docstring above has to be the first statement in the module to be
# picked up as ``__doc__``; a ``from __future__`` import is still legal
# immediately after it.
from __future__ import print_function

# Under Python 2 this makes every class defined in this module new-style.
__metaclass__ = type

# Names exported by a star-import of this module.
__all__ = [
    "AccessToken",
    "AnonymousAccessToken",
    "AuthorizeRequestTokenWithBrowser",
    "CredentialStore",
    "RequestTokenAuthorizationEngine",
    "Consumer",
    "Credentials",
]
- try:
- from cStringIO import StringIO
- except ImportError:
- from io import StringIO
- import httplib2
- import json
- import os
- from select import select
- import stat
- from sys import stdin
- import time
- try:
- from urllib.parse import urlencode
- except ImportError:
- from urllib import urlencode
- try:
- from urllib.parse import urljoin
- except ImportError:
- from urlparse import urljoin
- import webbrowser
- from base64 import (
- b64decode,
- b64encode,
- )
- from six.moves.urllib.parse import parse_qs
- if bytes is str:
- # Python 2
- unicode_type = unicode # noqa: F821
- else:
- unicode_type = str
- from lazr.restfulclient.errors import HTTPError
- from lazr.restfulclient.authorize.oauth import (
- AccessToken as _AccessToken,
- Consumer,
- OAuthAuthorizer,
- SystemWideConsumer, # Not used directly, just re-imported into here.
- )
- from launchpadlib import uris
# Page names appended to the web root during the OAuth token exchange.
request_token_page = "+request-token"
access_token_page = "+access-token"
authorize_token_page = "+authorize-token"

# How long to sleep between polls for an access token, and how long to
# keep polling before giving up.  Both values are in seconds.
access_token_poll_time = 1
access_token_poll_timeout = 60 * 15

# Exceptions that CredentialStore.save() always re-raises instead of
# routing them to the credential_save_failed callback.
EXPLOSIVE_ERRORS = (MemoryError, KeyboardInterrupt, SystemExit)
- def _ssl_certificate_validation_disabled():
- """Whether the user has disabled SSL certificate connection.
- Some testing servers have broken certificates. Rather than raising an
- error, we allow an environment variable,
- ``LP_DISABLE_SSL_CERTIFICATE_VALIDATION`` to disable the check.
- """
- # XXX: Copied from lazr/restfulclient/_browser.py. Once it appears in a
- # released version of lazr.restfulclient, depend on that new version and
- # delete this copy.
- return bool(os.environ.get("LP_DISABLE_SSL_CERTIFICATE_VALIDATION", False))
def _http_post(url, headers, params):
    """POST urlencoded ``params`` to ``url``, sending ``headers``.

    SSL certificate validation is skipped when the environment asks for
    that (see `_ssl_certificate_validation_disabled`).

    :param url: The URL to POST to.
    :param headers: A dict of HTTP headers to send.
    :param params: The form parameters; they are urlencoded into the body.
    :return: A ``(response, content)`` 2-tuple, as returned by httplib2.
    :raise HTTPError: If the response status is anything other than 200.
    """
    client = httplib2.Http(
        disable_ssl_certificate_validation=(
            _ssl_certificate_validation_disabled()
        )
    )
    response, content = client.request(
        url, method="POST", headers=headers, body=urlencode(params)
    )
    if response.status == 200:
        return response, content
    raise HTTPError(response, content)
class Credentials(OAuthAuthorizer):
    """Standard credentials storage and usage class.

    :ivar consumer: The consumer (application)
    :type consumer: `Consumer`
    :ivar access_token: Access information on behalf of the user
    :type access_token: `AccessToken`
    """

    # The request token from the last get_request_token() call, if any.
    _request_token = None

    # Accepted values for get_request_token()'s ``token_format`` argument.
    URI_TOKEN_FORMAT = "uri"
    DICT_TOKEN_FORMAT = "dict"

    ITEM_SEPARATOR = "<BR>"
    NEWLINE = "\n"

    def serialize(self):
        """Return this object's saved form as a byte string.

        Whatever `save` writes is collected in memory; text output is
        encoded as UTF-8.

        This should probably be moved into OAuthAuthorizer.
        """
        buf = StringIO()
        self.save(buf)
        dumped = buf.getvalue()
        if not isinstance(dumped, unicode_type):
            return dumped
        return dumped.encode("utf-8")

    @classmethod
    def from_string(cls, value):
        """Build a `Credentials` object from a serialized string.

        :param value: The serialized credentials.  A non-text value is
            decoded as UTF-8 before being loaded.
        :return: A new instance of this class, populated through `load`.

        This should probably be moved into OAuthAuthorizer.
        """
        credentials = cls()
        text = value
        if not isinstance(text, unicode_type):
            text = text.decode("utf-8")
        credentials.load(StringIO(text))
        return credentials

    def get_request_token(
        self,
        context=None,
        web_root=uris.STAGING_WEB_ROOT,
        token_format=URI_TOKEN_FORMAT,
    ):
        """Request an OAuth token from Launchpad.

        The token is also stored in self._request_token.

        This method must not be called on an object with no consumer
        specified or if an access token has already been obtained.

        :param context: The context of this token, that is, its scope of
            validity within Launchpad.
        :param web_root: The URL of the website on which the token
            should be requested.
        :param token_format: How the token should be presented.
            URI_TOKEN_FORMAT means just return the URL to the page that
            authorizes the token.  DICT_TOKEN_FORMAT means return a
            dictionary describing the token and the site's
            authentication policy.
        :return: If token_format is URI_TOKEN_FORMAT, the URL for the
            user to authorize the `AccessToken` provided by
            Launchpad.  If token_format is DICT_TOKEN_FORMAT, a dict of
            information about the new access token.
        :raise HTTPError: If the server answers with a non-200 status.
        """
        assert self.consumer is not None, "Consumer not specified."
        assert self.access_token is None, "Access token already obtained."
        web_root = uris.lookup_web_root(web_root)
        wants_dict = token_format == self.DICT_TOKEN_FORMAT

        headers = {"Referer": web_root}
        if wants_dict:
            # Ask the server for the JSON description of the token.
            headers["Accept"] = "application/json"
        form = {
            "oauth_consumer_key": self.consumer.key,
            "oauth_signature_method": "PLAINTEXT",
            "oauth_signature": "&",
        }
        response, content = _http_post(
            web_root + request_token_page, headers, form
        )
        if isinstance(content, bytes):
            content = content.decode("utf-8")

        if wants_dict:
            token_info = json.loads(content)
            if context is not None:
                token_info["lp.context"] = context
            self._request_token = AccessToken.from_params(token_info)
            return token_info

        self._request_token = AccessToken.from_string(content)
        url = "%s%s?oauth_token=%s" % (
            web_root,
            authorize_token_page,
            self._request_token.key,
        )
        if context is None:
            return url
        self._request_token.context = context
        return url + "&lp.context=%s" % context

    def exchange_request_token_for_access_token(
        self, web_root=uris.STAGING_WEB_ROOT
    ):
        """Exchange the previously obtained request token for an access token.

        This method must not be called unless get_request_token() has been
        called and completed successfully.

        The access token will be stored as self.access_token.

        :param web_root: The base URL of the website that granted the
            request token.
        :raise HTTPError: If the server answers with a non-200 status.
        """
        assert (
            self._request_token is not None
        ), "get_request_token() doesn't seem to have been called."
        web_root = uris.lookup_web_root(web_root)
        request_token = self._request_token
        form = {
            "oauth_consumer_key": self.consumer.key,
            "oauth_signature_method": "PLAINTEXT",
            "oauth_token": request_token.key,
            "oauth_signature": "&%s" % request_token.secret,
        }
        response, content = _http_post(
            web_root + access_token_page, {"Referer": web_root}, form
        )
        self.access_token = AccessToken.from_string(content)
class AccessToken(_AccessToken):
    """An OAuth access token."""

    @classmethod
    def from_params(cls, params):
        """Build a new `AccessToken` from a dict of token fields.

        :param params: A dict with ``oauth_token`` and
            ``oauth_token_secret`` entries and, optionally, an
            ``lp.context`` entry.
        :raise KeyError: If either required entry is missing.
        """
        return cls(
            params["oauth_token"],
            params["oauth_token_secret"],
            params.get("lp.context"),
        )

    @classmethod
    def from_string(cls, query_string):
        """Build a new `AccessToken` from a URL query string.

        :param query_string: The query string to parse.  A non-text
            value is decoded as UTF-8 first.
        :raise KeyError: If ``oauth_token`` or ``oauth_token_secret`` is
            absent from the query string.
        """
        if not isinstance(query_string, unicode_type):
            query_string = query_string.decode("utf-8")
        fields = parse_qs(query_string, keep_blank_values=False)

        # parse_qs maps each name to a list of values; each field we
        # care about has to appear exactly once.
        keys = fields["oauth_token"]
        assert (
            len(keys) == 1
        ), "Query string must have exactly one oauth_token."
        secrets = fields["oauth_token_secret"]
        assert len(secrets) == 1, "Query string must have exactly one secret."

        contexts = fields.get("lp.context")
        if contexts is None:
            context = None
        else:
            assert (
                len(contexts) == 1
            ), "Query string must have exactly one context"
            context = contexts[0]
        return cls(keys[0], secrets[0], context)
class AnonymousAccessToken(_AccessToken):
    """An OAuth access token that doesn't authenticate anybody.

    This token can be used for anonymous access.
    """

    def __init__(self):
        # Both the key and the secret are the empty string.
        blank_key = blank_secret = ""
        super(AnonymousAccessToken, self).__init__(blank_key, blank_secret)
class CredentialStore(object):
    """Store OAuth credentials locally.

    This is a generic superclass. To implement a specific way of
    storing credentials locally you'll need to subclass this class,
    and implement `do_save` and `do_load`.
    """

    def __init__(self, credential_save_failed=None):
        """Constructor.

        :param credential_save_failed: A callback to be invoked if the
            save to local storage fails. You should never invoke this
            callback yourself! Instead, you should raise an exception
            from do_save().
        """
        self.credential_save_failed = credential_save_failed

    def save(self, credentials, unique_consumer_id):
        """Save the credentials and invoke the callback on failure.

        Do not override this method when subclassing. Override
        do_save() instead.

        :param credentials: A Credentials object to save.
        :param unique_consumer_id: A string uniquely identifying an
            OAuth consumer on a Launchpad instance.
        :return: The ``credentials`` object that was passed in.
        :raise Exception: Whatever do_save() raised, if it is one of
            EXPLOSIVE_ERRORS or if no failure callback was supplied.
        """
        try:
            self.do_save(credentials, unique_consumer_id)
        except EXPLOSIVE_ERRORS:
            raise
        except Exception:
            if self.credential_save_failed is None:
                # A bare ``raise`` re-raises the active exception with its
                # original traceback; ``raise e`` would discard it on
                # Python 2.
                raise
            self.credential_save_failed()
        return credentials

    def do_save(self, credentials, unique_consumer_id):
        """Store newly-authorized credentials locally for later use.

        :param credentials: A Credentials object to save.
        :param unique_consumer_id: A string uniquely identifying an
            OAuth consumer on a Launchpad instance.
        :raise NotImplementedError: Always; subclasses must override.
        """
        raise NotImplementedError()

    def load(self, unique_key):
        """Retrieve credentials from a local store.

        This method is the inverse of `save`.

        There's no special behavior in this method--it just calls
        `do_load`. There _is_ special behavior in `save`, and this
        way, developers can remember to implement `do_save` and
        `do_load`, not `do_save` and `load`.

        :param unique_key: A string uniquely identifying an OAuth consumer
            on a Launchpad instance.
        :return: A `Credentials` object if one is found in the local
            store, and None otherwise.
        """
        return self.do_load(unique_key)

    def do_load(self, unique_key):
        """Retrieve credentials from a local store.

        This method is the inverse of `do_save`.

        :param unique_key: A string uniquely identifying an OAuth consumer
            on a Launchpad instance.
        :return: A `Credentials` object if one is found in the local
            store, and None otherwise.
        :raise NotImplementedError: Always; subclasses must override.
        """
        raise NotImplementedError()
class KeyringCredentialStore(CredentialStore):
    """Store credentials in the GNOME keyring or KDE wallet.

    This is a good solution for desktop applications and interactive
    scripts. It doesn't work for non-interactive scripts, or for
    integrating third-party websites into Launchpad.
    """

    # Prefix marking a stored value as base 64 encoded.
    B64MARKER = b"<B64>"

    def __init__(self, credential_save_failed=None, fallback=False):
        """Constructor.

        :param credential_save_failed: See `CredentialStore`.
        :param fallback: If true, credentials are kept in a
            `MemoryCredentialStore` whenever no keyring backend is
            available, instead of the error propagating.
        """
        super(KeyringCredentialStore, self).__init__(credential_save_failed)
        self._fallback = None
        if fallback:
            self._fallback = MemoryCredentialStore(credential_save_failed)

    @staticmethod
    def _ensure_keyring_imported():
        """Ensure the keyring module is imported (postponing side effects).

        The keyring module initializes the environment-dependent backend at
        import time (nasty). We want to avoid that initialization because it
        may do things like prompt the user to unlock their password store
        (e.g., KWallet).
        """
        if "keyring" not in globals():
            global keyring
            import keyring
        if "NoKeyringError" not in globals():
            global NoKeyringError
            try:
                from keyring.errors import NoKeyringError
            except ImportError:
                # Older keyring releases have no dedicated exception.
                NoKeyringError = RuntimeError

    def do_save(self, credentials, unique_key):
        """Store newly-authorized credentials in the keyring.

        :param credentials: The `Credentials` object to store.
        :param unique_key: The key to file the credentials under.
        """
        self._ensure_keyring_imported()
        serialized = credentials.serialize()
        # Some users have reported problems with corrupted keyrings, both in
        # Gnome and KDE, when newlines are included in the password. Avoid
        # this problem by base 64 encoding the serialized value.
        serialized = self.B64MARKER + b64encode(serialized)
        try:
            keyring.set_password(
                "launchpadlib", unique_key, serialized.decode("utf-8")
            )
        except NoKeyringError as e:
            # keyring < 21.2.0 raises RuntimeError rather than anything more
            # specific. Make sure it's the exception we're interested in.
            if (
                NoKeyringError == RuntimeError
                and "No recommended backend was available" not in str(e)
            ):
                raise
            if self._fallback:
                self._fallback.save(credentials, unique_key)
            else:
                raise

    def do_load(self, unique_key):
        """Retrieve credentials from the keyring.

        :param unique_key: The key the credentials were filed under.
        :return: A `Credentials` object, or None if nothing is stored
            under that key or the stored value cannot be decoded or
            parsed.
        """
        self._ensure_keyring_imported()
        try:
            credential_string = keyring.get_password(
                "launchpadlib", unique_key
            )
        except NoKeyringError as e:
            # keyring < 21.2.0 raises RuntimeError rather than anything more
            # specific. Make sure it's the exception we're interested in.
            if (
                NoKeyringError == RuntimeError
                and "No recommended backend was available" not in str(e)
            ):
                raise
            if self._fallback:
                return self._fallback.load(unique_key)
            else:
                raise
        if credential_string is not None:
            if isinstance(credential_string, unicode_type):
                credential_string = credential_string.encode("utf8")
            if credential_string.startswith(self.B64MARKER):
                try:
                    credential_string = b64decode(
                        credential_string[len(self.B64MARKER) :]
                    )
                except (TypeError, ValueError):
                    # The credential_string should be base 64 but cannot be
                    # decoded.  Python 2 signals that with TypeError;
                    # Python 3 raises binascii.Error, which is a
                    # ValueError subclass.
                    return None
            try:
                credentials = Credentials.from_string(credential_string)
                return credentials
            except Exception:
                # If any error occurs at this point the most reasonable thing
                # to do is return no credentials, which will require
                # re-authorization but the user will be able to proceed.
                return None
        return None
class UnencryptedFileCredentialStore(CredentialStore):
    """Store credentials unencrypted in a file on disk.

    This is a good solution for scripts that need to run without any
    user interaction.
    """

    def __init__(self, filename, credential_save_failed=None):
        """Constructor.

        :param filename: Path of the file the credentials live in.
        :param credential_save_failed: See `CredentialStore`.
        """
        super(UnencryptedFileCredentialStore, self).__init__(
            credential_save_failed
        )
        self.filename = filename

    def do_save(self, credentials, unique_key):
        """Write the credentials to ``self.filename``.

        ``unique_key`` is not used: the store holds a single file.
        """
        credentials.save_to_path(self.filename)

    def do_load(self, unique_key):
        """Read the credentials back from ``self.filename``.

        ``unique_key`` is not used.

        :return: A `Credentials` object, or None when the file is
            missing or empty.
        """
        path = self.filename
        if not os.path.exists(path):
            return None
        if os.stat(path)[stat.ST_SIZE] == 0:
            return None
        return Credentials.load_from_path(path)
class MemoryCredentialStore(CredentialStore):
    """CredentialStore that stores keys only in memory.

    This can be used to provide a CredentialStore instance without
    actually saving any key to persistent storage.
    """

    def __init__(self, credential_save_failed=None):
        """Constructor.

        :param credential_save_failed: See `CredentialStore`.
        """
        super(MemoryCredentialStore, self).__init__(credential_save_failed)
        # Maps each unique key to the credentials saved under it.
        self._credentials = dict()

    def do_save(self, credentials, unique_key):
        """Remember the credentials under ``unique_key``."""
        self._credentials[unique_key] = credentials

    def do_load(self, unique_key):
        """Return the credentials saved under ``unique_key``, or None."""
        return self._credentials.get(unique_key, None)
class RequestTokenAuthorizationEngine(object):
    """The superclass of all request token authorizers.

    This base class does not implement request token authorization,
    since that varies depending on how you want the end-user to
    authorize a request token. You'll need to subclass this class and
    implement `make_end_user_authorize_token`.
    """

    UNAUTHORIZED_ACCESS_LEVEL = "UNAUTHORIZED"

    def __init__(
        self,
        service_root,
        application_name=None,
        consumer_name=None,
        allow_access_levels=None,
    ):
        """Base class initialization.

        :param service_root: The root of the Launchpad instance being
            used.
        :param application_name: The name of the application that
            wants to use launchpadlib. This is used in conjunction
            with a desktop-wide integration.

            If you specify this argument, your values for
            consumer_name and allow_access_levels are ignored.
        :param consumer_name: The OAuth consumer name, for an
            application that wants its own point of integration into
            Launchpad. In almost all cases, you want to specify
            application_name instead and do a desktop-wide
            integration. The exception is when you're integrating a
            third-party website into Launchpad.
        :param allow_access_levels: A list of the Launchpad access
            levels to present to the user. ('READ_PUBLIC' and so on.)
            Your value for this argument will be ignored during a
            desktop-wide integration.
        :type allow_access_levels: A list of strings.
        :raise ValueError: Unless exactly one of application_name and
            consumer_name is given.
        """
        self.service_root = uris.lookup_service_root(service_root)
        self.web_root = uris.web_root_for_service_root(service_root)

        has_application = application_name is not None
        has_consumer = consumer_name is not None
        if not (has_application or has_consumer):
            raise ValueError(
                "You must provide either application_name or consumer_name."
            )
        if has_application and has_consumer:
            raise ValueError(
                "You must provide only one of application_name and "
                "consumer_name. (You provided %r and %r.)"
                % (application_name, consumer_name)
            )

        if has_consumer:
            # Application-specific integration. Use the provided
            # consumer name to create a consumer automatically.
            consumer = Consumer(consumer_name)
            application_name = consumer_name
        else:
            # System-wide integration. Create a system-wide consumer
            # and identify the application using a separate
            # application name.
            allow_access_levels = ["DESKTOP_INTEGRATION"]
            consumer = SystemWideConsumer(application_name)

        self.consumer = consumer
        self.application_name = application_name
        self.allow_access_levels = allow_access_levels or []

    @property
    def unique_consumer_id(self):
        """Return a string identifying this consumer on this host."""
        return "@".join((self.consumer.key, self.service_root))

    def authorization_url(self, request_token):
        """Return the authorization URL for a request token.

        This is the URL the end-user must visit to authorize the
        token. How exactly does this happen? That depends on the
        subclass implementation.

        :param request_token: The request token to put in the URL.
        """
        page = "%s?oauth_token=%s" % (authorize_token_page, request_token)
        levels = self.allow_access_levels
        if len(levels) > 0:
            # One "&allow_permission=<level>" pair per access level.
            page += "".join(
                "&allow_permission=" + level for level in levels
            )
        return urljoin(self.web_root, page)

    def __call__(self, credentials, credential_store):
        """Authorize a token and associate it with the given credentials.

        If the credential store runs into a problem storing the
        credential locally, the `credential_save_failed` callback will
        be invoked. The callback will not be invoked if there's a
        problem authorizing the credentials.

        :param credentials: A `Credentials` object. If the end-user
            authorizes these credentials, this object will have its
            .access_token property set.
        :param credential_store: A `CredentialStore` object. If the
            end-user authorizes the credentials, they will be
            persisted locally using this object.
        :return: If the credentials are successfully authorized, the
            return value is the `Credentials` object originally passed
            in. Otherwise the return value is None.
        """
        token = self.get_request_token(credentials)
        # Hand off control to the end-user.
        self.make_end_user_authorize_token(credentials, token)
        if credentials.access_token is not None:
            # save() invokes the callback on failure.
            credential_store.save(credentials, self.unique_consumer_id)
            return credentials
        # The end-user refused to authorize the application.
        return None

    def get_request_token(self, credentials):
        """Get a new request token from the server.

        :param credentials: The `Credentials` object to request the
            token through.
        :return: The request token.
        """
        token_info = credentials.get_request_token(
            web_root=self.web_root, token_format=Credentials.DICT_TOKEN_FORMAT
        )
        return token_info["oauth_token"]

    def make_end_user_authorize_token(self, credentials, request_token):
        """Authorize the given request token using the given credentials.

        Your subclass must implement this method: it has no default
        implementation.

        Because an access token may expire or be revoked in the middle
        of a session, this method may be called at arbitrary points in
        a launchpadlib session, or even multiple times during a single
        session (with a different request token each time).

        In most cases, however, this method will be called at the
        beginning of a launchpadlib session, or not at all.
        """
        raise NotImplementedError()
class AuthorizeRequestTokenWithURL(RequestTokenAuthorizationEngine):
    """Authorize using a URL.

    This authorizer simply shows the URL for the user to open for
    authorization, and waits until the server responds.
    """

    WAITING_FOR_USER = (
        "Please open this authorization page:\n"
        " (%s)\n"
        "in your browser. Use your browser to authorize\n"
        "this program to access Launchpad on your behalf."
    )
    WAITING_FOR_LAUNCHPAD = "Press Enter after authorizing in your browser."

    def output(self, message):
        """Display a message.

        By default, prints the message to standard output. The message
        does not require any user interaction--it's solely
        informative.
        """
        print(message)

    def notify_end_user_authorization_url(self, authorization_url):
        """Tell the end-user which URL to visit."""
        notice = self.WAITING_FOR_USER % authorization_url
        self.output(notice)

    def check_end_user_authorization(self, credentials):
        """Try to exchange the request token for an access token.

        :param credentials: The `Credentials` object holding the
            request token.
        :return: True if ``credentials`` now has an access token.
        :raise EndUserDeclinedAuthorization: If the server answers 403.
        :raise EndUserNoAuthorization: If the server answers with any
            other error status.
        """
        try:
            credentials.exchange_request_token_for_access_token(self.web_root)
        except HTTPError as e:
            status = e.response.status
            if status == 403:
                # The user decided not to authorize this
                # application.
                raise EndUserDeclinedAuthorization(e.content)
            if status != 401:
                # There was an error accessing the server.
                print("Unexpected response from Launchpad:")
                print(e)
            # The user has not made a decision yet.
            raise EndUserNoAuthorization(e.content)
        return credentials.access_token is not None

    def wait_for_end_user_authorization(self, credentials):
        """Block until the end-user presses Enter, then check the result."""
        self.output(self.WAITING_FOR_LAUNCHPAD)
        stdin.readline()
        self.check_end_user_authorization(credentials)

    def make_end_user_authorize_token(self, credentials, request_token):
        """Have the end-user authorize the token using a URL."""
        url = self.authorization_url(request_token)
        self.notify_end_user_authorization_url(url)
        self.wait_for_end_user_authorization(credentials)
class AuthorizeRequestTokenWithBrowser(AuthorizeRequestTokenWithURL):
    """Authorize using a URL that pops-up automatically in a browser.

    This authorizer simply opens up the end-user's web browser to a
    Launchpad URL and lets the end-user authorize the request token
    themselves.

    This is the same as its superclass, except this class also
    performs the browser automatic opening of the URL.
    """

    WAITING_FOR_USER = (
        "The authorization page:\n"
        " (%s)\n"
        "should be opening in your browser. Use your browser to authorize\n"
        "this program to access Launchpad on your behalf."
    )
    TIMEOUT_MESSAGE = "Press Enter to continue or wait (%d) seconds..."
    # Seconds to wait for a key press before launching a console browser.
    TIMEOUT = 5
    # Browser basenames that are treated as running inside the terminal.
    TERMINAL_BROWSERS = (
        "www-browser",
        "links",
        "links2",
        "lynx",
        "elinks",
        "elinks-lite",
        "netrik",
        "w3m",
    )
    WAITING_FOR_LAUNCHPAD = (
        "Waiting to hear from Launchpad about your decision..."
    )

    def __init__(
        self,
        service_root,
        application_name,
        consumer_name=None,
        credential_save_failed=None,
        allow_access_levels=None,
    ):
        """Constructor.

        :param service_root: See `RequestTokenAuthorizationEngine`.
        :param application_name: See `RequestTokenAuthorizationEngine`.
        :param consumer_name: The value of this argument is
            ignored. If we have the capability to open the end-user's
            web browser, we must be running on the end-user's computer,
            so we should do a full desktop integration.
        :param credential_save_failed: The value of this argument is
            ignored; the base-class constructor has no such parameter.
            It is accepted only to keep the signature stable.
        :param allow_access_levels: The value of this argument is
            ignored, for the same reason as consumer_name.
        """
        # It doesn't look like we're doing anything here, but we
        # are discarding the passed-in values for consumer_name and
        # allow_access_levels.  Keyword arguments keep
        # credential_save_failed from landing in the base class's
        # allow_access_levels slot.
        super(AuthorizeRequestTokenWithBrowser, self).__init__(
            service_root,
            application_name,
            consumer_name=None,
            allow_access_levels=None,
        )

    def notify_end_user_authorization_url(self, authorization_url):
        """Notify the end-user of the URL, then try to open it in a browser.

        For a console browser the user gets ``TIMEOUT`` seconds to press
        Enter before the browser is launched.  If no browser can be
        found, only the message is shown.
        """
        super(
            AuthorizeRequestTokenWithBrowser, self
        ).notify_end_user_authorization_url(authorization_url)

        try:
            browser_obj = webbrowser.get()
            browser = getattr(browser_obj, "basename", None)
            console_browser = browser in self.TERMINAL_BROWSERS
        except webbrowser.Error:
            browser_obj = None
            console_browser = False

        if console_browser:
            self.output(self.TIMEOUT_MESSAGE % self.TIMEOUT)
            # Wait a little time before attempting to launch browser,
            # give users the chance to press a key to skip it anyway.
            rlist, _, _ = select([stdin], [], [], self.TIMEOUT)
            if rlist:
                stdin.readline()

        if browser_obj is not None:
            webbrowser.open(authorization_url)

    def wait_for_end_user_authorization(self, credentials):
        """Poll Launchpad until the end-user has made a decision.

        :param credentials: The `Credentials` object holding the
            request token.
        :raise EndUserDeclinedAuthorization: If the user refuses.
        :raise TokenAuthorizationTimedOut: If no decision arrives within
            ``access_token_poll_timeout`` seconds.
        """
        self.output(self.WAITING_FOR_LAUNCHPAD)
        start_time = time.time()
        while credentials.access_token is None:
            time.sleep(access_token_poll_time)
            try:
                if self.check_end_user_authorization(credentials):
                    break
            except EndUserNoAuthorization:
                # No decision yet; keep polling until the deadline.
                pass
            if time.time() >= start_time + access_token_poll_timeout:
                raise TokenAuthorizationTimedOut(
                    "Timed out after %d seconds." % access_token_poll_timeout
                )
class TokenAuthorizationException(Exception):
    # Common base class of every exception defined below.
    pass


class RequestTokenAlreadyAuthorized(TokenAuthorizationException):
    pass


class EndUserAuthorizationFailed(TokenAuthorizationException):
    """Superclass exception for all failures of end-user authorization"""


class EndUserDeclinedAuthorization(EndUserAuthorizationFailed):
    """End-user declined authorization"""


class EndUserNoAuthorization(EndUserAuthorizationFailed):
    """End-user did not perform any authorization"""


class TokenAuthorizationTimedOut(EndUserNoAuthorization):
    """End-user did not perform any authorization in timeout period"""


class ClientError(TokenAuthorizationException):
    pass


class ServerError(TokenAuthorizationException):
    pass


class NoLaunchpadAccount(TokenAuthorizationException):
    pass


class TooManyAuthenticationFailures(TokenAuthorizationException):
    pass
|