aboutsummaryrefslogtreecommitdiffstats
path: root/inventory.py
blob: 535447469c3a4f714aeb3f6a9d4093ff063dea56 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
#!/usr/bin/env python3
#
# This is a dynamic inventory script for Ansible at
# rgoncalves.se infrastructure.
#
# Only the group_vars/ and host_vars/ directories are used for setting up
# devices and nodes.
# > Files, files, files, ... only files, everything files!
#
# The main goal here is to allow flexible code reviews and edits, POSIX
# compliant actions over files, and to follow the "less is more" motto.
# Moreover, each host/group/node is individually tracked in version control.

import os
import logging
import json
import re
import yaml

from optparse import OptionParser
from typing import Final


# Directory scanned for host definitions: each file in it is loaded as YAML
# and becomes one host named after the file (extension stripped). The path
# is relative to the current working directory.
HOST_DIR: Final = "./host_vars"
# Directory scanned for group definitions, with the same one-file-per-node
# convention as HOST_DIR.
GROUP_DIR: Final = "./group_vars"
# Maps a group name to a list of hostname regular expressions. A host whose
# name matches any pattern of a group is added to that group by
# group_regexp().
# NOTE(review): "[0-9]*" and "s*" match zero or more characters, so "^ws*"
# matches every hostname starting with "w", and "^st[0-9]*" also matches
# "stack01" -- confirm this looseness is intended.
GROUP_PATTERNS: Final = {
    "servers": [
        "^dc[0-9]*",
        "^rt[0-9]*",
        "^st[0-9]*",
        "^stack[0-9]*",
        "^emb[0-9]*"
    ],
    "workstation": [
        "^ws*"
    ]
}


class AnsibleInput:
    """
    Container for one YAML input file (a host or a group).

    After construction, `data` holds the parsed YAML content (an empty dict
    when the file parses to a falsy value, e.g. an empty file) and `name`
    holds the base name of the file without its extension.
    """

    data = None
    name = None

    def __init__(self, filepath):
        with open(filepath) as handle:
            parsed = yaml.load(handle, Loader=yaml.FullLoader)
            if not parsed:
                # empty documents are normalised to an empty mapping
                parsed = {}
            self.data = parsed
            basename = os.path.basename(filepath)
            stem, _extension = os.path.splitext(basename)
            self.name = stem


def pprint(data):
    """
    Print data as indented JSON with sorted keys.

    When data cannot be serialised to JSON (json.dumps raises TypeError),
    the object is printed as-is instead.
    """

    try:
        rendered = json.dumps(data, sort_keys=True, indent=4)
    except TypeError:
        # not JSON-serialisable: fall back to the plain representation
        rendered = data
    print(rendered)


def create_nodes(directory):
    """
    Build one Ansible node (host or group) per entry of a directory.

    directory -- path of the directory to scan; the path of every entry
                 found in it is handed to AnsibleInput.

    Returns a dict mapping each node's name to its AnsibleInput instance.
    Errors raised while opening or parsing an entry are propagated.
    """

    nodes = {}
    # scandir() holds an OS directory handle; the context manager releases it
    # even when loading one of the files fails half-way through the scan.
    with os.scandir(directory) as entries:
        for entry in entries:
            node = AnsibleInput(entry.path)
            nodes[node.name] = node
    return nodes


def group_regexp(hostname, hostgroups, patterns=None):
    """
    Add to hostgroups every group whose patterns match the hostname.

    hostname   -- name of the host to test.
    hostgroups -- list of group names; it is extended in place.
    patterns   -- optional mapping of group name to a list of regular
                  expressions; defaults to GROUP_PATTERNS.

    A group is appended at most once, and not at all when hostgroups
    already contains it, so a hostname matching several patterns of the
    same group does not produce duplicate entries.
    """

    if patterns is None:
        patterns = GROUP_PATTERNS
    for groupname, group in patterns.items():
        if groupname in hostgroups:
            continue
        for pattern in group:
            if re.search(pattern, hostname):
                logging.info("hostname %s matched with %s for group %s",
                             hostname, pattern, groupname)
                hostgroups.append(groupname)
                # one match is enough for this group
                break


def get_hostvars(host):
    """
    Build the dynamic variables of a single host.

    When the host data does not define "ansible_host", the host name is
    used for it; otherwise no dynamic variable is generated.
    """

    if "ansible_host" in host.data:
        return {}
    return {"ansible_host": host.name}


def get_subgroups(groups, og):
    """
    Return the children of group og, restricted to the known groups.

    groups -- mapping of group name to group node (every known group).
    og     -- group node whose children are requested; its name and data
              attributes are read.

    The group named "all" gets every other known group as a child. Any
    other group gets the entries of its "_groups" variable that are known
    group names; a single string is treated as a one-element list. A
    warning is logged when the group has no "_groups" variable.
    """

    # Exact comparison: a substring test (og.name in "all") would also treat
    # groups named "a", "al", "ll", ... as the "all" group.
    if og.name == "all":
        return [name for name in groups if name != "all"]

    # usual behavior
    if "_groups" not in og.data:
        logging.warning("group %s does not have any _groups var!", og.name)
        return []

    declared = og.data["_groups"]
    if isinstance(declared, str):
        # a bare string would otherwise be iterated character by character
        declared = [declared]
    return [group for group in (declared or []) if group in groups]


def get_meta(hosts):
    """
    Gather the variables of every host under a single "hostvars" entry,
    so that all of them can be returned at once (performance reasons).
    """

    hostvars = {name: get_hostvars(node) for name, node in hosts.items()}
    return {"hostvars": hostvars}


def get_list(hosts, groups):
    """
    Return the complete inventory: every group with its hosts.

    hosts  -- mapping of host name to host node.
    groups -- mapping of group name to group node.

    The result maps each group name to a dict with "hosts", "vars" and
    "children" entries, plus a "_meta" entry built by get_meta(). Every
    host is listed in "all"; a host that defines "_groups" is also listed
    in each of those groups and in the groups matched by group_regexp().
    Unknown group names are logged as errors and skipped.
    """

    _list = {"_meta": get_meta(hosts)}
    # init group array
    for groupname, group in groups.items():
        _list[groupname] = {
            "hosts": [],
            "vars": {},
            "children": get_subgroups(groups, group),
        }
    # Every host is appended to "all" below; provide that group even when
    # no group file declares it, instead of failing with a KeyError.
    if "all" not in _list:
        _list["all"] = {"hosts": [], "vars": {}, "children": list(groups)}
    # append each host to its corresponding groups
    for hostname, host in hosts.items():
        all_hosts = _list["all"]["hosts"]
        if hostname not in all_hosts:
            all_hosts.append(hostname)
        # NOTE(review): hosts without "_groups" also skip the regexp-based
        # assignment below -- confirm this is intended.
        if "_groups" not in host.data:
            logging.info("no _groups data found for host %s", hostname)
            continue
        # Retrieve the _groups variable and force conversion to a list.
        # Work on a copy: group_regexp() extends the list in place and must
        # not alter the host's own data. An empty "_groups:" key (None) is
        # treated as an empty list.
        declared = host.data["_groups"]
        if isinstance(declared, str):
            hostgroups = [declared]
        else:
            hostgroups = list(declared or [])
        # group assignment based on regexp
        group_regexp(hostname, hostgroups)
        # final assignment with correct groupname
        for group in hostgroups:
            if group not in groups:
                logging.error("group %s does not exist!", group)
                continue
            members = _list[group]["hosts"]
            # a group may be named several times; list the host only once
            if hostname not in members:
                members.append(hostname)
    return _list


def main():
    """
    Main entry: parse the command line and print the requested inventory.

    --host NAME prints the dynamic variables of NAME, or an empty JSON
                object when the host is unknown.
    --list      prints every group with its hosts, children and the
                "_meta" host variables.
    --debug     enables logging output (disabled otherwise).

    Always terminates by raising SystemExit(0).
    """

    # logging config: everything is silenced unless --debug is given
    logging.basicConfig(level=logging.INFO)
    logging.disable()

    # parse cli arguments
    parser = OptionParser()
    parser.add_option("--host",
                      dest="host",
                      default=None,
                      help="show specific host")
    parser.add_option("--list",
                      action="store_true",
                      help="list all groups")
    parser.add_option("--debug",
                      action="store_true",
                      help="show logging informations")
    options, _args = parser.parse_args()

    # enable debug if required
    if options.debug:
        logging.disable(logging.NOTSET)

    # retrieve hosts and groups
    hosts = create_nodes(HOST_DIR)
    groups = create_nodes(GROUP_DIR)

    # return specific hostname
    if options.host is not None:
        if options.host in hosts:
            pprint(get_hostvars(hosts[options.host]))
        else:
            # an empty JSON object, not the quoted JSON string "{}"
            pprint({})
        # SystemExit is raised directly: the exit() helper comes from the
        # site module and is not guaranteed to be available.
        raise SystemExit(0)

    # return all groups
    if options.list:
        pprint(get_list(hosts, groups))
        raise SystemExit(0)

    # neither --host nor --list was given: nothing to print
    raise SystemExit(0)


# Run the inventory only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
remember that computers suck.