MOA 12.03
Real Time Analytics for Data Streams
|
00001 /* 00002 * ArffFileStream.java 00003 * Copyright (C) 2007 University of Waikato, Hamilton, New Zealand 00004 * @author Richard Kirkby ([email protected]) 00005 * 00006 * This program is free software; you can redistribute it and/or modify 00007 * it under the terms of the GNU General Public License as published by 00008 * the Free Software Foundation; either version 3 of the License, or 00009 * (at your option) any later version. 00010 * 00011 * This program is distributed in the hope that it will be useful, 00012 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00014 * GNU General Public License for more details. 00015 * 00016 * You should have received a copy of the GNU General Public License 00017 * along with this program. If not, see <http://www.gnu.org/licenses/>. 00018 * 00019 */ 00020 package moa.streams; 00021 00022 import java.io.BufferedReader; 00023 import java.io.FileInputStream; 00024 import java.io.IOException; 00025 import java.io.InputStream; 00026 import java.io.InputStreamReader; 00027 import java.io.Reader; 00028 00029 import moa.core.InputStreamProgressMonitor; 00030 import moa.core.InstancesHeader; 00031 import moa.core.ObjectRepository; 00032 import moa.options.AbstractOptionHandler; 00033 import moa.options.FileOption; 00034 import moa.options.IntOption; 00035 import moa.tasks.TaskMonitor; 00036 00037 import weka.core.Instance; 00038 import weka.core.Instances; 00039 00046 public class ArffFileStream extends AbstractOptionHandler implements 00047 InstanceStream { 00048 00049 @Override 00050 public String getPurposeString() { 00051 return "A stream read from an ARFF file."; 00052 } 00053 00054 private static final long serialVersionUID = 1L; 00055 00056 public FileOption arffFileOption = new FileOption("arffFile", 'f', 00057 "ARFF file to load.", null, "arff", false); 00058 00059 public IntOption classIndexOption = new IntOption( 00060 "classIndex", 00061 'c', 00062 "Class index of data. 0 for none or -1 for last attribute in file.", 00063 -1, -1, Integer.MAX_VALUE); 00064 00065 protected Instances instances; 00066 00067 protected Reader fileReader; 00068 00069 protected boolean hitEndOfFile; 00070 00071 protected Instance lastInstanceRead; 00072 00073 protected int numInstancesRead; 00074 00075 protected InputStreamProgressMonitor fileProgressMonitor; 00076 00077 public ArffFileStream() { 00078 } 00079 00080 public ArffFileStream(String arffFileName, int classIndex) { 00081 this.arffFileOption.setValue(arffFileName); 00082 this.classIndexOption.setValue(classIndex); 00083 restart(); 00084 } 00085 00086 @Override 00087 public void prepareForUseImpl(TaskMonitor monitor, 00088 ObjectRepository repository) { 00089 restart(); 00090 } 00091 00092 @Override 00093 public InstancesHeader getHeader() { 00094 return new InstancesHeader(this.instances); 00095 } 00096 00097 @Override 00098 public long estimatedRemainingInstances() { 00099 double progressFraction = this.fileProgressMonitor.getProgressFraction(); 00100 if ((progressFraction > 0.0) && (this.numInstancesRead > 0)) { 00101 return (long) ((this.numInstancesRead / progressFraction) - this.numInstancesRead); 00102 } 00103 return -1; 00104 } 00105 00106 @Override 00107 public boolean hasMoreInstances() { 00108 return !this.hitEndOfFile; 00109 } 00110 00111 @Override 00112 public Instance nextInstance() { 00113 Instance prevInstance = this.lastInstanceRead; 00114 this.hitEndOfFile = !readNextInstanceFromFile(); 00115 return prevInstance; 00116 } 00117 00118 @Override 00119 public boolean isRestartable() { 00120 return true; 00121 } 00122 00123 @Override 00124 public void restart() { 00125 try { 00126 if (this.fileReader != null) { 00127 this.fileReader.close(); 00128 } 00129 InputStream fileStream = new FileInputStream(this.arffFileOption.getFile()); 00130 this.fileProgressMonitor = new InputStreamProgressMonitor( 00131 fileStream); 00132 this.fileReader = new BufferedReader(new InputStreamReader( 00133 this.fileProgressMonitor)); 00134 this.instances = new Instances(this.fileReader, 1); 00135 if (this.classIndexOption.getValue() < 0) { 00136 this.instances.setClassIndex(this.instances.numAttributes() - 1); 00137 } else if (this.classIndexOption.getValue() > 0) { 00138 this.instances.setClassIndex(this.classIndexOption.getValue() - 1); 00139 } 00140 this.numInstancesRead = 0; 00141 this.lastInstanceRead = null; 00142 this.hitEndOfFile = !readNextInstanceFromFile(); 00143 } catch (IOException ioe) { 00144 throw new RuntimeException("ArffFileStream restart failed.", ioe); 00145 } 00146 } 00147 00148 protected boolean readNextInstanceFromFile() { 00149 try { 00150 if (this.instances.readInstance(this.fileReader)) { 00151 this.lastInstanceRead = this.instances.instance(0); 00152 this.instances.delete(); // keep instances clean 00153 this.numInstancesRead++; 00154 return true; 00155 } 00156 if (this.fileReader != null) { 00157 this.fileReader.close(); 00158 this.fileReader = null; 00159 } 00160 return false; 00161 } catch (IOException ioe) { 00162 throw new RuntimeException( 00163 "ArffFileStream failed to read instance from stream.", ioe); 00164 } 00165 } 00166 00167 @Override 00168 public void getDescription(StringBuilder sb, int indent) { 00169 // TODO Auto-generated method stub 00170 } 00171 }