node (6): introducción a los streams

Introducción a los streams de node

Bernard Schultze

archivado en: JavaScript / 12 septiembre, 2016 / taller:

En node se denomina streams a los procesos que permiten ser trabajados mientras están sucediendo, como ocurre en otros contextos con los vídeos en streaming, que pueden ser visualizados a medida que se están descargando. De hecho, en palabras técnicas, un stream es un interfaz abstracto que permite trabajar con datos en streaming. Para entenderlo podemos pensar en un antiguo monje medieval transcribiendo una obra a medida que la va leyendo: mientras dura el proceso de lectura recibe un flujo de datos -las palabras- y, sin esperar a que concluya el libro, ya está haciendo algo con ellas, copiarlas en un códice.

Los stream son instancias de EventEmitter, por lo que emiten eventos, y se clasifican en tres grandes categorías: de lectura (readable), de escritura (writable) y dobles (duplex y transform). Cada pieza de node -como file system o http- puede usar un tipo de stream o todos o ninguno. Pero antes de seguir en términos abstractos vamos a ver un ejemplo concreto, que es como quedan claras las cosas :P.

Por un lado, creamos un archivo de texto que podemos rellenar con un lore ipsum o lo que sea. Por otro, preparamos el código js, en el cual leeremos el contenido del primer documento con el método readFile() del api de file system.

"use strict";

const fs = require('fs');

fs.readFile('./lore.txt', (err, data)=> {

console.log('Contenido del archivo: '+data);

});

Al ejecutar el script, se muestra por consola el texto del archivo y parece que va bien; sin embargo, de esta manera nos hemos tenido que esperar a que se leyera toooodo el documento para poder hacer algo con los datos y esto en principio no es bueno, sobre todo cuando en lugar de ser un archivo ligero es un megasaurio monstruoso. Por lo tanto, podemos refactorizar el script y, en lugar de readFile(), usar el método createReadStream() que, como indica su nombre, crea un stream con los datos que está leyendo.

/* Cargamos el módulo */

const fs = require('fs');

/* definimos una variable donde iremos almacenando los pedazo de datos (chunks) a medida que vayan llegando */

let data = '';

/* declaramos el stream */

let streamRead = fs.createReadStream('./lore.txt');

/* Usamos el evento 'data' para ir guardando los pedazos de datos (chunk) en cuanto llegan. Además, aquí podríamos ir haciendo otras cosas con los datos, como ir mostrándolos o convirtiéndolos a utf-8. */

streamRead.on('data', (chunk)=> {

data += chunk;

});

/* Y lanzamos una escucha sobre el evento 'end' para hacer algo con los datos cuando ya han llegado todos */

streamRead.on('end', ()=> {

console.log('lectura terminada: '+data);

});

En el ejemplo hemos visto que en el evento 'data' llegan chunks («trozos») de datos, lo cual quizás necesite alguna aclaración. Cuando usamos estos métodos de stream no recuperamos de forma separada cada uno de los bits que conforman un algo digital, como una imagen o un archivo de texto, sino que nos van llegando en «paquetes» que se han ido almacenando en un buffer. Para entenderlo podemos pensar en un tren en marcha, que sería el sream, el cual está formado por muchos vagones, cada uno de los cuales es un buffer que transporta un chunk de datos. Hay una manera para trabajar con streams y recibir otra cosa que no sean buffers, pero esto lo veremos más adelante.

Streams readables

Hay un montón de implementaciones de node que manejan stream de lectura -como las conexiones TCP, algunas partes del servidor HTTP, los audios y vídeos en streaming, la recuperación de los contenidos de un archivo, etcétera- y en general todas utilizan estos tres eventos:

  • data: cuando se emiten chunks de datos.
  • error: cuando se produce o emite un error.
  • end: cuando han dejado de emitirse datos.

Existen dos modos de lectura: flowing (fluido) y paused (pausado). En el modo flowing los datos están listos para ser consumidos directamente, mientras que en el paused deben solicitarse explícitamente. Todos los stream comienzan en modo paused, pero se convierten en modo flowing cuando sucede alguna de estas tres cosas:

  • hay un evento 'data'.
  • se llama al método stream.resume().
  • se llama al método stream.pipe() para enviar datos a un stream writable, como veremos más adelante.

Para entender esto vamos a manejar otro evento, readable, que se lanza cuando los datos están listos para ser leídos, y vamos a refactorizar el ejemplo anterior quitando el evento data. Como este ya no existe, nunca llega a producirse el evento end, ya que los datos están «pausados».

const fs = require('fs');

let streamRead = fs.createReadStream('./lore.txt');

streamRead.on('readable', ()=> {

console.log('Listo para leer');

});

streamRead.on('end', ()=> {

console.log('lectura terminada');

});

Pero más que pausados, sería mejor decir almacenados en un buffer interno esperando a que alguien los solicite, por ejemplo, utilizando el método read() que veremos luego.

streamRead.on('readable', ()=> {

streamRead.read();

});

Este concepto de los dos estados se fundamenta en una propiedad interna de los stream readables que define tres estados posibles:

  • _readableState.flowing = null.
  • _readableState.flowing = false.
  • _readableState.flowing = true.

Cuando se encuentra en null significa que no hay mecanismos para consumir los datos; cuando está en false, que están listos para ser consumidos en un buffer interno; y cuando está en true, que se pueden consumir según se generan. Por ejemplo, en el caso anterior estaría en null al no existir ningún consumidor, pero aquí estaría en true.

streamRead.on('data', (chunk)=> {

console.log(streamRead._readableState.flowing ); // true

});

Los stream readables, además, se pueden trabajar con los siguientes métodos:

pause() / resume(): detienen y reanudan respectivamente la lectura de datos del stream. En un ejemplo muy claro de la documentación oficial:

streamRead.on('data', (chunk)=> {

console.log(`Han llegado ${chunk.length} bytes`);

/* Detenemos la lectura */

streamRead.pause();

setTimeout(() => {

console.log('La renaudamos pasado un segundo');

streamRead.resume();

}, 1000);

});

setEncoding(): por lo general, en el callback del evento data llega un buffer ilegible para los seres humanos. Por ejemplo, la cadena «Lore» llega como el siguiente objeto Buffer con los valores en hexadecimal: <Buffer 4c 6f 72 65> . Pero con este método, podemos indicar previamente que queremos codificar los chunks como strings utf8.

streamRead.setEncoding('utf8');

streamRead.on('data', (chunk)=> {

console.log(chunk); // string (no buffer): Lore

});

read(): como vimos, sirve para recuperar los datos almacenados en el buffer interno cuando estamos en modo paused. Acepta como argumento opcional el número de bytes a leer cada tacada.

Bueno, a falta de tratar el pipe(), que veremos al final, me faltaría hablar de algún método más, pero con los expuestos es suficiente para esta introducción.

Streams writables

Este tipo de streams son más sencillos que los anteriores, aunque también en este caso los encontramos en muchas implementaciones, como los TCP sockets, los HTTP requests del cliente, los HTTP responses del server, etcétera. Derivan de la clase stream.Writable y para entender cómo funcionan podemos preparar un script muy sencillo que use el método createWriteStream() del módulo file system, el cual, como indica su nombre, crea un stream de escritura y vuelca los datos indicados en el sitio indicado como parámetro ('output.txt en el ejemplo).

"use strict";

/* Cargamos el módulo file system */

const fs = require('fs');

/* Creamos un stream de escritura */

let streamWrite = fs.createWriteStream('output.txt');

/* Preparamos algunos datos para ser escritos */

let data = 'Timeo Danaos et dona ferentes';

/* Los escribimos codificados como utf8 */

streamWrite.write(data,'UTF8');

/* Indicamos que hemos terminado de escribir */

streamWrite.end();

Los eventos principales que se manejan en este tipo de streams son:

  • error: cuando se produce un error.
  • finish: se emite justo después de que hayamos indicado con el método end() que hemos terminado de escribir.
  • pipe y unpipe: que se lanzan cuando se encadenan streams, como veremos luego.

Entre los métodos destacan:

end: cierra el stream. Admite tres parámetros, el primero un chunk que se añade al finalizar -ya sea este un string, un buffer o cualquier otro valor que no sea null-, el segundo la codificación de este chunk y el tercero una función callback. Por ejemplo:

streamWrite.end(' y con esto se acabó de escribir', 'UTF8', ()=> {

console.log('por fin se terminó');

});

setDefaultEncoding: para modificar la codificación.

write: como indica su nombre, sirve para escribir y admite los mismos tres parámetros que end().

Pipes (tuberías)

El concepto de pipe viene del mundo linux y consiste en conectar dos procesos para que se ejecuten de forma consecutiva. Para entenderlo podemos pensar en tuberías por las que circula el agua desde un punto de origen a uno final. Por ejemplo, en este script de gulp se usa para reducir de peso las imágenes de una aplicación y luego copiarlas en el directorio indicado.

gulp.task('images', ()=> {

return gulp.src('app/images/**/*.+(png|jpg|gif|svg)')

.pipe(imagemin())

.pipe(gulp.dest('dist/images'))

});

En node es muy sencillo pipear streams. Basta con utilizar el método pipe() en el objeto que haya instanciado el stream, como en este caso, que pipeamos -enviamos- el stream de lectura directamente al stream de escritura:

const fs = require('fs');

/* Creamos un stream de lectura */

let streamRead = fs.createReadStream('./lore.txt');

/* Creamos un stream de escritura */

let streamWrite = fs.createWriteStream('output.txt');

/* Pipeamos los dos streams */

streamRead.pipe(streamWrite);

Bueno, pues con esto espero que haya quedado algo claro qué son los streams. Quedaría hablar de los stream del tipo Duplex y transform, pero no vale la pena incluirlos en esta introducción, pues son streams que aúnan las características de los readables y los writables que acabamos de ver. Lo que en cambio sí es importante es explicar cómo preparar nuestros propios streams, pero eso será en otra entrada, que por hoy lo dejo aquí.

|| Tags: ,

valoración de los lectores sobre node (6): introducción a los streams

  • estrellica valoración positiva
  • estrellica valoración positiva
  • estrellica valoración positiva
  • estrellica valoración negativa
  • estrellica valoración negativa
  • 3 sobre 5 (1 votos)

¿Te ha parecido útil o interesante esta entrada?
dormido, valoración 1 nadapensativo, valoración 2 un poco sonrisa, valoración 3 a medias guiño, valoración 4 bastante aplauso, valoración 5 mucho

Tú opinión es muy importante, gracias por compartirla!

Aportar un comentario


*