#
# -*- coding: utf-8 -*-
#
# Copyright (C) 2008-2009 Red Hat, Inc.
# This file is part of python-fedora
#
# python-fedora is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# python-fedora is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with python-fedora; if not, see
#
'''
Cross-site Request Forgery Protection.
http://en.wikipedia.org/wiki/Cross-site_request_forgery
.. moduleauthor:: John (J5) Palmieri
.. moduleauthor:: Luke Macken
.. versionadded:: 0.3.17
'''
import logging
from webob import Request
from paste.httpexceptions import HTTPFound
from paste.response import replace_header
from repoze.who.interfaces import IMetadataProvider
from zope.interface import implements
try:
from hashlib import sha1
except ImportError:
from sha import sha as sha1
from fedora.urlutils import update_qs
from fedora import _
# Module-level logger, named after this module, used by the classes below.
log = logging.getLogger(__name__)
class CSRFProtectionMiddleware(object):
    '''
    CSRF Protection WSGI Middleware.

    A layer of WSGI middleware that is responsible for making sure
    authenticated requests originated from the user inside of the app's
    domain and not a malicious website.

    This middleware works with the :mod:`repoze.who` middleware, and requires
    that it is placed below :mod:`repoze.who` in the WSGI stack,
    since it relies upon ``repoze.who.identity`` to exist in the environ
    before it is called.

    To utilize this middleware, you can just add it to your WSGI stack below
    the :mod:`repoze.who` middleware.  Here is an example of utilizing the
    `CSRFProtectionMiddleware` within a TurboGears2 application.

    In your ``project/config/middleware.py``, you would wrap your main
    application with the `CSRFProtectionMiddleware`, like so:

    .. code-block:: python

        from fedora.wsgi.csrf import CSRFProtectionMiddleware
        def make_app(global_conf, full_stack=True, **app_conf):
            app = make_base_app(global_conf,
                                wrap_app=CSRFProtectionMiddleware,
                                full_stack=full_stack, **app_conf)

    === From here on is broken ===

    The :class:`moksha.api.widgets.moksha:MokshaGlobals` widget then needs to
    be rendered in every page, which automatically handles injecting the CSRF
    token.  This widget is registered as a Moksha Global Resource, and
    Moksha's default index template handles injecting this by default, but
    you can easily render Moksha's global resource injection widget in your
    own application's template by doing the following in your master
    template::

        ${tmpl_context.moksha_global_resources()}

    URLs can then be re-written using the ``moksha.csrf_rewrite_url``
    function that is in the ``moksha.js`` library, which is automatically
    pulled in by the MokshaGlobals widget.  Here is an example of adding the
    CSRF token to an ajax request.  This example also utilizes the
    ``moksha.filter_resources`` function to strip out any duplicate
    javascript files.

    .. code-block:: javascript

        $.ajax({
            url: moksha.csrf_rewrite_url('/widgets/%(id)s'),
            success: function(data, status) {
                var $panel = $('#%(id)s_panel');
                var $stripped = moksha.filter_resources(data);
                $panel.html($stripped);
            }
        });
    '''

    def __init__(self, application, csrf_token_id='_csrf_token',
                 clear_env='repoze.who.identity repoze.what.credentials',
                 token_env='CSRF_TOKEN', auth_state='CSRF_AUTH_STATE'):
        '''
        Initialize the CSRF Protection WSGI Middleware.

        :arg application: The WSGI application being wrapped
        :kwarg csrf_token_id: The name of the CSRF token variable
        :kwarg clear_env: Whitespace separated names of the variables to
            clear out of the `environ` on invalid token
        :kwarg token_env: The name of the token variable in the environ
        :kwarg auth_state: The environ key that will be set when we are
            logging in
        '''
        log.info(_('Creating CSRFProtectionMiddleware'))
        self.application = application
        self.csrf_token_id = csrf_token_id
        # Stored as a list of keys so _clean_environ can iterate over it
        self.clear_env = clear_env.split()
        self.token_env = token_env
        self.auth_state = auth_state

    def _identity_token(self, environ):
        ''' Return the CSRF token stored in the identity, or ``None`` '''
        # ``or {}`` also covers an identity key that is present but None
        identity = environ.get('repoze.who.identity') or {}
        return identity.get(self.csrf_token_id)

    def _clean_environ(self, environ):
        ''' Delete every key listed in ``self.clear_env`` from ``environ`` '''
        log.debug(_('clean_environ(%s)') % self.clear_env)
        for key in self.clear_env:
            if key in environ:
                log.debug(_('Deleting %(key)s from environ') % {'key': key})
                del environ[key]

    def __call__(self, environ, start_response):
        '''
        This method is called for each request.  It compares the
        user-supplied CSRF token, read from ``environ[self.token_env]``, to
        the token attached to
        ``environ['repoze.who.identity']['_csrf_token']``.  If it does not
        match, or if a token is not provided, it will remove the user from
        the ``environ``, based on the ``clear_env`` setting.

        When ``self.auth_state`` is set in the environ, the token is instead
        appended to the ``Location`` header of the response, if the response
        has one.

        :arg environ: The WSGI environ of the request
        :arg start_response: The WSGI ``start_response`` callable
        :returns: The iterable produced by the wrapped application's response
        '''
        request = Request(environ)
        log.debug(_('CSRFProtectionMiddleware(%(r_path)s)') %
                  {'r_path': request.path})

        token = self._identity_token(environ)
        csrf_token = environ.get(self.token_env)

        if token and csrf_token and token == csrf_token:
            log.debug(_('User supplied CSRF token matches environ!'))
        elif not environ.get(self.auth_state):
            # Not in the middle of a login: treat the request as anonymous
            log.debug(_('Clearing identity'))
            self._clean_environ(environ)
            if csrf_token:
                log.warning(_('Invalid CSRF token. User supplied'
                              ' (%(u_token)s) does not match what\'s in our'
                              ' environ (%(e_token)s)') %
                            {'u_token': csrf_token, 'e_token': token})

        response = request.get_response(self.application)

        if environ.get(self.auth_state):
            log.debug(_('CSRF_AUTH_STATE; rewriting headers'))
            token = self._identity_token(environ)
            # Only a response carrying a Location header can be rewritten;
            # a login handler that did not redirect is passed through as is.
            if response.location:
                response.location = update_qs(
                    response.location, {self.csrf_token_id: str(token)})
                log.debug(_('response.location = %(r_loc)s') %
                          {'r_loc': response.location})
            environ[self.auth_state] = None

        return response(environ, start_response)
class CSRFMetadataProvider(object):
    '''
    Repoze.who CSRF Metadata Provider Plugin.

    This metadata provider is called with an authenticated users identity
    automatically by repoze.who.  It will then take the SHA1 hash of the
    users session cookie, and set it as the CSRF token in
    ``environ['repoze.who.identity']['_csrf_token']``.

    This plugin will also set ``CSRF_AUTH_STATE`` in the environ if the user
    has just authenticated during this request.

    To enable this plugin in a TurboGears2 application, you can
    add the following to your ``project/config/app_cfg.py``

    .. code-block:: python

        from fedora.wsgi.csrf import CSRFMetadataProvider
        base_config.sa_auth.mdproviders = [('csrfmd', CSRFMetadataProvider())]

    Note: If you use the faswho plugin, this is turned on automatically.
    '''
    implements(IMetadataProvider)

    def __init__(self, csrf_token_id='_csrf_token', session_cookie='tg-visit',
                 clear_env='repoze.who.identity repoze.what.credentials',
                 login_handler='/post_login', token_env='CSRF_TOKEN',
                 auth_session_id='CSRF_AUTH_SESSION_ID',
                 auth_state='CSRF_AUTH_STATE'):
        '''
        Create the CSRF Metadata Provider Plugin.

        :kwarg csrf_token_id: The name of the CSRF token variable
        :kwarg session_cookie: The name of the session cookie
        :kwarg clear_env: Stored on the instance unchanged; this class does
            not otherwise use it
        :kwarg login_handler: The path to the login handler, used to determine
            if the user logged in during this request
        :kwarg token_env: The name of the token variable in the environ
        :kwarg auth_session_id: The environ key containing an optional
            session id
        :kwarg auth_state: The environ key that indicates when we are
            logging in
        '''
        self.csrf_token_id = csrf_token_id
        self.session_cookie = session_cookie
        self.clear_env = clear_env
        self.login_handler = login_handler
        self.token_env = token_env
        self.auth_session_id = auth_session_id
        self.auth_state = auth_state

    def _to_bytes(self, value):
        ''' Return ``value`` as a byte string, UTF-8 encoding text input '''
        # type(''.encode('ascii')) is the byte string type on both Python 2
        # and Python 3, without needing the ``bytes`` builtin.
        byte_type = type(''.encode('ascii'))
        if not isinstance(value, byte_type) and hasattr(value, 'encode'):
            value = value.encode('utf-8')
        return value

    def _encode_query(self, pairs):
        '''
        Serialize ``pairs`` into a percent-encoded query string.

        :arg pairs: Iterable of ``(key, value)`` tuples
        :returns: The encoded query string
        '''
        try:
            from urllib import urlencode
        except ImportError:
            from urllib.parse import urlencode

        def to_native(value):
            # Python 2's urlencode cannot cope with non-ASCII unicode
            # objects, so hand it UTF-8 encoded byte strings instead.
            if not isinstance(value, str) and hasattr(value, 'encode'):
                value = value.encode('utf-8')
            return value

        return urlencode([(to_native(key), to_native(value))
                          for key, value in pairs])

    def strip_script(self, environ, path):
        '''
        Strip the script portion off of a url path so the plugin works even
        when the application is mounted under a path other than root.

        :arg environ: The WSGI environ, consulted for ``SCRIPT_NAME``
        :arg path: The url path to strip
        :returns: ``path`` without the leading ``SCRIPT_NAME``; ``path`` is
            returned unchanged when it does not live under ``SCRIPT_NAME``
        '''
        if path.startswith('/') and 'SCRIPT_NAME' in environ:
            prefix = environ.get('SCRIPT_NAME') or ''
            if prefix.endswith('/'):
                prefix = prefix[:-1]
            # Only strip on a path segment boundary so that a mount point of
            # ``/app`` does not swallow the start of ``/application``.
            if prefix and (path == prefix or path.startswith(prefix + '/')):
                path = path[len(prefix):]
        return path

    def add_metadata(self, environ, identity):
        '''
        Add the CSRF token to ``identity`` and record the user supplied token.

        The session id is read from ``environ[self.auth_session_id]`` or,
        failing that, from the session cookie.  Its SHA1 hex digest is stored
        in ``identity`` under ``self.csrf_token_id``.  When the request path
        is the login handler, ``self.auth_state`` is set in the environ and
        the fresh token is stored in ``environ[self.token_env]``; otherwise
        the token supplied with the request is extracted and stored there.

        :arg environ: The WSGI environ of the request
        :arg identity: The identity dictionary to update
        '''
        request = Request(environ)
        log.debug(_('CSRFMetadataProvider.add_metadata(%(r_path)s)')
                  % {'r_path': request.path})

        session_id = environ.get(self.auth_session_id)
        if not session_id:
            session_id = request.cookies.get(self.session_cookie)
        log.debug(_('session_id = %(s_id)r') % {'s_id': session_id})

        if session_id and session_id != 'Set-Cookie:':
            # Hash bytes, not text: a non-ASCII text cookie would otherwise
            # make sha1() raise instead of yielding a token.
            token = sha1(self._to_bytes(session_id)).hexdigest()
            identity.update({self.csrf_token_id: token})
            log.debug(_('Identity updated with CSRF token'))

            path = self.strip_script(environ, request.path)
            if path == self.login_handler:
                log.debug(_('Setting CSRF_AUTH_STATE'))
                environ[self.auth_state] = True
                environ[self.token_env] = token
            else:
                environ[self.token_env] = self.extract_csrf_token(request)

            app = environ.get('repoze.who.application')
            if app:
                # This occurs during login in some application configurations
                if isinstance(app, HTTPFound) and environ.get(self.auth_state):
                    log.debug(_('Got HTTPFound(302) from'
                                ' repoze.who.application'))
                    loc = update_qs(app.location,
                                    {self.csrf_token_id: str(token)})
                    headers = app.headers.items()
                    replace_header(headers, 'location', loc)
                    app.headers = headers
                    log.debug(_('Altered headers: %(headers)s') %
                              {'headers': str(app.headers)})
        else:
            log.warning(_('Invalid session cookie %(s_id)r, not setting CSRF'
                          ' token!') % {'s_id': session_id})

    def extract_csrf_token(self, request):
        '''
        Extract and remove the CSRF token from a given
        :class:`webob.Request`

        :arg request: The request to take the token out of
        :returns: The token found in the request or ``None``.  A token in the
            POST parameters takes precedence over one in the GET parameters.
        '''
        csrf_token = None

        if self.csrf_token_id in request.GET:
            log.debug(_("%(token)s in GET") % {'token': self.csrf_token_id})
            csrf_token = request.GET[self.csrf_token_id]
            del request.GET[self.csrf_token_id]
            # Re-encode the remaining parameters; joining the decoded values
            # verbatim would corrupt any value containing '&', '=' or '#'.
            request.query_string = self._encode_query(request.GET.items())

        if self.csrf_token_id in request.POST:
            log.debug(_("%(token)s in POST") % {'token': self.csrf_token_id})
            csrf_token = request.POST[self.csrf_token_id]
            del request.POST[self.csrf_token_id]

        return csrf_token