Spawning new runs from a task type
A processing task type can also spawn any number of new entities (e.g. a run, a task, or a job).
For example, Lucid.Util.SearchFolderFilesBySuffix
uses xfworkerutil.XFWorkerJob.spawn_xf_job_basic()
to spawn a new run (consisting of two jobs) for each pair of matching filenames found in a given User Data folder:
import logging
import uuid

import network
import xfsortorder
import xfworkerutil

worker = xfworkerutil.XFWorkerJob()

inputs = worker.job['inputs']
folder_parameter = inputs['folder']
suffixes = inputs['suffixes'].upper().split('|')
mode = inputs['mode']
if mode not in ('filename', 'dirname'):
    worker.error(f"Unknown mode {mode}")
# Search folder
def search_folder_recursively(folder_virtualpath):
    scopes = [('list', folder_virtualpath)]
    with worker.server_connection() as connection:
        list_dir_token = xfworkerutil.get_xf_userdata_token(connection, scopes)
    file_items, folder_items = network.list_files_and_folders_on_filestorage(list_dir_token)
    for item in file_items:
        item['virtualpath'] = network.virtualpath_join_file(folder_virtualpath, item['name'])
    for folder_item in folder_items:
        subfolder_virtualpath = network.virtualpath_join_folder(folder_virtualpath, folder_item['virtual_name'])
        subfolder_file_items = search_folder_recursively(subfolder_virtualpath)
        file_items.extend(subfolder_file_items)
    return file_items

file_items = search_folder_recursively(folder_parameter['virtualpath'])
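
# Group the files by uppercased name ('filename' mode) or full virtual path
# ('dirname' mode) and collect one tuple of matching files per run.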
group_key = 'virtualpath' if mode == 'dirname' else 'name'
file_items = {item[group_key].upper(): item for item in file_items}
suffix1 = suffixes[0]
tuples = []
for key1, item1 in file_items.items():
    if not key1.endswith(suffix1):
        continue
    logging.info(f"Found {key1}")
    items = [item1]
    for suffix2 in suffixes[1:]:
        key2 = key1[:-len(suffix1)] + suffix2
        item2 = file_items.get(key2)
        if item2 is not None:
            items.append(item2)
        else:
            logging.warning(f"Not found: {key2}")
    if len(items) < len(suffixes):
        logging.info(f"Skipping incomplete run for {key1}")
        continue
    tuples.append(tuple(items))
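
# Find or create the tasks that the spawned jobs will belong to.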
with worker.server_connection() as connection:
    file_task_type_1, *old_file_task_types_1 = worker.find_xf_task_types(connection, "XF.Data.SelectForeignFile")
    file_task_1 = worker.spawn_xf_task_basic(connection, file_task_type_1, suffix1, spawn_key=suffix1,
                                             upgrade_task_types=old_file_task_types_1)
    file_task_type_2, *old_file_task_types_2 = worker.find_xf_task_types(connection, "XF.Data.AddForeignFile")
    file_tasks_2 = [worker.spawn_xf_task_basic(connection, file_task_type_2, suffix, spawn_key=suffix,
                                               upgrade_task_types=old_file_task_types_2,
                                               task_inputs=[
                                                   {
                                                       'key': 'foreign_file_template',
                                                       "source": {
                                                           "mode": "none"
                                                       }
                                                   },
                                               ]) for suffix in suffixes[1:]]
    existing_sort_orders = worker.get_existing_job_sort_orders(connection)
    new_sort_orders = xfsortorder.get_new_source_job_sort_orders(existing_sort_orders, len(tuples))
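    # One run per matched tuple: a source job for the first suffix plus a
    # follower job for each remaining suffix.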
    for i, file_tuple in enumerate(tuples):
        sort_order = new_sort_orders[i]
        item1 = file_tuple[0]
        filename1 = item1['name']
        virtualpath1 = item1['virtualpath']
        logging.info(f"Spawning run for {filename1}")
        file_outputs_1 = {
            'foreign_file': dict(type='ForeignFile', source='userdata', virtualpath=virtualpath1, filename=filename1),
        }
        metadata = {}
        if mode == 'filename':
            entity_name = filename1[:-len(suffix1)]
        elif mode == 'dirname':
            entity_name = network.get_virtualpath_dirname(virtualpath1)
        metadata['run_name'] = entity_name
        metadata['display_name'] = entity_name
        entity = worker.create_output_entity(str(uuid.uuid4()), entity_name)
        metadata['run_entity'] = entity
        file_job_1 = worker.spawn_xf_job_basic(connection, file_task_type_1, file_task_1, outputs=file_outputs_1,
                                               state='Obsolete', sort_order=sort_order, metadata=metadata)
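        # Spawn one follower job per remaining suffix, linked to the source
        # job via source_job_serial.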
        for item2, file_task_2 in zip(file_tuple[1:], file_tasks_2):
            filename2 = item2['name']
            virtualpath2 = item2['virtualpath']
            file_inputs_2 = {
                'foreign_file_template': None,
            }
            file_outputs_2 = {
                'foreign_file': dict(type='ForeignFile', source='userdata', virtualpath=virtualpath2, filename=filename2),
            }
            file_job_2 = worker.spawn_xf_job_basic(connection, file_task_type_2, file_task_2,
                                                   source_job_serial=file_job_1['serial'],
                                                   inputs=file_inputs_2, outputs=file_outputs_2)
        xfworkerutil.update_xf_job(connection, file_job_1, state='Done')  # Finish source job (created in Obsolete state to avoid auto-creating follower jobs)

# Save outputs
outputs: dict = {}
worker.finish(outputs)
spawn_xf_task_basic()
is also used to spawn the required tasks if they don't exist yet.
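Stripped of the file-matching logic, the core spawn pattern reduces to the following sketch; it reuses only the xfworkerutil helpers shown above, and the task type name, spawn key, and empty outputs are placeholders rather than part of the original example:

import xfworkerutil

worker = xfworkerutil.XFWorkerJob()
with worker.server_connection() as connection:
    # Find the current task type (plus any outdated versions to upgrade from)
    # and reuse or create a task of that type.
    task_type, *old_task_types = worker.find_xf_task_types(connection, "XF.Data.SelectForeignFile")
    task = worker.spawn_xf_task_basic(connection, task_type, "Example", spawn_key="example",
                                      upgrade_task_types=old_task_types)
    # Spawn the source job in Obsolete state to avoid auto-creating follower
    # jobs, spawn any follower jobs, then mark the source job Done.
    job = worker.spawn_xf_job_basic(connection, task_type, task, outputs={}, state='Obsolete')
    xfworkerutil.update_xf_job(connection, job, state='Done')
worker.finish({})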
See also Reduce Task Types.