#!/usr/bin/python3
"""Copy host facts served at ``URL`` into device entries in the local LDAP.

For every host listed by the facts service, the matching
``cn=<host>,ou=Hosts,<basedn>`` entry is added, or replaced when the facts
are at least as new as the entry's ``modifyTimestamp``.
"""

import sys
import syslog
from base64 import b64decode
from email.utils import parsedate_to_datetime

import ldap3
import requests
from requests.packages.urllib3.exceptions import SubjectAltNameWarning

URL = "https://adm01.home.foo.sh/facts"


def get_url(URL):
    """Fetch *URL* using the host's client certificate and return parsed JSON.

    The parameter keeps its upper-case name (shadowing the module constant)
    so that existing keyword callers continue to work.
    """
    return requests.get(
        URL,
        cert=(
            "/etc/pki/tls/certs/ldap01.home.foo.sh.crt",
            "/etc/pki/tls/private/ldap01.home.foo.sh.key",
        ),
        verify="/etc/pki/tls/certs/ca.crt",
    ).json()


def get_hostlist():
    """Yield ``(name, mtime)`` for each entry of the index served at ``URL``.

    ``mtime`` is the entry's ``"mtime"`` field parsed with
    ``email.utils.parsedate_to_datetime``.
    """
    for entry in get_url(URL):
        mtime = parsedate_to_datetime(entry["mtime"])
        yield (entry["name"], mtime)


def get_hostdata(hostname):
    """Return the facts document served at ``URL/<hostname>``."""
    return get_url("{}/{}".format(URL, hostname))


def ldap_connect():
    """Bind to the local LDAPI socket with SASL EXTERNAL.

    Returns:
        ``(conn, basedn)`` where ``basedn`` is the first ``namingContexts``
        value read from the root DSE.
    """
    server = ldap3.Server("ldapi:///var/run/ldapi")
    conn = ldap3.Connection(
        server,
        authentication=ldap3.SASL,
        sasl_mechanism=ldap3.EXTERNAL,
        sasl_credentials="",
        client_strategy=ldap3.SYNC,
    )
    conn.bind()
    # An empty search base with BASE scope reads the root DSE.
    conn.search(
        search_base="",
        search_filter="(objectClass=*)",
        search_scope=ldap3.BASE,
        attributes=["namingContexts"],
    )
    basedn = conn.response[0]["attributes"]["namingContexts"][0]
    return (conn, basedn)


def parse_certificate(data):
    """Return the base64-decoded host certificate, or ``[]`` when absent.

    ``[]`` is the "no value" marker that ``parse_objectclass`` checks and
    that ``main`` filters out before adding a new entry.
    """
    try:
        return b64decode(data["ansible_local"]["ansible_certificate"])
    except KeyError:
        return []


def parse_description(data):
    """Build the three-line description: device, OS and CPU/memory summary.

    Raises:
        KeyError: if one of the required ``ansible_*`` facts is missing.
    """
    osinfo = "{} {} ({})".format(
        data["ansible_distribution"],
        data["ansible_distribution_version"],
        data["ansible_architecture"],
    )

    try:
        memory = data["ansible_memory_mb"]["real"]["total"]
    except KeyError:
        memory = data["ansible_memtotal_mb"]
    if memory < 1024:
        memory = "{} MB".format(memory)
    else:
        memory = "{} GB".format(round(memory / 1024, 1))
    # ``memory`` already carries its unit; the template must not append a
    # second one (the old template produced e.g. "512 MB GB memory").
    sysinfo = "{} CPU(s), {} memory".format(data["ansible_processor_count"], memory)

    if data["ansible_product_name"] == "KVM":
        devinfo = "KVM - Guest"
    else:
        devinfo = "{} - {}".format(
            data["ansible_system_vendor"],
            data["ansible_product_version"],
        )

    return "\n".join([devinfo, osinfo, sysinfo])


def parse_location(data):
    """Return the location (``l``) value; currently the constant "Home"."""
    return "Home"


def parse_sshkey(data):
    """Return the host's ed25519 public key line, or ``[]`` when absent."""
    try:
        return "ssh-ed25519 {}".format(data["ansible_ssh_host_key_ed25519_public"])
    except KeyError:
        return []


def parse_serial(data):
    """Return the product serial, or ``[]`` if missing or a placeholder."""
    try:
        serial = data["ansible_product_serial"]
    except KeyError:
        return []
    if serial in ["NA", "System Serial Number"]:
        return []
    return serial


def parse_objectclass(data):
    """Return the objectClass list implied by the attributes set in *data*.

    *data* is the LDAP attribute dict being assembled (not the raw facts);
    it must already contain ``userCertificate;binary`` and ``sshPublicKey``.
    """
    objclass = ["device"]
    if data["userCertificate;binary"] != []:
        objclass.append("strongAuthenticationUser")
    if data["sshPublicKey"] != []:
        objclass.append("ldapPublicKey")
    return objclass


def main(verbose):
    """Synchronise every host from the facts service into LDAP.

    Args:
        verbose: when true, print each DN as it is processed.
    """
    requests.packages.urllib3.disable_warnings(SubjectAltNameWarning)
    syslog.openlog(
        sys.argv[0].split("/")[-1],
        logoption=syslog.LOG_PID,
        facility=syslog.LOG_DAEMON,
    )

    (conn, basedn) = ldap_connect()

    for (hostname, mtime) in get_hostlist():
        # Exact comparison: ``hostname in ("localhost")`` was a substring
        # test against a plain string and also skipped "local", "host", ...
        if hostname == "localhost":
            continue

        dn = f"cn={hostname},ou=Hosts,{basedn}"
        if verbose:
            print(f"Processing entry {dn}")

        result = conn.search(
            search_base=dn,
            search_scope=ldap3.BASE,
            search_filter=f"(&(objectClass=device)(cn={hostname}))",
            attributes=[ldap3.ALL_ATTRIBUTES, "modifyTimeStamp"],
        )
        # Skip hosts whose LDAP entry was modified after the facts' mtime.
        if result and conn.response[0]["attributes"]["modifyTimestamp"] > mtime:
            continue

        facts = get_hostdata(hostname)

        # initialize data
        if result:
            ldap_data = dict(conn.response[0]["attributes"])
            del ldap_data["modifyTimestamp"]
        else:
            ldap_data = {"cn": hostname}

        ldap_data["userCertificate;binary"] = parse_certificate(facts)
        ldap_data["description"] = parse_description(facts)
        ldap_data["l"] = parse_location(facts)
        ldap_data["sshPublicKey"] = parse_sshkey(facts)
        ldap_data["serialNumber"] = parse_serial(facts)
        # Must come last: it inspects the certificate/ssh key set above.
        ldap_data["objectClass"] = parse_objectclass(ldap_data)

        if result:
            # Existing entry: replace every attribute with its new value.
            changes = {
                key: [(ldap3.MODIFY_REPLACE, value)]
                for key, value in ldap_data.items()
            }
            syslog.syslog(syslog.LOG_INFO, f"Updating netdb data for host '{dn}'")
            if not conn.modify(dn, changes):
                print(dn, conn.result)
        else:
            # New entry: drop attributes that have no value ([] marker).
            ldap_data = {
                key: value for key, value in ldap_data.items() if value != []
            }
            syslog.syslog(syslog.LOG_INFO, f"Adding netdb data for host '{dn}'")
            if not conn.add(dn, attributes=ldap_data):
                print(dn, conn.result)

    syslog.closelog()


if __name__ == "__main__":
    try:
        verbose = False
        if len(sys.argv) > 1:
            if sys.argv[1] == "-v":
                verbose = True
            else:
                print(f"Usage: {sys.argv[0]} [-v]", file=sys.stderr)
                # Do not run the sync after rejecting the command line.
                sys.exit(1)
        main(verbose)
    except KeyboardInterrupt:
        sys.exit(1)