#!/usr/bin/env python
#
# ovf.py - Class for OVF/OVA handling
#
# August 2013, Glenn F. Matthews
# Copyright (c) 2013-2017 the COT project developers.
# See the COPYRIGHT.txt file at the top-level directory of this distribution
# and at https://github.com/glennmatthews/cot/blob/master/COPYRIGHT.txt.
#
# This file is part of the Common OVF Tool (COT) project.
# It is subject to the license terms in the LICENSE.txt file found in the
# top-level directory of this distribution and at
# https://github.com/glennmatthews/cot/blob/master/LICENSE.txt. No part
# of COT, including this file, may be copied, modified, propagated, or
# distributed except according to the terms contained in the LICENSE.txt file.
"""Module for handling OVF and OVA virtual machine description files.
**Classes**
.. autosummary::
:nosignatures:
OVF
"""
import logging
import os
import os.path
import re
import tarfile
import xml.etree.ElementTree as ET
from xml.etree.ElementTree import ParseError
import textwrap
from COT.xml_file import XML
from COT.data_validation import (
match_or_die, check_for_conflict, file_checksum,
ValueTooHighError, ValueUnsupportedError, canonicalize_nic_subtype,
)
from COT.file_reference import FileReference, FileOnDisk
from COT.platforms import Platform
from COT.disks import DiskRepresentation
from COT.utilities import pretty_bytes, tar_entry_size
from ..vm_description import VMDescription, VMInitError
from .name_helper import name_helper, CIM_URI
from .hardware import OVFHardware, OVFHardwareDataError
from .item import list_union
from .utilities import (
int_bytes_to_programmatic_units, parse_manifest, programmatic_bytes_to_int,
)
logger = logging.getLogger(__name__)
[docs]class OVF(VMDescription, XML):
"""Representation of the contents of an OVF or OVA.
**Properties**
.. autosummary::
:nosignatures:
input_file
output_file
ovf_version
product_class
platform
config_profiles
default_config_profile
environment_properties
environment_transports
networks
network_descriptions
system_types
version_short
version_long
"""
# API methods to be called by clients
[docs] @staticmethod
def detect_type_from_name(filename):
"""Check the given filename to see if it looks like a type we support.
For our purposes, the file needs to match ".ov[af]" to appear to be
an OVF/OVA file. We also support names like "foo.ovf.20150101" as those
have been seen in the wild.
Does not check file contents, as the given filename may not yet exist.
Args:
filename (str): File name/path
Returns:
str: '.ovf', '.box' or '.ova'
Raises:
ValueUnsupportedError: if filename doesn't match ovf/ova
"""
# We don't care about any directory path
filename = os.path.basename(filename)
extension = os.path.splitext(filename)[1]
if extension == ".ovf" or extension == ".ova" or extension == ".box":
return extension
# Some sources of files are not good about preserving the extension
# and hence tend to append additional extensions - while this may open
# us to incorrect behavior (assuming that 'foo.ovf.zip' is a valid OVF
# when it's probably a zip of an OVF) we'll err on the side of
# accepting too much rather than incorrectly rejecting something like
# "foo.ova.2014.05.06A" that's just lazily named.
match = re.search(r"(\.ov[fa])[^a-zA-Z0-9]", filename)
if match:
extension = match.group(1)
logger.warning("Filename '%s' does not end in '.ovf' or '.ova', "
"but found '%s' in mid-filename; treating as such.",
filename, extension)
return extension
raise ValueUnsupportedError("filename", filename, ('.ovf', '.ova'))
def _ovf_descriptor_from_name(self, input_file):
"""Get the OVF descriptor for the given file.
1. The file may be an OVF descriptor itself.
2. The file may be an OVA, in which case we need to untar it and
return the path to the extracted OVF descriptor.
Args:
input_file (str): Path to an OVF descriptor or OVA file.
Returns:
str: OVF descriptor path
"""
extension = self.detect_type_from_name(input_file)
if extension == '.ova' or extension == '.box':
# Untar the ova to our working directory
return self.untar(input_file)
elif extension == '.ovf':
return input_file
else:
return None
[docs] def __init__(self, input_file, output_file):
"""Open the specified OVF and read its XML into memory.
Args:
input_file (str): Data file to read in.
output_file (str): File name to write to. If this VM is read-only,
(there will never be an output file) this value should be
``None``; if the output filename is not yet known, use ``""``
and subsequently set :attr:`output_file` when it is determined.
Raises:
VMInitError:
* if the OVF descriptor cannot be located
* if an XML parsing error occurs
* if the XML is not actually an OVF descriptor
* if the OVF hardware validation fails
Exception: will call :meth:`destroy` to clean up before reraising
any exception encountered.
"""
try:
self.output_extension = None
VMDescription.__init__(self, input_file, output_file)
# Make sure we know how to read the input
self.ovf_descriptor = self._ovf_descriptor_from_name(input_file)
if self.ovf_descriptor is None:
# We should never get here, but be safe...
raise VMInitError(
2,
"File does not appear to be an OVA or OVF",
input_file)
# Open the provided OVF
try:
XML.__init__(self, self.ovf_descriptor)
except ParseError as exc:
raise VMInitError(2,
"XML error in parsing file: " + str(exc),
self.ovf_descriptor)
# Quick sanity check before we go any further:
if ((not re.search(r"Envelope", self.root.tag)) or
(XML.strip_ns(self.root.tag) != 'Envelope')):
raise VMInitError(
2,
"File does not appear to be an OVF descriptor - "
"expected top-level element {0} but found {1} instead"
.format('Envelope', self.root.tag),
self.ovf_descriptor)
self._ovf_version = None
self.name_helper = name_helper(self.ovf_version)
for (prefix, uri) in self.NSM.items():
ET.register_namespace(prefix, uri)
# Register additional non-standard namespaces we're aware of:
ET.register_namespace('vmw', "http://www.vmware.com/schema/ovf")
ET.register_namespace('vbox',
"http://www.virtualbox.org/ovf/machine")
ET.register_namespace(
'pasd',
CIM_URI + "/cim-schema/2/CIM_ProcessorAllocationSettingData")
# Go ahead and set pointers to some of the most useful XML sections
self.envelope = self.root
self.references = self.find_child(
self.envelope,
self.REFERENCES,
required=True)
self.disk_section = self.find_child(
self.envelope,
self.DISK_SECTION,
attrib=self.DISK_SECTION_ATTRIB)
self.network_section = self.find_child(
self.envelope,
self.NETWORK_SECTION,
attrib=self.NETWORK_SECTION_ATTRIB)
self.deploy_opt_section = self.find_child(
self.envelope,
self.DEPLOY_OPT_SECTION,
required=False)
self.virtual_system = self.find_child(
self.envelope,
self.VIRTUAL_SYSTEM,
attrib=self.VIRTUAL_SYSTEM_ATTRIB,
required=True)
self.product_section = self.find_child(
self.virtual_system,
self.PRODUCT_SECTION,
attrib=self.PRODUCT_SECTION_ATTRIB)
self.annotation_section = self.find_child(
self.virtual_system,
self.ANNOTATION_SECTION,
attrib=self.ANNOTATION_SECTION_ATTRIB)
self.virtual_hw_section = self.find_child(
self.virtual_system,
self.VIRTUAL_HW_SECTION,
attrib=self.VIRTUAL_HW_SECTION_ATTRIB,
required=True)
# Initialize various caches
self._configuration_profiles = None
self._file_references = {}
self._platform = None
try:
self.hardware = OVFHardware(self)
except OVFHardwareDataError as exc:
raise VMInitError(1,
"OVF descriptor is invalid: {0}".format(exc),
self.ovf_descriptor)
assert self.platform
self.file_references = self._init_check_file_entries()
"""Dictionary of FileReferences for this package.
Does not include the manifest file."""
except Exception:
self.destroy()
raise
def _compare_file_lists(self, descriptor_file_list, manifest_file_list):
"""Helper for _init_check_file_entries method.
Args:
descriptor_file_list (list): List of file names derived from the
OVF descriptor.
manifest_file_list (list): List of file names derived from the
manifest file (minus the descriptor itself).
"""
if not manifest_file_list:
return
descriptor_in_manifest = False
# DSP0243 2.1.0: "The manifest file shall contain SHA digests for all
# distinct files referenced in the References element
# of the OVF descriptor and for no other files."
for filename in manifest_file_list:
if filename == os.path.basename(self.ovf_descriptor):
# Manifest should reference the descriptor, but of course the
# descriptor does not reference itself
descriptor_in_manifest = True
elif filename not in descriptor_file_list:
logger.error("The manifest lists file '%s' but the OVF"
" descriptor does not include it in its"
" References section", filename)
for filename in descriptor_file_list:
if filename not in manifest_file_list:
logger.error("The OVF descriptor references file '%s' but"
" this file is not included in the manifest",
filename)
if not descriptor_in_manifest:
logger.error("The manifest does not list the OVF descriptor")
def _init_check_file_entries(self):
"""Check files described in the OVF and store file references.
Also compare the referenced files against the manifest, if any.
Returns:
dict: File HREF (file name) --> :class:`~COT.FileReference` object.
Note that this does *not* include the OVF manifest file.
"""
descriptor_files = dict(
[(elem.get(self.FILE_HREF), elem.get(self.FILE_SIZE)) for
elem in self.references.findall(self.FILE)])
if self.input_file == self.ovf_descriptor:
# Check files in the directory referenced by the OVF descriptor
input_path = os.path.dirname(os.path.abspath(self.ovf_descriptor))
else:
# OVA - check contents of TAR file.
input_path = os.path.abspath(self.input_file)
file_references = {}
mf_filename = os.path.splitext(
os.path.basename(self.ovf_descriptor))[0] + ".mf"
manifest_entries = {}
try:
# We don't store the manifest file itself in file_references,
# as it's basically a read-once file and storing it in the file
# references causes much confusion when writing back out to
# generate the OVF descriptor and manifest file.
manifest_file = FileReference.create(input_path, mf_filename)
with manifest_file.open('rb') as file_obj:
manifest_text = file_obj.read().decode()
manifest_entries = parse_manifest(manifest_text)
except IOError:
logger.debug("Manifest file is missing or unreadable.")
self._compare_file_lists(descriptor_files.keys(),
manifest_entries.keys())
# Check the checksum of the descriptor itself
# We don't store this in file_references as that would be
# prone to self-recursion.
m_algo, m_cksum = manifest_entries.get(
os.path.basename(self.ovf_descriptor), (None, None))
if m_algo and m_algo != self.checksum_algorithm:
# TODO: log a warning? Discard the checksum?
pass
FileReference.create(
input_path, os.path.basename(self.ovf_descriptor),
checksum_algorithm=self.checksum_algorithm,
expected_checksum=m_cksum)
# Now check the checksum of the other files
for file_href, file_size in descriptor_files.items():
m_algo, m_cksum = manifest_entries.get(file_href, (None, None))
if m_algo and m_algo != self.checksum_algorithm:
# TODO: log a warning? Discard the checksum?
pass
try:
file_references[file_href] = FileReference.create(
input_path, file_href,
checksum_algorithm=self.checksum_algorithm,
expected_checksum=m_cksum,
expected_size=file_size)
except IOError:
logger.error("File '%s' referenced in the OVF descriptor "
"does not exist.", file_href)
continue
return file_references
@property
def output_file(self):
"""OVF or OVA file that will be created or updated by :meth:`write`.
Raises:
ValueUnsupportedError: if :func:`detect_type_from_name` fails
"""
return super(OVF, self).output_file
@output_file.setter
def output_file(self, output_file):
# Make sure we can write the requested output format, or abort:
if output_file:
self.output_extension = self.detect_type_from_name(output_file)
super(OVF, self.__class__).output_file.fset(self, output_file)
@property
def ovf_version(self):
"""Float representing the OVF specification version in use.
Supported values at present are 0.9, 1.0, and 2.0.
"""
if self._ovf_version is None:
root_namespace = XML.get_ns(self.root.tag)
logger.debug("Root namespace is " + root_namespace)
if root_namespace == 'http://www.vmware.com/schema/ovf/1/envelope':
logger.info("OVF version is 0.9")
self._ovf_version = 0.9
elif root_namespace == 'http://schemas.dmtf.org/ovf/envelope/1':
logger.info("OVF version is 1.x")
self._ovf_version = 1.0
elif root_namespace == 'http://schemas.dmtf.org/ovf/envelope/2':
logger.info("OVF version is 2.x")
self._ovf_version = 2.0
else:
raise VMInitError(
2,
"File has an Envelope but it is in unknown namespace '{0}'"
.format(root_namespace),
self.ovf_descriptor)
return self._ovf_version
@property
def checksum_algorithm(self):
"""The preferred file checksum algorithm for this OVF."""
if self.ovf_version >= 2.0:
# OVF 2.x uses SHA256 for manifest
return 'sha256'
else:
# OVF 0.x and 1.x use SHA1
return 'sha1'
@property
def product_class(self):
"""The product class identifier, such as com.cisco.csr1000v."""
if self._product_class is None and self.product_section is not None:
self._product_class = self.product_section.get(self.PRODUCT_CLASS)
return super(OVF, self).product_class
@product_class.setter
def product_class(self, product_class):
if product_class == self.product_class:
return
self.product_section = self._ensure_section(
self.PRODUCT_SECTION,
"Product Information",
attrib=self.PRODUCT_SECTION_ATTRIB,
parent=self.virtual_system)
if self.product_class:
logger.debug("Changing product class from '%s' to '%s'",
self.product_class, product_class)
self.product_section.set(self.PRODUCT_CLASS, product_class)
self._product_class = product_class
# Change platform as well!
self._platform = None
assert self.platform
@property
def platform(self):
"""The platform type, as determined from the OVF descriptor.
This will be the class :class:`~COT.platforms.Platform` or
a more-specific subclass if recognized as such.
"""
if self._platform is None:
self._platform = Platform.for_product_string(self.product_class)
logger.info("OVF product class %s --> platform %s",
self.product_class, self.platform)
return self._platform
[docs] def validate_hardware(self):
"""Check sanity of hardware properties for this VM/product/platform.
Returns:
bool: ``True`` if hardware is sane, ``False`` if not.
"""
result = True
# TODO refactor to share logic with profile_info_list()
profile_ids = self.config_profiles
if not profile_ids:
profile_ids = [None]
plat = self.platform
def _validate_helper(label, validator, *args):
"""Call validation function, catch errors and warn user instead.
Args:
label (str): Label to prepend to any warning messages
validator (function): Validation function to call.
*args (list): Arguments to validation function.
Returns:
bool: True if valid, False if invalid
"""
try:
validator(*args)
return True
except ValueUnsupportedError as exc:
logger.warning(label + str(exc))
return False
for profile_id in profile_ids:
profile_str = ""
if profile_id:
profile_str = "In profile '{0}':".format(profile_id)
cpu_item = self.hardware.find_item('cpu', profile=profile_id)
if cpu_item:
cpus = cpu_item.get_value(self.VIRTUAL_QUANTITY,
[profile_id])
result &= _validate_helper(profile_str,
plat.validate_cpu_count, int(cpus))
ram_item = self.hardware.find_item('memory', profile=profile_id)
if ram_item:
megabytes = (programmatic_bytes_to_int(
ram_item.get_value(self.VIRTUAL_QUANTITY, [profile_id]),
ram_item.get_value(self.ALLOCATION_UNITS, [profile_id])
) / (1024 * 1024))
result &= _validate_helper(profile_str,
plat.validate_memory_amount,
int(megabytes))
nics = self.hardware.get_item_count('ethernet', profile_id)
result &= _validate_helper(profile_str,
plat.validate_nic_count, nics)
eth_subtypes = list_union(
*[eth.get_all_values(self.RESOURCE_SUB_TYPE) for
eth in self.hardware.find_all_items('ethernet')])
result &= _validate_helper(profile_str,
plat.validate_nic_types, eth_subtypes)
# TODO: validate_ide_subtypes
# TODO: validate_scsi_subtypes
return result
@property
def config_profiles(self):
"""The list of supported configuration profiles.
If this OVF has no defined profiles, returns an empty list.
If there is a default profile, it will be first in the list.
"""
if self._configuration_profiles is None:
profile_ids = []
if self.deploy_opt_section is not None:
profiles = self.deploy_opt_section.findall(self.CONFIG)
for profile in profiles:
# Force the "default" profile to the head of the list
if (profile.get(self.CONFIG_DEFAULT) == 'true' or
profile.get(self.CONFIG_DEFAULT) == '1'):
profile_ids.insert(0, profile.get(self.CONFIG_ID))
else:
profile_ids.append(profile.get(self.CONFIG_ID))
logger.verbose("Current configuration profiles are: %s",
profile_ids)
self._configuration_profiles = profile_ids
return self._configuration_profiles
@property
def environment_properties(self):
"""The array of environment properties.
Array of dicts (one per property) with the keys ``"key"``, ``"value"``,
``"qualifiers"``, ``"type"``, ``"user_configurable"``, ``"label"``,
and ``"description"``.
"""
result = []
if self.ovf_version < 1.0 or self.product_section is None:
return result
elems = self.product_section.findall(self.PROPERTY)
for elem in elems:
label = elem.findtext(self.PROPERTY_LABEL, "")
descr = elem.findtext(self.PROPERTY_DESC, "")
result.append({
'key': elem.get(self.PROP_KEY),
'value': elem.get(self.PROP_VALUE),
'qualifiers': elem.get(self.PROP_QUAL, ""),
'type': elem.get(self.PROP_TYPE, ""),
'user_configurable': elem.get(self.PROP_USER_CONFIGABLE, ""),
'label': label,
'description': descr,
})
return result
@property
def environment_transports(self):
"""The list of environment transport method strings."""
if self.ovf_version < 1.0:
return None
if self.virtual_hw_section is not None:
value = self.virtual_hw_section.get(self.ENVIRONMENT_TRANSPORT)
if value:
return value.split(" ")
return None
@environment_transports.setter
def environment_transports(self, transports):
if self.ovf_version < 1.0:
raise NotImplementedError("No support for setting environment"
"transports value on OVF 0.9 format.")
transports_string = " ".join(transports)
logger.debug("Setting %s to '%s'", self.ENVIRONMENT_TRANSPORT,
transports_string)
self.virtual_hw_section.set(self.ENVIRONMENT_TRANSPORT,
transports_string)
@property
def networks(self):
"""The list of network names currently defined in this VM."""
if self.network_section is None:
return []
return [network.get(self.NETWORK_NAME) for
network in self.network_section.findall(self.NETWORK)]
@property
def network_descriptions(self):
"""The list of network descriptions currently defined in this VM.
Returns:
list: List of network description strings
"""
if self.network_section is None:
return []
return [network.findtext(self.NWK_DESC, "") for
network in self.network_section.findall(self.NETWORK)]
@property
def system_types(self):
"""List of virtual system type(s) supported by this virtual machine.
For an OVF, this corresponds to the ``VirtualSystemType`` element.
"""
if self.virtual_hw_section is not None:
system = self.virtual_hw_section.find(self.SYSTEM)
if system is not None:
value = system.findtext(self.VIRTUAL_SYSTEM_TYPE, None)
if value:
return value.split(" ")
return None
@system_types.setter
def system_types(self, type_list):
type_string = " ".join(type_list)
logger.debug("Setting VirtualSystemType to '%s'", type_string)
system = self.virtual_hw_section.find(self.SYSTEM)
if system is None:
system = XML.set_or_make_child(self.virtual_hw_section,
self.SYSTEM,
ordering=(self.INFO, self.SYSTEM,
self.ITEM))
# A System must have some additional children to be valid:
XML.set_or_make_child(system, self.VSSD + "ElementName",
"Virtual System Type")
XML.set_or_make_child(system, self.VSSD + "InstanceID", 0)
XML.set_or_make_child(system, self.VIRTUAL_SYSTEM_TYPE, type_string)
@property
def product(self):
"""Short descriptive product string (XML ``Product`` element)."""
if self.product_section is not None:
return self.product_section.findtext(self.PRODUCT, None)
return None
@product.setter
def product(self, product_string):
logger.debug("Updating Product element in OVF")
self._set_product_section_child(self.PRODUCT, product_string)
@property
def vendor(self):
"""Short descriptive vendor string (XML ``Vendor`` element)."""
if self.product_section is not None:
return self.product_section.findtext(self.VENDOR, None)
return None
@vendor.setter
def vendor(self, vendor_string):
logger.debug("Updating Vendor element in OVF")
self._set_product_section_child(self.VENDOR, vendor_string)
@property
def version_short(self):
"""Short descriptive version string (XML ``Version`` element)."""
if self.product_section is not None:
return self.product_section.findtext(self.VERSION, None)
return None
@version_short.setter
def version_short(self, version_string):
logger.debug("Updating Version element in OVF")
self._set_product_section_child(self.VERSION, version_string)
@property
def version_long(self):
"""Long descriptive version string (XML ``FullVersion`` element)."""
if self.product_section is not None:
return self.product_section.findtext(self.FULL_VERSION, None)
return None
@version_long.setter
def version_long(self, version_string):
logger.debug("Updating FullVersion element in OVF")
self._set_product_section_child(self.FULL_VERSION, version_string)
@property
def product_url(self):
"""Product URL string (XML ``ProductUrl`` element)."""
if self.product_section is not None:
return self.product_section.findtext(self.PRODUCT_URL, None)
return None
@product_url.setter
def product_url(self, product_url_string):
logger.debug("Updating ProductUrl element in OVF")
self._set_product_section_child(self.PRODUCT_URL, product_url_string)
@property
def vendor_url(self):
"""Vendor URL string (XML ``VendorUrl`` element)."""
if self.product_section is not None:
return self.product_section.findtext(self.VENDOR_URL, None)
return None
@vendor_url.setter
def vendor_url(self, vendor_url_string):
logger.debug("Updating VendorUrl element in OVF")
self._set_product_section_child(self.VENDOR_URL, vendor_url_string)
@property
def application_url(self):
"""Application URL string (XML ``AppUrl`` element)."""
if self.product_section is not None:
return self.product_section.findtext(self.APPLICATION_URL, None)
return None
@application_url.setter
def application_url(self, app_url_string):
logger.debug("Updating AppUrl element in OVF")
self._set_product_section_child(self.APPLICATION_URL, app_url_string)
def __getattr__(self, name):
"""Transparently pass attribute lookups off to name_helper.
Args:
name (str): Attribute being looked up.
Returns:
Attribute value
Raises:
AttributeError: Magic methods (``__foo``) will not be passed
through but will raise an AttributeError as usual.
"""
# Don't pass 'special' attributes through to the helper
if re.match(r"^__", name):
raise AttributeError("'OVF' object has no attribute '{0}'"
.format(name))
return getattr(self.name_helper, name)
[docs] def predicted_output_size(self):
"""Estimate how much disk space (in bytes) is needed to write out.
Since OVA (TAR) is an uncompressed format, the disk space required
is approximately the same for both OVF and OVA output. Thus we can
provide this value even if :attr:`output_file` is ``None``.
In the TAR format, each file in the archive has a 512-byte header
and its total size is rounded up to a multiple of 512 bytes. The
archive is terminated by 2 512-byte blocks filled with zero, and
the overall archive file size is a multiple of 10 kiB.
Returns:
int: Estimated number of bytes consumed when writing out to
:attr:`output_file` (plus any associated files).
"""
# Size of the OVF descriptor
needed = tar_entry_size(len(ET.tostring(self.root)))
# Account for the size of all the referenced files
manifest_size = 0
for href, file_ref in self.file_references.items():
# Approximate size of a manifest entry for this file
if self.ovf_version >= 2.0:
# SHA256(href)= <64 hex digits>
# so 64 + href length + ~12 other characters
manifest_size += 76 + len(href)
else:
# SHA1(href)= <40 hex digits>
# so 40 + href length + ~10 other characters
manifest_size += 50 + len(href)
# Size of the file proper
needed += tar_entry_size(file_ref.size)
# Manifest file
needed += tar_entry_size(manifest_size)
# Archive end - two 512-byte records filled with zeros
needed += 1024
# Overall size must be a multiple of 10 kiB
needed += (10240 - needed) % 10240
logger.debug("Estimated output size is %s", pretty_bytes(needed))
return needed
[docs] def write(self):
"""Write OVF or OVA to :attr:`output_file`, if set."""
if not self.output_file:
return
logger.info("Updating and validating internal data before writing"
" out to disk")
prefix = os.path.splitext(self.output_file)[0]
extension = self.output_extension
# Update the XML ElementTree to reflect any hardware changes
self.hardware.update_xml()
# Validate the hardware to be written
self.validate_hardware()
# Make sure file references are correct:
self._refresh_file_references()
# Make sure all defined networks are actually used by NICs,
# and delete any networks that are unused.
self._refresh_networks()
logger.info("Writing out to file %s", self.output_file)
if extension == '.ova':
ovf_file = os.path.join(self.working_dir, "{0}.ovf"
.format(os.path.basename(prefix)))
self.write_xml(ovf_file)
self.generate_manifest(ovf_file)
self.tar(ovf_file, self.output_file)
elif extension == '.ovf':
self.write_xml(self.output_file)
# Copy all files from working directory to destination
dest_dir = os.path.dirname(os.path.abspath(self.output_file))
for file_ref in self.file_references.values():
file_ref.copy_to(dest_dir)
# Generate manifest
self.generate_manifest(self.output_file)
else:
# We should never get here, but to be safe:
raise NotImplementedError("Not sure how to write a '{0}' file"
.format(extension))
def _refresh_file_references(self):
"""Check all File entries to make sure they are valid and up to date.
Helper method for :func:`write`.
"""
# Refresh the file references
to_delete = []
for filename, file_ref in self.file_references.items():
if file_ref.exists:
file_ref.refresh()
else:
# file used to exist but no longer does??
logger.error("Referenced file '%s' does not exist!", filename)
to_delete.append(filename)
for filename in to_delete:
del self.file_references[filename]
for file_elem in self.references.findall(self.FILE):
href = file_elem.get(self.FILE_HREF)
if href not in self.file_references:
# TODO this should probably have a confirm() check...
logger.notice("Removing reference to missing file %s", href)
self.references.remove(file_elem)
# TODO remove references to this file from Disk, Item?
for filename, file_ref in self.file_references.items():
file_elem = self.find_child(self.references, self.FILE,
{self.FILE_HREF: filename})
assert file_elem is not None
file_elem.set(self.FILE_SIZE, str(file_ref.size))
real_capacity = None
disk_item = self.find_disk_from_file_id(
file_elem.get(self.FILE_ID))
if disk_item is not None:
# We can't check disk capacity inside a tar file.
# It seems wasteful to extract the disk file (could be
# quite large) from the TAR just to check, so we don't.
if file_ref.file_path is not None:
diskrep = DiskRepresentation.from_file(file_ref.file_path)
real_capacity = diskrep.capacity
if disk_item is not None and real_capacity is not None:
reported_capacity = str(self.get_capacity_from_disk(disk_item))
if reported_capacity != real_capacity:
logger.warning(
"Capacity of disk '%s' seems to have changed "
"from %s (reported in the original OVF) "
"to %s (actual capacity). "
"The updated OVF will reflect this change.",
filename, reported_capacity, real_capacity)
self.set_capacity_of_disk(disk_item, real_capacity)
def _refresh_networks(self):
"""Make sure all defined networks are actually used by NICs.
Delete any networks that are unused and warn the user.
Helper method for :func:`write`.
"""
if self.network_section is None:
return
networks = self.network_section.findall(self.NETWORK)
items = self.virtual_hw_section.findall(self.ETHERNET_PORT_ITEM)
connected_networks = set()
for item in items:
conn = item.find(self.EPASD + self.CONNECTION)
if conn is not None:
connected_networks.add(conn.text)
for net in networks:
name = net.get(self.NETWORK_NAME)
if name not in connected_networks:
logger.notice("Removing unused network %s", name)
self.network_section.remove(net)
# If all networks were removed, remove the NetworkSection too
if not self.network_section.findall(self.NETWORK):
logger.notice("No networks left - removing NetworkSection")
self.envelope.remove(self.network_section)
self.network_section = None
def _info_string_header(self, width):
"""Generate OVF/OVA file header for :meth:`info_string`.
Args:
width (int): Line length to wrap to where possible.
Returns:
str: File header
"""
str_list = []
str_list.append('-' * width)
str_list.append(self.input_file)
if self.platform and self.platform.__class__ is not Platform:
str_list.append("COT detected platform type: {0}"
.format(self.platform))
str_list.append('-' * width)
return '\n'.join(str_list)
def _info_string_product(self, verbosity_option, wrapper):
"""Generate product information as part of :meth:`info_string`.
Args:
verbosity_option (str): 'brief', None (default), or 'verbose'
wrapper (textwrap.TextWrapper): Helper object for wrapping text
lines if needed.
Returns:
str: Product information
"""
if ((not any([self.product, self.vendor, self.version_short])) and
(verbosity_option == 'brief' or not any([
self.product_url, self.vendor_url, self.version_long]))):
return None
str_list = []
wrapper.initial_indent = ''
wrapper.subsequent_indent = ' '
# All elements in this section are optional
for label, value, default, verbose_only in [
["Product: ", self.product, "(No product string)", False],
[" ", self.product_url, "(No product URL)", True],
["Vendor: ", self.vendor, "(No vendor string)", False],
[" ", self.vendor_url, "(No vendor URL)", True],
["Version: ", self.version_short,
"(No version string)", False],
[" ", self.version_long,
"(No detailed version string)", True],
]:
if verbosity_option == 'brief' and verbose_only:
continue
if value is None:
value = default
str_list.extend(wrapper.wrap("{0}{1}".format(label, value)))
return "\n".join(str_list)
def _info_string_annotation(self, wrapper):
"""Generate annotation information as part of :meth:`info_string`.
Args:
wrapper (textwrap.TextWrapper): Helper object for wrapping
text lines if needed.
Returns:
str: Annotation information string, or None
"""
if self.annotation_section is None:
return None
ann = self.annotation_section.find(self.ANNOTATION)
if ann is None or not ann.text:
return None
str_list = []
first = True
wrapper.initial_indent = 'Annotation: '
wrapper.subsequent_indent = ' '
for line in ann.text.splitlines():
if not line:
str_list.append("")
else:
str_list.extend(wrapper.wrap(line))
if first:
wrapper.initial_indent = wrapper.subsequent_indent
first = False
return "\n".join(str_list)
def _info_string_eula(self, verbosity_option, wrapper):
"""Generate EULA information as part of :meth:`info_string`.
Args:
verbosity_option (str): 'brief', None (default), or 'verbose'
wrapper (textwrap.TextWrapper): Helper object for wrapping
text lines if needed.
Returns:
str: EULA information
"""
# An OVF may have zero, one, or more
eula_header = False
str_list = []
for eula in self.find_all_children(self.virtual_system,
self.EULA_SECTION,
self.EULA_SECTION_ATTRIB):
info = eula.find(self.INFO)
lic = eula.find(self.EULA_LICENSE)
if lic is not None and lic.text:
if not eula_header:
str_list.append("End User License Agreement(s):")
eula_header = True
if info is not None and info.text:
wrapper.initial_indent = ' '
wrapper.subsequent_indent = ' '
str_list.extend(wrapper.wrap(info.text))
if verbosity_option != 'verbose':
str_list.append(" (not displayed, use 'cot info "
"--verbose' if desired)")
else:
wrapper.initial_indent = ' '
wrapper.subsequent_indent = ' '
for line in lic.text.splitlines():
if not line:
str_list.append("")
else:
str_list.extend(wrapper.wrap(line))
return "\n".join(str_list)
INFO_STRING_DISK_TEMPLATE = (
"{{0:{0}}} " # file/disk name - width is dynamically set
"{{1:>9}} " # file size - width 9 for "999.9 MiB"
"{{2:>9}} " # disk capacity - width 9 for "999.9 MiB"
"{{3:.20}}" # disk info - width 20 for "harddisk @ SCSI 1:15"
)
INFO_STRING_DISK_COLUMNS_WIDTH = (1 + 9 + 1 + 9 + 1 + 20)
INFO_STRING_FILE_TEMPLATE = (
"{{0:{0}}} " # file/disk name - width is dynamically set
"{{1:>9}}" # file size - width 9 for "999.9 MiB"
)
def _info_strings_for_file(self, file_obj):
"""Get attributes of a file which may describe a disk as well.
Helper for :meth:`_info_string_files_disks`.
Args:
file_obj (xml.etree.ElementTree.Element): File to inspect
Returns:
tuple: (file_id, file_size, disk_id, disk_capacity, device_info)
"""
# FILE_SIZE is optional
reported_size = file_obj.get(self.FILE_SIZE)
if reported_size is None:
# TODO - check file size in working dir and/or tarfile
file_size_str = ""
else:
file_size_str = pretty_bytes(reported_size)
disk_obj = self.find_disk_from_file_id(file_obj.get(self.FILE_ID))
if disk_obj is None:
disk_id = ""
disk_cap_string = ""
device_item = self.find_item_from_file(file_obj)
else:
disk_id = disk_obj.get(self.DISK_ID)
disk_cap_string = pretty_bytes(
self.get_capacity_from_disk(disk_obj))
device_item = self.find_item_from_disk(disk_obj)
device_str = self.device_info_str(device_item)
return (file_obj.get(self.FILE_ID),
file_size_str,
disk_id,
disk_cap_string,
device_str)
def _info_string_files_disks(self, width, verbosity_option):
"""Describe files and disks as part of :meth:`info_string`.
Args:
width (int): Line length to wrap to where possible.
verbosity_option (str): 'brief', None (default), or 'verbose'
Returns:
str: File/disk information string, or None
"""
file_list = self.references.findall(self.FILE)
disk_list = (self.disk_section.findall(self.DISK)
if self.disk_section is not None else [])
if not (file_list or disk_list):
return None
href_w = 0
if file_list:
href_w = max([len(f.get(self.FILE_HREF)) for f in file_list])
href_w = min(href_w, (width - self.INFO_STRING_DISK_COLUMNS_WIDTH - 2))
href_w = max(href_w, 18) # len("(placeholder disk)")
href_w += 2 # leading whitespace for disks
template = self.INFO_STRING_DISK_TEMPLATE.format(href_w)
template2 = self.INFO_STRING_FILE_TEMPLATE.format(href_w)
str_list = [template.format("Files and Disks:",
"File Size", "Capacity", "Device"),
template.format("", "---------", "---------",
"--------------------")]
for file_obj in file_list:
(file_id, file_size,
disk_id, disk_cap, device_str) = self._info_strings_for_file(
file_obj)
href_str = " " + file_obj.get(self.FILE_HREF)
# Truncate to fit in available space
if len(href_str) > href_w:
href_str = href_str[:(href_w-3)] + "..."
if disk_cap or device_str:
str_list.append(template.format(href_str, file_size,
disk_cap, device_str))
else:
str_list.append(template2.format(href_str, file_size))
if verbosity_option == 'verbose':
str_list.append(" File ID: {0}".format(file_id))
if disk_id:
str_list.append(" Disk ID: {0}".format(disk_id))
# Find placeholder disks as well
for disk in disk_list:
file_id = disk.get(self.DISK_FILE_REF)
file_obj = self.find_child(self.references, self.FILE,
attrib={self.FILE_ID: file_id})
if file_obj is not None:
continue # already reported on above
disk_cap_string = pretty_bytes(self.get_capacity_from_disk(disk))
device_item = self.find_item_from_disk(disk)
device_str = self.device_info_str(device_item)
str_list.append(template.format(" (disk placeholder)",
"--",
disk_cap_string,
device_str))
return "\n".join(str_list)
def _info_string_hardware(self, wrapper):
"""Describe hardware subtypes as part of :meth:`info_string`.
Args:
wrapper (textwrap.TextWrapper): Helper object for wrapping
text lines if needed.
Returns:
str: Hardware information string, or None
"""
virtual_system_types = self.system_types
scsi_subtypes = list_union(
*[scsi_ctrl.get_all_values(self.RESOURCE_SUB_TYPE) for
scsi_ctrl in self.hardware.find_all_items('scsi')])
ide_subtypes = list_union(
*[ide_ctrl.get_all_values(self.RESOURCE_SUB_TYPE) for
ide_ctrl in self.hardware.find_all_items('ide')])
eth_subtypes = list_union(
*[eth.get_all_values(self.RESOURCE_SUB_TYPE) for
eth in self.hardware.find_all_items('ethernet')])
if ((virtual_system_types is not None) or
(scsi_subtypes or ide_subtypes or eth_subtypes)):
str_list = ["Hardware Variants:"]
wrapper.subsequent_indent = ' ' * 28
if virtual_system_types is not None:
wrapper.initial_indent = " System types: "
str_list.extend(wrapper.wrap(" ".join(virtual_system_types)))
if scsi_subtypes:
wrapper.initial_indent = " SCSI device types: "
str_list.extend(wrapper.wrap(" ".join(scsi_subtypes)))
if ide_subtypes:
wrapper.initial_indent = " IDE device types: "
str_list.extend(wrapper.wrap(" ".join(ide_subtypes)))
if eth_subtypes:
wrapper.initial_indent = " Ethernet device types: "
str_list.extend(wrapper.wrap(" ".join(eth_subtypes)))
return "\n".join(str_list)
return None
def _info_string_networks(self, verbosity_option, wrapper):
"""Describe virtual networks as part of :meth:`info_string`.
Args:
verbosity_option (str): 'brief', None (default), or 'verbose'
wrapper (textwrap.TextWrapper): Helper object for wrapping
text lines if needed.
Returns:
str: Network information string, or None
"""
if self.network_section is None:
return None
str_list = ["Networks:"]
width = wrapper.width
names = []
descs = []
for network in self.network_section.findall(self.NETWORK):
names.append(network.get(self.NETWORK_NAME))
descs.append(network.findtext(self.NWK_DESC, None))
max_n = max([len(name) for name in names])
max_d = max([len(str(desc)) for desc in descs])
truncate = (max_n + max_d + 6 >= width and
verbosity_option != 'verbose')
wrapper.initial_indent = " "
wrapper.subsequent_indent = ' ' * (5 + max_n)
if truncate:
max_d = width - 6 - max_n
for name, desc in zip(names, descs):
if not desc:
str_list.append(" " + name)
elif truncate and len(desc) > max_d:
str_list.append(' {name:{w}} "{tdesc}..."'.format(
name=name, w=max_n, tdesc=desc[:max_d-3]))
else:
str_list.extend(wrapper.wrap(
'{name:{w}} "{desc}"'.format(name=name, w=max_n,
desc=desc)))
return "\n".join(str_list)
def _info_string_nics(self, verbosity_option, wrapper):
"""Describe NICs as part of :meth:`info_string`.
Args:
verbosity_option (str): 'brief', None (default), or 'verbose'
wrapper (textwrap.TextWrapper): Helper object for wrapping
text lines if needed.
Returns:
str: NIC information string, or None
"""
if verbosity_option == 'brief':
return None
nics = self.hardware.find_all_items('ethernet')
if not nics:
return None
str_list = ["NICs and Associated Networks:"]
wrapper.initial_indent = ' '
wrapper.subsequent_indent = ' '
max_len = max([len(str(nic.get_value(self.ELEMENT_NAME)))
for nic in nics])
max_len = max(max_len, len("<instance 10>"))
template = " {name:{len}} : {nwk}"
for nic in nics:
network_name = nic.get_value(self.CONNECTION)
nic_name = nic.get_value(self.ELEMENT_NAME)
if nic_name is None:
nic_name = "<instance {0}>".format(
nic.get_value(self.INSTANCE_ID))
str_list.append(template.format(name=nic_name,
len=max_len,
nwk=network_name))
if verbosity_option == 'verbose':
desc = nic.get_value(self.ITEM_DESCRIPTION)
if desc is None:
desc = nic.get_value(self.CAPTION)
if desc is not None:
str_list.extend(wrapper.wrap(desc))
return "\n".join(str_list)
def _info_string_environment(self, wrapper):
"""Describe environment for :meth:`info_string`.
Args:
wrapper (textwrap.TextWrapper): Helper object for wrapping
text lines if needed.
Returns:
str: Environment information string, or None
"""
if not self.environment_transports:
return None
str_list = ["Environment:"]
wrapper.initial_indent = ' '
wrapper.subsequent_indent = ' '
str_list.extend(wrapper.wrap(
"Transport types: {0}"
.format(" ".join(self.environment_transports))))
return "\n".join(str_list)
def _info_string_properties(self, verbosity_option, wrapper):
"""Describe config properties for :meth:`info_string`.
Args:
verbosity_option (str): 'brief', None (default), or 'verbose'
wrapper (textwrap.TextWrapper): Helper object for wrapping
text lines if needed.
Returns:
str: Property information string, or None
"""
properties = self.environment_properties
if not properties:
return None
str_list = ["Properties:"]
max_key = 2 + max([len(str(ph['key'])) for ph in properties])
max_label = max([len(str(ph['label'])) for ph in properties])
max_value = max([len(str(ph['value'])) for ph in properties])
width = wrapper.width
if all(ph['label'] for ph in properties):
max_width = max_label
else:
max_width = max(max_key, max_label)
wrapper.initial_indent = ' '
wrapper.subsequent_indent = ' '
for propdict in properties:
# If we have a label, and the terminal is wide enough,
# display "<key> label value", else if no label, display
# "<key> value", else only display "label value"
if max_label > 0 and (max_key + max_label + max_value <
width - 8):
format_str = ' {key:{kw}} {label:{lw}} {val}'
str_list.append(format_str.format(
key="<{0}>".format(propdict['key']),
kw=max_key,
label=propdict['label'],
lw=max_label,
val=('"{0}"'.format(propdict['value'])
if propdict['value'] is not None
else '--')))
else:
str_list.append(' {label:{width}} {val}'.format(
label=(propdict['label'] if propdict['label']
else "<{0}>".format(propdict['key'])),
width=max_width,
val=('"{0}"'.format(propdict['value'])
if propdict['value'] is not None
else '--')))
if verbosity_option == 'verbose':
for line in propdict['description'].splitlines():
if not line:
str_list.append("")
else:
str_list.extend(wrapper.wrap(line))
return "\n".join(str_list)
[docs] def info_string(self, width=79, verbosity_option=None):
"""Get a descriptive string summarizing the contents of this OVF.
Args:
width (int): Line length to wrap to where possible.
verbosity_option (str): 'brief', None (default), or 'verbose'
Returns:
str: Wrapped, appropriately verbose string.
"""
# Supposedly it's quicker to construct a list of strings then merge
# them all together with 'join()' rather than it is to repeatedly
# append to an existing string with '+'.
# I haven't profiled this to verify - it's fast enough for now.
# Don't break in mid-word or on hyphens, as the usual case where
# we may exceed the available width is URI literals, and there's
# no ideal way to wrap these.
wrapper = textwrap.TextWrapper(width=width,
break_long_words=False,
break_on_hyphens=False)
# File description
header = self._info_string_header(width)
section_list = [
self._info_string_product(verbosity_option, wrapper),
self._info_string_annotation(wrapper),
self._info_string_eula(verbosity_option, wrapper),
self._info_string_files_disks(width, verbosity_option),
self._info_string_hardware(wrapper),
self.profile_info_string(width, verbosity_option),
self._info_string_networks(verbosity_option, wrapper),
self._info_string_nics(verbosity_option, wrapper),
self._info_string_environment(wrapper),
self._info_string_properties(verbosity_option, wrapper)
]
# Discard empty sections
section_list = [s for s in section_list if s]
return header + '\n' + "\n\n".join(section_list)
[docs] def device_info_str(self, device_item):
"""Get a one-line summary of a hardware device.
Args:
device_item (OVFItem): Device to summarize
Returns:
str: Descriptive string such as "harddisk @ IDE 1:0"
"""
if device_item is None:
return ""
controller_item = self.find_parent_from_item(device_item)
if controller_item is None:
ctrl_type = "(?)"
ctrl_addr = "?"
else:
ctrl_type = controller_item.hardware_type.upper()
ctrl_addr = controller_item.get_value(self.ADDRESS)
return "{0} @ {1} {2}:{3}".format(
device_item.hardware_type,
ctrl_type,
ctrl_addr,
device_item.get_value(self.ADDRESS_ON_PARENT))
PROFILE_INFO_TEMPLATE = (
"{{0:{0}}} " # profile name - width is dynamically set
"{{1:>4}} " # CPUs - width 4 for "CPUs"
"{{2:>9}} " # memory - width 9 for "999.9 MiB"
"{{3:>4}} " # NICs - width 4 for "NICs"
"{{4:>7}} " # serial - width 7 for "Serials"
"{{5:>14}}" # disks - width 14 for "Disks/Capacity","10 / 999.9 MiB"
)
[docs] def profile_info_list(self, width=79, verbose=False):
"""Get a list describing available configuration profiles.
Args:
width (int): Line length to wrap to if possible
verbose (bool): if True, generate multiple lines per profile
Returns:
tuple: (header, list)
"""
str_list = []
default_profile_id = self.default_config_profile
profile_ids = self.config_profiles
if not profile_ids:
profile_ids = [None]
prof_w = max(len("Configuration Profiles: "),
2 + max([(len(str(pid))) for pid in profile_ids]),
2 + len(str(default_profile_id) + " (default)"))
# Profile information
template = self.PROFILE_INFO_TEMPLATE.format(prof_w)
header = template.format("Configuration Profiles:", "CPUs", "Memory",
"NICs", "Serials", "Disks/Capacity")
header += "\n" + template.format("", "----", "---------", "----",
"-------", "--------------")
if verbose:
wrapper = textwrap.TextWrapper(width=width,
initial_indent=' ',
subsequent_indent=' ' * 21)
index = 0
for profile_id in profile_ids:
cpus = 0
cpu_item = self.hardware.find_item('cpu', profile=profile_id)
if cpu_item:
cpus = cpu_item.get_value(self.VIRTUAL_QUANTITY,
[profile_id])
mem_bytes = 0
ram_item = self.hardware.find_item('memory', profile=profile_id)
if ram_item:
mem_bytes = programmatic_bytes_to_int(
ram_item.get_value(self.VIRTUAL_QUANTITY, [profile_id]),
ram_item.get_value(self.ALLOCATION_UNITS, [profile_id]))
nics = self.hardware.get_item_count('ethernet', profile_id)
serials = self.hardware.get_item_count('serial', profile_id)
disk_count = self.hardware.get_item_count('harddisk',
profile_id)
disks_size = 0
if self.disk_section is not None:
for disk in self.disk_section.findall(self.DISK):
disks_size += self.get_capacity_from_disk(disk)
profile_str = " " + str(profile_id)
if profile_id == default_profile_id:
profile_str += " (default)"
str_list.append(template.format(
profile_str,
cpus,
pretty_bytes(mem_bytes),
nics,
serials,
"{0:2} / {1:>9}".format(disk_count,
pretty_bytes(disks_size))))
if profile_id is not None and verbose:
profile = self.find_child(self.deploy_opt_section,
self.CONFIG,
attrib={self.CONFIG_ID: profile_id})
str_list.extend(wrapper.wrap(
'{0:15} "{1}"'.format("Label:",
profile.findtext(self.CFG_LABEL))))
str_list.extend(wrapper.wrap(
'{0:15} "{1}"'.format("Description:",
profile.findtext(self.CFG_DESC))))
index += 1
return (header, str_list)
[docs] def profile_info_string(self, width=79, verbosity_option=None):
"""Get a string summarizing available configuration profiles.
Args:
width (int): Line length to wrap to if possible
verbosity_option (str): 'brief', None (default), or 'verbose'
Returns:
str: Appropriately formatted and verbose string.
"""
header, str_list = self.profile_info_list(
width, (verbosity_option != 'brief'))
return "\n".join([header] + str_list)
[docs] def create_configuration_profile(self, pid, label, description):
"""Create or update a configuration profile with the given ID.
Args:
pid (str): Profile identifier
label (str): Brief descriptive label for the profile
description (str): Verbose description of the profile
"""
self.deploy_opt_section = self._ensure_section(
self.DEPLOY_OPT_SECTION, "Configuration Profiles")
cfg = self.find_child(self.deploy_opt_section, self.CONFIG,
attrib={self.CONFIG_ID: pid})
if cfg is None:
logger.debug("Creating new Configuration element")
cfg = ET.SubElement(self.deploy_opt_section, self.CONFIG,
{self.CONFIG_ID: pid})
self.set_or_make_child(cfg, self.CFG_LABEL, label)
self.set_or_make_child(cfg, self.CFG_DESC, description)
# Clear cache
logger.debug("New profile %s created - clear config_profiles cache",
pid)
self._configuration_profiles = None
[docs] def delete_configuration_profile(self, profile):
"""Delete the profile with the given ID.
Args:
profile (str): Profile ID to delete.
Raises:
LookupError: if the profile does not exist.
"""
cfg = self.find_child(self.deploy_opt_section, self.CONFIG,
attrib={self.CONFIG_ID: profile})
if cfg is None:
raise LookupError("No such configuration profile '{0}'"
.format(profile))
logger.notice("Deleting configuration profile %s", profile)
# Delete references to this profile from the hardware
items = self.hardware.find_all_items(profile_list=[profile])
logger.debug("Removing profile %s from %s hardware items",
profile, len(items))
for item in items:
item.remove_profile(profile, split_default=False)
# Delete the profile declaration itself
self.deploy_opt_section.remove(cfg)
if not self.deploy_opt_section.findall(self.CONFIG):
self.envelope.remove(self.deploy_opt_section)
# Clear cache
logger.debug("Profile %s deleted - clear config_profiles cache",
profile)
self._configuration_profiles = None
# TODO - how to insert a doc about the profile_list (see vm_description.py)
[docs] def set_cpu_count(self, cpus, profile_list):
"""Set the number of CPUs.
Args:
cpus (int): Number of CPUs
profile_list (list): Change only the given profiles
"""
logger.debug("Updating CPU count in OVF under profile %s to %s",
profile_list, cpus)
self.platform.validate_cpu_count(cpus)
self.hardware.set_value_for_all_items('cpu',
self.VIRTUAL_QUANTITY, cpus,
profile_list,
create_new=True)
[docs] def set_memory(self, megabytes, profile_list):
"""Set the amount of RAM, in megabytes.
Args:
megabytes (int): Memory value, in megabytes
profile_list (list): Change only the given profiles
"""
logger.debug("Updating RAM in OVF under profile %s to %s",
profile_list, megabytes)
self.platform.validate_memory_amount(megabytes)
self.hardware.set_value_for_all_items('memory',
self.VIRTUAL_QUANTITY, megabytes,
profile_list,
create_new=True)
self.hardware.set_value_for_all_items('memory',
self.ALLOCATION_UNITS,
'byte * 2^20',
profile_list)
[docs] def set_nic_types(self, type_list, profile_list):
"""Set the hardware type(s) for NICs.
Args:
type_list (list): NIC hardware type(s)
profile_list (list): Change only the given profiles.
"""
# Just to be safe...
type_list = [canonicalize_nic_subtype(t) for t in type_list]
self.platform.validate_nic_types(type_list)
self.hardware.set_value_for_all_items('ethernet',
self.RESOURCE_SUB_TYPE,
type_list,
profile_list)
[docs] def get_nic_count(self, profile_list):
"""Get the number of NICs under the given profile(s).
Args:
profile_list (list): Profile(s) of interest.
Returns:
dict: ``{ profile_name : nic_count }``
"""
return self.hardware.get_item_count_per_profile('ethernet',
profile_list)
[docs] def set_nic_count(self, count, profile_list):
"""Set the given profile(s) to have the given number of NICs.
Args:
count (int): number of NICs
profile_list (list): Change only the given profiles
"""
logger.debug("Updating NIC count in OVF under profile %s to %s",
profile_list, count)
self.platform.validate_nic_count(count)
self.hardware.set_item_count_per_profile('ethernet', count,
profile_list)
[docs] def create_network(self, label, description):
"""Define a new network with the given label and description.
Also serves to update the description of an existing network label.
Args:
label (str): Brief label for the network
description (str): Verbose description of the network
"""
self.network_section = self._ensure_section(
self.NETWORK_SECTION,
"Logical networks",
attrib=self.NETWORK_SECTION_ATTRIB)
network = self.set_or_make_child(self.network_section, self.NETWORK,
attrib={self.NETWORK_NAME: label})
self.set_or_make_child(network, self.NWK_DESC, description)
[docs] def set_nic_networks(self, network_list, profile_list):
"""Set the NIC to network mapping for NICs under the given profile(s).
.. note::
If the length of :attr:`network_list` is less than the number of
NICs, will use the last entry in the list for all remaining NICs.
Args:
network_list (list): List of networks to map NICs to
profile_list (list): Change only the given profiles
"""
self.hardware.set_item_values_per_profile('ethernet',
self.CONNECTION,
network_list,
profile_list,
default=network_list[-1])
[docs] def set_nic_mac_addresses(self, mac_list, profile_list):
"""Set the MAC addresses for NICs under the given profile(s).
.. note::
If the length of :attr:`mac_list` is less than the number of NICs,
will use the last entry in the list for all remaining NICs.
Args:
mac_list (list): List of MAC addresses to assign to NICs
profile_list (list): Change only the given profiles
"""
self.hardware.set_item_values_per_profile('ethernet',
self.ADDRESS,
mac_list,
profile_list,
default=mac_list[-1])
[docs] def set_nic_names(self, name_list, profile_list):
"""Set the device names for NICs under the given profile(s).
Args:
name_list (list): List of names to assign.
profile_list (list): Change only the given profiles
"""
self.hardware.set_item_values_per_profile('ethernet',
self.ELEMENT_NAME,
name_list,
profile_list)
[docs] def get_serial_count(self, profile_list):
"""Get the number of serial ports under the given profile(s).
Args:
profile_list (list): Profile(s) of interest.
Returns:
dict: ``{ profile_name : serial_count }``
"""
return self.hardware.get_item_count_per_profile('serial', profile_list)
[docs] def set_serial_count(self, count, profile_list):
"""Set the given profile(s) to have the given number of serial ports.
Args:
count (int): Number of serial ports
profile_list (list): Change only the given profiles
"""
logger.debug("Updating serial port count under profile %s to %s",
profile_list, count)
self.hardware.set_item_count_per_profile('serial', count, profile_list)
[docs] def set_serial_connectivity(self, conn_list, profile_list):
"""Set the serial port connectivity under the given profile(s).
Args:
conn_list (list): List of connectivity strings
profile_list (list): Change only the given profiles
"""
self.hardware.set_item_values_per_profile('serial',
self.ADDRESS, conn_list,
profile_list, default="")
[docs] def get_serial_connectivity(self, profile):
"""Get the serial port connectivity strings under the given profile.
Args:
profile (str): Profile of interest.
Returns:
list: connectivity strings
"""
return [item.get_value(self.ADDRESS) for item in
self.hardware.find_all_items('serial', profile_list=[profile])]
[docs] def set_scsi_subtypes(self, type_list, profile_list):
"""Set the device subtype(s) for the SCSI controller(s).
Args:
type_list (list): SCSI subtype string list
profile_list (list): Change only the given profiles
"""
# TODO validate supported types by platform
self.hardware.set_value_for_all_items('scsi',
self.RESOURCE_SUB_TYPE,
type_list,
profile_list)
[docs] def set_ide_subtypes(self, type_list, profile_list):
"""Set the device subtype(s) for the IDE controller(s).
Args:
type_list (list): IDE subtype string list
profile_list (list): Change only the given profiles
"""
# TODO validate supported types by platform
self.hardware.set_value_for_all_items('ide',
self.RESOURCE_SUB_TYPE,
type_list,
profile_list)
[docs] def get_property_value(self, key):
"""Get the value of the given property.
Args:
key (str): Property identifier
Returns:
str: Value of this property as a string, or ``None``
"""
if self.ovf_version < 1.0 or self.product_section is None:
return None
prop = self.find_child(self.product_section, self.PROPERTY,
attrib={self.PROP_KEY: key})
if prop is None:
return None
return prop.get(self.PROP_VALUE)
def _validate_value_for_property(self, prop, value):
"""Check whether the proposed value is valid for the given property.
This applies agnostic criteria such as property type and qualifiers;
it knows nothing of the property's actual meaning.
Args:
prop (xml.etree.ElementTree.Element): Existing Property element.
value (str): Proposed value to set for this property.
Returns:
str: the value, potentially canonicalized.
Raises:
ValueUnsupportedError: if the value does not meet criteria.
"""
key = prop.get(self.PROP_KEY)
# Check type validity and canonicalize if needed
prop_type = prop.get(self.PROP_TYPE, "")
if prop_type == "boolean":
# XML prefers to represent booleans as 'true' or 'false'
value = str(value).lower()
if str(value).lower() in ['true', '1', 't', 'y', 'yes']:
value = 'true'
elif str(value).lower() in ['false', '0', 'f', 'n', 'no']:
value = 'false'
else:
raise ValueUnsupportedError(key, value, "a boolean value")
elif prop_type == "string":
value = str(value)
# Check property qualifiers
prop_qual = prop.get(self.PROP_QUAL, "")
if prop_qual:
match = re.search(r"MaxLen\((\d+)\)", prop_qual)
if match:
max_len = int(match.group(1))
if len(value) > max_len:
raise ValueUnsupportedError(
key, value, "string no longer than {0} characters"
.format(max_len))
match = re.search(r"MinLen\((\d+)\)", prop_qual)
if match:
min_len = int(match.group(1))
if len(value) < min_len:
raise ValueUnsupportedError(
key, value, "string no shorter than {0} characters"
.format(min_len))
return value
[docs] def set_property_value(self, key, value,
user_configurable=None, property_type=None,
label=None, description=None):
"""Set the value of the given property (converting value if needed).
Args:
key (str): Property identifier
value (object): Value to set for this property
user_configurable (bool): Should this property be configurable at
deployment time by the user?
property_type (str): Value type - 'string' or 'boolean'
label (str): Brief explanatory label for this property
description (str): Detailed description of this property
Returns:
str: the (converted) value that was set.
Raises:
NotImplementedError: if :attr:`ovf_version` is less than 1.0;
OVF version 0.9 is not currently supported.
"""
if self.ovf_version < 1.0:
raise NotImplementedError("No support for setting environment "
"properties under OVF v0.9")
self.product_section = self._ensure_section(
self.PRODUCT_SECTION,
"Product Information",
attrib=self.PRODUCT_SECTION_ATTRIB,
parent=self.virtual_system)
prop = self.find_child(self.product_section, self.PROPERTY,
attrib={self.PROP_KEY: key})
if prop is None:
prop = self.set_or_make_child(self.product_section, self.PROPERTY,
attrib={self.PROP_KEY: key})
# Properties *must* have a type to be valid
if property_type is None:
property_type = 'string'
if user_configurable is not None:
prop.set(self.PROP_USER_CONFIGABLE, str(user_configurable).lower())
if property_type is not None:
prop.set(self.PROP_TYPE, property_type)
# Revalidate any existing value if not setting a new value
if value is None:
value = prop.get(self.PROP_VALUE)
if value is not None:
# Make sure the requested value is valid
value = self._validate_value_for_property(prop, value)
prop.set(self.PROP_VALUE, value)
if label is not None:
self.set_or_make_child(prop, self.PROPERTY_LABEL, label)
if description is not None:
self.set_or_make_child(prop, self.PROPERTY_DESC, description)
return value
[docs] def config_file_to_properties(self, file_path, user_configurable=None):
"""Import each line of a text file into a configuration property.
Args:
file_path (str): File name to import.
user_configurable (bool): Should the resulting properties be
configurable at deployment time by the user?
Raises:
NotImplementedError: if the :attr:`platform` for this OVF
does not define
:const:`~COT.platforms.Platform.LITERAL_CLI_STRING`
"""
if not self.platform.LITERAL_CLI_STRING:
raise NotImplementedError("no known support for literal CLI on " +
str(self.platform))
property_num = 0
with open(file_path, 'r') as fileobj:
for line in fileobj:
line = line.strip()
# Skip blank lines and comment lines
if (not line) or line[0] == '!':
continue
property_num += 1
self.set_property_value(
"{0}-{1:04d}".format(self.platform.LITERAL_CLI_STRING,
property_num),
line,
user_configurable)
[docs] def convert_disk_if_needed(self, disk_image, kind):
"""Convert the disk to a more appropriate format if needed.
* All hard disk files are converted to stream-optimized VMDK as it
is the only format that VMware supports in OVA packages.
* CD-ROM iso images are accepted without change.
Args:
disk_image (COT.disks.DiskRepresentation): Image to inspect and
possibly convert
kind (str): Image type (harddisk/cdrom)
Returns:
DiskRepresentation: :attr:`disk_image`, if no conversion was
required, or a new :class:`~COT.disks.DiskRepresentation` instance
representing a converted image that has been created in
:attr:`output_dir`.
"""
if kind != 'harddisk':
logger.debug("No disk conversion needed")
return disk_image
# Convert hard disk to VMDK format, streamOptimized subformat
if (disk_image.disk_format == 'vmdk' and
disk_image.disk_subformat == 'streamOptimized'):
logger.debug("No disk conversion needed")
return disk_image
logger.debug("Converting %s (%s, %s) to streamOptimized VMDK",
disk_image.path, disk_image.disk_format,
disk_image.disk_subformat)
return disk_image.convert_to(new_format='vmdk',
new_subformat='streamOptimized',
new_directory=self.working_dir)
[docs] def search_from_filename(self, filename):
"""From the given filename, try to find any existing objects.
This implementation uses the given :attr:`filename` to find a matching
``File`` in the OVF, then using that to find a matching ``Disk`` and
``Item`` entries.
Args:
filename (str): Filename to search from
Returns:
tuple: ``(file, disk, ctrl_item, disk_item)``, any or all of which
may be ``None``
Raises:
LookupError: If the ``disk_item`` is found but no ``ctrl_item`` is
found to be its parent.
"""
file_obj = None
disk = None
ctrl_item = None
disk_item = None
logger.debug("Looking for existing disk info based on filename %s",
filename)
file_obj = self.find_child(self.references, self.FILE,
attrib={self.FILE_HREF: filename})
if file_obj is None:
return (file_obj, disk, ctrl_item, disk_item)
file_id = file_obj.get(self.FILE_ID)
disk = self.find_disk_from_file_id(file_id)
disk_item_1 = self.find_item_from_file(file_obj)
disk_item_2 = self.find_item_from_disk(disk)
disk_item = check_for_conflict("disk Item", [disk_item_1, disk_item_2])
ctrl_item = self.find_parent_from_item(disk_item)
if disk_item is not None and ctrl_item is None:
raise LookupError("Found disk Item {0} but no controller Item "
"as its parent?"
.format(disk_item))
return (file_obj, disk, ctrl_item, disk_item)
[docs] def search_from_file_id(self, file_id):
"""From the given file ID, try to find any existing objects.
This implementation uses the given :attr:`file_id` to find a matching
``File`` in the OVF, then using that to find a matching ``Disk`` and
``Item`` entries.
Args:
file_id (str): File ID to search from
Returns:
tuple: ``(file, disk, ctrl_item, disk_item)``, any or all of which
may be ``None``
Raises:
LookupError: If the ``disk`` entry is found but no corresponding
``file`` is found.
LookupError: If the ``disk_item`` is found but no ``ctrl_item`` is
found to be its parent.
"""
if file_id is None:
return (None, None, None, None)
logger.debug(
"Looking for existing disk information based on file_id %s",
file_id)
file_obj = None
disk = None
ctrl_item = None
disk_item = None
file_obj = self.find_child(self.references, self.FILE,
attrib={self.FILE_ID: file_id})
disk = self.find_disk_from_file_id(file_id)
if disk is not None and file_obj is None:
# Should never happen - OVF is not valid
raise LookupError("Malformed OVF? Found Disk with fileRef {0} but "
"no corresponding File with id {0}"
.format(file_id))
disk_item_1 = self.find_item_from_file(file_obj)
disk_item_2 = self.find_item_from_disk(disk)
disk_item = check_for_conflict("disk Item", [disk_item_1, disk_item_2])
ctrl_item = self.find_parent_from_item(disk_item)
if disk_item is not None and ctrl_item is None:
raise LookupError("Found disk Item {0} but no controller Item "
"as its parent?"
.format(disk_item))
return (file_obj, disk, ctrl_item, disk_item)
[docs] def search_from_controller(self, controller, address):
"""From the controller type and device address, look for existing disk.
This implementation uses the parameters to find matching
controller and disk ``Item`` elements, then using the disk ``Item``
to find matching ``File`` and/or ``Disk``.
Args:
controller (str): ``'ide'`` or ``'scsi'``
address (str): Device address such as ``'1:0'``
Returns:
tuple: ``(file, disk, ctrl_item, disk_item)``, any or all of which
may be ``None``
"""
if controller is None or address is None:
return (None, None, None, None)
logger.debug("Looking for existing disk information based on "
"controller type (%s) and disk address (%s)",
controller, address)
file_obj = None
disk = None
ctrl_item = None
disk_item = None
ctrl_addr = address.split(":")[0]
disk_addr = address.split(":")[1]
logger.debug("Searching for controller address %s", ctrl_addr)
ctrl_item = self.hardware.find_item(controller,
{self.ADDRESS: ctrl_addr})
if ctrl_item is None:
return (file_obj, disk, ctrl_item, disk_item)
# From controller Item to its child disk Item
ctrl_instance = ctrl_item.get_value(self.INSTANCE_ID)
logger.debug("Searching for disk address %s with parent %s",
disk_addr, ctrl_instance)
disk_item = self.hardware.find_item(
properties={self.PARENT: ctrl_instance,
self.ADDRESS_ON_PARENT: disk_addr})
if disk_item is None:
return (file_obj, disk, ctrl_item, disk_item)
host_resource = disk_item.get_value(self.HOST_RESOURCE)
if host_resource is None:
logger.debug("Disk item has no RASD:HostResource - "
"i.e., empty drive")
return (file_obj, disk, ctrl_item, disk_item)
if (host_resource.startswith(self.HOST_RSRC_DISK_REF) or
host_resource.startswith(self.OLD_HOST_RSRC_DISK_REF)):
logger.debug("Looking for Disk and File matching disk Item")
# From disk Item to Disk
disk_id = os.path.basename(host_resource)
if self.disk_section is not None:
disk = self.find_child(self.disk_section, self.DISK,
attrib={self.DISK_ID: disk_id})
if disk is not None:
# From Disk to File
file_id = disk.get(self.DISK_FILE_REF)
file_obj = self.find_child(self.references, self.FILE,
attrib={self.FILE_ID: file_id})
elif (host_resource.startswith(self.HOST_RSRC_FILE_REF) or
host_resource.startswith(self.OLD_HOST_RSRC_FILE_REF)):
logger.debug("Looking for File and Disk matching disk Item")
# From disk Item to File
file_id = os.path.basename(host_resource)
file_obj = self.find_child(self.references, self.FILE,
attrib={self.FILE_ID: file_id})
if self.disk_section is not None:
disk = self.find_child(self.disk_section, self.DISK,
attrib={self.DISK_FILE_REF: file_id})
else:
logger.error(
"Unrecognized HostResource format '%s'; unable to identify "
"which File and Disk are associated with this disk Item",
host_resource)
return (file_obj, disk, ctrl_item, disk_item)
[docs] def find_open_controller(self, controller_type):
"""Find the first open slot on a controller of the given type.
Args:
controller_type (str): ``'ide'`` or ``'scsi'``
Returns:
tuple: ``(ctrl_item, address_string)`` or ``(None, None)``
"""
for ctrl_item in self.hardware.find_all_items(controller_type):
ctrl_instance = ctrl_item.get_value(self.INSTANCE_ID)
ctrl_addr = ctrl_item.get_value(self.ADDRESS)
logger.debug("Found controller instance %s address %s",
ctrl_instance, ctrl_addr)
disk_list = self.hardware.find_all_items(
properties={self.PARENT: ctrl_instance})
address_list = [disk.get_value(self.ADDRESS_ON_PARENT) for
disk in disk_list]
disk_addr = 0
while str(disk_addr) in address_list:
disk_addr += 1
if ((controller_type == 'scsi' and disk_addr > 7) or
(controller_type == 'ide' and disk_addr > 1)):
logger.debug("Controller address %s is already full",
ctrl_addr)
else:
logger.verbose("Found open slot %s:%s for %s controller",
ctrl_addr, disk_addr, controller_type)
return (ctrl_item, "{0}:{1}".format(ctrl_addr, disk_addr))
logger.notice("No open controller found")
return (None, None)
[docs] def get_id_from_file(self, file_obj):
"""Get the file ID from the given opaque file object.
Args:
file_obj (xml.etree.ElementTree.Element): 'File' element
Returns:
str: 'id' attribute value of this element
"""
return file_obj.get(self.FILE_ID)
[docs] def get_path_from_file(self, file_obj):
"""Get the file path from the given opaque file object.
Args:
file_obj (xml.etree.ElementTree.Element): 'File' element
Returns:
str: 'href' attribute value of this element
"""
return file_obj.get(self.FILE_HREF)
[docs] def get_file_ref_from_disk(self, disk):
"""Get the file reference from the given opaque disk object.
Args:
disk (xml.etree.ElementTree.Element): 'Disk' element
Returns:
str: 'fileRef' attribute value of this element
"""
return disk.get(self.DISK_FILE_REF)
[docs] def get_common_subtype(self, device_type):
"""Get the sub-type common to all devices of the given type.
Args:
device_type (str): Device type such as ``'ide'`` or ``'memory'``.
Returns:
str: Subtype string common to all devices of the type, or ``None``,
if multiple such devices exist and they do not all have the same
sub-type.
"""
subtype = None
for item in self.hardware.find_all_items(device_type):
item_subtype = item.get_value(self.RESOURCE_SUB_TYPE)
if subtype is None:
subtype = item_subtype
logger.verbose("Found %s subtype %s", device_type, subtype)
elif subtype != item_subtype:
logger.warning("Found different subtypes ('%s', '%s') for "
"device type %s - no common subtype exists",
subtype, item_subtype, device_type)
return None
return subtype
[docs] def check_sanity_of_disk_device(self, disk, file_obj,
disk_item, ctrl_item):
"""Check if the given disk is linked properly to the other objects.
Args:
disk (xml.etree.ElementTree.Element): Disk object to validate
file_obj (xml.etree.ElementTree.Element): File object which this
disk should be linked to (optional)
disk_item (OVFItem): Disk device object which should link to this
disk (optional)
ctrl_item (OVFItem): Controller device object which should link
to the :attr:`disk_item`
Raises:
ValueMismatchError: if the given items are not linked properly.
ValueUnsupportedError: if the :attr:`disk_item` has a
``HostResource`` value in an unrecognized or invalid format.
"""
if disk_item is None:
return
if ctrl_item is not None:
match_or_die("disk Item Parent", disk_item.get_value(self.PARENT),
"controller Item InstanceID",
ctrl_item.get_value(self.INSTANCE_ID))
host_resource = disk_item.get_value(self.HOST_RESOURCE)
if host_resource is not None:
if ((host_resource.startswith(self.HOST_RSRC_DISK_REF) or
host_resource.startswith(self.OLD_HOST_RSRC_DISK_REF)) and
disk is not None):
match_or_die("disk Item HostResource",
os.path.basename(host_resource),
"Disk diskId", disk.get(self.DISK_ID))
elif ((host_resource.startswith(self.HOST_RSRC_FILE_REF) or
host_resource.startswith(self.OLD_HOST_RSRC_FILE_REF)) and
file_obj is not None):
match_or_die("disk Item HostResource",
os.path.basename(host_resource),
"File id", file_obj.get(self.FILE_ID))
else:
# TODO: this is not a user input error, it's an OVF error
# so ValueUnsupportedError isn't really right?
raise ValueUnsupportedError("HostResource prefix",
host_resource,
[self.HOST_RSRC_FILE_REF,
self.HOST_RSRC_DISK_REF,
self.OLD_HOST_RSRC_FILE_REF,
self.OLD_HOST_RSRC_DISK_REF])
[docs] def add_file(self, file_path, file_id, file_obj=None, disk=None):
"""Add a new file object to the VM or overwrite the provided one.
Args:
file_path (str): Path to file to add
file_id (str): Identifier string for the file in the VM
file_obj (xml.etree.ElementTree.Element): Existing file object to
overwrite
disk (xml.etree.ElementTree.Element): Existing disk object
referencing :attr:`file`.
Returns:
xml.etree.ElementTree.Element: New or updated file object
"""
logger.debug("Adding File to OVF")
if file_obj is not None:
href = file_obj.get(self.FILE_HREF)
if href in self.file_references.keys():
del self.file_references[href]
file_obj.clear()
elif disk is None:
file_obj = ET.SubElement(self.references, self.FILE)
else:
# The OVF standard requires that Disks which reference a File
# be listed in the same order as the Files.
# Since there's already a Disk, make sure the new File is ordered
# appropriately.
# This is complicated by the fact that we may have
# Files which are not Disks and Disks with no backing File.
all_files = self.references.findall(self.FILE)
all_disks = self.disk_section.findall(self.DISK)
# Starting from the Disk entry corresponding to our new File,
# search forward until we find the next Disk (if any) which has a
# File, and insert our new File before that File.
disk_index = all_disks.index(disk)
file_index = len(all_files)
while disk_index < len(all_disks):
tmp_file_id = all_disks[disk_index].get(self.DISK_FILE_REF)
next_file = self.find_child(self.references, self.FILE,
attrib={self.FILE_ID: tmp_file_id})
if next_file is not None:
file_index = all_files.index(next_file)
break
disk_index += 1
file_obj = ET.Element(self.FILE)
self.references.insert(file_index, file_obj)
file_size_string = str(os.path.getsize(file_path))
file_name = os.path.basename(file_path)
file_obj.set(self.FILE_ID, file_id)
file_obj.set(self.FILE_HREF, file_name)
file_obj.set(self.FILE_SIZE, file_size_string)
# Make a note of the file's location - we'll copy it at write time.
# The file_path is always a FileOnDisk
self.file_references[file_name] = FileOnDisk(
os.path.dirname(os.path.abspath(file_path)), file_name,
checksum_algorithm=self.checksum_algorithm)
return file_obj
[docs] def remove_file(self, file_obj, disk=None, disk_drive=None):
"""Remove the given file object from the VM.
Args:
file_obj (xml.etree.ElementTree.Element): File object to remove
disk (xml.etree.ElementTree.Element): Disk object referencing
:attr:`file`
disk_drive (OVFItem): Disk drive mapping :attr:`file` to a device
Raises:
ValueUnsupportedError: If the ``disk_drive`` is a device type other
than 'cdrom' or 'harddisk'
"""
self.references.remove(file_obj)
del self.file_references[file_obj.get(self.FILE_HREF)]
if disk is not None:
self.disk_section.remove(disk)
if disk_drive is not None:
# For a CD-ROM drive, we can simply unmap the file.
# For a hard disk, we need to delete the device altogether.
drive_type = disk_drive.get_value(self.RESOURCE_TYPE)
if drive_type == self.RES_MAP['cdrom']:
disk_drive.set_property(self.HOST_RESOURCE, '')
elif drive_type == self.RES_MAP['harddisk']:
self.hardware.delete_item(disk_drive)
else:
raise ValueUnsupportedError("drive type", drive_type,
"CD-ROM ({0}) or hard disk ({1})"
.format(self.RES_MAP['cdrom'],
self.RES_MAP['harddisk']))
[docs] def add_disk(self, disk_repr, file_id, drive_type, disk=None):
"""Add a new disk object to the VM or overwrite the provided one.
Args:
disk_repr (COT.disks.DiskRepresentation): Disk file representation
file_id (str): Identifier string for the file/disk mapping
drive_type (str): 'harddisk' or 'cdrom'
disk (xml.etree.ElementTree.Element): Existing object to overwrite
Returns:
xml.etree.ElementTree.Element: New or updated disk object
"""
if drive_type != 'harddisk':
if disk is not None:
logger.notice("CD-ROMs do not require a Disk element. "
"Existing element will be deleted.")
if self.disk_section is not None:
self.disk_section.remove(disk)
if not self.disk_section.findall(self.DISK):
logger.notice("No Disks left - removing DiskSection")
self.envelope.remove(self.disk_section)
self.disk_section = None
disk = None
else:
logger.debug("Not adding Disk element to OVF, as CD-ROMs "
"do not require a Disk")
return disk
# Else, adding a hard disk:
self.disk_section = self._ensure_section(
self.DISK_SECTION,
"Virtual disk information",
attrib=self.DISK_SECTION_ATTRIB)
logger.debug("Adding Disk to OVF")
if disk is not None:
disk_id = disk.get(self.DISK_ID)
disk.clear()
else:
disk_id = file_id
disk = ET.SubElement(self.disk_section, self.DISK)
self.set_capacity_of_disk(disk, disk_repr.capacity)
disk.set(self.DISK_ID, disk_id)
disk.set(self.DISK_FILE_REF, file_id)
disk.set(self.DISK_FORMAT,
("http://www.vmware.com/interfaces/"
"specifications/vmdk.html#streamOptimized"))
return disk
[docs] def add_controller_device(self, device_type, subtype, address,
ctrl_item=None):
"""Create a new IDE or SCSI controller, or update existing one.
Args:
device_type (str): ``'ide'`` or ``'scsi'``
subtype (object): (Optional) subtype string such as ``'virtio'``
or list of subtype strings
address (int): Controller address such as 0 or 1 (optional)
ctrl_item (OVFItem): Existing controller device to update (optional)
Returns:
OVFItem: New or updated controller device object
Raises:
ValueTooHighError: if no more controllers can be created
"""
if ctrl_item is None:
logger.notice("%s controller not found, creating new Item",
device_type.upper())
(_, ctrl_item) = self.hardware.new_item(device_type)
if address is None:
# Find a controller address that isn't already used
address_list = [
ci.get_value(self.ADDRESS) for
ci in self.hardware.find_all_items(device_type)]
address = 0
while str(address) in address_list:
address += 1
logger.verbose("Selected address %s for new controller",
address)
if device_type == "scsi" and int(address) > 3:
raise ValueTooHighError("SCSI controller address", address, 3)
elif device_type == "ide" and int(address) > 1:
raise ValueTooHighError("IDE controller address", address, 1)
ctrl_item.set_property(self.ADDRESS, address)
ctrl_item.set_property(self.ELEMENT_NAME, "{0} Controller"
.format(device_type.upper()))
ctrl_item.set_property(self.ITEM_DESCRIPTION,
"{0} Controller {1}"
.format(device_type.upper(), address))
# Change subtype of existing controller or new controller
if subtype is not None:
ctrl_item.set_property(self.RESOURCE_SUB_TYPE, subtype)
return ctrl_item
def _create_new_disk_device(self, drive_type, address, name, ctrl_item):
"""Helper for :meth:`add_disk_device`, in the case of no prior Item.
Args:
drive_type (str): ``'harddisk'`` or ``'cdrom'``
address (str): Address on controller, such as "1:0" (optional)
name (str): Device name string (optional)
ctrl_item (OVFItem): Controller object to serve as parent
Returns:
tuple: (disk_item, disk_name)
Raises:
ValueTooHighError: if the requested address is out of range
for the given controller, or if the controller is already full.
ValueUnsupportedError: if ``name`` is not specified and
``disk_type`` is not 'harddisk' or 'cdrom'.
"""
ctrl_instance = ctrl_item.get_value(self.INSTANCE_ID)
if address is None:
logger.debug("Working to identify address of new disk")
items = self.hardware.find_all_items(
properties={self.PARENT: ctrl_instance})
addresses = [item.get_value(self.ADDRESS_ON_PARENT) for
item in items]
address = 0
while str(address) in addresses:
address += 1
logger.warning("New disk address on parent not specified, "
"guessing it should be %s", address)
ctrl_type = ctrl_item.hardware_type
# Make sure the address is valid!
if ctrl_type == "scsi" and int(address) > 15:
raise ValueTooHighError("disk address on SCSI controller",
address, 15)
elif ctrl_type == "ide" and int(address) > 1:
raise ValueTooHighError("disk address on IDE controller",
address, 1)
if name is None:
if drive_type == 'cdrom':
name = "CD-ROM Drive"
elif drive_type == 'harddisk':
name = "Hard Disk Drive"
else:
# Should never get here!
raise ValueUnsupportedError("disk drive type", drive_type,
"'cdrom' or 'harddisk'")
(_, disk_item) = self.hardware.new_item(drive_type)
disk_item.set_property(self.ADDRESS_ON_PARENT, address)
disk_item.set_property(self.PARENT, ctrl_instance)
return disk_item, name
[docs] def add_disk_device(self, drive_type, address, name, description,
disk, file_obj, ctrl_item, disk_item=None):
"""Create a new disk hardware device or overwrite an existing one.
Args:
drive_type (str): ``'harddisk'`` or ``'cdrom'``
address (str): Address on controller, such as "1:0" (optional)
name (str): Device name string (optional)
description (str): Description string (optional)
disk (xml.etree.ElementTree.Element): Disk object to map to
this device
file_obj (xml.etree.ElementTree.Element): File object to map to
this device
ctrl_item (OVFItem): Controller object to serve as parent
disk_item (OVFItem): Existing disk device to update instead of
making a new device.
Returns:
xml.etree.ElementTree.Element: New or updated disk device object.
"""
if disk_item is None:
logger.notice("Disk Item not found, adding new Item")
disk_item, name = self._create_new_disk_device(
drive_type, address, name, ctrl_item)
else:
logger.debug("Updating existing disk Item")
# Make these changes to the disk Item regardless of new/existing
disk_item.set_property(self.RESOURCE_TYPE, self.RES_MAP[drive_type])
if drive_type == 'harddisk':
# Link to the Disk we created
disk_item.set_property(self.HOST_RESOURCE,
(self.HOST_RSRC_DISK_REF +
disk.get(self.DISK_ID)))
else:
# No Disk for CD-ROM; link to the File instead
disk_item.set_property(self.HOST_RESOURCE,
(self.HOST_RSRC_FILE_REF +
file_obj.get(self.FILE_ID)))
if name is not None:
disk_item.set_property(self.ELEMENT_NAME, name)
if description is not None:
disk_item.set_property(self.ITEM_DESCRIPTION, description)
return disk_item
# Helper methods - for internal use only
[docs] def untar(self, file_path):
"""Untar the OVF descriptor from an .ova to the working directory.
Args:
file_path (str): OVA file path
Returns:
str: Path to extracted OVF descriptor
Raises:
VMInitError: if the given file doesn't represent a valid OVA archive.
"""
logger.verbose("Untarring %s to working directory %s",
file_path, self.working_dir)
try:
tarf = tarfile.open(file_path, 'r')
except (EOFError, tarfile.TarError) as exc:
raise VMInitError(1, "Could not untar file: {0}".format(exc.args),
file_path)
try:
# The OVF standard says, with regard to OVAs:
# ...the files shall be in the following order inside the archive:
# 1) OVF descriptor
# 2) OVF manifest (optional)
# 3) OVF certificate (optional)
# 4) The remaining files shall be in the same order as listed
# in the References section...
# 5) OVF manifest (optional)
# 6) OVF certificate (optional)
#
# For now we just validate #1.
if not tarf.getmembers():
raise VMInitError(1, "No files to untar", file_path)
# Make sure the provided file doesn't contain any malicious paths
# http://stackoverflow.com/questions/8112742/
for pathname in tarf.getnames():
logger.debug("Examining path of %s prior to untar", pathname)
if not (os.path.abspath(os.path.join(self.working_dir,
pathname))
.startswith(self.working_dir)):
raise VMInitError(1, "Tar file contains malicious/unsafe "
"file path '{0}'!".format(pathname),
file_path)
ovf_descriptor = tarf.getmembers()[0]
if os.path.splitext(ovf_descriptor.name)[1] != '.ovf':
# Do we have an OVF descriptor elsewhere in the file?
candidates = [mem for mem in tarf.getmembers() if
os.path.splitext(mem.name)[1] == '.ovf']
if not candidates:
raise VMInitError(1,
"TAR file does not seem to contain any"
" .ovf file to serve as OVF descriptor"
" - OVA is invalid!",
file_path)
ovf_descriptor = candidates[0]
logger.error(
"OVF file %s found, but is not the first file in the TAR "
"as it should be - OVA is not standard-compliant!",
ovf_descriptor.name)
# TODO: In theory we could read the ovf descriptor XML directly
# from the TAR and not need to even extract this file to disk...
tarf.extract(ovf_descriptor, path=self.working_dir)
logger.debug(
"Extracted OVF descriptor from %s to working dir %s",
file_path, self.working_dir)
finally:
tarf.close()
# Find the OVF file
return os.path.join(self.working_dir, ovf_descriptor.name)
[docs] def generate_manifest(self, ovf_file):
"""Construct the manifest file for this package, if possible.
Args:
ovf_file (str): OVF descriptor file path
Returns:
bool: True if the manifest was successfully generated,
False if not successful (such as if checksum helper tools are
unavailable).
"""
(prefix, _) = os.path.splitext(ovf_file)
logger.verbose("Generating manifest for %s", ovf_file)
manifest = prefix + '.mf'
with open(ovf_file, 'rb') as ovfobj:
checksum = file_checksum(ovfobj, self.checksum_algorithm)
with open(manifest, 'wb') as mfobj:
mfobj.write("{algo}({file})= {sum}\n"
.format(algo=self.checksum_algorithm.upper(),
file=os.path.basename(ovf_file),
sum=checksum)
.encode('utf-8'))
# Checksum all referenced files as well
for file_obj in self.references.findall(self.FILE):
file_name = file_obj.get(self.FILE_HREF)
file_ref = self.file_references[file_name]
mfobj.write("{algo}({file})= {sum}\n"
.format(algo=self.checksum_algorithm.upper(),
file=file_name, sum=file_ref.checksum)
.encode('utf-8'))
logger.debug("Manifest generated successfully")
return True
[docs] def tar(self, ovf_descriptor, tar_file):
"""Create a .ova tar file based on the given OVF descriptor.
Args:
ovf_descriptor (str): File path for an OVF descriptor
tar_file (str): File path for the desired OVA archive.
"""
logger.verbose("Creating tar file %s", tar_file)
(prefix, _) = os.path.splitext(ovf_descriptor)
# Issue #66 - need to detect any of the possible scenarios:
# 1) output path and input path are the same real path
# (not just string-equal!)
# 2) output file and input file are the same file (including links)
# but not error out if (common case) output_file doesn't exist yet.
if (os.path.realpath(self.input_file) == os.path.realpath(tar_file) or
(os.path.exists(tar_file) and
os.path.samefile(self.input_file, tar_file))):
# We're about to overwrite the input OVA with a new OVA.
# (Python tarfile module doesn't support in-place edits.)
# Any files that we need to carry over need to be extracted NOW!
logger.info(
"Input OVA will be overwritten. Extracting files from %s to"
" working directory before overwriting it.", self.input_file)
for filename in self.file_references:
file_ref = self.file_references[filename]
if file_ref.file_path is None:
file_ref.copy_to(self.working_dir)
self.file_references[filename] = FileReference.create(
self.working_dir, filename,
checksum_algorithm=self.checksum_algorithm,
expected_checksum=file_ref.checksum,
expected_size=file_ref.size)
# Be sure to dereference any links to the actual file content!
with tarfile.open(tar_file, 'w', dereference=True) as tarf:
# OVF is always first
logger.debug("Adding OVF descriptor %s to %s",
ovf_descriptor, tar_file)
tarf.add(ovf_descriptor, os.path.basename(ovf_descriptor))
# Add manifest if present
manifest_path = prefix + '.mf'
if os.path.exists(manifest_path):
logger.debug("Adding manifest to %s", tar_file)
tarf.add(manifest_path, os.path.basename(manifest_path))
if os.path.exists("{0}.cert".format(prefix)):
logger.warning("COT doesn't know how to re-sign a certificate"
" file, so the existing certificate will be"
" omitted from %s.", tar_file)
# Add all other files mentioned in the OVF
for file_obj in self.references.findall(self.FILE):
file_name = file_obj.get(self.FILE_HREF)
file_ref = self.file_references[file_name]
logger.debug("Adding associated file %s to %s",
file_name, tar_file)
file_ref.add_to_archive(tarf)
def _ensure_section(self, section_tag, info_string,
attrib=None, parent=None):
"""If the OVF doesn't already have the given Section, create it.
Args:
section_tag (str): XML tag of the desired section.
info_string (str): Info string to set if a new Section is created.
attrib (dict): Attributes to filter by when looking for any existing
section (optional).
parent (xml.etree.ElementTree.Element): Parent element (optional).
If not specified, :attr:`envelope` will be the parent.
Returns:
xml.etree.ElementTree.Element: Section element that was found or
created
"""
if parent is None:
parent = self.envelope
section = self.find_child(parent, section_tag, attrib=attrib)
if section is not None:
return section
logger.notice("No existing %s. Creating it.",
XML.strip_ns(section_tag))
if attrib:
section = ET.Element(section_tag, attrib=attrib)
else:
section = ET.Element(section_tag)
# Section elements may be in arbitrary order relative to one another,
# but they MUST come after the References and before the VirtualSystem.
# We'll construct them immediately before the VirtualSystem.
index = 0
for child in list(parent):
if child.tag == self.VIRTUAL_SYSTEM:
break
index += 1
parent.insert(index, section)
# All Sections must have an Info child
self.set_or_make_child(section, self.INFO, info_string)
return section
def _set_product_section_child(self, child_tag, child_text):
"""Update or create the given child of the ProductSection.
Creates the ProductSection itself if necessary.
Args:
child_tag (str): XML tag of the product section child element.
child_text (str): Text to set for the child element.
Returns:
xml.etree.ElementTree.Element: The product section element that
was updated or created
"""
self.product_section = self._ensure_section(
self.PRODUCT_SECTION,
"Product Information",
attrib=self.PRODUCT_SECTION_ATTRIB,
parent=self.virtual_system)
return self.set_or_make_child(self.product_section, child_tag,
child_text)
[docs] def find_parent_from_item(self, item):
"""Find the parent Item of the given Item.
Args:
item (OVFItem): Item whose parent is desired
Returns:
OVFItem: instance representing the parent device, or None
"""
if item is None:
return None
parent_instance = item.get_value(self.PARENT)
if parent_instance is None:
logger.warning("Item instance %s has no 'Parent' subelement."
" Unable to identify parent Item.",
item.get_value(self.INSTANCE_ID))
return None
return self.hardware.find_item(
properties={self.INSTANCE_ID: parent_instance})
[docs] def find_item_from_disk(self, disk):
"""Find the disk Item that references the given Disk.
Args:
disk (xml.etree.ElementTree.Element): Disk element
Returns:
OVFItem: Corresponding instance, or None
"""
if disk is None:
return None
disk_id = disk.get(self.DISK_ID)
match = self.hardware.find_item(
properties={
self.HOST_RESOURCE: (self.HOST_RSRC_DISK_REF + disk_id)
})
if not match:
match = self.hardware.find_item(
properties={
self.HOST_RESOURCE: (self.OLD_HOST_RSRC_DISK_REF + disk_id)
})
return match
[docs] def find_item_from_file(self, file_obj):
"""Find the disk Item that references the given File.
Args:
file_obj (xml.etree.ElementTree.Element): File element
Returns:
OVFItem: Corresponding instance, or None.
"""
if file_obj is None:
return None
file_id = file_obj.get(self.FILE_ID)
match = self.hardware.find_item(
properties={
self.HOST_RESOURCE: (self.HOST_RSRC_FILE_REF + file_id)
})
if not match:
match = self.hardware.find_item(
properties={
self.HOST_RESOURCE: (self.OLD_HOST_RSRC_FILE_REF + file_id)
})
return match
[docs] def find_disk_from_file_id(self, file_id):
"""Find the Disk that uses the given file_id for backing.
Args:
file_id (str): File identifier string
Returns:
xml.etree.ElementTree.Element: Disk matching the file, or None
"""
if file_id is None or self.disk_section is None:
return None
return self.find_child(self.disk_section, self.DISK,
attrib={self.DISK_FILE_REF: file_id})
[docs] def find_empty_drive(self, drive_type):
"""Find a disk device that exists but contains no data.
Args:
drive_type (str): Either 'cdrom' or 'harddisk'
Returns:
OVFItem: Instance representing this disk device, or None.
Raises:
ValueUnsupportedError: if ``drive_type`` is unrecognized.
"""
if drive_type == 'cdrom':
# Find a drive that has no HostResource property
drives = self.hardware.find_all_items(
resource_type=drive_type,
properties={self.HOST_RESOURCE: None})
if drives:
return drives[0]
return None
elif drive_type == 'harddisk':
# All harddisk items must have a HostResource, so we need a
# different way to indicate an empty drive. By convention,
# we do this with a small placeholder disk (one with a Disk entry
# but no corresponding File included in the OVF package).
if self.disk_section is None:
logger.debug("No DiskSection, so no placeholder disk!")
return None
for disk in self.disk_section.findall(self.DISK):
file_id = disk.get(self.DISK_FILE_REF)
if file_id is None:
# Found placeholder disk!
# Now find the drive that's using this disk.
return self.find_item_from_disk(disk)
logger.debug("No placeholder disk found.")
return None
else:
raise ValueUnsupportedError("drive type",
drive_type,
"'cdrom' or 'harddisk'")
[docs] def find_device_location(self, device):
"""Find the controller type and address of a given device object.
Args:
device (OVFItem): Hardware device object.
Returns:
tuple: ``(type, address)``, such as ``("ide", "1:0")``.
Raises:
LookupError: if the controller is not found.
"""
controller = self.find_parent_from_item(device)
if controller is None:
raise LookupError("No parent controller for device?")
return (controller.hardware_type,
(controller.get_value(self.ADDRESS) + ':' +
device.get_value(self.ADDRESS_ON_PARENT)))
[docs] def get_id_from_disk(self, disk):
"""Get the identifier string associated with the given Disk object.
Args:
disk (xml.etree.ElementTree.Element): Disk object to inspect
Returns:
str: Disk identifier
"""
return disk.get(self.DISK_ID)
[docs] def get_capacity_from_disk(self, disk):
"""Get the capacity of the given Disk in bytes.
Args:
disk (xml.etree.ElementTree.Element): Disk element to inspect
Returns:
int: Disk capacity, in bytes
"""
cap = int(disk.get(self.DISK_CAPACITY))
cap_units = disk.get(self.DISK_CAP_UNITS, 'byte')
return programmatic_bytes_to_int(cap, cap_units)
[docs] def set_capacity_of_disk(self, disk, capacity_bytes):
"""Set the storage capacity of the given Disk.
Tries to use the most human-readable form possible (i.e., 8 GiB
instead of 8589934592 bytes).
Args:
disk (xml.etree.ElementTree.Element): Disk to update
capacity_bytes (int): Disk capacity, in bytes
"""
if self.ovf_version < 1.0:
# In OVF 0.9 only bytes is supported as a unit
disk.set(self.DISK_CAPACITY, capacity_bytes)
else:
(capacity, cap_units) = int_bytes_to_programmatic_units(
capacity_bytes)
disk.set(self.DISK_CAPACITY, capacity)
disk.set(self.DISK_CAP_UNITS, cap_units)