Spawning new runs from a task type¶
A processing task type can also spawn any number of new entities (e.g. a run, task, job, etc.).
For example Lucid.Util.SearchFolderFilesBySuffix
uses xfworkerutil.XFWorkerJob.spawn_xf_job_basic()
to spawn a new run (consisting of two jobs) for each pair of matching filenames found in a given User Data folder:
import logging
import uuid
import pathlib
from typing import Any, Dict, List
import network
import omni
import xfsortorder
import xfworkerutil
worker = xfworkerutil.XFWorkerJob()
inputs = worker.job['inputs']
folder_parameter = inputs['folder']
suffixes = inputs['suffixes'].upper().split('|')
mode = inputs['mode']
typename = inputs['typename']
if mode not in ('standalone', 'filename', 'dirname'):
worker.error(f"Unknown mode {mode}")
if typename not in ('ForeignFile', 'Image'):
worker.error(f"Unknown typename {typename}")
# Search folder
def search_folder_recursively(folder_virtualpath):
scopes = [('list', folder_virtualpath)]
with worker.server_connection() as connection:
list_dir_token = xfworkerutil.get_xf_userdata_token(connection, scopes)
file_items, folder_items = network.list_files_and_folders_on_filestorage(list_dir_token)
for item in file_items:
item['virtualpath'] = network.virtualpath_join_file(folder_virtualpath, item['name'])
for folder_item in folder_items:
subfolder_virtualpath = network.virtualpath_join_folder(folder_virtualpath, folder_item['virtual_name'])
subfolder_file_items = search_folder_recursively(subfolder_virtualpath)
file_items.extend(subfolder_file_items)
return file_items
def create_userdata_image(virtualpath: str, filename: str) -> Dict[str, Any]:
file_parameter = dict(type='ForeignFile', source='userdata', virtualpath=virtualpath, filename = filename)
file_path = worker.download_input_file(file_parameter)
def matches(path: pathlib.Path, patterns: List[str]) -> bool:
return any(path.match(pattern) or path.match(pattern.upper()) for pattern in patterns)
if matches(file_path, ['*.omni']):
image = omni.Image.load(file_path)
size = image.size
elif matches(file_path, ['*.aim', '*.aim;*']):
image = omni.import_from_aim(file_path, load_pixels=False)
size = image.size
elif matches(file_path, ['*.isq', '*.isq;*']):
image = omni.import_from_isq(file_path, load_pixels=False)
size = image.size
else:
worker.error(f"Unknown format {filename}")
return dict(type='Image', source='userdata', virtualpath=virtualpath, filename=filename, size=size)
file_items = search_folder_recursively(folder_parameter['virtualpath'])
if mode == "standalone":
def has_any_suffix(name: str) -> bool:
return any(name.upper().endswith(suffix) for suffix in suffixes)
tuples = [tuple([item]) for item in file_items if has_any_suffix(item['name'])]
spawn_key = 'file'
task_name = f"Spawned {typename}"
has_add_tasks = False
else:
group_key = 'name' if mode == 'filename' else 'virtualpath'
file_items = {item[group_key].upper(): item for item in file_items}
suffix1 = suffixes[0]
tuples = []
for key1, item1 in file_items.items():
if not key1.endswith(suffix1):
continue
logging.info(f"Found {key1}")
items = [item1]
for suffix2 in suffixes[1:]:
key2 = key1[:-len(suffix1)] + suffix2
item2 = file_items.get(key2)
if item2 is not None:
items.append(item2)
else:
logging.warning(f"Not found: {key2}")
if len(items) < len(suffixes):
logging.info(f"Skipping incomplete run for {key1}")
continue
tuples.append(tuple(items))
spawn_key = suffix1
task_name = suffix1
has_add_tasks = True
if typename == 'ForeignFile':
select_task_type_name = "XF.Data.SelectForeignFile"
add_task_type_name = "XF.Data.AddForeignFile"
add_task_input_template_name = 'foreign_file_template'
def create_outputs(filename: str, virtualpath: str):
return {
'foreign_file': dict(type='ForeignFile', source='userdata', virtualpath=virtualpath, filename=filename),
}
elif typename == 'Image':
select_task_type_name = "XF.Data.SelectImage"
add_task_type_name = "XF.Data.AddImage"
add_task_input_template_name = 'image_template'
def create_outputs(filename: str, virtualpath: str):
return {
'image': create_userdata_image(virtualpath, filename),
}
with worker.server_connection() as connection:
select_task_type, *old_select_task_types = worker.find_xf_task_types(connection, select_task_type_name)
select_task = worker.spawn_xf_task_basic(connection, select_task_type, task_name, spawn_key=spawn_key,
upgrade_task_types=old_select_task_types)
if has_add_tasks:
add_task_type, *old_add_task_types = worker.find_xf_task_types(connection, add_task_type_name)
add_tasks = [worker.spawn_xf_task_basic(connection, add_task_type, suffix, spawn_key=suffix,
upgrade_task_types=old_add_task_types,
task_inputs=[
{
'key': add_task_input_template_name,
"source": {
"mode": "none"
}
},
]) for suffix in suffixes[1:]]
existing_sort_orders = worker.get_existing_job_sort_orders(connection)
new_sort_orders = xfsortorder.get_new_source_job_sort_orders(existing_sort_orders, len(tuples))
for i, tuple in enumerate(tuples):
sort_order = new_sort_orders[i]
item1 = tuple[0]
filename1 = item1['name']
virtualpath1 = item1['virtualpath']
logging.info(f"Spawning run for {filename1}")
select_task_outputs = create_outputs(filename1, virtualpath1)
metadata = {}
if mode == 'standalone':
entity_name = filename1
elif mode == 'filename':
entity_name = filename1[:-len(suffix1)]
elif mode == 'dirname':
entity_name = network.get_virtualpath_dirname(virtualpath1)
metadata['run_display_name'] = entity_name
metadata['display_name'] = entity_name
entity = worker.create_output_entity(str(uuid.uuid4()), entity_name)
metadata['run_entity'] = entity
state = 'Obsolete' if has_add_tasks else 'Done'
select_job = worker.spawn_xf_job_basic(connection, select_task_type, select_task, outputs=select_task_outputs, state=state, sort_order=sort_order, metadata=metadata)
if has_add_tasks:
for item2, add_task in zip(tuple[1:], add_tasks):
filename2 = item2['name']
virtualpath2 = item2['virtualpath']
add_task_inputs = {
add_task_input_template_name: None,
}
add_task_outputs = create_outputs(filename2, virtualpath2)
add_job = worker.spawn_xf_job_basic(connection, add_task_type, add_task, source_job_serial=select_job['serial'], inputs=add_task_inputs, outputs=add_task_outputs)
xfworkerutil.update_xf_job(connection, select_job, state='Done') # Finish source job (created in Obsolete state to avoid auto-creating follower jobs)
# Save outputs
outputs: dict = {
}
worker.finish(outputs)
Also spawn_xf_task_basic()
is used to spawn the required tasks if they don’t exist yet.
See also Reduce Task Types.