消息队列:RabbitMQ,Kafka
消息队列: RabbitMQ,Kafka
1)什么时候选择
RabbitMQ (AMQP 0-9-1 / 1.0,经典队列,Quorum Queues,Streams)
适用于:RPC/命令、工作流、短期任务、散播/主题路由、灵活确认、优先级管理。
优点:丰富的路由语义(exchanges),"基本。qos' (prefetch), per-message TTL/delay,方便的RPC (reply-to),轻松启动。
缺点:历史记录存储在队列中,横向缩放在队列/缝隙中;在非常大的流中高通量成本。
Apache Kafka(事件日志、聚会、消费者群体)
适用于:事件流、审计、事件来源、ETL/集成 (Connect)、高RPS/MBps,继电/重新处理、流处理(Streams/ksqlDB)。
优点:长期期刊,按批次缩放,稳定倒带,编译密钥。
缺点:"pull+party"模型不适用于小型RPC;仅在党内秩序;计划/兼容性管理是团队的职责。
2)交付语义和不变式
最多:没有撤退;很快,损失风险。
On-Least-once:带有复古;需要消费者的相容性。
Exactly-once:在有限条件下可实现(Kafka TX+偶数放大器+配对sink;RabbitMQ-通过重复数据消除/等效密钥表)。
顺序:RabbitMQ-队列顺序(在撤回/多用户时可能会中断);Kafka是批次的顺序,键指定批次。
域不变量:金钱/资产负债表-通过日志/传奇和偶数命令;不要依赖LWW。
3)集成模式
Outbox/InBox: DB中事件的原子记录→队列发布(outbox)和具有处理逻辑(inbox)的偶数消耗。
DLQ(死信):经过N尝试/错误-在DLQ+中。
Retry/Delay: RabbitMQ — TTL + dead-letter exchange;Kafka是带有背景的复古拓扑。
Request/Reply: RabbitMQ — `reply_to` + `correlation_id`;Kafka很少见,只有香料模式。
补偿:事件传奇;每个操作都是相反的。
4)密钥和拓扑设计
RabbitMQ
Exchanges: `direct`, `topic`, `fanout`, `headers`.
路由键:确定进入队列(和)。优先级-单独的队列。
QoS:"prefetch"(例如50-300)平衡速度/潜伏期。
Quorum Queues:在Raft上复制队列;更换镜面经典。
Streams:用于高通量/反射的离网流(类似于Kafka)。
Kafka
Topic → partitions:计划'#partitions'针对目标推导和并行(向后兼容的放大比缩小更容易)。
Key:单个密钥的所有条目均为同一批次(按密钥顺序保证)。
Replication factor: 3用于生产性主题,'min.insync.replicas=2'+'acks=all',用于可靠性。
Retention:按时间/大小划分;compaction-通过键+tombstones存储最新值以进行删除。
5)Retrai,DLQ,等效性
RabbitMQ
重播:带有backoff的per-message TTL+DLX(死信交换)(例如1m → 5m → 15m)。
等效性:"correlation_id"/"message-id"+已处理消息(TTL)表或确定性命令。
确认: 手动基本。成功交易后的ack;基本。nack(requeue=false)` в DLQ.
Kafka
重播:单独的复古拓扑;在成功的侧面效应之后,消费者将下注。
Exactly-once processing (EOS): Producer `enable.idempotence=true',交易生产者/消费者,消费者上的"read_committed";sink(例如,通过事务Kafka→Kafka或Kafka→DB)-轻轻同步。
Dedup:通过基底侧的键/等效键,或通过紧凑的主题。
6)性能和尺寸
Little定律: "L=λ × W"
对于枪手: 所需的'N ≈ arrival_rate × avg_processing_time ×库存(1。2–1.5)`.
RabbitMQ prefetch:从"prefetch=100"开始,测量p99/时间 "in flight"。
Kafka partitions:根据所需的消费者并行性和透视目标进行计算(例如,SSD/10GbE上的1批稳定为5-20 MB/s)。
7)可观察性和异同
一般性:- Lag/Backlog(消息/字节),消息年龄(p95/p99),error-rate处理,DLQ-rate。
- 时间"publikatsiya→obrabotka"(结束到结束)。
- 依赖图:生产商→经纪人→生产商。
RabbitMQ:
连接,通道,非封存消息,"memory_alarm","disk_free_limit","queue length" p95。
Quorum报告(领导者,Raft log,"quorum not enough"错过)。
Kafka:
Under-replicated partitions, ISR shrink/expand, controller changes.
Producer errors (timeouts, `request latency`), consumer lag per group/partition.
Broker I/O, page cache hit, GC, ZooKeeper/KRaft health.
8)安全性和多重性
Transit TLS加密,身份验证(SASL/PLAIN/SCRAM/OAuth,mTLS)。
授权:vhost/permissions(RabbitMQ),ACL到拓扑/组(Kafka)。
配额:连接、通道、队列/拓扑大小、发布/阅读速度。
隔离星期三(dev/stage/prod)和namespace/vhost。
9)操作和调整
RabbitMQ
通过节点(CPU/IO capacitet)分开exchanges/queues。
大缓冲区的Lazy queues(磁盘消息);避免"热"队列而不会出现障碍。
HA的Quorum Queues;规划Raft日志大小和磁盘。
TTL/length-limit策略,优先排队仅在实际需要(昂贵)的情况下。
bash rabbitmqctl set_policy DLX "^task\." \
'{"dead-letter-exchange":"dlx","message-ttl":60000,"max-length":100000}' --apply-to queues
Kafka
SSD/NVMe,快速网络;OS调音(swappiness低,文件限制)。
`acks=all`, `linger.ms'(战斗),compression。type=zstd'/lz4用于带宽。
消费者参数: 'max。poll.interval.ms`, `max.poll.records`, `fetch.min.bytes`.
Retention and compaction-存储/中继平衡。
可靠发布示例(Java,想法):java props. put("acks","all");
props. put("enable. idempotence", "true");
props. put("max. in. flight. requests. per. connection","1");
props. put("retries","10");
10)集成和生态系统
Kafka Connect(Sinks/Sources),Schema Registry(Avro/JSON/Protobuf)和兼容性("BACKWARD/FORWARD/FULL")。
Kafka Streams/ksqlDB:静态操作、窗口、单元。
RabbitMQ Shovel/Federation:集群/中心之间的转移。
运算符K8s:Strimzi(Kafka),RabbitMQ集群操作员;GitOps宣言。
11)实施清单(0-45天)
0-10天
定义使用情况:命令/task(RabbitMQ),事件/审计(Kafka)。
选择键("routing key"/"partition key"),设置SLO "publikatsiya→obrabotka"。
基本安全策略(TLS,ACL),配额,DLQ/TTL。
11-25天
引入outbox/inbox、Idempothy和dedup。
自定义backoff背面的背面(Rabbit: TTL+DLX;Kafka: retry topics).
Dashbords:lag,age,DLQ-rate,end to end latency;Alertes。
26-45天
调谐带宽: prefetch/acks(兔子);partitions/acks/batch (Kafka).
DR过程(镜像/复制),节点故障测试。
记录事件(模式)合同和兼容性策略。
12)反模式
一个用于所有任务的"通用"工具。
缺少DLQ/TTL:永久中毒(毒药消息)。
无限的"预言"→消费者饥饿,p99增长。
无钥匙的Kafka →缺省订单丢失/热派对。
没有真正需求/纪律的"Exactly-once"是虚假的安全感。
代码中的秘密/登录,没有TLS/ACL。
没有注册表和迁移的消息模式/版本的硬码。
13)成熟度量
Lag/age SLO ≥ 99%的时间执行;DLQ-rate处于控制之下。
相等性覆盖100%的关键途径;已实施outbox/inbox。
Retention/compaction已被记录下来,反射不会破坏消费者。
ISR/URP(Kafka)和Raft/磁盘限制(Rabbit)上的Alerta已配置。
事件合同被验证(Schema Registry),兼容性在CI中进行测试。
常规游戏日:主机/经纪人/AZ故障,恢复检查。
14)Configs示例(摘要)
RabbitMQ:预审和确认(pseudocode):python channel. basic_qos(prefetch_count=200)
for msg in consume("tasks"):
try:
handle(msg)
channel. basic_ack(msg. delivery_tag)
except Transient:
channel. basic_nack(msg. delivery_tag, request = False) # will go to DLQ
Kafka消费者(想法):
java props. put("enable. auto. commit","false");
props. put("isolation. level","read_committed"); // при EOS
//...
poll -> process(idempotent) -> commitSync()
15)结论
RabbitMQ和Kafka解决了不同类别的任务:命令/骨盆和丰富的路由与长期事件日志和可扩展的流媒体。成功在于正确的传递语义,相同性学科,深思熟虑的关键性,撤回/DLQ,可观察性和严格的安全性。围绕队列构建工程实践-outbox/inbox,schema和GitOps策略-您的集成将变得可预测,可扩展和可持续。