Aqueduct

Aqueduct is used to migrate content from one Swimlane instance to another.

_Aqueduct__sort_sync_order(self, components) private

Ensures order of provided component name strings.

Parameters:

Name Type Description Default
components list

A list of component name strings.

required

Returns:

Type Description
list

An ordered list of component names.

Source code in aqueduct/aqueduct.py
def __sort_sync_order(self, components: list):
    """Ensures order of provided component name strings.

    Args:
        components (list): A list of component name strings.

    Returns:
        list: An ordered list of component names.
    """
    ordered = []
    for key, val in COMPONENTS.items():
        if key in components:
            ordered.append(key)
    return ordered
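
The loop above keeps only the requested names and returns them in the order in which they are declared in COMPONENTS, silently dropping anything unknown. A minimal sketch of the same idea, assuming a shortened stand-in for COMPONENTS (the real mapping holds component classes, and `sort_sync_order` is a hypothetical free function):

```python
from collections import OrderedDict

# Hypothetical, shortened stand-in for aqueduct's COMPONENTS mapping.
# The values are placeholders; the real mapping stores component classes.
COMPONENTS = OrderedDict(
    [("keystore", None), ("packages", None), ("applications", None), ("tasks", None)]
)


def sort_sync_order(components):
    """Return the requested component names in canonical COMPONENTS order."""
    return [name for name in COMPONENTS if name in components]


ordered = sort_sync_order(["tasks", "unknown", "keystore"])
print(ordered)  # ['keystore', 'tasks']
```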

_Aqueduct__want_to_continue(self) private

A recursive method that will continually ask if you want to continue until you provide a correct answer.

Returns:

Type Description
bool

Returns whether or not the user wants to continue.

Source code in aqueduct/aqueduct.py
def __want_to_continue(self):
    """A recursive method that will continually ask if you want to continue until you provide a correct answer.

    Returns:
        bool: Returns whether or not the user wants to continue.
    """
    response = input("Do you want to continue (y or n): ")
    if response.capitalize()[0] == "N":
        return False
    elif response.capitalize()[0] == "Y":
        return True
    else:
        return self.__want_to_continue()
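
As written, an empty response (pressing Enter) reaches `response.capitalize()[0]` and raises an IndexError before the method can ask again. A loop-based sketch that tolerates empty input follows; `ask_to_continue` and its `reader` parameter are hypothetical names, not part of Aqueduct:

```python
def ask_to_continue(reader=input):
    """Keep prompting until the answer starts with 'y' or 'n' (case-insensitive)."""
    while True:
        response = reader("Do you want to continue (y or n): ").strip().lower()
        if response.startswith("n"):
            return False
        if response.startswith("y"):
            return True
        # Anything else, including an empty line, falls through and asks again.


# Simulate a user who presses Enter, types something else, then answers "Yes".
answers = iter(["", "maybe", "Yes"])
result = ask_to_continue(reader=lambda prompt: next(answers))
print(result)
```

Passing the reader as a parameter keeps the prompt testable without a terminal.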

__init__(self, source, destination, dry_run=True, offline=False, update_reports=False, update_dashboards=False, continue_on_error=False, use_unsupported_version=False, force_unsupported_version=False, dump_content_path=None, mirror_app_fields_on_destination=False, update_default_reports=False) special

To use aqueduct you must provide two SwimlaneInstance objects: the source, which holds the content to migrate, and the destination, which receives it.

Parameters:

Name Type Description Default
source SwimlaneInstance

The source Swimlane instance. Typically this is considered a development instance.

required
destination SwimlaneInstance

The destination Swimlane instance. Typically this is considered a production instance.

required
dry_run bool

Whether or not this run of aqueduct will transfer content to the destination instance. Set to False to migrate content. Default is True.

True
offline bool

Whether or not the destination instance has access to the internet for the purpose of downloading Python packages. Default is False.

False
update_reports bool

Whether or not to force update reports if changes are detected. Default is False.

False
update_dashboards bool

Whether or not to force update dashboards if changes are detected. Default is False.

False
continue_on_error bool

Whether or not to continue when an API error occurs. Default is False.

False
use_unsupported_version bool

Whether or not to transfer content from one version of Swimlane to a different version of Swimlane. Use this at your own risk. Default is False.

False
force_unsupported_version bool

Whether or not to force usage of an unsupported version. This is used when running aqueduct headless/non-interactive. Default is False.

False
dump_content_path str

The path to where content should be dumped. If provided, we will dump content to a zip file in the provided path.

None
mirror_app_fields_on_destination bool

Whether or not to mirror application fields from the source to the destination. This may result in deletion of fields on the destination instance. Defaults to False.

False
update_default_reports bool

Whether or not to update Default reports on the destination. Default is False.

False
Source code in aqueduct/aqueduct.py
def __init__(
    self,
    source: SwimlaneInstance,
    destination: SwimlaneInstance,
    dry_run: bool = True,
    offline: bool = False,
    update_reports: bool = False,
    update_dashboards: bool = False,
    continue_on_error: bool = False,
    use_unsupported_version: bool = False,
    force_unsupported_version: bool = False,
    dump_content_path: str = None,
    mirror_app_fields_on_destination: bool = False,
    update_default_reports: bool = False,
):
    """To use aqueduct you must provide two SwimlaneInstance objects. One is the source of content wanting to
    migrate. The other is the destination of where the content will be migrated to.

    Args:
        source (SwimlaneInstance): The source Swimlane instance. Typically this is considered a
                                   development instance.
        destination (SwimlaneInstance): The destination Swimlane instance. Typically this is considered a production
                                        instance.
        dry_run (bool): Whether or not this run of aqueduct will transfer content to the destination instance.
                        Set to False to migrate content. Default is True.
        offline (bool): Whether or not the destination instance has access to the internet for the purpose of
                        downloading Python packages. Default is False.
        update_reports (bool): Whether or not to force update reports if changes are detected. Default is False.
        update_dashboards (bool): Whether or not to force update dashboards if changes are detected.
                                  Default is False.
        continue_on_error (bool): Whether or not to continue when an API error occurs. Default is False.
        use_unsupported_version (bool): Whether or not to transfer content from one version of Swimlane to a
                                        different version of Swimlane. Use this at your own risk. Default is False.
        force_unsupported_version (bool): Whether or not to force usage of an unsupported version. This is used
                                          when running aqueduct headless/non-interactive. Default is False.
        dump_content_path (str, optional): The path to where content should be dumped. If provided, we will dump
                                           content to a zip file in the provided path.
        mirror_app_fields_on_destination (bool, optional): Whether or not to mirror application fields from the
                                                           source to the destination. This may result in deletion
                                                           of fields on the destination instance. Defaults to
                                                           False.
        update_default_reports (bool, optional): Whether or not to update Default reports on the destination.
                                                 Default is False.
    """
    src_version = version.parse(source.swimlane.product_version)
    dest_version = version.parse(destination.swimlane.product_version)
    if use_unsupported_version and src_version != dest_version:
        if src_version > dest_version:
            raise UnsupportedSwimlaneVersion(source=source, destination=destination)
        self.log(
            f"You specified that you want to transfer content from version {source.swimlane.product_version} to "
            f"{destination.swimlane.product_version}. This is not officially supported. "
            "Please use at your own risk.",
            level="info",
        )
        if not force_unsupported_version and not self.__want_to_continue():
            sys.exit()
    elif src_version != dest_version:
        raise UnsupportedSwimlaneVersion(source=source, destination=destination)
    Base.source_instance = source
    Base.source_instance.instance_type = "source"
    Base.destination_instance = destination
    Base.destination_instance.instance_type = "destination"
    Base.dry_run = dry_run
    Base.source_host = Base.source_instance.swimlane.host
    Base.dest_host = Base.destination_instance.swimlane.host
    Base.offline = offline
    Base.update_reports = update_reports
    Base.update_dashboards = update_dashboards
    Base.continue_on_error = continue_on_error
    Base.update_default_reports = update_default_reports
    Base.mirror_app_fields_on_destination = mirror_app_fields_on_destination
    Base.use_unsupported_version = use_unsupported_version

    if WARNING_MAP.get(str(dest_version)):
        for key, val in WARNING_MAP[str(dest_version)].items():
            if getattr(Base, key):
                self.log(val=val, level="warning")

    if dump_content_path:
        Base._dump_content = True
        Base.ZIP_FILE = OutputZip(zip_path=dump_content_path)
    else:
        Base._dump_content = False
        Base.ZIP_FILE = OutputZip(zip_path=os.getcwd())
    atexit.register(self._return_homework_list)
    atexit.register(self._close_and_write_zip)
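
The version check at the top of `__init__` reduces to three outcomes: identical versions proceed, a newer source is always rejected, and an older source proceeds only when `use_unsupported_version` is set. A simplified sketch of that decision follows. It assumes purely numeric dotted version strings with the same number of parts, and it uses `ValueError` in place of `UnsupportedSwimlaneVersion`; `parse_version` and `check_versions` are hypothetical helpers rather than Aqueduct functions.

```python
def parse_version(text):
    """Turn '10.5.0' into (10, 5, 0). Assumes purely numeric dotted versions."""
    return tuple(int(part) for part in text.split("."))


def check_versions(source, destination, use_unsupported_version=False):
    """Return 'ok' or 'unsupported-allowed', or raise ValueError."""
    src, dest = parse_version(source), parse_version(destination)
    if src == dest:
        return "ok"
    if use_unsupported_version and src < dest:
        # Older source, newer destination: allowed only on explicit request.
        return "unsupported-allowed"
    raise ValueError(f"Cannot migrate from {source} to {destination}")


print(check_versions("10.5.0", "10.5.0"))
print(check_versions("10.4.0", "10.5.0", use_unsupported_version=True))
```

In the constructor itself, the "unsupported but allowed" branch additionally logs a notice and, unless `force_unsupported_version` is set, asks the user to confirm.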

_close_and_write_zip(self) private

Used to close and write a zip file to disk at exit.

The ZIP_FILE object will close no matter what when calling the write_to_disk method but it will only create a zip file on the disk when there are files in the BytesIO object.

This will be called no matter what but there are two situations in which data will be saved to disk.

1. If a dump_content_path is provided then we will dump all (defined) content into a zipfile in the
   provided path.
2. No matter if the dump_content_path is provided, if an error occurs with an API call we will dump
   the content into a zipfile named aqueduct_errors.zip in the current working directory.

The created zipfile has the following structure (example only):

📦aqueduct
┣ 📂content
┃ ┣ 📂altered
┃ ┃ ┣ 📂application
┃ ┃ ┃ ┗ 📜Phishing Triage.json
┃ ┃ ┗ 📂workspace
┃ ┃ ┃ ┣ 📜QuickStart Record Generator.json
┃ ┣ 📂destination
┃ ┃ ┣ 📂application
┃ ┃ ┃ ┗ 📜Phishing Triage.json
┃ ┃ ┣ 📂workflow
┃ ┃ ┃ ┗ 📜aM_t7AuBscJN_Orf9.json
┃ ┃ ┗ 📂workspace
┃ ┃ ┃ ┣ 📜QuickStart Record Generator.json
┃ ┗ 📂source
┃ ┃ ┣ 📂application
┃ ┃ ┃ ┣ 📜Alert & Incident Management.json
┃ ┃ ┗ 📂workflows
┃ ┃ ┃ ┣ 📜a7HOzfbLmUD7xOkKp.json
┣ 📜homework.txt
┗ 📜output.log

The three folders under the content directory are altered, source, and destination.

source - This means the original (GET) source JSON object from the source instance
altered - The altered folder contains JSON objects that have been "massaged" or updated based on the
          differences between both source and destination (if any). This is the JSON that is sent to the
          destination instance.
destination - This means the JSON response from the destination Swimlane instance.
Source code in aqueduct/aqueduct.py
def _close_and_write_zip(self) -> None:
    """Used to close and write a zip file to disk at exit.

    The ZIP_FILE object will close no matter what when calling the write_to_disk method but it will only
    create a zip file on the disk when there are files in the BytesIO object.

    This will be called no matter what but there are two situations in which data will be saved to disk.

        1. If a dump_content_path is provided then we will dump all (defined) content into a zipfile in the
           provided path.
        2. No matter if the dump_content_path is provided, if an error occurs with an API call we will dump
           the content into a zipfile named aqueduct_errors.zip in the current working directory.

    The created zipfile has the following structure (example only):

        📦aqueduct
        ┣ 📂content
        ┃ ┣ 📂altered
        ┃ ┃ ┣ 📂application
        ┃ ┃ ┃ ┗ 📜Phishing Triage.json
        ┃ ┃ ┗ 📂workspace
        ┃ ┃ ┃ ┣ 📜QuickStart Record Generator.json
        ┃ ┣ 📂destination
        ┃ ┃ ┣ 📂application
        ┃ ┃ ┃ ┗ 📜Phishing Triage.json
        ┃ ┃ ┣ 📂workflow
        ┃ ┃ ┃ ┗ 📜aM_t7AuBscJN_Orf9.json
        ┃ ┃ ┗ 📂workspace
        ┃ ┃ ┃ ┣ 📜QuickStart Record Generator.json
        ┃ ┗ 📂source
        ┃ ┃ ┣ 📂application
        ┃ ┃ ┃ ┣ 📜Alert & Incident Management.json
        ┃ ┃ ┗ 📂workflows
        ┃ ┃ ┃ ┣ 📜a7HOzfbLmUD7xOkKp.json
        ┣ 📜homework.txt
        ┗ 📜output.log

    The three folders under the `content` directory are altered, source, and destination.

        source - This means the original (GET) source JSON object from the source instance
        altered - The altered folder contains JSON objects that have been "massaged" or updated based on the
                  differences between both source and destination (if any). This is the JSON that is sent to the
                  destination instance.
        destination - This means the JSON response from the destination Swimlane instance.
    """
    if Base.ZIP_FILE:
        Base.ZIP_FILE.write_to_disk()
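
A minimal sketch of the "always close, but only write when there is something to write" behaviour described above, using the standard library's `zipfile` and `io.BytesIO`. `BufferedZip` is a hypothetical stand-in; the actual `OutputZip` class is not shown here and may be organised differently.

```python
import io
import os
import tempfile
import zipfile


class BufferedZip:
    """Collects files in memory and writes a zip to disk only if any were added."""

    def __init__(self, zip_path):
        self.zip_path = zip_path
        self._buffer = io.BytesIO()
        self._zip = zipfile.ZipFile(self._buffer, "w", zipfile.ZIP_DEFLATED)

    def add(self, name, text):
        self._zip.writestr(name, text)

    def write_to_disk(self):
        has_files = bool(self._zip.namelist())
        self._zip.close()  # always close, even when nothing is written
        if has_files:
            with open(self.zip_path, "wb") as handle:
                handle.write(self._buffer.getvalue())
        return has_files


workdir = tempfile.mkdtemp()
empty = BufferedZip(os.path.join(workdir, "empty.zip"))
full = BufferedZip(os.path.join(workdir, "aqueduct.zip"))
full.add("content/source/application/Example.json", "{}")
wrote_empty = empty.write_to_disk()
wrote_full = full.write_to_disk()
print(wrote_empty, wrote_full)
```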

_return_homework_list(self) private

Prints a summary homework list for quick actionable response.

You can access the homework dictionary from your Aqueduct instance.

Examples:

from aqueduct import Aqueduct

aq = Aqueduct(source=src, destination=dest)

print(aq._homework) # This will print out the dictionary of component lists.

Source code in aqueduct/aqueduct.py
def _return_homework_list(self):
    """Prints a summary homework list for quick actionable response.

    You can access the homework dictionary from your `Aqueduct` instance.
    Example:

        from aqueduct import Aqueduct

        aq = Aqueduct(source=src, destination=dest)

        print(aq._homework) # This will print out the dictionary of component lists.

    """
    if self._homework:
        print("HOMEWORK LIST:\n")
        for key in self._homework.keys():
            if self._homework[key]:
                print(key.title())
                for item in self._homework[key]:
                    print(f"\t{item}")

collect(self, application_name_list)

Collects information about content based on one or more application names on the source instance.

Parameters:

Name Type Description Default
application_name_list list

A list of one or more application names to gather source content from.

required

Returns:

Type Description
list

Returns a list of dictionary objects.

Source code in aqueduct/aqueduct.py
def collect(self, application_name_list: list) -> list:
    """Collects information about content based on one or more application names on the source instance.

    Args:
        application_name_list (list): A list of one or more application names to gather source content from.

    Returns:
        list: Returns a list of dictionary objects.
    """
    atexit.unregister(self._close_and_write_zip)
    return_list = []
    if not isinstance(application_name_list, list):
        application_name_list = [application_name_list]
    for application in application_name_list:
        return_list.append(Collector(application_name=application).collect())
    return return_list

gather(self)

Returns a list of application names on the provided source instance.

Returns:

Type Description
list

A list of application names on the provided source instance.

Source code in aqueduct/aqueduct.py
def gather(self) -> list:
    """Returns a list of application names on the provided source instance.

    Returns:
        list: A list of application names on the provided source instance.
    """
    atexit.unregister(self._close_and_write_zip)
    return list(self.source_instance.application_dict.keys())

sync(self, components=OrderedDict([('keystore', <class 'aqueduct.components.keystore.KeyStore'>), ('packages', <class 'aqueduct.components.packages.Packages'>), ('plugins', <class 'aqueduct.components.plugins.Plugins'>), ('assets', <class 'aqueduct.components.assets.Assets'>), ('workspaces', <class 'aqueduct.components.workspaces.Workspaces'>), ('applets', <class 'aqueduct.components.applets.Applets'>), ('applications', <class 'aqueduct.components.applications.Applications'>), ('tasks', <class 'aqueduct.components.tasks.Tasks'>), ('reports', <class 'aqueduct.components.reports.Reports'>), ('dashboards', <class 'aqueduct.components.dashboards.Dashboards'>), ('users', <class 'aqueduct.components.users.Users'>), ('groups', <class 'aqueduct.components.groups.Groups'>), ('roles', <class 'aqueduct.components.roles.Roles'>)]), exclude={}, include={})

The main method to begin syncing components from one Swimlane instance to another.

There are several available components you can specify. The order of these components is forced. The defaults are:

  • keystore
  • packages
  • plugins
  • assets
  • workspaces
  • applets
  • applications (we update workflows here)
  • tasks
  • reports
  • dashboards
  • users
  • groups
  • roles

You can include and exclude specific component items like specific applications, tasks, plugins, etc. To do so provide a dictionary for each argument (include or exclude). For example:

exclude = {'applications': ["Phishing Triage"], 'tasks': ['PT - Get Emails'], etc.}
include = {'applications': ["Security Alert & Incident Management"], 'reports': ['SAIM - New Incidents'], etc.}

aq.sync(include=include, exclude=exclude)

Parameters:

Name Type Description Default
components list

A list of one or more components to sync. Defaults to COMPONENTS.

COMPONENTS
exclude dict

A dictionary mapping component names to lists of item names to exclude. Defaults to an empty dictionary.

{}
include dict

A dictionary mapping component names to lists of item names to include. Defaults to an empty dictionary.

{}
Source code in aqueduct/aqueduct.py
def sync(self, components=COMPONENTS, exclude: dict = {}, include: dict = {}):
    """The main method to begin syncing components from one Swimlane instance to another.

    There are several available components you can specify. The order of these components is forced.
    The defaults are:

    * keystore
    * packages
    * plugins
    * assets
    * workspaces
    * applets
    * applications (we update workflows here)
    * tasks
    * reports
    * dashboards
    * users
    * groups
    * roles

    You can include and exclude specific component items like specific applications, tasks, plugins, etc. To do so
    provide a dictionary for each argument (include or exclude). For example:

    exclude = {'applications': ["Phishing Triage"], 'tasks': ['PT - Get Emails'], etc.}
    include = {'applications': ["Security Alert & Incident Management"], 'reports': ['SAIM - New Incidents'], etc.}

    aq.sync(include=include, exclude=exclude)

    Args:
        components (list, optional): A list of one or more components to sync. Defaults to COMPONENTS.
        exclude (dict, optional): A dictionary mapping component names to lists of item names to exclude.
            Defaults to an empty dictionary.
        include (dict, optional): A dictionary mapping component names to lists of item names to include.
            Defaults to an empty dictionary.
    """
    if exclude and include:
        raise TooManyParametersError(
            "You have provided both 'exclude' and 'include' parameters when only one is supported. "
            "Please retry with only one of these parameters."
        )
    Base.include = include
    Base.exclude = exclude
    for component in self.__sort_sync_order(components):
        if COMPONENTS.get(component):
            getattr(COMPONENTS[component](), "sync")()
            self._components_used.append(component)
            print("\n")
    if Base.dry_run:
        print("DRY RUN RESULTS:\n")
        return self._get_formatted_diff_log()
    print(b64decode(self.__LOGO).decode("ascii"))
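
`sync` accepts either `include` or `exclude`, never both. The sketch below isolates that validation. It uses `None` defaults rather than the `{}` defaults shown in the signature (a common Python convention that avoids sharing one mutable default between calls) and raises `ValueError` where Aqueduct raises `TooManyParametersError`; `resolve_filters` is a hypothetical helper.

```python
def resolve_filters(include=None, exclude=None):
    """Return (include, exclude) as dicts, rejecting calls that set both."""
    include = include or {}
    exclude = exclude or {}
    if include and exclude:
        raise ValueError("Provide either 'include' or 'exclude', not both.")
    return include, exclude


filters = resolve_filters(exclude={"applications": ["Phishing Triage"]})
print(filters)
```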

Base

tracking_id_map property writable

A dictionary map of tracking-ids.

The key is the source instance trackingFieldId and the value is the destinations trackingFieldId equivalent.

Returns:

Type Description
dict

A tracking-id map.

_get_formatted_diff_log(self) private

Returns a formatted list of strings from the DIFF_LOG property.

Returns:

Type Description
list

A list of strings.

Source code in aqueduct/base.py
def _get_formatted_diff_log(self):
    """Returns a formatted list of strings from the DIFF_LOG property.

    Returns:
        list: A list of strings.
    """
    return_list = []
    for item in Base.DIFF_LOG:
        log = self.DIFF_TEMPLATE.substitute(**item)
        self.log(log)
        return_list.append(log)
    return return_list
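
The call `DIFF_TEMPLATE.substitute(**item)` suggests a `string.Template`, although the template text itself is not shown on this page. A sketch with an invented template, to illustrate how each DIFF_LOG dictionary becomes one formatted line:

```python
from string import Template

# Invented template for illustration; Aqueduct's real DIFF_TEMPLATE is not shown above.
DIFF_TEMPLATE = Template("$component '$name': $diff_type ($subcomponent=$value)")

# One entry shaped like the dictionaries that add_to_diff_log appends.
diff_log = [
    {
        "component": "Applications",
        "subcomponent": "fields",
        "name": "Phishing Triage",
        "value": "Severity",
        "diff_type": "added",
    }
]

lines = [DIFF_TEMPLATE.substitute(**item) for item in diff_log]
print(lines[0])  # Applications 'Phishing Triage': added (fields=Severity)
```

`substitute` raises `KeyError` when a placeholder has no matching key, so every DIFF_LOG entry needs all of the fields the template names.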

_is_in_include_exclude_lists(self, name, type) private

Checks to see if the name of the component content item is in an include or exclude list.

Parameters:

Name Type Description Default
name str

The component content item name.

required
type str

The component type.

required

Returns:

Type Description
bool

Returns True if the item is in the exclude list or is missing from the include list; otherwise False.

Source code in aqueduct/base.py
def _is_in_include_exclude_lists(self, name: str, type: str) -> bool:
    """Checks to see if the name of the component content item is in an include or exclude list.

    Args:
        name (str): The component content item name.
        type (str): The component type.

    Returns:
        bool: Returns True if the item is in the exclude list or is missing from the include list; otherwise False.
    """
    if self.include and self.include.get(type):
        if name in self.include[type]:
            return False
        else:
            return True
    elif self.exclude and self.exclude.get(type):
        if name in self.exclude[type]:
            return True
        else:
            return False
    else:
        return False
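
In effect the return value answers "should this item be skipped?". A standalone sketch of the same decision table follows; `should_skip` is a hypothetical free function that mirrors the method above, and the item names are only examples.

```python
def should_skip(name, component_type, include=None, exclude=None):
    """Return True when an item should be skipped during a sync."""
    include = include or {}
    exclude = exclude or {}
    if include.get(component_type):
        # An include list for this type acts as an allow-list.
        return name not in include[component_type]
    if exclude.get(component_type):
        # An exclude list for this type acts as a deny-list.
        return name in exclude[component_type]
    # No filter for this component type: nothing is skipped.
    return False


include = {"applications": ["Security Alert & Incident Management"]}
print(should_skip("Phishing Triage", "applications", include=include))  # True
print(should_skip("PT - Get Emails", "tasks", include=include))  # False
```

Note that a filter only affects the component types it names: with the include dictionary above, every task is still synced.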

add_to_diff_log(self, name, diff_type, subcomponent=None, value='')

Adds a dictionary of values to the DIFF_LOG list.

We create a dictionary of values and add them to our DIFF_LOG list but we also write the output to our log file.

We gather the component name by inspecting the calling stack.

Parameters:

Name Type Description Default
name str

The name of the component content item.

required
diff_type str

The type of diff this is. Must be one of 'added','updated','upgraded','removed','moved'.

required
subcomponent str

The subcomponent this is related to (e.g. fields in an app). Defaults to None.

None
value str

A value of the subcomponent to add to our dictionary. Defaults to "".

''

Exceptions:

Type Description
ValueError

Raises when the provided diff_type is not one of 'added','updated','upgraded','removed','moved'.

Source code in aqueduct/base.py
def add_to_diff_log(self, name, diff_type, subcomponent=None, value=""):
    """Adds a dictionary of values to the DIFF_LOG list.

    We create a dictionary of values and add them to our DIFF_LOG list but we also write the output to our log file.

    We gather the component name by inspecting the calling stack.

    Args:
        name (str): The name of the component content item.
        diff_type (str): The type of diff this is. Must be one of 'added','updated','upgraded','removed','moved'.
        subcomponent (str, optional): The subcomponent this is related to (e.g. fields in an app). Defaults to None.
        value (str, optional): A value of the subcomponent to add to our dictionary. Defaults to "".

    Raises:
        ValueError: Raises when the provided diff_type is not one of 'added','updated','upgraded','removed','moved'.
    """
    if diff_type in self.__diff_types:
        component = None
        parent = inspect.stack()[1][0].f_locals.get("self", None)
        component = parent.__class__.__name__
        if component and hasattr(parent, f"_{component}__logger"):
            if subcomponent:
                getattr(parent, f"_{component}__logger").info(
                    f"Dry Run: Component '{component}' named '{name}' with subcomponent '{subcomponent}' with value"
                    f" of '{value}' would have been {diff_type} on destination.",
                )
            else:
                getattr(parent, f"_{component}__logger").info(
                    f"Dry Run: Component '{component}' named '{name}' would have been {diff_type} on destination."
                )
        self.DIFF_LOG.append(
            {
                "component": component,
                "subcomponent": subcomponent,
                "name": name,
                "value": value,
                "diff_type": diff_type,
            }
        )
    else:
        raise ValueError(f"Unknown type of '{diff_type}' provided. Cannot add to diff log...")
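
The component name is recovered by looking one frame up the call stack and reading that frame's local `self`. A small sketch of the technique using the standard `inspect` module; `caller_class_name` and the example class are hypothetical:

```python
import inspect


def caller_class_name():
    """Return the class name of the object whose method called this function."""
    frame = inspect.stack()[1][0]  # frame object of the immediate caller
    parent = frame.f_locals.get("self", None)
    # Unlike the method above, return None when the caller has no `self`.
    return parent.__class__.__name__ if parent is not None else None


class Applications:
    def sync(self):
        return caller_class_name()


def plain_function():
    return caller_class_name()


print(Applications().sync())  # Applications
print(plain_function())  # None
```

Without the `None` guard, a caller that has no `self` would be reported as `NoneType`, because `None.__class__.__name__` is the string "NoneType".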

add_to_homework_list(self, component_name, value)

Adds an item to the list kept for a component name in the homework dictionary.

This method adds the provided value to a dictionary of component_name(s). Once Aqueduct is complete we will return this _homework dictionary in a formatted way so that users can have a quick way to understand any manual steps they must complete.

Parameters:

Name Type Description Default
component_name str

The name of the aqueduct component.

required
value str

The value to add to the list of component items.

required
Source code in aqueduct/base.py
def add_to_homework_list(self, component_name: str, value: str) -> None:
    """Adds an item to the list kept for a component name in the homework dictionary.

    This method adds the provided value to a dictionary of component_name(s). Once Aqueduct is complete
    we will return this _homework dictionary in a formatted way so that users can have a quick way to understand
    any manual steps they must complete.

    Args:
        component_name (str): The name of the aqueduct component.
        value (str): The value to add to the list of component items.
    """
    if not self._homework.get(component_name):
        self._homework[component_name] = []
    self._homework[component_name].append(value)
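
The homework dictionary is a plain mapping from component name to a list of reminders, which `_return_homework_list` later prints grouped by component. A sketch of both halves, assuming a module-level dictionary and invented reminder text; `add_to_homework` and `format_homework` are hypothetical helpers:

```python
homework = {}


def add_to_homework(component_name, value):
    """Append value to the list for component_name, creating the list if needed."""
    homework.setdefault(component_name, []).append(value)


def format_homework(items):
    """Return the summary as a list of lines, one tab-indented line per item."""
    lines = ["HOMEWORK LIST:", ""]
    for key, values in items.items():
        if values:
            lines.append(key.title())
            lines.extend(f"\t{value}" for value in values)
    return lines


# Invented reminders, for illustration only.
add_to_homework("keystore", "Set a value for key 'api_token'")
add_to_homework("keystore", "Set a value for key 'smtp_password'")
print("\n".join(format_homework(homework)))
```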

log(self, val, level='info')

Used to centralize logging across components.

We identify the source of the logging class by inspecting the calling stack.

Parameters:

Name Type Description Default
val str

The log value string to output.

required
level str

The log level. Defaults to "info".

'info'
Source code in aqueduct/base.py
def log(self, val, level="info"):
    """Used to centralize logging across components.

    We identify the source of the logging class by inspecting the calling stack.

    Args:
        val (str): The log value string to output.
        level (str, optional): The log level. Defaults to "info".
    """
    component = None
    parent = inspect.stack()[1][0].f_locals.get("self", None)
    component = parent.__class__.__name__
    try:
        getattr(getattr(parent, f"_{component}__logger"), level)(val)
        self.OUTPUT_LOG.append(f"{component} - {level.upper()} - {val}")
    except AttributeError as ae:
        self.OUTPUT_LOG.append(f"{component} - {level.upper()} - {val}")

scrub(self, obj, bad_key='$type')

Used to remove a specific provided key from a dictionary that may contain both nested dictionaries and lists.

This method is recursive.

Parameters:

Name Type Description Default
obj dict

A dictionary or list to remove keys from.

required
bad_key str

The bad key to remove from the provided dict or list. Defaults to "$type".

'$type'
Source code in aqueduct/base.py
def scrub(self, obj, bad_key="$type"):
    """Used to remove a specific provided key from a dictionary
    that may contain both nested dictionaries and lists.

    This method is recursive.

    Args:
        obj (dict): A dictionary or list to remove keys from.
        bad_key (str, optional): The bad key to remove from the provided dict or list. Defaults to "$type".
    """
    if isinstance(obj, dict):
        for key in list(obj.keys()):
            if key == bad_key:
                del obj[key]
            else:
                self.scrub(obj[key], bad_key)
    elif isinstance(obj, list):
        for i in reversed(range(len(obj))):
            if obj[i] == bad_key:
                del obj[i]
            else:
                self.scrub(obj[i], bad_key)
    else:
        pass
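
A standalone copy of the same recursion, applied to a small invented record, shows the effect. The `$type` values below are made up for illustration:

```python
def scrub(obj, bad_key="$type"):
    """Remove every `bad_key` key, and every list element equal to it, in place."""
    if isinstance(obj, dict):
        for key in list(obj.keys()):  # copy the keys so deletion is safe
            if key == bad_key:
                del obj[key]
            else:
                scrub(obj[key], bad_key)
    elif isinstance(obj, list):
        for i in reversed(range(len(obj))):  # walk backwards so indexes stay valid
            if obj[i] == bad_key:
                del obj[i]
            else:
                scrub(obj[i], bad_key)


record = {
    "$type": "Example.Application",
    "name": "Example",
    "fields": [{"$type": "Example.Field", "id": "a1"}, "$type"],
}
scrub(record)
print(record)  # {'name': 'Example', 'fields': [{'id': 'a1'}]}
```

The object is modified in place and nothing is returned, so callers that need the original should copy it first (for example with `copy.deepcopy`).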

SwimlaneInstance

Creates a connection to a single Swimlane instance

add_applet(self, applet)

Used to add an applet to a Swimlane instance

Parameters:

Name Type Description Default
applet dict

An applet item to add to a Swimlane instance

required

Returns:

Type Description
dict

A dictionary of an applet item

Source code in aqueduct/instance.py
@dump_content
@log_exception
def add_applet(self, applet: dict):
    """Used to add an applet to a Swimlane instance

    Args:
        applet (dict): An applet item to add to a Swimlane instance

    Returns:
        dict: A dictionary of an applet item
    """
    return self._make_request("POST", "/applet", json=applet)

add_application(self, application)

Adds an application based on the provided dictionary object

Parameters:

Name Type Description Default
application dict

An application dictionary object.

required

Returns:

Type Description
dict

A Swimlane application object

Source code in aqueduct/instance.py
@dump_content
@log_exception
def add_application(self, application):
    """Adds an application based on the provided dictionary object

    Args:
        application (dict): An application dictionary object.

    Returns:
        dict: A Swimlane application object
    """
    return self._make_request("POST", "/app", json=application)

add_asset(self, asset)

Adds a provided asset dictionary object to a Swimlane instance

Parameters:

Name Type Description Default
asset dict

A Swimlane asset dictionary object.

required

Returns:

Type Description
dict

A Swimlane asset dictionary object.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def add_asset(self, asset: dict) -> dict:
    """Adds a provided asset dictionary object to a Swimlane instance

    Args:
        asset (dict): A Swimlane asset dictionary object.

    Returns:
        dict: A Swimlane asset dictionary object.
    """
    return self._make_request("POST", "/asset", json=asset)

add_credential(self, credential)

Used to add a keystore key name (but not its value) to a Swimlane instance

Parameters:

Name Type Description Default
credential dict

A keystore item to add to a Swimlane instance

required

Returns:

Type Description
dict

A dictionary of a keystore item

Source code in aqueduct/instance.py
@dump_content
@log_exception
def add_credential(self, credential: dict):
    """Used to add a keystore key name (but not its value) to a Swimlane instance

    Args:
        credential (dict): A keystore item to add to a Swimlane instance

    Returns:
        dict : A dictionary of a keystore item
    """
    return self._make_request("POST", "/credentials", json=credential)

add_dashboard(self, dashboard)

Adds a provided dashboard dictionary object to a Swimlane instance

Parameters:

Name Type Description Default
dashboard dict

A Swimlane dashboard dictionary object.

required

Returns:

Type Description
dict

A Swimlane dashboard dictionary object.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def add_dashboard(self, dashboard: dict):
    """Adds a provided dashboard dictionary object to a Swimlane instance

    Args:
        dashboard (dict): A Swimlane dashboard dictionary object.

    Returns:
        dict: A Swimlane dashboard dictionary object.
    """
    return self._make_request("POST", "/dashboard", json=dashboard)

add_group(self, group)

Adds a provided group dictionary object to a Swimlane instance

Parameters:

Name Type Description Default
group dict

A Swimlane group dictionary object.

required

Returns:

Type Description
dict

A Swimlane group dictionary object.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def add_group(self, group: dict):
    """Adds a provided group dictionary object to a Swimlane instance

    Args:
        group (dict): A Swimlane group dictionary object.

    Returns:
        dict: A Swimlane group dictionary object.
    """
    try:
        return self._make_request("POST", "/groups", json=group)
    except SwimlaneHTTP400Error as sh:
        if sh.code == 1051:
            return True
        else:
            raise sh
    except Exception as e:
        raise e
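
The error handling in `add_group` treats a 400 response carrying code 1051 as a non-fatal outcome and returns `True`, while every other error propagates. A minimal sketch of that control flow follows. `FakeHTTP400Error` and `rejecting_request` are hypothetical stand-ins for illustration and are not part of aqueduct.

```python
class FakeHTTP400Error(Exception):
    """Hypothetical stand-in for SwimlaneHTTP400Error; only `code` is modeled."""

    def __init__(self, code: int):
        super().__init__(f"HTTP 400 (code {code})")
        self.code = code


def add_group(make_request, group: dict):
    """Mirror the control flow of the listing above with an injected request function."""
    try:
        return make_request("POST", "/groups", json=group)
    except FakeHTTP400Error as err:
        if err.code == 1051:
            # Same choice as the source: this specific code is not treated as a failure.
            return True
        raise  # a bare raise keeps the original traceback


def rejecting_request(method, path, json=None):
    # Hypothetical server behaviour, for illustration only.
    raise FakeHTTP400Error(1051)


result = add_group(rejecting_request, {"name": "analysts"})
```

Callers should therefore expect either a group dictionary or the boolean `True` from this method.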

add_report(self, report)

Adds a provided report dictionary object to a Swimlane instance

Parameters:

Name Type Description Default
report dict

A report dictionary object.

required

Returns:

Type Description
dict

A report dictionary object.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def add_report(self, report: dict):
    """Adds a provided report dictionary object to a Swimlane instance

    Args:
        report (dict): A report dictionary object.

    Returns:
        dict: A report dictionary object.
    """
    return self._make_request("POST", "/reports", json=report)

add_role(self, role)

Adds a provided role dictionary object to a Swimlane instance

Parameters:

Name Type Description Default
role dict

A role dictionary object.

required

Returns:

Type Description
dict

A role dictionary object.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def add_role(self, role: dict):
    """Adds a provided role dictionary object to a Swimlane instance

    Args:
        role (dict): A role dictionary object.

    Returns:
        dict: A role dictionary object.
    """
    return self._make_request("POST", "/roles", json=role)

add_task(self, task)

Adds a provided task dictionary to a Swimlane instance

Parameters:

Name Type Description Default
task dict

A Swimlane task dictionary.

required

Returns:

Type Description
dict

A Swimlane task dictionary.

Source code in aqueduct/instance.py
@dump_content
def add_task(self, task: dict):
    """Adds a provided task dictionary to a Swimlane instance

    Args:
        task (dict): A Swimlane task dictionary.

    Returns:
        dict: A Swimlane task dictionary.
    """
    try:
        return self._make_request("POST", "/task", json=task)
    except Exception as e:
        return False

add_user(self, user)

Adds a provided user dictionary object.

Parameters:

Name Type Description Default
user dict

A user dictionary object.

required

Returns:

Type Description
dict

A user dictionary object.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def add_user(self, user: dict):
    """Adds a provided user dictionary object.

    Args:
        user (dict): A user dictionary object.

    Returns:
        dict: A user dictionary object.
    """
    return self._make_request("POST", "/user", json=user)

add_workflow(self, workflow)

Adds the provided Workflow data model object to a Swimlane instance

Parameters:

Name Type Description Default
workflow Workflow

A Workflow data model object

required

Returns:

Type Description
Workflow

A Workflow data model object

Source code in aqueduct/instance.py
@dump_content
@log_exception
def add_workflow(self, workflow):
    """Adds the provided Workflow data model object to a Swimlane instance

    Args:
        workflow (Workflow): A `Workflow` data model object

    Returns:
        Workflow: A `Workflow` data model object
    """
    try:
        return self._make_request("POST", "/workflow/", json=workflow)
    except Exception as e:
        return False

add_workspace(self, workspace)

Adds a provided Workspace object to a Swimlane instance

Parameters:

Name Type Description Default
workspace dict

A workspace dictionary object.

required

Returns:

Type Description
dict

A workspace dictionary object.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def add_workspace(self, workspace: dict):
    """Adds a provided Workspace object to a Swimlane instance

    Args:
        workspace (dict): A workspace dictionary object.

    Returns:
        dict: A workspace dictionary object.
    """
    return self._make_request("POST", "/workspaces", json=workspace)

download_plugin(self, file_id)

Downloads a plugin or Python package by its internal Swimlane fileId

Parameters:

Name Type Description Default
file_id str

An internal Swimlane fileId to download a plugin or Python package

required

Returns:

Type Description
BytesIO

A BytesIO object of the downloaded file

Source code in aqueduct/instance.py
@log_exception
def download_plugin(self, file_id: str):
    """Downloads a plugin or Python package by its internal Swimlane fileId

    Args:
        file_id (str): An internal Swimlane fileId to download a plugin or Python package

    Returns:
        BytesIO: A BytesIO object of the downloaded file
    """
    stream = BytesIO()
    response = self.swimlane.request("GET", f"attachment/download/{file_id}", stream=True)
    for chunk in response.iter_content(1024):
        stream.write(chunk)
    stream.seek(0)
    return stream
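
The chunked download in `download_plugin` can be illustrated without a live instance. The sketch below assumes a hypothetical `FakeResponse` class that exposes only an `iter_content` method, in the style of a streamed HTTP response. It is not the object the Swimlane client actually returns.

```python
from io import BytesIO


class FakeResponse:
    """Hypothetical stand-in for a streamed HTTP response."""

    def __init__(self, payload: bytes):
        self._payload = payload

    def iter_content(self, chunk_size: int):
        # Yield the payload in pieces of at most chunk_size bytes.
        for start in range(0, len(self._payload), chunk_size):
            yield self._payload[start:start + chunk_size]


def read_into_stream(response, chunk_size: int = 1024) -> BytesIO:
    """Copy a chunked response body into an in-memory buffer."""
    stream = BytesIO()
    for chunk in response.iter_content(chunk_size):
        stream.write(chunk)
    stream.seek(0)  # rewind so the caller reads from the beginning
    return stream


payload = b"x" * 2500
stream = read_into_stream(FakeResponse(payload))
data = stream.read()
```

The final `seek(0)` matters: without it, a later `read()` on the returned buffer would start at the end and return no bytes.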

get_applet(self, applet_id)

Retrieves an applet if it exists on the desired instance

Parameters:

Name Type Description Default
applet_id str

The ID of an applet

required

Returns:

Type Description
dict

A dictionary of an applet

Source code in aqueduct/instance.py
@log_exception
def get_applet(self, applet_id):
    """Retrieves an applet if it exists on the desired instance

    Args:
        applet_id (str): The ID of an applet

    Returns:
        dict : A dictionary of an applet
    """
    try:
        return self._make_request("GET", f"/applet/{applet_id}")
    except Exception as e:
        return False

get_applets(self)

Used to retrieve applets

Returns:

Type Description
dict

A list of applet dictionaries

Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_applets(self):
    """Used to retrieve applets

    Returns:
        dict : A list of applet dictionaries
    """
    return self._make_request("GET", "/applet")

get_application(self, application_id)

Gets an application by a provided ID

If the application does not exist, then this method returns False.

If the application does exist it will return a JSON object

Parameters:

Name Type Description Default
application_id str

A Swimlane application ID

required

Returns:

Type Description
json

A Swimlane application JSON

Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_application(self, application_id):
    """Gets an application by a provided ID

    If the application does not exist, then this method returns False.

    If the application does exist it will return a JSON object

    Args:
        application_id (str): A Swimlane application ID

    Returns:
        json: A Swimlane application JSON
    """
    try:
        return self._make_request("GET", f"/app/{application_id}")
    except Exception as e:
        return False
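
Many of the single-item getters on this page share one convention: any exception raised by the request is swallowed and reported as `False`. A minimal sketch of how a caller might handle that, assuming a hypothetical `fake_request` function and made-up application data:

```python
def get_application(make_request, application_id: str):
    """Same shape as the listing above, with the request function injected."""
    try:
        return make_request("GET", f"/app/{application_id}")
    except Exception:
        return False


def fake_request(method, path):
    # Hypothetical data source standing in for a Swimlane instance.
    known = {"/app/abc123": {"id": "abc123", "name": "Alerts"}}
    if path not in known:
        raise LookupError(path)
    return known[path]


found = get_application(fake_request, "abc123")
missing = get_application(fake_request, "nope")

# Callers test truthiness rather than catching exceptions.
label = found["name"] if found else "not found"
```

Because every exception maps to `False`, the return value alone does not distinguish a missing application from, say, a connection failure.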

get_application_workspaces(self, application_id)

Gets a list of workspaces for an application ID.

Parameters:

Name Type Description Default
application_id str

A Swimlane application ID

required

Returns:

Type Description
list

A list of workspaces associated with an application.

Source code in aqueduct/instance.py
@log_exception
def get_application_workspaces(self, application_id):
    """Gets a list of workspaces for an application ID.

    Args:
        application_id (str): A Swimlane application ID

    Returns:
        list: A list of workspaces associated with an application.
    """
    try:
        return self._make_request("GET", f"/workspaces/app/{application_id}")
    except Exception as e:
        return False

get_applications(self)

Retrieves a list of applications for a Swimlane instance.

Returns:

Type Description
list

A list of json objects

Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_applications(self):
    """Retrieves a list of applications for a Swimlane instance.

    Returns:
        list: A list of json objects
    """
    return self._make_request("GET", "/app")

get_applications_light(self)

Gets light version of all applications

Returns:

Type Description
list

A list of application light objects

Source code in aqueduct/instance.py
@log_exception
def get_applications_light(self):
    """Gets light version of all applications

    Returns:
        list: A list of application light objects
    """
    return self._make_request("GET", "/app/light")

get_asset(self, asset_id)

Gets an asset by a provided ID

If the asset does not exist, then this method returns False.

If the asset does exist it will return an asset dictionary.

Parameters:

Name Type Description Default
asset_id str

A Swimlane asset ID

required

Returns:

Type Description
dict

A Swimlane instance asset dictionary.

Source code in aqueduct/instance.py
@log_exception
def get_asset(self, asset_id: str) -> dict:
    """Gets an asset by a provided ID

    If the asset does not exist, then this method returns False.

    If the asset does exist it will return an asset dictionary.

    Args:
        asset_id (str): A Swimlane asset ID

    Returns:
        dict: A Swimlane instance asset dictionary.
    """
    try:
        return self._make_request("GET", f"/asset/{asset_id}")
    except Exception as e:
        return False

get_assets(self)

Retrieves a list of assets for a Swimlane instance.

Returns:

Type Description
list

A list of asset JSON objects.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_assets(self) -> list:
    """Retrieves a list of assets for a Swimlane instance.

    Returns:
        list: A list of asset JSON objects.
    """
    assets = self._make_request("GET", "/asset")
    return assets if assets and isinstance(assets, list) else []
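
The last line of `get_assets` normalizes the response so callers always receive a list. The guard can be isolated as a small helper. `as_list` is a hypothetical name used only for this sketch.

```python
def as_list(value) -> list:
    """Return value unchanged if it is a non-empty list, otherwise an empty list."""
    return value if value and isinstance(value, list) else []


results = [
    as_list([{"id": "a1"}]),   # a real list passes through
    as_list(None),             # no response
    as_list({"error": "x"}),   # unexpected shape
    as_list([]),               # empty list stays empty
]
```

The same expression appears in `get_dashboards` and `get_plugins`, so iterating over their results is safe even when the request returns nothing.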

get_credential(self, name)

Retrieves a keystore item if it exists on the desired instance

Parameters:

Name Type Description Default
name str

Name of a keystore item

required

Returns:

Type Description
dict

A dictionary of a keystore item

Source code in aqueduct/instance.py
@log_exception
def get_credential(self, name: str):
    """Retrieves a keystore item if it exists on the desired instance

    Args:
        name (str): Name of a keystore item

    Returns:
        dict : A dictionary of a keystore item
    """
    try:
        return self._make_request("GET", f"/credentials/{name}")
    except Exception as e:
        return False

get_credentials(self)

Used to retrieve keystore items

Returns:

Type Description
dict

A dictionary of keystore keys and encrypted values

Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_credentials(self):
    """Used to retrieve keystore items

    Returns:
        dict : A dictionary of keystore keys and encrypted values
    """
    return self._make_request("GET", "/credentials")

get_dashboard(self, dashboard_id)

Gets a dashboard by a provided ID

If the dashboard does not exist, then this method returns False.

If the dashboard does exist it will return a dashboard dictionary object.

Parameters:

Name Type Description Default
dashboard_id str

A Swimlane dashboard ID

required

Returns:

Type Description
dict

A Swimlane dashboard dictionary object.

Source code in aqueduct/instance.py
@log_exception
def get_dashboard(self, dashboard_id: str):
    """Gets a dashboard by a provided ID

    If the dashboard does not exist, then this method returns False.

    If the dashboard does exist it will return a dashboard dictionary object.

    Args:
        dashboard_id (str): A Swimlane dashboard ID

    Returns:
        dict: A Swimlane dashboard dictionary object.
    """
    try:
        return self._make_request("GET", f"/dashboard/{dashboard_id}")
    except Exception as e:
        return False

get_dashboards(self)

Retrieves a list of dashboards for a Swimlane instance.

Returns:

Type Description
list

A list of dashboard dictionary objects

Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_dashboards(self):
    """Retrieves a list of dashboards for a Swimlane instance.

    Returns:
        list: A list of dashboard dictionary objects
    """
    dashboards = self._make_request("GET", "/dashboard")
    return dashboards if dashboards and isinstance(dashboards, list) else []

get_default_report_by_application_id(self, application_id)

Gets the default report for an application based on the provided ID

Parameters:

Name Type Description Default
application_id str

An application ID.

required

Returns:

Type Description
dict

A Swimlane report dictionary object.

Source code in aqueduct/instance.py
@log_exception
def get_default_report_by_application_id(self, application_id):
    """Gets the default report for an application based on the provided ID

    Args:
        application_id (str): An application ID.

    Returns:
        dict: A Swimlane report dictionary object.
    """
    try:
        return self._make_request("GET", f"/reports/app/{application_id}/default")
    except Exception as e:
        return False

get_group_by_id(self, group_id)

Gets a group by a provided ID

If the group does not exist, then this method returns False.

If the group does exist it will return a group dictionary object.

Parameters:

Name Type Description Default
group_id str

A Swimlane group ID

required

Returns:

Type Description
dict

A Swimlane group object.

Source code in aqueduct/instance.py
@log_exception
def get_group_by_id(self, group_id: str):
    """Gets a group by a provided ID

    If the group does not exist, then this method returns False.

    If the group does exist it will return a group dictionary object.

    Args:
        group_id (str): A Swimlane group ID

    Returns:
        dict: A Swimlane group object.
    """
    try:
        return self._make_request("GET", f"/groups/{group_id}")
    except Exception as e:
        return False

get_group_by_name(self, group_name)

Gets a group by a provided name

If the group does not exist, then this method returns False.

If the group does exist it will return a group dictionary object.

Parameters:

Name Type Description Default
group_name str

A Swimlane group name

required

Returns:

Type Description
dict

A Swimlane group object.

Source code in aqueduct/instance.py
@log_exception
def get_group_by_name(self, group_name: str):
    """Gets a group by a provided name

    If the group does not exist, then this method returns False.

    If the group does exist it will return a group dictionary object.

    Args:
        group_name (str): A Swimlane group name

    Returns:
        dict: A Swimlane group object.
    """
    try:
        resp = self._make_request("GET", f"/users/lookup?name={group_name}")
        return resp[0] if resp and isinstance(resp, list) else False
    except Exception as e:
        return False

get_groups(self)

Retrieves a list of groups for a Swimlane instance.

Returns:

Type Description
list

A list of group dictionary objects.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_groups(self):
    """Retrieves a list of groups for a Swimlane instance.

    Returns:
        list: A list of group dictionary objects.
    """
    groups = self._make_request("GET", "/groups")
    return groups["items"] if groups.get("items") else groups.get("groups", [])
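
The return line of `get_groups` accepts two response shapes: one keyed by `items` and one keyed by `groups`. A minimal sketch of that selection, using a hypothetical helper name and made-up payloads:

```python
def extract_groups(payload: dict) -> list:
    """Prefer a non-empty 'items' key, otherwise fall back to 'groups' or an empty list."""
    return payload["items"] if payload.get("items") else payload.get("groups", [])


with_items = extract_groups({"items": [{"name": "a"}]})
with_groups = extract_groups({"groups": [{"name": "b"}]})
neither = extract_groups({})
```

Note that the expression assumes the response is a dictionary. A bare list or `None` would raise `AttributeError` on `.get`.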

get_pip_packages(self, versions=['Python2_7', 'Python3_6', 'Python3'])

Retrieves a list of pip packages for a Swimlane instance.

Parameters:

Name Type Description Default
versions list

A list of Python versions. Defaults to ['Python2_7', 'Python3_6', 'Python3'].

['Python2_7', 'Python3_6', 'Python3']

Returns:

Type Description
list

A list of package dictionary objects.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_pip_packages(self, versions=["Python2_7", "Python3_6", "Python3"]):
    """Retrieves a list of pip packages for a Swimlane instance.

    Args:
        versions (list, optional): A list of Python versions. Defaults to ['Python2_7', 'Python3_6', 'Python3'].

    Returns:
        list: A list of package dictionary objects.
    """
    return_list = []
    for version in versions:
        try:
            resp = self._make_request("GET", f"/pip/packages/{version}")
            if resp:
                for item in resp:
                    if item and isinstance(item, dict):
                        return_list.append(item)
        except Exception as e:
            continue
    return return_list
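
The loop in `get_pip_packages` queries each Python version in turn, keeps only dictionary items, and skips any version whose request fails. The sketch below reproduces that behaviour with a hypothetical `fake_fetch` function in place of the HTTP call. The package data is invented for illustration.

```python
def collect_packages(fetch, versions):
    """Gather package dictionaries across versions, ignoring per-version failures."""
    collected = []
    for version in versions:
        try:
            response = fetch(version)
            if response:
                for item in response:
                    if item and isinstance(item, dict):
                        collected.append(item)
        except Exception:
            continue  # a failing version is skipped, not fatal
    return collected


def fake_fetch(version):
    # Hypothetical endpoint: one version errors, the others return mixed items.
    if version == "Python2_7":
        raise RuntimeError("unsupported")
    return [{"name": "requests", "pythonVersion": version}, None, "noise"]


packages = collect_packages(fake_fetch, ["Python2_7", "Python3_6", "Python3"])
versions_seen = [p["pythonVersion"] for p in packages]
```

One consequence of this design is that a version that errors out is indistinguishable from a version with no packages installed.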

get_plugin(self, name)

Gets a plugin by a provided plugin name

If the plugin does not exist, then this method returns False.

If the plugin does exist it will return a plugin dictionary object.

Parameters:

Name Type Description Default
name str

A plugin name

required

Returns:

Type Description
dict

A Swimlane plugin dictionary object.

Source code in aqueduct/instance.py
@log_exception
def get_plugin(self, name: str):
    """Gets a plugin by a provided plugin name

    If the plugin does not exist, then this method returns False.

    If the plugin does exist it will return a plugin dictionary object.

    Args:
        name (str): A plugin name

    Returns:
        dict: A Swimlane plugin dictionary object.
    """
    try:
        return self._make_request("GET", f"/task/packages/{name}")
    except Exception as e:
        return False

get_plugins(self)

Retrieves a list of plugins for a Swimlane instance.

Returns:

Type Description
list

A list of plugin (light) dictionary objects.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_plugins(self):
    """Retrieves a list of plugins for a Swimlane instance.

    Returns:
        list: A list of plugin (light) dictionary objects.
    """
    plugins = self._make_request("GET", "/task/packages")
    return plugins if plugins and isinstance(plugins, list) else []

get_report(self, report_id)

Gets a report by a provided ID

If the report does not exist, then this method returns False.

If the report does exist it will return a report dictionary object.

Parameters:

Name Type Description Default
report_id str

A Swimlane report ID

required

Returns:

Type Description
dict

A Swimlane report dictionary object.

Source code in aqueduct/instance.py
@log_exception
def get_report(self, report_id: str):
    """Gets a report by a provided ID

    If the report does not exist, then this method returns False.

    If the report does exist it will return a report dictionary object.

    Args:
        report_id (str): A Swimlane report ID

    Returns:
        dict: A Swimlane report dictionary object.
    """
    try:
        return self._make_request("GET", f"/reports/{report_id}")
    except Exception as e:
        return False

get_reports(self)

Retrieves a list of reports for a Swimlane instance.

Returns:

Type Description
list

A list of report dictionary objects.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_reports(self):
    """Retrieves a list of reports for a Swimlane instance.

    Returns:
        list: A list of report dictionary objects.
    """
    reports = self._make_request("GET", "/reports")
    return reports if reports else []

get_role(self, role_id)

Gets a role by a provided ID

If the role does not exist, then this method returns False.

If the role does exist it will return a role dictionary object.

Parameters:

Name Type Description Default
role_id str

A Swimlane role ID

required

Returns:

Type Description
dict

A role dictionary object.

Source code in aqueduct/instance.py
@log_exception
def get_role(self, role_id: str):
    """Gets a role by a provided ID

    If the role does not exist, then this method returns False.

    If the role does exist it will return a role dictionary object.

    Args:
        role_id (str): A Swimlane role ID

    Returns:
        dict: A role dictionary object.
    """
    try:
        return self._make_request("GET", f"/roles/{role_id}")
    except Exception as e:
        return False

get_role_by_name(self, role_name)

Gets a role by a provided name

If the role does not exist, then this method returns False.

If the role does exist it will return a role dictionary object.

Parameters:

Name Type Description Default
role_name str

A Swimlane role name

required

Returns:

Type Description
dict

A role dictionary object.

Source code in aqueduct/instance.py
@log_exception
def get_role_by_name(self, role_name: str):
    """Gets a role by a provided name

    If the role does not exist, then this method returns False.

    If the role does exist it will return a role dictionary object.

    Args:
        role_name (str): A Swimlane role name

    Returns:
        dict: A role dictionary object.
    """
    try:
        return self._make_request("GET", f"/roles/?searchFieldName=name&searchValue={role_name}")
    except Exception as e:
        return False
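
The listing above interpolates `role_name` directly into the query string. If a role name contains spaces or characters such as `&`, one possible alternative is to build the query with the standard library's `urlencode`. This is only a sketch of that idea, not what aqueduct does, and it assumes the server decodes standard form-style query encoding. `role_search_path` is a hypothetical helper.

```python
from urllib.parse import urlencode


def role_search_path(role_name: str) -> str:
    """Build the role search path with the name percent-encoded."""
    query = urlencode({"searchFieldName": "name", "searchValue": role_name})
    return f"/roles/?{query}"


path = role_search_path("Tier 1 & 2 Analysts")
```

With encoding in place, the `&` inside the role name can no longer be mistaken for a parameter separator.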

get_roles(self)

Retrieves a list of roles for a Swimlane instance.

These roles are modeled after the role dictionary object.

Returns:

Type Description
list

A list of role dictionary objects.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_roles(self):
    """Retrieves a list of roles for a Swimlane instance.

    These roles are modeled after the role dictionary object.

    Returns:
        list: A list of role dictionary objects.
    """
    return_list = []
    roles = self._make_request("GET", "/roles")
    if roles and isinstance(roles, dict):
        # checking for items here. 10.4.0 does not use the items key
        # but greater versions do (e.g. 10.5>)
        if roles.get("items"):
            roles = roles["items"]
    if roles and isinstance(roles, list):
        for role in roles:
            return_list.append(role)
    return return_list
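
As the comment in `get_roles` notes, the response is either a plain list (10.4.0) or a dictionary wrapping the list under `items` (later versions). A minimal sketch of that normalization, with a hypothetical helper name and invented role data:

```python
def normalize_roles(payload) -> list:
    """Unwrap an 'items' envelope if present, then return a list or an empty list."""
    if payload and isinstance(payload, dict) and payload.get("items"):
        payload = payload["items"]
    return list(payload) if payload and isinstance(payload, list) else []


older = normalize_roles([{"name": "Admin"}])                       # bare list
newer = normalize_roles({"items": [{"name": "Admin"}], "count": 1})  # wrapped list
empty = normalize_roles({"items": []})                             # nothing to unwrap
```

Both response shapes yield the same list, so code that consumes the roles does not need to know which Swimlane version produced them.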

get_task(self, task_id)

Gets a task by a provided task ID.

If the task ID does not exist, then this method returns False.

If the task ID does exist it will return a dictionary object

Parameters:

Name Type Description Default
task_id str

A task ID to retrieve the entire task JSON.

required

Returns:

Type Description
dict

A Swimlane task dictionary.

Source code in aqueduct/instance.py
@log_exception
def get_task(self, task_id: str):
    """Gets a task by a provided task ID.

    If the task ID does not exist, then this method returns False.

    If the task ID does exist it will return a dictionary object

    Args:
        task_id (str): A task ID to retrieve the entire task JSON.

    Returns:
        dict: A Swimlane task dictionary.
    """
    try:
        return self._make_request("GET", f"/task/{task_id}")
    except Exception as e:
        return False

get_tasks(self)

Retrieves a list of tasks for a Swimlane instance.

Returns:

Type Description
list

A list of task objects

Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_tasks(self):
    """Retrieves a list of tasks for a Swimlane instance.

    Returns:
        list: A list of task objects
    """
    tasks = self._make_request("GET", "/task/list")
    if tasks and tasks.get("tasks"):
        return tasks["tasks"]
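
Unlike the other list getters, `get_tasks` as listed has no fallback return, so it yields `None` when the response has no `tasks` key. The sketch below reproduces that behaviour with injected request functions (hypothetical lambdas) and shows one way a caller might guard against it.

```python
def get_tasks(make_request):
    """Same logic as the listing above, with the request function injected."""
    tasks = make_request("GET", "/task/list")
    if tasks and tasks.get("tasks"):
        return tasks["tasks"]
    # Falling off the end returns None implicitly.


tasks_present = get_tasks(lambda method, path: {"tasks": [{"id": "t1"}]})
tasks_absent = get_tasks(lambda method, path: {})

# Caller-side guard so iteration is always safe.
safe_tasks = tasks_absent or []
```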

get_tasks_by_application(self, application_id)

Gets a list of tasks by application ID.

If the application ID does not exist, then this method returns False.

If the application ID does exist it will return a list of tasks

Parameters:

Name Type Description Default
application_id str

An application ID to retrieve tasks from.

required

Returns:

Type Description
list

A list of tasks associated with an application.

Source code in aqueduct/instance.py
@log_exception
def get_tasks_by_application(self, application_id: str):
    """Gets a list of tasks by application ID.

    If the application ID does not exist, then this method returns False.

    If the application ID does exist it will return a list of tasks

    Args:
        application_id (str): An application ID to retrieve tasks from.

    Returns:
        list: A list of tasks associated with an application.
    """
    try:
        return self._make_request("GET", f"/task/list/{application_id}")
    except Exception as e:
        return False

get_user(self, user_id)

Gets a user by a provided ID

If the user does not exist, then this method returns False.

If the user does exist it will return a user dictionary object.

Parameters:

Name Type Description Default
user_id str

A Swimlane user ID

required

Returns:

Type Description
dict

A user dictionary object.

Source code in aqueduct/instance.py
@log_exception
def get_user(self, user_id: str):
    """Gets a user by a provided ID

    If the user does not exist, then this method returns False.

    If the user does exist it will return a user dictionary object.

    Args:
        user_id (str): A Swimlane user ID

    Returns:
        dict: A user dictionary object.
    """
    try:
        return self._make_request("GET", f"/user/{user_id}")
    except Exception as e:
        return False

get_users(self)

Retrieves a list of users for a Swimlane instance.

Returns:

Type Description
list

A list of user dictionary objects.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_users(self):
    """Retrieves a list of users for a Swimlane instance.

    Returns:
        list: A list of user dictionary objects.
    """
    users = self._make_request("GET", "/user/light")
    return users if users else []

get_workflow(self, application_id)

Gets a workflow by a provided application ID

If the workflow does not exist, then this method returns False.

If the workflow does exist it will return a Workflow object

Parameters:

Name Type Description Default
application_id str

A Swimlane application ID

required

Returns:

Type Description
Workflow

A Workflow data model object

Source code in aqueduct/instance.py
@log_exception
def get_workflow(self, application_id: str):
    """Gets a workflow by a provided application ID

    If the workflow does not exist, then this method returns False.

    If the workflow does exist it will return a `Workflow` object

    Args:
        application_id (str): A Swimlane application ID

    Returns:
        Workflow: A `Workflow` data model object
    """
    try:
        return self._make_request("GET", f"/workflow/{application_id}")
    except Exception as e:
        return False

get_workflows(self)

Retrieves a list of workflows for a Swimlane instance.

These workflows are modeled after the Workflow data model

Returns:

Type Description
list

A list of Workflow objects

Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_workflows(self):
    """Retrieves a list of workflows for a Swimlane instance.

    These workflows are modeled after the `Workflow` data model

    Returns:
        list: A list of `Workflow` objects
    """
    return_list = []
    workflows = self._make_request("GET", "/workflow/")
    if workflows:
        for workflow in workflows:
            return_list.append(workflow)
    return return_list

get_workspace(self, workspace_id)

Gets a workspace by a provided ID

If the workspace does not exist, then this method returns False.

If the workspace does exist it will return a workspace dictionary object.

Parameters:

Name Type Description Default
workspace_id str

A Swimlane workspace ID

required

Returns:

Type Description
dict

A workspace dictionary object.

Source code in aqueduct/instance.py
@log_exception
def get_workspace(self, workspace_id: str):
    """Gets a workspace by a provided ID

    If the workspace does not exist, then this method returns False.

    If the workspace does exist it will return a workspace dictionary object.

    Args:
        workspace_id (str): A Swimlane workspace ID

    Returns:
        dict: A workspace dictionary object.
    """
    try:
        return self._make_request("GET", f"/workspaces/{workspace_id}")
    except Exception as e:
        return False

get_workspaces(self)

Retrieves a list of workspaces for a Swimlane instance.

Returns:

Type Description
list

A list of workspace dictionary objects.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_workspaces(self):
    """Retrieves a list of workspaces for a Swimlane instance.

    Returns:
        list: A list of workspace dictionary objects.
    """
    workspaces = self._make_request("GET", "/workspaces")
    return workspaces if workspaces else []

install_package(self, package)

Installs a Python (pip) package based on the provided package dictionary object.

Parameters:

Name Type Description Default
package dict

A Swimlane package dictionary object.

required

Returns:

Type Description
dict

A dictionary of a Python pip package

Source code in aqueduct/instance.py
@log_exception
def install_package(self, package: dict):
    """Installs a Python (pip) package based on the provided package dictionary object.

    Args:
        package (dict): A Swimlane package dictionary object.

    Returns:
        dict: A dictionary of a Python pip package
    """
    json = {
        "name": package["name"],
        "version": package["version"],
        "pythonVersion": package["pythonVersion"],
    }
    return self._make_request("POST", "/pip/packages", json=json)
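
`install_package` sends only three fields from the package dictionary, regardless of what else it contains. A minimal sketch of that payload construction follows. The helper name and the extra keys in the sample package (`id`, `fileId`) are hypothetical.

```python
def build_install_payload(package: dict) -> dict:
    """Keep only the fields that the install request sends."""
    return {key: package[key] for key in ("name", "version", "pythonVersion")}


package = {
    "name": "requests",
    "version": "2.31.0",
    "pythonVersion": "Python3",
    "id": "xyz",       # hypothetical extra field, dropped from the payload
    "fileId": None,    # hypothetical extra field, dropped from the payload
}
payload = build_install_payload(package)
```

A package dictionary missing any of the three keys raises `KeyError`, which matches the direct indexing used in the listing above.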

install_package_offline(self, filename, stream, data)

Installs a Python package wheel file offline to a Swimlane instance given a filename and a BytesIO file stream

Parameters:

Name Type Description Default
filename str

A filename string

required
stream BytesIO

A BytesIO file stream

required
data dict

A dictionary of additional request information

required

Returns:

Type Description
json

JSON Response from installing a Python package wheel file

Source code in aqueduct/instance.py
@log_exception
def install_package_offline(self, filename, stream, data):
    """Installs a Python package wheel file offline to a Swimlane instance given a filename and a BytesIO file stream

    Args:
        filename (str): A filename string
        stream (BytesIO): A BytesIO file stream
        data (dict): A dictionary of additional request information

    Returns:
        json: JSON Response from installing a Python package wheel file
    """
    return self.swimlane.request(
        "POST",
        "/pip/packages/offline",
        data=data,
        files={"wheel": (filename, stream.read())},
        timeout=120,
    )
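
`install_package_offline` calls `stream.read()`, which reads from the stream's current position. The sketch below shows why the stream should be rewound before it is passed in. `build_files` is a hypothetical helper that reproduces only the `files` argument from the listing above.

```python
from io import BytesIO


def build_files(filename: str, stream: BytesIO) -> dict:
    """Build the multipart 'files' mapping the same way the listing above does."""
    return {"wheel": (filename, stream.read())}


stream = BytesIO()
stream.write(b"wheel-bytes")

# Position is at the end after writing, so read() returns nothing.
unrewound = build_files("example.whl", stream)

stream.seek(0)
rewound = build_files("example.whl", stream)
```

A stream returned by `download_plugin` is already rewound, since that method ends with `stream.seek(0)`.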

search_user(self, query_string)

Searches for a user by a query string

If the user does not exist, then this method returns False.

If the user does exist it will return a user dictionary object.

Parameters:

Name Type Description Default
query_string str

A query string typically a display name

required

Returns:

Type Description
dict

A user dictionary object.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def search_user(self, query_string: str):
    """Searches for a user by a query string

    If the user does not exist, then this method returns False.

    If the user does exist it will return a user dictionary object.

    Args:
        query_string (str): A query string typically a display name

    Returns:
        dict: A user dictionary object.
    """
    try:
        resp = self._make_request("GET", f"/user/lookup?name={query_string}")
        if resp and isinstance(resp, list):
            return resp[0]
    except Exception as e:
        return False
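
As written, `search_user` has three distinct outcomes: the first match, `None` when the lookup returns an empty result, and `False` when the request raises. A minimal sketch with injected request functions follows. The `displayName` field and all sample data are assumptions made for illustration.

```python
def search_user(make_request, query_string: str):
    """Same logic as the listing above, with the request function injected."""
    try:
        resp = make_request("GET", f"/user/lookup?name={query_string}")
        if resp and isinstance(resp, list):
            return resp[0]
    except Exception:
        return False


def failing(method, path):
    # Hypothetical transport failure.
    raise ConnectionError("unreachable")


first_match = search_user(
    lambda method, path: [{"displayName": "Ada"}, {"displayName": "Adam"}], "Ada"
)
no_match = search_user(lambda method, path: [], "Nobody")
failed = search_user(failing, "Ada")
```

Both `None` and `False` are falsy, so a plain truthiness check covers the two unsuccessful cases. Only the first match is returned even when several users fit the query.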

update_applet(self, applet)

Updates a provided applet with the data contained in the provided applet dictionary.

Parameters:

Name Type Description Default
applet dict

A Swimlane applet dictionary.

required

Returns:

Type Description
dict

A Swimlane applet dictionary.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def update_applet(self, applet: dict):
    """Updates a provided applet with the data contained in the provided applet dictionary.

    Args:
        applet (dict): A Swimlane applet dictionary.

    Returns:
        dict: A Swimlane applet dictionary.
    """
    return self._make_request("PUT", f"/applet/{applet['id']}", json=applet)

update_application(self, application)

Updates the application based on the provided dictionary object

Parameters:

Name Type Description Default
application dict

An application dictionary object.

required

Returns:

Type Description
dict

A Swimlane application object

Source code in aqueduct/instance.py
@dump_content
@log_exception
def update_application(self, application):
    """Updates the application based on the provided dictionary object

    Args:
        application (dict): An application dictionary object.

    Returns:
        dict: A Swimlane application object
    """
    return self._make_request("PUT", "/app", json=application)

update_asset(self, asset_id, asset)

Updates a provided asset ID with the data contained in the provided asset dictionary object.

Parameters:

Name Type Description Default
asset_id str

An asset ID to update on a Swimlane instance

required
asset dict

A Swimlane asset dictionary object.

required

Returns:

Type Description
dict

A Swimlane asset dictionary.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def update_asset(self, asset_id: str, asset: dict):
    """Updates a provided asset ID with the data contained in the provided asset dictionary object.

    Args:
        asset_id (str): An asset ID to update on a Swimlane instance
        asset (dict): A Swimlane asset dictionary object.

    Returns:
        dict: A Swimlane asset dictionary.
    """
    return self._make_request("PUT", f"/asset/{asset_id}", json=asset)

update_dashboard(self, dashboard)

Updates a Swimlane instance dashboard based on the provided dashboard dictionary.

Parameters:

Name Type Description Default
dashboard dict

A Swimlane dashboard dictionary object.

required

Returns:

Type Description
dict

A Swimlane dashboard dictionary object.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def update_dashboard(self, dashboard: dict):
    """Updates a Swimlane instance dashboard based on the provided dashboard dictionary.

    Args:
        dashboard (dict): A Swimlane dashboard dictionary object.

    Returns:
        dict: A Swimlane dashboard dictionary object.
    """
    return self._make_request("PUT", f"/dashboard/{dashboard['id']}", json=dashboard)

update_default_report(self, report)

Updates a Swimlane application's default report with the provided report dictionary object.

Parameters:

Name Type Description Default
report dict

A report dictionary object.

required

Returns:

Type Description
dict

A report dictionary object.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def update_default_report(self, report: dict):
    """Updates a Swimlane application's default report with the provided report dictionary object.

    Args:
        report (dict): A report dictionary object.

    Returns:
        dict: A report dictionary object.
    """
    return self._make_request("PUT", f"/reports/{report['id']}", json=report)

update_group(self, group_id, group)

Updates a provided group ID with the data contained in the provided group dictionary object.

Parameters:

Name Type Description Default
group_id str

A group ID to update on a Swimlane instance

required
group dict

A Swimlane group dictionary object.

required

Returns:

Type Description
dict

A Swimlane group dictionary object.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def update_group(self, group_id: str, group: dict):
    """Updates a provided group ID with the data contained in the provided group dictionary object.

    Args:
        group_id (str): A group ID to update on a Swimlane instance
        group (dict): A Swimlane group dictionary object.

    Returns:
        dict: A Swimlane group dictionary object.
    """
    return self._make_request("PUT", f"/groups/{group_id}", json=group)

update_report(self, report_id, report)

Updates a provided report ID with the data contained in the provided report dictionary object.

Parameters:

Name Type Description Default
report_id str

A report ID to update on a Swimlane instance

required
report dict

A report dictionary object.

required

Returns:

Type Description
bool

True when the update request returns a response. The report dictionary itself is not returned.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def update_report(self, report_id: str, report: dict):
    """Updates a provided report ID with the data contained in the provided report dictionary object.

    Args:
        report_id (str): A report ID to update on a Swimlane instance
        report (dict): A report dictionary object.

    Returns:
        bool: True when the update request returns a response.
    """
    resp = self._make_request("PUT", f"/reports/{report_id}", json=report)
    if resp:
        return True

update_role(self, role_id, role)

Updates a provided role ID with the data contained in the provided role dictionary object.

Parameters:

Name Type Description Default
role_id str

A role ID to update on a Swimlane instance

required
role dict

A role dictionary object.

required

Returns:

Type Description
dict

A role dictionary object.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def update_role(self, role_id: str, role: dict):
    """Updates a provided role ID with the data contained in the provided role dictionary object.

    Args:
        role_id (str): A role ID to update on a Swimlane instance
        role (dict): A role dictionary object.

    Returns:
        dict: A role dictionary object.
    """
    return self._make_request("PUT", f"/roles/{role_id}", json=role)

update_task(self, task_id, task)

Updates a provided task ID with the data contained in the provided task dictionary.

Parameters:

Name Type Description Default
task_id str

A task ID to update on a Swimlane instance

required
task dict

A Swimlane task dictionary.

required

Returns:

Type Description
dict

A Swimlane task dictionary.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def update_task(self, task_id: str, task: dict):
    """Updates a provided task ID with the data contained in the provided task dictionary.

    Args:
        task_id (str): A task ID to update on a Swimlane instance
        task (dict): A Swimlane task dictionary.

    Returns:
        dict: A Swimlane task dictionary.
    """
    return self._make_request("PUT", f"/task/{task_id}", json=task)

update_user(self, user_id, user)

Updates a provided user ID with the data contained in the provided user dictionary object.

Parameters:

Name Type Description Default
user_id str

A user ID to update on a Swimlane instance

required
user dict

A user dictionary object.

required

Returns:

Type Description
dict

A user dictionary object.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def update_user(self, user_id: str, user: dict):
    """Updates a provided user ID with the data contained in the provided user dictionary object.

    Args:
        user_id (str): A user ID to update on a Swimlane instance
        user (dict): A user dictionary object.

    Returns:
        dict: A user dictionary object.
    """
    return self._make_request("PUT", f"/user/{user_id}", json=user)

update_workflow(self, workflow)

Updates a Swimlane instance with the provided Workflow data model object

Parameters:

Name Type Description Default
workflow Workflow

A Workflow data model object

required

Returns:

Type Description
Workflow

A Workflow data model object

Source code in aqueduct/instance.py
@dump_content
@log_exception
def update_workflow(self, workflow):
    """Updates a Swimlane instance with the provided `Workflow` data model object

    Args:
        workflow (Workflow): A `Workflow` data model object

    Returns:
        Workflow: A `Workflow` data model object
    """
    return self._make_request("PUT", f"/workflow/{workflow['id']}", json=workflow)

update_workspace(self, workspace_id, workspace)

Updates a provided workspace ID with the data contained in the provided workspace dictionary object.

Parameters:

Name Type Description Default
workspace_id str

A workspace ID to update on a Swimlane instance

required
workspace dict

A workspace dictionary object.

required

Returns:

Type Description
dict

A workspace dictionary object.

Source code in aqueduct/instance.py
@dump_content
@log_exception
def update_workspace(self, workspace_id: str, workspace: dict):
    """Updates a provided workspace ID with the data contained in the provided workspace dictionary object.

    Args:
        workspace_id (str): A workspace ID to update on a Swimlane instance
        workspace (dict): A workspace dictionary object.

    Returns:
        dict: A workspace dictionary object.
    """
    return self._make_request("PUT", f"/workspaces/{workspace_id}", json=workspace)

upgrade_plugin(self, filename, stream)

Uploads a plugin to be upgraded on a Swimlane instance given a filename and a BytesIO file stream

Parameters:

Name Type Description Default
filename str

A filename string

required
stream BytesIO

A BytesIO file stream

required

Returns:

Type Description
json

JSON Response from uploading a plugin

Source code in aqueduct/instance.py
@log_exception
def upgrade_plugin(self, filename, stream):
    """Uploads a plugin to be upgraded on a Swimlane instance given a filename and a BytesIO file stream

    Args:
        filename (str): A filename string
        stream (BytesIO): A BytesIO file stream

    Returns:
        json: JSON Response from uploading a plugin
    """
    if not filename.endswith(".swimbundle"):
        filename = filename.split(".")[0] + ".swimbundle"
    return self.swimlane.request("POST", "/task/packages/upgrade", files={"file": (filename, stream.read())}).json()

upload_plugin(self, filename, stream)

Uploads a plugin to a Swimlane instance given a filename and a BytesIO file stream

Parameters:

Name Type Description Default
filename str

A filename string

required
stream BytesIO

A BytesIO file stream

required

Returns:

Type Description
json

JSON Response from uploading a plugin

Source code in aqueduct/instance.py
@log_exception
def upload_plugin(self, filename, stream):
    """Uploads a plugin to a Swimlane instance given a filename and a BytesIO file stream

    Args:
        filename (str): A filename string
        stream (BytesIO): A BytesIO file stream

    Returns:
        json: JSON Response from uploading a plugin
    """
    if not filename.endswith(".swimbundle"):
        filename = filename.split(".")[0] + ".swimbundle"
    return self.swimlane.request("POST", "/task/packages", files={"file": (filename, stream.read())}).json()
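
Both `upgrade_plugin` and `upload_plugin` rewrite the filename before posting it. The sketch below isolates that step in a hypothetical helper named `normalize_bundle_name` (not part of the library) to show one consequence of splitting on the first dot: a filename with dots in its version number loses everything after the first dot.

```python
def normalize_bundle_name(filename: str) -> str:
    """Mirror the filename handling shown in upload_plugin and upgrade_plugin."""
    if not filename.endswith(".swimbundle"):
        # Everything after the FIRST dot is discarded, not only the extension.
        filename = filename.split(".")[0] + ".swimbundle"
    return filename


print(normalize_bundle_name("my_plugin.zip"))         # my_plugin.swimbundle
print(normalize_bundle_name("my_plugin-1.2.0.zip"))   # my_plugin-1.swimbundle
print(normalize_bundle_name("my_plugin.swimbundle"))  # my_plugin.swimbundle
```

If the version suffix matters to you, renaming the file to end in `.swimbundle` before calling either method keeps the name untouched.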

ComponentBase

A base class for all components.

This class is used to ensure that all derived classes have a sync method as well as shared methods.

_get_field_by_type(self, field_list, field_type='tracking') private

Returns a field dictionary by its defined type.

Parameters:

Name Type Description Default
field_list list

A list of application fields.

required
field_type str

The fieldType value of the field. Defaults to "tracking".

'tracking'

Returns:

Type Description
dict

A Swimlane application field dictionary, or None if no field of the given type is found.

Source code in aqueduct/components/base.py
def _get_field_by_type(self, field_list: list, field_type: str = "tracking") -> dict:
    """Returns a field dictionary by its defined type.

    Args:
        field_list (list): A list of application fields.
        field_type (str, optional): The fieldType value of the field. Defaults to "tracking".

    Returns:
        dict: A swimlane application field dictionary.
    """
    for field in field_list:
        if field.get("fieldType") and field["fieldType"] == field_type:
            return field
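
As a usage illustration, here is a standalone sketch of the same lookup written as a plain function (`get_field_by_type`, a hypothetical name) with made-up field dictionaries. It shows the default `"tracking"` lookup and the implicit `None` returned when no field matches.

```python
from typing import Optional


def get_field_by_type(field_list: list, field_type: str = "tracking") -> Optional[dict]:
    """Return the first field whose fieldType equals field_type, else None."""
    for field in field_list:
        if field.get("fieldType") and field["fieldType"] == field_type:
            return field
    return None


# Illustrative field dictionaries; real Swimlane fields carry many more keys.
fields = [
    {"id": "f1", "name": "Summary", "fieldType": "text"},
    {"id": "f2", "name": "Tracking Id", "fieldType": "tracking"},
]

tracking = get_field_by_type(fields)
missing = get_field_by_type(fields, field_type="date")
print(tracking["name"])  # Tracking Id
print(missing)           # None
```

Note that the comparison is case-sensitive here, whereas the `Applications` methods lowercase `fieldType` before comparing.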

_replace(self, component, search_value, replace_value) private

Recursively replaces any keys or values that exactly match a provided value with another value.

Parameters:

Name Type Description Default
component dict

The given component dictionary object.

required
search_value str

The value to search for.

required
replace_value str

The value to replace any matches found with.

required

Returns:

Type Description
Dict[str, str]

An updated dictionary with the replaced values.

Source code in aqueduct/components/base.py
def _replace(self, component: dict, search_value: str, replace_value: str) -> Dict[str, str]:
    """Recursively replaces any keys or values that exactly match a provided value with another value.

    Args:
        component (dict): The given component dictionary object.
        search_value (str): The value to search for.
        replace_value (str): The value to replace any matches found with.

    Returns:
        dict: Returns an updated dictionary with the replaced values.
    """
    if isinstance(component, dict):
        new_dict = {}
        for k, v in component.items():
            key = k
            if k == search_value:
                key = replace_value
            new_dict[key] = self._replace(v, search_value, replace_value)
        return new_dict
    elif isinstance(component, list):
        new_list = []
        for v in component:
            new_list.append(self._replace(v, search_value, replace_value))
        return new_list
    else:
        if component == search_value:
            return replace_value
        else:
            return component
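
To make the recursion concrete, the following sketch reimplements the same idea as a free function (`replace`, a hypothetical name) and runs it on a small made-up dictionary. It illustrates two properties of the approach: matches are exact (substrings are left alone), and a new structure is built rather than mutating the input.

```python
def replace(component, search_value, replace_value):
    """Return a copy of component with exact key/value matches swapped."""
    if isinstance(component, dict):
        return {
            (replace_value if k == search_value else k): replace(v, search_value, replace_value)
            for k, v in component.items()
        }
    if isinstance(component, list):
        return [replace(v, search_value, replace_value) for v in component]
    return replace_value if component == search_value else component


# Made-up component: the ID appears as a key, as a list item, and inside a longer string.
task = {"old-id": {"inputs": ["old-id", "other"]}, "name": "old-id-suffix"}
result = replace(task, "old-id", "new-id")
print(result)
# {'new-id': {'inputs': ['new-id', 'other']}, 'name': 'old-id-suffix'}
```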

_set_unneeded_keys_to_empty_dict(self, component, keys=['createdByUser', 'modifiedByUser', 'permissions']) private

Sets the defined keys on a component object to empty dictionaries.

Parameters:

Name Type Description Default
component dict or attrs

A Swimlane component object to clean.

required
keys list

A list of keys to set as empty dictionaries.

['createdByUser', 'modifiedByUser', 'permissions']

Returns:

Type Description
dict or attrs

Returns an updated component with the values set as empty dictionaries.

Source code in aqueduct/components/base.py
def _set_unneeded_keys_to_empty_dict(self, component, keys=["createdByUser", "modifiedByUser", "permissions"]):
    """Sets the defined keys on a component object to empty dictionaries.

    Args:
        component (dict or attrs): A Swimlane component object to clean.
        keys (list): A list of keys to set as empty dictionaries.

    Returns:
        dict or attrs: Returns an updated component with the values set as empty dictionaries.
    """
    for key in keys:
        if isinstance(component, dict):
            if component.get(key):
                component[key] = {}
    return component
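
A short sketch of the behaviour, using a standalone function and a made-up application dictionary (both are illustrative, not library code). It suggests two details worth knowing: the component is modified in place, and keys that are absent or already empty are left as they are rather than being added.

```python
def set_unneeded_keys_to_empty_dict(component, keys=("createdByUser", "modifiedByUser", "permissions")):
    """Blank out the listed keys on a dict; non-dict components pass through unchanged."""
    for key in keys:
        if isinstance(component, dict) and component.get(key):
            component[key] = {}
    return component


app = {"name": "Alerts", "createdByUser": {"id": "u1"}, "permissions": {"r1": "read"}}
cleaned = set_unneeded_keys_to_empty_dict(app)
print(cleaned)
# {'name': 'Alerts', 'createdByUser': {}, 'permissions': {}}
```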

is_masked(self, value)

Returns whether the provided value is masked.

Parameters:

Name Type Description Default
value Any

A value to check if it is masked with ***.

required

Returns:

Type Description
bool

True if the value is masked, False if not.

Source code in aqueduct/components/base.py
def is_masked(self, value: Any) -> bool:
    """Returns whether the provided value is masked.

    Args:
        value (Any): A value to check if it is masked with ***.

    Returns:
        bool: Returns True if the value is masked. False if not.
    """
    if isinstance(value, str):
        return self.MASKED_VALUE.match(value)
    return False
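
The definition of `MASKED_VALUE` is not shown on this page, so the sketch below assumes a pattern that matches strings made up only of asterisks; treat that pattern as a guess. It also wraps the match in `bool()`, since `Pattern.match` returns a match object or `None` rather than a strict boolean.

```python
import re
from typing import Any

# Assumed pattern: one or more asterisks and nothing else. The real
# MASKED_VALUE attribute may differ.
MASKED_VALUE = re.compile(r"^\*+$")


def is_masked(value: Any) -> bool:
    """Return True only for strings that consist entirely of asterisks."""
    if isinstance(value, str):
        return bool(MASKED_VALUE.match(value))
    return False


print(is_masked("***"))     # True
print(is_masked("secret"))  # False
print(is_masked(123))       # False
```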

sync(self)

Every component must have a defined sync method.

Exceptions:

Type Description
NotImplementedError

Raises when a component does not have a sync method defined.

Source code in aqueduct/components/base.py
@abstractmethod
def sync(self):
    """Every component must have a defined sync method.

    Raises:
        NotImplementedError: Raises when a component does not have a sync method defined.
    """
    raise NotImplementedError("The class does not have a sync method.")
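
How the contract is enforced depends on how the base class is declared, which this page does not show. The sketch below assumes a base that derives from `abc.ABC`; under that assumption, a subclass that omits `sync` cannot be instantiated at all. `Base`, `Widgets`, and `Broken` are hypothetical names used only for illustration.

```python
from abc import ABC, abstractmethod


class Base(ABC):
    """Hypothetical stand-in for ComponentBase."""

    @abstractmethod
    def sync(self):
        raise NotImplementedError("The class does not have a sync method.")


class Widgets(Base):
    """A component that satisfies the contract."""

    def sync(self):
        return "synced widgets"


class Broken(Base):
    """A component that forgot to define sync."""


print(Widgets().sync())  # synced widgets

try:
    Broken()
    instantiated = True
except TypeError:
    # With an ABC base, the missing method is caught at instantiation time.
    instantiated = False
print(instantiated)  # False
```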

Applications

Used to sync applications from a source instance to a destination instance of Swimlane.

_add_fields(self, source, destination) private

Appends fields to a destination application.

This method will append fields to a destination application which are in the source application but missing from the destination application.

Parameters:

Name Type Description Default
source dict

A Swimlane source instance application.

required
destination dict

A Swimlane destination instance application.

required

Returns:

Type Description
dict

A Swimlane destination application.

Source code in aqueduct/components/applications.py
def _add_fields(self, source: dict, destination: dict):
    """Appends fields to a destination application.

    This method will append fields to a destination application which
    are in the source application but missing from the destination
    application.

    Args:
        source (dict): A Swimlane source instance application.
        destination (dict): A Swimlane destination instance application.

    Returns:
        dict: A Swimlane destination application.
    """
    self.log(f"Checking application '{source['name']}' application for missing fields.")
    if ComponentBase.mirror_app_fields_on_destination:
        self.log(val=f"Force updating application '{source['name']}' fields on destination instance from source.")
        dest_tracking_id_field = self._remove_tracking_field(application=destination)
        self._remove_tracking_field(application=source)
        destination["fields"] = source["fields"]
        destination["fields"].append(dest_tracking_id_field)
        self.log(val=f"Force update of application '{source['name']}' fields successful.")
    else:
        for sfield in source["fields"]:
            if sfield.get("fieldType") and sfield["fieldType"].lower() == "tracking":
                continue
            if not self._is_field_in_fields(application_fields=destination["fields"], field=sfield):
                self.log(f"Field '{sfield['name']}' not in destination application.")
                destination["fields"].append(sfield)
                self.log(f"Successfully added '{sfield['name']}' to destination application")
                if ComponentBase.dry_run:
                    self.add_to_diff_log(source["name"], "added", subcomponent="field", value=sfield["name"])
    return destination
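
The non-mirroring branch above can be sketched as a standalone function. This is a simplified illustration, assuming made-up application dictionaries and a hypothetical `add_missing_fields` helper; it omits logging and the dry-run diff log, and compares fields by `id` as `_is_field_in_fields` does.

```python
def add_missing_fields(source: dict, destination: dict) -> list:
    """Append source fields that the destination lacks; return the names added."""
    dest_ids = {f["id"] for f in destination["fields"] if f.get("id")}
    added = []
    for field in source["fields"]:
        # The tracking field is skipped because each instance generates its own.
        if (field.get("fieldType") or "").lower() == "tracking":
            continue
        if field.get("id") not in dest_ids:
            destination["fields"].append(field)
            added.append(field["name"])
    return added


source = {"name": "Alerts", "fields": [
    {"id": "t-src", "name": "Tracking Id", "fieldType": "tracking"},
    {"id": "a1", "name": "Summary", "fieldType": "text"},
    {"id": "a2", "name": "Severity", "fieldType": "valuesList"},
]}
destination = {"name": "Alerts", "fields": [
    {"id": "t-dst", "name": "Tracking Id", "fieldType": "tracking"},
    {"id": "a1", "name": "Summary", "fieldType": "text"},
]}

print(add_missing_fields(source, destination))  # ['Severity']
```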

_is_field_in_fields(self, application_fields, field) private

Checks to see if a field matches in a list of application fields.

Parameters:

Name Type Description Default
application_fields list

A Swimlane instance application fields list.

required
field dict

A Swimlane instance application field.

required

Returns:

Type Description
bool

Whether or not the field is in the provided list of fields.

Source code in aqueduct/components/applications.py
def _is_field_in_fields(self, application_fields: list, field: dict) -> bool:
    """Checks to see if a field matches in a list of application fields.

    Args:
        application_fields (list): A Swimlane instance application fields list.
        field (dict): A Swimlane instance application field.

    Returns:
        bool: Whether or not the field is in the provided list of fields.
    """
    for app_field in application_fields:
        if field.get("id") and app_field.get("id") and app_field["id"] == field["id"]:
            return True
    return False

_process_workspaces(self, application) private

Ensures that Swimlane workspaces exist before processing applications.

Parameters:

Name Type Description Default
application dict

A Swimlane instance application.

required

Exceptions:

Type Description
GetComponentError

Raised when an application's workspace does not exist on the source instance.

Source code in aqueduct/components/applications.py
def _process_workspaces(self, application: dict) -> None:
    """Ensures that Swimlane workspaces exist before processing applications.

    Args:
        application (dict): A Swimlane instance application.

    Raises:
        GetComponentError: Raised when an application's workspace does not exist on the source instance.
    """
    if application.get("workspaces"):
        for workspace in application["workspaces"]:
            workspace_ = self.source_instance.get_workspace(workspace_id=workspace)
            if not workspace_:
                raise GetComponentError(name="workspace", id=workspace)
            if workspace_ and not self.destination_instance.get_workspace(workspace_id=workspace):
                Workspaces().sync_workspace(workspace=workspace_)

_remove_tracking_field(self, application) private

Removes the tracking-id field from an application.

This method removes an application's tracking field, since Swimlane automatically generates one for every new application.

Parameters:

Name Type Description Default
application dict

A Swimlane application to remove the tracking field from

required
Source code in aqueduct/components/applications.py
def _remove_tracking_field(self, application: dict):
    """Removes the tracking-id field from an application.

    This method removes an application's tracking field, since
    Swimlane automatically generates one for every new application.

    Args:
        application (dict): A Swimlane application to remove the tracking field from
    """
    return_dict = {}
    for field in application.get("fields"):
        if field.get("fieldType") and field["fieldType"].lower() == "tracking":
            if not ComponentBase.dry_run:
                self.log(f"Removing tracking-id field from application '{application['name']}'.")
                application["fields"].remove(field)
                return_dict = field
            else:
                self.add_to_diff_log(application["name"], "removed", subcomponent="tracking-field")
    return return_dict

get_reference_app_order(self)

Creates an ordered dictionary of applications, sorted from most to fewest references.

This method creates an order of applications to be added or updated on a destination instance based on most to least reference relationships. For example, a source application that references 5 applications is placed before an application that references 3.

Returns:

Type Description
dict

An application dictionary ordered (sorted) by reference count.

Source code in aqueduct/components/applications.py
def get_reference_app_order(self):
    """Creates an ordered dictionary of applications, sorted from most to fewest references.

    This method creates an order of applications to be added or updated on a destination
    instance based on most to least reference relationships. For example, a source application
    that references 5 applications will be before an application which has 3 references to applications.

    Returns:
        dict: An application dictionary ordered (sorted) by reference count.
    """
    reference_dict = {}
    for application in self.source_instance.get_applications():
        if application:
            application_ = self.source_instance.get_application(application_id=application["id"])
            if application_["id"] not in reference_dict:
                reference_dict[application_["id"]] = []
            for field in application_["fields"]:
                if field.get("$type") and field["$type"] == "Core.Models.Fields.Reference.ReferenceField, Core":
                    reference_dict[application_["id"]].append(field["targetId"])

    return_dict = OrderedDict()
    for item in sorted(reference_dict, key=lambda k: len(reference_dict[k]), reverse=True):
        return_dict[item] = reference_dict[item]
    return return_dict
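
The ordering step at the end of the method can be tried in isolation. The sketch below uses a hypothetical `order_by_reference_count` helper and made-up application IDs; the input maps each application ID to the list of target IDs found in its reference fields, as `reference_dict` does above.

```python
from collections import OrderedDict


def order_by_reference_count(reference_dict: dict) -> OrderedDict:
    """Order application IDs from the most to the fewest outgoing references."""
    ordered = OrderedDict()
    for app_id in sorted(reference_dict, key=lambda k: len(reference_dict[k]), reverse=True):
        ordered[app_id] = reference_dict[app_id]
    return ordered


# Made-up IDs: app-c references three applications, app-a one, app-b none.
reference_dict = {
    "app-a": ["app-b"],
    "app-b": [],
    "app-c": ["app-a", "app-b", "app-d"],
}
ordered = order_by_reference_count(reference_dict)
print(list(ordered))  # ['app-c', 'app-a', 'app-b']
```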

sync(self)

This method will sync all applications on a source instance with a destination instance.

Source code in aqueduct/components/applications.py
def sync(self):
    """This method will sync all applications on a source instance with a destination instance."""
    self.log(f"Starting to sync 'Applications' from '{self.source_host}' to '{self.dest_host}'")
    for application_id, values in self.get_reference_app_order().items():
        self.sync_application(application_id=application_id)

    # Checking if we need to update applications reference fields that contain old (source instance)
    # tracking-ids in their columns. If so we remove them and update it with the new ones on the destination.
    if self.__applications_needing_updates:
        for application_id in self.__applications_needing_updates:
            dest_application = self.destination_instance.get_application(application_id)
            needs_update = False
            for field in dest_application["fields"]:
                if field.get("columns"):
                    column_list = []
                    for column in field["columns"]:
                        if self.tracking_id_map.get(column):
                            self.log(f"Removing old tracking-id '{column}' from application field '{field['key']}'")
                            column_list.append(self.tracking_id_map[column])
                            self.log(
                                f"Adding new tracking-id '{self.tracking_id_map[column]}' to application field "
                                f"'{field['key']}'"
                            )
                            needs_update = True
                        else:
                            column_list.append(column)
                    field["columns"] = column_list
            if needs_update:
                if not ComponentBase.dry_run:
                    self.log(f"Updating application '{dest_application['name']}' on destination with new.")
                    dest_application = self.destination_instance.update_application(dest_application)
                    self.log(f"Successfully updated application '{dest_application['name']}' on destination.")
                else:
                    self.add_to_diff_log(dest_application["name"], "updated")

sync_application(self, application_id)

This method syncs a single application from a source instance to a destination instance.

Once an application_id on a source instance is provided we retrieve this application JSON. Next we remove the tracking_field from the application since Swimlane automatically generates a unique value for this field.

If workspaces are defined in the application and do not currently exist, we attempt to create or update them using the Workspaces class.

If the source application does not exist on the destination, we create it with the same IDs for:

1. application
2. fields
3. layout
4. etc.

By doing this, syncing of applications (and their IDs) is much easier.

If the application exists, we proceed to remove specific fields that are not needed.

Next, we check for fields which have been added to the source application but do not exist on the destination instance. If fields are found we add them to the destination object.

Next, we check to see if the destination has fields which are not defined in the source instance. If fields are found, we then remove them from the layout view of the source application. This equates to moving them to the "hidden" field section within the application builder so they can still be retrieved and reorganized as needed.

Finally, we update the application on the destination instance.

After updating the application we then check to ensure that the workflow of that application is up to date and accurate.

Parameters:

Name Type Description Default
application_id str

A source application ID.

required
Source code in aqueduct/components/applications.py
def sync_application(self, application_id: str):
    """This method syncs a single application from a source instance to a destination instance.

    Once an application_id on a source instance is provided we retrieve this application JSON.
    Next we remove the tracking_field from the application since Swimlane automatically generates
    a unique value for this field.

    If workspaces are defined in the application and do not currently exist, we attempt to create
    or update them using the Workspaces class.

    If the source application does not exist on the destination, we will create it with the same IDs for
        1. application
        2. fields
        3. layout
        4. etc.

    By doing this, syncing of applications (and their IDs) is much easier.

    If the application exists, we proceed to remove specific fields that are not needed.

    Next, we check for fields which have been added to the source application but do not
    exist on the destination instance. If fields are found we add them to the destination
    object.

    Next, we check to see if the destination has fields which are not defined in the source instance.
    If fields are found, we then remove them from the layout view of the source application. This equates
    to moving them to the "hidden" field section within the application builder so they can still be retrieved
    and reorganized as needed.

    Finally, we update the application on the destination instance.

    After updating the application we then check to ensure that the workflow of that application is up to date
    and accurate.

    Args:
        application_id (str): A source application ID.
    """
    application = self.source_instance.get_application(application_id=application_id)
    if not application:
        raise GetComponentError(name="application", id=application_id)
    self.log(f"Processing source instance application '{application['name']}'.")
    if not self._is_in_include_exclude_lists(name=application["name"], type="applications"):
        self._process_workspaces(application=application)
        source_tracking_field = self._remove_tracking_field(application)
        # scrubbing application dict to remove $type keys recursively.
        self.scrub(application)
        dest_application = self.destination_instance.get_application(application["id"])
        if not dest_application:
            if not ComponentBase.dry_run:
                self.log(f"Adding application '{application['name']}' on destination.")
                dest_application = self.destination_instance.add_application(application)
                self.__applications_needing_updates.append(application_id)
                self.log(f"Successfully added application '{application['name']}' on destination.")
                # Setting tracking_id_map just in case tasks are added and need updating
                # Tasks would need updating if they use an applications 'Tracking Id' field as an
                # input or output since Swimlane randomizes the Tracking ID when created
                self.tracking_id_map = (source_tracking_field["id"], dest_application["trackingFieldId"])
            else:
                self.add_to_diff_log(application["name"], "added")
        else:
            # removing these keys since they are not needed and only cause issues.
            dest_application = self._set_unneeded_keys_to_empty_dict(component=dest_application)
            dest_application = self._add_fields(source=application, destination=dest_application)
            if not ComponentBase.dry_run:
                self.log(f"Updating application '{dest_application['name']}' on destination.")
                # Here we are setting the destination applications layout key to the source applications layout key
                # we consider the source application layout to be the source of truth
                dest_application["layout"] = application["layout"]
                self.destination_instance.update_application(dest_application)
                self.__applications_needing_updates.append(application_id)
                self.tracking_id_map = (source_tracking_field["id"], dest_application["trackingFieldId"])
                self.log(f"Successfully updated application '{dest_application['name']}' on destination.")
            else:
                self.add_to_diff_log(application["name"], "updated")
        self.log(f"Checking for changes in workflow for application '{application['name']}'")
        # we are providing the application as well as the name since there is a bug in Swimlane 10.5.2 with the
        # workflows endpoint so we need to look it up by application Id instead.
        Workflows().sync_workflow(application_name=application["name"], application_id=application_id)

Assets

Used to sync assets from a source instance to a destination instance of Swimlane.

destination_assets property readonly

A list of destination instance assets.

Returns:

Type Description
list

A list of destination asset names.

_check_for_homework_assignment(self, asset) private

Checks the provided asset dictionary for parameters containing secrets.

Parameters:

Name Type Description Default
asset dict

A swimlane asset dictionary object.

required
Source code in aqueduct/components/assets.py
def _check_for_homework_assignment(self, asset: dict) -> None:
    """Checks the provided asset dictionary for parameters containing secrets.

    Args:
        asset (dict): A swimlane asset dictionary object.
    """
    parameter_list = []
    if asset.get("parameters"):
        for key, val in asset["parameters"].items():
            if key != "$type":
                if self.is_masked(val):
                    parameter_list.append(key)
    if parameter_list:
        self.add_to_homework_list(
            component_name="assets",
            value=f"You must manually enter your secrets in asset '{asset['name']}' for the following parameters: "
            f"'{','.join([x for x in parameter_list])}'.",
        )
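
The secret check above can be sketched in isolation. This is a minimal illustration under stated assumptions, not Aqueduct's implementation: the real `is_masked` helper is not shown in this section, so the `is_masked` below is a hypothetical stand-in that treats a string made up only of asterisks as a masked value, and `masked_parameters` is a name introduced here for illustration.

```python
def is_masked(value) -> bool:
    """Hypothetical stand-in: a non-empty string of only '*' counts as a masked secret."""
    return isinstance(value, str) and len(value) > 0 and set(value) == {"*"}


def masked_parameters(asset: dict) -> list:
    """Return the names of asset parameters whose values look masked."""
    parameters = asset.get("parameters") or {}
    # The "$type" key is metadata, not a user-supplied parameter, so it is skipped.
    return [key for key, val in parameters.items() if key != "$type" and is_masked(val)]


asset = {
    "name": "Example Asset",
    "parameters": {"$type": "example", "api_key": "*****", "host": "example.local"},
}
masked = masked_parameters(asset)
if masked:
    print(f"Enter secrets manually for asset '{asset['name']}': {', '.join(masked)}")
```

Only parameters that look masked end up in the reminder, which mirrors how the method builds a single homework entry per asset.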

sync(self)

This method is used to sync (create) all assets from a source instance to a destination instance.

Source code in aqueduct/components/assets.py
def sync(self):
    """This method is used to sync (create) all assets from a source instance to a destination instance"""
    self.log(f"Attempting to sync assets from '{self.source_host}' to '{self.dest_host}'.")
    for asset in self.source_instance.get_assets():
        self.sync_asset(asset=asset)
    self.log(f"Completed syncing of assets from '{self.source_host}' to '{self.dest_host}'.")

sync_asset(self, asset)

This method will create (add) a single asset from a source instance to a destination instance.

Currently we are only adding assets and NOT updating them but this functionality may expand in the future.

Parameters:

Name Type Description Default
asset dict

A single Swimlane asset dictionary from the Swimlane API

required
Source code in aqueduct/components/assets.py
def sync_asset(self, asset: dict) -> None:
    """This method will create (add) a single asset from a source instance to a destination instance.

    Currently we are only adding assets and NOT updating them but this functionality may expand in the future.

    Args:
        asset (dict): A single Swimlane asset dictionary from the Swimlane API
    """
    if not self._is_in_include_exclude_lists(asset["name"], "assets"):
        self.log(f"Processing asset '{asset['name']}'.")
        if asset["name"] not in self.destination_assets:
            self.log(f"Asset '{asset['name']}' was not found on destination.")
            if not ComponentBase.dry_run:
                dest_asset = self.destination_instance.add_asset(asset)
                if not dest_asset:
                    raise AddComponentError(model=asset, name=asset["name"])
                self.log(f"Asset '{asset['name']}' was successfully added to destination.")
            else:
                self.add_to_diff_log(asset["name"], "added")
            self._check_for_homework_assignment(asset=asset)
        else:
            self.log(f"Asset '{asset['name']}' already exists on destination. Skipping")
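
The add-only behavior described above, together with the dry-run switch, can be sketched as a pure function. This is an illustrative sketch, not the library's code: `plan_asset_sync` and the shape of its result are assumptions made for the example, and no Swimlane API is called.

```python
def plan_asset_sync(source_assets: list, destination_names: set, dry_run: bool = True) -> dict:
    """Classify source assets as added or skipped; existing assets are never updated."""
    result = {"added": [], "skipped": [], "diff_log": []}
    for asset in source_assets:
        name = asset["name"]
        if name in destination_names:
            # Already on the destination: left untouched.
            result["skipped"].append(name)
        elif dry_run:
            # Dry run: only record what would have been added.
            result["diff_log"].append((name, "added"))
        else:
            result["added"].append(name)
    return result


source_assets = [{"name": "Mail Server"}, {"name": "Threat Intel"}]
plan = plan_asset_sync(source_assets, destination_names={"Mail Server"}, dry_run=True)
print(plan)
```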

Dashboards

Used to sync dashboards from a source instance to a destination instance of Swimlane.

_get_destination_dashboard(self, source_dashboard_uid) private

Returns a matching destination instance dashboard if it exists.

Parameters:

Name Type Description Default
source_dashboard_uid str

The source instance dashboard UID.

required
Source code in aqueduct/components/dashboards.py
def _get_destination_dashboard(self, source_dashboard_uid: str):
    """Returns a matching destination instance dashboard if it exists.

    Args:
        source_dashboard_uid (str): The source instance dashboard UID.
    """
    dest_dashboards = self.destination_instance.get_dashboards()
    if dest_dashboards:
        for d in dest_dashboards:
            if d["uid"] == source_dashboard_uid:
                return d
    return None

_process_reports(self, dashboard) private

Ensures that any referenced reports actually exist on the source instance.

Parameters:

Name Type Description Default
dashboard dict

A Swimlane dashboard dictionary object.

required

Exceptions:

Type Description
GetComponentError

Raises when a defined report does not exist on a source Swimlane instance.

Source code in aqueduct/components/dashboards.py
def _process_reports(self, dashboard: dict):
    """Ensures that any referenced reports actually exist on the source instance.

    Args:
        dashboard (dict): A Swimlane dashboard dictionary object.

    Raises:
        GetComponentError: Raises when a defined report does not exist on a source Swimlane instance.
    """
    self.log(f"Processing dashboard '{dashboard['name']}' reports.")
    for item in dashboard["items"]:
        if item.get("reportId"):
            self.log(
                f"Dashboard '{dashboard['name']}' contains a report ({item['reportId']}). "
                "Checking to make sure it exists."
            )
            report = self.source_instance.get_report(report_id=item["reportId"])
            # TODO: Add main parameter to skip if the report does not exist on the source instance.
            # Sometimes a card on a dashboard exists on the dashboard but the associated report does not.
            if not report:
                self.add_to_homework_list(
                    component_name="dashboards",
                    value=f"The dashboard '{dashboard['name']}' contains a card pointing to a report "
                    f"({item['reportId']}) that does not exist on the source instance. "
                    "It is recommended to remove this dashboard card from your source instance.",
                )
                self.log(
                    f"The dashboard '{dashboard['name']}' contains a report card that does not exist on the "
                    f"source instance. Look for report id '{item['reportId']}' and remove it from the dashboard."
                )
            else:
                # Attempting to sync the report
                Reports().sync_report(report=report)
        else:
            self.log(
                val=f"Dashboard '{dashboard['name']}' includes a report card that is a card type of "
                f"'{item.get('cardType')}'."
            )

sync(self)

This method is used to sync all dashboards from a source instance to a destination instance

Source code in aqueduct/components/dashboards.py
def sync(self):
    """This method is used to sync all dashboards from a source instance to a destination instance"""
    self.log(f"Attempting to sync dashboards from '{self.source_host}' to '{self.dest_host}'")
    dashboards = self.source_instance.get_dashboards()
    if dashboards:
        for dashboard in dashboards:
            self.sync_dashboard(dashboard=dashboard)
    self.log(f"Completed syncing of dashboards from '{self.source_host}' to '{self.dest_host}'.")

sync_dashboard(self, dashboard)

This method syncs a single dashboard from a source instance to a destination instance.

This method first checks to see if the provided dashboard already exists on the destination instance. If it does not exist then we attempt to add the dashboard to the destination instance.

To determine whether the dashboard already exists, we check it against all destination instance dashboards, looking for one with the same uid as the provided source dashboard.

If a match is found and update_dashboards is enabled, we compare the source and destination dashboards. If they are the same we simply skip processing this dashboard.

If a match is found but the dashboards differ, we update the destination dashboard as provided by the source instance. In every case, we first ensure that all the reports referenced by the dashboard exist on the source instance and attempt to sync them.

Parameters:

Name Type Description Default
dashboard dict

A source instance dashboard dictionary.

required
Source code in aqueduct/components/dashboards.py
def sync_dashboard(self, dashboard: dict):
    """This method syncs a single dashboard from a source instance to a destination instance.

    This method first checks to see if the provided dashboard already exists on the destination instance.
    If it does not exist then we attempt to add the dashboard to the destination instance.

    To determine whether the dashboard already exists, we check it against all destination
    instance dashboards, looking for one with the same `uid` as the provided source dashboard.

    If a match is found and `update_dashboards` is enabled, we compare the source and destination
    dashboards. If they are the same we simply skip processing this dashboard.

    If a match is found but the dashboards differ, we update the destination dashboard as provided by
    the source instance. In every case, we first ensure that all the reports referenced by the
    dashboard exist on the source instance and attempt to sync them.

    Args:
        dashboard (dict): A source instance dashboard dictionary.
    """
    self.log(f"Processing dashboard '{dashboard['name']}' ({dashboard['id']})")
    self.scrub(dashboard)
    if not self._is_in_include_exclude_lists(dashboard["name"], "dashboards"):
        # making sure that all reports exist on the source instance.
        self._process_reports(dashboard=dashboard)
        dest_dashboard = self._get_destination_dashboard(source_dashboard_uid=dashboard["uid"])
        # if no dest_dashboard then we need to create it on the destination instance.
        if not dest_dashboard:
            if not ComponentBase.dry_run:
                self.log(
                    f"Adding dashboard '{dashboard['name']}' for workspaces "
                    f"'{dashboard.get('workspaces')}' on destination"
                )
                # These keys need to be in our object but they need to be empty.
                dashboard = self._set_unneeded_keys_to_empty_dict(component=dashboard)
                dest_dashboard = self.destination_instance.add_dashboard(dashboard)
                if not dest_dashboard:
                    raise AddComponentError(model=dashboard, name=dashboard["name"])
                self.log(f"Successfully added dashboard '{dashboard['name']}' to destination.")
            else:
                self.add_to_diff_log(dashboard["name"], "added")
        else:
            if self.update_dashboards:
                self.log(
                    f"Dashboard '{dashboard['name']}' for workspaces '{dashboard.get('workspaces')}' was found."
                    " Checking differences..."
                )
                self.scrub(dest_dashboard)
                if Differ(source=dashboard, destination=dest_dashboard).check():
                    if not ComponentBase.dry_run:
                        self.log(f"Updating '{dashboard['name']}' now.")
                        dest_dashboard = self.destination_instance.update_dashboard(dashboard)
                        if not dest_dashboard:
                            raise UpdateComponentError(model=dashboard, name=dashboard["name"])
                        self.log(f"Successfully updated dashboard '{dashboard['name']}'.")
                    else:
                        self.add_to_diff_log(dashboard["name"], "updated")
                else:
                    self.log(f"Dashboard '{dashboard['name']}' is the same on source and destination. Skipping...")
            else:
                self.log(
                    f"Skipping check of dashboard '{dashboard['name']}' for changes since "
                    "update_dashboards is False."
                )
    else:
        self.log(f"Skipping dashboard '{dashboard['name']}' since it is excluded.")
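
The branching in `sync_dashboard` can be summarized as a small decision function. This is a sketch under assumptions: `dashboard_action` and its return labels are hypothetical names, and plain dictionary inequality stands in for `Differ(...).check()`, whose implementation is not shown in this section.

```python
from typing import Optional


def dashboard_action(source: dict, destination: Optional[dict], update_dashboards: bool, dry_run: bool) -> str:
    """Return a label for what a sync would do with one dashboard."""
    if destination is None:
        # No dashboard with a matching uid exists on the destination.
        return "log-added" if dry_run else "add"
    if not update_dashboards:
        return "skip-updates-disabled"
    if source == destination:  # stand-in for the Differ comparison
        return "skip-identical"
    return "log-updated" if dry_run else "update"


src = {"uid": "d-1", "name": "SOC Overview", "items": [1, 2]}
dst = {"uid": "d-1", "name": "SOC Overview", "items": [1]}
print(dashboard_action(src, dst, update_dashboards=True, dry_run=False))
```

Keeping the decision separate from the API calls makes the dry-run and `update_dashboards` switches easy to reason about on their own.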

Groups

Used to sync groups from a source instance to a destination instance of Swimlane.

_get_destination_group(self, group) private

Attempts to retrieve a destination instance group.

Parameters:

Name Type Description Default
group dict

The source Swimlane instance group dictionary object.

required

Returns:

Type Description
dict or bool

Returns a group dictionary object if found; otherwise a boolean.

Source code in aqueduct/components/groups.py
def _get_destination_group(self, group: dict):
    """Attempts to retrieve a destination instance group.

    Args:
        group (dict): The source Swimlane instance group dictionary object.

    Returns:
        dict or bool: Returns a group dictionary object if found; otherwise a boolean.
    """
    try:
        return self.destination_instance.get_group_by_id(group["id"])
    except JSONDecodeError:
        self.log(f"Unable to find group '{group['name']}' by id. Trying by name.")
    try:
        return self.destination_instance.get_group_by_name(group["name"])
    except JSONDecodeError:
        self.log(f"Unable to find group '{group['name']}' by name. Assuming new group.")
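
The lookup above tries the group id first and falls back to the group name. A self-contained sketch of that fallback follows. `FakeDestination` is a hypothetical test double, not a Swimlane client; it assumes, as the method above does, that a failed lookup surfaces as a `JSONDecodeError`.

```python
from json import JSONDecodeError


class FakeDestination:
    """Hypothetical client: raises JSONDecodeError when a lookup finds nothing."""

    def __init__(self, groups: list):
        self._groups = groups

    def _find(self, key: str, value: str) -> dict:
        for group in self._groups:
            if group[key] == value:
                return group
        raise JSONDecodeError("Expecting value", "", 0)

    def get_group_by_id(self, group_id: str) -> dict:
        return self._find("id", group_id)

    def get_group_by_name(self, name: str) -> dict:
        return self._find("name", name)


def find_destination_group(destination, group: dict):
    """Try the id lookup first, then the name lookup; return None if both fail."""
    for lookup, key in ((destination.get_group_by_id, "id"), (destination.get_group_by_name, "name")):
        try:
            return lookup(group[key])
        except JSONDecodeError:
            continue
    return None


dest = FakeDestination([{"id": "b2", "name": "Analysts"}])
print(find_destination_group(dest, {"id": "a1", "name": "Analysts"}))
```

The name fallback matters because a group created separately on each instance can share a name while having a different id.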

_process_group(self, group) private

Processes the objects related to a source Swimlane instance group.

This method processes any users or roles associated with the provided group dictionary object.

Parameters:

Name Type Description Default
group dict

A source Swimlane instance group dictionary object.

required

Returns:

Type Description
dict

The source Swimlane instance group dictionary object with possible updates.

Source code in aqueduct/components/groups.py
def _process_group(self, group: dict):
    """Processes the objects related to a source Swimlane instance group.

    This method processes any users or roles associated with the provided group dictionary object.

    Args:
        group (dict): A source Swimlane instance group dictionary object.

    Returns:
        dict: The source Swimlane instance group dictionary object with possible updates.
    """
    if group.get("users") and group["users"]:
        self.log(f"Processing user association on destination with group '{group['name']}'.")
        user_list = []
        from .users import Users

        for user in group["users"]:
            _user = Users().sync_user(user_id=user["id"])
            if _user:
                user_list.append(_user)
        group["users"] = user_list

    if group.get("roles") and group["roles"]:
        self.log(f"Processing role association on destination with group '{group['name']}'.")
        role_list = []
        from .roles import Roles

        for role in group["roles"]:
            _role = Roles().sync_role(role=role)
            if _role:
                role_list.append(_role)
        group["roles"] = role_list
    return group

sync(self)

This method is used to sync all groups from a source instance to a destination instance.

Source code in aqueduct/components/groups.py
def sync(self):
    """This method is used to sync all groups from a source instance to a destination instance."""
    self.log(f"Attempting to sync groups from '{self.source_host}' to '{self.dest_host}'")
    groups = self.source_instance.get_groups()
    if groups:
        for group in groups:
            self.sync_group(group=group)
    self.log(f"Completed syncing of groups from '{self.source_host}' to '{self.dest_host}'.")

sync_group(self, group)

This method syncs a single source instance group to a destination instance.

We begin by processing the provided group and ensuring that all roles and users associated with the provided group are added to the destination instance.

Once that is complete, we then sync any nested groups within the provided source instance group.

If the provided group is already on the destination instance, then we just skip processing but if the provided group is not on the destination instance we add it.

Parameters:

Name Type Description Default
group dict

A source instance group dictionary object.

required
Source code in aqueduct/components/groups.py
def sync_group(self, group: dict):
    """This method syncs a single source instance group to a destination instance.

    We begin by processing the provided group and ensuring that all roles and users
    associated with the provided group are added to the destination instance.

    Once that is complete, we then sync any nested groups within the provided source instance group.

    If the provided group is already on the destination instance, then we just skip processing but if
    the provided group is not on the destination instance we add it.

    Args:
        group (dict): A source instance group dictionary object.
    """
    if (
        not self._is_in_include_exclude_lists(group["name"], "groups")
        and group["name"] not in self.group_exclusions
    ):
        self.log(f"Processing group '{group['name']}' ({group['id']})")
        group = self._process_group(group=group)
        # since groups can have nested groups we are iterating through them as well and processing.
        if group.get("groups") and group["groups"]:
            group_list = []
            for group_ in group["groups"]:
                group_list.append(self._process_group(group=group_))
            group["groups"] = group_list

        # after we are done processing we now attempt to get a destination instance group.
        dest_group = self._get_destination_group(group=group)

        if not dest_group:
            if not ComponentBase.dry_run:
                self.log(f"Creating new group '{group['name']}' on destination.")
                dest_group = self.destination_instance.add_group(group)
                if not dest_group:
                    raise AddComponentError(model=group, name=group["name"])
                self.log(f"Successfully added new group '{group['name']}' to destination.")
            else:
                self.add_to_diff_log(group["name"], "added")
        else:
            self.log(f"Group '{group['name']}' already exists on destination.")
    else:
        self.log(f"Skipping group '{group['name']}' since it is excluded.")

KeyStore

Used to sync keystore items from a source instance to a destination instance of Swimlane.

sync(self)

Syncing the keystore only creates new entries in the destination system's keystore.

Creating these items does not transfer their values, since the values are encrypted within Swimlane itself.

Once items are created in the keystore on the destination system, you must manually enter the value for each keystore item.

Source code in aqueduct/components/keystore.py
def sync(self):
    """Syncing the keystore only creates new entries in the destination system's keystore.

    Creating these items does not transfer their values, since the values are
    encrypted within Swimlane itself.

    Once items are created in the keystore on the destination system, you must manually enter
    the value for each keystore item.
    """
    self.log(f"Attempting to sync keystore from '{self.source_host}' to '{self.dest_host}'")
    credentials = self.source_instance.get_credentials()
    if credentials:
        # Drop the "$type" metadata key if present; a default avoids a KeyError when it is missing.
        credentials.pop("$type", None)
        for key, val in credentials.items():
            if not self._is_in_include_exclude_lists(key, "keystore"):
                self.log(f"Processing '{key}' credential")
                if not self.destination_instance.get_credential(key):
                    credential_ = self.source_instance.get_credential(key)
                    if not ComponentBase.dry_run:
                        self.log(f"Adding Keystore item '{key}' to destination.")
                        self.destination_instance.add_credential(credential_)
                        self.log(f"Successfully added new keystore item '{key}'")
                        self.add_to_homework_list(
                            component_name="keystore",
                            value=f"You must manually enter the value for the keystore item '{key}'.",
                        )
                    else:
                        self.add_to_diff_log(key, "added")
                else:
                    self.log(f"Keystore item '{key}' exists on destination. Skipping....")
            else:
                self.log(f"Skipping keystore item '{key}' since it is not included.")
    else:
        self.log("Unable to find any keystore values on the source Swimlane instance. Skipping....")
    self.log(f"Completed syncing of keystore from '{self.source_host}' to '{self.dest_host}'.")
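
Because only keystore entries are created and never their values, each new entry produces a manual follow-up. The sketch below illustrates that pairing. `plan_keystore_sync` is a hypothetical helper written for this example and does not call any Swimlane API.

```python
def plan_keystore_sync(source_credentials: dict, destination_keys: set) -> tuple:
    """Return (keys to create, homework reminders) for a create-only keystore sync."""
    # "$type" is metadata rather than a keystore item, so it is ignored.
    names = [key for key in source_credentials if key != "$type"]
    to_create = [key for key in names if key not in destination_keys]
    homework = [f"You must manually enter the value for the keystore item '{key}'." for key in to_create]
    return to_create, homework


source_credentials = {"$type": "example", "smtp_password": "<encrypted>", "api_token": "<encrypted>"}
to_create, homework = plan_keystore_sync(source_credentials, destination_keys={"smtp_password"})
for reminder in homework:
    print(reminder)
```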

Packages

Syncs Python packages from a source instance to destination.

This class can transfer Python package wheels directly using the offline switch when creating an Aqueduct object.

_install_wheels(self, package) private

Used to install wheels directly from one Swimlane instance to another.

This is considered an offline installation. To use this method please provide offline=True when instantiating the Aqueduct class.

Parameters:

Name Type Description Default
package dict

A Swimlane package dictionary object.

required
Source code in aqueduct/components/packages.py
def _install_wheels(self, package: dict):
    """Used to install wheels directly from one Swimlane instance to another.

    This is considered an offline installation. To use this method please provide
    `offline=True` when instantiating the Aqueduct class.

    Args:
        package (dict): A Swimlane package dictionary object.
    """
    if package.get("fileId") and package["fileId"]:
        self.log(f"Attempting to transfer wheels for package '{package['name']}'")
        filename = f"{package['name']}-{package['version'].capitalize()}-py3-none-any.whl"
        self.log(f"Downloading package '{package['name']}' version '{package['version']}' from source.")
        stream = self.source_instance.download_plugin(file_id=package["fileId"])
        self.log(f"Successfully downloaded '{package['name']}' from source.")

        self.log(f"Uploading package '{package['name']}' to destination.")
        try:
            data = {"pythonVersion": package["pythonVersion"].capitalize()}

            self.destination_instance.install_package_offline(filename=filename, stream=stream, data=data)
            self.log(f"Successfully uploaded package '{package['name']}' to destination.")
        except Exception as e:
            self.log(
                f"Error occurred when trying to upload wheels for package '{package['name']}' to destination."
                " \t\t<-- Please install manually!!"
            )
            self.add_to_homework_list(
                component_name="packages",
                value=f"You must manually install '{package['name']}' version '{package['version']}' "
                "on the destination instance.",
            )
    else:
        self.log(
            f"Unable to transfer wheel package '{package['name']}' to destination. \t\t<--- Must install manually!!"
        )
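
The offline transfer boils down to three steps: download the wheel, upload it under a constructed filename, and record a manual follow-up if the upload fails. The sketch below shows that flow with injected callables. `transfer_wheel`, `download`, and `upload` are hypothetical names for illustration; the filename pattern mirrors the f-string above but omits its `.capitalize()` call.

```python
def transfer_wheel(package: dict, download, upload, homework: list) -> bool:
    """Download a wheel and upload it; on failure, append a manual-install reminder."""
    if not package.get("fileId"):
        # Nothing to transfer when the package has no stored file.
        return False
    filename = f"{package['name']}-{package['version']}-py3-none-any.whl"
    stream = download(package["fileId"])
    try:
        upload(filename, stream)
    except Exception:
        homework.append(
            f"You must manually install '{package['name']}' version '{package['version']}' "
            "on the destination instance."
        )
        return False
    return True


uploaded = {}
homework = []
package = {"name": "requests", "version": "2.31.0", "fileId": "abc123"}
ok = transfer_wheel(package, download=lambda file_id: b"wheel-bytes", upload=uploaded.__setitem__, homework=homework)
print(ok, sorted(uploaded))
```

Passing the download and upload steps in as callables keeps the sketch testable without a live instance.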

sync(self)

Syncs all installed Python packages on a source system with a destination system.

If you set the offline switch to True, the package wheels are transferred directly from the source instance and installed on the destination instead of relying on Swimlane to install them from PyPI.

Source code in aqueduct/components/packages.py
def sync(self):
    """Syncs all installed Python packages on a source system with a destination system.

    If you set the `offline` switch to `True`, the package wheels are transferred directly from the
    source instance and installed on the destination instead of relying on Swimlane to install them from PyPI.
    """
    self.log(f"Attempting to sync packages from '{self.source_host}' to '{self.dest_host}'")
    packages = self.source_instance.get_pip_packages()
    if packages:
        for package in packages:
            self.sync_package(package=package)
    self.log(f"Completed syncing of packages from '{self.source_host}' to '{self.dest_host}'.")

sync_package(self, package)

Syncs a single Python package object based on the Swimlane exported dictionary

Parameters:

Name Type Description Default
package dict

A Swimlane Python package dictionary object.

required
Source code in aqueduct/components/packages.py
def sync_package(self, package: dict):
    """Syncs a single Python package object based on the Swimlane exported dictionary

    Args:
        package (dict): A Swimlane Python package dictionary object.
    """
    if not self._is_in_include_exclude_lists(package["name"], "packages"):
        self.log(f"Processing package '{package['name']}'")
        if package["name"] not in self.destination_packages:
            if not ComponentBase.dry_run:
                pstring = f"{package['name']}=={package['version']}"
                self.log(f"Installing {package['pythonVersion']} package '{pstring}' on destination.")
                if ComponentBase.offline:
                    self._install_wheels(package=package)
                else:
                    try:
                        resp = self.destination_instance.install_package(package=package)
                    except Exception as e:
                        self.log(
                            f"Unable to install {package['pythonVersion']} package '{pstring}' on destination."
                            " \t\t<-- Please install manually..."
                        )
                        self.add_to_homework_list(
                            component_name="packages",
                            value=f"You must manually install '{package['name']}' version '{package['version']}' "
                            "on the destination instance.",
                        )
                        resp = None
                    if resp:
                        self.log(
                            f"Successfully installed {package['pythonVersion']} package '{pstring}' on destination."
                        )
            else:
                self.add_to_diff_log(package["name"], "added")
        else:
            self.log(f"Package '{package['name']}' already exists on destination '{self.dest_host}'. Skipping....")

Plugins

Used to sync plugins from a source instance to a destination instance of Swimlane.

_check_for_compatibility(self, plugin_name) private

Checks the source plugin for compatibility with the destination instance's Swimlane version.

Parameters:

Name Type Description Default
plugin_name str

The name of the plugin on the source instance.

required
Source code in aqueduct/components/plugins.py
def _check_for_compatibility(self, plugin_name: str) -> None:
    """Checks the source plugin for compatibility with the destination instance's Swimlane version.

    Args:
        plugin_name (str): The name of the plugin on the source instance.
    """
    dest_version = self.destination_instance.swimlane.product_version
    response = self.source_instance.get_plugin(plugin_name)
    if response and response.get("supportedSwimlaneVersion"):
        if response["supportedSwimlaneVersion"] == ">=10.0.0 <10.5.0":
            if version.parse(dest_version) >= version.parse("10.5.0"):
                log_string = f"The plugin '{plugin_name}' is not compatible with the destination instance version "
                log_string += f"({dest_version}). "
                log_string += "Please upgrade the plugin on the source instance before continuing."
                self.log(val=log_string, level="warning")
                self.add_to_homework_list(component_name="plugins", value=log_string)
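
The compatibility rule above flags a plugin only when it declares support for `>=10.0.0 <10.5.0` and the destination runs 10.5.0 or later. A standalone sketch of that rule follows. It is an illustration under assumptions: the method above uses `version.parse`, while this sketch substitutes a simplified tuple-based parser (`parse_version`, a hypothetical helper) that only handles dotted numeric versions.

```python
def parse_version(text: str) -> tuple:
    """Simplified parser for dotted numeric versions such as '10.5.2'."""
    return tuple(int(part) for part in text.split("."))


def is_incompatible(supported_range: str, destination_version: str) -> bool:
    """True when a plugin limited to pre-10.5.0 targets a 10.5.0+ destination."""
    return (
        supported_range == ">=10.0.0 <10.5.0"
        and parse_version(destination_version) >= parse_version("10.5.0")
    )


print(is_incompatible(">=10.0.0 <10.5.0", "10.5.2"))
```

Comparing parsed tuples rather than raw strings avoids lexical pitfalls, since the string "10.10.0" sorts before "10.5.0".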

_download_plugin(self, name, id) private

Downloads a plugin from a source Swimlane instance.

Parameters:

Name Type Description Default
name str

The name of the plugin to download.

required
id str

The id of the plugin to download.

required

Exceptions:

Type Description
GetComponentError

Raises when unable to download plugin from the source Swimlane instance.

Returns:

Type Description
Plugin

Returns a Swimlane source instance plugin object.

Source code in aqueduct/components/plugins.py
def _download_plugin(self, name: str, id: str):
    """Downloads a plugin from a source Swimlane instance.

    Args:
        name (str): The name of the plugin to download.
        id (str): The id of the plugin to download.

    Raises:
        GetComponentError: Raises when unable to download plugin from the source Swimlane instance.

    Returns:
        Plugin: Returns a Swimlane source instance plugin object.
    """
    try:
        self.log(f"Downloading plugin '{name}' from source.")
        return self.source_instance.download_plugin(id)
    except Exception as e:
        # Chain the original exception so the underlying download failure stays visible.
        raise GetComponentError(type="Plugin", name=name, id=id) from e

sync(self)

This method is used to sync all plugins from a source instance to a destination instance

Source code in aqueduct/components/plugins.py
def sync(self):
    """This method is used to sync all plugins from a source instance to a destination instance"""
    self.log(f"Attempting to sync plugins from '{self.source_host}' to '{self.dest_host}'")
    self.dest_plugin_dict = self.destination_instance.plugin_dict
    plugin_dict = self.source_instance.plugin_dict
    if plugin_dict:
        for name, file_id in plugin_dict.items():
            self.sync_plugin(name=name, id=file_id)
    self.log(f"Completed syncing of plugins from '{self.source_host}' to '{self.dest_host}'.")

sync_plugin(self, name, id)

This method syncs a single Swimlane plugin based on the provided name and ID.

We first check whether the provided plugin name exists in the destination instance's plugin_dict.

If it does not, we download the plugin from the source instance. If the download succeeds, we then add it to the destination instance.

If the provided plugin name already exists in the destination instance's plugin_dict, we retrieve the plugin from both the source and destination instances. We compare their versions and, if the source version is greater than the destination version, we upgrade the plugin on the destination instance.

Parameters:

Name Type Description Default
name str

The name of a Swimlane plugin

required
id str

The internal ID of a Swimlane plugin

required
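
The choice between uploading a new plugin and upgrading an existing one can be sketched as follows. This is a hedged illustration, not the library's code: `plugin_action` and `version_key` are hypothetical names, the version parser is a simplified stand-in for `version.parse`, and the "skip" result for equal or older source versions is an assumption, since that branch is not shown in this section.

```python
def version_key(text: str) -> tuple:
    """Simplified stand-in for version parsing: dotted numeric versions only."""
    return tuple(int(part) for part in text.split("."))


def plugin_action(name: str, source_version: str, destination_versions: dict) -> str:
    """Return 'upload', 'upgrade', or 'skip' for a single plugin."""
    if name not in destination_versions:
        # The destination has never seen this plugin: upload it.
        return "upload"
    if version_key(source_version) > version_key(destination_versions[name]):
        # The source is newer: upgrade the destination copy.
        return "upgrade"
    # Assumed behavior: nothing to do when the destination is current or newer.
    return "skip"


destination_versions = {"sw_example": "1.2.0"}
print(plugin_action("sw_example", "1.10.0", destination_versions))
```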
Source code in aqueduct/components/plugins.py
def sync_plugin(self, name: str, id: str):
    """This method syncs a single Swimlane plugin based on the provided name and ID.

    We first check whether the provided plugin name exists in the destination instance's plugin_dict.

    If it does not, we download the plugin from the source instance. If the download succeeds, we
    then add it to the destination instance.

    If the provided plugin name already exists in the destination instance's plugin_dict, we retrieve the
    plugin from both the source and destination instances. We compare their versions and, if the source
    version is greater than the destination version, we upgrade the plugin on the destination instance.

    Args:
        name (str): The name of a Swimlane plugin
        id (str): The internal ID of a Swimlane plugin
    """
    self.log(f"Processing '{name}' plugin with id '{id}'")
    if not self._is_in_include_exclude_lists(name, "plugins"):
        # checking to see if the destination instance already has source instance plugin.
        if not self.dest_plugin_dict.get(name):
            if not ComponentBase.dry_run:
                plugin = self._download_plugin(name=name, id=id)
                if plugin:
                    try:
                        self.log(f"Uploading plugin '{name}' to destination.")
                        self.destination_instance.upload_plugin(name, plugin)
                        self.log(f"Successfully uploaded plugin '{name}' to destination.")
                    except Exception as e:
                        self._check_for_compatibility(plugin_name=name)
                        self.log(
                            f"Failed to upload plugin '{name}' to destination '{self.dest_host}'",
                            level="warning",
                        )
                        self.add_to_homework_list(
                            component_name="plugins",
                            value=f"You must manually install plugin '{name}' and all tasks associated with it. "
                            "This also means you must hookup those tasks with the appropriate items in "
                            "workflow and push-button integrations.",
                        )
                        if not ComponentBase.continue_on_error:
                            raise e
            else:
                self.add_to_diff_log(name, "added")
        else:
            self.log(
                f"Plugin '{name}' already exists on destination '{self.dest_host}'. Checking for differences...."
            )
            # getting source and destination instance plugins to check differences between them.
            dest_plugin = self.destination_instance.get_plugin(name=name)
            if not dest_plugin:
                raise GetComponentError(type="Plugin", name=name)
            source_plugin = self.source_instance.get_plugin(name=name)
            if not source_plugin:
                raise GetComponentError(type="Plugin", name=name)
            if version.parse(source_plugin["version"]) > version.parse(dest_plugin["version"]):
                # now we actually download the source instance plugin so we can upgrade it on
                # the destination instance.
                plugin = self._download_plugin(name=name, id=id)
                if plugin:
                    if not ComponentBase.dry_run:
                        self.log(f"Upgrading plugin '{name}' on destination.")
                        self.destination_instance.upgrade_plugin(filename=name, stream=plugin)
                        self.log(f"Successfully upgraded plugin '{name}' on destination.")
                    else:
                        self.add_to_diff_log(name, "upgraded")
            else:
                self.log("Source plugin version is not newer than the destination version. Skipping...")
    else:
        self.log(f"Skipping plugin '{name}' since it is excluded.")
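
The branching in sync_plugin can be summarised with a small standalone sketch. It is illustrative only: `plan_plugin_action`, `parse_simple_version` and the shape of `dest_plugins` are hypothetical names introduced here, and the simple tuple parser merely stands in for the `version.parse` call used in the source.

```python
def parse_simple_version(text: str) -> tuple:
    """Hypothetical stand-in for version.parse; handles dotted integers only."""
    return tuple(int(part) for part in text.split("."))


def plan_plugin_action(name: str, source_version: str, dest_plugins: dict) -> str:
    """Return 'upload', 'upgrade', or 'skip' for one plugin."""
    dest = dest_plugins.get(name)
    if not dest:
        # Plugin is missing on the destination, so it would be uploaded.
        return "upload"
    if parse_simple_version(source_version) > parse_simple_version(dest["version"]):
        # Source is newer, so the destination copy would be upgraded.
        return "upgrade"
    # Source is the same version or older, so nothing is done.
    return "skip"


# Hypothetical destination state: plugin name -> metadata with a version string.
dest_plugins = {"sw_virus_total": {"version": "1.2.0"}}

for plugin_name, src_version in [
    ("sw_virus_total", "1.10.0"),
    ("sw_virus_total", "1.2.0"),
    ("sw_new_plugin", "0.1.0"),
]:
    print(plugin_name, src_version, plan_plugin_action(plugin_name, src_version, dest_plugins))
```

Parsing versions before comparing them matters because a plain string comparison would rank "1.10.0" below "1.2.0".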

Roles

Used to sync roles from a source instance to a destination instance of Swimlane.

_process_role(self, role) private

Used to ensure any referenced users and groups are processed before continuing with adding / updating roles.

Parameters:

Name Type Description Default
role dict

A source Swimlane instance role dictionary object.

required

Returns:

Type Description
dict

A modified source Swimlane instance role dictionary object.

Source code in aqueduct/components/roles.py
def _process_role(self, role: dict):
    """Used to ensure any referenced users and groups are processed before continuing with adding / updating roles.

    Args:
        role (dict): A source Swimlane instance role dictionary object.

    Returns:
        dict: A modified source Swimlane instance role dictionary object.
    """
    if role.get("users") and role["users"]:
        self.log(f"Processing users in role '{role['name']}'")
        user_list = []
        from .users import Users

        for user in role["users"]:
            _user = Users().sync_user(user_id=user["id"])
            if _user:
                user_list.append(_user)
        role["users"] = user_list
    if role.get("groups") and role["groups"]:
        self.log(f"Processing groups in role '{role['name']}'")
        group_list = []
        from .groups import Groups

        for group in role["groups"]:
            _group = Groups().sync_group(group=group)
            if _group:
                group_list.append(_group)
        role["groups"] = group_list
    return role

sync(self)

This method is used to sync all roles from a source instance to a destination instance.

Source code in aqueduct/components/roles.py
def sync(self):
    """This method is used to sync all roles from a source instance to a destination instance."""
    self.log(f"Attempting to sync roles from '{self.source_host}' to '{self.dest_host}'.")
    roles = self.source_instance.get_roles()
    if roles:
        for role in roles:
            self.sync_role(role=role)
    self.log(f"Completed syncing of roles from '{self.source_host}' to '{self.dest_host}'.")

sync_role(self, role)

Syncs a single source Swimlane instance role dictionary object.

Parameters:

Name Type Description Default
role dict

A source Swimlane instance role dictionary object.

required

Exceptions:

Type Description
AddComponentError

Raises when unable to add a role dictionary object to a destination Swimlane instance.

Returns:

Type Description
dict

A destination Swimlane instance role dictionary object.

Source code in aqueduct/components/roles.py
def sync_role(self, role: dict):
    """Syncs a single source Swimlane instance role dictionary object.

    Args:
        role (dict): A source Swimlane instance role dictionary object.

    Raises:
        AddComponentError: Raises when unable to add a role dictionary object to a destination Swimlane instance.

    Returns:
        dict: A destination Swimlane instance role dictionary object.
    """
    if not self._is_in_include_exclude_lists(role["name"], "roles") and role["name"] not in self.role_exclusions:
        self.log(f"Processing role '{role['name']}' ({role['id']})")
        self.scrub(role)
        role = self._process_role(role=role)
        dest_role = self.destination_instance.get_role(role_id=role["id"])
        if not dest_role:
            if not ComponentBase.dry_run:
                self.log(f"Creating new role '{role['name']}' on destination.")
                dest_role = self.destination_instance.add_role(role)
                if not dest_role:
                    raise AddComponentError(model=role, name=role["name"])
                self.log(f"Successfully added new role '{role['name']}' to destination.")
                return dest_role
            else:
                self.add_to_diff_log(role["name"], "added")
        else:
            self.log(f"Role '{role['name']}' already exists on destination.")
    else:
        self.log(f"Skipping role '{role['name']}' since it is excluded.")

Tasks

Used to sync tasks from a source instance to a destination instance of Swimlane.

destination_plugin_dict property readonly

Creates a destination Swimlane instance plugin dict used to update tasks.

We need this plugin dict because uploading a plugin to a Swimlane instance changes the plugin's imageId, fileId and actionId. We therefore gather the installed plugin IDs and update any tasks so that they reference the newly installed plugin.

The general structure of this plugin dict is:

{
    "sw_virus_total": {
        "GetAnalyses": {
            "id": "a2dA20RlvfcV5jf34",
            "imageId": "a8dvYs1Hjn3kFRxCi"
        },
        "fileId": "a2cwJFkbVYqB0xfVs"
    }
}

Returns:

Type Description
dict

A dictionary of values used to update a Swimlane task.
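
As a rough illustration of how this structure can be read, the sketch below separates the per-action entries from the plugin-level fileId. The helper `list_plugin_actions` is a hypothetical name introduced here and is not part of aqueduct; the sample values are the ones shown above.

```python
destination_plugin_dict = {
    "sw_virus_total": {
        "GetAnalyses": {
            "id": "a2dA20RlvfcV5jf34",
            "imageId": "a8dvYs1Hjn3kFRxCi",
        },
        "fileId": "a2cwJFkbVYqB0xfVs",
    }
}


def list_plugin_actions(plugin_dict: dict, plugin_name: str) -> list:
    """Hypothetical helper: names of the action entries for one plugin."""
    plugin = plugin_dict.get(plugin_name, {})
    # Action entries are nested dicts; the plugin-level fileId is a plain string.
    return sorted(key for key, value in plugin.items() if isinstance(value, dict))


print(list_plugin_actions(destination_plugin_dict, "sw_virus_total"))
print(destination_plugin_dict["sw_virus_total"]["fileId"])
print(destination_plugin_dict["sw_virus_total"]["GetAnalyses"]["imageId"])
```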

_check_tasks_using_tracking_id(self, task) private

Updates a task dictionary object that is using tracking-id as input or output with the correct tracking-id.

Parameters:

Name Type Description Default
task dict

A source Swimlane instance task dictionary.

required

Returns:

Type Description
dict

dict: An updated source Swimlane instance task dictionary.

Source code in aqueduct/components/tasks.py
def _check_tasks_using_tracking_id(self, task: dict) -> dict:
    """Updates a task dictionary object that is using tracking-id as input or output with the correct tracking-id.

    Args:
        task (dict): A source Swimlane instance task dictionary.

    Returns:
        dict: An updated source Swimlane instance task dictionary.
    """
    if task.get("inputMapping") and task["inputMapping"]:
        task["inputMapping"] = self._update_task_mappings(
            task_mappings=task["inputMapping"], task_name=task["name"]
        )
    if task.get("outputs") and task["outputs"]:
        for output in task["outputs"]:
            if (
                output.get("backReferenceFieldId")
                and output["backReferenceFieldId"]
                and isinstance(output["backReferenceFieldId"], str)
            ):
                if self.tracking_id_map.get(output["backReferenceFieldId"]):
                    self.log(
                        f"Updating task '{task['name']}' output reference field with the correct tracking-id "
                        f"'{self.tracking_id_map[output['backReferenceFieldId']]}'."
                    )
                    output["backReferenceFieldId"] = self.tracking_id_map[output["backReferenceFieldId"]]
            if output.get("mappings") and output["mappings"]:
                output["mappings"] = self._update_task_mappings(
                    task_mappings=output["mappings"], task_name=task["name"]
                )
    return task

_get_action_type(self, task) private

Returns the actionType from the current task.

Parameters:

Name Type Description Default
task dict

A source Swimlane instance task dictionary.

required

Exceptions:

Type Description
AccessKeyError

Raises when we are unable to access a certain key in the task dictionary.

Returns:

Type Description
str

str: An actionType string.

Source code in aqueduct/components/tasks.py
def _get_action_type(self, task: dict) -> str:
    """Returns the actionType from the current task.

    Args:
        task (dict): A source Swimlane instance task dictionary.

    Raises:
        AccessKeyError: Raises when we are unable to access a certain key in the task dictionary.

    Returns:
        str: An actionType string.
    """
    try:
        return task["action"]["descriptor"]["actionType"]
    except KeyError as ke:
        raise AccessKeyError(object_name=task["name"], component_type="task", error=ke)

_get_plugin_name(self, task) private

Returns the descriptor name from the current task.

Parameters:

Name Type Description Default
task dict

A source Swimlane instance task dictionary.

required

Exceptions:

Type Description
AccessKeyError

Raises when we are unable to access a certain key in the task dictionary.

Returns:

Type Description
str

str: An action's descriptor name string.

Source code in aqueduct/components/tasks.py
def _get_plugin_name(self, task: dict) -> str:
    """Returns the descriptor name from the current task.

    Args:
        task (dict): A source Swimlane instance task dictionary.

    Raises:
        AccessKeyError: Raises when we are unable to access a certain key in the task dictionary.

    Returns:
        str: An action's descriptor name string.
    """
    try:
        return task["action"]["descriptor"]["packageDescriptor"]["name"]
    except KeyError as ke:
        raise AccessKeyError(object_name=task["name"], component_type="task", error=ke)
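
The two getters above share one pattern: read a nested key and convert a KeyError into a domain-specific error that names the task. A minimal standalone sketch of that pattern follows; `TaskKeyError` is a hypothetical stand-in for the library's AccessKeyError, and the sample task dictionaries are invented for illustration.

```python
class TaskKeyError(Exception):
    """Hypothetical stand-in for the library's AccessKeyError."""

    def __init__(self, object_name: str, missing_key: str):
        super().__init__(f"Task '{object_name}' is missing key '{missing_key}'")
        self.object_name = object_name
        self.missing_key = missing_key


def get_action_type(task: dict) -> str:
    try:
        return task["action"]["descriptor"]["actionType"]
    except KeyError as ke:
        # ke.args[0] holds the first key that could not be found.
        raise TaskKeyError(task.get("name", "<unnamed>"), ke.args[0]) from ke


def get_plugin_name(task: dict) -> str:
    try:
        return task["action"]["descriptor"]["packageDescriptor"]["name"]
    except KeyError as ke:
        raise TaskKeyError(task.get("name", "<unnamed>"), ke.args[0]) from ke


# Invented sample data that follows the key paths used in the source.
task = {
    "name": "Get Analyses",
    "action": {
        "descriptor": {
            "actionType": "GetAnalyses",
            "packageDescriptor": {"name": "sw_virus_total"},
        }
    },
}
broken_task = {"name": "Broken task", "action": {}}

print(get_action_type(task), get_plugin_name(task))
try:
    get_plugin_name(broken_task)
except TaskKeyError as err:
    print(err)
```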

_update_package_descriptor_id(self, task, plugin_name, action_type) private

Updates the provided task with a new plugin task id.

When a plugin is installed on a destination system it generates new ids for the plugin. We must update our tasks with this new action id in order for them to work correctly.

Parameters:

Name Type Description Default
task dict

A source Swimlane instance task dictionary.

required
plugin_name str

The plugin name that the task belongs to.

required
action_type str

The action name within a plugin for this task.

required

Exceptions:

Type Description
AccessKeyError

Raises when we are unable to access a certain key in the task dictionary.

Returns:

Type Description
dict

dict: An updated source Swimlane instance task dictionary.

Source code in aqueduct/components/tasks.py
def _update_package_descriptor_id(self, task: dict, plugin_name: str, action_type: str) -> dict:
    """Updates the provided task with a new plugin task id.

    When a plugin is installed on a destination system it generates new ids for the plugin.
    We must update our tasks with this new action id in order for them to work correctly.

    Args:
        task (dict): A source Swimlane instance task dictionary.
        plugin_name (str): The plugin name that the task belongs to.
        action_type (str): The action name within a plugin for this task.

    Raises:
        AccessKeyError: Raises when we are unable to access a certain key in the task dictionary.

    Returns:
        dict: An updated source Swimlane instance task dictionary.
    """
    try:
        task["action"]["packageDescriptorId"] = self.destination_plugin_dict[plugin_name][action_type]["id"]
    except KeyError as ke:
        raise AccessKeyError(object_name=task["name"], component_type="task", error=ke)
    return task

_update_task_file_id(self, task, plugin_name) private

Updates the provided task with a new plugin fileId.

When a plugin is installed on a destination system it generates a new fileId. We must update our tasks with this new fileId in order for them to work correctly.

Parameters:

Name Type Description Default
task dict

A source Swimlane instance task dictionary.

required
plugin_name str

The plugin name that the task belongs to.

required

Exceptions:

Type Description
AccessKeyError

Raises when we are unable to access a certain key in the task dictionary.

Returns:

Type Description
dict

dict: An updated source Swimlane instance task dictionary.

Source code in aqueduct/components/tasks.py
def _update_task_file_id(self, task: dict, plugin_name: str) -> dict:
    """Updates the provided task with a new plugin fileId.

    When a plugin is installed on a destination system it generates a new fileId.
    We must update our tasks with this new fileId in order for them to work correctly.

    Args:
        task (dict): A source Swimlane instance task dictionary.
        plugin_name (str): The plugin name that the task belongs to.

    Raises:
        AccessKeyError: Raises when we are unable to access a certain key in the task dictionary.

    Returns:
        dict: An updated source Swimlane instance task dictionary.
    """
    try:
        task["action"]["descriptor"]["packageDescriptor"]["fileId"] = self.destination_plugin_dict[plugin_name][
            "fileId"
        ]
    except KeyError as ke:
        raise AccessKeyError(object_name=task["name"], component_type="task", error=ke)
    return task

_update_task_image_id(self, task, plugin_name, action_type) private

Updates the provided task with a new plugin imageId.

When a plugin is installed on a destination system it generates a new imageId. We must update our tasks with this new imageId in order for them to work correctly.

Parameters:

Name Type Description Default
task dict

A source Swimlane instance task dictionary.

required
plugin_name str

The plugin name that the task belongs to.

required
action_type str

The action name within a plugin for this task.

required

Exceptions:

Type Description
AccessKeyError

Raises when we are unable to access a certain key in the task dictionary.

Returns:

Type Description
dict

dict: An updated source Swimlane instance task dictionary.

Source code in aqueduct/components/tasks.py
def _update_task_image_id(self, task: dict, plugin_name: str, action_type: str) -> dict:
    """Updates the provided task with a new plugin imageId.

    When a plugin is installed on a destination system it generates a new imageId.
    We must update our tasks with this new imageId in order for them to work correctly.

    Args:
        task (dict): A source Swimlane instance task dictionary.
        plugin_name (str): The plugin name that the task belongs to.
        action_type (str): The action name within a plugin for this task.

    Raises:
        AccessKeyError: Raises when we are unable to access a certain key in the task dictionary.

    Returns:
        dict: An updated source Swimlane instance task dictionary.
    """
    try:
        task["action"]["descriptor"]["imageId"] = self.destination_plugin_dict[plugin_name][action_type]["imageId"]
    except KeyError as ke:
        raise AccessKeyError(object_name=task["name"], component_type="task", error=ke)
    return task

_update_task_mappings(self, task_mappings, task_name) private

Used to update task input and output mappings with the correct tracking-id.

Parameters:

Name Type Description Default
task_mappings list

A list of input or output field mappings.

required
task_name str

The name of the task these field mappings belong to.

required

Returns:

Type Description
list

list: The original or updated task field mappings.

Source code in aqueduct/components/tasks.py
def _update_task_mappings(self, task_mappings: list, task_name: str) -> list:
    """Used to update task input and output mappings with the correct tracking-id.

    Args:
        task_mappings (list): A list of input or output field mappings.
        task_name (str): The name of the task these field mappings belong to.

    Returns:
        list: The original or updated task field mappings.
    """
    return_list = []
    for mapping in task_mappings:
        if mapping.get("value") and mapping["value"] and isinstance(mapping["value"], str):
            if self.tracking_id_map.get(mapping["value"]):
                self.log(
                    f"Updating task '{task_name}' with the correct tracking-id "
                    f"'{self.tracking_id_map[mapping['value']]}'."
                )
                mapping["value"] = self.tracking_id_map[mapping["value"]]
        return_list.append(mapping)
    return return_list
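
Taken together with _check_tasks_using_tracking_id, the remapping can be sketched without any Swimlane connection. This is a simplified illustration: the function names, the "key" entries and the sample ids are hypothetical, and it assumes tracking_id_map maps a source tracking-id field id to its destination counterpart.

```python
def remap_mappings(mappings: list, tracking_id_map: dict) -> list:
    """Swap any string mapping value found in tracking_id_map for its new id."""
    for mapping in mappings:
        value = mapping.get("value")
        new_value = tracking_id_map.get(value) if isinstance(value, str) else None
        if new_value:
            mapping["value"] = new_value
    return mappings


def remap_task(task: dict, tracking_id_map: dict) -> dict:
    """Apply the remapping to a task's inputs and outputs in place."""
    if task.get("inputMapping"):
        task["inputMapping"] = remap_mappings(task["inputMapping"], tracking_id_map)
    for output in task.get("outputs") or []:
        back_ref = output.get("backReferenceFieldId")
        if isinstance(back_ref, str) and tracking_id_map.get(back_ref):
            output["backReferenceFieldId"] = tracking_id_map[back_ref]
        if output.get("mappings"):
            output["mappings"] = remap_mappings(output["mappings"], tracking_id_map)
    return task


# Hypothetical ids: source tracking-id field id -> destination tracking-id field id.
tracking_id_map = {"src-tracking-id": "dst-tracking-id"}
task = {
    "name": "Example task",
    "inputMapping": [
        {"key": "record", "value": "src-tracking-id"},
        {"key": "limit", "value": 10},
    ],
    "outputs": [
        {
            "backReferenceFieldId": "src-tracking-id",
            "mappings": [{"key": "result", "value": "other-field"}],
        }
    ],
}
remap_task(task, tracking_id_map)
print(task)
```

Values that are not strings, or that do not appear in the map, pass through unchanged, which matches the behaviour of the source shown above.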

sync(self)

This method is used to sync all tasks from a source instance to a destination instance.

Source code in aqueduct/components/tasks.py
def sync(self):
    """This method is used to sync all tasks from a source instance to a destination instance."""
    self.log(f"Starting to sync tasks from '{self.source_host}' to '{self.dest_host}'.")
    tasks = self.source_instance.get_tasks()
    if tasks:
        for task in tasks:
            self.sync_task(task=task)
    self.log(f"Completed syncing of tasks from '{self.source_host}' to '{self.dest_host}'.")

sync_task(self, task)

This method syncs a single task from a source Swimlane instance to a destination instance.

Using the provided task dictionary from the Swimlane source instance, we first get the actual task object from the source.

Next, we attempt to retrieve the task from the destination system. If the task does not exist on the destination instance, we add it. If it does exist, we compare the uid of the source and destination tasks. If they are the same, we update the task on the destination instance. If they are different, we log the mismatch and leave the destination task unchanged. In a dry run, additions and updates are only recorded in the diff log.

Parameters:

Name Type Description Default
task dict

A Swimlane task object from a source system.

required

Returns:

Type Description
dict

dict: If we failed to add a task we return it so we can try again. Only if called using the sync method.

Source code in aqueduct/components/tasks.py
def sync_task(self, task: dict) -> dict:
    """This method syncs a single task from a source Swimlane instance to a destination instance.

    Using the provided task dictionary from the Swimlane source instance, we first get the actual task
    object from the source.

    Next, we attempt to retrieve the task from the destination system. If the task does not exist
    on the destination instance, we add it. If it does exist, we compare the `uid` of the source and
    destination tasks. If they are the same, we update the task on the destination instance. If they are
    different, we log the mismatch and leave the destination task unchanged.

    Args:
        task (dict): A Swimlane task object from a source system.

    Returns:
        dict: If we failed to add a task we return it so we can try again. Only if called using the sync method.
    """
    if not self._is_in_include_exclude_lists(task["name"], "tasks"):
        self.log(f"Processing task '{task['name']}'.")
        task_ = self.source_instance.get_task(task["id"])
        if not task_:
            raise GetComponentError(type="task", name=task["name"], id="")
        if not ComponentBase.dry_run:
            task_ = self.update_task_plugin_ids(task=task_)
        dest_task = self.destination_instance.get_task(task_["id"])
        if not dest_task:
            if not ComponentBase.dry_run:
                self.log(f"Creating task '{task_['name']}' on destination.")
                try:
                    # checking tasks for inputs and outputs mapped to source application
                    # tracking-ids and replacing them with the new ones
                    task_ = self._check_tasks_using_tracking_id(task=task_)
                    dest_task = self.destination_instance.add_task(task_)
                    self.log(f"Successfully added task '{task_['name']}' to destination.")
                except Exception as e:
                    self.log(
                        f"Failed to add task '{task_['name']}' to destination.",
                        level="warning",
                    )
            else:
                self.add_to_diff_log(task_["name"], "added")
        else:
            self.log(f"Task '{task_['name']}' already exists on destination.")
            if task_["uid"] == dest_task["uid"]:
                dest_task = self._check_tasks_using_tracking_id(task=task_)
                if not ComponentBase.dry_run:
                    self.log(f"Task '{task_['name']}' has changed. Updating...")
                    try:
                        dest_task = self.destination_instance.update_task(dest_task["id"], dest_task)
                        if not dest_task:
                            raise UpdateComponentError(model=task_, name=task_["name"])
                        self.log(f"Successfully updated task '{task_['name']}' on destination.")
                    except Exception as e:
                        raise UpdateComponentError(model=task_, name=task_["name"]) from e
                else:
                    self.add_to_diff_log(task_["name"], "updated")
            else:
                self.log(
                    f"The source task '{task['name']}' has a different UID ({task_['uid']}) than the "
                    f"destination task ({dest_task['uid']})"
                )
    else:
        self.log(f"Skipping task '{task['name']}' since it is excluded.")
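
The outcome of sync_task for a task that is not excluded depends on three inputs: whether the destination already has the task, whether the uid values match, and whether this is a dry run. The sketch below captures that decision in isolation; `plan_task_action` and its return labels are hypothetical and do not exist in aqueduct.

```python
from typing import Optional


def plan_task_action(source_task: dict, dest_task: Optional[dict], dry_run: bool) -> str:
    """Return a label describing what sync_task would do for one task."""
    if not dest_task:
        # Task is missing on the destination.
        return "log-added" if dry_run else "add"
    if source_task["uid"] == dest_task["uid"]:
        # Same uid, so the destination task is overwritten with the source task.
        return "log-updated" if dry_run else "update"
    # Same id but a different uid: only a log message is written.
    return "uid-mismatch"


source = {"id": "t1", "uid": "get-analyses"}
print(plan_task_action(source, None, dry_run=False))
print(plan_task_action(source, {"id": "t1", "uid": "get-analyses"}, dry_run=True))
print(plan_task_action(source, {"id": "t1", "uid": "something-else"}, dry_run=False))
```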

update_task_plugin_ids(self, task)

Used to update tasks with newly uploaded plugin IDs so they can reference the correct plugin in Swimlane.

Parameters:

Name Type Description Default
task dict

A source Swimlane instance task dictionary.

required

Returns:

Type Description
dict

An updated source Swimlane instance task dictionary.

Source code in aqueduct/components/tasks.py
def update_task_plugin_ids(self, task: dict):
    """Used to update tasks with newly uploaded plugin IDs so they can reference the correct plugin in Swimlane.

    Args:
        task (dict): A source Swimlane instance task dictionary.

    Returns:
        dict: An updated source Swimlane instance task dictionary.
    """
    self.log("Retrieving plugins from destination instance", level="debug")
    action_type = self._get_action_type(task=task)
    if action_type and action_type not in self.BUILTIN_TASK_ACTION_TYPES:
        plugin_name = self._get_plugin_name(task=task)
        self.log(f"Updating task '{task['name']}' with correct plugin IDs")
        if self.destination_plugin_dict.get(plugin_name):
            if self.destination_plugin_dict[plugin_name].get(action_type):
                # We are updating the current source task_ so that it contains the correct Ids for the
                # destination plugin
                self.log(
                    f"Updating task '{task['name']}' with correct packageDescriptor fileId "
                    f"'{self.destination_plugin_dict[plugin_name]['fileId']}'"
                )
                task = self._update_task_file_id(task=task, plugin_name=plugin_name)
                self.log(
                    f"Updating task '{task['name']}' with correct imageId "
                    f"'{self.destination_plugin_dict[plugin_name][action_type]['imageId']}'"
                )
                task = self._update_task_image_id(task=task, plugin_name=plugin_name, action_type=action_type)
                self.log(
                    f"Updating task '{task['name']}' with correct packageDescriptorId "
                    f"'{self.destination_plugin_dict[plugin_name][action_type]['id']}'"
                )
                task = self._update_package_descriptor_id(
                    task=task, plugin_name=plugin_name, action_type=action_type
                )
        else:
            self.log(f"Unable to find plugin '{plugin_name}' on destination!")
    return task
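
The three id updates performed above can be tried on plain dictionaries. The following sketch assumes the task key paths shown in the source; the function name, the `builtin_action_types` argument and the sample "old-..." ids are hypothetical, and the real contents of BUILTIN_TASK_ACTION_TYPES are not shown in this documentation.

```python
def apply_destination_plugin_ids(task: dict, plugin_dict: dict, builtin_action_types: frozenset) -> dict:
    """Rewrite fileId, imageId and packageDescriptorId to the destination values."""
    descriptor = task["action"]["descriptor"]
    action_type = descriptor["actionType"]
    if not action_type or action_type in builtin_action_types:
        return task  # built-in tasks do not reference an uploaded plugin
    plugin = plugin_dict.get(descriptor["packageDescriptor"]["name"])
    if not plugin or not plugin.get(action_type):
        return task  # plugin or action not installed on the destination
    descriptor["packageDescriptor"]["fileId"] = plugin["fileId"]
    descriptor["imageId"] = plugin[action_type]["imageId"]
    task["action"]["packageDescriptorId"] = plugin[action_type]["id"]
    return task


plugin_dict = {
    "sw_virus_total": {
        "GetAnalyses": {"id": "a2dA20RlvfcV5jf34", "imageId": "a8dvYs1Hjn3kFRxCi"},
        "fileId": "a2cwJFkbVYqB0xfVs",
    }
}
# Invented source task carrying ids that are only valid on the source instance.
task = {
    "name": "Get Analyses",
    "action": {
        "packageDescriptorId": "old-action-id",
        "descriptor": {
            "actionType": "GetAnalyses",
            "imageId": "old-image-id",
            "packageDescriptor": {"name": "sw_virus_total", "fileId": "old-file-id"},
        },
    },
}
apply_destination_plugin_ids(task, plugin_dict, frozenset({"exampleBuiltin"}))
print(task["action"])
```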

Users

Used to sync users from a source instance to a destination instance of Swimlane.

_process_user(self, user) private

Processes roles and groups associated with a source Swimlane instance user dictionary.

Attempts to make sure that roles and groups are created on the Swimlane destination instance.

Parameters:

Name Type Description Default
user dict

A source Swimlane instance user dictionary object.

required

Returns:

Type Description
dict

dict: A modified source Swimlane instance user dictionary object.

Source code in aqueduct/components/users.py
def _process_user(self, user: dict) -> dict:
    """Processes roles and groups associated with a source Swimlane instance user dictionary.

    Attempts to make sure that roles and groups are created on the Swimlane destination instance.

    Args:
        user (dict): A source Swimlane instance user dictionary object.

    Returns:
        dict: A modified source Swimlane instance user dictionary object.
    """
    if user.get("roles") and user["roles"]:
        self.log(f"Processing roles for user '{user['name']}'")
        role_list = []
        from .roles import Roles

        for role in user["roles"]:
            _role = Roles().sync_role(role=role)
            if _role:
                role_list.append(_role)
        user["roles"] = role_list

    if user.get("groups") and user["groups"]:
        self.log(f"Processing groups for user '{user['name']}'")
        group_list = []
        from .groups import Groups

        for group in user["groups"]:
            _group = Groups().sync_group(group=group)
            if _group:
                group_list.append(_group)
        user["groups"] = group_list
    return user

sync(self)

This method is used to sync all users from a source instance to a destination instance.

Source code in aqueduct/components/users.py
def sync(self):
    """This method is used to sync all users from a source instance to a destination instance."""
    self.log(f"Attempting to sync users from '{self.source_host}' to '{self.dest_host}'.")
    users = self.source_instance.get_users()
    if users:
        for user in users:
            self.sync_user(user_id=user["id"])
    self.log(f"Completed syncing of users from '{self.source_host}' to '{self.dest_host}'.")

sync_user(self, user_id)

Syncs a single source Swimlane instance user with a destination instance.

Parameters:

Name Type Description Default
user_id str

A Swimlane user ID.

required

Exceptions:

Type Description
GetComponentError

Raises when the provided user_id is not found on the source Swimlane instance.

AddComponentError

Raises when unable to add a user to a destination Swimlane instance.

UpdateComponentError

Raises when unable to update a user on a destination Swimlane instance.

Returns:

Type Description
dict

A destination Swimlane instance user dictionary object.

Source code in aqueduct/components/users.py
def sync_user(self, user_id: str):
    """Syncs a single source Swimlane instance user with a destination instance.

    Args:
        user_id (str): A Swimlane user ID.

    Raises:
        GetComponentError: Raises when the provided user_id is not found on the source Swimlane instance.
        AddComponentError: Raises when unable to add a user to a destination Swimlane instance.
        UpdateComponentError: Raises when unable to update a user on a destination Swimlane instance.

    Returns:
        dict: A destination Swimlane instance user dictionary object.
    """
    user = self.source_instance.get_user(user_id)
    if not user:
        raise GetComponentError(type="User", id=user_id)
    self.log(f"Processing user '{user['displayName']}' ({user_id})")
    if user and not self._is_in_include_exclude_lists(user["displayName"], "users"):
        if (
            user["displayName"] != self.source_instance.swimlane.user.display_name
            or user["userName"] != self.source_instance.swimlane.user.username
        ):
            self.scrub(user)
            self.log(f"Attempting to sync user '{user['displayName']}' on destination.")
            dest_user = self.destination_instance.search_user(user["userName"])
            if not dest_user:
                if not ComponentBase.dry_run:
                    self.log(f"Adding new user '{user['displayName']}' to destination.")
                    dest_user = self.destination_instance.add_user(user)
                    if not dest_user:
                        raise AddComponentError(model=user, name=user["displayName"])
                    self.log(f"Successfully added user '{user['displayName']}' to destination.")
                    self.add_to_homework_list(
                        component_name="users",
                        value=f"You must set the password for the user '{user['displayName']}' on destination.",
                    )
                    return dest_user
                else:
                    self.add_to_diff_log(user["displayName"], "added")
            else:
                self.log(f"User '{user['displayName']}' exists on destination.")
                user = self._process_user(user=user)
                if not ComponentBase.dry_run:
                    user["id"] = dest_user["id"]
                    dest_user = self.destination_instance.update_user(dest_user["id"], user)
                    if not dest_user:
                        raise UpdateComponentError(model=user, name=user["displayName"])
                    self.log(f"Successfully updated user '{user['displayName']}' on destination.")
                    return dest_user
                else:
                    self.add_to_diff_log(user["displayName"], "updated")
        else:
            dname = self.source_instance.swimlane.user.display_name
            if not ComponentBase.dry_run:
                self.log(f"Unable to update the currently authenticated user '{dname}'. Skipping...")
            else:
                self.add_to_diff_log(name=dname, diff_type="added")
            self.add_to_homework_list(
                component_name="users",
                value=f"The current authenticated user '{dname}' will need to be created manually "
                "on the destination.",
            )
    else:
        self.log(f"Skipping user '{user['displayName']}' since it is excluded.")
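
One detail of sync_user that is easy to misread is the check for the currently authenticated user: a user is skipped only when both the display name and the username match. A minimal sketch of the equivalent test follows; the function name and sample values are hypothetical.

```python
def is_authenticated_user(user: dict, auth_display_name: str, auth_username: str) -> bool:
    """True only when both the display name and the username match."""
    return user["displayName"] == auth_display_name and user["userName"] == auth_username


# Invented sample users for illustration.
admin = {"displayName": "Admin", "userName": "admin"}
namesake = {"displayName": "Admin", "userName": "second.admin"}

print(is_authenticated_user(admin, "Admin", "admin"))
print(is_authenticated_user(namesake, "Admin", "admin"))
```

This is the negation of the `!= ... or !=` condition in the source, so a user who shares only a display name with the authenticated account is still synced.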

Workflows

Used to sync workflows from a source instance to a destination instance of Swimlane.

_update_workflow_stages_with_new_id(self, source, new_parent_id) private

Updates a workflow's stages to match the new parentId.

Parameters:

Name Type Description Default
source dict

A source workflow dictionary object.

required
new_parent_id str

The new parentId value to replace in a workflow's stages.

required

Returns:

Type Description
dict

dict: An updated source workflow dictionary object.

Source code in aqueduct/components/workflows.py
def _update_workflow_stages_with_new_id(self, source: dict, new_parent_id: str) -> dict:
    """Updates a workflow's stages to match the new parentId.

    Args:
        source (dict): A source workflow dictionary object.
        new_parent_id (str): The new parentId value to replace in a workflow's stages.

    Returns:
        dict: An updated source workflow dictionary object.
    """
    if source.get("stages") and source["stages"]:
        source_stages = []
        for source_stage in source["stages"]:
            if source_stage["parentId"] == source["id"]:
                source_stage["parentId"] = new_parent_id
            source_stages.append(source_stage)
        source["stages"] = source_stages
    return source
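
As a minimal standalone sketch of the remapping above: only stages whose parentId points at the source workflow's own ID are rewritten, while nested stages that point at other stages are left alone. The `remap_stage_parent_ids` helper and the sample workflow are hypothetical and not part of aqueduct. Unlike the method above, this sketch works on a copy rather than mutating its input.

```python
import copy


def remap_stage_parent_ids(source: dict, new_parent_id: str) -> dict:
    """Return a copy of `source` whose top-level stages point at `new_parent_id`.

    Hypothetical helper for illustration only.
    """
    updated = copy.deepcopy(source)
    for stage in updated.get("stages") or []:
        # Only stages parented directly to the workflow are rewritten.
        if stage.get("parentId") == source["id"]:
            stage["parentId"] = new_parent_id
    return updated


# Made-up sample data: one top-level stage and one nested stage.
source_workflow = {
    "id": "src-wf",
    "stages": [
        {"id": "s1", "parentId": "src-wf"},
        {"id": "s2", "parentId": "s1"},
    ],
}
result = remap_stage_parent_ids(source_workflow, "dest-wf")
print([stage["parentId"] for stage in result["stages"]])  # ['dest-wf', 's1']
```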

sync(self)

This method is intended to sync all workflows from a source instance to a destination instance. It is not implemented yet and currently raises NotImplementedError.

Source code in aqueduct/components/workflows.py
def sync(self):
    """This method is used to sync all workflows from a source instance to a destination instance."""
    raise NotImplementedError("General workflow syncing is currently not implemented.")

sync_workflow(self, application_name, application_id=None)

This method syncs a single application's workflow from a source Swimlane instance to a destination instance.

If an application_name is in our include or exclude filters we will either ignore or process the workflow updates for that application.

Once an application_name is provided, we retrieve the workflow for that application from our workflow_dict, falling back to a lookup by application_id when one is supplied. We also retrieve the destination workflow for that application.

We create a temporary object that compares the stages of a source workflow to a destination workflow. If they are exactly the same we skip updating the workflow. If they are not, we copy the source workflow to the destination and update it to reflect the new workflow ID.

Finally we update the destination workflow with our changes.

Parameters:

Name Type Description Default
application_name str

The name of an application to check and update workflow if applicable.

required
application_id str

The application ID. Default is None.

None
Source code in aqueduct/components/workflows.py
def sync_workflow(self, application_name: str, application_id: str = None):
    """This method syncs a single application's workflow from a source Swimlane instance to
    a destination instance.

    If an application_name is in our include or exclude filters we will either ignore or
    process the workflow updates for that application.

    Once an application_name is provided we retrieve the workflow for that application from
    our workflow_dict. Additionally we retrieve the destination workflow for the provided
    application.

    We create a temporary object that compares the stages of a source workflow to a destination
    workflow. If they are exactly the same we skip updating the workflow. If they are not, we
    copy the source workflow to the destination and update it to reflect the new workflow ID.

    Finally we update the destination workflow with our changes.

    Args:
        application_name (str): The name of an application to check and update workflow if applicable.
        application_id (str, optional): The application ID. Default is None.
    """
    # TODO: Add logic to handle when a piece of the workflow does not exist. For example, when a task does not exist.
    workflow = self.source_instance.workflow_dict.get(application_name)
    if not workflow and application_id:
        workflow = self.source_instance.get_workflow(application_id)
    if workflow:
        self.log(
            f"Processing workflow '{workflow['id']}' for application '{application_name}' "
            f"({workflow['applicationId']})."
        )
        dest_workflow = self.destination_instance.get_workflow(application_id=workflow["applicationId"])
        if dest_workflow:
            # We use the source workflow as truth and update its parentId with the id of the destination
            # workflow.
            workflow = self._update_workflow_stages_with_new_id(source=workflow, new_parent_id=dest_workflow["id"])
            # once we have updated it, it should be similar to the destination workflow
            # So we check to see if they are NOT the same.
            if Differ(source=workflow["stages"], destination=dest_workflow["stages"]).check():
                if not ComponentBase.dry_run:
                    self.log("Source and destination workflows are different. Updating...")
                    # Drop keys that should not be sent with the update; tolerate absent keys.
                    for item in ["$type", "permissions", "version"]:
                        workflow.pop(item, None)
                    workflow["id"] = dest_workflow["id"]
                    resp = self.destination_instance.update_workflow(workflow=workflow)
                    self.log(f"Successfully updated workflow for application '{application_name}'.")
                else:
                    self.add_to_diff_log(f"{application_name} - Workflow", "updated")
            else:
                self.log("Source and destination workflows are the same. Skipping...")
        else:  # May be an edge case and could possibly be removed.
            if not ComponentBase.dry_run:
                self.log(
                    f"Adding workflow for application '{application_name}' "
                    f"({self.source_instance.application_dict[application_name]['id']})."
                )
                dest_workflow = self.destination_instance.add_workflow(workflow=workflow)
                self.log(f"Successfully added workflow for application '{application_name}'.")
            else:
                self.add_to_diff_log(f"{application_name} - Workflow", "added")
    else:
        raise GetComponentError(type="Workflow", name=application_name)
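
The decision flow described above (add when the destination has no workflow, update when the stages differ, skip otherwise) can be sketched in isolation. This is an illustration under stated assumptions: `plan_workflow_action` is a hypothetical helper, the sample data is made up, and plain equality stands in for aqueduct's `Differ` check, whose exact comparison rules are not shown here.

```python
from typing import Optional


def plan_workflow_action(source: dict, destination: Optional[dict]) -> str:
    """Decide what a sync would do: 'added', 'updated', or 'unchanged'."""
    if destination is None:
        return "added"
    # Re-point top-level stages at the destination workflow ID before comparing,
    # otherwise every top-level stage would differ by parentId alone.
    remapped = [
        {**stage, "parentId": destination["id"]}
        if stage.get("parentId") == source["id"]
        else stage
        for stage in source.get("stages", [])
    ]
    # Assumption: plain equality stands in for the Differ comparison.
    return "unchanged" if remapped == destination.get("stages", []) else "updated"


# Made-up sample data.
src = {"id": "src-wf", "stages": [{"id": "s1", "parentId": "src-wf", "name": "Start"}]}
dst_same = {"id": "dst-wf", "stages": [{"id": "s1", "parentId": "dst-wf", "name": "Start"}]}
dst_diff = {"id": "dst-wf", "stages": []}

print(plan_workflow_action(src, None))      # added
print(plan_workflow_action(src, dst_same))  # unchanged
print(plan_workflow_action(src, dst_diff))  # updated
```

In a dry run, the same decision would only be recorded in the diff log rather than applied to the destination.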

Workspaces

Used to sync workspaces from a source instance to a destination instance of Swimlane.

sync(self)

This method is used to sync all workspaces from a source instance to a destination instance.

Source code in aqueduct/components/workspaces.py
def sync(self):
    """This method is used to sync all workspaces from a source instance to a destination instance."""
    self.log(f"Starting to sync workspaces from '{self.source_host}' to '{self.dest_host}'")
    workspaces = self.source_instance.get_workspaces()
    if workspaces:
        for workspace in workspaces:
            self.sync_workspace(workspace=workspace)
    self.log(f"Completed syncing of workspaces from '{self.source_host}' to '{self.dest_host}'.")

sync_workspace(self, workspace)

Used to sync Swimlane workspaces.

Parameters:

Name Type Description Default
workspace dict

A source Swimlane instance workspace dictionary object.

required

Exceptions:

Type Description
AddComponentError

Raised when unable to add a workspace to a destination Swimlane instance.

UpdateComponentError

Raised when unable to update a workspace on a destination Swimlane instance.

Source code in aqueduct/components/workspaces.py
def sync_workspace(self, workspace: dict):
    """Used to sync Swimlane workspaces.

    Args:
        workspace (dict): A source Swimlane instance workspace dictionary object.

    Raises:
        AddComponentError: Raised when unable to add a workspace to a destination Swimlane instance.
        UpdateComponentError: Raised when unable to update a workspace on a destination Swimlane instance.
    """
    if not self._is_in_include_exclude_lists(workspace["name"], "workspaces"):
        self.log(f"Processing workspace '{workspace['name']}'")
        dest_workspace = self.destination_instance.get_workspace(workspace["id"])
        if not dest_workspace:
            if not ComponentBase.dry_run:
                self.log(f"Adding workspace '{workspace['name']}' to destination.")
                # ensuring that applications and dashboards are unique in the workspace components lists
                try:
                    for key in ["applications", "dashboards"]:
                        if workspace.get(key):
                            workspace[key] = list(set(workspace[key]))
                except Exception as e:
                    # Include the error so the failure is visible in the log.
                    self.log(
                        f"Error when forcing applications and dashboards lists to be unique: {e}. "
                        "Continuing anyway..."
                    )
                dest_workspace = self.destination_instance.add_workspace(workspace)
                if not dest_workspace:
                    raise AddComponentError(model=workspace, name=workspace["name"])
                self.log(f"Successfully added workspace '{workspace['name']}' to destination.")
            else:
                self.add_to_diff_log(workspace["name"], "added")
        else:
            self.log(f"Workspace '{workspace['name']}' already exists on destination. Checking differences...")
            if not ComponentBase.dry_run:
                dashboards = []
                self.log(
                    f"Checking that dashboards associated with workspace '{workspace['name']}' "
                    "exist on destination."
                )
                if workspace.get("dashboards") and workspace["dashboards"]:
                    for dashboard in workspace["dashboards"]:
                        for key, val in self.source_instance.dashboard_dict.items():
                            if dashboard == val and self.destination_instance.dashboard_dict.get(key):
                                self.log(
                                    f"Found {key} in dest dashboard with value of"
                                    f" {self.destination_instance.dashboard_dict[key]}"
                                )
                                dashboards.append(self.destination_instance.dashboard_dict[key])
                    workspace["dashboards"] = dashboards
                if workspace.get("applications") and workspace["applications"]:
                    self.log(
                        f"Checking that applications associated with workspace '{workspace['name']}' "
                        "exist on destination."
                    )
                    from .applications import Applications

                    for application_id in workspace["applications"]:
                        if application_id and application_id not in self.destination_instance.application_id_list:
                            Applications().sync_application(application_id=application_id)
                resp = self.destination_instance.update_workspace(workspace_id=workspace["id"], workspace=workspace)
                if not resp:
                    raise UpdateComponentError(model=workspace, name=workspace["name"])
                self.log(f"Successfully updated workspace '{workspace['name']}' on destination.")
            else:
                self.add_to_diff_log(workspace["name"], "updated")
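
The dashboard handling in the update branch translates source dashboard IDs into destination IDs by way of a shared key. A minimal sketch of that idea follows, assuming that both `dashboard_dict` mappings go from dashboard name to dashboard ID and that IDs are unique (an inference from the code above, not something the source confirms). The `translate_dashboard_ids` helper and the sample data are hypothetical. The sketch inverts the source mapping once instead of scanning it for every dashboard, and de-duplicates with `dict.fromkeys`, which keeps the original order where `list(set(...))` does not.

```python
def translate_dashboard_ids(dashboard_ids, source_dashboards, dest_dashboards):
    """Map source dashboard IDs to destination IDs through the shared name.

    Hypothetical helper; dashboards missing on the destination are dropped.
    """
    # Invert the source mapping once: ID -> name.
    source_names_by_id = {dash_id: name for name, dash_id in source_dashboards.items()}
    translated = []
    for dash_id in dict.fromkeys(dashboard_ids):  # de-duplicate, keeping order
        name = source_names_by_id.get(dash_id)
        dest_id = dest_dashboards.get(name)
        if dest_id:
            translated.append(dest_id)
    return translated


# Made-up sample data: "Legacy" does not exist on the destination.
source_dashboards = {"Overview": "a1", "Metrics": "a2", "Legacy": "a3"}
dest_dashboards = {"Overview": "b1", "Metrics": "b2"}

print(translate_dashboard_ids(["a1", "a3", "a2", "a1"], source_dashboards, dest_dashboards))
# ['b1', 'b2']
```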