aboutsummaryrefslogblamecommitdiffstats
path: root/dynamic.py
blob: 78ebdb252df32ec65ea838a22c96e27a9eec79e3 (plain) (tree)

































































































































































































                                                                             
#!/usr/bin/env python3
#
# This is a dynamic inventory script for Ansible at
# rgoncalves.se infrastructure.
#
# Only groups_vars/ and host_vars/ directory are used for setting up devices
# and nodes.
# > Files, files, files, ... only files, everything files!
#
# The main goal here is to allow flexible code reviews and edits, POSIX
# compliant actions over files, and using "less is more" moto. Moreover, each
# host/group/node is individually track on version control.

import os
import yaml
import logging
import json

from typing import Final
from optparse import OptionParser


HOST_DIR: Final = "./host_vars"
GROUP_DIR: Final = "./group_vars"


class AnsibleInput:
    """
    AnsibleInput class mixin.
    Meta methods and inits for reading and processing yaml files.
    """

    data = None
    name = None

    def __init__(self, filepath):
        with open(filepath) as file:
            self.data = yaml.load(file, Loader=yaml.FullLoader)
            self.name = os.path.splitext(os.path.basename(filepath))[0]


def pprint(data):
    """
    Json pretty print for ansible hosts/groups or whatever output we get.
    """

    try:
        print(json.dumps(data, indent=4, sort_keys=True))
    except TypeError:
        print(data)


def create_nodes(directory):
    """
    Build-up and ansible node (Host or Group),
    then return the full type list.
    """

    nodes = {}
    for file in os.scandir(directory):
        node = AnsibleInput(file.path)
        nodes[node.name] = node
    return nodes


def get_hostvars(host):
    """
    Retrieve and build dynamic variables for a specific host.
    """

    data = host.data
    if "ansible_host" not in host.data:
        data["ansible_host"] = host.name
    return data


def get_subgroups(groups, og):
    """
    Get and return all children of a specific group, in a given subset.
    """

    # specific return for group "all" 
    if og.name in "all":
        groups = list(groups.keys())
        groups.remove("all")
        return groups

    # usual behavior
    valid_groups = []
    if "_groups" not in og.data:
        logging.warning(f"group { og.name } does not have any _groups var!")
    else:
        for group in og.data["_groups"]:
            if group in groups:
                valid_groups.append(group)
    return valid_groups


def get_meta(hosts):
    """
    For performance reasons, return all the hosts in a meta dict.
    """

    _meta = {}
    _meta["hostvars"] = {}
    for hostname, host in hosts.items():
        _meta["hostvars"][hostname] = get_hostvars(host)
        # _meta["hostvars"][hostname] = host.data
    return "{}"
    return _meta


def get_list(hosts, groups):
    """
    Return list of groups with all hosts.
    """

    _list = {}
    _list["_meta"] = get_meta(hosts)
    # init group array
    for groupname, group in groups.items():
        _list[groupname] = {}
        _list[groupname]["hosts"] = []
        _list[groupname]["vars"] = {}
        _list[groupname]["children"] = get_subgroups(groups, group)
    # append each hosts to their corresponding groups
    for hostname, host in hosts.items():
        _list["all"]["hosts"].append(hostname)
        if "_groups" not in host.data:
            logging.info(f"no _groups data found for host { hostname }")
            continue
        # retrieve _groups variables and force conversion to list
        cleangr = host.data["_groups"]
        cleangr = [cleangr] if isinstance(cleangr, str) else cleangr
        for group in cleangr:
            # skip group assignement on non-existence!
            logging.info(f"searching for group : { group }")
            if group not in groups:
                logging.error(f"group { group } does not exist!")
                continue
            _list[group]["hosts"].append(hostname)
    return _list


def main():
    """
    Main entry.
    """

    # logging config
    logging.basicConfig(level=logging.INFO)
    logging.disable()

    # parse cli arguments
    parser = OptionParser()
    parser.add_option("--host",
                      dest="host",
                      default="_empty",
                      help="show specific host")
    parser.add_option("--list",
                      action="store_true",
                      help="list all groups")
    parser.add_option("--debug",
                      action="store_true",
                      help="show logging informations")
    (options, args) = parser.parse_args()

    # enable debug if required
    if options.debug:
        logging.disable(0)

    # retrieve hosts and groups
    hosts = create_nodes(HOST_DIR)
    groups = create_nodes(GROUP_DIR)

    # return specific hostname
    if options.host != "_empty":
        if options.host in hosts:
            pprint(get_hostvars(hosts[options.host]))
        else:
            pprint("{}")
        exit(0)

    # return all groups
    if options.list:
        pprint(get_list(hosts, groups))
        exit(0)

    # return all hosts
    exit(0)


if __name__ == "__main__":
    main()
remember that computers suck.