JobManager
Scout provides a job manager based on the Java Executors framework to run tasks in parallel, and on the Quartz Trigger API to support schedule plans and to compute firing times. A task (also known as a job) can be scheduled to commence execution either immediately upon being scheduled, or delayed to some time in the future. A job can execute once, or recur based on some schedule plan. The job manager itself is implemented as an application scoped bean, meaning that it is a singleton which exists once in the web application.
Functionality
- immediate, delayed or timed execution
- single (one-shot) or repetitive execution (based on Quartz schedule plans)
- listen for job lifecycle events
- wait for job completion
- job cancellation
- limitation of the maximum concurrency level among jobs
- RunContext based execution
- configurable thread pool size (core pool size, max pool size)
- association of job execution hints to select jobs (e.g. to cancel jobs or await their completion)
- named jobs and threads to ease debugging
Job
A job is defined as some work to be executed asynchronously and is associated with a JobInput to describe how to run that work. The work is given to the job manager in the form of a Runnable or Callable. The only difference is that a Runnable represents a 'fire-and-forget' action, meaning that the submitter of the job does not expect the job to return a result. A Callable, on the other hand, returns the computation’s result, which the submitter can await. Of course, a runnable’s completion can also be waited for.
public class Work implements IRunnable {
@Override
public void run() throws Exception {
// do some work
}
}
public class WorkWithResult implements Callable<String> {
@Override
public String call() throws Exception {
// do some work
return "result";
}
}
Upon scheduling a job, the job manager returns an IFuture to interact with the job, e.g. to cancel its execution, or to await its completion. The job itself can also access its IFuture, namely via the IFuture.CURRENT ThreadLocal.
public class Job implements IRunnable {
@Override
public void run() throws Exception {
IFuture<?> myFuture = IFuture.CURRENT.get();
}
}
Scheduling a Job
The job manager provides two scheduling methods, which only differ in the work they accept for execution (callable or runnable).
IFuture<Void> schedule(IRunnable runnable, JobInput input); (1)
<RESULT> IFuture<RESULT> schedule(Callable<RESULT> callable, JobInput input); (2)
1 | Use to schedule a runnable which does not return a result to the submitter |
2 | Use to schedule a callable which does return a result to the submitter |
The second, mandatory argument is the JobInput, which tells the job manager how to run the job. Learn more about JobInput.
The following snippet illustrates how a job is actually scheduled.
IJobManager jobManager = BEANS.get(IJobManager.class); (1)
(2)
jobManager.schedule(() -> {
// do something
}, BEANS.get(JobInput.class)); (3)
1 | Obtain the job manager via bean manager (application scoped bean) |
2 | Provide the work to be executed (either runnable or callable) |
3 | Provide the JobInput to instrument job execution |
This looks a little clumsy, which is why Scout provides the Jobs class to simplify dealing with the job manager, and to support you in the creation of job related artifacts like JobInput, filter builders and more. Most importantly, it allows you to schedule jobs in a shorter and more readable form.
Jobs.schedule(() -> {
// do something
}, Jobs.newInput());
JobInput
The job input tells the job manager how to run the job. It further names the job to ease debugging, declares in which context to run the job, and how to deal with unhandled exceptions. The job input itself is a bean, which is useful when adding some additional features to the job manager. The API of JobInput supports method chaining for more concise and robust code.
Jobs.schedule(() -> {
// do something
}, Jobs.newInput()
.withName("job name") (1)
.withRunContext(ClientRunContexts.copyCurrent()) (2)
.withExecutionTrigger(Jobs.newExecutionTrigger()
.withStartIn(10, TimeUnit.SECONDS) (3)
.withSchedule(FixedDelayScheduleBuilder.repeatForever(5, TimeUnit.SECONDS))) (4)
.withExceptionHandling(new ExceptionHandler() { (5)
@Override
public void handle(Throwable t) {
System.err.println(t);
}
}, true));
This snippet instructs the job manager to run the job as follows:
1 | Give the job a name. |
2 | Run the job in the current calling context, meaning in the very same context as the submitter is running when giving this job to the job manager. By copying the current context, the job will also be cancelled upon cancellation of the current RunContext. |
3 | Commence execution in 10 seconds (delayed execution). |
4 | Execute the job repeatedly, with a delay of 5 seconds between the termination of one and the commencement of the next execution. Also, repeat the job infinitely, until being cancelled. |
5 | Print any uncaught exception to the error console, and do not propagate the exception to the submitter, nor cancel the job upon an uncaught exception. |
The following paragraphs describe the functionality of JobInput in more detail.
JobInput.withName
To optionally specify the name of the job, which is used to name the worker thread (only in development environment) and for logging purposes. Optionally, formatting anchors in the form of {} pairs can be used in the name, which will be replaced by the respective arguments.
Jobs.newInput()
.withName("Sending emails [from={}, to={}]", "frank", "john@eclipse.org, jack@eclipse.org");
JobInput.withRunContext
To optionally specify the RunContext to be installed during job execution.
The RunMonitor associated with the RunContext will be used as the job’s monitor, meaning that cancellation requests to the job future or the context’s monitor are equivalent. If no context is given, the job manager ensures that a monitor is installed, so that executing code can always query its cancellation status via RunMonitor.CURRENT.get().isCancelled().
JobInput.withExecutionTrigger
To optionally set the trigger to define the schedule upon which the job will commence execution. If not set, the job will commence execution immediately after being scheduled, and will execute exactly once.
The trigger mechanism is provided by Quartz Scheduler, meaning that you can profit from the powerful Quartz schedule capabilities.
For more information, see http://www.quartz-scheduler.org.
Use the static factory method Jobs.newExecutionTrigger() to get an instance:
// Schedules a delayed single executing job
Jobs.newInput()
.withName("job")
.withExecutionTrigger(Jobs.newExecutionTrigger()
.withStartIn(10, TimeUnit.SECONDS));
// Schedules a repeatedly running job at a fixed rate (every hour), which ends in 24 hours
Jobs.newInput()
.withName("job")
.withExecutionTrigger(Jobs.newExecutionTrigger()
.withEndIn(1, TimeUnit.DAYS)
.withSchedule(SimpleScheduleBuilder.repeatHourlyForever()));
// Schedules a job which runs at 10:15am every Monday, Tuesday, Wednesday, Thursday and Friday
Jobs.newInput()
.withName("job")
.withExecutionTrigger(Jobs.newExecutionTrigger()
.withSchedule(CronScheduleBuilder.cronSchedule("0 15 10 ? * MON-FRI")));
Learn more about ExecutionTrigger.
JobInput.withExecutionSemaphore
To optionally control the maximum concurrency level among jobs assigned to the same semaphore.
With a semaphore in place, the job only commences execution once a permit is free or becomes available. If a permit is free, the job commences execution immediately at the next reasonable opportunity, unless no worker thread is available.
A semaphore initialized to one allows jobs to run in a mutually exclusive manner, and a semaphore initialized to zero allows no job to run at all. The total number of permits can be changed at any time, which allows the maximum concurrency level to be adapted to dynamic criteria like time of day or system load. However, a semaphore can be sealed, meaning that the number of permits cannot be changed anymore, and any attempt to do so is rejected.
A new semaphore instance can be obtained via the Jobs class.
IExecutionSemaphore semaphore = Jobs.newExecutionSemaphore(5); (1)
for (int i = 0; i < 100; i++) {
(2)
Jobs.schedule(() -> {
// doing something
}, Jobs.newInput()
.withName("job-{}", i)
.withExecutionSemaphore(semaphore)); (3)
}
1 | Create a new ExecutionSemaphore via Jobs class. The semaphore is initialized with 5 permits, meaning that at any given time, there are no more than 5 jobs running concurrently. |
2 | Schedule 100 jobs in a row. |
3 | Set the semaphore to limit the maximal concurrency level to 5 jobs. |
Learn more about ExecutionSemaphore.
JobInput.withExecutionHint
To associate the job with an execution hint. An execution hint is simply a marker on a job, and can be evaluated by filters to select jobs, e.g. to listen to job lifecycle events of some particular jobs, to wait for some particular jobs to complete, or to cancel some particular jobs. A job may have multiple hints associated. Further, hints can be registered directly on the future via IFuture.addExecutionHint(hint), or removed via IFuture.removeExecutionHint(hint).
JobInput.withExceptionHandling
To control how to deal with uncaught exceptions.
By default, an uncaught exception is handled by the ExceptionHandler bean and then propagated to the submitter, provided the submitter is waiting for the job to complete via IFuture.awaitDoneAndGet().
This method expects two arguments: an optional exception handler, and a boolean flag indicating whether to swallow exceptions. 'Swallow' is independent of the specified exception handler, and indicates whether an exception should be swallowed instead of being propagated to the submitter.
If a repetitive job runs with swallowing set to true, it continues its repetitive execution upon an uncaught exception. If set to false, the repetitive execution ends.
JobInput.withExpirationTime
To set the maximal expiration time upon which the job must commence execution. If elapsed, the job is cancelled and does not commence execution. By default, a job never expires.
For a job that executes once, the expiration is evaluated just before it commences execution. For a job with a repeating schedule, it is evaluated before every single execution.
In contrast, the trigger’s end time specifies the time at which the trigger will no longer fire. However, once fired, the job may not be executed immediately, depending on whether it has to compete for an execution permit first. So the end time may already have elapsed when execution commences, whereas the expiration time is evaluated just before starting execution.
IFuture
A future represents the result of an asynchronous computation, and is returned by the job manager upon scheduling a job. The future provides functionality to await the job’s completion, to get its computation result or exception, to cancel its execution, and more.
Learn more about job cancellation in Job cancellation.
Learn more about listening for job lifecycle events in Subscribe for job lifecycle events.
Learn more about awaiting the job’s completion in Awaiting job completion.
Job states
Upon scheduling a job, the job transitions through different states. The current state of a job can be queried from its associated IFuture.
state | description |
---|---|
SCHEDULED | Indicates that a job was given to the job manager for execution. |
REJECTED | Indicates that a job was rejected for execution. This might happen if the job manager has been shutdown, or if no more worker threads are available. |
PENDING | Indicates that a job’s execution is pending, either because scheduled with a delay, or because of being a repetitive job while waiting for the commencement of the next execution. |
RUNNING | Indicates that a job is running. |
DONE | Indicates that a job finished execution, either normally or because it was cancelled. Use |
WAITING_FOR_PERMIT | Indicates that a semaphore aware job is competing for a permit to become available. |
WAITING_FOR_BLOCKING_CONDITION | Indicates that a job is blocked by a blocking condition, and is waiting for it to fall. |
The state 'done' does not necessarily imply that the job already finished execution. That is because a job also enters 'done' state upon cancellation, but may still continue execution.
Future filter
A future filter is a filter which can be passed to various methods of the job manager to select some futures. The filter must implement the Predicate interface, typed to IFuture<?>, and has a single method to accept futures of interest.
public class FutureFilter implements Predicate<IFuture<?>> {
@Override
public boolean test(IFuture<?> future) {
// Accept or reject the future
return false;
}
}
Scout provides the FutureFilterBuilder class to ease building filters which match multiple criteria joined by a logical 'AND' operation.
Predicate<IFuture<?>> filter = Jobs.newFutureFilterBuilder() (1)
.andMatchExecutionHint("computation") (2)
.andMatchNotState(JobState.PENDING) (3)
.andAreSingleExecuting() (4)
.andMatchNotFuture(IFuture.CURRENT.get()) (5)
.andMatchRunContext(ClientRunContext.class) (6)
.andMatch(new SessionFutureFilter(ISession.CURRENT.get())) (7)
.toFilter(); (8)
1 | Returns an instance of the future filter builder |
2 | Specifies to match only futures associated with execution hint 'computation' |
3 | Specifies to match only jobs not in state pending |
4 | Specifies to match only single executing jobs, meaning no recurring jobs |
5 | Specifies to exclude the current future (if any) |
6 | Specifies to match only jobs running on behalf of a ClientRunContext |
7 | Specifies to match only jobs of the current session |
8 | Builds the filter to get a Predicate instance |
For more information, refer to the JavaDoc of FutureFilterBuilder.
Event filter
A job event filter is a filter which can be given to the job manager to subscribe for job lifecycle events. The filter must implement the Predicate interface, typed to JobEvent, and has a single method to accept events of interest.
public class EventFilter implements Predicate<JobEvent> {
@Override
public boolean test(JobEvent event) {
// Accept or reject the event
return false;
}
}
Scout provides the JobEventFilterBuilder class to ease building filters which match multiple criteria joined by a logical 'AND' operation.
Predicate<JobEvent> filter = Jobs.newEventFilterBuilder() (1)
.andMatchEventType(JobEventType.JOB_STATE_CHANGED) (2)
.andMatchState(JobState.RUNNING) (3)
.andMatch(new SessionJobEventFilter(ISession.CURRENT.get())) (4)
.andMatchExecutionHint("computation") (5)
.toFilter(); (6)
1 | Returns an instance of the job event filter builder |
2 | Specifies to match all events representing a job state change |
3 | Specifies to match only events for jobs which transitioned into running state |
4 | Specifies to match only events for jobs of the current session |
5 | Specifies to match only events for jobs which are associated with the execution hint 'computation' |
6 | Builds the filter to get a Predicate instance |
For more information, refer to the JavaDoc of JobEventFilterBuilder.
Job cancellation
A job can be cancelled in two ways, either directly via its IFuture, or via the job manager. Both expect you to provide a boolean flag indicating whether to interrupt the executing worker thread. Upon cancellation, the job immediately enters 'done' state. Learn more about Job states. If cancelling via the job manager, a future filter must be given to select the jobs to be cancelled. Learn more about Future filter.
The cancellation attempt is ignored if the job has already completed or was cancelled. If not running yet, the job will never run. If the job has already started, the interruptIfRunning parameter determines whether the thread executing the job should be interrupted in an attempt to stop the job.
The following are some examples:
// Schedule a job
IFuture<?> future = Jobs.schedule(new Work(), Jobs.newInput());
// Cancel the job via its future
future.cancel(false);
// Cancel multiple futures via the job manager (future1, future2 and future3 stand for futures obtained earlier)
Jobs.getJobManager().cancel(Jobs.newFutureFilterBuilder()
.andMatchFuture(future1, future2, future3)
.toFilter(), false);
// Cancel all jobs of the current session which are marked with the execution hint 'computation'
Jobs.getJobManager().cancel(Jobs.newFutureFilterBuilder()
.andMatchExecutionHint("computation")
.andMatch(new SessionFutureFilter(ISession.CURRENT.get()))
.toFilter(), false);
A job can query its current cancellation status via RunMonitor.CURRENT.get().isCancelled(). If doing some long-running operations, it is recommended for the job to regularly check for cancellation.
A job which is scheduled to run on a copy of the submitting RunContext is also cancelled once the RunMonitor of that context is cancelled.
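The advice to check for cancellation regularly can be illustrated with plain JDK classes. The following is a minimal sketch and not Scout API: the AtomicBoolean is a hypothetical stand-in for the run monitor, and the class and method names are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

final class CooperativeCancellationSketch {

  /** Runs a polling loop until the flag is set and returns the number of completed steps. */
  static long runUntilCancelled() {
    AtomicBoolean cancelled = new AtomicBoolean(false); // assumption: stand-in for a run monitor
    CountDownLatch firstStepDone = new CountDownLatch(1);
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      Future<Long> future = executor.submit(() -> {
        long steps = 0;
        while (!cancelled.get()) { // check for cancellation between units of work
          steps++;
          firstStepDone.countDown();
        }
        return steps;
      });
      firstStepDone.await();
      cancelled.set(true); // request cancellation; the job stops at its next check
      return future.get();
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    }
    finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("steps before cancellation: " + runUntilCancelled());
  }
}
```

The job only stops because it looks at the flag on every iteration. A job that never checks would keep running after cancellation was requested.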
Subscribe for job lifecycle events
Sometimes it is useful to register for some job lifecycle events. The following event types can be subscribed for:
event type | description |
---|---|
JOB_STATE_CHANGED | Signals that a job transitioned to a new JobState, e.g. from JobState.SCHEDULED to JobState.RUNNING. |
JOB_EXECUTION_HINT_ADDED | Signals that an execution hint was added to a job. |
JOB_EXECUTION_HINT_REMOVED | Signals that an execution hint was removed from a job. |
JOB_MANAGER_SHUTDOWN | Signals that the job manager was shut down. |
The listener is registered via the job manager as follows:
Jobs.getJobManager().addListener(Jobs.newEventFilterBuilder() (1)
.andMatchEventType(JobEventType.JOB_STATE_CHANGED)
.andMatchState(JobState.RUNNING)
.andMatch(new SessionJobEventFilter(ISession.CURRENT.get()))
.toFilter(), event -> {
IFuture<?> future = event.getData().getFuture(); (2)
System.out.println("Job commences execution: " + future.getJobInput().getName());
});
1 | Subscribe for all events related to jobs just about to commence execution, and which belong to the current session |
2 | Get the future this event was fired for |
If interested in only events of a single future, the listener can be registered directly on the future.
future.addListener(Jobs.newEventFilterBuilder()
.andMatchEventType(JobEventType.JOB_STATE_CHANGED)
.andMatchState(JobState.RUNNING)
.toFilter(), event -> System.out.println("Job commences execution"));
Awaiting job completion
A job’s completion can be awaited either on its IFuture, or via the job manager. The former optionally allows you to consume the job’s computation result, whereas the latter allows multiple futures to be awaited.
Difference between 'done' and 'finished' state
When awaiting futures, the difference between the 'done' and 'finished' states should be understood. 'Done' means that the future either completed normally, or was cancelled. If cancelled while running, the job may still continue its execution, whereas a job which has not commenced execution yet will never do so. The latter typically applies to jobs scheduled with a delay. The 'finished' state differs from 'done' insofar as a cancelled, currently running job enters 'finished' state only upon its actual completion. Otherwise, if not cancelled, or cancelled before executing, it is equivalent to 'done' state. In most situations, it is sufficient to await the future’s done state, especially because a cancelled job cannot return a result to the submitter anyway.
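The same distinction can be observed with a plain JDK Future, which the job manager builds on. The following is a minimal sketch using only java.util.concurrent, not Scout API. The class name and the latches are illustrative assumptions. It cancels a running task without interrupting it and records that the future reports 'done' while the work is still in progress.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class DoneVersusFinishedSketch {

  /** Returns {doneRightAfterCancel, stillRunningRightAfterCancel}. */
  static boolean[] observe() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch mayFinish = new CountDownLatch(1);
    CountDownLatch finished = new CountDownLatch(1);
    try {
      Future<?> future = executor.submit(() -> {
        started.countDown();
        try {
          mayFinish.await(); // simulates work that does not react to cancellation
        }
        catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        finally {
          finished.countDown();
        }
      });
      started.await();
      future.cancel(false); // cancel without interrupting the worker thread
      boolean done = future.isDone(); // 'done' is reported immediately
      boolean stillRunning = finished.getCount() == 1; // but the work has not finished yet
      mayFinish.countDown();
      finished.await(); // only now is the job actually 'finished'
      return new boolean[] {done, stillRunning};
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    boolean[] result = observe();
    System.out.println("done=" + result[0] + ", stillRunning=" + result[1]);
  }
}
```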
Awaiting a single future’s 'done' state
Apart from some overloaded variants, IFuture basically provides two methods to wait for a future to enter 'done' state, namely awaitDone and awaitDoneAndGet, with the difference that the latter additionally returns the job’s result or exception. If the future is already done, those methods return immediately. For both methods, there is an overloaded version to wait for at most a given time, which, once elapsed, results in a TimedOutError being thrown.
Further, awaitDoneAndGet allows you to specify an IExceptionTranslator to control exception translation. By default, DefaultRuntimeExceptionTranslator is used, meaning that a RuntimeException is propagated as it is, whereas a checked exception is wrapped into a PlatformException. If you require checked exceptions to be thrown as they are, use DefaultExceptionTranslator instead, or even NullExceptionTranslator to work with the raw ExecutionException as thrown by the Java Executor framework.
IFuture<String> future = Jobs.schedule(() -> {
// doing something
return "computation result";
}, Jobs.newInput());
// Wait until done without consuming the result
future.awaitDone(); (1)
future.awaitDone(10, TimeUnit.SECONDS); (2)
// Wait until done and consume the result
String result = future.awaitDoneAndGet(); (3)
result = future.awaitDoneAndGet(10, TimeUnit.SECONDS); (4)
// Wait until done, consume the result, and use a specific exception translator
result = future.awaitDoneAndGet(DefaultExceptionTranslator.class); (5)
result = future.awaitDoneAndGet(10, TimeUnit.SECONDS, DefaultExceptionTranslator.class); (6)
1 | Waits if necessary for the job to complete, or until cancelled. This method does not throw an exception if cancelled or the computation failed, but throws ThreadInterruptedError if the current thread was interrupted while waiting. |
2 | Waits if necessary for at most 10 seconds for the job to complete, or until cancelled, or the timeout elapses. This method does not throw an exception if cancelled, or the computation failed, but throws TimedOutError if waiting timeout elapsed, or throws ThreadInterruptedError if the current thread was interrupted while waiting. |
3 | Waits if necessary for the job to complete, and then returns its result, if available, or throws its exception according to DefaultRuntimeExceptionTranslator, or throws FutureCancelledError if cancelled, or throws ThreadInterruptedError if the current thread was interrupted while waiting. |
4 | Waits if necessary for at most 10 seconds for the job to complete, and then returns its result, if available, or throws its exception according to DefaultRuntimeExceptionTranslator, or throws FutureCancelledError if cancelled, or throws TimedOutError if waiting timeout elapsed, or throws ThreadInterruptedError if the current thread was interrupted while waiting. |
5 | Waits if necessary for the job to complete, and then returns its result, if available, or throws its exception according to the given DefaultExceptionTranslator, or throws FutureCancelledError if cancelled, or throws ThreadInterruptedError if the current thread was interrupted while waiting. |
6 | Waits if necessary for at most the given time for the job to complete, and then returns its result, if available, or throws its exception according to the given DefaultExceptionTranslator, or throws FutureCancelledError if cancelled, or throws TimedOutError if waiting timeout elapsed, or throws ThreadInterruptedError if the current thread was interrupted while waiting. |
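To make the 'raw ExecutionException' mentioned above more tangible, the following sketch shows what a submitter sees when working directly with the JDK executor framework, and where a translator would step in. It uses only JDK classes. The class name and the returned strings are illustrative assumptions and do not reflect how the Scout translators are implemented.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ExceptionTranslationSketch {

  /** Runs the callable and reports how its outcome surfaces to the waiting submitter. */
  static String describeFailure(Callable<String> work) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      Future<String> future = executor.submit(work);
      return "result: " + future.get();
    }
    catch (ExecutionException e) {
      Throwable cause = e.getCause(); // the exception originally thrown by the job
      if (cause instanceof RuntimeException) {
        return "unchecked, rethrow as is: " + cause.getClass().getSimpleName();
      }
      return "checked, wrap or rethrow: " + cause.getClass().getSimpleName();
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted while waiting";
    }
    finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(describeFailure(() -> { throw new IllegalStateException("boom"); }));
    System.out.println(describeFailure(() -> { throw new IOException("disk"); }));
  }
}
```

The branch on the cause's type is the decision point that differs between the translators described above: propagate as is, wrap, or leave the ExecutionException untouched.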
It is further possible to await asynchronously for a future to enter done state by registering a callback via the whenDone method. The advantage over registering a listener is that the callback is invoked even if the future has already entered done state upon registration.
future.whenDone(event -> {
// invoked upon entering done state.
}, ClientRunContexts.copyCurrent());
Because the callback is invoked in another thread, this method optionally accepts a RunContext to be applied when it is invoked.
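The JDK offers a comparable guarantee with CompletableFuture, which may help to picture the behavior. The following sketch is an analogy only and does not use Scout API. The class name is an assumption. Unlike whenDone as described above, this JDK callback runs on the calling thread when the future is already complete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class LateCallbackSketch {

  /** Registers a callback on an already completed future and returns what it received. */
  static List<String> registerAfterCompletion() {
    List<String> received = new ArrayList<>();
    CompletableFuture<String> future = CompletableFuture.completedFuture("computation result");
    // The future is already done, yet the callback is still invoked.
    future.whenComplete((result, error) -> received.add(result));
    return received;
  }

  public static void main(String[] args) {
    System.out.println(registerAfterCompletion());
  }
}
```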
Awaiting a single future’s 'finished' state
Use the method awaitFinished to wait for the job to finish, meaning that the job either completed normally or by an exception, or that it will never commence execution due to a premature cancellation. To learn more about the difference between 'done' and 'finished' state, see Difference between 'done' and 'finished' state.
Please note that this method does not return the job’s result, because by Java Future definition, a cancelled job cannot provide a result.
IFuture<String> future = Jobs.schedule(() -> {
// doing something
return "computation result";
}, Jobs.newInput());
// Wait until finished
future.awaitFinished(10, TimeUnit.SECONDS);
Awaiting multiple futures' 'done' state
The job manager allows you to await multiple futures at once. The filter to be provided limits the futures to await. This method requires you to provide a maximal time to wait.
Filters can be combined using logical filters like AndFilter or OrFilter, or negated by enclosing a filter in NotFilter. Also see Future filter to create a filter to match multiple criteria joined by a logical 'AND' operation.
// Wait for some futures
Jobs.getJobManager().awaitDone(Jobs.newFutureFilterBuilder() (1)
.andMatchFuture(future1, future2, future3)
.toFilter(), 1, TimeUnit.MINUTES);
// Wait for all futures marked as 'reporting' jobs of the current session
Jobs.getJobManager().awaitDone(Jobs.newFutureFilterBuilder() (2)
.andMatchExecutionHint("reporting")
.andMatch(new SessionFutureFilter(ISession.CURRENT.get()))
.toFilter(), 1, TimeUnit.MINUTES);
1 | Waits if necessary for at most 1 minute for all three futures to complete, or until cancelled, or the timeout elapses. |
2 | Waits if necessary for at most 1 minute until all jobs marked as 'reporting' jobs of the current session complete, or until cancelled, or the timeout elapses. |
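Since the filters in this chapter are declared as java.util.function.Predicate, the default methods of that JDK interface (and, or, negate) are another way to express such combinations. The following sketch demonstrates this with a hypothetical JobInfo class standing in for a future. The class, its fields and the hint names are illustrative assumptions, and whether this style is preferred over the logical filter classes is not stated in this document.

```java
import java.util.function.Predicate;

final class FilterCompositionSketch {

  /** Hypothetical stand-in for a future, reduced to the two attributes used below. */
  static final class JobInfo {
    final String hint;
    final boolean recurring;

    JobInfo(String hint, boolean recurring) {
      this.hint = hint;
      this.recurring = recurring;
    }
  }

  static final Predicate<JobInfo> IS_REPORTING = job -> "reporting".equals(job.hint);
  static final Predicate<JobInfo> IS_COMPUTATION = job -> "computation".equals(job.hint);
  static final Predicate<JobInfo> IS_RECURRING = job -> job.recurring;

  /** Matches one-shot jobs marked as 'reporting' or 'computation'. */
  static final Predicate<JobInfo> FILTER =
      IS_REPORTING.or(IS_COMPUTATION).and(IS_RECURRING.negate());

  public static void main(String[] args) {
    System.out.println(FILTER.test(new JobInfo("reporting", false)));
    System.out.println(FILTER.test(new JobInfo("reporting", true)));
  }
}
```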
Awaiting multiple futures' 'finished' state
Use the method awaitFinished to wait for multiple jobs to finish, meaning that the jobs either completed normally or by an exception, or that they will never commence execution due to a premature cancellation. To learn more about the difference between 'done' and 'finished' state, see Difference between 'done' and 'finished' state.
// Wait for some futures
Jobs.getJobManager().awaitFinished(Jobs.newFutureFilterBuilder() (1)
.andMatchFuture(future1, future2, future3)
.toFilter(), 1, TimeUnit.MINUTES);
// Wait for all futures marked as 'reporting' jobs of the current session
Jobs.getJobManager().awaitFinished(Jobs.newFutureFilterBuilder() (2)
.andMatchExecutionHint("reporting")
.andMatch(new SessionFutureFilter(ISession.CURRENT.get()))
.toFilter(), 1, TimeUnit.MINUTES);
1 | Waits if necessary for at most 1 minute for all three futures to finish, or until cancelled, or the timeout elapses. |
2 | Waits if necessary for at most 1 minute until all jobs marked as 'reporting' jobs of the current session finish, or until cancelled, or the timeout elapses. |
Uncaught job exceptions
If a job throws an exception, that exception is handled by ExceptionHandler and propagated to the submitter. However, the exception is only propagated if there is a waiting submitter. Also, an uncaught exception causes repetitive jobs to terminate.
The default behavior described above can be changed via JobInput.withExceptionHandling(..).
Blocking condition
A blocking condition allows a thread to wait for a condition to become true. That is similar to the Java Object’s 'wait/notify' mechanism, but with some additional functionality regarding semaphore aware jobs. If a semaphore aware job enters a blocking condition, it releases ownership of the permit, which allows another job of that same semaphore to commence execution. Once the condition becomes true, the job must compete for a permit anew.
A condition can be used across multiple threads to wait for the same condition. Also, a condition is reusable upon invalidation. And finally, a condition can be used even if not running within a job.
A blocking condition is often used by model jobs to wait for something to happen, but to allow another model job to run while waiting. A typical use case would be to wait for a MessageBox to be closed.
Example of a blocking condition
Assume you are running in a semaphore aware job and need to perform some long-running operation, during which you do not need to be the permit owner. A simple but wrong approach would be the following:
// Schedule a long running operation.
IFuture<?> future = Jobs.schedule(new LongRunningOperation(), Jobs.newInput());
// Wait until done.
future.awaitDone();
The problem with this approach is that you are still the permit owner while waiting, meaning that you possibly prevent other jobs from running. Instead, you could use a blocking condition to achieve that:
// Create a blocking condition.
final IBlockingCondition operationCompleted = Jobs.newBlockingCondition(true);
// Schedule a long running operation.
IFuture<Void> future = Jobs.schedule(new LongRunningOperation(), Jobs.newInput());
// Register done callback to unblock the condition.
future.whenDone(event -> {
// Let the waiting job re-acquire a permit and continue execution.
operationCompleted.setBlocking(false);
}, null);
// Wait until done. Thereby, the permit of the current job is released for the time while waiting.
operationCompleted.waitFor();
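The permit hand-over behind this example can be mimicked with plain JDK primitives. The following is a simplified sketch and not how the blocking condition is implemented: a Semaphore with one permit plays the role of the execution semaphore, a CountDownLatch plays the role of the condition, and all names are illustrative assumptions.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

final class PermitReleasingWaitSketch {

  /** Returns the order of events when a permit owner waits without keeping its permit. */
  static List<String> run() {
    Semaphore mutex = new Semaphore(1, true); // one permit: jobs run mutually exclusive
    CountDownLatch condition = new CountDownLatch(1); // stand-in for the blocking condition
    List<String> events = new CopyOnWriteArrayList<>();
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      Future<?> waiter = executor.submit(() -> {
        mutex.acquire();
        events.add("waiter: start");
        // Another job of the same semaphore; it cannot run while the waiter owns the permit.
        executor.submit(() -> {
          mutex.acquire();
          events.add("other: ran");
          mutex.release();
          condition.countDown(); // unblock the waiter
          return null;
        });
        mutex.release(); // entering the condition: give up the permit
        condition.await(); // wait until unblocked
        mutex.acquire(); // compete for a permit anew
        events.add("waiter: resumed");
        mutex.release();
        return null;
      });
      waiter.get();
      return events;
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    }
    finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

If the waiter kept its permit while waiting, the other job could never acquire one and the waiter would block forever, which is the situation the blocking condition is designed to avoid.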
ExecutionSemaphore
Represents a fair counting semaphore used in Job API to control the maximal number of jobs running concurrently.
Jobs which are assigned to the same semaphore run concurrently until they reach the maximum concurrency level defined for that semaphore. Subsequent jobs then wait in the queue until a permit becomes available.
A semaphore initialized to one allows jobs to run in a mutually exclusive manner, and a semaphore initialized to zero allows no job to run at all. The total number of permits can be changed at any time, which allows the maximum concurrency level to be adapted to dynamic criteria like time of day or system load. However, once seal() has been called, the number of permits cannot be changed anymore, and any attempt to do so results in an AssertionException. By default, a semaphore is unbounded.
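The effect of a counting semaphore on concurrency can be tried out with the JDK's own java.util.concurrent.Semaphore. The following sketch is a simplified analogy, not the Scout implementation: here, waiting tasks block a worker thread, and the class name, sleep time and counters are illustrative assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class ConcurrencyLimitSketch {

  /** Runs the given number of jobs and returns the highest concurrency level observed. */
  static int maxObservedConcurrency(int permits, int jobs) {
    Semaphore semaphore = new Semaphore(permits, true); // fair counting semaphore
    AtomicInteger running = new AtomicInteger();
    AtomicInteger maxRunning = new AtomicInteger();
    CountDownLatch allDone = new CountDownLatch(jobs);
    ExecutorService executor = Executors.newFixedThreadPool(jobs);
    try {
      for (int i = 0; i < jobs; i++) {
        executor.execute(() -> {
          try {
            semaphore.acquire(); // wait for a permit
            try {
              int now = running.incrementAndGet();
              maxRunning.accumulateAndGet(now, Math::max);
              Thread.sleep(5); // simulated work
            }
            finally {
              running.decrementAndGet();
              semaphore.release(); // hand the permit to the next waiting job
            }
          }
          catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
          finally {
            allDone.countDown();
          }
        });
      }
      allDone.await();
      return maxRunning.get();
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("max concurrency: " + maxObservedConcurrency(3, 12));
  }
}
```

Although twelve worker threads are available, the observed concurrency never exceeds the number of permits.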
ExecutionTrigger
Component that defines the schedule upon which a job will commence execution.
A trigger can be as simple as a 'one-shot' execution at some specific point in time in the future, or represent a schedule which executes a job on a repeated basis. The latter can be configured to run infinitely, or to end at a specific point in time. It is further possible to define rather complex triggers, like to execute a job every second Friday at noon, but with the exclusion of all the business’s holidays.
See the various schedule builders provided by Quartz Scheduler: SimpleScheduleBuilder, CronScheduleBuilder, CalendarIntervalScheduleBuilder, DailyTimeIntervalScheduleBuilder. The most powerful builder is CronScheduleBuilder. Cron is a UNIX tool with powerful and proven scheduling capabilities. For more information, see http://www.quartz-scheduler.org.
Additionally, Scout provides FixedDelayScheduleBuilder to run a job with a fixed delay between the termination of one execution and the commencement of the next execution.
Use the static factory method 'Jobs.newExecutionTrigger()' to get an instance.
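The fixed-delay semantics described above match those of ScheduledExecutorService.scheduleWithFixedDelay in the JDK, where the delay is measured from the end of one execution to the start of the next (as opposed to scheduleAtFixedRate). The following sketch uses only that JDK method to illustrate a repeating schedule that runs until cancelled. It is not the Scout or Quartz API, and the class and parameter names are assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class FixedDelaySketch {

  /** Repeats a task with a fixed delay until it ran at least {@code runs} times, then cancels it. */
  static int runWithFixedDelay(int runs, long delayMillis) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    AtomicInteger executions = new AtomicInteger();
    CountDownLatch enoughRuns = new CountDownLatch(runs);
    try {
      // Start immediately, then wait delayMillis after each execution has terminated.
      ScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(() -> {
        executions.incrementAndGet();
        enoughRuns.countDown();
      }, 0, delayMillis, TimeUnit.MILLISECONDS);
      enoughRuns.await();
      future.cancel(false); // a repeating schedule runs until it is cancelled
      return executions.get();
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    finally {
      scheduler.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("executions: " + runWithFixedDelay(3, 10));
  }
}
```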
Misfiring
Regardless of the schedule used, the job manager guarantees that the same job is never executed concurrently. A conflict may arise when using a repeated schedule, if the job has not yet terminated its last execution but the schedule’s trigger already wants to fire for the next execution. Such a situation is called a misfiring. The action to be taken upon a misfiring is configurable via the schedule’s misfiring policy. A policy can be to run the job immediately upon termination of the previous execution, or to just ignore that missed firing. See the JavaDoc of the schedule for more information.
Stopping the platform
Upon stopping the platform, the job manager is also shut down. If you have an IPlatformListener that performs some cleanup work and requires the job manager to be still functional, that listener must be annotated with an @Order less than IJobManager.DESTROY_ORDER, which is 5'900. If no @Order is specified explicitly, the listener has the default order of 5, meaning that it is invoked before job manager shutdown anyway.
Model Jobs
Model jobs exist client side only, and are used to interact with the Scout client model to read and write model values in a serial manner per session.That enables no synchronization to be used when interacting with the model.
By definition, a model job requires to be run on behalf of a ClientRunContext
with a IClientSession
set, and must have the session’s model job semaphore set as its ExecutionSemaphore.That causes all such jobs to be run in sequence in the model thread.At any given time, there is only one model thread active per client session.
The class ModelJobs is a convenience helper that facilitates the creation of model job related artifacts and the scheduling of model jobs.
(1)
ModelJobs.schedule(() -> {
// doing something in model thread
}, ModelJobs.newInput(ClientRunContexts.copyCurrent()) (2)
.withName("Doing something in model thread"));
1 | Schedules the work to be executed in the model thread |
2 | Creates the JobInput to become a model job, meaning with the session’s model job semaphore set |
Model jobs may also run according to a Quartz schedule plan, or be executed with a delay. In that case, the model permit is acquired just before each execution, and not upon being scheduled.
Furthermore, the class ModelJobs
provides some useful static methods:
// Returns true if the current thread represents the model thread for the current client session. At any given time, there is only one model thread active per client session.
ModelJobs.isModelThread();
// Returns true if the given Future belongs to a model job.
ModelJobs.isModelJob(IFuture.CURRENT.get());
// Returns a builder to create a filter for future objects representing a model job.
ModelJobs.newFutureFilterBuilder();
// Returns a builder to create a filter for JobEvent objects originating from model jobs.
ModelJobs.newEventFilterBuilder();
// Instructs the job manager that the current model job is willing to temporarily yield its current model job permit. It is rarely appropriate to use this method. It may be useful for debugging or testing purposes.
ModelJobs.yield();
Configuration
The job manager can be configured with properties starting with scout.jobmanager. See Scout Config Properties.
Extending job manager
The job manager is implemented as an application scoped bean and can be replaced. To do so, create a class which extends JobManager, and annotate it with the @Replace annotation. Typical reasons are to use the EE container’s ThreadPoolExecutor, or to contribute some behavior to the callable chain which finally executes the job.
To change the executor, override the createExecutor method and return the executor of your choice. Do not forget to register a rejection handler that rejects the futures of jobs the executor refuses to run. Also, override shutdownExecutor so that the container’s executor is not shut down.
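The role of the rejection handler can be sketched with the JDK’s ThreadPoolExecutor alone. The example below is not Scout’s implementation and does not show the signature of createExecutor. It only demonstrates the idea that a rejected task’s future should be completed (here: cancelled), because otherwise a caller waiting on that future would wait forever. The class CancellingExecutorFactory is a hypothetical name.

```java
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical factory, JDK only: cancels the future of every rejected task. */
final class CancellingExecutorFactory {

  static ThreadPoolExecutor create(int corePoolSize, int maxPoolSize) {
    RejectedExecutionHandler cancelOnReject = (runnable, executor) -> {
      // submit() wraps tasks in a FutureTask, which is both a Runnable and a Future.
      if (runnable instanceof Future) {
        ((Future<?>) runnable).cancel(true);
      }
    };
    // Note: with an unbounded queue the pool never grows beyond corePoolSize.
    return new ThreadPoolExecutor(corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(), cancelOnReject);
  }

  /** Returns true if a task submitted after shutdown ends up cancelled. */
  static boolean isRejectedTaskCancelled() {
    ThreadPoolExecutor executor = create(1, 1);
    executor.shutdown(); // every later submission is rejected
    Future<?> future = executor.submit(() -> { });
    return future.isCancelled();
  }
}
```

Without such a handler, the JDK default (AbortPolicy) throws a RejectedExecutionException at the submitter instead.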
To contribute some behavior to the callable chain, override the method interceptCallableChain and contribute your decorator or interceptor. Refer to the method’s JavaDoc for more information.
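The general idea of decorating a callable can be sketched without Scout. The following example does not reproduce the signature of interceptCallableChain or Scout’s chain classes (see the JavaDoc for those). It only shows how nested decorators wrap the actual work, using the hypothetical names CallableChainSketch and traced.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

/** Hypothetical decorator chain built from plain java.util.concurrent.Callable. */
final class CallableChainSketch {

  /** Wraps 'next' so that an entry is logged before and after it runs. */
  static <T> Callable<T> traced(String name, List<String> log, Callable<T> next) {
    return () -> {
      log.add("before " + name);
      try {
        return next.call();
      } finally {
        log.add("after " + name); // also runs if the wrapped work throws
      }
    };
  }

  static List<String> runDemo() {
    List<String> log = new ArrayList<>();
    Callable<String> job = () -> {
      log.add("job");
      return "result";
    };
    // The outermost decorator is entered first and left last.
    Callable<String> chain = traced("outer", log, traced("inner", log, job));
    try {
      log.add("returned " + chain.call());
    } catch (Exception e) {
      throw new IllegalStateException(e);
    }
    return log;
  }
}
```

runDemo() yields the entries before outer, before inner, job, after inner, after outer, returned result, in that order.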
Scheduling examples
This section contains some common scheduling examples.
Jobs.schedule(() -> {
// doing something
}, Jobs.newInput()
.withName("Running once")
.withRunContext(ClientRunContexts.copyCurrent()));
Jobs.schedule(() -> {
// doing something
}, Jobs.newInput()
.withName("Running in 10 seconds")
.withRunContext(ClientRunContexts.copyCurrent())
.withExecutionTrigger(Jobs.newExecutionTrigger()
.withStartIn(10, TimeUnit.SECONDS))); // delay of 10 seconds
Jobs.schedule(() -> {
// doing something
}, Jobs.newInput()
.withName("Running every minute")
.withRunContext(ClientRunContexts.copyCurrent())
.withExecutionTrigger(Jobs.newExecutionTrigger()
.withStartIn(1, TimeUnit.MINUTES) (1)
.withSchedule(SimpleScheduleBuilder.simpleSchedule() (2)
.withIntervalInMinutes(1) (3)
.repeatForever()))); (4)
1 | Configure to fire in 1 minute for the first time |
2 | Use Quartz simple schedule to achieve fixed-rate execution |
3 | Repetitively fire every minute |
4 | Repeat forever |
Jobs.schedule(() -> {
// doing something
}, Jobs.newInput()
.withName("Running every minute for total 60 times")
.withRunContext(ClientRunContexts.copyCurrent())
.withExecutionTrigger(Jobs.newExecutionTrigger()
.withStartIn(1, TimeUnit.MINUTES) (1)
.withSchedule(SimpleScheduleBuilder.simpleSchedule() (2)
.withIntervalInMinutes(1) (3)
.withRepeatCount(59)))); (4)
1 | Configure to fire in 1 minute for the first time |
2 | Use Quartz simple schedule to achieve fixed-rate execution |
3 | Repetitively fire every minute |
4 | Repeat 59 times, plus the initial execution |
Jobs.schedule(() -> {
// doing something
}, Jobs.newInput()
.withName("Running forever with a delay of 1 minute between the termination of the previous and the next execution")
.withRunContext(ClientRunContexts.copyCurrent())
.withExecutionTrigger(Jobs.newExecutionTrigger()
.withStartIn(1, TimeUnit.MINUTES) (1)
.withSchedule(FixedDelayScheduleBuilder.repeatForever(1, TimeUnit.MINUTES)))); (2)
1 | Configure to fire in 1 minute for the first time |
2 | Use fixed delay schedule |
Jobs.schedule(() -> {
// doing something
}, Jobs.newInput()
.withName("Running 60 times with a delay of 1 minute between the termination of the previous and the next execution")
.withRunContext(ClientRunContexts.copyCurrent())
.withExecutionTrigger(Jobs.newExecutionTrigger()
.withStartIn(1, TimeUnit.MINUTES) (1)
.withSchedule(FixedDelayScheduleBuilder.repeatForTotalCount(60, 1, TimeUnit.MINUTES)))); (2)
1 | Configure to fire in 1 minute for the first time |
2 | Use fixed delay schedule |
Jobs.schedule(() -> {
// doing something
}, Jobs.newInput()
.withName("Running at 10:15am every Monday, Tuesday, Wednesday, Thursday and Friday")
.withRunContext(ClientRunContexts.copyCurrent())
.withExecutionTrigger(Jobs.newExecutionTrigger()
.withSchedule(CronScheduleBuilder.cronSchedule("0 15 10 ? * MON-FRI")))); (1)
1 | Cron format: [second] [minute] [hour] [day_of_month] [month] [day_of_week] [year]? |
Jobs.schedule(() -> {
// doing something
}, Jobs.newInput()
.withName("Running every minute starting at 14:00 and ending at 14:05, every day")
.withRunContext(ClientRunContexts.copyCurrent())
.withExecutionTrigger(Jobs.newExecutionTrigger()
.withSchedule(CronScheduleBuilder.cronSchedule("0 0-5 14 * * ?")))); (1)
1 | Cron format: [second] [minute] [hour] [day_of_month] [month] [day_of_week] [year]? |
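To read the two cron expressions above field by field, a small helper can label each part according to the format given in the callout. This is a plain-Java sketch with the hypothetical class CronFields. It only splits and names the fields and does not validate them the way Quartz does.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: labels the fields of a Quartz-style cron expression. */
final class CronFields {

  private static final String[] NAMES = {
      "second", "minute", "hour", "day_of_month", "month", "day_of_week", "year"};

  static Map<String, String> split(String expression) {
    String[] parts = expression.trim().split("\\s+");
    if (parts.length < 6 || parts.length > 7) {
      throw new IllegalArgumentException("expected 6 or 7 fields but got " + parts.length);
    }
    Map<String, String> fields = new LinkedHashMap<>(); // keeps the field order
    for (int i = 0; i < parts.length; i++) {
      fields.put(NAMES[i], parts[i]);
    }
    return fields;
  }

  public static void main(String[] args) {
    System.out.println(split("0 15 10 ? * MON-FRI"));
    System.out.println(split("0 0-5 14 * * ?"));
  }
}
```

For "0 15 10 ? * MON-FRI" the hour field is 10, the minute field is 15 and the day_of_week field is MON-FRI. The optional year field is absent.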
IExecutionSemaphore semaphore = Jobs.newExecutionSemaphore(5); (1)
for (int i = 0; i < 100; i++) {
Jobs.schedule(() -> {
// doing something
}, Jobs.newInput()
.withName("job-{}", i)
.withExecutionSemaphore(semaphore)); (2)
}
1 | Create the execution semaphore initialized with 5 permits |
2 | Set the execution semaphore on each job that is subject to limited concurrency |
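The effect of such a semaphore can be observed with the JDK’s own java.util.concurrent.Semaphore. The sketch below is an analogy, not Scout’s IExecutionSemaphore. In particular, a pool thread blocks here while waiting for a permit, which is a simplification. The class BoundedConcurrency is a hypothetical name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo: at most 'permits' tasks run at once, although the pool is larger. */
final class BoundedConcurrency {

  /** Runs 'jobs' tasks and returns the highest number of tasks seen running simultaneously. */
  static int maxObservedConcurrency(int permits, int jobs) {
    Semaphore semaphore = new Semaphore(permits);
    AtomicInteger running = new AtomicInteger();
    AtomicInteger maxRunning = new AtomicInteger();
    ExecutorService pool = Executors.newFixedThreadPool(permits * 4);
    for (int i = 0; i < jobs; i++) {
      pool.execute(() -> {
        semaphore.acquireUninterruptibly(); // wait for one of the permits
        try {
          int now = running.incrementAndGet();
          maxRunning.accumulateAndGet(now, Math::max); // remember the peak
          Thread.yield(); // give other tasks a chance to overlap
        } finally {
          running.decrementAndGet();
          semaphore.release(); // hand the permit to the next waiting task
        }
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("tasks did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return maxRunning.get();
  }
}
```

maxObservedConcurrency(5, 100) never returns more than 5, even though 20 pool threads are available.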
Jobs.getJobManager().cancel(Jobs.newFutureFilterBuilder()
.andMatch(new SessionFutureFilter(ISession.CURRENT.get()))
.toFilter(), true);
public class CancellableWork implements IRunnable {
@Override
public void run() throws Exception {
// do first chunk of operations
if (RunMonitor.CURRENT.get().isCancelled()) {
return;
}
// do next chunk of operations
if (RunMonitor.CURRENT.get().isCancelled()) {
return;
}
// do next chunk of operations
}
}
// Create a blocking condition.
final IBlockingCondition operationCompleted = Jobs.newBlockingCondition(true);
// Schedule a long running operation.
IFuture<Void> future = Jobs.schedule(new LongRunningOperation(), Jobs.newInput());
// Register done callback to unblock the condition.
future.whenDone(event -> {
// Let the waiting job re-acquire a permit and continue execution.
operationCompleted.setBlocking(false);
}, null);
// Wait until done. Thereby, the permit of the current job is released for the time while waiting.
operationCompleted.waitFor();
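For readers more familiar with the JDK, the same "wait until a callback fires" pattern can be sketched with CompletableFuture and CountDownLatch. This is only an analogy with the hypothetical class WaitForCompletion. Unlike a Scout blocking condition, the latch does not release or re-acquire any job permit while waiting.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical JDK-only analogy for waiting on a done callback. */
final class WaitForCompletion {

  static String runAndWait() {
    // The latch is "blocking" as long as its count is 1.
    CountDownLatch operationCompleted = new CountDownLatch(1);
    AtomicReference<String> result = new AtomicReference<>();

    // Start an asynchronous operation (stands in for the long running job).
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "done");

    // Register a done callback that stores the outcome and unblocks the waiting thread.
    future.whenComplete((value, error) -> {
      result.set(error == null ? value : "failed");
      operationCompleted.countDown();
    });

    try {
      if (!operationCompleted.await(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("operation did not complete in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return result.get();
  }
}
```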