Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# -*- coding: utf-8 -*-
# Authors:
# Thomas Woerner <twoerner@redhat.com>
#
# Copyright (C) 2019-2022 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
# Ansible module metadata and documentation blocks.  The three strings below
# are YAML documents, so their indentation is significant: every option must
# be nested under "options:" and every attribute under its option.
ANSIBLE_METADATA = {
    "metadata_version": "1.0",
    "supported_by": "community",
    "status": ["preview"],
}

DOCUMENTATION = """
---
module: ipaserver_get_connected_server
short_description: Get connected servers for server
description: Get connected servers for server
options:
  ipaadmin_principal:
    description: The admin principal.
    default: admin
    type: str
  ipaadmin_password:
    description: The admin password.
    required: true
    type: str
  hostname:
    description: The FQDN server name.
    type: str
    required: true
author:
  - Thomas Woerner (@t-woerner)
"""

EXAMPLES = """
"""

# "server" is only part of the result when main() finds a topology segment
# for the given hostname; otherwise the module exits with changed=False only.
RETURN = """
server:
  description: Connected server name
  returned: when a connected server is found
  type: str
"""
import os
import tempfile
import shutil
from contextlib import contextmanager
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils._text import to_text
from ansible.module_utils import six
# Import the IPA libraries defensively: a failure is recorded in
# MODULE_IMPORT_ERROR (as the error text) instead of being raised here, and
# main() reports it through module.fail_json().
try:
    from ipalib import api
    from ipalib import errors as ipalib_errors  # noqa
    from ipalib.config import Env
    from ipaplatform.paths import paths
    from ipapython.ipautil import run
    from ipalib.constants import DEFAULT_CONFIG
    # kinit_password is looked up in three locations, newest layout first;
    # the last fallback is the pre-4.5.0 location.
    try:
        from ipalib.kinit import kinit_password
    except ImportError:
        try:
            from ipalib.install.kinit import kinit_password
        except ImportError:
            # pre 4.5.0
            from ipapython.ipautil import kinit_password
except ImportError as _err:
    MODULE_IMPORT_ERROR = str(_err)
else:
    MODULE_IMPORT_ERROR = None
if six.PY3:
unicode = str
def temp_kinit(principal, password):
    """Kinit with password using a temporary ccache.

    A fresh temporary directory (prefix ``krbcc``) is created and
    ``kinit_password`` is asked to store the ticket for ``principal`` in
    the file ``ccache`` inside it.  On success ``KRB5CCNAME`` is pointed at
    that file.

    Returns the tuple ``(ccache_dir, ccache_name)``; pass both to
    ``temp_kdestroy`` to clean up.

    Raises RuntimeError("Kerberos authentication failed: ...") when
    ``kinit_password`` raises RuntimeError; any other exception propagates
    unchanged.  In both cases the temporary directory is removed first,
    because the caller never learns its path and could not remove it.
    """
    ccache_dir = tempfile.mkdtemp(prefix='krbcc')
    ccache_name = os.path.join(ccache_dir, 'ccache')

    try:
        kinit_password(principal, password, ccache_name)
    except RuntimeError as e:
        shutil.rmtree(ccache_dir, ignore_errors=True)
        raise RuntimeError("Kerberos authentication failed: %s" % str(e))
    except Exception:
        shutil.rmtree(ccache_dir, ignore_errors=True)
        raise

    # Only export the ccache once it actually holds a ticket.
    os.environ["KRB5CCNAME"] = ccache_name
    return ccache_dir, ccache_name
def temp_kdestroy(ccache_dir, ccache_name):
    """Destroy temporary ticket and remove temporary ccache.

    When ``ccache_name`` is not None, ``kdestroy -c <ccache_name>`` is run
    (failures are not raised) and ``KRB5CCNAME`` is dropped from the
    environment.  When ``ccache_dir`` is not None, the directory tree is
    removed, ignoring errors.  Either argument may be None.
    """
    if ccache_name is not None:
        kdestroy_cmd = [paths.KDESTROY, '-c', ccache_name]
        run(kdestroy_cmd, raiseonerr=False)
        os.environ.pop('KRB5CCNAME', None)

    if ccache_dir is None:
        return
    shutil.rmtree(ccache_dir, ignore_errors=True)
@contextmanager
def ipa_connect(module, principal=None, password=None):
    """
    Create a context with a connection to IPA API.

    Inside the context the temporary ccache name is available as the
    context value.  The temporary ticket and ccache directory are removed
    when the context ends, no matter whether the body finished, raised, or
    the connection setup itself failed.

    Parameters
    ----------
    module: AnsibleModule
        The AnsibleModule to use
    principal: string
        The optional principal name, "admin" is used if it is not set.
    password: string
        The admin password.

    Any exception raised while connecting or inside the context body is
    reported with ``module.fail_json``.
    """
    if not password:
        module.fail_json(msg="Password is required.")
    if not principal:
        principal = "admin"

    ccache_dir = None
    ccache_name = None
    try:
        ccache_dir, ccache_name = temp_kinit(principal, password)
        # api_connect start
        env = Env()
        env._bootstrap()
        env._finalize_core(**dict(DEFAULT_CONFIG))

        api.bootstrap(context="server", debug=env.debug, log=None)
        api.finalize()

        if api.env.in_server:
            backend = api.Backend.ldap2
        else:
            backend = api.Backend.rpcclient

        if not backend.isconnected():
            backend.connect(ccache=ccache_name)
        # api_connect end
    except Exception as e:
        module.fail_json(msg=str(e))
    else:
        try:
            yield ccache_name
        except Exception as e:
            module.fail_json(msg=str(e))
    finally:
        # Attached to the outer try on purpose: a failure in bootstrap or
        # connect after a successful kinit must not leave the ticket and
        # the temporary directory behind.  Both values are still None when
        # kinit itself failed, which temp_kdestroy accepts.
        temp_kdestroy(ccache_dir, ccache_name)
def ipa_command(command, name, args):
    """
    Run the IPA API command `command` for `name` and return its result.

    Parameters
    ----------
    command: string
        Key of the command in ``api.Command``.
    name: string
        Passed to the command as its single positional argument.
    args: dict
        Passed to the command as keyword arguments.
    """
    api_call = api.Command[command]
    return api_call(name, **args)
def _afm_convert(value):
if value is not None:
if isinstance(value, list):
return [_afm_convert(x) for x in value]
if isinstance(value, dict):
return {_afm_convert(k): _afm_convert(v)
for k, v in value.items()}
if isinstance(value, str):
return to_text(value)
return value
def module_params_get(module, name):
    """Return module parameter ``name`` after ``_afm_convert`` processing.

    The value is read with ``module.params.get``, so a parameter that is
    not present yields ``None``.
    """
    raw_value = module.params.get(name)
    return _afm_convert(raw_value)
def host_show(module, name):
    """Return the "result" entry of the ``host_show`` command for ``name``.

    ``None`` is returned when the command raises ``NotFound`` and the error
    text contains "host not found"; any other ``NotFound`` is reported with
    ``module.fail_json``.
    """
    try:
        reply = ipa_command("host_show", name, {"all": True})
    except ipalib_errors.NotFound as err:
        text = str(err)
        if "host not found" in text:
            return None
        module.fail_json(msg="host_show failed: %s" % text)

    return reply["result"]
def _find_connected_server(hostname):
    """Return the first server connected to ``hostname``, or None.

    The "domain" suffix is searched before the "ca" suffix.  Within each
    suffix, segments where ``hostname`` is the right node are looked up
    first (the peer is then the left node), followed by segments where
    ``hostname`` is the left node (the peer is the right node).  Only the
    first segment of each lookup is inspected, and the search stops at the
    first peer found, so a later lookup can never replace an earlier hit.

    Must be called while the IPA API is connected.
    """
    right_node = "iparepltoposegmentrightnode"
    left_node = "iparepltoposegmentleftnode"

    for suffix_name in ("domain", "ca"):
        for node1, node2 in ((right_node, left_node),
                             (left_node, right_node)):
            args = {node1: hostname}
            result = api.Command.topologysegment_find(suffix_name, **args)
            if not (result and "result" in result
                    and len(result["result"]) > 0):
                continue
            segment = result["result"][0]
            if node2 in segment and len(segment[node2]) > 0:
                return segment[node2][0]

    return None


def main():
    """Entry point of the module.

    Looks up a server that shares a topology segment with ``hostname`` and
    exits with ``server=<name>`` when one is found.  The module never
    changes anything, so every exit reports ``changed=False``.
    """
    module = AnsibleModule(
        argument_spec=dict(
            ipaadmin_principal=dict(type="str", default="admin"),
            ipaadmin_password=dict(type="str", required=True, no_log=True),
            hostname=dict(type="str", required=True),
        ),
        supports_check_mode=True,
    )

    if MODULE_IMPORT_ERROR is not None:
        module.fail_json(msg=MODULE_IMPORT_ERROR)

    # In check mode exit right away, unchanged and without contacting IPA.
    if module.check_mode:
        module.exit_json(changed=False)

    ipaadmin_principal = module_params_get(module, "ipaadmin_principal")
    ipaadmin_password = module_params_get(module, "ipaadmin_password")
    hostname = module_params_get(module, "hostname")

    with ipa_connect(module, ipaadmin_principal, ipaadmin_password):
        server = _find_connected_server(hostname)

    if server is not None:
        module.exit_json(changed=False, server=server)

    module.exit_json(changed=False)
# Run the module only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()