#!/usr/bin/env python3 # # This is a dynamic inventory script for Ansible at # rgoncalves.se infrastructure. # # Only groups_vars/ and host_vars/ directory are used for setting up devices # and nodes. # > Files, files, files, ... only files, everything files! # # The main goal here is to allow flexible code reviews and edits, POSIX # compliant actions over files, and using "less is more" moto. Moreover, each # host/group/node is individually track on version control. import os import yaml import logging import json from typing import Final from optparse import OptionParser HOST_DIR: Final = "./host_vars" GROUP_DIR: Final = "./group_vars" class AnsibleInput: """ AnsibleInput class mixin. Meta methods and inits for reading and processing yaml files. """ data = None name = None def __init__(self, filepath): with open(filepath) as file: self.data = yaml.load(file, Loader=yaml.FullLoader) self.name = os.path.splitext(os.path.basename(filepath))[0] def pprint(data): """ Json pretty print for ansible hosts/groups or whatever output we get. """ try: print(json.dumps(data, indent=4, sort_keys=True)) except TypeError: print(data) def create_nodes(directory): """ Build-up and ansible node (Host or Group), then return the full type list. """ nodes = {} for file in os.scandir(directory): node = AnsibleInput(file.path) nodes[node.name] = node return nodes def get_hostvars(host): """ Retrieve and build dynamic variables for a specific host. """ data = host.data if "ansible_host" not in host.data: data["ansible_host"] = host.name return data def get_subgroups(groups, og): """ Get and return all children of a specific group, in a given subset. """ # specific return for group "all" if og.name in "all": groups = list(groups.keys()) groups.remove("all") return groups # usual behavior valid_groups = [] if "_groups" not in og.data: logging.warning(f"group { og.name } does not have any _groups var!") else: for group in og.data["_groups"]: if group in groups: valid_groups.append(group) return valid_groups def get_meta(hosts): """ For performance reasons, return all the hosts in a meta dict. """ _meta = {} _meta["hostvars"] = {} for hostname, host in hosts.items(): _meta["hostvars"][hostname] = get_hostvars(host) # _meta["hostvars"][hostname] = host.data return "{}" return _meta def get_list(hosts, groups): """ Return list of groups with all hosts. """ _list = {} _list["_meta"] = get_meta(hosts) # init group array for groupname, group in groups.items(): _list[groupname] = {} _list[groupname]["hosts"] = [] _list[groupname]["vars"] = {} _list[groupname]["children"] = get_subgroups(groups, group) # append each hosts to their corresponding groups for hostname, host in hosts.items(): _list["all"]["hosts"].append(hostname) if "_groups" not in host.data: logging.info(f"no _groups data found for host { hostname }") continue # retrieve _groups variables and force conversion to list cleangr = host.data["_groups"] cleangr = [cleangr] if isinstance(cleangr, str) else cleangr for group in cleangr: # skip group assignement on non-existence! logging.info(f"searching for group : { group }") if group not in groups: logging.error(f"group { group } does not exist!") continue _list[group]["hosts"].append(hostname) return _list def main(): """ Main entry. """ # logging config logging.basicConfig(level=logging.INFO) logging.disable() # parse cli arguments parser = OptionParser() parser.add_option("--host", dest="host", default="_empty", help="show specific host") parser.add_option("--list", action="store_true", help="list all groups") parser.add_option("--debug", action="store_true", help="show logging informations") (options, args) = parser.parse_args() # enable debug if required if options.debug: logging.disable(0) # retrieve hosts and groups hosts = create_nodes(HOST_DIR) groups = create_nodes(GROUP_DIR) # return specific hostname if options.host != "_empty": if options.host in hosts: pprint(get_hostvars(hosts[options.host])) else: pprint("{}") exit(0) # return all groups if options.list: pprint(get_list(hosts, groups)) exit(0) # return all hosts exit(0) if __name__ == "__main__": main()