Aqueduct
Aqueduct (Base)
Aqueduct is used to migrate content from one Swimlane instance to another.
__init__(self, source, destination, dry_run=True, offline=False, update_reports=False, update_dashboards=False, continue_on_error=False, use_unsupported_version=False, force_unsupported_version=False)
To use Aqueduct you must provide two SwimlaneInstance objects: the source, which holds the content to migrate, and the destination, which receives that content.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source | SwimlaneInstance | The source Swimlane instance. Typically this is considered a development instance. | required |
destination | SwimlaneInstance | The destination Swimlane instance. Typically this is considered a production instance. | required |
dry_run | bool | Whether to perform a dry run, in which no content is transferred to the destination instance. Set to False to migrate content. | True |
offline | bool | Whether the destination instance is offline, meaning it has no internet access for downloading Python packages. | False |
update_reports | bool | Whether to force-update reports if changes are detected. | False |
update_dashboards | bool | Whether to force-update dashboards if changes are detected. | False |
continue_on_error | bool | Whether to continue when an API error occurs. | False |
use_unsupported_version | bool | Whether to allow transferring content between instances that run different versions of Swimlane. Use this at your own risk. | False |
force_unsupported_version | bool | Whether to skip the confirmation prompt when an unsupported version is used. Intended for running Aqueduct headless/non-interactive. | False |
Source code in aqueduct/aqueduct.py
def __init__(
    self,
    source: SwimlaneInstance,
    destination: SwimlaneInstance,
    dry_run: bool = True,
    offline: bool = False,
    update_reports: bool = False,
    update_dashboards: bool = False,
    continue_on_error: bool = False,
    use_unsupported_version: bool = False,
    force_unsupported_version: bool = False,
):
    """To use aqueduct you must provide two SwimlaneInstance objects. One is the source of the content to
    migrate. The other is the destination that the content will be migrated to.
    Args:
        source (SwimlaneInstance): The source Swimlane instance. Typically this is considered a
            development instance.
        destination (SwimlaneInstance): The destination Swimlane instance. Typically this is considered a
            production instance.
        dry_run (bool): Whether to perform a dry run, in which no content is transferred to the destination
            instance. Set to False to migrate content. Default is True.
        offline (bool): Whether the destination instance is offline, meaning it has no internet access for
            downloading Python packages. Default is False.
        update_reports (bool): Whether to force-update reports if changes are detected. Default is False.
        update_dashboards (bool): Whether to force-update dashboards if changes are detected.
            Default is False.
        continue_on_error (bool): Whether to continue when an API error occurs. Default is False.
        use_unsupported_version (bool): Whether to allow transferring content between instances that run
            different versions of Swimlane. Use this at your own risk. Default is False.
        force_unsupported_version (bool): Whether to skip the confirmation prompt when an unsupported
            version is used. Intended for running aqueduct headless/non-interactive. Default is False.
    """
    if use_unsupported_version and source.swimlane.product_version != destination.swimlane.product_version:
        self.log(
            f"You specified you want to transfer of content from version {source.swimlane.product_version} to "
            f"{destination.swimlane.product_version}. This is not officially supported. "
            "Please use at your own risk.",
            level="info",
        )
        if not force_unsupported_version and not self.__want_to_continue():
            sys.exit()
    elif source.swimlane.product_version != destination.swimlane.product_version:
        raise UnsupportedSwimlaneVersion(source=source, destination=destination)
    Base.source_instance = source
    Base.destination_instance = destination
    Base.dry_run = dry_run
    Base.source_host = Base.source_instance.swimlane.host
    Base.dest_host = Base.destination_instance.swimlane.host
    Base.offline = offline
    Base.update_reports = update_reports
    Base.update_dashboards = update_dashboards
    Base.continue_on_error = continue_on_error
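The version check at the top of `__init__` can be read as a small decision function. The following is a minimal, self-contained sketch of that branching, not the library's code: `decide_version_policy` and its return strings are hypothetical, and `confirm` stands in for the interactive prompt.

```python
from typing import Callable


def decide_version_policy(
    source_version: str,
    destination_version: str,
    use_unsupported_version: bool = False,
    force_unsupported_version: bool = False,
    confirm: Callable[[], bool] = lambda: False,
) -> str:
    """Return "proceed", "abort", or "error" for a pair of product versions.

    Hypothetical helper that mirrors the branches shown in __init__.
    """
    if source_version == destination_version:
        return "proceed"  # matching versions are always allowed
    if not use_unsupported_version:
        return "error"  # corresponds to raising UnsupportedSwimlaneVersion
    if force_unsupported_version or confirm():
        return "proceed"  # forced (headless) or confirmed interactively
    return "abort"  # corresponds to sys.exit()


print(decide_version_policy("10.4.0", "10.4.0"))
print(decide_version_policy("10.4.0", "10.5.0"))
print(decide_version_policy("10.4.0", "10.5.0", use_unsupported_version=True))
```

Note that `force_unsupported_version` only matters when `use_unsupported_version` is also set; on its own it does not bypass the version check.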
sync(self, components=OrderedDict([('keystore', <class 'aqueduct.components.keystore.KeyStore'>), ('packages', <class 'aqueduct.components.packages.Packages'>), ('plugins', <class 'aqueduct.components.plugins.Plugins'>), ('assets', <class 'aqueduct.components.assets.Assets'>), ('workspaces', <class 'aqueduct.components.workspaces.Workspaces'>), ('applications', <class 'aqueduct.components.applications.Applications'>), ('tasks', <class 'aqueduct.components.tasks.Tasks'>), ('reports', <class 'aqueduct.components.reports.Reports'>), ('dashboards', <class 'aqueduct.components.dashboards.Dashboards'>), ('users', <class 'aqueduct.components.users.Users'>), ('groups', <class 'aqueduct.components.groups.Groups'>), ('roles', <class 'aqueduct.components.roles.Roles'>)]), exclude={}, include={})
The main method to begin syncing components from one Swimlane instance to another.
There are several available components you can specify. The order of these components is enforced. The defaults are:
- keystore
- packages
- plugins
- assets
- workspaces
- applications (we update workflows here)
- tasks
- reports
- dashboards
- users
- groups
- roles
You can include or exclude specific component items, such as particular applications, tasks, or plugins. To do so, provide a dictionary for each argument (include or exclude), keyed by component name. For example:
exclude = {'applications': ['Phishing Triage'], 'tasks': ['PT - Get Emails']}  # etc.
include = {'applications': ['Security Alert & Incident Management'], 'reports': ['SAIM - New Incidents']}  # etc.
aq.sync(include=include, exclude=exclude)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
components | list | A list of one or more components to sync. Defaults to COMPONENTS, the ordered mapping of component names to classes shown in the signature. | COMPONENTS |
exclude | dict | Specific component items to exclude, keyed by component name. | {} |
include | dict | Specific component items to include, keyed by component name. | {} |
Source code in aqueduct/aqueduct.py
def sync(self, components=COMPONENTS, exclude: dict = {}, include: dict = {}):
    """The main method to begin syncing components from one Swimlane instance to another.
    There are several available components you can specify. The order of these components is enforced.
    The defaults are:
    * keystore
    * packages
    * plugins
    * assets
    * workspaces
    * applications (we update workflows here)
    * tasks
    * reports
    * dashboards
    * users
    * groups
    * roles
    You can include or exclude specific component items, such as particular applications, tasks, or plugins.
    To do so, provide a dictionary for each argument (include or exclude). For example:
        exclude = {'applications': ['Phishing Triage'], 'tasks': ['PT - Get Emails']}  # etc.
        include = {'applications': ['Security Alert & Incident Management'], 'reports': ['SAIM - New Incidents']}  # etc.
        aq.sync(include=include, exclude=exclude)
    Args:
        components (list, optional): A list of one or more components to sync. Defaults to COMPONENTS.
    """
    Base.include = include
    Base.exclude = exclude
    for component in self.__sort_sync_order(components):
        if COMPONENTS.get(component):
            getattr(COMPONENTS[component](), "sync")()
    if Base.dry_run:
        return self._get_formatted_diff_log()
    print(b64decode(self.__LOGO).decode("ascii"))
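The listing calls `__sort_sync_order` but its body is not shown here. As a rough illustration of what an enforced order means, the sketch below sorts a requested subset of components into the documented sequence. `sort_sync_order` and `CANONICAL_ORDER` are hypothetical names, and dropping unknown component names is an assumption made for this example.

```python
# Canonical order, taken from the component list documented for sync().
CANONICAL_ORDER = [
    "keystore", "packages", "plugins", "assets", "workspaces", "applications",
    "tasks", "reports", "dashboards", "users", "groups", "roles",
]


def sort_sync_order(requested):
    """Return the known requested components in canonical order (hypothetical helper)."""
    wanted = set(requested)
    # Iterating over the canonical list, not the request, is what fixes the order.
    return [name for name in CANONICAL_ORDER if name in wanted]


print(sort_sync_order(["roles", "applications", "keystore"]))
# → ['keystore', 'applications', 'roles']
```

Whatever order the caller passes in, the result follows the documented sequence, so dependencies such as plugins are handled before the tasks that use them.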
Base
scrub(self, obj, bad_key='$type')
Used to remove a specific provided key from a dictionary that may contain both nested dictionaries and lists.
This method is recursive.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj | dict | A dictionary or list to remove keys from. | required |
bad_key | str | The bad key to remove from the provided dict or list. | '$type' |
Source code in aqueduct/base.py
def scrub(self, obj, bad_key="$type"):
    """Used to remove a specific provided key from a dictionary
    that may contain both nested dictionaries and lists.
    This method is recursive.
    Args:
        obj (dict): A dictionary or list to remove keys from.
        bad_key (str, optional): The bad key to remove from the provided dict or list. Defaults to "$type".
    """
    if isinstance(obj, dict):
        for key in list(obj.keys()):
            if key == bad_key:
                del obj[key]
            else:
                self.scrub(obj[key], bad_key)
    elif isinstance(obj, list):
        for i in reversed(range(len(obj))):
            if obj[i] == bad_key:
                del obj[i]
            else:
                self.scrub(obj[i], bad_key)
    else:
        pass
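To see what `scrub` does to a payload, here is the same recursion written as a standalone function and applied to sample data. The payload and its `$type` values are made up for illustration.

```python
def scrub(obj, bad_key="$type"):
    """Recursively delete bad_key from nested dicts and lists, in place."""
    if isinstance(obj, dict):
        for key in list(obj.keys()):  # copy the keys so deletion is safe
            if key == bad_key:
                del obj[key]
            else:
                scrub(obj[key], bad_key)
    elif isinstance(obj, list):
        for i in reversed(range(len(obj))):  # reverse so deletions keep indices valid
            if obj[i] == bad_key:
                del obj[i]
            else:
                scrub(obj[i], bad_key)


payload = {
    "$type": "Core.Models.Application",
    "name": "Phishing Triage",
    "fields": [{"$type": "Core.Models.Field", "name": "Subject"}, "$type"],
}
scrub(payload)
print(payload)
# → {'name': 'Phishing Triage', 'fields': [{'name': 'Subject'}]}
```

The object is modified in place and nothing is returned, so callers that still need the original should copy it first (for example with `copy.deepcopy`).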
SwimlaneInstance
Creates a connection to a single Swimlane instance
add_application(self, application)
Adds an application based on the provided dictionary object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
application | dict | An application dictionary object. | required |
Returns:
Type | Description |
---|---|
dict | A Swimlane application object |
Source code in aqueduct/instance.py
@log_exception
def add_application(self, application):
    """Adds an application based on the provided dictionary object
    Args:
        application (dict): An application dictionary object.
    Returns:
        dict: A Swimlane application object
    """
    return self._make_request("POST", "/app", json=application)
add_asset(self, asset)
Adds a provided Asset object to a Swimlane instance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
asset | Asset | An `Asset` data model object | required |
Returns:
Type | Description |
---|---|
Asset | An `Asset` data model object |
Source code in aqueduct/instance.py
@log_exception
def add_asset(self, asset: Asset):
    """Adds a provided Asset object to a Swimlane instance
    Args:
        asset (Asset): An `Asset` data model object
    Returns:
        Asset: An `Asset` data model object
    """
    return Asset(**self._make_request("POST", "/asset", json=asdict(asset)))
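Most `add_*` methods follow the same round trip: serialize the data model with `asdict`, POST it, and rebuild a model from the response. The sketch below shows that pattern in isolation. The two-field `Asset` and the `fake_post` function are hypothetical stand-ins; the real model has more fields and the real request goes through `_make_request`.

```python
from dataclasses import asdict, dataclass


@dataclass
class Asset:
    # Hypothetical, reduced field set for illustration only.
    name: str
    id: str = ""


def fake_post(path, json):
    """Stand-in for _make_request: echoes the payload with a server-assigned id."""
    return {**json, "id": "a1b2c3"}


def add_asset(asset):
    # Serialize the dataclass, send it, and rebuild a model from the response.
    return Asset(**fake_post("/asset", json=asdict(asset)))


created = add_asset(Asset(name="Mail Server"))
print(created)
# → Asset(name='Mail Server', id='a1b2c3')
```

Because the response is unpacked with `**`, any key the model does not declare raises a `TypeError`, which is why the model definitions need to track the API's fields.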
add_credential(self, credential)
Used to add a keystore key name (but not its value) to a Swimlane instance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
credential | dict | A keystore item to add to a Swimlane instance | required |
Returns:
Type | Description |
---|---|
dict | A dictionary of a keystore item |
Source code in aqueduct/instance.py
@log_exception
def add_credential(self, credential: dict):
    """Used to add a keystore key name (but not its value) to a Swimlane instance
    Args:
        credential (dict): A keystore item to add to a Swimlane instance
    Returns:
        dict: A dictionary of a keystore item
    """
    return self._make_request("POST", "/credentials", json=credential)
add_dashboard(self, dashboard)
Adds a provided Dashboard object to a Swimlane instance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dashboard | Dashboard | A `Dashboard` data model object | required |
Returns:
Type | Description |
---|---|
Dashboard | A `Dashboard` data model object |
Source code in aqueduct/instance.py
@log_exception
def add_dashboard(self, dashboard: Dashboard):
    """Adds a provided Dashboard object to a Swimlane instance
    Args:
        dashboard (Dashboard): A `Dashboard` data model object
    Returns:
        Dashboard: A `Dashboard` data model object
    """
    return Dashboard(**self._make_request("POST", "/dashboard", json=asdict(dashboard)))
add_group(self, group)
Adds a provided Group object to a Swimlane instance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
group | Group | A `Group` data model object | required |
Returns:
Type | Description |
---|---|
Group | A `Group` data model object |
Source code in aqueduct/instance.py
@log_exception
def add_group(self, group: Group):
    """Adds a provided Group object to a Swimlane instance
    Args:
        group (Group): A `Group` data model object
    Returns:
        Group: A `Group` data model object
    """
    try:
        return Group(**self._make_request("POST", "/groups", json=asdict(group)))
    except SwimlaneHTTP400Error as sh:
        if sh.code == 1051:
            return True
        else:
            raise sh
    except Exception as e:
        raise e
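`add_group` treats one specific HTTP 400 error code (1051) as a non-failure and returns `True` instead of raising. The sketch below isolates that pattern. `HTTP400Error`, `post_ok`, and `post_1051` are hypothetical stand-ins, and the listing does not say what code 1051 means, so the example makes no claim about it either.

```python
class HTTP400Error(Exception):
    """Hypothetical stand-in for SwimlaneHTTP400Error, carrying an error code."""

    def __init__(self, code):
        super().__init__(f"HTTP 400 (code {code})")
        self.code = code


def add_group(post, group):
    """Return the created group, or True when the API answers with code 1051."""
    try:
        return post("/groups", group)
    except HTTP400Error as error:
        if error.code == 1051:
            return True  # tolerated error code: report success without a model
        raise  # any other 400 error propagates to the caller


def post_ok(path, payload):
    return {**payload, "id": "g1"}


def post_1051(path, payload):
    raise HTTP400Error(1051)


print(add_group(post_ok, {"name": "Analysts"}))
print(add_group(post_1051, {"name": "Analysts"}))
```

Callers therefore need to handle two success shapes: a group object, or the bare value `True`.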
add_report(self, report)
Adds a provided `Report` object to a Swimlane instance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
report | Report | A `Report` data model object | required |
Returns:
Type | Description |
---|---|
Report | A `Report` data model object |
Source code in aqueduct/instance.py
@log_exception
def add_report(self, report: Report):
    """Adds a provided `Report` object to a Swimlane instance
    Args:
        report (Report): A `Report` data model object
    Returns:
        Report: A `Report` data model object
    """
    return Report(**self._make_request("POST", "/reports", json=asdict(report)))
add_role(self, role)
Adds a provided `Role` object to a Swimlane instance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role | Role | A `Role` data model object | required |
Returns:
Type | Description |
---|---|
Role | A `Role` data model object |
Source code in aqueduct/instance.py
@log_exception
def add_role(self, role: Role):
    """Adds a provided `Role` object to a Swimlane instance
    Args:
        role (Role): A `Role` data model object
    Returns:
        Role: A `Role` data model object
    """
    return Role(**self._make_request("POST", "/roles", json=asdict(role)))
add_task(self, task)
Adds a provided Task object to a Swimlane instance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | Task | A `Task` data model object | required |
Returns:
Type | Description |
---|---|
Task | A `Task` data model object |
Source code in aqueduct/instance.py
def add_task(self, task: Task):
    """Adds a provided Task object to a Swimlane instance
    Args:
        task (Task): A `Task` data model object
    Returns:
        Task: A `Task` data model object
    """
    try:
        return Task(**self._make_request("POST", "/task", json=asdict(task)))
    except Exception as e:
        AddComponentError(model=task, name=task.name, reason=e.__dict__.get("argument"))
        return False
add_user(self, user)
Adds a provided User object to a Swimlane instance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user | User | A `User` data model object | required |
Returns:
Type | Description |
---|---|
User | A `User` data model object |
Source code in aqueduct/instance.py
@log_exception
def add_user(self, user: User):
    """Adds a provided User object to a Swimlane instance
    Args:
        user (User): A `User` data model object
    Returns:
        User: A `User` data model object
    """
    return User(**self._make_request("POST", "/user", json=asdict(user)))
add_workflow(self, workflow)
Adds a provided Workflow data model object to a Swimlane instance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workflow | Workflow | A `Workflow` data model object | required |
Returns:
Type | Description |
---|---|
Workflow | A `Workflow` data model object |
Source code in aqueduct/instance.py
@log_exception
def add_workflow(self, workflow):
    """Adds a provided Workflow data model object to a Swimlane instance
    Args:
        workflow (Workflow): A `Workflow` data model object
    Returns:
        Workflow: A `Workflow` data model object
    """
    try:
        return self._make_request("POST", "/workflow/", json=workflow)
    except Exception as e:
        return False
add_workspace(self, workspace)
Adds a provided Workspace object to a Swimlane instance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workspace | Workspace | A `Workspace` data model object | required |
Returns:
Type | Description |
---|---|
Workspace | A `Workspace` data model object |
Source code in aqueduct/instance.py
@log_exception
def add_workspace(self, workspace: Workspace):
    """Adds a provided Workspace object to a Swimlane instance
    Args:
        workspace (Workspace): A `Workspace` data model object
    Returns:
        Workspace: A `Workspace` data model object
    """
    return Workspace(**self._make_request("POST", "/workspaces", json=asdict(workspace)))
download_plugin(self, file_id)
Downloads a plugin or Python package by its internal Swimlane fileId
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_id | str | An internal Swimlane fileId of the plugin or Python package to download | required |
Returns:
Type | Description |
---|---|
BytesIO | A BytesIO object of the downloaded file |
Source code in aqueduct/instance.py
@log_exception
def download_plugin(self, file_id: str):
    """Downloads a plugin or Python package by its internal Swimlane fileId
    Args:
        file_id (str): An internal Swimlane fileId of the plugin or Python package to download
    Returns:
        BytesIO: A BytesIO object of the downloaded file
    """
    stream = BytesIO()
    response = self.swimlane.request("GET", f"attachment/download/{file_id}", stream=True)
    for chunk in response.iter_content(1024):
        stream.write(chunk)
    stream.seek(0)
    return stream
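The download loop copies the response into memory in 1024-byte chunks and rewinds the buffer before returning it. The sketch below reproduces that loop against a fake response so it can run without a Swimlane instance. `FakeResponse` and `read_into_stream` are hypothetical; only the `iter_content(chunk_size)` call shape is taken from the listing.

```python
from io import BytesIO


class FakeResponse:
    """Hypothetical stand-in for a streamed HTTP response."""

    def __init__(self, payload: bytes):
        self._payload = payload

    def iter_content(self, chunk_size):
        # Yield the payload in slices of at most chunk_size bytes.
        for start in range(0, len(self._payload), chunk_size):
            yield self._payload[start:start + chunk_size]


def read_into_stream(response, chunk_size=1024):
    stream = BytesIO()
    for chunk in response.iter_content(chunk_size):
        stream.write(chunk)
    stream.seek(0)  # rewind so the caller reads from the start
    return stream


stream = read_into_stream(FakeResponse(b"x" * 2500))
print(len(stream.read()))
# → 2500
```

Without the `seek(0)` the returned buffer would be positioned at its end, and a subsequent `read()` would return an empty bytes object.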
get_application(self, application_id)
Gets an application by a provided ID
If the application does not exist, then this method returns False.
If the application does exist, it returns a JSON object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
application_id | str | A Swimlane application ID | required |
Returns:
Type | Description |
---|---|
json | A Swimlane application JSON |
Source code in aqueduct/instance.py
@log_exception
def get_application(self, application_id):
    """Gets an application by a provided ID
    If the application does not exist, then this method returns False.
    If the application does exist, it returns a JSON object
    Args:
        application_id (str): A Swimlane application ID
    Returns:
        json: A Swimlane application JSON
    """
    try:
        return self._make_request("GET", f"/app/{application_id}")
    except Exception as e:
        return False
get_applications(self)
Retrieves a list of applications for a Swimlane instance.
Returns:
Type | Description |
---|---|
list | A list of JSON objects |
Source code in aqueduct/instance.py
@log_exception
def get_applications(self):
    """Retrieves a list of applications for a Swimlane instance.
    Returns:
        list: A list of JSON objects
    """
    return self._make_request("GET", "/app")
get_applications_light(self)
Gets the light version of all applications
Returns:
Type | Description |
---|---|
list | A list of application light objects |
Source code in aqueduct/instance.py
@log_exception
def get_applications_light(self):
    """Gets the light version of all applications
    Returns:
        list: A list of application light objects
    """
    return self._make_request("GET", "/app/light")
get_asset(self, asset_id)
Gets an asset by a provided ID
If the asset does not exist, then this method returns False.
If the asset does exist, it returns an `Asset` object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
asset_id | str | A Swimlane asset ID | required |
Returns:
Type | Description |
---|---|
Asset | An `Asset` data model object |
Source code in aqueduct/instance.py
@log_exception
def get_asset(self, asset_id: str):
    """Gets an asset by a provided ID
    If the asset does not exist, then this method returns False.
    If the asset does exist, it returns an `Asset` object
    Args:
        asset_id (str): A Swimlane asset ID
    Returns:
        Asset: An `Asset` data model object
    """
    try:
        return Asset(**self._make_request("GET", f"/asset/{asset_id}"))
    except Exception as e:
        return False
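Because the single-item getters return `False` rather than raising when an item is missing, callers can use the result to decide whether an item still has to be added. The sketch below shows that check against an in-memory stub. `StubInstance` and `ensure_asset` are hypothetical and use plain dictionaries in place of the `Asset` data model.

```python
class StubInstance:
    """Hypothetical in-memory stand-in for a SwimlaneInstance."""

    def __init__(self, assets=None):
        self._assets = dict(assets or {})

    def get_asset(self, asset_id):
        # Mirrors the documented contract: False when the asset is missing.
        return self._assets.get(asset_id, False)

    def add_asset(self, asset):
        self._assets[asset["id"]] = asset
        return asset


def ensure_asset(destination, asset):
    """Return (asset, created), adding the asset only if it is missing."""
    existing = destination.get_asset(asset["id"])
    if existing is False:
        return destination.add_asset(asset), True
    return existing, False


destination = StubInstance()
asset = {"id": "asset-1", "name": "Mail Server"}
print(ensure_asset(destination, asset)[1])  # first call adds the asset
print(ensure_asset(destination, asset)[1])  # second call finds it
```

One caveat with this contract: the real getters return `False` for any exception, so a failed request is indistinguishable from a missing item.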
get_assets(self)
Retrieves a list of assets for a Swimlane instance.
These assets are modeled after the `Asset` data model
Returns:
Type | Description |
---|---|
list | A list of `Asset` objects |
Source code in aqueduct/instance.py
@log_exception
def get_assets(self):
    """Retrieves a list of assets for a Swimlane instance.
    These assets are modeled after the `Asset` data model
    Returns:
        list: A list of `Asset` objects
    """
    return_list = []
    assets = self._make_request("GET", "/asset")
    if assets and isinstance(assets, list):
        for asset in assets:
            return_list.append(Asset(**asset))
    return return_list
get_credential(self, name)
Retrieves a keystore item if it exists on the desired instance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of a keystore item | required |
Returns:
Type | Description |
---|---|
dict | A dictionary of a keystore item |
Source code in aqueduct/instance.py
@log_exception
def get_credential(self, name: str):
    """Retrieves a keystore item if it exists on the desired instance
    Args:
        name (str): Name of a keystore item
    Returns:
        dict: A dictionary of a keystore item
    """
    try:
        return self._make_request("GET", f"/credentials/{name}")
    except Exception as e:
        return False
get_credentials(self)
Used to retrieve keystore items
Returns:
Type | Description |
---|---|
dict | A dictionary of keystore keys and encrypted values |
Source code in aqueduct/instance.py
@log_exception
def get_credentials(self):
    """Used to retrieve keystore items
    Returns:
        dict: A dictionary of keystore keys and encrypted values
    """
    return self._make_request("GET", "/credentials")
get_dashboard(self, dashboard_id)
Gets a dashboard by a provided ID
If the dashboard does not exist, then this method returns False.
If the dashboard does exist, it returns a `Dashboard` object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dashboard_id | str | A Swimlane dashboard ID | required |
Returns:
Type | Description |
---|---|
Dashboard | A `Dashboard` data model object |
Source code in aqueduct/instance.py
@log_exception
def get_dashboard(self, dashboard_id: str):
    """Gets a dashboard by a provided ID
    If the dashboard does not exist, then this method returns False.
    If the dashboard does exist, it returns a `Dashboard` object
    Args:
        dashboard_id (str): A Swimlane dashboard ID
    Returns:
        Dashboard: A `Dashboard` data model object
    """
    try:
        return Dashboard(**self._make_request("GET", f"/dashboard/{dashboard_id}"))
    except Exception as e:
        return False
get_dashboards(self)
Retrieves a list of dashboards for a Swimlane instance.
These dashboards are modeled after the `Dashboard` data model
Returns:
Type | Description |
---|---|
list | A list of `Dashboard` objects |
Source code in aqueduct/instance.py
@log_exception
def get_dashboards(self):
    """Retrieves a list of dashboards for a Swimlane instance.
    These dashboards are modeled after the `Dashboard` data model
    Returns:
        list: A list of `Dashboard` objects
    """
    return_list = []
    dashboards = self._make_request("GET", "/dashboard")
    if dashboards and isinstance(dashboards, list):
        for dashboard in dashboards:
            return_list.append(Dashboard(**dashboard))
    return return_list
get_default_report_by_application_id(self, application_id)
Gets the default report for an application based on the provided ID
If the request fails, this method returns False.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
application_id | str | A Swimlane application ID | required |
Returns:
Type | Description |
---|---|
Report | A `Report` data model object |
Source code in aqueduct/instance.py
@log_exception
def get_default_report_by_application_id(self, application_id):
    """Gets the default report for an application based on the provided ID
    If the request fails, this method returns False.
    Args:
        application_id (str): A Swimlane application ID
    Returns:
        Report: A `Report` data model object
    """
    try:
        return Report(**self._make_request("GET", f"/reports/app/{application_id}/default"))
    except Exception as e:
        return False
get_group_by_id(self, group_id)
Gets a group by a provided ID
If the group does not exist, then this method returns False.
If the group does exist, it returns a `Group` object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
group_id | str | A Swimlane group ID | required |
Returns:
Type | Description |
---|---|
Group | A `Group` data model object |
Source code in aqueduct/instance.py
@log_exception
def get_group_by_id(self, group_id: str):
    """Gets a group by a provided ID
    If the group does not exist, then this method returns False.
    If the group does exist, it returns a `Group` object
    Args:
        group_id (str): A Swimlane group ID
    Returns:
        Group: A `Group` data model object
    """
    try:
        return Group(**self._make_request("GET", f"/groups/{group_id}"))
    except Exception as e:
        return False
get_group_by_name(self, group_name)
Gets a group by a provided name
If the group does not exist, then this method returns False.
If the group does exist, it returns a `Group` object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
group_name | str | A Swimlane group name | required |
Returns:
Type | Description |
---|---|
Group | A `Group` data model object |
Source code in aqueduct/instance.py
@log_exception
def get_group_by_name(self, group_name: str):
    """Gets a group by a provided name
    If the group does not exist, then this method returns False.
    If the group does exist, it returns a `Group` object
    Args:
        group_name (str): A Swimlane group name
    Returns:
        Group: A `Group` data model object
    """
    try:
        resp = self._make_request("GET", f"/users/lookup?name={group_name}")
        if resp and isinstance(resp, list):
            return Group(**resp[0])
    except Exception as e:
        return False
get_groups(self)
Retrieves a list of groups for a Swimlane instance.
These groups are modeled after the `Group` data model
Returns:
Type | Description |
---|---|
list | A list of `Group` objects |
Source code in aqueduct/instance.py
@log_exception
def get_groups(self):
    """Retrieves a list of groups for a Swimlane instance.
    These groups are modeled after the `Group` data model
    Returns:
        list: A list of `Group` objects
    """
    return_list = []
    groups = self._make_request("GET", "/groups")
    if groups and groups.get("items"):
        for item in groups["items"]:
            if item:
                return_list.append(Group(**item))
    elif groups and groups.get("groups"):
        for item in groups["groups"]:
            if item:
                return_list.append(Group(**item))
    return return_list
get_pip_packages(self, versions=['Python2_7', 'Python3_6', 'Python3'])
Retrieves a list of pip packages for a Swimlane instance.
These packages are modeled after the `Package` data model
Parameters:
Name | Type | Description | Default |
---|---|---|---|
versions | list | A list of Python versions. | ['Python2_7', 'Python3_6', 'Python3'] |
Returns:
Type | Description |
---|---|
list | A list of `Package` objects |
Source code in aqueduct/instance.py
@log_exception
def get_pip_packages(self, versions=["Python2_7", "Python3_6", "Python3"]):
    """Retrieves a list of pip packages for a Swimlane instance.
    These packages are modeled after the `Package` data model
    Args:
        versions (list, optional): A list of Python versions. Defaults to ['Python2_7', 'Python3_6', 'Python3'].
    Returns:
        list: A list of `Package` objects
    """
    return_list = []
    for version in versions:
        try:
            resp = self._make_request("GET", f"/pip/packages/{version}")
            if resp:
                for item in resp:
                    return_list.append(Package(**item))
        except Exception as e:
            continue
    return return_list
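`get_pip_packages` queries each Python version separately and skips any version whose request fails, so one unavailable version does not lose the rest. A minimal sketch of that loop follows. `collect_packages` and `fake_fetch` are hypothetical, and the `pythonVersion` key in the sample data is an assumption made for this example.

```python
def collect_packages(fetch, versions=("Python2_7", "Python3_6", "Python3")):
    """Gather packages across versions, skipping versions that fail."""
    packages = []
    for version in versions:
        try:
            packages.extend(fetch(version) or [])
        except Exception:
            continue  # an unavailable Python version is skipped, not fatal
    return packages


def fake_fetch(version):
    # Simulate an instance on which one Python version is not available.
    if version == "Python2_7":
        raise RuntimeError("Python 2.7 is not available on this instance")
    return [{"name": "requests", "pythonVersion": version}]


for package in collect_packages(fake_fetch):
    print(package["pythonVersion"], package["name"])
```

The trade-off is that failures are silent: an empty result can mean either no packages or that every request failed.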
get_plugin(self, name)
Gets a plugin by a provided plugin name
If the plugin does not exist, then this method returns False.
If the plugin does exist, it returns a `Plugin` object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | A plugin name | required |
Returns:
Type | Description |
---|---|
Plugin | A `Plugin` data model object |
Source code in aqueduct/instance.py
@log_exception
def get_plugin(self, name: str):
    """Gets a plugin by a provided plugin name
    If the plugin does not exist, then this method returns False.
    If the plugin does exist, it returns a `Plugin` object
    Args:
        name (str): A plugin name
    Returns:
        Plugin: A `Plugin` data model object
    """
    try:
        return Plugin(**self._make_request("GET", f"/task/packages/{name}"))
    except Exception as e:
        return False
get_plugins(self)
Retrieves a list of plugins for a Swimlane instance.
These plugins are modeled after the `PluginLight` data model
Returns:
Type | Description |
---|---|
list | A list of `PluginLight` objects |
Source code in aqueduct/instance.py
@log_exception
def get_plugins(self):
    """Retrieves a list of plugins for a Swimlane instance.
    These plugins are modeled after the `PluginLight` data model
    Returns:
        list: A list of `PluginLight` objects
    """
    return_list = []
    plugins = self._make_request("GET", "/task/packages")
    if plugins and isinstance(plugins, list):
        for item in plugins:
            if item:
                return_list.append(PluginLight(**item))
    return return_list
get_report(self, report_id)
Gets a report by a provided ID
If the report does not exist, then this method returns False.
If the report does exist, it returns a `Report` object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
report_id | str | A Swimlane report ID | required |
Returns:
Type | Description |
---|---|
Report | A `Report` data model object |
Source code in aqueduct/instance.py
@log_exception
def get_report(self, report_id: str):
    """Gets a report by a provided ID
    If the report does not exist, then this method returns False.
    If the report does exist, it returns a `Report` object
    Args:
        report_id (str): A Swimlane report ID
    Returns:
        Report: A `Report` data model object
    """
    try:
        return Report(**self._make_request("GET", f"/reports/{report_id}"))
    except Exception as e:
        return False
get_reports(self)
Retrieves a list of reports for a Swimlane instance.
These reports are modeled after the `Report` data model
Returns:
Type | Description |
---|---|
list | A list of `Report` objects |
Source code in aqueduct/instance.py
@log_exception
def get_reports(self):
    """Retrieves a list of reports for a Swimlane instance.
    These reports are modeled after the `Report` data model
    Returns:
        list: A list of `Report` objects
    """
    return_list = []
    reports = self._make_request("GET", "/reports")
    if reports:
        for report in reports:
            return_list.append(Report(**report))
    return return_list
get_role(self, role_id)
Gets a role by a provided ID
If the role does not exist, then this method returns False.
If the role does exist, it returns a `Role` object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role_id | str | A Swimlane role ID | required |
Returns:
Type | Description |
---|---|
Role | A `Role` data model object |
Source code in aqueduct/instance.py
@log_exception
def get_role(self, role_id: str):
    """Gets a role by a provided ID
    If the role does not exist, then this method returns False.
    If the role does exist, it returns a `Role` object
    Args:
        role_id (str): A Swimlane role ID
    Returns:
        Role: A `Role` data model object
    """
    try:
        return Role(**self._make_request("GET", f"/roles/{role_id}"))
    except Exception as e:
        return False
get_role_by_name(self, role_name)
Gets a role by a provided name
If the role does not exist, then this method returns False.
If the role does exist, it returns a `Role` object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role_name | str | A Swimlane role name | required |
Returns:
Type | Description |
---|---|
Role | A `Role` data model object |
Source code in aqueduct/instance.py
@log_exception
def get_role_by_name(self, role_name: str):
    """Gets a role by a provided name
    If the role does not exist, then this method returns False.
    If the role does exist, it returns a `Role` object
    Args:
        role_name (str): A Swimlane role name
    Returns:
        Role: A `Role` data model object
    """
    try:
        return Role(**self._make_request("GET", f"/roles/?searchFieldName=name&searchValue={role_name}"))
    except Exception as e:
        return False
get_roles(self)
Retrieves a list of roles for a Swimlane instance.
These roles are modeled after the `Role` data model
Returns:
Type | Description |
---|---|
list | A list of `Role` objects |
Source code in aqueduct/instance.py
@log_exception
def get_roles(self):
    """Retrieves a list of roles for a Swimlane instance.
    These roles are modeled after the `Role` data model
    Returns:
        list: A list of `Role` objects
    """
    return_list = []
    roles = self._make_request("GET", "/roles")
    if roles and isinstance(roles, dict):
        # Check for the items key here: 10.4.0 does not use it,
        # but later versions do (e.g. 10.5 and above).
        if roles.get("items"):
            roles = roles["items"]
    if roles and isinstance(roles, list):
        for role in roles:
            return_list.append(Role(**role))
    return return_list
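The comment in `get_roles` notes that the response shape differs between Swimlane versions: a bare list in 10.4.0 and a dictionary with an `items` key in later versions. The sketch below isolates that normalization step. `extract_roles` is a hypothetical helper and the sample payloads are invented.

```python
def extract_roles(response):
    """Return the list of role dictionaries from either response shape."""
    if isinstance(response, dict):
        # Newer shape: the roles are wrapped in an "items" key.
        response = response.get("items") or []
    if isinstance(response, list):
        return list(response)
    return []  # anything else (None, unexpected types) yields no roles


older_shape = [{"name": "Administrator"}]
newer_shape = {"items": [{"name": "Administrator"}], "count": 1}

print(extract_roles(older_shape))
print(extract_roles(newer_shape))
```

Both calls produce the same list, so the code that builds `Role` objects does not need to know which version answered.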
get_task(self, task_id)
Gets a task by a provided task ID.
If the task ID does not exist, then this method returns False.
If the task ID does exist, it returns a `Task` object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task_id | str | A task ID to retrieve the entire `Task` object | required |
Returns:
Type | Description |
---|---|
Task | A `Task` data model object |
Source code in aqueduct/instance.py
@log_exception
def get_task(self, task_id: str):
    """Gets a task by a provided task ID.
    If the task ID does not exist, then this method returns False.
    If the task ID does exist, it returns a `Task` object
    Args:
        task_id (str): A task ID to retrieve the entire `Task` object
    Returns:
        Task: A `Task` data model object
    """
    try:
        return Task(**self._make_request("GET", f"/task/{task_id}"))
    except TypeError as te:
        from .utils.exceptions import ModelError
        raise ModelError(err=te, name="Task")
    except Exception as e:
        return False
get_tasks(self)
Retrieves a list of tasks for a Swimlane instance.
These tasks are modeled after the `TaskLight` data model.
Returns:
Type | Description |
---|---|
list |
A list of TaskLight objects |
Source code in aqueduct/instance.py
@log_exception
def get_tasks(self):
"""Retrieves a list of tasks for a Swimlane instance.
These tasks are modeled after the `TaskLight` data model
Returns:
list: A list of TaskLight objects
"""
return_list = []
tasks = self._make_request("GET", "/task/list")
if tasks and tasks.get("tasks"):
for task in tasks["tasks"]:
return_list.append(TaskLight(**task))
return return_list
get_user(self, user_id)
Gets a user by a provided ID.
If the user does not exist, then this method returns False.
If the user does exist, it returns a `User` object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
str |
A Swimlane user ID |
required |
Returns:
Type | Description |
---|---|
User |
A `User` data model object |
Source code in aqueduct/instance.py
@log_exception
def get_user(self, user_id: str):
"""Gets an user by a provided ID
If the user does not exist, then this method returns False.
If the user does exist it will return an `User` object
Args:
user_id (str): A Swimlane user ID
Returns:
User: A `User` data model object
"""
try:
return User(**self._make_request("GET", f"/user/{user_id}"))
except Exception as e:
return False
get_users(self)
Retrieves a list of users for a Swimlane instance.
These users are modeled after the `User` data model.
Returns:
Type | Description |
---|---|
list |
A list of `User` objects |
Source code in aqueduct/instance.py
@log_exception
def get_users(self):
"""Retrieves a list of users for a Swimlane instance.
These users are modeled after the `User` data model
Returns:
list: A list of `User` objects
"""
user_list = []
users = self._make_request("GET", "/user/light")
if users:
for user in users:
user_list.append(UserLight(**user))
return user_list
get_workflow(self, application_id)
Gets a workflow by a provided application ID.
If the workflow does not exist, then this method returns False.
If the workflow does exist, it returns a `Workflow` object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
application_id |
str |
A Swimlane application ID |
required |
Returns:
Type | Description |
---|---|
Workflow |
A `Workflow` data model object |
Source code in aqueduct/instance.py
@log_exception
def get_workflow(self, application_id: str):
"""Gets an workflow by a provided application ID
If the workflow does not exist, then this method returns False.
If the workflow does exist it will return an `Workflow` object
Args:
application_id (str): A Swimlane application ID
Returns:
Workflow: A `Workflow` data model object
"""
try:
return self._make_request("GET", f"/workflow/{application_id}")
except Exception as e:
return False
get_workflows(self)
Retrieves a list of workflows for a Swimlane instance.
These workflows are modeled after the `Workflow` data model.
Returns:
Type | Description |
---|---|
list |
A list of `Workflow` objects |
Source code in aqueduct/instance.py
@log_exception
def get_workflows(self):
"""Retrieves a list of workflows for a Swimlane instance.
These workflow are modeled after the `Workflow` data model
Returns:
list: A list of `Workflow` objects
"""
return_list = []
workflows = self._make_request("GET", "/workflow/")
if workflows:
for workflow in workflows:
return_list.append(workflow)
return return_list
get_workspace(self, workspace_id)
Gets a workspace by a provided ID.
If the workspace does not exist, then this method returns False.
If the workspace does exist, it returns a `Workspace` object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workspace_id |
str |
A Swimlane workspace ID |
required |
Returns:
Type | Description |
---|---|
Workspace |
A `Workspace` data model object |
Source code in aqueduct/instance.py
@log_exception
def get_workspace(self, workspace_id: str):
"""Gets an workspace by a provided ID
If the workspace does not exist, then this method returns False.
If the workspace does exist it will return an `Workspace` object
Args:
workspace_id (str): A Swimlane workspace ID
Returns:
Workspace: A `Workspace` data model object
"""
try:
return Workspace(**self._make_request("GET", f"/workspaces/{workspace_id}"))
except Exception as e:
return False
get_workspaces(self)
Retrieves a list of workspaces for a Swimlane instance.
These workspaces are modeled after the `Workspace` data model.
Returns:
Type | Description |
---|---|
list |
A list of `Workspace` objects |
Source code in aqueduct/instance.py
@log_exception
def get_workspaces(self):
"""Retrieves a list of workspaces for a Swimlane instance.
These workspaces are modeled after the `Workspace` data model
Returns:
list: A list of `Workspace` objects
"""
return_list = []
workspaces = self._make_request("GET", "/workspaces")
if workspaces:
for workspace in workspaces:
return_list.append(Workspace(**workspace))
return return_list
install_package(self, package)
Installs a Python (pip) package based on the provided Package data model object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
package |
Package |
A `Package` data model object |
required |
Returns:
Type | Description |
---|---|
dict |
A dictionary of a Python pip package |
Source code in aqueduct/instance.py
@log_exception
def install_package(self, package: Package):
"""Installs a Python (pip) package based on the provided Package data model object
Args:
package (Package): A `Package` data model object
Returns:
dict: A dictionary of a Python pip package
"""
json = {
"name": package.name,
"version": package.version,
"pythonVersion": package.pythonVersion,
}
return self._make_request("POST", "/pip/packages", json=json)
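To make the request body concrete, here is a small sketch of the payload that `install_package` builds. The `Package` class below is a hypothetical stand-in that declares only the three attributes the listing reads, and the example values (including the `pythonVersion` string) are placeholders rather than values confirmed by Swimlane:

```python
from dataclasses import dataclass


@dataclass
class Package:
    """Stand-in for aqueduct's Package model (assumed fields only)."""

    name: str
    version: str
    pythonVersion: str


def build_install_payload(package: Package) -> dict:
    # Mirrors the three keys that install_package sends to /pip/packages.
    return {
        "name": package.name,
        "version": package.version,
        "pythonVersion": package.pythonVersion,
    }


# Placeholder values for illustration only.
payload = build_install_payload(
    Package(name="requests", version="2.31.0", pythonVersion="Python3")
)
```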
install_package_offline(self, filename, stream, data)
Installs a Python package wheel file offline to a Swimlane instance given a filename and a BytesIO file stream
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename |
str |
A filename string |
required |
stream |
BytesIO |
A BytesIO file stream |
required |
data |
dict |
A dictionary of additional request information |
required |
Returns:
Type | Description |
---|---|
json |
JSON Response from installing a Python package wheel file |
Source code in aqueduct/instance.py
@log_exception
def install_package_offline(self, filename, stream, data):
"""Installs a Python package wheel file offline to a Swimlane instance given a filename and a BytesIO file stream
Args:
filename (str): A filename string
stream (BytesIO): A BytesIO file stream
data (dict): A dictionary of additional request information
Returns:
json: JSON Response from installing a Python package wheel file
"""
return self.swimlane.request(
"POST",
"/pip/packages/offline",
data=data,
files={"wheel": (filename, stream.read())},
timeout=120,
)
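One detail of the listing above is that `stream.read()` consumes the stream. A minimal sketch of that behavior, assuming a hypothetical helper `build_wheel_upload` that only reproduces the `files` argument and performs no HTTP request:

```python
from io import BytesIO


def build_wheel_upload(filename: str, stream: BytesIO) -> dict:
    # Same shape as the files argument in install_package_offline:
    # a "wheel" entry holding the filename and the raw bytes.
    return {"wheel": (filename, stream.read())}


# Fake bytes stand in for a real wheel file.
stream = BytesIO(b"fake wheel bytes")
files = build_wheel_upload("example-1.0-py3-none-any.whl", stream)

# The stream position is now at the end, so a second read is empty.
leftover = stream.read()
```

If the same `BytesIO` object has to be uploaded again, calling `stream.seek(0)` first rewinds it.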
search_user(self, query_string)
Searches for a user by a query string
If the user does not exist, then this method returns False.
If the user does exist, it returns a `User` object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query_string |
str |
A query string, typically a display name |
required |
Returns:
Type | Description |
---|---|
User |
A `User` data model object |
Source code in aqueduct/instance.py
@log_exception
def search_user(self, query_string: str):
"""Searches for a user by a query string
If the user does not exist, then this method returns False.
If the user does exist it will return an `User` object
Args:
query_string (str): A query string typically a display name
Returns:
User: A `User` data model object
"""
try:
resp = self._make_request("GET", f"/user/lookup?name={query_string}")
if resp and isinstance(resp, list):
return UserLight(**resp[0])
except Exception as e:
return False
update_application(self, application)
Updates the application based on the provided dictionary object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
application |
dict |
An application dictionary object. |
required |
Returns:
Type | Description |
---|---|
dict |
A Swimlane application object |
Source code in aqueduct/instance.py
@log_exception
def update_application(self, application):
"""Updates the application based on the provided dictionary object
Args:
application (dict): A application dictionary object.
Returns:
dict: A Swimlane application object
"""
return self._make_request("PUT", "/app", json=application)
update_asset(self, asset_id, asset)
Updates a provided asset ID with the data contained in the provided Asset data model object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
asset_id |
str |
An asset ID to update on a Swimlane instance |
required |
asset |
Asset |
An `Asset` data model object |
required |
Returns:
Type | Description |
---|---|
Asset |
An `Asset` data model object |
Source code in aqueduct/instance.py
@log_exception
def update_asset(self, asset_id: str, asset: Asset):
"""Updates a provided asset ID with the data contained in the provided Asset data model object
Args:
asset_id (str): An asset ID to update on a Swimlane instance
asset (Asset): An `Asset` data model object
Returns:
Asset: An `Asset` data model object
"""
return Asset(**self._make_request("PUT", f"/asset/{asset_id}", json=asdict(asset)))
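Several update methods share the same pattern: serialize the model with `asdict`, send it, and rebuild the model from the response. A minimal sketch of that round trip, assuming a hypothetical three-field `Asset` stand-in and a fake `fake_put` function in place of the HTTP call:

```python
from dataclasses import asdict, dataclass


@dataclass
class Asset:
    """Hypothetical stand-in; the real model has more fields."""

    id: str
    name: str
    disabled: bool = False


def fake_put(json: dict) -> dict:
    # Stand-in for the PUT request: echoes the body back unchanged.
    return dict(json)


original = Asset(id="a1", name="Example asset")
body = asdict(original)            # dataclass -> plain dict for the request
updated = Asset(**fake_put(body))  # response dict -> dataclass again
```

Because the response is unpacked with `**`, a response containing a key that the dataclass does not declare raises a `TypeError` when the model is constructed.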
update_dashboard(self, dashboard)
Updates a Swimlane instance dashboard based on the provided Dashboard data model object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dashboard |
Dashboard |
A `Dashboard` data model object |
required |
Returns:
Type | Description |
---|---|
Dashboard |
A `Dashboard` data model object |
Source code in aqueduct/instance.py
@log_exception
def update_dashboard(self, dashboard: Dashboard):
"""Updates a Swimlane instance dashboard based on the provided Dashboard data model object
Args:
dashboard (Dashboard): A `Dashboard` data model object
Returns:
Dashboard: A `Dashboard` data model object
"""
return Dashboard(**self._make_request("PUT", f"/dashboard/{dashboard.id}", json=asdict(dashboard)))
update_default_report(self, report)
Updates a Swimlane application's default report with the data contained in the provided `Report` data model object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
report |
Report |
A `Report` data model object |
required |
Returns:
Type | Description |
---|---|
Report |
A `Report` data model object |
Source code in aqueduct/instance.py
@log_exception
def update_default_report(self, report: Report):
"""Updates a Swimlane applications default report with the data contained in the provided `Report` data model object
Args:
report (Report): A `Report` data model object
Returns:
Report: A `Report` data model object
"""
return Report(**self._make_request("PUT", f"/reports/{report.id}", json=asdict(report)))
update_group(self, group_id, group)
Updates a provided group ID with the data contained in the provided Group data model object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
group_id |
str |
A group ID to update on a Swimlane instance |
required |
group |
Group |
A `Group` data model object |
required |
Returns:
Type | Description |
---|---|
Group |
A `Group` data model object |
Source code in aqueduct/instance.py
@log_exception
def update_group(self, group_id: str, group: Group):
"""Updates a provided group ID with the data contained in the provided Group data model object
Args:
group_id (str): A group ID to update on a Swimlane instance
group (Group): A `Group` data model object
Returns:
Group: A `Group` data model object
"""
return Group(**self._make_request("PUT", f"/groups/{group_id}", json=asdict(group)))
update_report(self, report_id, report)
Updates a provided report ID with the data contained in the provided Report data model object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
report_id |
str |
A report ID to update on a Swimlane instance |
required |
report |
Report |
A `Report` data model object |
required |
Returns:
Type | Description |
---|---|
Report |
A `Report` data model object |
Source code in aqueduct/instance.py
@log_exception
def update_report(self, report_id: str, report: Report):
"""Updates a provided report ID with the data contained in the provided Report data model object
Args:
report_id (str): A report ID to update on a Swimlane instance
report (Report): A `Report` data model object
Returns:
Report: A `Report` data model object
"""
resp = self._make_request("PUT", f"/reports/{report_id}", json=asdict(report))
if resp:
return True
update_role(self, role_id, role)
Updates a provided role ID with the data contained in the provided Role data model object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role_id |
str |
A role ID to update on a Swimlane instance |
required |
role |
Role |
A `Role` data model object |
required |
Returns:
Type | Description |
---|---|
Role |
A `Role` data model object |
Source code in aqueduct/instance.py
@log_exception
def update_role(self, role_id: str, role: Role):
"""Updates a provided role ID with the data contained in the provided Role data model object
Args:
role_id (str): A role ID to update on a Swimlane instance
role (Role): A `Role` data model object
Returns:
Role: A `Role` data model object
"""
return self._make_request("PUT", f"/roles/{role_id}", json=asdict(role))
update_task(self, task_id, task)
Updates a provided task ID with the data contained in the provided `Task` data model object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task_id |
str |
A task ID to update on a Swimlane instance |
required |
task |
Task |
A `Task` data model object |
required |
Returns:
Type | Description |
---|---|
Task |
A `Task` data model object |
Source code in aqueduct/instance.py
@log_exception
def update_task(self, task_id: str, task: Task):
"""Updates a provided task ID with the data contained in the provided `Task` data model object
Args:
task_id (str): A task ID to update on a Swimlane instance
task (Task): A `Task` data model object
Returns:
Task: A `Task` data model object
"""
return Task(**self._make_request("PUT", f"/task/{task_id}", json=asdict(task)))
update_user(self, user_id, user)
Updates a provided user ID with the data contained in the provided User data model object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
str |
A user ID to update on a Swimlane instance |
required |
user |
User |
A `User` data model object |
required |
Returns:
Type | Description |
---|---|
User |
A `User` data model object |
Source code in aqueduct/instance.py
@log_exception
def update_user(self, user_id: str, user: User):
"""Updates a provided user ID with the data contained in the provided User data model object
Args:
user_id (str): A user ID to update on a Swimlane instance
user (User): A `User` data model object
Returns:
User: A `User` data model object
"""
return User(**self._make_request("PUT", f"/user/{user_id}", json=asdict(user)))
update_workflow(self, workflow)
Updates a Swimlane instance with the provided `Workflow` data model object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workflow |
Workflow |
A `Workflow` data model object |
required |
Returns:
Type | Description |
---|---|
Workflow |
A `Workflow` data model object |
Source code in aqueduct/instance.py
@log_exception
def update_workflow(self, workflow):
"""Updates a Swimlane instance with the provided `Workflow` data model object
Args:
workflow (Workflow): A `Workflow` data model object
Returns:
Workflow: A `Workflow` data model object
"""
return self._make_request("PUT", f"/workflow/{workflow['id']}", json=workflow)
update_workspace(self, workspace_id, workspace)
Updates a provided workspace ID with the data contained in the provided Workspace data model object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workspace_id |
str |
A workspace ID to update on a Swimlane instance |
required |
workspace |
Workspace |
A `Workspace` data model object |
required |
Returns:
Type | Description |
---|---|
Workspace |
A `Workspace` data model object |
Source code in aqueduct/instance.py
@log_exception
def update_workspace(self, workspace_id: str, workspace: Workspace):
"""Updates a provided workspace ID with the data contained in the provided Workspace data model object
Args:
workspace_id (str): A workspace ID to update on a Swimlane instance
workspace (Workspace): A `Workspace` data model object
Returns:
Workspace: A `Workspace` data model object
"""
return Workspace(**self._make_request("PUT", f"/workspaces/{workspace_id}", json=asdict(workspace)))
upgrade_plugin(self, filename, stream)
Uploads a plugin to be upgraded on a Swimlane instance given a filename and a BytesIO file stream
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename |
str |
A filename string |
required |
stream |
BytesIO |
A BytesIO file stream |
required |
Returns:
Type | Description |
---|---|
json |
JSON Response from uploading a plugin |
Source code in aqueduct/instance.py
@log_exception
def upgrade_plugin(self, filename, stream):
"""Uploads a plugin to be upgraded on a Swimlane instance given a filename and a BytesIO file stream
Args:
filename (str): A filename string
stream (BytesIO): A BytesIO file stream
Returns:
json: JSON Response from uploading a plugin
"""
if not filename.endswith(".swimbundle"):
filename = filename.split(".")[0] + ".swimbundle"
return self.swimlane.request("POST", "/task/packages/upgrade", files={"file": (filename, stream.read())}).json()
upload_plugin(self, filename, stream)
Uploads a plugin to a Swimlane instance given a filename and a BytesIO file stream
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename |
str |
A filename string |
required |
stream |
BytesIO |
A BytesIO file stream |
required |
Returns:
Type | Description |
---|---|
json |
JSON Response from uploading a plugin |
Source code in aqueduct/instance.py
@log_exception
def upload_plugin(self, filename, stream):
"""Uploads a plugin to a Swimlane instance given a filename and a BytesIO file stream
Args:
filename (str): A filename string
stream (BytesIO): A BytesIO file stream
Returns:
json: JSON Response from uploading a plugin
"""
if not filename.endswith(".swimbundle"):
filename = filename.split(".")[0] + ".swimbundle"
return self.swimlane.request("POST", "/task/packages", files={"file": (filename, stream.read())}).json()
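Both `upgrade_plugin` and `upload_plugin` rewrite the filename before uploading. The sketch below isolates that rule in a hypothetical helper named `to_swimbundle_name` (not part of aqueduct) so its effect on different inputs is easy to see:

```python
def to_swimbundle_name(filename: str) -> str:
    # Same rule as the listings: keep the text before the first "."
    # and append the ".swimbundle" extension.
    if not filename.endswith(".swimbundle"):
        filename = filename.split(".")[0] + ".swimbundle"
    return filename


simple = to_swimbundle_name("my_plugin.zip")
unchanged = to_swimbundle_name("my_plugin.swimbundle")
versioned = to_swimbundle_name("my_plugin-1.2.3.zip")
```

Because the split happens at the first dot, a name that carries a dotted version such as `my_plugin-1.2.3.zip` becomes `my_plugin-1.swimbundle`; the rest of the version string is dropped.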
Applications (Base)
Used to sync applications from a source instance to a destination instance of Swimlane
_remove_tracking_field(self, application)
private
This method removes an application's tracking field, since Swimlane automatically generates this field for every new application.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
application |
dict |
A Swimlane application to remove the tracking field from |
required |
Source code in aqueduct/components/applications.py
def _remove_tracking_field(self, application: dict):
    """This method removes an applications tracking field since
    Swimlane automatically generates this for every new application.
    Args:
        application (dict): A Swimlane application to remove the tracking field from
    """
    return_dict = {}
    for field in application.get("fields"):
        if field.get("fieldType") and field["fieldType"].lower() == "tracking":
            if not Base.dry_run:
                application["fields"].remove(field)
                return_dict = field
            else:
                self.add_to_diff_log(application["name"], "removed", subcomponent="tracking-field")
    return return_dict
get_reference_app_order(self)
This method creates an order of applications to be added or updated on a destination instance based on most to least reference relationships. For example, a source application that references 5 applications is ordered before an application that references 3.
Returns:
Type | Description |
---|---|
dict |
An application dictionary ordered (sorted) by number of references |
Source code in aqueduct/components/applications.py
def get_reference_app_order(self):
    """This method creates an order of applications to be added or updated on a destination
    instance based on most to least reference relationships. For example, a source application
    that references 5 applications will be before an application which has 3 references to applications.
    Returns:
        dict: An reference application ordered (sorted) application dictionary
    """
    reference_dict = {}
    for application in self.source_instance.get_applications():
        if application:
            application_ = self.source_instance.get_application(application_id=application["id"])
            if application_["id"] not in reference_dict:
                reference_dict[application_["id"]] = []
            for field in application_["fields"]:
                if field.get("$type") and field["$type"] == "Core.Models.Fields.Reference.ReferenceField, Core":
                    reference_dict[application_["id"]].append(field["targetId"])
    return_dict = OrderedDict()
    for item in sorted(reference_dict, key=lambda k: len(reference_dict[k]), reverse=True):
        return_dict[item] = reference_dict[item]
    return return_dict
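The ordering step at the end of `get_reference_app_order` can be tried on its own. This is a minimal sketch with made-up application IDs; the helper name `order_by_reference_count` is hypothetical and the input dictionary stands in for the `reference_dict` built from the source instance:

```python
from collections import OrderedDict


def order_by_reference_count(reference_dict: dict) -> OrderedDict:
    # Sort application IDs by how many applications each one references,
    # most references first, as in the listing.
    ordered = OrderedDict()
    for app_id in sorted(reference_dict, key=lambda k: len(reference_dict[k]), reverse=True):
        ordered[app_id] = reference_dict[app_id]
    return ordered


# Made-up IDs: app_c references three applications, app_a one, app_b none.
references = {
    "app_a": ["app_b"],
    "app_b": [],
    "app_c": ["app_a", "app_b", "app_d"],
}
ordered = order_by_reference_count(references)
order = list(ordered)
```

Here `order` lists `app_c` first and `app_b` last, matching the "most to least references" rule described above.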
sync(self)
This method will sync all applications on a source instance with a destination instance.
Source code in aqueduct/components/applications.py
def sync(self):
    """This method will sync all applications on a source instance with a destination instance."""
    self.log(f"Starting to sync 'Applications' from '{self.source_host}' to '{self.dest_host}'")
    for application_id, values in self.get_reference_app_order().items():
        self.sync_application(application_id=application_id)
    # Checking if we need to update applications reference fields that contain old (source instance)
    # tracking-ids in their columns. If so we remove them and update it with the new ones on the destination.
    if self.__applications_needing_updates:
        for application_id in self.__applications_needing_updates:
            dest_application = self.destination_instance.get_application(application_id)
            needs_update = False
            for field in dest_application["fields"]:
                if field.get("columns"):
                    column_list = []
                    for column in field["columns"]:
                        if self.tracking_id_map.get(column):
                            self.log(
                                f"Removing old tracking-id '{column}' from application "
                                f"'{dest_application['name']}' field '{field['key']}'"
                            )
                            column_list.append(self.tracking_id_map[column])
                            self.log(
                                f"Adding new tracking-id '{self.tracking_id_map[column]}' to application "
                                f"'{dest_application['name']}' field '{field['key']}'"
                            )
                            needs_update = True
                        else:
                            column_list.append(column)
                    field["columns"] = column_list
            if needs_update:
                self.log(f"Updating application '{dest_application['name']}' on destination with new.")
                dest_application = self.destination_instance.update_application(dest_application)
                self.log(f"Successfully updated application '{dest_application['name']}' on destination.")
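The second half of `sync` swaps source tracking IDs for destination tracking IDs inside each field's `columns` list. A minimal sketch of that replacement, assuming a hypothetical helper `remap_columns` and a plain dictionary for the ID map (in aqueduct, `tracking_id_map` is an attribute of the component, and its exact implementation is not shown here):

```python
def remap_columns(columns, tracking_id_map):
    """Replace any source tracking ID in columns with its destination ID.

    Returns the new column list and whether anything changed.
    """
    new_columns = []
    changed = False
    for column in columns:
        if tracking_id_map.get(column):
            # Known source tracking ID: swap in the destination ID.
            new_columns.append(tracking_id_map[column])
            changed = True
        else:
            # Any other column ID is kept as-is.
            new_columns.append(column)
    return new_columns, changed


# Made-up IDs for illustration.
id_map = {"src_tracking": "dst_tracking"}
remapped, changed = remap_columns(["field_1", "src_tracking"], id_map)
untouched, unchanged_flag = remap_columns(["field_1"], id_map)
```

The `changed` flag plays the role of `needs_update` in the listing: the application is only sent back to the destination when at least one column was replaced.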
sync_application(self, application_id)
This method syncs a single application from a source instance to a destination instance.
Once an application_id on a source instance is provided we retrieve this application JSON. Next we remove the tracking_field from the application since Swimlane automatically generates a unique value for this field.
If workspaces are defined in the application and do not currently exist, we attempt to create or update them using the Workspaces class.
If the source application does not exist on the destination, we create it with the same IDs for the application, its fields, its layout, and so on.
By doing this, syncing of applications (and their IDs) is much easier.
If the application exists, we proceed to remove specific fields that are not needed.
Next, we check for fields which have been added to the source application but do not exist on the destination instance. If fields are found we add them to the destination object.
Next, we check to see if the destination has fields which are not defined in the source instance. If fields are found, we then remove them from the layout view of the source application. This equates to moving them to the "hidden" field section within the application builder so they can still be retrieved and reorganized as needed.
Finally, we update the application on the destination instance.
After updating the application we then check to ensure that the workflow of that application is up to date and accurate.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
application_id |
str |
A source application ID. |
required |
Source code in aqueduct/components/applications.py
def sync_application(self, application_id: str):
"""This method syncs a single application from a source instance to a destination instance.
Once an application_id on a source instance is provided we retrieve this application JSON.
Next we remove the tracking_field from the application since Swimlane automatically generates
a unique value for this field.
If workspaces are defined in the application and do not currently exist will attempt to create
or update them using the Workspaces class.
If the source application does not exist on the destination, we will create it with the same IDs for
1. application
2. fields
3. layout
4. etc.
By doing this, syncing of applications (and their IDs) is much easier.
If the application exists, we proceed to remove specific fields that are not needed.
Next, we check for fields which have been added to the source application but do not
exist on the destination instance. If fields are found we add them to the destination
object.
Next, we check to see if the destination has fields which are not defined in the source instance.
If fields are found, we then remove them from the layout view of the source application. This equates
to moving them to the "hidden" field section within the application builder so they can still be retrieved
and reorganized as needed.
Finally, we update the application on the destination instance.
After updating the application we then check to ensure that the workflow of that application is up to date
and accurate.
Args:
application_id (str): A source application ID.
"""
application = self.source_instance.get_application(application_id=application_id)
if not application:
raise GetComponentError(name="application", id=application_id)
self._process_workspaces(application=application)
source_tracking_field = self._remove_tracking_field(application)
self.scrub(application)
if not self._is_in_include_exclude_lists(application["name"], "applications"):
dest_application = self.destination_instance.get_application(application["id"])
if not dest_application:
if not Base.dry_run:
self.log(f"Adding application '{application['name']}' on destination.")
dest_application = self.destination_instance.add_application(application)
self.__applications_needing_updates.append(application_id)
self.log(f"Successfully added application '{application['name']}' on destination.")
# Setting tracking_id_map just in case tasks are added and need updating
# Tasks would need updating if they use an applications 'Tracking Id' field as an
# input or output since Swimlane randomizes the Tracking ID when created
self.tracking_id_map = (source_tracking_field["id"], dest_application["trackingFieldId"])
else:
self.add_to_diff_log(application["name"], "added")
else:
for item in ["createdByUser", "modifiedByUser", "permissions"]:
if dest_application.get(item):
dest_application.pop(item)
dest_application = self.__add_fields(application, dest_application)
dest_application = self.__remove_fields(application, dest_application)
if not Base.dry_run:
self.log(f"Updating application '{dest_application['name']}' on destination.")
self.destination_instance.update_application(dest_application)
self.__applications_needing_updates.append(application_id)
self.log(f"Successfully updated application '{dest_application['name']}' on destination.")
else:
self.add_to_diff_log(application["name"], "updated")
self.log(f"Checking for changes in workflow for application '{application['name']}'")
Workflows().sync_workflow(application_name=application["name"])
else:
self.log(f"Skipping application '{application['name']}' since it is excluded.")
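The field comparison described for existing applications (add fields that are new on the source, hide fields that only exist on the destination) can be sketched as a two-way difference. The private `__add_fields` and `__remove_fields` methods are not shown in this section, so the helper below is a hypothetical illustration that assumes fields are matched by their `id` key:

```python
def diff_fields(source_fields, dest_fields):
    """Split fields into those to add and those to hide on the destination.

    Assumption: fields are compared by "id"; the real methods may use
    different or additional criteria.
    """
    source_ids = {field["id"] for field in source_fields}
    dest_ids = {field["id"] for field in dest_fields}
    # Present on the source but missing on the destination: add them.
    to_add = [field for field in source_fields if field["id"] not in dest_ids]
    # Present only on the destination: candidates to move out of the layout.
    to_hide = [field for field in dest_fields if field["id"] not in source_ids]
    return to_add, to_hide


# Made-up fields for illustration.
source = [{"id": "f1", "name": "Title"}, {"id": "f2", "name": "Severity"}]
dest = [{"id": "f1", "name": "Title"}, {"id": "f9", "name": "Legacy"}]
to_add, to_hide = diff_fields(source, dest)
```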
Assets (Base)
Used to sync assets from a source instance to a destination instance of Swimlane
sync(self)
This method is used to sync (create) all assets from a source instance to a destination instance
Source code in aqueduct/components/assets.py
def sync(self):
"""This method is used to sync (create) all assets from a source instance to a destination instance"""
self.log(f"Attempting to sync assets from '{self.source_host}' to '{self.dest_host}'")
self.__set_dest_assets()
for asset in self.source_instance.get_assets():
self.sync_asset(asset=asset)
sync_asset(self, asset)
This method will create (add) a single asset from a source instance to a destination instance.
Currently, assets are only added and NOT updated, but this functionality may expand in the future.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
asset |
Asset |
A single Swimlane asset, passed as an `Asset` data model object (per the method signature) |
required |
Source code in aqueduct/components/assets.py
def sync_asset(self, asset: Asset):
"""This method will create (add) a single asset from a source instance to a destination instance.
Currently we are only adding assets and NOT updating them but this functionality may expand in the future.
Args:
asset (dict): A single Swimlane asset dictionary from the Swimlane API
"""
if not self._is_in_include_exclude_lists(asset.name, "assets"):
self.log(f"Processing asset '{asset.name}'.")
self.__set_dest_assets()
if asset.name not in self.dest_assets:
self.log(f"Asset '{asset.name}' was not found on destination.")
if not Base.dry_run:
dest_asset = self.destination_instance.add_asset(asset)
if not dest_asset:
raise AddComponentError(model=asset, name=asset.name)
else:
self.add_to_diff_log(asset.name, "added")
self.log(f"Asset '{asset.name}' was successfully added to destination.")
else:
self.log(f"Asset '{asset.name}' already exists on destination. Skipping")
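The add-only, dry-run-aware behavior used for assets (and repeated by the other components) can be summarized in a few lines. This is a minimal sketch with a hypothetical function `plan_asset_sync` that works on asset names only and records what would happen instead of calling a Swimlane instance:

```python
def plan_asset_sync(source_names, dest_names, dry_run=True):
    """Return (added, diff_log) for an add-only sync by name.

    With dry_run=True nothing is "added"; the change is only recorded
    in diff_log, mirroring add_to_diff_log in the listing.
    """
    added = []
    diff_log = []
    existing = set(dest_names)
    for name in source_names:
        if name in existing:
            # Already on the destination: skipped, never updated.
            continue
        if dry_run:
            diff_log.append((name, "added"))
        else:
            added.append(name)
            existing.add(name)
    return added, diff_log


# Made-up asset names for illustration.
dry_added, dry_log = plan_asset_sync(["VirusTotal", "Email"], ["Email"], dry_run=True)
real_added, real_log = plan_asset_sync(["VirusTotal", "Email"], ["Email"], dry_run=False)
```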
Groups (Base)
Used to sync groups from a source instance to a destination instance of Swimlane
sync(self)
This method is used to sync all groups from a source instance to a destination instance
Source code in aqueduct/components/groups.py
def sync(self):
"""This method is used to sync all groups from a source instance to a destination instance"""
self.log(f"Attempting to sync groups from '{self.source_host}' to '{self.dest_host}'")
groups = self.source_instance.get_groups()
if groups:
for group in groups:
self.sync_group(group=group)
sync_group(self, group)
This method syncs a single source instance group to a destination instance.
We begin by processing the provided group and ensuring that all roles and users associated with the provided group are added to the destination instance.
Once that is complete, we then sync any nested groups within the provided source instance group.
If the provided group is already on the destination instance, we skip processing; if it is not, we add it.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
group |
Group |
A source instance Group data model object. |
required |
Source code in aqueduct/components/groups.py
def sync_group(self, group: Group):
    """This method syncs a single source instance group to a destination instance.
    We begin by processing the provided group and ensuring that all roles and users
    associated with the provided group are added to the destination instance.
    Once that is complete, we then sync any nested groups within the provided source instance group.
    If the provided group is already on the destination instance, then we just skip processing but if
    the provided group is not on the destination instance we add it.
    Args:
        group (Group): A source instance Group data model object.
    """
    if not self._is_in_include_exclude_lists(group.name, "groups") and group.name not in self.group_exclusions:
        self.log(f"Processing group '{group.name}' ({group.id})")
        group = self.__process_group(group=group)
        if group.groups:
            group_list = []
            for group_ in group.groups:
                group_list.append(self.__process_group(group=group_))
            group.groups = group_list
        dest_group = self.__get_destination_group(group=group)
        if not dest_group:
            if not Base.dry_run:
                self.log(f"Creating new group '{group.name}' on destination.")
                dest_group = self.destination_instance.add_group(group)
                if not dest_group:
                    raise AddComponentError(model=group, name=group.name)
                self.log(f"Successfully added new group '{group.name}' to destination.")
            else:
                self.add_to_diff_log(group.name, "added")
        else:
            self.log(f"Group '{group.name}' already exists on destination.")
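The dry-run gating used by sync_group can be sketched in isolation. This is a minimal illustration only: FakeDestination, sync_group_sketch, and the returned status strings are hypothetical stand-ins and are not part of aqueduct.

```python
from dataclasses import dataclass, field


@dataclass
class FakeDestination:
    """Hypothetical in-memory stand-in for a destination instance."""

    groups: dict = field(default_factory=dict)

    def add_group(self, name: str) -> str:
        self.groups[name] = {"name": name}
        return name


def sync_group_sketch(name: str, destination: FakeDestination, dry_run: bool, diff_log: list) -> str:
    """Create the group if it is missing, or only record the change on a dry run."""
    if name in destination.groups:
        return "exists"
    if dry_run:
        # Nothing is written to the destination; the change is only recorded.
        diff_log.append((name, "added"))
        return "would-add"
    destination.add_group(name)
    return "added"
```

Calling sync_group_sketch with dry_run=True leaves the destination untouched and appends to diff_log, while dry_run=False creates the group, which mirrors the two branches under `if not dest_group:` in the source above.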
KeyStore (Base)
Used to sync keystore items from a source instance to a destination instance of Swimlane
sync(self)
Syncing the keystore only creates new entries in the destination system's keystore. It does not transfer the values of those items, because they are encrypted within Swimlane itself.
Once the items are created in the keystore on the destination system, you must manually enter the value for each keystore item.
Source code in aqueduct/components/keystore.py
def sync(self):
    """Syncing of the keystore will only create new entries in the destination system's keystore.
    The creation of these items does not transfer or implement the transfer of the value since
    these are encrypted within Swimlane itself.
    Once items are created in the keystore on the destination system then you must manually enter
    the value for that keystore item.
    """
    self.log(f"Attempting to sync keystore from '{self.source_host}' to '{self.dest_host}'")
    credentials = self.source_instance.get_credentials()
    if credentials:
        credentials.pop("$type")
        for key, val in credentials.items():
            if not self._is_in_include_exclude_lists(key, "keystore"):
                self.log(f"Processing '{key}' credential")
                if not self.destination_instance.get_credential(key):
                    self.log(f"Adding Keystore item '{key}' to destination.")
                    credential_ = self.source_instance.get_credential(key)
                    if not Base.dry_run:
                        self.destination_instance.add_credential(credential_)
                        self.log(f"Successfully added new keystore item '{key}'")
                    else:
                        self.add_to_diff_log(key, "added")
                else:
                    self.log(f"Keystore item '{key}' exists on destination. Skipping....")
            else:
                self.log(f"Skipping keystore item '{key}' since it is not included.")
    else:
        self.log("Unable to find any keystore values on the source Swimlane instance. Skipping....")
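Because only the keystore entries are created and their values must be entered by hand afterwards, it can help to know in advance which keys a sync would create. The sketch below assumes the source credentials arrive as a plain dict that includes a "$type" metadata entry, as the `credentials.pop("$type")` call above suggests; keystore_keys_to_create is a hypothetical helper, not an aqueduct function.

```python
def keystore_keys_to_create(source_credentials: dict, destination_keys: set) -> list:
    """Return the keystore item names that exist on the source but not on the destination."""
    # Ignore the "$type" metadata entry, mirroring credentials.pop("$type") in the source.
    names = [key for key in source_credentials if key != "$type"]
    return sorted(key for key in names if key not in destination_keys)
```

Each name returned is an item whose value would need to be entered manually on the destination after a sync.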
Plugins (Base)
Used to sync plugins from a source instance to a destination instance of Swimlane
sync(self)
This method is used to sync all plugins from a source instance to a destination instance
Source code in aqueduct/components/plugins.py
def sync(self):
    """This method is used to sync all plugins from a source instance to a destination instance"""
    self.log(f"Attempting to sync plugins from '{self.source_host}' to '{self.dest_host}'")
    self.dest_plugin_dict = self.destination_instance.plugin_dict
    plugin_dict = self.source_instance.plugin_dict
    if plugin_dict:
        for name, id in plugin_dict.items():
            self.sync_plugin(name=name, id=id)
sync_plugin(self, name, id)
This method syncs a single Swimlane plugin based on the provided name and ID.
We first check whether the provided plugin name exists in the destination instance's plugin_dict.
If it does not, we download the plugin from the source instance and, if the download succeeds, upload it to the destination instance.
If the plugin name already exists in the destination instance's plugin_dict, we retrieve the plugin from both the source and destination instances and compare their versions. If the source version is greater than the destination version, we upgrade the plugin on the destination instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of a Swimlane plugin |
required |
id |
str |
The internal ID of a Swimlane plugin |
required |
Source code in aqueduct/components/plugins.py
def sync_plugin(self, name: str, id: str):
    """This method syncs a single Swimlane plugin based on the provided name and ID.
    We first check to see if the provided name of the plugin exists in our destination instance's plugin_dict.
    If it does not, then we download the plugin from the source instance. If we are successful, we
    then add it to the destination instance.
    If the provided plugin name does exist in our destination instance plugin_dict then we retrieve the
    plugin from both the source and destination instances. We compare their versions and if the source has
    a version greater than the destination we then add it to the destination instance by upgrading the plugin.
    Args:
        name (str): The name of a Swimlane plugin
        id (str): The internal ID of a Swimlane plugin
    """
    if not self._is_in_include_exclude_lists(name, "plugins"):
        self.log(f"Processing '{name}' plugin with id '{id}'")
        if not self.dest_plugin_dict.get(name):
            plugin = self.__download_plugin(name=name, id=id)
            if plugin:
                if not Base.dry_run:
                    try:
                        self.log(f"Uploading plugin '{name}' to destination.")
                        resp = self.destination_instance.upload_plugin(name, plugin)
                        self.log(f"Successfully uploaded plugin '{name}' to destination.")
                    except Exception as e:
                        self.log(
                            f"Failed to upload plugin '{name}' to destination '{self.dest_host}'",
                            level="warning",
                        )
                else:
                    self.add_to_diff_log(name, "added")
        else:
            self.log(
                f"Plugin '{name}' already exists on destination '{self.dest_host}'. Checking for differences...."
            )
            dest_plugin = self.destination_instance.get_plugin(name=name)
            if not dest_plugin:
                raise GetComponentError(type="Plugin", name=name)
            source_plugin = self.source_instance.get_plugin(name=name)
            if not source_plugin:
                raise GetComponentError(type="Plugin", name=name)
            if version.parse(source_plugin.version) > version.parse(dest_plugin.version):
                plugin = self.__download_plugin(name=name, id=id)
                if plugin:
                    if not Base.dry_run:
                        self.log(f"Upgrading plugin '{name}' on destination.")
                        self.destination_instance.upgrade_plugin(filename=name, stream=plugin)
                        self.log(f"Successfully upgraded plugin '{name}' on destination.")
                    else:
                        self.add_to_diff_log(name, "upgraded")
            else:
                self.log("Source and destination have the same version plugin. Skipping...")
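The upload / upgrade / skip decision made by sync_plugin can be summarized as a small pure function. This is a sketch under stated assumptions: plan_plugin_action and parse_version are hypothetical names, and parse_version is a simplified, purely numeric stand-in for the `version.parse` call used in the source, so it only handles dotted integer versions such as "1.10.0".

```python
def parse_version(text: str) -> tuple:
    """Simplified numeric parser (assumption); the source uses version.parse instead."""
    return tuple(int(part) for part in text.split("."))


def plan_plugin_action(name: str, source_version: str, dest_versions: dict) -> str:
    """Decide what a sync would do for one plugin.

    dest_versions maps plugin names already on the destination to their versions.
    """
    dest_version = dest_versions.get(name)
    if dest_version is None:
        return "upload"  # plugin is missing on the destination
    if parse_version(source_version) > parse_version(dest_version):
        return "upgrade"  # source is newer than the destination
    return "skip"  # same or older version on the source
```

Comparing parsed versions rather than raw strings matters here: as strings, "1.10.0" sorts before "1.9.0", while the parsed tuples order them correctly. Note that, as in the source, a source version that is older than the destination also ends up in the skip branch.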
Roles (Base)
Used to sync roles from a source instance to a destination instance of Swimlane
sync(self)
This method is used to sync all roles from a source instance to a destination instance
Source code in aqueduct/components/roles.py
def sync(self):
    """This method is used to sync all roles from a source instance to a destination instance"""
    self.log(f"Attempting to sync roles from '{self.source_host}' to '{self.dest_host}'")
    roles = self.source_instance.get_roles()
    if roles:
        for role in roles:
            self.sync_role(role=role)
Tasks (Base)
Used to sync tasks from a source instance to a destination instance of Swimlane
sync(self)
This method is used to sync all tasks from a source instance to a destination instance
Source code in aqueduct/components/tasks.py
def sync(self):
    """This method is used to sync all tasks from a source instance to a destination instance"""
    self.log(f"Starting to sync tasks from '{self.source_host}' to '{self.dest_host}'")
    failed_task_list = []
    tasks = self.source_instance.get_tasks()
    if tasks:
        for task in tasks:
            resp = self.sync_task(task=task)
            if resp:
                failed_task_list.append(resp)
    self.log(f"The following tasks failed to process: {[x.name for x in failed_task_list]}")
    if failed_task_list:
        count = 1
        self.log("Retrying failed task migration from host to destination.")
        while count < 3:
            for task_ in failed_task_list:
                self.log(f"Attempt {count}: Processing task '{task_.name}'.")
                self.log(f"Creating task '{task_.name}' on destination.")
                try:
                    dest_task = self.destination_instance.add_task(task_)
                    self.log(f"Successfully added task '{task_.name}' to destination.")
                    failed_task_list.remove(task_)
                except Exception as e:
                    self.log(
                        f"Failed to add task '{task_.name}' to destination.",
                        level="warning",
                    )
            count += 1
        if len(failed_task_list) > 0:
            self.log(
                f"The following task were unable to be added to destination! {[x.name for x in failed_task_list]}",
                level="critical",
            )
            if not self.continue_on_error:
                task_names = [x.name for x in failed_task_list]
                raise Exception(f"The following task were unable to be added to destination! {task_names}")
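The retry pass above removes items from failed_task_list while iterating over it, which in Python can cause the loop to skip the element that follows each removal. One way to express the same two-attempt retry without mutating the list being iterated is sketched below. This is an alternative illustration, not the library's implementation; retry_failed and its parameters are hypothetical.

```python
def retry_failed(items, action, max_attempts=2):
    """Retry `action` on each item and return the items that still fail.

    max_attempts=2 matches the source loop, which runs while count < 3
    starting from count = 1.
    """
    remaining = list(items)
    for _attempt in range(1, max_attempts + 1):
        still_failing = []
        for item in remaining:
            try:
                action(item)
            except Exception:
                # Keep the item for the next attempt instead of editing `remaining` in place.
                still_failing.append(item)
        remaining = still_failing
        if not remaining:
            break
    return remaining
```

Building a fresh still_failing list on each pass guarantees that every remaining item is attempted exactly once per pass, and the return value is the final list of items that could not be added.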
sync_task(self, task)
This method syncs a single task from a source Swimlane instance to a destination instance.
Using the provided task object from the Swimlane source instance, we first get the full task object from the source.
Next, we attempt to retrieve the task from the destination system. If the task does not exist on the destination instance, we add it. If it does exist, we check whether the uid and the version are the same.
If both match, we skip updating the task. If the uid matches but the version differs, we update the task on the destination instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task |
Task or TaskLight |
A Swimlane task object from a source system. |
required |
Returns:
Type | Description |
---|---|
Task or TaskLight |
If we failed to add a task we return it so we can try again. Only if called using the sync method. |
Source code in aqueduct/components/tasks.py
def sync_task(self, task: Task or TaskLight):
    """This method syncs a single task from a source Swimlane instance to a destination instance.
    Using the provided task object from Swimlane source instance we first get the actual task
    object from the source.
    Next, we attempt to retrieve the task from the destination system. If the task does not exist
    on the destination instance, we add it. If it does exist, we check if the `uid` and the `version` are the same.
    If they are the same we skip updating the task. If the `uid` matches but the `version` differs, we update
    the task on the destination instance.
    Args:
        task (Task or TaskLight): A Swimlane task object from a source system.
    Returns:
        Task or TaskLight: If we failed to add a task we return it so we can try again.
            Only if called using the sync method.
    """
    if not self._is_in_include_exclude_lists(task.name, "tasks"):
        self.log(f"Processing task '{task.name}'.")
        task_ = self.source_instance.get_task(task.id)
        if not task_:
            raise GetComponentError(type="task", name=task.name, id="")
        task_ = self.update_task_plugin_ids(task=task_)
        dest_task = self.destination_instance.get_task(task_.id)
        if not dest_task:
            if not Base.dry_run:
                self.log(f"Creating task '{task_.name}' on destination.")
                try:
                    # checking tasks for inputs and outputs mapped to source application
                    # tracking-ids and replacing them with the new ones
                    task_ = self._check_tasks_using_tracking_id(task=task_)
                    dest_task = self.destination_instance.add_task(task_)
                    self.log(f"Successfully added task '{task_.name}' to destination.")
                except Exception as e:
                    self.log(
                        f"Failed to add task '{task_.name}' to destination.",
                        level="warning",
                    )
                    self.log("Will attempt again before finalizing.")
                    return task_
            else:
                self.add_to_diff_log(task_.name, "added")
        else:
            self.log(f"Task '{task_.name}' already exists on destination.")
            if task_.uid == dest_task.uid:
                if task_.version == dest_task.version:
                    self.log(f"Task '{task_.name}' has not changed on destination. Skipping...")
                else:
                    if not Base.dry_run:
                        if task_.name in self._processed_task_list:
                            self.log(f"Task '{task_.name}' already updated by applications component. Skipping...")
                        else:
                            self.log(f"Task '{task_.name}' has changed. Updating...")
                            try:
                                dest_task = self.destination_instance.update_task(dest_task.id, task_)
                                if not dest_task:
                                    raise UpdateComponentError(model=task_, name=task_.name)
                                self.log(f"Successfully updated task '{task_.name}' on destination.")
                            except Exception as e:
                                raise UpdateComponentError(model=task_)
                    else:
                        self.add_to_diff_log(task_.name, "updated")
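The add / skip / update decision in sync_task depends only on whether the destination task exists and on the `uid` and `version` of both sides. The sketch below isolates that decision. TaskInfo and plan_task_action are hypothetical names introduced for illustration, and the integer version field is an assumption; the real task models carry many more attributes.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TaskInfo:
    """Hypothetical minimal view of a task: just the fields the decision needs."""

    uid: str
    version: int


def plan_task_action(source: TaskInfo, dest: Optional[TaskInfo]) -> str:
    """Return what a sync would do for one task."""
    if dest is None:
        return "add"  # task does not exist on the destination
    if source.uid != dest.uid:
        return "none"  # the source code takes no action when the uids differ
    if source.version == dest.version:
        return "skip"  # identical uid and version: nothing to update
    return "update"  # same uid, different version
```

In the source, the "update" outcome is additionally skipped when the task name is already in `_processed_task_list`, and on a dry run both "add" and "update" are only recorded in the diff log.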
Users (Base)
Used to sync users from a source instance to a destination instance of Swimlane
sync(self)
This method is used to sync all users from a source instance to a destination instance
Source code in aqueduct/components/users.py
def sync(self):
    """This method is used to sync all users from a source instance to a destination instance"""
    self.log(f"Attempting to sync users from '{self.source_host}' to '{self.dest_host}'")
    users = self.source_instance.get_users()
    if users:
        for user in users:
            self.sync_user(user_id=user.id)
Workspaces (Base)
Used to sync workspaces from a source instance to a destination instance of Swimlane
sync(self)
This method is used to sync all workspaces from a source instance to a destination instance
Source code in aqueduct/components/workspaces.py
def sync(self):
    """This method is used to sync all workspaces from a source instance to a destination instance"""
    self.log(f"Starting to sync workspaces from '{self.source_host}' to '{self.dest_host}'")
    workspaces = self.source_instance.get_workspaces()
    if workspaces:
        for workspace in workspaces:
            self.sync_workspace(workspace=workspace)