Interstellar Code

RXJS - Extensiones Reactivas en JavaScript y TypeScript

RXJS es una de las bibliotecas más poderosas para manejar programación reactiva en JavaScript y TypeScript. En este post, exploramos sus conceptos clave como Observables, Subjects y operadores, y cómo pueden transformar la forma en como se manejan los datos asíncronos y eventos en las aplicaciones web.

20 de agosto de 2025

Tabla de contenido:

  1. 1. RXJS: Extensiones Reactivas en JavaScript y TypeScript
  2. 2. Observables
  3. 3. Operadores
  4. 4. Operadores de tiempo
  5. 5. Peticiones Ajax
  6. 6. Operadores de transformación
  7. 7. Operadores de combinación

RXJS: Extensiones Reactivas en JavaScript y TypeScript

RXJS (Reactive Extensions for JavaScript) es una biblioteca para componer programas asíncronos y basados en eventos utilizando secuencias observables. Es ampliamente utilizada en aplicaciones web, especialmente con frameworks como Angular, pero también puede ser utilizada con React, Vue y otros.

Las extensiones reactivas nos permiten manejar flujos de datos y eventos de manera eficiente, facilitando la gestión de operaciones asíncronas como llamadas HTTP, eventos del usuario y más. Empleando lo que se conoce como programación reactiva.


Observables

Antes de profundizar en RXJS, es importante entender el concepto de Observables, Observers y Subscriptions.

A continuación se muestra un ejemplo básico de cómo crear y suscribirse a un Observable:

import { Observable, Observer } from 'rxjs';

// Definimos un observer
const observer: Observer<string> = {
  next: (value) => console.log('Next [obs]: ', value),
  error: (error) => console.warn('Error [obs]: ', error),
  complete: () => console.info('Se terminó el observable [obs]'),
}

// Creamos un observable que emite valores de tipo string
const observable$ = new Observable<string>((subscriber) => {
  // Emitimos valores al observable que van a recibir las suscripciones
  subscriber.next('Hola');
  subscriber.next('Mundo');

  // Forzar un error
  // const a: any = undefined;
  // a.name = 'Cesar';

  // Completamos el observable
  subscriber.complete();

  // No se emite nada después de completar
  subscriber.error('Algo salió mal');
  subscriber.next('Ya no se emite');
});

// Nos suscribimos al observable, recibiendo los valores emitidos
observable$.subscribe((value) => {
  console.log(value);
});

// Podemos manejar los tres casos: next, error y complete [!Importante: Esta deprecado]
observable$.subscribe(
  (value) => {
    console.log('Next: ', value);
  },
  (error) => {
    console.warn('Error: ', error);
  },
  () => {
    console.info('Se terminó el observable');
  }
);

// Usando un observer
const subscription = observable$.subscribe(observer);

// Cancelar la suscripción (importante para evitar memory leaks)
subscription.unsubscribe();

Subjects

Un Subject es un tipo especial de Observable que permite multicasting, es decir, permite que múltiples observadores se suscriban a él y reciban los mismos valores emitidos. A diferencia de un Observable estándar, que emite valores de forma independiente para cada suscriptor, un Subject comparte la misma fuente de datos entre todos sus suscriptores.

Cuando la data es emitida por el observable en si, es considerado como un Cold Observable, ya que cada suscriptor recibe su propia ejecución y valores. En cambio, cuando se utiliza un Subject, es considerado un Hot Observable, porque todos los suscriptores comparten la misma ejecución y reciben los mismos valores emitidos.

import { Subject, Observer, Observable } from 'rxjs';

const observer: Observer<number> = {
  next: (value) => console.log('Next: ', value),
  error: (error) => console.warn('Error: ', error),
  complete: () => console.info('Se completo el observable')
}

const interval$ = new Observable<number>((subscriber) => {
  // Emitimos un valor cada segundo
  const intervalId = setInterval(() => {
    subscriber.next(Math.random());
  }, 1000);

  // Limpiamos el intervalo cuando se complete o se cancele la suscripción
  return () => {
    clearInterval(intervalId);
    console.log('Intervalo destruido');
  }
});

// Creamos un Subject
const subject$ = new Subject<number>();
// Nos suscribimos al observable y enviamos los valores al Subject
const intervalSubscription = interval$.subscribe(subject$);

const subscription1 = subject$.subscribe(observer);
const subscription2 = subject$.subscribe(observer);

setTimeout(() => {
  subject$.next(10);
  subject$.complete();

  // Nos desuscribimos del observable original
  intervalSubscription.unsubscribe();
}, 5500);

of

El método of es un método de creación de observables que emite los valores que se le pasan como argumentos. Es decir que crea un observable que emite esos valores de manera sincrónica.

import { of } from 'rxjs';

// const observable$ = of<number>(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// const observable$ = of<Array<number>>([1, 2, 3, 4, 5]);
// const observable$ = of<number>(...[1, 2, 3, 4, 5], 6, 7, 8, 9, 10);
const observable$ = of<any>(
  [1, 2],
  { a: 1, b: 2 },
  function () { },
  true,
  Promise.resolve(true)
);

observable$.subscribe({
  next: (value) => console.log('Next: ', value),
  error: (error) => console.warn('Error: ', error),
  complete: () => console.info('Se completo el observable')
});

fromEvent

El método fromEvent es un método de creación que crea un observable a partir de eventos del DOM. Una vez que se suscribe al observable, el observable comenzará a emitir eventos cada vez que ocurra el evento especificado especificado y los observadores recibirán esos eventos.

import { fromEvent } from 'rxjs';

const event1$ = fromEvent<PointerEvent>(document, 'click');
const event2$ = fromEvent<KeyboardEvent>(document, 'keyup');

event1$.subscribe((event) => {
  console.log('Coordenadas: ', event.x, event.y);
});

event2$.subscribe((event) => {
  console.log('Tecla: ', event.key);
});

range

EL método range es un método de creación que crea un observable que emite una secuencia de números enteros en un rango especificado. El primer argumento es el valor inicial y el segundo es la cantidad de números a emitir. Si se especifica un tercer argumento, se utilizará como programador para la emisión de los valores, lo que permite que la emisión sea asíncrona.

import { asyncScheduler, range } from 'rxjs';

const src1$ = range(1, 10);
const src2$ = range(-5, 10);

console.log('Inicio');
src1$.subscribe(console.log);
console.log('Fin');

console.log('Inicio');
src2$.subscribe(console.log);
console.log('Fin');

// Range asíncrono
const srcAsync$ = range(1, 5, asyncScheduler);

console.log('Inicio');
srcAsync$.subscribe(console.log);
console.log('Fin');

interval y timer

El observable interval emite un valor cada cierto tiempo, el cual lo definimos en una cierta cantidad de milisegundos, tiempo en el cual estará emitiendo un valor, el cual es el contador de veces que ha emitido un valor.

Por otro lado el observable timer emite un valor después de un cierto tiempo, el cual definimos en milisegundos, y luego puede seguir emitiendo valores a intervalos regulares si se le proporciona un segundo argumento que define el intervalo.

import { Observer, interval, timer } from 'rxjs';

const observer: Observer<number> = {
  next: (value) => console.log('Next: ', value),
  error: (error) => console.warn('Error: ', error),
  complete: () => console.info('Se completo el observable'),
};

const todayInFiveSeconds = new Date();
todayInFiveSeconds.setSeconds(todayInFiveSeconds.getSeconds() + 5);

// Interval
const interval$ = interval(1000);

console.log('Inicio');
interval$.subscribe(observer);
console.log('Fin');

// Timer
// const timer$ = timer(2000);
// const timer$ = timer(2000, 1000);
const timer$ = timer(todayInFiveSeconds);

console.log('Inicio');
timer$.subscribe(observer);
console.log('Fin');

asyncScheduler

La función asyncScheduler es un programador de tareas que permite ejecutar funciones de manera asíncrona, similar a setTimeout o setInterval, pero con la capacidad de cancelar tareas programadas.

La función asyncScheduler.schedule permite programar una tarea para que se ejecute después de un cierto tiempo o de manera repetitiva, y se puede cancelar la tarea con el método unsubscribe de la suscripción que devuelve.

import { asyncScheduler } from 'rxjs';

const greeting = () => console.log('Hello world');
const greeting1 = (name: any) => console.log(`Hello ${name}`);
const greeting2 = (user: any) => console.log(`Hello ${user.name} ${user.lastName}`);

asyncScheduler.schedule(greeting, 1000);
asyncScheduler.schedule(greeting1, 2000, 'Cesar');
asyncScheduler.schedule(greeting2, 3000, { name: 'Cesar', lastName: 'Villalobos Olmos' });

const subscription = asyncScheduler.schedule(function (state: any) {
  console.log('State: ', state);

  this.schedule(state + 1, 1000);
}, 1000, 0);

// setTimeout(() => {
//   subscription.unsubscribe();
// }, 10000);

asyncScheduler.schedule(() => subscription.unsubscribe(), 10000);

from

La función from crea un observable a partir de un iterable, una promesa, un array, un objeto iterable o un observable, es decir, esta función nos permite convertir diferentes tipos de datos en un observable que podemos suscribirnos.

Su principal diferencia con of es que of emite los valores tal cual los recibe, mientras que from crea un observable que emite los valores de un iterable o una promesa uno por uno, lo que permite manejar flujos de datos más complejos y asíncronos.

import { from, Observer } from 'rxjs';

const observer: Observer<any> = {
  next: (value) => console.log('Next: ', value),
  error: (error) => console.warn('Error: ', error),
  complete: () => console.info('Complete'),
}

const myGenerator = function* () {
  yield 1;
  yield 2;
  yield 3;
  yield 4;
  yield 5;
}

const myIterable = myGenerator();

// for (let value of myIterable) {
//   console.log('Value: ', value);
// }

from(myIterable).subscribe(observer);

// const sources$ = from([1, 2, 3, 4, 5]);
// const sources$ = of(...[1, 2, 3, 4, 5]);
// const sources$ = from('Cesar');
const sources$ = from(fetch('https://api.github.com/users/chicho69-cesar'));

sources$.subscribe(async (response) => {
  const data = await response.json();
  console.log('DATA: ', data);
});

sources$.subscribe(observer);

Operadores

Cuando queremos manipular un valor que fluye a través de un observable, podemos utilizar operadores, los operadores son funciones que toman un observable y devuelven un nuevo observable con los valores transformados.

Para aplicar un operador, usamos el método pipe del observable original.

map

El operador map transforma cada valor emitido por el observable original aplicando una función a cada uno de ellos. Por ejemplo, si tenemos un observable que emite números y queremos multiplicarlos por 10, podemos usar map para lograrlo.

import { fromEvent, range } from 'rxjs';
import { map } from 'rxjs/operators';

range(1, 5).pipe(
  map<number, number>((val) => val * 10)
).subscribe((val) => console.log(val));

const keyup$ = fromEvent<KeyboardEvent>(document, 'keyup');

const keyupMap$ = keyup$.pipe(
  map((event) => event.code)
);

pluck

El operador pluck es una forma más concisa de acceder a una propiedad específica de un objeto emitido por el observable. Por ejemplo, si queremos obtener el valor de la propiedad key de un evento de teclado, podemos usar pluck para extraerlo.

import { fromEvent } from 'rxjs';
import { pluck } from 'rxjs/operators';

const keyup$ = fromEvent<KeyboardEvent>(document, 'keyup');

const keyupPluck$ = keyup$.pipe(
  // pluck('key')
  pluck('target', 'baseURI')
).subscribe((val) => console.log(val));

mapTo

El operador mapTo es similar a map, pero en lugar de aplicar una función a cada valor emitido, simplemente emite un valor constante. Por ejemplo, si queremos que cada vez que se presione una tecla se emita el mismo valor, podemos usar mapTo.

import { fromEvent } from 'rxjs';
import { mapTo } from 'rxjs/operators';

const keyup$ = fromEvent<KeyboardEvent>(document, 'keyup');

const keyupMapTo$ = keyup$.pipe(
  mapTo('Tecla presionada')
).subscribe((val) => console.log(val));

filter

El operador filter permite filtrar los valores emitidos por un observable basándose en una condición específica. Solo los valores que cumplen con la condición son emitidos al suscriptor.

import { from, fromEvent, range } from 'rxjs';
import { filter, map } from 'rxjs/operators';

range(20, 30).pipe(
  filter((value: number, index: number) => {
    return value % 2 === 0;
  })
).subscribe((value: number) => console.log(value));

interface Character {
  name: string;
  type: 'hero' | 'villan';
}

const characters: Character[] = [
  { name: 'Superman', type: 'hero' },
  { name: 'Batman', type: 'hero' },
  { name: 'Joker', type: 'villan' }
];

from(characters).pipe(
  filter((character: Character) => character.type === 'hero')
).subscribe((character: Character) => console.log(character));

const keyup$ = fromEvent<KeyboardEvent>(document, 'keyup').pipe(
  map((event: KeyboardEvent) => event.code),
  filter((key: string) => key === 'Enter'),
);

keyup$.subscribe(console.log);

tap

El operador tap permite realizar efectos secundarios en los valores emitidos por un observable, es decir, tap ejecuta una función con la cual podemos observar los valores que están pasando por el observable sin modificar el flujo de datos.

import { range } from 'rxjs';
import { map, tap } from 'rxjs/operators';

const numbers$ = range(1, 5);

numbers$.pipe(
  tap((value: number) => console.log('Before: ', value)),
  map((value: number) => value * 10),
  tap({
    next: (value: number) => console.log('After: ', value),
    complete: () => console.info('Complete')
  })
).subscribe((value: number) => console.log('Subs: ', value));

reduce

El operador reduce se utiliza de forma similar a la función reduce de los arrays, pero en este caso se aplica a un flujo de datos (observable). El operador reduce toma un acumulador y un valor actual, y devuelve un único valor al final del flujo.

import { interval } from 'rxjs';
import { reduce, take, tap } from 'rxjs/operators';

interval(500).pipe(
  take(5),
  tap((value: number) => console.log('Current value:', value)),
  reduce((accumulate: number, current: number) => {
    return accumulate + current;
  }, 0)
).subscribe({
  next: (value: number) => {
    console.log('Reduced value from interval:', value);
  }
});

scan

El operador scan es similar a reduce, pero emite el valor acumulado en cada paso. Esto permite ver el estado intermedio de la acumulación, es decir, con reduce unicamente obtenemos el resultado final, mientras que con scan obtenemos todos los estados intermedios. Por lo tanto, scan es útil para mantener un estado a lo largo del tiempo en una secuencia de eventos.

import { from } from 'rxjs';
import { map, reduce, scan } from 'rxjs/operators';

const numbers = [1, 2, 3, 4, 5];

from(numbers).pipe(
  reduce((acc, value) => acc + value, 0),
).subscribe({
  // Solo obtenemos el valor final -> 15
  next: (result) => console.log('Sum of numbers with reduce: ', result),
});

from(numbers).pipe(
  scan((acc, value) => acc + value, 0),
).subscribe({
  // Obtenemos todos los valores intermedios -> 1, 3, 6, 10, 15
  next: (result) => console.log('Sum of numbers with scan: ', result),
});

take

El operador take permite limitar la cantidad de valores emitidos por un Observable, por ejemplo, si queremos tomar solo los primeros 3 valores emitidos por un Observable, podemos usar take(3).

import { of } from 'rxjs';
import { take, tap } from 'rxjs/operators';

const numbers$ = of(1, 2, 3, 4, 5);

numbers$.pipe(
  tap((value: number) => console.log(`Value emitted: ${value}`)),
  take(3),
).subscribe({
  next: (value: number) => console.log(`Received value: ${value}`),
  complete: () => console.log('Stream completed'),
});

first

El operador first toma el primer valor que emite el observable el cual cumple con la condición especificada. Si no se encuentra ningún valor que cumpla con la condición, se lanzará un error.

import { fromEvent } from 'rxjs';
import { first, map, tap } from 'rxjs/operators';

const click$ = fromEvent<MouseEvent>(document, 'click');

click$.pipe(
  tap<MouseEvent>((event) => console.log('Click event:', event)),
  map<MouseEvent, { x: number; y: number }>((event) => ({
    x: event.clientX,
    y: event.clientY,
  })),
  first<{ x: number, y: number }>((event) => event.y >= 150)
).subscribe({
  next: (value) => console.log('First click with y >= 150:', value),
  complete: () => console.log('Completed'),
});

takeWhile

El operador takeWhile permite tomar valores de un observable mientras se cumpla una condición, y se completa el observable cuando la condición deja de cumplirse. El segundo parámetro true indica que el último valor que cumple la condición también se emitirá.

import { fromEvent } from 'rxjs';
import { map, takeWhile } from 'rxjs/operators';

const click$ = fromEvent<MouseEvent>(document, 'click');

click$.pipe(
  map<MouseEvent, { x: number; y: number }>(({ x, y }) => ({ x, y })),
  // takeWhile(({ y }) => y <= 300),
  takeWhile(({ y }) => y <= 300, true),
).subscribe({
  next: ({ x, y }) => console.log(`Mouse clicked at: (${x}, ${y})`),
  complete: () => console.log('Mouse clicks completed below y=300'),
});

skip

El operador skip permite omitir una cantidad específica de valores emitidos por un observable, es decir, si queremos ignorar los primeros 3 valores emitidos por un observable, podemos usar skip(3).

import { interval } from 'rxjs';
import { skip, take } from 'rxjs/operators';

interval(1000).pipe(
  skip(3),
  take(5)
).subscribe({
  next: (value) => console.log(value),
  complete: () => console.log('Complete')
});

takeUntil

El operador takeUntil permite tomar valores de un observable hasta que otro observable emita un valor, momento en el cual se completa el observable original.

import { fromEvent, interval } from 'rxjs';
import { skip, takeUntil, tap } from 'rxjs/operators';

const button = document.createElement('button');
button.innerHTML = 'Detener Timer';
document.querySelector('body')!.append(button);

const counter$ = interval(1000);

const buttonClick$ = fromEvent(button, 'click').pipe(
  tap(() => console.log('Button clicked before skip')),
  skip(1),
  tap(() => console.log('Button clicked after skip'))
);

counter$.pipe(
  takeUntil(buttonClick$),
).subscribe({
  next: (value) => console.log(`Counter: ${value}`),
  complete: () => console.log('Counter completed'),
});

distinct

El operador distinct filtra los valores emitidos por un observable, eliminando aquellos que ya han sido emitidos anteriormente. Por lo que los elementos que fluyan por el observable serán únicos.

import { from, of } from 'rxjs';
import { distinct } from 'rxjs/operators';

const numbers$ = of(1, '1', 1, 3, 3, 2, 2, 4, 4, 5, 3, 1, '1');

numbers$.pipe(
  distinct()
).subscribe(console.log);

distinctUntilChange

El operador distinctUntilChanged se utiliza para filtrar valores consecutivos que son iguales, permitiendo que solo el primer valor de una secuencia de valores duplicados se emita. Pero a diferencia de distinct, este operador no elimina todos los duplicados, sino que solo los consecutivos.

import { from, of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

const numbers$ = of(1, '1', 1, 3, 3, 2, 2, 4, 4, 5, 3, 1, '1');

numbers$.pipe(
  distinctUntilChanged()
).subscribe(console.log);

distinctUntilKeyChange

El operador distinctUntilKeyChanged permite filtrar los valores emitidos por un observable, de tal manera que solo se emiten aquellos valores cuyo valor de una clave específica ha cambiado desde la última emisión. En este caso, estamos filtrando los personajes por su nombre, de modo que solo se emiten personajes con nombres únicos consecutivos.

import { from } from 'rxjs';
import { distinctUntilKeyChanged } from 'rxjs/operators';

interface Character {
  name: string;
}

const characters: Character[] = [
  { name: 'Megaman' },
  { name: 'X' },
  { name: 'Zero' },
  { name: 'Dr. Willy' },
  { name: 'X' },
  { name: 'Megaman' },
  { name: 'Zero' },
];

from(characters).pipe(
  distinctUntilKeyChanged('name')
).subscribe(console.log);

Operadores de tiempo

debounceTime

El operador debounceTime se utiliza para limitar la cantidad de eventos que se procesan en un período de tiempo determinado. Es útil para evitar que se procesen eventos que ocurren demasiado rápido, como clics o pulsaciones de teclas.

import { fromEvent } from 'rxjs';
import { debounceTime, distinctUntilChanged, pluck } from 'rxjs/operators';

// Ejemplo 1
const click$ = fromEvent(document, 'click');

click$.pipe(
  debounceTime(3000)
).subscribe({
  next: (event) => console.log(event),
});

// Ejemplo 2
const input = document.createElement('input');
document.querySelector('body')!.append(input);

const input$ = fromEvent(input, 'keyup');

input$.pipe(
  debounceTime(1000),
  pluck('target', 'value'),
  distinctUntilChanged()
).subscribe({
  next: (value) => {
    console.log('EL VALOR ES: ', value);
  }
});

throttleTime

El operador throttleTime nos permite controlar la frecuencia con la que se emiten los valores de un observable. Este operador emite el primer valor inmediatamente y luego ignora los valores subsiguientes durante un período de tiempo especificado. Esto es útil para evitar la sobrecarga de eventos o para limitar la cantidad de veces que se procesa una acción en un intervalo de tiempo determinado.

import { asyncScheduler, fromEvent } from 'rxjs';
import { distinctUntilChanged, pluck, throttleTime } from 'rxjs/operators';

// Ejemplo 1
const click$ = fromEvent(document, 'click');

click$.pipe(
  throttleTime(3000)
).subscribe({
  next: (event) => console.log(event),
});

// Ejemplo 2
const input = document.createElement('input');
document.querySelector('body')!.append(input);

const input$ = fromEvent(input, 'keyup');

input$.pipe(
  throttleTime(400, asyncScheduler, {
    leading: true,
    trailing: true
  }),
  pluck('target', 'value'),
  distinctUntilChanged()
).subscribe({
  next: (value) => {
    console.log('EL VALOR ES: ', value);
  }
});

sampleTime

El operador sampleTime en RxJS emite el valor más reciente de un observable cada cierto intervalo de tiempo especificado. Es útil cuando se desea obtener una muestra periódica de los valores emitidos por un observable, en lugar de procesar cada valor individualmente.

import { fromEvent } from 'rxjs';
import { map, sampleTime } from 'rxjs/operators';

const click$ = fromEvent<MouseEvent>(document, 'click');

click$.pipe(
  sampleTime(2000),
  map(({ x, y }) => ({ x, y })),
).subscribe(console.log);

sample

El operador sample es un poco especial, ya que este operador va a emitir los valores del observable fuente cada vez que el observable que se pasa como argumento emita un valor. Es decir, el observable que se pasa como argumento actúa como un “disparador” para emitir el valor más reciente del observable fuente.

import { fromEvent, interval } from 'rxjs';
import { sample } from 'rxjs/operators';

const interval$ = interval(500);
const click$ = fromEvent(document, 'click');

interval$.pipe(
  sample(click$)
).subscribe(console.log);

auditTime

El operador auditTime como se propio nombre indica, nos sirve para auditar el valor emitido por un observable cada cierto tiempo, es decir, este operador va a emitir el valor más reciente del observable fuente cada cierto intervalo de tiempo especificado, ignorando cualquier otro valor emitido durante ese intervalo.

import { fromEvent } from 'rxjs';
import { auditTime, map, tap } from 'rxjs/operators';

const click$ = fromEvent<MouseEvent>(document, 'click');

click$.pipe(
  map(({ x }) => x),
  tap((val) => console.log('tap', val)),
  auditTime(5000)
).subscribe(console.log);

Peticiones Ajax

Ajax es una librería de RxJS que permite realizar peticiones HTTP de manera reactiva. Con esta librería, puedes manejar las respuestas y errores de las peticiones de forma más sencilla y elegante utilizando los operadores de RxJS.

ajax

El método ajax de la librería Ajax de RxJS permite realizar peticiones HTTP de manera reactiva. Este método devuelve un observable que emite la respuesta de la petición cuando esta se completa.

import { of } from 'rxjs';
import { ajax, AjaxError } from 'rxjs/ajax';
import { catchError, pluck } from 'rxjs/operators';

const url = 'https://api.github.com/users?per_page=5';

ajax(url).pipe(
  pluck('response'),
  catchError((error: AjaxError) => {
    console.warn('Error fetching users:', error);
    return of([]);
  })
).subscribe({
  next: (users) => console.log('Users:', users),
  error: (error) => console.warn('Error in subscription:', error),
  complete: () => console.log('Request completed')
});

El método ajax.getJSON realiza una petición HTTP GET a una URL específica y devuelve un Observable que emite la respuesta JSON, este método acepta la URL y un objeto de opciones que puede incluir encabezados HTTP.

import { ajax } from 'rxjs/ajax';

const url = 'https://httpbin.org/delay/1';

const obs$ = ajax.getJSON(url, {
  'Content-Type': 'application/json',
  'Authorization': 'Bearer YOUR_TOKEN'
});

obs$.subscribe({
  next: (response) => console.log('Response:', response),
  error: (error) => console.error('Error:', error),
  complete: () => console.log('Request completed')
});

La principal diferencia entre ajax.getJSON y ajax es que ajax.getJSON realiza una petición HTTP GET y espera que la respuesta sea un JSON, mientras que ajax es más general y puede realizar peticiones de diferentes tipos y manejar diferentes formatos de respuesta. Además, ajax permite más configuraciones como el método HTTP, los encabezados, el cuerpo de la petición, etc.

HTTP methods

Usando la librería Ajax de RxJS, podemos realizar peticiones HTTP utilizando diferentes métodos como GET, POST, PUT, DELETE, entre otros. A continuación se muestran ejemplos de cómo realizar peticiones con estos métodos.

import { ajax } from 'rxjs/ajax';

const url = 'https://httpbin.org/delay/1';

ajax.post(
  url,
  {
    id: 1,
    name: 'Cesar Villalobos Olmos',
    age: 23,
  },
  {
    'Content-Type': 'application/json',
    Authorization: 'Bearer 1234567890',
  },
).subscribe({
  next: (response) => console.log('Response post:', response),
  error: (error) => console.error('Error post:', error)
});

ajax.put(
  url,
  {
    id: 1,
    name: 'Cesar Villalobos Olmos',
    age: 24,
  },
  {
    'Content-Type': 'application/json',
    Authorization: 'Bearer 1234567890',
  },
).subscribe({
  next: (response) => console.log('Response put:', response),
  error: (error) => console.error('Error put:', error)
});

ajax.patch(
  url,
  {
    age: 23,
  },
  {
    'Content-Type': 'application/json',
    Authorization: 'Bearer 1234567890',
  },
).subscribe({
  next: (response) => console.log('Response patch:', response),
  error: (error) => console.error('Error patch:', error)
});

ajax({
  url,
  method: 'DELETE',
  headers: {
    'Content-Type': 'application/json',
    Authorization: 'Bearer 1234567890',
  },
}).subscribe({
  next: (response) => console.log('Response delete:', response),
  error: (error) => console.error('Error delete:', error)
});

Operadores de transformación

mergeAll

El operador mergeAll se encarga de suscribirse a los observables internos que se generan y emitir sus valores en el observable externo, es decir, este operador toma un observable que emite otros observables y los combina en un solo observable que emite todos los valores de los observables internos.

import { fromEvent } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { debounceTime, map, mergeAll, pluck } from 'rxjs/operators';

const textInput = document.createElement('input');
textInput.type = 'text';
document.querySelector('body')!.append(textInput);

const input$ = fromEvent<KeyboardEvent>(textInput, 'keyup');

input$.pipe(
  debounceTime(500),
  pluck('target', 'value'),
  map((text) => ajax.getJSON(
    `https://api.github.com/search/users?q=${text}`
  )),
  mergeAll(),
  pluck('items')
).subscribe({
  next: (users) => {
    console.log('LOS USUARIOS: ', users);
  },
});

mergeMap

El operador mergeMap recibe una función que retorna un observable, este operador se suscribe a cada uno de los observables que retorna la función y emite todos los valores de cada uno de esos observables, por ejemplo, si tenemos un observable que emite valores y queremos transformar cada valor en un nuevo observable, podemos usar mergeMap para lograrlo.

import { fromEvent, interval, of } from 'rxjs';
import { map, mergeMap, take, takeUntil } from 'rxjs/operators';

const letters$ = of('a', 'b', 'c');

letters$.pipe(
  mergeMap((letter) => interval(1000).pipe(
    map(i => letter + i),
    take(3)
  ))
).subscribe({
  next: (val) => console.log('next:', val),
  complete: () => console.log('Complete')
});

const mousedown$ = fromEvent(document, 'mousedown');
const mouseup$ = fromEvent(document, 'mouseup');
const interval$ = interval();

mousedown$.pipe(
  mergeMap(() => interval$.pipe(
    takeUntil(mouseup$)
  ))
).subscribe(console.log);

switchMap

El operador switchMap lo que hace es cancelar la petición anterior y hacer una nueva petición cada vez que se emite un nuevo valor en el observable de origen, es decir, si tenemos un observable que emite valores y queremos transformar cada valor en un nuevo observable, pero solo queremos mantener la suscripción al último observable emitido, podemos usar switchMap para lograrlo.

import { fromEvent } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { debounceTime, switchMap, mergeAll, pluck } from 'rxjs/operators';

const textInput = document.createElement('input');
textInput.type = 'text';
document.querySelector('body')!.append(textInput);

const input$ = fromEvent<KeyboardEvent>(textInput, 'keyup');

input$.pipe(
  debounceTime(500),
  pluck('target', 'value'),
  switchMap((text) => ajax.getJSON(
    `https://api.github.com/search/users?q=${text}`
  )),
  pluck('items')
).subscribe({
  next: (users) => {
    console.log('LOS USUARIOS: ', users);
  },
});

concatMap

El operador concatMap mapea cada uno de los valores emitidos por el observable origen a un observable interno y los concatena, es decir, espera a que el observable interno complete antes de suscribirse al siguiente. Por lo tanto, los valores emitidos por el observable interno se emiten en el mismo orden en que fueron recibidos del observable origen.

import { fromEvent, interval, of } from 'rxjs';
import { concatMap, take } from 'rxjs/operators';

const interval$ = interval(500).pipe(take(3));
const click$ = fromEvent(document, 'click');

click$.pipe(
  concatMap(() => interval$)
).subscribe(console.log);

const numbers$ = of(1, 2, 3);
const letters$ = of('a', 'b', 'c');

numbers$.pipe(
  concatMap((num) => of(num * 10)),
  concatMap((num) => letters$.pipe(
    concatMap((letter) => of(letter.toUpperCase() + num))
  ))
).subscribe(console.log);

exhaustMap

El operador exhaustMap ignora las emisiones entrantes mientras la suscripción interna está activa, es decir, si el observable interno aún no ha completado, cualquier nueva emisión del observable externo será ignorada hasta que el observable interno complete. Esto es útil en situaciones donde solo queremos procesar una solicitud a la vez y evitar la sobrecarga de múltiples solicitudes simultáneas.

import { fromEvent, interval } from 'rxjs';
import { exhaustMap, take } from 'rxjs/operators';

const interval$ = interval(500).pipe(take(3));
const click$ = fromEvent(document, 'click');

click$.pipe(
  exhaustMap(() => interval$)
).subscribe(console.log);

Operadores de combinación

startWith

El método startWith lo que nos permite es emitir valores antes de que se emitan los valores del observable, antes de que la suscripción reciba los valores del observable va a recibir los valores que le pasemos a startWith.

import { of } from 'rxjs';
import { startWith } from 'rxjs/operators';

const numbers$ = of(1, 2, 3).pipe(
  startWith('a', 'b', 'c'),
);

numbers$.subscribe({
  next: (value) => console.log(value),
  complete: () => console.log('Completed')
});

endWith

El método endWith lo que nos permite es emitir valores después de que se hayan emitido los valores del observable, después de que la suscripción reciba los valores del observable va a recibir los valores que le pasemos a endWith.

import { of } from 'rxjs';
import { endWith } from 'rxjs/operators';

const numbers$ = of(1, 2, 3).pipe(
  endWith('x', 'y', 'z')
);

numbers$.subscribe({
  next: (value) => console.log(value),
  complete: () => console.log('Completed')
});

concat

El método concat permite combinar múltiples Observables de manera secuencial, es decir, el segundo Observable no comenzará a emitir valores hasta que el primero haya completado.

import { concat, interval, of } from "rxjs";
import { take } from "rxjs/operators";

const interval$ = interval(1000);

concat(
  interval$.pipe(take(3)),
  interval$.pipe(take(2)),
  of(1)
).subscribe(console.log);

merge

El método merge combina múltiples Observables en uno solo, emitiendo los valores de todos los Observables a medida que llegan, sin esperar a que uno complete antes de comenzar con el siguiente.

import { fromEvent, merge } from 'rxjs';
import { pluck } from 'rxjs/operators';

const keyup$ = fromEvent(document, 'keyup');
const click$ = fromEvent(document, 'click');

merge(
  keyup$.pipe(pluck('type')),
  click$.pipe(pluck('type'))
).subscribe(console.log);

combineLatest

El método combineLatest toma múltiples observables y emite un valor cuando todos los observables han emitido al menos un valor, de ahí en adelante emitirá un valor cada vez que cualquiera de los observables emita un nuevo valor y se emiten juntos como un array.

import { combineLatest, fromEvent } from 'rxjs';
import { pluck } from 'rxjs/operators';

const input1 = document.createElement('input');
const input2 = document.createElement('input');

input1.placeholder = 'email@gmail.com';
input1.type = 'email';

input2.placeholder = '*********';
input2.type = 'password';

document.querySelector('body')!.append(input1, input2);

const getInputStream = (input: HTMLElement) => {
  return fromEvent<KeyboardEvent>(input, 'keyup').pipe(
    pluck('target', 'value')
  );
}

combineLatest(
  getInputStream(input1),
  getInputStream(input2)
).subscribe(([email, password]) => {
  console.log('Email:', email);
  console.log('Password:', password);
});

forkJoin

El método forkJoin se utiliza para combinar múltiples observables y esperar a que todos ellos completen, una vez que todos los observables hayan emitido su último valor, forkJoin emitirá un único valor que es un array o un objeto con los últimos valores emitidos por cada observable.

import { forkJoin, interval, of } from 'rxjs';
import { delay, take } from 'rxjs/operators';

const numbers$ = of(1, 2, 3, 4, 5);
const interval$ = interval(1000).pipe(take(5));
const letters$ = of('a', 'b', 'c').pipe(delay(3500));

forkJoin(
  numbers$,
  interval$,
  letters$
).subscribe(console.log)

forkJoin(
  numbers$,
  interval$,
  letters$
).subscribe((resp) => {
  console.log('Números: ', resp[0])
  console.log('Intervalo: ', resp[1])
  console.log('Letras: ', resp[2])
});

forkJoin({
  numbers$,
  interval$,
  letters$
}).subscribe((resp) => {
  console.log(resp);
});

forkJoin({
  num: numbers$,
  int: interval$,
  let: letters$
}).subscribe((resp) => {
  console.log(resp);
});