用NODE.JS中的流編寫工具是要注意的事項

字號:


    Node.js中的流十分強大,它對處理潛在的大文件提供了支持,也抽象了一些場景下的數(shù)據(jù)處理和傳遞。正因為它如此好用,所以在實戰(zhàn)中我們常?;谒鼇砭帉懸恍┕ぞ?函數(shù)/庫 ,但往往又由于自己對流的某些特性的疏忽,導(dǎo)致寫出的 函數(shù)/庫 在一些情況會達(dá)不到想要的效果,或者埋下一些隱藏的地雷。本文將會提供兩個在編寫基于流的工具時,私以為有些用的兩個tips。
    一,警惕EVENTEMITTER內(nèi)存泄露
    在一個可能被多次調(diào)用的函數(shù)中,如果需要給流添加事件監(jiān)聽器來執(zhí)行某些操作。那么則需要警惕添加監(jiān)聽器而導(dǎo)致的內(nèi)存泄露:
    'use strict';
    const fs = require('fs');
    const co = require('co');
    function getSomeDataFromStream (stream) {
     let data = stream.read();
     if (data) return Promise.resolve(data);
     if (!stream.readable) return Promise.resolve(null);
     return new Promise((resolve, reject) => {
      stream.once('readable', () => resolve(stream.read()));
      stream.on('error', reject);
      stream.on('end', resolve);
     })
    }
    let stream = fs.createReadStream('/Path/to/a/big/file');
    co(function *() {
     let chunk;
     while ((chunk = yield getSomeDataFromStream(stream)) !== null) {
      console.log(chunk);
     }
    }).catch(console.error);
    在上述代碼中,getSomeDataFromStream函數(shù)會在通過監(jiān)聽error事件和end事件,來在流報錯或沒有數(shù)據(jù)時,完成這個Promise。然而在執(zhí)行代碼時,我們很快就會在控制臺中看到報警信息:(node) warning: possible EventEmitter memory leak detected. 11 error listeners added. Use emitter.setMaxListeners() to increase limit.,因為我們在每次調(diào)用該函數(shù)時,都為傳入的流添加了一個額外的error事件監(jiān)聽器和end事件監(jiān)聽器。為了避免這種潛在的內(nèi)存泄露,我們要確保每次函數(shù)執(zhí)行完畢后,清除所有此次調(diào)用添加的額外監(jiān)聽器,保持函數(shù)無污染:
    function getSomeDataFromStream (stream) {
     let data = stream.read();
     if (data) return Promise.resolve(data);
     if (!stream.readable) return Promise.resolve(null);
     return new Promise((resolve, reject) => {
      stream.once('readable', onData);
      stream.on('error', onError);
      stream.on('end', done);
      function onData () {
       done();
       resolve(stream.read());
      }
      function onError (err) {
       done();
       reject(err);
      }
      function done () {
       stream.removeListener('readable', onData);
       stream.removeListener('error', onError);
       stream.removeListener('end', done);
      }
     })
    }
    二,保證工具函數(shù)的回調(diào)在處理完畢數(shù)據(jù)后才被調(diào)用
    工具函數(shù)往往會對外提供一個回調(diào)函數(shù)參數(shù),待處理完流中的所有數(shù)據(jù)后,帶著指定值觸發(fā),通常的做法是將回調(diào)函數(shù)的調(diào)用掛在流的end事件中,但如果處理函數(shù)是耗時的異步操作,回調(diào)函數(shù)則可能在所有數(shù)據(jù)處理完畢前被調(diào)用:
    'use strict';
    const fs = require('fs');
    let stream = fs.createReadStream('/Path/to/a/big/file');
    function processSomeData (stream, callback) {
     stream.on('data', (data) => {
      // 對數(shù)據(jù)進(jìn)行一些異步耗時操作
      setTimeout(() => console.log(data), 2000);
     });
     stream.on('end', () => {
      // ...
      callback()
     })
    }
    processSomeData(stream, () => console.log('end'));
    以上的代碼callback回調(diào)可能會在數(shù)據(jù)并未被全部處理時就被調(diào)用,因為流的end事件的觸發(fā)時機僅僅是在流中的數(shù)據(jù)被讀完時。所以我們需要額外地對數(shù)據(jù)是否已處理完進(jìn)行檢查:
    function processSomeData (stream, callback) {
     let count = 0;
     let finished = 0;
     let isEnd = false;
     stream.on('data', (data) => {
      count++;
      // 對數(shù)據(jù)進(jìn)行一些異步耗時操作
      setTimeout(() => {
       console.log(data);
       finished++;
       check();
      }, 2000);
     });
     stream.on('end', () => {
      isEnd = true;
      // ...
      check();
     })
     function check () {
      if (count === finished && isEnd) callback()
     }
    }
    這樣一來,回調(diào)便會在所有數(shù)據(jù)都處理完畢后觸發(fā)了。