Giter Site home page Giter Site logo

articles's People

Contributors

yeeyao avatar

Watchers

 avatar  avatar

articles's Issues

Go atomic 的实现

原子操作

overview

  • Go 语言标准中,sync/atomic 包将底层硬件提供的原子操作封装成 Go 函数,但这些操作只支持几种基本数据类型,因此为了扩大原子操作的适用范围,
    Go 在 1.4 版本向 sync/atomic 包中添加一个新的类型 Value,该类型的值相当于一个容器,可以被原子地存储(store)和加载(load)任意类型的值

起源

  • golang-dev 14 年讨论,有用户报告 encoding/gob 包在多核机器(80-core) 上的性能问题,认为 encoding/gob 之所以不能完全利用多核特性是因为
    它使用大量的互斥锁(mutex)。将这些互斥锁换成用 atomic.LoadPointer/StorePointer 来做并发控制,性能将提升 20 倍

  • 针对这个问题,讨论中提议在已有的 atomic 包基础上封装出一个 atomic.Value 类型,用户可以不依赖 unsafe.Pointer 情况下使用
    atomic 提供的原子操作。上述场景中,为何 atomic 会比 mutex 性能好很多,作者总结是
    mutexes do no scale, atomic loads do.

  • Mutex 由操作系统实现(OS 线程同步API),而 atomic 包中的原子操作则由底层硬件直接提供支持出处?。在 CPU 实现的指令集里,
    有一些指令被封装进 atomic 包,这些指令在执行过程中不允许中断(interrupt)。因此原子操作可以在 lock-free 情况下保证并发安全,
    同时性能可以做到随 CPU 个数增多而线性扩展。原子操作是相比其他同步技术更加基础的操作,原子操作是无锁的,通常直接通过 CPU 指令实现。

  • mutexes 需要 setup 以及 teardown,因此慢点,atomic operations 使用 atomic CPU 指令因此更快

原子性

  • 一个或者多个操作在 CPU 执行过程中不被中断的特性。这些操作对外表现成一个不可分割的整体,要么都执行,要么都不执行,
    外界不会看到它们只执行到一半的状态,现实世界中,CPU 不可能不中断执行一系列操作,但如果我们在执行多个操作时,能让它们的中间状态对外不可见,
    则我们可以宣称它们拥有了不可分割的原子性。

  • 这里的例子,在 Go(甚至大部分语言)中,一条普通赋值语句其实不是一个原子操作。比如 32 bit 机器上写 int64 类型变量就会有中间状态,
    因为它会拆分为两次写操作(MOV),写低 32 和 高 32 位。?如果一个线程刚写完低 32 位,还没来得及写高 32 位时,另一个线程读取了这个变量,
    则得到一个没有意义的中间值,导致程序出 bug。这是基础类型例子,如果我们对一个结构体赋值,则出现并发问题概率更高,可能写线程刚写完一小半字段,
    读线程就来读取这个变量,则只能读取到修改的一小部分字段,破坏变量完整性,同时读取的值也是错误的。

  • 面对这种多线程下变量读写问题,atomic.Value 登场了。它使得我们可以不依赖于不保证兼容性的 unsafe.Pointer 类型
    同时又能将任意数据类型的读写操作封装成原子性操作

用法

atomic.Value 类型对外暴露两个方法

  • v.Store(c) 写操作,将原始变量 c 存放到一个 atomic.Value 类型的 v 里;

  • c = v.Load() 读操作,从线程安全的 v 中读取上一步存放的内容

  • 只需要将需要并发保护的变量读取和赋值操作用 Load() 和 Store() 替代就好了

内部实现

数据结构

  • atomic.Value 被设计用来存储任意类型的数据,所以其内部字段是一个 interface{} 类型。文件还定义一个 ifaceWords 类型,
    将 interface{} 类型分解
type Value struct {
  v interface{}
}
type ifaceWords struct {
  typ  unsafe.Pointer
  data unsafe.Pointer
}

写入操作

  • 出于安全考虑,Go 不支持直接操作内存,它的标准库又提供一种不安全(不保证向后兼容性)的指针类型 unsafe.Pointer 让程序灵活操作内存。该类型可以
    绕过 Go 语言类型系统的检查,与任意指针类型相互转换。即如果两种类型具有相同的内存结构,我们可以用 unsafe.Pointer 作为桥梁, 让两种类型的指针相互转换
    从而实现同一份内存有不同的解读方式。比如 []byte 和 string 内部存储结构一样,但是 Go 的类型系统禁止它们互换,借助该指针类型就可以转换。
func (v *Value) Store(x interface{}) {
  if x == nil {
    panic("sync/atomic: store of nil value into Value")
  }
  vp := (*ifaceWords)(unsafe.Pointer(v))  // Old value
  xp := (*ifaceWords)(unsafe.Pointer(&x)) // New value
  for {
    typ := LoadPointer(&vp.typ)
    if typ == nil {
      // Attempt to start first store.
      // Disable preemption so that other goroutines can use
      // active spin wait to wait for completion; and so that
      // GC does not see the fake type accidentally.

      // 这里禁止抢占当前 Goroutine 来确保存储顺利完成 
      runtime_procPin()
      // 存一个标志位,宣告正在操作
      if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
        runtime_procUnpin()
        continue
      }
      // Complete first store.
      StorePointer(&vp.data, xp.data)
      StorePointer(&vp.typ, xp.typ)
      // 存储成功,标志位可抢占,直接返回
      runtime_procUnpin()
      return
    }
    if uintptr(typ) == ^uintptr(0) {
      // First store in progress. Wait.
      // Since we disable preemption around the first store,
      // we can wait with active spinning.
      continue
    }
    // First store completed. Check type and overwrite data.
    if typ != xp.typ {
      panic("sync/atomic: store of inconsistently typed value into Value")
    }
    StorePointer(&vp.data, xp.data)
    return
  }
}
代码实现
  • 这里的代码逻辑就像讨论中的伪代码。

  • 先获取类型,如果是 nil 表示是第一次存储。配合 CAS 实现乐观锁,先将 typ 设置为 ^uintptr(0)这个中间状态,如果失败则证明
    已经有别的线程抢先完成了赋值则解除抢占锁回到 for 循环第一步。需要注意先更新 data 再更新 typ。
    因为我们以 typ 字段作为写入是否完成的判断依据

  • 第一次写入未完成,如果看到 typ 字段还是 ^uintptr(0) 这个中间状态,表示第一次写入没有完成,继续循环,等待第一次写入完成

  • 在第一次写入完成,先检查上次写入类型和本次的类型是否一致,然后将本次的数值写入

  • 这里逻辑的主要**是,为了完成多个字段的原子性写入,我们可以抓住其中一个字段,以它的状态来标志整个原子写入的状态,TiDB 事务实现有类似的,
    它们叫做 Percolator 模型,主要**是选出一个 primaryRow,然后所有操作以这个 primaryRow 的成功与否作为标志。

  • atomic.Value 的存取通过 unsafe.Pointer(^uintptr(0)) 作为第一次存取的标志位,
    当 atomic.Value 第一次写入数据时,会将当前 Goroutine 设置为不可抢占, 并将要存储类型进行标记,再存入实际的数据与类型。
    当存储完毕后,即可解除不可抢占,返回。

  • 在不可抢占期间,且有并发的 Goroutine 再此存储时,如果标记没有被类型替换掉, 则说明第一次存储还未完成,形成 CompareAndSwap 循环进行等待。

  • tidb事务 这里有 Google 的相关论文可以阅读

读取操作

func (v *Value) Load() (x interface{}) {
  vp := (*ifaceWords)(unsafe.Pointer(v))
  typ := LoadPointer(&vp.typ)
  if typ == nil || uintptr(typ) == ^uintptr(0) {
    // First store not yet completed.
    return nil
  }
  data := LoadPointer(&vp.data)
  xp := (*ifaceWords)(unsafe.Pointer(&x))
  xp.typ = typ
  xp.data = data
  return
}
  • 如果 typ == nil 或者 ^uintptr(0) 表示第一次写入还没开始或者没完成,直接返回 nil(不对外暴露中间状态),否则,根据当前看到的
    typ 和 data 构造一个新的 interface{} 返回

总结

  • 原子操作由底层硬件支持,锁由操作系统的调度器实现,锁应该用来保护一段逻辑,对于一个变量更新的保护,原子操作通常会更有效率,
    同时更好利用多核优势。如果要更新的是一个复合对象,则应该使用 atomic.Value 封装好的实现

  • ref 这里稍做讨论。

原子操作实现

  • 我们以 atomic.CompareAndSwapPointer 为例,介绍 sync/atomic 包提供的同步模式。
    CompareAndSwapPointer 它在包中只有函数定义,没有函数体,其本身由运行时实现。
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
  • 实现的本质是使用 CPU 的 LOCK+CMPXCHGQ 指令:首先将 ptr 的值放入 BX,将假设的旧值放入 AX, 要比较的新值放入 CX。
    然后 LOCK CMPXCHGQ 与累加器 AX 比较并交换 CX 和 BX。因此原子操作本质上均为使用 CPU 指令进行实现(理所当然)

读写顺序性

原子性操作和 mutex 区别

difference between atomic operation and mutexes

  • ref

  • 一个或者多个操作在 CPU 执行过程中不被中断的特性。这些操作对外表现成一个不可分割的整体,要么都执行,要么都不执行,
    外界不会看到它们只执行到一半的状态,现实世界中,CPU 不可能不中断执行一系列操作,但如果我们在执行多个操作时,能让它们的中间状态对外不可见,
    则我们可以宣称它们拥有了不可分割的原子性

第一个答案

  • ref

  • 行为上没有差别,性能上的差别

  • 这里说,mutexes 需要 setup 以及 teardown,所以会 slow, 而 atomic operations 因为
    使用 atomic CPU 指令而更快。这里只有不同的适用场景区别

  • ref

  • 编译器将每个 sync/atomic 函数的调用转换为一个特殊集合的 CPU 级别的指令集合。比如 x86 atomic.AddInt64 将转换为
    有 LOCK 前缀的 ADD 指令。LOCK 保证在系统中所有的 CPU 将 coherent view 更新的内存地址。

  • mutex 是一个更加复杂的事情,最后,包装一些本地的特定 OS 的线程同步 API,比如 Linux 是 futex

  • Go runtime 经过同步的优化,同时,mutex 实现也尝试避免进入内核来在 goroutines 之间同步。最后,不能用原子性操作来在执行一个函数过程中保护一些内存
    状态,甚至一个语句。

第二个答案

重点

  • ref

  • 这里的重点是:atomicity(原子性)只是保证了当一个 CPU 正在写入数值而另一个正在读取它,读取的不会得到的数值包含第一个 CPU 开始重写它的之前
    的旧数值的位数以及正在写入的时候新数值的位数。这里的意思是保证读写操作完成,其他 CPU 才能读取。不会读取到修改的中间状态
    原子性不保证对数值进行观察的顺序, atomic.AddInt32() 不保证存放于 &cnt 的数值是 取值 cnt + 1(取值 cnt是操作开始的时候
    CPU 执行的活跃线程从内存中获取的数值)。 它不会提供任何保证,让任何尝试在同一时间读取这个数值的 goroutine 将获得的数值是 *cnt + 1

  • 另一方面,mutexes 和 channel 保证 shared/passed around 的数值的访问有严格的顺序

  • ref

  • 类似 atomic.AddInt32 的函数行为,在绝大多数的处理器中添加一个数值到内存需要一个 read-modify-write 内存循环
    (即使在 x86 中存在一个 add to memory 指令)。在一个多核机器上,如果两个核心同时执行 read-modify-write 循环,其中一个添加可能会丢失。
    atomic.AddInt32 保证不会有 additions 丢失

  • 除了上述 read-modify-write 循环,还有 atomic read atomic write 原子性操作

  • 同时,这里 说的,mutexes 的行为通过 Go 内存模型而得到保证,但是
    sync/atomic 函数则没有。可能内存模型应该讨论 sync/atomic,一个缺点是这可能会鼓励大家使用它们,这不是一个好主意。另一个缺点是,无法将这些函数
    描述成一个 happens-before 关系,比如一种情况是,如果对相同内存位置的 LoadInt32 在一个 StoreInt32 之后发生,则任何发生在 StoreInt32
    之前的事情将同样发生在 LoadInt32 之前,然而,没办法保证 LoadInt32 是否在 StoreInt32 之后发生,此时将无法确定内存的状态

  • 原子性操作是 CPU 的指令(loads, stores and read-modify-write sequences) 而 happens before
    这些事情是不会涉及 CPU,OS 线程等的高级别概念,只是一个观察的程序行为

延迟队列

延时队列

ref

业务场景:新建订单超过一定时间内支付就取消,设置日期提醒等

简单的情况可以直接轮询数据库,但是类似淘宝系统,数据量很多的情况下则不可能这样实现

为了解决上述问题,引入延时队列,以下是不同实现方案

Redis ZSet

ZSet 中每个元素按照 score 进行排序

实现

  1. 入队操作 ZADD KEY timestamp task 将任务按照其延迟处理时间作为 score 加入到 ZSet 中,ZAADD 时间复杂度是 O(logN)

  2. 起一个进程定时(比如每秒)通过 ZREANGEBYSCORE 方法查询 ZSet 中的 Score 最小元素

    • ZRANGEBYSCORE KEY -inf +inf limit 0 1 WITHSCORES O(logN + M) N 为 ZSet 元素个数,M 为查询元素个数
      • 查到分数小于当前时间戳,说明任务到执行时间了,则异步处理任务
      • 查到分数大于时间戳,还没有到执行时间,休眠后继续查询

一个架构

生产者 --> ZSet --> Events(ZooKeeper) --> MQ --> 消费者

设计

  1. 将延迟消息任务通过 hash 算法路由到不同 Redis Key

    • 避免同一个 Key 上存储较多延时消息后,入队和查询操作变慢
    • 系统有更好的横向扩展性,数据量激增可以增加 Redis Key 扩展
  2. 每个 Redis Key 对应建立一个处理进程,Event 进程来轮询 Key,查询是否有待处理的延迟消息

  3. 所有 Event 进程只负责分发消息,具体业务逻辑通过一个额外的消息队列异步处理

    • Event 进程只负责分发消息,处理消息速度
    • 使用额外消息队列,消息处理可扩展性好,可以通过增加消费者进程数量扩展系统处理消息能力
  4. Event 进程使用 ZooKeeper 选主单进程部署模式,避免 Event 进程宕机导致 Redis Key 消息堆积

RabbitMQ

依靠 TTL 和死信队列实现

死信队列

一种 RabbitMQ 消息处理机制,RabbitMQ 在生产和消费消息的时候,消息遇到情况就会变成死信

  1. 消息被拒绝 basic.reject/basic.nack 且不再重新投递 requeue=false
  2. 消息超时未消费,TTL 过期
  3. 消息队列到达最大长度

消息一旦变成一条死信,就会被重新投递到死信交换机

然后后者根据绑定规则转发到对应的死信队列上,监听该队列就可以重新消费消息

消息生存时间TTL

表示一条消息最大生存时间,单位毫秒

两种方式设置,一种是直接在创建队列的时候设置整个队列的 TTL 过期时间,所有入队的消息,都被设置成统一的过期时间,适用于延迟时间是固定值

针对单条消息设置单独的 TTL 过期时间

结构

生产者 --> 延迟队列交换机 --> 普通队列 --> 死信队列交换机 --> 死信队列 --> 消费者

TimeWheel

时间轮算法,一种实现延迟队列巧妙高效的算法,应用在 Netty,ZooKeeper kafka 等

时间轮

一个存储延迟消息的环形队列,底层是数组实现

环形队列中每个元素对应一个延迟任务列表,列表是双向链表,每一代表一个需要执行的延迟任务

时间轮有表盘指针,表示时间轮当前所指时间,随着时间推移,指针不断前进处理对应位置上的延迟任务列表

  • 这里将一个时间的任务放到一个列表中,有点哈希散列表的感觉,海星

添加延迟任务

时间轮大小固定,每个元素是双向环形链表,可以在 O(1) 时间添加延迟任务

比如当前时间是 2,我们想要添加一个延迟 3 秒的任务,快速计算出位置 5 并添加到 5 的任务队列尾部

多层时间轮

直接扩充整个时间轮内存以及访问效率低

Kafka 引入多层时间轮概念,类似时钟的时分秒

比如第一层时间轮表示时间范围是 0-12 s,第二层每个的范围也是 12 s,两层可以表示 144 s

针对时间,如果当前所有层次的时间轮无法表示,则添加一层新的时间轮

总结

Redis 实现方案简单,可以快速落地,但是存在数据丢失风险

RabbitMQ 实现方案本身消息可靠发送,可靠投递,死信队列,保障消息可靠性

Kafka 时间轮算法巧妙

消息队列设计

消息队列

作用

异步处理 消息放到队列,提高系统吞吐量

应用解耦,系统间通过消息通信,将生产者消费者隔离,提高系统扩展性

流量削峰 通过消息队列长度控制请求量,缓解短期内高并发请求

日志处理 解决大量日志传输

可靠投递,广播,流量控制,最终一致性等一系列功能

异步 RPC 的主要手段

具体存在的实现

ActiveMQ RabbitMQ Kafka Notify MetaQ RocketMQ 等

何时需要消息队列

可以使用 mq 的场景有很多 最常用的 做业务解耦/最终一致性/广播/错峰流控 等

如果需要强一致性,则 RPC 显得更合适,应该是 RPC 本身就可以通过返回来判断是否调用成功

解耦

解耦是消息队列要解决的最本质问题 这里解耦说的是一个事务,只关心核心流程,基于消息的模型,关心通知而非处理

主要功能和次要功能解耦,提高主要功能的处理能力,次要功能通过消息队列通知其他系统来完成

  • 这里理论上可以通过异步 RPC 来完成,然而最好也是像发布消息一样让系统来订阅完成
美团旅游的例子
产品中心的例子,它上游对接主站,移动后台,旅游供应链等各个数据源
下游对接筛选系统,API 系统等展示系统,上有数据发生变更时
如果不使用消息系统,则需要调用接口来更新数据,就特别依赖产品中心接口稳定性和处理能力
下游情况,可能更新索引,刷新缓存等一系列需求,上游和下游的更新数据其实更产品中心是没有关系的
如果使用接口来更新它们的数据则对于产品中心来说太重量级了
所以,这里只需要发布一个产品更新的通知,由下游系统来处理

订单系统的例子,订单最终支付成功后需要给用户发送短信积分什么的,不是系统的核心流程,如果和支付流程放在一起,则主流程的时间会增加
所以,我们只需要支付后,通过短信系统来通知客户已经支付成功,后者再发送短信

最终一致性

最终一致性指两个系统状态保持一致,要么都成功,要么都失败,理论上状态一致的时间越快越好,实际存在延迟,只能维持最终一致性

业界有一些为最终一致性而生的消息队列,比如 Notify QMQ 设计初衷就是为了交易系统中的高可靠通知

最终一致性不是消息队列必备特性,但可以依靠消息队列来做最终一致性的事情

像 kafka 一类的设计,在设计层面有丢消息的可能(比如定时刷盘,如果掉电就会丢消息),需要其他手段保证结果正确 TODO 备份

  • 这里理论是可靠的,数据写入靠磁盘进行备份
    银行转账例子 从 A 系统扣钱,然后 B 系统加钱,两个都成功就完成,其中一个不成功就一起回滚
    存在的意外是 A 扣钱了,B 价钱接口调用失败;A 扣钱,B 接口调用成功,网络异常引起超时;A B 其中一个失败,回滚时宕机了

最终一致性的解决方案

  1. 强一致性分布式事务,落地太难同时成本太高

  2. 最终一致性,主要用记录和补偿方式。做所有不确定的事情前,先把事情记录下来后去做,结果可能是成功,失败,或者不确定(可以等价失败)

    • 成功就可以将记录的东西清理掉,对于失败和不确定,可以依靠定时任务方式把所有失败事情重新执行一次,直到成功,这就类似 MySQL 的事务,使用 undo, redo log(重做日志)
  3. A 系统扣钱成功情况下,把要给 B 通知这件事记录在库中(为了保证最高可靠性可以把通知 B 系统加钱和扣钱成功这两件事维护在一个本地事务)

    • 通知成功则删除此条记录,通知失败或不确定则依靠定时任务补偿性通知我们,直到我们把状态更新到正确为止
  4. 整个模型依然可以基于 RPC 来做,但可以抽象成一个统一模型,基于消息队列来做一个企业总线

  5. 具体来说,本地事务维护业务变化和通知消息一起落地(失败则一起回滚)然后 RPC 到达 broker,broker 成功落地后,RPC 返回成功,本地消息可以删除

    • 否则本地消息一直靠定时任务轮询不断重发,这样保证消息可靠落地 broker
  6. broker 往 consumer 发送消息的过程类似,一直发送消息,直到 consumer 发送消费成功确认

  7. 我们先不理会重复消息问题,通过两次消息落地补偿,下游一定可以收到消息,然后依赖状态机版本号等方式判重,更新自己的业务,实现最终一致性

广播

如果没有消息队列,每当一个新的业务方接入,我们都要联调一次新接口,有了消息队列,只需要关心消息是否送达了队列

错峰和流控

上下游对于事情的处理能力不同,Web 前端可以每秒承受成千上万请求,但数据库处理能力有限

系统和系统间的处理能力也不同,没有消息队列,可以通过协商或者滑动窗口等复杂方案处理

  • 滑动窗口,类似 tcp 处理网络拥塞问题,接收方在返回报文中加上自己能接收的报文大小(窗口大小)

所以,利用中间系统转储两个系统的通信内容,在下游系统有能力处理这些消息时处理,是一套相对通用的方式

总之,消息队列不是万能的,对于需要强事务保证且延迟敏感的,RPC 是优于消息队列的

对一些其他人关心但是对自己不重要的事情,可以用消息队列来处理

上下游系统处理能力存在差异时,利用消息队列做一个通用的漏斗,下游有能力处理时,再进行分发

如何设计

基于消息的系统模型,不一定需要 broker(消息队列服务端), Akka (actor 模型) ZeroMQ 等,都是基于消息的系统设计范式但是没有 broker

设计一个消息队列并且配备一个 broker 主要是

  1. 消息的转储,在更合适的时间点投递,或者通过一系列手段辅助消息最终能送达消费机
  2. 规范一种范式和通用模式,满足解耦,最终一致性,错峰等需求
  3. 最简单的消息队列可以做成一个消息转发器,把一次 RPC 做成两次 RPC,发送者把消息投递到服务端,服务端将消息转发给接收端

一般来讲,设计消息队列整体思路是先建立一个整体的数据流

  • producer 发送给 broker, broker 发送给 consumer consumer 回复消费确认,broker 删除/备份消息等

利用 RPC 将数据流串起来,考虑 RPC 的高可用性,尽量做到无状态,方便水平扩展 类似 kafka

考虑如何承载消息堆积,在合适时机投递消息,处理堆积的最佳方式是存储,存储选型需要考虑性能/可靠性/开发维护成本

为实现广播功能,需要维护消费关系,可以利用 zk/config server 等保存消费关系

实现队列基本功能

RPC 通信协议

上述流程是三次 RPC 加上转储,其中 RPC 应该利用现有的框架 Thrift Dubbo gRPC

同样可以利用 memcached 或者 Redis 协议重新写一套 RPC 框架也可以

服务端提供两个 RPC 服务,一个用来接收消息,一个用来确认消息收到,同时做到不管哪个 server 收到消息和确认消息,结果一致 无状态

producer 尽量选择靠近本机房进行投递

高可用

所有高可用,依赖于 RPC 和存储的高可用实现

RPC 的高可用,比如美团基于 MTThrift 的 RPC 框架,阿里的 Dubbo 等,本身有服务自动发现,负载均衡等功能

消息队列的高可用,需要保证 broker 接收消息和确认消息的接口是幂等的,且 consumer 的几台机器处理消息是幂等的

  • 这样将消息队列可用性,转交给 RPC 框架处理

服务端承载消息堆积的能力

消息到达服务端如果不经过任何处理就到接收者,broker 就失去它的意义

为了满足我们错峰/流控/最终可达等一系列需求,把消息存储下来,然后选择时机投递

存储可以存储在内存,分布式 kv,磁盘,数据库等,主要分为持久化和非持久化

持久化形式能更大程度保证消息可靠性,理论能承载更大限度的消息堆积

并不是每种消息丢需要持久化存储,很多消息对于投递性能要求大于可靠性要求,数量极大(日志)暂存在内存,尝试几次最终投递也可以

存储子系统的选择

速度上来说 文件系统 > 分布式 KV(持久化)> 分布式文件系统 > 数据库,可靠性则反过来

需要从支持的业务场景做出最合理选择 消息队列支持支付/交易等可靠性要求高的,对性能和量级要求不高,没精力做文件存储系统研究,DB 最好

DB 受制于 IOPS,要求单 broker 5 位数以上的 QPS 性能,基于文件的存储比较好,整体采用数据文件+索引文件的处理方式

分布式 KV(MongoDB HBbase) 等或者持久化的 Redis,其编程接口友好,性能客观,可靠性要求不高的场景可以使用

消费关系解析

解析发送接收关系,进行正确的消息投递 Kafka Topic/Partition/ConsumerGroup RabbitMQ 的 Exchange

本质是单播和广播的区别,对于互联网大部分应用,组间广播,组内单播最常见

  • 组内广播一般是本地缓存更新等 slave 或者 缓存节点

一般通用设计是支持组间广播,不同组注册不同的订阅,组内不同机器,如果注册一个相同 ID 则单播,注册不同 ID (IP + 端口号) 则广播

广播关系的维护,一般消息队列本身是集群,所以维护在公共存储上,一般发送关系的维护和发送关系变更时的通知

队列高级特性设计

可靠投递(最终一致性)

完全不丢消息是可能的,前提是消息可能会重复,异常情况需要接收消息异常

解决方案:每当要发生不可靠事情(RPC 等)之前,先将消息落地,然后发送

  • 当失败或则和不知道成功还是失败时,消息状态是待发送,定时任务不断轮询所有待发送消息,最终一定可达
  • 对各种不确定(超时,宕机,消息未送达,送达后数据没有落地等)对发送方而言多是消息没有送达

具体

  • producer 往 broker 发送消息之前,做一次落地
  • 请求到 broker 后,broker 确保数据落地后再告诉客户端发送成功
  • 支持广播的消息队列需要对每个待发送的 endpoint 持久化一个发送状态,直到所有 endpoint 状态都 OK 才可以删除消息

重新推送消息面临消息重复,消息重复还有处理机会,消息丢失就无法再次找到

不是所有系统都要求最终一致性和可靠投递,任何基础组件要服务于业务场景

消息确认

当 broker 把消息投递给消费者后,消费者可以立即响应我收到了这个消息,但收到只是第一步,不一定能够处理,比如消费能力,系统负荷等

消息送达和消息处理分开,才真正实现消息队列本质-解耦,所以,允许消费者主动进行消费确认是必要的

收到正确消费 ack 的没有特殊,对于 reject 和 error,reject 业务方无法感知,同时消息处理时间和消息大小一般没有相关

  • 所以,reject 最好做成是滑动窗口/线程池来控制消息

消费能力不匹配时,直接拒绝,过一段时间重发,减少业务负担

重复消息和顺序消息

顺序消息能否一定满足,条件苛刻:允许消息丢失,从发送方到服务方到接收者都是单点单线程

所以绝对的顺序消息基本不能实现,在 METAQ/Kafka 等 pull 模型的消息队列中单线程生产/消费,排除消息丢失也是顺序消息解决方案

重复消息:如何鉴别重复消息,幂等处理重复消息;一个消息队列如何尽量减少重复消息的投递

  • 鉴别,每个消息应该有它的唯一身份,业务方自定义或者根据生成的消息 ID,记录这个 ID 就可以进行对比来鉴定重复

    • IP/PID/时间戳 或者数据库唯一键,bloom filter 分布式 kv 中的 key等 kafka key value timestamp
  • 幂等处理 版本号和状态机

    版本号
    每个消息自带版本号,上游发送时带上消息的版本号,下游对每次消息的处理,同时维护一个版本号,每次只接受比当前版本号大的消息,
    如果收到的消息版本号比当前的小,则是重复投递信息,但是如果不同版本号的消息达到顺序不一致,则参考 TCP/IP 协议,
    将小的版本号消息先保存,然后重新组织
    
    状态机
    使用版本号最大的问题是发送方必须要求信息带业务版本号,下游需要存储信息的版本号且严格保证顺序
    业务方自己维护一个状态机,定义各种状态的流转关系,比如下线状态只能接收上线消息,上线状态只能接收下线消息
    上线收到上线消息,或下线收到下线消息,消费者只需要告知不能处理这个消息给投递者,要求他过一段时间重发,同时限制重发次数
    这里的例子是消息都接收然后判断,如果状态不对就要求重发
    
    • 这里主要看业务逻辑 redis set 天然幂等

中间件对重复消息的处理

我们保证不丢失消息的情况下尽量少重复消息,消费顺序不保证。

那么重复消息下和乱序消息下业务的正确,应该是由消费方保证的,我们要做的是减少消息发送的重复。

减少重复消息的关键

  • broker 记录 MessageID 直到投递成功后清除重复 ID 到来不做处理
    • 发送者在清除周期可以感知消息投递成功,基本不会在服务端产生重复消息
  • 对于 server 投递到 consumer 的消息,由于不确定对端是在处理过程还是消息发送丢失情况下,
    • 有必要记录下投递的 IP 地址,决定重发前询问 IP 消息处理成功了吗,询问无果再重发

事务

这里的例子是扣钱和加钱,需要满足事务一致性特征,要么都不进行,要么都成功

解决方案

  • 两阶段提交,分布式事务
  • 本地事务,本地落地,补偿发送

分布式事务最大问题是成本太高,两阶段提交协议,对于仲裁宕机或者单点故障,几乎无解黑洞,

  • 对于交易密集型或者 IO 密集型应用,没办法承受这么高的网络延迟,系统复杂性
  • 成熟的分布式事务一定构建在比较靠谱的商用 DB 和商用中间件,成本高

本地事务前文提及

  • 本地和业务在一个数据库实例中建表例子,扣钱业务操作在同一个事务里,将消息插入本地数据库,入库失败则业务回滚,入库成功,事务提交
  • 发送消息,只要消息没有发送成功,就一直靠定时任务重试
  • 这里说的本地业务做的是业务落地和消息落地的业务,不是业务落地和 RPC 成功的业务
    • 后者是事务嵌套 RPC,是大忌,会有长事务死锁等各种风险,同时这里不关心 RPC 的成功

消息只要落地成功,很大程度就没有丢失风险,消息投递到服务端确认后本地删除,就完成 producer 到 broker 的可靠投递

  • 消息存储异常时,业务可以回滚

本地业务最大两个使用障碍

  • 配置较为复杂,绑架业务方,必须本地数据库实例提供一个库表
  • 对于消息延迟高敏感的业务不适用

性能相关

异步同步

异步最终关心结果但不是当前的时间点,可以用轮询或者回调方式实现

同步是当时关心结果的

oneway 是发出去就不管死活的方式,对某些对可靠性没有要求的场景适用

任何 RPC 都是存在客户端异步和服务端异步的,可以任意组合

对客户端来说,同步和异步的区别主要是拿到一个 result 还是 future 的区别

  • 实现方式可以是线程池,NIO 或者其他事件机制

对于服务端异步,需要 RPC 支持。

  • 这里的例子是 Java 的 servlet 3.0 规范,服务端可以吐一个 future 给客户端,在 future done 的时候通知客户端

服务端异步的好处:解放了线程和 IO

  • 采用异步方式返回给客户端 future,有机会进行 IO 合并,将多个批次发送过来的消息一起落地
  • 解放线程指合并 IO 导致的 MySQL 线程不需要来一个请求就开一个线程

批量

生产者消费者模型最大痛点是:消费者应该何时消费

消费动作是事件驱动的,主要事件

  1. 攒够了一定数量
  2. 到达一定时间
  3. 队列中有新的数据到来

及时性很高的数据,可以用方式 3 比如客户端向服务端投递数据,只要队列有数据,就将队列中所有数据刷出,否则自己挂起,等待新数据到来

适量的延迟可以换取高性能,用定时/定量可能会理想一些

push 还是 pull

上文提到的消息队列,大多是针对 push 模型设计的,pull 模型有 Kafka MetaQ 等

慢消费

push 模型的最大致命伤是慢消费,如果消费者速度比发送者速度慢很多,会造成消息在 broker 堆积

最致命的是 broker 给 consumer 推送一堆 consumer 无法处理的消息,consumer 不是 reject 就是 error 然后来回踢皮球

反观 pull 模式,consumer 可以按需消费,不担心自己处理不了的消息来*扰自己,

  • 而 broker 也相对简单,无需记录每个要发送消息的状态,只需要维护所有消息队列和偏移量 这里说的是 kafka
  • 对于建立索引等慢消费,消息量有限且到来速度不均匀,pull 模式比较合适

消息延迟和忙等待

pull 模式最大短板,由于主动权在消费方,消费方无法准确地决定何时去拉取最新消息

如果一次 pull 取到消息了还可以继续 pull,如果没有 pull 取到则需要等待一段时间重新 pull

  • 这里等待多久很难判定,动态 pull 时间调整算法,依然存在延迟问题

阿里的 RocketMQ 中有一种优化做法-长轮询,基本思路是消费者如果尝试拉取失败,这里类似 comet

  • 不直接 return 而是把连接挂在那边 wait,服务端有新消息到来,把连接 notify 起来
  • 但是海量的长连接 block 对系统开销不容小觑

顺序消息

首先生产者需要保证发送顺序,需要使用唯一标记以及保证全局顺序唯一,之后才是消费者从 broker 中消费的问题

如果 push 模式的消息队列,支持分区,单分区只支持一个消费者消费

并且消费者只有确认一个消息消费后才能 push 另外一个消息,还要发送者保证全局顺序唯一

  • 这种方法可以做顺序消息,但是成本太高,尤其必须每个消息消费确认后才能发下一条消息,堆积能力和慢消费瓶颈 push 模式是灾难

pull 模式,全局顺序消息

  • producer 对应 partition 且单线程
  • consumer 对应 partition 消费确认,继续消费即可

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.