# -*- coding: utf-8 -*-
#
# Copyright (C) 2008-2009 Ricky Zhou, Red Hat, Inc.
# This file is part of python-fedora
#
# python-fedora is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# python-fedora is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with python-fedora; if not, see
#
'''
Provide a client module for talking to the Fedora Account System.
.. moduleauthor:: Ricky Zhou
.. moduleauthor:: Toshio Kuratomi
'''
import urllib
import warnings
from fedora.client import DictContainer, BaseClient, FasProxyClient, \
AppError, FedoraServiceError, FedoraClientError
from fedora import __version__, _
### FIXME: To merge:
# /usr/bin/fasClient from fas
# API from Will Woods
# API from MyFedora
class FASError(FedoraClientError):
'''FAS Error'''
pass
class CLAError(FASError):
'''CLA Error'''
pass
USERFIELDS = ['affiliation', 'bugzilla_email', 'certificate_serial',
'comments', 'country_code', 'creation', 'email', 'emailtoken',
'facsimile', 'gpg_keyid', 'human_name', 'id', 'internal_comments',
'ircnick', 'latitude', 'last_seen', 'longitude', 'password',
'password_changed', 'passwordtoken', 'postal_address', 'privacy',
'locale', 'ssh_key', 'status', 'status_change', 'telephone',
'unverified_email', 'timezone', 'username', ]
class AccountSystem(BaseClient):
'''An object for querying the Fedora Account System.
The Account System object provides a python API for talking to the Fedora
Account System. It abstracts the http requests, cookie handling, and
other details so you can concentrate on the methods that are important to
your program.
'''
proxy = None
def __init__(self, base_url='https://admin.fedoraproject.org/accounts/',
*args, **kwargs):
'''Create the AccountSystem client object.
:kwargs base_url: Base of every URL used to contact the server.
Defaults to the Fedora Project FAS instance.
:kwargs useragent: useragent string to use. If not given, default to
"Fedora Account System Client/VERSION"
:kwargs debug: If True, log debug information
:kwargs username: username for establishing authenticated connections
:kwargs password: password to use with authenticated connections
:kwargs session_cookie: **Deprecated** Use session_id instead.
User's session_cookie to connect to the server
:kwargs session_id: user's session_id to connect to the server
:kwargs cache_session: if set to true, cache the user's session cookie
on the filesystem between runs.
'''
if 'useragent' not in kwargs:
kwargs['useragent'] = 'Fedora Account System Client/%s' \
% __version__
super(AccountSystem, self).__init__(base_url, *args, **kwargs)
# We need a single proxy for the class to verify username/passwords
# against.
if not self.proxy:
self.proxy = FasProxyClient(base_url, useragent=self.useragent,
session_as_cookie=False, debug=self.debug)
# Preseed a list of FAS accounts with bugzilla addresses
# This allows us to specify a different email for bugzilla than is
# in the FAS db. It is a hack, however, until FAS has a field for the
# bugzilla address.
self.__bugzilla_email = {
# Konstantin Ryabitsev: mricon@gmail.com
100029: 'icon@fedoraproject.org',
# Sean Reifschneider: jafo@tummy.com
100488: 'jafo-redhat@tummy.com',
# Karen Pease: karen-pease@uiowa.edu
100281: 'meme@daughtersoftiresias.org',
# Robert Scheck: redhat@linuxnetz.de
100093: 'redhat-bugzilla@linuxnetz.de',
# Scott Bakers: bakers@web-ster.com
100881: 'scott@perturb.org',
# Colin Charles: byte@aeon.com.my
100014: 'byte@fedoraproject.org',
# W. Michael Petullo: mike@flyn.org
100136: 'redhat@flyn.org',
# Elliot Lee: sopwith+fedora@gmail.com
100060: 'sopwith@redhat.com',
# Control Center Team: Bugzilla user but email doesn't exist
9908: 'control-center-maint@redhat.com',
# Máirín Duffy
100548: 'duffy@redhat.com',
# Muray McAllister: murray.mcallister@gmail.com
102321: 'mmcallis@redhat.com',
# William Jon McCann: mccann@jhu.edu
102489: 'jmccann@redhat.com',
# Matt Domsch's rebuild script -- bz email goes to /dev/null
103590: 'ftbfs@fedoraproject.org',
# Sindre Pedersen Bjørdal: foolish@guezz.net
100460 : 'sindrepb@fedoraproject.org',
# Jesus M. Rodriguez: jmrodri@gmail.com
102180: 'jesusr@redhat.com',
# Jeff Sheltren: jeff@osuosl.org
100058: 'sheltren@fedoraproject.org',
# Roozbeh Pournader: roozbeh@farsiweb.info
100350: 'roozbeh@gmail.com',
# Michael DeHaan: michael.dehaan@gmail.com
100603: 'mdehaan@redhat.com',
# Sebastian Gosenheimer: sgosenheimer@googlemail.com
103647: 'sebastian.gosenheimer@proio.com',
# Ben Konrath: bkonrath@redhat.com
101156: 'ben@bagu.org',
# Kai Engert: kaie@redhat.com
100399: 'kengert@redhat.com',
# William Jon McCann: william.jon.mccann@gmail.com
102952: 'jmccann@redhat.com',
# Simon Wesp: simon@w3sp.de
109464: 'cassmodiah@fedoraproject.org',
# Robert M. Albrecht: romal@gmx.de
101475: 'mail@romal.de',
# Mathieu Bridon: mathieu.bridon@gmail.com
100753: 'bochecha@fedoraproject.org',
# Davide Cescato: davide.cescato@iaeste.ch
123204: 'ceski@fedoraproject.org',
# Nick Bebout: nick@bebout.net
101458: 'nb@fedoraproject.org',
# Niels Haase: haase.niels@gmail.com
126862: 'arxs@fedoraproject.org',
# Thomas Janssen: th.p.janssen@googlemail.com
103110: 'thomasj@fedoraproject.org',
# Michael J Gruber: 'michaeljgruber+fedoraproject@gmail.com'
105113: 'mjg@fedoraproject.org',
# Juan Manuel Rodriguez Moreno: 'nushio@gmail.com'
101302: 'nushio@fedoraproject.org',
# Andrew Cagney: 'andrew.cagney@gmail.com'
102169: 'cagney@fedoraproject.org',
# Jeremy Katz: 'jeremy@katzbox.net'
100036: 'katzj@fedoraproject.org',
# Dominic Hopf: 'dmaphy@gmail.com'
124904: 'dmaphy@fedoraproject.org',
# Christoph Wickert: 'christoph.wickert@googlemail.com':
100271: 'cwickert@fedoraproject.org',
# Elliott Baron: 'elliottbaron@gmail.com'
106760: 'ebaron@fedoraproject.org',
# Thomas Spura: 'spurath@students.uni-mainz.de'
111433: 'tomspur@fedoraproject.org',
# Adam Miller: 'maxamillion@gmail.com'
110673: 'maxamillion@fedoraproject.org',
# Garrett Holmstrom: 'garrett.holmstrom@gmail.com'
131739: 'gholms@fedoraproject.org',
# Tareq Al Jurf: taljurf.fedora@gmail.com
109863: 'taljurf@fedoraproject.org',
# Josh Kayse: jokajak@gmail.com
148243: 'jokajak@fedoraproject.org',
# Behdad Esfahbod: fedora@behdad.org
100102: 'behdad@fedoraproject.org',
# Daniel Bruno: danielbrunos@gmail.com
101608: 'dbruno@fedoraproject.org',
}
# A few people have an email account that is used in owners.list but
# have setup a bugzilla account for their primary account system email
# address now. Map these here.
self.__alternate_email = {
# Damien Durand: splinux25@gmail.com
'splinux@fedoraproject.org': 100406,
# Kevin Fenzi: kevin@tummy.com
'kevin-redhat-bugzilla@tummy.com': 100037,
}
for bugzilla_map in self.__bugzilla_email.items():
self.__alternate_email[bugzilla_map[1]] = bugzilla_map[0]
# We use the two mappings as follows::
# When looking up a user by email, use __alternate_email.
# When looking up a bugzilla email address use __bugzilla_email.
#
# This allows us to parse in owners.list and have a value for all the
# emails in there while not using the alternate email unless it is
# the only option.
# TODO: Use exceptions properly
### Groups ###
def group_by_id(self, group_id):
'''Returns a group object based on its id'''
params = {'group_id': int(group_id)}
request = self.send_request('json/group_by_id', auth = True,
req_params = params)
if request['success']:
return request['group']
else:
return dict()
def group_by_name(self, groupname):
'''Returns a group object based on its name'''
params = {'groupname': groupname}
request = self.send_request('json/group_by_name', auth = True,
req_params = params)
if request['success']:
return request['group']
else:
raise AppError(message=_('FAS server unable to retrieve group %s')
% groupname, name='FASError')
def group_members(self, groupname):
'''Return a list of people approved for a group.
This method returns a list of people who are in the requested group.
The people are all approved in the group. Unapproved people are not
shown. The format of data is::
\[{'username': 'person1', 'role_type': 'user'},
\{'username': 'person2', 'role_type': 'sponsor'}]
role_type can be one of 'user', 'sponsor', or 'administrator'.
.. versionadded:: 0.3.2
'''
request = self.send_request('/group/dump/%s' %
urllib.quote(groupname), auth=True)
return [DictContainer(username=user[0], role_type=user[3])
for user in request['people']]
### People ###
def person_by_id(self, person_id):
'''Returns a person object based on its id'''
person_id = int(person_id)
params = {'person_id': person_id}
request = self.send_request('json/person_by_id', auth=True,
req_params=params)
if request['success']:
if person_id in self.__bugzilla_email:
request['person']['bugzilla_email'] = \
self.__bugzilla_email[person_id]
else:
request['person']['bugzilla_email'] = request['person']['email']
# In the new FAS, membership info is returned separately
if 'approved' in request:
request['person']['approved_memberships'] = request['approved']
if 'unapproved' in request:
request['person']['unapproved_memberships'] = \
request['unapproved']
return request['person']
else:
return dict()
def person_by_username(self, username):
'''Returns a person object based on its username'''
params = {'username': username}
request = self.send_request('json/person_by_username', auth = True,
req_params = params)
if request['success']:
person = request['person']
if person['id'] in self.__bugzilla_email:
person['bugzilla_email'] = self.__bugzilla_email[person['id']]
else:
person['bugzilla_email'] = person['email']
# In the new FAS, membership info is returned separately
if 'approved' in request:
request['person']['approved_memberships'] = request['approved']
if 'unapproved' in request:
request['person']['unapproved_memberships'] = \
request['unapproved']
return person
else:
return dict()
def user_id(self):
'''Returns a dict relating user IDs to usernames'''
request = self.send_request('json/user_id', auth=True)
people = {}
for person_id, username in request['people'].items():
# change userids from string back to integer
people[int(person_id)] = username
return people
def people_by_key(self, key=u'username', search=u'*', fields=None):
'''Return a dict of people
:kwarg key: Key by this field. Valid values are 'id', 'username', or
'email'. Default is 'username'
:kwarg search: Pattern to match usernames against. Defaults to the
'*' wildcard which matches everyone.
:kwarg fields: Limit the data returned to a specific list of fields.
The default is to retrieve all fields.
Valid fields are:
* username
* certificate_serial
* locale
* creation
* telephone
* status_change
* id
* password_changed
* privacy
* comments
* latitude
* email
* status
* gpg_keyid
* internal_comments
* postal_address
* unverified_email
* ssh_key
* passwordtoken
* ircnick
* password
* emailtoken
* longitude
* facsimile
* human_name
* last_seen
* bugzilla_email
Note that for most users who access this data, many of these
fields will be set to None due to security or privacy settings.
:returns: a dict relating the key value to the fields.
'''
# Make sure we have a valid key value
if key not in ('id', 'username', 'email'):
raise KeyError(_('key must be one of "id", "username", or "email"'))
if fields:
fields = list(fields)
for field in fields:
if field not in USERFIELDS:
raise KeyError(_('%(field)s is not a valid field to filter')
% {'field': field})
else:
fields = USERFIELDS
# Make sure we retrieve the key value
unrequested_fields = []
if key not in fields:
unrequested_fields.append(key)
fields.append(key)
if 'bugzilla_email' in fields:
# Need id and email for the bugzilla information
if 'id' not in fields:
unrequested_fields.append('id')
fields.append('id')
if 'email' not in fields:
unrequested_fields.append('email')
fields.append('email')
request = self.send_request('/user/list', req_params={'search': search,
'fields': [f for f in fields if f != 'bugzilla_email']}, auth=True)
people = DictContainer()
for person in request['people']:
# Retrieve bugzilla_email from our list if necessary
if 'bugzilla_email' in fields:
if person['id'] in self.__bugzilla_email:
person['bugzilla_email'] = \
self.__bugzilla_email[person['id']]
else:
person['bugzilla_email'] = person['email']
person_key = person[key]
# Remove any fields that weren't requested by the user
if unrequested_fields:
for field in unrequested_fields:
del person[field]
# Add the person record to the people dict
people[person_key] = person
return people
def people_by_id(self):
'''*Deprecated* Use people_by_key() instead.
Returns a dict relating user IDs to human_name, email, username,
and bugzilla email
'''
warnings.warn(_("people_by_id() is deprecated and will be removed in"
" 0.4. Please port your code to use people_by_key(key='id',"
" fields=['human_name', 'email', 'username', 'bugzilla_email'])"
" instead"), DeprecationWarning, stacklevel=2)
request = self.send_request('/json/user_id', auth=True)
user_to_id = {}
people = DictContainer()
for person_id, username in request['people'].items():
person_id = int(person_id)
# change userids from string back to integer
people[person_id] = {'username': username, 'id': person_id}
user_to_id[username] = person_id
# Retrieve further useful information about the users
request = self.send_request('/group/dump', auth=True)
for user in request['people']:
userid = user_to_id[user[0]]
person = people[userid]
person['email'] = user[1]
person['human_name'] = user[2]
if userid in self.__bugzilla_email:
person['bugzilla_email'] = self.__bugzilla_email[userid]
else:
person['bugzilla_email'] = person['email']
return people
### Utils ###
def people_by_groupname(self, groupname):
'''Return a list of persons for the given groupname.
:arg groupname: Name of the group to look up
:returns: A list of person objects from the group. If the group
contains no entries, then an empty list is returned.
'''
people = self.people_by_id()
group = dict(self.group_by_name(groupname))
userids = [user[u'person_id'] for user in
group[u'approved_roles'] + group[u'unapproved_roles']]
return [people[userid] for userid in userids]
### Configs ###
def get_config(self, username, application, attribute):
'''Return the config entry for the key values.
:arg username: Username of the person
:arg application: Application for which the config is set
:arg attribute: Attribute key to lookup
:raises AppError: if the server returns an exception
:returns: The unicode string that describes the value. If no entry
matched the username, application, and attribute then None is
returned.
'''
request = self.send_request('config/list/%s/%s/%s' %
(username, application, attribute), auth=True)
if 'exc' in request:
raise AppError(name = request['exc'], message = request['tg_flash'])
# Return the value if it exists, else None.
if 'configs' in request and attribute in request['configs']:
return request['configs'][attribute]
return None
def get_configs_like(self, username, application, pattern=u'*'):
'''Return the config entries that match the keys and the pattern.
Note: authentication on the server will prevent anyone but the user
or a fas admin from viewing or changing their configs.
:arg username: Username of the person
:arg application: Application for which the config is set
:kwarg pattern: A pattern to select values for. This accepts * as a
wildcard character. Default='*'
:raises AppError: if the server returns an exception
:returns: A dict mapping ``attribute`` to ``value``.
'''
request = self.send_request('config/list/%s/%s/%s' %
(username, application, pattern), auth=True)
if 'exc' in request:
raise AppError(name = request['exc'], message = request['tg_flash'])
return request['configs']
def set_config(self, username, application, attribute, value):
'''Set a config entry in FAS for the user.
Note: authentication on the server will prevent anyone but the user
or a fas admin from viewing or changing their configs.
:arg username: Username of the person
:arg application: Application for which the config is set
:arg attribute: The name of the config key that we're setting
:arg value: The value to set this to
:raises AppError: if the server returns an exception
'''
request = self.send_request('config/set/%s/%s/%s' %
(username, application, attribute),
req_params={'value': value}, auth=True)
if 'exc' in request:
raise AppError(name = request['exc'], message = request['tg_flash'])
def people_query(self, constraints=None, columns=None):
'''Returns a list of dicts representing database rows
:arg constraints: A dictionary specifying WHERE constraints on columns
:arg columns: A list of columns to be selected in the query
:raises AppError: if the query failed on the server (most likely
because the server was given a bad query)
:returns: A list of dicts representing database rows (the keys of
the dict are the columns requested)
.. versionadded:: 0.3.12.1
'''
if constraints is None:
constraints = {}
if columns is None:
columns = []
req_params = {}
req_params.update(constraints)
req_params['columns'] = ','.join(columns)
try:
request = self.send_request('json/people_query',
req_params=req_params, auth=True)
if request['success']:
return request['data']
else:
raise AppError(message=request['error'], name='FASError')
except FedoraServiceError:
raise
### Certs ###
def user_gencert(self):
'''Generate a cert for a user'''
try:
request = self.send_request('user/dogencert', auth=True)
except FedoraServiceError:
raise
if not request['cla']:
raise CLAError
return "%(cert)s\n%(key)s" % request
### Passwords ###
def verify_password(self, username, password):
'''Return whether the username and password pair are valid.
:arg username: username to try authenticating
:arg password: password for the user
:returns: True if the username/password are valid. False otherwise.
'''
return self.proxy.verify_password(username, password)
### fasClient Special Methods ###
def group_data(self, force_refresh=None):
'''Return administrators/sponsors/users and group type for all groups
:arg force_refresh: If true, the returned data will be queried from the
database, as opposed to memcached.
:raises AppError: if the query failed on the server
:returns: A dict mapping group names to the group type and the
user IDs of the administrator, sponsors, and users of the group.
.. versionadded:: 0.3.8
'''
params = {}
if force_refresh:
params['force_refresh'] = True
try:
request = self.send_request('json/fas_client/group_data',
req_params=params, auth=True)
if request['success']:
return request['data']
else:
raise AppError(message=_('FAS server unable to retrieve group'
' members'), name='FASError')
except FedoraServiceError:
raise
def user_data(self):
'''Return user data for all users in FAS
Note: If the user is not authorized to see password hashes,
'*' is returned for the hash.
:raises AppError: if the query failed on the server
:returns: A dict mapping user IDs to a username, password hash,
SSH public key, email address, and status.
.. versionadded:: 0.3.8
'''
try:
request = self.send_request('json/fas_client/user_data', auth=True)
if request['success']:
return request['data']
else:
raise AppError(message=_('FAS server unable to retrieve user'
' information'), name='FASError')
except FedoraServiceError:
raise