ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

ISODATA 迭代自组织数据分析技术

2022-05-16 17:02:05  阅读:161  来源: 互联网

标签:数据分析 迭代 self id ISODATA cluster np data class


ISODATA(Iterative Self-Organizing Data Analysis Technique Algorithm)的python实现(参考)

# coding:utf-8
import numpy as np
import pandas as pd
import logging
logging.basicConfig(level=logging.WARNING,
                    format='%(asctime)s-%(filename)s[line:%(lineno)d]-%(levelname)s:%(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S')


def euclidean(p1, p2):
    return round(np.sqrt(((p2-p1)*(p2-p1)).sum()), 1)


class isodata():
    def __init__(self, n_cluster, max_std, min_distance, min_samples, max_iter=100):
        # n_cluster 预期的分类数量
        # max_std 最大类内方差
        # min_distance 最小类间距
        # min_samples 类的最小样本个数
        self.n_cluster = n_cluster
        self.max_std = max_std
        self.min_distance = min_distance
        self.min_samples = min_samples
        self.max_iter = max_iter

    def euclidean(self, p1, p2):
        # 欧式距离函数
        # p1, p2 : numpy.array
        return round(np.sqrt(((p2 - p1) * (p2 - p1)).sum()), 1)

    def distanceCluster(self, cluster):
        # 类间距
        dmins = [euclidean(p1, p2) for p1 in cluster for p2 in cluster]
        dmins = np.array(dmins).reshape(len(cluster), -1)
        return dmins

    def filterClassID(self, cdata):
        # 过滤少于min_samples个样本的类别
        # cdata 含分类标签的数据

        # 统计每个类别的个数
        cdata = cdata.sort_values("class_id")
        cid = np.sort(pd.unique(cdata["class_id"]))
        cids = cdata["class_id"].values.tolist()
        cid_count = [cids.count(_) for _ in cid]

        # 确定类别,需要合并
        ikeep = [i for i, _ in enumerate(cid_count) if _ >= self.min_samples]

        # 合并规则,注意 A合并到B,B合并到C的情况
        cluster = cdata.groupby("class_id").mean().values.tolist()
        kcluster = [cluster[_] for _ in ikeep]  # 保留的聚类中心

        data = self.reClassify(cdata, kcluster)
        # logging.warning("function filter")
        return data

    def reClassify(self, data, cluster=None):
        # 执行分类操作
        # data 包含分类标签class_id的特征数据
        if cluster is None:
            cluster = self.cdata.groupby("class_id").mean()
        rdata = pd.DataFrame()  # 点到各个聚类中心的距离
        cols = data.columns
        for i in range(len(cluster)):
            rdata[f"c{i}"] = [euclidean(cluster[i], p) for p in data[cols[:-1]].values]

        data["class_id"] = np.argmin(rdata.values, axis=1)  # 根据最小距离,分配类别标签
        # logging.warning(f"function reClassify")
        return data

    def reSplit(self, data):
        # 分裂操作
        # data pandas.DataFrame,特征+标签

        cstd = data.groupby("class_id").std()
        cstd_max = np.max(cstd.values, axis=1)
        cluster = data.groupby("class_id").mean().values.tolist()
        cluster_ = []
        for i, std in zip(cstd.index, cstd_max):
            nsample = data.query("class_id==@i").shape[0]
            if nsample > 2*self.min_samples and std > self.max_std:  # 样本数和方差条件
                icluster = np.array(cluster.pop(i))
                icluster_ = icluster + std, icluster-std
                cluster_.extend(icluster_)
        cluster.extend(cluster_)
        # logging.warning("function reSplit")
        return cluster

    def reUnion(self, data):
        # 合并操作
        # min_dist 最小类间距
        # return 合并后的特征+类别
        cluster = data.groupby("class_id").mean().values

        dmins = self.distanceCluster(cluster=cluster)
        dmin_idx = np.argwhere(dmins < self.min_distance)
        dmin_idx = [(i, j) for i, j in dmin_idx if i < j]
        isused = []
        for i, j in dmin_idx:
            if i in isused or j in isused:
                continue
            else:
                data[data["class_id"] == i]["class_id"] = j  # 通过修改类别标识,进行类别合并
        # logging.warning("function reUnion")
        return data

    def fit(self, data):

        # 聚类中心偏移
        last_cluster = []
        # 初始化类别标签
        data["class_id"] = np.random.randint(0, self.n_cluster, size=len(data))
        for epoch in range(self.max_iter):
            data = self.filterClassID(data)
            cluster = self.reSplit(data)  # 分裂
            data = self.reClassify(data, cluster=cluster)
            data = self.reUnion(data)

            # 跳出循环的条件
            cluster = data.groupby("class_id").mean().values
            cls = ",".join([f"{_:.3f}"for _ in cluster.reshape(-1)])

            last_cluster.append(cluster)
            if len(last_cluster) > 1:
                if len(last_cluster[-1]) == len(last_cluster[-2]):
                    err = np.max(last_cluster[-1] - last_cluster[-2])  # 注意这里会出现前后类别数不同的情况
                    logging.warning(f"{epoch}:{err:.3f} / {cls}")
                    if abs(err) < 1e4:
                        break
                else:
                    logging.warning(f"{epoch}: {cls}")
        self.cluster = cluster
        return self

    def fit_transform(self, data):
        # 聚类中心偏移
        last_cluster = []
        # 初始化类别标签
        data["class_id"] = np.random.randint(0, self.n_cluster, size=len(data))
        for epoch in range(self.max_iter):
            data = self.filterClassID(data)
            cluster = self.reSplit(data)  # 分裂
            data = self.reClassify(data, cluster=cluster)
            data = self.reUnion(data)

            # 跳出循环的条件
            cluster = data.groupby("class_id").mean().values
            cls = ",".join([f"{_:.3f}" for _ in cluster.reshape(-1)])

            last_cluster.append(cluster)
            if len(last_cluster) > 1:
                if len(last_cluster[-1]) == len(last_cluster[-2]):
                    err = np.max(last_cluster[-1] - last_cluster[-2])  # 注意这里会出现前后类别数不同的情况
                    logging.warning(f"{epoch}:{err:.3f} / {cls}")
                    if abs(err) < 1e4:
                        break
                else:
                    logging.warning(f"{epoch}: {cls}")

        return self.reClassify(data, cluster)

    def predict(self, x):
        # 预测函数

        return self.reClassify(x, cluster=self.cluster)


if __name__ == '__main__':
    data = np.random.randint(0, 10, (10, 2))
    rdata = np.r_[data, data+30, data + 50, data + 90]
    rdata = pd.DataFrame(rdata)
    ndata = (rdata-rdata.mean())/rdata.std()

    algo = isodata(n_cluster=6, max_std=0.8, min_distance=0.2, min_samples=2)
    ndata_ = algo.fit_transform(ndata)
    rdata["class_id"] = ndata_.class_id
    algo = algo.fit(ndata)

    tdata = np.r_[data, data + 10, data + 70]
    tdata = pd.DataFrame(tdata)
    tdata["class_id"] = 0
    sdata = (tdata - tdata.mean()) / tdata.std()
    res = algo.predict(sdata)
    tdata["class_id"] = res.class_id
    # logging.warning(f"\n{data}")

  

标签:数据分析,迭代,self,id,ISODATA,cluster,np,data,class
来源: https://www.cnblogs.com/ddzhen/p/16277546.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有