- 消息重復(fù)概率比較高時(shí),需要對(duì)重復(fù)消息進(jìn)行合并處理避免浪費(fèi)有限的資源,減少消費(fèi)延遲;
- 需要根據(jù)業(yè)務(wù)自定義優(yōu)先級(jí)進(jìn)行消息處理,高優(yōu)先級(jí)的消息比低優(yōu)先級(jí)的消息先處理;
- 消息需要定時(shí)消費(fèi)的場景,消息只有在設(shè)定的消費(fèi)時(shí)間到了之后立馬被消費(fèi)。
本文將介紹一種基于Redis實(shí)現(xiàn)的消息隊(duì)列(Redis message queue, RMQ),RMQ可以作為傳統(tǒng)消息隊(duì)列的互補(bǔ)選擇,在傳統(tǒng)消息隊(duì)列沒有涉及的場景中使用RMQ。
功能介紹
RMQ設(shè)計(jì)為一個(gè)二方庫,可以幫助用戶基于Redis快速實(shí)現(xiàn)消息隊(duì)列的功能,RMQ消息隊(duì)列具有消息合并、區(qū)分優(yōu)先級(jí)、支持定時(shí)消息等特性。RMQ消息隊(duì)列可以用于異步解耦、削峰填谷,支持億級(jí)數(shù)據(jù)堆積。RMQ消息隊(duì)列目前支持三種類型的消息,分別是RangeMergeMessage(區(qū)間重復(fù)合并消息)、PriorityMessage(優(yōu)先級(jí)消息)、FixedTimeMessage(任意定時(shí)消息)。
區(qū)間重復(fù)合并消息
RangeMergeMessage支持區(qū)間重復(fù)消息合并,發(fā)送消息時(shí)需要設(shè)置時(shí)間區(qū)間,消息延遲該時(shí)間區(qū)間長度后被消費(fèi),在該時(shí)間區(qū)間內(nèi)如果發(fā)送重復(fù)的消息,重復(fù)消息將會(huì)被合并。如果消息在Redis服務(wù)端發(fā)生堆積,重復(fù)到來的消息依然會(huì)被合并處理。該類型消息適用于消息重復(fù)率較高且希望重復(fù)消息合并處理的場景,對(duì)重復(fù)消息進(jìn)行合并可以減少下游消費(fèi)系統(tǒng)的壓力,減少不必要的資源消耗,將有限的資源最大化的利用,提升消費(fèi)效率。
優(yōu)先級(jí)消息
PriorityMessage支持給消息設(shè)置任意等級(jí)的優(yōu)先級(jí),優(yōu)先級(jí)高的消息會(huì)被優(yōu)先消費(fèi),相同優(yōu)先級(jí)的消息被隨機(jī)消費(fèi)。如果消息在Redis服務(wù)端發(fā)生堆積,重復(fù)的消息將被合并處理,合并后消息的優(yōu)先級(jí)等于最后存儲(chǔ)的消息的優(yōu)先級(jí)。該類型消息適用于希望重復(fù)消息合并處理且需要設(shè)置優(yōu)先級(jí)的場景,下游消費(fèi)者資源有限時(shí),合并重復(fù)消息且優(yōu)先處理優(yōu)先級(jí)高的消息將可以合理利用有限的資源。
任意定時(shí)消息
FixedTimeMessage支持給消息設(shè)置任意消費(fèi)時(shí)間,只有消費(fèi)時(shí)間到了之后消息才被消費(fèi),消費(fèi)時(shí)間可精確到秒。消息到期后沒有及時(shí)被消費(fèi)時(shí),消費(fèi)者將按照時(shí)間由遠(yuǎn)及近進(jìn)行消費(fèi)。如果消息在Redis服務(wù)端發(fā)生堆積,重復(fù)的消息將被合并處理,合并后消息的消費(fèi)時(shí)間等于最后存儲(chǔ)的消息的消費(fèi)時(shí)間。該類型消息適用于希望重復(fù)消息合并處理且需要定時(shí)消費(fèi)的場景,定時(shí)消息應(yīng)用場景非常豐富,比如定時(shí)打標(biāo)去標(biāo)、活動(dòng)結(jié)束后清理動(dòng)作、訂單超時(shí)關(guān)閉等。
并發(fā)消費(fèi)控制
使用傳統(tǒng)消息中間件進(jìn)行集群消費(fèi)的時(shí)候,為了避免并發(fā)處理同一元數(shù)據(jù)導(dǎo)致不一致問題,通常需要對(duì)元數(shù)據(jù)加分布式鎖,頻繁的鎖沖突會(huì)導(dǎo)致消費(fèi)效率低下。加分布式鎖的最終目的其實(shí)就是保障屬于同一元數(shù)據(jù)的消息被串行消費(fèi)。加分布式鎖并不是最好的方案,最好的方案應(yīng)該是從根上解決并發(fā)問題,讓屬于同一元數(shù)據(jù)的消息串行消費(fèi)。RMQ消息隊(duì)列具有并發(fā)消費(fèi)控制能力,屬于同一元數(shù)據(jù)的消息只會(huì)被分配給全局唯一一個(gè)線程進(jìn)行消費(fèi),因此屬于同一元數(shù)據(jù)的消息將被串行消費(fèi)。使用方如果需要該能力,除了需要提供Redis,還需要提供ZooKeeper。
重試次數(shù)控制
RMQ消息隊(duì)列支持失敗重試消費(fèi)16次,業(yè)務(wù)返回消費(fèi)失敗后,消息會(huì)被回滾并等待重試消費(fèi),重試16次后消息進(jìn)入死信隊(duì)列,消息不再被消費(fèi),除非人工干預(yù)。
技術(shù)原理
總體框架
RMQ消息隊(duì)列由三部分組成,分別為ZooKeeper、RMQ二方庫、Redis。ZooKeeper負(fù)責(zé)維護(hù)集群worker的信息,將topic的所有slot分配給全局的woker。Redis負(fù)責(zé)存儲(chǔ)消息,采用Sorted Set結(jié)構(gòu)存儲(chǔ),Store Queue是消息存放的隊(duì)列,Prepare Queue是采用二階段消費(fèi)方式正在消費(fèi)的消息存放隊(duì)列,Dead Queue是死信隊(duì)列。RMQ二方庫由RmqClient、Consumer、Producer三部分組成。RmqClient負(fù)責(zé)RMQ的啟動(dòng)工作,包括上報(bào)TopicDef、Worker給ZooKeeper,分配Slot給Worker,掃描業(yè)務(wù)定義的MessageListener Bean。Producer負(fù)責(zé)根據(jù)不用消息類型將消息按照指定的方式存儲(chǔ)到Redis。Consumer負(fù)責(zé)根據(jù)不用消息類型按照指定方式從Redis彈出消息并調(diào)用業(yè)務(wù)的MessageListener。
消息存儲(chǔ)
Topic的設(shè)計(jì)
Topic的定義有三部分組成,topic表示主題名稱,slotAmount表示消息存儲(chǔ)劃分的槽數(shù)量,topicType表示消息的類型。主題名稱是一個(gè)Topic的唯一標(biāo)示,相同主題名稱Topic的slotAmount和topicType一定是一樣的。消息存儲(chǔ)采用Redis的Sorted Set結(jié)構(gòu),為了支持大量消息的堆積,需要把消息分散存儲(chǔ)到很多個(gè)槽中,slotAmount表示該Topic消息存儲(chǔ)共使用的槽數(shù)量,槽數(shù)量一定需要是2的n次冪。在消息存儲(chǔ)的時(shí)候,采用對(duì)指定數(shù)據(jù)或者消息體哈希求余得到槽位置。
StoreQueue 的設(shè)計(jì)
上圖中topic劃分了8個(gè)槽位,編號(hào)0-7。如果發(fā)送方指定了消息的slotBasis,則計(jì)算slotBasis的CRC32值,CRC32值對(duì)槽數(shù)量進(jìn)行取模得到槽序號(hào),SlotKey設(shè)計(jì)為#{topic}_#{index}(也即Redis的鍵),其中#{}表示占位符。發(fā)送方需要保證相同內(nèi)容的消息的slotBasis相同,如果沒有指定slotBasis則采用消息內(nèi)容計(jì)算SlotKey,這樣內(nèi)容相同的消息體就會(huì)落在同一個(gè)Sorted Set里面,所以內(nèi)容相同的消息會(huì)進(jìn)行合并。Redis的Sorted Set中的數(shù)據(jù)按照分?jǐn)?shù)排序,實(shí)現(xiàn)不同類型的消息的關(guān)鍵就在于如何利用分?jǐn)?shù)、如何添加消息到Sorted Set、如何從Sorted Set中彈出消息。優(yōu)先級(jí)消息將優(yōu)先級(jí)作為分?jǐn)?shù),消費(fèi)時(shí)每次彈出分?jǐn)?shù)最大的消息。任意定時(shí)消息將時(shí)間戳作為分?jǐn)?shù),消費(fèi)時(shí)每次彈出分?jǐn)?shù)大于當(dāng)前時(shí)間戳的一個(gè)消息。區(qū)間重復(fù)合并消息將時(shí)間戳作為分?jǐn)?shù),添加消息時(shí)將(當(dāng)前時(shí)間戳+時(shí)間區(qū)間)作為分?jǐn)?shù),消費(fèi)時(shí)每次彈出分?jǐn)?shù)大于當(dāng)前時(shí)間戳的一個(gè)消息。
PrepareQueue 的設(shè)計(jì)
為了保障RMQ消息隊(duì)列的可用性,做到每條消息至少消費(fèi)一次,消費(fèi)者不是直接pop有序集合中的元素,而是將元素從StoreQueue移動(dòng)到PrepareQueue并返回消息給消費(fèi)者,等消費(fèi)成功后再從PrepareQueue從刪除,或者消費(fèi)失敗后從PreapreQueue重新移動(dòng)到StoreQueue,這便是根據(jù)二階段提交的思想實(shí)現(xiàn)的二階段消費(fèi)。在后面將會(huì)詳細(xì)介紹二階段消費(fèi)的實(shí)現(xiàn)思路,這里重點(diǎn)介紹下PrepareQueue的存儲(chǔ)設(shè)計(jì)。StoreQueue中每一個(gè)Slot對(duì)應(yīng)PrepareQueue中的Slot,PrepareQueue的SlotKey設(shè)計(jì)為prepare{#{topic}#{index}}。PrepareQueue采用Sorted Set作為存儲(chǔ),消息移動(dòng)到PrepareQueue時(shí)刻對(duì)應(yīng)的(秒級(jí)時(shí)間戳*1000+重試次數(shù))作為分?jǐn)?shù),字符串存儲(chǔ)的是消息體內(nèi)容。這里分?jǐn)?shù)的設(shè)計(jì)與重試次數(shù)的設(shè)計(jì)密切相關(guān),所以在重試次數(shù)設(shè)計(jì)章節(jié)詳細(xì)介紹。PrepareQueue的SlotKey設(shè)計(jì)中需要注意的一點(diǎn),由于消息從StoreQueue移動(dòng)到PrepareQueue是通過Lua腳本操作的,因此需要保證Lua腳本操作的Slot在同一個(gè)Redis節(jié)點(diǎn)上,如何保證PrepareQueue的SlotKey和對(duì)應(yīng)的StoreQueue的SlotKey被hash到同一個(gè)Redis槽中呢。Redis的hash tag功能可以指定SlotKey中只有某一部分參與計(jì)算hash,這一部分采用{}包括,因此PrepareQueue的SlotKey中采用{}包括了StoreQueue的SlotKey。
DeadQueue 的設(shè)計(jì)
消息重試消費(fèi)16次后,消息將進(jìn)入DeadQueue。DeadQueue的SlotKey設(shè)計(jì)為prepare{#{topic}#{index}},這里同樣采用hash tag功能保證DeadQueue的SlotKey與對(duì)應(yīng)StoreQueue的SlotKey存儲(chǔ)在同一Redis節(jié)點(diǎn)。
生產(chǎn)者
生產(chǎn)者的任務(wù)就是將消息添加到Redis的Sorted Set中。首先,需要計(jì)算出消息添加到Redis的SlotKey,如果發(fā)送方指定了消息的slotBasis(否則采用content代替),則計(jì)算slotBasis的CRC32值,CRC32值對(duì)槽數(shù)量進(jìn)行取模得到槽序號(hào),SlotKey設(shè)計(jì)為#{topic}_#{index},其中#{}表示占位符。然后,不同類型的消息有不同的添加方式,因此分布講述三種類型消息的添加過程。
區(qū)間重復(fù)合并消息
發(fā)送該消息時(shí)需要設(shè)置timeRange,timeRange必須大于0,單位為毫秒,表示消息將延遲timeRange毫秒后被消費(fèi),期間到來的重復(fù)消息將被合并,合并后的消息依然維持原來的消費(fèi)時(shí)間。因此在存儲(chǔ)該類型消息的時(shí)候,采用(當(dāng)前時(shí)間戳+timeRange)作為分?jǐn)?shù),添加消息采用Lua腳本執(zhí)行,保證操作的原子性,Lua腳本首先采用zscore命令檢查消息是否已經(jīng)存在,如果已經(jīng)存在則直接返回,如果不存在則執(zhí)行zadd命令添加。
優(yōu)先級(jí)消息
發(fā)送該消息時(shí)需要設(shè)置priority,priority必須大于16,表示消息的優(yōu)先級(jí),數(shù)值越大表示優(yōu)先級(jí)越高。因此在存儲(chǔ)該類型消息的時(shí)候,采用priority作為分?jǐn)?shù),采用zadd命令直接添加。
任意定時(shí)消息
發(fā)送該類型消息時(shí)需要設(shè)置fixedTime,fixedTime必須大于當(dāng)前時(shí)間,表示消費(fèi)時(shí)間戳,當(dāng)前時(shí)間大于該消費(fèi)時(shí)間戳的時(shí)候,消息才會(huì)被消費(fèi)。因此在存儲(chǔ)該類型消息的時(shí)候,采用fixedTime作為分?jǐn)?shù),采用命令zadd直接添加。
消費(fèi)者
二階段消費(fèi)方式
三種消費(fèi)模式
一般消息隊(duì)列存在三種消費(fèi)模式,分別是:最多消費(fèi)一次、至少消費(fèi)一次、只消費(fèi)一次。最多消費(fèi)一次模式消息可能丟失,一般不怎么使用。至少消費(fèi)一次模式消息不會(huì)丟失,但是可能存在重復(fù)消費(fèi),比較常用。只消費(fèi)一次模式消息被精確只消費(fèi)一次,實(shí)現(xiàn)較困難,一般需要業(yè)務(wù)記錄冪等ID來實(shí)現(xiàn)。RMQ實(shí)現(xiàn)了至少消費(fèi)一次的模式,那么如何保證消息至少被消費(fèi)一次呢?
至少消費(fèi)一次模式實(shí)現(xiàn)的難點(diǎn)
從最簡單的消費(fèi)模式——最多消費(fèi)一次說起,消費(fèi)者端只需要從消息隊(duì)列服務(wù)中取出消息就行,即執(zhí)行Redis的zpopmax命令,不倫消費(fèi)者是否接收到該消息并成功消費(fèi),消息隊(duì)列服務(wù)都認(rèn)為消息消費(fèi)成功。最多一次消費(fèi)模式導(dǎo)致消息丟失的因素可能有:網(wǎng)絡(luò)丟包導(dǎo)致消費(fèi)者沒有接收到消息,消費(fèi)者接收到消息但在消費(fèi)的時(shí)候宕機(jī)了,消費(fèi)者接收到消息但消費(fèi)失敗。針對(duì)消費(fèi)失敗導(dǎo)致消息丟失的情況比較好解決,只需要把消費(fèi)失敗的消息重新放入消息隊(duì)列服務(wù)就行,但是網(wǎng)絡(luò)丟包和消費(fèi)系統(tǒng)異常導(dǎo)致的消息丟失問題不好解決?赡苡腥藭(huì)想到,我們不把元素從有序集合中pop出來,我們先查詢優(yōu)先級(jí)最高的元素,然后消費(fèi),再刪除消費(fèi)成功的元素,但是這樣消息服務(wù)隊(duì)列就變成了同步阻塞隊(duì)列,性能會(huì)很差。
至少消費(fèi)一次模式的實(shí)現(xiàn)
至少消費(fèi)一次的問題比較類似銀行轉(zhuǎn)賬問題,A向B賬戶轉(zhuǎn)賬100元,如何保障A賬戶扣減100同時(shí)B賬戶增加100,因此我們可以想到二階段提交的思想。第一個(gè)準(zhǔn)備階段,A、B分別進(jìn)行資源凍結(jié)并持久化undo和redo日志,A、B分別告訴協(xié)調(diào)者已經(jīng)準(zhǔn)備好;第二個(gè)提交階段,協(xié)調(diào)者告訴A、B進(jìn)行提交,A、B分別提交事務(wù)。RMQ基于二階段提交的思想來實(shí)現(xiàn)至少消費(fèi)一次的模式。RMQ存儲(chǔ)設(shè)計(jì)中PrepareQueue的作用就是用來凍結(jié)資源并記錄事務(wù)日志,消費(fèi)者端即是參與者也是協(xié)調(diào)者。第一個(gè)準(zhǔn)備階段,消費(fèi)者端通過執(zhí)行Lua腳本從StoreQueue中Pop消息并存儲(chǔ)到PrepareQueue,同時(shí)消息傳輸?shù)较M(fèi)者端,消費(fèi)者端消費(fèi)該消息;第二個(gè)提交階段,消費(fèi)者端根據(jù)消費(fèi)結(jié)果是否成功協(xié)調(diào)消息隊(duì)列服務(wù)是提交還是回滾,如果消費(fèi)成功則提交事務(wù),該消息從PrepareQueue中刪除,如果消費(fèi)失敗則回滾事務(wù),消費(fèi)者端將該消息從PrepareQueue移動(dòng)到StoreQueue,如果因?yàn)楦鞣N異常導(dǎo)致PrepareQueue中消息滯留超時(shí),超時(shí)后將自動(dòng)執(zhí)行回滾操作。二階段消費(fèi)的流程圖如下所示。
實(shí)現(xiàn)方案的異常情況分析
我們來分析下采用二階段消費(fèi)方案可能存在的異常情況,從以下分析來看二階段消費(fèi)方案可以保障消息至少被消費(fèi)一次。
- 網(wǎng)絡(luò)丟包導(dǎo)致消費(fèi)者沒有接收到消息,這時(shí)消息已經(jīng)記錄到PrepareQueue,如果到了超時(shí)時(shí)間,消息被回滾放回StoreQueue,等待下次被消費(fèi),消息不丟失。
- 消費(fèi)者接收到了消息,但是消費(fèi)者還沒來得及消費(fèi)完成系統(tǒng)就宕機(jī)了,消息消費(fèi)超時(shí)到了后,消息會(huì)被重新放入StoreQueue,等待下次被消費(fèi),消息不丟失。
- 消費(fèi)者接收到了消息并消費(fèi)成功,消費(fèi)者端在協(xié)調(diào)事務(wù)提交的時(shí)候宕機(jī)了,消息消費(fèi)超時(shí)到了后,消息會(huì)被重新放入StoreQueue,等待下次被消費(fèi),消息被重復(fù)消費(fèi)。
- 消費(fèi)者接收到了消息但消費(fèi)失敗,消費(fèi)者端在協(xié)調(diào)事務(wù)提交的時(shí)候宕機(jī)了,消息消費(fèi)超時(shí)到了后,消息會(huì)被重新放入StoreQueue,等待下次被消費(fèi),消息不丟失。
- 消費(fèi)者接收到了消息并消費(fèi)成功,但是由于fullgc等原因使消費(fèi)時(shí)間太長,PrepareQueue中的消息由于超時(shí)已經(jīng)回滾到StoreQueue,等待下次被消費(fèi),消息被重復(fù)消費(fèi)。
重試次數(shù)控制的實(shí)現(xiàn)
采用二階段消費(fèi)方式,需要將消息在StoreQueue和PrepareQueue之間移動(dòng),如何實(shí)現(xiàn)重試次數(shù)控制呢,其關(guān)鍵在StoreQueue和PrepareQueue的分?jǐn)?shù)設(shè)計(jì)。PrepareQueue的分?jǐn)?shù)需要與時(shí)間相關(guān),正常情況下,消費(fèi)者不管消費(fèi)失敗還是消費(fèi)成功,都會(huì)從PrepareQueue刪除消息,當(dāng)消費(fèi)者系統(tǒng)發(fā)生異;蛘咤礄C(jī)的時(shí)候,消息就無法從PrepareQueue中刪除,我們也不知道消費(fèi)者是否消費(fèi)成功,為保障消息至少被消費(fèi)一次,我們需要做到超時(shí)回滾,因此分?jǐn)?shù)需要與消費(fèi)時(shí)間相關(guān)。當(dāng)PrepareQueue中的消息發(fā)生超時(shí)的時(shí)候,將消息從PrepareQueue移動(dòng)到StoreQueue。因此PrepareQueue的分?jǐn)?shù)設(shè)計(jì)為:秒級(jí)時(shí)間戳*1000+重試次數(shù)。不同類型的消息首次存儲(chǔ)到StoreQueue中的分?jǐn)?shù)表示的含義不盡相同,區(qū)間重復(fù)合并消息和任意定時(shí)消息存儲(chǔ)時(shí)的分?jǐn)?shù)表示消費(fèi)時(shí)間戳,優(yōu)先級(jí)消息存儲(chǔ)時(shí)的分?jǐn)?shù)表示優(yōu)先級(jí)。如果消息消費(fèi)失敗,消息從PrepareQueue回滾到StoreQueue,所有類型的消息存儲(chǔ)時(shí)的分?jǐn)?shù)都表示剩余重試次數(shù),剩余重試次數(shù)從16次不斷降低最后為0,消息進(jìn)入死信隊(duì)列。消息在StoreQueue和PrepareQueue之間移動(dòng)流程如下:
Pop 消息
不同類型的消息在消費(fèi)的時(shí)候Pop消息的方式不一樣,因此接下來分別講述三種類型消息的Pop方式。
區(qū)間重復(fù)合并消息
該消息存儲(chǔ)的分?jǐn)?shù)設(shè)計(jì)為消費(fèi)時(shí)間戳,當(dāng)前時(shí)間大于消息的消費(fèi)時(shí)間戳?xí)r,該消息應(yīng)該被消費(fèi)。因此采用Redis命令ZRANGEBYSCORE彈出分?jǐn)?shù)小于當(dāng)前時(shí)間戳的一條消息。
優(yōu)先級(jí)消息
該消息存儲(chǔ)的分?jǐn)?shù)設(shè)計(jì)為優(yōu)先級(jí),優(yōu)先級(jí)越高分?jǐn)?shù)越大,因此采用Redis命令ZPOPMAX彈出分?jǐn)?shù)最大的一條消息。
任意定時(shí)消息該消息存儲(chǔ)的分?jǐn)?shù)設(shè)計(jì)為消費(fèi)時(shí)間戳,當(dāng)前時(shí)間大于消息的消費(fèi)時(shí)間戳?xí)r,該消息應(yīng)該被消費(fèi)。因此采用Redis命令ZRANGEBYSCORE彈出分?jǐn)?shù)小于當(dāng)前時(shí)間戳的一條消息。
相關(guān)應(yīng)用
主圖價(jià)格表達(dá)項(xiàng)目
在主圖價(jià)格表達(dá)中需要實(shí)現(xiàn)一個(gè)功能,商品價(jià)格發(fā)生變化時(shí)將商品價(jià)格打印在商品主圖上面,那么需要在價(jià)格發(fā)生變動(dòng)的時(shí)候觸發(fā)合成一張帶價(jià)格的圖片,每一次觸發(fā)合圖時(shí)計(jì)算價(jià)格都是獲取當(dāng)前最新的價(jià)格。上游價(jià)格變化的因素很多,變化很頻繁,下游合圖消耗GPU資源較大,處理容量較低。因此需要盡可能合并觸發(fā)合圖消息,減輕下游處理壓力,于是使用了RMQ作為消息隊(duì)列來進(jìn)行削峰填谷、消息合并。不僅如此,還可以根據(jù)商家等級(jí)劃分觸發(fā)合圖消息的等級(jí),使KA商家能夠優(yōu)先得到處理,縮短價(jià)格變化的延遲。
在線上實(shí)際環(huán)境中,集群共130臺(tái)機(jī)器,RMQ消息隊(duì)列的發(fā)送消息能力和消費(fèi)消息能力均可以達(dá)到5w tps,而且這并不是峰值,理論上可以達(dá)到10w tps。
在線數(shù)據(jù)圈選引擎
在線數(shù)據(jù)圈選引擎需要處理各種來源的大量動(dòng)態(tài)數(shù)據(jù),需要將一段時(shí)間區(qū)間內(nèi)的消息合并處理,減少處理壓力,并且在對(duì)同一元數(shù)據(jù)進(jìn)行并發(fā)處理需要加分布式鎖,鎖沖突導(dǎo)致消費(fèi)效率下降。RMQ的區(qū)間重復(fù)合并消息和并發(fā)消費(fèi)控制能力可以幫助解決這些問題。目前,在線數(shù)據(jù)圈選引擎已經(jīng)采用了RMQ消息隊(duì)列作為核心組件,RMQ消息隊(duì)列發(fā)揮了很大的作用。
總結(jié)
本文提出了一種可實(shí)現(xiàn)的基于Redis的消息隊(duì)列,充分利用Sorted Set結(jié)構(gòu)設(shè)計(jì)了消息合并、優(yōu)先級(jí)、定時(shí)等特性,與傳統(tǒng)消息隊(duì)列形成互補(bǔ),彌補(bǔ)傳統(tǒng)消息隊(duì)列這方面特性的缺失。為了實(shí)現(xiàn)高可用,本文在二階段提交的思想上進(jìn)行改進(jìn)設(shè)計(jì)了二階段消費(fèi)方式,保障消息至少被消費(fèi)一次。未來將基于Redis的特性打造更多獨(dú)特的功能,與傳統(tǒng)消息中間件形成互補(bǔ)。在消費(fèi)控制方面會(huì)增加流量自動(dòng)調(diào)控能力,根據(jù)消息類型調(diào)控消費(fèi)速度,減少因?yàn)槟撤N類型消息消費(fèi)瓶頸導(dǎo)致整體消費(fèi)性能下降。