MOA 12.03
Real Time Analytics for Data Streams
|
00001 /* 00002 * TaskThread.java 00003 * Copyright (C) 2007 University of Waikato, Hamilton, New Zealand 00004 * @author Richard Kirkby ([email protected]) 00005 * 00006 * This program is free software; you can redistribute it and/or modify 00007 * it under the terms of the GNU General Public License as published by 00008 * the Free Software Foundation; either version 3 of the License, or 00009 * (at your option) any later version. 00010 * 00011 * This program is distributed in the hope that it will be useful, 00012 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00014 * GNU General Public License for more details. 00015 * 00016 * You should have received a copy of the GNU General Public License 00017 * along with this program. If not, see <http://www.gnu.org/licenses/>. 00018 * 00019 */ 00020 package moa.tasks; 00021 00022 import java.util.concurrent.CopyOnWriteArraySet; 00023 00024 import moa.core.ObjectRepository; 00025 import moa.core.TimingUtils; 00026 00033 public class TaskThread extends Thread { 00034 00035 public static enum Status { 00036 00037 NOT_STARTED, RUNNING, PAUSED, CANCELLING, CANCELLED, COMPLETED, FAILED 00038 } 00039 00040 protected Task runningTask; 00041 00042 protected volatile Status currentStatus; 00043 00044 protected TaskMonitor taskMonitor; 00045 00046 protected ObjectRepository repository; 00047 00048 protected Object finalResult; 00049 00050 protected long taskStartTime; 00051 00052 protected long taskEndTime; 00053 00054 protected double latestPreviewGrabTime = 0.0; 00055 00056 CopyOnWriteArraySet<TaskCompletionListener> completionListeners = new CopyOnWriteArraySet<TaskCompletionListener>(); 00057 00058 public TaskThread(Task toRun) { 00059 this(toRun, null); 00060 } 00061 00062 public TaskThread(Task toRun, ObjectRepository repository) { 00063 this.runningTask = toRun; 00064 this.repository = repository; 00065 this.currentStatus = Status.NOT_STARTED; 00066 this.taskMonitor = new StandardTaskMonitor(); 00067 this.taskMonitor.setCurrentActivityDescription("Running task " + toRun); 00068 } 00069 00070 @Override 00071 public void run() { 00072 TimingUtils.enablePreciseTiming(); 00073 this.taskStartTime = TimingUtils.getNanoCPUTimeOfThread(getId()); 00074 try { 00075 this.currentStatus = Status.RUNNING; 00076 this.finalResult = this.runningTask.doTask(this.taskMonitor, 00077 this.repository); 00078 this.currentStatus = this.taskMonitor.isCancelled() ? Status.CANCELLED 00079 : Status.COMPLETED; 00080 } catch (Throwable ex) { 00081 this.currentStatus = Status.FAILED; 00082 this.finalResult = new FailedTaskReport(ex); 00083 } 00084 this.taskEndTime = TimingUtils.getNanoCPUTimeOfThread(getId()); 00085 fireTaskCompleted(); 00086 this.taskMonitor.setLatestResultPreview(null); // free preview memory 00087 } 00088 00089 public synchronized void pauseTask() { 00090 if (this.currentStatus == Status.RUNNING) { 00091 this.taskMonitor.requestPause(); 00092 this.currentStatus = Status.PAUSED; 00093 } 00094 } 00095 00096 public synchronized void resumeTask() { 00097 if (this.currentStatus == Status.PAUSED) { 00098 this.taskMonitor.requestResume(); 00099 this.currentStatus = Status.RUNNING; 00100 } 00101 } 00102 00103 public synchronized void cancelTask() { 00104 if ((this.currentStatus == Status.RUNNING) 00105 || (this.currentStatus == Status.PAUSED)) { 00106 this.taskMonitor.requestCancel(); 00107 this.currentStatus = Status.CANCELLING; 00108 } 00109 } 00110 00111 public double getCPUSecondsElapsed() { 00112 double secondsElapsed = 0.0; 00113 if (this.currentStatus == Status.NOT_STARTED) { 00114 secondsElapsed = 0.0; 00115 } else if (isComplete()) { 00116 secondsElapsed = TimingUtils.nanoTimeToSeconds(this.taskEndTime 00117 - this.taskStartTime); 00118 } else { 00119 secondsElapsed = TimingUtils.nanoTimeToSeconds(TimingUtils.getNanoCPUTimeOfThread(getId()) 00120 - this.taskStartTime); 00121 } 00122 return secondsElapsed > 0.0 ? secondsElapsed : 0.0; 00123 } 00124 00125 public Task getTask() { 00126 return this.runningTask; 00127 } 00128 00129 public String getCurrentStatusString() { 00130 switch (this.currentStatus) { 00131 case NOT_STARTED: 00132 return "not started"; 00133 case RUNNING: 00134 return "running"; 00135 case PAUSED: 00136 return "paused"; 00137 case CANCELLING: 00138 return "cancelling"; 00139 case CANCELLED: 00140 return "cancelled"; 00141 case COMPLETED: 00142 return "completed"; 00143 case FAILED: 00144 return "failed"; 00145 } 00146 return "unknown"; 00147 } 00148 00149 public String getCurrentActivityString() { 00150 return (isComplete() || (this.currentStatus == Status.NOT_STARTED)) ? "" 00151 : this.taskMonitor.getCurrentActivityDescription(); 00152 } 00153 00154 public double getCurrentActivityFracComplete() { 00155 switch (this.currentStatus) { 00156 case NOT_STARTED: 00157 return 0.0; 00158 case RUNNING: 00159 case PAUSED: 00160 case CANCELLING: 00161 return this.taskMonitor.getCurrentActivityFractionComplete(); 00162 case CANCELLED: 00163 case COMPLETED: 00164 case FAILED: 00165 return 1.0; 00166 } 00167 return 0.0; 00168 } 00169 00170 public boolean isComplete() { 00171 return ((this.currentStatus == Status.CANCELLED) 00172 || (this.currentStatus == Status.COMPLETED) || (this.currentStatus == Status.FAILED)); 00173 } 00174 00175 public Object getFinalResult() { 00176 return this.finalResult; 00177 } 00178 00179 public void addTaskCompletionListener(TaskCompletionListener tcl) { 00180 this.completionListeners.add(tcl); 00181 } 00182 00183 public void removeTaskCompletionListener(TaskCompletionListener tcl) { 00184 this.completionListeners.remove(tcl); 00185 } 00186 00187 protected void fireTaskCompleted() { 00188 for (TaskCompletionListener listener : this.completionListeners) { 00189 listener.taskCompleted(this); 00190 } 00191 } 00192 00193 public void getPreview(ResultPreviewListener previewer) { 00194 this.taskMonitor.requestResultPreview(previewer); 00195 this.latestPreviewGrabTime = getCPUSecondsElapsed(); 00196 } 00197 00198 public Object getLatestResultPreview() { 00199 return this.taskMonitor.getLatestResultPreview(); 00200 } 00201 00202 public double getLatestPreviewGrabTimeSeconds() { 00203 return this.latestPreviewGrabTime; 00204 } 00205 }