Skip to content
ansible_freeipa_module.py 35.9 KiB
Newer Older
#!/usr/bin/python
# -*- coding: utf-8 -*-

# Authors:
#   Sergio Oliveira Campos <seocam@redhat.com>
#   Thomas Woerner <twoerner@redhat.com>
#
# Copyright (C) 2019  Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


# Public names of this module. Each of them is either imported further
# down or, in the Ansible 2.9 workaround branch, bound to None on this
# module so that the names still exist.
__all__ = ["gssapi", "netaddr", "api", "ipalib_errors", "Env",
           "DEFAULT_CONFIG", "LDAP_GENERALIZED_TIME_FORMAT",
           "kinit_password", "kinit_keytab", "run", "DN", "VERSION",
           "paths", "get_credentials_if_valid", "Encoding",
           "load_pem_x509_certificate", "DNSName"]
# HACK: workaround for Ansible 2.9
# https://github.com/ansible/ansible/issues/68361
if 'ansible.executor' in sys.modules:
    for attr in __all__:
        setattr(sys.modules[__name__], attr, None)
else:
    import operator
    import os
    import uuid
    import tempfile
    import shutil
    import netaddr
    import gssapi
    from datetime import datetime
    from contextlib import contextmanager
    # ansible-freeipa requires locale to be C, IPA requires utf-8.
    os.environ["LANGUAGE"] = "C"
    try:
        from packaging import version
    except ImportError:
        # If `packaging` not found, split version string for creating version
        # object. Although it is not PEP 440 compliant, it will work for stable
        # FreeIPA releases.
        import re

        class version:  # pylint: disable=invalid-name, too-few-public-methods
            """Minimal stand-in for `packaging.version` when it is missing."""

            @staticmethod
            def parse(version_str):
                """
                Split a version string A.B.C into a comparable tuple.

                The string is split on `-`, `_` and `.`. Every component
                becomes a pair: `(0, number)` for a purely numeric
                component and `(1, text)` for anything else. Numeric
                components are therefore compared as integers ("4.10.0" is
                newer than "4.9.8") and comparing a numeric with a textual
                component never raises, the textual one simply sorts last.

                The result is only meant to be compared with other results
                of this function.

                This will not order `rc`, `dev` or similar version strings
                in a PEP 440 compliant way.
                """
                return tuple(
                    (0, int(part)) if re.match("[0-9]+$", part) else (1, part)
                    for part in re.split("[-_.]", version_str)  # noqa: W605
                )

    from ipalib import api
    from ipalib import errors as ipalib_errors  # noqa
    from ipalib.config import Env
    from ipalib.constants import DEFAULT_CONFIG, LDAP_GENERALIZED_TIME_FORMAT
    try:
        from ipalib.install.kinit import kinit_password, kinit_keytab
    except ImportError:
        from ipapython.ipautil import kinit_password, kinit_keytab
    from ipapython.ipautil import run
    from ipapython.dn import DN
    from ipapython.version import VERSION
    from ipaplatform.paths import paths
    from ipalib.krb_utils import get_credentials_if_valid
    from ipapython.dnsutil import DNSName
    from ansible.module_utils.basic import AnsibleModule
    from ansible.module_utils._text import to_text
    from ansible.module_utils.common.text.converters import jsonify
    try:
        from ipalib.x509 import Encoding
    except ImportError:
        from cryptography.hazmat.primitives.serialization import Encoding
    try:
        from ipalib.x509 import load_pem_x509_certificate
    except ImportError:
        from ipalib.x509 import load_certificate
        load_pem_x509_certificate = None
    import socket
    import base64
    import six
    try:
        from collections.abc import Mapping  # noqa
    except ImportError:
        from collections import Mapping  # pylint: disable=deprecated-class
    if six.PY3:
        unicode = str
Thomas Woerner's avatar
Thomas Woerner committed
    # Argument spec entries for AnsibleModule that all modules have in common
    ipamodule_base_spec = {
        "ipaadmin_principal": {
            "type": "str",
            "default": "admin",
        },
        "ipaadmin_password": {
            "type": "str",
            "required": False,
            "no_log": True,
        },
    }

    # Collect the common ipamodule variables (principal, password) in a dict
    def get_ipamodule_base_vars(module):
        """
        Return the common connection parameters of `module` as a dict.

        The values of `ipaadmin_principal` and `ipaadmin_password` are read
        with `module_params_get` and returned under these same keys.
        """
        return {
            name: module_params_get(module, name)
            for name in ("ipaadmin_principal", "ipaadmin_password")
        }

    def valid_creds(module, principal):  # noqa
        """
        Check for usable Kerberos credentials, trying GSSAPI first.

        With KRB5CCNAME set, credentials are acquired from that ccache.
        Otherwise, with KRB5_CLIENT_KTNAME set, a kinit with that keytab is
        done for `principal` into a new MEMORY ccache, which is exported as
        KRB5CCNAME. In both cases a GSSError is reported through
        `module.fail_json` and True is returned on success.

        In all remaining cases the result of `get_credentials_if_valid()`
        decides: True is returned only if it is set, has a positive
        lifetime and its displayed name contains "<principal>@".
        """
        environ = os.environ
        if "KRB5CCNAME" in environ:
            ccache_path = environ["KRB5CCNAME"]
            module.debug('KRB5CCNAME set to %s' % ccache_path)
            try:
                credentials = gssapi.Credentials(
                    usage='initiate', store={'ccache': ccache_path})
            except gssapi.raw.misc.GSSError as err:
                module.fail_json(
                    msg='Failed to find default ccache: %s' % err)
            else:
                module.debug("Using principal %s" % str(credentials.name))
                return True
        elif "KRB5_CLIENT_KTNAME" in environ:
            keytab_path = environ["KRB5_CLIENT_KTNAME"]
            module.debug('KRB5_CLIENT_KTNAME set to %s' % keytab_path)
            # Use a fresh in-memory ccache for the keytab based login
            memory_ccache = "MEMORY:%s" % str(uuid.uuid4())
            environ["KRB5CCNAME"] = memory_ccache
            try:
                credentials = kinit_keytab(
                    principal, keytab_path, memory_ccache)
            except gssapi.raw.misc.GSSError as err:
                module.fail_json(
                    msg='Kerberos authentication failed : %s' % err)
            else:
                module.debug("Using principal %s" % str(credentials.name))
                return True

        existing = get_credentials_if_valid()
        if not existing:
            return False
        if not existing.lifetime > 0:
            return False
        wanted = "%s@" % principal
        return wanted in existing.name.display_as(existing.name.name_type)
    def temp_kinit(principal, password):
        """
        Kinit with password using a temporary ccache.

        A new temporary directory is created to hold the ccache. On success
        KRB5CCNAME is set to the ccache inside of it. If the kinit fails,
        the temporary directory is removed again before the error is
        propagated, so that failed logins do not leave directories behind.

        Parameters:
            principal: principal to authenticate, "admin" is used if empty.
            password: password of the principal, must not be empty.

        Returns:
            The tuple `(ccache_dir, ccache_name)`, matching the arguments
            of `temp_kdestroy`.

        Raises:
            RuntimeError: if the password is not set or if `kinit_password`
            failed with a RuntimeError.
        """
        if not password:
            raise RuntimeError("The password is not set")
        if not principal:
            principal = "admin"

        ccache_dir = tempfile.mkdtemp(prefix='krbcc')
        ccache_name = os.path.join(ccache_dir, 'ccache')

        try:
            kinit_password(principal, password, ccache_name)
        except RuntimeError as e:
            # The caller never gets to know ccache_dir, remove it here
            shutil.rmtree(ccache_dir, ignore_errors=True)
            raise RuntimeError("Kerberos authentication failed: {}".format(e))
        except Exception:
            # Same cleanup for unexpected errors, which are passed on as is
            shutil.rmtree(ccache_dir, ignore_errors=True)
            raise

        os.environ["KRB5CCNAME"] = ccache_name
        return ccache_dir, ccache_name
    def temp_kdestroy(ccache_dir, ccache_name):
        """
        Destroy temporary ticket and remove temporary ccache.

        Parameters:
            ccache_name: ccache to destroy with kdestroy, nothing is
                destroyed if None. KRB5CCNAME is removed from the
                environment afterwards.
            ccache_dir: directory to remove recursively, nothing is removed
                if None.

        The directory is removed even if destroying the ticket fails, the
        error of the failed step is still propagated.
        """
        try:
            if ccache_name is not None:
                run([paths.KDESTROY, '-c', ccache_name], raiseonerr=False)
                # pop() instead of del: KRB5CCNAME might be unset already
                os.environ.pop('KRB5CCNAME', None)
        finally:
            if ccache_dir is not None:
                shutil.rmtree(ccache_dir, ignore_errors=True)
    def api_connect(context=None):
        """
        Initialize IPA API with the provided context.
        `context` can be any of:
            * `server` (default)
            * `ansible-freeipa`
            * `cli_installer`
        """
        env = Env()
Loading
Loading full blame...