あるシーケンスがあったとして、最初に流れてきたものからn秒間はすべて要素を捨てて、n秒以上たった時の最初の要素は後ろにながす。というようなことをRxでできないの?とかずきさんに聞いてみたんだけどどうやら作らないとだめだねー。と言われてしまったので作ってみました。
やりたいことのイメージは上図のとおり。青区間は通して、赤区間にきた要素は流しません。
public static class Hoge { public static IObservable<T> Filter<T>(this IObservable<T> seq, TimeSpan t) { return Observable.Create<T>(a => { var ts = DateTimeOffset.MinValue; return seq.Timestamp().Subscribe(value => { if (!((value.Timestamp - ts) >= t)) return; ts = value.Timestamp; a.OnNext(value.Value); }, seq.OnError, seq.OnCompleted); }); } }
てきとうな拡張メソッドを作って、Observable.Create を使って新しいシーケンスを生成して、変換をかけました。