Spawning new runs from a task type

A processing task type can also spawn any number of new entities, such as runs, tasks, and jobs.

For example, Lucid.Util.SearchFolderFilesBySuffix uses xfworkerutil.XFWorkerJob.spawn_xf_job_basic() to spawn a new run (consisting of one job per suffix) for each group of matching filenames found in a given User Data folder:

import logging
import uuid

import network
import xfsortorder
import xfworkerutil


worker = xfworkerutil.XFWorkerJob()

inputs = worker.job['inputs']
folder_parameter = inputs['folder']
suffixes = inputs['suffixes'].upper().split('|')
mode = inputs['mode']

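# Reject unsupported modes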
if mode not in ('filename', 'dirname'):
    worker.error(f"Unknown mode {mode}")

# Search the folder recursively for all files
def search_folder_recursively(folder_virtualpath):
    scopes = [('list', folder_virtualpath)]
    with worker.server_connection() as connection:
        list_dir_token = xfworkerutil.get_xf_userdata_token(connection, scopes)
    file_items, folder_items = network.list_files_and_folders_on_filestorage(list_dir_token)

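    # Annotate each file with its full virtual path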
    for item in file_items:
        item['virtualpath'] = network.virtualpath_join_file(folder_virtualpath, item['name'])

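    # Recurse into subfolders and collect their files as well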
    for folder_item in folder_items:
        subfolder_virtualpath = network.virtualpath_join_folder(folder_virtualpath, folder_item['virtual_name'])
        subfolder_file_items = search_folder_recursively(subfolder_virtualpath)
        file_items.extend(subfolder_file_items)

    return file_items

file_items = search_folder_recursively(folder_parameter['virtualpath'])
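# Index the files case-insensitively by filename (or by full virtual path in 'dirname' mode)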
group_key = 'virtualpath' if mode == 'dirname' else 'name'
file_items = {item[group_key].upper(): item for item in file_items}

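# Pair each file ending in the first suffix with its counterparts for the remaining suffixes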
suffix1 = suffixes[0]
tuples = []
for key1, item1 in file_items.items():
    if not key1.endswith(suffix1):
        continue
    logging.info(f"Found {key1}")
    items = [item1]
    for suffix2 in suffixes[1:]:
        key2 = key1[:-len(suffix1)] + suffix2
        item2 = file_items.get(key2)
        if item2 is not None:
            items.append(item2)
        else:
            logging.warning(f"Not found: {key2}")
    if len(items) < len(suffixes):
        logging.info(f"Skipping incomplete run for {key1}")
        continue
    tuples.append(tuple(items))

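# Spawn the tasks that the new jobs will belong to (reused if they already exist, see below)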
with worker.server_connection() as connection:
    file_task_type_1, *old_file_task_types_1 = worker.find_xf_task_types(connection, "XF.Data.SelectForeignFile")
    file_task_1 = worker.spawn_xf_task_basic(connection, file_task_type_1, suffix1, spawn_key=suffix1,
        upgrade_task_types=old_file_task_types_1)
    file_task_type_2, *old_file_task_types_2 = worker.find_xf_task_types(connection, "XF.Data.AddForeignFile")
    file_tasks_2 = [worker.spawn_xf_task_basic(connection, file_task_type_2, suffix, spawn_key=suffix,
        upgrade_task_types=old_file_task_types_2,
        task_inputs=[
            {
                'key': 'foreign_file_template',
                "source": {
                    "mode": "none"
                }
            },
        ]) for suffix in suffixes[1:]]

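    # Compute sort orders for the new source jobs relative to the existing ones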
    existing_sort_orders = worker.get_existing_job_sort_orders(connection)
    new_sort_orders = xfsortorder.get_new_source_job_sort_orders(existing_sort_orders, len(tuples))

    for i, file_tuple in enumerate(tuples):
        sort_order = new_sort_orders[i]
        item1 = file_tuple[0]
        filename1 = item1['name']
        virtualpath1 = item1['virtualpath']
        logging.info(f"Spawning run for {filename1}")
        file_outputs_1 = {
            'foreign_file': dict(type='ForeignFile', source='userdata', virtualpath=virtualpath1, filename=filename1),
        }
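        # Derive the run name from the filename (minus the suffix) or from the containing folder, depending on mode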
        metadata = {}
        if mode == 'filename':
            entity_name = filename1[:-len(suffix1)]
        elif mode == 'dirname':
            entity_name = network.get_virtualpath_dirname(virtualpath1)
        metadata['run_name'] = entity_name
        metadata['display_name'] = entity_name
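        # Create a new output entity for the run and record it in the job metadata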
        entity = worker.create_output_entity(str(uuid.uuid4()), entity_name)
        metadata['run_entity'] = entity
        file_job_1 = worker.spawn_xf_job_basic(connection, file_task_type_1, file_task_1, outputs=file_outputs_1, state='Obsolete', sort_order=sort_order, metadata=metadata)
        for item2, file_task_2 in zip(file_tuple[1:], file_tasks_2):
            filename2 = item2['name']
            virtualpath2 = item2['virtualpath']
            file_inputs_2 = {
                'foreign_file_template': None,
            }
            file_outputs_2 = {
                'foreign_file': dict(type='ForeignFile', source='userdata', virtualpath=virtualpath2, filename=filename2),
            }
            file_job_2 = worker.spawn_xf_job_basic(connection, file_task_type_2, file_task_2, source_job_serial=file_job_1['serial'], inputs=file_inputs_2, outputs=file_outputs_2)
        xfworkerutil.update_xf_job(connection, file_job_1, state='Done') # Finish source job (created in Obsolete state to avoid auto-creating follower jobs)

# This task type has no outputs of its own; the results live in the spawned runs
outputs: dict = {}
worker.finish(outputs)

spawn_xf_task_basic() is also used to spawn the required tasks if they do not exist yet.
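
In isolation, the task spawning pattern looks like this (a minimal sketch reusing the calls from the example above; "MyTask" is a placeholder name and spawn key, and the reuse and upgrade semantics are those described above, not a complete API specification):

with worker.server_connection() as connection:
    # find_xf_task_types() returns the current task type first, followed by any older versions
    task_type, *old_task_types = worker.find_xf_task_types(connection, "XF.Data.SelectForeignFile")
    # A new task is only created if no task with this spawn_key exists yet;
    # the older task types are passed so an existing task can be upgraded
    task = worker.spawn_xf_task_basic(connection, task_type, "MyTask", spawn_key="MyTask",
        upgrade_task_types=old_task_types)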

See also Reduce Task Types.