Interstellar Code
RXJS - Extensiones Reactivas en JavaScript y TypeScript
RXJS es una de las bibliotecas más poderosas para manejar programación reactiva en JavaScript y TypeScript. En este post, exploramos sus conceptos clave como Observables, Subjects y operadores, y cómo pueden transformar la forma en como se manejan los datos asíncronos y eventos en las aplicaciones web.
20 de agosto de 2025
Tabla de contenido:
RXJS: Extensiones Reactivas en JavaScript y TypeScript
RXJS (Reactive Extensions for JavaScript) es una biblioteca para componer programas asíncronos y basados en eventos utilizando secuencias observables. Es ampliamente utilizada en aplicaciones web, especialmente con frameworks como Angular, pero también puede ser utilizada con React, Vue y otros.
Las extensiones reactivas nos permiten manejar flujos de datos y eventos de manera eficiente, facilitando la gestión de operaciones asíncronas como llamadas HTTP, eventos del usuario y más. Empleando lo que se conoce como programación reactiva.
Observables
Antes de profundizar en RXJS, es importante entender el concepto de Observables
, Observers
y Subscriptions
.
- Observable: Un Observable es una colección de valores o eventos futuros. Puedes pensar en ellos como una versión más poderosa de las Promesas, ya que pueden emitir múltiples valores a lo largo del tiempo, es decir, un observable puede estar emitiendo valores continuamente a lo largo del tiempo, a diferencia de una promesa que solo resuelve un valor una vez.
- Observer: Un Observer es un objeto que define cómo reaccionar a los valores emitidos por un Observable. Un Observer puede tener hasta tres métodos:
next
,error
ycomplete
. - Subscription: Una Subscription representa la ejecución de un Observable. Cuando te suscribes a un Observable, la suscripción ejecutar las funciones definidas en el Observer donde recibirá los valores que emite el observable hasta que se complete o se cancele la suscripción.
A continuación se muestra un ejemplo básico de cómo crear y suscribirse a un Observable:
import { Observable, Observer } from 'rxjs';
// Definimos un observer
const observer: Observer<string> = {
next: (value) => console.log('Next [obs]: ', value),
error: (error) => console.warn('Error [obs]: ', error),
complete: () => console.info('Se terminó el observable [obs]'),
}
// Creamos un observable que emite valores de tipo string
const observable$ = new Observable<string>((subscriber) => {
// Emitimos valores al observable que van a recibir las suscripciones
subscriber.next('Hola');
subscriber.next('Mundo');
// Forzar un error
// const a: any = undefined;
// a.name = 'Cesar';
// Completamos el observable
subscriber.complete();
// No se emite nada después de completar
subscriber.error('Algo salió mal');
subscriber.next('Ya no se emite');
});
// Nos suscribimos al observable, recibiendo los valores emitidos
observable$.subscribe((value) => {
console.log(value);
});
// Podemos manejar los tres casos: next, error y complete [!Importante: Esta deprecado]
observable$.subscribe(
(value) => {
console.log('Next: ', value);
},
(error) => {
console.warn('Error: ', error);
},
() => {
console.info('Se terminó el observable');
}
);
// Usando un observer
const subscription = observable$.subscribe(observer);
// Cancelar la suscripción (importante para evitar memory leaks)
subscription.unsubscribe();
Subjects
Un Subject
es un tipo especial de Observable que permite multicasting, es decir, permite que múltiples observadores se suscriban a él y reciban los mismos valores emitidos. A diferencia de un Observable estándar, que emite valores de forma independiente para cada suscriptor, un Subject comparte la misma fuente de datos entre todos sus suscriptores.
Cuando la data es emitida por el observable en si, es considerado como un Cold Observable
, ya que cada suscriptor recibe su propia ejecución y valores. En cambio, cuando se utiliza un Subject, es considerado un Hot Observable
, porque todos los suscriptores comparten la misma ejecución y reciben los mismos valores emitidos.
import { Subject, Observer, Observable } from 'rxjs';
const observer: Observer<number> = {
next: (value) => console.log('Next: ', value),
error: (error) => console.warn('Error: ', error),
complete: () => console.info('Se completo el observable')
}
const interval$ = new Observable<number>((subscriber) => {
// Emitimos un valor cada segundo
const intervalId = setInterval(() => {
subscriber.next(Math.random());
}, 1000);
// Limpiamos el intervalo cuando se complete o se cancele la suscripción
return () => {
clearInterval(intervalId);
console.log('Intervalo destruido');
}
});
// Creamos un Subject
const subject$ = new Subject<number>();
// Nos suscribimos al observable y enviamos los valores al Subject
const intervalSubscription = interval$.subscribe(subject$);
const subscription1 = subject$.subscribe(observer);
const subscription2 = subject$.subscribe(observer);
setTimeout(() => {
subject$.next(10);
subject$.complete();
// Nos desuscribimos del observable original
intervalSubscription.unsubscribe();
}, 5500);
of
El método of
es un método de creación de observables que emite los valores que se le pasan como argumentos. Es decir que crea un observable que emite esos valores de manera sincrónica.
import { of } from 'rxjs';
// const observable$ = of<number>(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// const observable$ = of<Array<number>>([1, 2, 3, 4, 5]);
// const observable$ = of<number>(...[1, 2, 3, 4, 5], 6, 7, 8, 9, 10);
const observable$ = of<any>(
[1, 2],
{ a: 1, b: 2 },
function () { },
true,
Promise.resolve(true)
);
observable$.subscribe({
next: (value) => console.log('Next: ', value),
error: (error) => console.warn('Error: ', error),
complete: () => console.info('Se completo el observable')
});
fromEvent
El método fromEvent
es un método de creación que crea un observable a partir de eventos del DOM. Una vez que se suscribe al observable, el observable comenzará a emitir eventos cada vez que ocurra el evento especificado especificado y los observadores recibirán esos eventos.
import { fromEvent } from 'rxjs';
const event1$ = fromEvent<PointerEvent>(document, 'click');
const event2$ = fromEvent<KeyboardEvent>(document, 'keyup');
event1$.subscribe((event) => {
console.log('Coordenadas: ', event.x, event.y);
});
event2$.subscribe((event) => {
console.log('Tecla: ', event.key);
});
range
EL método range
es un método de creación que crea un observable que emite una secuencia de números enteros en un rango especificado. El primer argumento es el valor inicial y el segundo es la cantidad de números a emitir. Si se especifica un tercer argumento, se utilizará como programador para la emisión de los valores, lo que permite que la emisión sea asíncrona.
import { asyncScheduler, range } from 'rxjs';
const src1$ = range(1, 10);
const src2$ = range(-5, 10);
console.log('Inicio');
src1$.subscribe(console.log);
console.log('Fin');
console.log('Inicio');
src2$.subscribe(console.log);
console.log('Fin');
// Range asíncrono
const srcAsync$ = range(1, 5, asyncScheduler);
console.log('Inicio');
srcAsync$.subscribe(console.log);
console.log('Fin');
interval y timer
El observable interval
emite un valor cada cierto tiempo, el cual lo definimos en una cierta cantidad de milisegundos, tiempo en el cual estará emitiendo un valor, el cual es el contador de veces que ha emitido un valor.
Por otro lado el observable timer
emite un valor después de un cierto tiempo, el cual definimos en milisegundos, y luego puede seguir emitiendo valores a intervalos regulares si se le proporciona un segundo argumento que define el intervalo.
import { Observer, interval, timer } from 'rxjs';
const observer: Observer<number> = {
next: (value) => console.log('Next: ', value),
error: (error) => console.warn('Error: ', error),
complete: () => console.info('Se completo el observable'),
};
const todayInFiveSeconds = new Date();
todayInFiveSeconds.setSeconds(todayInFiveSeconds.getSeconds() + 5);
// Interval
const interval$ = interval(1000);
console.log('Inicio');
interval$.subscribe(observer);
console.log('Fin');
// Timer
// const timer$ = timer(2000);
// const timer$ = timer(2000, 1000);
const timer$ = timer(todayInFiveSeconds);
console.log('Inicio');
timer$.subscribe(observer);
console.log('Fin');
asyncScheduler
La función asyncScheduler
es un programador de tareas que permite ejecutar funciones de manera asíncrona, similar a setTimeout o setInterval, pero con la capacidad de cancelar tareas programadas.
La función asyncScheduler.schedule
permite programar una tarea para que se ejecute después de un cierto tiempo o de manera repetitiva, y se puede cancelar la tarea con el método unsubscribe de la suscripción que devuelve.
import { asyncScheduler } from 'rxjs';
const greeting = () => console.log('Hello world');
const greeting1 = (name: any) => console.log(`Hello ${name}`);
const greeting2 = (user: any) => console.log(`Hello ${user.name} ${user.lastName}`);
asyncScheduler.schedule(greeting, 1000);
asyncScheduler.schedule(greeting1, 2000, 'Cesar');
asyncScheduler.schedule(greeting2, 3000, { name: 'Cesar', lastName: 'Villalobos Olmos' });
const subscription = asyncScheduler.schedule(function (state: any) {
console.log('State: ', state);
this.schedule(state + 1, 1000);
}, 1000, 0);
// setTimeout(() => {
// subscription.unsubscribe();
// }, 10000);
asyncScheduler.schedule(() => subscription.unsubscribe(), 10000);
from
La función from
crea un observable a partir de un iterable, una promesa, un array, un objeto iterable o un observable, es decir, esta función nos permite convertir diferentes tipos de datos en un observable que podemos suscribirnos.
Su principal diferencia con of es que of emite los valores tal cual los recibe, mientras que from crea un observable que emite los valores de un iterable o una promesa uno por uno, lo que permite manejar flujos de datos más complejos y asíncronos.
import { from, Observer } from 'rxjs';
const observer: Observer<any> = {
next: (value) => console.log('Next: ', value),
error: (error) => console.warn('Error: ', error),
complete: () => console.info('Complete'),
}
const myGenerator = function* () {
yield 1;
yield 2;
yield 3;
yield 4;
yield 5;
}
const myIterable = myGenerator();
// for (let value of myIterable) {
// console.log('Value: ', value);
// }
from(myIterable).subscribe(observer);
// const sources$ = from([1, 2, 3, 4, 5]);
// const sources$ = of(...[1, 2, 3, 4, 5]);
// const sources$ = from('Cesar');
const sources$ = from(fetch('https://api.github.com/users/chicho69-cesar'));
sources$.subscribe(async (response) => {
const data = await response.json();
console.log('DATA: ', data);
});
sources$.subscribe(observer);
Operadores
Cuando queremos manipular un valor que fluye a través de un observable, podemos utilizar operadores, los operadores son funciones que toman un observable y devuelven un nuevo observable con los valores transformados.
Para aplicar un operador, usamos el método pipe
del observable original.
map
El operador map
transforma cada valor emitido por el observable original aplicando una función a cada uno de ellos. Por ejemplo, si tenemos un observable que emite números y queremos multiplicarlos por 10, podemos usar map
para lograrlo.
import { fromEvent, range } from 'rxjs';
import { map } from 'rxjs/operators';
range(1, 5).pipe(
map<number, number>((val) => val * 10)
).subscribe((val) => console.log(val));
const keyup$ = fromEvent<KeyboardEvent>(document, 'keyup');
const keyupMap$ = keyup$.pipe(
map((event) => event.code)
);
pluck
El operador pluck
es una forma más concisa de acceder a una propiedad específica de un objeto emitido por el observable. Por ejemplo, si queremos obtener el valor de la propiedad key
de un evento de teclado, podemos usar pluck
para extraerlo.
import { fromEvent } from 'rxjs';
import { pluck } from 'rxjs/operators';
const keyup$ = fromEvent<KeyboardEvent>(document, 'keyup');
const keyupPluck$ = keyup$.pipe(
// pluck('key')
pluck('target', 'baseURI')
).subscribe((val) => console.log(val));
mapTo
El operador mapTo
es similar a map
, pero en lugar de aplicar una función a cada valor emitido, simplemente emite un valor constante. Por ejemplo, si queremos que cada vez que se presione una tecla se emita el mismo valor, podemos usar mapTo
.
import { fromEvent } from 'rxjs';
import { mapTo } from 'rxjs/operators';
const keyup$ = fromEvent<KeyboardEvent>(document, 'keyup');
const keyupMapTo$ = keyup$.pipe(
mapTo('Tecla presionada')
).subscribe((val) => console.log(val));
filter
El operador filter
permite filtrar los valores emitidos por un observable basándose en una condición específica. Solo los valores que cumplen con la condición son emitidos al suscriptor.
import { from, fromEvent, range } from 'rxjs';
import { filter, map } from 'rxjs/operators';
range(20, 30).pipe(
filter((value: number, index: number) => {
return value % 2 === 0;
})
).subscribe((value: number) => console.log(value));
interface Character {
name: string;
type: 'hero' | 'villan';
}
const characters: Character[] = [
{ name: 'Superman', type: 'hero' },
{ name: 'Batman', type: 'hero' },
{ name: 'Joker', type: 'villan' }
];
from(characters).pipe(
filter((character: Character) => character.type === 'hero')
).subscribe((character: Character) => console.log(character));
const keyup$ = fromEvent<KeyboardEvent>(document, 'keyup').pipe(
map((event: KeyboardEvent) => event.code),
filter((key: string) => key === 'Enter'),
);
keyup$.subscribe(console.log);
tap
El operador tap
permite realizar efectos secundarios en los valores emitidos por un observable, es decir, tap
ejecuta una función con la cual podemos observar los valores que están pasando por el observable sin modificar el flujo de datos.
import { range } from 'rxjs';
import { map, tap } from 'rxjs/operators';
const numbers$ = range(1, 5);
numbers$.pipe(
tap((value: number) => console.log('Before: ', value)),
map((value: number) => value * 10),
tap({
next: (value: number) => console.log('After: ', value),
complete: () => console.info('Complete')
})
).subscribe((value: number) => console.log('Subs: ', value));
reduce
El operador reduce
se utiliza de forma similar a la función reduce
de los arrays, pero en este caso se aplica a un flujo de datos (observable). El operador reduce
toma un acumulador y un valor actual, y devuelve un único valor al final del flujo.
import { interval } from 'rxjs';
import { reduce, take, tap } from 'rxjs/operators';
interval(500).pipe(
take(5),
tap((value: number) => console.log('Current value:', value)),
reduce((accumulate: number, current: number) => {
return accumulate + current;
}, 0)
).subscribe({
next: (value: number) => {
console.log('Reduced value from interval:', value);
}
});
scan
El operador scan
es similar a reduce, pero emite el valor acumulado en cada paso. Esto permite ver el estado intermedio de la acumulación, es decir, con reduce unicamente obtenemos el resultado final, mientras que con scan
obtenemos todos los estados intermedios. Por lo tanto, scan
es útil para mantener un estado a lo largo del tiempo en una secuencia de eventos.
import { from } from 'rxjs';
import { map, reduce, scan } from 'rxjs/operators';
const numbers = [1, 2, 3, 4, 5];
from(numbers).pipe(
reduce((acc, value) => acc + value, 0),
).subscribe({
// Solo obtenemos el valor final -> 15
next: (result) => console.log('Sum of numbers with reduce: ', result),
});
from(numbers).pipe(
scan((acc, value) => acc + value, 0),
).subscribe({
// Obtenemos todos los valores intermedios -> 1, 3, 6, 10, 15
next: (result) => console.log('Sum of numbers with scan: ', result),
});
take
El operador take
permite limitar la cantidad de valores emitidos por un Observable, por ejemplo, si queremos tomar solo los primeros 3 valores emitidos por un Observable, podemos usar take(3)
.
import { of } from 'rxjs';
import { take, tap } from 'rxjs/operators';
const numbers$ = of(1, 2, 3, 4, 5);
numbers$.pipe(
tap((value: number) => console.log(`Value emitted: ${value}`)),
take(3),
).subscribe({
next: (value: number) => console.log(`Received value: ${value}`),
complete: () => console.log('Stream completed'),
});
first
El operador first
toma el primer valor que emite el observable el cual cumple con la condición especificada. Si no se encuentra ningún valor que cumpla con la condición, se lanzará un error.
import { fromEvent } from 'rxjs';
import { first, map, tap } from 'rxjs/operators';
const click$ = fromEvent<MouseEvent>(document, 'click');
click$.pipe(
tap<MouseEvent>((event) => console.log('Click event:', event)),
map<MouseEvent, { x: number; y: number }>((event) => ({
x: event.clientX,
y: event.clientY,
})),
first<{ x: number, y: number }>((event) => event.y >= 150)
).subscribe({
next: (value) => console.log('First click with y >= 150:', value),
complete: () => console.log('Completed'),
});
takeWhile
El operador takeWhile
permite tomar valores de un observable mientras se cumpla una condición, y se completa el observable cuando la condición deja de cumplirse. El segundo parámetro true indica que el último valor que cumple la condición también se emitirá.
import { fromEvent } from 'rxjs';
import { map, takeWhile } from 'rxjs/operators';
const click$ = fromEvent<MouseEvent>(document, 'click');
click$.pipe(
map<MouseEvent, { x: number; y: number }>(({ x, y }) => ({ x, y })),
// takeWhile(({ y }) => y <= 300),
takeWhile(({ y }) => y <= 300, true),
).subscribe({
next: ({ x, y }) => console.log(`Mouse clicked at: (${x}, ${y})`),
complete: () => console.log('Mouse clicks completed below y=300'),
});
skip
El operador skip
permite omitir una cantidad específica de valores emitidos por un observable, es decir, si queremos ignorar los primeros 3 valores emitidos por un observable, podemos usar skip(3)
.
import { interval } from 'rxjs';
import { skip, take } from 'rxjs/operators';
interval(1000).pipe(
skip(3),
take(5)
).subscribe({
next: (value) => console.log(value),
complete: () => console.log('Complete')
});
takeUntil
El operador takeUntil
permite tomar valores de un observable hasta que otro observable emita un valor, momento en el cual se completa el observable original.
import { fromEvent, interval } from 'rxjs';
import { skip, takeUntil, tap } from 'rxjs/operators';
const button = document.createElement('button');
button.innerHTML = 'Detener Timer';
document.querySelector('body')!.append(button);
const counter$ = interval(1000);
const buttonClick$ = fromEvent(button, 'click').pipe(
tap(() => console.log('Button clicked before skip')),
skip(1),
tap(() => console.log('Button clicked after skip'))
);
counter$.pipe(
takeUntil(buttonClick$),
).subscribe({
next: (value) => console.log(`Counter: ${value}`),
complete: () => console.log('Counter completed'),
});
distinct
El operador distinct
filtra los valores emitidos por un observable, eliminando aquellos que ya han sido emitidos anteriormente. Por lo que los elementos que fluyan por el observable serán únicos.
import { from, of } from 'rxjs';
import { distinct } from 'rxjs/operators';
const numbers$ = of(1, '1', 1, 3, 3, 2, 2, 4, 4, 5, 3, 1, '1');
numbers$.pipe(
distinct()
).subscribe(console.log);
distinctUntilChange
El operador distinctUntilChanged
se utiliza para filtrar valores consecutivos que son iguales, permitiendo que solo el primer valor de una secuencia de valores duplicados se emita. Pero a diferencia de distinct, este operador no elimina todos los duplicados, sino que solo los consecutivos.
import { from, of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';
const numbers$ = of(1, '1', 1, 3, 3, 2, 2, 4, 4, 5, 3, 1, '1');
numbers$.pipe(
distinctUntilChanged()
).subscribe(console.log);
distinctUntilKeyChange
El operador distinctUntilKeyChanged
permite filtrar los valores emitidos por un observable, de tal manera que solo se emiten aquellos valores cuyo valor de una clave específica ha cambiado desde la última emisión. En este caso, estamos filtrando los personajes por su nombre, de modo que solo se emiten personajes con nombres únicos consecutivos.
import { from } from 'rxjs';
import { distinctUntilKeyChanged } from 'rxjs/operators';
interface Character {
name: string;
}
const characters: Character[] = [
{ name: 'Megaman' },
{ name: 'X' },
{ name: 'Zero' },
{ name: 'Dr. Willy' },
{ name: 'X' },
{ name: 'Megaman' },
{ name: 'Zero' },
];
from(characters).pipe(
distinctUntilKeyChanged('name')
).subscribe(console.log);
Operadores de tiempo
debounceTime
El operador debounceTime
se utiliza para limitar la cantidad de eventos que se procesan en un período de tiempo determinado. Es útil para evitar que se procesen eventos que ocurren demasiado rápido, como clics o pulsaciones de teclas.
import { fromEvent } from 'rxjs';
import { debounceTime, distinctUntilChanged, pluck } from 'rxjs/operators';
// Ejemplo 1
const click$ = fromEvent(document, 'click');
click$.pipe(
debounceTime(3000)
).subscribe({
next: (event) => console.log(event),
});
// Ejemplo 2
const input = document.createElement('input');
document.querySelector('body')!.append(input);
const input$ = fromEvent(input, 'keyup');
input$.pipe(
debounceTime(1000),
pluck('target', 'value'),
distinctUntilChanged()
).subscribe({
next: (value) => {
console.log('EL VALOR ES: ', value);
}
});
throttleTime
El operador throttleTime
nos permite controlar la frecuencia con la que se emiten los valores de un observable. Este operador emite el primer valor inmediatamente y luego ignora los valores subsiguientes durante un período de tiempo especificado. Esto es útil para evitar la sobrecarga de eventos o para limitar la cantidad de veces que se procesa una acción en un intervalo de tiempo determinado.
import { asyncScheduler, fromEvent } from 'rxjs';
import { distinctUntilChanged, pluck, throttleTime } from 'rxjs/operators';
// Ejemplo 1
const click$ = fromEvent(document, 'click');
click$.pipe(
throttleTime(3000)
).subscribe({
next: (event) => console.log(event),
});
// Ejemplo 2
const input = document.createElement('input');
document.querySelector('body')!.append(input);
const input$ = fromEvent(input, 'keyup');
input$.pipe(
throttleTime(400, asyncScheduler, {
leading: true,
trailing: true
}),
pluck('target', 'value'),
distinctUntilChanged()
).subscribe({
next: (value) => {
console.log('EL VALOR ES: ', value);
}
});
sampleTime
El operador sampleTime
en RxJS emite el valor más reciente de un observable cada cierto intervalo de tiempo especificado. Es útil cuando se desea obtener una muestra periódica de los valores emitidos por un observable, en lugar de procesar cada valor individualmente.
import { fromEvent } from 'rxjs';
import { map, sampleTime } from 'rxjs/operators';
const click$ = fromEvent<MouseEvent>(document, 'click');
click$.pipe(
sampleTime(2000),
map(({ x, y }) => ({ x, y })),
).subscribe(console.log);
sample
El operador sample
es un poco especial, ya que este operador va a emitir los valores del observable fuente cada vez que el observable que se pasa como argumento emita un valor. Es decir, el observable que se pasa como argumento actúa como un “disparador” para emitir el valor más reciente del observable fuente.
import { fromEvent, interval } from 'rxjs';
import { sample } from 'rxjs/operators';
const interval$ = interval(500);
const click$ = fromEvent(document, 'click');
interval$.pipe(
sample(click$)
).subscribe(console.log);
auditTime
El operador auditTime
como se propio nombre indica, nos sirve para auditar el valor emitido por un observable cada cierto tiempo, es decir, este operador va a emitir el valor más reciente del observable fuente cada cierto intervalo de tiempo especificado, ignorando cualquier otro valor emitido durante ese intervalo.
import { fromEvent } from 'rxjs';
import { auditTime, map, tap } from 'rxjs/operators';
const click$ = fromEvent<MouseEvent>(document, 'click');
click$.pipe(
map(({ x }) => x),
tap((val) => console.log('tap', val)),
auditTime(5000)
).subscribe(console.log);
Peticiones Ajax
Ajax es una librería de RxJS que permite realizar peticiones HTTP de manera reactiva. Con esta librería, puedes manejar las respuestas y errores de las peticiones de forma más sencilla y elegante utilizando los operadores de RxJS.
ajax
El método ajax
de la librería Ajax de RxJS permite realizar peticiones HTTP de manera reactiva. Este método devuelve un observable que emite la respuesta de la petición cuando esta se completa.
import { of } from 'rxjs';
import { ajax, AjaxError } from 'rxjs/ajax';
import { catchError, pluck } from 'rxjs/operators';
const url = 'https://api.github.com/users?per_page=5';
ajax(url).pipe(
pluck('response'),
catchError((error: AjaxError) => {
console.warn('Error fetching users:', error);
return of([]);
})
).subscribe({
next: (users) => console.log('Users:', users),
error: (error) => console.warn('Error in subscription:', error),
complete: () => console.log('Request completed')
});
El método ajax.getJSON
realiza una petición HTTP GET a una URL específica y devuelve un Observable que emite la respuesta JSON, este método acepta la URL y un objeto de opciones que puede incluir encabezados HTTP.
import { ajax } from 'rxjs/ajax';
const url = 'https://httpbin.org/delay/1';
const obs$ = ajax.getJSON(url, {
'Content-Type': 'application/json',
'Authorization': 'Bearer YOUR_TOKEN'
});
obs$.subscribe({
next: (response) => console.log('Response:', response),
error: (error) => console.error('Error:', error),
complete: () => console.log('Request completed')
});
La principal diferencia entre ajax.getJSON
y ajax
es que ajax.getJSON
realiza una petición HTTP GET y espera que la respuesta sea un JSON, mientras que ajax
es más general y puede realizar peticiones de diferentes tipos y manejar diferentes formatos de respuesta. Además, ajax
permite más configuraciones como el método HTTP, los encabezados, el cuerpo de la petición, etc.
HTTP methods
Usando la librería Ajax de RxJS, podemos realizar peticiones HTTP utilizando diferentes métodos como GET, POST, PUT, DELETE, entre otros. A continuación se muestran ejemplos de cómo realizar peticiones con estos métodos.
import { ajax } from 'rxjs/ajax';
const url = 'https://httpbin.org/delay/1';
ajax.post(
url,
{
id: 1,
name: 'Cesar Villalobos Olmos',
age: 23,
},
{
'Content-Type': 'application/json',
Authorization: 'Bearer 1234567890',
},
).subscribe({
next: (response) => console.log('Response post:', response),
error: (error) => console.error('Error post:', error)
});
ajax.put(
url,
{
id: 1,
name: 'Cesar Villalobos Olmos',
age: 24,
},
{
'Content-Type': 'application/json',
Authorization: 'Bearer 1234567890',
},
).subscribe({
next: (response) => console.log('Response put:', response),
error: (error) => console.error('Error put:', error)
});
ajax.patch(
url,
{
age: 23,
},
{
'Content-Type': 'application/json',
Authorization: 'Bearer 1234567890',
},
).subscribe({
next: (response) => console.log('Response patch:', response),
error: (error) => console.error('Error patch:', error)
});
ajax({
url,
method: 'DELETE',
headers: {
'Content-Type': 'application/json',
Authorization: 'Bearer 1234567890',
},
}).subscribe({
next: (response) => console.log('Response delete:', response),
error: (error) => console.error('Error delete:', error)
});
Operadores de transformación
mergeAll
El operador mergeAll
se encarga de suscribirse a los observables internos que se generan y emitir sus valores en el observable externo, es decir, este operador toma un observable que emite otros observables y los combina en un solo observable que emite todos los valores de los observables internos.
import { fromEvent } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { debounceTime, map, mergeAll, pluck } from 'rxjs/operators';
const textInput = document.createElement('input');
textInput.type = 'text';
document.querySelector('body')!.append(textInput);
const input$ = fromEvent<KeyboardEvent>(textInput, 'keyup');
input$.pipe(
debounceTime(500),
pluck('target', 'value'),
map((text) => ajax.getJSON(
`https://api.github.com/search/users?q=${text}`
)),
mergeAll(),
pluck('items')
).subscribe({
next: (users) => {
console.log('LOS USUARIOS: ', users);
},
});
mergeMap
El operador mergeMap
recibe una función que retorna un observable, este operador se suscribe a cada uno de los observables que retorna la función y emite todos los valores de cada uno de esos observables, por ejemplo, si tenemos un observable que emite valores y queremos transformar cada valor en un nuevo observable, podemos usar mergeMap
para lograrlo.
import { fromEvent, interval, of } from 'rxjs';
import { map, mergeMap, take, takeUntil } from 'rxjs/operators';
const letters$ = of('a', 'b', 'c');
letters$.pipe(
mergeMap((letter) => interval(1000).pipe(
map(i => letter + i),
take(3)
))
).subscribe({
next: (val) => console.log('next:', val),
complete: () => console.log('Complete')
});
const mousedown$ = fromEvent(document, 'mousedown');
const mouseup$ = fromEvent(document, 'mouseup');
const interval$ = interval();
mousedown$.pipe(
mergeMap(() => interval$.pipe(
takeUntil(mouseup$)
))
).subscribe(console.log);
switchMap
El operador switchMap
lo que hace es cancelar la petición anterior y hacer una nueva petición cada vez que se emite un nuevo valor en el observable de origen, es decir, si tenemos un observable que emite valores y queremos transformar cada valor en un nuevo observable, pero solo queremos mantener la suscripción al último observable emitido, podemos usar switchMap
para lograrlo.
import { fromEvent } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { debounceTime, switchMap, mergeAll, pluck } from 'rxjs/operators';
const textInput = document.createElement('input');
textInput.type = 'text';
document.querySelector('body')!.append(textInput);
const input$ = fromEvent<KeyboardEvent>(textInput, 'keyup');
input$.pipe(
debounceTime(500),
pluck('target', 'value'),
switchMap((text) => ajax.getJSON(
`https://api.github.com/search/users?q=${text}`
)),
pluck('items')
).subscribe({
next: (users) => {
console.log('LOS USUARIOS: ', users);
},
});
concatMap
El operador concatMap
mapea cada uno de los valores emitidos por el observable origen a un observable interno y los concatena, es decir, espera a que el observable interno complete antes de suscribirse al siguiente. Por lo tanto, los valores emitidos por el observable interno se emiten en el mismo orden en que fueron recibidos del observable origen.
import { fromEvent, interval, of } from 'rxjs';
import { concatMap, take } from 'rxjs/operators';
const interval$ = interval(500).pipe(take(3));
const click$ = fromEvent(document, 'click');
click$.pipe(
concatMap(() => interval$)
).subscribe(console.log);
const numbers$ = of(1, 2, 3);
const letters$ = of('a', 'b', 'c');
numbers$.pipe(
concatMap((num) => of(num * 10)),
concatMap((num) => letters$.pipe(
concatMap((letter) => of(letter.toUpperCase() + num))
))
).subscribe(console.log);
exhaustMap
El operador exhaustMap
ignora las emisiones entrantes mientras la suscripción interna está activa, es decir, si el observable interno aún no ha completado, cualquier nueva emisión del observable externo será ignorada hasta que el observable interno complete. Esto es útil en situaciones donde solo queremos procesar una solicitud a la vez y evitar la sobrecarga de múltiples solicitudes simultáneas.
import { fromEvent, interval } from 'rxjs';
import { exhaustMap, take } from 'rxjs/operators';
const interval$ = interval(500).pipe(take(3));
const click$ = fromEvent(document, 'click');
click$.pipe(
exhaustMap(() => interval$)
).subscribe(console.log);
Operadores de combinación
startWith
El método startWith
lo que nos permite es emitir valores antes de que se emitan los valores del observable, antes de que la suscripción reciba los valores del observable va a recibir los valores que le pasemos a startWith
.
import { of } from 'rxjs';
import { startWith } from 'rxjs/operators';
const numbers$ = of(1, 2, 3).pipe(
startWith('a', 'b', 'c'),
);
numbers$.subscribe({
next: (value) => console.log(value),
complete: () => console.log('Completed')
});
endWith
El método endWith
lo que nos permite es emitir valores después de que se hayan emitido los valores del observable, después de que la suscripción reciba los valores del observable va a recibir los valores que le pasemos a endWith
.
import { of } from 'rxjs';
import { endWith } from 'rxjs/operators';
const numbers$ = of(1, 2, 3).pipe(
endWith('x', 'y', 'z')
);
numbers$.subscribe({
next: (value) => console.log(value),
complete: () => console.log('Completed')
});
concat
El método concat
permite combinar múltiples Observables de manera secuencial, es decir, el segundo Observable no comenzará a emitir valores hasta que el primero haya completado.
import { concat, interval, of } from "rxjs";
import { take } from "rxjs/operators";
const interval$ = interval(1000);
concat(
interval$.pipe(take(3)),
interval$.pipe(take(2)),
of(1)
).subscribe(console.log);
merge
El método merge
combina múltiples Observables en uno solo, emitiendo los valores de todos los Observables a medida que llegan, sin esperar a que uno complete antes de comenzar con el siguiente.
import { fromEvent, merge } from 'rxjs';
import { pluck } from 'rxjs/operators';
const keyup$ = fromEvent(document, 'keyup');
const click$ = fromEvent(document, 'click');
merge(
keyup$.pipe(pluck('type')),
click$.pipe(pluck('type'))
).subscribe(console.log);
combineLatest
El método combineLatest
toma múltiples observables y emite un valor cuando todos los observables han emitido al menos un valor, de ahí en adelante emitirá un valor cada vez que cualquiera de los observables emita un nuevo valor y se emiten juntos como un array.
import { combineLatest, fromEvent } from 'rxjs';
import { pluck } from 'rxjs/operators';
const input1 = document.createElement('input');
const input2 = document.createElement('input');
input1.placeholder = 'email@gmail.com';
input1.type = 'email';
input2.placeholder = '*********';
input2.type = 'password';
document.querySelector('body')!.append(input1, input2);
const getInputStream = (input: HTMLElement) => {
return fromEvent<KeyboardEvent>(input, 'keyup').pipe(
pluck('target', 'value')
);
}
combineLatest(
getInputStream(input1),
getInputStream(input2)
).subscribe(([email, password]) => {
console.log('Email:', email);
console.log('Password:', password);
});
forkJoin
El método forkJoin
se utiliza para combinar múltiples observables y esperar a que todos ellos completen, una vez que todos los observables hayan emitido su último valor, forkJoin
emitirá un único valor que es un array o un objeto con los últimos valores emitidos por cada observable.
import { forkJoin, interval, of } from 'rxjs';
import { delay, take } from 'rxjs/operators';
const numbers$ = of(1, 2, 3, 4, 5);
const interval$ = interval(1000).pipe(take(5));
const letters$ = of('a', 'b', 'c').pipe(delay(3500));
forkJoin(
numbers$,
interval$,
letters$
).subscribe(console.log)
forkJoin(
numbers$,
interval$,
letters$
).subscribe((resp) => {
console.log('Números: ', resp[0])
console.log('Intervalo: ', resp[1])
console.log('Letras: ', resp[2])
});
forkJoin({
numbers$,
interval$,
letters$
}).subscribe((resp) => {
console.log(resp);
});
forkJoin({
num: numbers$,
int: interval$,
let: letters$
}).subscribe((resp) => {
console.log(resp);
});