// MOA 12.03 - Real Time Analytics for Data Streams
/*
 *    CacheShuffledStream.java
 *    Copyright (C) 2007 University of Waikato, Hamilton, New Zealand
 *    @author Richard Kirkby ([email protected])
 *
 *    This program is free software; you can redistribute it and/or modify
 *    it under the terms of the GNU General Public License as published by
 *    the Free Software Foundation; either version 3 of the License, or
 *    (at your option) any later version.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU General Public License for more details.
 *
 *    You should have received a copy of the GNU General Public License
 *    along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 */
package moa.tasks;

import java.util.Random;

import weka.core.Instances;
import moa.core.ObjectRepository;
import moa.options.ClassOption;
import moa.options.IntOption;
import moa.streams.CachedInstancesStream;
import moa.streams.InstanceStream;

/**
 * Task that reads instances from a stream into memory, shuffles them, and
 * returns the shuffled instances as a {@link CachedInstancesStream}.
 *
 * <p>Reading stops when either the source stream reports no more instances
 * or the configured maximum cache size is reached. The shuffle uses a
 * {@link Random} built from the configured seed.
 */
public class CacheShuffledStream extends AbstractTask {

    private static final long serialVersionUID = 1L;

    /** Source stream whose instances are cached and shuffled. */
    public ClassOption streamOption = new ClassOption("stream", 's',
            "Stream to cache and shuffle.", InstanceStream.class,
            "generators.RandomTreeGenerator");

    /** Upper bound on the number of instances held in the cache. */
    public IntOption maximumCacheSizeOption = new IntOption("maximumCacheSize",
            'm', "Maximum number of instances to cache.", 1000000, 1,
            Integer.MAX_VALUE);

    /** Seed handed to the {@link Random} that drives the shuffle. */
    public IntOption shuffleRandomSeedOption = new IntOption(
            "shuffleRandomSeed", 'r',
            "Seed for random shuffling of instances.", 1);

    @Override
    public String getPurposeString() {
        return "Stores and shuffles examples in memory.";
    }

    /**
     * Caches instances from the configured stream, shuffles them, and wraps
     * the result in a {@link CachedInstancesStream}.
     *
     * <p>Every {@code MainTask.INSTANCES_BETWEEN_MONITOR_UPDATES} cached
     * instances the monitor is asked whether the task should abort and, if
     * not, is given an updated completion fraction.
     *
     * @param monitor    receives activity/progress updates and is polled for
     *                   abort requests
     * @param repository not used by this task
     * @return the shuffled cache as a {@link CachedInstancesStream}, or
     *         {@code null} if the monitor requested an abort while caching
     */
    @Override
    protected Object doTaskImpl(TaskMonitor monitor, ObjectRepository repository) {
        InstanceStream stream = (InstanceStream) getPreparedClassOption(this.streamOption);
        // The limit does not change while the task runs; read it once.
        final int maxCacheSize = this.maximumCacheSizeOption.getValue();
        Instances cache = new Instances(stream.getHeader(), 0);

        monitor.setCurrentActivity("Caching instances...", -1.0);
        while ((cache.numInstances() < maxCacheSize) && stream.hasMoreInstances()) {
            cache.add(stream.nextInstance());
            int cached = cache.numInstances();
            if (cached % MainTask.INSTANCES_BETWEEN_MONITOR_UPDATES == 0) {
                if (monitor.taskShouldAbort()) {
                    return null;
                }
                monitor.setCurrentActivityFractionComplete(
                        fractionCached(cached, maxCacheSize,
                                stream.estimatedRemainingInstances()));
            }
        }

        monitor.setCurrentActivity("Shuffling instances...", -1.0);
        cache.randomize(new Random(this.shuffleRandomSeedOption.getValue()));
        return new CachedInstancesStream(cache);
    }

    /**
     * Computes the fraction of the caching phase that is complete.
     *
     * <p>The number of instances still to come is the stream's own estimate
     * capped by the remaining cache capacity. A negative estimate
     * (presumably "unknown" - confirm against the stream contract) is
     * replaced by the remaining capacity. Because the capacity left is never
     * negative here, the result is always a valid fraction and no "unknown
     * progress" value is needed.
     *
     * @param cached             instances cached so far; at least 1 when called
     * @param maxCacheSize       configured cache limit, not smaller than {@code cached}
     * @param estimatedRemaining the stream's estimate of instances left
     * @return {@code cached / (cached + remaining)}
     */
    private static double fractionCached(int cached, int maxCacheSize,
            long estimatedRemaining) {
        long capacityLeft = (long) maxCacheSize - cached;
        long remaining = (estimatedRemaining < 0)
                ? capacityLeft
                : Math.min(estimatedRemaining, capacityLeft);
        return (double) cached / (double) (cached + remaining);
    }

    /**
     * Reports the type of object produced by {@link #doTaskImpl}.
     *
     * @return {@code CachedInstancesStream.class}
     */
    public Class<?> getTaskResultType() {
        return CachedInstancesStream.class;
    }
}