Newer
Older
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Authors:
Sergio Oliveira Campos
committed
# Sergio Oliveira Campos <seocam@redhat.com>
# Thomas Woerner <twoerner@redhat.com>
#
# Copyright (C) 2019 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
__all__ = ["gssapi", "netaddr", "api", "ipalib_errors", "Env",
"DEFAULT_CONFIG", "LDAP_GENERALIZED_TIME_FORMAT",
"kinit_password", "kinit_keytab", "run", "DN", "VERSION",
"paths", "get_credentials_if_valid", "Encoding",
import sys
# HACK: workaround for Ansible 2.9
# https://github.com/ansible/ansible/issues/68361
if 'ansible.executor' in sys.modules:
for attr in __all__:
setattr(sys.modules[__name__], attr, None)
else:
import operator
import os
import uuid
import tempfile
import shutil
import netaddr
import gssapi
from datetime import datetime
from contextlib import contextmanager
# ansible-freeipa requires locale to be C, IPA requires utf-8.
os.environ["LANGUAGE"] = "C"
try:
from packaging import version
except ImportError:
# If `packaging` not found, split version string for creating version
# object. Although it is not PEP 440 compliant, it will work for stable
# FreeIPA releases.
import re
class version: # pylint: disable=invalid-name, too-few-public-methods
@staticmethod
def parse(version_str):
"""
Split a version string A.B.C, into a tuple.
This will not work for `rc`, `dev` or similar version string.
"""
return tuple(re.split("[-_.]", version_str)) # noqa: W605
from ipalib import api
from ipalib import errors as ipalib_errors # noqa
from ipalib.config import Env
from ipalib.constants import DEFAULT_CONFIG, LDAP_GENERALIZED_TIME_FORMAT
try:
from ipalib.install.kinit import kinit_password, kinit_keytab
except ImportError:
from ipapython.ipautil import kinit_password, kinit_keytab
from ipapython.ipautil import run
from ipapython.dn import DN
from ipapython.version import VERSION
from ipaplatform.paths import paths
from ipalib.krb_utils import get_credentials_if_valid
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils._text import to_text
from ansible.module_utils.common.text.converters import jsonify
try:
from ipalib.x509 import Encoding
except ImportError:
from cryptography.hazmat.primitives.serialization import Encoding
try:
from ipalib.x509 import load_pem_x509_certificate
except ImportError:
from ipalib.x509 import load_certificate
load_pem_x509_certificate = None
import socket
import base64
import six
try:
from collections.abc import Mapping # noqa
except ImportError:
from collections import Mapping # pylint: disable=deprecated-class
# AnsibleModule argument specs for all modules
ipamodule_base_spec = dict(
ipaadmin_principal=dict(type="str", default="admin"),
ipaadmin_password=dict(type="str", required=False, no_log=True),
)
# Get ipamodule common vars as nonlocal
def get_ipamodule_base_vars(module):
ipaadmin_principal = module_params_get(module, "ipaadmin_principal")
ipaadmin_password = module_params_get(module, "ipaadmin_password")
return dict(
ipaadmin_principal=ipaadmin_principal,
ipaadmin_password=ipaadmin_password,
)
def valid_creds(module, principal): # noqa
"""Get valid credentials matching the princial, try GSSAPI first."""
if "KRB5CCNAME" in os.environ:
ccache = os.environ["KRB5CCNAME"]
module.debug('KRB5CCNAME set to %s' % ccache)
try:
cred = gssapi.Credentials(usage='initiate',
store={'ccache': ccache})
except gssapi.raw.misc.GSSError as e:
module.fail_json(msg='Failed to find default ccache: %s' % e)
else:
module.debug("Using principal %s" % str(cred.name))
return True
elif "KRB5_CLIENT_KTNAME" in os.environ:
keytab = os.environ.get('KRB5_CLIENT_KTNAME', None)
module.debug('KRB5_CLIENT_KTNAME set to %s' % keytab)
ccache_name = "MEMORY:%s" % str(uuid.uuid4())
os.environ["KRB5CCNAME"] = ccache_name
try:
cred = kinit_keytab(principal, keytab, ccache_name)
except gssapi.raw.misc.GSSError as e:
module.fail_json(msg='Kerberos authentication failed : %s' % e)
else:
module.debug("Using principal %s" % str(cred.name))
return True
creds = get_credentials_if_valid()
if creds and \
creds.lifetime > 0 and \
"%s@" % principal in creds.name.display_as(creds.name.name_type):
return True
return False
def temp_kinit(principal, password):
"""Kinit with password using a temporary ccache."""
if not password:
raise RuntimeError("The password is not set")
if not principal:
principal = "admin"
ccache_dir = tempfile.mkdtemp(prefix='krbcc')
ccache_name = os.path.join(ccache_dir, 'ccache')
try:
kinit_password(principal, password, ccache_name)
except RuntimeError as e:
raise RuntimeError("Kerberos authentication failed: {}".format(e))
os.environ["KRB5CCNAME"] = ccache_name
return ccache_dir, ccache_name
def temp_kdestroy(ccache_dir, ccache_name):
"""Destroy temporary ticket and remove temporary ccache."""
if ccache_name is not None:
run([paths.KDESTROY, '-c', ccache_name], raiseonerr=False)
del os.environ['KRB5CCNAME']
if ccache_dir is not None:
shutil.rmtree(ccache_dir, ignore_errors=True)
def api_connect(context=None):
"""
Initialize IPA API with the provided context.
`context` can be any of:
* `server` (default)
* `ansible-freeipa`
* `cli_installer`
"""
env = Env()
Loading
Loading full blame...