RxJs captura el error y continúa

6 minutos de lectura

avatar de usuario
Cheborra

Tengo una lista de elementos para analizar, pero el análisis de uno de ellos puede fallar.

¿Cuál es el “Rx-Way” para detectar el error pero continuar ejecutando la secuencia?

Ejemplo de código:

var observable = Rx.Observable.from([0,1,2,3,4,5])
.map(
  function(value){
      if(value == 3){
        throw new Error("Value cannot be 3");
      }
    return value;
  });

observable.subscribe(
  function(value){
  console.log("onNext " + value);
  },
  function(error){
    console.log("Error: " + error.message);
  },
  function(){
    console.log("Completed!");
  });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>

Lo que quiero hacer en un no-Rx-Way:

var items = [0,1,2,3,4,5];

for (var item in items){
  try{
    if(item == 3){
      throw new Error("Value cannot be 3");
    }
    console.log(item);
  }catch(error){
     console.log("Error: " + error.message);
  }
}

Gracias por adelantado

  • Ver Rx.Observable.onErrorResumeNext

    – Juan Mendes

    29 de julio de 2016 a las 2:40

  • Lo vi, pero no puedo hacer que funcione de esta manera… ¿podría escribir un ejemplo?

    – Cheborra

    29 de julio de 2016 a las 2:54

  • Ver Ben Lesh habla de esto especialmente a las 23:43.

    – LPL

    18 de enero de 2018 a las 1:10


avatar de usuario
pauladaniels

Te sugiero que uses flatMap (ahora mergeMap en rxjs versión 5) en su lugar, lo que le permitirá colapsar los errores si no le importan. Efectivamente, creará un Observable interno que puede tragarse si ocurre un error. La ventaja de este enfoque es que puede encadenar operadores y, si ocurre un error en cualquier parte de la canalización, se reenviará automáticamente al bloque catch.

const {from, iif, throwError, of, EMPTY} = rxjs;
const {map, flatMap, catchError} = rxjs.operators;

// A helper method to let us create arbitrary operators
const {pipe} = rxjs;

// Create an operator that will catch and squash errors
// This returns a function of the shape of Observable<T> => Observable<R>
const mapAndContinueOnError = pipe(
  //This will get skipped if upstream throws an error
  map(v => v * 2),
  catchError(err => {
    console.log("Caught Error, continuing")
    //Return an empty Observable which gets collapsed in the output
    return EMPTY;
  })
)

const observable = from([0, 1, 2, 3, 4, 5]).pipe(
  flatMap((value) => 
    iif(() => value != 3, 
      of(value), 
      throwError(new Error("Value cannot be 3"))
    ).pipe(mapAndContinueOnError)
  )
);

observable.subscribe(
  (value) => console.log("onNext " + value), (error) => console.log("Error: " + error.message), () => console.log("Completed!")
);
<script src="https://unpkg.com/[email protected]/dist/bundles/rxjs.umd.min.js"></script>

  • ¿Qué pasa si ocurre un error en algún lugar de una cadena después de la flatMap ?

    – nonybrighto

    17 de noviembre de 2017 a las 7:31

  • @nonybrighto Entonces eliminaría la transmisión y emitiría un error al suscriptor, a menos que hubiera otra captura en algún lugar de la transmisión.

    – paulpdaniels

    17 de noviembre de 2017 a las 7:35

  • Esto no parece funcionar en RxJS v6. Intenté migrar el código y el observable se completa después de detectar el error. ¿Le importaría actualizar su solución? He creado un stackblitz aquí como referencia. Código reproducible Stackblitz.

    – Manish Shrestha

    6 de noviembre de 2019 a las 9:24

  • @ManishShrestha ¡Ahí tienes!

    – paulpdaniels

    7 de noviembre de 2019 a las 2:42

  • Me encanta esta respuesta para – el desnudo pipe función, el ejemplo de trabajo rxJs y el const desestructuración de las importaciones. ¡Cada día es un día de escuela! – ah y para el iif!

    – El Ronnoco

    27 de noviembre de 2020 a las 9:29


avatar de usuario
me vuelvo

Debe cambiar a una nueva transmisión desechable y, si se produce un error, se eliminará de forma segura y mantendrá viva la transmisión original:

Rx.Observable.from([0,1,2,3,4,5])
    .switchMap(value => {

        // This is the disposable stream!
        // Errors can safely occur in here without killing the original stream

        return Rx.Observable.of(value)
            .map(value => {
                if (value === 3) {
                    throw new Error('Value cannot be 3');
                }
                return value;
            })
            .catch(error => {
                // You can do some fancy stuff here with errors if you like
                // Below we are just returning the error object to the outer stream
                return Rx.Observable.of(error);
            });

    })
    .map(value => {
        if (value instanceof Error) {
            // Maybe do some error handling here
            return `Error: ${value.message}`;
        }
        return value;
    })
    .subscribe(
      (x => console.log('Success', x)),
      (x => console.log('Error', x)),
      (() => console.log('Complete'))
    );
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.1/Rx.min.js"></script>

Más información sobre esta técnica en esta publicación de blog: La búsqueda de albóndigas: Continúe las transmisiones RxJS cuando ocurran errores

  • ¿Cómo se vería eso con los operadores lettable y pipe?

    – Roy Namir

    20 de febrero de 2018 a las 10:19

  • TENGA CUIDADO, si el observable interno tarda algún tiempo en emitir, es posible que nunca emita cuando se usa switchMap() ya que cancela el observable anterior. Para probarlo usted mismo, reemplace Rx.Observable.of(value) de esta respuesta con Rx.Observable.from(new Promise((resolve, reject) => setTimeout(() => resolve(i)))). El resultado solo será: “Éxito 5, Completo”. Reemplazo de la switchMap() con mergeMap() soluciona el problema y da los resultados esperados normalmente.

    – Montañero

    2 de diciembre de 2018 a las 15:28


avatar de usuario
Vedran

para mantener su endlessObservable$ de morir puedes poner tu failingObservable$ en un operador de mapeo de orden superior (por ejemplo switchMap, concatMap, exhaustMap…) y tragarse el error allí terminando la corriente interna con un empty() observable que no devuelve valores.

Usando RxJS 6:

endlessObservable$
    .pipe(
        switchMap(() => failingObservable$
            .pipe(
                catchError((err) => {
                    console.error(err);
                    return EMPTY;
                })
            )
        )
    );

  • Tratando de mantenerlo lo más simple posible para las personas que buscan una solución funcional.

    – Vedrano

    1 de noviembre de 2019 a las 14:56


  • un ejemplo más ilustrativo de este “doble envoltorio” en learnrxjs.io/operators/error_handling/catch.html (ejemplo nº 3) o stackblitz.com/edit/rxjs-catcherror-withmapoperators

    – Balaje

    8 de enero de 2020 a las 15:07


  • ¿Esto no cancelará el FailingObservable$ inacabado cuando se emita infiniteObservable$?

    – isevcik

    25 de febrero de 2020 a las 12:51

  • @isevcik sí lo hará, es solo un ejemplo; puede usar cualquiera de los otros operadores de mapeo de orden superior según el comportamiento que necesite.

    – Vedrano

    26 de febrero de 2020 a las 10:15

  • emtpy() ahora está en desuso, EMPTY debe usarse en su lugar.

    – JSON Derulo

    14 de junio de 2021 a las 10:19

avatar de usuario
knona

En caso de que no quiera o no pueda acceder al observable interno que causa el error, puede hacer algo como esto:

Usando RxJS 7:

const numbers$ = new Subject();

numbers$
  .pipe(
    tap(value => {
      if (value === 3) {
        throw new Error('Value cannot be 3');
      }
    }),
    tap(value => {
      console.log('Value:', value);
    }),
    catchError((err, caught) => {
      console.log('Error:', err.message);
      return caught;
    })
  )
  .subscribe();

for (let n = 1; n <= 10; n++) {
  numbers$.next(n);
}

Lo interesante aquí es el argumento “atrapado” en el operador catchError que se puede devolver.
https://rxjs.dev/api/operators/catchError

Solo funciona cuando la fuente observable es Caliente.

En mi caso, uso redux-observable y quería una forma de manejar mis errores en un solo lugar.

Se me ocurrió esto:

const allEpics = combineEpics(
    // all my epics
);

export const rootEpic = (action$, state$, dependencies) => {
  return allEpics(action$, state$, dependencies).pipe(
    catchError((err, caught) => {
        if (err instanceof MyError) {
        return concat(of(displayAlert(err.message)), caught);
      }
      throw err;
    })
  );
};

Si alguna de mis épicas arroja una excepción “MyError”, se detectará y se enviará otra acción.

En realidad puedes usar trata de atraparlo dentro de tu mapa función para manejar el error. Aquí está el fragmento de código

var source = Rx.Observable.from([0, 1, 2, 3, 4, 5])
    .map(
        function(value) {
            try {
                if (value === 3) {
                    throw new Error("Value cannot be 3");
                }
                return value;

            } catch (error) {
                console.log('I caught an error');
                return undefined;
            }
        })
    .filter(function(x) {
        return x !== undefined; });


source.subscribe(
    function(value) {
        console.log("onNext " + value);
    },
    function(error) {
        console.log("Error: " + error.message);
    },
    function() {
        console.log("Completed!");
    });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>

¿Ha sido útil esta solución?

Esta web utiliza cookies propias y de terceros para su correcto funcionamiento y para fines analíticos y para mostrarte publicidad relacionada con sus preferencias en base a un perfil elaborado a partir de tus hábitos de navegación. Al hacer clic en el botón Aceptar, acepta el uso de estas tecnologías y el procesamiento de tus datos para estos propósitos. Configurar y más información
Privacidad