Friday, 1 November 2013

Repeating and Delaying Tasks

The ExecutorService interface is not the end of the Executors story. The java.util.concurrent package provides another specialized interface, ScheduledExecutorService, which can execute tasks after a given delay or repeat tasks at fixed intervals.
I decided to test the interface.
The problem was simple: create a Jobber that starts after a certain time has elapsed, and have a job monitor that periodically reports on the Jobber's progress.
The first step was to create a jobber:
import java.util.concurrent.Callable;

class Jobber implements Callable<Void> {

  @Override
  public Void call() {
    String threadName = "[" + Thread.currentThread().getName() + "]";
    System.out.println(threadName + " : Starting the Jobs...");
    for (int i = 0; i < ScheduledExecutor.MAX_JOBS; i++) {
      int result = 0;
      int val = 0;
      synchronized (ScheduledExecutor.jobs) {
        val = ScheduledExecutor.jobs[i];
      }
      for (long j = 0; j < val; j++) {
        result = result + 1;
      }

      synchronized (ScheduledExecutor.results) {
        ScheduledExecutor.results[i] = result;
      }
      try {
        Thread.sleep(5); // argument is in milliseconds
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    }
    System.out.println(threadName + " : Completed the Jobs.");
    return null;
  }

}
The Jobber here is a Callable that processes the jobs supplied in the jobs array. After processing each job it sleeps for five milliseconds (Thread.sleep takes its argument in milliseconds) and then starts the next job. (A five-millisecond sleep does not mean that the next job starts exactly five milliseconds later. It starts when the thread is next given processor time after the sleep has elapsed.)
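The point about sleep timing can be checked with a small sketch. The class name SleepDrift and the helper measureSleepNanos are made up for this illustration and are not part of the code above. The measured time depends on the machine and its load, so no particular value should be expected.

```java
class SleepDrift {

    // Hypothetical helper: sleeps for the requested time and returns
    // how long the call actually took, in nanoseconds.
    static long measureSleepNanos(long requestedMillis) {
        long start = System.nanoTime();
        try {
            Thread.sleep(requestedMillis);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and give up on the measurement.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            long elapsedNanos = measureSleepNanos(5);
            System.out.printf("requested 5 ms, measured %.3f ms%n",
                    elapsedNanos / 1_000_000.0);
        }
    }
}
```

The measured values are usually a little above the requested time, which is the gap between "sleep has elapsed" and "thread is running again".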
The next step was to create a monitor:
class ResultMonitor implements Runnable {

  @Override
  public void run() {
    String threadName = "[" + Thread.currentThread().getName() + "]";
    int count = 0;
    synchronized (ScheduledExecutor.results) {
      for (Integer result : ScheduledExecutor.results) {
        if (null != result) {
          count++;
        }
      }
    }
    System.out.println(threadName + " : Total results found " + count);
  }

}
This class is much simpler. ResultMonitor is a Runnable that checks the results array, counts the jobs that have completed, and reports the count. Now the main class:
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduledExecutor {
  public static final int MAX_JOBS = 6;
  public static final int[] jobs;
  public static final Integer[] results = new Integer[MAX_JOBS];

  static {
    jobs = new int[MAX_JOBS];
    Random random = new Random();
    for (int i = 0; i < jobs.length; i++) {
      // nextInt(bound) never returns a negative value, so each job is a
      // non-negative amount of counting work.
      jobs[i] = random.nextInt(Integer.MAX_VALUE) / 1000;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
    // One-shot task: run the Jobber once, 10 seconds from now.
    ScheduledFuture<?> jobberFeature = scheduledExecutorService.schedule(
        new Jobber(), 10, TimeUnit.SECONDS);
    // Repeating task: run the monitor immediately, then 10 seconds after each run ends.
    ScheduledFuture<?> monitorFeature = scheduledExecutorService
        .scheduleWithFixedDelay(new ResultMonitor(), 0, 10, TimeUnit.SECONDS);
    while (true) {
      System.out.println("[" + Thread.currentThread().getName()
          + "] Testing thread Status: "
          + "\n Is Jobber Done ? " + jobberFeature.isDone()
          + "\n Is Monitor Done ? " + monitorFeature.isDone());

      if (jobberFeature.isDone()) {
        System.out.println("Terminating the Monitor as jobber is complete");
        monitorFeature.cancel(true);
        break;
      }
      try {
        Thread.sleep(30 * 1000);
      } catch (InterruptedException exception) {
        scheduledExecutorService.shutdownNow();
        System.exit(0);
      }

    }
    scheduledExecutorService.shutdown();
  }

}
As seen here, the ScheduledExecutor class creates a static array of jobs and an empty results array. Both arrays are referenced by the Jobber and ResultMonitor classes.
The executor used in the code is an instance of ScheduledThreadPoolExecutor. To schedule the Jobber instance to start after 10 seconds, I used the schedule method:
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
         long delay, TimeUnit unit)
The return type is a ScheduledFuture instance:
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}
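Because ScheduledFuture combines Delayed and Future, a single handle can report both the remaining delay and the eventual result. The following is a minimal sketch of that idea. The class DelayedResultDemo, the method runDelayed and the "done" return value are invented for illustration and do not appear in the code above.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class DelayedResultDemo {

    // Hypothetical helper: schedules a one-shot Callable and returns its
    // result once the delay has passed and the task has run.
    static String runDelayed(long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            Callable<String> task = () -> "done";
            ScheduledFuture<String> future =
                    executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);

            // From Delayed: time left before the task becomes eligible to run.
            long remaining = future.getDelay(TimeUnit.MILLISECONDS);
            System.out.println("remaining delay (ms): " + remaining
                    + ", done? " + future.isDone());

            // From Future: blocks until the task has run, then returns its value.
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("result: " + runDelayed(200));
    }
}
```

In the Jobber example the Callable returns Void, so get() would only signal completion (or rethrow a failure) rather than carry a value.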
Similarly, the method used for a repeating task is:
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
                    long delay, TimeUnit unit)
Within the main thread I check whether the Jobber task is done, using the isDone() method. If the Jobber is complete, the monitor task (monitorFeature) is cancelled. Without the cancellation, the task would keep repeating endlessly.
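The effect of cancelling a repeating task can be seen in isolation with a short sketch. The names CancelRepeatingTask and cancelAfterRuns are hypothetical, and the 20 millisecond delay is an arbitrary choice to keep the run short. A CountDownLatch stands in for "the work is finished" so that the example does not depend on timing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class CancelRepeatingTask {

    // Hypothetical helper: lets a repeating task run `runs` times, cancels it,
    // and returns { isDone before cancel, isCancelled after, isDone after }.
    static boolean[] cancelAfterRuns(int runs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        try {
            // The repeating task only counts down the latch on each run.
            ScheduledFuture<?> ticker = executor.scheduleWithFixedDelay(
                    latch::countDown, 0, 20, TimeUnit.MILLISECONDS);

            latch.await(); // wait until the task has run the requested number of times

            boolean doneBeforeCancel = ticker.isDone();
            ticker.cancel(false); // stop future runs without interrupting a running one
            return new boolean[] {doneBeforeCancel, ticker.isCancelled(), ticker.isDone()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] state = cancelAfterRuns(3);
        System.out.println("done before cancel: " + state[0]);
        System.out.println("cancelled after cancel: " + state[1]);
        System.out.println("done after cancel: " + state[2]);
    }
}
```

A repeating task does not report itself as done while it is still scheduled, which matches the "Is Monitor Done ? false" lines in the output of the main program.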
I have also used the main thread to report the status of both tasks at intervals of 30 seconds. On running the code:
[pool-1-thread-1] : Total results found 0
[main] Testing thread Status:
 Is Jobber Done ? false
 Is Monitor Done ? false
[pool-1-thread-2] : Starting the Jobs...
[pool-1-thread-1] : Total results found 0
[pool-1-thread-2] : Completed the Jobs.
[pool-1-thread-2] : Total results found 6
[main] Testing thread Status:
 Is Jobber Done ? true
 Is Monitor Done ? false
Terminating the Monitor as jobber is complete
As seen, the Jobber started only after a certain time had elapsed since program start. Also, the executor used different threads for different runs of the ResultMonitor task.
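There is also a scheduleAtFixedRate method, which attempts to start each run at fixed intervals measured from the first run (after initialDelay, then initialDelay + period, and so on). If a run takes longer than the period, later runs may start late, but they never run concurrently. The scheduleWithFixedDelay method instead ensures that the given delay elapses between the completion of one run and the start of the next for the given Runnable instance.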
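The difference between the two methods shows up when the task itself takes time. The sketch below records when each run starts and prints the gaps between consecutive starts. RateVersusDelay, startGaps and the 50/100 millisecond figures are assumptions made for this illustration. Exact numbers vary with machine load, so only the general pattern should be relied on.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class RateVersusDelay {

    // Hypothetical helper: runs a task that takes about workMillis, records when
    // each run starts, and returns the gaps (in ms) between consecutive starts.
    static List<Long> startGaps(boolean fixedRate, int runs, long workMillis, long intervalMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        List<Long> starts = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch latch = new CountDownLatch(runs);
        Runnable task = () -> {
            if (latch.getCount() == 0) {
                return; // enough samples collected; ignore any extra run
            }
            starts.add(System.nanoTime());
            try {
                Thread.sleep(workMillis); // simulated work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            latch.countDown();
        };
        try {
            ScheduledFuture<?> future = fixedRate
                    ? executor.scheduleAtFixedRate(task, 0, intervalMillis, TimeUnit.MILLISECONDS)
                    : executor.scheduleWithFixedDelay(task, 0, intervalMillis, TimeUnit.MILLISECONDS);
            latch.await();
            future.cancel(false);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < runs; i++) {
            gaps.add((starts.get(i) - starts.get(i - 1)) / 1_000_000);
        }
        return gaps;
    }

    public static void main(String[] args) {
        System.out.println("fixed rate gaps (ms):  " + startGaps(true, 4, 50, 100));
        System.out.println("fixed delay gaps (ms): " + startGaps(false, 4, 50, 100));
    }
}
```

With 50 ms of work and a 100 ms interval, the fixed-rate gaps should sit near the interval, while the fixed-delay gaps should be roughly the work time plus the interval, since the delay only starts counting once a run has finished.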
