Skip to content
Snippets Groups Projects
Commit 71f3f110 authored by Thomas Woerner's avatar Thomas Woerner
Browse files

ansible_freeipa_module: Fix ansible-test fake execution test findings

All imports that are only available after installing IPA need to be in a
try exception clause to be able to pass the fake execution test. The old
workaround "if 'ansible.executor' in sys.modules:" is not working with
this test anymore.

If the imports can not be done, all used and needed attributes are
defines with the value None.

A check has been added to IPAAnsibleModule.__init__ to make sure that it
fails if the imports have not been done successfully.
parent f2d698b8
No related branches found
No related tags found
No related merge requests found
......@@ -4,7 +4,7 @@
# Sergio Oliveira Campos <seocam@redhat.com>
# Thomas Woerner <twoerner@redhat.com>
#
# Copyright (C) 2019 Red Hat
# Copyright (C) 2019-2022 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
......@@ -31,23 +31,20 @@ __all__ = ["gssapi", "netaddr", "api", "ipalib_errors", "Env",
"paths", "tasks", "get_credentials_if_valid", "Encoding",
"load_pem_x509_certificate", "DNSName", "getargspec"]
import os
import sys
# HACK: workaround for Ansible 2.9
# https://github.com/ansible/ansible/issues/68361
if 'ansible.executor' in sys.modules:
for attr in __all__:
setattr(sys.modules[__name__], attr, None)
else:
import operator
import os
import uuid
import tempfile
import shutil
import netaddr
import gssapi
import socket
import base64
from datetime import datetime
from contextlib import contextmanager
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils._text import to_text
from ansible.module_utils.common.text.converters import jsonify
from ansible.module_utils import six
from ansible.module_utils.common._collections_compat import Mapping
# Import getargspec from inspect or provide own getargspec for
# Python 2 compatibility with Python 3.11+.
......@@ -71,8 +68,11 @@ else:
", use inspect.signature() API which can support them")
return ArgSpec(args, varargs, varkw, defaults)
# ansible-freeipa requires locale to be C, IPA requires utf-8.
os.environ["LANGUAGE"] = "C"
try:
import uuid
import netaddr
import gssapi
from ipalib import api
from ipalib import errors as ipalib_errors # noqa
......@@ -91,9 +91,6 @@ else:
from ipalib.krb_utils import get_credentials_if_valid
from ipapython.dnsutil import DNSName
from ipapython import kerberos
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils._text import to_text
from ansible.module_utils.common.text.converters import jsonify
try:
from ipalib.x509 import Encoding
......@@ -106,15 +103,6 @@ else:
from ipalib.x509 import load_certificate
load_pem_x509_certificate = None
import socket
import base64
from ansible.module_utils import six
try:
from collections.abc import Mapping # noqa
except ImportError:
from collections import Mapping # pylint: disable=deprecated-class
# Try to import is_ipa_configured or use a fallback implementation.
try:
from ipalib.facts import is_ipa_configured
......@@ -150,9 +138,29 @@ else:
except ImportError:
_dcerpc_bindings_installed = False # pylint: disable=invalid-name
except ImportError as _err:
ANSIBLE_FREEIPA_MODULE_IMPORT_ERROR = str(_err)
for attr in __all__:
setattr(sys.modules[__name__], attr, None)
uuid = None
netaddr = None
is_ipa_configured = None
load_certificate = None
kerberos = None
ipaserver = None # pylint: disable=C0103
else:
ANSIBLE_FREEIPA_MODULE_IMPORT_ERROR = None
# ansible-freeipa requires locale to be C, IPA requires utf-8.
os.environ["LANGUAGE"] = "C"
if six.PY3:
unicode = str
def valid_creds(module, principal): # noqa
"""Get valid credentials matching the princial, try GSSAPI first."""
if "KRB5CCNAME" in os.environ:
......@@ -190,6 +198,7 @@ else:
return True
return False
def temp_kinit(principal, password):
"""Kinit with password using a temporary ccache."""
if not password:
......@@ -203,11 +212,12 @@ else:
try:
kinit_password(principal, password, ccache_name)
except RuntimeError as e:
raise RuntimeError("Kerberos authentication failed: {}".format(e))
raise RuntimeError("Kerberos authentication failed: %s" % str(e))
os.environ["KRB5CCNAME"] = ccache_name
return ccache_dir, ccache_name
def temp_kdestroy(ccache_dir, ccache_name):
"""Destroy temporary ticket and remove temporary ccache."""
if ccache_name is not None:
......@@ -216,6 +226,7 @@ else:
if ccache_dir is not None:
shutil.rmtree(ccache_dir, ignore_errors=True)
def api_connect(context=None, **overrides):
"""
Initialize IPA API with the provided configuration.
......@@ -270,22 +281,27 @@ else:
if not backend.isconnected():
backend.connect(ccache=os.environ.get('KRB5CCNAME', None))
def api_command(_module, command, name, args):
"""Call ipa.Command."""
return api.Command[command](name, **args)
def api_command_no_name(_module, command, args):
"""Call ipa.Command without a name."""
return api.Command[command](**args)
def api_check_command(command):
"""Return if command exists in command list."""
return command in api.Command
def api_check_param(command, name):
"""Check if param exists in command param list."""
return name in api.Command[command].params
def api_check_ipa_version(oper, requested_version):
"""
Compare the installed IPA version against a requested version.
......@@ -306,6 +322,7 @@ else:
return operation(tasks.parse_ipa_version(VERSION),
tasks.parse_ipa_version(requested_version))
def date_format(value):
accepted_date_formats = [
LDAP_GENERALIZED_TIME_FORMAT, # generalized time
......@@ -323,6 +340,7 @@ else:
pass
raise ValueError("Invalid date '%s'" % value)
def compare_args_ipa(module, args, ipa, ignore=None): # noqa
"""Compare IPA object attributes against command arguments.
......@@ -427,6 +445,7 @@ else:
return False
return True
def _afm_convert(value):
if value is not None:
if isinstance(value, list):
......@@ -439,6 +458,7 @@ else:
return value
def module_params_get(module, name, allow_empty_string=False):
value = _afm_convert(module.params.get(name))
......@@ -463,6 +483,7 @@ else:
return value
def module_params_get_lowercase(module, name, allow_empty_string=False):
value = module_params_get(module, name, allow_empty_string)
if isinstance(value, list):
......@@ -471,20 +492,25 @@ else:
value = value.lower()
return value
def api_get_domain():
return api.env.domain
def ensure_fqdn(name, domain):
if "." not in name:
return "%s.%s" % (name, domain)
return name
def api_get_realm():
return api.env.realm
def api_get_basedn():
return api.env.basedn
def gen_add_del_lists(user_list, res_list):
"""
Generate the lists for the addition and removal of members.
......@@ -505,6 +531,7 @@ else:
return add_list, del_list
def gen_add_list(user_list, res_list):
"""
Generate add list for addition of new members.
......@@ -521,6 +548,7 @@ else:
return list(set(user_list or []) - set(res_list or []))
def gen_intersection_list(user_list, res_list):
"""
Generate the intersection list for removal of existing members.
......@@ -537,6 +565,7 @@ else:
return list(set(res_list or []).intersection(set(user_list or [])))
def encode_certificate(cert):
"""
Encode a certificate using base64.
......@@ -551,6 +580,7 @@ else:
encoded = encoded.decode('ascii')
return encoded
def load_cert_from_str(cert):
cert = cert.strip()
if not cert.startswith("-----BEGIN CERTIFICATE-----"):
......@@ -564,6 +594,7 @@ else:
cert = load_certificate(cert.encode('utf-8'))
return cert
def DN_x500_text(text): # pylint: disable=invalid-name
if hasattr(DN, "x500_text"):
return DN(text).x500_text()
......@@ -572,6 +603,7 @@ else:
dn.rdns = reversed(dn.rdns)
return str(dn)
def is_valid_port(port):
if not isinstance(port, int):
return False
......@@ -581,6 +613,7 @@ else:
return False
def is_ip_address(ipaddr):
"""Test if given IP address is a valid IPv4 or IPv6 address."""
try:
......@@ -589,6 +622,7 @@ else:
return False
return True
def is_ip_network_address(ipaddr):
"""Test if given IP address is a valid IPv4 or IPv6 address."""
try:
......@@ -597,6 +631,7 @@ else:
return False
return True
def is_ipv4_addr(ipaddr):
"""Test if given IP address is a valid IPv4 address."""
try:
......@@ -605,6 +640,7 @@ else:
return False
return True
def is_ipv6_addr(ipaddr):
"""Test if given IP address is a valid IPv6 address."""
try:
......@@ -613,6 +649,7 @@ else:
return False
return True
def servicedelegation_normalize_principals(module, principal,
check_exists=False):
"""
......@@ -696,6 +733,7 @@ else:
return _principal
def exit_raw_json(module, **kwargs):
"""
Print the raw parameters in JSON format, without masking.
......@@ -715,6 +753,7 @@ else:
print(jsonify(kwargs))
sys.exit(0)
def __get_domain_validator():
if not _dcerpc_bindings_installed:
raise ipalib_errors.NotFound(
......@@ -739,6 +778,7 @@ else:
return domain_validator
def get_trusted_domain_sid_from_name(dom_name):
"""
Given a trust domain name, returns the domain SID.
......@@ -751,6 +791,7 @@ else:
return unicode(sid) if sid is not None else None
class IPAParamMapping(Mapping):
"""
Provides IPA API mapping to playbook parameters or computed values.
......@@ -874,6 +915,7 @@ else:
return args
class IPAAnsibleModule(AnsibleModule):
"""
IPA Ansible Module.
......@@ -953,6 +995,9 @@ else:
# pylint: disable=super-with-arguments
super(IPAAnsibleModule, self).__init__(*args, **kwargs)
if ANSIBLE_FREEIPA_MODULE_IMPORT_ERROR is not None:
self.fail_json(msg=ANSIBLE_FREEIPA_MODULE_IMPORT_ERROR)
@contextmanager
def ipa_connect(self, context=None):
"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment