Job Async Executor: Running Long Running or Multi-step Async Tasks

  • Long Running Jobs: These are jobs that take several minutes or hours to execute.  
  • Multi-step Async Tasks: This is a task consisting of multiple steps. An example is as follows:

1) Transcoding a file.

2) Creating an asset from the file.

3) Adding technical metadata to the asset.

There are two scenarios in which a developer could implement long running and multi-step async tasks in JEF:

Scenario 1

A developer could develop an action executor that configures async behaviour by listening to an external event (example: register to message queue, or prepare to receive a REST request) to continue progress and finally complete / fail / cancel the job.

Scenario 2

A developer could develop an action executor that configures async behaviour via an active component, that needs to be run periodically in order to retrieve the current status and finally complete / fail / cancel the job.

Example: Periodically polling a third party service in order to retrieve the status of a job.

JEF currently supports the infrastructure for both these scenarios. However, in scenario 2 we have provided an ActionProgress mechanism which can help with implementation.

ActionProgress Configuration and Behaviour

The ActionExecutor class enables a developer to register the ActionProgress plugin function with the logic, which is periodically called until the job terminates (Statuses are: COMPLETED, FAILED, CANCELLED).

Example:

@Componentf 
@Scope("prototype")
@Slf4j
@PluginScope(value = PluginScopeType.DEVELOPMENT)
@ActionPlugin(uuid = "3ab237d4-0b19-4f01-8ded-396ee5444284", type = "smoke", 
plugin = "async-smoke-action", version = "1.0.2",
       actionConfiguration = AsyncSmokeActionConfiguration.class, 
actionProgress = AsyncSmokeActionProgress.class,
       actionProgressData = AsyncSmokeActionProgressData.class)
public class AsyncSmokeActionExecutor extends ActionExecutor <AsyncSmokeActionConfiguration> {
…				
		

ActionExecutor is called for the first time using the following method to execute a job request:

			
			
@Override
@SneakyThrows
public ActionExecutorResponse execute(ActionProgressData actionProgressData) {
// logic to run the plugin for the first time 
…
			
		

ActionExecutor.execute (ActionProgressData actionProgressData) is responsible for the following:

  • Preparing task execution (see actionProgressData below for further information).
  • Preparing any other logic required for plugin behaviour.

Once the first call to ActionExecutor.execute has been made, actionProgress is subsequently called until the job completes, fails, or is cancelled using the following method:

@Override
public ActionExecutorMessage execute(Job job, DemuxProgressData progressData) {
		
		
ActionProgress.execute
(Job job, DemuxProgressData progressData) is responsible for the following:
  • Taking the actionProgressData from latest iteration.
  • Taking any other logic required as part of the plugin behaviour.
  • Returning an action executor message for every call to the ActionProgress.execute(...) plugin.
  • Returning an action executor update  with “Running” progress  along with a message. This will be propagated in Ooyala Flex Enterprise and will show up in the job history.
  • Returning an ActionExecutorResponse with a state of  “Completed”.
Note: JEF supports job failures by sending an ActionExecutorUpdate with a status of “Failed”. In the previous version of JEF, a job would only fail if an exception was thrown.

ActionProgress.execute(...) will be called until a terminal state is achieved (Statuses are: COMPLETED, FAILED, CANCELLED).

By responding to the ActionProgress.execute(...) the job is kept running.

If ActionProgress.execute(...) is not called, a watcher will run periodically in the JEF service and it will be marked as “Timed Out”.

JEF comes with a watcher mechanism, which moves jobs to a “Timed Out” state if they don’t return after a certain period of time. This watcher can be disabled for each service type if necessary.

Note: ActionProgress can run on any of the service instances for the particular job async executors or resource services. No assumption should be made in regards to transient variables stored, or custom created threads between different calls to the ActionProgress.execute(...).

ActionProgress is Optional for Scenario 2 and Scenario 1.

The configuration shown above is optional. If an action plugin doesn’t configure the actionProgress, the plugin is expected to terminate as part of the ActionExecutor.execute(..) when the first call is made:

			
			@Override
@SneakyThrows
public ActionExecutorResponse execute(ActionProgressData actionProgressData) {
// logic to run first time plugin is called
…	
		

ActionProgressData: Maintaining Data Across Different Calls to ActionProgress.execute(...)

JEF 7.0.0 comes with a mechanism to help developers maintain data across different ActionProgress.execute iterations.

Example:

			
@Component 
@Scope("prototype")
@Slf4j
@PluginScope(value = PluginScopeType.DEVELOPMENT)
@ActionPlugin(uuid = "3ab237d4-0b19-4f01-8ded-396ee5444284", type = "smoke", 
plugin = "async-smoke-action", version = "1.0.2",
       actionConfiguration = AsyncSmokeActionConfiguration.class, 
actionProgress = AsyncSmokeActionProgress.class,
       actionProgressData = AsyncSmokeActionProgressData.class)
											
  • The optional actionProgressData attribute configures a plugin so that it can host data contexts for different runs of ActionProgress.execute(...)
  • When the ActionExecute.execute(...) starts and the actionProgressData attribute has been configured, a new instance of this class is provided to the execute method.
  • Developers can use this POJO (plain old java object) to keep the context of the job execution until the job finishes.
  • The actionProgressData attribute uses the latest state for every call to the ActionProgress.execute(...).
  • The actionProgressData attribute is serialised (JSON to Redis) with the state values for each call.

Final Call to the ActionProgress Methods

onCompleted, onCancel, or onFailed will be called after the ActionProgress.execute(...) changes the state to one of these terminal states.

Developers should not change the job status at this point, as the job has been terminated and propagated to the original Enterprise client. This method gives the developer the option to add any required logic before the ActionExecutor is finally discarded.

Was this article helpful?