ansible/roles/ldap_netdb/files/netdb-update.py

181 lines
5.2 KiB
Python

#!/usr/bin/python3
import sys
import ldap3
import syslog
import requests
from base64 import b64decode
from email.utils import parsedate_to_datetime
from requests.packages.urllib3.exceptions import SubjectAltNameWarning
URL = "https://adm01.home.foo.sh/facts"
def get_url(URL):
return requests.get(
URL,
cert=(
"/etc/pki/tls/certs/ldap01.home.foo.sh.crt",
"/etc/pki/tls/private/ldap01.home.foo.sh.key",
),
verify="/etc/pki/tls/certs/ca.crt",
).json()
def get_hostlist():
for entry in get_url(URL):
mtime = parsedate_to_datetime(entry["mtime"])
yield (entry["name"], mtime)
def get_hostdata(hostname):
return get_url("{}/{}".format(URL, hostname))
def ldap_connect():
server = ldap3.Server("ldapi:///var/run/ldapi")
conn = ldap3.Connection(
server,
authentication=ldap3.SASL,
sasl_mechanism=ldap3.EXTERNAL,
sasl_credentials="",
client_strategy=ldap3.SYNC,
)
conn.bind()
conn.search(
search_base="",
search_filter="(objectClass=*)",
search_scope=ldap3.BASE,
attributes=["namingContexts"],
)
basedn = conn.response[0]["attributes"]["namingContexts"][0]
return (conn, basedn)
def parse_certificate(data):
try:
return b64decode(data["ansible_local"]["ansible_certificate"])
except KeyError:
return []
def parse_description(data):
osinfo = "{} {} ({})".format(
data["ansible_distribution"],
data["ansible_distribution_version"],
data["ansible_architecture"],
)
try:
memory = data["ansible_memory_mb"]["real"]["total"]
except KeyError:
memory = data["ansible_memtotal_mb"]
if memory < 1024:
memory = "{} MB".format(memory)
else:
memory = "{} GB".format(round(memory / 1024, 1))
sysinfo = "{} CPU(s), {} GB memory".format(data["ansible_processor_count"], memory,)
if data["ansible_product_name"] == "KVM":
devinfo = "KVM - Guest"
else:
devinfo = "{} - {}".format(
data["ansible_system_vendor"], data["ansible_product_version"],
)
return "\n".join([devinfo, osinfo, sysinfo])
def parse_location(data):
return "Home"
def parse_sshkey(data):
try:
return "ssh-ed25519 {}".format(data["ansible_ssh_host_key_ed25519_public"])
except KeyError:
return []
def parse_serial(data):
try:
serial = data["ansible_product_serial"]
if serial in ["NA", "System Serial Number"]:
return []
return serial
except KeyError:
return []
def parse_objectclass(data):
objclass = ["device"]
if data["userCertificate;binary"] != []:
objclass.append("strongAuthenticationUser")
if data["sshPublicKey"] != []:
objclass.append("ldapPublicKey")
return objclass
def main(verbose):
requests.packages.urllib3.disable_warnings(SubjectAltNameWarning)
syslog.openlog(
sys.argv[0].split("/")[-1], logoption=syslog.LOG_PID, facility=syslog.LOG_DAEMON
)
(conn, basedn) = ldap_connect()
count = 0
for (hostname, mtime) in get_hostlist():
count = count + 1
if hostname in ("localhost"):
continue
dn = f"cn={hostname},ou=Hosts,{basedn}"
if verbose:
print(f"Processing entry {dn}")
result = conn.search(
search_base=dn,
search_scope=ldap3.BASE,
search_filter=f"(&(objectClass=device)(cn={hostname}))",
attributes=[ldap3.ALL_ATTRIBUTES, "modifyTimeStamp"],
)
if result and conn.response[0]["attributes"]["modifyTimestamp"] > mtime:
continue
facts = get_hostdata(hostname)
# initialize data
if result:
ldap_data = dict(conn.response[0]["attributes"])
del ldap_data["modifyTimestamp"]
else:
ldap_data = {"cn": hostname}
ldap_data["userCertificate;binary"] = parse_certificate(facts)
ldap_data["description"] = parse_description(facts)
ldap_data["l"] = parse_location(facts)
ldap_data["sshPublicKey"] = parse_sshkey(facts)
ldap_data["serialNumber"] = parse_serial(facts)
ldap_data["objectClass"] = parse_objectclass(ldap_data)
if result:
for key in ldap_data:
ldap_data[key] = [(ldap3.MODIFY_REPLACE, ldap_data[key])]
syslog.syslog(syslog.LOG_INFO, f"Updating netdb data for host '{dn}'")
if not conn.modify(dn, ldap_data):
print(dn, conn.result)
else:
ldap_data = dict(filter(lambda x: x[1] != [], ldap_data.items()))
syslog.syslog(syslog.LOG_INFO, f"Adding netdb data for host '{dn}'")
if not conn.add(dn, attributes=ldap_data):
print(dn, conn.result)
if count == 0:
print("ERR: No hosts found")
syslog.closelog()
if __name__ == "__main__":
try:
verbose = False
if len(sys.argv) > 1:
if sys.argv[1] == "-v":
verbose = True
else:
print(f"Usage: {sys.argv[0]} [-v]", file=sys.stderr)
main(verbose)
except KeyboardInterrupt:
sys.exit(1)