MOA 12.03
Real Time Analytics for Data Streams
|
00001 /* 00002 * DenStream.java 00003 * Copyright (C) 2010 RWTH Aachen University, Germany 00004 * @author Wels ([email protected]) 00005 * 00006 * This program is free software; you can redistribute it and/or modify 00007 * it under the terms of the GNU General Public License as published by 00008 * the Free Software Foundation; either version 3 of the License, or 00009 * (at your option) any later version. 00010 * 00011 * This program is distributed in the hope that it will be useful, 00012 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00014 * GNU General Public License for more details. 00015 * 00016 * You should have received a copy of the GNU General Public License 00017 * along with this program. If not, see <http://www.gnu.org/licenses/>. 00018 * 00019 */ 00020 00021 package moa.clusterers.denstream; 00022 00023 import java.util.ArrayList; 00024 import moa.cluster.Cluster; 00025 import moa.cluster.Clustering; 00026 import moa.clusterers.AbstractClusterer; 00027 import moa.core.Measurement; 00028 import moa.options.FloatOption; 00029 import moa.options.IntOption; 00030 import weka.core.DenseInstance; 00031 import weka.core.Instance; 00032 00044 public class DenStream extends AbstractClusterer { 00045 00046 public IntOption horizonOption = new IntOption("horizon", 00047 'h', "Range of the window.", 1000); 00048 public FloatOption epsilonOption = new FloatOption("epsilon", 'e', 00049 "Defines the epsilon neighbourhood", 0.01, 0, 1); 00050 public IntOption minPointsOption = new IntOption("minPoints", 'p', 00051 "Minimal number of points cluster has to contain.", 10); 00052 00053 public FloatOption betaOption = new FloatOption("beta", 'b', 00054 "", 0.001, 0, 1); 00055 public FloatOption muOption = new FloatOption("mu", 'm', 00056 "", 1, 0, Double.MAX_VALUE); 00057 public IntOption initPointsOption = new IntOption("initPoints", 'i', 00058 "Number of points to use for initialization.", 1000); 00059 00060 private double weightThreshold = 0.01; 00061 double lambda; 00062 double epsilon; 00063 int minPoints; 00064 double mu; 00065 double beta; 00066 00067 Clustering p_micro_cluster; 00068 Clustering o_micro_cluster; 00069 ArrayList<DenPoint> initBuffer; 00070 00071 boolean initialized; 00072 private long timestamp = 0; 00073 Timestamp currentTimestamp; 00074 long tp; 00075 00076 00077 private class DenPoint extends DenseInstance{ 00078 protected boolean covered; 00079 00080 public DenPoint(Instance nextInstance, Long timestamp) { 00081 super(nextInstance); 00082 this.setDataset(nextInstance.dataset()); 00083 } 00084 } 00085 00086 @Override 00087 public void resetLearningImpl() { 00088 //init DenStream 00089 currentTimestamp = new Timestamp(); 00090 lambda = -Math.log(weightThreshold) / Math.log(2)/(double) horizonOption.getValue(); 00091 epsilon = epsilonOption.getValue(); 00092 minPoints = minPointsOption.getValue(); 00093 mu = muOption.getValue(); 00094 beta = betaOption.getValue(); 00095 00096 initialized = false; 00097 p_micro_cluster = new Clustering(); 00098 o_micro_cluster = new Clustering(); 00099 initBuffer = new ArrayList<DenPoint>(); 00100 tp = Math.round(1 / lambda * Math.log((beta * mu) / (beta * mu - 1))) + 1; 00101 00102 } 00103 00104 public void initialDBScan() { 00105 for (int p = 0; p < initBuffer.size(); p++) { 00106 DenPoint point = initBuffer.get(p); 00107 if (!point.covered) { 00108 point.covered = true; 00109 ArrayList<Integer> neighbourhood = getNeighbourhoodIDs(point, initBuffer, epsilon); 00110 if (neighbourhood.size() > minPoints) { 00111 MicroCluster mc = new MicroCluster(point, point.numAttributes(), timestamp, lambda, currentTimestamp); 00112 expandCluster(mc, initBuffer, neighbourhood); 00113 p_micro_cluster.add(mc); 00114 } else { 00115 point.covered = false; 00116 } 00117 } 00118 } 00119 } 00120 00121 @Override 00122 public void trainOnInstanceImpl(Instance inst) { 00123 timestamp++; 00124 currentTimestamp.setTimestamp(timestamp); 00125 DenPoint point = new DenPoint(inst, timestamp); 00127 //Initialization// 00129 if (!initialized) { 00130 initBuffer.add(point); 00131 if (initBuffer.size() >= initPointsOption.getValue()) { 00132 initialDBScan(); 00133 initialized = true; 00134 } 00135 } else { 00137 //Merging(p)// 00139 boolean merged = false; 00140 if (p_micro_cluster.getClustering().size() != 0) { 00141 MicroCluster x = nearestCluster(point, p_micro_cluster); 00142 MicroCluster xCopy = x.copy(); 00143 xCopy.insert(point, timestamp); 00144 if (xCopy.getRadius(timestamp) <= epsilon) { 00145 x.insert(point, timestamp); 00146 merged = true; 00147 } 00148 } 00149 if (!merged && (o_micro_cluster.getClustering().size() != 0)) { 00150 MicroCluster x = nearestCluster(point, o_micro_cluster); 00151 MicroCluster xCopy = x.copy(); 00152 xCopy.insert(point, timestamp); 00153 00154 if (xCopy.getRadius(timestamp) <= epsilon) { 00155 x.insert(point, timestamp); 00156 merged = true; 00157 if (x.getWeight() > beta * mu) { 00158 o_micro_cluster.getClustering().remove(x); 00159 p_micro_cluster.getClustering().add(x); 00160 } 00161 } 00162 } 00163 if (!merged) { 00164 o_micro_cluster.getClustering().add(new MicroCluster(point.toDoubleArray(), point.toDoubleArray().length, timestamp, lambda, currentTimestamp)); 00165 } 00166 00168 //Periodic cluster removal// 00170 if (timestamp % tp == 0) { 00171 ArrayList<MicroCluster> removalList = new ArrayList<MicroCluster>(); 00172 for (Cluster c : p_micro_cluster.getClustering()) { 00173 if (((MicroCluster) c).getWeight() < beta * mu) { 00174 removalList.add((MicroCluster) c); 00175 } 00176 } 00177 for (Cluster c : removalList) { 00178 p_micro_cluster.getClustering().remove(c); 00179 } 00180 00181 for (Cluster c : o_micro_cluster.getClustering()) { 00182 long t0 = ((MicroCluster) c).getCreationTime(); 00183 double xsi1 = Math.pow(2, (-lambda * (timestamp - t0 + tp))) - 1; 00184 double xsi2 = Math.pow(2, -lambda * tp) - 1; 00185 double xsi = xsi1 / xsi2; 00186 if (((MicroCluster) c).getWeight() < xsi) { 00187 removalList.add((MicroCluster) c); 00188 } 00189 } 00190 for (Cluster c : removalList) { 00191 o_micro_cluster.getClustering().remove(c); 00192 } 00193 } 00194 00195 } 00196 } 00197 00198 private void expandCluster(MicroCluster mc, ArrayList<DenPoint> points, ArrayList<Integer> neighbourhood) { 00199 for (int p : neighbourhood) { 00200 DenPoint npoint = points.get(p); 00201 if (!npoint.covered) { 00202 npoint.covered = true; 00203 mc.insert(npoint, timestamp); 00204 ArrayList<Integer> neighbourhood2 = getNeighbourhoodIDs(npoint, initBuffer, epsilon); 00205 if (neighbourhood.size() > minPoints) { 00206 expandCluster(mc, points, neighbourhood2); 00207 } 00208 } 00209 } 00210 } 00211 00212 private ArrayList<Integer> getNeighbourhoodIDs(DenPoint point, ArrayList<DenPoint> points, double eps) { 00213 ArrayList<Integer> neighbourIDs = new ArrayList<Integer>(); 00214 for (int p = 0; p < points.size(); p++) { 00215 DenPoint npoint = points.get(p); 00216 if (!npoint.covered) { 00217 double dist = distance(point.toDoubleArray(), points.get(p).toDoubleArray()); 00218 if (dist < eps) { 00219 neighbourIDs.add(p); 00220 } 00221 } 00222 } 00223 return neighbourIDs; 00224 } 00225 00226 private MicroCluster nearestCluster(DenPoint p, Clustering cl) { 00227 MicroCluster min = null; 00228 double minDist = 0; 00229 for (int c = 0; c < cl.size(); c++ ) { 00230 MicroCluster x = (MicroCluster)cl.get(c); 00231 if (min == null) { 00232 min = x; 00233 } 00234 double dist = distance(p.toDoubleArray(), x.getCenter()); 00235 dist-=x.getRadius(timestamp); 00236 if (dist < minDist) { 00237 minDist = dist; 00238 min = x; 00239 } 00240 } 00241 return min; 00242 00243 } 00244 00245 private double distance(double[] pointA, double[] pointB) { 00246 double distance = 0.0; 00247 for (int i = 0; i < pointA.length; i++) { 00248 double d = pointA[i] - pointB[i]; 00249 distance += d * d; 00250 } 00251 return Math.sqrt(distance); 00252 } 00253 00254 00255 public Clustering getClusteringResult() { 00256 return null; 00257 } 00258 00259 @Override 00260 public boolean implementsMicroClusterer() { 00261 return true; 00262 } 00263 00264 @Override 00265 public Clustering getMicroClusteringResult() { 00266 return p_micro_cluster; 00267 } 00268 00269 @Override 00270 protected Measurement[] getModelMeasurementsImpl() { 00271 throw new UnsupportedOperationException("Not supported yet."); 00272 } 00273 00274 @Override 00275 public void getModelDescription(StringBuilder out, int indent) { 00276 } 00277 00278 public boolean isRandomizable() { 00279 return true; 00280 } 00281 00282 public double[] getVotesForInstance(Instance inst) { 00283 return null; 00284 } 00285 }