JobManager

This document refers to a past Scout release; a more recent version is available.

Scout provides a job manager based on the Java Executors framework to run tasks in parallel, and on the Quartz Trigger API to support schedule plans and to compute firing times. A task (also called a job) can be scheduled to commence execution either immediately upon being scheduled, or delayed to some time in the future. A job can execute once, or recur based on some schedule plan. The job manager itself is implemented as an application-scoped bean, meaning that it is a singleton which exists once in the web application.
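As a rough illustration of the underlying Java Executors framework (not of Scout's own API), the following sketch schedules one task immediately and one with a delay on a plain ScheduledExecutorService. The class and method names are made up for this example.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ExecutorSchedulingSketch {

  /** Runs one immediate and one delayed task and returns both results joined by a comma. */
  static String runImmediateAndDelayed() {
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    try {
      // Immediate execution: a delay of zero
      ScheduledFuture<String> immediate = executor.schedule(() -> "immediate", 0, TimeUnit.MILLISECONDS);
      // Delayed execution: commence in 100 milliseconds
      ScheduledFuture<String> delayed = executor.schedule(() -> "delayed", 100, TimeUnit.MILLISECONDS);
      // Both futures are awaited by the submitter
      return immediate.get() + "," + delayed.get();
    }
    catch (Exception e) {
      throw new IllegalStateException(e);
    }
    finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(runImmediateAndDelayed());
  }
}
```

The Scout job manager adds naming, run contexts, execution hints and semaphores on top of this basic mechanism.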

Functionality

  • immediate, delayed or timed execution

  • single (one-shot) or repetitive execution (based on Quartz schedule plans)

  • listen for job lifecycle events

  • wait for job completion

  • job cancellation

  • limitation of the maximal concurrency level among jobs

  • RunContext based execution

  • configurable thread pool size (core pool size, max pool size)

  • association of job execution hints to select jobs (e.g. to cancel or await job’s completion)

  • named jobs and threads to ease debugging

Job

A job is defined as some work to be executed asynchronously and is associated with a JobInput that describes how to run that work. The work is given to the job manager in the form of a Runnable or Callable. The only difference is that a Runnable represents a 'fire-and-forget' action, meaning that the submitter of the job does not expect the job to return a result. A Callable, on the other hand, returns the computation’s result, which the submitter can wait for. Of course, a runnable’s completion can also be waited for.

Listing 1. Work that does not return a result
public class Work implements IRunnable {

  @Override
  public void run() throws Exception {
    // do some work
  }
}
Listing 2. Work that returns a computation result
public class WorkWithResult implements Callable<String> {

  @Override
  public String call() throws Exception {
    // do some work
    return "result";
  }
}

Upon scheduling a job, the job manager returns an IFuture to interact with the job, e.g. to cancel its execution, or to await its completion. The job itself can also access its IFuture, namely via the IFuture.CURRENT ThreadLocal.

Listing 3. Accessing the Future from within the job
public class Job implements IRunnable {

  @Override
  public void run() throws Exception {
    IFuture<?> myFuture = IFuture.CURRENT.get();
  }
}

Scheduling a Job

The job manager provides two scheduling methods, which only differ in the work they accept for execution (callable or runnable).

IFuture<Void> schedule(IRunnable runnable, JobInput input); (1)

<RESULT> IFuture<RESULT> schedule(Callable<RESULT> callable, JobInput input); (2)
1 Use to schedule a runnable which does not return a result to the submitter
2 Use to schedule a callable which does return a result to the submitter

The second and mandatory argument to be provided is the JobInput, which tells the job manager how to run the job. Learn more about JobInput.

The following snippet illustrates how a job is actually scheduled.

Listing 4. Schedule a job
IJobManager jobManager = BEANS.get(IJobManager.class); (1)

(2)
jobManager.schedule(() -> {
  // do something
}, BEANS.get(JobInput.class)); (3)
1 Obtain the job manager via bean manager (application scoped bean)
2 Provide the work to be executed (either runnable or callable)
3 Provide the JobInput to instrument job execution

This looks a little clumsy, which is why Scout provides you with the Jobs class to simplify dealing with the job manager, and to support you in the creation of job-related artifacts like JobInput, filter builders and more. Most importantly, it allows you to schedule jobs in a shorter and more readable form.

Listing 5. Schedule a job via Jobs helper class
Jobs.schedule(() -> {
  // do something
}, Jobs.newInput());

JobInput

The job input tells the job manager how to run the job. It further names the job to ease debugging, declares in which context to run the job, and how to deal with unhandled exceptions. The job input itself is a bean, which is useful when adding some additional features to the job manager. The API of JobInput supports method chaining for more concise and robust code.

Listing 6. Schedule a job and control execution via JobInput
Jobs.schedule(() -> {
  // do something
}, Jobs.newInput()
    .withName("job name") (1)
    .withRunContext(ClientRunContexts.copyCurrent()) (2)
    .withExecutionTrigger(Jobs.newExecutionTrigger()
        .withStartIn(10, TimeUnit.SECONDS) (3)
        .withSchedule(FixedDelayScheduleBuilder.repeatForever(5, TimeUnit.SECONDS))) (4)
    .withExceptionHandling(new ExceptionHandler() { (5)

      @Override
      public void handle(Throwable t) {
        System.err.println(t);
      }
    }, true));

This snippet instructs the job manager to run the job as follows:

1 Give the job a name.
2 Run the job in the current calling context, meaning in the very same context as the submitter is running when giving this job to the job manager. By copying the current context, the job will also be cancelled upon cancellation of the current RunContext.
3 Commence execution in 10 seconds (delayed execution).
4 Execute the job repeatedly, with a delay of 5 seconds between the termination of one and the commencement of the next execution. Also, repeat the job infinitely, until being cancelled.
5 Print any uncaught exception to the error console, and do not propagate the exception to the submitter, nor cancel the job upon an uncaught exception.

The following paragraphs describe the functionality of JobInput in more detail.

JobInput.withName

To optionally specify the name of the job, which is used to name the worker thread (only in development environment) and for logging purposes. Optionally, formatting anchors in the form of {} pairs can be used in the name, which will be replaced by the respective argument.

Jobs.newInput()
    .withName("Sending emails [from={}, to={}]", "frank", "john@eclipse.org, jack@eclipse.org");

JobInput.withRunContext

To optionally specify the RunContext to be installed during job execution. The RunMonitor associated with the RunContext will be used as the job’s monitor, meaning that cancellation requests to the job future or the context’s monitor are equivalent. If no context is given, the job manager ensures a monitor to be installed, so that executing code can always query its cancellation status via RunMonitor.CURRENT.get().isCancelled().

JobInput.withExecutionTrigger

To optionally set the trigger to define the schedule upon which the job will commence execution. If not set, the job will commence execution immediately after being scheduled, and will execute exactly once.

The trigger mechanism is provided by Quartz Scheduler, meaning that you can profit from the powerful Quartz schedule capabilities.

For more information, see http://www.quartz-scheduler.org.

Use the static factory method Jobs.newExecutionTrigger() to get an instance:

// Schedules a delayed single executing job
Jobs.newInput()
    .withName("job")
    .withExecutionTrigger(Jobs.newExecutionTrigger()
        .withStartIn(10, TimeUnit.SECONDS));

// Schedules a repeatedly running job at a fixed rate (every hour), which ends in 24 hours
Jobs.newInput()
    .withName("job")
    .withExecutionTrigger(Jobs.newExecutionTrigger()
        .withEndIn(1, TimeUnit.DAYS)
        .withSchedule(SimpleScheduleBuilder.repeatHourlyForever()));

// Schedules a job which runs at 10:15am every Monday, Tuesday, Wednesday, Thursday and Friday
Jobs.newInput()
    .withName("job")
    .withExecutionTrigger(Jobs.newExecutionTrigger()
        .withSchedule(CronScheduleBuilder.cronSchedule("0 15 10 ? * MON-FRI")));

Learn more about ExecutionTrigger.

JobInput.withExecutionSemaphore

To optionally control the maximal concurrency level among jobs assigned to the same semaphore.

With a semaphore in place, this job only commences execution once a permit is free or becomes available. If free, the job commences execution immediately at the next reasonable opportunity, unless no worker thread is available.

A semaphore initialized to one allows jobs to run in a mutually exclusive manner, and a semaphore initialized to zero allows no job to run at all. The number of total permits available can be changed at any time, which allows adapting the maximal concurrency level to some dynamic criteria like time of day or system load. However, a semaphore can be sealed, meaning that the number of permits cannot be changed anymore, and any attempts will be rejected.

A new semaphore instance can be obtained via Jobs class.

IExecutionSemaphore semaphore = Jobs.newExecutionSemaphore(5); (1)

for (int i = 0; i < 100; i++) {
  (2)
  Jobs.schedule(() -> {
    // doing something
  }, Jobs.newInput()
      .withName("job-{}", i)
      .withExecutionSemaphore(semaphore)); (3)
}
1 Create a new ExecutionSemaphore via Jobs class. The semaphore is initialized with 5 permits, meaning that at any given time, there are no more than 5 jobs running concurrently.
2 Schedule 100 jobs in a row.
3 Set the semaphore to limit the maximal concurrency level to 5 jobs.

Learn more about ExecutionSemaphore.

JobInput.withExecutionHint

To associate the job with an execution hint. An execution hint is simply a marker to mark a job, and can be evaluated by filters to select jobs, e.g. to listen to job lifecycle events of some particular jobs, or to wait for some particular jobs to complete, or to cancel some particular jobs. A job may have multiple hints associated. Further, hints can be registered directly on the future via IFuture.addExecutionHint(hint), or removed via IFuture.removeExecutionHint(hint).

JobInput.withExceptionHandling

To control how to deal with uncaught exceptions. By default, an uncaught exception is handled by ExceptionHandler bean and then propagated to the submitter, unless the submitter is not waiting for the job to complete via IFuture.awaitDoneAndGet().

This method expects two arguments: an optional exception handler, and a boolean flag indicating whether to swallow exceptions. 'Swallow' is independent of the specified exception handler, and indicates whether an exception should be propagated to the submitter, or swallowed otherwise.

If running a repetitive job with swallowing set to true, the job will continue its repetitive execution upon an uncaught exception. If set to false, the execution would exit.
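The effect of swallowing on a repetitive job can be illustrated with the plain JDK scheduler, which behaves in a comparable way: an uncaught exception ends the repetition, whereas an exception handled inside the task does not. This is a sketch under that analogy and not Scout's implementation; all names are made up for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SwallowSketch {

  /** Returns how often an always-failing repetitive task ran when its exception is not swallowed. */
  static int runsWithoutSwallowing() {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    AtomicInteger runs = new AtomicInteger();
    try {
      ScheduledFuture<?> future = executor.scheduleWithFixedDelay(() -> {
        runs.incrementAndGet();
        throw new IllegalStateException("failure");
      }, 0, 10, TimeUnit.MILLISECONDS);
      future.get(); // a repeating task never completes normally
      throw new IllegalStateException("not expected for a failing task");
    }
    catch (ExecutionException e) {
      return runs.get(); // the first failure was propagated and ended the repetition
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    finally {
      executor.shutdownNow();
    }
  }

  /** Returns true if an always-failing repetitive task keeps running when its exception is swallowed. */
  static boolean keepsRunningWithSwallowing() {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    CountDownLatch threeRuns = new CountDownLatch(3);
    try {
      executor.scheduleWithFixedDelay(() -> {
        try {
          threeRuns.countDown();
          throw new IllegalStateException("failure");
        }
        catch (RuntimeException e) {
          // swallowed: handled here and not propagated, so the schedule continues
        }
      }, 0, 10, TimeUnit.MILLISECONDS);
      return threeRuns.await(5, TimeUnit.SECONDS);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("runs without swallowing: " + runsWithoutSwallowing());
    System.out.println("keeps running with swallowing: " + keepsRunningWithSwallowing());
  }
}
```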

JobInput.withThreadName

To set the thread name of the worker thread that will execute the job.

JobInput.withExpirationTime

To set the maximal expiration time upon which the job must commence execution. If elapsed, the job is cancelled and does not commence execution. By default, a job never expires.

For a job that executes once, the expiration is evaluated just before it commences execution. For a job with a repeating schedule, it is evaluated before every single execution.

In contrast, the trigger’s end time specifies the time at which the trigger will no longer fire. However, once fired, the job may not be executed immediately, depending on whether it first has to compete for an execution permit. So the end time may already have elapsed by the time execution commences. The expiration time, on the other hand, is evaluated just before starting execution.

IFuture

A future represents the result of an asynchronous computation, and is returned by the job manager upon scheduling a job. The future provides functionality to await for the job to complete, or to get its computation result or exception, or to cancel its execution, and more.

Learn more about job cancellation in Job cancellation.
Learn more about listening for job lifecycle events in Subscribe for job lifecycle events.
Learn more about awaiting the job’s completion in Awaiting job completion.

Job states

Upon scheduling a job, the job transitions through different states. The current state of a job can be queried from its associated IFuture.

States and their descriptions:

  • SCHEDULED: Indicates that a job was given to the job manager for execution.

  • REJECTED: Indicates that a job was rejected for execution. This might happen if the job manager has been shut down, or if no more worker threads are available.

  • PENDING: Indicates that a job’s execution is pending, either because it was scheduled with a delay, or because it is a repetitive job waiting for the commencement of its next execution.

  • RUNNING: Indicates that a job is running.

  • DONE: Indicates that a job finished execution, either normally or because it was cancelled. Use IFuture.isCancelled() to check for cancellation.

  • WAITING_FOR_PERMIT: Indicates that a semaphore aware job is competing for a permit to become available.

  • WAITING_FOR_BLOCKING_CONDITION: Indicates that a job is blocked by a blocking condition, and is waiting for it to fall.

The state 'done' does not necessarily imply that the job already finished execution. That is because a job also enters 'done' state upon cancellation, but may still continue execution.

Future filter

A future filter is a filter which can be passed to various methods of the job manager to select some futures. The filter must implement the Predicate interface, and has a single method to test futures of interest.

Listing 7. Example of a future filter
public class FutureFilter implements Predicate<IFuture<?>> {

  @Override
  public boolean test(IFuture<?> future) {
    // Accept or reject the future
    return false;
  }
}

Scout provides you with FutureFilterBuilder class to ease building filters which match multiple criteria joined by logical 'AND' operation.

Listing 8. Usage of FutureFilterBuilder
Predicate<IFuture<?>> filter = Jobs.newFutureFilterBuilder() (1)
    .andMatchExecutionHint("computation") (2)
    .andMatchNotState(JobState.PENDING) (3)
    .andAreSingleExecuting() (4)
    .andMatchNotFuture(IFuture.CURRENT.get()) (5)
    .andMatchRunContext(ClientRunContext.class) (6)
    .andMatch(new SessionFutureFilter(ISession.CURRENT.get())) (7)
    .toFilter(); (8)
1 Returns an instance of the future filter builder
2 Specifies to match only futures associated with execution hint 'computation'
3 Specifies to match only jobs not in state pending
4 Specifies to match only single executing jobs, meaning no recurring jobs
5 Specifies to exclude the current future (if any)
6 Specifies to match only jobs running on behalf of a ClientRunContext
7 Specifies to match only jobs of the current session
8 Builds the filter to get a Predicate instance

For more information, refer to the JavaDoc of FutureFilterBuilder.
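Because the filters shown above are plain Predicate instances, they can also be combined with the standard combinators of java.util.function.Predicate. The following sketch shows the logical 'AND' and negation on a hypothetical JobInfo class, which stands in for a future and is not part of Scout.

```java
import java.util.function.Predicate;

class FilterCompositionSketch {

  /** Hypothetical stand-in for a future, used for illustration only. */
  static final class JobInfo {
    final String hint;
    final boolean pending;

    JobInfo(String hint, boolean pending) {
      this.hint = hint;
      this.pending = pending;
    }
  }

  /** Matches jobs that carry the hint 'computation' AND are NOT pending. */
  static Predicate<JobInfo> computationAndNotPending() {
    Predicate<JobInfo> hasComputationHint = job -> "computation".equals(job.hint);
    Predicate<JobInfo> isPending = job -> job.pending;
    return hasComputationHint.and(isPending.negate());
  }

  public static void main(String[] args) {
    Predicate<JobInfo> filter = computationAndNotPending();
    System.out.println(filter.test(new JobInfo("computation", false)));
    System.out.println(filter.test(new JobInfo("computation", true)));
  }
}
```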

Event filter

A job event filter is a filter which can be given to the job manager to subscribe for job lifecycle events. The filter must implement the Predicate interface, and has a single method to test events of interest.

Listing 9. Example of an event filter
public class EventFilter implements Predicate<JobEvent> {

  @Override
  public boolean test(JobEvent event) {
    // Accept or reject the event
    return false;
  }
}

Scout provides you with JobEventFilterBuilder class to ease building filters which match multiple criteria joined by logical 'AND' operation.

Listing 10. Usage of JobEventFilterBuilder
Predicate<JobEvent> filter = Jobs.newEventFilterBuilder() (1)
    .andMatchEventType(JobEventType.JOB_STATE_CHANGED) (2)
    .andMatchState(JobState.RUNNING) (3)
    .andMatch(new SessionJobEventFilter(ISession.CURRENT.get())) (4)
    .andMatchExecutionHint("computation") (5)
    .toFilter(); (6)
1 Returns an instance of the job event filter builder
2 Specifies to match all events representing a job state change
3 Specifies to match only events for jobs which transitioned into running state
4 Specifies to match only events for jobs of the current session
5 Specifies to match only events for jobs which are associated with the execution hint 'computation'
6 Builds the filter to get a Predicate instance

For more information, refer to the JavaDoc of JobEventFilterBuilder.

Job cancellation

A job can be cancelled in two ways, either directly via its IFuture, or via the job manager. Both expect you to provide a boolean flag indicating whether to interrupt the executing worker thread. Upon cancellation, the job immediately enters 'done' state. Learn more about Job states. If cancelling via the job manager, a future filter must be given to select the jobs to be cancelled. Learn more about Future filter.

The cancellation attempt will be ignored if the job has already completed or was cancelled. If not running yet, the job will never run. If the job has already started, then the interruptIfRunning parameter determines whether the thread executing the job should be interrupted in an attempt to stop the job.

The following are some examples:

Listing 11. Cancel a job via its future
// Schedule a job
IFuture<?> future = Jobs.schedule(new Work(), Jobs.newInput());

// Cancel the job via its future
future.cancel(false);
Listing 12. Cancel multiple jobs via job manager
Jobs.getJobManager().cancel(Jobs.newFutureFilterBuilder()
    .andMatchFuture(future1, future2, future3)
    .toFilter(), false);
Listing 13. Cancel multiple jobs which match a specific execution hint and the current session
Jobs.getJobManager().cancel(Jobs.newFutureFilterBuilder()
    .andMatchExecutionHint("computation")
    .andMatch(new SessionFutureFilter(ISession.CURRENT.get()))
    .toFilter(), false);

A job can query its current cancellation status via RunMonitor.CURRENT.get().isCancelled(). If doing some long-running operations, it is recommended for the job to regularly check for cancellation.
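The recommended pattern of regularly checking for cancellation can be sketched with plain JDK means. Here, an AtomicBoolean stands in for the cancellation status that a Scout job would query from its RunMonitor; the class and method names are made up for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CooperativeCancellationSketch {

  /** Returns true if the long-running work stops after cancellation was requested. */
  static boolean stopsAfterCancel() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    AtomicBoolean cancelled = new AtomicBoolean(); // stand-in for the monitor's cancellation status
    CountDownLatch started = new CountDownLatch(1);
    try {
      Future<Integer> processedChunks = executor.submit(() -> {
        int chunks = 0;
        started.countDown();
        while (!cancelled.get()) { // check for cancellation between units of work
          Thread.sleep(1); // simulates one unit of work
          chunks++;
        }
        return chunks;
      });
      started.await();
      cancelled.set(true); // request cancellation
      // The work notices the request at its next check and returns
      return processedChunks.get(5, TimeUnit.SECONDS) >= 0;
    }
    catch (Exception e) {
      throw new IllegalStateException(e);
    }
    finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("stopped after cancel: " + stopsAfterCancel());
  }
}
```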

A job which is scheduled to run on a copy of the submitting RunContext also gets cancelled once the RunMonitor of that context gets cancelled.

Subscribe for job lifecycle events

Sometimes it is useful to register for some job lifecycle events. The following event types can be subscribed for:

Event types and their descriptions:

  • JOB_STATE_CHANGED: Signals that a job transitioned to a new JobState, e.g. from JobState.SCHEDULED to JobState.RUNNING.

  • JOB_EXECUTION_HINT_ADDED: Signals that an execution hint was added to a job.

  • JOB_EXECUTION_HINT_REMOVED: Signals that an execution hint was removed from a job.

  • JOB_MANAGER_SHUTDOWN: Signals that the job manager was shut down.

The listener is registered via the job manager as follows:

Listing 14. Subscribe for global job events
Jobs.getJobManager().addListener(Jobs.newEventFilterBuilder() (1)
    .andMatchEventType(JobEventType.JOB_STATE_CHANGED)
    .andMatchState(JobState.RUNNING)
    .andMatch(new SessionJobEventFilter(ISession.CURRENT.get()))
    .toFilter(), event -> {
      IFuture<?> future = event.getData().getFuture(); (2)
      System.out.println("Job commences execution: " + future.getJobInput().getName());
    });
1 Subscribe for all events related to jobs just about to commence execution, and which belong to the current session
2 Get the future this event was fired for

If interested in only events of a single future, the listener can be registered directly on the future.

Listing 15. Subscribe for local job events
future.addListener(Jobs.newEventFilterBuilder()
    .andMatchEventType(JobEventType.JOB_STATE_CHANGED)
    .andMatchState(JobState.RUNNING)
    .toFilter(), event -> System.out.println("Job commences execution"));

Awaiting job completion

A job’s completion can be either awaited on its IFuture, or via job manager - the first optionally allows to consume the job’s computation result, whereas the second allows multiple futures to be awaited for.

Difference between 'done' and 'finished' state

When awaiting futures, the definitions of 'done' and 'finished' state should be understood. 'Done' means that the future either completed normally or was cancelled. However, if cancelled while running, the job may still continue its execution, whereas a job which has not commenced execution yet will never do so. The latter typically applies to jobs scheduled with a delay. 'Finished' state differs from 'done' state insofar as a cancelled, currently running job enters 'finished' state only upon its actual completion. Otherwise, if not cancelled, or cancelled before executing, it is equivalent to 'done' state. In most situations, it is sufficient to await the future’s 'done' state, especially because a cancelled job cannot return a result to the submitter anyway.
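The same distinction can be observed with a plain java.util.concurrent.Future: after cancelling without interruption, the future reports 'done' right away, although the work is still executing. The following sketch uses only the JDK and made-up names; it illustrates the concept and not Scout's IFuture.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class DoneVersusFinishedSketch {

  /** Returns {done right after cancel, still running right after cancel, finished eventually}. */
  static boolean[] cancelWhileRunning() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch mayFinish = new CountDownLatch(1);
    CountDownLatch finished = new CountDownLatch(1);
    try {
      Future<?> future = executor.submit(() -> {
        started.countDown();
        try {
          mayFinish.await(); // simulates work that does not react to cancellation
        }
        catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        finished.countDown(); // the work actually completes only here
      });
      started.await();
      future.cancel(false); // cancel the running work without interrupting it
      boolean done = future.isDone();
      boolean stillRunning = finished.getCount() == 1;
      mayFinish.countDown(); // now let the work complete
      boolean finishedEventually = finished.await(5, TimeUnit.SECONDS);
      return new boolean[] {done, stillRunning, finishedEventually};
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    boolean[] result = cancelWhileRunning();
    System.out.println("done=" + result[0] + ", stillRunning=" + result[1] + ", finished=" + result[2]);
  }
}
```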

Awaiting a single future’s 'done' state

Besides some overloaded methods, IFuture basically provides two methods to wait for a future to enter 'done' state, namely awaitDone and awaitDoneAndGet, with the difference that the latter additionally returns the job’s result or exception. If the future is already done, those methods return immediately. Both methods have an overloaded version that waits for at most a given time and throws a TimedOutError once that time has elapsed.

Further, awaitDoneAndGet allows to specify an IExceptionTranslator to control exception translation. By default, DefaultRuntimeExceptionTranslator is used, meaning that a RuntimeException is propagated as it is, whereas a checked exception would be wrapped into a PlatformException. If you require checked exceptions to be thrown as they are, use DefaultExceptionTranslator instead, or even NullExceptionTranslator to work with the raw ExecutionException as being thrown by Java Executor framework.

Listing 16. Examples of how to await done state on a future
IFuture<String> future = Jobs.schedule(() -> {
  // doing something
  return "computation result";
}, Jobs.newInput());

// Wait until done without consuming the result
future.awaitDone(); (1)
future.awaitDone(10, TimeUnit.SECONDS); (2)

// Wait until done and consume the result
String result = future.awaitDoneAndGet(); (3)
result = future.awaitDoneAndGet(10, TimeUnit.SECONDS); (4)

// Wait until done, consume the result, and use a specific exception translator
result = future.awaitDoneAndGet(DefaultExceptionTranslator.class); (5)
result = future.awaitDoneAndGet(10, TimeUnit.SECONDS, DefaultExceptionTranslator.class); (6)
1 Waits if necessary for the job to complete, or until cancelled. This method does not throw an exception if cancelled or the computation failed, but throws ThreadInterruptedError if the current thread was interrupted while waiting.
2 Waits if necessary for at most 10 seconds for the job to complete, or until cancelled, or the timeout elapses. This method does not throw an exception if cancelled, or the computation failed, but throws TimedOutError if waiting timeout elapsed, or throws ThreadInterruptedError if the current thread was interrupted while waiting.
3 Waits if necessary for the job to complete, and then returns its result, if available, or throws its exception according to DefaultRuntimeExceptionTranslator, or throws FutureCancelledError if cancelled, or throws ThreadInterruptedError if the current thread was interrupted while waiting.
4 Waits if necessary for at most 10 seconds for the job to complete, and then returns its result, if available, or throws its exception according to DefaultRuntimeExceptionTranslator, or throws FutureCancelledError if cancelled, or throws TimedOutError if waiting timeout elapsed, or throws ThreadInterruptedError if the current thread was interrupted while waiting.
5 Waits if necessary for the job to complete, and then returns its result, if available, or throws its exception according to the given DefaultExceptionTranslator, or throws FutureCancelledError if cancelled, or throws ThreadInterruptedError if the current thread was interrupted while waiting.
6 Waits if necessary for at most the given time for the job to complete, and then returns its result, if available, or throws its exception according to the given DefaultExceptionTranslator, or throws FutureCancelledError if cancelled, or throws TimedOutError if waiting timeout elapsed, or throws ThreadInterruptedError if the current thread was interrupted while waiting.
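The idea behind exception translation can be sketched with the plain Java Executor framework, where Future.get() wraps every failure in an ExecutionException. The helper below unwraps it in the spirit described for the default translator: a RuntimeException is rethrown as it is, and a checked exception is wrapped. It wraps into a generic RuntimeException as a stand-in, since the sketch does not use Scout; all names are made up for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExceptionTranslationSketch {

  /** Awaits the future and rethrows a failure: runtime exceptions as they are, checked ones wrapped. */
  static <T> T awaitAndTranslate(Future<T> future) {
    try {
      return future.get();
    }
    catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause; // propagated as it is
      }
      if (cause instanceof Error) {
        throw (Error) cause;
      }
      throw new RuntimeException(cause); // checked exception gets wrapped
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    }
  }

  /** Runs the work and describes which exception reaches the submitter. */
  static String describeFailure(Callable<String> work) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      awaitAndTranslate(executor.submit(work));
      return "no failure";
    }
    catch (RuntimeException e) {
      String name = e.getClass().getSimpleName();
      return e.getCause() == null ? name : name + " caused by " + e.getCause().getClass().getSimpleName();
    }
    finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(describeFailure(() -> { throw new IllegalArgumentException("unchecked"); }));
    System.out.println(describeFailure(() -> { throw new java.io.IOException("checked"); }));
  }
}
```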

It is further possible to await asynchronously on a future to enter done state by registering a callback via whenDone method. The advantage over registering a listener is that the callback is invoked even if the future already entered done state upon registration.

Listing 17. Example of when-done callback
future.whenDone(event -> {
  // invoked upon entering done state.
}, ClientRunContexts.copyCurrent());

Because invoked in another thread, this method optionally accepts a RunContext to be applied when being invoked.
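The JDK's CompletableFuture offers a comparable guarantee, which may help to picture the behavior: a callback registered after completion is still invoked. The sketch below is an analogy only. Unlike Scout's whenDone, the JDK callback here runs in the registering thread, and the class and method names are made up for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class WhenDoneSketch {

  /** Registers a callback on an already completed future and returns what the callback saw. */
  static String registerAfterCompletion() {
    CompletableFuture<String> future = CompletableFuture.completedFuture("computation result");
    AtomicReference<String> seen = new AtomicReference<>("callback not invoked");
    // Registered after completion, yet still invoked; join() waits until the callback has run
    future.whenComplete((result, error) -> seen.set(result)).join();
    return seen.get();
  }

  public static void main(String[] args) {
    System.out.println(registerAfterCompletion());
  }
}
```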

Awaiting a single future’s 'finished' state

Use the method awaitFinished to wait for the job to finish, meaning that the job either completed normally or by an exception, or that it will never commence execution due to a premature cancellation. To learn more about the difference between 'done' and 'finished' state, see Difference between 'done' and 'finished' state. Please note that this method does not return the job’s result, because by Java Future definition, a cancelled job cannot provide a result.

Listing 18. Examples of how to await finished state on a future
IFuture<String> future = Jobs.schedule(() -> {
  // doing something
  return "computation result";
}, Jobs.newInput());

// Wait until finished
future.awaitFinished(10, TimeUnit.SECONDS);

Awaiting multiple futures’ 'done' state

The job manager allows awaiting multiple futures at once. The filter to be provided limits the futures to await. This method requires you to provide a maximal time to wait.

Filters can be combined using logical filters like AndFilter or OrFilter, or negated by enclosing a filter in NotFilter. Also see Future filter to create a filter to match multiple criteria joined by logical 'AND' operation.

Listing 19. Examples of how to await done state of multiple futures
// Wait for some futures
Jobs.getJobManager().awaitDone(Jobs.newFutureFilterBuilder() (1)
    .andMatchFuture(future1, future2, future3)
    .toFilter(), 1, TimeUnit.MINUTES);

// Wait for all futures marked as 'reporting' jobs of the current session
Jobs.getJobManager().awaitDone(Jobs.newFutureFilterBuilder() (2)
    .andMatchExecutionHint("reporting")
    .andMatch(new SessionFutureFilter(ISession.CURRENT.get()))
    .toFilter(), 1, TimeUnit.MINUTES);
1 Waits if necessary for at most 1 minute for all three futures to complete, or until cancelled, or the timeout elapses.
2 Waits if necessary for at most 1 minute until all jobs marked as 'reporting' jobs of the current session complete, or until cancelled, or the timeout elapses.

Awaiting multiple futures’ 'finished' state

Use the method awaitFinished to wait for multiple jobs to finish, meaning that the jobs either completed normally or by an exception, or that they will never commence execution due to a premature cancellation. To learn more about the difference between 'done' and 'finished' state, see Difference between 'done' and 'finished' state.

Listing 20. Examples of how to await finished state of multiple futures
// Wait for some futures
Jobs.getJobManager().awaitFinished(Jobs.newFutureFilterBuilder() (1)
    .andMatchFuture(future1, future2, future3)
    .toFilter(), 1, TimeUnit.MINUTES);

// Wait for all futures marked as 'reporting' jobs of the current session
Jobs.getJobManager().awaitFinished(Jobs.newFutureFilterBuilder() (2)
    .andMatchExecutionHint("reporting")
    .andMatch(new SessionFutureFilter(ISession.CURRENT.get()))
    .toFilter(), 1, TimeUnit.MINUTES);
1 Waits if necessary for at most 1 minute for all three futures to finish, or until cancelled, or the timeout elapses.
2 Waits if necessary for at most 1 minute until all jobs marked as 'reporting' jobs of the current session finish, or until cancelled, or the timeout elapses.

Uncaught job exceptions

If a job throws an exception, that exception is handled by ExceptionHandler, and propagated to the submitter. However, the exception is only propagated if having a waiting submitter. Also, an uncaught exception causes repetitive jobs to terminate.

This default behavior as described can be changed via JobInput.withExceptionHandling(..).

Blocking condition

A blocking condition allows a thread to wait for a condition to become true. That is similar to the Java Object’s 'wait/notify' mechanism, but with some additional functionality regarding semaphore aware jobs. If a semaphore aware job enters a blocking condition, it releases ownership of the permit, which allows another job of that same semaphore to commence execution. Once the condition becomes true, the job must then compete for a permit anew.

A condition can be used across multiple threads to wait for the same condition. Also, a condition is reusable upon invalidation. And finally, a condition can be used even if not running within a job.

A blocking condition is often used by model jobs to wait for something to happen, but to allow another model job to run while waiting. A typical use case would be to wait for a MessageBox to be closed.

Example of a blocking condition

You are running in a semaphore aware job and require to do some long-running operation. During that time you do not require to be the permit owner. A simple but wrong approach would be the following:

// Schedule a long running operation.
IFuture<?> future = Jobs.schedule(new LongRunningOperation(), Jobs.newInput());

// Wait until done.
future.awaitDone();

The problem with this approach is that you still are the permit owner while waiting, meaning that you possibly prevent other jobs from running. Instead, you could use a blocking condition to achieve that:

// Create a blocking condition.
final IBlockingCondition operationCompleted = Jobs.newBlockingCondition(true);

// Schedule a long running operation.
IFuture<Void> future = Jobs.schedule(new LongRunningOperation(), Jobs.newInput());

// Register done callback to unblock the condition.
future.whenDone(event -> {
  // Let the waiting job re-acquire a permit and continue execution.
  operationCompleted.setBlocking(false);
}, null);

// Wait until done. Thereby, the permit of the current job is released for the time while waiting.
operationCompleted.waitFor();
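The permit handover behind a blocking condition can be sketched with JDK primitives: a Semaphore with one permit plays the role of the execution semaphore, and a CountDownLatch plays the role of the condition. This is a conceptual sketch with made-up names, not Scout's implementation.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class BlockingConditionSketch {

  /** Returns the order in which job A and job B ran while sharing a single permit. */
  static String run() {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    Semaphore permit = new Semaphore(1, true);
    CountDownLatch aStarted = new CountDownLatch(1);
    CountDownLatch condition = new CountDownLatch(1);
    List<String> log = new CopyOnWriteArrayList<>();
    try {
      Future<?> jobA = executor.submit(() -> {
        permit.acquireUninterruptibly();
        log.add("A:start");
        aStarted.countDown();
        permit.release(); // entering the blocking condition releases the permit
        awaitQuietly(condition); // wait until the condition no longer blocks
        permit.acquireUninterruptibly(); // compete for a permit anew
        log.add("A:resume");
        permit.release();
      });
      aStarted.await();
      Future<?> jobB = executor.submit(() -> {
        permit.acquireUninterruptibly(); // possible only because A released its permit
        log.add("B:run");
        permit.release();
        condition.countDown(); // unblock job A
      });
      jobA.get(5, TimeUnit.SECONDS);
      jobB.get(5, TimeUnit.SECONDS);
      return String.join(",", log);
    }
    catch (Exception e) {
      throw new IllegalStateException(e);
    }
    finally {
      executor.shutdownNow();
    }
  }

  private static void awaitQuietly(CountDownLatch latch) {
    try {
      latch.await();
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

Job B runs between the two halves of job A, although both share a single permit.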

ExecutionSemaphore

Represents a fair counting semaphore used in Job API to control the maximal number of jobs running concurrently.

Jobs which are assigned to the same semaphore run concurrently until they reach the maximal concurrency level defined for that semaphore. Subsequent tasks then wait in the queue until a permit becomes available.

A semaphore initialized to one allows jobs to run in a mutually exclusive manner, and a semaphore initialized to zero allows no job to run at all. The number of total permits available can be changed at any time, which allows adapting the maximal concurrency level to some dynamic criteria like time of day or system load. However, once seal() has been called, the number of permits cannot be changed anymore, and any attempts will result in an AssertionException. By default, a semaphore is unbounded.
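How a fair counting semaphore bounds the concurrency level can be sketched with java.util.concurrent.Semaphore. The sketch measures the highest number of jobs observed running at the same time. It uses only the JDK and made-up names, and blocks worker threads while waiting for a permit, which is a simplification.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreSketch {

  /** Runs the given number of jobs and returns the highest concurrency level observed. */
  static int maxObservedConcurrency(int permits, int jobs) {
    ExecutorService executor = Executors.newFixedThreadPool(jobs);
    Semaphore semaphore = new Semaphore(permits, true); // fair: permits are granted in arrival order
    AtomicInteger running = new AtomicInteger();
    AtomicInteger maxRunning = new AtomicInteger();
    CountDownLatch allDone = new CountDownLatch(jobs);
    try {
      for (int i = 0; i < jobs; i++) {
        executor.execute(() -> {
          semaphore.acquireUninterruptibly(); // wait for a permit
          try {
            maxRunning.accumulateAndGet(running.incrementAndGet(), Math::max);
            Thread.sleep(5); // simulates some work
          }
          catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
          finally {
            running.decrementAndGet();
            semaphore.release(); // hand the permit to the next waiting job
            allDone.countDown();
          }
        });
      }
      if (!allDone.await(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("jobs did not complete in time");
      }
      return maxRunning.get();
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("max concurrency with 1 permit: " + maxObservedConcurrency(1, 8));
  }
}
```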

ExecutionTrigger

Component that defines the schedule upon which a job will commence execution.

A trigger can be as simple as a 'one-shot' execution at some specific point in time in the future, or represent a schedule which executes a job on a repeated basis. The latter can be configured to run infinitely, or to end at a specific point in time. It is further possible to define rather complex triggers, like to execute a job every second Friday at noon, but with the exclusion of all the business’s holidays.

See the various schedule builders provided by Quartz Scheduler: SimpleScheduleBuilder, CronScheduleBuilder, CalendarIntervalScheduleBuilder, DailyTimeIntervalScheduleBuilder. The most powerful builder is CronScheduleBuilder. Cron is a UNIX tool with powerful and proven scheduling capabilities. For more information, see http://www.quartz-scheduler.org.

Additionally, Scout provides you with FixedDelayScheduleBuilder to run a job with a fixed delay between the termination of one execution and the commencement of the next execution.

Use the static factory method 'Jobs.newExecutionTrigger()' to get an instance.

Misfiring

Regardless of the schedule used, the job manager guarantees that the same job never executes concurrently with itself. Such an overlap could otherwise occur with a repeating schedule, when the previous execution has not terminated yet but the schedule's trigger is already due to fire the next one. Such a situation is called a misfiring. The action to be taken upon a misfiring is configurable via the schedule's misfiring policy. A policy can be to run the job immediately upon termination of the previous execution, or to just ignore that missed firing. See the JavaDoc of the schedule for more information.
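
The two policies mentioned above can be compared with a small simulation. This is a deliberately simplified model, assuming a fixed interval and a constant execution duration; it does not reproduce the actual misfire instructions of Quartz or Scout, and MisfireSimulation is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

class MisfireSimulation {

  /**
   * Returns the start times of the executions for firings at 0, interval, 2 * interval, ...
   * A firing that occurs while the previous execution is still running is a misfire.
   */
  static List<Long> executionStarts(long interval, long duration, int firings, boolean runMissedImmediately) {
    List<Long> starts = new ArrayList<>();
    long busyUntil = 0; // time at which the previous execution terminates
    for (int i = 0; i < firings; i++) {
      long fireTime = i * interval;
      if (fireTime >= busyUntil) {
        starts.add(fireTime); // regular firing
        busyUntil = fireTime + duration;
      } else if (runMissedImmediately) {
        starts.add(busyUntil); // misfire: run right after the previous execution terminates
        busyUntil += duration;
      }
      // otherwise the missed firing is ignored
    }
    return starts;
  }

  public static void main(String[] args) {
    // Job takes 90 seconds but is due every 60 seconds.
    System.out.println(executionStarts(60, 90, 4, false)); // [0, 120]
    System.out.println(executionStarts(60, 90, 4, true));  // [0, 90, 180, 270]
  }
}
```

In both cases no two executions overlap; the policies differ only in whether a missed firing is made up for or dropped.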

Stopping the platform

Upon stopping the platform, the job manager is shut down as well. If an IPlatformListener performs cleanup work that requires the job manager to still be functional, that listener must be annotated with an @Order less than IJobManager.DESTROY_ORDER, which is 5'900. If no @Order is specified explicitly, the listener has the default order of 5'000 and is therefore invoked before the job manager shuts down anyway.

Model Jobs

Model jobs exist on the client side only and are used to interact with the Scout client model, reading and writing model values serially per session. As a result, no synchronization is needed when interacting with the model.

By definition, a model job must run on behalf of a ClientRunContext with an IClientSession set, and must have the session's model job semaphore set as its ExecutionSemaphore. That causes all such jobs to run in sequence in the model thread. At any given time, there is only one model thread active per client session.
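
Why serial execution removes the need for synchronization can be shown with plain JDK classes. The sketch below assumes a single-thread executor as a stand-in for the model thread; it does not use the Scout API, and the names SerialModelAccessDemo and Model are made up for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SerialModelAccessDemo {

  /** Plain mutable state without any synchronization, standing in for a client model. */
  static final class Model {
    int counter; // deliberately neither volatile nor guarded by a lock
  }

  /** Lets several submitter threads schedule increments that all run on one serial worker thread. */
  static int incrementSerially(int submitters, int incrementsPerSubmitter) {
    Model model = new Model();
    ExecutorService serialWorker = Executors.newSingleThreadExecutor();
    Thread[] threads = new Thread[submitters];
    try {
      for (int s = 0; s < submitters; s++) {
        threads[s] = new Thread(() -> {
          for (int i = 0; i < incrementsPerSubmitter; i++) {
            serialWorker.execute(() -> model.counter++); // only ever runs on the serial worker
          }
        });
        threads[s].start();
      }
      for (Thread thread : threads) {
        thread.join();
      }
      // Read the result on the serial worker as well, so the read is ordered after all increments.
      return serialWorker.submit(() -> model.counter).get(30, TimeUnit.SECONDS);
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      serialWorker.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(incrementSerially(4, 1000)); // 4000
  }
}
```

Because every access to the state happens on the same thread, no increment is lost even though the field is unsynchronized.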

The class ModelJobs is a convenience helper that facilitates the creation of model job related artifacts and the scheduling of model jobs.

Listing 21. Running work in model thread
(1)
ModelJobs.schedule(() -> {
  // doing something in model thread
}, ModelJobs.newInput(ClientRunContexts.copyCurrent()) (2)
    .withName("Doing something in model thread"));
1 Schedules the work to be executed in the model thread
2 Creates the JobInput to become a model job, meaning with the session’s model job semaphore set

Model jobs may also run according to a Quartz schedule plan, or be executed with a delay. In that case, the model permit is acquired just before each execution, and not upon being scheduled.

Furthermore, the class ModelJobs provides some useful static methods:

// Returns true if the current thread represents the model thread for the current client session. At any given time, there is only one model thread active per client session.
ModelJobs.isModelThread();

// Returns true if the given Future belongs to a model job.
ModelJobs.isModelJob(IFuture.CURRENT.get());

// Returns a builder to create a filter for future objects representing a model job.
ModelJobs.newFutureFilterBuilder();

// Returns a builder to create a filter for JobEvent objects originating from model jobs.
ModelJobs.newEventFilterBuilder();

// Instructs the job manager that the current model job is willing to temporarily yield its current model job permit. It is rarely appropriate to use this method. It may be useful for debugging or testing purposes.
ModelJobs.yield();

Configuration

Job manager can be configured with properties starting with scout.jobmanager. See Scout Config Properties.

Extending job manager

The job manager is implemented as an application scoped bean and can therefore be replaced. To do so, create a class which extends JobManager, and annotate it with the @Replace annotation. Typical reasons are to use the EE container's ThreadPoolExecutor, or to contribute some behavior to the callable chain which finally executes the job.

To change the executor, override the createExecutor method and return the executor of your choice. Do not forget to register a rejection handler, so that a future is rejected when the executor refuses its task. Also, override shutdownExecutor so that the container's executor is not shut down.
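
What a rejection handler does can be sketched with the JDK's ThreadPoolExecutor alone. The example assumes that cancelling the rejected future is the desired reaction; it does not show Scout's createExecutor implementation, and RejectionHandlerDemo and CANCEL_ON_REJECT are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionHandlerDemo {

  /** Cancels a rejected task if it is a Future, so that threads waiting on it are released. */
  static final RejectedExecutionHandler CANCEL_ON_REJECT = (runnable, executor) -> {
    if (runnable instanceof Future) {
      ((Future<?>) runnable).cancel(true);
    }
  };

  /** Saturates a one-thread executor and returns whether the surplus task got cancelled. */
  static boolean surplusTaskIsCancelled() {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), CANCEL_ON_REJECT);
    CountDownLatch release = new CountDownLatch(1);
    try {
      // Occupy the only worker thread until the latch is released.
      executor.execute(() -> {
        try {
          release.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      FutureTask<String> surplus = new FutureTask<>(() -> "never computed");
      executor.execute(surplus); // no free thread and no queue capacity: the handler is invoked
      return surplus.isCancelled();
    } finally {
      release.countDown();
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("surplus task cancelled: " + surplusTaskIsCancelled());
  }
}
```

Without such a handler, a submitter waiting on the rejected future could block forever, because the task would never run and never complete.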

To contribute some behavior to the callable chain, override the method interceptCallableChain and contribute your decorator or interceptor. Refer to the method's JavaDoc for more information.

Scheduling examples

This section contains some common scheduling examples.

Listing 22. Schedule a one-shot job
Jobs.schedule(() -> {
  // doing something
}, Jobs.newInput()
    .withName("Running once")
    .withRunContext(ClientRunContexts.copyCurrent()));
Listing 23. Schedule a job with a delay
Jobs.schedule(() -> {
  // doing something
}, Jobs.newInput()
    .withName("Running in 10 seconds")
    .withRunContext(ClientRunContexts.copyCurrent())
    .withExecutionTrigger(Jobs.newExecutionTrigger()
        .withStartIn(10, TimeUnit.SECONDS))); // delay of 10 seconds
Listing 24. Schedule a repetitive job at a fixed rate
Jobs.schedule(() -> {
  // doing something
}, Jobs.newInput()
    .withName("Running every minute")
    .withRunContext(ClientRunContexts.copyCurrent())
    .withExecutionTrigger(Jobs.newExecutionTrigger()
        .withStartIn(1, TimeUnit.MINUTES) (1)
        .withSchedule(SimpleScheduleBuilder.simpleSchedule() (2)
            .withIntervalInMinutes(1) (3)
            .repeatForever()))); (4)
1 Configure to fire in 1 minute for the first time
2 Use Quartz simple schedule to achieve fixed-rate execution
3 Repetitively fire every minute
4 Repeat forever
Listing 25. Schedule a repetitive job which runs 60 times, once every minute
Jobs.schedule(() -> {
  // doing something
}, Jobs.newInput()
    .withName("Running every minute for total 60 times")
    .withRunContext(ClientRunContexts.copyCurrent())
    .withExecutionTrigger(Jobs.newExecutionTrigger()
        .withStartIn(1, TimeUnit.MINUTES) (1)
        .withSchedule(SimpleScheduleBuilder.simpleSchedule() (2)
            .withIntervalInMinutes(1) (3)
            .withRepeatCount(59)))); (4)
1 Configure to fire in 1 minute for the first time
2 Use Quartz simple schedule to achieve fixed-rate execution
3 Repetitively fire every minute
4 Repeat 59 times, plus the initial execution
Listing 26. Schedule a repetitive job at a fixed delay
Jobs.schedule(() -> {
  // doing something
}, Jobs.newInput()
    .withName("Running forever with a delay of 1 minute between the termination of the previous and the next execution")
    .withRunContext(ClientRunContexts.copyCurrent())
    .withExecutionTrigger(Jobs.newExecutionTrigger()
        .withStartIn(1, TimeUnit.MINUTES) (1)
        .withSchedule(FixedDelayScheduleBuilder.repeatForever(1, TimeUnit.MINUTES)))); (2)
1 Configure to fire in 1 minute for the first time
2 Use fixed delay schedule
Listing 27. Schedule a repetitive job which runs 60 times, but waits 1 minute between the termination of the previous and the commencement of the next execution
Jobs.schedule(() -> {
  // doing something
}, Jobs.newInput()
    .withName("Running 60 times with a delay of 1 minute between the termination of the previous and the next execution")
    .withRunContext(ClientRunContexts.copyCurrent())
    .withExecutionTrigger(Jobs.newExecutionTrigger()
        .withStartIn(1, TimeUnit.MINUTES) (1)
        .withSchedule(FixedDelayScheduleBuilder.repeatForTotalCount(60, 1, TimeUnit.MINUTES)))); (2)
1 Configure to fire in 1 minute for the first time
2 Use fixed delay schedule
Listing 28. Running at 10:15am every Monday, Tuesday, Wednesday, Thursday and Friday
Jobs.schedule(() -> {
  // doing something
}, Jobs.newInput()
    .withName("Running at 10:15am every Monday, Tuesday, Wednesday, Thursday and Friday")
    .withRunContext(ClientRunContexts.copyCurrent())
    .withExecutionTrigger(Jobs.newExecutionTrigger()
        .withSchedule(CronScheduleBuilder.cronSchedule("0 15 10 ? * MON-FRI")))); (1)
1 Cron format: [second] [minute] [hour] [day_of_month] [month] [day_of_week] [year]?
Listing 29. Running every minute starting at 14:00 and ending at 14:05, every day
Jobs.schedule(() -> {
  // doing something
}, Jobs.newInput()
    .withName("Running every minute starting at 14:00 and ending at 14:05, every day")
    .withRunContext(ClientRunContexts.copyCurrent())
    .withExecutionTrigger(Jobs.newExecutionTrigger()
        .withSchedule(CronScheduleBuilder.cronSchedule("0 0-5 14 * * ?")))); (1)
1 Cron format: [second] [minute] [hour] [day_of_month] [month] [day_of_week] [year]?
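
To make the field order of the cron format explicit, the following sketch splits an expression into named fields. It only labels the fields and does not validate or evaluate them; CronFields is a hypothetical helper that is not part of Quartz or Scout.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class CronFields {

  private static final String[] NAMES = {
      "second", "minute", "hour", "day_of_month", "month", "day_of_week", "year"};

  /** Splits a Quartz-style cron expression into named fields; the trailing year field is optional. */
  static Map<String, String> describe(String expression) {
    String[] parts = expression.trim().split("\\s+");
    if (parts.length < 6 || parts.length > 7) {
      throw new IllegalArgumentException("Expected 6 or 7 fields but got " + parts.length);
    }
    Map<String, String> fields = new LinkedHashMap<>(); // keeps the field order of the expression
    for (int i = 0; i < parts.length; i++) {
      fields.put(NAMES[i], parts[i]);
    }
    return fields;
  }

  public static void main(String[] args) {
    // {second=0, minute=15, hour=10, day_of_month=?, month=*, day_of_week=MON-FRI}
    System.out.println(describe("0 15 10 ? * MON-FRI"));
    // {second=0, minute=0-5, hour=14, day_of_month=*, month=*, day_of_week=?}
    System.out.println(describe("0 0-5 14 * * ?"));
  }
}
```

Both expressions from the listings above omit the optional year field, so only six fields are reported.
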
Listing 30. Limit the maximal concurrency level among jobs
IExecutionSemaphore semaphore = Jobs.newExecutionSemaphore(5); (1)

for (int i = 0; i < 100; i++) {
  Jobs.schedule(() -> {
    // doing something
  }, Jobs.newInput()
      .withName("job-{}", i)
      .withExecutionSemaphore(semaphore)); (2)
}
1 Create the execution semaphore initialized with 5 permits
2 Assign the execution semaphore to the job so that its concurrency is limited
Listing 31. Cancel all jobs of the current session
Jobs.getJobManager().cancel(Jobs.newFutureFilterBuilder()
    .andMatch(new SessionFutureFilter(ISession.CURRENT.get()))
    .toFilter(), true);
Listing 32. Query for cancellation
public class CancellableWork implements IRunnable {

  @Override
  public void run() throws Exception {

    // do first chunk of operations

    if (RunMonitor.CURRENT.get().isCancelled()) {
      return;
    }

    // do next chunk of operations

    if (RunMonitor.CURRENT.get().isCancelled()) {
      return;
    }

    // do next chunk of operations
  }
}
Listing 33. Release current semaphore permit while executing
// Create a blocking condition.
final IBlockingCondition operationCompleted = Jobs.newBlockingCondition(true);

// Schedule a long running operation.
IFuture<Void> future = Jobs.schedule(new LongRunningOperation(), Jobs.newInput());

// Register done callback to unblock the condition.
future.whenDone(event -> {
  // Let the waiting job re-acquire a permit and continue execution.
  operationCompleted.setBlocking(false);
}, null);

// Wait until done. Thereby, the permit of the current job is released for the time while waiting.
operationCompleted.waitFor();