ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

golang 超时队列实现与使用

2018-09-28 15:30:21  阅读:467  来源: 互联网

标签:


超时队列

目前业务上需要一个速度快,数据超时删除的内存队列,实现和使用如下:

package  main
import (
	queue "github.com/fwhezfwhez/go-queue"
	"fmt"
)
func main() {
    //初始化,init
    q:= queue.NewEmpty()
    //压入,push
    q.Push(5)
    q.Push(4)
    //打印,print
    q.Print()
    //出列,pop
    fmt.Println(q.Pop())
    //打印,print
    q.Print()
    //长度,len
    fmt.Println(q.Length())
    //并发安全压入,currently safe push
    q.SafePush(6)
    //并发安全出列,currently safe pop
    fmt.Print(q.SafePop())
    q.Print()

    // time queue
    tq := queue.TimeQueueWithTimeStep(10*time.Second, 50, 1*time.Nanosecond)
    tq.StartTimeSpying()
    tq.TPush(5)
    tq.SafeTPush(6)

    fmt.Println("init:")
    tq.Print()

    time.Sleep(5 * time.Second)
    fmt.Println("after 5s:")
    tq.Print()

    time.Sleep(9 * time.Second)
    fmt.Println("after 14s")
    tq.Print()
}

协程超时管理

// start to spy on queue's time-out data and throw it
func (q *Queue) StartTimeSpying() {
	fmt.Println("time supervisor starts")
	go q.startTimeSpying()
}

// detail of StartTimeSpying function
func (q *Queue) startTimeSpying() error {
	var err = make(chan string, 0)
	go func(queue *Queue, er chan string) {
		fmt.Println("start time spying, data in the queue can stay for " + q.ExpireAfter.String())
		for {
			if queue.timeSpy == false {
				err <- "spying routine stops because: queue's timeSpy is false, make sure the queue is definition by q=TimeQueue(time.Duration,int)"
				return
			}
			select {
			case <-queue.flag:
				fmt.Println("time spy executing stops")
				return
			default:
				fmt.Print()
			}
			ok,er:=queue.timingRemove()
			if er!=nil{
				err <- er.(errorx.Error).StackTrace()
			}
			if ok {
				time.Sleep(queue.timeStep)
			}
		}
	}(q, err)
	select {
	case msg := <-err:
		fmt.Println("time spy supervisor accidentally stops because: ",msg)
		return errorx.NewFromString(msg)
	case <-q.flag:
		fmt.Println("time spy supervisor stops")
		return nil
	}
}


// remove those time-out data
func (q *Queue) timingRemove() (bool,error) {
	if len(q.Data) <1 {
		return true,nil
	}
	head, index, er := q.THead()
	if er != nil {
		return false, errorx.Wrap(er)
	}
	if index < 0 {
		return false, errorx.NewFromString("queue'length goes 0")
	}
	now := time.Now().Unix()
	created := time.Unix(head.CreatedAt, 0)
	//fmt.Println("now:",now)
	//fmt.Println("expire:",created.Add(q.ExpireAfter).Unix())
	if created.Add(q.ExpireAfter).Unix() < now {
		// out of time
		_,_,e := q.TPop()
		if e!=nil {
			return false, errorx.Wrap(e)
		}
		if len(q.Data) >0 {
			return q.timingRemove()
		}else{
			return true,nil
		}
	} else{
		return true ,nil
	}
}

标签:
来源: https://blog.csdn.net/fwhezfwhez/article/details/82884692

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有