403Webshell
Server IP : 23.254.227.96  /  Your IP : 216.73.216.46
Web Server : Apache/2.4.62 (Unix) OpenSSL/1.1.1k
System : Linux hwsrv-1277026.hostwindsdns.com 4.18.0-477.13.1.el8_8.x86_64 #1 SMP Tue May 30 14:53:41 EDT 2023 x86_64
User : viralblo ( 1001)
PHP Version : 8.1.31
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /usr/lib/python3.6/site-packages/cloudinit/net/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /usr/lib/python3.6/site-packages/cloudinit/net/eni.py
# This file is part of cloud-init. See LICENSE file for license information.

import copy
import glob
import os
import re

from cloudinit import log as logging
from cloudinit import subp, util

from . import ParserError, renderer
from .network_state import subnet_is_ipv6

LOG = logging.getLogger(__name__)

NET_CONFIG_COMMANDS = [
    "pre-up",
    "up",
    "post-up",
    "down",
    "pre-down",
    "post-down",
]

NET_CONFIG_BRIDGE_OPTIONS = [
    "bridge_ageing",
    "bridge_bridgeprio",
    "bridge_fd",
    "bridge_gcinit",
    "bridge_hello",
    "bridge_maxage",
    "bridge_maxwait",
    "bridge_stp",
]

NET_CONFIG_OPTIONS = [
    "address",
    "netmask",
    "broadcast",
    "network",
    "metric",
    "gateway",
    "pointtopoint",
    "media",
    "mtu",
    "hostname",
    "leasehours",
    "leasetime",
    "vendor",
    "client",
    "bootfile",
    "server",
    "hwaddr",
    "provider",
    "frame",
    "netnum",
    "endpoint",
    "local",
    "ttl",
]


# TODO: switch valid_map based on mode inet/inet6
def _iface_add_subnet(iface, subnet):
    content = []
    valid_map = [
        "address",
        "netmask",
        "broadcast",
        "metric",
        "gateway",
        "pointopoint",
        "mtu",
        "scope",
        "dns_search",
        "dns_nameservers",
    ]
    for key, value in subnet.items():
        if key == "netmask":
            continue
        if key == "address":
            value = "%s/%s" % (subnet["address"], subnet["prefix"])
        if value and key in valid_map:
            if type(value) == list:
                value = " ".join(value)
            if "_" in key:
                key = key.replace("_", "-")
            content.append("    {0} {1}".format(key, value))

    return sorted(content)


# TODO: switch to valid_map for attrs
def _iface_add_attrs(iface, index, ipv4_subnet_mtu):
    # If the index is non-zero, this is an alias interface. Alias interfaces
    # represent additional interface addresses, and should not have additional
    # attributes. (extra attributes here are almost always either incorrect,
    # or are applied to the parent interface.) So if this is an alias, stop
    # right here.
    if index != 0:
        return []
    content = []
    ignore_map = [
        "control",
        "device_id",
        "driver",
        "index",
        "inet",
        "mode",
        "name",
        "subnets",
        "type",
    ]

    # The following parameters require repetitive entries of the key for
    # each of the values
    multiline_keys = [
        "bridge_pathcost",
        "bridge_portprio",
        "bridge_waitport",
    ]

    renames = {"mac_address": "hwaddress"}
    if iface["type"] not in ["bond", "bridge", "infiniband", "vlan"]:
        ignore_map.append("mac_address")

    for key, value in iface.items():
        # convert bool to string for eni
        if type(value) == bool:
            value = "on" if iface[key] else "off"
        if not value or key in ignore_map:
            continue
        if key == "mtu" and ipv4_subnet_mtu:
            if value != ipv4_subnet_mtu:
                LOG.warning(
                    "Network config: ignoring %s device-level mtu:%s because"
                    " ipv4 subnet-level mtu:%s provided.",
                    iface["name"],
                    value,
                    ipv4_subnet_mtu,
                )
            continue
        if key in multiline_keys:
            for v in value:
                content.append("    {0} {1}".format(renames.get(key, key), v))
            continue
        if type(value) == list:
            value = " ".join(value)
        content.append("    {0} {1}".format(renames.get(key, key), value))

    return sorted(content)


def _iface_start_entry(iface, index, render_hwaddress=False):
    fullname = iface["name"]

    control = iface["control"]
    if control == "auto":
        cverb = "auto"
    elif control in ("hotplug",):
        cverb = "allow-" + control
    else:
        cverb = "# control-" + control

    subst = iface.copy()
    subst.update({"fullname": fullname, "cverb": cverb})

    lines = [
        "{cverb} {fullname}".format(**subst),
        "iface {fullname} {inet} {mode}".format(**subst),
    ]
    if render_hwaddress and iface.get("mac_address"):
        lines.append("    hwaddress {mac_address}".format(**subst))

    return lines


def _parse_deb_config_data(ifaces, contents, src_dir, src_path):
    """Parses the file contents, placing result into ifaces.

    '_source_path' is added to every dictionary entry to define which file
    the configration information came from.

    :param ifaces: interface dictionary
    :param contents: contents of interfaces file
    :param src_dir: directory interfaces file was located
    :param src_path: file path the `contents` was read
    """
    currif = None
    for line in contents.splitlines():
        line = line.strip()
        if line.startswith("#"):
            continue
        split = line.split(" ")
        option = split[0]
        if option == "source-directory":
            parsed_src_dir = split[1]
            if not parsed_src_dir.startswith("/"):
                parsed_src_dir = os.path.join(src_dir, parsed_src_dir)
            for expanded_path in glob.glob(parsed_src_dir):
                dir_contents = os.listdir(expanded_path)
                dir_contents = [
                    os.path.join(expanded_path, path)
                    for path in dir_contents
                    if (
                        os.path.isfile(os.path.join(expanded_path, path))
                        and re.match("^[a-zA-Z0-9_-]+$", path) is not None
                    )
                ]
                for entry in dir_contents:
                    with open(entry, "r") as fp:
                        src_data = fp.read().strip()
                    abs_entry = os.path.abspath(entry)
                    _parse_deb_config_data(
                        ifaces, src_data, os.path.dirname(abs_entry), abs_entry
                    )
        elif option == "source":
            new_src_path = split[1]
            if not new_src_path.startswith("/"):
                new_src_path = os.path.join(src_dir, new_src_path)
            for expanded_path in glob.glob(new_src_path):
                with open(expanded_path, "r") as fp:
                    src_data = fp.read().strip()
                abs_path = os.path.abspath(expanded_path)
                _parse_deb_config_data(
                    ifaces, src_data, os.path.dirname(abs_path), abs_path
                )
        elif option == "auto":
            for iface in split[1:]:
                if iface not in ifaces:
                    ifaces[iface] = {
                        # Include the source path this interface was found in.
                        "_source_path": src_path
                    }
                ifaces[iface]["auto"] = True
        elif option == "iface":
            iface, family, method = split[1:4]
            if iface not in ifaces:
                ifaces[iface] = {
                    # Include the source path this interface was found in.
                    "_source_path": src_path
                }
            elif "family" in ifaces[iface]:
                raise ParserError(
                    "Interface %s can only be defined once. "
                    "Re-defined in '%s'." % (iface, src_path)
                )
            ifaces[iface]["family"] = family
            ifaces[iface]["method"] = method
            currif = iface
        elif option == "hwaddress":
            if split[1] == "ether":
                val = split[2]
            else:
                val = split[1]
            ifaces[currif]["hwaddress"] = val
        elif option in NET_CONFIG_OPTIONS:
            ifaces[currif][option] = split[1]
        elif option in NET_CONFIG_COMMANDS:
            if option not in ifaces[currif]:
                ifaces[currif][option] = []
            ifaces[currif][option].append(" ".join(split[1:]))
        elif option.startswith("dns-"):
            if "dns" not in ifaces[currif]:
                ifaces[currif]["dns"] = {}
            if option == "dns-search":
                ifaces[currif]["dns"]["search"] = []
                for domain in split[1:]:
                    ifaces[currif]["dns"]["search"].append(domain)
            elif option == "dns-nameservers":
                ifaces[currif]["dns"]["nameservers"] = []
                for server in split[1:]:
                    ifaces[currif]["dns"]["nameservers"].append(server)
        elif option.startswith("bridge_"):
            if "bridge" not in ifaces[currif]:
                ifaces[currif]["bridge"] = {}
            if option in NET_CONFIG_BRIDGE_OPTIONS:
                bridge_option = option.replace("bridge_", "", 1)
                ifaces[currif]["bridge"][bridge_option] = split[1]
            elif option == "bridge_ports":
                ifaces[currif]["bridge"]["ports"] = []
                for iface in split[1:]:
                    ifaces[currif]["bridge"]["ports"].append(iface)
            elif option == "bridge_hw":
                # doc is confusing and thus some may put literal 'MAC'
                #    bridge_hw MAC <address>
                # but correct is:
                #    bridge_hw <address>
                if split[1].lower() == "mac":
                    ifaces[currif]["bridge"]["mac"] = split[2]
                else:
                    ifaces[currif]["bridge"]["mac"] = split[1]
            elif option == "bridge_pathcost":
                if "pathcost" not in ifaces[currif]["bridge"]:
                    ifaces[currif]["bridge"]["pathcost"] = {}
                ifaces[currif]["bridge"]["pathcost"][split[1]] = split[2]
            elif option == "bridge_portprio":
                if "portprio" not in ifaces[currif]["bridge"]:
                    ifaces[currif]["bridge"]["portprio"] = {}
                ifaces[currif]["bridge"]["portprio"][split[1]] = split[2]
        elif option.startswith("bond-"):
            if "bond" not in ifaces[currif]:
                ifaces[currif]["bond"] = {}
            bond_option = option.replace("bond-", "", 1)
            ifaces[currif]["bond"][bond_option] = split[1]
    for iface in ifaces.keys():
        if "auto" not in ifaces[iface]:
            ifaces[iface]["auto"] = False


def parse_deb_config(path):
    """Parses a debian network configuration file."""
    ifaces = {}
    with open(path, "r") as fp:
        contents = fp.read().strip()
    abs_path = os.path.abspath(path)
    _parse_deb_config_data(
        ifaces, contents, os.path.dirname(abs_path), abs_path
    )
    return ifaces


def convert_eni_data(eni_data):
    # return a network config representation of what is in eni_data
    ifaces = {}
    _parse_deb_config_data(ifaces, eni_data, src_dir=None, src_path=None)
    return _ifaces_to_net_config_data(ifaces)


def _ifaces_to_net_config_data(ifaces):
    """Return network config that represents the ifaces data provided.
    ifaces = parse_deb_config("/etc/network/interfaces")
    config = ifaces_to_net_config_data(ifaces)
    state = parse_net_config_data(config)."""
    devs = {}
    for name, data in ifaces.items():
        # devname is 'eth0' for name='eth0:1'
        devname = name.partition(":")[0]
        if devname not in devs:
            if devname == "lo":
                dtype = "loopback"
            else:
                dtype = "physical"
            devs[devname] = {"type": dtype, "name": devname, "subnets": []}
            # this isnt strictly correct, but some might specify
            # hwaddress on a nic for matching / declaring name.
            if "hwaddress" in data:
                devs[devname]["mac_address"] = data["hwaddress"]
        subnet = {"_orig_eni_name": name, "type": data["method"]}
        if data.get("auto"):
            subnet["control"] = "auto"
        else:
            subnet["control"] = "manual"

        if data.get("method") == "static":
            subnet["address"] = data["address"]

        for copy_key in ("netmask", "gateway", "broadcast"):
            if copy_key in data:
                subnet[copy_key] = data[copy_key]

        if "dns" in data:
            for n in ("nameservers", "search"):
                if n in data["dns"] and data["dns"][n]:
                    subnet["dns_" + n] = data["dns"][n]
        devs[devname]["subnets"].append(subnet)

    return {"version": 1, "config": [devs[d] for d in sorted(devs)]}


class Renderer(renderer.Renderer):
    """Renders network information in a /etc/network/interfaces format."""

    def __init__(self, config=None):
        if not config:
            config = {}
        self.eni_path = config.get("eni_path", "etc/network/interfaces")
        self.eni_header = config.get("eni_header", None)
        self.netrules_path = config.get(
            "netrules_path", "etc/udev/rules.d/70-persistent-net.rules"
        )

    def _render_route(self, route, indent=""):
        """When rendering routes for an iface, in some cases applying a route
        may result in the route command returning non-zero which produces
        some confusing output for users manually using ifup/ifdown[1].  To
        that end, we will optionally include an '|| true' postfix to each
        route line allowing users to work with ifup/ifdown without using
        --force option.

        We may at somepoint not want to emit this additional postfix, and
        add a 'strict' flag to this function.  When called with strict=True,
        then we will not append the postfix.

        1. http://askubuntu.com/questions/168033/
                 how-to-set-static-routes-in-ubuntu-server
        """
        content = []
        up = indent + "post-up route add"
        down = indent + "pre-down route del"
        or_true = " || true"
        mapping = {
            "gateway": "gw",
            "metric": "metric",
        }

        default_gw = ""
        if route["network"] == "0.0.0.0" and route["netmask"] == "0.0.0.0":
            default_gw = " default"
        elif route["network"] == "::" and route["prefix"] == 0:
            default_gw = " -A inet6 default"

        route_line = ""
        for k in ["network", "gateway", "metric"]:
            if default_gw and k == "network":
                continue
            if k == "gateway":
                route_line += "%s %s %s" % (default_gw, mapping[k], route[k])
            elif k in route:
                if k == "network":
                    if ":" in route[k]:
                        route_line += " -A inet6"
                    elif route.get("prefix") == 32:
                        route_line += " -host"
                    else:
                        route_line += " -net"
                    if "prefix" in route:
                        route_line += " %s/%s" % (route[k], route["prefix"])
                else:
                    route_line += " %s %s" % (mapping[k], route[k])
        content.append(up + route_line + or_true)
        content.append(down + route_line + or_true)
        return content

    def _render_iface(self, iface, render_hwaddress=False):
        sections = []
        subnets = iface.get("subnets", {})
        accept_ra = iface.pop("accept-ra", None)
        ethernet_wol = iface.pop("wakeonlan", None)
        if ethernet_wol:
            # Specify WOL setting 'g' for using "Magic Packet"
            iface["ethernet-wol"] = "g"
        if subnets:
            for index, subnet in enumerate(subnets):
                ipv4_subnet_mtu = None
                iface["index"] = index
                iface["mode"] = subnet["type"]
                iface["control"] = subnet.get("control", "auto")
                subnet_inet = "inet"
                if subnet_is_ipv6(subnet):
                    subnet_inet += "6"
                else:
                    ipv4_subnet_mtu = subnet.get("mtu")
                iface["inet"] = subnet_inet
                if (
                    subnet["type"] == "dhcp4"
                    or subnet["type"] == "dhcp6"
                    or subnet["type"] == "ipv6_dhcpv6-stateful"
                ):
                    # Configure network settings using DHCP or DHCPv6
                    iface["mode"] = "dhcp"
                    if accept_ra is not None:
                        # Accept router advertisements (0=off, 1=on)
                        iface["accept_ra"] = "1" if accept_ra else "0"
                elif subnet["type"] == "ipv6_dhcpv6-stateless":
                    # Configure network settings using SLAAC from RAs
                    iface["mode"] = "auto"
                    # Use stateless DHCPv6 (0=off, 1=on)
                    iface["dhcp"] = "1"
                elif subnet["type"] == "ipv6_slaac":
                    # Configure network settings using SLAAC from RAs
                    iface["mode"] = "auto"
                    # Use stateless DHCPv6 (0=off, 1=on)
                    iface["dhcp"] = "0"
                elif subnet_is_ipv6(subnet):
                    # mode might be static6, eni uses 'static'
                    iface["mode"] = "static"
                    if accept_ra is not None:
                        # Accept router advertisements (0=off, 1=on)
                        iface["accept_ra"] = "1" if accept_ra else "0"

                # do not emit multiple 'auto $IFACE' lines as older (precise)
                # ifupdown complains
                if True in [
                    "auto %s" % (iface["name"]) in line for line in sections
                ]:
                    iface["control"] = "alias"

                lines = list(
                    _iface_start_entry(
                        iface, index, render_hwaddress=render_hwaddress
                    )
                    + _iface_add_subnet(iface, subnet)
                    + _iface_add_attrs(iface, index, ipv4_subnet_mtu)
                )
                for route in subnet.get("routes", []):
                    lines.extend(self._render_route(route, indent="    "))

                sections.append(lines)
        else:
            # ifenslave docs say to auto the slave devices
            lines = []
            if "bond-master" in iface or "bond-slaves" in iface:
                lines.append("auto {name}".format(**iface))
            lines.append("iface {name} {inet} {mode}".format(**iface))
            lines.extend(
                _iface_add_attrs(iface, index=0, ipv4_subnet_mtu=None)
            )
            sections.append(lines)
        return sections

    def _render_interfaces(self, network_state, render_hwaddress=False):
        """Given state, emit etc/network/interfaces content."""

        # handle 'lo' specifically as we need to insert the global dns entries
        # there (as that is the only interface that will be always up).
        lo = {
            "name": "lo",
            "type": "physical",
            "inet": "inet",
            "subnets": [{"type": "loopback", "control": "auto"}],
        }
        for iface in network_state.iter_interfaces():
            if iface.get("name") == "lo":
                lo = copy.deepcopy(iface)

        nameservers = network_state.dns_nameservers
        if nameservers:
            lo["subnets"][0]["dns_nameservers"] = " ".join(nameservers)

        searchdomains = network_state.dns_searchdomains
        if searchdomains:
            lo["subnets"][0]["dns_search"] = " ".join(searchdomains)

        # Apply a sort order to ensure that we write out the physical
        # interfaces first; this is critical for bonding
        order = {
            "loopback": 0,
            "physical": 1,
            "infiniband": 2,
            "bond": 3,
            "bridge": 4,
            "vlan": 5,
        }

        sections = []
        sections.extend(self._render_iface(lo))
        for iface in sorted(
            network_state.iter_interfaces(),
            key=lambda k: (order[k["type"]], k["name"]),
        ):

            if iface.get("name") == "lo":
                continue
            sections.extend(
                self._render_iface(iface, render_hwaddress=render_hwaddress)
            )

        for route in network_state.iter_routes():
            sections.append(self._render_route(route))

        return "\n\n".join(["\n".join(s) for s in sections]) + "\n"

    def render_network_state(self, network_state, templates=None, target=None):
        fpeni = subp.target_path(target, self.eni_path)
        util.ensure_dir(os.path.dirname(fpeni))
        header = self.eni_header if self.eni_header else ""
        util.write_file(fpeni, header + self._render_interfaces(network_state))

        if self.netrules_path:
            netrules = subp.target_path(target, self.netrules_path)
            util.ensure_dir(os.path.dirname(netrules))
            util.write_file(
                netrules, self._render_persistent_net(network_state)
            )


def network_state_to_eni(network_state, header=None, render_hwaddress=False):
    # render the provided network state, return a string of equivalent eni
    eni_path = "etc/network/interfaces"
    renderer = Renderer(
        config={
            "eni_path": eni_path,
            "eni_header": header,
            "netrules_path": None,
        }
    )
    if not header:
        header = ""
    if not header.endswith("\n"):
        header += "\n"
    contents = renderer._render_interfaces(
        network_state, render_hwaddress=render_hwaddress
    )
    return header + contents


def available(target=None):
    expected = ["ifquery", "ifup", "ifdown"]
    search = ["/sbin", "/usr/sbin"]
    for p in expected:
        if not subp.which(p, search=search, target=target):
            return False
    eni = subp.target_path(target, "etc/network/interfaces")
    if not os.path.isfile(eni):
        return False

    return True


# vi: ts=4 expandtab

Youez - 2016 - github.com/yon3zu
LinuXploit