Source code for flask_acl.extension

from __future__ import absolute_import

import functools
import logging
from pprint import pformat
from urllib import urlencode

import flask
from flask import request, current_app
from flask.ext.login import current_user
import werkzeug as wz

from flask_acl.core import iter_object_acl, get_object_context, check
from flask_acl.permission import default_permission_sets
from flask_acl.predicate import default_predicates


log = logging.getLogger(__name__)


class _Redirect(Exception):
    pass


[docs]class ACLManager(object): """Flask extension for registration and checking of ACLs on routes and other objects.""" login_view = 'login' def __init__(self, app=None): self._context_processors = [] self.permission_sets = default_permission_sets.copy() self.predicates = default_predicates.copy() if app: self.init_app(app) def init_app(self, app): app.acl_manager = self app.extensions['acl'] = self app.config.setdefault('ACL_ROUTE_DEFAULT_STATE', True) # I suspect that Werkzeug has something for this already... app.errorhandler(_Redirect)(lambda r: flask.redirect(r.args[0]))
[docs] def predicate(self, name, predicate=None): """Define a new predicate (direclty, or as a decorator). E.g.:: @authz.predicate def ROOT(user, **ctx): # return True of user is in group "wheel". """ if predicate is None: return functools.partial(self.predicate, name) self.predicates[name] = predicate return predicate
[docs] def permission_set(self, name, permission_set=None): """Define a new permission set (directly, or as a decorator).""" if permission_set is None: return functools.partial(self.permission_set, name) self.permission_sets[name] = permission_set return permission_set
[docs] def context_processor(self, func): """Register a function to build authorization contexts. The function is called with no arguments, and must return a dict of new context material. """ self._context_processors.append(func)
[docs] def route_acl(self, *acl, **options): """Decorator to attach an ACL to a route. E.g:: @app.route('/url/to/view') @authz.route_acl(''' ALLOW WHEEL ALL DENY ANY ALL ''') def my_admin_function(): pass """ def _route_acl(func): func.__acl__ = acl @functools.wraps(func) def wrapped(*args, **kwargs): permission = 'http.' + request.method.lower() local_opts = options.copy() local_opts.setdefault('default', current_app.config['ACL_ROUTE_DEFAULT_STATE']) self.assert_can(permission, func, **local_opts) return func(*args, **kwargs) return wrapped return _route_acl
[docs] def can(self, permission, obj, **kwargs): """Check if we can do something with an object. :param permission: The permission to look for. :param obj: The object to check the ACL of. :param **kwargs: The context to pass to predicates. >>> auth.can('read', some_object) >>> auth.can('write', another_object, group=some_group) """ context = {'user': current_user} for func in self._context_processors: context.update(func()) context.update(get_object_context(obj)) context.update(kwargs) return check(permission, iter_object_acl(obj), **context)
[docs] def assert_can(self, permission, obj, **kwargs): """Make sure we have a permission, or abort the request. :param permission: The permission to look for. :param obj: The object to check the ACL of. :param flash: The message to flask if denied (keyword only). :param stealth: Abort with a 404? (keyword only). :param **kwargs: The context to pass to predicates. """ flash_message = kwargs.pop('flash', None) stealth = kwargs.pop('stealth', False) default = kwargs.pop('default', None) res = self.can(permission, obj, **kwargs) res = default if res is None else res if not res: if flash_message and not stealth: flask.flash(flash_message, 'danger') if current_user.is_authenticated(): if flash_message is not False: flask.flash(flash_message or 'You are not permitted to "%s" this resource' % permission) flask.abort(403) elif not stealth and self.login_view: if flash_message is not False: flask.flash(flash_message or 'Please login for access.') raise _Redirect(flask.url_for(self.login_view) + '?' + urlencode(dict(next= flask.request.script_root + flask.request.path ))) else: flask.abort(404)
[docs] def can_route(self, endpoint, method=None, **kwargs): """Make sure we can route to the given endpoint or url. This checks for `http.get` permission (or other methods) on the ACL of route functions, attached via the `ACL` decorator. :param endpoint: A URL or endpoint to check for permission to access. :param method: The HTTP method to check; defaults to `'GET'`. :param **kwargs: The context to pass to predicates. """ view = flask.current_app.view_functions.get(endpoint) if not view: endpoint, args = flask._request_ctx.top.match(endpoint) view = flask.current_app.view_functions.get(endpoint) if not view: return False return self.can('http.' + (method or 'GET').lower(), view, **kwargs)

Related Topics