1 package org.apache.turbine.services.schedule;
2
3 /*
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
19 * under the License.
20 */
21
22 import java.util.List;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.turbine.services.InitializationException;
27 import org.apache.turbine.services.TurbineBaseService;
28 import org.apache.turbine.util.TurbineException;
29
30 /**
31 * Service for a cron like scheduler.
32 *
33 * @author <a href="mailto:mbryson@mont.mindspring.com">Dave Bryson</a>
34 * @author <a href="mailto:quintonm@bellsouth.net">Quinton McCombs</a>
35 * @version $Id: TorqueSchedulerService.java 534527 2007-05-02 16:10:59Z tv $
36 */
37 public abstract class AbstractSchedulerService extends TurbineBaseService implements ScheduleService
38 {
39 /** Logging */
40 protected static Log log = LogFactory.getLog(ScheduleService.LOGGER_NAME);
41
42 /** The queue */
43 protected JobQueue<JobEntry> scheduleQueue = null;
44
45 /** Current status of the scheduler */
46 protected boolean enabled = false;
47
48 /** The main loop for starting jobs. */
49 protected MainLoop mainLoop;
50
51 /** The thread used to process commands. */
52 protected Thread thread;
53
54 /**
55 * Creates a new instance.
56 */
57 public AbstractSchedulerService()
58 {
59 mainLoop = null;
60 thread = null;
61 }
62
63 /**
64 * Initializes the SchedulerService.
65 *
66 * @throws InitializationException
67 * Something went wrong in the init stage
68 */
69 @Override
70 public void init() throws InitializationException
71 {
72 try
73 {
74 setEnabled(getConfiguration().getBoolean("enabled", true));
75 scheduleQueue = new JobQueue<JobEntry>();
76 mainLoop = new MainLoop();
77
78 @SuppressWarnings("unchecked") // Why is this cast necessary?
79 List<JobEntry> jobs = (List<JobEntry>)loadJobs();
80 scheduleQueue.batchLoad(jobs);
81 restart();
82
83 setInit(true);
84 }
85 catch (Exception e)
86 {
87 throw new InitializationException("Could not initialize the scheduler service", e);
88 }
89 }
90
91 /**
92 * Load all jobs from configuration storage
93 *
94 * @return the list of pre-configured jobs
95 * @throws TurbineException
96 */
97 protected abstract List<? extends JobEntry> loadJobs() throws TurbineException;
98
99 /**
100 * Shutdowns the service.
101 *
102 * This methods interrupts the housekeeping thread.
103 */
104 @Override
105 public void shutdown()
106 {
107 if (getThread() != null)
108 {
109 getThread().interrupt();
110 }
111 }
112
113 /**
114 * @see org.apache.turbine.services.schedule.ScheduleService#newJob(int, int, int, int, int, java.lang.String)
115 */
116 @Override
117 public abstract JobEntry newJob(int sec, int min, int hour, int wd, int day_mo, String task) throws TurbineException;
118
119 /**
120 * Get a specific Job from Storage.
121 *
122 * @param oid
123 * The int id for the job.
124 * @return A JobEntry.
125 * @exception TurbineException
126 * job could not be retrieved.
127 */
128 @Override
129 public abstract JobEntry getJob(int oid) throws TurbineException;
130
131 /**
132 * Add a new job to the queue.
133 *
134 * @param je
135 * A JobEntry with the job to add.
136 * @throws TurbineException
137 * job could not be added
138 */
139 @Override
140 public void addJob(JobEntry je) throws TurbineException
141 {
142 updateJob(je);
143 }
144
145 /**
146 * Remove a job from the queue.
147 *
148 * @param je
149 * A JobEntry with the job to remove.
150 * @exception TurbineException
151 * job could not be removed
152 */
153 @Override
154 public abstract void removeJob(JobEntry je) throws TurbineException;
155
156 /**
157 * Add or update a job.
158 *
159 * @param je
160 * A JobEntry with the job to modify
161 * @throws TurbineException
162 * job could not be updated
163 */
164 @Override
165 public abstract void updateJob(JobEntry je) throws TurbineException;
166
167 /**
168 * List jobs in the queue. This is used by the scheduler UI.
169 *
170 * @return A List of jobs.
171 */
172 @Override
173 public List<JobEntry> listJobs()
174 {
175 return scheduleQueue.list();
176 }
177
178 /**
179 * Sets the enabled status of the scheduler
180 *
181 * @param enabled
182 *
183 */
184 protected void setEnabled(boolean enabled)
185 {
186 this.enabled = enabled;
187 }
188
189 /**
190 * Determines if the scheduler service is currently enabled.
191 *
192 * @return Status of the scheduler service.
193 */
194 @Override
195 public boolean isEnabled()
196 {
197 return enabled;
198 }
199
200 /**
201 * Starts or restarts the scheduler if not already running.
202 */
203 @Override
204 public synchronized void startScheduler()
205 {
206 setEnabled(true);
207 restart();
208 }
209
210 /**
211 * Stops the scheduler if it is currently running.
212 */
213 @Override
214 public synchronized void stopScheduler()
215 {
216 log.info("Stopping job scheduler");
217 Thread thread = getThread();
218 if (thread != null)
219 {
220 thread.interrupt();
221 }
222 enabled = false;
223 }
224
225 /**
226 * Return the thread being used to process commands, or null if there is no
227 * such thread. You can use this to invoke any special methods on the
228 * thread, for example, to interrupt it.
229 *
230 * @return A Thread.
231 */
232 public synchronized Thread getThread()
233 {
234 return thread;
235 }
236
237 /**
238 * Set thread to null to indicate termination.
239 */
240 protected synchronized void clearThread()
241 {
242 thread = null;
243 }
244
245 /**
246 * Start (or restart) a thread to process commands, or wake up an existing
247 * thread if one is already running. This method can be invoked if the
248 * background thread crashed due to an unrecoverable exception in an
249 * executed command.
250 */
251 public synchronized void restart()
252 {
253 if (enabled)
254 {
255 log.info("Starting job scheduler");
256 if (thread == null)
257 {
258 // Create the the housekeeping thread of the scheduler. It will
259 // wait for the time when the next task needs to be started,
260 // and then launch a worker thread to execute the task.
261 thread = new Thread(mainLoop, ScheduleService.SERVICE_NAME);
262 // Indicate that this is a system thread. JVM will quit only
263 // when there are no more enabled user threads. Settings threads
264 // spawned internally by Turbine as daemons allows commandline
265 // applications using Turbine to terminate in an orderly manner.
266 thread.setDaemon(true);
267 thread.start();
268 }
269 else
270 {
271 notify();
272 }
273 }
274 }
275
276 /**
277 * Return the next Job to execute, or null if thread is interrupted.
278 *
279 * @return A JobEntry.
280 * @exception TurbineException
281 * a generic exception.
282 */
283 protected synchronized JobEntry nextJob() throws TurbineException
284 {
285 try
286 {
287 while (!Thread.interrupted())
288 {
289 // Grab the next job off the queue.
290 JobEntry je = scheduleQueue.getNext();
291
292 if (je == null)
293 {
294 // Queue must be empty. Wait on it.
295 wait();
296 }
297 else
298 {
299 long now = System.currentTimeMillis();
300 long when = je.getNextRuntime();
301
302 if (when > now)
303 {
304 // Wait till next runtime.
305 wait(when - now);
306 }
307 else
308 {
309 // Update the next runtime for the job.
310 scheduleQueue.updateQueue(je);
311 // Return the job to run it.
312 return je;
313 }
314 }
315 }
316 }
317 catch (InterruptedException ex)
318 {
319 // ignore
320 }
321
322 // On interrupt.
323 return null;
324 }
325
326 /**
327 * Inner class. This is isolated in its own Runnable class just so that the
328 * main class need not implement Runnable, which would allow others to
329 * directly invoke run, which is not supported.
330 */
331 protected class MainLoop implements Runnable
332 {
333 /**
334 * Method to run the class.
335 */
336 @Override
337 public void run()
338 {
339 String taskName = null;
340 try
341 {
342 while (enabled)
343 {
344 JobEntry je = nextJob();
345 if (je != null)
346 {
347 taskName = je.getTask();
348
349 // Start the thread to run the job.
350 Runnable wt = new WorkerThread(je);
351 Thread helper = new Thread(wt);
352 helper.start();
353 }
354 else
355 {
356 break;
357 }
358 }
359 }
360 catch (Exception e)
361 {
362 log.error("Error running a Scheduled Job: " + taskName, e);
363 enabled = false;
364 }
365 finally
366 {
367 clearThread();
368 }
369 }
370 }
371 }