Giter Site home page Giter Site logo

Comments (3)

maple-leaf avatar maple-leaf commented on June 18, 2024

代码分析

引入依赖,创建读取与写入流

const https = require('https');
const querystring = require('querystring');
const Rx = require('rxjs');
const readline = require('readline');
const fs = require('fs');

const imgStream = readline.createInterface({  // 创建行读取流
    input: fs.createReadStream('filelist.txt')
});

const writeStream = fs.createWriteStream('output.txt');  // 创建写入流

使用Rx处理读取并反馈结果给写入

Rx.Observable.fromEvent(imgStream, 'line')  // 将行读取流转化为Rx的事件流
.takeUntil(Rx.Observable.fromEvent(imgStream, 'close'))  // 读取流截止时终止Rx流
.map(img => generateData(img))  // 将文件名处理成post的数据
 // 发起请求,并发3个,请求返回后延迟400ms后再进行下一步处理并发起下一个请求
.mergeMap(data => requestAPI(data).delay(400), (o, i) => i, 3) 
.subscribe(data => {
    // 处理数据并写入文件
    let str = data.url;
    if (data.status === 200 && data.data.xxx.length) {
        zzz = data.data.xxx.map(x => x.zzz);
        str += `    ${JSON.stringify(zzz)}`;
    }
    writeStream.write(`${str}\n`);
}, (err) => {
    console.log(err);
    console.log('!!!!!!!!!!!ERROR!!!!!!!!!');
}, () => {
    console.log('=====complete======');
    writeStream.end();
});

其中的需要关注的点在.mergeMap(data => requestAPI(data).delay(400), (o, i) => i, 3) ,这里内部requestAPI返回一个封装了http异步请求并延迟400ms的Rx流,当请求完成并延迟完成后将数据返回上一层继续进行处理(可以类比为Promisethen)

from blog.

maple-leaf avatar maple-leaf commented on June 18, 2024

使用Rx的自定义流封装一个带错误重试机制的http请求

const requestFacepp = dataStr => {
    const options = {
        hostname: 'api.xxx.com',
        port: 443,
        path: '/xxx',
        method: 'POST',
        headers: {
            'Content-Type': 'application/x-www-form-urlencoded',
            'Content-Length': Buffer.byteLength(dataStr)
        }
    };
    const reqData = querystring.parse(dataStr);
    const retry$ = new Rx.Subject();  // 触发重试的流,当其发出数据时会使`retryWhen`触发重试错误流
    let retryTimes = 3;  // 设置非正常失败(超时)重试的上限

    // 使用Rx的自定义流封装一个带错误重试机制的http请求,可以类比为new Promise
    // 但要注意的是Rx是流,即数据是可以持续的,而Promise则只有一个结果和状态
    return Rx.Observable.create(observer => {
        const req = https.request(options, res => {
            let data = '';
            res.setEncoding('utf8');
            res.on('data', chunk => {
                data += chunk;
            });
            res.on('end', () => {
                if (res.statusCode === 200) {
                    // 请求正常返回,向流内推送结果并结束
                    observer.next({
                        status: res.statusCode,
                        url: reqData.image_url,
                        data: JSON.parse(data)
                    });
                    observer.complete();
                } else {
                    // 请求正常返回,但不是正常结果,抛出错误并重试
                    console.log(`retring: ${reqData.image_url}`);
                    observer.error({
                        status: res.statusCode,
                        url: reqData.image_url
                    });
                    retry$.next(true);
                }
            });
        });

        req.setTimeout(4000, () => {
            // 设置请求4s超时,超时后终止,引发请求抛错
            req.abort();
        });

        req.on('error', err => {
            console.log(`retring(${retryTimes}): ${reqData.image_url}`);
            // 请求抛错时重试,超出次数则终止本次请求
            observer.error(`error: ${err.message}`);
            if (retryTimes > 0) {
                retryTimes--;
                retry$.next(true);
            } else {
                retry$.complete();
            }
        });

        req.write(dataStr);

        req.end();
        return () => { req.abort() };  // 返回终止流的处理回调
    })
    .retryWhen(errs => errs.switchMap(err => {
        // 未超过次数返回重试流,超出则返回错误数据并终止本次Rx流
        return retryTimes > 0 ? retry$ : Rx.Observable.of({
            status: 500,
            url: reqData.image_url
        });
    }));
};

from blog.

maple-leaf avatar maple-leaf commented on June 18, 2024

收尾

到此就搬砖完毕,开个车让他慢慢跑就可以了。
本篇展示了Rx在流数据处理与异步处理上的方式,逻辑与代码都挺清晰、扁平。在处理交杂的逻辑时也不错(重试部分)。如果喜欢或者有帮助的话可以后面在发一篇Rx在复杂DOM事件处理上的应用。;-)

from blog.

Related Issues (4)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.