diff --git a/roles/ipaserver/module_utils/ansible_ipa_server.py b/roles/ipaserver/module_utils/ansible_ipa_server.py index f1f4972b690b3fb44fd2f07a622a031088762916..6dec4bc210b1a2497f3c5accefae6cf3210d477e 100644 --- a/roles/ipaserver/module_utils/ansible_ipa_server.py +++ b/roles/ipaserver/module_utils/ansible_ipa_server.py @@ -3,9 +3,9 @@ # Authors: # Thomas Woerner <twoerner@redhat.com> # -# Based on ipa-client-install code +# Based on ipa-server-install code # -# Copyright (C) 2017 Red Hat +# Copyright (C) 2017-2022 Red Hat # see file 'COPYING' for use and warranty information # # This program is free software; you can redistribute it and/or modify @@ -23,12 +23,12 @@ from __future__ import (absolute_import, division, print_function) -__metaclass__ = type +__metaclass__ = type # pylint: disable=invalid-name __all__ = ["IPAChangeConf", "certmonger", "sysrestore", "root_logger", "ipa_generate_password", "run", "ScriptError", "services", "tasks", "errors", "x509", "DOMAIN_LEVEL_0", "MIN_DOMAIN_LEVEL", - "validate_domain_name", + "MAX_DOMAIN_LEVEL", "validate_domain_name", "no_matching_interface_for_ip_address_warning", "check_zone_overlap", "timeconf", "ntpinstance", "adtrust", "bindinstance", "ca", "dns", "httpinstance", "installutils", @@ -41,45 +41,42 @@ __all__ = ["IPAChangeConf", "certmonger", "sysrestore", "root_logger", "adtrustinstance", "IPAAPI_USER", "sync_time", "PKIIniLoader", "default_subject_base", "default_ca_subject_dn", "check_ldap_conf", "encode_certificate", "decode_certificate", - "check_available_memory", "getargspec", "get_min_idstart"] + "check_available_memory", "getargspec", "get_min_idstart", + "paths", "api", "ipautil", "adtrust_imported", "NUM_VERSION", + "time_service", "kra_imported", "dsinstance", "IPA_PYTHON_VERSION", + "NUM_VERSION"] import sys - -# HACK: workaround for Ansible 2.9 -# https://github.com/ansible/ansible/issues/68361 -if 'ansible.executor' in sys.modules: - for attr in __all__: - setattr(sys.modules[__name__], attr, None) - -else: - - import logging +import logging + +# Import getargspec from inspect or provide own getargspec for +# Python 2 compatibility with Python 3.11+. +try: + from inspect import getargspec +except ImportError: + from collections import namedtuple + from inspect import getfullargspec + + # The code is copied from Python 3.10 inspect.py + # Authors: Ka-Ping Yee <ping@lfw.org> + # Yury Selivanov <yselivanov@sprymix.com> + ArgSpec = namedtuple('ArgSpec', 'args varargs keywords defaults') + + def getargspec(func): + args, varargs, varkw, defaults, kwonlyargs, _kwonlydefaults, \ + ann = getfullargspec(func) + if kwonlyargs or ann: + raise ValueError( + "Function has keyword-only parameters or annotations" + ", use inspect.signature() API which can support them") + return ArgSpec(args, varargs, varkw, defaults) + + +try: from contextlib import contextmanager as contextlib_contextmanager from ansible.module_utils import six import base64 - # Import getargspec from inspect or provide own getargspec for - # Python 2 compatibility with Python 3.11+. - try: - from inspect import getargspec - except ImportError: - from collections import namedtuple - from inspect import getfullargspec - - # The code is copied from Python 3.10 inspect.py - # Authors: Ka-Ping Yee <ping@lfw.org> - # Yury Selivanov <yselivanov@sprymix.com> - ArgSpec = namedtuple('ArgSpec', 'args varargs keywords defaults') - - def getargspec(func): - args, varargs, varkw, defaults, kwonlyargs, _kwonlydefaults, \ - ann = getfullargspec(func) - if kwonlyargs or ann: - raise ValueError( - "Function has keyword-only parameters or annotations" - ", use inspect.signature() API which can support them") - return ArgSpec(args, varargs, varkw, defaults) - from ipapython.version import NUM_VERSION, VERSION if NUM_VERSION < 30201: @@ -211,223 +208,249 @@ else: raise Exception("freeipa version '%s' is too old" % VERSION) - logger = logging.getLogger("ipa-server-install") +except ImportError as _err: + ANSIBLE_IPA_SERVER_MODULE_IMPORT_ERROR = str(_err) - def setup_logging(): - # logger.setLevel(logging.DEBUG) - standard_logging_setup( - paths.IPASERVER_INSTALL_LOG, verbose=False, debug=False, - filemode='a', console_format='%(message)s') + for attr in __all__: + setattr(sys.modules[__name__], attr, None) +else: + ANSIBLE_IPA_SERVER_MODULE_IMPORT_ERROR = None - @contextlib_contextmanager - def redirect_stdout(stream): - sys.stdout = stream - try: - yield stream - finally: - sys.stdout = sys.__stdout__ - - class AnsibleModuleLog(): - def __init__(self, module): - self.module = module - _ansible_module_log = self - - class AnsibleLoggingHandler(logging.Handler): - def emit(self, record): - _ansible_module_log.write(self.format(record)) - - self.logging_handler = AnsibleLoggingHandler() - logger.setLevel(logging.DEBUG) - logger.root.addHandler(self.logging_handler) - - def close(self): - self.flush() - - def flush(self): - pass - - def log(self, msg): - # self.write(msg+"\n") - self.write(msg) - - def debug(self, msg): - self.module.debug(msg) - - def info(self, msg): - self.module.debug(msg) - - @staticmethod - def isatty(): - return False - - def write(self, msg): - self.module.debug(msg) - # self.module.warn(msg) - - # pylint: disable=too-few-public-methods, useless-object-inheritance - # pylint: disable=too-many-instance-attributes - class options_obj(object): # pylint: disable=invalid-name - def __init__(self): - self._replica_install = False - self.dnssec_master = False # future unknown - self.disable_dnssec_master = False # future unknown - self.domainlevel = MAX_DOMAIN_LEVEL # deprecated - self.domain_level = self.domainlevel # deprecated - self.interactive = False - self.unattended = not self.interactive - - # def __getattribute__(self, attr): - # logger.info(" <-- Accessing options.%s" % attr) - # return super(options_obj, self).__getattribute__(attr) - - # def __getattr__(self, attr): - # logger.info(" --> Adding missing options.%s" % attr) - # setattr(self, attr, None) - # return getattr(self, attr) - - def knobs(self): - for name in self.__dict__: - yield self, name - - # pylint: enable=too-few-public-methods, useless-object-inheritance - - # pylint: enable=too-many-instance-attributes - options = options_obj() - installer = options - - # pylint: disable=attribute-defined-outside-init - - # ServerMasterInstall - options.add_sids = True - options.add_agents = False - - # Installable - options.uninstalling = False - - # ServerInstallInterface - options.description = "Server" - - options.kinit_attempts = 1 - options.fixed_primary = True - options.permit = False - options.enable_dns_updates = False - options.no_krb5_offline_passwords = False - options.preserve_sssd = False - options.no_sssd = False - - # ServerMasterInstall - options.force_join = False - options.servers = None - options.no_wait_for_dns = True - options.host_password = None - options.keytab = None - options.setup_ca = True - # always run sidgen task and do not allow adding agents on first master - options.add_sids = True - options.add_agents = False - - # ADTrustInstallInterface - # no_msdcs is deprecated - options.no_msdcs = False - - # For pylint - options.external_cert_files = None - options.dirsrv_cert_files = None - - # Uninstall - options.ignore_topology_disconnect = False - options.ignore_last_of_role = False - - # pylint: enable=attribute-defined-outside-init - - # pylint: disable=invalid-name - def api_Backend_ldap2(host_name, setup_ca, connect=False): - # we are sure we have the configuration file ready. - cfg = dict(context='installer', confdir=paths.ETC_IPA, in_server=True, - host=host_name) - if setup_ca: - # we have an IPA-integrated CA - cfg['ca_host'] = host_name - - api.bootstrap(**cfg) - api.finalize() - if connect: - api.Backend.ldap2.connect() - - # pylint: enable=invalid-name - - def ds_init_info(ansible_log, fstore, domainlevel, dirsrv_config_file, - realm_name, host_name, domain_name, dm_password, - idstart, idmax, subject_base, ca_subject, - _no_hbac_allow, dirsrv_pkcs12_info, no_pkinit): - - if not options.external_cert_files: - _ds = dsinstance.DsInstance(fstore=fstore, domainlevel=domainlevel, - config_ldif=dirsrv_config_file) - _ds.set_output(ansible_log) - - if options.dirsrv_cert_files: - _dirsrv_pkcs12_info = dirsrv_pkcs12_info - else: - _dirsrv_pkcs12_info = None - - with redirect_stdout(ansible_log): - _ds.init_info(realm_name, host_name, domain_name, dm_password, - subject_base, ca_subject, idstart, idmax, - # hbac_allow=not no_hbac_allow, - _dirsrv_pkcs12_info, setup_pkinit=not no_pkinit) - else: - _ds = dsinstance.DsInstance(fstore=fstore, domainlevel=domainlevel) - _ds.set_output(ansible_log) - with redirect_stdout(ansible_log): - _ds.init_info(realm_name, host_name, domain_name, dm_password, - subject_base, ca_subject, 1101, 1100, None, - setup_pkinit=not no_pkinit) +logger = logging.getLogger("ipa-server-install") - return _ds - def ansible_module_get_parsed_ip_addresses(ansible_module, - param='ip_addresses'): - ip_addrs = [] - for _ip in ansible_module.params.get(param): - try: - ip_parsed = ipautil.CheckedIPAddress(_ip) - except Exception as err: - ansible_module.fail_json( - msg="Invalid IP Address %s: %s" % (_ip, err)) - ip_addrs.append(ip_parsed) - return ip_addrs - - def encode_certificate(cert): - """ - Encode a certificate using base64. - - It also takes FreeIPA and Python versions into account. - """ - if isinstance(cert, (str, bytes)): - encoded = base64.b64encode(cert) - else: - encoded = base64.b64encode(cert.public_bytes(Encoding.DER)) - if not six.PY2: - encoded = encoded.decode('ascii') - return encoded - - def decode_certificate(cert): - """ - Decode a certificate using base64. - - It also takes FreeIPA versions into account and returns a - IPACertificate for newer IPA versions. - """ - if hasattr(x509, "IPACertificate"): - cert = cert.strip() - if not cert.startswith("-----BEGIN CERTIFICATE-----"): - cert = "-----BEGIN CERTIFICATE-----\n" + cert - if not cert.endswith("-----END CERTIFICATE-----"): - cert += "\n-----END CERTIFICATE-----" - - cert = certificate_loader(cert.encode('utf-8')) +def setup_logging(): + # logger.setLevel(logging.DEBUG) + standard_logging_setup( + paths.IPASERVER_INSTALL_LOG, verbose=False, debug=False, + filemode='a', console_format='%(message)s') + + +@contextlib_contextmanager +def redirect_stdout(stream): + sys.stdout = stream + try: + yield stream + finally: + sys.stdout = sys.__stdout__ + + +class AnsibleModuleLog(): + def __init__(self, module): + self.module = module + _ansible_module_log = self + + class AnsibleLoggingHandler(logging.Handler): + def emit(self, record): + _ansible_module_log.write(self.format(record)) + + self.logging_handler = AnsibleLoggingHandler() + logger.setLevel(logging.DEBUG) + logger.root.addHandler(self.logging_handler) + + def close(self): + self.flush() + + def flush(self): + pass + + def log(self, msg): + # self.write(msg+"\n") + self.write(msg) + + def debug(self, msg): + self.module.debug(msg) + + def info(self, msg): + self.module.debug(msg) + + @staticmethod + def isatty(): + return False + + def write(self, msg): + self.module.debug(msg) + # self.module.warn(msg) + + +# pylint: disable=too-few-public-methods, useless-object-inheritance +# pylint: disable=too-many-instance-attributes +class options_obj(object): # pylint: disable=invalid-name + def __init__(self): + self._replica_install = False + self.dnssec_master = False # future unknown + self.disable_dnssec_master = False # future unknown + self.domainlevel = MAX_DOMAIN_LEVEL # deprecated + self.domain_level = self.domainlevel # deprecated + self.interactive = False + self.unattended = not self.interactive + + # def __getattribute__(self, attr): + # logger.info(" <-- Accessing options.%s" % attr) + # return super(options_obj, self).__getattribute__(attr) + + # def __getattr__(self, attr): + # logger.info(" --> Adding missing options.%s" % attr) + # setattr(self, attr, None) + # return getattr(self, attr) + + def knobs(self): + for name in self.__dict__: + yield self, name + + +# pylint: enable=too-few-public-methods, useless-object-inheritance + + +# pylint: enable=too-many-instance-attributes +options = options_obj() +installer = options + +# pylint: disable=attribute-defined-outside-init + +# ServerMasterInstall +options.add_sids = True +options.add_agents = False + +# Installable +options.uninstalling = False + +# ServerInstallInterface +options.description = "Server" + +options.kinit_attempts = 1 +options.fixed_primary = True +options.permit = False +options.enable_dns_updates = False +options.no_krb5_offline_passwords = False +options.preserve_sssd = False +options.no_sssd = False + +# ServerMasterInstall +options.force_join = False +options.servers = None +options.no_wait_for_dns = True +options.host_password = None +options.keytab = None +options.setup_ca = True +# always run sidgen task and do not allow adding agents on first master +options.add_sids = True +options.add_agents = False + +# ADTrustInstallInterface +# no_msdcs is deprecated +options.no_msdcs = False + +# For pylint +options.external_cert_files = None +options.dirsrv_cert_files = None + +# Uninstall +options.ignore_topology_disconnect = False +options.ignore_last_of_role = False + +# pylint: enable=attribute-defined-outside-init + + +# pylint: disable=invalid-name +def api_Backend_ldap2(host_name, setup_ca, connect=False): + # we are sure we have the configuration file ready. + cfg = dict(context='installer', confdir=paths.ETC_IPA, in_server=True, + host=host_name) + if setup_ca: + # we have an IPA-integrated CA + cfg['ca_host'] = host_name + + api.bootstrap(**cfg) + api.finalize() + if connect: + api.Backend.ldap2.connect() + + +# pylint: enable=invalid-name + + +def ds_init_info(ansible_log, fstore, domainlevel, dirsrv_config_file, + realm_name, host_name, domain_name, dm_password, + idstart, idmax, subject_base, ca_subject, + _no_hbac_allow, dirsrv_pkcs12_info, no_pkinit): + + if not options.external_cert_files: + _ds = dsinstance.DsInstance(fstore=fstore, domainlevel=domainlevel, + config_ldif=dirsrv_config_file) + _ds.set_output(ansible_log) + + if options.dirsrv_cert_files: + _dirsrv_pkcs12_info = dirsrv_pkcs12_info else: - cert = base64.b64decode(cert) - return cert + _dirsrv_pkcs12_info = None + + with redirect_stdout(ansible_log): + _ds.init_info(realm_name, host_name, domain_name, dm_password, + subject_base, ca_subject, idstart, idmax, + # hbac_allow=not no_hbac_allow, + _dirsrv_pkcs12_info, setup_pkinit=not no_pkinit) + else: + _ds = dsinstance.DsInstance(fstore=fstore, domainlevel=domainlevel) + _ds.set_output(ansible_log) + + with redirect_stdout(ansible_log): + _ds.init_info(realm_name, host_name, domain_name, dm_password, + subject_base, ca_subject, 1101, 1100, None, + setup_pkinit=not no_pkinit) + + return _ds + + +def ansible_module_get_parsed_ip_addresses(ansible_module, + param='ip_addresses'): + ip_addrs = [] + for _ip in ansible_module.params.get(param): + try: + ip_parsed = ipautil.CheckedIPAddress(_ip) + except Exception as err: + ansible_module.fail_json( + msg="Invalid IP Address %s: %s" % (_ip, err)) + ip_addrs.append(ip_parsed) + return ip_addrs + + +def encode_certificate(cert): + """ + Encode a certificate using base64. + + It also takes FreeIPA and Python versions into account. + """ + if isinstance(cert, (str, bytes)): + encoded = base64.b64encode(cert) + else: + encoded = base64.b64encode(cert.public_bytes(Encoding.DER)) + if not six.PY2: + encoded = encoded.decode('ascii') + return encoded + + +def decode_certificate(cert): + """ + Decode a certificate using base64. + + It also takes FreeIPA versions into account and returns a + IPACertificate for newer IPA versions. + """ + if hasattr(x509, "IPACertificate"): + cert = cert.strip() + if not cert.startswith("-----BEGIN CERTIFICATE-----"): + cert = "-----BEGIN CERTIFICATE-----\n" + cert + if not cert.endswith("-----END CERTIFICATE-----"): + cert += "\n-----END CERTIFICATE-----" + + cert = certificate_loader(cert.encode('utf-8')) + else: + cert = base64.b64decode(cert) + return cert + + +def check_imports(module): + if ANSIBLE_IPA_SERVER_MODULE_IMPORT_ERROR is not None: + module.fail_json(msg=ANSIBLE_IPA_SERVER_MODULE_IMPORT_ERROR)