#!/usr/bin/env python3 import subprocess import shlex # This script uses the `ovn-nbctl` command to determine if the deployment is # vulnerable to . It does this by finding the set of # switches that have DNS records and that have egress ACLs. If this set has any # members, then the deployment is vulnerable. def build_obj(text: str): objs = dict() obj = dict() cur_uuid = None for line in text.split("\n"): line = line.strip() if len(line) == 0: if cur_uuid: objs[cur_uuid] = obj obj = dict() continue key, _, value = line.partition(":") key = key.strip() value = value.strip() if value.startswith("["): value = value[1:-1] value = [item for item in value.split(",")] obj[key.strip()] = value if key == "_uuid": cur_uuid = value return objs def build_obj_from_db(cmd: str): cmd_list = shlex.split(cmd) sub = subprocess.run(cmd_list, capture_output=True) return build_obj(sub.stdout.decode()) switches = build_obj_from_db("ovn-nbctl list logical_switch") dns = build_obj_from_db("ovn-nbctl find dns records!={}") egress_acls = build_obj_from_db("ovn-nbctl find acl direction=to-lport") port_groups = build_obj_from_db("ovn-nbctl list port_group") # Build the set of switches with DNS records switches_with_dns = set() for uuid, vals in switches.items(): for record in vals["dns_records"]: if record in dns: switches_with_dns.add(uuid) # Build the set of switches with ACLs. Step one is simple, get the switches that # directly have ACLs set on them. switches_with_acls = set() for uuid, vals in switches.items(): for record in vals["acls"]: if record in egress_acls: switches_with_acls.add(uuid) break # Now we need to check if any of the switches' ports belong to a port group with # an egress ACL. Start by creating a map of ports to switches. This way, when we # come across a port in the port group, we can find the switch easily. port_switch_map = dict() for uuid, vals in switches.items(): for port in vals["ports"]: if not port: continue port_switch_map[port] = uuid # Create a filtered dict of port groups that have egress ACLs. port_groups_with_egress_acls = dict() for uuid, vals in port_groups.items(): for record in vals["acls"]: if record in egress_acls: port_groups_with_egress_acls[uuid] = vals break # Now find switches that have ports in the port groups with egress ACLs. Add # these to the switches_with_acls set. for uuid, vals in port_groups_with_egress_acls.items(): for port in vals["ports"]: if port in port_switch_map: switches_with_acls.add(port_switch_map[port]) # Switches with DNS and egress ACLs mean the system is vulnerable. if switches_with_dns & switches_with_acls: print("This installation is vulnerable") else: print("This installation is NOT vulnerable")