Aqueduct
Aqueduct
Aqueduct is used to migrate content from one Swimlane instance to another.
_Aqueduct__sort_sync_order(self, components)
private
Ensures order of provided component name strings.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
components |
list |
A list of component name strings. |
required |
Returns:
Type | Description |
---|---|
list |
An ordered list of component names. |
Source code in aqueduct/aqueduct.py
def __sort_sync_order(self, components: list):
"""Ensures order of provided component name strings.
Args:
components (list): A list of component name strings.
Returns:
list: An ordered list of component names.
"""
ordered = []
for key, val in COMPONENTS.items():
if key in components:
ordered.append(key)
return ordered
_Aqueduct__want_to_continue(self)
private
A recursive method that will continually ask if you want to continue until you provide a correct answer.
Returns:
Type | Description |
---|---|
bool |
Returns whether or not the user wants to continue. |
Source code in aqueduct/aqueduct.py
def __want_to_continue(self):
"""A recursive method that will continually ask if you want to continue until you provide a correct answer.
Returns:
bool: Returns whether or not the user wants to continue.
"""
response = input("Do you want to continue (y or n): ")
if response.capitalize()[0] == "N":
return False
elif response.capitalize()[0] == "Y":
return True
else:
return self.__want_to_continue()
__init__(self, source, destination, dry_run=True, offline=False, update_reports=False, update_dashboards=False, continue_on_error=False, use_unsupported_version=False, force_unsupported_version=False, dump_content_path=None, mirror_app_fields_on_destination=False, update_default_reports=False)
special
To use aqueduct you must provide two SwimlaneInstance objects. One is the source of content wanting to migrate. The other is the destination of where the content will be migrated to.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source |
SwimlaneInstance |
The source Swimlane instance. Typically this is considered a development instance. |
required |
destination |
SwimlaneInstance |
The destination Swimlane instance. Typically this is considered a production instance. |
required |
dry_run |
bool |
Whether or not this run of aqueduct will transfer content to the destination instance. Set to False to migrate content. Default is True. |
True |
offline |
bool |
Whether or not the destination instance has access to the internet for the purpose of downloading Python packages. Default is False |
False |
update_reports |
bool |
Whether or not to force update reports if changes are detected. Default is False. |
False |
update_dashboards |
bool |
(bool): Whether or not to force update dashboards if changes are detected. Default is False. |
False |
continue_on_error |
bool |
(bool): Whether or not to continue when an API error occurs. Default is False. |
False |
use_unsupported_version |
bool |
(bool): Whether or not to transfer content from one version Swimlane to a different version of Swimlane. Use this at your own risk. Default is False. |
False |
force_unsupported_version |
bool |
(bool): Whether or not to force usage of an unsupported version. This is used when running aqueduct headless/non-interactive. Default is False. |
False |
dump_content_path |
str |
(str, optional): The path to where content should be dumped. If provided, we will dump content to a zip file in the provided path. |
None |
mirror_app_fields_on_destination |
bool |
(bool, optional): Whether or not to mirror application fields from the source to the destination. This may result in deletion of fields on the destination instance. Defaults to False. |
False |
update_default_reports |
bool |
(bool, optional): Whether or not to update Default reports on the destination. Default is False. |
False |
Source code in aqueduct/aqueduct.py
def __init__(
self,
source: SwimlaneInstance,
destination: SwimlaneInstance,
dry_run: bool = True,
offline: bool = False,
update_reports: bool = False,
update_dashboards: bool = False,
continue_on_error: bool = False,
use_unsupported_version: bool = False,
force_unsupported_version: bool = False,
dump_content_path: str = None,
mirror_app_fields_on_destination: bool = False,
update_default_reports: bool = False,
):
"""To use aqueduct you must provide two SwimlaneInstance objects. One is the source of content wanting to
migrate. The other is the destination of where the content will be migrated to.
Args:
source (SwimlaneInstance): The source Swimlane instance. Typically this is considered a
development instance.
destination (SwimlaneInstance): The destination Swimlane instance. Typically this is considered a production
instance.
dry_run (bool): Whether or not this run of aqueduct will transfer content to the destination instance.
Set to False to migrate content. Default is True.
offline (bool): Whether or not the destination instance has access to the internet for the purpose of
downloading Python packages. Default is False
update_reports (bool): Whether or not to force update reports if changes are detected. Default is False.
update_dashboards: (bool): Whether or not to force update dashboards if changes are detected.
Default is False.
continue_on_error: (bool): Whether or not to continue when an API error occurs. Default is False.
use_unsupported_version: (bool): Whether or not to transfer content from one version Swimlane to a different
version of Swimlane. Use this at your own risk. Default is False.
force_unsupported_version: (bool): Whether or not to force usage of an unsupported version. This is used
when running aqueduct headless/non-interactive. Default is False.
dump_content_path: (str, optional): The path to where content should be dumped. If provided, we will dump
content to a zip file in the provided path.
mirror_app_fields_on_destination: (bool, optional): Whether or not to mirror application fields from the
source to the destination. This may result in deletion
of fields on the destination instance. Defaults to
False.
update_default_reports: (bool, optional): Whether or not to update Default reports on the destination.
Default is False.
"""
src_version = version.parse(source.swimlane.product_version)
dest_version = version.parse(destination.swimlane.product_version)
if use_unsupported_version and src_version != dest_version:
if src_version > dest_version:
raise UnsupportedSwimlaneVersion(source=source, destination=destination)
self.log(
f"You specified you want to transfer of content from version {source.swimlane.product_version} to "
f"{destination.swimlane.product_version}. This is not officially supported. "
"Please use at your own risk.",
level="info",
)
if not force_unsupported_version and not self.__want_to_continue():
sys.exit()
elif src_version != dest_version:
raise UnsupportedSwimlaneVersion(source=source, destination=destination)
Base.source_instance = source
Base.source_instance.instance_type = "source"
Base.destination_instance = destination
Base.destination_instance.instance_type = "destination"
Base.dry_run = dry_run
Base.source_host = Base.source_instance.swimlane.host
Base.dest_host = Base.destination_instance.swimlane.host
Base.offline = offline
Base.update_reports = update_reports
Base.update_dashboards = update_dashboards
Base.continue_on_error = continue_on_error
Base.update_default_reports = update_default_reports
Base.mirror_app_fields_on_destination = mirror_app_fields_on_destination
Base.use_unsupported_version = use_unsupported_version
if WARNING_MAP.get(str(dest_version)):
for key, val in WARNING_MAP[str(dest_version)].items():
if getattr(Base, key):
self.log(val=val, level="warning")
if dump_content_path:
Base._dump_content = True
Base.ZIP_FILE = OutputZip(zip_path=dump_content_path)
else:
Base._dump_content = False
Base.ZIP_FILE = OutputZip(zip_path=os.getcwd())
atexit.register(self._return_homework_list)
atexit.register(self._close_and_write_zip)
_close_and_write_zip(self)
private
Used to close and write a zip file to disk at exit.
The ZIP_FILE object will close no matter what when calling the write_to_disk method but it will only create a zip file on the disk when there are files in the BytesIO object.
This will be called no matter what but there are two situations in which data will be saved to disk.
1. If a dump_content_path is provided then we will dump all (defined) content into a zipfile in the
provided path.
2. No matter if the dump_content_path is provided, if an error occurs with an API call we will dump
the content into a zipfile named aqueduct_errors.zip in the currently working directory.
The created zipfile has the following structure (example only):
📦aqueduct
┣ 📂content
┃ ┣ 📂altered
┃ ┃ ┣ 📂application
┃ ┃ ┃ ┗ 📜Phishing Triage.json
┃ ┃ ┗ 📂workspace
┃ ┃ ┃ ┣ 📜QuickStart Record Generator.json
┃ ┣ 📂destination
┃ ┃ ┣ 📂application
┃ ┃ ┃ ┗ 📜Phishing Triage.json
┃ ┃ ┣ 📂workflow
┃ ┃ ┃ ┗ 📜aM_t7AuBscJN_Orf9.json
┃ ┃ ┗ 📂workspace
┃ ┃ ┃ ┣ 📜QuickStart Record Generator.json
┃ ┗ 📂source
┃ ┃ ┣ 📂application
┃ ┃ ┃ ┣ 📜Alert & Incident Management.json
┃ ┃ ┗ 📂workflows
┃ ┃ ┃ ┣ 📜a7HOzfbLmUD7xOkKp.json
┣ 📜homework.txt
┗ 📜output.log
The three folders under the content
directory are altered, source, and destination.
source - This means the original (GET) source JSON object from the source instance
altered - The altered folder contains JSON objects that have been "massaged" or updated based on the
differences between both source and destination (if any). This it the JSON that is sent to the
destination instance.
destination - This means the JSON response from the destination Swimlane instance.
Source code in aqueduct/aqueduct.py
def _close_and_write_zip(self) -> None:
"""Used to close and write a zip file to disk at exit.
The ZIP_FILE object will close no matter what when calling the write_to_disk method but it will only
create a zip file on the disk when there are files in the BytesIO object.
This will be called no matter what but there are two situations in which data will be saved to disk.
1. If a dump_content_path is provided then we will dump all (defined) content into a zipfile in the
provided path.
2. No matter if the dump_content_path is provided, if an error occurs with an API call we will dump
the content into a zipfile named aqueduct_errors.zip in the currently working directory.
The created zipfile has the following structure (example only):
📦aqueduct
┣ 📂content
┃ ┣ 📂altered
┃ ┃ ┣ 📂application
┃ ┃ ┃ ┗ 📜Phishing Triage.json
┃ ┃ ┗ 📂workspace
┃ ┃ ┃ ┣ 📜QuickStart Record Generator.json
┃ ┣ 📂destination
┃ ┃ ┣ 📂application
┃ ┃ ┃ ┗ 📜Phishing Triage.json
┃ ┃ ┣ 📂workflow
┃ ┃ ┃ ┗ 📜aM_t7AuBscJN_Orf9.json
┃ ┃ ┗ 📂workspace
┃ ┃ ┃ ┣ 📜QuickStart Record Generator.json
┃ ┗ 📂source
┃ ┃ ┣ 📂application
┃ ┃ ┃ ┣ 📜Alert & Incident Management.json
┃ ┃ ┗ 📂workflows
┃ ┃ ┃ ┣ 📜a7HOzfbLmUD7xOkKp.json
┣ 📜homework.txt
┗ 📜output.log
The three folders under the `content` directory are altered, source, and destination.
source - This means the original (GET) source JSON object from the source instance
altered - The altered folder contains JSON objects that have been "massaged" or updated based on the
differences between both source and destination (if any). This it the JSON that is sent to the
destination instance.
destination - This means the JSON response from the destination Swimlane instance.
"""
if Base.ZIP_FILE:
Base.ZIP_FILE.write_to_disk()
_return_homework_list(self)
private
Prints a summary homework list for quick actionable response.
You can access the homework dictionary from your Aqueduct
instance.
Examples:
from aqueduct import Aqueduct
aq = Aqueduct(source=src, destination=dest)
print(aq._homework) # This will print out the dictionary of component lists.
Source code in aqueduct/aqueduct.py
def _return_homework_list(self):
"""Prints a summary homework list for quick actionable response.
You can access the homework dictionary from your `Aqueduct` instance.
Example:
from aqueduct import Aqueduct
aq = Aqueduct(source=src, destination=dest)
print(aq._homework) # This will print out the dictionary of component lists.
"""
if self._homework:
print("HOMEWORK LIST:\n")
for key in self._homework.keys():
if self._homework[key]:
print(key.title())
for item in self._homework[key]:
print(f"\t{item}")
collect(self, application_name_list)
Collects information about content based on one or more application names on the source instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
application_name_list |
list |
A list of one or more application names to gather source content from. |
required |
Returns:
Type | Description |
---|---|
list |
list: Returns a list of dictionary objects. |
Source code in aqueduct/aqueduct.py
def collect(self, application_name_list: list) -> list:
"""Collects information about content based on one or more application names on the source instance.
Args:
application_name_list (list): A list of one or more application names to gather source content from.
Returns:
list: Returns a list of dictionary objects.
"""
atexit.unregister(self._close_and_write_zip)
return_list = []
if not isinstance(application_name_list, list):
application_name_list = [application_name_list]
for application in application_name_list:
return_list.append(Collector(application_name=application).collect())
return return_list
gather(self)
Returns a list of application names on the provided source instance.
Returns:
Type | Description |
---|---|
list |
list: A list of application names on the provided source instance. |
Source code in aqueduct/aqueduct.py
def gather(self) -> list:
"""Returns a list of application names on the provided source instance.
Returns:
list: A list of application names on the provided source instance.
"""
atexit.unregister(self._close_and_write_zip)
return list(self.source_instance.application_dict.keys())
sync(self, components=OrderedDict([('keystore', <class 'aqueduct.components.keystore.KeyStore'>), ('packages', <class 'aqueduct.components.packages.Packages'>), ('plugins', <class 'aqueduct.components.plugins.Plugins'>), ('assets', <class 'aqueduct.components.assets.Assets'>), ('workspaces', <class 'aqueduct.components.workspaces.Workspaces'>), ('applets', <class 'aqueduct.components.applets.Applets'>), ('applications', <class 'aqueduct.components.applications.Applications'>), ('tasks', <class 'aqueduct.components.tasks.Tasks'>), ('reports', <class 'aqueduct.components.reports.Reports'>), ('dashboards', <class 'aqueduct.components.dashboards.Dashboards'>), ('users', <class 'aqueduct.components.users.Users'>), ('groups', <class 'aqueduct.components.groups.Groups'>), ('roles', <class 'aqueduct.components.roles.Roles'>)]), exclude={}, include={})
The main method to begin syncing components from one Swimlane instance to another.
There are several available components you can specify. The order of these components if forced. The defaults are:
- keystore
- packages
- plugins
- assets
- workspaces
- applets
- applications (we update workflows here)
- tasks
- reports
- dashboards
- users
- groups
- roles
You can include and exclude specific component items like specific applications, tasks, plugins, etc. To do so provide a dictionary for each argument (include or exclude). For example:
exclude = {'applications': ["Phishing Triage"], 'tasks': ['PT - Get Emails'], etc.} include = {'applications': ["Security Alert & Incident Management"], 'reports': ['SAIM - New Incidents'], etc.}
aq.sync(include=include, exclude=exclude)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
components |
list |
A list of one or more components to sync. Defaults to COMPONENTS. |
OrderedDict([('keystore', <class 'aqueduct.components.keystore.KeyStore'>), ('packages', <class 'aqueduct.components.packages.Packages'>), ('plugins', <class 'aqueduct.components.plugins.Plugins'>), ('assets', <class 'aqueduct.components.assets.Assets'>), ('workspaces', <class 'aqueduct.components.workspaces.Workspaces'>), ('applets', <class 'aqueduct.components.applets.Applets'>), ('applications', <class 'aqueduct.components.applications.Applications'>), ('tasks', <class 'aqueduct.components.tasks.Tasks'>), ('reports', <class 'aqueduct.components.reports.Reports'>), ('dashboards', <class 'aqueduct.components.dashboards.Dashboards'>), ('users', <class 'aqueduct.components.users.Users'>), ('groups', <class 'aqueduct.components.groups.Groups'>), ('roles', <class 'aqueduct.components.roles.Roles'>)]) |
exclude |
dict |
A dictionary of component name keys and their named values. Defaults to None. |
{} |
include |
dict |
A dictionary of component name keys and their named values. Defaults to None. |
{} |
Source code in aqueduct/aqueduct.py
def sync(self, components=COMPONENTS, exclude: dict = {}, include: dict = {}):
"""The main method to begin syncing components from one Swimlane instance to another.
There are several available components you can specify. The order of these components if forced.
The defaults are:
* keystore
* packages
* plugins
* assets
* workspaces
* applets
* applications (we update workflows here)
* tasks
* reports
* dashboards
* users
* groups
* roles
You can include and exclude specific component items like specific applications, tasks, plugins, etc. To do so
provide a dictionary for each argument (include or exclude). For example:
exclude = {'applications': ["Phishing Triage"], 'tasks': ['PT - Get Emails'], etc.}
include = {'applications': ["Security Alert & Incident Management"], 'reports': ['SAIM - New Incidents'], etc.}
aq.sync(include=include, exclude=exclude)
Args:
components (list, optional): A list of one or more components to sync. Defaults to COMPONENTS.
exclude (dict, optional): A dictionary of component name keys and their named values. Defaults to None.
include (dict, optional): A dictionary of component name keys and their named values. Defaults to None.
"""
if exclude and include:
raise TooManyParametersError(
"You have provided both 'exclude' and 'include' parameters when only one is supported."
"Please retry with only one of these paramters."
)
Base.include = include
Base.exclude = exclude
for component in self.__sort_sync_order(components):
if COMPONENTS.get(component):
getattr(COMPONENTS[component](), "sync")()
self._components_used.append(component)
print("\n")
if Base.dry_run:
print("DRY RUN RESULTS:\n")
return self._get_formatted_diff_log()
print(b64decode(self.__LOGO).decode("ascii"))
Base
tracking_id_map
property
writable
A dictionary map of tracking-ids.
The key is the source instance trackingFieldId and the value is the destinations trackingFieldId equivalent.
Returns:
Type | Description |
---|---|
dict |
A tracking-id map. |
_get_formatted_diff_log(self)
private
Returns a formatted list of strings from the DIFF_LOG property.
Returns:
Type | Description |
---|---|
list |
A list of strings. |
Source code in aqueduct/base.py
def _get_formatted_diff_log(self):
"""Returns a formatted list of strings from the DIFF_LOG property.
Returns:
list: A list of strings.
"""
return_list = []
for item in Base.DIFF_LOG:
log = self.DIFF_TEMPLATE.substitute(**item)
self.log(log)
return_list.append(log)
return return_list
_is_in_include_exclude_lists(self, name, type)
private
Checks to see if the name of the component content item is in a include or exclude list.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The component content item name. |
required |
type |
str |
The component type. |
required |
Returns:
Type | Description |
---|---|
bool |
bool: Returns True if it is in the exclude or not in the include list else False. |
Source code in aqueduct/base.py
def _is_in_include_exclude_lists(self, name: str, type: str) -> bool:
"""Checks to see if the name of the component content item is in a include or exclude list.
Args:
name (str): The component content item name.
type (str): The component type.
Returns:
bool: Returns True if it is in the exclude or not in the include list else False.
"""
if self.include and self.include.get(type):
if name in self.include[type]:
return False
else:
return True
elif self.exclude and self.exclude.get(type):
if name in self.exclude[type]:
return True
else:
return False
else:
return False
add_to_diff_log(self, name, diff_type, subcomponent=None, value='')
Adds a dictionary of values to the DIFF_LOG list.
We create a dictionary of values and add them to our DIFF_LOG list but we also write the output to our log file.
We gather the component name by inspecting the calling stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the component content item. |
required |
diff_type |
str |
The type of diff this is. Must be one of 'added','updated','upgraded','removed','moved'. |
required |
subcomponent |
str |
The subcomponent this is related to (e.g. fields in an app). Defaults to None. |
None |
value |
str |
str. A value of the subcomponent to add to our dictionary. Defaults to "". |
'' |
Exceptions:
Type | Description |
---|---|
ValueError |
Raises when the provided diff_type is not one of 'added','updated','upgraded','removed','moved'. |
Source code in aqueduct/base.py
def add_to_diff_log(self, name, diff_type, subcomponent=None, value=""):
"""Adds a dictionary of values to the DIFF_LOG list.
We create a dictionary of values and add them to our DIFF_LOG list but we also write the output to our log file.
We gather the component name by inspecting the calling stack.
Args:
name (str): The name of the component content item.
diff_type (str): The type of diff this is. Must be one of 'added','updated','upgraded','removed','moved'.
subcomponent (str, optional): The subcomponent this is related to (e.g. fields in an app). Defaults to None.
value (str, optional): str. A value of the subcomponent to add to our dictionary. Defaults to "".
Raises:
ValueError: Raises when the provided diff_type is not one of 'added','updated','upgraded','removed','moved'.
"""
if diff_type in self.__diff_types:
component = None
parent = inspect.stack()[1][0].f_locals.get("self", None)
component = parent.__class__.__name__
if component and hasattr(f"_{component}", "__logger"):
if subcomponent:
getattr(parent, f"_{component}__logger").info(
f"Dry Run: Component '{component}' named '{name}' with subcomponent '{subcomponent}' with value"
f" of '{value}' would have been {diff_type} on destination.",
)
else:
getattr(parent, f"_{component}__logger").info(
f"Dry Run: Component '{component}' named '{name}' would have been {diff_type} on destination."
)
self.DIFF_LOG.append(
{
"component": component,
"subcomponent": subcomponent,
"name": name,
"value": value,
"diff_type": diff_type,
}
)
else:
raise ValueError(f"Unknown type of '{type}' provided. Cannot add to diff log...")
add_to_homework_list(self, component_name, value)
Adds a list of items to a dictionary of component name keys
This method adds the provided value to a dictionary of component_name(s). Once Aqueduct is complete we will return this _homework dictionary in a formatted way so that users can have a quick way to understand any manual steps they must complete.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_name |
str |
The name of the aqueduct component. |
required |
value |
str |
The value to add to the list of component items. |
required |
Source code in aqueduct/base.py
def add_to_homework_list(self, component_name: str, value: str) -> None:
"""Adds a list of items to a dictionary of component name keys
This method adds the provided value to a dictionary of component_name(s). Once Aqueduct is complete
we will return this _homework dictionary in a formatted way so that users can have a quick way to understand
any manual steps they must complete.
Args:
component_name (str): The name of the aqueduct component.
value (str): The value to add to the list of component items.
"""
if not self._homework.get(component_name):
self._homework[component_name] = []
self._homework[component_name].append(value)
log(self, val, level='info')
Used to centralize logging across components.
We identify the source of the logging class by inspecting the calling stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
val |
str |
The log value string to output. |
required |
level |
str |
The log level. Defaults to "info". |
'info' |
Source code in aqueduct/base.py
def log(self, val, level="info"):
"""Used to centralize logging across components.
We identify the source of the logging class by inspecting the calling stack.
Args:
val (str): The log value string to output.
level (str, optional): The log level. Defaults to "info".
"""
component = None
parent = inspect.stack()[1][0].f_locals.get("self", None)
component = parent.__class__.__name__
try:
getattr(getattr(parent, f"_{component}__logger"), level)(val)
self.OUTPUT_LOG.append(f"{component} - {level.upper()} - {val}")
except AttributeError as ae:
self.OUTPUT_LOG.append(f"{component} - {level.upper()} - {val}")
scrub(self, obj, bad_key='$type')
Used to remove a specific provided key from a dictionary that may contain both nested dictionaries and lists.
This method is recursive.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj |
dict |
A dictionary or list to remove keys from. |
required |
bad_key |
str |
The bad key to remove from the provided dict or list. Defaults to "$type". |
'$type' |
Source code in aqueduct/base.py
def scrub(self, obj, bad_key="$type"):
"""Used to remove a specific provided key from a dictionary
that may contain both nested dictionaries and lists.
This method is recursive.
Args:
obj (dict): A dictionary or list to remove keys from.
bad_key (str, optional): The bad key to remove from the provided dict or list. Defaults to "$type".
"""
if isinstance(obj, dict):
for key in list(obj.keys()):
if key == bad_key:
del obj[key]
else:
self.scrub(obj[key], bad_key)
elif isinstance(obj, list):
for i in reversed(range(len(obj))):
if obj[i] == bad_key:
del obj[i]
else:
self.scrub(obj[i], bad_key)
else:
pass
SwimlaneInstance
Creates a connection to a single Swimlane instance
add_applet(self, applet)
Used to add a applet to a Swimlane instance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
applet |
dict |
A applet item to add to a Swimlane instance |
required |
Returns:
Type | Description |
---|---|
dict |
A dictionary of a applet item |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def add_applet(self, applet: dict):
"""Used to add a applet to a Swimlane instance
Args:
applet (dict): A applet item to add to a Swimlane instance
Returns:
dict : A dictionary of a applet item
"""
return self._make_request("POST", "/applet", json=applet)
add_application(self, application)
Adds an application based on the provided dictionary object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
application |
dict |
A application dictionary object. |
required |
Returns:
Type | Description |
---|---|
dict |
A Swimlane application object |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def add_application(self, application):
"""Adds an application based on the provided dictionary object
Args:
application (dict): A application dictionary object.
Returns:
dict: A Swimlane application object
"""
return self._make_request("POST", "/app", json=application)
add_asset(self, asset)
Adds a provided asset dictionary object to a Swimlane instance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
asset |
dict |
A Swimlane asset dictionary object. |
required |
Returns:
Type | Description |
---|---|
dict |
dict: A Swimlane asset dictionary object. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def add_asset(self, asset: dict) -> dict:
"""Adds a provided asset dictionary object to a Swimlane instance
Args:
asset (dict): A Swimlane asset dictionary object.
Returns:
dict: A Swimlane asset dictionary object.
"""
return self._make_request("POST", "/asset", json=asset)
add_credential(self, credential)
Used to add a keystore key name (but not it's value) to a Swimlane instance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
credential |
dict |
A keystore item to add to a Swimlane instance |
required |
Returns:
Type | Description |
---|---|
dict |
A dictionary of a keystore item |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def add_credential(self, credential: dict):
"""Used to add a keystore key name (but not it's value) to a Swimlane instance
Args:
credential (dict): A keystore item to add to a Swimlane instance
Returns:
dict : A dictionary of a keystore item
"""
return self._make_request("POST", "/credentials", json=credential)
add_dashboard(self, dashboard)
Adds a provided dashboard dictionary object to a Swimlane instance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dashboard |
dict |
A Swimlane dashboard dictionary object. |
required |
Returns:
Type | Description |
---|---|
dict |
A Swimlane dashboard dictionary object. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def add_dashboard(self, dashboard: dict):
"""Adds a provided dashboard dictionary object to a Swimlane instance
Args:
dashboard (dict): A Swimlane dashboard dictionary object.
Returns:
dict: A Swimlane dashboard dictionary object.
"""
return self._make_request("POST", "/dashboard", json=dashboard)
add_group(self, group)
Adds a provided group dictionary object to a Swimlane instance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
group |
dict |
A Swimlane group dictionary object. |
required |
Returns:
Type | Description |
---|---|
dict |
A Swimlane group dictionary object. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def add_group(self, group: dict):
"""Adds a provided group dictionary object to a Swimlane instance
Args:
group (dict): A Swimlane group dictionary object.
Returns:
dict: A Swimlane group dictionary object.
"""
try:
return self._make_request("POST", "/groups", json=group)
except SwimlaneHTTP400Error as sh:
if sh.code == 1051:
return True
else:
raise sh
except Exception as e:
raise e
add_report(self, report)
Adds a provided report dictionary object to a Swimlane instance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
report |
dict |
A report dictionary object. |
required |
Returns:
Type | Description |
---|---|
dict |
A report dictionary object. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def add_report(self, report: dict):
"""Adds a provided report dictionary object to a Swimlane instance
Args:
report (dict): A report dictionary object.
Returns:
dict: A report dictionary object.
"""
return self._make_request("POST", "/reports", json=report)
add_role(self, role)
Adds a provided role dictionary object to a Swimlane instance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role |
dict |
A role dictionary object. |
required |
Returns:
Type | Description |
---|---|
dict |
A role dictionary object. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def add_role(self, role: dict):
"""Adds a provided role dictionary object to a Swimlane instance
Args:
role (dict): A role dictionary object.
Returns:
dict: A role dictionary object.
"""
return self._make_request("POST", "/roles", json=role)
add_task(self, task)
Adds a provided task dictionary to a Swimlane instance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task |
dict |
A Swimlane task dictionary. |
required |
Returns:
Type | Description |
---|---|
dict |
A Swimlane task dictionary. |
Source code in aqueduct/instance.py
@dump_content
def add_task(self, task: dict):
"""Adds a provided task dictionary to a Swimlane instance
Args:
task (dict): A Swimlane task dictionary.
Returns:
dict: A Swimlane task dictionary.
"""
try:
return self._make_request("POST", "/task", json=task)
except Exception as e:
return False
add_user(self, user)
Adds a provided user dictionary object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user |
dict |
A user dictionary object. |
required |
Returns:
Type | Description |
---|---|
dict |
A user dictionary object. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def add_user(self, user: dict):
"""Adds a provided user dictionary object.
Args:
user (dict): A user dictionary object.
Returns:
dict: A user dictionary object.
"""
return self._make_request("POST", "/user", json=user)
add_workflow(self, workflow)
Adds a Swimlane instance with the provided Workflow data model object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workflow |
Workflow |
A |
required |
Returns:
Type | Description |
---|---|
Workflow |
A |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def add_workflow(self, workflow):
"""Adds a Swimlane instance with the provided Workflow data model object
Args:
workflow (Workflow): A `Workflow` data model object
Returns:
Workflow: A `Workflow` data model object
"""
try:
return self._make_request("POST", "/workflow/", json=workflow)
except Exception as e:
return False
add_workspace(self, workspace)
Adds a provided Workspace object to a Swimlane instance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workspace |
dict |
A workspace dictionary object. |
required |
Returns:
Type | Description |
---|---|
dict |
A workspace dictionary object. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def add_workspace(self, workspace: dict):
"""Adds a provided Workspace object to a Swimlane instance
Args:
workspace (dict): A workspace dictionary object.
Returns:
dict: A workspace dictionary object.
"""
return self._make_request("POST", "/workspaces", json=workspace)
download_plugin(self, file_id)
A Swimlane internal fileId for a plugin or Python package to download
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_id |
str |
An internal Swimlane fileId to download a plugin or python package |
required |
Returns:
Type | Description |
---|---|
BytesIO |
A bytesIO object of the downloaded file |
Source code in aqueduct/instance.py
@log_exception
def download_plugin(self, file_id: str):
"""A Swimlane internal fileId for a plugin or Python package to download
Args:
file_id (str): An internal Swimlane fileId to download a plugin or python package
Returns:
BytesIO: A bytesIO object of the downloaded file
"""
stream = BytesIO()
response = self.swimlane.request("GET", f"attachment/download/{file_id}", stream=True)
for chunk in response.iter_content(1024):
stream.write(chunk)
stream.seek(0)
return stream
get_applet(self, applet_id)
Retrieves a applet if it exists on the desired instance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
applet_id |
str |
The ID of an applet |
required |
Returns:
Type | Description |
---|---|
dict |
A dictionary of a applet |
Source code in aqueduct/instance.py
@log_exception
def get_applet(self, applet_id):
"""Retrieves a applet if it exists on the desired instance
Args:
applet_id (str): The ID of an applet
Returns:
dict : A dictionary of a applet
"""
try:
return self._make_request("GET", f"/applet/{applet_id}")
except Exception as e:
return False
get_applets(self)
Used to retrieve applets
Returns:
Type | Description |
---|---|
dict |
A list of applet dictionaries |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_applets(self):
"""Used to retrieve applets
Returns:
dict : A list of applet dictionaries
"""
return self._make_request("GET", "/applet")
get_application(self, application_id)
Gets an application by a provided ID
If the application does not exist, then this method returns False.
If the application does exist it will return an JSON object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
application_id |
str |
A Swimlane application ID |
required |
Returns:
Type | Description |
---|---|
json |
A Swimlane application JSON |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_application(self, application_id):
"""Gets an application by a provided ID
If the application does not exist, then this method returns False.
If the application does exist it will return an JSON object
Args:
application_id (str): A Swimlane application ID
Returns:
json: A Swimlane application JSON
"""
try:
return self._make_request("GET", f"/app/{application_id}")
except Exception as e:
return False
get_application_workspaces(self, application_id)
Gets a list of workspaces for an application ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
application_id |
str |
A Swimlane application ID |
required |
Returns:
Type | Description |
---|---|
list |
A list of workspaces associated with an application. |
Source code in aqueduct/instance.py
@log_exception
def get_application_workspaces(self, application_id):
"""Gets a list of workspaces for an application ID.
Args:
application_id (str): A Swimlane application ID
Returns:
list: A list of workspaces associated with an application.
"""
try:
return self._make_request("GET", f"/workspaces/app/{application_id}")
except Exception as e:
return False
get_applications(self)
Retrieves a list of applications for a Swimlane instance.
Returns:
Type | Description |
---|---|
list |
A list of json objects |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_applications(self):
"""Retrieves a list of applications for a Swimlane instance.
Returns:
list: A list of json objects
"""
return self._make_request("GET", "/app")
get_applications_light(self)
Gets light version of all applications
Returns:
Type | Description |
---|---|
list |
A list of application light objects |
Source code in aqueduct/instance.py
@log_exception
def get_applications_light(self):
"""Gets light version of all applications
Returns:
list: A list of application light objects
"""
return self._make_request("GET", "/app/light")
get_asset(self, asset_id)
Gets an asset by a provided ID
If the asset does not exist, then this method returns False.
If the asset does exist it will return an asset dictionary.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
asset_id |
str |
An Swimlane asset ID |
required |
Returns:
Type | Description |
---|---|
dict |
dict: A Swimlane instance asset dictionary. |
Source code in aqueduct/instance.py
@log_exception
def get_asset(self, asset_id: str) -> dict:
"""Gets an asset by a provided ID
If the asset does not exist, then this method returns False.
If the asset does exist it will return an asset dictionary.
Args:
asset_id (str): An Swimlane asset ID
Returns:
dict: A Swimlane instance asset dictionary.
"""
try:
return self._make_request("GET", f"/asset/{asset_id}")
except Exception as e:
return False
get_assets(self)
Retrieves a list of assets for a Swimlane instance.
Returns:
Type | Description |
---|---|
list |
list: A list of asset JSON objects. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_assets(self) -> list:
"""Retrieves a list of assets for a Swimlane instance.
Returns:
list: A list of asset JSON objects.
"""
assets = self._make_request("GET", "/asset")
return assets if assets and isinstance(assets, list) else []
get_credential(self, name)
Retrieves a keystore item if it exists on the desired instance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of a keystore item |
required |
Returns:
Type | Description |
---|---|
dict |
A dictionary of a keystore item |
Source code in aqueduct/instance.py
@log_exception
def get_credential(self, name: str):
"""Retrieves a keystore item if it exists on the desired instance
Args:
name (str): Name of a keystore item
Returns:
dict : A dictionary of a keystore item
"""
try:
return self._make_request("GET", f"/credentials/{name}")
except Exception as e:
return False
get_credentials(self)
Used to retrieve keystore items
Returns:
Type | Description |
---|---|
dict |
A dictionary of keystore keys and encrypted values |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_credentials(self):
"""Used to retrieve keystore items
Returns:
dict : A dictionary of keystore keys and encrypted values
"""
return self._make_request("GET", "/credentials")
get_dashboard(self, dashboard_id)
Gets an dashboard by a provided ID
If the dashboard does not exist, then this method returns False.
If the dashboard does exist it will return an dashboard dictionary object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dashboard_id |
str |
A Swimlane dashboard ID |
required |
Returns:
Type | Description |
---|---|
dict |
A Swimlane dashboard dictionary object. |
Source code in aqueduct/instance.py
@log_exception
def get_dashboard(self, dashboard_id: str):
"""Gets an dashboard by a provided ID
If the dashboard does not exist, then this method returns False.
If the dashboard does exist it will return an dashboard dictionary object.
Args:
dashboard_id (str): A Swimlane dashboard ID
Returns:
dict: A Swimlane dashboard dictionary object.
"""
try:
return self._make_request("GET", f"/dashboard/{dashboard_id}")
except Exception as e:
return False
get_dashboards(self)
Retrieves a list of dashboards for a Swimlane instance.
Returns:
Type | Description |
---|---|
list |
A list of dashboard dictionary objects |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_dashboards(self):
"""Retrieves a list of dashboards for a Swimlane instance.
Returns:
list: A list of dashboard dictionary objects
"""
dashboards = self._make_request("GET", "/dashboard")
return dashboards if dashboards and isinstance(dashboards, list) else []
get_default_report_by_application_id(self, application_id)
Gets the default report for an application based on the provided ID
Parameters:
Name | Type | Description | Default |
---|---|---|---|
application_id |
str |
A application id. |
required |
Returns:
Type | Description |
---|---|
dict |
A Swimlane report dictionary object. |
Source code in aqueduct/instance.py
@log_exception
def get_default_report_by_application_id(self, application_id):
"""Gets the default report for an application based on the provided ID
Args:
application_id (str): A application id.
Returns:
dict: A Swimlane report dictionary object.
"""
try:
return self._make_request("GET", f"/reports/app/{application_id}/default")
except Exception as e:
return False
get_group_by_id(self, group_id)
Gets an group by a provided id
If the group does not exist, then this method returns False.
If the group does exist it will return a group dictionary object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
group_id |
str |
A Swimlane group ID |
required |
Returns:
Type | Description |
---|---|
dict |
A Swimlane group object. |
Source code in aqueduct/instance.py
@log_exception
def get_group_by_id(self, group_id: str):
"""Gets an group by a provided id
If the group does not exist, then this method returns False.
If the group does exist it will return a group dictionary object.
Args:
group_id (str): A Swimlane group ID
Returns:
dict: A Swimlane group object.
"""
try:
return self._make_request("GET", f"/groups/{group_id}")
except Exception as e:
return False
get_group_by_name(self, group_name)
Gets an group by a provided name
If the group does not exist, then this method returns False.
If the group does exist it will return a group dictionary object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
group_name |
str |
A Swimlane group name |
required |
Returns:
Type | Description |
---|---|
dict |
A Swimlane group object. |
Source code in aqueduct/instance.py
@log_exception
def get_group_by_name(self, group_name: str):
"""Gets an group by a provided name
If the group does not exist, then this method returns False.
If the group does exist it will return a group dictionary object.
Args:
group_name (str): A Swimlane group name
Returns:
dict: A Swimlane group object.
"""
try:
resp = self._make_request("GET", f"/users/lookup?name={group_name}")
return resp[0] if resp and isinstance(resp, list) else False
except Exception as e:
return False
get_groups(self)
Retrieves a list of groups for a Swimlane instance.
Returns:
Type | Description |
---|---|
list |
A list of group dictionary objects. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_groups(self):
"""Retrieves a list of groups for a Swimlane instance.
Returns:
list: A list of group dictionary objects.
"""
groups = self._make_request("GET", "/groups")
return groups["items"] if groups.get("items") else groups.get("groups", [])
get_pip_packages(self, versions=['Python2_7', 'Python3_6', 'Python3'])
Retrieves a list of pip packages for a Swimlane instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
versions |
list |
A list of Python versions. Defaults to ['Python2_7', 'Python3_6', 'Python3']. |
['Python2_7', 'Python3_6', 'Python3'] |
Returns:
Type | Description |
---|---|
list |
A list of package dictionary objects. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_pip_packages(self, versions=["Python2_7", "Python3_6", "Python3"]):
"""Retrieves a list of pip packages for a Swimlane instance.
Args:
versions (list, optional): A list of Python versions. Defaults to ['Python2_7', 'Python3_6', 'Python3'].
Returns:
list: A list of package dictionary objects.
"""
return_list = []
for version in versions:
try:
resp = self._make_request("GET", f"/pip/packages/{version}")
if resp:
for item in resp:
if item and isinstance(item, dict):
return_list.append(item)
except Exception as e:
continue
return return_list
get_plugin(self, name)
Gets an plugin by a provided plugin name
If the plugin does not exist, then this method returns False.
If the plugin does exist it will return an plugin dictionary object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
A plugin name |
required |
Returns:
Type | Description |
---|---|
dict |
A Swimlane plugin dictionary object. |
Source code in aqueduct/instance.py
@log_exception
def get_plugin(self, name: str):
"""Gets an plugin by a provided plugin name
If the plugin does not exist, then this method returns False.
If the plugin does exist it will return an plugin dictionary object.
Args:
name (str): A plugin name
Returns:
dict: A Swimlane plugin dictionary object.
"""
try:
return self._make_request("GET", f"/task/packages/{name}")
except Exception as e:
return False
get_plugins(self)
Retrieves a list of plugins for a Swimlane instance.
Returns:
Type | Description |
---|---|
list |
A list of plugin (light) dictionary objects. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_plugins(self):
"""Retrieves a list of plugins for a Swimlane instance.
Returns:
list: A list of plugin (light) dictionary objects.
"""
plugins = self._make_request("GET", "/task/packages")
return plugins if plugins and isinstance(plugins, list) else []
get_report(self, report_id)
Gets an report by a provided ID
If the report does not exist, then this method returns False.
If the report does exist it will return a report dictionary object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
report_id |
str |
A Swimlane report ID |
required |
Returns:
Type | Description |
---|---|
dict |
A Swimlane report dictionary object. |
Source code in aqueduct/instance.py
@log_exception
def get_report(self, report_id: str):
"""Gets an report by a provided ID
If the report does not exist, then this method returns False.
If the report does exist it will return a report dictionary object.
Args:
report_id (str): A Swimlane report ID
Returns:
dict: A Swimlane report dictionary object.
"""
try:
return self._make_request("GET", f"/reports/{report_id}")
except Exception as e:
return False
get_reports(self)
Retrieves a list of reports for a Swimlane instance.
Returns:
Type | Description |
---|---|
list |
A list of report dictionary objects. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_reports(self):
"""Retrieves a list of reports for a Swimlane instance.
Returns:
list: A list of report dictionary objects.
"""
reports = self._make_request("GET", "/reports")
return reports if reports else []
get_role(self, role_id)
Gets an role by a provided ID
If the role does not exist, then this method returns False.
If the role does exist it will return an role dictionary object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role_id |
str |
A Swimlane role ID |
required |
Returns:
Type | Description |
---|---|
dict |
A role dictionary object. |
Source code in aqueduct/instance.py
@log_exception
def get_role(self, role_id: str):
"""Gets an role by a provided ID
If the role does not exist, then this method returns False.
If the role does exist it will return an role dictionary object.
Args:
role_id (str): A Swimlane role ID
Returns:
dict: A role dictionary object.
"""
try:
return self._make_request("GET", f"/roles/{role_id}")
except Exception as e:
return False
get_role_by_name(self, role_name)
Gets an role by a provided name
If the role does not exist, then this method returns False.
If the role does exist it will return an role dictionary object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role_name |
str |
A Swimlane role name |
required |
Returns:
Type | Description |
---|---|
dict |
A role dictionary object. |
Source code in aqueduct/instance.py
@log_exception
def get_role_by_name(self, role_name: str):
"""Gets an role by a provided name
If the role does not exist, then this method returns False.
If the role does exist it will return an role dictionary object.
Args:
role_name (str): A Swimlane role name
Returns:
dict: A role dictionary object.
"""
try:
return self._make_request("GET", f"/roles/?searchFieldName=name&searchValue={role_name}")
except Exception as e:
return False
get_roles(self)
Retrieves a list of roles for a Swimlane instance.
These roles are modeled after the role dictionary object.
Returns:
Type | Description |
---|---|
list |
A list of role dictionary objects. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_roles(self):
"""Retrieves a list of roles for a Swimlane instance.
These roles are modeled after the role dictionary object.
Returns:
list: A list of role dictionary objects.
"""
return_list = []
roles = self._make_request("GET", "/roles")
if roles and isinstance(roles, dict):
# checking for items here. 10.4.0 does not use the items key
# but greater versions do (e.g. 10.5>)
if roles.get("items"):
roles = roles["items"]
if roles and isinstance(roles, list):
for role in roles:
return_list.append(role)
return return_list
get_task(self, task_id)
Gets an task by a provided task ID.
If the task ID does not exist, then this method returns False.
If the task ID does exist it will return a dictionary object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task_id |
str |
A task ID to retrieve the entire task JSON. |
required |
Returns:
Type | Description |
---|---|
dict |
A Swimlane task dictionary. |
Source code in aqueduct/instance.py
@log_exception
def get_task(self, task_id: str):
"""Gets an task by a provided task ID.
If the task ID does not exist, then this method returns False.
If the task ID does exist it will return a dictionary object
Args:
task_id (str): A task ID to retrieve the entire task JSON.
Returns:
dict: A Swimlane task dictionary.
"""
try:
return self._make_request("GET", f"/task/{task_id}")
except Exception as e:
return False
get_tasks(self)
Retrieves a list of tasks for a Swimlane instance.
Returns:
Type | Description |
---|---|
list |
A list of task objects |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_tasks(self):
"""Retrieves a list of tasks for a Swimlane instance.
Returns:
list: A list of task objects
"""
tasks = self._make_request("GET", "/task/list")
if tasks and tasks.get("tasks"):
return tasks["tasks"]
get_tasks_by_application(self, application_id)
Gets a list of tasks by application ID.
If the application ID does not exist, then this method returns False.
If the task ID does exist it will return a list of tasks
Parameters:
Name | Type | Description | Default |
---|---|---|---|
application_id |
str |
A application ID to retrieve tasks from. |
required |
Returns:
Type | Description |
---|---|
list |
A list of tasks associated with an application. |
Source code in aqueduct/instance.py
@log_exception
def get_tasks_by_application(self, application_id: str):
"""Gets a list of tasks by application ID.
If the application ID does not exist, then this method returns False.
If the task ID does exist it will return a list of tasks
Args:
application_id (str): A application ID to retrieve tasks from.
Returns:
list: A list of tasks associated with an application.
"""
try:
return self._make_request("GET", f"/task/list/{application_id}")
except Exception as e:
return False
get_user(self, user_id)
Gets an user by a provided ID
If the user does not exist, then this method returns False.
If the user does exist it will return an user dictionary object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
str |
A Swimlane user ID |
required |
Returns:
Type | Description |
---|---|
dict |
A user dictionary object. |
Source code in aqueduct/instance.py
@log_exception
def get_user(self, user_id: str):
"""Gets an user by a provided ID
If the user does not exist, then this method returns False.
If the user does exist it will return an user dictionary object.
Args:
user_id (str): A Swimlane user ID
Returns:
dict: A user dictionary object.
"""
try:
return self._make_request("GET", f"/user/{user_id}")
except Exception as e:
return False
get_users(self)
Retrieves a list of users for a Swimlane instance.
Returns:
Type | Description |
---|---|
list |
A list of user dictionary objects. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_users(self):
"""Retrieves a list of users for a Swimlane instance.
Returns:
list: A list of user dictionary objects.
"""
users = self._make_request("GET", "/user/light")
return users if users else []
get_workflow(self, application_id)
Gets an workflow by a provided application ID
If the workflow does not exist, then this method returns False.
If the workflow does exist it will return an Workflow
object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
application_id |
str |
A Swimlane application ID |
required |
Returns:
Type | Description |
---|---|
Workflow |
A |
Source code in aqueduct/instance.py
@log_exception
def get_workflow(self, application_id: str):
"""Gets an workflow by a provided application ID
If the workflow does not exist, then this method returns False.
If the workflow does exist it will return an `Workflow` object
Args:
application_id (str): A Swimlane application ID
Returns:
Workflow: A `Workflow` data model object
"""
try:
return self._make_request("GET", f"/workflow/{application_id}")
except Exception as e:
return False
get_workflows(self)
Retrieves a list of workflows for a Swimlane instance.
These workflow are modeled after the Workflow
data model
Returns:
Type | Description |
---|---|
list |
A list of |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_workflows(self):
"""Retrieves a list of workflows for a Swimlane instance.
These workflow are modeled after the `Workflow` data model
Returns:
list: A list of `Workflow` objects
"""
return_list = []
workflows = self._make_request("GET", "/workflow/")
if workflows:
for workflow in workflows:
return_list.append(workflow)
return return_list
get_workspace(self, workspace_id)
Gets an workspace by a provided ID
If the workspace does not exist, then this method returns False.
If the workspace does exist it will return an workspace dictionary object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workspace_id |
str |
A Swimlane workspace ID |
required |
Returns:
Type | Description |
---|---|
dict |
A workspace dictionary object. |
Source code in aqueduct/instance.py
@log_exception
def get_workspace(self, workspace_id: str):
"""Gets an workspace by a provided ID
If the workspace does not exist, then this method returns False.
If the workspace does exist it will return an workspace dictionary object.
Args:
workspace_id (str): A Swimlane workspace ID
Returns:
dict: A workspace dictionary object.
"""
try:
return self._make_request("GET", f"/workspaces/{workspace_id}")
except Exception as e:
return False
get_workspaces(self)
Retrieves a list of workspaces for a Swimlane instance.
Returns:
Type | Description |
---|---|
list |
A list of workspace dictionary objects. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def get_workspaces(self):
"""Retrieves a list of workspaces for a Swimlane instance.
Returns:
list: A list of workspace dictionary objects.
"""
workspaces = self._make_request("GET", "/workspaces")
return workspaces if workspaces else []
install_package(self, package)
Installs a Python (pip) package based on the provided package dictionary object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
package |
dict |
A Swimlane package dictionary object. |
required |
Returns:
Type | Description |
---|---|
dict |
A dictionary of a Python pip package |
Source code in aqueduct/instance.py
@log_exception
def install_package(self, package: dict):
"""Installs a Python (pip) package based on the provided package dictionary object.
Args:
package (dict): A Swimlane package dictionary object.
Returns:
dict: A dictionary of a Python pip package
"""
json = {
"name": package["name"],
"version": package["version"],
"pythonVersion": package["pythonVersion"],
}
return self._make_request("POST", "/pip/packages", json=json)
install_package_offline(self, filename, stream, data)
Installs a Python package wheel file offline to a Swimlane instance given a filename and a BytesIO file stream
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename |
str |
A filename string |
required |
stream |
BytesIO |
A BytesIO file stream |
required |
data |
dict |
A dictionary of additional request information |
required |
Returns:
Type | Description |
---|---|
json |
JSON Response from installing a Python package wheel file |
Source code in aqueduct/instance.py
@log_exception
def install_package_offline(self, filename, stream, data):
"""Installs a Python package wheel file offline to a Swimlane instance given a filename and a BytesIO file stream
Args:
filename (str): A filename string
stream (BytesIO): A BytesIO file stream
data (dict): A dictionary of additional request information
Returns:
json: JSON Response from installing a Python package wheel file
"""
return self.swimlane.request(
"POST",
"/pip/packages/offline",
data=data,
files={"wheel": (filename, stream.read())},
timeout=120,
)
search_user(self, query_string)
Searches for a user by a query string
If the user does not exist, then this method returns False.
If the user does exist it will return an user dictionary object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query_string |
str |
A query string typically a display name |
required |
Returns:
Type | Description |
---|---|
dict |
A user dictionary object. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def search_user(self, query_string: str):
"""Searches for a user by a query string
If the user does not exist, then this method returns False.
If the user does exist it will return an user dictionary object.
Args:
query_string (str): A query string typically a display name
Returns:
dict: A user dictionary object.
"""
try:
resp = self._make_request("GET", f"/user/lookup?name={query_string}")
if resp and isinstance(resp, list):
return resp[0]
except Exception as e:
return False
update_applet(self, applet)
Updates a provided applet with the data contained in the provided task dictionary.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
applet |
dict |
A Swimlane applet dictionary. |
required |
Returns:
Type | Description |
---|---|
dict |
A Swimlane applet dictionary. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def update_applet(self, applet: dict):
"""Updates a provided applet with the data contained in the provided task dictionary.
Args:
applet (dict): A Swimlane applet dictionary.
Returns:
dict: A Swimlane applet dictionary.
"""
return self._make_request("PUT", f"/applet/{applet['id']}", json=applet)
update_application(self, application)
Updates the application based on the provided dictionary object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
application |
dict |
A application dictionary object. |
required |
Returns:
Type | Description |
---|---|
dict |
A Swimlane application object |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def update_application(self, application):
"""Updates the application based on the provided dictionary object
Args:
application (dict): A application dictionary object.
Returns:
dict: A Swimlane application object
"""
return self._make_request("PUT", "/app", json=application)
update_asset(self, asset_id, asset)
Updates a provided asset ID with the data contained in the provided asset dictionary object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
asset_id |
str |
An asset ID to update on a Swimlane instance |
required |
asset |
dict |
A Swimlane asset dictionary object. |
required |
Returns:
Type | Description |
---|---|
dict |
A Swimlane asset dictionary. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def update_asset(self, asset_id: str, asset: dict):
"""Updates a provided asset ID with the data contained in the provided asset dictionary object.
Args:
asset_id (str): An asset ID to update on a Swimlane instance
asset (dict): A Swimlane asset dictionary object.
Returns:
dict: A Swimlane asset dictionary.
"""
return self._make_request("PUT", f"/asset/{asset_id}", json=asset)
update_dashboard(self, dashboard)
Updates a Swimlane instance dashboard based on the provided dashboard dictionary.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dashboard |
dict |
A Swimlane dashboard dictionary object. |
required |
Returns:
Type | Description |
---|---|
dict |
A Swimlane dashboard dictionary object. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def update_dashboard(self, dashboard: dict):
"""Updates a Swimlane instance dashboard based on the provided dashboard dictionary.
Args:
dashboard (dict): A Swimlane dashboard dictionary object.
Returns:
dict: A Swimlane dashboard dictionary object.
"""
return self._make_request("PUT", f"/dashboard/{dashboard['id']}", json=dashboard)
update_default_report(self, report)
Updates a Swimlane applications default report with the provided report dictionary object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
report |
dict |
A report dictionary object. |
required |
Returns:
Type | Description |
---|---|
dict |
A report dictionary object. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def update_default_report(self, report: dict):
"""Updates a Swimlane applications default report with the provided report dictionary object.
Args:
report (dict): A report dictionary object.
Returns:
dict: A report dictionary object.
"""
return self._make_request("PUT", f"/reports/{report['id']}", json=report)
update_group(self, group_id, group)
Updates a provided group ID with the data contained in the provided group dictionary object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
group_id |
str |
A group ID to update on a Swimlane instance |
required |
group |
dict |
A Swimlane group dictionary object. |
required |
Returns:
Type | Description |
---|---|
dict |
A Swimlane group dictionary object. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def update_group(self, group_id: str, group: dict):
"""Updates a provided group ID with the data contained in the provided group dictionary object.
Args:
group_id (str): A group ID to update on a Swimlane instance
group (dict): A Swimlane group dictionary object.
Returns:
dict: A Swimlane group dictionary object.
"""
return self._make_request("PUT", f"/groups/{group_id}", json=group)
update_report(self, report_id, report)
Updates a provided report ID with the data contained in the provided report dictionary object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
report_id |
str |
A report ID to update on a Swimlane instance |
required |
report |
dict |
A report dictionary object. |
required |
Returns:
Type | Description |
---|---|
dict |
A report dictionary object. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def update_report(self, report_id: str, report: dict):
"""Updates a provided report ID with the data contained in the provided report dictionary object.
Args:
report_id (str): A report ID to update on a Swimlane instance
report (dict): A report dictionary object.
Returns:
dict: A report dictionary object.
"""
resp = self._make_request("PUT", f"/reports/{report_id}", json=report)
if resp:
return True
update_role(self, role_id, role)
Updates a provided role ID with the data contained in the provided role dictionary object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role_id |
str |
A role ID to update on a Swimlane instance |
required |
role |
dict |
A role dictionary object. |
required |
Returns:
Type | Description |
---|---|
dict |
A role dictionary object. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def update_role(self, role_id: str, role: dict):
"""Updates a provided role ID with the data contained in the provided role dictionary object.
Args:
role_id (str): A role ID to update on a Swimlane instance
role (dict): A role dictionary object.
Returns:
dict: A role dictionary object.
"""
return self._make_request("PUT", f"/roles/{role_id}", json=role)
update_task(self, task_id, task)
Updates a provided task ID with the data contained in the provided task dictionary.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task_id |
str |
A task ID to update on a Swimlane instance |
required |
task |
dict |
A Swimlane task dictionary. |
required |
Returns:
Type | Description |
---|---|
dict |
A Swimlane task dictionary. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def update_task(self, task_id: str, task: dict):
"""Updates a provided task ID with the data contained in the provided task dictionary.
Args:
task_id (str): A task ID to update on a Swimlane instance
task (dict): A Swimlane task dictionary.
Returns:
dict: A Swimlane task dictionary.
"""
return self._make_request("PUT", f"/task/{task_id}", json=task)
update_user(self, user_id, user)
Updates a provided user ID with the data contained in the provided user dictionary object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
str |
A user ID to update on a Swimlane instance |
required |
user |
dict |
A user dictionary object. |
required |
Returns:
Type | Description |
---|---|
dict |
A user dictionary object. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def update_user(self, user_id: str, user: dict):
"""Updates a provided user ID with the data contained in the provided user dictionary object.
Args:
user_id (str): A user ID to update on a Swimlane instance
user (dict): A user dictionary object.
Returns:
dict: A user dictionary object.
"""
return self._make_request("PUT", f"/user/{user_id}", json=user)
update_workflow(self, workflow)
Updates a Swimlane instance with the provided Workflow
data model object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workflow |
Workflow |
A |
required |
Returns:
Type | Description |
---|---|
Workflow |
A |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def update_workflow(self, workflow):
"""Updates a Swimlane instance with the provided `Workflow` data model object
Args:
workflow (Workflow): A `Workflow` data model object
Returns:
Workflow: A `Workflow` data model object
"""
return self._make_request("PUT", f"/workflow/{workflow['id']}", json=workflow)
update_workspace(self, workspace_id, workspace)
Updates a provided workspace ID with the data contained in the provided workspace dictionary object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workspace_id |
str |
A workspace ID to update on a Swimlane instance |
required |
workspace |
dict |
A workspace dictionary object. |
required |
Returns:
Type | Description |
---|---|
dict |
A workspace dictionary object. |
Source code in aqueduct/instance.py
@dump_content
@log_exception
def update_workspace(self, workspace_id: str, workspace: dict):
"""Updates a provided workspace ID with the data contained in the provided workspace dictionary object.
Args:
workspace_id (str): A workspace ID to update on a Swimlane instance
workspace (dict): A workspace dictionary object.
Returns:
dict: A workspace dictionary object.
"""
return self._make_request("PUT", f"/workspaces/{workspace_id}", json=workspace)
upgrade_plugin(self, filename, stream)
Uploads a plugin to be upgraded on a Swimlane instance given a filename and a BytesIO file stream
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename |
str |
A filename string |
required |
stream |
BytesIO |
A BytesIO file stream |
required |
Returns:
Type | Description |
---|---|
json |
JSON Response from uploading a plugin |
Source code in aqueduct/instance.py
@log_exception
def upgrade_plugin(self, filename, stream):
"""Uploads a plugin to be upgraded on a Swimlane instance given a filename and a BytesIO file stream
Args:
filename (str): A filename string
stream (BytesIO): A BytesIO file stream
Returns:
json: JSON Response from uploading a plugin
"""
if not filename.endswith(".swimbundle"):
filename = filename.split(".")[0] + ".swimbundle"
return self.swimlane.request("POST", "/task/packages/upgrade", files={"file": (filename, stream.read())}).json()
upload_plugin(self, filename, stream)
Uploads a plugin to a Swimlane instance given a filename and a BytesIO file stream
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename |
str |
A filename string |
required |
stream |
BytesIO |
A BytesIO file stream |
required |
Returns:
Type | Description |
---|---|
json |
JSON Response from uploading a plugin |
Source code in aqueduct/instance.py
@log_exception
def upload_plugin(self, filename, stream):
"""Uploads a plugin to a Swimlane instance given a filename and a BytesIO file stream
Args:
filename (str): A filename string
stream (BytesIO): A BytesIO file stream
Returns:
json: JSON Response from uploading a plugin
"""
if not filename.endswith(".swimbundle"):
filename = filename.split(".")[0] + ".swimbundle"
return self.swimlane.request("POST", "/task/packages", files={"file": (filename, stream.read())}).json()
ComponentBase
A base class for all components.
This class is used to ensure that all derived classes have a sync method as well as shared methods.
_get_field_by_type(self, field_list, field_type='tracking')
private
Returns a field dictionary by its defined type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
field_list |
list |
A list of application fields. |
required |
field_type |
str |
The fieldType value of the field. Defaults to "tracking". |
'tracking' |
Returns:
Type | Description |
---|---|
dict |
dict: A swimlane application field dictionary. |
Source code in aqueduct/components/base.py
def _get_field_by_type(self, field_list: list, field_type: str = "tracking") -> dict:
"""Returns a field dictionary by its defined type.
Args:
field_list (list): A list of application fields.
field_type (str, optional): The fieldType value of the field. Defaults to "tracking".
Returns:
dict: A swimlane application field dictionary.
"""
for field in field_list:
if field.get("fieldType") and field["fieldType"] == field_type:
return field
_replace(self, component, search_value, replace_value)
private
Recursively replaces any key or values matching a provided value with another one.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component |
dict |
The given component dictionary object. |
required |
search_value |
str |
The value to search for. |
required |
replace_value |
str |
The value to replace any matches found with. |
required |
Returns:
Type | Description |
---|---|
Dict[str, str] |
dict: Returns an updated dictionary with the replaced values. |
Source code in aqueduct/components/base.py
def _replace(self, component: dict, search_value: str, replace_value: str) -> Dict[str, str]:
"""Recursively replaces any key or values matching a provided value with another one.
Args:
component (dict): The given component dictionary object.
search_value (str): The value to search for.
replace_value (str): The value to replace any matches found with.
Returns:
dict: Returns an updated dictionary with the replaced values.
"""
if isinstance(component, dict):
new_dict = {}
for k, v in component.items():
key = k
if k == search_value:
key = replace_value
new_dict[key] = self._replace(v, search_value, replace_value)
return new_dict
elif isinstance(component, list):
new_list = []
for v in component:
new_list.append(self._replace(v, search_value, replace_value))
return new_list
else:
if component == search_value:
return replace_value
else:
return component
_set_unneeded_keys_to_empty_dict(self, component, keys=['createdByUser', 'modifiedByUser', 'permissions'])
private
A component object to remove defined keys from.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component |
dict or attrs |
A Swimlane component object to clean. |
required |
keys |
list |
A list of keys to set as empty dictionaries. |
['createdByUser', 'modifiedByUser', 'permissions'] |
Returns:
Type | Description |
---|---|
dict or attrs |
Returns an updated component with the values set as empty dictionaries. |
Source code in aqueduct/components/base.py
def _set_unneeded_keys_to_empty_dict(self, component, keys=["createdByUser", "modifiedByUser", "permissions"]):
"""A component object to remove defined keys from.
Args:
component (dict or attrs): A Swimlane component object to clean.
keys (list): A list of keys to set as empty dictionaries.
Returns:
dict or attrs: Returns an updated component with the values set as empty dictionaries.
"""
for key in keys:
if isinstance(component, dict):
if component.get(key):
component[key] = {}
return component
is_masked(self, value)
Returns true or false if the provided value is masked.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
A value to check if it is masked with ***. |
required |
Returns:
Type | Description |
---|---|
bool |
bool: Returns True if the value is masked. False if not. |
Source code in aqueduct/components/base.py
def is_masked(self, value: Any) -> bool:
"""Returns true or false if the provided value is masked.
Args:
value (Any): A value to check if it is masked with ***.
Returns:
bool: Returns True if the value is masked. False if not.
"""
if isinstance(value, str):
return self.MASKED_VALUE.match(value)
return False
sync(self)
Every component must have a defined sync method.
Exceptions:
Type | Description |
---|---|
NotImplementedError |
Raises when a component does not have a sync method defined. |
Source code in aqueduct/components/base.py
@abstractmethod
def sync(self):
"""Every component must have a defined sync method.
Raises:
NotImplementedError: Raises when a component does not have a sync method defined.
"""
raise NotImplementedError("The class does not have a sync method.")
Applications
Used to sync applications from a source instance to a destination instance of Swimlane.
_add_fields(self, source, destination)
private
Appends fields to a destination application.
This method will append fields to a destination application which are in the source application but missing from the destination application.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source |
dict |
A Swimlane source instance application. |
required |
destination |
dict |
A Swimlane destination instance application. |
required |
Returns:
Type | Description |
---|---|
dict |
A Swimlane destination application. |
Source code in aqueduct/components/applications.py
def _add_fields(self, source: dict, destination: dict):
"""Appends fields to a destination application.
This method will append fields to a destination application which
are in the source application but missing from the destination
application.
Args:
source (dict): A Swimlane source instance application.
destination (dict): A Swimlane destination instance application.
Returns:
dict: A Swimlane destination application.
"""
self.log(f"Checking application '{source['name']}' application for missing fields.")
if ComponentBase.mirror_app_fields_on_destination:
self.log(val=f"Force updating application '{source['name']}' fields on destination instance from source.")
dest_tracking_id_field = self._remove_tracking_field(application=destination)
self._remove_tracking_field(application=source)
destination["fields"] = source["fields"]
destination["fields"].append(dest_tracking_id_field)
self.log(val=f"Force update of application '{source['name']}' fields successful.")
else:
for sfield in source["fields"]:
if sfield.get("fieldType") and sfield["fieldType"].lower() == "tracking":
continue
if not self._is_field_in_fields(application_fields=destination["fields"], field=sfield):
self.log(f"Field '{sfield['name']}' not in destination application.")
destination["fields"].append(sfield)
self.log(f"Successfully added '{sfield['name']}' to destination application")
if ComponentBase.dry_run:
self.add_to_diff_log(source["name"], "added", subcomponent="field", value=sfield["name"])
return destination
_is_field_in_fields(self, application_fields, field)
private
Checks to see if a field matches in a list of application fields.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
application_fields |
list |
A Swimlane instance application fields list. |
required |
field |
dict |
A Swimlane instance application field. |
required |
Returns:
Type | Description |
---|---|
bool |
bool: Whether or not the field is in the provided list of fields. |
Source code in aqueduct/components/applications.py
def _is_field_in_fields(self, application_fields: list, field: dict) -> bool:
"""Checks to see if a field matches in a list of application fields.
Args:
application_fields (list): A Swimlane instance application fields list.
field (dict): A Swimlane instance application field.
Returns:
bool: Whether or not the field is in the provided list of fields.
"""
for app_field in application_fields:
if field.get("id") and app_field.get("id") and app_field["id"] == field["id"]:
return True
return False
_process_workspaces(self, application)
private
Ensures that Swimlane workspaces exist before processing applications.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
application |
dict |
A Swimlane instance application. |
required |
Exceptions:
Type | Description |
---|---|
GetComponentError |
Raised when an applications Workspace does not exist on the source instance. |
Source code in aqueduct/components/applications.py
def _process_workspaces(self, application: dict) -> None:
"""Ensures that Swimlane workspaces exist before processing applications.
Args:
application (dict): A Swimlane instance application.
Raises:
GetComponentError: Raised when an applications Workspace does not exist on the source instance.
"""
if application.get("workspaces") and application["workspaces"]:
for workspace in application["workspaces"]:
workspace_ = self.source_instance.get_workspace(workspace_id=workspace)
if not workspace_:
raise GetComponentError(name="workspace", id=workspace)
if workspace_ and not self.destination_instance.get_workspace(workspace_id=workspace):
Workspaces().sync_workspace(workspace=workspace_)
_remove_tracking_field(self, application)
private
Removes the tracking-id field from an application.
This method removes an applications tracking field since Swimlane automatically generates this for every new application.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
application |
dict |
A Swimlane application to remove the tracking field from |
required |
Source code in aqueduct/components/applications.py
def _remove_tracking_field(self, application: dict):
"""Removes the tracking-id field from an application.
This method removes an applications tracking field since
Swimlane automatically generates this for every new application.
Args:
application (dict): A Swimlane application to remove the tracking field from
"""
return_dict = {}
for field in application.get("fields"):
if field.get("fieldType") and field["fieldType"].lower() == "tracking":
if not ComponentBase.dry_run:
self.log(f"Removing tracking-id field from application '{application['name']}'.")
application["fields"].remove(field)
return_dict = field
else:
self.add_to_diff_log(application["name"], "removed", subcomponent="tracking-field")
return return_dict
get_reference_app_order(self)
Creates a ordered dictionary based on most to least referenced applications.
This method creates an order of applications to be added or updated on a destination instance based on most to least reference relationships. For example, a source application that references 5 applications will be before an application which has 3 references to applications.
Returns:
Type | Description |
---|---|
dict |
An reference application ordered (sorted) application dictionary |
Source code in aqueduct/components/applications.py
def get_reference_app_order(self):
"""Creates a ordered dictionary based on most to least referenced applications.
This method creates an order of applications to be added or updated on a destination
instance based on most to least reference relationships. For example, a source application
that references 5 applications will be before an application which has 3 references to applications.
Returns:
dict: An reference application ordered (sorted) application dictionary
"""
reference_dict = {}
for application in self.source_instance.get_applications():
if application:
application_ = self.source_instance.get_application(application_id=application["id"])
if application_["id"] not in reference_dict:
reference_dict[application_["id"]] = []
for field in application_["fields"]:
if field.get("$type") and field["$type"] == "Core.Models.Fields.Reference.ReferenceField, Core":
reference_dict[application_["id"]].append(field["targetId"])
return_dict = OrderedDict()
for item in sorted(reference_dict, key=lambda k: len(reference_dict[k]), reverse=True):
return_dict[item] = reference_dict[item]
return return_dict
sync(self)
This method will sync all applications on a source instance with a destination instance.
Source code in aqueduct/components/applications.py
def sync(self):
"""This method will sync all applications on a source instance with a destination instance."""
self.log(f"Starting to sync 'Applications' from '{self.source_host}' to '{self.dest_host}'")
for application_id, values in self.get_reference_app_order().items():
self.sync_application(application_id=application_id)
# Checking if we need to update applications reference fields that contain old (source instance)
# tracking-ids in their columns. If so we remove them and update it with the new ones on the destination.
if self.__applications_needing_updates:
for application_id in self.__applications_needing_updates:
dest_application = self.destination_instance.get_application(application_id)
needs_update = False
for field in dest_application["fields"]:
if field.get("columns"):
column_list = []
for column in field["columns"]:
if self.tracking_id_map.get(column):
self.log(f"Removing old tracking-id '{column}' from application field '{field['key']}'")
column_list.append(self.tracking_id_map[column])
self.log(
f"Adding new tracking-id '{self.tracking_id_map[column]}' to application field "
f"'{field['key']}'"
)
needs_update = True
else:
column_list.append(column)
field["columns"] = column_list
if needs_update:
if not ComponentBase.dry_run:
self.log(f"Updating application '{dest_application['name']}' on destination with new.")
dest_application = self.destination_instance.update_application(dest_application)
self.log(f"Successfully updated application '{dest_application['name']}' on destination.")
else:
self.add_to_diff_log(dest_application["name"], "updated")
sync_application(self, application_id)
This method syncs a single application from a source instance to a destination instance.
Once an application_id on a source instance is provided we retrieve this application JSON. Next we remove the tracking_field from the application since Swimlane automatically generates a unique value for this field.
If workspaces are defined in the application and do not currently exist will attempt to create or update them using the Workspaces class.
If the source application does not exist on the destination, we will create it with the same IDs for 1. application 2. fields 3. layout 4. etc.
By doing this, syncing of applications (and their IDs) is much easier.
If the application exists, we proceed to remove specific fields that are not needed.
Next, we check for fields which have been added to the source application but do not exist on the destination instance. If fields are found we add them to the destination object.
Next, we check to see if the destination has fields which are not defined in the source instance. If fields are found, we then remove them from the layout view of the source application. This equates to moving them to the "hidden" field section within the application builder so they can still be retrieved and reorganized as needed.
Finally, we update the application on the destination instance.
After updating the application we then check to ensure that the workflow of that application is up to date and accurate.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
application_id |
str |
A source application ID. |
required |
Source code in aqueduct/components/applications.py
def sync_application(self, application_id: str):
"""This method syncs a single application from a source instance to a destination instance.
Once an application_id on a source instance is provided we retrieve this application JSON.
Next we remove the tracking_field from the application since Swimlane automatically generates
a unique value for this field.
If workspaces are defined in the application and do not currently exist will attempt to create
or update them using the Workspaces class.
If the source application does not exist on the destination, we will create it with the same IDs for
1. application
2. fields
3. layout
4. etc.
By doing this, syncing of applications (and their IDs) is much easier.
If the application exists, we proceed to remove specific fields that are not needed.
Next, we check for fields which have been added to the source application but do not
exist on the destination instance. If fields are found we add them to the destination
object.
Next, we check to see if the destination has fields which are not defined in the source instance.
If fields are found, we then remove them from the layout view of the source application. This equates
to moving them to the "hidden" field section within the application builder so they can still be retrieved
and reorganized as needed.
Finally, we update the application on the destination instance.
After updating the application we then check to ensure that the workflow of that application is up to date
and accurate.
Args:
application_id (str): A source application ID.
"""
application = self.source_instance.get_application(application_id=application_id)
if not application:
raise GetComponentError(name="application", id=application_id)
self.log(f"Processing source instance application '{application['name']}'.")
if not self._is_in_include_exclude_lists(name=application["name"], type="applications"):
self._process_workspaces(application=application)
source_tracking_field = self._remove_tracking_field(application)
# scrubbing application dict to remove $type keys recursively.
self.scrub(application)
dest_application = self.destination_instance.get_application(application["id"])
if not dest_application:
if not ComponentBase.dry_run:
self.log(f"Adding application '{application['name']}' on destination.")
dest_application = self.destination_instance.add_application(application)
self.__applications_needing_updates.append(application_id)
self.log(f"Successfully added application '{application['name']}' on destination.")
# Setting tracking_id_map just in case tasks are added and need updating
# Tasks would need updating if they use an applications 'Tracking Id' field as an
# input or output since Swimlane randomizes the Tracking ID when created
self.tracking_id_map = (source_tracking_field["id"], dest_application["trackingFieldId"])
else:
self.add_to_diff_log(application["name"], "added")
else:
# removing these keys since they are not needed and only cause issues.
dest_application = self._set_unneeded_keys_to_empty_dict(component=dest_application)
dest_application = self._add_fields(source=application, destination=dest_application)
if not ComponentBase.dry_run:
self.log(f"Updating application '{dest_application['name']}' on destination.")
# Here we are setting the destination applications layout key to the source applications layout key
# we consider the source application layout to be the source of truth
dest_application["layout"] = application["layout"]
self.destination_instance.update_application(dest_application)
self.__applications_needing_updates.append(application_id)
self.tracking_id_map = (source_tracking_field["id"], dest_application["trackingFieldId"])
self.log(f"Successfully updated application '{dest_application['name']}' on destination.")
else:
self.add_to_diff_log(application["name"], "updated")
self.log(f"Checking for changes in workflow for application '{application['name']}'")
# we are providing the application as well as the name since there is a bug in Swimlane 10.5.2 with the
# workflows endpoint so we need to look it up by application Id instead.
Workflows().sync_workflow(application_name=application["name"], application_id=application_id)
Assets
Used to sync assets from a source instance to a destination instance of Swimlane.
destination_assets
property
readonly
A list of destination instance assets.
Returns:
Type | Description |
---|---|
list |
A list of destination asset names. |
_check_for_homework_assignment(self, asset)
private
Checks the provided asset dictionary for parameters containing secrets.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
asset |
dict |
A swimlane asset dictionary object. |
required |
Source code in aqueduct/components/assets.py
def _check_for_homework_assignment(self, asset: dict) -> None:
"""Checks the provided asset dictionary for parameters containing secrets.
Args:
asset (dict): A swimlane asset dictionary object.
"""
parameter_list = []
if asset.get("parameters"):
for key, val in asset["parameters"].items():
if key != "$type":
if self.is_masked(val):
parameter_list.append(key)
if parameter_list:
self.add_to_homework_list(
component_name="assets",
value=f"You must manually enter your secrets in asset '{asset['name']}' for the following parameters: "
f"'{','.join([x for x in parameter_list])}'.",
)
sync(self)
This method is used to sync (create) all assets from a source instance to a destination instance
Source code in aqueduct/components/assets.py
def sync(self):
"""This method is used to sync (create) all assets from a source instance to a destination instance"""
self.log(f"Attempting to sync assets from '{self.source_host}' to '{self.dest_host}'.")
for asset in self.source_instance.get_assets():
self.sync_asset(asset=asset)
self.log(f"Completed syncing of assets from '{self.source_host}' to '{self.dest_host}'.")
sync_asset(self, asset)
This method will create (add) a single asset from a source instance to a destination instance.
Currently we are only adding assets and NOT updating them but this functionality may expand in the future.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
asset |
dict |
A single Swimlane asset dictionary from the Swimlane API |
required |
Source code in aqueduct/components/assets.py
def sync_asset(self, asset: dict) -> None:
"""This method will create (add) a single asset from a source instance to a destination instance.
Currently we are only adding assets and NOT updating them but this functionality may expand in the future.
Args:
asset (dict): A single Swimlane asset dictionary from the Swimlane API
"""
if not self._is_in_include_exclude_lists(asset["name"], "assets"):
self.log(f"Processing asset '{asset['name']}'.")
if asset["name"] not in self.destination_assets:
self.log(f"Asset '{asset['name']}' was not found on destination.")
if not ComponentBase.dry_run:
dest_asset = self.destination_instance.add_asset(asset)
if not dest_asset:
raise AddComponentError(model=asset, name=asset["name"])
self.log(f"Asset '{asset['name']}' was successfully added to destination.")
else:
self.add_to_diff_log(asset["name"], "added")
self._check_for_homework_assignment(asset=asset)
else:
self.log(f"Asset '{asset['name']}' already exists on destination. Skipping")
Dashboards
Used to sync dashboards from a source instance to a destination instance of Swimlane.
_get_destination_dashboard(self, source_dashboard_uid)
private
Returns a matching destination instance dashboard if it exists.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source_dashboard_uid |
str |
The source instance dashboard UID. |
required |
Source code in aqueduct/components/dashboards.py
def _get_destination_dashboard(self, source_dashboard_uid: str):
"""Returns a matching destination instance dashboard if it exists.
Args:
source_dashboard_uid (str): The source instance dashboard UID.
"""
dest_dashboards = self.destination_instance.get_dashboards()
if dest_dashboards:
for d in dest_dashboards:
if d["uid"] == source_dashboard_uid:
return d
return None
_process_reports(self, dashboard)
private
Ensures that any referenced reports actually exist on the source instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dashboard |
dict |
A Swimlane dashboard dictionary object. |
required |
Exceptions:
Type | Description |
---|---|
GetComponentError |
Raises when a defined report does not exist on a source Swimlane instance. |
Source code in aqueduct/components/dashboards.py
def _process_reports(self, dashboard: dict):
"""Ensures that any referenced reports actually exist on the source instance.
Args:
dashboard (dict): A Swimlane dashboard dictionary object.
Raises:
GetComponentError: Raises when a defined report does not exist on a source Swimlane instance.
"""
self.log(f"Processing dashboard '{dashboard['name']}' reports.")
for item in dashboard["items"]:
if item.get("reportId"):
self.log(
f"Dashboard '{dashboard['name']}' contains a report ({item['reportId']}). "
"Checking to make sure it exists."
)
report = self.source_instance.get_report(report_id=item["reportId"])
# TODO: Add main parameter to skip if the report does not exist on the source instance.
# Sometimes a card on a dashboard exists on the dashboard but the associated report does not.
if not report:
self.add_to_homework_list(
component_name="dashboards",
value=f"The dashboard '{dashboard['name']}' contains a card pointing to a report "
f"({item['reportId']}) that does not exist on the source instance. "
"It is recommended to remove this dashboard card from your source instance.",
)
self.log(
f"The dashboard '{dashboard['name']}' contains a report card that does not exist on the "
f"source instance. Look for report id '{item['reportId']}' and remove it from the dashboard."
)
else:
# Attempting to sync the report
Reports().sync_report(report=report)
else:
self.log(
val=f"Dashboard '{dashboard['name']}' includes a report card that is a card type of "
f"'{item.get('cardType')}'."
)
sync(self)
This method is used to sync all dashboards from a source instance to a destination instance
Source code in aqueduct/components/dashboards.py
def sync(self):
"""This method is used to sync all dashboards from a source instance to a destination instance"""
self.log(f"Attempting to sync dashboards from '{self.source_host}' to '{self.dest_host}'")
dashboards = self.source_instance.get_dashboards()
if dashboards:
for dashboard in dashboards:
self.sync_dashboard(dashboard=dashboard)
self.log(f"Completed syncing of dashboards from '{self.source_host}' to '{self.dest_host}'.")
sync_dashboard(self, dashboard)
This method syncs a single dashboard from a source instance to a destination instance.
This class first checks to see if the provided dashboard already exists on the destination instance. If it does not exist then we attempt to add the dashboard to the destination instance.
If the dashboard already exists on the destination instance, we first check it against all destination
instance dashboards. This check involves comparing the provided source dashboard dict with
the uid
and name
of a destination instance dashboard.
If a match is found, we then check if the version is the same. If it is we simply skip processing this dashboard.
If a match is found but the versions are different, we first ensure that all the reports in the dashboard are on the destination instance. Once that is complete, we modify the dashboard to remove unneeded keys and then update it as provided by the source instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dashboard |
dict |
A source instance dashboard dictionary. |
required |
Source code in aqueduct/components/dashboards.py
def sync_dashboard(self, dashboard: dict):
"""This method syncs a single dashboard from a source instance to a destination instance.
This class first checks to see if the provided dashboard already exists on the destination instance.
If it does not exist then we attempt to add the dashboard to the destination instance.
If the dashboard already exists on the destination instance, we first check it against all destination
instance dashboards. This check involves comparing the provided source dashboard dict with
the `uid` and `name` of a destination instance dashboard.
If a match is found, we then check if the version is the same.
If it is we simply skip processing this dashboard.
If a match is found but the versions are different, we first ensure that all the reports in the dashboard are on
the destination instance. Once that is complete, we modify the dashboard to remove unneeded keys and then update
it as provided by the source instance.
Args:
dashboard (dict): A source instance dashboard dictionary.
"""
self.log(f"Processing dashboard '{dashboard['name']}' ({dashboard['id']})")
self.scrub(dashboard)
if not self._is_in_include_exclude_lists(dashboard["name"], "dashboards"):
# making sure that all reports exist on the source instance.
self._process_reports(dashboard=dashboard)
dest_dashboard = self._get_destination_dashboard(source_dashboard_uid=dashboard["uid"])
# if no dest_dashboard then we need to create it on the destination instance.
if not dest_dashboard:
if not ComponentBase.dry_run:
self.log(
f"Adding dashboard '{dashboard['name']}' for workspaces "
f"'{dashboard.get('workspaces')}' on destination"
)
# These keys need to be in our object but they need to be empty.
dashboard = self._set_unneeded_keys_to_empty_dict(component=dashboard)
dest_dashboard = self.destination_instance.add_dashboard(dashboard)
if not dest_dashboard:
raise AddComponentError(model=dashboard, name=dashboard["name"])
self.log(f"Successfully added dashboard '{dashboard['name']}' to destination.")
else:
self.add_to_diff_log(dashboard["name"], "added")
else:
if self.update_dashboards:
self.log(
f"Dashboard '{dashboard['name']}' for workspaces '{dashboard.get('workspaces')}' was found."
" Checking differences..."
)
self.scrub(dest_dashboard)
if Differ(source=dashboard, destination=dest_dashboard).check():
if not ComponentBase.dry_run:
self.log(f"Updating '{dashboard['name']}' now.")
dest_dashboard = self.destination_instance.update_dashboard(dashboard)
if not dest_dashboard:
raise UpdateComponentError(model=dashboard, name=dashboard["name"])
self.log(f"Successfully updated dashboard '{dashboard['name']}'.")
else:
self.add_to_diff_log(dashboard["name"], "updated")
else:
self.log(f"Dashboard '{dashboard['name']}' is the same on source and destination. Skipping...")
else:
self.log(
f"Skipping check of dashboard '{dashboard['name']}' for changes since "
"update_dashboards is False."
)
else:
self.log(f"Skipping dashboard '{dashboard['name']}' since it is excluded.")
Groups
Used to sync groups from a source instance to a destination instance of Swimlane.
_get_destination_group(self, group)
private
Attempts to retrieve a destination instance group.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
group |
dict |
The source Swimlane instance group dictionary object. |
required |
Returns:
Type | Description |
---|---|
dict or bool |
Returns a group dictionary object if found if not then a boolean. |
Source code in aqueduct/components/groups.py
def _get_destination_group(self, group: dict):
"""Attempts to retrieve a destination instance group.
Args:
group (dict): The source Swimlane instance group dictionary object.
Returns:
dict or bool: Returns a group dictionary object if found if not then a boolean.
"""
try:
return self.destination_instance.get_group_by_id(group["id"])
except JSONDecodeError as jd:
self.log(f"Unable to find group '{group['name']}' by id. Trying by name.")
try:
return self.destination_instance.get_group_by_name(group["name"])
except JSONDecodeError as jd:
self.log(f"Unable to find group '{group['name']}' by name. Assuming new group.")
_process_group(self, group)
private
Processes a source Swimlane instance group objects related objects.
This method processes any users or roles defined associated with the provided group dictionary object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
group |
dict |
A source Swimlane instance group dictionary object. |
required |
Returns:
Type | Description |
---|---|
dict |
The source Swimlane instance group dictionary object with possible updates. |
Source code in aqueduct/components/groups.py
def _process_group(self, group: dict):
"""Processes a source Swimlane instance group objects related objects.
This method processes any users or roles defined associated with the provided group dictionary object.
Args:
group (dict): A source Swimlane instance group dictionary object.
Returns:
dict: The source Swimlane instance group dictionary object with possible updates.
"""
if group.get("users") and group["users"]:
self.log(f"Processing user association on destination with group '{group['name']}'.")
user_list = []
from .users import Users
for user in group["users"]:
_user = Users().sync_user(user_id=user["id"])
if _user:
user_list.append(_user)
group["users"] = user_list
if group.get("roles") and group["roles"]:
self.log(f"Processing role association on destination with group '{group['name']}'.")
role_list = []
from .roles import Roles
for role in group["roles"]:
_role = Roles().sync_role(role=role)
if _role:
role_list.append(_role)
group["roles"] = role_list
return group
sync(self)
This method is used to sync all groups from a source instance to a destination instance.
Source code in aqueduct/components/groups.py
def sync(self):
"""This method is used to sync all groups from a source instance to a destination instance."""
self.log(f"Attempting to sync groups from '{self.source_host}' to '{self.dest_host}'")
groups = self.source_instance.get_groups()
if groups:
for group in groups:
self.sync_group(group=group)
self.log(f"Completed syncing of groups from '{self.source_host}' to '{self.dest_host}'.")
sync_group(self, group)
This class syncs a single source instance group to a destination instance.
We begin by processing the provided group and ensuring that all roles and users associated with the provided group are added to the destination instance.
Once that is complete, we then sync any nested groups within the provided source instance group.
If the provided group is already on the destination instance, then we just skip processing but if the provided group is not on the destination instance we add it.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
group |
dict |
A source instance group dictionary object. |
required |
Source code in aqueduct/components/groups.py
def sync_group(self, group: dict):
"""This class syncs a single source instance group to a destination instance.
We begin by processing the provided group and ensuring that all roles and users
associated with the provided group are added to the destination instance.
Once that is complete, we then sync any nested groups within the provided source instance group.
If the provided group is already on the destination instance, then we just skip processing but if
the provided group is not on the destination instance we add it.
Args:
group (dict): A source instance group dictionary object.
"""
if (
not self._is_in_include_exclude_lists(group["name"], "groups")
and group["name"] not in self.group_exclusions
):
self.log(f"Processing group '{group['name']}' ({group['id']})")
group = self._process_group(group=group)
# since groups can have nested groups we are iterating through them as well and processing.
if group.get("groups") and group["groups"]:
group_list = []
for group_ in group["groups"]:
group_list.append(self._process_group(group=group_))
group["groups"] = group_list
# after we are done processing we now attempt to get a destination instance group.
dest_group = self._get_destination_group(group=group)
if not dest_group:
if not ComponentBase.dry_run:
self.log(f"Creating new group '{group['name']}' on destination.")
dest_group = self.destination_instance.add_group(group)
if not dest_group:
raise AddComponentError(model=group, name=group["name"])
self.log(f"Successfully added new group '{group['name']}' to destination.")
else:
self.add_to_diff_log(group["name"], "added")
else:
self.log(f"Group '{group['name']}' already exists on destination.")
else:
self.log(f"Skipping group '{group['name']}' since it is excluded.")
KeyStore
Used to sync keystore items from a source instance to a destination instance of Swimlane.
sync(self)
Syncing of the keystore will only create new entries into the destination systems keystore.
The creation of these items does not transfer or implement the transfer of the value since these are encrypted within Swimlane themselves.
Once items are created in the keystore on the destination system then you must manually enter the value for that keystore item.
Source code in aqueduct/components/keystore.py
def sync(self):
"""Syncing of the keystore will only create new entries into the destination systems keystore.
The creation of these items does not transfer or implement the transfer of the value since
these are encrypted within Swimlane themselves.
Once items are created in the keystore on the destination system then you must manually enter
the value for that keystore item.
"""
self.log(f"Attempting to sync keystore from '{self.source_host}' to '{self.dest_host}'")
credentials = self.source_instance.get_credentials()
if credentials:
credentials.pop("$type")
for key, val in credentials.items():
if not self._is_in_include_exclude_lists(key, "keystore"):
self.log(f"Processing '{key}' credential")
if not self.destination_instance.get_credential(key):
credential_ = self.source_instance.get_credential(key)
if not ComponentBase.dry_run:
self.log(f"Adding Keystore item '{key}' to destination.")
self.destination_instance.add_credential(credential_)
self.log(f"Successfully added new keystore item '{key}'")
self.add_to_homework_list(
component_name="keystore",
value=f"You must manually enter the value for the keystore item '{key}'.",
)
else:
self.add_to_diff_log(key, "added")
else:
self.log(f"Keystore item '{key}' exists on destination. Skipping....")
else:
self.log(f"Skipping keystore item '{key}' since it is not included.")
else:
self.log("Unable to find any keystore values on the source Swimlane instance. Skipping....")
self.log(f"Completed syncing of keystore from '{self.source_host}' to '{self.dest_host}'.")
Packages
Syncs Python packages from a source instance to destination.
This class can transfer Python package wheels directly using the offline switch when creating an Aqueduct object.
_install_wheels(self, package)
private
Used to install wheels directly from one Swimlane instance to another.
This is considered an offline installation. To use this method please provide
offline=True
when instantiating the Aqueduct class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
package |
dict |
A Swimlane package dictionary object. |
required |
Source code in aqueduct/components/packages.py
def _install_wheels(self, package: dict):
"""Used to install wheels directly from one Swimlane instance to another.
This is considered an offline installation. To use this method please provide
`offline=True` when instantiating the Aqueduct class.
Args:
package (dict): A Swimlane package dictionary object.
"""
if package.get("fileId") and package["fileId"]:
self.log(f"Attempting to transfer wheels for package '{package['name']}'")
filename = f"{package['name']}-{package['version'].capitalize()}-py3-none-any.whl"
self.log(f"Downloading package '{package['name']}' version '{package['version']}' from source.")
stream = self.source_instance.download_plugin(file_id=package["fileId"])
self.log(f"Successfully downloaded '{package['name']}' from source.")
self.log(f"Uploading package '{package['name']}' to destination.")
try:
data = {"pythonVersion": package["pythonVersion"].capitalize()}
self.destination_instance.install_package_offline(filename=filename, stream=stream, data=data)
self.log(f"Successfully uploaded package '{package['name']}' to destination.")
except Exception as e:
self.log(
f"Error occurred when trying to upload wheels for package '{package['name']}' to destination."
" \t\t<-- Please install manually!!"
)
self.add_to_homework_list(
component_name="packages",
value=f"You must manually install '{package['name']}' version '{package['version']}' "
"on the destination instance.",
)
else:
self.log(
f"Unable to transfer wheel package '{package['name']}' to destination. \t\t<--- Must install manually!!"
)
sync(self)
Sync will sync all installed Python packages on a source system with a destination system.
If you specified the offline
switch as True
then it will transfer the packages directly and
install them manually instead of relying on Swimlane to install them from pypi.
Source code in aqueduct/components/packages.py
def sync(self):
"""Sync will sync all installed Python packages on a source system with a destination system.
If you specified the `offline` switch as `True` then it will transfer the packages directly and
install them manually instead of relying on Swimlane to install them from pypi.
"""
self.log(f"Attempting to sync packages from '{self.source_host}' to '{self.dest_host}'")
packages = self.source_instance.get_pip_packages()
if packages:
for package in packages:
self.sync_package(package=package)
self.log(f"Completed syncing of packages from '{self.source_host}' to '{self.dest_host}'.")
sync_package(self, package)
Syncs a single Python package object based on the Swimlane exported dictionary
Parameters:
Name | Type | Description | Default |
---|---|---|---|
package |
dict |
A Swimlane Python package dictionary object. |
required |
Source code in aqueduct/components/packages.py
def sync_package(self, package: dict):
"""Syncs a single Python package object based on the Swimlane exported dictionary
Args:
package (dict): A Swimlane Python package dictionary object.
"""
if not self._is_in_include_exclude_lists(package["name"], "packages"):
self.log(f"Processing package '{package['name']}'")
if package["name"] not in self.destination_packages:
if not ComponentBase.dry_run:
pstring = f"{package['name']}=={package['version']}"
self.log(f"Installing {package['pythonVersion']} package '{pstring}' on destination.")
if ComponentBase.offline:
self._install_wheels(package=package)
else:
try:
resp = self.destination_instance.install_package(package=package)
except Exception as e:
self.log(
f"Unable to install {package['pythonVersion']} package '{pstring}' on destination."
" \t\t<-- Please install manually..."
)
self.add_to_homework_list(
component_name="packages",
value=f"You must manually install '{package['name']}' version '{package['version']}' "
"on the destination instance.",
)
resp = None
if resp:
self.log(
f"Successfully installed {package['pythonVersion']} package '{pstring}' on destination."
)
else:
self.add_to_diff_log(package["name"], "added")
else:
self.log(f"Package '{package['name']}' already exists on destination '{self.dest_host}'. Skipping....")
Plugins
Used to sync plugins from a source instance to a destination instance of Swimlane.
_check_for_compatibility(self, plugin_name)
private
Checks the source plugin for compatibility with the destination accepted plugins.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
plugin_name |
str |
The name of the plugin on the source instance. |
required |
Source code in aqueduct/components/plugins.py
def _check_for_compatibility(self, plugin_name: str) -> None:
"""Checks the source plugin for compatibility with the destination accepted plugins.
Args:
plugin_name (str): The name of the plugin on the source instance.
"""
dest_version = self.destination_instance.swimlane.product_version
response = self.source_instance.get_plugin(plugin_name)
if response and response.get("supportedSwimlaneVersion"):
if response["supportedSwimlaneVersion"] == ">=10.0.0 <10.5.0":
if version.parse(dest_version) >= version.parse("10.5.0"):
log_string = f"The plugin '{plugin_name}' is not compatible with the destination instance version "
log_string += f"({dest_version}). "
log_string += "Please upgrade the plugin on the source instance before continuing."
self.log(val=log_string, level="warning")
self.add_to_homework_list(component_name="plugins", value=log_string)
_download_plugin(self, name, id)
private
Downloads a plugin from a source Swimlane instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the plugin to download. |
required |
id |
str |
The id of the plugin to download. |
required |
Exceptions:
Type | Description |
---|---|
GetComponentError |
Raises when unable to download plugin from the source Swimlane instance. |
Returns:
Type | Description |
---|---|
Plugin |
Returns a Swimlane source instance plugin object. |
Source code in aqueduct/components/plugins.py
def _download_plugin(self, name: str, id: str):
"""Downloads a plugin from a source Swimlane instance.
Args:
name (str): The name of the plugin to download.
id (str): The id of the plugin to download.
Raises:
GetComponentError: Raises when unable to download plugin from the source Swimlane instance.
Returns:
Plugin: Returns a Swimlane source instance plugin object.
"""
try:
self.log(f"Downloading plugin '{name}' from source.")
return self.source_instance.download_plugin(id)
except Exception as e:
raise GetComponentError(type="Plugin", name=name, id=id)
sync(self)
This method is used to sync all plugins from a source instance to a destination instance
Source code in aqueduct/components/plugins.py
def sync(self):
"""This method is used to sync all plugins from a source instance to a destination instance"""
self.log(f"Attempting to sync plugins from '{self.source_host}' to '{self.dest_host}'")
self.dest_plugin_dict = self.destination_instance.plugin_dict
plugin_dict = self.source_instance.plugin_dict
if plugin_dict:
for name, file_id in plugin_dict.items():
self.sync_plugin(name=name, id=file_id)
self.log(f"Completed syncing of plugins from '{self.source_host}' to '{self.dest_host}'.")
sync_plugin(self, name, id)
This class syncs a single Swimlane plugin based on the provided name and ID.
We first check to see if the provided name of the plugin exists in our destination instances plugin_dict.
If it is, then we download the plugin from the source instance. If we are successful, we then add it to the destination instance.
If the provided plugin name does not exist in our destination instance plugin_dict then we retrieve the plugin from both the source and destination instances. We compare their versions and if the source has a version greater than the destination we then add it to the destination instance by upgrading the plugin.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of a Swimlane plugin |
required |
id |
str |
The internal ID of a Swimlane plugin |
required |
Source code in aqueduct/components/plugins.py
def sync_plugin(self, name: str, id: str):
"""This class syncs a single Swimlane plugin based on the provided name and ID.
We first check to see if the provided name of the plugin exists in our destination instances plugin_dict.
If it is, then we download the plugin from the source instance. If we are successful, we
then add it to the destination instance.
If the provided plugin name does not exist in our destination instance plugin_dict then we retrieve the
plugin from both the source and destination instances. We compare their versions and if the source has
a version greater than the destination we then add it to the destination instance by upgrading the plugin.
Args:
name (str): The name of a Swimlane plugin
id (str): The internal ID of a Swimlane plugin
"""
self.log(f"Processing '{name}' plugin with id '{id}'")
if not self._is_in_include_exclude_lists(name, "plugins"):
# checking to see if the destination instance already has source instance plugin.
if not self.dest_plugin_dict.get(name):
if not ComponentBase.dry_run:
plugin = self._download_plugin(name=name, id=id)
if plugin:
try:
self.log(f"Uploading plugin '{name}' to destination.")
self.destination_instance.upload_plugin(name, plugin)
self.log(f"Successfully uploaded plugin '{name}' to destination.")
except Exception as e:
self._check_for_compatibility(plugin_name=name)
self.log(
f"Failed to upload plugin '{name}' to destination '{self.dest_host}'",
level="warning",
)
self.add_to_homework_list(
component_name="plugins",
value=f"You must manually install plugin '{name}' and all tasks associated with it. "
"This also means you must hookup those tasks with the appropriate items in "
"workflow and push-button integrations.",
)
if not ComponentBase.continue_on_error:
raise e
else:
self.add_to_diff_log(name, "added")
else:
self.log(
f"Plugin '{name}' already exists on destination '{self.dest_host}'. Checking for differences...."
)
# getting source and destination instance plugins to check differences between them.
dest_plugin = self.destination_instance.get_plugin(name=name)
if not dest_plugin:
raise GetComponentError(type="Plugin", name=name)
source_plugin = self.source_instance.get_plugin(name=name)
if not source_plugin:
raise GetComponentError(type="Plugin", name=name)
if version.parse(source_plugin["version"]) > version.parse(dest_plugin["version"]):
# now we actually download the source instance plugin so we can upgrade it on
# the destination instance.
plugin = self._download_plugin(name=name, id=id)
if plugin:
if not ComponentBase.dry_run:
self.log(f"Upgrading plugin '{name}' on destination.")
self.destination_instance.upgrade_plugin(filename=name, stream=plugin)
self.log(f"Successfully upgraded plugin '{name}' on destination.")
else:
self.add_to_diff_log(name, "upgraded")
else:
self.log("Source and destination have the same version plugin. Skipping...")
else:
self.log(f"Skipping plugin '{name}' since it is excluded.")
Roles
Used to sync roles from a source instance to a destination instance of Swimlane.
_process_role(self, role)
private
Used to ensure any referenced user and groups are processed before continuing with adding / updating roles.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role |
dict |
A source Swimlane instance role dictionary object. |
required |
Returns:
Type | Description |
---|---|
dict |
A modified source Swimlane instance role dictionary object. |
Source code in aqueduct/components/roles.py
def _process_role(self, role: dict):
"""Used to ensure any referenced user and groups are processed before continuing with adding / updating roles.
Args:
role (dict): A source Swimlane instance role dictionary object.
Returns:
dict: A modified source Swimlane instance role dictionary object.
"""
if role.get("users") and role["users"]:
self.log(f"Processing users in role '{role['name']}'")
user_list = []
from .users import Users
for user in role["users"]:
_user = Users().sync_user(user_id=user["id"])
if _user:
user_list.append(_user)
role["users"] = user_list
if role.get("groups") and role["groups"]:
self.log(f"Processing groups in role '{role['name']}'")
group_list = []
from .groups import Groups
for group in role["groups"]:
_group = Groups().sync_group(group=group)
if _group:
group_list.append(_group)
role["groups"] = group_list
return role
sync(self)
This method is used to sync all roles from a source instance to a destination instance.
Source code in aqueduct/components/roles.py
def sync(self):
"""This method is used to sync all roles from a source instance to a destination instance."""
self.log(f"Attempting to sync roles from '{self.source_host}' to '{self.dest_host}'.")
roles = self.source_instance.get_roles()
if roles:
for role in roles:
self.sync_role(role=role)
self.log(f"Completed syncing of roles from '{self.source_host}' to '{self.dest_host}'.")
sync_role(self, role)
Syncs a single source Swimlane instance role dictionary object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role |
dict |
A source Swimlane instance role dictionary object. |
required |
Exceptions:
Type | Description |
---|---|
AddComponentError |
Raises when unable to add a role dictionary object to a destination Swimlane instance. |
Returns:
Type | Description |
---|---|
dict |
A destination Swimlane instance `role dictionary object. |
Source code in aqueduct/components/roles.py
def sync_role(self, role: dict):
"""Syncs a single source Swimlane instance role dictionary object.
Args:
role (dict): A source Swimlane instance role dictionary object.
Raises:
AddComponentError: Raises when unable to add a role dictionary object to a destination Swimlane instance.
Returns:
dict: A destination Swimlane instance `role dictionary object.
"""
if not self._is_in_include_exclude_lists(role["name"], "roles") and role["name"] not in self.role_exclusions:
self.log(f"Processing role '{role['name']}' ({role['id']})")
self.scrub(role)
role = self._process_role(role=role)
dest_role = self.destination_instance.get_role(role_id=role["id"])
if not dest_role:
if not ComponentBase.dry_run:
self.log(f"Creating new role '{role['name']}' on destination.")
dest_role = self.destination_instance.add_role(role)
if not dest_role:
raise AddComponentError(model=role, name=role["name"])
self.log(f"Successfully added new role '{role['name']}' to destination.")
return dest_role
else:
self.add_to_diff_log(role["name"], "added")
else:
self.log(f"Role '{role['name']}' already exists on destination.")
else:
self.log(f"Skipping role '{role['name']}' since it is excluded.")
Tasks
Used to sync tasks from a source instance to a destination instance of Swimlane.
destination_plugin_dict
property
readonly
Creates a destination Swimlane instance plugin dict used to update tasks.
We need to create this plugin dict since once you upload a plugin to a Swimlane instance
the plugin imageId
, fileId
and actionId
all change. So we gather the installed plugin
ids and update any tasks so that they reference the new installed plugin.
The general structure of this plugin dict is:
{
"sw_virus_total": {
"GetAnalyses": {
"id": "a2dA20RlvfcV5jf34",
"imageId": "a8dvYs1Hjn3kFRxCi"
},
"fileId": "a2cwJFkbVYqB0xfVs"
}
}
Returns:
Type | Description |
---|---|
dict |
A dictionary of values used to update a Swimlane task. |
_check_tasks_using_tracking_id(self, task)
private
Updates a task dictionary object that is using tracking-id as input or output with the correct tracking-id.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task |
dict |
A source Swimlane instance task dictionary. |
required |
Returns:
Type | Description |
---|---|
dict |
dict: A updated source Swimlane instance dictionary. |
Source code in aqueduct/components/tasks.py
def _check_tasks_using_tracking_id(self, task: dict) -> dict:
"""Updates a task dictionary object that is using tracking-id as input or output with the correct tracking-id.
Args:
task (dict): A source Swimlane instance task dictionary.
Returns:
dict: A updated source Swimlane instance dictionary.
"""
if task.get("inputMapping") and task["inputMapping"]:
task["inputMapping"] = self._update_task_mappings(
task_mappings=task["inputMapping"], task_name=task["name"]
)
if task.get("outputs") and task["outputs"]:
for output in task["outputs"]:
if (
output.get("backReferenceFieldId")
and output["backReferenceFieldId"]
and isinstance(output["backReferenceFieldId"], str)
):
if self.tracking_id_map.get(output["backReferenceFieldId"]):
self.log(
f"Updating task '{task['name']}' output reference field with the correct tracking-id "
f"'{self.tracking_id_map[output['backReferenceFieldId']]}'."
)
output["backReferenceFieldId"] = self.tracking_id_map[output["backReferenceFieldId"]]
if output.get("mappings") and output["mappings"]:
output["mappings"] = self._update_task_mappings(
task_mappings=output["mappings"], task_name=task["name"]
)
return task
_get_action_type(self, task)
private
Returns the actionType from the current task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task |
dict |
A source Swimlane instance task dictionary. |
required |
Exceptions:
Type | Description |
---|---|
AccessKeyError |
Raises when we are unable to access a certain key in the task dictionary. |
Returns:
Type | Description |
---|---|
str |
str: An actionType string. |
Source code in aqueduct/components/tasks.py
def _get_action_type(self, task: dict) -> str:
"""Returns the actionType from the current task.
Args:
task (dict): A source Swimlane instance task dictionary.
Raises:
AccessKeyError: Raises when we are unable to access a certain key in the task dictionary.
Returns:
str: An actionType string.
"""
try:
return task["action"]["descriptor"]["actionType"]
except KeyError as ke:
raise AccessKeyError(object_name=task["name"], component_type="task", error=ke)
_get_plugin_name(self, task)
private
Returns the descriptor name from the current task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task |
dict |
A source Swimlane instance task dictionary. |
required |
Exceptions:
Type | Description |
---|---|
AccessKeyError |
Raises when we are unable to access a certain key in the task dictionary. |
Returns:
Type | Description |
---|---|
str |
str: An actions descriptor name string. |
Source code in aqueduct/components/tasks.py
def _get_plugin_name(self, task: dict) -> str:
"""Returns the descriptor name from the current task.
Args:
task (dict): A source Swimlane instance task dictionary.
Raises:
AccessKeyError: Raises when we are unable to access a certain key in the task dictionary.
Returns:
str: An actions descriptor name string.
"""
try:
return task["action"]["descriptor"]["packageDescriptor"]["name"]
except KeyError as ke:
raise AccessKeyError(object_name=task["name"], component_type="task", error=ke)
_update_package_descriptor_id(self, task, plugin_name, action_type)
private
Updates the provided task with a new plugin task id.
When a plugin is installed on a destination system it generates new ids for the plugin. We must update our tasks with this new action id in order for them to work correctly.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task |
dict |
A source Swimlane instance task dictionary. |
required |
plugin_name |
str |
The plugin name that the task belongs to. |
required |
action_type |
str |
The action name within a plugin for this task. |
required |
Exceptions:
Type | Description |
---|---|
AccessKeyError |
Raises when we are unable to access a certain key in the task dictionary. |
Returns:
Type | Description |
---|---|
dict |
dict: An updated source Swimlane instance task dictionary. |
Source code in aqueduct/components/tasks.py
def _update_package_descriptor_id(self, task: dict, plugin_name: str, action_type: str) -> dict:
"""Updates the provided task with a new plugin task id.
When a plugin is installed on a destination system it generates new ids for the plugin.
We must update our tasks with this new action id in order for them to work correctly.
Args:
task (dict): A source Swimlane instance task dictionary.
plugin_name (str): The plugin name that the task belongs to.
action_type (str): The action name within a plugin for this task.
Raises:
AccessKeyError: Raises when we are unable to access a certain key in the task dictionary.
Returns:
dict: An updated source Swimlane instance task dictionary.
"""
try:
task["action"]["packageDescriptorId"] = self.destination_plugin_dict[plugin_name][action_type]["id"]
except KeyError as ke:
raise AccessKeyError(object_name=task["name"], component_type="task", error=ke)
return task
_update_task_file_id(self, task, plugin_name)
private
Updates the provided task with a new plugin fileId.
When a plugin is installed on a destination system it generates a new fileId. We must update our tasks with this new fileId in order for them to work correctly.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task |
dict |
A source Swimlane instance task dictionary. |
required |
plugin_name |
str |
The plugin name that the task belongs to. |
required |
Exceptions:
Type | Description |
---|---|
AccessKeyError |
Raises when we are unable to access a certain key in the task dictionary. |
Returns:
Type | Description |
---|---|
dict |
dict: An updated source Swimlane instance task dictionary. |
Source code in aqueduct/components/tasks.py
def _update_task_file_id(self, task: dict, plugin_name: str) -> dict:
"""Updates the provided task with a new plugin fileId.
When a plugin is installed on a destination system it generates a new fileId.
We must update our tasks with this new fileId in order for them to work correctly.
Args:
task (dict): A source Swimlane instance task dictionary.
plugin_name (str): The plugin name that the task belongs to.
Raises:
AccessKeyError: Raises when we are unable to access a certain key in the task dictionary.
Returns:
dict: An updated source Swimlane instance task dictionary.
"""
try:
task["action"]["descriptor"]["packageDescriptor"]["fileId"] = self.destination_plugin_dict[plugin_name][
"fileId"
]
except KeyError as ke:
raise AccessKeyError(object_name=task["name"], component_type="task", error=ke)
return task
_update_task_image_id(self, task, plugin_name, action_type)
private
Updates the provided task with a new plugin imageId.
When a plugin is installed on a destination system it generates a new imageId. We must update our tasks with this new imageId in order for them to work correctly.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task |
dict |
A source Swimlane instance task dictionary. |
required |
plugin_name |
str |
The plugin name that the task belongs to. |
required |
action_type |
str |
The action name within a plugin for this task. |
required |
Exceptions:
Type | Description |
---|---|
AccessKeyError |
Raises when we are unable to access a certain key in the task dictionary. |
Returns:
Type | Description |
---|---|
dict |
dict: An updated source Swimlane instance task dictionary. |
Source code in aqueduct/components/tasks.py
def _update_task_image_id(self, task: dict, plugin_name: str, action_type: str) -> dict:
"""Updates the provided task with a new plugin imageId.
When a plugin is installed on a destination system it generates a new imageId.
We must update our tasks with this new imageId in order for them to work correctly.
Args:
task (dict): A source Swimlane instance task dictionary.
plugin_name (str): The plugin name that the task belongs to.
action_type (str): The action name within a plugin for this task.
Raises:
AccessKeyError: Raises when we are unable to access a certain key in the task dictionary.
Returns:
dict: An updated source Swimlane instance task dictionary.
"""
try:
task["action"]["descriptor"]["imageId"] = self.destination_plugin_dict[plugin_name][action_type]["imageId"]
except KeyError as ke:
raise AccessKeyError(object_name=task["name"], component_type="task", error=ke)
return task
_update_task_mappings(self, task_mappings, task_name)
private
Used to updated task inputs and output mappings with the correct tracking-id.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task_mappings |
list |
A list of input or output field mappings. |
required |
task_name |
str |
The name of task these field mappings belong to. |
required |
Returns:
Type | Description |
---|---|
list |
list: The original or updated task field mappings. |
Source code in aqueduct/components/tasks.py
def _update_task_mappings(self, task_mappings: list, task_name: str) -> list:
"""Used to updated task inputs and output mappings with the correct tracking-id.
Args:
task_mappings (list): A list of input or output field mappings.
task_name (str): The name of task these field mappings belong to.
Returns:
list: The original or updated task field mappings.
"""
return_list = []
for mapping in task_mappings:
if mapping.get("value") and mapping["value"] and isinstance(mapping["value"], str):
if self.tracking_id_map.get(mapping["value"]):
self.log(
f"Updating task '{task_name}' with the correct tracking-id "
f"'{self.tracking_id_map[mapping['value']]}'."
)
mapping["value"] = self.tracking_id_map[mapping["value"]]
return_list.append(mapping)
return return_list
sync(self)
This method is used to sync all tasks from a source instance to a destination instance.
Source code in aqueduct/components/tasks.py
def sync(self):
"""This method is used to sync all tasks from a source instance to a destination instance."""
self.log(f"Starting to sync tasks from '{self.source_host}' to '{self.dest_host}'.")
tasks = self.source_instance.get_tasks()
if tasks:
for task in tasks:
self.sync_task(task=task)
self.log(f"Completed syncing of tasks from '{self.source_host}' to '{self.dest_host}'.")
sync_task(self, task)
This method syncs a single task from a source Swimlane instance to a destination instance.
Using the provided task dictionary from Swimlane source instance we first get the actual task object from the source.
Next, we attempt to retrieve the task from the destination system. If the task does not exist
on the destination instance, we add it. If it does exist, we check if the uid
and the version
are the same.
If they are the same we skip updating the task. If they are different, we update the task on the destination
instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task |
dict |
A Swimlane task object from a source system. |
required |
Returns:
Type | Description |
---|---|
dict |
dict: If we failed to add a task we return it so we can try again. Only if called using the sync method. |
Source code in aqueduct/components/tasks.py
def sync_task(self, task: dict) -> dict:
"""This method syncs a single task from a source Swimlane instance to a destination instance.
Using the provided task dictionary from Swimlane source instance we first get the actual task
object from the source.
Next, we attempt to retrieve the task from the destination system. If the task does not exist
on the destination instance, we add it. If it does exist, we check if the `uid` and the `version` are the same.
If they are the same we skip updating the task. If they are different, we update the task on the destination
instance.
Args:
task (dict): A Swimlane task object from a source system.
Returns:
dict: If we failed to add a task we return it so we can try again. Only if called using the sync method.
"""
if not self._is_in_include_exclude_lists(task["name"], "tasks"):
self.log(f"Processing task '{task['name']}'.")
task_ = self.source_instance.get_task(task["id"])
if not task_:
raise GetComponentError(type="task", name=task["name"], id="")
if not ComponentBase.dry_run:
task_ = self.update_task_plugin_ids(task=task_)
dest_task = self.destination_instance.get_task(task_["id"])
if not dest_task:
if not ComponentBase.dry_run:
self.log(f"Creating task '{task_['name']}' on destination.")
try:
# checking tasks for inputs and outputs mapped to source application
# tracking-ids and replacing them with the new ones
task_ = self._check_tasks_using_tracking_id(task=task_)
dest_task = self.destination_instance.add_task(task_)
self.log(f"Successfully added task '{task_['name']}' to destination.")
except Exception as e:
self.log(
f"Failed to add task '{task_['name']}' to destination.",
level="warning",
)
else:
self.add_to_diff_log(task_["name"], "added")
else:
self.log(f"Task '{task_['name']}' already exists on destination.")
if task_["uid"] == dest_task["uid"]:
dest_task = self._check_tasks_using_tracking_id(task=task_)
if not ComponentBase.dry_run:
self.log(f"Task '{task_['name']}' has changed. Updating...")
try:
dest_task = self.destination_instance.update_task(dest_task["id"], dest_task)
if not dest_task:
raise UpdateComponentError(model=task_, name=task_["name"])
self.log(f"Successfully updated task '{task_['name']}' on destination.")
except Exception as e:
raise UpdateComponentError(model=task_)
else:
self.add_to_diff_log(task_["name"], "updated")
else:
self.log(
f"The source task '{task['name']}' has a different UID ({task_['uid']}) then the "
f"destination task ({dest_task['uid']})"
)
else:
self.log(f"Skipping task '{task['name']}' since it is excluded.")
update_task_plugin_ids(self, task)
Used to update tasks with newly uploaded plugin Ids so it can reference the correct plugin in Swimlane.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task |
dict |
A source Swimlane instance task dictionary. |
required |
Returns:
Type | Description |
---|---|
Task |
An updated source Swimlane instance task dictionary. |
Source code in aqueduct/components/tasks.py
def update_task_plugin_ids(self, task: dict):
"""Used to update tasks with newly uploaded plugin Ids so it can reference the correct plugin in Swimlane.
Args:
task (dict): A source Swimlane instance task dictionary.
Returns:
Task: An updated source Swimlane instance task dictionary.
"""
self.log("Retrieving plugins from destination instance", level="debug")
action_type = self._get_action_type(task=task)
if action_type and action_type not in self.BUILTIN_TASK_ACTION_TYPES:
plugin_name = self._get_plugin_name(task=task)
self.log(f"Updating task '{task['name']}' with correct plugin IDs")
if self.destination_plugin_dict.get(plugin_name):
if self.destination_plugin_dict[plugin_name].get(action_type):
# We are updating the current source task_ so that it contains the correct Ids for the
# destination plugin
self.log(
f"Updating task '{task['name']}' with correct packageDescriptor fileId "
f"'{self.destination_plugin_dict[plugin_name]['fileId']}'"
)
task = self._update_task_file_id(task=task, plugin_name=plugin_name)
self.log(
f"Updating task '{task['name']}' with correct imageId "
f"'{self.destination_plugin_dict[plugin_name][action_type]['imageId']}'"
)
task = self._update_task_image_id(task=task, plugin_name=plugin_name, action_type=action_type)
self.log(
f"Updating task '{task['name']}' with correct packageDescriptorId "
f"'{self.destination_plugin_dict[plugin_name][action_type]['id']}'"
)
task = self._update_package_descriptor_id(
task=task, plugin_name=plugin_name, action_type=action_type
)
else:
self.log(f"Unable to find plugin '{plugin_name}' on destination!")
return task
Users
Used to sync users from a source instance to a destination instance of Swimlane.
_process_user(self, user)
private
Processes roles and groups associated with a source Swimlane instance user dictionary.
Attempts to make sure that roles and groups are created on the Swimlane destination instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user |
dict |
A source Swimlane instance user dictionary object. |
required |
Returns:
Type | Description |
---|---|
dict |
dict: A modified source Swimlane instance user dictionary object. |
Source code in aqueduct/components/users.py
def _process_user(self, user: dict) -> dict:
"""Processes roles and groups associated with a source Swimlane instance user dictionary.
Attempts to make sure that roles and groups are created on the Swimlane destination instance.
Args:
user (dict): A source Swimlane instance user dictionary object.
Returns:
dict: A modified source Swimlane instance user dictionary object.
"""
if user.get("roles") and user["roles"]:
self.log(f"Processing roles for user '{user['name']}'")
role_list = []
from .roles import Roles
for role in user["roles"]:
_role = Roles().sync_role(role=role)
if _role:
role_list.append(_role)
user["roles"] = role_list
if user.get("groups") and user["groups"]:
self.log(f"Processing groups for user '{user['name']}'")
group_list = []
from .groups import Groups
for group in user["groups"]:
_group = Groups().sync_group(group=group)
if _group:
group_list.append(_group)
user["groups"] = group_list
return user
sync(self)
This method is used to sync all users from a source instance to a destination instance.
Source code in aqueduct/components/users.py
def sync(self):
"""This method is used to sync all users from a source instance to a destination instance."""
self.log(f"Attempting to sync users from '{self.source_host}' to '{self.dest_host}'.")
users = self.source_instance.get_users()
if users:
for user in users:
self.sync_user(user_id=user["id"])
self.log(f"Completed syncing of users from '{self.source_host}' to '{self.dest_host}'.")
sync_user(self, user_id)
Syncs a single source Swimlane instance user with a destination instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
str |
A Swimlane user ID. |
required |
Exceptions:
Type | Description |
---|---|
GetComponentError |
Raises when the provided user_id is not found on the source Swimlane instance. |
AddComponentError |
Raises when unable to add a user to a destination Swimlane instance. |
UpdateComponentError |
Raises when unable to update a user on a destination Swimlane instance. |
Returns:
Type | Description |
---|---|
dict |
A destination Swimlane instance user dictionary object. |
Source code in aqueduct/components/users.py
def sync_user(self, user_id: str):
"""Syncs a single source Swimlane instance user with a destination instance.
Args:
user_id (str): A Swimlane user ID.
Raises:
GetComponentError: Raises when the provided user_id is not found on the source Swimlane instance.
AddComponentError: Raises when unable to add a user to a destination Swimlane instance.
UpdateComponentError: Raises when unable to update a user on a destination Swimlane instance.
Returns:
dict: A destination Swimlane instance user dictionary object.
"""
user = self.source_instance.get_user(user_id)
if not user:
raise GetComponentError(type="User", id=user_id)
self.log(f"Processing user '{user['displayName']}' ({user_id})")
if user and not self._is_in_include_exclude_lists(user["displayName"], "users"):
if (
user["displayName"] != self.source_instance.swimlane.user.display_name
or user["userName"] != self.source_instance.swimlane.user.username
):
self.scrub(user)
self.log(f"Attempting to sync user '{user['displayName']}' on destination.")
dest_user = self.destination_instance.search_user(user["userName"])
if not dest_user:
if not ComponentBase.dry_run:
self.log(f"Adding new user '{user['displayName']}' to destination.")
dest_user = self.destination_instance.add_user(user)
if not dest_user:
raise AddComponentError(model=user, name=user["displayName"])
self.log(f"Successfully added user '{user['displayName']}' to destination.")
self.add_to_homework_list(
component_name="users",
value=f"You must set the password for the user '{user['displayName']}' on destination.",
)
return dest_user
else:
self.add_to_diff_log(user["displayName"], "added")
else:
self.log(f"User '{user['displayName']}' exists on destination.")
user = self._process_user(user=user)
if not ComponentBase.dry_run:
user["id"] = dest_user["id"]
dest_user = self.destination_instance.update_user(dest_user["id"], user)
if not dest_user:
raise UpdateComponentError(model=user, name=user["displayName"])
self.log(f"Successfully updated user '{user['displayName']}' on destination.")
return dest_user
else:
self.add_to_diff_log(user["displayName"], "updated")
else:
dname = self.source_instance.swimlane.user.display_name
if not ComponentBase.dry_run:
self.log(f"Unable to update the currently authenticated user '{dname}'. Skipping...")
else:
self.add_to_diff_log(name=dname, diff_type="added")
self.add_to_homework_list(
component_name="users",
value=f"The current authenticated user '{dname}' will need to be created manually "
"on the destination.",
)
else:
self.log(f"Skipping user '{user['displayName']}' since it is excluded.")
Workflows
Used to sync workflows from a source instance to a destination instance of Swimlane.
_update_workflow_stages_with_new_id(self, source, new_parent_id)
private
Updates a workflows stages to match the new parentId.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source |
dict |
A source workflow dictionary object. |
required |
new_parent_id |
str |
The new parentId value to replace in a workflows stages. |
required |
Returns:
Type | Description |
---|---|
dict |
dict: An updated source workflow dictionary object. |
Source code in aqueduct/components/workflows.py
def _update_workflow_stages_with_new_id(self, source: dict, new_parent_id: str) -> dict:
"""Updates a workflows stages to match the new parentId.
Args:
source (dict): A source workflow dictionary object.
new_parent_id (str): The new parentId value to replace in a workflows stages.
Returns:
dict: An updated source workflow dictionary object.
"""
if source.get("stages") and source["stages"]:
source_stages = []
for source_stage in source["stages"]:
if source_stage["parentId"] == source["id"]:
source_stage["parentId"] = new_parent_id
source_stages.append(source_stage)
source["stages"] = source_stages
return source
sync(self)
This method is used to sync all workflows from a source instance to a destination instance.
Source code in aqueduct/components/workflows.py
def sync(self):
"""This method is used to sync all workflows from a source instance to a destination instance."""
raise NotImplementedError("General workflow syncing is currently not implemented.")
sync_workflow(self, application_name, application_id=None)
This methods syncs a single applications workflow from a source Swimlane instance to a destination instance.
If an application_name is in our include or exclude filters we will either ignore or process the workflow updates for that application.
Once an application_name is provided we retrieve the workflow for that application from our workflow_dict. Additionally we retrieve the destination workflow for the provided application.
We create a temporary object that compares the stages of a source workflow to a destination workflow. If they are exactly the same we skip updating the workflow. If they are not, we copy the source workflow to the destination and update it to reflect the new workflow ID.
Finally we update the destination workflow with our changes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
application_name |
str |
The name of an application to check and update workflow if applicable. |
required |
application_id |
str |
The application ID. Default is None. |
None |
Source code in aqueduct/components/workflows.py
def sync_workflow(self, application_name: str, application_id: str = None):
"""This methods syncs a single applications workflow from a source Swimlane instance to
a destination instance.
If an application_name is in our include or exclude filters we will either ignore or
process the workflow updates for that application.
Once an application_name is provided we retrieve the workflow for that application from
our workflow_dict. Additionally we retrieve the destination workflow for the provided
application.
We create a temporary object that compares the stages of a source workflow to a destination
workflow. If they are exactly the same we skip updating the workflow. If they are not, we
copy the source workflow to the destination and update it to reflect the new workflow ID.
Finally we update the destination workflow with our changes.
Args:
application_name (str): The name of an application to check and update workflow if applicable.
application_id (str, optional): The application ID. Default is None.
"""
# TODO: Add logic to handle when a piece of workflow does not exists. For example, when a task does not exist.
workflow = self.source_instance.workflow_dict.get(application_name)
if not workflow and application_id:
workflow = self.source_instance.get_workflow(application_id)
if workflow:
self.log(
f"Processing workflow '{workflow['id']}' for application '{application_name}' "
f"({workflow['applicationId']})."
)
dest_workflow = self.destination_instance.get_workflow(application_id=workflow["applicationId"])
if dest_workflow:
# We use the source workflow as truth and update it's parentId with the id of the destination
# workflow.
workflow = self._update_workflow_stages_with_new_id(source=workflow, new_parent_id=dest_workflow["id"])
# once we have updated it, it should be similar to the destination workflow
# So we check to see if they are NOT the same.
if Differ(source=workflow["stages"], destination=dest_workflow["stages"]).check():
if not ComponentBase.dry_run:
self.log("Source and destination workflows are different. Updating...")
for item in ["$type", "permissions", "version"]:
workflow.pop(item)
workflow["id"] = dest_workflow["id"]
resp = self.destination_instance.update_workflow(workflow=workflow)
self.log(f"Successfully updated workflow for application '{application_name}'.")
else:
self.add_to_diff_log(f"{application_name} - Workflow", "updated")
else:
self.log("Source and destination workflow is the same. Skipping...")
else: # May be an edge case and could possibly be removed.
if not ComponentBase.dry_run:
self.log(
f"Adding workflow for application '{application_name}' "
f"({self.source_instance.application_dict[application_name]['id']})."
)
dest_workflow = self.destination_instance.add_workflow(workflow=workflow)
self.log(f"Successfully added workflow for application '{application_name}'.")
else:
self.add_to_diff_log(f"{application_name} - Workflow", "added")
else:
raise GetComponentError(type="Workflow", name=application_name)
Workspaces
Used to sync workspaces from a source instance to a destination instance of Swimlane.
sync(self)
This method is used to sync all workspaces from a source instance to a destination instance.
Source code in aqueduct/components/workspaces.py
def sync(self):
"""This method is used to sync all workspaces from a source instance to a destination instance."""
self.log(f"Starting to sync workspaces from '{self.source_host}' to '{self.dest_host}'")
workspaces = self.source_instance.get_workspaces()
if workspaces:
for workspace in workspaces:
self.sync_workspace(workspace=workspace)
self.log(f"Completed syncing of workspaces from '{self.source_host}' to '{self.dest_host}'.")
sync_workspace(self, workspace)
Used to sync Swimlane workspaces.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workspace |
dict |
A source Swimlane instance workspace dictionary object. |
required |
Exceptions:
Type | Description |
---|---|
AddComponentError |
Raises when unable to add a workspace to a destination Swimlane instance. |
UpdateComponentError |
Raises when unable to update a workspace on a destination Swimlane instance. |
Source code in aqueduct/components/workspaces.py
def sync_workspace(self, workspace: dict):
"""Used to sync Swimlane workspaces.
Args:
workspace (dict): A source Swimlane instance workspace dictionary object.
Raises:
AddComponentError: Raises when unable to add a workspace to a destination Swimlane instance.
UpdateComponentError: Raises when unable to update a workspace on a destination Swimlane instance.
"""
if not self._is_in_include_exclude_lists(workspace["name"], "workspaces"):
self.log(f"Processing workspace '{workspace['name']}'")
dest_workspace = self.destination_instance.get_workspace(workspace["id"])
if not dest_workspace:
if not ComponentBase.dry_run:
self.log(f"Adding workspace '{workspace['name']}' to destination.")
# ensuring that applications and dashboards are unique in the workspace components lists
try:
for key in ["applications", "dashboards"]:
if workspace.get(key):
workspace[key] = list(set(workspace[key]))
except Exception as e:
self.log(
"Error when forcing applications and dashboards lists to be unique. "
"Continuing anyways..."
)
dest_workspace = self.destination_instance.add_workspace(workspace)
if not dest_workspace:
raise AddComponentError(model=workspace, name=workspace["name"])
self.log(f"Successfully added workspace '{workspace['name']}' to destination.")
else:
self.add_to_diff_log(workspace["name"], "added")
else:
self.log(f"Workspace '{workspace['name']}' already exists on destination. Checking differences...")
if not ComponentBase.dry_run:
dashboards = []
self.log(
f"Checking that dashboards associated with workspace '{workspace['name']}' "
"exist on destination."
)
if workspace.get("dashboards") and workspace["dashboards"]:
for dashboard in workspace["dashboards"]:
for key, val in self.source_instance.dashboard_dict.items():
if dashboard == val and self.destination_instance.dashboard_dict.get(key):
self.log(
f"Found {key} in dest dashboard with value of"
f" {self.destination_instance.dashboard_dict[key]}"
)
dashboards.append(self.destination_instance.dashboard_dict[key])
workspace["dashboards"] = dashboards
if workspace.get("applications") and workspace["applications"]:
self.log(
f"Checking that applications associated with workspace '{workspace['name']}' "
"exist on destination."
)
from .applications import Applications
for application_id in workspace["applications"]:
if application_id and application_id not in self.destination_instance.application_id_list:
Applications().sync_application(application_id=application_id)
resp = self.destination_instance.update_workspace(workspace_id=workspace["id"], workspace=workspace)
if not resp:
raise UpdateComponentError(model=workspace, name=workspace["name"])
self.log(f"Successfully updated workspace '{workspace['name']}' on destination.")
else:
self.add_to_diff_log(workspace["name"], "updated")