MOA 12.03
Real Time Analytics for Data Streams
DenStream.java
Go to the documentation of this file.
00001 /*
00002  *    DenStream.java
00003  *    Copyright (C) 2010 RWTH Aachen University, Germany
00004  *    @author Wels ([email protected])
00005  *
00006  *    This program is free software; you can redistribute it and/or modify
00007  *    it under the terms of the GNU General Public License as published by
00008  *    the Free Software Foundation; either version 3 of the License, or
00009  *    (at your option) any later version.
00010  *
00011  *    This program is distributed in the hope that it will be useful,
00012  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  *    GNU General Public License for more details.
00015  *
00016  *    You should have received a copy of the GNU General Public License
00017  *    along with this program. If not, see <http://www.gnu.org/licenses/>.
00018  *    
00019  */
00020 
00021 package moa.clusterers.denstream;
00022 
00023 import java.util.ArrayList;
00024 import moa.cluster.Cluster;
00025 import moa.cluster.Clustering;
00026 import moa.clusterers.AbstractClusterer;
00027 import moa.core.Measurement;
00028 import moa.options.FloatOption;
00029 import moa.options.IntOption;
00030 import weka.core.DenseInstance;
00031 import weka.core.Instance;
00032 
00044 public class DenStream extends AbstractClusterer {
00045 
00046     public IntOption horizonOption = new IntOption("horizon",
00047                         'h', "Range of the window.", 1000);
00048     public FloatOption epsilonOption = new FloatOption("epsilon", 'e',
00049             "Defines the epsilon neighbourhood", 0.01, 0, 1);
00050     public IntOption minPointsOption = new IntOption("minPoints", 'p',
00051             "Minimal number of points cluster has to contain.", 10);
00052 
00053     public FloatOption betaOption = new FloatOption("beta", 'b',
00054             "", 0.001, 0, 1);
00055     public FloatOption muOption = new FloatOption("mu", 'm',
00056             "", 1, 0, Double.MAX_VALUE);
00057     public IntOption initPointsOption = new IntOption("initPoints", 'i',
00058             "Number of points to use for initialization.", 1000);
00059 
00060     private double weightThreshold = 0.01;
00061     double lambda;
00062     double epsilon;
00063     int minPoints;
00064     double mu;
00065     double beta;
00066 
00067     Clustering p_micro_cluster;
00068     Clustering o_micro_cluster;
00069     ArrayList<DenPoint> initBuffer;
00070 
00071     boolean initialized;
00072     private long timestamp = 0;
00073     Timestamp currentTimestamp;
00074     long tp;
00075     
00076 
00077     private class DenPoint extends DenseInstance{
00078         protected boolean covered;
00079 
00080         public DenPoint(Instance nextInstance, Long timestamp) {
00081             super(nextInstance);
00082             this.setDataset(nextInstance.dataset());
00083         }
00084     }
00085 
00086     @Override
00087     public void resetLearningImpl() {
00088         //init DenStream
00089         currentTimestamp = new Timestamp();
00090         lambda = -Math.log(weightThreshold) / Math.log(2)/(double) horizonOption.getValue();
00091         epsilon = epsilonOption.getValue();
00092         minPoints = minPointsOption.getValue();
00093         mu = muOption.getValue();
00094         beta = betaOption.getValue();
00095 
00096         initialized = false;
00097         p_micro_cluster = new Clustering();
00098         o_micro_cluster = new Clustering();
00099         initBuffer = new ArrayList<DenPoint>();
00100         tp = Math.round(1 / lambda * Math.log((beta * mu) / (beta * mu - 1))) + 1;
00101 
00102     }
00103 
00104     public void initialDBScan() {
00105         for (int p = 0; p < initBuffer.size(); p++) {
00106             DenPoint point = initBuffer.get(p);
00107             if (!point.covered) {
00108                 point.covered = true;
00109                 ArrayList<Integer> neighbourhood = getNeighbourhoodIDs(point, initBuffer, epsilon);
00110                 if (neighbourhood.size() > minPoints) {
00111                     MicroCluster mc = new MicroCluster(point, point.numAttributes(), timestamp, lambda, currentTimestamp);
00112                     expandCluster(mc, initBuffer, neighbourhood);
00113                     p_micro_cluster.add(mc);
00114                 } else {
00115                     point.covered = false;
00116                 }
00117             }
00118         }
00119     }
00120 
00121     @Override
00122     public void trainOnInstanceImpl(Instance inst) {
00123         timestamp++;
00124         currentTimestamp.setTimestamp(timestamp);
00125         DenPoint point = new DenPoint(inst, timestamp);
00127         //Initialization//
00129         if (!initialized) {
00130             initBuffer.add(point);
00131             if (initBuffer.size() >= initPointsOption.getValue()) {
00132                 initialDBScan();
00133                 initialized = true;
00134             }
00135         } else {
00137             //Merging(p)//
00139             boolean merged = false;
00140             if (p_micro_cluster.getClustering().size() != 0) {
00141                 MicroCluster x = nearestCluster(point, p_micro_cluster);
00142                 MicroCluster xCopy = x.copy();
00143                 xCopy.insert(point, timestamp);
00144                 if (xCopy.getRadius(timestamp) <= epsilon) {
00145                     x.insert(point, timestamp);
00146                     merged = true;
00147                 }
00148             }
00149             if (!merged && (o_micro_cluster.getClustering().size() != 0)) {
00150                 MicroCluster x = nearestCluster(point, o_micro_cluster);
00151                 MicroCluster xCopy = x.copy();
00152                 xCopy.insert(point, timestamp);
00153 
00154                 if (xCopy.getRadius(timestamp) <= epsilon) {
00155                     x.insert(point, timestamp);
00156                     merged = true;
00157                     if (x.getWeight() > beta * mu) {
00158                         o_micro_cluster.getClustering().remove(x);
00159                         p_micro_cluster.getClustering().add(x);
00160                     }
00161                 }
00162             }
00163             if (!merged) {
00164                 o_micro_cluster.getClustering().add(new MicroCluster(point.toDoubleArray(), point.toDoubleArray().length, timestamp, lambda, currentTimestamp));
00165             }
00166 
00168             //Periodic cluster removal//
00170             if (timestamp % tp == 0) {
00171                 ArrayList<MicroCluster> removalList = new ArrayList<MicroCluster>();
00172                 for (Cluster c : p_micro_cluster.getClustering()) {
00173                     if (((MicroCluster) c).getWeight() < beta * mu) {
00174                         removalList.add((MicroCluster) c);
00175                     }
00176                 }
00177                 for (Cluster c : removalList) {
00178                     p_micro_cluster.getClustering().remove(c);
00179                 }
00180 
00181                 for (Cluster c : o_micro_cluster.getClustering()) {
00182                     long t0 = ((MicroCluster) c).getCreationTime();
00183                     double xsi1 = Math.pow(2, (-lambda * (timestamp - t0 + tp))) - 1;
00184                     double xsi2 = Math.pow(2, -lambda * tp) - 1;
00185                     double xsi = xsi1 / xsi2;
00186                     if (((MicroCluster) c).getWeight() < xsi) {
00187                         removalList.add((MicroCluster) c);
00188                     }
00189                 }
00190                 for (Cluster c : removalList) {
00191                     o_micro_cluster.getClustering().remove(c);
00192                 }
00193             }
00194 
00195         }
00196     }
00197 
00198     private void expandCluster(MicroCluster mc, ArrayList<DenPoint> points, ArrayList<Integer> neighbourhood) {
00199         for (int p : neighbourhood) {
00200             DenPoint npoint = points.get(p);
00201             if (!npoint.covered) {
00202                 npoint.covered = true;
00203                 mc.insert(npoint, timestamp);
00204                 ArrayList<Integer> neighbourhood2 = getNeighbourhoodIDs(npoint, initBuffer, epsilon);
00205                 if (neighbourhood.size() > minPoints) {
00206                     expandCluster(mc, points, neighbourhood2);
00207                 }
00208             }
00209         }
00210     }
00211 
00212     private ArrayList<Integer> getNeighbourhoodIDs(DenPoint point, ArrayList<DenPoint> points, double eps) {
00213         ArrayList<Integer> neighbourIDs = new ArrayList<Integer>();
00214         for (int p = 0; p < points.size(); p++) {
00215             DenPoint npoint = points.get(p);
00216             if (!npoint.covered) {
00217                 double dist = distance(point.toDoubleArray(), points.get(p).toDoubleArray());
00218                 if (dist < eps) {
00219                     neighbourIDs.add(p);
00220                 }
00221             }
00222         }
00223         return neighbourIDs;
00224     }
00225 
00226     private MicroCluster nearestCluster(DenPoint p, Clustering cl) {
00227         MicroCluster min = null;
00228         double minDist = 0;
00229         for (int c = 0; c < cl.size(); c++ ) {
00230             MicroCluster x = (MicroCluster)cl.get(c);
00231             if (min == null) {
00232                 min = x;
00233             }
00234             double dist = distance(p.toDoubleArray(), x.getCenter());
00235             dist-=x.getRadius(timestamp);
00236             if (dist < minDist) {
00237                 minDist = dist;
00238                 min = x;
00239             }
00240         }
00241         return min;
00242 
00243     }
00244 
00245     private double distance(double[] pointA, double[] pointB) {
00246         double distance = 0.0;
00247         for (int i = 0; i < pointA.length; i++) {
00248             double d = pointA[i] - pointB[i];
00249             distance += d * d;
00250         }
00251         return Math.sqrt(distance);
00252     }
00253 
00254 
00255     public Clustering getClusteringResult() {
00256         return null;
00257     }
00258 
00259     @Override
00260     public boolean implementsMicroClusterer() {
00261         return true;
00262     }
00263 
00264     @Override
00265     public Clustering getMicroClusteringResult() {
00266         return p_micro_cluster;
00267     }
00268 
00269     @Override
00270     protected Measurement[] getModelMeasurementsImpl() {
00271         throw new UnsupportedOperationException("Not supported yet.");
00272     }
00273 
00274     @Override
00275     public void getModelDescription(StringBuilder out, int indent) {
00276     }
00277 
00278     public boolean isRandomizable() {
00279         return true;
00280     }
00281 
00282     public double[] getVotesForInstance(Instance inst) {
00283         return null;
00284     }
00285 }
 All Classes Namespaces Files Functions Variables Enumerations