ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

D*算法Python实例

2021-07-23 18:58:05  阅读:151  来源: 互联网

标签:tmp map cost Python self state 算法 实例 print


        最近粗略学习了一下D*算法,论文中伪代码如下:

记号

        1)  G:终点
        2)  X:表示当前节点,用 b(X)=Y表示  Y是 X的下一个节点,称为后向指针(backpointer),通过后向指针,很容易获得一条从起点到终点的路径。
        3) c(X,Y):从 X到 Y的cost,对于不存在边的两个节点, c(X,Y)可定义为无穷大。
        4)同 A∗ 一样, D∗ 也用 OPEN表和 CLOSED表来保存待访问和已经访问的节点。对每个节点 X,用 t(X)来表示 X的状态,NEW表示未添加到 OPEN, OPEN表示当前处于 OPEN list, CLOSED表示已从 OPEN list剔除。
        5)同 A∗ 一样, D∗ 也有一个对目标的估计函数, h(G,X),不同的是, D∗ 还多了一个key函数,对于在 OPEN list中的每个节点, k(G,X)的定义原文中为“be equal to the minimum of  h(G,X) before modification and all valuses assumed by h(G,X) since X was placed on the OPEN list”。也就是说,h函数会随着拓扑结构的变化而变化,key始终是h中最小的那一个。key函数将 OPEN中的节点 X分为两类,一类记为 RAISE,如果 k ( G , X ) < h ( G , X ) ,另一类记为 LOWER,如果 k ( G , X ) = h ( G , X ) 。也就是说, k ( G , X ) ≤ h ( G , X ) ,小于的情况如前方出现障碍物导致某些边的cost增加,等于的情况如前方cost不变或由于障碍物移动导致某些边cost减小。
        6) kmin​和 kold​,前者表示当前 OPEN中 key 的最小值,后者表示上一次的最小值。
        7)如果一系列节点 {X_1,X_2,...,X_N} ​满足 b(Xi+1​)=Xi​,则称为一个序列(sequence)。一个序列表示了一条从 XN​到 X1​的路径。在静态路径规划中,我们通常找的是一条从起点到终点的路径中,但是在动态路径规划中,起点是不断变化的,严格来讲应该是找寻一条从当前节点到终点的路径。显然这种情况下,考虑从终点到当前节点更为方便,故这里 X1​反而是最靠近终点的节点。称一个序列为单调(monotonic)的如果满足 t ( X i ) = C L O S E D   a n d   h ( G , X i ) < h ( G , X i + 1 ) 或者 t ( X i ) = OPEN   a n d   k ( G , X i ) < h ( G , X i + 1 )。杂乱无的序列是没有意义的,单调的序列其实就是我们要找的一条合理的路径。
        8)对 Xi​,Xj​,如果 i < j 则称 Xi​ 是 Xj​ 的祖先(ancestor),反之则称为后代(descendant)。
        9)如果记号涉及两个节点且其中一个为终点,略去终点,如 f(X)=f(G,X)

 

算法描述

        D∗主要包括两个部分,PROCESS−STATE 和 MODIFY−COST,前者用来计算终点到当前节点的最优cost,后者用来修正cost。主要流程如下:
        1)将所有节点的 tag设置为 NEW, h(G)设为0,将G放置在 OPEN中。
        2)重复调用 PROCESS−STATE直到当前节点 X从 OPEN中移除,即 t(X)=CLOSED,说明此时已经找到一条从X出发到达终点的路径。
        3)根据2)中获得的路径,从节点 X往后移动,直到达到终点或者检测到cost发送变化。
        4)当cost发生变化时,调用第二个函数 MODIFY−COST,并将因为障碍物而cost受到影响的节点重新放入 OPEN中。假设 Y是发现状态时机器人所处的节点。通过调用 PROCESS−STATE直到 kmin​≥h(Y),此时cost的变化已经传播到 Y,因此可以找到一条新的从 Y到终点的最优路径。

代码实现

        本文给出基于自己理解编写的Python实例,感觉对算法主体部分理解还不是很透彻,先做此记录。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2021/07/23
# @Author  : 小仙女
# @Site    :
# @File    : D_star.py
# @Software: PyCharm
 
import math
from decimal import Decimal

 
class State(object):
 
    def __init__(self, x, y):
        self.x = x
        self.y = y
        self.parent = None
        self.state = "."
        self.t = "new"
        self.h = 0
        self.k = 0
 
    def cost(self, state):
        if self.state == "#" or state.state == "#":
            return 100000  # 存在障碍物时,距离无穷大
        return math.sqrt(math.pow((self.x - state.x), 2) +
                         math.pow((self.y - state.y), 2))
 
    def set_state(self, state):
        if state not in [".", "#", "S", "E", "+", "*", "&"]:
            return
        self.state = state
 
 
class Map(object):
    '''
    创建地图
    '''
    def __init__(self, row, col):
        self.row = row
        self.col = col
        self.map = self.init_map()
 
    def init_map(self):
        # 初始化map
        map_list = []
        for i in range(self.row):
            tmp = []
            for j in range(self.col):
                tmp.append(State(i, j))
            map_list.append(tmp)
        return map_list
 
    def print_path(self):
        for i in range(self.row):
            tmp = ""
            for j in range(self.col):
                tmp += self.map[i][j].state + " "
            print(tmp)

    def print_state(self):
        print("--------------------------------------------------------------------------------------------")
        for i in range(self.row):
            tmp_h = "|   "
            tmp_k = "|   "
            tmp_b = "|   "
            for j in range(self.col):
                if self.map[i][j].h > 10000:
                    h = "inf "
                else:
                    if self.map[i][j].h > 10:
                        h = str(Decimal(self.map[i][j].h).quantize(Decimal('0.0')))
                    else:
                        h = str(Decimal(self.map[i][j].h).quantize(Decimal('0.00')))       
                if self.map[i][j].k > 10000:
                    k = "inf "
                else:
                    if self.map[i][j].k > 10:
                        k = str(Decimal(self.map[i][j].k).quantize(Decimal('0.0')))
                    else:
                        k = str(Decimal(self.map[i][j].k).quantize(Decimal('0.00')))   
                if self.map[i][j].parent:
                    parent = "(" + str(self.map[i][j].parent.x) + "," + str(self.map[i][j].parent.y) + ")"
                else:
                    parent = "None "
                tmp_h += "h:" + h + "   |   "
                tmp_k += "k:" + k + "   |   "
                tmp_b += "b:" + parent + "  |   "
            print(tmp_h)
            print(tmp_k)
            print(tmp_b)
            print("--------------------------------------------------------------------------------------------")
 
    def get_neighbers(self, state):
        # 获取8邻域
        state_list = []
        for i in [-1, 0, 1]:
            for j in [-1, 0, 1]:
                if i == 0 and j == 0:
                    continue
                if state.x + i < 0 or state.x + i >= self.row:
                    continue
                if state.y + j < 0 or state.y + j >= self.col:
                    continue
                state_list.append(self.map[state.x + i][state.y + j])
        return state_list
 
    def set_obstacle(self, point_list):
        # 设置障碍物的位置
        for x, y in point_list:
            if x < 0 or x >= self.row or y < 0 or y >= self.col:
                continue
            self.map[x][y].set_state("#")

    def remove_obstacle(self, point_list):
        # 移除障碍物
        remove = set()
        for x, y in point_list:
            if x < 0 or x >= self.row or y < 0 or y >= self.col or self.map[x][y].state != "#":
                continue
            self.map[x][y].set_state(".")
            for s in self.get_neighbers(self.map[x][y]):
                if self.map[x][y].h > s.cost(self.map[x][y]) + s.h:
                    s.parent = self.map[x][y]
                    self.map[x][y].h = s.cost(self.map[x][y]) + s.h
            remove.add(self.map[x][y])
        return remove


    def clear_path(self):
        for i in range(self.row):
            for j in range(self.col):
                if self.map[i][j].state != "." and self.map[i][j].state != "#":
                    self.map[i][j].state = "."

 
class Dstar(object):
 
    def __init__(self, maps):
        self.map = maps
        self.open_list = set()  # 创建空集合
 
    def process_state(self):
        '''
        D*算法的主要过程
        :return:
        '''
        x = self.min_state()    # 获取open list列表中最小k的节点
        if x is None:
            return -1
        k_old = self.get_kmin() #获取open list列表中最小k节点的k值
        self.remove(x)  # 从openlist中移除
        # print("")
        # print('当前节点')
        # print(x.x,x.y)
        if k_old < x.h:
            for y in self.map.get_neighbers(x):
                if y.h <= k_old and x.h > y.h + x.cost(y):
                # if y.t == "close" and x.h > y.h + x.cost(y):
                    x.parent = y
                    x.h = y.h + x.cost(y)
                    # print('修改x父节点降低h值')
                    # print(y.x,y.y)

        if k_old == x.h:
            for y in self.map.get_neighbers(x):
                if (y.t == "new" or y.parent == x and y.h != x.h + x.cost(y) \
                        or y.parent != x and y.h > x.h + x.cost(y)) and y != end:
                    # if y.t == "new" and y != end:
                        # print('探索到新节点')
                        # print(y.x,y.y)
                    # if y.parent == x and y.h != x.h + x.cost(y) and y != end:
                        # print('传递节点x代价至y')
                        # print(y.x,y.y)
                    # if y.parent != x and y.h > x.h + x.cost(y) and y != end:
                        # print('修改y父节点降低h值')
                        # print(y.x,y.y)
                    y.parent = x
                    self.insert(y, x.h + x.cost(y))
        else:
            for y in self.map.get_neighbers(x):
                if y.t == "new" or y.parent == x and y.h != x.h + x.cost(y):
                    y.parent = x
                    # print('新建/更新y父节点为x')
                    # print(y.x,y.y)
                    self.insert(y, x.h + x.cost(y))
                else:
                    if y.parent != x and y.h > x.h + x.cost(y):
                        # print('插入x节点准备重新扩展')
                        # print(x.x,x.y)
                        self.insert(x, x.h)
                    else:
                        if y.parent != x and x.h > y.h + x.cost(y) \
                                and y.t == "close" and y.h > k_old:
                            # print('插入y节点准备重新扩展')
                            # print(y.x,y.y)
                            self.insert(y, y.h)
        # print("")
        return self.get_kmin()
 
    def min_state(self):
        if not self.open_list:
            return None
        # 获取openlist中k值最小对应的节点
        min_state = min(self.open_list, key=lambda x: x.k)
        return min_state
 
    def get_kmin(self):
        # 获取openlist表中最小的k
        if not self.open_list:
            return -1
        k_min = min([x.k for x in self.open_list])
        return k_min
 
    def insert(self, state, h_new):
        if state.t == "new":
            state.k = h_new
        elif state.t == "open":
            state.k = min(state.k, h_new)
        elif state.t == "close":
            state.k = min(state.h, h_new)
        state.h = h_new
        state.t = "open"
        self.open_list.add(state)
 
    def remove(self, state):
        if state.t == "open":
            state.t = "close"
        self.open_list.remove(state)
 
    def modify_cost(self, x):
        if x.t == "close":
            self.insert(x, x.parent.h + x.cost(x.parent))
 
    def run(self, start, end):

        self.open_list.add(end)
        while True:
            if self.process_state() == -1:
                break
            if start.t == "close":
                break
        if start.t == "close":
            start.set_state("S")
            tmp = start.parent
            while tmp != end:
                tmp.set_state("+")
                tmp = tmp.parent
            tmp.set_state("E")
            print('障碍物未发生变化时,搜索的路径如下:')
            self.map.print_path()
            # print('障碍物未发生变化时,代价地图如下:')
            # self.map.print_state()
        
        print("")
        self.map.clear_path()
        self.map.set_obstacle([(2, 3), (3, 3), [5, 3]]) # 障碍物发生变化 1
        remove = self.map.remove_obstacle([(3, 2), (2, 1), (1, 2)])
        # self.map.set_obstacle([(3, 3)]) # 障碍物发生变化 2
        # remove = self.map.remove_obstacle([])
        # self.map.set_obstacle([(3, 3), (2, 9)]) # 障碍物发生变化 3
        # remove = self.map.remove_obstacle([])
        for s in remove:
            self.insert(s, s.h)
        if start.t == "close":
            tmp = start
            while tmp != end and tmp != None:
                tmp.set_state("*")
                if tmp.parent.state == "#":
                    self.modify(tmp.parent)
                    print("探测到障碍物,重新规划")
                    if tmp.parent.state == "#":
                        break
                    continue
                tmp = tmp.parent
            start.set_state("S")
            tmp.set_state("E")
            if tmp.x == end.x and tmp.y == end.y:
                print('障碍物发生变化时,搜索的路径如下:')
            else:
                print('[error] 动态寻路失败')
            self.map.print_path()
            # print('障碍物发生变化时,代价地图如下:')
            # self.map.print_state()
 
    def modify(self, state):
        self.modify_cost(state)
        while True:
            k_min = self.process_state()
            if k_min == -1 or k_min>10000:
                break
            if k_min >= state.h:
                break
 
 
if __name__ == '__main__':
    m = Map(6, 15)
    m.set_obstacle([(0, 1), (0, 2), (1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2), (4, 3)])
    start = m.map[5][1]
    end = m.map[0][14]
    dstar = Dstar(m)
    dstar.run(start, end)

 

标签:tmp,map,cost,Python,self,state,算法,实例,print
来源: https://blog.csdn.net/weixin_42768750/article/details/119042083

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有