構建你自己的redux-saga
知乎上已經有不少介紹redux-saga的好文章了,例如redux-saga實踐總結、淺析redux-saga實現原理、 Redux-Saga漫談。本文將介紹redux-saga 的實現原理,並一步步地用代碼構建little-saga —— 一個redux-saga 的簡單版本。希望通過本文,更多人可以了解到redux-saga 背後的運行原理。
本文是對redux-saga 的原理解析,將不再介紹redux-saga 的相關概念。所以在閱讀文章之前,請確保對redux-saga 有一定的了解。
文章目錄
- 0.1 文章結構
- 0.2 名詞解釋
- 0.3 關於little-saga
- 1.1 生成器函數
- 1.2 使用while-true 來消費迭代器
- 1.3 使用遞歸函數來消費迭代器
- 1.4 雙向通信
- 1.5 effect 的類型與含義
- 1.6 result-first callback style
- 1.7 cancellation
- 1.8 effect 狀態
- 1.9 proc 初步實現
- 2.1 Task
- 2.2 fork model
- 2.3 類ForkQueue
- 2.4 task context
- 2.5 effect 類型拓展
- 2.6 little-saga 核心部分的完整實現
- 2.7 Task 狀態變化舉例
- 2.8 類Env
- 2.9 第二部分小節
- 3.1 commonEffects 拓展
- 3.2 channelEffects 拓展
- 3.3 compat 拓展
- 3.4 scheduler
- 3.5 其他細節問題
0.1 文章結構
本文很長,大致分為四部分。文中每一個章節有對應的xy 標記,方便相互引用。
- 0.x 介紹文章的一些相關訊息。
- 1.x 講解一些基礎概念並實現一個簡單版本的proc 函數。
- 2.x 介紹redux-saga/little-saga 的一些核心概念,例如Task、fork model、effect 類型拓展,並實現了little-saga 核心部分。
- 3.x 使用little-saga 的拓展機制,實現了race/all、channel、集成redux 等功能,也討論了一些其他相關問題。
0.2 名詞解釋
side effect 是來自函數式編程的概念,而effect 這個單詞直譯過來是「作用」。 「作用」是一個表意很模糊的詞語,不太適合用在技術文章中,所以本文中將保持使用effect 這個英文單詞。不過,到底什麼是effect 呢?這又是一個很難解釋清楚的問題。在本文中,我們放寬effect 的概念,只要是被yield 的值,都可以被稱為effect。例如yield 1
中,數字1
就是effect,不過數字類型的effect缺少明確的含義;又例如yield fetch('some-url')
中, fetch('some-url')
這個Promise對象便是effect。
當我們使用redux-saga 時,我們的業務代碼往往充當effect 的生產者:生成effect,並使用yield 語句將effect 傳給saga-middleware。而saga-middleware 充當effect 的消費者:獲取effect,根據effect 的類型解釋該effect,然後將結果返回給生產者。文中會使用effect-producer 與effect-runner 來表示生產者與消費者。注意effect-runner 是需要將結果返回給effect-producer 的,所以有的時候也需要將redux-saga 看作一種「請求-響應」模型,我們業務代碼生產effect 以發起「請求」,而saga- middleware 負責消費並將響應返回給業務代碼。
saga 又是一個讓人困擾的單詞(・へ・),這裡給一個簡單的說明。文中的「saga 函數」其實就是指JavaScript 生成器函數,不過特指那些「作為參數傳入函數proc 或sagaMiddleware.run 然後開始運行」的生成器函數。 「saga 實例」指的是調用saga 函數得到的迭代器對象。 task 指的是saga 實例運行狀態的描述對象。
0.3 關於little-saga
little-saga 大量參考了redux-saga 源碼,參考的版本為redux-saga v1.0.0-beta.1。 redux-saga 對許多邊界情況做了處理,代碼比較晦澀,而little-saga 則進行了大量簡化,所以兩者有許多實現細節差異。本文中出現的代碼都是little-saga 的,不過我偶爾也會附上相應的redux-saga 源碼鏈接,大家可以對照著看。
little-saga已經跑通了redux-saga的絕大部分測試(跳過了little-saga沒有實現的那一部分功能的測試),使用little-saga/compat
已經可以替換redux-saga,例如我上次寫的坦克大戰復刻版就已經使用little-saga替換掉了redux-saga 。
little-saga 的特點是沒有綁定redux,所以有的時候(例如寫網絡爬蟲、寫遊戲邏輯時)如果並不想使用redux,但仍想用fork model 和channel 來管理異步邏輯,可以嘗試一下little- saga。 little-saga 的初衷還是通過簡化redux-saga,能讓更多人理解redux-saga 背後的原理。
1.1 生成器函數
讓我們先從redux-saga 中最常見的yield 語法開始。生成器函數使用function*
聲明,而yield語法只能出現在生成器函數中。在生成器執行過程中,遇到yield 表達式立即暫停,後續可恢復執行狀態。使用redux-saga 時,所有的effect 都是通過yield 語法傳遞給effect-runner 的,effect-runner 處理該effect 並決定什麼時候恢復生成器。 InfoQ上面的深入淺出ES6(三):生成器Generators是一篇非常不錯的文章,對生成器不了解的話,非常推薦閱讀該文。
調用生成器函數我們可以得到一個迭代器對象(關於迭代器的文章推薦深入淺出ES6(二):迭代器和for-of循環)。在比較簡單的情況下,我們使用for-of 循環來「消費」該迭代器,下面的代碼就是一個簡單的例子。
function*range(start,end){for(leti=start;i<end;i++){yieldi}}for(letxofrange(1,10)){console.log(x)}// 输出 1, 2, 3 ... 8, 9
for-of 雖然很方便,但是功能有限。迭代器對象包含了三個方法:next/throw/return,for-of 循環只會不斷調用next 方法;next 方法是可以帶參數的,而for-of 循環調用該方法都是不傳參的。例如,for-of 循環就無法處理下面這樣的生成器函數了。
function * saga () { const someValue = yield [ 'echo' , 3 ] // someValue 应该为3,但使用for-of循环的话,该值为undefined yield Promise . reject ( someError ) // effectRunner 遇到rejected Promise 应该使用迭代器的throw 方法抛出someError // 但使用for-of 循环的话,无法调用迭代器的throw 方法}
1.2 使用while-true 來消費迭代器
如果我們不用for-of,而是使用while-true 循環自己實現消費者,手動調用next/throw/return 方法,那麼我們可以實現更多的功能。下面的代碼實現了一個「遇到數字5
就拋出錯誤」的effec-runner。
constiterator=range(1,10)while(true){const{done,value}=iterator.next(/* 我们可以决定这里的参数 */)if(done){break}if(value===5){iterator.throw(newError('5 is bad input'))}console.log(value)}// 输出 1, 2, 3, 4,然后抛出异常 '5 is bad input'
然而,while-true 仍有一個缺陷:while-true 是同步的。這意味著生成器無法暫停執行直到某個異步任務(例如網絡請求)完成,也就意味著無法使用while-true 實現redux-saga 了。
1.3 使用遞歸函數來消費迭代器
effect-runner 的最終答案是:遞歸函數。遞歸函數滿足了作為一個effect-runner 的所有要求,不僅能夠調用迭代器的next/throw/return 方法,能在調用這些方法時使用指定參數,還能同步或異步地調用它們。
上一個例子中我們所有的代碼都是同步的;而在下面的例子中,如果我們發現value 是偶數的話,我們不馬上調用next,而是使用setTimeout 延遲調用next。
constiterator=range(1,10)functionnext(){const{done,value}=iterator.next()if(done){return}console.log(value)if(value%2===0){setTimeout(next,value*300)}else{next()}}next()//陸續輸出1, 2, 3 ... 8, 9//偶數數字輸出之後,需要等待一會兒才會輸出下一個奇數//奇數數字輸出之後,立刻輸出下一個偶數
這個例子比較簡單,只是對value 進行奇偶判斷。不過我們不難設想以下的使用方法:effect-producer 產生promise,而effect-runner 對promise 的處理方式如下:當promise resolve 的時候調用迭代器的next 方法,當promise reject 的時候調用迭代器的throw 方法。我們就可以用生成器的語法實現async/await,這也是生成器比async/await 更加強大的原因。而redux-saga/little-saga 不僅實現了對promise 的處理,還實現了功能更為強大的fork model。
在本文後面,我們稱該遞歸函數為「驅動函數」。注意不要將驅動函數next和迭代器的next方法搞混,迭代器的next方法被調用的形式是iterator.next(someValue)
,而驅動函數被調用的形式不同。在redux-saga/little-saga,函數名字為next
的情況只有驅動函數和迭代器next方法這兩種,所以如果發現一個叫做next
的函數,且該函數不是迭代器方法,那麼該函數就是驅動函數。
1.4 雙向通信
前面的例子中,我們只用到了單項通信:effect-runner調用iterator.next()
獲取effect,但effect-runner並沒有將數據傳遞給effect-producer。在redux-saga 中,我們往往需要使用yield 語句的返回值,返回值的含義取決於effect 的類型。例如下面這個例子:
function * someSaga () { // yield 一个promise 应该返回promise resolve 的值const response = yield fetch ( 'https://example.com/' ) // yield 一个take effect 应该返回一个Action const action = yield take ( 'SOME_ACTION' ) // yield 一个all effect 应该返回一个数组,该数组记录了effect1 或effect2 的执行结果const allResult = yield all ([ effect1 , effect2 ]) }
為了實現雙向通信,effect-runner要提供合適的參數來調用iterator.next(arg)
。當iterator.next(arg)
被調用時,參數arg將會作為yield xxx
語句的返回值,且暫停的迭代器會繼續執行(直到遇到下一個yield語句)。為此我們修改前面的代碼如下:
function*range2(start,end){for(leti=start;i<end;i++){constresponse=yieldiconsole.log(`response of${i}is${response}`)}}constiterator=range2(1,10)functionnext(arg,isErr){//注意驅動函數多了參數arg和isErrletresultif(isErr){result=iterator.throw(arg)}else{//這裡我們將arg作為參數傳遞給iterator.next,作為effect-producer中yield語句的返回值result=iterator.next(arg)}const{done,value}=resultif(done){return}console.log('getting:',value)if(value===5){//將isErr置為true,就能用遞歸的方式調用iterator.throw方法next(newError('5 is bad input'),true)}else{//延遲調用驅動函數;「響應」是「請求」的兩倍setTimeout(()=>next(value*2),value*1000)}}next()//輸出// getting: 1// response of 1 is 2// getting: 2// response of 2 is 4// getting: 3// response of 3 is 6// getting: 4// response of 4 is 8// getting: 5// Uncaught Error: 5 is bad input//輸出getting: x之後,輸出會暫停一段時間
1.5 effect 的類型與含義
前面的例子中我們的effect-producer 都是簡單的range,effect(即被yield 的值)為數字。因為數字沒有什麼確切的含義,effect-runner 只是簡單地打印這些數字,然後再在合適地時刻調用驅動函數。
如果effect 有明確的含義,effect-runner 就可以根據其含義來決定具體的執行邏輯。 redux-saga 可以處理promise、iterator、take、put 等類型的effect,合理地組合不同類型的effect 可以表達非常複雜的異步邏輯。下面我們給little-saga加上一些簡單的effect的處理能力,是不是覺得這個和co很像呢?
function*gen(){console.log('enter ...')consta=yield['promise',fetch('/')]console.assert(ainstanceofResponse)constb=yield['delay',500]console.assert(b==='500ms elapsed')constc=yield['ping']console.assert(c==='pong')console.log('exit ... ')}constiterator=gen()functionnext(arg,isErr){letresultif(isErr){result=iterator.throw(arg)}else{result=iterator.next(arg)}const{done,value}=resultif(done){return}//不打印value,而是根據value的含義執行相應的處理邏輯if(value[0]==='promise'){constpromise=value[1]promise.then(resolvedValue=>next(resolvedValue),error=>next(error,true))}elseif(value[0]==='delay'){consttimeout=value[1]setTimeout(()=>next(`${timeout}ms elapsed`),timeout)}elseif(value[0]==='ping'){next('pong')}else{iterator.throw(newError('無法識別的effect'))}}next()
在redux-saga中,effect是一個由函數effect生成、 [IO]
字段為true
的對象。 little-saga 使用數組來表示effect:數組的第一個元素為字符串,用於表示effect 的類型,數組剩餘元素為effect 的參數。
前面幾個小節介紹了ES2015 生成器的特性,講解瞭如何使用遞歸函數來實現effect-runner。我們發現,約定一些常見的effect 類型,並恰當使用這些類型的話,我們可以用生成器語法寫出富有表達力的代碼。
1.6 result-first callback style
在Node.js 中,異步回調函數往往使用error-first 的模式:第一個參數為err,如果一個異步操作發生了錯誤,那麼錯誤會通過err 參數傳遞回來;第二個參數用於傳遞正確的操作結果,如果異步操作沒有發生錯誤,那麼操作結果會通過該參數進行傳遞。 error-first模式大量用於node核心模塊(例如fs模塊)和第三方庫(例如async模塊),可以閱讀該文章了解更多訊息。
而在redux-saga/little-saga 中,我們使用result-first 的模式。異步回調函數的第一個參數是操作結果,第二個參數是一個布爾值,表示是否發生了錯誤。我們稱該風格為result-first callback style,其類型訊息用TypeScript 表示如下:
typeCallback=(result: any,isErr: boolean)=>void
redux-saga 源碼中幾乎所有回調函數都是該風格的,相應的變量名也有好幾個:
- cont continuation縮寫,一般用於表示Task / MainTask / ForkQueue的後繼
- cb callback縮寫或是currCb應該是currentCallback的縮寫。一般用於effect 的後繼/回調函數
- next就是前邊的遞歸函數,它也是符合result-first callback style的
redux-saga 中這些變量名頻繁出現,僅在一個proc.js 文件中就出現了幾十次。在後面little-saga 的代碼中,我們都將使用該風格的回調函數。
1.7 cancellation
redux-saga 的一大特點就是effects 是可取消的,並且支持使用try-catch-finally 的語法將清理邏輯放在finally 語句塊中。官方文檔中也對任務取消做了說明。
在redux-saga 具體實現中,調用者(caller)會將回調函數cb 傳遞給被調用者(callee),當callee 完成異步任務時,調用cb 來把結果告訴給caller。而cancellation 機制是這麼實現的:如果一個操作是可取消的話,callee 需要將「取消時的邏輯」放在cb.cancel 上,這樣一來當caller 想要取消該異步操作時,直接調用cb.cancel () 即可。函數調用是嵌套的,cb.cancel 的設置需要跟著函數調用一層層進行設置。在後面許多代碼都會有類似cb.cancel = xxx
的操作,這些操作都是在實現cancellation。
文章如何取消你的Promise?中也提到了多種取消Promise 的方法,其中生成器是最具擴展性的方式,有興趣的同學可以進行閱讀。
1.8 effect 狀態
effect 狀態分為運行中、已完成(正常結束或是拋出錯誤結束都算完成)、被取消。
promise 一旦resolve/reject 之後,就不能再改變狀態了。 effect 也是類似,一旦完成或是被取消,就不能再改變狀態,「完成時的回調函數」和「被取消時的回調函數」合起來只能最多被調用一次。也就是說,effect 的「完成」和「被取消」是互斥的。
每一個effect在運行之前都會通過函數digestEffect
的處理。該函數用變量effectSettled
記錄了一個effect是否已經settled,保證了上述互斥性。
digestEffect
也調用了normalizeEffect
來規範化effect,這樣一來,對於promise/iterator,我們可以在effect-producer直接yield這些對象,而不需要將它們包裹在數組中。
digestEffect
和normalizeEffect
兩個函數的代碼如下:
constnoop=()=>nullconstis={func:/*判斷參數是否函數*/string:/*判斷參數是否字符串*//* ...... */}functiondigestEffect(rawEffect,cb){leteffectSettled=falsefunctioncurrCb(res,isErr){if(effectSettled){return}effectSettled=truecb.cancel=noopcb(res,isErr)}currCb.cancel=noopcb.cancel=()=>{if(effectSettled){return}effectSettled=truetry{currCb.cancel()}catch(err){console.error(err)}currCb.cancel=noop}runEffect(normalizeEffect(rawEffect),currCb)}// normalizeEffect定義在其他文件functionnormalizeEffect(effect,currCb){if(is.string(effect)){return[effect]}elseif(is.promise(effect)){return['promise',effect]}elseif(is.iterator(effect)){return['iterator',effect]}elseif(is.array(effect)){returneffect}else{consterror=newError('Unable to normalize effect')error.effect=effectcurrCb(error,true)}}
1.9 proc 初步實現
有了上面的基礎,我們就可以初步實現proc 函數了。所有saga 實例都是通過該函數啟動的,該函數是redux-saga 中最重要的函數之一。
constTASK_CANCEL=Symbol('TASK_CANCEL')constCANCEL=Symbol('CANCEL')functionproc(iterator,parentContext,cont){//實際上task並不是這麼構造的,不過在初步實現中,暫時先這樣吧consttask={cancel:()=>next(TASK_CANCEL),}//設置cancel邏輯cont.cancel=task.cancelnext()returntaskfunctionnext(arg,isErr){try{letresultif(isErr){result=iterator.throw(arg)}elseif(arg===TASK_CANCEL){// next.cancel由當前正在執行的effectRunner所設置next.cancel()result=iterator.return(TASK_CANCEL)}else{result=iterator.next(arg)}if(!result.done){digestEffect(result.value,next)}else{//迭代器執行完畢,調用cont將結果返回給上層cont(result.value)}}catch(error){//迭代器運行發生錯誤,調用cont將錯誤返回給上層cont(error,true)}}// function digestEffect(rawEffect, cb) { /* ...... */ }//執行effect,根據effect的類型調用不同的effectRunnerfunctionrunEffect(effect,currCb){consteffectType=effect[0]if(effectType==='promise'){resolvePromise(effect,ctx,currCb)}elseif(effectType==='iterator'){resolveIterator(iterator,ctx,currCb)}else{//拓展這裡的if-else便可以拓展新的effect類型thrownewError('Unknown effect type')}}functionresolvePromise([effectType,promise],ctx,cb){constcancelPromise=promise[CANCEL]if(is.func(cancelPromise)){//設置promise的cancel邏輯cb.cancel=cancelPromise}promise.then(cb,error=>cb(error,true))}functionresolveIterator([effectType,iterator],ctx,cb){proc(iterator,ctx,cb)}}
2.1 Task
proc函數( redux-saga的源碼)用於運行一個迭代器,並返回一個Task對象。 Task 對象描述了該迭代器的運行狀態,我們首先來看看Task 的接口(使用TypeScript 來表示類型訊息)。在little-saga中,我們將使用類似的Task接口。 (注意是類似的接口,而不是相同的接口)
typeCallback=(result: any,isErr: boolean)=>voidtypeJoiner={task: Task;cb: Callback}interfaceTask{cancel():voidtoPromise():Promise<any>result: anyerror: ErrorisRunning: booleanisCancelled: booleanisAborted: boolean}
Task對象包含了toPromise()
方法,該方法會返回saga實例對應的promise。 cancel()
方法使得saga實例允許被取消; isXXX
等字段反映了saga實例的運行狀態; result
/ error
字段可以記錄了saga實例的運行結果。
async 函數被調用後,內部邏輯也許非常複雜,最後返回一個Promise 對象來表示異步結果。而saga 實例除了異步結果,還包含了額外的功能:「取消」以及「查詢運行狀態」。所以Task 接口是要比Promise 接口復雜的,內部實現也需要更多的數據結構和邏輯。
2.2 fork model
redux-saga提供了fork effect來進行非阻塞調用, yield fork(...)
會返回一個Task對象,用於表示在後台執行的saga實例。在更普遍的情況下,一個saga 實例在運行的時候會多次yield fork effect,那麼一個parent-saga 實例就會有多個child-saga。 rootSaga通過sagaMiddleware.run()
開始運行,在rootSaga運行過程中,會fork得到若干個child-saga,每一個child-saga又會fork得到若干個grandchild-saga,如果我們將所有的parent-child關係繪製出來的話,我們可以得到類似於下圖這樣的一棵saga 樹。

redux-saga的文檔也對fork model 進行了詳細的說明,下面我做一點簡單的翻譯:
- 完成:一個saga 實例在滿足以下條件之後進入完成狀態:
- 迭代器自身的語句執行完成
- 所有的child-saga 進入完成狀態
當一個節點的所有子節點完成時,且自身迭代器代碼執行完畢時,該節點才算完成。
- 錯誤傳播:一個saga 實例在以下情況會中斷並拋出錯誤:
- 迭代器自身執行時拋出了異常
- 其中一個child-saga 拋出了錯誤
當一個節點發生錯誤時,錯誤會沿著樹向根節點向上傳播,直到某個節點捕獲該錯誤。
- 取消:取消一個saga 實例也會導致以下事情的發生:
- 取消mainTask,也就是取消當前saga 實例等待的effect
- 取消所有仍在執行的child-saga
取消一個節點時,該節點對應的整個子樹都將被取消。
2.3類ForkQueue
類ForkQueue
是fork model的具體實現。 redux-saga使用了函數forkQueue來實現,在little-saga中我們使用class語法定義了ForkQueue
類。
每一個saga 實例可以用一個Task 對象進行描述,為了實現fork model,每一個saga 實例開始運行時,我們需要用一個數組來保存child-tasks。我們來看看forkQueue 的接口:
interfaceForkQueue{constructor(mainTask: MainTask)// cont: Callback 这是一个私有的字段
addTask(task: Task):voidcancelAll():voidabort(err: Error):void}
ForkQueue的構造函數接受一個參數mainTask
,該參數代表當前迭代器自身代碼的執行狀態,forkQueue.cont會在ForkQueue被構造之後進行設置。當所有的child-task以及mainTask都完成時,我們需要調用forkQueue.cont來通知其parent-saga(對應於2.2 fork model中的「完成」 )。
ForkQueue 對象包含三個方法。方法addTask
用於添加新的child-task;方法cancelAll
用於取消所有的child-task;而方法abort
不僅會取消所有的child-task,還會調用forkQueue.cont向parent-task通知錯誤。
little-saga 的ForkQueue 實現如下:
classForkQueue{tasks=[]result=undefined//使用completed變量來保證「完成」和「出錯」的互斥completed=false// cont will be set after calling constructor()cont=undefinedconstructor(mainTask){this.mainTask=mainTask// mainTask一開始就會被添加到數組中this.addTask(this.mainTask)}//取消所有的child-task,並向上層通知錯誤abort(err){this.cancelAll()this.cont(err,true)}addTask(task){this.tasks.push(task)//指定child-task完成時的行為task.cont=(res,isErr)=>{if(this.completed){return}//移除child-taskremove(this.tasks,task)//清空child-task完成時的行為task.cont=noopif(isErr){//某一個child-task發生了錯誤,調用abort來進行「錯誤向上傳播」this.abort(res)}else{//如果是mainTask完成的話,記錄其結果if(task===this.mainTask){this.result=res}if(this.tasks.length===0){//滿足了task完成的兩個條件this.completed=truethis.cont(this.result)}}}}cancelAll(){if(this.completed){return}this.completed=true//依次調用child-task的cancel方法,進行「級聯向下取消」,並清空child-task完成時的行為this.tasks.forEach(t=>{t.cont=noopt.cancel()})this.tasks=[]}}
2.4 task context
每一個task 都有其對應的context 對象,用於保存該task 運行時的上下文訊息。在redux-saga 中我們可以使用getContext/setContext 讀寫該對象。 context 的一大特性是child-task 會使用原型鏈的方式繼承parent-task context。當嘗試訪問context 中的某個屬性時,不僅會在當前task context 對像中搜尋該屬性,也會在parent-task context 對象進行搜索,以及parent-task 的parent-task,依次層層往上搜索,直到找到該屬性或是到達rootSaga。該繼承機制在redux-saga中的實現也非常簡單,只有一行代碼: const taskContext = Object.create(parentContext)
。
context 是一個強大的機制,例如在React 中,React context 用途非常廣泛,react-redux / react-router 等相關類庫都是基於該機制實現的。然而在redux-saga 中,context 似乎很少被提起。
在little-saga 中,我們將充分利用context 機制,並使用該機制實現「effect 類型拓展」、「連接redux store」等功能。這些機制的實現會在本文後面提到。
2.5 effect 類型拓展
在1.9 proc初步實現中,函數runEffect
遇到未知effect類型便會拋出異常。這裡我們對該處做一些修改,以實現effect 的類型拓展。當遇到未知的effect類型時,我們將使用ctx.translator
的getRunner
方法來獲取該effect對應的effectRunner,然後調用該effectRunner。只要我們提前設置好ctx.translator
,就能在後續的代碼中使用拓展類型。為了方便設置ctx.translator
,little-saga中新增了def類型的effect,用來關聯拓展類型與其對應的effectRunner。
根據context 的特性,child-task 會繼承parent-saga 的context,故在parent-task 中定義的拓展類型也能用於child-task。在little-saga 中,race/all/take/put 等類型將使用該拓展機制進行實現。
functionrunEffect(effect,currCb){consteffectType=effect[0]if(effectType==='promise'){resolvePromise(effect,ctx,currCb)}elseif(effectType==='iterator'){resolveIterator(iterator,ctx,currCb)}elseif(effectType==='def'){runDefEffect(effect,ctx,currCb)/*其他已知類型的effect *//* .................... */}else{//未知類型的effectconsteffectRunner=ctx.translator.getRunner(effect)if(effectRunner==null){consterror=newError(`Cannot resolve effect-runner for type:${effectType}`)error.effect=effectcurrCb(error,true)}else{effectRunner(effect,ctx,currCb,{digestEffect})}}}functionrunDefEffect([_,name,handler],ctx,cb){def(ctx,name,handler)cb()}// def定義在其他文件functiondef(ctx,type,handler){constold=ctx.translator//替換ctx.translatorctx.translator={getRunner(effect){returneffect[0]===type?handler:old.getRunner(effect)}},}
2.6 little-saga 核心部分的完整實現
little-saga核心部分的實現代碼位於/src/core文件夾內。完整實現的代碼較多,相比於1.9 proc初步實現,完整實現添加了context、task、fork model、effect類型拓展、錯誤處理等功能,完善了Task生命週期(啟動/完成/出錯/取消)。
redux-saga 對應的實現代碼全部都位於proc.js 一個文件中,導致該文件很大;而little-saga 則是將實現分成了若干個文件,下面我們一個一個來進行分析。
2.6.1 整體思路
在2.6 後面的內容中,我們將使用以下的代碼作為例子。該例子中,Parent 會fork Child1 和Child2,分別需要100ms 和300ms 來完成。 Parent 迭代器本身的代碼(mainTask)需要200ms 完成。而整個Parent Task 則需要300ms 才能完成。
function*Parent(){constChild1=yieldfork(api.xxxx)// LINE-1 需要 100ms 才能完成
constChild2=yieldfork(api.yyyy)// LINE-2 需要 300ms 才能完成
yielddelay(200)// LINE-3 需要 200 ms 才能完成
}
下圖展示了Parent 運行時,各個Task / mainTask / ForkQueue 之間的相互關係。圖中的實線箭頭表示兩個對象之間的後繼關係(cont):「A 指向B」意味著「當A 完成時,需要將結果傳遞給B」。
在1.7 cancellation中我們知道cancellation的順序和cont恰好是相反的,在具體代碼實現時,我們不僅需要構建下圖中的cont關係,還需要構建反向的cancellation關係。

本小節中的代碼比較複雜,如果覺得理解起來比較困難的話,可以和2.7 Task狀態變化舉例對照著看。
2.6.2函數proc
函數proc 是運行saga 實例的入口函數。結合上圖,函數proc 的作用是創建圖中各個Task/mainTask 對象,並建立對象之間的後繼關係(cont)和取消關係(cancellation)。代碼如下:
// /src/core/proc.jsfunctionproc(iterator,parentContext,cont){//初始化當前task的contextconstctx=Object.create(parentContext)// mainTask用來跟踪當前迭代器的語句執行狀態constmainTask={// cont: **will be set when passed to ForkQueue**isRunning:true,isCancelled:false,cancel(){if(mainTask.isRunning&&!mainTask.isCancelled){mainTask.isCancelled=truenext(TASK_CANCEL)}},}//創建ForkQueue對象和Task對象,這兩個類的代碼在後面會寫出來consttaskQueue=newForkQueue(mainTask)consttask=newTask(taskQueue)//設置後繼關係taskQueue.cont=task.endtask.cont=cont//設置取消關係cont.cancel=task.cancelnext()returntask//以下代碼均為函數定義//在圖中驅動函數只和mainTask有聯繫//然後我們也可以發現下面next函數的代碼中,也只調用了mainTask的接口//即next函數中的代碼不會引用task和taskQueue對象functionnext(arg,isErr){console.assert(mainTask.isRunning,'Trying to resume an already finished generator')try{letresultif(isErr){result=iterator.throw(arg)}elseif(arg===TASK_CANCEL){mainTask.isCancelled=truenext.cancel()//取消當前執行的effect//跳轉到迭代器的finally block,執行清理邏輯result=iterator.return(TASK_CANCEL)}else{result=iterator.next(arg)}if(!result.done){digestEffect(result.value,next)}else{mainTask.isRunning=falsemainTask.cont(result.value)}}catch(error){if(!mainTask.isRunning){throwerror}if(mainTask.isCancelled){//在執行cancel邏輯時發生錯誤,在3.4其他問題與細節中說明console.error(error)}mainTask.isRunning=falsemainTask.cont(error,true)}}// function digestEffect(rawEffect, cb) { /* ...... */ }// function runEffect(effect, currCb) { /* ...... */ }// function resolvePromise([effectType, promise], ctx, cb) { /* ... */ }// function resolveIterator([effectType, iterator], ctx, cb) { /* ... */ }// ......各種內置類型的effect-runner// fork-model中新增了fork/spawn/join/cancel/cancelled//這五種類型的effec-runner代碼見下方}
2.6.3 fork-model 相關的effect-runner
函數runForkEffect
用來執行fork類型的effect,並向調用者返回一個subTask對象。該函數需要注意的是,有的時候調用者會使用fork來執行一些同步的任務,所以調用proc(iterator, ctx, noop)
可能會返回一個已經完成或是已經發生錯誤的subTask,此時我們不需要將subTask 放入fork-queue 中,而是需要執行其他操作。
// /src/core/proc.jsfunctionrunForkEffect([effectType,fn,...args],ctx,cb){constiterator=createTaskIterator(fn,args)try{suspend()//見3.4 schedulerconstsubTask=proc(iterator,ctx,noop)if(subTask.isRunning){task.taskQueue.addTask(subTask)cb(subTask)}elseif(subTask.error){task.taskQueue.abort(subTask.error)}else{cb(subTask)}}finally{flush()//見3.4 scheduler}}
剩下四個類型(spawn / join / cancel / cancelled)的effect-runner 比較簡單,這裡就不再進行介紹。
2.6.4類Task
類Task
是2.1 Task的具體實現。
// /src/core/Task.jsclassTask{isRunning=trueisCancelled=falseisAborted=falseresult=undefinederror=undefinedjoiners=[]// cont will be set after calling constructor()cont=undefinedconstructor(taskQueue){this.taskQueue=taskQueue}//調用cancel函數來取消該Task,這將取消所有當前正在執行的child-task和mainTask// cancellation會向下傳播,意味著該Task對應的saga-tree子樹都將會被取消//同時cancellation也會傳遞給該Task的所有joinerscancel=()=>{//如果該Task已經完成或是已經被取消,則跳過if(this.isRunning&&!this.isCancelled){this.isCancelled=truethis.taskQueue.cancelAll()//將TASK_CANCEL傳遞給所有joinersthis.end(TASK_CANCEL)}}//結束當前Task//設置Task的result/error,然後調用task.cont,最後將結果傳遞給joiners//當該Task的child-task和mainTask都完成時(即fork-queue完成時),該函數將被調用end=(result,isErr)=>{this.isRunning=falseif(!isErr){this.result=result}else{this.error=resultthis.isAborted=true}this.cont(result,isErr)this.joiners.forEach(j=>j.cb(result,isErr))this.joiners=null}toPromise(){//獲取task對應的promise對象,這裡省略了代碼}}
2.6.5 小節
這一節中代碼較多,而且代碼的邏輯密度很高。想要完全理解little-saga 的實現思路,還是需要仔細閱讀源代碼才行。
2.7 Task 狀態變化舉例
下面的代碼是2.6 中的例子。
function*Parent(){constChild1=yieldfork(api.xxxx)// LINE-1 需要 100ms 才能完成
constChild2=yieldfork(api.yyyy)// LINE-2 需要 300ms 才能完成
yielddelay(200)// LINE-3 需要 200 ms 才能完成
}
下表展示了這個例子在一些關鍵時間點的執行情況與相應的狀態變化。注意在下表中,如果沒有指定task/forkQueue/mainTask 是屬於Parent 還是Child1/Child2,默認都是屬於Parent 的。下表只展示了這個例子正常完成的過程,我們也可以思考一下在t=50 / t=150 / t=250 / t=350 等不同時間點,如果Parent 被取消了,代碼又會怎麼執行。

2.8類Env
類Env
的作用是在運行rootSaga之前,對root Task的運行環境進行配置。 Env 採用了鍊式調用風格的API,方便將多個配置串聯起來。
我們可以利用Env 來預先添加一些常見的effect 類型,例如all/race/take/put 等,這樣後續所有的saga 函數都可以直接使用這些effect 類型。例如下面的代碼在運行rootSaga 之前定義了delay 和echo 兩種effect。
newEnv().def('delay',([_,timeout],_ctx,cb)=>setTimeout(cb,timeout)).def('echo',([_,arg],_ctx,cb)=>cb(arg)).run(rootSaga)function*rootSaga(){yield['delay',500]// 500ms之後yield才會返回yield['echo','hello']// yield返回字符串'hello'}
2.9 第二部分小節
至此,little-saga 的核心部分實現完畢。核心部分實現了fork model,實現了fork/join/cancel/promise/iterator 等內置類型的effect-runner,並預留了拓展接口。第三部分中,我們將使用該拓展接口來實現redux-saga 中剩下的那些effect 類型(all/race/put/take 等)。
3.1 commonEffects 拓展
all-effect 的行為與Promise#all 非常類似:all-effect 在構造時接受一些effects 作為sub-effects,當所有sub-effects 完成時,all-effect 才算完成;當其中之一sub-effect 拋出錯誤時,all-effect 會立即拋出錯誤。
有了def effect,拓展effect 就簡單多了。 redux-saga中runAllEffect用於運行all類型的effect,我們拷貝該代碼,並簡單修改,使其符合effectRunner接口,即可在little-saga中實現all effect。 little-sage 中實現all effect 的代碼如下:
//這裡該函數是一個簡化版,省略了all-effect被取消的處理代碼//這裡假設effects是一個對象,實際版本中還需要考慮effects為數組的情況functionall([_,effects],ctx,cb,{digestEffect}){constkeys=Object.keys(effects)letcompletedCount=0constresults={}constchildCbs={}keys.forEach(key=>{constchCbAtKey=(res,isErr)=>{if(isErr||res===TASK_CANCEL){//其中一個sub-effect發生錯誤時,立刻調用cb來結束all-effectcb.cancel()cb(res,isErr)}else{results[key]=rescompletedCount++if(completedCount===keys.length){cb(results)}}}childCbs[key]=chCbAtKey})keys.forEach(key=>digestEffect(effects[key],childCbs[key]))}
race 和其他一些常見的effect 也是通過相同的方式實現的。 little-saga/commonEffects
提供了7種來自redux-saga的常用類型拓展,包括:all / race / apply / call / cps / getContext / setContext。當我們想要在代碼中使用這些類型時,我們可以使用Env 來加載commonEffects:
import{Env,io}from'little-saga'importcommonEffectsfrom'little-saga/commonEffects'// 调用 use(commonEffects) 之后,就能在代码中使用 commonEffects 提供的拓展类型了
newEnv().use(commonEffects).run(function*rootSaga(){yieldio.race({foo:io.cps(someFunction),foo:io.call(someAPI),})})
3.2 channelEffects 拓展
little-saga/channelEffects
提供了5種和channel相關的類型拓展(take / takeMaybe / put / actionChannel / flush),並從redux-saga拷貝了channel / buffers的代碼。
env.use(channelEffects)
不僅會添加類型拓展,還會在ctx.channel上設置一個默認channel。當使用put/take effect 時,如果沒有指定channel 參數,則默認使用ctx.channel。
使用little-saga 中的channel,可以實現任意兩個Task 之間的通信。不過channel 又是一個很大的話題,本文就不再詳細介紹了。 channel相關源碼的可讀性相當不錯,歡迎直接閱讀源碼。
3.3 compat 拓展
compat 拓展使得little-saga 可以和redux 進行集成,並提供與redux-saga 一致的API。不過little-saga 因為normalizedEffect 的關係,無法和其他redux 中間件(例如redux-thunk)共存,最終是無法完全兼容redux-saga API 的。
little-saga的createSagaMiddleware
也是比較有意思的一個函數,其實現思路如下:首先使用channelEffects添加channel相關拓展;然後用store.dispatch替換掉ctx.channel.put,這樣一來put effect會轉換為對dispatch函數的調用;另一方面, createSagaMiddleware
返回一個redux中間件,該中間件會將所有的action(回想一下,redux中action只能來自於dispatch)put回原來的channel中,這樣所有action又能夠重新被take到了;當然,中間件也使用了getState 來實現select effect。代碼如下:
functioncreateSagaMiddleware(cont){functionmiddleware({dispatch,getState}){letchannelPutconstenv=newEnv(cont).use(commonEffects).use(channelEffects).use(ctx=>{//記錄「真實」的channel.putchannelPut=ctx.channel.put//使用dispatch替換掉channel上的put方法ctx.channel.put=action=>{action[SAGA_ACTION]=truedispatch(action)}//使用def方法來定義select類型的effect-runnerdef(ctx,'select',([_effectType,selector=identity,...args],_ctx,cb)=>cb(selector(getState(),...args)),)})//當middleware函數執行時,說明store正在創建//此時我們給middleware.run設置正確的函數middleware.run=(...args)=>env.run(...args)returnnext=>action=>{constresult=next(action)// hit reducers//下面的if-else主要是為了保證channelPut(action)恰好被包裹在一層asap中// asap的介紹見3.4if(action[SAGA_ACTION]){// SAGA_ACTION字段為true表示該action來自saga//而在saga中,我們在put的時候已經使用了函數asap//所以在這裡就不需要再次調用asap了channelPut(action)}else{//表示該action來自store.dispatch//例如某個React組件的onClick中調用了dispatch方法asap(()=>channelPut(action))}returnresult}}middleware.run=(...args)=>{thrownewError('運行Saga函數之前,必須使用applyMiddleware將Saga中間件加載到Store中')}returnmiddleware}
3.4 scheduler
asap / suspend / flush 是來自於scheduler.js 的方法。 asap 被用在put effect 中,而後面兩個函數被用在fork/spawn effect 中。
scheduler 主要是處理「嵌套put」問題。考慮下面的代碼,rootSaga 會fork genA 和genB,genA 會先put-A 然後take-B,而genB 會先take-A 然後put-B。
function * rootSaga () { yield fork ( genA ) // LINE-1 yield fork ( genB ) // LINE-2 } function * genA () { yield put ({ type : 'A' }) yield take ( 'B' ) } function * genB () { yield take ( 'A' ) yield put ({ type : 'B' }) }
在使用scheduler的情況下,這兩次take都是可以成功的,即genA可以take到B,而genB可以take到A,這也是所我們期望的情況。
假設在不使用scheduler的情況下,put-A喚醒了take-A。因為這裡的put/take 的執行都是同步的,所以take-A 被喚醒之後執行的下一句是genB 中的put-B,而此時genA 還處於執行put-A 的狀態,genA 將丟失B。也就是說在不使用scheduler的情況下,嵌套的put很有可能導致部分action的丟失。
使用函數asap 包裹put 的過程,可以保證「內層的put」延遲到「外層的put 執行結束時」才開始執行,從而避免嵌套put。 asap是as soon as possible的縮寫, asap(fn)
的意思可以理解為「當外層的asap任務都執行完之後,盡可能快地執行fn」。
我們再考慮上面代碼中的LINE-1和LINE-2,在不使用scheduler的情況下,這兩行代碼的前後順序會影響運行結果:因為默認channel用的是multicastChannel,multicastChannel沒有緩存(buffer),所以為了能夠成功take-A,take-A 必須在put-A 之前就開始執行。
使用函數suspend/flush 包裹fork/spawn 的過程,可以保證「fork/spawn 中的同步put」延遲到「fork/spawn 執行結束時」才開始執行。這樣一來,take-A 總是能比put-A 先執行,LINE-1 和LINE-2 的前後順序就不會影響運行結果了。
3.5 其他細節問題
本小節記錄了redux-saga/little-saga 中仍存在的一些細節問題,不過這些問題在平時編程中較為少見,影響也不大。
Task 的「取消」和「完成」是互斥的。 Task 被取消時代碼會直接跳轉進入finally 語句塊,但此時仍有可能發生錯誤,即發生了「執行cancel 邏輯時發生錯誤」的現象。此時Task 的狀態已經為「被取消」,我們不能將task 的狀態修改為「完成(出錯)」。對於這類錯誤,little-saga 只是簡單地使用console.error 進行了打印,並沒有較為優雅的處理方式。所以我們在使用redux-saga/little-saga 寫代碼的時候,盡量避免過於復雜的cancel 邏輯,以防在cancel 邏輯中發生錯誤。
當一個往channel 中put 一個END 的時候,正在take 該channel 的該怎麼辦? redux-saga中的處理比較奇怪,我詢問了一下作者,他表示這是一個用在服務端渲染的hack。而little-saga 中做了簡化,如果take 得到了END,那麼就將END 看作是TASK_CANCEL。
有很多內容本文沒有提到,例如「調用迭代器的throw/return 方法時代碼的執行順序」,「異常的捕獲與處理」等。另外,redux-saga 的源碼中也有多處用TODO 進行了標記,所以還有許多問題等待這去解決。
3.6 總結
fork model 是一個非常優秀的異步邏輯處理模型,在閱讀redux-saga 源碼和測試,進而實現little-saga 的過程中,我也學到了非常多新知識。如果大家有什麼問題或建議的話,歡迎一起探討。