Spawning new runs from a task type

A processing task type can also spawn any number of new entities, such as runs, tasks, or jobs.

For example, Lucid.Util.SearchFolderFilesBySuffix uses xfworkerutil.XFWorkerJob.spawn_xf_job_basic() to spawn a new run — consisting of two jobs — for each pair of matching filenames found in a given User Data folder:

import logging
import uuid
import pathlib
from typing import Any, Dict, List

import network
import omni
import xfsortorder
import xfworkerutil


# Bind to the current job and pull its input parameters.
worker = xfworkerutil.XFWorkerJob()

job_inputs = worker.job['inputs']
folder_parameter = job_inputs['folder']
# Suffix list arrives as a single '|'-separated string; compare case-insensitively.
suffixes = job_inputs['suffixes'].upper().split('|')
mode = job_inputs['mode']
typename = job_inputs['typename']

# Reject unsupported parameter values up front.
if mode not in ('standalone', 'filename', 'dirname'):
    worker.error(f"Unknown mode {mode}")

if typename not in ('ForeignFile', 'Image'):
    worker.error(f"Unknown typename {typename}")

# Search folder
def search_folder_recursively(folder_virtualpath):
    """Return all file items found under *folder_virtualpath*, recursively.

    Each returned item dict is annotated with a 'virtualpath' key giving the
    file's full virtual path.
    """
    # A fresh listing token is fetched per folder level.
    with worker.server_connection() as connection:
        list_dir_token = xfworkerutil.get_xf_userdata_token(connection, [('list', folder_virtualpath)])
    files, folders = network.list_files_and_folders_on_filestorage(list_dir_token)

    # Annotate every file with its full virtual path.
    for file_item in files:
        file_item['virtualpath'] = network.virtualpath_join_file(folder_virtualpath, file_item['name'])

    # Descend into each subfolder and collect its files as well.
    for subfolder in folders:
        subfolder_path = network.virtualpath_join_folder(folder_virtualpath, subfolder['virtual_name'])
        files.extend(search_folder_recursively(subfolder_path))

    return files


def create_userdata_image(virtualpath: str, filename: str) -> Dict[str, Any]:
    """Build an Image parameter dict for a User Data file.

    Downloads the file, loads it (headers only where the format allows) to
    determine the image size, and returns a parameter dict describing the
    image. Supported formats: .omni, .aim, .isq — matched case-insensitively,
    optionally with a ';version' tail (e.g. 'scan.aim;1').

    Reports unsupported formats via worker.error() (assumed not to return).
    """
    file_parameter = dict(type='ForeignFile', source='userdata', virtualpath=virtualpath, filename=filename)
    file_path = worker.download_input_file(file_parameter)

    def matches(path: pathlib.Path, patterns: List[str]) -> bool:
        # Match both the lower- and upper-case variant of each glob pattern.
        return any(path.match(pattern) or path.match(pattern.upper()) for pattern in patterns)

    if matches(file_path, ['*.omni']):
        image = omni.Image.load(file_path)
        size = image.size
    elif matches(file_path, ['*.aim', '*.aim;*']):
        image = omni.import_from_aim(file_path, load_pixels=False)
        size = image.size
    elif matches(file_path, ['*.isq', '*.isq;*']):
        image = omni.import_from_isq(file_path, load_pixels=False)
        size = image.size
    else:
        # Fixed: the original f-string had no placeholder and reported the
        # literal text "(unknown)"; include the offending filename instead.
        worker.error(f"Unknown format {filename}")
    return dict(type='Image', source='userdata', virtualpath=virtualpath, filename=filename, size=size)


# Collect every file in the folder tree, then group files into run tuples.
file_items = search_folder_recursively(folder_parameter['virtualpath'])

if mode == "standalone":
    # One single-file run per file whose name ends with any configured suffix.
    def has_any_suffix(name: str) -> bool:
        return any(name.upper().endswith(suffix) for suffix in suffixes)

    tuples = [(item,) for item in file_items if has_any_suffix(item['name'])]

    spawn_key = 'file'
    task_name = f"Spawned {typename}"
    has_add_tasks = False

else:
    # Pair files by shared name ('filename' mode) or shared folder ('dirname' mode).
    group_key = 'name' if mode == 'filename' else 'virtualpath'
    file_items = {item[group_key].upper(): item for item in file_items}

    suffix1 = suffixes[0]
    tuples = []
    for key1, item1 in file_items.items():
        # Only files carrying the primary suffix anchor a run.
        if not key1.endswith(suffix1):
            continue
        logging.info(f"Found {key1}")
        items = [item1]
        # Look up the partner file for each secondary suffix.
        for suffix2 in suffixes[1:]:
            key2 = key1[:-len(suffix1)] + suffix2
            item2 = file_items.get(key2)
            if item2 is None:
                logging.warning(f"Not found: {key2}")
            else:
                items.append(item2)
        # Require the complete set of suffixes before spawning a run.
        if len(items) < len(suffixes):
            logging.info(f"Skipping incomplete run for {key1}")
            continue
        tuples.append(tuple(items))

    spawn_key = suffix1
    task_name = suffix1
    has_add_tasks = True

# Per-typename configuration: task type names, template input key, and the
# factory producing the outputs of each spawned job.
if typename == 'ForeignFile':
    select_task_type_name = "XF.Data.SelectForeignFile"
    add_task_type_name = "XF.Data.AddForeignFile"
    add_task_input_template_name = 'foreign_file_template'

    def create_outputs(filename: str, virtualpath: str):
        """Outputs for a ForeignFile job: a reference to the User Data file."""
        foreign_file = dict(type='ForeignFile', source='userdata', virtualpath=virtualpath, filename=filename)
        return {'foreign_file': foreign_file}


elif typename == 'Image':
    select_task_type_name = "XF.Data.SelectImage"
    add_task_type_name = "XF.Data.AddImage"
    add_task_input_template_name = 'image_template'

    def create_outputs(filename: str, virtualpath: str):
        """Outputs for an Image job: the file downloaded and sized as an image."""
        return {'image': create_userdata_image(virtualpath, filename)}


# Spawn the tasks (upgrading any outdated task types), then one run per tuple.
with worker.server_connection() as connection:
    # Select task: the primary task each spawned run starts from.
    select_task_type, *old_select_task_types = worker.find_xf_task_types(connection, select_task_type_name)
    select_task = worker.spawn_xf_task_basic(connection, select_task_type, task_name, spawn_key=spawn_key,
        upgrade_task_types=old_select_task_types)
    if has_add_tasks:
        # One add task per secondary suffix; its template input is explicitly empty.
        add_task_type, *old_add_task_types = worker.find_xf_task_types(connection, add_task_type_name)
        add_tasks = [worker.spawn_xf_task_basic(connection, add_task_type, suffix, spawn_key=suffix,
            upgrade_task_types=old_add_task_types,
            task_inputs=[
                {
                    'key': add_task_input_template_name,
                    "source": {
                        "mode": "none"
                    }
                },
            ]) for suffix in suffixes[1:]]

    existing_sort_orders = worker.get_existing_job_sort_orders(connection)
    new_sort_orders = xfsortorder.get_new_source_job_sort_orders(existing_sort_orders, len(tuples))

    # One run per tuple of matched files. Loop variable renamed from 'tuple'
    # to run_items — the original shadowed the builtin.
    for i, run_items in enumerate(tuples):
        sort_order = new_sort_orders[i]
        item1 = run_items[0]
        filename1 = item1['name']
        virtualpath1 = item1['virtualpath']
        logging.info(f"Spawning run for {filename1}")
        select_task_outputs = create_outputs(filename1, virtualpath1)
        # Derive the run/entity display name from the configured mode.
        metadata = {}
        if mode == 'standalone':
            entity_name = filename1
        elif mode == 'filename':
            entity_name = filename1[:-len(suffix1)]
        elif mode == 'dirname':
            entity_name = network.get_virtualpath_dirname(virtualpath1)
        metadata['run_display_name'] = entity_name
        metadata['display_name'] = entity_name
        entity = worker.create_output_entity(str(uuid.uuid4()), entity_name)
        metadata['run_entity'] = entity
        # When follower jobs must be attached first, create the source job in
        # the Obsolete state so followers are not auto-created prematurely.
        state = 'Obsolete' if has_add_tasks else 'Done'
        select_job = worker.spawn_xf_job_basic(connection, select_task_type, select_task, outputs=select_task_outputs, state=state, sort_order=sort_order, metadata=metadata)
        if has_add_tasks:
            for item2, add_task in zip(run_items[1:], add_tasks):
                filename2 = item2['name']
                virtualpath2 = item2['virtualpath']
                add_task_inputs = {
                    add_task_input_template_name: None,
                }
                add_task_outputs = create_outputs(filename2, virtualpath2)
                # Return value intentionally ignored (the original bound it to
                # an unused 'add_job' variable); the job only needs to exist.
                worker.spawn_xf_job_basic(connection, add_task_type, add_task, source_job_serial=select_job['serial'], inputs=add_task_inputs, outputs=add_task_outputs)
            xfworkerutil.update_xf_job(connection, select_job, state='Done') # Finish source job (created in Obsolete state to avoid auto-creating follower jobs)

# Save outputs (this spawning job produces no job outputs of its own)
outputs: dict = {
}
worker.finish(outputs)

In addition, spawn_xf_task_basic() is used to spawn the required tasks if they do not exist yet.

See also Reduce Task Types.