RxJS is a powerful library for reactive programming, and one of its key features is the ability to create custom operators. In this guide, we’ll look at some handy custom operators and show how to implement them.
1. withLoading
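Create a custom operator that seamlessly manages loading states for each stream: it first emits `{ loading: true }`, then emits either the data or the error together with `loading: false`.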
import { Observable, of } from 'rxjs' ;
import { map, catchError, startWith, finalize } from 'rxjs/operators' ;
interface WithLoadingResult < T > {
export function withLoading < T >() {
return ( source : Observable < T >) =>
startWith ({ loading: true }),
map (( data ) => ({ loading: false , data })),
catchError (( error ) => of ({ loading: false , error })),
someObservable$. pipe ( withLoading ()). subscribe (({ loading , data , error }) => {
console. log ( 'Loading...' );
console. error ( 'Error:' , error);
console. log ( 'Data:' , data);
2. debounceAndDistinct
Effectively manage user input, minimizing unnecessary API requests. We can achieve this by combining `debounceTime` and `distinctUntilChanged`.
import { Observable } from 'rxjs' ;
import { debounceTime, distinctUntilChanged } from 'rxjs/operators' ;
export function debounceAndDistinct < T >( time : number = 300 ) {
return ( source : Observable < T >) =>
searchInput$. pipe ( debounceAndDistinct ()). subscribe (( value ) => {
// Perform search with debounced and distinct value
3. retryWithBackoff
Apply a backoff strategy for retrying failed requests more intelligently.
import { Observable, throwError, timer } from 'rxjs' ;
import { mergeMap, retryWhen } from 'rxjs/operators' ;
export function retryWithBackoff (
backoffTime : number = 1000
return ( source : Observable < any >) =>
mergeMap (( error , index ) => {
const retryAttempt = index + 1 ;
if (retryAttempt > maxRetries) {
return throwError (error);
console. log ( `Retry attempt ${ retryAttempt }: retrying in ${ backoffTime }ms` );
return timer (backoffTime * retryAttempt);
apiCall$. pipe ( retryWithBackoff ()). subscribe (
( data ) => console. log ( 'Success:' , data),
( error ) => console. error ( 'Error:' , error)
4. cachingOperator
Cache API responses to reduce server load and improve efficiency.
import { Observable, of } from 'rxjs' ;
import { tap, shareReplay } from 'rxjs/operators' ;
export function cachingOperator < T >( cacheTime : number = 60000 ) {
return ( source : Observable < T >) =>
new Observable < T >(( observer ) => {
if (cachedData && Date. now () - cachedTime < cacheTime) {
observer. next (cachedData);
apiCall$. pipe ( cachingOperator ()). subscribe (( data ) => console. log ( 'Data:' , data));
5. progressiveLoading
Load data incrementally, emitting partial results to enhance perceived performance.
import { Observable } from 'rxjs' ;
import { expand, take, map } from 'rxjs/operators' ;
export function progressiveLoading < T >(
return ( source : Observable < T []>) =>
map (( newItems ) => [ ... items, ... newItems. slice ( 0 , pageSize)])
take (Math. ceil (maxItems / pageSize))
apiCall$. pipe ( progressiveLoading ()). subscribe (( partialData ) => {
console. log ( 'Partial data:' , partialData);
6. errorHandlingOperator
Centralize error management for consistent and streamlined error handling.
import { Observable, throwError } from 'rxjs' ;
import { catchError } from 'rxjs/operators' ;
export function errorHandlingOperator < T >() {
return ( source : Observable < T >) =>
console. error ( 'An error occurred:' , error);
// You can add custom error handling logic here
return throwError ( 'Something went wrong. Please try again later.' );
apiCall$. pipe ( errorHandlingOperator ()). subscribe (
( data ) => console. log ( 'Data:' , data),
( error ) => console. error ( 'Handled error:' , error)
7. optimisticUpdate
Immediately update the UI before API confirmation, with a rollback in case of failure.
import { Observable, of } from 'rxjs' ;
import { map, catchError, switchMap } from 'rxjs/operators' ;
export function optimisticUpdate < T , R >(
updateFn : ( data : T ) => T ,
apiCall : ( data : T ) => Observable < R >
return ( source : Observable < T >) =>
map (( data ) => ({ optimistic: updateFn (data), original: data })),
switchMap (({ optimistic , original }) =>
apiCall (optimistic). pipe (
console. warn ( 'API call failed. Reverting to original data.' );
const updateTodo = ( todo : Todo ) : Todo => ({ ... todo, completed: true });
const apiUpdateTodo = ( todo : Todo ) : Observable < Todo > => // API call implementation
todoStream$. pipe ( optimisticUpdate (updateTodo, apiUpdateTodo))
. subscribe (( updatedTodo ) => console. log ( 'Updated todo:' , updatedTodo));
8. throttleAndBuffer
Use `throttleTime` combined with `buffer` to batch updates and manage data streams effectively.
import { Observable } from 'rxjs' ;
import { buffer, throttleTime } from 'rxjs/operators' ;
export function throttleAndBuffer < T >( time : number = 1000 ) {
return ( source : Observable < T >) =>
buffer (source. pipe ( throttleTime (time))),
filter (( batch ) => batch. length > 0 )
dataStream$. pipe ( throttleAndBuffer ()). subscribe (( batch ) => {
console. log ( 'Batched updates:' , batch);
9. conditionalMerge
Merge multiple observables based on dynamic conditions for flexible data combination.
import { Observable, merge } from 'rxjs' ;
import { filter } from 'rxjs/operators' ;
export function conditionalMerge < T >(
... sources : Array <[ Observable < T >, ( value : T ) => boolean ]>
... sources. map (([ source , condition ]) =>
source. pipe ( filter (condition))
const source1$ = of ( 1 , 2 , 3 , 4 );
const source2$ = of ( 'a' , 'b' , 'c' , 'd' );
[source1$, ( value ) => value % 2 === 0 ],
[source2$, ( value ) => [ 'a' , 'c' ]. includes (value)]
). subscribe (( value ) => console. log ( 'Merged value:' , value));
10. smartPolling
Implement adaptive polling intervals that respond to data changes or user activity.
import { Observable, timer } from 'rxjs' ;
import { switchMap, tap, distinctUntilChanged } from 'rxjs/operators' ;
export function smartPolling < T >(
pollFn : () => Observable < T >,
baseInterval : number = 5000 ,
maxInterval : number = 60000
let currentInterval = baseInterval;
return new Observable < T >(( observer ) => {
const subscription = timer ( 0 , baseInterval)
switchMap (() => pollFn ()),
if ( JSON . stringify (value) !== JSON . stringify (lastValue)) {
currentInterval = baseInterval;
currentInterval = Math. min (currentInterval * 2 , maxInterval);
return () => subscription. unsubscribe ();
const pollApi = () : Observable < Data > => // API call implementation
smartPolling (pollApi). subscribe (( data ) => console. log ( 'Polled data:' , data));
11. filterOnlyPresent
Filters out null or undefined values from the stream, ensuring only valid data is emitted.
import { Observable } from 'rxjs' ;
import { filter } from 'rxjs/operators' ;
export function filterOnlyPresent < T >() {
return ( source : Observable < T >) =>
filter (( value ) : value is NonNullable < T > => value !== null && value !== undefined )
someStream$. pipe ( filterOnlyPresent ()). subscribe (( value ) => {
console. log ( 'Non-null value:' , value);
12. filterOnlyPropertyPresent
Emits only when a specified property of the emitted object is neither null nor undefined.
import { Observable } from 'rxjs' ;
import { filter } from 'rxjs/operators' ;
export function filterOnlyPropertyPresent < T >( prop : keyof T ) {
return ( source : Observable < T >) =>
filter (( value ) => value[prop] !== null && value[prop] !== undefined )
userStream$. pipe ( filterOnlyPropertyPresent ( 'name' )). subscribe (( user ) => {
console. log ( 'User with name:' , user);
These custom operators can greatly improve your RxJS workflows, making your code more efficient and cleaner. Be sure to thoroughly test these operators in your specific scenarios and adjust them as needed.