Source code for provy.more.centos.database.mysql

#!/usr/bin/python
# -*- coding: utf-8 -*-

'''
Roles in this namespace are meant to provide `MySQL <http://www.mysql.com/>`_ database management utilities for CentOS distributions.
'''

import re

from provy.core import Role
from provy.more.centos.package.yum import YumRole


[docs]class MySQLRole(Role): ''' This role provides `MySQL <http://www.mysql.com/>`_ database management utilities for CentOS distributions. This role uses two context keys: `mysql_root_user` and `mysql_root_pass`. If none are found, it uses 'root' and empty password. Example: :: from provy.core import Role from provy.more.centos import MySQLRole class MySampleRole(Role): def provision(self): with self.using(MySQLRole) as role: role.ensure_user(username=self.context['mysql_user'], identified_by=self.context['mysql_password']) role.ensure_database(self.context['mysql_database'], owner=self.context['mysql_user']) ''' def __init__(self, prov, context): super(MySQLRole, self).__init__(prov, context) self.mysql_root_user = 'mysql_root_user' in self.context and self.context['mysql_root_user'] or 'root' self.mysql_root_pass = 'mysql_root_pass' in self.context and self.context['mysql_root_pass'] or ''
[docs] def provision(self): ''' Installs `MySQL <http://www.mysql.com/>`_ Server and its dependencies. This method should be called upon if overriden in base classes, or MySQL won't work properly in the remote server. Example: :: class MySampleRole(Role): def provision(self): self.provision_role(MySQLRole) # no need to call this if using with block. ''' with self.using(YumRole) as role: role.ensure_up_to_date() result = role.ensure_package_installed('mysql-server') role.ensure_package_installed('mysql-devel') role.ensure_package_installed('mysql-libs') if result: self.log("setting root user %s password..." % self.mysql_root_user) self.execute("mysqladmin -u %s -p'temppass' password '%s'" % (self.mysql_root_user, self.mysql_root_pass), stdout=False, sudo=True)
def __execute_non_query(self, query): pass_string = "" if self.mysql_root_pass: pass_string = '--password="%s" ' % self.mysql_root_pass self.execute('mysql -u %s %s-e "%s" mysql' % (self.mysql_root_user, pass_string, query), stdout=False, sudo=True) def __execute_query(self, query): pass_string = "" if self.mysql_root_pass: pass_string = '--password="%s" ' % self.mysql_root_pass result = self.execute('mysql -u %s %s-E -e "%s" mysql' % (self.mysql_root_user, pass_string, query), stdout=False, sudo=True) rows = self.__get_rows(result) return rows def __get_rows(self, result): index_re = re.compile('(\d+)[.]') items = [] item = None for line in result.split('\n'): if not line.strip(): continue if line.startswith('*'): if item: items.append(item) item = { 'index': index_re.search(line).groups()[0] } else: key, value = line.split(':', 1) item[key.strip()] = value.strip() if item: items.append(item) return items
[docs] def get_user_hosts(self, username): ''' Returns all the available hosts that this user can login from. :param username: Name of the user to be verified. :type username: :class:`str` :return: The user hosts. :rtype: :class:`list` Example: :: class MySampleRole(Role): def provision(self): with self.using(MySQLRole) as role: if not '%' in role.get_user_hosts('someuser'): pass ''' users = self.__execute_query("select Host from mysql.user where LOWER(User)='%s'" % username.lower()) hosts = [] if users: for user in users: hosts.append(user['Host']) return hosts
[docs] def user_exists(self, username, login_from='%'): ''' Returns :data:`True` if the given user exists for the given location in mysql server. :param username: Name of the user to be verified. :type username: :class:`str` :param login_from: Locations that this user can login from. Defaults to '%' (anywhere). :type login_from: :class:`str` :return: Whether the user exists. :rtype: :class:`bool` Example: :: class MySampleRole(Role): def provision(self): with self.using(MySQLRole) as role: if not role.user_exists('someuser'): pass ''' return login_from in self.get_user_hosts(username)
[docs] def ensure_user(self, username, identified_by, login_from='%'): ''' Ensure the given user is created in the database and can login from the specified location. :param username: Name of the user to be created. :type username: :class:`str` :param identified_by: Password that the user will use to login to mysql server. :type identified_by: :class:`str` :param login_from: Locations that this user can login from. Defaults to '%' (anywhere). :type login_from: :class:`str` :return: Whether the user had to be created or not. :rtype: :class:`bool` Example: :: class MySampleRole(Role): def provision(self): with self.using(MySQLRole) as role: role.ensure_user('someuser', 'somepass', 'localhost') ''' if not self.user_exists(username, login_from): self.__execute_non_query("CREATE USER '%s'@'%s' IDENTIFIED BY '%s';" % (username, login_from, identified_by)) self.log("User %s not found with login access for %s. User created!" % (username, login_from)) return True return False
[docs] def is_database_present(self, database_name): ''' Returns :data:`True` if the database is already created. :param database_name: Database to verify. :type database_name: :class:`str` :return: Whether the database is present or not. :rtype: :class:`bool` Example: :: class MySampleRole(Role): def provision(self): with self.using(MySQLRole) as role: if role.is_database_present('database'): pass ''' result = self.__execute_query('SHOW DATABASES') is_there = False if result: for row in result: if row['Database'].lower() == database_name.lower(): is_there = True return is_there
[docs] def ensure_database(self, database_name): ''' Creates the database if it does not exist. :param database_name: Database to create. :type database_name: :class:`str` :return: Whether the database had to be created or not. :rtype: :class:`bool` Example: :: class MySampleRole(Role): def provision(self): with self.using(MySQLRole) as role: role.ensure_database('database') ''' if not self.is_database_present(database_name): self.__execute_non_query('CREATE DATABASE %s' % database_name) self.log("Database %s not found. Database created!" % database_name) return True return False
[docs] def get_user_grants(self, username, login_from='%'): ''' Returns all grants for the given user at the given location. :param username: Name of the user to be verify. :type username: :class:`str` :param login_from: Locations that this user can login from. Defaults to '%' (anywhere). :type login_from: :class:`str` :return: The user grants. :rtype: :class:`list` Example: :: class MySampleRole(Role): def provision(self): with self.using(MySQLRole) as role: if role.get_user_grants('user', login_from='%'): pass ''' grants = self.__execute_query("SHOW GRANTS FOR '%s'@'%s';" % (username, login_from)) only_grants = [] for grant in grants: filtered_grants = filter(lambda x: x.startswith('GRANT '), grant.itervalues()) only_grants.extend(filtered_grants) return only_grants
[docs] def has_grant(self, privileges, on, username, login_from, with_grant_option): ''' Returns :data:`True` if the user has the specified privileges on the specified object in the given location. :param privileges: Privileges that are being verified. :type privileges: :class:`str` :param on: Database object that the user holds privileges on. :type on: :class:`str` :param username: Name of the user to be verify. :type username: :class:`str` :param login_from: Locations that this user can login from. Defaults to '%' (anywhere). :type login_from: :class:`str` :param with_grant_option: Indicates if we are verifying against grant option. :type with_grant_option: :class:`bool` :return: Whether the user has the privileges or not. :rtype: :class:`bool` Example: :: class MySampleRole(Role): def provision(self): with self.using(MySQLRole) as role: if role.has_grant('ALL PRIVILEGES', 'database', 'user', login_from='%', with_grant_option=True): pass ''' grants = self.get_user_grants(username, login_from) grant_option_string = self._get_grant_option_string(with_grant_option) privileges = self._get_privileges(privileges) grant_strings = self._get_possible_grant_strings(on, username, privileges, login_from, grant_option_string) for grant_string in grant_strings: if grant_string in grants: return True return False
def _get_privileges(self, privileges): privileges = privileges.upper() if privileges == 'ALL': privileges = 'ALL PRIVILEGES' return privileges def _get_grant_option_string(self, with_grant_option): grant_option_string = "" if with_grant_option: grant_option_string = " WITH GRANT OPTION" return grant_option_string def _get_possible_grant_strings(self, on, username, privileges, login_from, grant_option_string): # These possible "ON" tokens are used because MySQL can behave differently depending on the version and system possible_on_tokens = [ '`%s`.*' % on, '`%s`.`*`' % on, '%s.*' % on, ] grant_strings = ["GRANT %s ON %s TO '%s'@'%s'%s" % (privileges, on_token, username, login_from, grant_option_string) for on_token in possible_on_tokens] return grant_strings
[docs] def ensure_grant(self, privileges, on, username, login_from="%", with_grant_option=False): ''' Ensures that the given user has the given privileges on the specified location. :param privileges: Privileges to assign to user (e.g.: "ALL PRIVILEGES"). :type privileges: :class:`str` :param on: Object to assign privileges to. If only the name is supplied, '.*' will be appended to the name. If you want all databases pass '*.*'. :type on: :class:`str` :param username: User to grant the privileges to. :type username: :class:`str` :param login_from: Location where the user gets the grants. Defaults to '%' (anywhere). :type login_from: :class:`str` :param with_grant_option: If :data:`True`, indicates that this user may grant other users the same privileges. Defaults to :data:`False`. :type with_grant_option: :class:`bool` :return: Whether the grant had to be added or not. :rtype: :class:`bool` Example: :: class MySampleRole(Role): def provision(self): with self.using(MySQLRole) as role: role.ensure_grant('ALL PRIVILEGES', on='database', username='backend', login_from='%', with_grant_option=True) ''' if self.has_grant(privileges, on, username, login_from, with_grant_option): return False grant_option_string = "" if with_grant_option: grant_option_string = " WITH GRANT OPTION" if not '.' in on: on = '%s.*' % on grant_string = "GRANT %s ON %s TO '%s'@'%s'%s" % (privileges, on, username, login_from, grant_option_string) self.__execute_non_query(grant_string) self.log("User %s@%s did not have grant '%s' on %s. Privileges granted!" % (username, login_from, privileges, on)) return True