xfworkerutil
XF Worker utilities.
This module implements the most commonly required functionality in task type scripts:
XF Worker infrastructure
XF network protocols
XF JSON schemas
Accessing job input and output parameters
Downloading and uploading files
Spawning XF entities like jobs and tasks
- class xfworkerutil.ActivityLogLevel(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)
- class xfworkerutil.TemporaryWorkingDirectory(prefix: str)
Creates a temporary working directory.
Create a unique related path in the temporary working directory by adding a postfix. This is similar to path.with_suffix(‘.ext’), but the postfix may contain more than just a file extension, like ‘_SEG.AIM’, and the resulting path is guaranteed to be unique.
- create_temp_path(filename: str) -> Path
Create a unique path in the temporary working directory.
- is_under(p: Path) -> bool
Return whether the given path is under the working directory.
- remove() -> None
Remove the temporary working directory, deleting all files recursively.
- subdir(name: str, max_name_len: int = 8) -> Iterator[None]
Create a subdirectory and use it, for the duration of the context manager, when creating new temporary and related paths.
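The behaviour described for this class can be illustrated with a small stand-in built only on the standard library. This is a sketch under assumptions, not the module's implementation: the class name `SketchWorkingDirectory`, the method name `sketch_related_path`, the counter-based uniqueness scheme, and the reading of `max_name_len` as a truncation length are all hypothetical.

```python
import shutil
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator


class SketchWorkingDirectory:
    """Hypothetical stand-in that mimics the documented behaviour."""

    def __init__(self, prefix: str) -> None:
        self.root = Path(tempfile.mkdtemp(prefix=prefix))
        self._current = self.root  # directory in which new paths are created

    def _unique(self, candidate: Path) -> Path:
        # Assumed scheme: add a counter before the suffix until the name is
        # not taken by an existing file or directory.
        result, counter = candidate, 1
        while result.exists():
            result = candidate.with_name(
                f"{candidate.stem}_{counter}{candidate.suffix}")
            counter += 1
        return result

    def create_temp_path(self, filename: str) -> Path:
        return self._unique(self._current / filename)

    def sketch_related_path(self, path: Path, postfix: str) -> Path:
        # Unlike path.with_suffix(), the postfix may carry more than an
        # extension, for example "_SEG.AIM".
        return self._unique(self._current / (path.stem + postfix))

    def is_under(self, p: Path) -> bool:
        try:
            p.resolve().relative_to(self.root.resolve())
            return True
        except ValueError:
            return False

    @contextmanager
    def subdir(self, name: str, max_name_len: int = 8) -> Iterator[None]:
        previous = self._current
        # Assumption: max_name_len truncates the directory name.
        self._current = previous / name[:max_name_len]
        self._current.mkdir(parents=True, exist_ok=True)
        try:
            yield
        finally:
            self._current = previous  # restore the previous directory

    def remove(self) -> None:
        shutil.rmtree(self.root, ignore_errors=True)


workdir = SketchWorkingDirectory("xf_sketch_")
first = workdir.create_temp_path("result.csv")
first.write_text("a,b\n")
second = workdir.create_temp_path("result.csv")  # name is already taken
related = workdir.sketch_related_path(Path("scan.AIM"), "_SEG.AIM")
with workdir.subdir("intermediate_files"):
    nested = workdir.create_temp_path("tmp.bin")
inside = workdir.is_under(nested)
outside = workdir.is_under(Path(tempfile.gettempdir()))
workdir.remove()
```

The uniqueness check in this sketch only looks at files that already exist on disk, so two paths requested back to back without writing the first would collide; the real class may track issued names differently.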
- class xfworkerutil.XFWorkerArguments
XF Worker arguments passed via environment variables to this task type script.
- property job_serial: int
The serial of the current job.
- property server_auth: str
The authentication information for the XF Server.
- property server_host: str
The fully qualified domain name of the XF Server.
- property server_port: str
The port of the XF Server.
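As an illustration of reading such arguments from the environment, the following sketch mirrors the four documented properties. The environment variable names (`EXAMPLE_JOB_SERIAL` and so on) and the class name `SketchArguments` are invented for this example; the names XF Worker actually sets are not stated here.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class SketchArguments:
    job_serial: int
    server_auth: str
    server_host: str
    server_port: str  # documented as str, so it is not converted to int

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "SketchArguments":
        # All variable names below are hypothetical placeholders.
        return cls(
            job_serial=int(env["EXAMPLE_JOB_SERIAL"]),
            server_auth=env["EXAMPLE_SERVER_AUTH"],
            server_host=env["EXAMPLE_SERVER_HOST"],
            server_port=env["EXAMPLE_SERVER_PORT"],
        )


# Passing a plain dict keeps the example independent of the real environment.
args = SketchArguments.from_env({
    "EXAMPLE_JOB_SERIAL": "42",
    "EXAMPLE_SERVER_AUTH": "secret",
    "EXAMPLE_SERVER_HOST": "xf.example.org",
    "EXAMPLE_SERVER_PORT": "8443",
})
```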
- class xfworkerutil.XFWorkerJob
High-level utilities for executing an XF Worker job. Create an instance at the start of the script. Call finish() at the end of the (successful) script.
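The call order described above (instance first, `finish()` last) might look like the sketch below. It is written against a small `Protocol` so that it runs without an XF Server; `RecordingJob` is a hypothetical stand-in, and the output name `mean` and unit `mm` are made up. In a real script the object would presumably come from `xfworkerutil.XFWorkerJob()`.

```python
from typing import Any, Dict, List, Optional, Protocol, Tuple


class JobLike(Protocol):
    """The subset of documented XFWorkerJob methods used by run_task()."""

    def progress(self, percentage: float, *, save: bool = True) -> None: ...
    def error(self, message: str) -> Any: ...
    def finish(self, outputs: Dict[str, Any],
               units: Optional[Dict[str, str]] = None) -> None: ...


def run_task(job: JobLike, values: List[float]) -> None:
    if not values:
        job.error("no input values")  # documented as NoReturn
    job.progress(50.0)
    mean = sum(values) / len(values)
    # "mean" and "mm" are hypothetical output and unit names.
    job.finish({"mean": mean}, units={"mean": "mm"})


class RecordingJob:
    """Hypothetical stand-in that records calls instead of contacting a server."""

    def __init__(self) -> None:
        self.calls: List[Tuple[Any, ...]] = []

    def progress(self, percentage: float, *, save: bool = True) -> None:
        self.calls.append(("progress", percentage))

    def error(self, message: str) -> Any:
        self.calls.append(("error", message))
        raise SystemExit(1)  # assumption: the real method ends the script

    def finish(self, outputs: Dict[str, Any],
               units: Optional[Dict[str, str]] = None) -> None:
        self.calls.append(("finish", outputs, units))


job = RecordingJob()
run_task(job, [1.0, 2.0, 3.0])
```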
- activity_log_message(title: str, *, level: ActivityLogLevel = ActivityLogLevel.INFO) -> None
Log a message event to the activity log.
- clear_error_message(*, save: bool = True) -> None
Clear the error message in the job’s metadata.
- create_output_bitmap(filename: str) -> Dict[str, Any]
Create a composite output parameter of type ‘Bitmap’.
- create_output_contours(filename: str, image: Dict[str, Any]) -> Dict[str, Any]
Create a composite output parameter of type ‘Contours’.
- create_output_csv(filename: str) -> Dict[str, Any]
Create a composite output parameter of type ‘CSV’.
- create_output_entity(uuid: str, name: str) -> Dict[str, Any]
Create a composite output parameter of type ‘Entity’.
- create_output_foreign_file(filename: str) -> Dict[str, Any]
Create a composite output parameter of type ‘ForeignFile’.
- create_output_gi_result(snapshot: Dict[str, Any], gi_image: Dict[str, Any]) -> Dict[str, Any]
Create a composite output parameter of type ‘GIResult’.
- create_output_html_document(filename: str) -> Dict[str, Any]
Create a composite output parameter of type ‘HTMLDocument’.
- create_output_image(filename: str, size: List[int], filenames: List[str] | None = None, additional_files: List[Dict[str, Any]] | None = None) -> Dict[str, Any]
Create a composite output parameter of type ‘Image’.
- create_output_image_pair_list(pairs: List[List[Dict[str, Any]]]) -> Dict[str, Any]
Create a composite output parameter of type ‘ImagePairList’.
- create_output_image_voi(pos: List[int], size: List[int], image: Dict[str, Any], name: str = None, category: str = None) -> Dict[str, Any]
Create a composite output parameter of type ‘ImageVOI’.
- create_output_mesh(filename: str, filenames: List[str] | None = None, additional_files: List[Dict[str, Any]] | None = None) -> Dict[str, Any]
Create a composite output parameter of type ‘Mesh’.
- create_output_path(path: Path, postfix: str) -> Path
Create a path for a local file associated with an output parameter of the current XF job. Connects to XF Server to get an authorization token, then sends a HEAD request to FSS to find the direct-access path. If direct access is possible, just locates the local path to the file (for write access). Otherwise, creates a unique related path in the temporary working directory by adding a postfix.
- create_output_registration_result(transform: List[float], *, image1: Dict[str, Any] | None = None, image2: Dict[str, Any] | None = None, mesh1: Dict[str, Any] | None = None, mesh2: Dict[str, Any] | None = None) -> Dict[str, Any]
Create a composite output parameter of type ‘RegistrationResult’.
- create_output_rotated_image_voi(pos_mm: List[float], size_mm: List[float], size: List[int], rotation: List[float], image: Dict[str, Any], name: str = None, category: str = None) -> Dict[str, Any]
Create a composite output parameter of type ‘RotatedImageVOI’.
- create_output_snapshot(bitmap: Dict[str, Any], image: Dict[str, Any] = None, image2: Dict[str, Any] = None, mesh: Dict[str, Any] = None, metadata: Dict[str, Any] = None, video: Dict[str, Any] = None) -> Dict[str, Any]
Create a composite output parameter of type ‘Snapshot’.
- create_output_snapshot_metadata(filename: str) -> Dict[str, Any]
Create a composite output parameter of type ‘SnapshotMetadata’.
- create_output_spectra(amplitudes_image: Dict[str, Any], wavelength_image: Dict[str, Any]) -> Dict[str, Any]
Create a composite output parameter of type ‘Spectra’.
- create_output_video(filename: str, bitmap: Dict[str, Any] = None) -> Dict[str, Any]
Create a composite output parameter of type ‘Video’.
Create a unique related path in the temporary working directory by adding a postfix. This is similar to path.with_suffix(‘.ext’), but the postfix may contain more than just a file extension, like ‘_SEG.AIM’, and the resulting path is guaranteed to be unique.
- create_temp_path(filename: str) -> Path
Create a unique path in the temporary working directory.
- download_additional_input_files(input_parameter: Dict[str, Any], *, copy_even_on_direct_access: bool = False) -> List[Path]
Download additional files associated with an input parameter of the current XF job.
- download_input_file(input_parameter: Dict[str, Any], *, copy_even_on_direct_access: bool = False) -> Path
Download a file associated with an input parameter of the current XF job. Connects to XF Server to get an authorization token. If possible, just locates the local path to the file (for read-only access). Otherwise, downloads the file from FSS to the temporary working directory. Returns the path to the local file.
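The choice between direct access and a download, as described for download_input_file, can be sketched as a small decision function. This is one plausible reading rather than the module's code: `fetch_input`, `direct_path`, `download` and `fake_download` are hypothetical names, and the interpretation of `copy_even_on_direct_access` (copy the file into the working directory even when it is reachable in place) is an assumption based on the parameter name.

```python
import shutil
import tempfile
from pathlib import Path
from typing import Callable, Optional


def fetch_input(direct_path: Optional[Path],
                download: Callable[[Path], None],
                target: Path,
                *,
                copy_even_on_direct_access: bool = False) -> Path:
    """Return a local path to the input, copying or downloading only if needed."""
    if direct_path is not None and direct_path.exists():
        if not copy_even_on_direct_access:
            return direct_path  # use the original in place, read-only
        shutil.copyfile(direct_path, target)  # private copy, safe to modify
        return target
    download(target)  # no direct access: fetch into the working directory
    return target


def fake_download(target: Path) -> None:
    # Stand-in for a transfer from the file storage service.
    target.write_bytes(b"downloaded")


with tempfile.TemporaryDirectory() as tmp:
    shared = Path(tmp) / "shared.dat"
    shared.write_bytes(b"pixels")
    local = Path(tmp) / "local.dat"
    in_place = fetch_input(shared, fake_download, local)
    copied = fetch_input(shared, fake_download, local,
                         copy_even_on_direct_access=True)
    copied_bytes = copied.read_bytes()
    downloaded = fetch_input(None, fake_download, Path(tmp) / "dl.dat")
    downloaded_bytes = downloaded.read_bytes()
```

Under this reading, a script that needs to modify its input would pass `copy_even_on_direct_access=True`, since the in-place path is meant for read-only access.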
Download a related file from the same source as the input parameter.
- download_xf_dependency_package_config_file(dependency_package_serial: int, filename: str, *, copy_even_on_direct_access: bool = False) -> Path
Download a dependency-package-config file.
- download_xf_task_type_config_file(task_type_serial: int, filename: str, *, copy_even_on_direct_access: bool = False) -> Path
Download a task-type-config file.
- download_xf_userdata_file(virtualpath: str, *, copy_even_on_direct_access: bool = False) -> Path
Download a file from an @xfuserdata/ virtualpath.
- download_xf_workflow_config_file(workflow_serial: int, filename: str, *, copy_even_on_direct_access: bool = False) -> Path
Download a workflow-config file.
- error(message: str) -> NoReturn
Finish a failed run. Save the error message. The job’s state remains unchanged for now and will be updated promptly by XF Worker. The temporary working directory also remains unchanged (for debugging purposes) and will be deleted on a schedule by XF Worker.
- find_xf_task_type(connection: Connection, name: str) -> Dict[str, Any]
Find the latest task type of a given package name, or raise an exception.
- find_xf_task_types(connection: Connection, name: str, *, raise_if_not_any: bool = True) -> List[Dict[str, Any]]
Find all task types of a given package name (in descending version order).
- finish(outputs: Dict[str, Any], units: Dict[str, str] = None) -> None
Finish a successful run. Change the job’s state to Done, and store the provided outputs. Also remove the temporary working directory.
- get_current_task() -> Dict[str, Any]
Get the current job’s task.
- get_debug_info() -> str
Get some information useful for debugging.
- get_existing_job_sort_orders(connection: Connection) -> List[str]
Get the list of existing job sort order strings in this workflow, for use when spawning more jobs.
- get_ipl_image_path(input_parameter: Dict[str, Any], *, auto_convert_from_omni: bool = True) -> Path
Get (download if required) the image file. Return the path to the local file. Ensure the path is not a UNC network share path. Raise if it is not an IPL-compatible image format. Support OMNI metadata files with an additional element file that happens to be IPL-compatible.
- get_omni_image(input_parameter: Dict[str, Any], *, copy_even_on_direct_access: bool = False, download_only_metadata: bool = False) -> Image
Get (download if required) the image file. Return the loaded image. Raise if it is not an OMNI-compatible image format.
- get_run_entity() -> Dict[str, Any] | None
Get the current run’s entity.
- hook_exceptions() -> None
Hook into Python’s uncaught exception handling mechanism to save the error message.
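Python's hook for uncaught exceptions is `sys.excepthook`. A minimal sketch of chaining a handler onto it follows; `install_hook` and `save_error` are hypothetical names, and whether hook_exceptions formats the message this way or keeps the default traceback output is not stated here.

```python
import sys
import traceback
from typing import Callable, List


def install_hook(save_error: Callable[[str], None]) -> None:
    previous = sys.excepthook

    def hook(exc_type, exc, tb) -> None:
        # Reduce the exception to a one-line "Type: message" string.
        lines = traceback.format_exception_only(exc_type, exc)
        save_error("".join(lines).strip())
        previous(exc_type, exc, tb)  # still print the usual traceback

    sys.excepthook = hook


saved: List[str] = []
install_hook(saved.append)
# Call the hook directly to show its effect without ending the interpreter.
sys.excepthook(ValueError, ValueError("boom"), None)
sys.excepthook = sys.__excepthook__  # restore the interpreter default
```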
- load_job() -> Dict[str, Any]
Connect to XF Server and load the current job.
- progress(percentage: float, *, save: bool = True) -> None
Save a new estimated duration based on a progress percentage, the current time, and the start time.
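One way to derive a duration estimate from those three quantities is linear extrapolation. The sketch below assumes that the percentage runs from 0 to 100 and that the estimate is the total run time; neither is confirmed by this reference, and `estimate_total_duration` is a hypothetical name.

```python
def estimate_total_duration(start_time: float, now: float,
                            percentage: float) -> float:
    """Extrapolate the total duration in seconds from the progress so far."""
    if not 0.0 < percentage <= 100.0:
        raise ValueError("percentage must be in the range (0, 100]")
    elapsed = now - start_time
    # If `percentage` percent took `elapsed` seconds, 100 percent takes:
    return elapsed * 100.0 / percentage


# 30 seconds elapsed at 25 percent extrapolates to 120 seconds in total.
estimate = estimate_total_duration(start_time=100.0, now=130.0, percentage=25.0)
```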
- save_job(**kwargs) -> Dict[str, Any]
Connect to XF Server and save the current job, replacing all keyword arguments.
- server_connection() -> ContextManager[Connection]
Open a connection to the XF Server.
- set_error_message(message: str, *, save: bool = True) -> None
Save the error message in the job’s metadata.
- set_estimated_duration(estimated_duration_in_seconds: float, *, save: bool = True) -> None
Save a new estimated duration in the job’s metadata.
- set_output_units(units: dict[str, str], *, save: bool = True) -> None
Save the output units in the job’s metadata.
- spawn_xf_job_basic(connection: Connection, task_type: Dict[str, Any], task: Dict[str, Any], source_job_serial: int | None = None, state: str = 'Done', inputs: Dict[str, Any] = {}, outputs: Dict[str, Any] = {}, sort_order: str = None, metadata: Dict[str, Any] = None) -> Dict[str, Any]
Spawn a done job for the given task.
- spawn_xf_task_basic(connection: Connection, task_type: Dict[str, Any], task_name: str = None, spawn_key: str = None, task_inputs: List[Dict[str, Any]] = [], task_metadata: Dict[str, Any] = {}, upgrade_task_types: List[Dict[str, Any]] = None) -> Dict[str, Any]
Ensure a task of the given task type (with no inputs) in the current workflow.
- spawn_xf_task_type_basic(connection: Connection, task_type_metadata: Dict[str, Any], task_type_interface: Dict[str, Any]) -> Dict[str, Any]
Spawn a new task type.
- switch_to_interactive_mode(error_message: str = None) -> NoReturn
Save the current task’s initial state and the current job’s state as ReadyForInteractive, and exit.
- upload_omni_output(image: Image) -> Dict[str, Any]
Upload a local OMNI image associated with an output parameter of the current XF job. Connects to XF Server to get an authorization token. Uploads both files (the metadata file and the pixels file) to FSS. Note: The files must be saved to disk first! Creates a composite output parameter of type ‘Image’.
- upload_omni_output_only_metadata(image: Image, additional_files: List[Dict[str, Any]]) -> Dict[str, Any]
Upload a local OMNI image associated with an output parameter of the current XF job. Connects to XF Server to get an authorization token. Uploads only the metadata file to FSS. References additional files. Note: The metadata files must be saved to disk first! Creates a composite output parameter of type ‘Image’.
- upload_output_file(local_path: Path) -> None
Upload a local file associated with an output parameter of the current XF job. Connects to XF Server to get an authorization token. Uploads the file to FSS.
- upload_xf_userdata_file(local_path: Path, virtualpath: str) -> None
Upload a local file to an @xfuserdata/ virtualpath.
- xfworkerutil.create_activity_log_event(conn: Connection, title: str, *, level: ActivityLogLevel = ActivityLogLevel.INFO, type: str = 'Message', source: str = 'XamFlowTaskType', payload: dict = {}) -> Dict[str, Any]
Create a new event in the activity log.
- xfworkerutil.create_parameter(type_name: str, **kwargs) -> Dict[str, Any]
Create a composite parameter.
- xfworkerutil.create_xf_job(conn: Connection, job: Dict[str, Any]) -> Dict[str, Any]
Create a new XF job.
- xfworkerutil.create_xf_task(conn: Connection, task: Dict[str, Any]) -> Dict[str, Any]
Create a new XF task.
- xfworkerutil.format_version(version: Tuple[int, ...]) -> str
Format a version as a string.
- xfworkerutil.get_parameter_file_references(parameter: Dict[str, Any], *, only_additional: bool = False) -> List[Dict[str, Any]]
Get a list of file references to files associated with a parameter.
- xfworkerutil.get_parameter_file_token(conn: Connection, parameter: dict) -> AuthorizationToken
Request an FSS auth token to access XF parameter bulk data depending on type.
- xfworkerutil.get_sns_server_connection(sns_conn_params: Dict[str, Any]) -> ContextManager[Connection]
Open a connection to the SNS Server.
- xfworkerutil.get_xf_combined_query(conn: Connection, project_serial: int = None, job_serial: int = None, task_type_serial: int = None, dependency_package_serial: int = None, task_types: str = None, dependency_packages: str = None, projects: str = None, workflows: str = None, tasks: str = None, jobs: str = None, workers: str = None) -> Dict[str, Any]
Get a combined query result.
- xfworkerutil.get_xf_db_entity_by_uuid(conn: Connection, uuid: str) -> Dict[str, Any]
Get a specific XF entity DB record by UUID.
- xfworkerutil.get_xf_default_sns_server_connection_params(conn: Connection) -> Dict[str, Any] | None
Get the default SNS server connection parameters from the global config.
- xfworkerutil.get_xf_dependency_package_config_token(conn: Connection, dependency_package_serial: int | str, filename: str = None) -> AuthorizationToken
Request an FSS auth token to access XF dependency package config bulk data.
- xfworkerutil.get_xf_job(conn: Connection, job_serial: int | str) -> Dict[str, Any]
Get a specific XF job.
- xfworkerutil.get_xf_job_log_token(conn: Connection, job_serial: int | str, filename: str = None) -> AuthorizationToken
Request an FSS auth token to access XF job log bulk data.
- xfworkerutil.get_xf_job_result_token(conn: Connection, job_serial: int | str, filename: str = None) -> AuthorizationToken
Request an FSS auth token to access XF job result bulk data.
- xfworkerutil.get_xf_jobs_by_task(conn: Connection, task_serial: int | str) -> List[Dict[str, Any]]
Get all XF jobs of a specific task.
- xfworkerutil.get_xf_jobs_by_workflow(conn: Connection, workflow_serial: int | str) -> List[Dict[str, Any]]
Get all XF jobs of a specific workflow.
- xfworkerutil.get_xf_parameter_float_sequence(conn: Connection, sequence: Dict[str, Any]) -> List[float]
Get all the float output values of the done jobs identified by a Sequence parameter.
- xfworkerutil.get_xf_parameter_paired_float_sequences(conn: Connection, sequence1: Dict[str, Any], sequence2: Dict[str, Any]) -> Tuple[List[float], List[float]]
Get all the paired float output values of the done jobs identified by two Sequence parameters.
- xfworkerutil.get_xf_parameter_paired_sequences(conn: Connection, sequence1: Dict[str, Any], sequence2: Dict[str, Any]) -> Tuple[List[Any], List[Any]]
Get all the paired output parameters of the done jobs identified by two Sequence parameters.
- xfworkerutil.get_xf_parameter_sequence(conn: Connection, sequence: Dict[str, Any]) -> List[Any]
Get all the output parameters of the done jobs identified by a Sequence parameter.
- xfworkerutil.get_xf_project(conn: Connection, project_serial: int | str) -> Dict[str, Any]
Get a specific XF project.
- xfworkerutil.get_xf_projects(conn: Connection) -> List[Dict[str, Any]]
Get all XF projects.
- xfworkerutil.get_xf_task(conn: Connection, task_serial: int | str) -> Dict[str, Any]
Get a specific XF task.
- xfworkerutil.get_xf_task_config_token(conn: Connection, task_serial: int | str, filename: str = None) -> AuthorizationToken
Request an FSS auth token to access XF task config bulk data.
- xfworkerutil.get_xf_task_type(conn: Connection, task_type_serial: int | str) -> Dict[str, Any]
Get a specific task type.
- xfworkerutil.get_xf_task_type_config_token(conn: Connection, task_type_serial: int | str, filename: str = None) -> AuthorizationToken
Request an FSS auth token to access XF task type config bulk data.
- xfworkerutil.get_xf_task_types(conn: Connection) -> List[Dict[str, Any]]
Get all task types.
- xfworkerutil.get_xf_tasks_by_workflow(conn: Connection, workflow_serial: int | str) -> List[Dict[str, Any]]
Get all XF tasks of a specific XF workflow.
- xfworkerutil.get_xf_userdata_token(conn: Connection, scopes: List[Tuple[str, str]]) -> AuthorizationToken
Request an FSS auth token to access @xfuserdata/ bulk data.
- xfworkerutil.get_xf_workflow(conn: Connection, workflow_serial: int | str) -> Dict[str, Any]
Get a specific XF workflow.
- xfworkerutil.get_xf_workflow_config_token(conn: Connection, workflow_serial: int | str, filename: str = None) -> AuthorizationToken
Request an FSS auth token to access XF workflow config bulk data.
- xfworkerutil.get_xf_workflow_run(conn: Connection, workflow_serial: int | str) -> Dict[str, Any]
Get a specific XF workflow run.
- xfworkerutil.get_xf_workflows_by_project(conn: Connection, project_serial: int | str) -> List[Dict[str, Any]]
Get all XF workflows of a specific XF project.
- xfworkerutil.parse_version(version: str) -> Tuple[int, ...]
Parse a version string.
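The signatures of parse_version and format_version suggest a round trip between a string and a tuple of integers. The sketch below assumes a dot-separated format such as `1.10.0`, which this reference does not confirm; the `sketch_` functions are hypothetical stand-ins, not the module's functions.

```python
from typing import Tuple


def sketch_parse_version(version: str) -> Tuple[int, ...]:
    # Assumed format: integers separated by dots, e.g. "1.10.0".
    return tuple(int(part) for part in version.split("."))


def sketch_format_version(version: Tuple[int, ...]) -> str:
    return ".".join(str(part) for part in version)


newer = sketch_parse_version("1.10.0")
older = sketch_parse_version("1.9.3")
round_trip = sketch_format_version(newer)
# Tuples compare numerically element by element, unlike the raw strings.
tuple_order_correct = newer > older
string_order_wrong = "1.10.0" < "1.9.3"
```

Comparing tuples rather than strings is what makes an ordering by version, such as the descending order mentioned for find_xf_task_types, behave numerically.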
- xfworkerutil.update_xf_job(conn: Connection, job: Dict[str, Any], bump_occ_lock_version: bool = True, **kwargs) -> Dict[str, Any]
Update the XF job.
- xfworkerutil.update_xf_project(conn: Connection, project: Dict[str, Any], **kwargs) -> Dict[str, Any]
Update the XF project.
- xfworkerutil.update_xf_task(conn: Connection, task: Dict[str, Any], workflow_occ_lock_version: int, **kwargs) -> Dict[str, Any]
Update the XF task.
- xfworkerutil.update_xf_workflow(conn: Connection, workflow: Dict[str, Any], **kwargs) -> Dict[str, Any]
Update the XF workflow.