MOA 12.03
Real Time Analytics for Data Streams
RunVisualizer.java
Go to the documentation of this file.
00001 /*
00002  *    RunVisualizer.java
00003  *    Copyright (C) 2010 RWTH Aachen University, Germany
00004  *    @author Jansen (moa@cs.rwth-aachen.de)
00005  *
00006  *    This program is free software; you can redistribute it and/or modify
00007  *    it under the terms of the GNU General Public License as published by
00008  *    the Free Software Foundation; either version 3 of the License, or
00009  *    (at your option) any later version.
00010  *
00011  *    This program is distributed in the hope that it will be useful,
00012  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  *    GNU General Public License for more details.
00015  *
00016  *    You should have received a copy of the GNU General Public License
00017  *    along with this program. If not, see <http://www.gnu.org/licenses/>.
00018  *    
00019  */
00020 
00021 package moa.gui.visualization;
00022 
00023 import java.awt.Color;
00024 import java.awt.Component;
00025 import java.awt.event.ActionEvent;
00026 import java.awt.event.ActionListener;
00027 import java.io.BufferedWriter;
00028 import java.io.FileWriter;
00029 import java.io.IOException;
00030 import java.io.PrintWriter;
00031 import java.util.ArrayList;
00032 import java.util.Iterator;
00033 import java.util.LinkedList;
00034 import java.util.logging.Level;
00035 import java.util.logging.Logger;
00036 import moa.cluster.Cluster;
00037 import moa.clusterers.ClusterGenerator;
00038 import moa.cluster.Clustering;
00039 import moa.clusterers.AbstractClusterer;
00040 import moa.evaluation.MeasureCollection;
00041 import moa.gui.TextViewerPanel;
00042 import moa.gui.clustertab.ClusteringVisualEvalPanel;
00043 import moa.gui.clustertab.ClusteringVisualTab;
00044 import moa.streams.clustering.ClusterEvent;
00045 import weka.core.Instance;
00046 import moa.gui.clustertab.ClusteringSetupTab;
00047 import moa.streams.clustering.ClusterEventListener;
00048 import moa.streams.clustering.ClusteringStream;
00049 import moa.streams.clustering.RandomRBFGeneratorEvents;
00050 import weka.core.Attribute;
00051 import weka.core.DenseInstance;
00052 import weka.core.FastVector;
00053 import weka.core.Instances;
00054 
00055 public class RunVisualizer implements Runnable, ActionListener, ClusterEventListener{
00056 
00057         
00059     public static final int initialPauseInterval = 5000;
00060     
00062     private int m_wait_frequency = 1000;
00063     
00067     private int m_redrawInterval = 100;
00068 
00069     
00070     /* flags to control the run behavior */
00071     private static boolean work;
00072     private boolean stop = false;
00073     
00074     /* total amount of processed instances */
00075     private static int timestamp;
00076     private static int lastPauseTimestamp;
00077             
00078     /* amount of instances to process in one step*/
00079     private int m_processFrequency;
00080     
00081     /* the stream that delivers the instances */
00082     private final ClusteringStream m_stream0;
00083     
00084     /* amount of relevant instances; older instances will be dropped;
00085        creates the 'sliding window' over the stream; 
00086        is strongly connected to the decay rate and decay threshold*/
00087     private int m_stream0_decayHorizon;
00088 
00089     /* the decay threshold defines the minimum weight of an instance to be relevant */
00090     private double m_stream0_decay_threshold;
00091     
00092     /* the decay rate of the stream, often reffered to as lambda;
00093        is being calculated from the horizion and the threshold 
00094        as these are more intuitive to define */
00095     private double m_stream0_decay_rate;
00096     
00097     
00098     /* the clusterer */
00099     private AbstractClusterer m_clusterer0;
00100     private AbstractClusterer m_clusterer1;
00101 
00102     /* the measure collections contain all the measures */
00103     private MeasureCollection[] m_measures0 = null;
00104     private MeasureCollection[] m_measures1 = null;
00105 
00106     /* left and right stream panel that datapoints and clusterings will be drawn to */
00107     private StreamPanel m_streampanel0;
00108     private StreamPanel m_streampanel1;
00109 
00110     /* panel that shows the evaluation results */
00111     private ClusteringVisualEvalPanel m_evalPanel;
00112     
00113     /* panel to hold the graph */
00114     private GraphCanvas m_graphcanvas;
00115     
00116     /* reference to the visual panel */
00117     private ClusteringVisualTab m_visualPanel;
00118 
00119     /* all possible clusterings */
00120     //not pretty to have all the clusterings, but otherwise we can't just redraw clusterings
00121     private Clustering gtClustering0 = null;
00122     private Clustering gtClustering1 = null;
00123     private Clustering macro0 = null;
00124     private Clustering macro1 = null;
00125     private Clustering micro0 = null;
00126     private Clustering micro1 = null;
00127 
00128     /* holds all the events that have happend, if the stream supports events */
00129     private ArrayList<ClusterEvent> clusterEvents;
00130     
00131     /* reference to the log panel */
00132     private final TextViewerPanel m_logPanel;
00133     
    /**
     * Wires a visualizer run to the GUI panels and to the stream, clusterers
     * and measures configured in the setup tab.
     *
     * Besides initializing instance state, this resets the static run state
     * (timestamp, lastPauseTimestamp, work), so constructing a new instance
     * affects the values seen through the static accessors of this class.
     *
     * @param visualPanel        tab providing the stream panels, graph canvas
     *                           and evaluation panel that will be drawn to
     * @param clusteringSetupTab tab providing the log panel, stream,
     *                           clusterers and measure collections
     */
    public RunVisualizer(ClusteringVisualTab visualPanel, ClusteringSetupTab clusteringSetupTab){
        m_visualPanel = visualPanel;
        m_streampanel0 = visualPanel.getLeftStreamPanel();
        m_streampanel1 = visualPanel.getRightStreamPanel();
        m_graphcanvas = visualPanel.getGraphCanvas();
        m_evalPanel = visualPanel.getEvalPanel();
        m_logPanel = clusteringSetupTab.getLogPanel();

        m_stream0 = clusteringSetupTab.getStream0();
        m_stream0_decayHorizon = m_stream0.getDecayHorizon();
        m_stream0_decay_threshold = m_stream0.getDecayThreshold();
        // decay rate = log2(1 / threshold) / horizon
        m_stream0_decay_rate = (Math.log(1.0/m_stream0_decay_threshold)/Math.log(2)/m_stream0_decayHorizon);

        // reset the static run state shared through the static accessors
        timestamp = 0;
        lastPauseTimestamp = 0;
        work = true;


        // event tracking is only set up for RandomRBFGeneratorEvents streams;
        // for any other stream clusterEvents stays null
        if(m_stream0 instanceof RandomRBFGeneratorEvents){
            ((RandomRBFGeneratorEvents)m_stream0).addClusterChangeListener(this);
            clusterEvents = new ArrayList<ClusterEvent>();
            m_graphcanvas.setClusterEventsList(clusterEvents);
        }
        m_stream0.prepareForUse();

        m_clusterer0 = clusteringSetupTab.getClusterer0();
        m_clusterer0.prepareForUse();


        // the second clusterer is optional
        m_clusterer1 = clusteringSetupTab.getClusterer1();
        if(m_clusterer1!=null){
            m_clusterer1.prepareForUse();
        }

        // NOTE(review): both arrays come from separate getMeasures() calls;
        // presumably each call returns fresh instances - confirm in ClusteringSetupTab
        m_measures0 = clusteringSetupTab.getMeasures();
        m_measures1 = clusteringSetupTab.getMeasures();


        /* TODO this option needs to move from the stream panel to the setup panel */
        m_processFrequency = m_stream0.getEvaluationFrequency();

        //get those values from the generator
        int dims = m_stream0.numAttsOption.getValue();
        visualPanel.setDimensionComobBoxes(dims);
        visualPanel.setPauseInterval(initialPauseInterval);

        m_evalPanel.setMeasures(m_measures0, m_measures1, this);
        // initially graph the first measure collection at offset 0;
        // NOTE(review): index 0 assumes at least one measure collection is configured
        m_graphcanvas.setGraph(m_measures0[0], m_measures1[0],0,m_processFrequency);
    }
00183 
00184 
    /**
     * Runnable entry point; simply delegates to runVisual().
     * Also re-invoked by work_pause() once a pause has ended.
     */
    public void run() {
            runVisual();
    }
00188 
00189 
    /**
     * Main processing loop: pulls instances from the stream, keeps a sliding
     * window of the most recent points, draws them, trains the clusterer(s)
     * and, every m_processFrequency instances, evaluates and draws the
     * clusterings.
     *
     * The loop runs while the work flag is set or while a processing step is
     * only partially filled (processCounter != 0), so clearing the flag takes
     * effect at the end of the current step. When the stream has no more
     * instances the method prints "DONE" and returns. When the loop ends
     * without stop having been set, the point panels are redrawn and
     * work_pause() is entered.
     */
    public void runVisual() {
        // instances processed since the last evaluation step
        int processCounter = 0;
        // instances processed since the last throttling wait
        int speedCounter = 0;
        // sliding windows over the stream, one per clusterer
        LinkedList<DataPoint> pointBuffer0 = new LinkedList<DataPoint>();
        LinkedList<DataPoint> pointBuffer1 = new LinkedList<DataPoint>();
        // snapshots of the windows taken at the last evaluation step;
        // pointarray1 stays null when there is no second clusterer
        ArrayList<DataPoint> pointarray0 = null;
        ArrayList<DataPoint> pointarray1 = null;


        while(work || processCounter!=0){
            if (m_stream0.hasMoreInstances()) {
                timestamp++;
                speedCounter++;
                processCounter++;
                // update the counter display only every 100 instances
                if(timestamp%100 == 0){
                    m_visualPanel.setProcessedPointsCounter(timestamp);
                }

                Instance next0 = m_stream0.nextInstance();
                DataPoint point0 = new DataPoint(next0,timestamp);

                // keep at most m_stream0_decayHorizon points in the window
                pointBuffer0.add(point0);
                while(pointBuffer0.size() > m_stream0_decayHorizon){
                    pointBuffer0.removeFirst();
                }

                // the second clusterer gets its own DataPoint built from the same instance
                DataPoint point1 = null;
                if(m_clusterer1!=null){
                        point1 = new DataPoint(next0,timestamp);
                        pointBuffer1.add(point1);
                        while(pointBuffer1.size() > m_stream0_decayHorizon){
                            pointBuffer1.removeFirst();
                        }
                }

                if(m_visualPanel.isEnabledDrawPoints()){
                    m_streampanel0.drawPoint(point0);
                    if(m_clusterer1!=null)
                        m_streampanel1.drawPoint(point1);
                    // apply the draw decay once every m_redrawInterval instances
                    if(processCounter%m_redrawInterval==0){
                        m_streampanel0.applyDrawDecay(m_stream0_decayHorizon/(float)(m_redrawInterval));
                        if(m_clusterer1!=null)
                            m_streampanel1.applyDrawDecay(m_stream0_decayHorizon/(float)(m_redrawInterval));
                    }
                }

                // train on a copy; the class attribute is removed unless the
                // clusterer asks to keep the class label
                Instance traininst0 = new DenseInstance(point0);
                if(m_clusterer0.keepClassLabel())
                    traininst0.setDataset(point0.dataset());
                else
                    traininst0.deleteAttributeAt(point0.classIndex());
                m_clusterer0.trainOnInstanceImpl(traininst0);


                if(m_clusterer1!=null){
                    Instance traininst1 = new DenseInstance(point1);
                    if(m_clusterer1.keepClassLabel())
                        traininst1.setDataset(point1.dataset());
                    else
                        traininst1.deleteAttributeAt(point1.classIndex());
                    m_clusterer1.trainOnInstanceImpl(traininst1);
                }

                // evaluation step: a full batch of m_processFrequency instances was processed
                if (processCounter >= m_processFrequency) {
                    processCounter = 0;
                    for(DataPoint p:pointBuffer0)
                        p.updateWeight(timestamp, m_stream0_decay_rate);

                    pointarray0 = new ArrayList<DataPoint>(pointBuffer0);

                    if(m_clusterer1!=null){
                        for(DataPoint p:pointBuffer1)
                            p.updateWeight(timestamp, m_stream0_decay_rate);

                                pointarray1 = new ArrayList<DataPoint>(pointBuffer1);
                    }


                    processClusterings(pointarray0, pointarray1);

                    // a pause interval of 0 disables the automatic pause;
                    // presumably toggleVisualizer(true) ends up calling pause() - confirm in ClusteringVisualTab
                    int pauseInterval = m_visualPanel.getPauseInterval();
                    if(pauseInterval!=0 && lastPauseTimestamp+pauseInterval<=timestamp){
                        m_visualPanel.toggleVisualizer(true);
                    }

                }
            } else {
                System.out.println("DONE");
                return;
            }
            // throttling: only active for speed settings below 15; waits 50 ms
            // at setting 0 and 1 ms otherwise, after more than
            // m_wait_frequency*30 instances since the last wait
            if(speedCounter > m_wait_frequency*30 && m_wait_frequency < 15){
                try {
                    synchronized (this) {
                        if(m_wait_frequency == 0)
                            wait(50);
                        else
                            wait(1);
                    }
                } catch (InterruptedException ex) {
                    // NOTE(review): the interrupt is swallowed and the interrupt
                    // status is not restored; the loop simply continues
                }
                speedCounter = 0;
            }
        }
        if(!stop){
            // NOTE(review): pointarray0 is still null here if no evaluation step
            // has run yet - confirm drawPointPanels tolerates null
            m_streampanel0.drawPointPanels(pointarray0, timestamp, m_stream0_decay_rate, m_stream0_decay_threshold);
            if(m_clusterer1!=null)
                m_streampanel1.drawPointPanels(pointarray1, timestamp, m_stream0_decay_rate, m_stream0_decay_threshold);
            // NOTE(review): work_pause() re-enters run() when the pause ends, so
            // every pause/resume cycle adds stack frames; the sliding-window
            // buffers above are also recreated empty on re-entry
            work_pause();
        }
    }
00301 
00302     private void processClusterings(ArrayList<DataPoint> points0, ArrayList<DataPoint> points1){
00303         gtClustering0 = new Clustering(points0);
00304         gtClustering1 = new Clustering(points1);
00305 
00306         Clustering evalClustering0 = null;
00307         Clustering evalClustering1 = null;
00308 
00309         //special case for ClusterGenerator
00310         if(gtClustering0!= null){
00311             if(m_clusterer0 instanceof ClusterGenerator)
00312                 ((ClusterGenerator)m_clusterer0).setSourceClustering(gtClustering0);
00313             if(m_clusterer1 instanceof ClusterGenerator)
00314                 ((ClusterGenerator)m_clusterer1).setSourceClustering(gtClustering1);
00315         }
00316 
00317         macro0 = m_clusterer0.getClusteringResult();
00318         evalClustering0 = macro0;
00319 
00320         
00321         //TODO: should we check if micro/macro is being drawn or needed for evaluation and skip otherwise to speed things up?
00322         if(m_clusterer0.implementsMicroClusterer()){
00323             micro0 = m_clusterer0.getMicroClusteringResult();
00324             if(macro0 == null && micro0 != null){
00325                 //TODO: we need a Macro Clusterer Interface and the option for kmeans to use the non optimal centers   
00326                 macro0 = moa.clusterers.KMeans.gaussianMeans(gtClustering0, micro0);
00327             }
00328             if(m_clusterer0.evaluateMicroClusteringOption.isSet())
00329                 evalClustering0 = micro0;
00330             else
00331                 evalClustering0 = macro0;
00332         }
00333 
00334         if(m_clusterer1!=null){
00335             macro1 = m_clusterer1.getClusteringResult();
00336             evalClustering1 = macro1;
00337             if(m_clusterer1.implementsMicroClusterer()){
00338                 micro1 = m_clusterer1.getMicroClusteringResult();
00339                 if(macro1 == null && micro1 != null){
00340                         macro1 = moa.clusterers.KMeans.gaussianMeans(gtClustering1, micro1);
00341                 }
00342                 if(m_clusterer1.evaluateMicroClusteringOption.isSet())
00343                     evalClustering1 = micro1;
00344                 else
00345                     evalClustering1 = macro1;
00346             }
00347         }
00348         
00349         evaluateClustering(evalClustering0, gtClustering0, points0, true);
00350         evaluateClustering(evalClustering1, gtClustering1, points1, false);
00351 
00352         drawClusterings();
00353     }
00354 
00355     private void evaluateClustering(Clustering found_clustering, Clustering trueClustering, ArrayList<DataPoint> points, boolean algorithm0){
00356         StringBuilder sb = new StringBuilder();
00357         for (int i = 0; i < m_measures0.length; i++) {
00358             if(algorithm0){
00359                 if(found_clustering!=null && found_clustering.size() > 0){
00360                     try {
00361                         double msec = m_measures0[i].evaluateClusteringPerformance(found_clustering, trueClustering, points);
00362                         sb.append(m_measures0[i].getClass().getSimpleName()+" took "+msec+"ms (Mean:"+m_measures0[i].getMeanRunningTime()+")");
00363                         sb.append("\n");
00364 
00365                     } catch (Exception ex) { ex.printStackTrace(); }
00366                 }
00367                 else{
00368                     for(int j = 0; j < m_measures0[i].getNumMeasures(); j++){
00369                         m_measures0[i].addEmptyValue(j);
00370                     }
00371                 }
00372             }
00373             else{
00374                 if(m_clusterer1!=null && found_clustering!=null && found_clustering.size() > 0){
00375                     try {
00376                         double msec = m_measures1[i].evaluateClusteringPerformance(found_clustering, trueClustering, points);
00377                         sb.append(m_measures1[i].getClass().getSimpleName()+" took "+msec+"ms (Mean:"+m_measures1[i].getMeanRunningTime()+")");
00378                         sb.append("\n");
00379                     }
00380                     catch (Exception ex) { ex.printStackTrace(); }
00381                 }
00382                 else{
00383                     for(int j = 0; j < m_measures1[i].getNumMeasures(); j++){
00384                         m_measures1[i].addEmptyValue(j);
00385                     }
00386                 }
00387             }
00388         }
00389         m_logPanel.setText(sb.toString());
00390         m_evalPanel.update();
00391         m_graphcanvas.updateCanvas();
00392     }
00393 
00394     public void drawClusterings(){
00395         if(macro0!= null && macro0.size() > 0)
00396                 m_streampanel0.drawMacroClustering(macro0, Color.BLUE);
00397         if(micro0!= null && micro0.size() > 0)
00398                 m_streampanel0.drawMicroClustering(micro0, Color.GREEN);
00399         if(gtClustering0!= null && gtClustering0.size() > 0)
00400             m_streampanel0.drawGTClustering(gtClustering0, Color.BLACK);
00401 
00402         if(m_clusterer1!=null){
00403             if(macro1!= null && macro1.size() > 0)
00404                     m_streampanel1.drawMacroClustering(macro1, Color.BLUE);
00405             if(micro1!= null && micro1.size() > 0)
00406                     m_streampanel1.drawMicroClustering(micro1, Color.GREEN);
00407             if(gtClustering1!= null && gtClustering1.size() > 0)
00408                 m_streampanel1.drawGTClustering(gtClustering1, Color.BLACK);
00409         }
00410     }
00411 
    /** Requests a repaint of both the left and the right stream panel. */
    public void redraw(){
        m_streampanel0.repaint();
        m_streampanel1.repaint();
    }
00416 
00417 
    /**
     * Returns the static instance counter, i.e. the number of instances
     * processed since the last RunVisualizer was constructed.
     *
     * @return the current timestamp
     */
    public static int getCurrentTimestamp(){
        return timestamp;
    }
00421 
00422     private void work_pause(){
00423         while(!work && !stop){
00424             try {
00425                 synchronized (this) {
00426                     wait(1000);
00427                 }
00428             } catch (InterruptedException ex) {
00429                 
00430             }
00431        }
00432        run();
00433     }
00434 
    /**
     * Pauses processing by clearing the static work flag and remembers the
     * current timestamp as the moment of the last pause. Being static, this
     * acts on state shared by all RunVisualizer instances.
     */
    public static void pause(){
        work = false;
        lastPauseTimestamp = timestamp;
    }
00439 
    /** Resumes processing by setting the static work flag again. */
    public static void resume(){
        work = true;
    }
00443 
    /**
     * Stops this run: clears the static work flag and sets the stop flag, so
     * that runVisual() does not enter the pause state after leaving its loop.
     */
    public void stop(){
        work = false;
        stop = true;
    }
00448 
    /**
     * Sets the speed value used by the throttling check in runVisual().
     *
     * @param speed new value for m_wait_frequency; runVisual() only inserts
     *              waits for values below 15
     */
    public void setSpeed(int speed) {
        m_wait_frequency = speed;
    }
00452 
00453     public void actionPerformed(ActionEvent e) {
00454         //reacte on graph selection and find out which measure was selected
00455         int selected = Integer.parseInt(e.getActionCommand());
00456         int counter = selected;
00457         int m_select = 0;
00458         int m_select_offset = 0;
00459         boolean found = false;
00460         for (int i = 0; i < m_measures0.length; i++) {
00461             for (int j = 0; j < m_measures0[i].getNumMeasures(); j++) {
00462                 if(m_measures0[i].isEnabled(j)){
00463                         counter--;
00464                     if(counter<0){
00465                         m_select = i;
00466                         m_select_offset = j;
00467                         found = true;
00468                         break;
00469                     }
00470                 }
00471             }
00472             if(found) break;
00473         }
00474         m_graphcanvas.setGraph(m_measures0[m_select], m_measures1[m_select],m_select_offset,m_processFrequency);
00475     }
00476 
    /**
     * Shows or hides the point layer on both stream panels.
     *
     * @param selected visibility value forwarded to both panels
     */
    public void setPointLayerVisibility(boolean selected) {
        m_streampanel0.setPointVisibility(selected);
        m_streampanel1.setPointVisibility(selected);
    }
    /**
     * Shows or hides the micro-clustering layer on both stream panels.
     *
     * @param selected visibility value forwarded to both panels
     */
    public void setMicroLayerVisibility(boolean selected) {
        m_streampanel0.setMicroLayerVisibility(selected);
        m_streampanel1.setMicroLayerVisibility(selected);
    }
    /**
     * Shows or hides the macro-clustering layer on both stream panels.
     *
     * @param selected visibility value forwarded to both panels
     */
    public void setMacroVisibility(boolean selected) {
        m_streampanel0.setMacroLayerVisibility(selected);
        m_streampanel1.setMacroLayerVisibility(selected);
    }
    /**
     * Shows or hides the ground-truth layer on both stream panels.
     *
     * @param selected visibility value forwarded to both panels
     */
    public void setGroundTruthVisibility(boolean selected) {
        m_streampanel0.setGroundTruthLayerVisibility(selected);
        m_streampanel1.setGroundTruthLayerVisibility(selected);
    }
00493 
    /**
     * Cluster event callback (this object registers itself as listener on
     * RandomRBFGeneratorEvents streams in the constructor). Records the event
     * when event tracking is active and prints its type and message to
     * standard output.
     *
     * @param e the cluster event that occurred
     */
    public void changeCluster(ClusterEvent e) {
        // clusterEvents is only non-null for streams that support events
        if(clusterEvents!=null) clusterEvents.add(e);
        System.out.println(e.getType()+": "+e.getMessage());
    }
00498 
00499 
00500 
00501     public void exportCSV(String filepath) {
00502         PrintWriter out = null;
00503         try {
00504             if(!filepath.endsWith(".csv"))
00505                 filepath+=".csv";
00506             out = new PrintWriter(new BufferedWriter(new FileWriter(filepath)));
00507             String del = ";";
00508 
00509             Iterator<ClusterEvent> eventIt = null;
00510             ClusterEvent event = null;
00511             if(clusterEvents!=null && clusterEvents.size() > 0){
00512                 eventIt = clusterEvents.iterator();
00513                 event = eventIt.next();
00514             }
00515                  
00516             //raw data
00517             MeasureCollection measurecol[][] = new MeasureCollection[2][];
00518             measurecol[0] = m_measures0;
00519             measurecol[1] = m_measures1;
00520             int numValues = 0;
00521             //header
00522             out.write("Nr"+del);
00523             out.write("Event"+del);
00524             for (int m = 0; m < 2; m++) {
00525                 for (int i = 0; i < measurecol[m].length; i++) {
00526                     for (int j = 0; j < measurecol[m][i].getNumMeasures(); j++) {
00527                         if(measurecol[m][i].isEnabled(j)){
00528                             out.write(m+"-"+measurecol[m][i].getName(j)+del);
00529                             numValues = measurecol[m][i].getNumberOfValues(j);
00530                         }
00531                     }
00532                 }
00533             }
00534             out.write("\n");
00535 
00536 
00537             //rows
00538             for (int v = 0; v < numValues; v++){
00539                 //Nr
00540                 out.write(v+del);
00541 
00542                 //events
00543                 if(event!=null && event.getTimestamp()<=m_stream0_decayHorizon*v){
00544                     out.write(event.getType()+del);
00545                     if(eventIt!= null && eventIt.hasNext()){
00546                         event=eventIt.next();
00547                     }
00548                     else
00549                         event = null;
00550                 }
00551                 else
00552                     out.write(del);
00553 
00554                 //values
00555                 for (int m = 0; m < 2; m++) {
00556                     for (int i = 0; i < measurecol[m].length; i++) {
00557                         for (int j = 0; j < measurecol[m][i].getNumMeasures(); j++) {
00558                             if(measurecol[m][i].isEnabled(j)){
00559                                         double value = measurecol[m][i].getValue(j, v);
00560                                         if(Double.isNaN(value))
00561                                                 out.write(del);
00562                                         else
00563                                                 out.write(value+del);
00564                             }
00565                         }
00566                     }
00567                 }
00568                 out.write("\n");
00569             }
00570             out.close();
00571         } catch (IOException ex) {
00572             Logger.getLogger(RunVisualizer.class.getName()).log(Level.SEVERE, null, ex);
00573         } finally {
00574             out.close();
00575         }
00576     }
00577 
00578     public void weka() {
00579         try{
00580                 Class.forName("weka.gui.Logger");
00581         }
00582         catch (Exception e){
00583                 m_logPanel.addText("Please add weka.jar to the classpath to use the Weka explorer.");
00584                 return;
00585         }       
00586         
00587         
00588         Clustering wekaClustering;
00589         if(m_clusterer0.implementsMicroClusterer() && m_clusterer0.evaluateMicroClusteringOption.isSet())
00590             wekaClustering = micro0;
00591         else
00592             wekaClustering = macro0;
00593 
00594         if(wekaClustering == null || wekaClustering.size()==0){
00595             m_logPanel.addText("Empty Clustering");
00596             return;
00597         }
00598 
00599         int dims = wekaClustering.get(0).getCenter().length;
00600         FastVector attributes = new FastVector();
00601         for(int i = 0; i < dims; i++)
00602                 attributes.addElement( new Attribute("att" + i) );
00603 
00604         Instances instances = new Instances("trainset",attributes,0);
00605 
00606         for(int c = 0; c < wekaClustering.size(); c++){
00607             Cluster cluster = wekaClustering.get(c);
00608             Instance inst = new DenseInstance(cluster.getWeight(), cluster.getCenter());
00609             inst.setDataset(instances);
00610             instances.add(inst);
00611         }
00612 
00613         WekaExplorer explorer = new WekaExplorer(instances);
00614     }
00615 
00616 
00617 }
00618 
 All Classes Namespaces Files Functions Variables Enumerations