MOA 12.03
Real Time Analytics for Data Streams
FileStream.java
Go to the documentation of this file.
00001 /*
00002  *    FileStream.java
00003  *    Copyright (C) 2010 RWTH Aachen University, Germany
00004  *    @author Jansen (moa@cs.rwth-aachen.de)
00005  *
00006  *    This program is free software; you can redistribute it and/or modify
00007  *    it under the terms of the GNU General Public License as published by
00008  *    the Free Software Foundation; either version 3 of the License, or
00009  *    (at your option) any later version.
00010  *
00011  *    This program is distributed in the hope that it will be useful,
00012  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  *    GNU General Public License for more details.
00015  *
00016  *    You should have received a copy of the GNU General Public License
00017  *    along with this program. If not, see <http://www.gnu.org/licenses/>.
00018  *    
00019  */
00020 
00021 package moa.streams.clustering;
00022 
00023 import java.io.BufferedReader;
00024 import java.io.FileInputStream;
00025 import java.io.IOException;
00026 import java.io.InputStream;
00027 import java.io.InputStreamReader;
00028 import java.io.Reader;
00029 import java.util.ArrayList;
00030 import java.util.Arrays;
00031 import java.util.HashSet;
00032 
00033 import moa.core.InputStreamProgressMonitor;
00034 import moa.core.InstancesHeader;
00035 import moa.core.ObjectRepository;
00036 import moa.options.FileOption;
00037 import moa.options.FlagOption;
00038 import moa.options.IntOption;
00039 import moa.options.ListOption;
00040 import moa.options.Option;
00041 import moa.tasks.TaskMonitor;
00042 
00043 import weka.core.Instance;
00044 import weka.core.Instances;
00045 
00046 public class FileStream extends ClusteringStream{
00047 
00048         @Override
00049         public String getPurposeString() {
00050                 return "A stream read from an ARFF file. HINT: Visualization only works correctly with numerical 0-1 normalized attributes!";
00051         }
00052 
00053         private static final long serialVersionUID = 1L;
00054 
00055 
00056     String defaultfile = "";
00057 
00058         public FileOption arffFileOption = new FileOption("arffFile", 'f',
00059                         "ARFF file to load.", defaultfile, "arff", false);
00060 
00061         public IntOption classIndexOption = new IntOption(
00062                         "classIndex",
00063                         'c',
00064                         "Class index of data. 0 for none or -1 for last attribute in file.",
00065                         -1, -1, Integer.MAX_VALUE);
00066 
00067     public FlagOption normalizeOption = 
00068                 new FlagOption("normalize", 'n', 
00069                                 "Numerical data will be normalized to 0-1 " +
00070                                 "for the visualization to work. The complete arff file needs to be read upfront.");
00071 
00072     public ListOption removeAttributesOption = new ListOption("removeAttributes", 'r',
00073             "Attributes to remove. Enter comma seperated list, " +
00074             "starting with 1 for first attribute.", 
00075             new IntOption("removeAttribute", ' ', "Attribute to remove.",-1),
00076             new Option[0], ',');        
00077         
00078     public FlagOption keepNonNumericalAttrOption = 
00079                 new FlagOption("keepNonNumericalAttr", 'K',
00080                 "Non-numerical attributes are being filtered by default " +
00081                 "(except the class attribute). " +
00082                 "Check to keep all attributes. This option is being " +
00083                 "overwritten by the manual attribute removal filter.");
00084         
00085 
00086     
00087   
00088         protected Instances instances;
00089 
00090         protected Reader fileReader;
00091 
00092         protected boolean hitEndOfFile;
00093 
00094         protected Instance lastInstanceRead;
00095 
00096         protected int numInstancesRead;
00097 
00098         protected InputStreamProgressMonitor fileProgressMonitor;
00099         
00100         private Integer[] removeAttributes = null;
00101         private Instances filteredDataset = null;
00102         private ArrayList<Double[]> valuesMinMaxDiff = null;
00103 
00104         
00105         public FileStream(){
00106                 //remove numAttritube Option from ClusteringStream as that is being set internally for Filestream
00107                 numAttsOption = null;
00108         }
00109         
00110         @Override
00111         public void prepareForUseImpl(TaskMonitor monitor,
00112                         ObjectRepository repository) {
00113                 restart();
00114         }
00115 
00116         public InstancesHeader getHeader() {
00117                 return new InstancesHeader(this.filteredDataset);
00118         }
00119 
00120         public long estimatedRemainingInstances() {
00121                 double progressFraction = this.fileProgressMonitor
00122                                 .getProgressFraction();
00123                 if ((progressFraction > 0.0) && (this.numInstancesRead > 0)) {
00124                         return (long) ((this.numInstancesRead / progressFraction) - this.numInstancesRead);
00125                 }
00126                 return -1;
00127         }
00128 
00129         public boolean hasMoreInstances() {
00130                 return !this.hitEndOfFile;
00131         }
00132 
00133         public Instance nextInstance() {
00134                 Instance prevInstance = this.lastInstanceRead;
00135                 this.hitEndOfFile = !readNextInstanceFromFile();
00136                 return prevInstance;
00137         }
00138 
00139         public boolean isRestartable() {
00140                 return true;
00141         }
00142 
00143         public void restart() {
00144                 try {
00145                         if (fileReader != null) {
00146                                 fileReader.close();
00147                         }
00148                         InputStream fileStream = new FileInputStream(arffFileOption.getFile());
00149                         fileProgressMonitor = new InputStreamProgressMonitor(fileStream);
00150                         fileReader = new BufferedReader(new InputStreamReader(fileProgressMonitor));
00151                         instances = new Instances(fileReader, 1);
00152                         if (classIndexOption.getValue() < 0) {
00153                                 instances.setClassIndex(instances.numAttributes() - 1);
00154                         } else if (classIndexOption.getValue() > 0) {
00155                                 instances.setClassIndex(classIndexOption.getValue() - 1);
00156                         }
00157 
00158 
00159                         //use hashset to delete duplicates and attributes numbers that aren't valid
00160                         HashSet<Integer> attributes =  new HashSet<Integer>(); 
00161                         Option[] rawAttributeList = removeAttributesOption.getList();
00162                         for (int i = 0; i < rawAttributeList.length; i++) {
00163                                 int attribute = ((IntOption)rawAttributeList[i]).getValue();
00164                                 if(1 <= attribute && attribute <= instances.numAttributes())
00165                                         attributes.add(attribute-1);
00166                                 else
00167                                         System.out.println("Found invalid attribute removal description: " +
00168                                                         "Attribute option "+attribute
00169                                                         +" will be ignored. Filestream only has "
00170                                                         +instances.numAttributes()+" attributes.");
00171                         }
00172                         
00173                         //remove all non numeric attributes except the class attribute
00174                         if(!keepNonNumericalAttrOption.isSet()){
00175                                 for (int i = 0; i < instances.numAttributes(); i++) {
00176                                         if(!instances.attribute(i).isNumeric() && i != instances.classIndex()){
00177                                                 attributes.add(i);
00178                                         }
00179                                 }
00180                         }
00181                         
00182                         //read min/max values in case we need to normalize
00183                         if(normalizeOption.isSet())
00184                                 valuesMinMaxDiff = readMinMaxDiffValues(attributes);
00185                         
00186                         //convert hashset to array and sort array so we can delete attributes in a sequence
00187                         removeAttributes = attributes.toArray(new Integer[0]);
00188                         Arrays.sort(removeAttributes);
00189                         
00190                         //set updated number of attributes (class attribute included)
00191                         numAttsOption = new IntOption("numAtts", 'a',"", instances.numAttributes() - removeAttributes.length);
00192                         
00193                         if(removeAttributes.length > 0){
00194                                 System.out.println("Removing the following attributes:");
00195                                 for (int i = 0; i < removeAttributes.length; i++) {
00196                                         System.out.println((removeAttributes[i]+1)+" "
00197                                                         +instances.attribute(removeAttributes[i]).name());
00198                                 }
00199                         }
00200             
00201                         //create filtered dataset
00202                         filteredDataset = new Instances(instances);
00203                         for (int i = removeAttributes.length-1; i >= 0 ; i--) {
00204                                 filteredDataset.deleteAttributeAt(removeAttributes[i]);
00205                                 if(true){
00206                                         
00207                                 }
00208                         }
00209 
00210                         this.numInstancesRead = 0;
00211                         this.lastInstanceRead = null;
00212                         this.hitEndOfFile = !readNextInstanceFromFile();
00213                 } catch (IOException ioe) {
00214                         throw new RuntimeException("ArffFileStream restart failed.", ioe);
00215                 }
00216         }
00217 
00218         protected boolean readNextInstanceFromFile() {
00219                 try {
00220                         
00221                         if (this.instances.readInstance(this.fileReader)) {
00222                                 Instance rawInstance = this.instances.instance(0);
00223                                 
00224                                 //remove dataset from instance so we can delete attributes
00225                                 rawInstance.setDataset(null);
00226                                 for (int i = removeAttributes.length-1; i >= 0 ; i--) {
00227                                         rawInstance.deleteAttributeAt(removeAttributes[i]);     
00228                                 }
00229                                 //set adjusted dataset for instance
00230                                 rawInstance.setDataset(filteredDataset);
00231 
00232                                 if(normalizeOption.isSet() && valuesMinMaxDiff!=null){
00233                                         for (int i = 0; i < rawInstance.numAttributes() ; i++) {
00234                                                 if(valuesMinMaxDiff.get(i)[2]!=1 && i!=rawInstance.classIndex()){
00235                                                         double v = rawInstance.value(i);
00236                                                         v = (v - valuesMinMaxDiff.get(i)[0])/valuesMinMaxDiff.get(i)[2];
00237                                                         rawInstance.setValue(i, v);
00238                                                 }
00239                                         }
00240                                 }
00241                                 
00242                                 this.lastInstanceRead = rawInstance;
00243                                 this.instances.delete(); // keep instances clean
00244                                 this.numInstancesRead++;
00245                                 return true;
00246                         }
00247                         if (this.fileReader != null) {
00248                                 this.fileReader.close();
00249                                 this.fileReader = null;
00250                         }
00251                         return false;
00252                 } catch (IOException ioe) {
00253                         throw new RuntimeException(
00254                                         "ArffFileStream failed to read instance from stream.", ioe);
00255                 }
00256         }
00257         
00262         protected ArrayList<Double[]> readMinMaxDiffValues(HashSet<Integer> ignoredAttributes) {
00263                 ArrayList<Double[]> valuesMinMaxDiff = null;
00264                 
00265                 if(ignoredAttributes == null)
00266                         ignoredAttributes = new HashSet<Integer>();
00267                 
00268                 try {
00269                         InputStream fileStream = new FileInputStream(arffFileOption.getFile());
00270                         InputStreamProgressMonitor fileProgressMonitor = new InputStreamProgressMonitor(fileStream);
00271                         Reader fileReader = new BufferedReader(new InputStreamReader(fileProgressMonitor));
00272                         Instances instances = new Instances(fileReader, 1);
00273 
00274                         valuesMinMaxDiff = new ArrayList<Double[]>();
00275                         for (int i = 0; i < instances.numAttributes()-ignoredAttributes.size(); i++) {
00276                                 Double[] values =  {Double.POSITIVE_INFINITY,Double.NEGATIVE_INFINITY,0.0};
00277                                 valuesMinMaxDiff.add(values);
00278                         }
00279                         
00280                         System.out.print("Reading arff file for normalization...");
00281                         int counter = 0;
00282                         while (instances.readInstance(fileReader)) {
00283                                 Instance instance = instances.instance(0);
00284                                 int a = 0;
00285                                 for (int i = 0; i < instances.numAttributes(); i++) {
00286                                         if(!ignoredAttributes.contains(i)){
00287                                                 double value = instance.value(i);
00288                                                 if(value < valuesMinMaxDiff.get(a)[0])
00289                                                         valuesMinMaxDiff.get(a)[0] = value;
00290                                                 if(value > valuesMinMaxDiff.get(a)[1])
00291                                                         valuesMinMaxDiff.get(a)[1] = value;
00292                                                 a++;
00293                                         }
00294                                 }
00295                                 instances.delete();
00296 
00297                                 //show some progress
00298                                 counter++;
00299                                 if(counter >= 10000){
00300                                         counter = 0;
00301                                         System.out.print(".");
00302                                 }
00303                         }
00304                         if (fileReader != null) {
00305                                 fileReader.close();
00306                                 fileReader = null;
00307                         }
00308                         System.out.println("done!");
00309 
00310                         for (int i = 0; i < valuesMinMaxDiff.size(); i++) {
00311                                 valuesMinMaxDiff.get(i)[2]=valuesMinMaxDiff.get(i)[1]-valuesMinMaxDiff.get(i)[0];
00312                         }
00313 
00314                         return valuesMinMaxDiff;
00315                 } catch (IOException ioe) {
00316                         throw new RuntimeException(
00317                                         "ArffFileStream failed to read instance from stream.", ioe);
00318                 }
00319         }       
00320         
00321 
00322         public void getDescription(StringBuilder sb, int indent) {
00323                 // TODO Auto-generated method stub
00324 
00325         }
00326 
00327 }
 All Classes Namespaces Files Functions Variables Enumerations