xfworkerutil

XF Worker utilities.

This module implements the most commonly required functionality in task type scripts:

  • XF Worker infrastructure

  • XF network protocols

  • XF JSON schemas

  • Accessing job input and output parameters

  • Downloading and uploading files

  • Spawning XF entities like jobs and tasks

class xfworkerutil.ActivityLogLevel(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)
class xfworkerutil.TemporaryWorkingDirectory(prefix: str)

Creates a temporary working directory.

Create a unique related path in the temporary working directory by adding a postfix. Similar to path.with_suffix(‘.ext’), but allows postfix to contain more than just a file extension, like ‘_SEG.AIM’. And ensures the path is unique.

create_temp_path(filename: str) Path

Create a unique path in the temporary working directory.

is_under(p: Path) bool

Is the given path under the working dir?

remove() None

Remove the temporary working directory, deleting all files recursively.

subdir(name: str, max_name_len: int = 8) Iterator[None]

Create a subdirectory and use it for the duration of the context manager for creating new temporary and related paths.

class xfworkerutil.XFWorkerArguments

XF Worker arguments via environment variables to this task type script.

property job_serial: int

The serial of the current job.

property server_auth: str

The authentication information for the XF Server.

property server_host: str

The fully qualified domain name of the XF Server.

property server_port: str

The port of the XF Server.

class xfworkerutil.XFWorkerJob

High-level utilities for executing a XF Worker job. Create an instance at the start of the script. Call finish() at the end of the (successful) script.

activity_log_message(title: str, *, level: ActivityLogLevel = ActivityLogLevel.INFO) None

Log a message event to the activity log.

clear_error_message(*, save: bool = True) None

Clear the error message in the job’s metadata.

create_output_bitmap(filename: str) Dict[str, Any]

Create a composite output parameter of type ‘Bitmap’.

create_output_contours(filename: str, image: Dict[str, Any]) Dict[str, Any]

Create a composite output parameter of type ‘Contours’.

create_output_csv(filename: str) Dict[str, Any]

Create a composite output parameter of type ‘CSV’.

create_output_entity(uuid: str, name: str) Dict[str, Any]

Create a composite output parameter of type ‘Entity’.

create_output_foreign_file(filename: str) Dict[str, Any]

Create a composite output parameter of type ‘ForeignFile’.

create_output_gi_result(snapshot: Dict[str, Any], gi_image: Dict[str, Any]) Dict[str, Any]

Create a composite output parameter of type ‘GIResult’.

create_output_html_document(filename: str) Dict[str, Any]

Create a composite output parameter of type ‘HTMLDocument’.

create_output_image(filename: str, size: List[int], filenames: List[str] | None = None, additional_files: List[Dict[str, Any]] | None = None) Dict[str, Any]

Create a composite output parameter of type ‘Image’.

create_output_image_pair_list(pairs: List[List[Dict[str, Any]]]) Dict[str, Any]

Create a composite output parameter of type ‘ImagePairList’.

create_output_image_voi(pos: List[int], size: List[int], image: Dict[str, Any], name: str = None, category: str = None) Dict[str, Any]

Create a composite output parameter of type ‘ImageVOI’.

create_output_mesh(filename: str, filenames: List[str] | None = None, additional_files: List[Dict[str, Any]] | None = None) Dict[str, Any]

Create a composite output parameter of type ‘Mesh’.

create_output_path(path: Path, postfix: str) Path

Create a path for a local file associated with an output parameter of the current XF job. Connects to XF Server to get an authorization token. Sends a HEAD request to FSS to find the direct-access path. If possible, just locates the local path to the file. (For write access!) Or creates a unique related path in the temporary working directory by adding a postfix.

create_output_registration_result(transform: List[float], *, image1: Dict[str, Any] | None = None, image2: Dict[str, Any] | None = None, mesh1: Dict[str, Any] | None = None, mesh2: Dict[str, Any] | None = None) Dict[str, Any]

Create a composite output parameter of type ‘RegistrationResult’.

create_output_rotated_image_voi(pos_mm: List[float], size_mm: List[float], size: List[int], rotation: List[float], image: Dict[str, Any], name: str = None, category: str = None) Dict[str, Any]

Create a composite output parameter of type ‘RotatedImageVOI’.

create_output_snapshot(bitmap: Dict[str, Any], image: Dict[str, Any] = None, image2: Dict[str, Any] = None, mesh: Dict[str, Any] = None, metadata: Dict[str, Any] = None, video: Dict[str, Any] = None) Dict[str, Any]

Create a composite output parameter of type ‘Snapshot’.

create_output_snapshot_metadata(filename: str) Dict[str, Any]

Create a composite output parameter of type ‘SnapshotMetadata’.

create_output_spectra(amplitudes_image: Dict[str, Any], wavelength_image: Dict[str, Any]) Dict[str, Any]

Create a composite output parameter of type ‘Spectra’.

create_output_video(filename: str, bitmap: Dict[str, Any] = None) Dict[str, Any]

Create a composite output parameter of type ‘Video’.

Create a unique related path in the temporary working directory by adding a postfix. Similar to path.with_suffix(‘.ext’), but allows postfix to contain more than just a file extension, like ‘_SEG.AIM’. And ensures the path is unique.

create_temp_path(filename: str) Path

Create a unique path in the temporary working directory.

download_additional_input_files(input_parameter: Dict[str, Any], *, copy_even_on_direct_access: bool = False) List[Path]

Download additional files associated with an input parameter of the current XF job.

download_input_file(input_parameter: Dict[str, Any], *, copy_even_on_direct_access: bool = False) Path

Download a file associated with an input parameter of the current XF job. Connects to XF Server to get an authorization token. If possible, just locate the local path to the file. (For read-only access!) Downloads the file from FSS to the temporary working directory. Return the path to the local file.

Download a related file from the same source as the input parameter.

download_xf_dependency_package_config_file(dependency_package_serial: int, filename: str, *, copy_even_on_direct_access: bool = False) Path

Download a dependency-package-config file.

download_xf_task_type_config_file(task_type_serial: int, filename: str, *, copy_even_on_direct_access: bool = False) Path

Download a task-type-config file.

download_xf_userdata_file(virtualpath: str, *, copy_even_on_direct_access: bool = False) Path

Download file from @xfuserdata/ virtualpath.

download_xf_workflow_config_file(workflow_serial: int, filename: str, *, copy_even_on_direct_access: bool = False) Path

Download a workflow-config file.

error(message: str) NoReturn

Finish a failed run. Save the error message. (The job’s state remains unchanged for now and will be updated promptly by XF Worker.) (The temporary working directory remains unchanged (for debugging purposes) and will be deleted on a schedule by XF Worker.)

find_xf_task_type(connection: Connection, name: str) Dict[str, Any]

Find the latest task type of a given package name, or raise an exception.

find_xf_task_types(connection: Connection, name: str, *, raise_if_not_any: bool = True) List[Dict[str, Any]]

Find all task types of a given package name (in descending version order).

finish(outputs: Dict[str, Any], units: Dict[str, str] = None) None

Finish a successful run. Change the job’s state to Done, and store the provided outputs. Also remove the temporary working directory.

get_current_task() Dict[str, Any]

Get the current job’s task.

get_debug_info() str

Get some information useful for debugging.

get_existing_job_sort_orders(connection: Connection) List[str]

Get list of existing job sort order strings in this workflow for spawning more jobs.

get_ipl_image_path(input_parameter: Dict[str, Any], *, auto_convert_from_omni: bool = True) Path

Get (download if required) the image file. Return the path to the local file. Ensure the path is not a UNC network share path. Raise if it is not an IPL-compatible image format. Support OMNI metadata files with an additional element file that happens to be IPL-compatible.

get_omni_image(input_parameter: Dict[str, Any], *, copy_even_on_direct_access: bool = False, download_only_metadata: bool = False) Image

Get (download if required) the image file. Return the loaded image. Raise if it is not an OMNI-compatible image format.

get_run_entity() Dict[str, Any] | None

Get the current run’s entity.

hook_exceptions() None

Hook into Python’s uncaught exception handling mechanism to save the error message.

load_job() Dict[str, Any]

Connect to XF Server and load the current job.

progress(percentage: float, *, save: bool = True) None

Save a new estimated duration based on a progress percentage and the current time and the start time.

save_job(**kwargs) Dict[str, Any]

Connect to XF Server and save the current job, replacing all keyword arguments.

server_connection() ContextManager[Connection]

Open a connection to the XF Server.

set_error_message(message: str, *, save: bool = True) None

Save the error message in the job’s metadata.

set_estimated_duration(estimated_duration_in_seconds: float, *, save: bool = True) None

Save a new estimated duration in the job’s metadata.

set_output_units(units: dict[str, str], *, save: bool = True) None

Save the output units in the job’s metadata.

spawn_xf_job_basic(connection: Connection, task_type: Dict[str, Any], task: Dict[str, Any], source_job_serial: int | None = None, state: str = 'Done', inputs: Dict[str, Any] = {}, outputs: Dict[str, Any] = {}, sort_order: str = None, metadata: Dict[str, Any] = None) Dict[str, Any]

Spawn a done job for the given task.

spawn_xf_task_basic(connection: Connection, task_type: Dict[str, Any], task_name: str = None, spawn_key: str = None, task_inputs: List[Dict[str, Any]] = [], task_metadata: Dict[str, Any] = {}, upgrade_task_types: List[Dict[str, Any]] = None) Dict[str, Any]

Ensure a task of the given task type (with no inputs) in the current workflow.

spawn_xf_task_type_basic(connection: Connection, task_type_metadata: Dict[str, Any], task_type_interface: Dict[str, Any]) Dict[str, Any]

Spawn a new task type.

switch_to_interactive_mode(error_message: str = None) NoReturn

Save the current task’s initial state and the current job’s state as ReadyForInteractive, and exit.

upload_omni_output(image: Image) Dict[str, Any]

Upload a local omni image associated with an output parameter of the current XF job. Connects to XF Server to get an authorization token. Uploads both files (the metadata file and pixels file) to FSS. Note: The files must be saved to disk first! Create a composite output parameter of type ‘Image’.

upload_omni_output_only_metadata(image: Image, additional_files: List[Dict[str, Any]]) Dict[str, Any]

Upload a local omni image associated with an output parameter of the current XF job. Connects to XF Server to get an authorization token. Uploads only the metadata file to FSS. References additional files. Note: The metadata files must be saved to disk first! Create a composite output parameter of type ‘Image’.

upload_output_file(local_path: Path) None

Upload a local file associated with an output parameter of the current XF job. Connects to XF Server to get an authorization token. Uploads the file to FSS.

upload_xf_userdata_file(local_path: Path, virtualpath: str) None

Upload a local file to a @xfuserdata/ virtualpath.

xfworkerutil.create_activity_log_event(conn: Connection, title: str, *, level: ActivityLogLevel = ActivityLogLevel.INFO, type: str = 'Message', source: str = 'XamFlowTaskType', payload: dict = {}) Dict[str, Any]

Create a new event in the activity log.

xfworkerutil.create_parameter(type_name: str, **kwargs) Dict[str, Any]

Create a composite parameter.

xfworkerutil.create_xf_job(conn: Connection, job: Dict[str, Any]) Dict[str, Any]

Create a new XF job.

xfworkerutil.create_xf_task(conn: Connection, task: Dict[str, Any]) Dict[str, Any]

Create a new XF task.

xfworkerutil.format_version(version: Tuple[int, ...]) str

Format a version as a string.

xfworkerutil.get_parameter_file_references(parameter: Dict[str, Any], *, only_additional: bool = False) List[Dict[str, Any]]

Get a list of file references to files associated with a parameter.

xfworkerutil.get_parameter_file_token(conn: Connection, parameter: dict) AuthorizationToken

Request an FSS auth token to access XF parameter bulk data depending on type.

xfworkerutil.get_sns_server_connection(sns_conn_params: Dict[str, Any]) ContextManager[Connection]

Open a connection to the SNS Server.

xfworkerutil.get_xf_combined_query(conn: Connection, project_serial: int = None, job_serial: int = None, task_type_serial: int = None, dependency_package_serial: int = None, task_types: str = None, dependency_packages: str = None, projects: str = None, workflows: str = None, tasks: str = None, jobs: str = None, workers: str = None) Dict[str, Any]

Get a combined query result.

xfworkerutil.get_xf_db_entity_by_uuid(conn: Connection, uuid: str) Dict[str, Any]

Get a specific XF entity DB record by UUID.

xfworkerutil.get_xf_default_sns_server_connection_params(conn: Connection) Dict[str, Any] | None

Get the default SNS server connection parameters from the global config.

xfworkerutil.get_xf_dependency_package_config_token(conn: Connection, dependency_package_serial: int | str, filename: str = None) AuthorizationToken

Request a FSS auth token to access XF dependency package config bulk data.

xfworkerutil.get_xf_job(conn: Connection, job_serial: int | str) Dict[str, Any]

Get a specific XF job.

xfworkerutil.get_xf_job_log_token(conn: Connection, job_serial: int | str, filename: str = None) AuthorizationToken

Request a FSS auth token to access XF job log bulk data.

xfworkerutil.get_xf_job_result_token(conn: Connection, job_serial: int | str, filename: str = None) AuthorizationToken

Request a FSS auth token to access XF job result bulk data.

xfworkerutil.get_xf_jobs_by_task(conn: Connection, task_serial: int | str) List[Dict[str, Any]]

Get all XF jobs of a specific task.

xfworkerutil.get_xf_jobs_by_workflow(conn: Connection, workflow_serial: int | str) List[Dict[str, Any]]

Get all XF jobs of a specific workflow.

xfworkerutil.get_xf_parameter_float_sequence(conn: Connection, sequence: Dict[str, Any]) List[float]

Get all the float output values of the done jobs identified by a Sequence parameter.

xfworkerutil.get_xf_parameter_paired_float_sequences(conn: Connection, sequence1: Dict[str, Any], sequence2: Dict[str, Any]) Tuple[List[float], List[float]]

Get all the paired float output values of the done jobs identified by two Sequence parameters.

xfworkerutil.get_xf_parameter_paired_sequences(conn: Connection, sequence1: Dict[str, Any], sequence2: Dict[str, Any]) Tuple[List[Any], List[Any]]

Get all the paired output parameters of the done jobs identified by two Sequence parameters.

xfworkerutil.get_xf_parameter_sequence(conn: Connection, sequence: Dict[str, Any]) List[Any]

Get all the output parameters of the done jobs identified by a Sequence parameter.

xfworkerutil.get_xf_project(conn: Connection, project_serial: int | str) Dict[str, Any]

Get a specific XF project.

xfworkerutil.get_xf_projects(conn: Connection) List[Dict[str, Any]]

Get all XF projects.

xfworkerutil.get_xf_task(conn: Connection, task_serial: int | str) Dict[str, Any]

Get a specific XF task.

xfworkerutil.get_xf_task_config_token(conn: Connection, task_serial: int | str, filename: str = None) AuthorizationToken

Request a FSS auth token to access XF task config bulk data.

xfworkerutil.get_xf_task_type(conn: Connection, task_type_serial: int | str) Dict[str, Any]

Get a specific task type.

xfworkerutil.get_xf_task_type_config_token(conn: Connection, task_type_serial: int | str, filename: str = None) AuthorizationToken

Request a FSS auth token to access XF task type config bulk data.

xfworkerutil.get_xf_task_types(conn: Connection) List[Dict[str, Any]]

Get all task types.

xfworkerutil.get_xf_tasks_by_workflow(conn: Connection, workflow_serial: int | str) List[Dict[str, Any]]

Get all XF tasks of a specific XF workflow.

xfworkerutil.get_xf_userdata_token(conn: Connection, scopes: List[Tuple[str, str]]) AuthorizationToken

Request a FSS auth token to access @xfuserdata/ bulk data.

xfworkerutil.get_xf_workflow(conn: Connection, workflow_serial: int | str) Dict[str, Any]

Get a specific XF workflow.

xfworkerutil.get_xf_workflow_config_token(conn: Connection, workflow_serial: int | str, filename: str = None) AuthorizationToken

Request a FSS auth token to access XF workflow config bulk data.

xfworkerutil.get_xf_workflow_run(conn: Connection, workflow_serial: int | str) Dict[str, Any]

Get a specific XF workflow run.

xfworkerutil.get_xf_workflows_by_project(conn: Connection, project_serial: int | str) List[Dict[str, Any]]

Get all XF workflows of a specific XF project.

xfworkerutil.parse_version(version: str) Tuple[int, ...]

Parse a version string.

xfworkerutil.update_xf_job(conn: Connection, job: Dict[str, Any], bump_occ_lock_version: bool = True, **kwargs) Dict[str, Any]

Update the XF job.

xfworkerutil.update_xf_project(conn: Connection, project: Dict[str, Any], **kwargs) Dict[str, Any]

Update the XF project.

xfworkerutil.update_xf_task(conn: Connection, task: Dict[str, Any], workflow_occ_lock_version: int, **kwargs) Dict[str, Any]

Update the XF task.

xfworkerutil.update_xf_workflow(conn: Connection, workflow: Dict[str, Any], **kwargs) Dict[str, Any]

Update the XF workflow.