Aqueduct

Aqueduct (Base)

Aqueduct is used to migrate content from one Swimlane instance to another.

__init__(self, source, destination, dry_run=True, offline=False, update_reports=False, update_dashboards=False, continue_on_error=False, use_unsupported_version=False, force_unsupported_version=False) special

To use aqueduct you must provide two SwimlaneInstance objects. One is the source of the content to migrate; the other is the destination to which that content will be migrated.

Parameters:

Name Type Description Default
source SwimlaneInstance

The source Swimlane instance. Typically this is considered a development instance.

required
destination SwimlaneInstance

The destination Swimlane instance. Typically this is considered a production instance.

required
dry_run bool

Whether or not this run of aqueduct will transfer content to the destination instance. Set to False to migrate content. Default is True.

True
offline bool

Whether or not the destination instance has access to the internet for the purpose of downloading Python packages. Default is False.

False
update_reports bool

Whether or not to force update reports if changes are detected. Default is False.

False
update_dashboards bool

Whether or not to force update dashboards if changes are detected. Default is False.

False
continue_on_error bool

Whether or not to continue when an API error occurs. Default is False.

False
use_unsupported_version bool

Whether or not to transfer content from one version of Swimlane to a different version of Swimlane. Use this at your own risk. Default is False.

False
force_unsupported_version bool

Whether or not to force usage of an unsupported version. This is used when running aqueduct headless/non-interactive. Default is False.

False
Source code in aqueduct/aqueduct.py
def __init__(
    self,
    source: SwimlaneInstance,
    destination: SwimlaneInstance,
    dry_run: bool = True,
    offline: bool = False,
    update_reports: bool = False,
    update_dashboards: bool = False,
    continue_on_error: bool = False,
    use_unsupported_version: bool = False,
    force_unsupported_version: bool = False,
):
    """To use aqueduct you must provide two SwimlaneInstance objects. One is the source of content wanting to
    migrate. The other is the destination of where the content will be migrated to.

    Args:
        source (SwimlaneInstance): The source Swimlane instance. Typically this is considered a
                                   development instance.
        destination (SwimlaneInstance): The destination Swimlane instance. Typically this is considered a production
                                        instance.
        dry_run (bool): Whether or not this run of aqueduct will transfer content to the destination instance.
                        Set to False to migrate content. Default is True.
        offline (bool): Whether or not the destination instance has access to the internet for the purpose of
                        downloading Python packages. Default is False.
        update_reports (bool): Whether or not to force update reports if changes are detected. Default is False.
        update_dashboards (bool): Whether or not to force update dashboards if changes are detected.
                                  Default is False.
        continue_on_error (bool): Whether or not to continue when an API error occurs. Default is False.
        use_unsupported_version (bool): Whether or not to transfer content from one version of Swimlane to a
                                        different version of Swimlane. Use this at your own risk. Default is False.
        force_unsupported_version (bool): Whether or not to force usage of an unsupported version. This is used
                                          when running aqueduct headless/non-interactive. Default is False.
    """
    if use_unsupported_version and source.swimlane.product_version != destination.swimlane.product_version:
        self.log(
            f"You specified you want to transfer content from version {source.swimlane.product_version} to "
            f"{destination.swimlane.product_version}. This is not officially supported. "
            "Please use at your own risk.",
            level="info",
        )
        if not force_unsupported_version and not self.__want_to_continue():
            sys.exit()
    elif source.swimlane.product_version != destination.swimlane.product_version:
        raise UnsupportedSwimlaneVersion(source=source, destination=destination)
    Base.source_instance = source
    Base.destination_instance = destination
    Base.dry_run = dry_run
    Base.source_host = Base.source_instance.swimlane.host
    Base.dest_host = Base.destination_instance.swimlane.host
    Base.offline = offline
    Base.update_reports = update_reports
    Base.update_dashboards = update_dashboards
    Base.continue_on_error = continue_on_error
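
The version check at the top of `__init__` can be easier to follow as a small decision function. The following is a minimal sketch, not the library's code: `check_versions`, `UnsupportedVersion`, and the `confirm` callback are hypothetical names, and the sketch returns False where the original calls `sys.exit()` after the user declines.

```python
class UnsupportedVersion(Exception):
    """Hypothetical stand-in for aqueduct's UnsupportedSwimlaneVersion."""


def check_versions(
    source_version,
    destination_version,
    use_unsupported_version=False,
    force_unsupported_version=False,
    confirm=lambda: False,
):
    """Return True if the migration may proceed, False if the user declined.

    Raises UnsupportedVersion when the versions differ and the caller did not
    opt in with use_unsupported_version.
    """
    if source_version == destination_version:
        return True
    if not use_unsupported_version:
        raise UnsupportedVersion(f"{source_version} != {destination_version}")
    # Versions differ and the caller opted in: proceed when forced
    # (headless runs), otherwise ask for interactive confirmation.
    return force_unsupported_version or confirm()
```

In a non-interactive run, the sketch suggests passing both `use_unsupported_version=True` and `force_unsupported_version=True`, since there is no prompt to answer.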

sync(self, components=OrderedDict([('keystore', <class 'aqueduct.components.keystore.KeyStore'>), ('packages', <class 'aqueduct.components.packages.Packages'>), ('plugins', <class 'aqueduct.components.plugins.Plugins'>), ('assets', <class 'aqueduct.components.assets.Assets'>), ('workspaces', <class 'aqueduct.components.workspaces.Workspaces'>), ('applications', <class 'aqueduct.components.applications.Applications'>), ('tasks', <class 'aqueduct.components.tasks.Tasks'>), ('reports', <class 'aqueduct.components.reports.Reports'>), ('dashboards', <class 'aqueduct.components.dashboards.Dashboards'>), ('users', <class 'aqueduct.components.users.Users'>), ('groups', <class 'aqueduct.components.groups.Groups'>), ('roles', <class 'aqueduct.components.roles.Roles'>)]), exclude={}, include={})

The main method to begin syncing components from one Swimlane instance to another.

There are several available components you can specify. The order of these components is forced. The defaults are:

  • keystore
  • packages
  • plugins
  • assets
  • workspaces
  • applications (we update workflows here)
  • tasks
  • reports
  • dashboards
  • users
  • groups
  • roles

You can include and exclude specific component items like specific applications, tasks, plugins, etc. To do so provide a dictionary for each argument (include or exclude). For example:

exclude = {'applications': ["Phishing Triage"], 'tasks': ['PT - Get Emails'], etc.}
include = {'applications': ["Security Alert & Incident Management"], 'reports': ['SAIM - New Incidents'], etc.}

aq.sync(include=include, exclude=exclude)
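
Because the component order is forced, a caller can pass component names in any order. The private `__sort_sync_order` helper is not shown on this page, so the following is only a sketch of what such an ordering step might look like; `SYNC_ORDER` and `sort_sync_order` are hypothetical names, and the order is copied from the list above.

```python
# Canonical order, copied from the component list in the documentation.
SYNC_ORDER = [
    "keystore",
    "packages",
    "plugins",
    "assets",
    "workspaces",
    "applications",
    "tasks",
    "reports",
    "dashboards",
    "users",
    "groups",
    "roles",
]


def sort_sync_order(requested):
    """Return the requested component names in canonical order.

    Unknown names are dropped, mirroring how sync() skips components
    that are not present in COMPONENTS.
    """
    wanted = set(requested)
    return [name for name in SYNC_ORDER if name in wanted]


ordered = sort_sync_order(["tasks", "keystore", "applications"])
```

Under these assumptions, `ordered` lists keystore first, then applications, then tasks, regardless of the order in which they were requested.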

Parameters:

Name Type Description Default
components list

A list of one or more components to sync. Defaults to COMPONENTS.

OrderedDict([('keystore', <class 'aqueduct.components.keystore.KeyStore'>), ('packages', <class 'aqueduct.components.packages.Packages'>), ('plugins', <class 'aqueduct.components.plugins.Plugins'>), ('assets', <class 'aqueduct.components.assets.Assets'>), ('workspaces', <class 'aqueduct.components.workspaces.Workspaces'>), ('applications', <class 'aqueduct.components.applications.Applications'>), ('tasks', <class 'aqueduct.components.tasks.Tasks'>), ('reports', <class 'aqueduct.components.reports.Reports'>), ('dashboards', <class 'aqueduct.components.dashboards.Dashboards'>), ('users', <class 'aqueduct.components.users.Users'>), ('groups', <class 'aqueduct.components.groups.Groups'>), ('roles', <class 'aqueduct.components.roles.Roles'>)])
Source code in aqueduct/aqueduct.py
def sync(self, components=COMPONENTS, exclude: dict = {}, include: dict = {}):
    """The main method to begin syncing components from one Swimlane instance to another.

    There are several available components you can specify. The order of these components is forced.
    The defaults are:

    * keystore
    * packages
    * plugins
    * assets
    * workspaces
    * applications (we update workflows here)
    * tasks
    * reports
    * dashboards
    * users
    * groups
    * roles

    You can include and exclude specific component items like specific applications, tasks, plugins, etc. To do so
    provide a dictionary for each argument (include or exclude). For example:

    exclude = {'applications': ["Phishing Triage"], 'tasks': ['PT - Get Emails'], etc.}
    include = {'applications': ["Security Alert & Incident Management"], 'reports': ['SAIM - New Incidents'], etc.}

    aq.sync(include=include, exclude=exclude)

    Args:
        components (list, optional): A list of one or more components to sync. Defaults to COMPONENTS.
    """
    Base.include = include
    Base.exclude = exclude
    for component in self.__sort_sync_order(components):
        if COMPONENTS.get(component):
            getattr(COMPONENTS[component](), "sync")()
    if Base.dry_run:
        return self._get_formatted_diff_log()
    print(b64decode(self.__LOGO).decode("ascii"))

Base

scrub(self, obj, bad_key='$type')

Used to remove a specific provided key from a dictionary that may contain both nested dictionaries and lists.

This method is recursive.

Parameters:

Name Type Description Default
obj dict

A dictionary or list to remove keys from.

required
bad_key str

The bad key to remove from the provided dict or list. Defaults to "$type".

'$type'
Source code in aqueduct/base.py
def scrub(self, obj, bad_key="$type"):
    """Used to remove a specific provided key from a dictionary
    that may contain both nested dictionaries and lists.

    This method is recursive.

    Args:
        obj (dict): A dictionary or list to remove keys from.
        bad_key (str, optional): The bad key to remove from the provided dict or list. Defaults to "$type".
    """
    if isinstance(obj, dict):
        for key in list(obj.keys()):
            if key == bad_key:
                del obj[key]
            else:
                self.scrub(obj[key], bad_key)
    elif isinstance(obj, list):
        for i in reversed(range(len(obj))):
            if obj[i] == bad_key:
                del obj[i]
            else:
                self.scrub(obj[i], bad_key)
    else:
        pass
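
To see what `scrub` does to a nested payload, the method can be restated as a plain function and run on sample data. This is a sketch for illustration: the function body follows the listing above, while the payload and its `"ExampleType"` values are made up.

```python
def scrub(obj, bad_key="$type"):
    """Remove bad_key from nested dicts and lists, modifying obj in place."""
    if isinstance(obj, dict):
        # Iterate over a copy of the keys so entries can be deleted safely.
        for key in list(obj.keys()):
            if key == bad_key:
                del obj[key]
            else:
                scrub(obj[key], bad_key)
    elif isinstance(obj, list):
        # Walk backwards so deleting an element does not shift unvisited indexes.
        for i in reversed(range(len(obj))):
            if obj[i] == bad_key:
                del obj[i]
            else:
                scrub(obj[i], bad_key)


# Made-up payload for illustration only.
payload = {
    "$type": "ExampleType",
    "name": "Phishing Triage",
    "fields": [
        {"$type": "ExampleType", "name": "Subject"},
        "$type",
        {"values": [{"$type": "ExampleType", "id": 1}]},
    ],
}
scrub(payload)
```

Note that the function returns None and mutates its argument, so callers that need the original payload should copy it first (for example with `copy.deepcopy`).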

SwimlaneInstance

Creates a connection to a single Swimlane instance

add_application(self, application)

Adds an application based on the provided dictionary object

Parameters:

Name Type Description Default
application dict

An application dictionary object.

required

Returns:

Type Description
dict

A Swimlane application object

Source code in aqueduct/instance.py
@log_exception
def add_application(self, application):
    """Adds an application based on the provided dictionary object

    Args:
        application (dict): An application dictionary object.

    Returns:
        dict: A Swimlane application object
    """
    return self._make_request("POST", "/app", json=application)

add_asset(self, asset)

Adds a provided Asset object to a Swimlane instance

Parameters:

Name Type Description Default
asset Asset

An Asset data model object

required

Returns:

Type Description
Asset

An Asset data model object

Source code in aqueduct/instance.py
@log_exception
def add_asset(self, asset: Asset):
    """Adds a provided Asset object to a Swimlane instance

    Args:
        asset (Asset): An `Asset` data model object

    Returns:
        Asset: An `Asset` data model object
    """
    return Asset(**self._make_request("POST", "/asset", json=asdict(asset)))

add_credential(self, credential)

Used to add a keystore key name (but not its value) to a Swimlane instance

Parameters:

Name Type Description Default
credential dict

A keystore item to add to a Swimlane instance

required

Returns:

Type Description
dict

A dictionary of a keystore item

Source code in aqueduct/instance.py
@log_exception
def add_credential(self, credential: dict):
    """Used to add a keystore key name (but not its value) to a Swimlane instance

    Args:
        credential (dict): A keystore item to add to a Swimlane instance

    Returns:
        dict : A dictionary of a keystore item
    """
    return self._make_request("POST", "/credentials", json=credential)

add_dashboard(self, dashboard)

Adds a provided Dashboard object to a Swimlane instance

Parameters:

Name Type Description Default
dashboard Dashboard

A Dashboard data model object

required

Returns:

Type Description
Dashboard

A Dashboard data model object

Source code in aqueduct/instance.py
@log_exception
def add_dashboard(self, dashboard: Dashboard):
    """Adds a provided Dashboard object to a Swimlane instance

    Args:
        dashboard (Dashboard): A `Dashboard` data model object

    Returns:
        Dashboard: A `Dashboard` data model object
    """
    return Dashboard(**self._make_request("POST", "/dashboard", json=asdict(dashboard)))

add_group(self, group)

Adds a provided Group object to a Swimlane instance

Parameters:

Name Type Description Default
group Group

A Group data model object

required

Returns:

Type Description
Group

A Group data model object

Source code in aqueduct/instance.py
@log_exception
def add_group(self, group: Group):
    """Adds a provided Group object to a Swimlane instance

    Args:
        group (Group): A `Group` data model object

    Returns:
        Group: A `Group` data model object
    """
    try:
        return Group(**self._make_request("POST", "/groups", json=asdict(group)))
    except SwimlaneHTTP400Error as sh:
        if sh.code == 1051:
            return True
        else:
            raise sh
    except Exception as e:
        raise e
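
`add_group` treats one specific HTTP 400 error code (1051) as non-fatal and returns True instead of a `Group`; every other error is re-raised. The page does not say what code 1051 means, so the sketch below only reproduces the control flow. `HTTP400Error`, `TOLERATED_CODE`, and the `post` callable are hypothetical stand-ins.

```python
class HTTP400Error(Exception):
    """Hypothetical stand-in for aqueduct's SwimlaneHTTP400Error."""

    def __init__(self, code):
        super().__init__(f"HTTP 400 (code {code})")
        self.code = code


TOLERATED_CODE = 1051  # the only error code the original method swallows


def add_group(post, group):
    """Call post(group); return True for the tolerated error code, re-raise the rest."""
    try:
        return post(group)
    except HTTP400Error as error:
        if error.code == TOLERATED_CODE:
            return True
        raise
```

Callers therefore need to handle two success shapes: a created object, or the literal True when the tolerated error occurred.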

add_report(self, report)

Adds a provided Report object to a Swimlane instance

Parameters:

Name Type Description Default
report Report

A Report data model object

required

Returns:

Type Description
Report

A Report data model object

Source code in aqueduct/instance.py
@log_exception
def add_report(self, report: Report):
    """Adds a provided `Report` object to a Swimlane instance

    Args:
        report (Report): A `Report` data model object

    Returns:
        Report: A `Report` data model object
    """
    return Report(**self._make_request("POST", "/reports", json=asdict(report)))

add_role(self, role)

Adds a provided Role object to a Swimlane instance

Parameters:

Name Type Description Default
role Role

A Role data model object

required

Returns:

Type Description
Role

A Role data model object

Source code in aqueduct/instance.py
@log_exception
def add_role(self, role: Role):
    """Adds a provided `Role` object to a Swimlane instance

    Args:
        role (Role): A `Role` data model object

    Returns:
        Role: A `Role` data model object
    """
    return Role(**self._make_request("POST", "/roles", json=asdict(role)))

add_task(self, task)

Adds a provided Task object to a Swimlane instance

Parameters:

Name Type Description Default
task Task

A Task data model object

required

Returns:

Type Description
Task

A Task data model object

Source code in aqueduct/instance.py
def add_task(self, task: Task):
    """Adds a provided Task object to a Swimlane instance

    Args:
        task (Task): A `Task` data model object

    Returns:
        Task: A `Task` data model object
    """
    try:
        return Task(**self._make_request("POST", "/task", json=asdict(task)))
    except Exception as e:
        AddComponentError(model=task, name=task.name, reason=e.__dict__.get("argument"))
        return False

add_user(self, user)

Adds a provided User object to a Swimlane instance

Parameters:

Name Type Description Default
user User

A User data model object

required

Returns:

Type Description
User

A User data model object

Source code in aqueduct/instance.py
@log_exception
def add_user(self, user: User):
    """Adds a provided User object to a Swimlane instance

    Args:
        user (User): A `User` data model object

    Returns:
        User: A `User` data model object
    """
    return User(**self._make_request("POST", "/user", json=asdict(user)))

add_workflow(self, workflow)

Adds the provided Workflow data model object to a Swimlane instance

Parameters:

Name Type Description Default
workflow Workflow

A Workflow data model object

required

Returns:

Type Description
Workflow

A Workflow data model object

Source code in aqueduct/instance.py
@log_exception
def add_workflow(self, workflow):
    """Adds the provided Workflow data model object to a Swimlane instance

    Args:
        workflow (Workflow): A `Workflow` data model object

    Returns:
        Workflow: A `Workflow` data model object
    """
    try:
        return self._make_request("POST", "/workflow/", json=workflow)
    except Exception as e:
        return False

add_workspace(self, workspace)

Adds a provided Workspace object to a Swimlane instance

Parameters:

Name Type Description Default
workspace Workspace

A Workspace data model object

required

Returns:

Type Description
Workspace

A Workspace data model object

Source code in aqueduct/instance.py
@log_exception
def add_workspace(self, workspace: Workspace):
    """Adds a provided Workspace object to a Swimlane instance

    Args:
        workspace (Workspace): A `Workspace` data model object

    Returns:
        Workspace: A `Workspace` data model object
    """
    return Workspace(**self._make_request("POST", "/workspaces", json=asdict(workspace)))

download_plugin(self, file_id)

Downloads a plugin or Python package by its internal Swimlane fileId

Parameters:

Name Type Description Default
file_id str

An internal Swimlane fileId to download a plugin or Python package

required

Returns:

Type Description
BytesIO

A BytesIO object of the downloaded file

Source code in aqueduct/instance.py
@log_exception
def download_plugin(self, file_id: str):
    """Downloads a plugin or Python package by its internal Swimlane fileId

    Args:
        file_id (str): An internal Swimlane fileId to download a plugin or Python package

    Returns:
        BytesIO: A BytesIO object of the downloaded file
    """
    stream = BytesIO()
    response = self.swimlane.request("GET", f"attachment/download/{file_id}", stream=True)
    for chunk in response.iter_content(1024):
        stream.write(chunk)
    stream.seek(0)
    return stream
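
The body of `download_plugin` copies a streamed response into memory chunk by chunk and rewinds the buffer before returning it. The sketch below isolates that pattern so it can be run without a Swimlane instance; `FakeResponse` and `read_into_buffer` are hypothetical names, and `FakeResponse` only imitates the `iter_content(chunk_size)` method used in the listing.

```python
from io import BytesIO


class FakeResponse:
    """Hypothetical stand-in for a streamed HTTP response."""

    def __init__(self, payload: bytes):
        self._payload = payload

    def iter_content(self, chunk_size):
        # Yield the payload in slices of at most chunk_size bytes.
        for start in range(0, len(self._payload), chunk_size):
            yield self._payload[start:start + chunk_size]


def read_into_buffer(response, chunk_size=1024):
    """Copy a streamed response into a BytesIO and rewind it for reading."""
    stream = BytesIO()
    for chunk in response.iter_content(chunk_size):
        stream.write(chunk)
    stream.seek(0)  # without this, a later read() would start at the end
    return stream


buffer = read_into_buffer(FakeResponse(b"x" * 2500))
```

The `seek(0)` call matters: after writing, the stream position sits at the end of the data, so a consumer that calls `read()` without rewinding would get an empty result.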

get_application(self, application_id)

Gets an application by a provided ID

If the application does not exist, then this method returns False.

If the application does exist it will return a JSON object

Parameters:

Name Type Description Default
application_id str

A Swimlane application ID

required

Returns:

Type Description
json

A Swimlane application JSON

Source code in aqueduct/instance.py
@log_exception
def get_application(self, application_id):
    """Gets an application by a provided ID

    If the application does not exist, then this method returns False.

    If the application does exist it will return a JSON object

    Args:
        application_id (str): A Swimlane application ID

    Returns:
        json: A Swimlane application JSON
    """
    try:
        return self._make_request("GET", f"/app/{application_id}")
    except Exception as e:
        return False
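
Many of the `get_*` methods on this page share the convention shown here: any failure is reported as False rather than as an exception. A caller can build a "create if missing" step on top of that. The sketch below is an assumption about how such a caller might look, using a plain dictionary in place of a Swimlane instance; both function names are hypothetical.

```python
def get_application(store, application_id):
    """Hypothetical stand-in: look up an application, returning False on any failure."""
    try:
        return store[application_id]
    except Exception:
        return False


def ensure_application(store, application_id, default):
    """Return the existing application, or register default when the lookup reports False."""
    existing = get_application(store, application_id)
    # Compare with `is False`: an empty dict is also falsy but is a real result.
    if existing is False:
        store[application_id] = default
        return default
    return existing
```

Testing with `is False` rather than plain truthiness avoids treating an empty but valid response as "missing".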

get_applications(self)

Retrieves a list of applications for a Swimlane instance.

Returns:

Type Description
list

A list of json objects

Source code in aqueduct/instance.py
@log_exception
def get_applications(self):
    """Retrieves a list of applications for a Swimlane instance.

    Returns:
        list: A list of json objects
    """
    return self._make_request("GET", "/app")

get_applications_light(self)

Gets light version of all applications

Returns:

Type Description
list

A list of application light objects

Source code in aqueduct/instance.py
@log_exception
def get_applications_light(self):
    """Gets light version of all applications

    Returns:
        list: A list of application light objects
    """
    return self._make_request("GET", "/app/light")

get_asset(self, asset_id)

Gets an asset by a provided ID

If the asset does not exist, then this method returns False.

If the asset does exist it will return an Asset object

Parameters:

Name Type Description Default
asset_id str

A Swimlane asset ID

required

Returns:

Type Description
Asset

An Asset data model object

Source code in aqueduct/instance.py
@log_exception
def get_asset(self, asset_id: str):
    """Gets an asset by a provided ID

    If the asset does not exist, then this method returns False.

    If the asset does exist it will return an `Asset` object

    Args:
        asset_id (str): A Swimlane asset ID

    Returns:
        Asset: An `Asset` data model object
    """
    try:
        return Asset(**self._make_request("GET", f"/asset/{asset_id}"))
    except Exception as e:
        return False

get_assets(self)

Retrieves a list of assets for a Swimlane instance.

These assets are modeled after the Asset data model

Returns:

Type Description
list

A list of Asset objects

Source code in aqueduct/instance.py
@log_exception
def get_assets(self):
    """Retrieves a list of assets for a Swimlane instance.

    These assets are modeled after the `Asset` data model

    Returns:
        list: A list of `Asset` objects
    """
    return_list = []
    assets = self._make_request("GET", "/asset")
    if assets and isinstance(assets, list):
        for asset in assets:
            return_list.append(Asset(**asset))
    return return_list
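
The list-returning getters (`get_assets`, `get_dashboards`, and similar) convert each JSON dictionary into a data model object and fall back to an empty list when the response is not a list. Here is a minimal sketch of that conversion, assuming a two-field `Asset` dataclass; the real `Asset` model is not shown on this page and very likely has more fields.

```python
from dataclasses import dataclass


@dataclass
class Asset:
    """Hypothetical, heavily reduced stand-in for aqueduct's Asset model."""

    id: str
    name: str


def to_assets(payload):
    """Convert a list of dicts into Asset objects; anything else yields []."""
    return_list = []
    if payload and isinstance(payload, list):
        for item in payload:
            # Keyword expansion fails with TypeError on unexpected keys,
            # so the model must cover every key the API returns.
            return_list.append(Asset(**item))
    return return_list


assets = to_assets([{"id": "a1", "name": "Mail Server"}])
```

One consequence of `Asset(**item)` is that a response containing a key the dataclass does not declare raises a `TypeError`, which is worth keeping in mind when the API changes between versions.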

get_credential(self, name)

Retrieves a keystore item if it exists on the desired instance

Parameters:

Name Type Description Default
name str

Name of a keystore item

required

Returns:

Type Description
dict

A dictionary of a keystore item

Source code in aqueduct/instance.py
@log_exception
def get_credential(self, name: str):
    """Retrieves a keystore item if it exists on the desired instance

    Args:
        name (str): Name of a keystore item

    Returns:
        dict : A dictionary of a keystore item
    """
    try:
        return self._make_request("GET", f"/credentials/{name}")
    except Exception as e:
        return False

get_credentials(self)

Used to retrieve keystore items

Returns:

Type Description
dict

A dictionary of keystore keys and encrypted values

Source code in aqueduct/instance.py
@log_exception
def get_credentials(self):
    """Used to retrieve keystore items

    Returns:
        dict : A dictionary of keystore keys and encrypted values
    """
    return self._make_request("GET", "/credentials")

get_dashboard(self, dashboard_id)

Gets a dashboard by a provided ID

If the dashboard does not exist, then this method returns False.

If the dashboard does exist it will return a Dashboard object

Parameters:

Name Type Description Default
dashboard_id str

A Swimlane dashboard ID

required

Returns:

Type Description
Dashboard

A Dashboard data model object

Source code in aqueduct/instance.py
@log_exception
def get_dashboard(self, dashboard_id: str):
    """Gets a dashboard by a provided ID

    If the dashboard does not exist, then this method returns False.

    If the dashboard does exist it will return a `Dashboard` object

    Args:
        dashboard_id (str): A Swimlane dashboard ID

    Returns:
        Dashboard: A `Dashboard` data model object
    """
    try:
        return Dashboard(**self._make_request("GET", f"/dashboard/{dashboard_id}"))
    except Exception as e:
        return False

get_dashboards(self)

Retrieves a list of dashboards for a Swimlane instance.

These dashboards are modeled after the Dashboard data model

Returns:

Type Description
list

A list of Dashboard objects

Source code in aqueduct/instance.py
@log_exception
def get_dashboards(self):
    """Retrieves a list of dashboards for a Swimlane instance.

    These dashboards are modeled after the `Dashboard` data model

    Returns:
        list: A list of `Dashboard` objects
    """
    return_list = []
    dashboards = self._make_request("GET", "/dashboard")
    if dashboards and isinstance(dashboards, list):
        for dashboard in dashboards:
            return_list.append(Dashboard(**dashboard))
    return return_list

get_default_report_by_application_id(self, application_id)

Gets the default report for an application based on the provided ID

Parameters:

Name Type Description Default
application_id str

A Swimlane application ID

required

Returns:

Type Description
Report

A Report data model object

Source code in aqueduct/instance.py
@log_exception
def get_default_report_by_application_id(self, application_id):
    """Gets the default report for an application based on the provided ID

    Args:
        application_id (str): A Swimlane application ID

    Returns:
        Report: A `Report` data model object
    """
    try:
        return Report(**self._make_request("GET", f"/reports/app/{application_id}/default"))
    except Exception as e:
        return False

get_group_by_id(self, group_id)

Gets a group by a provided ID

If the group does not exist, then this method returns False.

If the group does exist it will return a Group object

Parameters:

Name Type Description Default
group_id str

A Swimlane group ID

required

Returns:

Type Description
Group

A Group data model object

Source code in aqueduct/instance.py
@log_exception
def get_group_by_id(self, group_id: str):
    """Gets a group by a provided ID

    If the group does not exist, then this method returns False.

    If the group does exist it will return a `Group` object

    Args:
        group_id (str): A Swimlane group ID

    Returns:
        Group: A `Group` data model object
    """
    try:
        return Group(**self._make_request("GET", f"/groups/{group_id}"))
    except Exception as e:
        return False

get_group_by_name(self, group_name)

Gets a group by a provided name

If the group does not exist, then this method returns False.

If the group does exist it will return a Group object

Parameters:

Name Type Description Default
group_name str

A Swimlane group name

required

Returns:

Type Description
Group

A Group data model object

Source code in aqueduct/instance.py
@log_exception
def get_group_by_name(self, group_name: str):
    """Gets a group by a provided name

    If the group does not exist, then this method returns False.

    If the group does exist it will return a `Group` object

    Args:
        group_name (str): A Swimlane group name

    Returns:
        Group: A `Group` data model object
    """
    try:
        resp = self._make_request("GET", f"/users/lookup?name={group_name}")
        if resp and isinstance(resp, list):
            return Group(**resp[0])
    except Exception as e:
        return False

get_groups(self)

Retrieves a list of groups for a Swimlane instance.

These groups are modeled after the Group data model

Returns:

Type Description
list

A list of Group objects

Source code in aqueduct/instance.py
@log_exception
def get_groups(self):
    """Retrieves a list of groups for a Swimlane instance.

    These groups are modeled after the `Group` data model

    Returns:
        list: A list of `Group` objects
    """
    return_list = []
    groups = self._make_request("GET", "/groups")
    if groups and groups.get("items"):
        for item in groups["items"]:
            if item:
                return_list.append(Group(**item))
    elif groups and groups.get("groups"):
        for item in groups["groups"]:
            if item:
                return_list.append(Group(**item))
    return return_list
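
`get_groups` accepts two response shapes: one where the groups sit under an `"items"` key and one where they sit under `"groups"`. The sketch below isolates that key handling so it can be tested on plain dictionaries; `extract_group_items` is a hypothetical helper, and it returns the raw dictionaries rather than `Group` objects.

```python
def extract_group_items(response):
    """Return the non-empty entries under "items" or, failing that, "groups"."""
    if not response or not isinstance(response, dict):
        return []
    # "items" takes precedence, matching the if/elif order in get_groups.
    for key in ("items", "groups"):
        if response.get(key):
            return [item for item in response[key] if item]
    return []


from_items = extract_group_items({"items": [{"name": "Analysts"}, None]})
from_groups = extract_group_items({"groups": [{"name": "Admins"}]})
```

Empty or None entries are dropped in both shapes, as in the original loop's `if item:` guard.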

get_pip_packages(self, versions=['Python2_7', 'Python3_6', 'Python3'])

Retrieves a list of pip packages for a Swimlane instance.

These packages are modeled after the Package data model

Parameters:

Name Type Description Default
versions list

A list of Python versions. Defaults to ['Python2_7', 'Python3_6', 'Python3'].

['Python2_7', 'Python3_6', 'Python3']

Returns:

Type Description
list

A list of Package objects

Source code in aqueduct/instance.py
@log_exception
def get_pip_packages(self, versions=["Python2_7", "Python3_6", "Python3"]):
    """Retrieves a list of pip packages for a Swimlane instance.

    These packages are modeled after the `Package` data model

    Args:
        versions (list, optional): A list of Python versions. Defaults to ['Python2_7', 'Python3_6', 'Python3'].

    Returns:
        list: A list of `Package` objects
    """
    return_list = []
    for version in versions:
        try:
            resp = self._make_request("GET", f"/pip/packages/{version}")
            if resp:
                for item in resp:
                    return_list.append(Package(**item))
        except Exception as e:
            continue
    return return_list
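
`get_pip_packages` queries each Python version in turn and silently skips any version whose request fails, so one unsupported version does not abort the whole listing. A minimal sketch of that loop follows; `collect_packages` and the `fetch` callable are hypothetical, and the package data is made up.

```python
def collect_packages(fetch, versions=("Python2_7", "Python3_6", "Python3")):
    """Gather packages for every version, skipping versions whose fetch raises."""
    collected = []
    for version in versions:
        try:
            resp = fetch(version)
        except Exception:
            # A failing version is ignored, as in the original method.
            continue
        if resp:
            collected.extend(resp)
    return collected


def fake_fetch(version):
    """Made-up backend: one version is unavailable, one has no packages."""
    if version == "Python2_7":
        raise RuntimeError("version not available")
    if version == "Python3_6":
        return []
    return [{"name": "requests", "pythonVersion": version}]


packages = collect_packages(fake_fetch)
```

The sketch uses a tuple for the default `versions` value; the original uses a list literal as a default argument, which works here because the list is never mutated.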

get_plugin(self, name)

Gets a plugin by a provided plugin name

If the plugin does not exist, then this method returns False.

If the plugin does exist it will return a Plugin object

Parameters:

Name Type Description Default
name str

A plugin name

required

Returns:

Type Description
Plugin

A Plugin data model object

Source code in aqueduct/instance.py
@log_exception
def get_plugin(self, name: str):
    """Gets a plugin by a provided plugin name

    If the plugin does not exist, then this method returns False.

    If the plugin does exist it will return a `Plugin` object

    Args:
        name (str): A plugin name

    Returns:
        Plugin: A `Plugin` data model object
    """
    try:
        return Plugin(**self._make_request("GET", f"/task/packages/{name}"))
    except Exception as e:
        return False

get_plugins(self)

Retrieves a list of plugins for a Swimlane instance.

These plugins are modeled after the PluginLight data model

Returns:

Type Description
list

A list of PluginLight objects

Source code in aqueduct/instance.py
@log_exception
def get_plugins(self):
    """Retrieves a list of plugins for a Swimlane instance.

    These plugins are modeled after the `PluginLight` data model

    Returns:
        list: A list of PluginLight objects
    """
    return_list = []
    plugins = self._make_request("GET", "/task/packages")
    if plugins and isinstance(plugins, list):
        for item in plugins:
            if item:
                return_list.append(PluginLight(**item))
    return return_list

get_report(self, report_id)

Gets a report by a provided ID

If the report does not exist, then this method returns False.

If the report does exist it will return a Report object

Parameters:

Name Type Description Default
report_id str

A Swimlane report ID

required

Returns:

Type Description
Report

A Report data model object

Source code in aqueduct/instance.py
@log_exception
def get_report(self, report_id: str):
    """Gets a report by a provided ID

    If the report does not exist, then this method returns False.

    If the report does exist it will return a `Report` object

    Args:
        report_id (str): A Swimlane report ID

    Returns:
        Report: A `Report` data model object
    """
    try:
        return Report(**self._make_request("GET", f"/reports/{report_id}"))
    except Exception as e:
        return False

get_reports(self)

Retrieves a list of reports for a Swimlane instance.

These reports are modeled after the Report data model

Returns:

Type Description
list

A list of Report objects

Source code in aqueduct/instance.py
@log_exception
def get_reports(self):
    """Retrieves a list of reports for a Swimlane instance.

    These reports are modeled after the `Report` data model

    Returns:
        list: A list of `Report` objects
    """
    return_list = []
    reports = self._make_request("GET", "/reports")
    if reports:
        for report in reports:
            return_list.append(Report(**report))
    return return_list

get_role(self, role_id)

Gets a role by a provided ID

If the role does not exist, then this method returns False.

If the role does exist it will return a Role object

Parameters:

Name Type Description Default
role_id str

A Swimlane role ID

required

Returns:

Type Description
Role

A Role data model object

Source code in aqueduct/instance.py
@log_exception
def get_role(self, role_id: str):
    """Gets a role by a provided ID

    If the role does not exist, then this method returns False.

    If the role does exist it will return a `Role` object

    Args:
        role_id (str): A Swimlane role ID

    Returns:
        Role: A `Role` data model object
    """
    try:
        return Role(**self._make_request("GET", f"/roles/{role_id}"))
    except Exception as e:
        return False

get_role_by_name(self, role_name)

Gets a role by a provided name.

If the role does not exist, then this method returns False.

If the role does exist, it returns a Role object.

Parameters:

Name Type Description Default
role_name str

A Swimlane role name

required

Returns:

Type Description
Role

A Role data model object

Source code in aqueduct/instance.py
@log_exception
def get_role_by_name(self, role_name: str):
    """Gets a role by a provided name

    If the role does not exist, then this method returns False.

    If the role does exist it will return a `Role` object

    Args:
        role_name (str): A Swimlane role name

    Returns:
        Role: A `Role` data model object
    """
    try:
        return Role(**self._make_request("GET", f"/roles/?searchFieldName=name&searchValue={role_name}"))
    except Exception as e:
        return False
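
The request path above interpolates `role_name` directly into the query string. If role names can contain spaces, ampersands, or other reserved characters (an assumption; the source does not say), encoding the value first avoids a malformed query. The sketch below uses only the standard library. The helper name `build_role_search_path` is hypothetical and not part of aqueduct.

```python
from urllib.parse import quote, urlencode


def build_role_search_path(role_name: str) -> str:
    """Hypothetical helper: build the role search path with an encoded query."""
    query = urlencode(
        {"searchFieldName": "name", "searchValue": role_name},
        quote_via=quote,  # encode spaces as %20 rather than '+'
    )
    return f"/roles/?{query}"


print(build_role_search_path("Tier 1 & 2 Analysts"))
```

Whether the Swimlane API expects percent-encoded values here is not confirmed by the source, so treat this as an illustration of the encoding step only.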

get_roles(self)

Retrieves a list of roles for a Swimlane instance.

These roles are modeled after the Role data model

Returns:

Type Description
list

A list of Role objects

Source code in aqueduct/instance.py
@log_exception
def get_roles(self):
    """Retrieves a list of roles for a Swimlane instance.

    These roles are modeled after the `Role` data model

    Returns:
        list: A list of `Role` objects
    """
    return_list = []
    roles = self._make_request("GET", "/roles")
    if roles and isinstance(roles, dict):
        # checking for items here. 10.4.0 does not use the items key
        # but greater versions do (e.g. 10.5>)
        if roles.get("items"):
            roles = roles["items"]
    if roles and isinstance(roles, list):
        for role in roles:
            return_list.append(Role(**role))
    return return_list
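
The two response shapes handled above (a bare list, or a dictionary that wraps the list under an `items` key) can be normalized in one small function. This is a sketch on plain dictionaries; `extract_role_items` is a hypothetical name used only for illustration.

```python
from typing import Any, List


def extract_role_items(payload: Any) -> List[dict]:
    """Return the list of role dictionaries from either response shape."""
    if isinstance(payload, dict):
        # Some versions wrap the roles in an "items" key.
        payload = payload.get("items")
    if isinstance(payload, list):
        return list(payload)
    # Anything else (None, empty dict, dict without "items") yields no roles.
    return []


print(extract_role_items([{"id": "1"}]))
print(extract_role_items({"items": [{"id": "2"}]}))
```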

get_task(self, task_id)

Gets a task by a provided task ID.

If the task ID does not exist, then this method returns False.

If the task ID does exist, it returns a Task object.

Parameters:

Name Type Description Default
task_id str

A task ID to retrieve the entire Task object

required

Returns:

Type Description
Task

A Task data model object

Source code in aqueduct/instance.py
@log_exception
def get_task(self, task_id: str):
    """Gets a task by a provided task ID.

    If the task ID does not exist, then this method returns False.

    If the task ID does exist it will return a `Task` object

    Args:
        task_id (str): A task ID to retrieve the entire `Task` object

    Returns:
        Task: A `Task` data model object
    """
    try:
        return Task(**self._make_request("GET", f"/task/{task_id}"))
    except TypeError as te:
        from .utils.exceptions import ModelError

        raise ModelError(err=te, name="Task")
    except Exception as e:
        return False
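
Unlike the other getters, this method treats a `TypeError` specially: a response that does not fit the model raises `ModelError` instead of returning False. The sketch below shows why a dataclass-style model raises `TypeError` on unexpected keys. The `ModelError` and `Task` classes here are simplified stand-ins with assumed fields, and `build_task` is a hypothetical helper.

```python
from dataclasses import dataclass


class ModelError(Exception):
    """Stand-in for aqueduct's ModelError (constructor shape taken from the listing)."""

    def __init__(self, err: Exception, name: str):
        super().__init__(f"{name} model could not be built: {err}")
        self.name = name


@dataclass
class Task:
    """Simplified stand-in for the Task model (fields are assumed)."""
    id: str
    name: str


def build_task(payload):
    try:
        return Task(**payload)
    except TypeError as te:
        # Unknown or missing keys raise TypeError inside the generated __init__.
        raise ModelError(err=te, name="Task")
    except Exception:
        # Any other failure keeps the False-on-failure contract.
        return False


print(build_task({"id": "t1", "name": "Lookup"}))
try:
    build_task({"id": "t1", "name": "Lookup", "unexpected": 1})
except ModelError as error:
    print(error.name)
```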

get_tasks(self)

Retrieves a list of tasks for a Swimlane instance.

These tasks are modeled after the TaskLight data model

Returns:

Type Description
list

A list of TaskLight objects

Source code in aqueduct/instance.py
@log_exception
def get_tasks(self):
    """Retrieves a list of tasks for a Swimlane instance.

    These tasks are modeled after the `TaskLight` data model

    Returns:
        list: A list of TaskLight objects
    """
    return_list = []
    tasks = self._make_request("GET", "/task/list")
    if tasks and tasks.get("tasks"):
        for task in tasks["tasks"]:
            return_list.append(TaskLight(**task))
    return return_list

get_user(self, user_id)

Gets a user by a provided ID.

If the user does not exist, then this method returns False.

If the user does exist, it returns a User object.

Parameters:

Name Type Description Default
user_id str

A Swimlane user ID

required

Returns:

Type Description
User

A User data model object

Source code in aqueduct/instance.py
@log_exception
def get_user(self, user_id: str):
    """Gets a user by a provided ID

    If the user does not exist, then this method returns False.

    If the user does exist it will return a `User` object

    Args:
        user_id (str): A Swimlane user ID

    Returns:
        User: A `User` data model object
    """
    try:
        return User(**self._make_request("GET", f"/user/{user_id}"))
    except Exception as e:
        return False

get_users(self)

Retrieves a list of users for a Swimlane instance.

These users are modeled after the UserLight data model

Returns:

Type Description
list

A list of UserLight objects

Source code in aqueduct/instance.py
@log_exception
def get_users(self):
    """Retrieves a list of users for a Swimlane instance.

    These users are modeled after the `UserLight` data model

    Returns:
        list: A list of `UserLight` objects
    """
    user_list = []
    users = self._make_request("GET", "/user/light")
    if users:
        for user in users:
            user_list.append(UserLight(**user))
    return user_list

get_workflow(self, application_id)

Gets a workflow by a provided application ID.

If the workflow does not exist, then this method returns False.

If the workflow does exist, it returns a Workflow object.

Parameters:

Name Type Description Default
application_id str

A Swimlane application ID

required

Returns:

Type Description
Workflow

A Workflow data model object

Source code in aqueduct/instance.py
@log_exception
def get_workflow(self, application_id: str):
    """Gets a workflow by a provided application ID

    If the workflow does not exist, then this method returns False.

    If the workflow does exist it will return a `Workflow` object

    Args:
        application_id (str): A Swimlane application ID

    Returns:
        Workflow: A `Workflow` data model object
    """
    try:
        return self._make_request("GET", f"/workflow/{application_id}")
    except Exception as e:
        return False

get_workflows(self)

Retrieves a list of workflows for a Swimlane instance.

These workflows are modeled after the Workflow data model

Returns:

Type Description
list

A list of Workflow objects

Source code in aqueduct/instance.py
@log_exception
def get_workflows(self):
    """Retrieves a list of workflows for a Swimlane instance.

    These workflows are modeled after the `Workflow` data model

    Returns:
        list: A list of `Workflow` objects
    """
    return_list = []
    workflows = self._make_request("GET", "/workflow/")
    if workflows:
        for workflow in workflows:
            return_list.append(workflow)
    return return_list

get_workspace(self, workspace_id)

Gets a workspace by a provided ID.

If the workspace does not exist, then this method returns False.

If the workspace does exist, it returns a Workspace object.

Parameters:

Name Type Description Default
workspace_id str

A Swimlane workspace ID

required

Returns:

Type Description
Workspace

A Workspace data model object

Source code in aqueduct/instance.py
@log_exception
def get_workspace(self, workspace_id: str):
    """Gets a workspace by a provided ID

    If the workspace does not exist, then this method returns False.

    If the workspace does exist it will return a `Workspace` object

    Args:
        workspace_id (str): A Swimlane workspace ID

    Returns:
        Workspace: A `Workspace` data model object
    """
    try:
        return Workspace(**self._make_request("GET", f"/workspaces/{workspace_id}"))
    except Exception as e:
        return False

get_workspaces(self)

Retrieves a list of workspaces for a Swimlane instance.

These workspaces are modeled after the Workspace data model

Returns:

Type Description
list

A list of Workspace objects

Source code in aqueduct/instance.py
@log_exception
def get_workspaces(self):
    """Retrieves a list of workspaces for a Swimlane instance.

    These workspaces are modeled after the `Workspace` data model

    Returns:
        list: A list of `Workspace` objects
    """
    return_list = []
    workspaces = self._make_request("GET", "/workspaces")
    if workspaces:
        for workspace in workspaces:
            return_list.append(Workspace(**workspace))
    return return_list

install_package(self, package)

Installs a Python (pip) package based on the provided Package data model object

Parameters:

Name Type Description Default
package Package

A Package data model object

required

Returns:

Type Description
dict

A dictionary of a Python pip package

Source code in aqueduct/instance.py
@log_exception
def install_package(self, package: Package):
    """Installs a Python (pip) package based on the provided Package data model object

    Args:
        package (Package): A `Package` data model object

    Returns:
        dict: A dictionary of a Python pip package
    """
    json = {
        "name": package.name,
        "version": package.version,
        "pythonVersion": package.pythonVersion,
    }
    return self._make_request("POST", "/pip/packages", json=json)

install_package_offline(self, filename, stream, data)

Installs a Python package wheel file offline to a Swimlane instance given a filename and a BytesIO file stream

Parameters:

Name Type Description Default
filename str

A filename string

required
stream BytesIO

A BytesIO file stream

required
data dict

A dictionary of additional request information

required

Returns:

Type Description
json

JSON Response from installing a Python package wheel file

Source code in aqueduct/instance.py
@log_exception
def install_package_offline(self, filename, stream, data):
    """Installs a Python package wheel file offline to a Swimlane instance given a filename and a BytesIO file stream

    Args:
        filename (str): A filename string
        stream (BytesIO): A BytesIO file stream
        data (dict): A dictionary of additional request information

    Returns:
        json: JSON Response from installing a Python package wheel file
    """
    return self.swimlane.request(
        "POST",
        "/pip/packages/offline",
        data=data,
        files={"wheel": (filename, stream.read())},
        timeout=120,
    )
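
The `files` argument above is built from `stream.read()`, which reads from the stream's current position. A `BytesIO` that has just been written to is positioned at its end, so reading it without rewinding yields no bytes. The sketch below shows one way to prepare the payload. `build_wheel_files` is a hypothetical helper, and whether callers of `install_package_offline` already rewind the stream is not stated in the source.

```python
from io import BytesIO


def build_wheel_files(filename: str, stream: BytesIO) -> dict:
    """Hypothetical helper: rewind the stream, then build the multipart payload."""
    stream.seek(0)  # make sure read() starts at the first byte
    return {"wheel": (filename, stream.read())}


stream = BytesIO()
stream.write(b"wheel-bytes")
print(stream.read())  # position is at the end after write()
print(build_wheel_files("example-1.0-py3-none-any.whl", stream))
```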

search_user(self, query_string)

Searches for a user by a query string

If the user does not exist, then this method returns False.

If the user does exist, it returns a User object.

Parameters:

Name Type Description Default
query_string str

A query string typically a display name

required

Returns:

Type Description
User

A User data model object

Source code in aqueduct/instance.py
@log_exception
def search_user(self, query_string: str):
    """Searches for a user by a query string

    If the user does not exist, then this method returns False.

    If the user does exist it will return a `User` object

    Args:
        query_string (str): A query string typically a display name

    Returns:
        User: A `User` data model object
    """
    try:
        resp = self._make_request("GET", f"/user/lookup?name={query_string}")
        if resp and isinstance(resp, list):
            return UserLight(**resp[0])
    except Exception as e:
        return False

update_application(self, application)

Updates the application based on the provided dictionary object

Parameters:

Name Type Description Default
application dict

An application dictionary object.

required

Returns:

Type Description
dict

A Swimlane application object

Source code in aqueduct/instance.py
@log_exception
def update_application(self, application):
    """Updates the application based on the provided dictionary object

    Args:
        application (dict): An application dictionary object.

    Returns:
        dict: A Swimlane application object
    """
    return self._make_request("PUT", "/app", json=application)

update_asset(self, asset_id, asset)

Updates a provided asset ID with the data contained in the provided Asset data model object

Parameters:

Name Type Description Default
asset_id str

An asset ID to update on a Swimlane instance

required
asset Asset

An Asset data model object

required

Returns:

Type Description
Asset

An Asset data model object

Source code in aqueduct/instance.py
@log_exception
def update_asset(self, asset_id: str, asset: Asset):
    """Updates a provided asset ID with the data contained in the provided Asset data model object

    Args:
        asset_id (str): An asset ID to update on a Swimlane instance
        asset (Asset): An `Asset` data model object

    Returns:
        Asset: An `Asset` data model object
    """
    return Asset(**self._make_request("PUT", f"/asset/{asset_id}", json=asdict(asset)))
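
The update methods in this class follow a common round trip: the model is serialized with `dataclasses.asdict`, sent as the JSON body, and the response is unpacked back into the same model. The sketch below illustrates that round trip with a simplified `Asset` dataclass (fields are assumed) and a hypothetical `fake_put` function that simply echoes the body back in place of a real request.

```python
from dataclasses import asdict, dataclass, field


@dataclass
class Asset:
    """Simplified stand-in for the Asset model (fields are assumed)."""
    id: str
    name: str
    parameters: dict = field(default_factory=dict)


def fake_put(path: str, json: dict) -> dict:
    """Hypothetical stand-in for the PUT request: echoes the body back."""
    return dict(json)


asset = Asset(id="a1", name="SMTP", parameters={"host": "mail.example.com"})
body = asdict(asset)  # plain dict, safe to send as JSON
updated = Asset(**fake_put(f"/asset/{asset.id}", json=body))
print(updated == asset)
```

If the real response contains keys the model does not declare, the unpacking step raises `TypeError`, so the model and the API response need to stay in step.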

update_dashboard(self, dashboard)

Updates a Swimlane instance dashboard based on the provided Dashboard data model object

Parameters:

Name Type Description Default
dashboard Dashboard

A Dashboard data model object

required

Returns:

Type Description
Dashboard

A Dashboard data model object

Source code in aqueduct/instance.py
@log_exception
def update_dashboard(self, dashboard: Dashboard):
    """Updates a Swimlane instance dashboard based on the provided Dashboard data model object

    Args:
        dashboard (Dashboard): A `Dashboard` data model object

    Returns:
        Dashboard: A `Dashboard` data model object
    """
    return Dashboard(**self._make_request("PUT", f"/dashboard/{dashboard.id}", json=asdict(dashboard)))

update_default_report(self, report)

Updates a Swimlane application's default report with the data contained in the provided Report data model object

Parameters:

Name Type Description Default
report Report

A Report data model object

required

Returns:

Type Description
Report

A Report data model object

Source code in aqueduct/instance.py
@log_exception
def update_default_report(self, report: Report):
    """Updates a Swimlane application's default report with the data contained in the provided `Report` data model object

    Args:
        report (Report): A `Report` data model object

    Returns:
        Report: A `Report` data model object
    """
    return Report(**self._make_request("PUT", f"/reports/{report.id}", json=asdict(report)))

update_group(self, group_id, group)

Updates a provided group ID with the data contained in the provided Group data model object

Parameters:

Name Type Description Default
group_id str

A group ID to update on a Swimlane instance

required
group Group

A Group data model object

required

Returns:

Type Description
Group

A Group data model object

Source code in aqueduct/instance.py
@log_exception
def update_group(self, group_id: str, group: Group):
    """Updates a provided group ID with the data contained in the provided Group data model object

    Args:
        group_id (str): A group ID to update on a Swimlane instance
        group (Group): A `Group` data model object

    Returns:
        Group: A `Group` data model object
    """
    return Group(**self._make_request("PUT", f"/groups/{group_id}", json=asdict(group)))

update_report(self, report_id, report)

Updates a provided report ID with the data contained in the provided Report data model object

Parameters:

Name Type Description Default
report_id str

A report ID to update on a Swimlane instance

required
report Report

A Report data model object

required

Returns:

Type Description
bool

True if the update request returned a response; otherwise None

Source code in aqueduct/instance.py
@log_exception
def update_report(self, report_id: str, report: Report):
    """Updates a provided report ID with the data contained in the provided Report data model object

    Args:
        report_id (str): A report ID to update on a Swimlane instance
        report (Report): A `Report` data model object

    Returns:
        bool: True if the update request returned a response; otherwise None
    """
    resp = self._make_request("PUT", f"/reports/{report_id}", json=asdict(report))
    if resp:
        return True

update_role(self, role_id, role)

Updates a provided role ID with the data contained in the provided Role data model object

Parameters:

Name Type Description Default
role_id str

A role ID to update on a Swimlane instance

required
role Role

A Role data model object

required

Returns:

Type Description
Role

A Role data model object

Source code in aqueduct/instance.py
@log_exception
def update_role(self, role_id: str, role: Role):
    """Updates a provided role ID with the data contained in the provided Role data model object

    Args:
        role_id (str): A role ID to update on a Swimlane instance
        role (Role): A `Role` data model object

    Returns:
        Role: A `Role` data model object
    """
    return self._make_request("PUT", f"/roles/{role_id}", json=asdict(role))

update_task(self, task_id, task)

Updates a provided task ID with the data contained in the provided Task data model object

Parameters:

Name Type Description Default
task_id str

A task ID to update on a Swimlane instance

required
task Task

A Task data model object

required

Returns:

Type Description
Task

A Task data model object

Source code in aqueduct/instance.py
@log_exception
def update_task(self, task_id: str, task: Task):
    """Updates a provided task ID with the data contained in the provided `Task` data model object

    Args:
        task_id (str): A task ID to update on a Swimlane instance
        task (Task): A `Task` data model object

    Returns:
        Task: A `Task` data model object
    """
    return Task(**self._make_request("PUT", f"/task/{task_id}", json=asdict(task)))

update_user(self, user_id, user)

Updates a provided user ID with the data contained in the provided User data model object

Parameters:

Name Type Description Default
user_id str

A user ID to update on a Swimlane instance

required
user User

A User data model object

required

Returns:

Type Description
User

A User data model object

Source code in aqueduct/instance.py
@log_exception
def update_user(self, user_id: str, user: User):
    """Updates a provided user ID with the data contained in the provided User data model object

    Args:
        user_id (str): A user ID to update on a Swimlane instance
        user (User): A `User` data model object

    Returns:
        User: A `User` data model object
    """
    return User(**self._make_request("PUT", f"/user/{user_id}", json=asdict(user)))

update_workflow(self, workflow)

Updates a Swimlane instance with the provided Workflow data model object

Parameters:

Name Type Description Default
workflow Workflow

A Workflow data model object

required

Returns:

Type Description
Workflow

A Workflow data model object

Source code in aqueduct/instance.py
@log_exception
def update_workflow(self, workflow):
    """Updates a Swimlane instance with the provided `Workflow` data model object

    Args:
        workflow (Workflow): A `Workflow` data model object

    Returns:
        Workflow: A `Workflow` data model object
    """
    return self._make_request("PUT", f"/workflow/{workflow['id']}", json=workflow)

update_workspace(self, workspace_id, workspace)

Updates a provided workspace ID with the data contained in the provided Workspace data model object

Parameters:

Name Type Description Default
workspace_id str

A workspace ID to update on a Swimlane instance

required
workspace Workspace

A Workspace data model object

required

Returns:

Type Description
Workspace

A Workspace data model object

Source code in aqueduct/instance.py
@log_exception
def update_workspace(self, workspace_id: str, workspace: Workspace):
    """Updates a provided workspace ID with the data contained in the provided Workspace data model object

    Args:
        workspace_id (str): A workspace ID to update on a Swimlane instance
        workspace (Workspace): A `Workspace` data model object

    Returns:
        Workspace: A `Workspace` data model object
    """
    return Workspace(**self._make_request("PUT", f"/workspaces/{workspace_id}", json=asdict(workspace)))

upgrade_plugin(self, filename, stream)

Uploads a plugin to be upgraded on a Swimlane instance given a filename and a BytesIO file stream

Parameters:

Name Type Description Default
filename str

A filename string

required
stream BytesIO

A BytesIO file stream

required

Returns:

Type Description
json

JSON Response from uploading a plugin

Source code in aqueduct/instance.py
@log_exception
def upgrade_plugin(self, filename, stream):
    """Uploads a plugin to be upgraded on a Swimlane instance given a filename and a BytesIO file stream

    Args:
        filename (str): A filename string
        stream (BytesIO): A BytesIO file stream

    Returns:
        json: JSON Response from uploading a plugin
    """
    if not filename.endswith(".swimbundle"):
        filename = filename.split(".")[0] + ".swimbundle"
    return self.swimlane.request("POST", "/task/packages/upgrade", files={"file": (filename, stream.read())}).json()

upload_plugin(self, filename, stream)

Uploads a plugin to a Swimlane instance given a filename and a BytesIO file stream

Parameters:

Name Type Description Default
filename str

A filename string

required
stream BytesIO

A BytesIO file stream

required

Returns:

Type Description
json

JSON Response from uploading a plugin

Source code in aqueduct/instance.py
@log_exception
def upload_plugin(self, filename, stream):
    """Uploads a plugin to a Swimlane instance given a filename and a BytesIO file stream

    Args:
        filename (str): A filename string
        stream (BytesIO): A BytesIO file stream

    Returns:
        json: JSON Response from uploading a plugin
    """
    if not filename.endswith(".swimbundle"):
        filename = filename.split(".")[0] + ".swimbundle"
    return self.swimlane.request("POST", "/task/packages", files={"file": (filename, stream.read())}).json()
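
Both plugin methods rename the file by keeping only the text before the first dot. For a filename that contains additional dots, such as a version number, everything after the first dot is dropped. The sketch below reproduces that rule and contrasts it with a hypothetical alternative that replaces only the final extension. Whether real plugin filenames contain extra dots is not stated in the source.

```python
import os


def to_swimbundle_name(filename: str) -> str:
    """Same rule as the listings above: keep the text before the first dot."""
    if not filename.endswith(".swimbundle"):
        filename = filename.split(".")[0] + ".swimbundle"
    return filename


def to_swimbundle_name_keep_dots(filename: str) -> str:
    """Hypothetical alternative: replace only the final extension."""
    if not filename.endswith(".swimbundle"):
        filename = os.path.splitext(filename)[0] + ".swimbundle"
    return filename


print(to_swimbundle_name("sw_example.zip"))
print(to_swimbundle_name("sw_example-1.2.0.zip"))
print(to_swimbundle_name_keep_dots("sw_example-1.2.0.zip"))
```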

Applications (Base)

Used to sync applications from a source instance to a destination instance of Swimlane

_remove_tracking_field(self, application) private

This method removes an application's tracking field since Swimlane automatically generates this for every new application.

Parameters:

Name Type Description Default
application dict

A Swimlane application to remove the tracking field from

required
Source code in aqueduct/components/applications.py
def _remove_tracking_field(self, application: dict):
    """This method removes an application's tracking field since
    Swimlane automatically generates this for every new application.

    Args:
        application (dict): A Swimlane application to remove the tracking field from
    """
    return_dict = {}
    for field in application.get("fields"):
        if field.get("fieldType") and field["fieldType"].lower() == "tracking":
            if not Base.dry_run:
                application["fields"].remove(field)
                return_dict = field
            else:
                self.add_to_diff_log(application["name"], "removed", subcomponent="tracking-field")
    return return_dict
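
The idea of separating the tracking field from the remaining fields can also be expressed without removing items from the list being iterated. The following is a sketch on plain dictionaries under that assumption. `split_tracking_field` is a hypothetical helper and not the method's actual implementation; unlike the method above, it leaves its input unchanged and ignores the dry-run setting.

```python
from typing import Tuple


def split_tracking_field(application: dict) -> Tuple[dict, dict]:
    """Return (copy of the application without tracking fields, removed field or {})."""
    removed = {}
    kept = []
    for field in application.get("fields", []):
        if str(field.get("fieldType", "")).lower() == "tracking":
            removed = field
        else:
            kept.append(field)
    # Build a shallow copy so the caller's dictionary is not mutated.
    cleaned = dict(application, fields=kept)
    return cleaned, removed


app = {
    "name": "Cases",
    "fields": [
        {"id": "a1", "fieldType": "tracking"},
        {"id": "b2", "fieldType": "text"},
    ],
}
cleaned, removed = split_tracking_field(app)
print(removed)
print(cleaned["fields"])
```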

get_reference_app_order(self)

This method creates an order of applications to be added or updated on a destination instance based on most to least reference relationships. For example, a source application that references 5 applications is ordered before one that references 3.

Returns:

Type Description
dict

An application dictionary ordered (sorted) by number of references

Source code in aqueduct/components/applications.py
def get_reference_app_order(self):
    """This method creates an order of applications to be added or updated on a destination
    instance based on most to least reference relationships. For example, a source application
    that references 5 applications is ordered before one that references 3.

    Returns:
        dict: An application dictionary ordered (sorted) by number of references
    """
    """
    reference_dict = {}
    for application in self.source_instance.get_applications():
        if application:
            application_ = self.source_instance.get_application(application_id=application["id"])
            if application_["id"] not in reference_dict:
                reference_dict[application_["id"]] = []
            for field in application_["fields"]:
                if field.get("$type") and field["$type"] == "Core.Models.Fields.Reference.ReferenceField, Core":
                    reference_dict[application_["id"]].append(field["targetId"])

    return_dict = OrderedDict()
    for item in sorted(reference_dict, key=lambda k: len(reference_dict[k]), reverse=True):
        return_dict[item] = reference_dict[item]
    return return_dict
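
The ordering step can be tried in isolation on a plain dictionary that maps application IDs to the IDs they reference. The sketch below uses made-up IDs, and `order_by_reference_count` is a hypothetical name for the sorting logic shown in the listing.

```python
from collections import OrderedDict


def order_by_reference_count(reference_dict: dict) -> OrderedDict:
    """Order application IDs from most to fewest referenced applications."""
    ordered = OrderedDict()
    for app_id in sorted(reference_dict, key=lambda k: len(reference_dict[k]), reverse=True):
        ordered[app_id] = reference_dict[app_id]
    return ordered


references = {
    "app_a": ["app_b"],
    "app_b": [],
    "app_c": ["app_a", "app_b", "app_d"],
}
print(list(order_by_reference_count(references)))
```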

sync(self)

This method will sync all applications on a source instance with a destination instance.

Source code in aqueduct/components/applications.py
def sync(self):
    """This method will sync all applications on a source instance with a destination instance."""
    self.log(f"Starting to sync 'Applications' from '{self.source_host}' to '{self.dest_host}'")
    for application_id, values in self.get_reference_app_order().items():
        self.sync_application(application_id=application_id)

    # Checking if we need to update applications reference fields that contain old (source instance)
    # tracking-ids in their columns. If so we remove them and update it with the new ones on the destination.
    if self.__applications_needing_updates:
        for application_id in self.__applications_needing_updates:
            dest_application = self.destination_instance.get_application(application_id)
            needs_update = False
            for field in dest_application["fields"]:
                if field.get("columns"):
                    column_list = []
                    for column in field["columns"]:
                        if self.tracking_id_map.get(column):
                            self.log(
                                f"Removing old tracking-id '{column}' from application "
                                f"'{dest_application['name']}' field '{field['key']}'"
                            )
                            column_list.append(self.tracking_id_map[column])
                            self.log(
                                f"Adding new tracking-id '{self.tracking_id_map[column]}' to application "
                                f"'{dest_application['name']}' field '{field['key']}'"
                            )
                            needs_update = True
                        else:
                            column_list.append(column)
                    field["columns"] = column_list
            if needs_update:
                self.log(f"Updating application '{dest_application['name']}' on destination with new.")
                dest_application = self.destination_instance.update_application(dest_application)
                self.log(f"Successfully updated application '{dest_application['name']}' on destination.")
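
The second half of this method replaces source tracking IDs in each field's `columns` list with the IDs generated on the destination. The sketch below isolates that remapping step, assuming the tracking ID map behaves like a plain dictionary of source ID to destination ID. `remap_columns` is a hypothetical helper.

```python
from typing import List, Tuple


def remap_columns(columns: List[str], tracking_id_map: dict) -> Tuple[List[str], bool]:
    """Swap old tracking IDs for new ones and report whether anything changed."""
    remapped = []
    changed = False
    for column in columns:
        new_id = tracking_id_map.get(column)
        if new_id:
            remapped.append(new_id)
            changed = True
        else:
            # Columns that are not tracking IDs are kept as they are.
            remapped.append(column)
    return remapped, changed


print(remap_columns(["src-track", "field-1"], {"src-track": "dst-track"}))
print(remap_columns(["field-1"], {"src-track": "dst-track"}))
```

The boolean corresponds to the `needs_update` flag above: the application is only sent back to the destination when at least one column was replaced.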

sync_application(self, application_id)

This method syncs a single application from a source instance to a destination instance.

Once an application_id on a source instance is provided we retrieve this application JSON. Next we remove the tracking_field from the application since Swimlane automatically generates a unique value for this field.

If workspaces are defined in the application and do not currently exist, we will attempt to create or update them using the Workspaces class.

If the source application does not exist on the destination, we will create it with the same IDs for:

1. application
2. fields
3. layout
4. etc.

By doing this, syncing of applications (and their IDs) is much easier.

If the application exists, we proceed to remove specific fields that are not needed.

Next, we check for fields which have been added to the source application but do not exist on the destination instance. If fields are found we add them to the destination object.

Next, we check to see if the destination has fields which are not defined in the source instance. If fields are found, we then remove them from the layout view of the source application. This equates to moving them to the "hidden" field section within the application builder so they can still be retrieved and reorganized as needed.

Finally, we update the application on the destination instance.

After updating the application we then check to ensure that the workflow of that application is up to date and accurate.

Parameters:

Name Type Description Default
application_id str

A source application ID.

required
Source code in aqueduct/components/applications.py
def sync_application(self, application_id: str):
    """This method syncs a single application from a source instance to a destination instance.

    Once an application_id on a source instance is provided we retrieve this application JSON.
    Next we remove the tracking_field from the application since Swimlane automatically generates
    a unique value for this field.

    If workspaces are defined in the application and do not currently exist, we will attempt to create
    or update them using the Workspaces class.

    If the source application does not exist on the destination, we will create it with the same IDs for
        1. application
        2. fields
        3. layout
        4. etc.

    By doing this, syncing of applications (and their IDs) is much easier.

    If the application exists, we proceed to remove specific fields that are not needed.

    Next, we check for fields which have been added to the source application but do not
    exist on the destination instance. If fields are found we add them to the destination
    object.

    Next, we check to see if the destination has fields which are not defined in the source instance.
    If fields are found, we then remove them from the layout view of the source application. This equates
    to moving them to the "hidden" field section within the application builder so they can still be retrieved
    and reorganized as needed.

    Finally, we update the application on the destination instance.

    After updating the application we then check to ensure that the workflow of that application is up to date
    and accurate.

    Args:
        application_id (str): A source application ID.
    """
    application = self.source_instance.get_application(application_id=application_id)
    if not application:
        raise GetComponentError(name="application", id=application_id)

    self._process_workspaces(application=application)
    source_tracking_field = self._remove_tracking_field(application)
    self.scrub(application)
    if not self._is_in_include_exclude_lists(application["name"], "applications"):
        dest_application = self.destination_instance.get_application(application["id"])
        if not dest_application:
            if not Base.dry_run:
                self.log(f"Adding application '{application['name']}' on destination.")
                dest_application = self.destination_instance.add_application(application)
                self.__applications_needing_updates.append(application_id)
                self.log(f"Successfully added application '{application['name']}' on destination.")
                # Setting tracking_id_map just in case tasks are added and need updating
                # Tasks would need updating if they use an applications 'Tracking Id' field as an
                # input or output since Swimlane randomizes the Tracking ID when created
                self.tracking_id_map = (source_tracking_field["id"], dest_application["trackingFieldId"])
            else:
                self.add_to_diff_log(application["name"], "added")
        else:
            for item in ["createdByUser", "modifiedByUser", "permissions"]:
                if dest_application.get(item):
                    dest_application.pop(item)
            dest_application = self.__add_fields(application, dest_application)
            dest_application = self.__remove_fields(application, dest_application)
            if not Base.dry_run:
                self.log(f"Updating application '{dest_application['name']}' on destination.")
                self.destination_instance.update_application(dest_application)
                self.__applications_needing_updates.append(application_id)
                self.log(f"Successfully updated application '{dest_application['name']}' on destination.")
            else:
                self.add_to_diff_log(application["name"], "updated")
        self.log(f"Checking for changes in workflow for application '{application['name']}'")
        Workflows().sync_workflow(application_name=application["name"])
    else:
        self.log(f"Skipping application '{application['name']}' since it is excluded.")
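
The tracking ID comment in the listing above describes replacing a source tracking field ID with the one generated on the destination. A minimal sketch of that idea follows. The `inputs`/`outputs` layout, the `fieldId` key, and the `remap_tracking_id` helper are all hypothetical and are not the real Swimlane task schema or an aqueduct function.

```python
from typing import Any, Dict, Tuple


def remap_tracking_id(task: Dict[str, Any], tracking_id_map: Tuple[str, str]) -> Dict[str, Any]:
    """Return a copy of a task-like dict with the source tracking field ID replaced.

    Hypothetical layout: 'inputs' and 'outputs' are lists of dicts with a 'fieldId' key.
    """
    source_id, dest_id = tracking_id_map
    remapped = dict(task)
    for section in ("inputs", "outputs"):
        remapped[section] = [
            # Swap only entries that point at the source tracking field
            {**item, "fieldId": dest_id} if item.get("fieldId") == source_id else dict(item)
            for item in task.get(section, [])
        ]
    return remapped


task = {
    "name": "Enrich record",
    "inputs": [{"fieldId": "src-tracking", "key": "record_id"}],
    "outputs": [{"fieldId": "other-field", "key": "result"}],
}
updated = remap_tracking_id(task, ("src-tracking", "dest-tracking"))
```

The sketch returns a new dictionary rather than mutating its argument, so the source task stays available for comparison.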

Assets (Base)

Used to sync assets from a source instance to a destination instance of Swimlane

sync(self)

This method is used to sync (create) all assets from a source instance to a destination instance

Source code in aqueduct/components/assets.py
def sync(self):
    """This method is used to sync (create) all assets from a source instance to a destination instance"""
    self.log(f"Attempting to sync assets from '{self.source_host}' to '{self.dest_host}'")
    self.__set_dest_assets()
    for asset in self.source_instance.get_assets():
        self.sync_asset(asset=asset)

sync_asset(self, asset)

This method will create (add) a single asset from a source instance to a destination instance.

Currently we are only adding assets and NOT updating them but this functionality may expand in the future.

Parameters:

Name Type Description Default
asset Asset

A single Swimlane asset object from the Swimlane API

required
Source code in aqueduct/components/assets.py
def sync_asset(self, asset: Asset):
    """This method will create (add) a single asset from a source instance to a destination instance.

    Currently we are only adding assets and NOT updating them but this functionality may expand in the future.

    Args:
        asset (Asset): A single Swimlane asset object from the Swimlane API
    """
    if not self._is_in_include_exclude_lists(asset.name, "assets"):
        self.log(f"Processing asset '{asset.name}'.")
        self.__set_dest_assets()
        if asset.name not in self.dest_assets:
            self.log(f"Asset '{asset.name}' was not found on destination.")
            if not Base.dry_run:
                dest_asset = self.destination_instance.add_asset(asset)
                if not dest_asset:
                    raise AddComponentError(model=asset, name=asset.name)
                # Only report success when the asset was actually added (not during a dry run)
                self.log(f"Asset '{asset.name}' was successfully added to destination.")
            else:
                self.add_to_diff_log(asset.name, "added")
        else:
            self.log(f"Asset '{asset.name}' already exists on destination. Skipping")
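
The add-only behavior with a dry-run switch can be sketched in isolation. This is an illustration under assumptions, not aqueduct's implementation: `sync_missing`, the `add` callback, and the returned diff list are hypothetical names standing in for `add_asset` and `add_to_diff_log`.

```python
from typing import Callable, Iterable, List, Set, Tuple


def sync_missing(
    source_names: Iterable[str],
    dest_names: Set[str],
    dry_run: bool,
    add: Callable[[str], None],
) -> List[Tuple[str, str]]:
    """Add names missing on the destination, or only record them when dry_run is True."""
    diff_log: List[Tuple[str, str]] = []
    for name in source_names:
        if name in dest_names:
            continue  # existing items are never updated, only skipped
        if dry_run:
            diff_log.append((name, "added"))  # record what would happen
        else:
            add(name)  # perform the real change
    return diff_log


created: List[str] = []
planned = sync_missing(["asset-a", "asset-b"], {"asset-b"}, dry_run=True, add=created.append)
```

With `dry_run=True` nothing is created and the planned change is only recorded, which mirrors the default behavior described for the `dry_run` parameter.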

Groups (Base)

Used to sync groups from a source instance to a destination instance of Swimlane

sync(self)

This method is used to sync all groups from a source instance to a destination instance

Source code in aqueduct/components/groups.py
def sync(self):
    """This method is used to sync all groups from a source instance to a destination instance"""
    self.log(f"Attempting to sync groups from '{self.source_host}' to '{self.dest_host}'")
    groups = self.source_instance.get_groups()
    if groups:
        for group in groups:
            self.sync_group(group=group)

sync_group(self, group)

This method syncs a single source instance group to a destination instance.

We begin by processing the provided group and ensuring that all roles and users associated with the provided group are added to the destination instance.

Once that is complete, we then sync any nested groups within the provided source instance group.

If the provided group is already on the destination instance, we skip it. If it is not on the destination instance, we add it.

Parameters:

Name Type Description Default
group Group

A source instance Group data model object.

required
Source code in aqueduct/components/groups.py
def sync_group(self, group: Group):
    """This class syncs a single source instance group to a destination instance.

    We begin by processing the provided group and ensuring that all roles and users
    associated with the provided group are added to the destination instance.

    Once that is complete, we then sync any nested groups within the provided source instance group.

    If the provided group is already on the destination instance, we skip it. If it is
    not on the destination instance, we add it.

    Args:
        group (Group): A source instance Group data model object.
    """
    if not self._is_in_include_exclude_lists(group.name, "groups") and group.name not in self.group_exclusions:
        self.log(f"Processing group '{group.name}' ({group.id})")
        group = self.__process_group(group=group)
        if group.groups:
            group_list = []
            for group_ in group.groups:
                group_list.append(self.__process_group(group=group_))
            group.groups = group_list

        dest_group = self.__get_destination_group(group=group)

        if not dest_group:
            if not Base.dry_run:
                self.log(f"Creating new group '{group.name}' on destination.")
                dest_group = self.destination_instance.add_group(group)
                if not dest_group:
                    raise AddComponentError(model=group, name=group.name)
                self.log(f"Successfully added new group '{group.name}' to destination.")
            else:
                self.add_to_diff_log(group.name, "added")
        else:
            self.log(f"Group '{group.name}' already exists on destination.")

KeyStore (Base)

Used to sync keystore items from a source instance to a destination instance of Swimlane

sync(self)

Syncing the keystore only creates new entries in the destination system's keystore. The values of these items are not transferred, since they are encrypted within Swimlane itself.

Once items are created in the keystore on the destination system, you must manually enter the value for each keystore item.

Source code in aqueduct/components/keystore.py
def sync(self):
    """Syncing of the keystore will only create new entries into the destination systems keystore.
    The values of these items are not transferred, since they are encrypted within
    Swimlane itself.

    Once items are created in the keystore on the destination system, you must manually enter
    the value for each keystore item.
    """
    self.log(f"Attempting to sync keystore from '{self.source_host}' to '{self.dest_host}'")
    credentials = self.source_instance.get_credentials()
    if credentials:
        # Drop the "$type" metadata key if present; a default avoids a KeyError when it is absent
        credentials.pop("$type", None)
        for key, val in credentials.items():
            if not self._is_in_include_exclude_lists(key, "keystore"):
                self.log(f"Processing '{key}' credential")
                if not self.destination_instance.get_credential(key):
                    self.log(f"Adding Keystore item '{key}' to destination.")
                    credential_ = self.source_instance.get_credential(key)
                    if not Base.dry_run:
                        self.destination_instance.add_credential(credential_)
                        self.log(f"Successfully added new keystore item '{key}'")
                    else:
                        self.add_to_diff_log(key, "added")
                else:
                    self.log(f"Keystore item '{key}' exists on destination. Skipping....")
            else:
                self.log(f"Skipping keystore item '{key}' since it is not included.")
    else:
        self.log("Unable to find any keystore values on the source Swimlane instance. Skipping....")
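
Because values are not transferred, it can help to list which keystore items will need a value entered by hand after a sync. The sketch below assumes the credentials mapping is keyed by item name with a `$type` metadata key, as the listing above suggests. `keys_needing_values` is a hypothetical helper, not part of aqueduct.

```python
from typing import Dict, Iterable, List


def keys_needing_values(source_credentials: Dict[str, object], dest_keys: Iterable[str]) -> List[str]:
    """List source keystore names that do not yet exist on the destination."""
    existing = set(dest_keys)
    return [
        key
        for key in source_credentials
        # "$type" is metadata, not a keystore item
        if key != "$type" and key not in existing
    ]


pending = keys_needing_values(
    {"$type": "metadata", "api_token": None, "smtp_password": None},
    ["smtp_password"],
)
```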

Plugins (Base)

Used to sync plugins from a source instance to a destination instance of Swimlane

sync(self)

This method is used to sync all plugins from a source instance to a destination instance

Source code in aqueduct/components/plugins.py
def sync(self):
    """This method is used to sync all plugins from a source instance to a destination instance"""
    self.log(f"Attempting to sync plugins from '{self.source_host}' to '{self.dest_host}'")
    self.dest_plugin_dict = self.destination_instance.plugin_dict
    plugin_dict = self.source_instance.plugin_dict
    if plugin_dict:
        for name, id in plugin_dict.items():
            self.sync_plugin(name=name, id=id)

sync_plugin(self, name, id)

This method syncs a single Swimlane plugin based on the provided name and ID.

We first check whether the provided plugin name exists in the destination instance's plugin_dict.

If it does not, we download the plugin from the source instance. If the download succeeds, we upload it to the destination instance.

If the plugin name does exist in the destination instance's plugin_dict, we retrieve the plugin from both the source and destination instances and compare their versions. If the source version is greater than the destination version, we upgrade the plugin on the destination instance.

Parameters:

Name Type Description Default
name str

The name of a Swimlane plugin

required
id str

The internal ID of a Swimlane plugin

required
Source code in aqueduct/components/plugins.py
def sync_plugin(self, name: str, id: str):
    """This class syncs a single Swimlane plugin based on the provided name and ID.

    We first check whether the provided plugin name exists in the destination instance's plugin_dict.

    If it does not, we download the plugin from the source instance. If the download succeeds, we
    upload it to the destination instance.

    If the plugin name does exist in the destination instance's plugin_dict, we retrieve the
    plugin from both the source and destination instances and compare their versions. If the source
    version is greater than the destination version, we upgrade the plugin on the destination instance.

    Args:
        name (str): The name of a Swimlane plugin
        id (str): The internal ID of a Swimlane plugin
    """
    if not self._is_in_include_exclude_lists(name, "plugins"):
        self.log(f"Processing '{name}' plugin with id '{id}'")
        if not self.dest_plugin_dict.get(name):
            plugin = self.__download_plugin(name=name, id=id)
            if plugin:
                if not Base.dry_run:
                    try:
                        self.log(f"Uploading plugin '{name}' to destination.")
                        resp = self.destination_instance.upload_plugin(name, plugin)
                        self.log(f"Successfully uploaded plugin '{name}' to destination.")
                    except Exception as e:
                        self.log(
                            f"Failed to upload plugin '{name}' to destination '{self.dest_host}'",
                            level="warning",
                        )
                else:
                    self.add_to_diff_log(name, "added")
        else:
            self.log(
                f"Plugin '{name}' already exists on destination '{self.dest_host}'. Checking for differences...."
            )
            dest_plugin = self.destination_instance.get_plugin(name=name)
            if not dest_plugin:
                raise GetComponentError(type="Plugin", name=name)
            source_plugin = self.source_instance.get_plugin(name=name)
            if not source_plugin:
                raise GetComponentError(type="Plugin", name=name)
            if version.parse(source_plugin.version) > version.parse(dest_plugin.version):
                plugin = self.__download_plugin(name=name, id=id)
                if plugin:
                    if not Base.dry_run:
                        self.log(f"Upgrading plugin '{name}' on destination.")
                        self.destination_instance.upgrade_plugin(filename=name, stream=plugin)
                        self.log(f"Successfully upgraded plugin '{name}' on destination.")
                    else:
                        self.add_to_diff_log(name, "upgraded")
            else:
                self.log("Source and destination have the same version plugin. Skipping...")
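
The branch structure of `sync_plugin` reduces to a three-way decision. The sketch below shows that decision on its own. The listing calls `version.parse`, presumably from the `packaging` library; here `parse_version` is a simplified stand-in that handles dotted integers only, and `plugin_action` is a hypothetical helper.

```python
from typing import Optional, Tuple


def parse_version(text: str) -> Tuple[int, ...]:
    """Simplified stand-in for a real version parser; dotted integers only."""
    return tuple(int(part) for part in text.split("."))


def plugin_action(source_version: str, dest_version: Optional[str]) -> str:
    """Decide what to do with a plugin; dest_version is None when it is absent."""
    if dest_version is None:
        return "upload"  # plugin missing on destination
    if parse_version(source_version) > parse_version(dest_version):
        return "upgrade"  # source is newer
    return "skip"  # same version, or destination is newer


actions = [
    plugin_action("1.2.0", None),
    plugin_action("1.10.0", "1.9.0"),
    plugin_action("1.0.0", "1.0.0"),
    plugin_action("1.0.0", "2.0.0"),
]
```

Comparing parsed tuples rather than raw strings matters: as strings, "1.10.0" sorts before "1.9.0".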

Roles (Base)

Used to sync roles from a source instance to a destination instance of Swimlane

sync(self)

This method is used to sync all roles from a source instance to a destination instance

Source code in aqueduct/components/roles.py
def sync(self):
    """This method is used to sync all roles from a source instance to a destination instance"""
    self.log(f"Attempting to sync roles from '{self.source_host}' to '{self.dest_host}'")
    roles = self.source_instance.get_roles()
    if roles:
        for role in roles:
            self.sync_role(role=role)

Tasks (Base)

Used to sync tasks from a source instance to a destination instance of Swimlane

sync(self)

This method is used to sync all tasks from a source instance to a destination instance

Source code in aqueduct/components/tasks.py
def sync(self):
    """This method is used to sync all tasks from a source instance to a destination instance"""
    self.log(f"Starting to sync tasks from '{self.source_host}' to '{self.dest_host}'")
    failed_task_list = []
    tasks = self.source_instance.get_tasks()
    if tasks:
        for task in tasks:
            resp = self.sync_task(task=task)
            if resp:
                failed_task_list.append(resp)

    if failed_task_list:
        self.log(f"The following tasks failed to process: {[x.name for x in failed_task_list]}")
        count = 1
        self.log("Retrying failed task migration from host to destination.")
        while count < 3:
            # Iterate over a copy, since successfully added tasks are removed from the list
            for task_ in list(failed_task_list):
                self.log(f"Attempt {count}: Processing task '{task_.name}'.")
                self.log(f"Creating task '{task_.name}' on destination.")
                try:
                    dest_task = self.destination_instance.add_task(task_)
                    self.log(f"Successfully added task '{task_.name}' to destination.")
                    failed_task_list.remove(task_)
                except Exception as e:
                    self.log(
                        f"Failed to add task '{task_.name}' to destination.",
                        level="warning",
                    )
            count += 1

        if len(failed_task_list) > 0:
            self.log(
                f"The following tasks were unable to be added to destination! {[x.name for x in failed_task_list]}",
                level="critical",
            )
            if not self.continue_on_error:
                task_names = [x.name for x in failed_task_list]
                raise Exception(f"The following tasks were unable to be added to destination! {task_names}")
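
The retry pass can be sketched as a small standalone function. This is an illustration under assumptions rather than aqueduct's code: `retry_failed` and `flaky_add` are hypothetical, and the default of two attempts matches the `while count < 3` loop starting at 1. Each pass iterates over a copy of the list, because removing items from a list while iterating over it directly can skip elements.

```python
from typing import Callable, Dict, List


def retry_failed(items: List[str], add: Callable[[str], None], attempts: int = 2) -> List[str]:
    """Retry adding each item up to `attempts` times; return the items that never succeeded."""
    remaining = list(items)
    for _ in range(attempts):
        for item in list(remaining):  # copy, so removal below is safe
            try:
                add(item)
            except Exception:
                continue  # keep the item for the next attempt
            remaining.remove(item)
        if not remaining:
            break
    return remaining


calls: Dict[str, int] = {}


def flaky_add(name: str) -> None:
    """Hypothetical add function: 'b' succeeds on its second call, 'c' never succeeds."""
    calls[name] = calls.get(name, 0) + 1
    if name == "c" or (name == "b" and calls[name] < 2):
        raise RuntimeError(f"could not add {name}")


left_over = retry_failed(["a", "b", "c"], flaky_add)
```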

sync_task(self, task)

This method syncs a single task from a source Swimlane instance to a destination instance.

Using the provided task object from the source Swimlane instance, we first get the full task object from the source.

Next, we attempt to retrieve the task from the destination system. If the task does not exist on the destination instance, we add it. If it does exist and the uid matches, we compare the version. If the versions are the same we skip updating the task. If they are different, we update the task on the destination instance.

Parameters:

Name Type Description Default
task Task or TaskLight

A Swimlane task object from a source system.

required

Returns:

Type Description
Task or TaskLight

If we failed to add a task we return it so we can try again. Only if called using the sync method.

Source code in aqueduct/components/tasks.py
def sync_task(self, task: Task or TaskLight):
    """This method syncs a single task from a source Swimlane instance to a destination instance.

    Using the provided task object from the source Swimlane instance, we first get the full task
    object from the source.

    Next, we attempt to retrieve the task from the destination system. If the task does not exist
    on the destination instance, we add it. If it does exist and the `uid` matches, we compare the `version`.
    If the versions are the same we skip updating the task. If they are different, we update the task on the
    destination instance.

    Args:
        task (Task or TaskLight): A Swimlane task object from a source system.

    Returns:
        Task or TaskLight: If we failed to add a task we return it so we can try again.
                           Only if called using the sync method.
    """
    if not self._is_in_include_exclude_lists(task.name, "tasks"):
        self.log(f"Processing task '{task.name}'.")
        task_ = self.source_instance.get_task(task.id)
        if not task_:
            raise GetComponentError(type="task", name=task.name, id="")
        task_ = self.update_task_plugin_ids(task=task_)
        dest_task = self.destination_instance.get_task(task_.id)
        if not dest_task:
            if not Base.dry_run:
                self.log(f"Creating task '{task_.name}' on destination.")
                try:
                    # checking tasks for inputs and outputs mapped to source application
                    # tracking-ids and replacing them with the new ones
                    task_ = self._check_tasks_using_tracking_id(task=task_)
                    dest_task = self.destination_instance.add_task(task_)
                    self.log(f"Successfully added task '{task_.name}' to destination.")
                except Exception as e:
                    self.log(
                        f"Failed to add task '{task_.name}' to destination.",
                        level="warning",
                    )
                    self.log("Will attempt again before finalizing.")
                    return task_
            else:
                self.add_to_diff_log(task_.name, "added")
        else:
            self.log(f"Task '{task_.name}' already exists on destination.")
            if task_.uid == dest_task.uid:
                if task_.version == dest_task.version:
                    self.log(f"Task '{task_.name}' has not changed on destination. Skipping...")
                else:
                    if not Base.dry_run:
                        if task_.name in self._processed_task_list:
                            self.log(f"Task '{task_.name}' already updated by applications component. Skipping...")
                        else:
                            self.log(f"Task '{task_.name}' has changed. Updating...")
                            try:
                                dest_task = self.destination_instance.update_task(dest_task.id, task_)
                                if not dest_task:
                                    raise UpdateComponentError(model=task_, name=task_.name)
                                self.log(f"Successfully updated task '{task_.name}' on destination.")
                            except Exception as e:
                                raise UpdateComponentError(model=task_)
                    else:
                        self.add_to_diff_log(task_.name, "updated")
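
The add, skip, or update decision in `sync_task` can be summarized as a small function. The sketch below is an assumption-laden illustration: `TaskInfo` and `task_action` are hypothetical, and the integer `version` field is chosen only for the example. It follows the listing in taking no action when the `uid` values differ.

```python
from typing import NamedTuple, Optional


class TaskInfo(NamedTuple):
    """Hypothetical minimal view of a task: only the fields the decision needs."""
    uid: str
    version: int


def task_action(source: TaskInfo, dest: Optional[TaskInfo]) -> str:
    """Return the action for a source task given its destination counterpart, if any."""
    if dest is None:
        return "add"  # task missing on destination
    if source.uid != dest.uid:
        return "ignore"  # the listing takes no action when uids differ
    if source.version == dest.version:
        return "skip"  # unchanged
    return "update"  # same uid, different version


decisions = [
    task_action(TaskInfo("t1", 3), None),
    task_action(TaskInfo("t1", 3), TaskInfo("t1", 3)),
    task_action(TaskInfo("t1", 4), TaskInfo("t1", 3)),
    task_action(TaskInfo("t1", 4), TaskInfo("t2", 3)),
]
```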

Users (Base)

Used to sync users from a source instance to a destination instance of Swimlane

sync(self)

This method is used to sync all users from a source instance to a destination instance

Source code in aqueduct/components/users.py
def sync(self):
    """This method is used to sync all users from a source instance to a destination instance"""
    self.log(f"Attempting to sync users from '{self.source_host}' to '{self.dest_host}'")
    users = self.source_instance.get_users()
    if users:
        for user in users:
            self.sync_user(user_id=user.id)

Workspaces (Base)

Used to sync workspaces from a source instance to a destination instance of Swimlane

sync(self)

This method is used to sync all workspaces from a source instance to a destination instance

Source code in aqueduct/components/workspaces.py
def sync(self):
    """This method is used to sync all workspaces from a source instance to a destination instance"""
    self.log(f"Starting to sync workspaces from '{self.source_host}' to '{self.dest_host}'")
    workspaces = self.source_instance.get_workspaces()
    if workspaces:
        for workspace in workspaces:
            self.sync_workspace(workspace=workspace)