Como hemos visto en las entradas anteriores de esta serie dedicada a RxJS, los observables son streams de datos a los que podemos suscribir observers para leerlos directamente o después de haberlos modificado mediante algún operador. Mediante este sistema podemos suscribir n observers al mismo observable, al igual que podemos llamar n veces a una función desde cualquier parte del código. Por ejemplo, dado este observable:
let myObservable = Rx.Observable.create( (observer)=> {
let count = 0;
let timestamp = Date.now();
observer.next(count++ + ' ::timestamp:: ' + timestamp);
observer.next(count++ + ' ::timestamp:: ' + timestamp);
});
Podemos llamarlo dos veces y cada una de ellas se ejecutará desde el principio como si fuera la primera vez.
myObservable.subscribe( msg=> console.log(`Primer observer ${msg}`) );
myObservable.subscribe( msg=> console.log(`Segundo observer ${msg}`) );
/*
Llamamos la primera vez:
Primer observer 0 ::timestamp:: 1514739846101
Primer observer 1 ::timestamp:: 1514739846101
Llamamos la segunda vez:
Segundo observer 0 ::timestamp:: 1514739846102
Segundo observer 1 ::timestamp:: 1514739846102
*/
En cierta manera, podríamos decir que es una relación 1 a 1, ya que cada vez que se lanza la suscripción a un observable, es como si se creara por primera vez. Sin embargo, a veces nos puede interesar una relación de 1 a muchos, de tal manera que lo que sucede en el mismo observable se comparta de la misma manera por los n observers que tiene suscritos. Y es aquí donde entran en juego los subject, que nos permiten pasar de un sistema unidifusión (unicast) a uno multidifusión (multicast).
La sintaxis básica es muy sencilla. Se declara el subject, se suscriben observers al subject y este se suscribe al observable original, pues los subjects son al mismo tiempo observables y observers. En código queda más claro:
let myObservable = Rx.Observable.create( (observer)=> {
let count = 0;
let timestamp = Date.now();
observer.next(count++ + ' ::timestamp:: ' + timestamp);
observer.next(count++ + ' ::timestamp:: ' + timestamp);
});
let mySubject = new Rx.Subject();
mySubject.subscribe( msg=> console.log(`Primer observer ${msg}`) );
mySubject.subscribe( msg=> console.log(`Segundo observer ${msg}`) );
myObservable.subscribe(mySubject);
/* Ahora se comparte el observable
Primer observer 0 ::timestamp:: 1514739943344
Segundo observer 0 ::timestamp:: 1514739943344
Primer observer 1 ::timestamp:: 1514739943344
Segundo observer 1 ::timestamp:: 1514739943344
*/
Como explica Netanel Basal para entender qué es un subject, en suma, podemos pensar que es un observable convertido en una clase que comparte sus propiedades.
class MySubject extends Rx.Observable {
constructor() {
super();
this.observers = [];
}
subscribe(observer) {
this.observers.push(observer);
}
next(value) {
this.observers.forEach(observer => observer.next(value));
}
error(error) {
this.observers.forEach(observer => observer.error(error));
}
complete() {
this.observers.forEach(observer => observer.complete());
}
}