#!/usr/bin/env python3
#
# This is a dynamic inventory script for Ansible at
# rgoncalves.se infrastructure.
#
# Only groups_vars/ and host_vars/ directory are used for setting up devices
# and nodes.
# > Files, files, files, ... only files, everything files!
#
# The main goal here is to allow flexible code reviews and edits, POSIX
# compliant actions over files, and using "less is more" moto. Moreover, each
# host/group/node is individually track on version control.
import os
import yaml
import logging
import json
from typing import Final
from optparse import OptionParser
HOST_DIR: Final = "./host_vars"
GROUP_DIR: Final = "./group_vars"
class AnsibleInput:
"""
AnsibleInput class mixin.
Meta methods and inits for reading and processing yaml files.
"""
data = None
name = None
def __init__(self, filepath):
with open(filepath) as file:
self.data = yaml.load(file, Loader=yaml.FullLoader)
self.name = os.path.splitext(os.path.basename(filepath))[0]
def pprint(data):
"""
Json pretty print for ansible hosts/groups or whatever output we get.
"""
try:
print(json.dumps(data, indent=4, sort_keys=True))
except TypeError:
print(data)
def create_nodes(directory):
"""
Build-up and ansible node (Host or Group),
then return the full type list.
"""
nodes = {}
for file in os.scandir(directory):
node = AnsibleInput(file.path)
nodes[node.name] = node
return nodes
def get_hostvars(host):
"""
Retrieve and build dynamic variables for a specific host.
"""
data = host.data
if "ansible_host" not in host.data:
data["ansible_host"] = host.name
return data
def get_subgroups(groups, og):
"""
Get and return all children of a specific group, in a given subset.
"""
# specific return for group "all"
if og.name in "all":
groups = list(groups.keys())
groups.remove("all")
return groups
# usual behavior
valid_groups = []
if "_groups" not in og.data:
logging.warning(f"group { og.name } does not have any _groups var!")
else:
for group in og.data["_groups"]:
if group in groups:
valid_groups.append(group)
return valid_groups
def get_meta(hosts):
"""
For performance reasons, return all the hosts in a meta dict.
"""
_meta = {}
_meta["hostvars"] = {}
for hostname, host in hosts.items():
_meta["hostvars"][hostname] = get_hostvars(host)
# _meta["hostvars"][hostname] = host.data
return "{}"
return _meta
def get_list(hosts, groups):
"""
Return list of groups with all hosts.
"""
_list = {}
_list["_meta"] = get_meta(hosts)
# init group array
for groupname, group in groups.items():
_list[groupname] = {}
_list[groupname]["hosts"] = []
_list[groupname]["vars"] = {}
_list[groupname]["children"] = get_subgroups(groups, group)
# append each hosts to their corresponding groups
for hostname, host in hosts.items():
_list["all"]["hosts"].append(hostname)
if "_groups" not in host.data:
logging.info(f"no _groups data found for host { hostname }")
continue
# retrieve _groups variables and force conversion to list
cleangr = host.data["_groups"]
cleangr = [cleangr] if isinstance(cleangr, str) else cleangr
for group in cleangr:
# skip group assignement on non-existence!
logging.info(f"searching for group : { group }")
if group not in groups:
logging.error(f"group { group } does not exist!")
continue
_list[group]["hosts"].append(hostname)
return _list
def main():
"""
Main entry.
"""
# logging config
logging.basicConfig(level=logging.INFO)
logging.disable()
# parse cli arguments
parser = OptionParser()
parser.add_option("--host",
dest="host",
default="_empty",
help="show specific host")
parser.add_option("--list",
action="store_true",
help="list all groups")
parser.add_option("--debug",
action="store_true",
help="show logging informations")
(options, args) = parser.parse_args()
# enable debug if required
if options.debug:
logging.disable(0)
# retrieve hosts and groups
hosts = create_nodes(HOST_DIR)
groups = create_nodes(GROUP_DIR)
# return specific hostname
if options.host != "_empty":
if options.host in hosts:
pprint(get_hostvars(hosts[options.host]))
else:
pprint("{}")
exit(0)
# return all groups
if options.list:
pprint(get_list(hosts, groups))
exit(0)
# return all hosts
exit(0)
if __name__ == "__main__":
main()