I am no expert in RxJS as I am still learning my way around the new paradigm of Observable/Subject.
I was trying to implement a broadcast service (based on NgRadio) that allows a new subscriber to get the nth last value of events he was listening to and the next emitted values after subscription.
As I stumbled upon issues with ReplaySubject to ask for the last n values... I ended up implementing it as follows. I feel that the code is not that crisp and clean so I would appreciate any improvement from more experienced RxJS developers.
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import { Observable } from 'rxjs/Observable';
import { ReplaySubject } from 'rxjs/ReplaySubject';
import { Subject } from 'rxjs/Subject';
import * as _ from 'lodash';
export interface RadioEvent {
key: string;
data?: any;
}
export class RadioService {
private separator = ':';
private _eventBus = new Subject<RadioEvent>();
private _eventCacheBus = new ReplaySubject<RadioEvent>();
keyMatch(key, wildcard) {
var w = '*';
var ww = '**';
var partMatch = function (wl, k) {
var match = (wl === w) || (wl === k);
return match;
};
var sep = this.separator;
var kArr = key.split(sep);
var wArr = wildcard.split(sep);
var kLen = kArr.length;
var wLen = wArr.length;
var max = Math.max(kLen, wLen);
for (var i = 0; i < max; i++) {
var cK = kArr[i];
var cW = wArr[i];
// '**' match all gragments
if (cW == ww && (typeof cK !== 'undefined')) {
return true;
}
// test if fragments match
if (!partMatch(cW, cK)) {
return false;
}
}
return true;
};
cast<T>(key: string, data?: any) {
if (typeof key !== 'string' || !key.length) {
throw 'Bad key. Please provide a string';
}
this._eventBus.next({ key: key, data: data });
this._eventCacheBus.next({ key: key, data: data });
};
on<T>(key: string, count?: number) {
var _this = this;
var normalobs = this._eventBus
.filter(function (event: RadioEvent) {
return _this.keyMatch(event.key, key);
}).map(function (event) {
return event.data;
});
if (_.isNil(count)) {
return normalobs;
} else {
let obs = this._eventCacheBus
.filter(function (event: RadioEvent) {
return _this.keyMatch(event.key, key);
}).map(function (event) {
return event.data;
});
let subject = new ReplaySubject<T>(count);
obs.subscribe(value => {
subject.next(value);
})
return Observable.merge(normalobs, subject).distinct();
}
}
}
2 Answers 2
First of all, I recommend you to ensure your code is correct (see my comment on your question).
Here are a few suggestions about improving your code.
- Please do not use non-informative names like
w, ww, sep, kArg, cW, CK
, and even the "well-known"i, j, k, m, n
. They are absolutely horrible. They make decent code be an unreadable puzzle. They hide the intent and make me think where I should not be. - Please always make sure you use Type when a) the compiler loses (or is unable to infer) the type information; b) you can narrow down the type. That includes the function return type. Typing is not panacea but it helps prevent an entire class of bugs.
- Do not use generic methods
smth<T>(...)
unless you use the typeT
information anywhere in the function. - Minimize the distance between variable declaration, assignment, and first usage. Better declare and assign in the same place. Ideally, find a way to not even declare a variable.
- In error messages be more specific about what went wrong.
const star = '*';
const twoStars = '**';
export class RadioService {
private separator = ':';
private _eventBus = new Subject<RadioEvent>();
private _eventCacheBus = new ReplaySubject<RadioEvent>();
keyMatch(keyList: string, wildcardList: string): boolean {
const isMatch = (wildcard, key) => (wildcard === star) || (wildcard === key);
const allKeys = keyList.split(this.separator);
const allWildcards = wildcardList.split(this.separator);
const keyWildcardPairs = allKeys
.reduce((accum, current, index) => {
accum.push([current, allWildcards[index]]);
return accum;
}, <string[][]>[]);
return keyWildcardPairs.some(pair => {
const key = pair[0];
const wildcard = pair[1];
// '**' match all gragments
if (wildcard == twoStars && (typeof key !== 'undefined')) {
return true;
}
// test if fragments match
if (!isMatch(wildcard, key)) {
return true;
}
return false;
});
};
cast(key: string, data?: any): void {
if (typeof key !== 'string' || key.length <= 0) {
throw `Bad key '${JSON.stringify(key)}'. Please provide a non-empty string.`;
}
this._eventBus.next({ key: key, data: data });
this._eventCacheBus.next({ key: key, data: data });
};
on(key: string, count?: number): Observable<any> {
const _this = this;
const normalObservable = this._eventBus
.filter(event => _this.keyMatch(event.key, key))
.map(event => event.data);
const withoutReplay = _.isNil(count);
if (withoutReplay)
return normalObservable;
const subject = new ReplaySubject<number>(count);
this._eventCacheBus
.filter(event => _this.keyMatch(event.key, key))
.map(event => event.data)
.subscribe(value => subject.next(value));
return Observable
.merge(normalObservable, subject)
.distinct();
}
}
Thanks for your reply. To be honest part of the code related the key matching algorithm was copied so I did not take the time to clean it up. This is code in case someone wants to use it with correction to your review keymatch implementation (using .every()
):
import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable, ReplaySubject, Subject } from 'rxjs/Rx';
import * as _ from 'lodash';
const star = '*';
const twoStars = '**';
export interface RadioEvent {
key: string;
data?: any;
}
export class RadioService {
private separator = ':';
private _eventBus = new Subject<RadioEvent>();
private _eventCacheBus = new ReplaySubject<RadioEvent>();
public on(key: string, count?: number): Observable<any> {
const _this = this;
const normalObservable = this._eventBus
.filter((event: RadioEvent) => _this.keyMatch(event.key, key))
.map((event: RadioEvent) => event.data);
const withoutReplay = _.isNil(count);
if (withoutReplay) {
return normalObservable;
}
const subject = new ReplaySubject<number>(count);
this._eventCacheBus
.filter((event: RadioEvent) => _this.keyMatch(event.key, key))
.map((event: RadioEvent) => event.data)
.subscribe((value) => subject.next(value));
return Observable
.merge(normalObservable, subject)
.distinct();
}
public cast(_key: string, _data?: any): void {
if (typeof _key !== 'string' || _key.length <= 0) {
throw new Error(`Bad key '${JSON.stringify(_key)}'. Please provide a non-empty string.`);
}
this._eventBus.next({ key: _key, data: _data });
this._eventCacheBus.next({ key: _key, data: _data });
}
private keyMatch(eventKey: string, listenerKey: string): boolean {
const isMatch = (wildcard, key) => (wildcard === star) || (wildcard === key);
const eventKeyArray = eventKey.split(this.separator);
const listenerKeyArray = listenerKey.split(this.separator);
const keyWildcardPairs = eventKeyArray
.reduce((accum, current, index) => {
accum.push([current, listenerKeyArray[index]]);
return accum;
}, [] as string[][]);
let isWildcard = false;
return keyWildcardPairs.every((pair) => {
const key = pair[0];
const wildcard = pair[1];
// '**' match all fragments
if (wildcard === twoStars && (typeof key !== 'undefined')) {
isWildcard = true;
return true;
}
// test if fragments match
if (isMatch(wildcard, key) || isWildcard) {
return true;
}
return false;
});
}
}
Explore related questions
See similar questions with these tags.
if (!partMatch(cW, cK)) { return false; }
should probably beif (!partMatch(cW, cK)) { continue; }
or evenif (!partMatch(cW, cK)) { return true; }
instead, while the last instruction inkeyMatch()
should bereturn false
. \$\endgroup\$