消息隊列:RabbitMQ,Kafka
消息隊列: RabbitMQ,Kafka
1)什麼時候選擇
RabbitMQ (AMQP 0-9-1 / 1.0,經典隊列,Quorum Queues,Streams)
適用於:RPC/命令、工作流、短期任務、散播/主題路由、靈活確認、優先級管理。
優點:豐富的路由語義(exchanges),"基本。qos' (prefetch), per-message TTL/delay,方便的RPC (reply-to),輕松啟動。
缺點:歷史記錄存儲在隊列中,橫向縮放在隊列/縫隙中;在非常大的流中高通量成本。
Apache Kafka(事件日誌、聚會、消費者群體)
適用於:事件流、審計、事件來源、ETL/集成 (Connect)、高RPS/MBps,繼電/重新處理、流處理(Streams/ksqlDB)。
優點:長期期刊,按批次縮放,穩定倒帶,編譯密鑰。
缺點:「pull+party」模型不適用於小型RPC;僅在黨內秩序;計劃/兼容性管理是團隊的職責。
2)交付語義和不變式
最多:沒有撤退;很快,損失風險。
On-Least-once:帶有復古;需要消費者的相容性。
Exactly-once:在有限條件下可實現(Kafka TX+偶數放大器+配對sink;RabbitMQ-通過重復數據消除/等效密鑰表)。
順序:RabbitMQ-隊列順序(在撤回/多用戶時可能會中斷);Kafka是批次的順序,鍵指定批次。
域不變量:金錢/資產負債表-通過日誌/傳奇和偶數命令;不要依賴LWW。
3)集成模式
Outbox/InBox: DB中事件的原子記錄→隊列發布(outbox)和具有處理邏輯(inbox)的偶數消耗。
DLQ(死信):經過N嘗試/錯誤-在DLQ+中。
Retry/Delay: RabbitMQ — TTL + dead-letter exchange;Kafka是帶有背景的復古拓撲。
Request/Reply: RabbitMQ — `reply_to` + `correlation_id`;Kafka很少見,只有香料模式。
補償:事件傳奇;每個操作都是相反的。
4)密鑰和拓撲設計
RabbitMQ
Exchanges: `direct`, `topic`, `fanout`, `headers`.
路由鍵:確定進入隊列(和)。優先級-單獨的隊列。
QoS:「prefetch」(例如50-300)平衡速度/潛伏期。
Quorum Queues:在Raft上復制隊列;更換鏡面經典。
Streams:用於高通量/反射的離網流(類似於Kafka)。
Kafka
Topic → partitions:計劃'#partitions'針對目標推導和並行(向後兼容的放大比縮小更容易)。
Key:單個密鑰的所有條目均為同一批次(按密鑰順序保證)。
Replication factor: 3用於生產性主題,'min.insync.replicas=2'+'acks=all',用於可靠性。
Retention:按時間/大小劃分;compaction-通過鍵+tombstones存儲最新值以進行刪除。
5)Retrai,DLQ,等效性
RabbitMQ
重播:帶有backoff的per-message TTL+DLX(死信交換)(例如1m → 5m → 15m)。
等效性:「correlation_id」/「message-id」+已處理消息(TTL)表或確定性命令。
確認: 手動基本。成功交易後的ack;基本。nack(requeue=false)` в DLQ.
Kafka
重播:單獨的復古拓撲;在成功的側面效應之後,消費者將下註。
Exactly-once processing (EOS): Producer `enable.idempotence=true',交易生產者/消費者,消費者上的「read_committed」;sink(例如,通過事務Kafka→Kafka或Kafka→DB)-輕輕同步。
Dedup:通過基底側的鍵/等效鍵,或通過緊湊的主題。
6)性能和尺寸
Little定律: 「L=λ × W」
對於槍手: 所需的'N ≈ arrival_rate × avg_processing_time ×庫存(1。2–1.5)`.
RabbitMQ prefetch:從「prefetch=100」開始,測量p99/時間 「in flight」。
Kafka partitions:根據所需的消費者並行性和透視目標進行計算(例如,SSD/10GbE上的1批穩定為5-20 MB/s)。
7)可觀察性和異同
一般性:- Lag/Backlog(消息/字節),消息年齡(p95/p99),error-rate處理,DLQ-rate。
- 時間「publikatsiya→obrabotka」(結束到結束)。
- 依賴圖:生產商→經紀人→生產商。
RabbitMQ:
連接,通道,非封存消息,「memory_alarm」,「disk_free_limit」,「queue length」 p95。
Quorum報告(領導者,Raft log,「quorum not enough」錯過)。
Kafka:
Under-replicated partitions, ISR shrink/expand, controller changes.
Producer errors (timeouts, `request latency`), consumer lag per group/partition.
Broker I/O, page cache hit, GC, ZooKeeper/KRaft health.
8)安全性和多重性
Transit TLS加密,身份驗證(SASL/PLAIN/SCRAM/OAuth,mTLS)。
授權:vhost/permissions(RabbitMQ),ACL到拓撲/組(Kafka)。
配額:連接、通道、隊列/拓撲大小、發布/閱讀速度。
隔離星期三(dev/stage/prod)和namespace/vhost。
9)操作和調整
RabbitMQ
通過節點(CPU/IO capacitet)分開exchanges/queues。
大緩沖區的Lazy queues(磁盤消息);避免「熱」隊列而不會出現障礙。
HA的Quorum Queues;規劃Raft日誌大小和磁盤。
TTL/length-limit策略,優先排隊僅在實際需要(昂貴)的情況下。
bash rabbitmqctl set_policy DLX "^task\." \
'{"dead-letter-exchange":"dlx","message-ttl":60000,"max-length":100000}' --apply-to queues
Kafka
SSD/NVMe,快速網絡;OS調音(swappiness低,文件限制)。
`acks=all`, `linger.ms'(戰鬥),compression。type=zstd'/lz4用於帶寬。
消費者參數: 'max。poll.interval.ms`, `max.poll.records`, `fetch.min.bytes`.
Retention and compaction-存儲/中繼平衡。
可靠發布示例(Java,想法):java props. put("acks","all");
props. put("enable. idempotence", "true");
props. put("max. in. flight. requests. per. connection","1");
props. put("retries","10");
10)集成和生態系統
Kafka Connect(Sinks/Sources),Schema Registry(Avro/JSON/Protobuf)和兼容性(「BACKWARD/FORWARD/FULL」)。
Kafka Streams/ksqlDB:靜態操作、窗口、單元。
RabbitMQ Shovel/Federation:集群/中心之間的轉移。
運算符K8s:Strimzi(Kafka),RabbitMQ集群操作員;GitOps宣言。
11)實施清單(0-45天)
0-10天
定義使用情況:命令/task(RabbitMQ),事件/審計(Kafka)。
選擇鍵(「routing key」/「partition key」),設置SLO 「publikatsiya→obrabotka」。
基本安全策略(TLS,ACL),配額,DLQ/TTL。
11-25天
引入outbox/inbox、Idempothy和dedup。
自定義backoff背面的背面(Rabbit: TTL+DLX;Kafka: retry topics).
Dashbords:lag,age,DLQ-rate,end to end latency;Alertes。
26-45天
調諧帶寬: prefetch/acks(兔子);partitions/acks/batch (Kafka).
DR過程(鏡像/復制),節點故障測試。
記錄事件(模式)合同和兼容性策略。
12)反模式
一個用於所有任務的「通用」工具。
缺少DLQ/TTL:永久中毒(毒藥消息)。
無限的「預言」→消費者饑餓,p99增長。
無鑰匙的Kafka →缺省訂單丟失/熱派對。
沒有真正需求/紀律的「Exactly-once」是虛假的安全感。
代碼中的秘密/登錄,沒有TLS/ACL。
沒有註冊表和遷移的消息模式/版本的硬碼。
13)成熟度量
Lag/age SLO ≥ 99%的時間執行;DLQ-rate處於控制之下。
相等性覆蓋100%的關鍵途徑;已實施outbox/inbox。
Retention/compaction已被記錄下來,反射不會破壞消費者。
ISR/URP(Kafka)和Raft/磁盤限制(Rabbit)上的Alerta已配置。
事件合同被驗證(Schema Registry),兼容性在CI中進行測試。
常規遊戲日:主機/經紀人/AZ故障,恢復檢查。
14)Configs示例(摘要)
RabbitMQ:預審和確認(pseudocode):python channel. basic_qos(prefetch_count=200)
for msg in consume("tasks"):
try:
handle(msg)
channel. basic_ack(msg. delivery_tag)
except Transient:
channel. basic_nack(msg. delivery_tag, request = False) # will go to DLQ
Kafka消費者(想法):
java props. put("enable. auto. commit","false");
props. put("isolation. level","read_committed"); // при EOS
//...
poll -> process(idempotent) -> commitSync()
15)結論
RabbitMQ和Kafka解決了不同類別的任務:命令/骨盆和豐富的路由與長期事件日誌和可擴展的流媒體。成功在於正確的傳遞語義,相同性學科,深思熟慮的關鍵性,撤回/DLQ,可觀察性和嚴格的安全性。圍繞隊列構建工程實踐-outbox/inbox,schema和GitOps策略-您的集成將變得可預測,可擴展和可持續。