1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
|
#!/usr/bin/env python3
#
# This is a dynamic inventory script for Ansible, used for the
# rgoncalves.se infrastructure.
#
# Only the group_vars/ and host_vars/ directories are used for setting up
# devices and nodes.
# > Files, files, files, ... only files, everything files!
#
# The main goal here is to allow flexible code reviews and edits, POSIX
# compliant actions over files, and to follow the "less is more" motto.
# Moreover, each host/group/node is individually tracked in version control.
import os
import yaml
import logging
import json
from typing import Final
from optparse import OptionParser
# Directory scanned for per-host YAML files (one node per file); the path is
# relative to the current working directory.
HOST_DIR: Final = "./host_vars"
# Directory scanned for per-group YAML files (one node per file); the path is
# relative to the current working directory.
GROUP_DIR: Final = "./group_vars"
class AnsibleInput:
    """
    AnsibleInput class mixin.
    Meta methods and inits for reading and processing yaml files.

    Attributes:
        data: Parsed YAML document of the file. An empty file (which the
            YAML loader returns as ``None``) is normalised to an empty dict
            so that ``"key" in node.data`` never raises ``TypeError``.
        name: File name stripped of its directory and last extension
            (``host_vars/web1.yml`` -> ``web1``).
    """

    data = None
    name = None

    def __init__(self, filepath):
        """
        Read and parse the YAML file located at ``filepath``.

        Raises:
            OSError: if the file cannot be opened or read.
            yaml.YAMLError: if the content is not valid YAML.
        """
        # NOTE(review): FullLoader is kept from the original code; if these
        # files can ever come from an untrusted source, switch to
        # yaml.safe_load.
        with open(filepath, encoding="utf-8") as file:
            loaded = yaml.load(file, Loader=yaml.FullLoader)

        # An empty document is loaded as None; expose it as "no variables".
        self.data = {} if loaded is None else loaded
        self.name = os.path.splitext(os.path.basename(filepath))[0]
def pprint(data):
    """
    Print ``data`` on stdout as indented JSON with sorted keys.

    When ``data`` cannot be serialised to JSON (``TypeError``), its plain
    ``print`` representation is written instead.
    """
    try:
        rendered = json.dumps(data, sort_keys=True, indent=4)
    except TypeError:
        # Not JSON-serialisable: fall back to the raw representation.
        print(data)
    else:
        print(rendered)
def create_nodes(directory):
    """
    Build an ansible node (Host or Group) for every file of ``directory``,
    then return the full mapping.

    Only regular files (or symlinks to them) are loaded; sub-directories
    are skipped because they cannot be opened as YAML files. Entries are
    processed in file-name order so that the resulting inventory does not
    depend on the order in which the filesystem lists them.

    Returns:
        dict mapping each node name to its ``AnsibleInput`` instance. When
        two files yield the same node name, the last one in file-name order
        wins.

    Raises:
        OSError: if ``directory`` cannot be scanned.
    """
    # The context manager releases the directory handle even when an error
    # interrupts the scan.
    with os.scandir(directory) as entries:
        files = sorted(
            (entry for entry in entries if entry.is_file()),
            key=lambda entry: entry.name,
        )

    nodes = {}
    for entry in files:
        node = AnsibleInput(entry.path)
        nodes[node.name] = node
    return nodes
def get_hostvars(host):
    """
    Compute the dynamic variables of a single host.

    A host whose data already defines ``ansible_host`` gets no dynamic
    variable; otherwise ``ansible_host`` defaults to the host name.
    """
    if "ansible_host" in host.data:
        return {}
    return {"ansible_host": host.name}
def get_subgroups(groups, og):
    """
    Get and return all children of a specific group, in a given subset.

    Args:
        groups: mapping of every known group name to its node.
        og: the group node whose children are wanted; its ``name`` and
            ``data`` attributes are read.

    Returns:
        list of child group names. For the group named "all" this is every
        other known group. For any other group it is the entries of its
        ``_groups`` variable that are known group names; a single string is
        accepted as a one-element list, matching how host ``_groups`` are
        handled in ``get_list``.
    """
    # specific return for group "all"; compared with == because a substring
    # test would also match group names such as "a" or "al"
    if og.name == "all":
        return [name for name in groups if name != "all"]

    # usual behavior
    data = og.data or {}
    if "_groups" not in data:
        logging.warning("group %s does not have any _groups var!", og.name)
        return []

    declared = data["_groups"]
    if isinstance(declared, str):
        # a scalar would otherwise be iterated character by character
        declared = [declared]

    # an empty "_groups:" key is loaded as None: treat it as no children
    return [group for group in declared or [] if group in groups]
def get_meta(hosts):
    """
    Build the meta dict: the dynamic variables of every host, gathered under
    the "hostvars" key and indexed by host name.
    """
    return {
        "hostvars": {
            hostname: get_hostvars(host) for hostname, host in hosts.items()
        }
    }
def get_list(hosts, groups):
    """
    Return list of groups with all hosts.

    Args:
        hosts: mapping of host name to host node.
        groups: mapping of group name to group node.

    Returns:
        dict holding a "_meta" entry (see ``get_meta``) plus one entry per
        group name, each with "hosts", "vars" and "children" keys. Every
        host is listed in "all" and in each known group named by its
        ``_groups`` variable, at most once per group.
    """
    _list = {"_meta": get_meta(hosts)}

    # init group array
    for groupname, group in groups.items():
        _list[groupname] = {
            "hosts": [],
            "vars": {},
            "children": get_subgroups(groups, group),
        }

    # Every host is appended to "all" below, so the entry must exist even
    # when no file defines that group.
    if "all" not in groups:
        _list["all"] = {"hosts": [], "vars": {}, "children": list(groups)}

    # append each hosts to their corresponding groups
    for hostname, host in hosts.items():
        _list["all"]["hosts"].append(hostname)

        data = host.data or {}
        if "_groups" not in data:
            logging.info("no _groups data found for host %s", hostname)
            continue

        # retrieve _groups variables and force conversion to list
        cleangr = data["_groups"]
        if isinstance(cleangr, str):
            cleangr = [cleangr]

        # an empty "_groups:" key is loaded as None: nothing to register
        for group in cleangr or []:
            if group != "all" and group not in groups:
                logging.error("group %s does not exist!", group)
                continue
            members = _list[group]["hosts"]
            # avoid duplicates: the host is already in "all", and a group
            # may be listed twice
            if hostname not in members:
                members.append(hostname)

    return _list
def main():
    """
    Main entry.

    Command line options:
        --host NAME  print the dynamic variables of host NAME as JSON, or an
                     empty JSON object when the host is unknown.
        --list       print every group with its hosts, vars and children.
        --debug      re-enable logging output (silenced by default).

    Always terminates the process by raising ``SystemExit(0)``; with neither
    --host nor --list nothing is printed.
    """
    # logging config: everything is muted unless --debug is given
    logging.basicConfig(level=logging.INFO)
    logging.disable(logging.CRITICAL)

    # parse cli arguments
    parser = OptionParser()
    parser.add_option("--host",
                      dest="host",
                      default=None,
                      help="show specific host")
    parser.add_option("--list",
                      action="store_true",
                      help="list all groups")
    parser.add_option("--debug",
                      action="store_true",
                      help="show logging informations")
    (options, _args) = parser.parse_args()

    # enable debug if required
    if options.debug:
        logging.disable(logging.NOTSET)

    # retrieve hosts
    hosts = create_nodes(HOST_DIR)

    # return specific hostname
    if options.host is not None:
        if options.host in hosts:
            pprint(get_hostvars(hosts[options.host]))
        else:
            # an empty dict is dumped as the JSON object {}, whereas the
            # string "{}" would be dumped with surrounding quotes
            pprint({})
        raise SystemExit(0)

    # return all groups; group files are only needed on this path
    if options.list:
        groups = create_nodes(GROUP_DIR)
        pprint(get_list(hosts, groups))
        raise SystemExit(0)

    # no action requested
    raise SystemExit(0)
# Run the inventory only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
|