
c# – Rx: Pairing window duration with the count of events raised inside the window

2019-06-28 19:56:23

Tags: c# aggregate system-reactive


I want to use Rx to compute statistics on 2 event streams.

Input streams

//    stream1    --A---B----A-B-----A-----B----A--B|
//    stream2    ----X---X-----------X--X---XX---X--X|

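Intermediate results

The window duration, where a window opens on A and closes on B, together with the count of stream2 events raised inside that window.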

//    result     ------1------0-----------2-------1|    <-- count of stream2 events in [A-B] window
//                     4      2           6       3     <-- paired with window [A-B] window duration
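To make the intermediate result concrete, here is a minimal offline sketch in plain C# (LINQ-to-Objects, not Rx). It reads the two marble diagrams above as timelines, assuming each character is one time tick, pairs each A with the next B, and counts the X events between them with the boundaries included. The helpers `Ticks` and `Pairs` are hypothetical names for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: tick indices at which `symbol` appears in a marble string
// (each character of the diagram is treated as one time tick).
static List<int> Ticks(string marble, char symbol) =>
    marble.Select((c, i) => (c, i)).Where(p => p.c == symbol).Select(p => p.i).ToList();

// Hypothetical helper: pairs each A with the next B and counts the X events
// that fall inside [A, B] (boundaries included).
static List<(int Count, int Duration)> Pairs(string stream1, string stream2)
{
    var starts = Ticks(stream1, 'A');
    var stops = Ticks(stream1, 'B');
    var others = Ticks(stream2, 'X');
    var result = new List<(int Count, int Duration)>();
    foreach (var start in starts)
    {
        int stop = stops.First(s => s > start);                 // first B after this A
        int count = others.Count(t => t >= start && t <= stop); // X events inside the window
        result.Add((count, stop - start));
    }
    return result;
}

var pairs = Pairs("--A---B----A-B-----A-----B----A--B|",
                  "----X---X-----------X--X---XX---X--X|");

foreach (var (count, duration) in pairs)
    Console.WriteLine($"count = {count}, duration = {duration}");
// count = 1, duration = 4
// count = 0, duration = 2
// count = 2, duration = 6
// count = 1, duration = 3
```

Every A is paired with the next B here, so consecutive A events would each get their own window. The answer below discusses how to handle that case.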

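Final results

Group the intermediate results by the count of stream2 events, and return statistics on the window duration for each group, e.g. the average, minimum and maximum window duration.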

//    output     -----------------------------------0    1     2|    <-- count of stream2 events in [A-B] window
//                                                  2   3.5    6     <-- average [A-B] window duration for that count of stream2 events.
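Once the intermediate pairs exist as a finite list, the grouping is ordinary LINQ. The sketch below (LINQ-to-Objects, not Rx) hard-codes the four pairs from the diagram and reproduces the averages 2, 3.5 and 6. It works here because the list is finite, so every group is complete by the time Min, Average and Max run.

```csharp
using System;
using System.Linq;

// Intermediate results read off the diagram: (count of stream2 events, window duration in ticks)
var pairs = new[] { (Count: 1, Duration: 4), (Count: 0, Duration: 2), (Count: 2, Duration: 6), (Count: 1, Duration: 3) };

// Group by the event count and compute duration statistics per group
var stats = pairs
    .GroupBy(p => p.Count)
    .OrderBy(g => g.Key)
    .Select(g => (Count: g.Key,
                  Min: g.Min(p => p.Duration),
                  Avg: g.Average(p => p.Duration),
                  Max: g.Max(p => p.Duration)))
    .ToList();

foreach (var s in stats)
    Console.WriteLine($"count = {s.Count}: min = {s.Min}, avg = {s.Avg}, max = {s.Max}");
```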

Rx query

public enum EventKind
{
    START,
    STOP,
    OTHER
};

public struct Event1
{
    public EventKind  Kind;
    public DateTime   OccurenceTime;
};

var merge = stream1.Merge(stream2.Select(x => new Event1
                                        {
                                            Kind = EventKind.OTHER,
                                            OccurenceTime = x
                                        }))
           .RemoveDisorder(x => x.OccurenceTime, new TimeSpan(0,0,10));

var shared = merge.Publish().RefCount();

// Windows open on START and close on STOP
var windows = shared.Window(
            shared.Where(x => x.Kind == EventKind.START),
            opening => shared.Where(x => x.Kind == EventKind.STOP));

// For each window we're interested in the duration of the window along with
// the count of OTHER events that were raised inside the window
//
var pairs = windows.Select(window => new 
        {
            Duration = window
                .Where(x=>x.Kind != EventKind.OTHER) // we only want START & STOP events, not OTHER events
                .Buffer(2,1)                         // could use buffer(2) but this is more reliable if stream1 sometimes has multiple consecutive START events.
                .Where(x => x.Count == 2 && x[1].Kind == EventKind.STOP && x[0].Kind == EventKind.START)
                .Select(x => x[1].OccurenceTime - x[0].OccurenceTime), // compute the latency

            EventCount = window.Where(x=>x.Kind == EventKind.OTHER).Count() // count the number of OTHER events in the window
        }
    );

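I would like to simplify the type of the observable

> from IObservable<{IObservable<int>, IObservable<TimeSpan>}>
> to IObservable<{int, TimeSpan}>

This should be possible, since each window has exactly 1 duration and 1 count of OTHER events.

At that point, it should not be too difficult to define the output query that groups the windows by EventCount and selects statistics on the window duration (e.g. min, max and average per group).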

var result = pairs
        .GroupBy(pair => pair.EventCount)
        .Select(g => new 
            {
                EventCount = g.Key,
                Min = g.Min(x => x.Duration),
                Avg = g.Average(x => x.Duration),
                Max = g.Max(x => x.Duration)
            });

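RemoveDisorder is an extension method I use to sort the output of the merged observable on OccurenceTime. I need it because my input streams are not live events (as in this example) but are read from logs via Tx, and merging 2 sorted streams produces output that is no longer sorted.

Answer: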
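The question does not show how RemoveDisorder is implemented. Judging only from its arguments (a key selector and a TimeSpan), one plausible idea is a reorder buffer: hold items until the newest timestamp seen is at least the tolerance past them, then release them in time order. The sketch below is a hypothetical offline (IEnumerable) version of that idea. It assumes the disorder never exceeds the tolerance, and `Reorder` is a made-up name, not the asker's code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical reorder buffer (NOT the asker's RemoveDisorder): assumes no item
// arrives more than `tolerance` earlier than the newest timestamp already seen.
static IEnumerable<T> Reorder<T>(IEnumerable<T> source, Func<T, DateTime> time, TimeSpan tolerance)
{
    var buffer = new List<T>();
    var latest = DateTime.MinValue;
    foreach (var item in source)
    {
        buffer.Add(item);
        if (time(item) > latest) latest = time(item);

        // Anything at least `tolerance` older than the newest timestamp can no
        // longer be overtaken, so release those items in time order.
        var ready = buffer.Where(pending => time(pending) <= latest - tolerance).OrderBy(time).ToList();
        foreach (var x in ready)
        {
            buffer.Remove(x);
            yield return x;
        }
    }

    // End of input: flush whatever is still buffered, in time order
    foreach (var x in buffer.OrderBy(time))
        yield return x;
}

// Merged timestamps, in seconds after t0, slightly out of order
var t0 = new DateTime(2019, 6, 28);
var merged = new[] { 0, 5, 3, 12, 9, 20, 15 }.Select(s => t0.AddSeconds(s));
var ordered = Reorder(merged, t => t, TimeSpan.FromSeconds(10))
    .Select(t => (int)(t - t0).TotalSeconds)
    .ToList();

Console.WriteLine(string.Join(", ", ordered)); // 0, 3, 5, 9, 12, 15, 20
```

The trade-off is latency. An item is only released once the stream has moved `tolerance` past it, which is acceptable for log replay but adds delay on live streams.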

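After using Rx for a while, a common situation you will run into is start and stop events. There are several ways to handle it correctly, and which one fits depends on your requirements.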

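If your problem is only the data projection, check @Brandon's solution: the key is to compose differently, for example with SelectMany. If you want to keep the Select operator, you have to return an IObservable<T> from the projection.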
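The shape of that fix can be shown with LINQ-to-Objects, whose SelectMany and Zip have the same shape as the Rx operators of the same name. In the sketch below, IEnumerable<T> stands in for IObservable<T>, so this is an analogy, not Rx code. Each window carries a one-element sequence for its count and one for its duration, and SelectMany plus Zip flatten them into plain (Count, Duration) values.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Each window carries a one-element sequence for its count and one for its duration,
// mirroring the anonymous type in the question (IEnumerable stands in for IObservable).
var windows = new List<(IEnumerable<int> EventCount, IEnumerable<TimeSpan> Duration)>
{
    (new[] { 1 }, new[] { TimeSpan.FromSeconds(4) }),
    (new[] { 0 }, new[] { TimeSpan.FromSeconds(2) }),
};

// SelectMany flattens one level of nesting; Zip combines the single count with
// the single duration of the same window.
var flat = windows
    .SelectMany(w => w.EventCount.Zip(w.Duration, (c, d) => (Count: c, Duration: d)))
    .ToList();

foreach (var (count, duration) in flat)
    Console.WriteLine($"{count} OTHER events, window of {duration}");
```

In Rx, the corresponding query over the question's `pairs` would presumably be `pairs.SelectMany(p => p.EventCount.Zip(p.Duration, (count, duration) => new { EventCount = count, Duration = duration }))`, using Observable.SelectMany and Observable.Zip. That line has not been verified here. Note that Zip emits nothing for a window whose duration sequence never produces a value.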

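In any case, I think there is a more general problem with your composition, which I will try to illustrate below.

If you use the Window operator the way you do, several consecutive events on the start stream will create several groups. This can be a problem in your code, because each subsequent event on the main stream will then be processed once per open group.

This example only shows how many groups get created: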

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var subject = new Subject<Event1>();
var shared = subject.Publish().RefCount();
var start = shared.Where(a => a.Kind == EventKind.START);
var stop = shared.Where(a => a.Kind == EventKind.STOP);
var values = shared.Where(a => a.Kind == EventKind.OTHER);

values.Window(start, a => stop).Subscribe(inner => 
 { 
    Console.WriteLine("New Group Started");
    inner.Subscribe(next => 
                    { 
                        Console.WriteLine("Next = "+ next.Kind + " | " + next.OccurenceTime.ToLongTimeString());
                    }, () => Console.WriteLine("Group Completed"));
 });

subject.OnNext(new Event1 { Kind = EventKind.START, OccurenceTime = DateTime.Now });
subject.OnNext(new Event1 { Kind = EventKind.START, OccurenceTime = DateTime.Now.AddSeconds(1) });
subject.OnNext(new Event1 { Kind = EventKind.OTHER, OccurenceTime = DateTime.Now.AddSeconds(2) });
subject.OnNext(new Event1 { Kind = EventKind.STOP, OccurenceTime = DateTime.Now.AddSeconds(3) });

Result:

New Group Started
New Group Started
Next = OTHER | 4:55:46 PM
Next = OTHER | 4:55:46 PM
Group Completed
Group Completed
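The doubling in that output comes from how the windows overlap. A simplified model in plain C# (not Rx, timestamps ignored) reproduces it. Each START opens another window, each OTHER is delivered to every open window, and a single STOP closes all of them, because every window's closing selector subscribes to the same stop stream.

```csharp
using System;
using System.Collections.Generic;

// Simplified model of values.Window(start, a => stop), not Rx: each START opens
// another window, each OTHER goes to every open window, and one STOP closes them all.
var log = new List<string>();
int openWindows = 0;

void OnEvent(string kind)
{
    switch (kind)
    {
        case "START":
            openWindows++;
            log.Add("New Group Started");
            break;
        case "STOP":
            for (int i = 0; i < openWindows; i++) log.Add("Group Completed");
            openWindows = 0;
            break;
        default:
            for (int i = 0; i < openWindows; i++) log.Add("Next = " + kind);
            break;
    }
}

foreach (var kind in new[] { "START", "START", "OTHER", "STOP" })
    OnEvent(kind);

Console.WriteLine(string.Join(Environment.NewLine, log));
```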

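Perhaps this behavior is desirable; otherwise a different composition is needed. To “tame” the event stream, I see three different approaches:

> Start the computation only on the first START event, ignoring further STARTs until a STOP arrives (e.g. “Create observable and consume only between events”).
> Compute the stream from the latest START event. In this case the previous stream is discarded by the inner composition (possibly using the Switch operator).
> Compute independently, treating every START event as needing its own STOP event, which allows many group streams to be created in the composition (this makes no sense to me unless you have an identifier that matches START and STOP events).

As usual, each of these options can be implemented in many different ways. If I understand your question, you are looking for option ONE. Now for the answer: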
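Options ONE and TWO differ only in what a START does while a window is already open. A plain C# state machine (offline, not Rx) makes the difference visible. The events are hypothetical (Kind, Time) tuples with string kinds. With the event sequence from the multiple-groups example, option ONE measures from the first START and option TWO from the latest one.

```csharp
using System;
using System.Collections.Generic;

// Offline state machine (plain C#, not Rx) over (Kind, Time) events.
// latestStartWins == false: option ONE, extra STARTs are ignored while a window is open.
// latestStartWins == true:  option TWO, every START restarts the window.
static List<(int Count, int Duration)> PairWindows(
    IEnumerable<(string Kind, int Time)> source, bool latestStartWins)
{
    var result = new List<(int Count, int Duration)>();
    int? openedAt = null; // time of the START that opened the current window
    int count = 0;        // OTHER events seen inside the current window
    foreach (var (kind, time) in source)
    {
        switch (kind)
        {
            case "START" when openedAt == null || latestStartWins:
                openedAt = time;
                count = 0;
                break;
            case "OTHER" when openedAt != null:
                count++;
                break;
            case "STOP" when openedAt != null:
                result.Add((count, time - openedAt.Value));
                openedAt = null;
                break;
        }
    }
    return result;
}

// Same sequence as the multiple-groups example: START, START, OTHER, STOP at t = 0..3
var events = new[] { ("START", 0), ("START", 1), ("OTHER", 2), ("STOP", 3) };
var first = PairWindows(events, latestStartWins: false);  // one window: count 1, duration 3
var latest = PairWindows(events, latestStartWins: true);  // one window: count 1, duration 2
```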

> Keeping Window (too much code):

using System;
using System.Reactive.Linq;

IObservable<Event1> sx = GetEventStream();
var shared = sx.Publish().RefCount();
var start = shared.Where(a => a.Kind == EventKind.START);
var stop = shared.Where(a => a.Kind == EventKind.STOP);
shared.Window(start, a => stop)
    .Select(window =>                          // renamed from sx to avoid shadowing the outer sx
            window.Publish(b =>
                        b.Take(1)              // first event of the window
                        .Select(c =>
                        {
                            // time of the last event in the window, once the window completes
                            var final = b.LastOrDefaultAsync().Select(a => a.OccurenceTime);
                            // number of OTHER events raised inside the window
                            var comp = b.Where(d => d.Kind == EventKind.OTHER).Count();
                            return final.Zip(comp, (d, e) => new { Count = e, Time = d - c.OccurenceTime });
                        })
                        .Switch()   // any flattening operator works here,
                    )               // because there is only one inner sequence
            )
    .Concat()
    .Subscribe(next =>
    {
        Console.WriteLine("Count = " + next.Count + " | " + next.Time);
    });

> Using GroupByUntil, a kind of “hack”, but this is my preference:

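using System;
using System.Reactive;
using System.Reactive.Linq;

IObservable<Event1> sx = GetEventStream();
var shared = sx.Publish().RefCount();
var stop = shared.Where(a => a.Kind == EventKind.STOP).Publish().RefCount();
var start = shared.Where(a => a.Kind == EventKind.START);
// Constant key: the first START opens a group, later STARTs join that same group,
// and the group ends on the next STOP (option ONE)
start.GroupByUntil(a => Unit.Default, a => stop)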
        .Select(newGroup => 
        { 
            var creation = newGroup.Take(1);
            var rightStream = shared.Where(a => a.Kind == EventKind.OTHER)
                                    .TakeUntil(newGroup.LastOrDefaultAsync())
                                    .Count();
            var finalStream = stop.Take(1);

            return creation.Zip(rightStream, finalStream, (a,b,c) => new { Count = b, Time = c.OccurenceTime - a.OccurenceTime });
        })
        .Concat()
        .Subscribe(next => 
        { 
            Console.WriteLine("Count = "+ next.Count + " | " + next.Time);
        });

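> Without groups/windows: use Take(1) and add the Repeat operator at the end of the composition. This can cause undesired behavior because of the “re-subscription” (it depends on whether the observable is cold or hot, and on the scheduler used).
> Write a custom implementation as your own extension method. It is not as hard as it looks and is probably the best option, but it takes a while to implement.

Another problem with your composition is that you will not get the statistics. Min, Max and Average only produce a value when their source completes, and a group created by the GroupBy operator only completes when the whole source completes, so with a never-ending event stream the groups never complete.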
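One way around this is to keep running statistics that update on every new pair instead of waiting for completion. This is an assumption on my part, not something the answer proposes, and in Rx it would usually be built with Scan. The plain C# sketch below (not Rx) shows only the accumulation logic, fed with the four pairs from the question's diagram.

```csharp
using System;
using System.Collections.Generic;

// Running (incremental) statistics per event count: each new pair immediately
// produces an updated snapshot, so nothing waits for the stream to complete.
var stats = new Dictionary<int, (int N, int Sum, int Min, int Max)>();
var snapshots = new List<(int Count, double Avg, int Min, int Max)>();

void OnPair(int count, int duration)
{
    var s = stats.TryGetValue(count, out var old)
        ? (N: old.N + 1, Sum: old.Sum + duration, Min: Math.Min(old.Min, duration), Max: Math.Max(old.Max, duration))
        : (N: 1, Sum: duration, Min: duration, Max: duration);
    stats[count] = s;
    snapshots.Add((count, (double)s.Sum / s.N, s.Min, s.Max));
}

// (count of OTHER events, window duration) pairs from the question's diagram
foreach (var (count, duration) in new[] { (1, 4), (0, 2), (2, 6), (1, 3) })
    OnPair(count, duration);

foreach (var snap in snapshots)
    Console.WriteLine($"count = {snap.Count}: avg = {snap.Avg}, min = {snap.Min}, max = {snap.Max}");
```

Keeping only N, Sum, Min and Max per key makes each update constant-time and avoids storing the individual durations.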

I suggest rethinking your approach; a possible solution is to bring time into the composition. For more on statistics with Rx, see:
http://www.codeproject.com/Tips/853256/Real-time-statistics-with-Rx-Statistical-Demo-App

Source: https://codeday.me/bug/20190628/1318626.html