MOA 12.03
Real Time Analytics for Data Streams
BatchCmd.java
Go to the documentation of this file.
00001 /*
00002  *    BatchCmd.java
00003  *    Copyright (C) 2010 RWTH Aachen University, Germany
00004  *    @author Jansen (moa@cs.rwth-aachen.de)
00005  *
00006  *    This program is free software; you can redistribute it and/or modify
00007  *    it under the terms of the GNU General Public License as published by
00008  *    the Free Software Foundation; either version 3 of the License, or
00009  *    (at your option) any later version.
00010  *
00011  *    This program is distributed in the hope that it will be useful,
00012  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  *    GNU General Public License for more details.
00015  *
00016  *    You should have received a copy of the GNU General Public License
00017  *    along with this program. If not, see <http://www.gnu.org/licenses/>.
00018  *    
00019  */
00020 
00021 package moa.gui;
00022 
00023 import java.io.BufferedWriter;
00024 import java.io.FileWriter;
00025 import java.io.IOException;
00026 import java.io.PrintWriter;
00027 import java.util.ArrayList;
00028 import java.util.Iterator;
00029 import java.util.logging.Level;
00030 import java.util.logging.Logger;
00031 import moa.clusterers.ClusterGenerator;
00032 import moa.cluster.Clustering;
00033 import moa.clusterers.AbstractClusterer;
00034 import moa.clusterers.clustream.Clustream;
00035 import moa.evaluation.F1;
00036 import moa.evaluation.General;
00037 import moa.evaluation.MeasureCollection;
00038 import moa.evaluation.SSQ;
00039 import moa.evaluation.SilhouetteCoefficient;
00040 import moa.evaluation.StatisticalCollection;
00041 import moa.evaluation.EntropyCollection;
00042 import moa.gui.visualization.DataPoint;
00043 import moa.gui.visualization.RunVisualizer;
00044 import moa.streams.clustering.ClusterEvent;
00045 import weka.core.Instance;
00046 import moa.streams.clustering.ClusterEventListener;
00047 import moa.streams.clustering.ClusteringStream;
00048 import moa.streams.clustering.RandomRBFGeneratorEvents;
00049 import weka.core.DenseInstance;
00050 
00051 public class BatchCmd implements ClusterEventListener{
00052 
00053         private ArrayList<ClusterEvent> clusterEvents;
00054         private AbstractClusterer clusterer;
00055         private ClusteringStream stream;
00056         private MeasureCollection[] measures;
00057 
00058         private int totalInstances;
00059         public boolean useMicroGT = false;
00060 
00061 
00062         public BatchCmd(AbstractClusterer clusterer, ClusteringStream stream, MeasureCollection[] measures, int totalInstances){
00063                 this.clusterer = clusterer;
00064                 this.stream = stream;
00065                 this.totalInstances = totalInstances;
00066                 this.measures = measures;
00067 
00068                 if(stream instanceof RandomRBFGeneratorEvents){
00069                         ((RandomRBFGeneratorEvents)stream).addClusterChangeListener(this);
00070                         clusterEvents = new ArrayList<ClusterEvent>();
00071                 }
00072                 else{
00073                         clusterEvents = null;
00074                 }
00075                 stream.prepareForUse();
00076                 clusterer.prepareForUse();
00077         }
00078 
00079         private ArrayList<ClusterEvent> getEventList(){
00080                 return clusterEvents;
00081         }
00082 
00083         @SuppressWarnings("unchecked")
00084         private static ArrayList<Class> getMeasureSelection(int selection){
00085                 ArrayList<Class>mclasses = new ArrayList<Class>();
00086                 mclasses.add(EntropyCollection.class);
00087                 mclasses.add(F1.class);
00088                 mclasses.add(General.class);
00089                 mclasses.add(SSQ.class);
00090                 mclasses.add(SilhouetteCoefficient.class);
00091                 mclasses.add(StatisticalCollection.class);
00092 
00093                 return mclasses;
00094         }
00095 
00096 
00097         /* TODO read args from command line */
00098         public static void main(String[] args){
00099                 RandomRBFGeneratorEvents stream = new RandomRBFGeneratorEvents();
00100                 AbstractClusterer clusterer = new Clustream();
00101                 int measureCollectionType = 0;
00102                 int amountInstances = 20000;
00103                 String testfile = "d:\\data\\test.csv";
00104 
00105                 runBatch(stream, clusterer, measureCollectionType, amountInstances, testfile);
00106         }
00107 
00108 
00109         public static void runBatch(RandomRBFGeneratorEvents stream, AbstractClusterer clusterer,
00110                         int measureCollectionType, int amountInstances, String outputFile){
00111                 // create the measure collection 
00112                 MeasureCollection[] measures = getMeasures(getMeasureSelection(measureCollectionType));
00113                 
00114                 // run the batch job
00115                 BatchCmd batch = new BatchCmd(clusterer, stream, measures, amountInstances);
00116                 batch.run();
00117 
00118                 // read events and horizon
00119                 ArrayList<ClusterEvent> clusterEvents = batch.getEventList();
00120                 int horizon = stream.decayHorizonOption.getValue();
00121                 
00122                 // write results to file
00123                 exportCSV(outputFile, clusterEvents, measures, horizon);
00124         }
00125 
00126 
00127         public void run(){
00128                 ArrayList<DataPoint> pointBuffer0 = new ArrayList<DataPoint>();
00129                 int m_timestamp = 0;
00130                 int decayHorizon = stream.getDecayHorizon();
00131 
00132                 double decay_threshold = stream.getDecayThreshold();
00133                 double decay_rate = (-1*Math.log(decay_threshold)/decayHorizon);
00134 
00135                 int counter = decayHorizon;
00136 
00137                 while(m_timestamp < totalInstances && stream.hasMoreInstances()){
00138                         m_timestamp++;
00139                         counter--;
00140                         Instance next = stream.nextInstance();
00141                         DataPoint point0 = new DataPoint(next,m_timestamp);
00142                         pointBuffer0.add(point0);
00143 
00144                         Instance traininst0 = new DenseInstance(point0);
00145                         if(clusterer instanceof ClusterGenerator)
00146                                 traininst0.setDataset(point0.dataset());
00147                         else
00148                                 traininst0.deleteAttributeAt(point0.classIndex());
00149 
00150                         clusterer.trainOnInstanceImpl(traininst0);
00151 
00152                         if(counter <= 0){
00153                                 //                if(m_timestamp%(totalInstances/10) == 0)
00154                                         //                    System.out.println("Thread"+threadID+":"+(m_timestamp*100/totalInstances)+"% ");
00155                                 for(DataPoint p:pointBuffer0)
00156                                         p.updateWeight(m_timestamp, decay_rate);
00157 
00158                                 Clustering gtClustering0;
00159                                 Clustering clustering0 = null;
00160 
00161                                 gtClustering0 = new Clustering(pointBuffer0);
00162                                 if(useMicroGT && stream instanceof RandomRBFGeneratorEvents){
00163                                         gtClustering0 = ((RandomRBFGeneratorEvents)stream).getMicroClustering();
00164                                 }
00165 
00166                                 clustering0 = clusterer.getClusteringResult();
00167                                 if(clusterer.implementsMicroClusterer()){
00168                                         if(clusterer instanceof ClusterGenerator
00169                                                         && stream instanceof RandomRBFGeneratorEvents){
00170                                                 ((ClusterGenerator)clusterer).setSourceClustering(((RandomRBFGeneratorEvents)stream).getMicroClustering());
00171                                         }
00172                                         Clustering microC = clusterer.getMicroClusteringResult();
00173                                         if(clusterer.evaluateMicroClusteringOption.isSet()){
00174                                                 clustering0 = microC;
00175                                         }
00176                                         else{
00177                                                 if(clustering0 == null && microC != null)
00178                                                         clustering0 = moa.clusterers.KMeans.gaussianMeans(gtClustering0, microC);
00179                                         }
00180                                 }
00181 
00182 
00183                                 //evaluate
00184                                 for (int i = 0; i < measures.length; i++) {
00185                                         try {
00186                                                 /*double sec =*/ measures[i].evaluateClusteringPerformance(clustering0, gtClustering0, pointBuffer0);
00187                                                 //System.out.println("Eval of "+measures[i].getClass().getSimpleName()+" at "+m_timestamp+" took "+sec);
00188                                         } catch (Exception ex) { ex.printStackTrace(); }
00189                                 }
00190 
00191                                 pointBuffer0.clear();
00192                                 counter = decayHorizon;
00193                         }
00194                 }
00195         }
00196 
00197         @SuppressWarnings("unchecked")
00198         private static MeasureCollection[] getMeasures(ArrayList<Class> measure_classes){
00199                 MeasureCollection[] measures = new MeasureCollection[measure_classes.size()];
00200                 for (int i = 0; i < measure_classes.size(); i++) {
00201                         try {
00202                                 MeasureCollection m = (MeasureCollection)measure_classes.get(i).newInstance();
00203                                 measures[i] = m;
00204 
00205                         } catch (Exception ex) {
00206                                 Logger.getLogger("Couldn't create Instance for "+measure_classes.get(i).getName());
00207                                 ex.printStackTrace();
00208                         }
00209                 }
00210                 return measures;
00211         }
00212 
00213         public void changeCluster(ClusterEvent e) {
00214                 if(clusterEvents!=null) clusterEvents.add(e);
00215         }
00216 
00217 
00218         public static void exportCSV(String filepath, ArrayList<ClusterEvent> clusterEvents, MeasureCollection[] measures, int horizon) {
00219                 PrintWriter out = null;
00220                 try {
00221                         if(!filepath.endsWith(".csv"))
00222                                 filepath+=".csv";
00223                         out = new PrintWriter(new BufferedWriter(new FileWriter(filepath)));
00224                         String del = ";";
00225 
00226                         Iterator<ClusterEvent> eventIt = null;
00227                         ClusterEvent event = null;
00228                         if(clusterEvents.size() > 0){
00229                                 eventIt = clusterEvents.iterator();
00230                                 event = eventIt.next();
00231                         }
00232 
00233                         int numValues = 0;
00234                         //header
00235                         out.write("Nr"+del);
00236                         out.write("Event"+del);
00237                         for (int m = 0; m < 1; m++) {
00238                                 for (int i = 0; i < measures.length; i++) {
00239                                         for (int j = 0; j < measures[i].getNumMeasures(); j++) {
00240                                                 if(measures[i].isEnabled(j)){
00241                                                         out.write(measures[i].getName(j)+del);
00242                                                         numValues = measures[i].getNumberOfValues(j);
00243                                                 }
00244                                         }
00245                                 }
00246                         }
00247                         out.write("\n");
00248 
00249 
00250                         //rows
00251                         for (int v = 0; v < numValues; v++){
00252                                 //Nr
00253                                 out.write(v+del);
00254 
00255                                 //events
00256                                 if(event!=null && event.getTimestamp()<=horizon){
00257                                         out.write(event.getType()+del);
00258                                         if(eventIt!= null && eventIt.hasNext()){
00259                                                 event=eventIt.next();
00260                                         }
00261                                         else
00262                                                 event = null;
00263                                 }
00264                                 else
00265                                         out.write(del);
00266 
00267                                 //values
00268                                 for (int m = 0; m < 1; m++) {
00269                                         for (int i = 0; i < measures.length; i++) {
00270                                                 for (int j = 0; j < measures[i].getNumMeasures(); j++) {
00271                                                         if(measures[i].isEnabled(j)){
00272                                                                 out.write(measures[i].getValue(j, v)+del);
00273                                                         }
00274                                                 }
00275                                         }
00276                                 }
00277                                 out.write("\n");
00278                         }
00279                         out.close();
00280                 } catch (IOException ex) {
00281                         Logger.getLogger(RunVisualizer.class.getName()).log(Level.SEVERE, null, ex);
00282                 } finally {
00283                         out.close();
00284                 }
00285         }
00286 }
00287 
 All Classes Namespaces Files Functions Variables Enumerations