# Copyright 2016-2019 Dirk Thomas
# Licensed under the Apache License, Version 2.0

import argparse
from collections import OrderedDict
import os
from pathlib import Path
import sys

FORMAT_STR_COMMENT_LINE = '# {comment}'
FORMAT_STR_SET_ENV_VAR = 'export {name}="{value}"'
FORMAT_STR_USE_ENV_VAR = '${name}'
FORMAT_STR_INVOKE_SCRIPT = 'COLCON_CURRENT_PREFIX="{prefix}" _colcon_prefix_sh_source_script "{script_path}"'
FORMAT_STR_REMOVE_TRAILING_SEPARATOR = 'if [ "$(echo -n ${name} | tail -c 1)" = ":" ]; then export {name}=${{{name}%?}} ; fi'

DSV_TYPE_PREPEND_NON_DUPLICATE = 'prepend-non-duplicate'
DSV_TYPE_PREPEND_NON_DUPLICATE_IF_EXISTS = 'prepend-non-duplicate-if-exists'
DSV_TYPE_SET = 'set'
DSV_TYPE_SET_IF_UNSET = 'set-if-unset'
DSV_TYPE_SOURCE = 'source'


def main(argv=sys.argv[1:]):  # noqa: D103
    parser = argparse.ArgumentParser(
        description='Output shell commands for the packages in topological '
                    'order')
    parser.add_argument(
        'primary_extension',
        help='The file extension of the primary shell')
    parser.add_argument(
        'additional_extension', nargs='?',
        help='The additional file extension to be considered')
    parser.add_argument(
        '--merged-install', action='store_true',
        help='All install prefixes are merged into a single location')
    args = parser.parse_args(argv)

    packages = get_packages(Path(__file__).parent, args.merged_install)

    ordered_packages = order_packages(packages)
    for pkg_name in ordered_packages:
        if _include_comments():
            print(
                FORMAT_STR_COMMENT_LINE.format_map(
                    {'comment': 'Package: ' + pkg_name}))
        prefix = os.path.abspath(os.path.dirname(__file__))
        if not args.merged_install:
            prefix = os.path.join(prefix, pkg_name)
        for line in get_commands(
            pkg_name, prefix, args.primary_extension,
            args.additional_extension
        ):
            print(line)

    for line in _remove_trailing_separators():
        print(line)
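
# Illustrative invocation, based only on the argparse definition above
# (the file name is a placeholder, not something this script defines):
#
#   python3 <this_file> sh bash              # primary + additional extension
#   python3 <this_file> sh --merged-install  # merged install layout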


def get_packages(prefix_path, merged_install):
    """
    Find packages based on colcon-specific files created during installation.

    :param Path prefix_path: The install prefix path of all packages
    :param bool merged_install: True if all packages are installed directly
      in the prefix, False if each package is installed in a subdirectory
      named after the package
    :returns: A mapping from the package name to the set of runtime
      dependencies
    :rtype: dict
    """
    packages = {}
    # since importing colcon_core isn't feasible here the following constant
    # must match colcon_core.location.get_relative_package_index_path()
    subdirectory = 'share/colcon-core/packages'
    if merged_install:
        # return if workspace is empty
        if not (prefix_path / subdirectory).is_dir():
            return packages
        # find all files in the subdirectory
        for p in (prefix_path / subdirectory).iterdir():
            if not p.is_file():
                continue
            if p.name.startswith('.'):
                continue
            add_package_runtime_dependencies(p, packages)
    else:
        # for each subdirectory look for the package specific file
        for p in prefix_path.iterdir():
            if not p.is_dir():
                continue
            if p.name.startswith('.'):
                continue
            p = p / subdirectory / p.name
            if p.is_file():
                add_package_runtime_dependencies(p, packages)

    # remove unknown dependencies
    pkg_names = set(packages.keys())
    for k in packages.keys():
        packages[k] = {d for d in packages[k] if d in pkg_names}

    return packages


def add_package_runtime_dependencies(path, packages):
    """
    Read the resource file and record the package's runtime dependencies.

    :param Path path: The resource file containing the runtime dependencies
    :param dict packages: A mapping from package names to the sets of runtime
      dependencies to add to
    """
    content = path.read_text()
    dependencies = set(content.split(os.pathsep) if content else [])
    packages[path.name] = dependencies
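
# Illustrative resource file content (hypothetical package names): the file
# is named after the package and lists its runtime dependencies separated by
# os.pathsep, so on a POSIX system a file 'share/colcon-core/packages/pkg_c'
# containing 'pkg_a:pkg_b' yields packages['pkg_c'] == {'pkg_a', 'pkg_b'}.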


def order_packages(packages):
    """
    Order packages topologically.

    :param dict packages: A mapping from package name to the set of runtime
      dependencies
    :returns: The package names
    :rtype: list
    """
    # select packages with no dependencies in alphabetical order
    to_be_ordered = list(packages.keys())
    ordered = []
    while to_be_ordered:
        pkg_names_without_deps = [
            name for name in to_be_ordered if not packages[name]]
        if not pkg_names_without_deps:
            reduce_cycle_set(packages)
            raise RuntimeError(
                'Circular dependency between: ' + ', '.join(sorted(packages)))
        pkg_names_without_deps.sort()
        pkg_name = pkg_names_without_deps[0]
        to_be_ordered.remove(pkg_name)
        ordered.append(pkg_name)
        # remove item from dependency lists
        for k in list(packages.keys()):
            if pkg_name in packages[k]:
                packages[k].remove(pkg_name)
    return ordered
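
# Illustrative only (hypothetical input):
# order_packages({'b': {'a'}, 'a': set()}) returns ['a', 'b'] -- packages
# without remaining dependencies are emitted first, ties broken
# alphabetically.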


def reduce_cycle_set(packages):
    """
    Reduce the set of packages to the ones part of the circular dependency.

    :param dict packages: A mapping from package name to the set of runtime
      dependencies which is modified in place
    """
    last_depended = None
    while len(packages) > 0:
        # get all remaining dependencies
        depended = set()
        for pkg_name, dependencies in packages.items():
            depended = depended.union(dependencies)
        # remove all packages which are not dependent on
        for name in list(packages.keys()):
            if name not in depended:
                del packages[name]
        if last_depended:
            # if remaining packages haven't changed return them
            if last_depended == depended:
                return packages.keys()
        # otherwise reduce again
        last_depended = depended
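
# Illustrative only (hypothetical input): given
#   deps = {'a': {'b'}, 'b': {'a'}, 'c': set()}
# reduce_cycle_set(deps) drops 'c' and keeps only the cycle members,
# leaving sorted(deps) == ['a', 'b'].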


def _include_comments():
    # skipping comment lines when COLCON_TRACE is not set speeds up the
    # processing especially on Windows
    return bool(os.environ.get('COLCON_TRACE'))


def get_commands(pkg_name, prefix, primary_extension, additional_extension):
    commands = []
    package_dsv_path = os.path.join(prefix, 'share', pkg_name, 'package.dsv')
    if os.path.exists(package_dsv_path):
        commands += process_dsv_file(
            package_dsv_path, prefix, primary_extension, additional_extension)
    return commands


def process_dsv_file(
    dsv_path, prefix, primary_extension=None, additional_extension=None
):
    commands = []
    if _include_comments():
        commands.append(
            FORMAT_STR_COMMENT_LINE.format_map({'comment': dsv_path}))
    with open(dsv_path, 'r') as h:
        content = h.read()
    lines = content.splitlines()

    basenames = OrderedDict()
    for i, line in enumerate(lines):
        # skip over empty or whitespace-only lines
        if not line.strip():
            continue
        try:
            type_, remainder = line.split(';', 1)
        except ValueError:
            raise RuntimeError(
                "Line %d in '%s' doesn't contain a semicolon separating the "
                'type from the arguments' % (i + 1, dsv_path))
        if type_ != DSV_TYPE_SOURCE:
            # handle non-source lines
            try:
                commands += handle_dsv_types_except_source(
                    type_, remainder, prefix)
            except RuntimeError as e:
                raise RuntimeError(
                    "Line %d in '%s' %s" % (i + 1, dsv_path, e)) from e
        else:
            # group remaining source lines by basename
            path_without_ext, ext = os.path.splitext(remainder)
            if path_without_ext not in basenames:
                basenames[path_without_ext] = set()
            assert ext.startswith('.')
            ext = ext[1:]
            if ext in (primary_extension, additional_extension):
                basenames[path_without_ext].add(ext)

    # add the dsv extension to each basename if the file exists
    for basename, extensions in basenames.items():
        if not os.path.isabs(basename):
            basename = os.path.join(prefix, basename)
        if os.path.exists(basename + '.dsv'):
            extensions.add('dsv')

    for basename, extensions in basenames.items():
        if not os.path.isabs(basename):
            basename = os.path.join(prefix, basename)
        if 'dsv' in extensions:
            # process dsv files recursively
            commands += process_dsv_file(
                basename + '.dsv', prefix,
                primary_extension=primary_extension,
                additional_extension=additional_extension)
        elif primary_extension in extensions and len(extensions) == 1:
            # source primary-only files
            commands += [
                FORMAT_STR_INVOKE_SCRIPT.format_map({
                    'prefix': prefix,
                    'script_path': basename + '.' + primary_extension})]
        elif additional_extension in extensions:
            # source non-primary files
            commands += [
                FORMAT_STR_INVOKE_SCRIPT.format_map({
                    'prefix': prefix,
                    'script_path': basename + '.' + additional_extension})]

    return commands
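
# Illustrative .dsv content accepted by the parser above (hypothetical
# environment variable names and paths); each line is '<type>;<arguments>'
# with further arguments separated by semicolons as well:
#
#   set;FOO_DIR;share/foo
#   set-if-unset;FOO_HOME;share/foo
#   prepend-non-duplicate;PATH;bin
#   prepend-non-duplicate-if-exists;PYTHONPATH;lib/python3/dist-packages
#   source;share/foo/hook/env.sh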


def handle_dsv_types_except_source(type_, remainder, prefix):
    commands = []
    if type_ in (DSV_TYPE_SET, DSV_TYPE_SET_IF_UNSET):
        try:
            env_name, value = remainder.split(';', 1)
        except ValueError:
            raise RuntimeError(
                "doesn't contain a semicolon separating the environment name "
                'from the value')
        try_prefixed_value = os.path.join(prefix, value) if value else prefix
        if os.path.exists(try_prefixed_value):
            value = try_prefixed_value
        if type_ == DSV_TYPE_SET:
            commands += _set(env_name, value)
        elif type_ == DSV_TYPE_SET_IF_UNSET:
            commands += _set_if_unset(env_name, value)
        else:
            assert False
    elif type_ in (
        DSV_TYPE_PREPEND_NON_DUPLICATE,
        DSV_TYPE_PREPEND_NON_DUPLICATE_IF_EXISTS
    ):
        try:
            env_name_and_values = remainder.split(';')
        except ValueError:
            raise RuntimeError(
                "doesn't contain a semicolon separating the environment name "
                'from the values')
        env_name = env_name_and_values[0]
        values = env_name_and_values[1:]
        for value in values:
            if not value:
                value = prefix
            elif not os.path.isabs(value):
                value = os.path.join(prefix, value)
            if (
                type_ == DSV_TYPE_PREPEND_NON_DUPLICATE_IF_EXISTS and
                not os.path.exists(value)
            ):
                comment = 'skip extending {env_name} with not existing ' \
                    'path: {value}'.format_map(locals())
                if _include_comments():
                    commands.append(
                        FORMAT_STR_COMMENT_LINE.format_map(
                            {'comment': comment}))
            else:
                commands += _prepend_unique_value(env_name, value)
    else:
        raise RuntimeError(
            'contains an unknown environment hook type: ' + type_)
    return commands


env_state = {}


def _prepend_unique_value(name, value):
    global env_state
    if name not in env_state:
        if os.environ.get(name):
            env_state[name] = set(os.environ[name].split(os.pathsep))
        else:
            env_state[name] = set()
    # prepend even if the variable has not been set yet, in case a shell
    # script sets the same variable without the knowledge of this Python
    # script
    # later _remove_trailing_separators() will clean up any unintentional
    # trailing separator
    extend = os.pathsep + FORMAT_STR_USE_ENV_VAR.format_map({'name': name})
    line = FORMAT_STR_SET_ENV_VAR.format_map(
        {'name': name, 'value': value + extend})
    if value not in env_state[name]:
        env_state[name].add(value)
    else:
        if not _include_comments():
            return []
        line = FORMAT_STR_COMMENT_LINE.format_map({'comment': line})
    return [line]
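
# Illustrative output on a POSIX system (hypothetical variable and path):
# a first call such as
# _prepend_unique_value('CMAKE_PREFIX_PATH', '/opt/ws/install/foo') emits
#   export CMAKE_PREFIX_PATH="/opt/ws/install/foo:$CMAKE_PREFIX_PATH"
# whereas a duplicate value emits only a commented-out line, or nothing at
# all when COLCON_TRACE is unset.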


# generate commands for removing trailing path separators that were
# prepended speculatively
def _remove_trailing_separators():
    # do nothing if the shell extension does not implement the logic
    if FORMAT_STR_REMOVE_TRAILING_SEPARATOR is None:
        return []
    global env_state
    commands = []
    for name in env_state:
        # skip variables that already had values before this script started
        # prepending
        if name in os.environ:
            continue
        commands += [FORMAT_STR_REMOVE_TRAILING_SEPARATOR.format_map(
            {'name': name})]
    return commands
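
# Illustrative output (hypothetical variable): for a variable FOO_PATH that
# was unset before this script ran, the emitted cleanup line is
#   if [ "$(echo -n $FOO_PATH | tail -c 1)" = ":" ]; then export FOO_PATH=${FOO_PATH%?} ; fi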


def _set(name, value):
    global env_state
    env_state[name] = value
    line = FORMAT_STR_SET_ENV_VAR.format_map(
        {'name': name, 'value': value})
    return [line]


def _set_if_unset(name, value):
    global env_state
    line = FORMAT_STR_SET_ENV_VAR.format_map(
        {'name': name, 'value': value})
    if env_state.get(name, os.environ.get(name)):
        line = FORMAT_STR_COMMENT_LINE.format_map({'comment': line})
    return [line]


if __name__ == '__main__':  # pragma: no cover
    try:
        rc = main()
    except RuntimeError as e:
        print(str(e), file=sys.stderr)
        rc = 1
    sys.exit(rc)