How to make your own operator RxJS 😮

João Henrique de Oliveira Júnior
3 min readDec 14, 2020

--

Photo by fran innocenti on Unsplash

If you know RxJS and use it in your codes, you probably know how the operators are powerful and useful, don’t you? You can check the full list of operators in the official page of RxJS, there they organized a documentation separated by sections very simple to understand and learn.

However, you may want to make your own operator for an specific feature that your project requires. If that’s your case, so keep reading this practical little tutorial.

Making you own operator

Attention to a warning! This tutorial was created using Angular 10.2.0 (:

The first thing we need to understand is that operators are basically a function. Really simple. The big difference is what you receive and should return on that function.

Let’s see the example below:

const numbers = timer(3000, 1000);
const subscription = numbers.subscribe(x => console.log(x));
setTimeout(() => {
subscription.unsubscribe();
}, 10000);
/* Result console.log: 0, 1, 2, 3, 4, 5, 6 */

Take it easy, I know you’ll never use something like this but I’m terrible with examples. The timer observable above emits after 3 seconds, a number every second. I put a timeout function too, only to stop this example after 10 seconds.

If you wanted to do an operation with these numbers emitted by the Observable, you could use a pipe and a map for example, right? I will double each value now:

const numbers = timer(3000, 1000);
const subscription = numbers
.pipe(
map(num: number) => num * 2)
)
.subscribe(x => console.log(x));
setTimeout(() => {
subscription.unsubscribe();
}, 10000);
/* Result console.log: 0, 2, 4, 6, 8, 10, 12*/

Pretty cool, huh? I totally encourage you to use the operators from RxJS, if you don’t really find anything that could help you (what is hard), so it’s time to implement you own pipe, like this:

function myDoubleOperator<T>(source: Observable<number>) {
return new Observable(subscriber => {
source.subscribe(
(num) => { subscriber.next(num * 2); },
error => { subscriber.error(error); },
() => { subscriber.complete(); }
);
});
}
const numbers = timer(3000, 1000);
const subscription = numbers
.pipe(myDoubleOperator)
.subscribe(x => console.log(x));
setTimeout(() => {
subscription.unsubscribe();
}, 10000);
/* Result console.log: 0, 2, 4, 6, 8, 10, 12 */

Checking the code above, we made a function that return and new Observable and that’s how it works:

  • If the previous observable throws an error it will throw this error too.
  • If the previous observable completes, it will complete too.
  • If the previous observable emits a number, then this number will be doubled before the subscription receives it.

— Is it the same thing that a simple map did before, isn’t?

— Yes! But you coded that solution yourself!

It is also possible to add parameters for your operator, thanks to Javascript features, example:

function multiplyOperator(qtdToMultiply: number) {
return function add<T>(source: Observable<number>):
Observable<number> {
return new Observable(subscriber => {
source.subscribe(
(dado) => { subscriber.next(dado * qtdToMultiply); },
error => { subscriber.error(error); },
() => { subscriber.complete(); }
);
});
}
}
const numbers = timer(3000, 1000);
const subscription = numbers
.pipe(multiplyOperator(4))
.subscribe(x => console.log(x));
setTimeout(() => {
subscription.unsubscribe();
}, 10000);
/* Result console.log: 0, 4, 8, 12, 16, 20 */

Conclusion

Well padawan, now the limits are in your imagination. RxJS is rich and you should learn more about it. The example was made using Angular, Typescript e RxJs, but you can use RxJS without Angular too.

I hope you’ve learned something with this post today. See you!

--

--

João Henrique de Oliveira Júnior

Analista de sistemas na TOTVS. Bacharel e técnico em sistemas de informação. Ama a Deus, sua família, música e animais.