MOA 12.03
Real Time Analytics for Data Streams
EvaluateInterleavedChunks.java
Go to the documentation of this file.
00001 /*
00002  *    EvaluateInterleavedChunks.java
00003  *    Copyright (C) 2010 Poznan University of Technology, Poznan, Poland
00004  *    @author Dariusz Brzezinski (dariusz.brzezinski@cs.put.poznan.pl)
00005  *
00006  *    This program is free software; you can redistribute it and/or modify
00007  *    it under the terms of the GNU General Public License as published by
00008  *    the Free Software Foundation; either version 3 of the License, or
00009  *    (at your option) any later version.
00010  *
00011  *    This program is distributed in the hope that it will be useful,
00012  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  *    GNU General Public License for more details.
00015  *
00016  *    You should have received a copy of the GNU General Public License
00017  *    along with this program. If not, see <http://www.gnu.org/licenses/>.
00018  *    
00019  */
00020 package moa.tasks;
00021 
00022 import java.io.File;
00023 import java.io.FileOutputStream;
00024 import java.io.PrintStream;
00025 import moa.classifiers.Classifier;
00026 import moa.core.Measurement;
00027 import moa.core.ObjectRepository;
00028 import moa.core.TimingUtils;
00029 import moa.evaluation.ClassificationPerformanceEvaluator;
00030 import moa.evaluation.LearningCurve;
00031 import moa.evaluation.LearningEvaluation;
00032 import moa.options.ClassOption;
00033 import moa.options.FileOption;
00034 import moa.options.IntOption;
00035 import moa.streams.InstanceStream;
00036 import weka.core.Instance;
00037 import weka.core.Instances;
00038 
00039 public class EvaluateInterleavedChunks extends MainTask {
00040 
00041         @Override
00042         public String getPurposeString() {
00043                 return "Evaluates a classifier on a stream by testing then training with chunks of data in sequence.";
00044         }
00045         
00046         private static final long serialVersionUID = 1L;
00047 
00051         public ClassOption learnerOption = new ClassOption("learner", 'l',
00052                         "Classifier to train.", Classifier.class, "bayes.NaiveBayes");
00053 
00057         public ClassOption streamOption = new ClassOption("stream", 's',
00058                         "Stream to learn from.", InstanceStream.class,
00059                         "generators.RandomTreeGenerator");
00060 
00064         public ClassOption evaluatorOption = new ClassOption("evaluator", 'e',
00065                         "Classification performance evaluation method.",
00066                         ClassificationPerformanceEvaluator.class,
00067                         "BasicClassificationPerformanceEvaluator");
00068 
00072         public IntOption instanceLimitOption = new IntOption("instanceLimit", 'i',
00073                         "Maximum number of instances to test/train on  (-1 = no limit).",
00074                         100000000, -1, Integer.MAX_VALUE);
00075 
00079         public IntOption chunkSizeOption = new IntOption("chunkSize", 'c',
00080                         "Number of instances in a data chunk.",
00081                         1000, 1, Integer.MAX_VALUE);
00082         
00086         public IntOption timeLimitOption = new IntOption("timeLimit", 't',
00087                         "Maximum number of seconds to test/train for (-1 = no limit).", -1,
00088                         -1, Integer.MAX_VALUE);
00089 
00093         public IntOption sampleFrequencyOption = new IntOption("sampleFrequency",
00094                         'f',
00095                         "How many instances between samples of the learning performance.",
00096                         100000, 0, Integer.MAX_VALUE);
00097 
00101         public IntOption maxMemoryOption = new IntOption("maxMemory", 'b',
00102                         "Maximum size of model (in bytes). -1 = no limit.", -1, -1,
00103                         Integer.MAX_VALUE);
00104 
00108         public IntOption memCheckFrequencyOption = new IntOption(
00109                         "memCheckFrequency", 'q',
00110                         "How many instances between memory bound checks.", 100000, 0,
00111                         Integer.MAX_VALUE);
00112 
00116         public FileOption dumpFileOption = new FileOption("dumpFile", 'd',
00117                         "File to append intermediate csv reslts to.", null, "csv", true);
00118 
00122         public Class<?> getTaskResultType() {
00123                 return LearningCurve.class;
00124         }
00125 
00126         @Override
00127         protected Object doMainTask(TaskMonitor monitor, ObjectRepository repository) {
00128                 Classifier learner = (Classifier) getPreparedClassOption(this.learnerOption);
00129                 InstanceStream stream = (InstanceStream) getPreparedClassOption(this.streamOption);
00130                 ClassificationPerformanceEvaluator evaluator = (ClassificationPerformanceEvaluator) getPreparedClassOption(this.evaluatorOption);
00131                 learner.setModelContext(stream.getHeader());
00132                 int maxInstances = this.instanceLimitOption.getValue();
00133                 int chunkSize = this.chunkSizeOption.getValue();
00134                 long instancesProcessed = 0;
00135                 int maxSeconds = this.timeLimitOption.getValue();
00136                 int secondsElapsed = 0;
00137                 
00138                 monitor.setCurrentActivity("Evaluating learner...", -1.0);
00139                 LearningCurve learningCurve = new LearningCurve(
00140                                 "learning evaluation instances");
00141                 File dumpFile = this.dumpFileOption.getFile();
00142                 PrintStream immediateResultStream = null;
00143                 if (dumpFile != null) {
00144                         try {
00145                                 if (dumpFile.exists()) {
00146                                         immediateResultStream = new PrintStream(
00147                                                         new FileOutputStream(dumpFile, true), true);
00148                                 } else {
00149                                         immediateResultStream = new PrintStream(
00150                                                         new FileOutputStream(dumpFile), true);
00151                                 }
00152                         } catch (Exception ex) {
00153                                 throw new RuntimeException(
00154                                                 "Unable to open immediate result file: " + dumpFile, ex);
00155                         }
00156                 }
00157                 boolean firstDump = true;
00158                 boolean firstChunk = true;
00159                 boolean preciseCPUTiming = TimingUtils.enablePreciseTiming();
00160                 long evaluateStartTime = TimingUtils.getNanoCPUTimeOfCurrentThread();
00161                 long sampleTestTime =0, sampleTrainTime = 0;
00162                 double RAMHours = 0.0;
00163                 
00164                 while (stream.hasMoreInstances()
00165                                 && ((maxInstances < 0) || (instancesProcessed < maxInstances))
00166                                 && ((maxSeconds < 0) || (secondsElapsed < maxSeconds))) {
00167                         
00168                         Instances chunkInstances = new Instances(stream.getHeader(), chunkSize);
00169                         
00170                         while (stream.hasMoreInstances() && chunkInstances.numInstances() < chunkSize) {
00171                                 chunkInstances.add(stream.nextInstance());
00172                                 if (chunkInstances.numInstances()
00173                                                 % INSTANCES_BETWEEN_MONITOR_UPDATES == 0) {
00174                                         if (monitor.taskShouldAbort()) {
00175                                                 return null;
00176                                         }
00177                                         
00178                                         long estimatedRemainingInstances = stream.estimatedRemainingInstances();
00179                         
00180                                         if (maxInstances > 0) {
00181                                                 long maxRemaining = maxInstances - instancesProcessed;
00182                                                 if ((estimatedRemainingInstances < 0) || (maxRemaining < estimatedRemainingInstances)) {
00183                                                         estimatedRemainingInstances = maxRemaining;
00184                                                 }
00185                                         }
00186                                         
00187                                         monitor.setCurrentActivityFractionComplete((double) instancesProcessed/ (double) (instancesProcessed + estimatedRemainingInstances));
00188                                 }
00189                         }               
00190                         
00192                         long testStartTime = TimingUtils.getNanoCPUTimeOfCurrentThread();
00193                         if(!firstChunk)
00194                         {
00195                                 for (int i=0; i< chunkInstances.numInstances(); i++) {
00196                                         Instance testInst = (Instance) chunkInstances.instance(i).copy();
00197                                         //testInst.setClassMissing();
00198                                         double[] prediction = learner.getVotesForInstance(testInst);
00199                                         evaluator.addResult(testInst, prediction);
00200                             }
00201                         }
00202                         else
00203                         {
00204                                 firstChunk = false;
00205                         }
00206                         
00207                         sampleTestTime += TimingUtils.getNanoCPUTimeOfCurrentThread() - testStartTime;
00208                         
00210                         long trainStartTime = TimingUtils.getNanoCPUTimeOfCurrentThread();
00211                         
00212                         for (int i=0; i< chunkInstances.numInstances(); i++) {
00213                                 learner.trainOnInstance(chunkInstances.instance(i));
00214                                 instancesProcessed++;
00215                     }
00216                         
00217                         sampleTrainTime += TimingUtils.getNanoCPUTimeOfCurrentThread() - trainStartTime;
00218                         
00220                         if (instancesProcessed % this.sampleFrequencyOption.getValue() == 0) {
00221                                 
00222                                 double RAMHoursIncrement = learner.measureByteSize() / (1024.0 * 1024.0 * 1024.0); //GBs
00223                 RAMHoursIncrement *= (TimingUtils.nanoTimeToSeconds(sampleTrainTime + sampleTestTime) / 3600.0); //Hours
00224                 RAMHours += RAMHoursIncrement;
00225                                 
00226                                 double avgTrainTime = TimingUtils.nanoTimeToSeconds(sampleTrainTime)/((double)this.sampleFrequencyOption.getValue()/chunkInstances.numInstances());
00227                                 double avgTestTime = TimingUtils.nanoTimeToSeconds(sampleTestTime)/((double)this.sampleFrequencyOption.getValue()/chunkInstances.numInstances());
00228                                 
00229                                 sampleTestTime = 0;
00230                                 sampleTrainTime = 0;
00231                                 
00232                                 learningCurve.insertEntry(new LearningEvaluation(
00233                                         new Measurement[] {
00234                                                 new Measurement("learning evaluation instances", instancesProcessed),
00235                                                 new Measurement(("evaluation time ("+ (preciseCPUTiming ? "cpu " : "") + "seconds)"),TimingUtils.nanoTimeToSeconds(TimingUtils.getNanoCPUTimeOfCurrentThread() - evaluateStartTime)),
00236                                                 new Measurement("average chunk train time", avgTrainTime),
00237                                                 new Measurement("average chunk train speed", chunkInstances.numInstances() / avgTrainTime),
00238                                                 new Measurement("average chunk test time", avgTestTime),
00239                                                 new Measurement("average chunk test speed", chunkInstances.numInstances()/ avgTestTime),
00240                                                 new Measurement( "model cost (RAM-Hours)", RAMHours)}, 
00241                                         evaluator, 
00242                                         learner));
00243                                 
00244                                 if (immediateResultStream != null) {
00245                                         if (firstDump) {
00246                                                 immediateResultStream.println(learningCurve
00247                                                                 .headerToString());
00248                                                 firstDump = false;
00249                                         }
00250                                         immediateResultStream.println(learningCurve
00251                                                         .entryToString(learningCurve.numEntries() - 1));
00252                                         immediateResultStream.flush();
00253                                 }
00254                         }
00255                         
00257                         if (instancesProcessed % INSTANCES_BETWEEN_MONITOR_UPDATES == 0) {
00258                                 if (monitor.taskShouldAbort()) {
00259                                         return null;
00260                                 }
00261                                 long estimatedRemainingInstances = stream
00262                                                 .estimatedRemainingInstances();
00263                                 if (maxInstances > 0) {
00264                                         long maxRemaining = maxInstances - instancesProcessed;
00265                                         if ((estimatedRemainingInstances < 0)
00266                                                         || (maxRemaining < estimatedRemainingInstances)) {
00267                                                 estimatedRemainingInstances = maxRemaining;
00268                                         }
00269                                 }
00270                                 monitor
00271                                                 .setCurrentActivityFractionComplete(estimatedRemainingInstances < 0 ? -1.0
00272                                                                 : (double) instancesProcessed
00273                                                                                 / (double) (instancesProcessed + estimatedRemainingInstances));
00274                                 if (monitor.resultPreviewRequested()) {
00275                                         monitor.setLatestResultPreview(learningCurve.copy());
00276                                 }
00277                                 secondsElapsed = (int) TimingUtils
00278                                                 .nanoTimeToSeconds(TimingUtils
00279                                                                 .getNanoCPUTimeOfCurrentThread()
00280                                                                 - evaluateStartTime);
00281                         }
00282                 }
00283                 if (immediateResultStream != null) {
00284                         immediateResultStream.close();
00285                 }
00286                 return learningCurve;
00287         }
00288 
00289 }
 All Classes Namespaces Files Functions Variables Enumerations