services/auth/mozilla_sreg.py
author Ryan Kelly <rfkelly@mozilla.com>
Tue, 15 Jul 2014 15:57:05 +1000
changeset 1075 e6a6f99722a8
parent 970 8cc08a17f2a5
permissions -rw-r--r--
Fix mock_wsgi context-manager to actually install the mocks.

# ***** BEGIN LICENSE BLOCK *****
# Version: MPL 1.1/GPL 2.0/LGPL 2.1
#
# The contents of this file are subject to the Mozilla Public License Version
# 1.1 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.mozilla.org/MPL/
#
# Software distributed under the License is distributed on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.
#
# The Original Code is Sync Server
#
# The Initial Developer of the Original Code is the Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2010
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
#   Tarek Ziade (tarek@mozilla.com)
#
# Alternatively, the contents of this file may be used under the terms of
# either the GNU General Public License Version 2 or later (the "GPL"), or
# the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
# in which case the provisions of the GPL or the LGPL are applicable instead
# of those above. If you wish to allow use of your version of this file only
# under the terms of either the GPL or the LGPL, and not to allow others to
# use your version of this file under the terms of the MPL, indicate your
# decision by deleting the provisions above and replace them with the notice
# and other provisions required by the GPL or the LGPL. If you do not delete
# the provisions above, a recipient may use your version of this file under
# the terms of any one of the MPL, the GPL or the LGPL.
#
# ***** END LICENSE BLOCK *****
""" Mozilla Authentication using a two-tier system
"""
import simplejson as json
import urlparse

from services.exceptions import BackendError
from services.http_helpers import get_url
from services.auth.ldapsql import LDAPAuth
from services.ldappool import StateConnector
from services.auth import NoEmailError, InvalidCodeError
from services.respcodes import ERROR_NO_EMAIL_ADDRESS, ERROR_INVALID_RESET_CODE


class MozillaAuth(LDAPAuth):
    """LDAP authentication."""

    def __init__(self, ldapuri, sreg_location, sreg_path, sreg_scheme='https',
                 use_tls=False, bind_user='binduser',
                 bind_password='binduser', admin_user='adminuser',
                 admin_password='adminuser', users_root='ou=users,dc=mozilla',
                 users_base_dn=None, pool_size=100, pool_recycle=3600,
                 reset_on_return=True, single_box=False, ldap_timeout=-1,
                 nodes_scheme='https', check_account_state=True,
                 create_tables=False, ldap_pool_size=10, ldap_use_pool=False,
                 connector_cls=StateConnector, **kw):

        super(MozillaAuth, self).__init__(ldapuri, None, use_tls, bind_user,
                                     bind_password, admin_user,
                                     admin_password, users_root,
                                     users_base_dn, pool_size, pool_recycle,
                                     reset_on_return, single_box, ldap_timeout,
                                     nodes_scheme, check_account_state,
                                     create_tables, ldap_pool_size,
                                     ldap_use_pool, connector_cls)

        self.sreg_location = sreg_location
        self.sreg_scheme = sreg_scheme
        self.sreg_path = sreg_path

    def _proxy(self, method, url, data=None, headers=None):
        """Proxies and return the result from the other server.

        - scheme: http or https
        - netloc: proxy location
        """
        if data is not None:
            data = json.dumps(data)

        status, headers, body = get_url(url, method, data, headers)

        if body:
            try:
                body = json.loads(body)
            except Exception:
                self.logger.error("bad json body from sreg (%s): %s" %
                                                            (url, body))

        return status, body

    def create_user(self, username, password, email):
        """Creates a user. Returns True on success."""
        payload = {'password': password, 'email': email}
        url = self.generate_url(username)
        status, body = self._proxy('PUT', url, payload)
        if status != 200:
            raise BackendError()

        # the result is the username on success
        return body == username

    def generate_reset_code(self, user_id, overwrite=True):
        """Sends a reset code by e-mail

        Args:
            user_id: user id
            overwrite: if True, overwrites an existing code

        Returns:
            True if reset code was generated and sent to user, False otherwise
        """
        username = self._get_username(user_id)
        status, body = self._proxy('GET',
                             self.generate_url(username,
                                               'password_reset_code'))
        if status == 200:
            return body == 0

        if status == 400:
            if body == ERROR_NO_EMAIL_ADDRESS:
                raise NoEmailError()

        raise BackendError()

    def verify_reset_code(self, user_id, code):
        """Verify a reset code

        Args:
            user_id: user id
            code: reset code

        Returns:
            True or False
        """
        raise NotImplementedError()

    def clear_reset_code(self, user_id):
        """Clears the reset code

        Args:
            user_id: user id

        Returns:
            True if the change was successful, False otherwise
        """
        # handled by sreg
        username = self._get_username(user_id)
        status, body = self._proxy('DELETE', self.generate_url(username,
                                                        'password_reset_code'))
        if status != 200:
            raise BackendError()

        return body == 0

    def delete_user(self, user_id, password=None):
        """Deletes the user

        Args:
            user_id: user id
            password: user password

        Returns:
            True if the change was successful, False otherwise
        """
        if password is None:
            return False

        payload = {'password': password}
        username = self._get_username(user_id)
        url = self.generate_url(username)
        status, body = self._proxy('DELETE', url, payload)
        if status != 200:
            raise BackendError()

        return body == 0

    def get_user_node(self, user_id, assign=True):
        if self.single_box:
            return None

        node = super(MozillaAuth, self).get_user_node(user_id,
                                                      assign=False)
        if node is not None or assign is False:
            return node

        username = self._get_username(user_id)
        url = self.generate_url(username, 'node/weave')
        status, body = self._proxy('GET', url)
        if status != 200:
            raise BackendError()

        return body

    def admin_update_password(self, user_id, new_password, key):
        """Change the user password.

        Uses the admin bind or the user bind if the old password is provided.

        Args:
            user_id: user id
            password: new password
            key: the reset code

        Returns:
            True if the change was successful, False otherwise
        """
        payload = {'reset_code': key, 'password': new_password}
        username = self._get_username(user_id)
        url = self.generate_url(username, 'password')
        status, body = self._proxy('POST', url, payload)
        if status == 200:
            return body == 0
        elif status == 400:
            if body == ERROR_INVALID_RESET_CODE:
                raise InvalidCodeError()

        raise BackendError()

    def generate_url(self, username, additional_path=None):
        path = "%s/%s" % (self.sreg_path, username)
        if additional_path:
            path = "%s/%s" % (path, additional_path)

        url = urlparse.urlunparse([self.sreg_scheme, self.sreg_location,
                                  path,
                                  None, None, None])
        return url