MOA 12.03
Real Time Analytics for Data Streams
ArffFileStream.java
Go to the documentation of this file.
00001 /*
00002  *    ArffFileStream.java
00003  *    Copyright (C) 2007 University of Waikato, Hamilton, New Zealand
00004  *    @author Richard Kirkby ([email protected])
00005  *
00006  *    This program is free software; you can redistribute it and/or modify
00007  *    it under the terms of the GNU General Public License as published by
00008  *    the Free Software Foundation; either version 3 of the License, or
00009  *    (at your option) any later version.
00010  *
00011  *    This program is distributed in the hope that it will be useful,
00012  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  *    GNU General Public License for more details.
00015  *
00016  *    You should have received a copy of the GNU General Public License
00017  *    along with this program. If not, see <http://www.gnu.org/licenses/>.
00018  *    
00019  */
00020 package moa.streams;
00021 
00022 import java.io.BufferedReader;
00023 import java.io.FileInputStream;
00024 import java.io.IOException;
00025 import java.io.InputStream;
00026 import java.io.InputStreamReader;
00027 import java.io.Reader;
00028 
00029 import moa.core.InputStreamProgressMonitor;
00030 import moa.core.InstancesHeader;
00031 import moa.core.ObjectRepository;
00032 import moa.options.AbstractOptionHandler;
00033 import moa.options.FileOption;
00034 import moa.options.IntOption;
00035 import moa.tasks.TaskMonitor;
00036 
00037 import weka.core.Instance;
00038 import weka.core.Instances;
00039 
00046 public class ArffFileStream extends AbstractOptionHandler implements
00047         InstanceStream {
00048 
00049     @Override
00050     public String getPurposeString() {
00051         return "A stream read from an ARFF file.";
00052     }
00053 
00054     private static final long serialVersionUID = 1L;
00055 
00056     public FileOption arffFileOption = new FileOption("arffFile", 'f',
00057             "ARFF file to load.", null, "arff", false);
00058 
00059     public IntOption classIndexOption = new IntOption(
00060             "classIndex",
00061             'c',
00062             "Class index of data. 0 for none or -1 for last attribute in file.",
00063             -1, -1, Integer.MAX_VALUE);
00064 
00065     protected Instances instances;
00066 
00067     protected Reader fileReader;
00068 
00069     protected boolean hitEndOfFile;
00070 
00071     protected Instance lastInstanceRead;
00072 
00073     protected int numInstancesRead;
00074 
00075     protected InputStreamProgressMonitor fileProgressMonitor;
00076 
00077     public ArffFileStream() {
00078     }
00079 
00080     public ArffFileStream(String arffFileName, int classIndex) {
00081         this.arffFileOption.setValue(arffFileName);
00082         this.classIndexOption.setValue(classIndex);
00083         restart();
00084     }
00085 
00086     @Override
00087     public void prepareForUseImpl(TaskMonitor monitor,
00088             ObjectRepository repository) {
00089         restart();
00090     }
00091 
00092     @Override
00093     public InstancesHeader getHeader() {
00094         return new InstancesHeader(this.instances);
00095     }
00096 
00097     @Override
00098     public long estimatedRemainingInstances() {
00099         double progressFraction = this.fileProgressMonitor.getProgressFraction();
00100         if ((progressFraction > 0.0) && (this.numInstancesRead > 0)) {
00101             return (long) ((this.numInstancesRead / progressFraction) - this.numInstancesRead);
00102         }
00103         return -1;
00104     }
00105 
00106     @Override
00107     public boolean hasMoreInstances() {
00108         return !this.hitEndOfFile;
00109     }
00110 
00111     @Override
00112     public Instance nextInstance() {
00113         Instance prevInstance = this.lastInstanceRead;
00114         this.hitEndOfFile = !readNextInstanceFromFile();
00115         return prevInstance;
00116     }
00117 
00118     @Override
00119     public boolean isRestartable() {
00120         return true;
00121     }
00122 
00123     @Override
00124     public void restart() {
00125         try {
00126             if (this.fileReader != null) {
00127                 this.fileReader.close();
00128             }
00129             InputStream fileStream = new FileInputStream(this.arffFileOption.getFile());
00130             this.fileProgressMonitor = new InputStreamProgressMonitor(
00131                     fileStream);
00132             this.fileReader = new BufferedReader(new InputStreamReader(
00133                     this.fileProgressMonitor));
00134             this.instances = new Instances(this.fileReader, 1);
00135             if (this.classIndexOption.getValue() < 0) {
00136                 this.instances.setClassIndex(this.instances.numAttributes() - 1);
00137             } else if (this.classIndexOption.getValue() > 0) {
00138                 this.instances.setClassIndex(this.classIndexOption.getValue() - 1);
00139             }
00140             this.numInstancesRead = 0;
00141             this.lastInstanceRead = null;
00142             this.hitEndOfFile = !readNextInstanceFromFile();
00143         } catch (IOException ioe) {
00144             throw new RuntimeException("ArffFileStream restart failed.", ioe);
00145         }
00146     }
00147 
00148     protected boolean readNextInstanceFromFile() {
00149         try {
00150             if (this.instances.readInstance(this.fileReader)) {
00151                 this.lastInstanceRead = this.instances.instance(0);
00152                 this.instances.delete(); // keep instances clean
00153                 this.numInstancesRead++;
00154                 return true;
00155             }
00156             if (this.fileReader != null) {
00157                 this.fileReader.close();
00158                 this.fileReader = null;
00159             }
00160             return false;
00161         } catch (IOException ioe) {
00162             throw new RuntimeException(
00163                     "ArffFileStream failed to read instance from stream.", ioe);
00164         }
00165     }
00166 
00167     @Override
00168     public void getDescription(StringBuilder sb, int indent) {
00169         // TODO Auto-generated method stub
00170     }
00171 }
 All Classes Namespaces Files Functions Variables Enumerations