Giter Site home page Giter Site logo

blog2's People

Contributors

tavenyin avatar

Stargazers

 avatar

Watchers

 avatar

blog2's Issues

SpringBoot Websocket 实战

什么是Websocket

Websocket 是一种在单个TCP连接上进行全双工通信的协议。WebSocket连接成功后,服务端与客户端可以双向通信。在需要消息推送的场景,Websocket 相对于轮询能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。

HTTP与Websocket

  • 与 HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。

  • 依赖于TCP协议

  • 数据格式比较轻量,性能开销小,通信高效。

  • 可以发送文本,也可以发送二进制数据。

  • 没有同源限制,客户端可以与任意服务器通信。

  • 协议标识符是ws(如果加密,则为wss),服务器网址就是 URL。

SpringBoot 中使用 Websocket

在简单了解Websocket 之后,我们来动手实践一下。SpringBoot 中有多种方式可以实现Websocket Server,这里我选择使用Tomcat 中 javax.websocket.server 的api来实现,结尾会给出demo地址

  1. 引入Maven依赖
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-websocket</artifactId>
		</dependency>
  1. 创建一个Bean用于处理Websocket 请求,通过ServerEndpoint 声明当前Bean 接受的Websocket URL

这里为什么声明的是 @controller,后文会解释

import org.springframework.stereotype.Controller;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;

@ServerEndpoint(value = "/message_websocket")
@Controller
public class MsgWebsocketController {

    @OnOpen
    public void onOpen(Session session) {
        // 先鉴权,如果鉴权通过则存储WebsocketSession,否则关闭连接,这里省略了鉴权的代码 
        WebSocketSupport.storageSession(session);
        System.out.println("session open. ID:" + session.getId());
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose(Session session) {
        System.out.println("session close. ID:" + session.getId());
    }

    /**
     * 收到客户端消息后调用的方法
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println("get client msg. ID:" + session.getId() + ". msg:" + message);
    }

    /**
     * 发生错误时调用
     */
    @OnError
    public void onError(Session session, Throwable error) {
        error.printStackTrace();
    }

}

  1. 声明 ServerEndpointExporter
@Configuration
public class WebsocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}

至此,Websocket Server 已经搭建完成,客户端已经可以和服务端通信了

服务端 向客户端推送消息 通过 session.getBasicRemote().sendText(message); 即可

源码浅析

我们来看下上述的短短几行代码是如何为我们构建 Websocket Server

ServerEndpointExporter

image.png

重点关注下红框中的内容

  1. ServerEndpointExporter 实现了 SmartInitializingSingleton,会在bean 实例化结束后调用 afterSingletonsInstantiated

  2. 从Spring上下文中获取所有标记@ServerEndpoint的Bean的name

其实 我们声明的 MsgWebsocketController 中并不是只能标记@controller,只是为了将其注册到Spring容器中,方便ServerEndpoint的注册而已,标记 @controller 更符合Spring的开发规范

3~4. 通过ServerContainer 将所有标记@ServerEndpoint的Bean 注册

ServerContainer 默认的实现类为 WsServerContainer,会对我们的ServerEndpoint做一个映射,URL => 对应的class,然后针对不同的事件调用指定的方法(例如建立连接时调用标记@Onopen的方法),这有点Spring DispatcherServlet 那味,感兴趣的同学可以自己看下

在了解了 Spring 为我们做了什么后,我们来完善一下我们的Demo

建立一个SessionManager

当我们想向客户端推送消息的时候,首先我们需要找到客户端与服务端建立的连接,也就是WebscoketSession

WsServerContainer 中虽然已经存储了 WebscoketSession,但是并没有办法直接通过SessionId,或者我们的业务Id 直接定位到指定的Session,所以我们需要实现一个自己的SessionManager

final ConcurrentHashMap<Object, Session> sessionPool = new ConcurrentHashMap<>();

使用 ConcurrentHashMap 管理即可

分布式推送解决

image.png

如图,用户1与服务器A建立Webscoket,用户2与服务器B建立Webscoket,那么用户1如果想向用户2推送一条消息,该如何实现?

WebscoketSession 实际上是网络连接,并不像我们传统应用的Session可以序列化到Redis,只能每个服务器管理自己的WebscoketSession,所以此时服务器A通知服务器B,你要给用户2推送一条消息。

一个比较简单有效的实现方法,利用消息队列,如下图

image.png

这个方案优点是实现简单,缺点是每台服务器都需要判断一遍当前是否存在指定的WebscoketSession ,方案细化的话则需要维护用户Session与每台服务器的关系,这样直接将消息推送给指定服务器即可

其他问题

测试时发现,当客户端断网后,服务端检测不到客户端失去连接的情况,依然可以调用Session的推送方法,服务端会一直持有这个无效的Session

目前想到的解决方案:设置WebsocketSession的最大空闲时间(session.setMaxIdleTimeout(milliseconds);),当超过这个时间时,服务端会关闭Session。前端定期发送一条心跳包,用于维持Session,当出现上述情况时,服务端也不会一直持有Session了

完整demo地址

关于demo的细节参考项目地址中Readme

Github 👉 https://github.com/TavenYin/taven-springboot-learning/tree/master/sp-websocket

参考

http://www.ruanyifeng.com/blog/2017/05/websocket.html

如果觉得有收获,可以关注我的公众号【殷天文】,第一时间接收到我的更新

Drools:规则加载 & 动态更新方案

前言

本文主要想聊下这几个问题

  1. Drools 的规则资源加载有几种方式
  2. Drools 的规则动态更新有几种方式

版本

7.69.0.Final

规则的加载

1. 使用 KieClasspathContainer

最简单的加载方式,官方的 demo 中使用的也是这种方式,从 classpath 下加载 kmodule 和规则资源。可以快速开始 Drools 应用开发

1.1. 引入 Drools 依赖

<dependency>
    <groupId>org.drools</groupId>
    <artifactId>drools-compiler</artifactId>
    <version>${drools.version}</version>
</dependency>
<dependency>
    <groupId>org.drools</groupId>
    <artifactId>drools-traits</artifactId>
    <version>${drools.version}</version>
</dependency>

1.2. 新建 resource/META-INF/kmodule.xml

<?xml version="1.0" encoding="UTF-8"?>
<kmodule xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://www.drools.org/xsd/kmodule">

    <kbase name="HelloWorldKB" packages="org.example.drools.helloworld">
        <ksession name="HelloWorldKS"/>
    </kbase>

</kmodule>

1.3. 新建 resource/org/example/drools/helloworld/hello.drl

package org.example.drools.helloworld;

rule "helloworld1"
    when
    then
    System.out.println("Hello World11111");
end

rule "helloworld2"
    when
    then
    System.out.println("Hello World2");
end

1.4. 创建 ClasspathContainer,并触发规则

KieServices ks = KieServices.Factory.get();
kieContainer = ks.newKieClasspathContainer();
KieSession kieSession = kieContainer.newKieSession("HelloWorldKS");
kieSession.fireAllRules();
kieSession.dispose();

创建 ClasspathContainer 流程浅析

当执行 ks.newKieClasspathContainer(); 时,会自动寻找 META-INF/kmodule.xml,用于创建 KieModule(KieModule 仅是对 KieBase 以及 KieSession 的定义)

当执行 kieContainer.newKieSession("HelloWorldKS") 时,会先创建 KieBase,此时也会去编译规则(如果你的规则文件比较大的话,这个编译过程可能会很慢)。KieBase 创建完成后,使用 KieBase 创建 KieSession

ClasspathContainer 方式小结

使用该方式的优点是简单、可以快速开发,但是缺点也很明显,规则和配置文件绑定在项目中(耦合度太高)。如果你不需要修改规则文件,这种方式还是可以采纳的

2. KieBuilder

KieServices ks = KieServices.Factory.get();
KieFileSystem kfs = ks.newKieFileSystem();
// kfs
kfs.write("src/main/resources/KBase1/ruleSet1.drl", drl);
kfs.write("src/main/resources/META-INF/kmodule.xml", ResourceFactory.newClassPathResource("META-INF/kmodule.xml"));
kfs.write("pom.xml", ResourceFactory.newFileResource("your_path/pom.xml"));

KieBuilder kieBuilder = ks.newKieBuilder(kfs);
kieBuilder.buildAll();
// releaseId 与 pom 中声明的一致
// 如果 kfs 中未写入 pom 的话,使用 ks.getRepository().getDefaultReleaseId()
KieContainer kieContainer = ks.newKieContainer(releaseId);

使用这种方式可以将规则和 kmodule.xml 存储在外部,简单说下流程

  1. 使用 KieFileSystem 创建一个基于内存的虚拟文件系统,kfs 中的文件路径规范参考 ClasspathContainer 方式
  2. KieBuilder 使用 kfs 中的 kmodule.xml 以及规则文件创建 KieModule(KieBuilder 内部再将 KieModule 保存在了 KieRepository)
  3. 通过 releaseId 创建 KieContainer,如果 kfs 中未指定 pom,则需要将 ks.getRepository().getDefaultReleaseId() 作为参数传入

当你希望把 Drools 资源外部存储时,使用 KieBuilder 是不错的方案

3. KieHelper

Resource resource = ...;
KieHelper helper = new KieHelper();
helper.addResource(resource, ResourceType.DRL);
KieBase kBase = helper.build();

使用 KieHelper 可以帮你快速创建一个 KieBase,可以认为是 KieBuilder 的操作简化,内部还是使用了 KieFileSystem 和 KieBuilder,只不过在创建 KieContainer 之后新建了一个 KieBase 作为返回值

测试的时候,或者说想自己管理 KieBase 的话,可以使用这个 API,总的来说不推荐使用。

4. KieScanner

这是在 Drools 官方文档中看到的一个*操作,通过动态加载 jar 的方式来实现资源加载和动态更新,下面简单介绍下。

首先我们需要将业务服务与 Drools 资源分离成两个 jar

Drools 资源 jar 具体结构如下,如果你习惯使用 drools-workbench 的话,也可以用它来创建资源 jar

│   pom.xml
│
└───src
    ├───main
    │   ├───java
    │   └───resources
    │       ├───com
    │       │   └───company
    │       │       └───hello
    │       │               helloworld.drl
    │       │
    │       └───META-INF
    │               kmodule.xml

pom 中需要注意的两点是

  • 你需要配置一个 jar 推送的远端仓库地址(这里我直接使用的是公司内部搭建的 Nexus)
  • 资源 jar 的 version 必须以 -SNAPSHOT 结尾

资源 jar 准备完成之后,使用命令 mvn clean deploy 将其推送到远端


下面是业务工程的操作

  1. 首先 pom 中引入 kie-ci,这里注意啊,不要引入你刚刚创建的资源 jar
        <dependency>
            <groupId>org.kie</groupId>
            <artifactId>kie-ci</artifactId>
            <version>7.69.0.Final</version>
        </dependency>
  1. 项目中加入如下代码
KieServices kieServices = KieServices.Factory.get();
// 注意这里的 releaseId 就是对应的是你资源 jar 的 groupId,artifactId,version
ReleaseId releaseId = kieServices.newReleaseId( "org.company", "drl-base", "0.1.0-SNAPSHOT" );
KieContainer kContainer = kieServices.newKieContainer( releaseId );
KieScanner kScanner = kieServices.newKieScanner( kContainer );
kScanner.start( 10000L );

然后启动业务服务 jar

  1. 访问业务服务验证规则是否加载

  2. 更新资源 jar 并推送至远端
    这时候可以看到业务进程会打出如下日志,说明规则更新成功

2022-05-19 16:43:11.223  INFO 20684 --- [        Timer-0] org.kie.api.builder.KieScanner           : The following artifacts have been updated: {yourjarName}
  1. 验证规则更新

KieScanner 原理浅析

  1. KieScanner 会启动一个线程,按照规定时间去扫描远端 maven 仓库(部署前要在 setting 中配置好 maven 远端仓库 url)
  2. 当发现快照时间戳发生变化时,下载到本地(具体如何动态加载的 class 这里我没太关注)
  3. 之后会新建一个 KieModule 通过 KieContainerImpl.updateToKieModule 来更新容器,本质上是更新 KBase

看到第三步后,我在想我自己是否可以利用这个 updateToKieModule 方法来实现更新 Container 呢?后来尝试了一下,证明可以

这里就不贴代码了,大概就是下面这样

KieBuilder = ...
KieModule kieModule = kieBuilder.getKieModule();
kContainer.updateToKieModule((InternalKieModule) kieModule);

规则库更新

1. updateToKieModule

上面讲到了 KieContainerImpl.updateToKieModule 的方式来更新规则库。

2. 创建新的 KieContainer

基于上面讲到的方式,其实可以想到。如果重新创建 KieContainer 的话,也相当于实现规则库动态更新。但是这种方式也存在一定问题

  • 这是开销最大的一种方式
  • 旧的 container 需要销毁,如果直接调用 dispose 方法清理资源可能会销毁正在使用的 kSession。

3. InternalKnowledgeBase

除此之外,KieBase 的实现类本身也提供了更新以及删除的 API

// 新增或者更新
KnowledgeBuilder kBuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
kBuilder.add(resource, ResourceType.DRL);
if (kBuilder.hasErrors()) {
    KnowledgeBuilderErrors errors = kBuilder.getErrors();
    log.error(errors.toString());
    return;
}
InternalKnowledgeBase knowledgeBase = (InternalKnowledgeBase) kContainer.getKieBase(kieBaseName);
knowledgeBase.addPackages(kBuilder.getKnowledgePackages());

// 删除规则
knowledgeBase.removeKiePackage(packageName);
// 或者
knowledgeBase.removeRule(packageName, ruleName);

重点说明下,如果你要更新一个规则的话,直接调用 addPackages 即可,并不需要先删除再新增(这样反而有可能造成问题)

这种方式相比上面说到的 KieContainerImpl.updateToKieModule 的方式颗粒度要小一些,updateToKieModule 会更新所有的 KBase

并发更新规则

起因就是我想了解一下,KSession 正在执行时,更新 KBase 会有什么影响

举个例子具体说下

KnowledgeBuilder kbuilder = getKnowledgeBuilder("helloworld.drl");
InternalKnowledgeBase kieBase = KnowledgeBaseFactory.newKnowledgeBase();
kieBase.addPackages(kbuilder.getKnowledgePackages());
KieSession kieSession = kieBase.newKieSession();
kieSession.insert(1d);

CompletableFuture.runAsync(() -> {
    kieBase.removeKiePackage("com.example.drools.helloworld");
    log.info("remove package");
}).join();

kieSession.fireAllRules();
kieSession.dispose();

helloworld.drl 只有一个规则,在执行 fireAllRules 之前,执行了 KBase remove 操作,这会导致本次 fire 没有触发任何规则,因为此时 KBase 内部没有规则

这看起来好像挺合理的,但是如果你的本意是想先删除,再新增呢?删除 + 新增并没有一个原子操作,导致业务数据可能没有触发任何规则。

线程1 线程2
创建 kSession 并插入事实
kBase removePackage
fireAllRules
kBase addPackage

所以推荐尽可能不要在运行时做这种 删除 + 新增的操作


看到这时,其实我还有一个问题。当执行 kieSession.fireAllRules(); 时,规则库也允许被更新吗?

由于篇幅问题,这里我直接说结论:

  • fireAllRules 成功修改内部状态为 FIRING_ALL_RULES 时,任何 kBase 的修改操作会进入等待队列(等待 fire 结束)
  • 如果 kBase 修改操作先执行了,fireAllRules 会等待 kBase 更新成功后再触发规则

所以仅是动态更新规则的话,对 Drools 的执行是没有影响的

全文总结

  1. 我觉得既然使用了规则引擎,解耦是非常重要的,所以比较推荐使用 KieBuilder 的方式来加载规则;如果你真的就不需要规则资源外部存储的话,直接使用 ks.newKieClasspathContainer(); 就可以了

  2. 如果你想使用 KieScanner 的话,一定要注意做好快照版本的管理。生产环境和开发环境不能使用同一个 maven 仓库,或者使用不同的版本防止开发环境更新影响生产环境

  3. 规则库动态更新方案的话,本文总结了三种

    • 以创建 KieContainer 的方式,实现动态更新
    • 使用 KieContainerImpl.updateToKieModule
    • 使用 InternalKnowledgeBase 的 API

如果你需要动态更新 KieModule 的话,可以考虑使用 updateToKieModule 或者重新创建 KieContainer 的方式(需要注意销毁旧的 Container)

如果你仅仅是需要动态更新规则的话,可以考虑使用 InternalKnowledgeBase(该方式开销更小,需要注意不要使用删除+新增的方式)和 updateToKieModule (开销相对前者较大)

聊聊 IO 多路复用

像 Nginx 这种以高并发高性能闻名的项目,之所以性能如此优秀,其原因是使用了 IO 多路复用技术,可以用最少的进程来支持大量的请求。本文和大家一起聊聊什么是 IO 多路复用,它能带来什么

常见的 IO 模型

四种 IO 模型

所有的 IO 都分为两个阶段:"等待数据就绪" 和 "拷贝到用户空间"。

我最开始不熟悉 IO 的时候,很难理解为什么有这两个过程,下面我来仔细说说

等待数据就绪:以 socket.read 为例,不管我们使用什么编程语言,这个操作都是去交给操作系统完成的(调用系统函数)。在执行系统函数时 CPU 向网卡发出 IO 请求。网卡在接收到数据之后,由网卡的 DMA 把的数据写到操作系统的内核缓冲区,完成后通知 CPU,之后 CPU 会将内核缓冲区的数据拷贝到用户空间

拷贝到用户空间:操作系统有自己的内存区域,叫做内核空间。我们平常跑的程序都在用户空间,用户空间无法直接访问内核空间的数据,所以需要拷贝。拷贝到用户空间这个过程可以理解为纯 CPU 操作,非常地快,可以认为基本不耗时

聊聊开发中 BIO 场景

我们平时 Java 开发常用的框架 Spring 或者 Servlet 这都是经典的 BIO (Blocking IO,阻塞 IO)模型。

使用 Servlet 这种 BIO 的模型都需要大量的线程,其根本原因有两点

虽然 Servlet 已经支持了 NIO,但是本文还是把它作为一个经典的 BIO 模型来讨论

1. 压榨 CPU

当 IO 阻塞时,CPU 处于空闲状态。想象一下,你一条 SQL 发给数据库,这个时候必须等到数据库给你响应才能继续往下执行。而 IO 本身并不是时刻都需要 CPU 的参与(例如我们上面说的等待就绪过程不需要 CPU 参与)。虽然这个过程对于人类来说可能很快,但是 CPU 确实在摸鱼。所以 BIO 模型中为了充分的利用 CPU 必须使用大量的线程(当发生 IO 阻塞时,CPU切换到其他线程继续工作)

2. 为了处理更多的连接

以前的我很傻的认为一个线程只能处理一个连接。但是其实 Thread 和 Connection 这两个东西之间根本没有什么羁绊,你完全可以在一个线程中处理 N 多个请求,只不过对于靠后的请求来说,他要等待的时间太长了。所以在 BIO 模型中,大多数都是 每连接每线程 的方式

BIO 带来的瓶颈

BIO 为了更充分的利用 CPU 和处理更多的连接,必须要使用大量的线程。而线程过多带来的副作用就是占用大量内存,线程上下文切换占用大量 CPU 时间片等等

那么有没有更好的方案呢?

NIO

相对于 BIO 来说,NIO 调用可以立刻得到反馈,不需要再傻等了。以 read 为例,如果数据已经就绪则返回给用户;反之返回 0,永远不会阻塞。

NIO 特性貌似可以解决 BIO 的痛点,我们通过一个线程来监听所有的 socket,当 socket 就绪时,再进行读写操作,这样做可以吗?

可以,但是需要不断的遍历所有的 socket,这样做的话效率还是有点低,有更好的办法吗?

IO 多路复用

IO 多路复用可以在一个线程中监听多个文件描述符(Linux 中万物皆是文件描述符),一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。

select poll epoll

这三个函数都可以用于实现 IO 多路复用,简单来聊聊这三个函数

select:当被监听的 fd(文件描述符)就绪后会返回,但是我们无法知道具体是哪些 fd 就绪了,只能遍历所有的 fd。通常来说某一时刻,就绪的 fd 并不会很多,但是使用 select 必须要遍历所有的 fd,这就造成了一定程度上的性能损失。select 最多可监听的 fd 是有限制的,32位操作系统默认1024个,64位默认2048

poll:和 select 一样,使用 poll 时也无法知道具体哪些 fd 就绪了,还是需要遍历。poll 最大的改进就是没有了监听数量的限制,但是监听了过多的 fd 会导致性能不佳

epoll:通常在 Linux 系统中使用 IO 多路复用,都是在使用 epoll 函数。epoll 是 select 和 poll 的增强,可以通知我们哪些 fd 已经就绪了,并且没有监听数量的限制。所以使用 epoll 的性能要远远优于 select 和 poll

关于这三个函数的细节,感兴趣的同学自行谷歌一下,这里我就不多说了。

基于 IO 多路复用的开发模型

BIO 线程模型

BIO 线程模型

传统的 BIO 模型,每当服务端 accept 到一个 socket 时,就将其分配给一个线程单独处理。

单线程 Reactor

单线程 Reactor 模型

上图展示了一个单线程的 Reactor 模型

步骤1:accept,等待事件到来(Reactor负责)
步骤2:read + dispatch,将读就绪事件分发给用户定义的处理器(Reactor负责)
步骤3:decode,读数据(用户处理器负责)
步骤4:compute,处理数据(用户处理器负责)
步骤5:encode(用户处理器负责)
步骤6:send,写就绪后发送数据(Reactor负责)

为了方便大家理解,这里贴出单线程 Reactor 伪代码

// 参考自美团技术团队
interface ChannelHandler{
    void channelReadComplate(Channel channelbyte[] data);
    void channelWritable(Channel channel);
}
class Channel{
    Socket socket;
    Event event;//读,写或者连接
}

//IO线程主循环:
class IOThread extends Thread{
    Map<Channel, ChannelHandler> handlerMap;//所有channel的对应事件处理器

    public void run(){
        Channel channel;
        while(channel=Selector.select()){//选择就绪的事件和对应的连接
            if(channel.event==accept){
                registerNewChannelHandler(channel);//如果是新连接,则注册一个新的读写处理器
                Selector.interested(read);
            }
            if(channel.event==write){
                getChannelHandler(channel).channelWritable(channel);//如果可以写,则执行写事件
            }
            if(channel.event==read){
                byte[] data = channel.read();
                if(channel.read()==0)//没有读到数据,表示本次数据读完了
                {
                    getChannelHandler(channel).channelReadComplate(channel, data);//处理读完成事件
                }

            }
        }
    }
}

通过使用 IO 多路复用,Reactor 模型可以非阻塞的在单个线程中处理多个 Socket,这样做性能确实很不错,但是能否在此基础上充分利用 CPU 多核来实现多路 IO 复用呢?

我们理解了单线程 Reactor 模型的工作原理之后,再来看看多线程 Reactor 模型如何工作

多线程 Reactor 1

多线程 Reactor 1

上图是 Reactor 的多线程的一种。例如 Netty 以及基于 Netty 的一些框架(Vert.x 和 WebFlux),使用的就是类似的线程模型,可以充分利用多核 CPU

mainReactor 负责 accpet。subReactor 是一组线程,在监听到 Socket 读写就绪时,进行相应的处理

需要注意的是,我们刚刚讲过的这两种 Reactor 模型,不能阻塞主 IO 线程。在 compute 过程中如果涉及 IO 调用,或者大量的计算的话,会导致整体系统吞吐量和响应时间降低。

但是不管什么系统,不涉及 IO 调用是不现实的。通常在 Vert.x 和 WebFlux 这种 Reactor 模型的框架中,执行阻塞 IO 和数据库调用都会在单独的线程池中

多线程 Reactor 2

多线程 Reactor 2

这也是多线程 Reactor 的一种,同样可以利用 IO 多路复用非阻塞的进行 read 和 send。

和单线程 Reactor 的区别是,把 decode, compute, encode 这三个步骤交给线程池来处理

该模型的缺点也和单线程模型类似,只使用了一个线程进行 Socket 的读写,所以该模型可以再优化一下。如下图所示

多线程 Reactor 2 优化

和上面的多线程 Reactor 1 思路是一样的,mainReactor 负责 accept,subReactor 是一组线程负责 Socket 的读写

Tomcat NIO 模式使用的就是类似的线程模型。虽然 Tomcat 中支持了 NIO,但是为什么基于 Tomcat 的应用还是需要大量的线程?首先 Tomcat 的NIO 仅仅是作用在 Socket 的读写。其次我们业务开发中使用了太多的 BIO API(例如 JDBC),没法完全的非阻塞化

使用 IO 多路复用框架能带来什么

IO 多路复用这么强,如果把业务开发全部改造成这种模型是不是性能会大幅度提升?实则不然,IO 多路复用的优势是使用更少的线程处理更多的连接,例如 Nginx,网关,这种可能需要处理海量连接转发的服务,它们就非常适合使用 IO 多路复用。IO 多路复用并不能让你的业务系统提速,但是它可以让你的系统支撑更多的连接

参考

https://tech.meituan.com/2016/11/04/nio.html
http://c.biancheng.net/view/2349.html
https://segmentfault.com/a/1190000003063859
https://www.zhihu.com/question/28594409
https://www.zhihu.com/question/37271342
https://juejin.cn/post/6844903492121788429
https://www.pdai.tech/md/java/io/java-io-nio-select-epoll.html

Spring Kafka:Retry Topic、DLT 的使用与原理

1. 背景

原生 Kafka 是不支持 Retry Topic 和 DLT (Dead Letter Topic,死信队列)。但是 Spring Kafka 在客户端实现了这两个功能。

2. 版本

spring-kafka 2.7.14(2.7.x 以下版本不支持 Retry Topic)

3. 默认重试策略

默认情况下,spring-kafka 在消费逻辑抛出异常时,会快速重试 10 次(无间隔时间),如果重试完成后,依旧消费失败,spring-kafka 会 commit 这条记录。

默认重试的实现原理是:重置当前 consumer offset,感兴趣的同学可以在 SeekUtils#doSeeks debug 一下

可以通过自定义 SeekToCurrentErrorHandler 来控制消费失败后的处理逻辑。例如:添加重试间隔,重试完成后依旧失败的消息发送到 DLT

3.1. 自定义 SeekToCurrentErrorHandler

    @Bean
    public ErrorHandler errorHandler(KafkaTemplate<Object, Object> kafkaTemplate) {
        ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
        // 设置重试间隔 10秒 次数为 3次
        BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
        // 创建 SeekToCurrentErrorHandler 对象
        return new SeekToCurrentErrorHandler(recoverer, backOff);
    }

添加上述代码后,消费逻辑抛出异常后,会间隔 10s 重试 3 次,重试后依旧失败,会将消息发送到 DLT

关于默认重试策略,Kafka 的 TopicPartition 只会分配给一个消费者,而消费者对于某条消息的重试,会占用消费线程,影响整个 TopicPartition 的消费速度。如果使用 Retry Topic 功能,不会占用消费线程,会有专门的 retry 线程订阅 Retry Topic 执行重试消费。

4. Retry Topic + DLT 使用

可以通过注解和全局配置的方式开启 Retry Topic 功能

4.1. @RetryableTopic

使用注解的方式启用 Retry Topic,在 @KafkaListener 方法上添加 @RetryableTopic 即可

@Slf4j
@Component
public class SimpleConsumer {

    @RetryableTopic()
    @KafkaListener(topics = "test_topic", groupId = "demo01-consumer-group-1")
    public void onMessage(MessageWrapper message) {
        log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
        throw new RuntimeException("test kafka exception");
    }

}

此时 Retry Topic 功能已经启用了。当消费逻辑抛出异常时,spring-kafka 会先将消息发送到 Retry Topic,随后在 Main Topic(对应上文的test_topic)中 commit 这条消息。会有专门的线程订阅 Retry Topic,不会影响正常消费

默认重试 3 次,间隔为 1s,如果在重试结束后,还没有成功被消费,该消息会被发送到 DLT 中

默认情况,消息被发送到死信队列后,会输出一条日志。

2022-08-09 16:05:03.920  INFO 4048 --- [ner#3-dlt-0-C-1] o.s.k.retrytopic.RetryTopicConfigurer    : Received message in dlt listener: test_topic-dlt-0@233

上述的日志输出是默认的死信订阅逻辑,用户可以在类中添加 @DltHandler 方法自定义死信消费逻辑

    @DltHandler
    public void processMessage(MessageWrapper message) {
        log.info("dlt {}", message);
    }

至此,你的 Kafka 就拥有了类似 RocketMQ 的消息重试能力,但是配置方面还需要调整一下。

4.2. 定制 @RetryableTopic

可以自定义重试次数,延迟时间,死信策略等等,同时大部分参数还支持使用 Spring EL 表达式读取配置,这里简单列举下,更多的配置读者可以自行探索

    @RetryableTopic(
            attempts = "${kafka.retry.attempts}",
            backoff = @Backoff(delayExpression = "${kafka.retry.delay}", multiplierExpression = "${kafka.retry.multiplier}"),
            fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC
    )
    @KafkaListener(topics = "test_topic", groupId = "demo01-consumer-group-1")
    public void onMessage(MessageWrapper message) {
        log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
        throw new RuntimeException("test kafka exception");
    }

解释一下上述的配置

  • attempts:重试次数
  • @backoff delayExpression:消费延迟时间
  • @backoff multiplierExpression:乘数。举个例子,第一次delay = 10s,如果 multiplier = 2,则下次 delay = 20s,以此类推,但是会有一个 maxDelay 作为延迟时间上限
  • fixedDelayTopicStrategy:可选策略包括:每次重试发送到单独的 Topic、只使用一个重试 Topic

fixedDelayTopicStrategy 这个参数还是挺重要的,具体应该怎么选呢,我们稍后再说

4.3. RetryTopicConfiguration

    @Bean
    public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
        return RetryTopicConfigurationBuilder
                .newInstance()
                .maxAttempts(4)
                .fixedBackOff(15000)
                .includeTopic("test_topic")
                .create(template);
    }

使用这个方式配置项基本和注解一样,如果你有多个需要配置重试的消费者,使用 RetryTopicConfiguration 的方式要比注解方式更简单

5. 源码解析

5.1. 延迟重试怎么实现的

延迟重试这个功能应该分为两步

  1. 将需要重试的消息发送到 Retry Topic
  2. Retry Topic 的订阅者延迟消费

非常遗憾的是,Kafka 并没有延迟消息这样的功能,所以这个延迟消费也是 spring-kafka 自己实现的,不得不说这个组件真的下了很多功夫

接下来聊聊延迟重试的实现原理

5.1.1. 延迟消息标识

消息发送到 Retry Topic 这个步骤,感兴趣的同学可以 debug 一下 SeekToCurrentErrorHandler#handle 这里就不详细说了

每个需要被重试的消息,都会被添加 retry_topic-backoff-timestamp 这个 header,这个值代表这个消息的期望执行时间

开启了重试功能的 KafkaListener,在执行消费逻辑前,会先执行KafkaBackoffAwareMessageListenerAdapter#onMessage,该方法会先对消息进行检查

KafkaBackoffAwareMessageListenerAdapter#onMessage

这部分逻辑是:

  1. 首先检查 consumerRecord 是否包含 retry_topic-backoff-timestamp,如果有则进入步骤2
  2. 现在时间是否达到了期望执行时间,if ( nowTime > executeTime ) 该方法什么也不做,程序会立刻执行消费逻辑
  3. 未达到期望执行时间,准备暂停消费者对当前 TopicPartition 的消费,但是并不是在这里完成的,这个方法内部只是记录了一下需要暂停的 TopicPartition(这个数据存储在 KafkaMessageListenerContainer 的 pauseRequestedPartitions 中),并在 PartitionPausingBackoffManager 中存储了 BackOffContext,随后抛出一个异常打断消费流程

5.1.2. 暂停分区

只要 Kafka 消费线程还在运行,就会无限调用 KafkaMessageListenerContainer#pollAndInvoke

KafkaMessageListenerContainer#pollAndInvoke

pollAndInvoke 中 pausePartitionsIfNecessary 方法会根据 KafkaMessageListenerContainer 中存储的 pauseRequestedPartitions 暂停 partition,使用的方法是 Kafka Client 的 consumer.pause

调用 consumer.pause 之后,之后调用 consumer.poll 不会返回任何数据,直到调用 resume 恢复消费。该方法不会造成 Rebalance

5.1.3. 恢复分区

有了上面暂停消费的逻辑,还得有对应的恢复消费才能实现“延迟消费”,下面来看下恢复消费的逻辑

checkIdlePartition

KafkaMessageListenerContainer#checkIdlePartition 方法会不断地检查 partition 是否空闲(长时间未拉取到消息)。如果符合了空闲 partition 的标准,则发送事件 ListenerContainerPartitionIdleEvent

PartitionPausingBackoffManager

PartitionPausingBackoffManager 监听该事件,并尝试查找该 TopicPartition 是否存在 BackOffContext。存在则代表该分区被暂停,如果时间条件满足,从 KafkaMessageListenerContainer 的 pauseRequestedPartitions 删除该分区

KafkaMessageListenerContainer#resumePartitionsIfNecessary

最后 KafkaMessageListenerContainer#resumePartitionsIfNecessary 会将“已被 Kafka Consumer 暂停”但是“不存在于 KafkaMessageListenerContainer 的 pauseRequestedPartitions 的分区”恢复消费(通过 consumer.resume

5.1.4. 小结

画一张图来总结一下 Retry Topic 的执行流程

这里补充说明一下

  • 其实 MAIN_TOPIC 和 RETRY TOPIC 执行的代码是完全相同的,上图只是为了更好的让大家理解 Retry Topic 的流程
  • 本身 Kafka 消费流程是一个无限循环

5.2. 关于 Retry Topic 策略

下面详细说说 Topic 策略这个事

5.2.1. FixedDelayStrategy.MULTIPLE_TOPICS

test_topic 为例,此时我 attempts = 3, delay=10, multiplier=2,会额外创建以下三个 Topic

  • test_topic-retry-0
  • test_topic-retry-1
  • test_topic-dlt

第一次消费失败,会发送到 test_topic-retry-0,消息延迟为 10s
第二次消费失败,会发送到 test_topic-retry-1,消息延迟为 20s
第三次消费失败,会发送到 test_topic-dlt

此时每个 Retry Topic 中的消息延迟时间是相同的,在消费时间可控的情况下,消息延迟的时间不会有过大的偏差

该策略的缺点就是,使用了过多的 Topic,但是可以实现重试时间指数级上升

5.2.2. FixedDelayStrategy.SINGLE_TOPIC

延迟时间固定的情况适合使用 SINGLE_TOPIC 策略,该策略下只有一个 Retry Topic。如果 SINGLE_TOPIC 延迟时间指数级增长的话,很可能出现的问题是,第一条消息第三次重试延迟时间为 30s,第二条消息第一次重试延迟时间为 10s,两条消息被分配到同一分区,这二条消息被迫在 40s 之后才能重试

补充:如何使用多个 retry 线程

默认情况下,Main Topic,每个 Retry Topic,DLT 分别有 1 个消费线程,默认情况下 Retry 和 DLT 会使用 KafkaListener 提供的 ContainerFactory 初始化。

例如我把 KafkaListener concurrency 设置为 4。此时 Retry Topic,每个 Retry Topic,DLT 分别有 4 个消费线程

也可以自定义 Retry Topic 消费者使用的 ContainerFactory

spring-kafka 相关 demo

https://github.com/TavenYin/taven-springboot-learning/tree/master/springboot-kafka

参考

Project Reactor:OptimizableOperator 原理

前言

通常来说在响应式编程中 Publisher 的创建到真正的订阅者中间会经过许多的响应式操作符,而大部分的操作符其实都是 OptimizableOperator 的实现。

随便举几个例子,例如:map,flatMap,filter,doOnNext 等等。基本上所有对上游数据做处理的函数都实现了 OptimizableOperator

准备工作

你需要对 Reactor 或者响应式编程有一定了解

推荐阅读:
「响应式编程入门之 Project Reactor」

OptimizableOperator

OptimizableOperator

首先 OptimizableOperator 继承了 CorePublisher,这没什么可说的,因为不管是使用 Mono 还是 Flux 执行完任何操作之后返回的依旧是一个 Publisher

下面我们来看下 OptimizableOperator 的核心方法,先来简单说下,后边会结合源码来详细理解

OptimizableOperator.subscribeOrReturn

该方法有两种方式实现

  1. 返回一个 CoreSubscriber,返回的订阅者包装了下游的实际订阅者

  1. 返回 null
    当选择这种实现方式时,当前 OptimizableOperator 自行消费真正的 Publisher。将自己作为下游 Subscriber 的发布者

这两种实现方式有什么区别呢?第一种方式仅仅是作为一个消费者订阅。而第二种方式,Operator 相当于翻身农奴把歌唱了,自己当上了 Publisher,能做的事情比一种方式更多

OptimizableOperator.source

返回当前 OptimizableOperator 的上游(上游可能还是一个 OptimizableOperator)

OptimizableOperator.nextOptimizableSource

返回链中的下一个 OptimizableOperator(如果上游是 OptimizableOperator 则返回,否则返回 null)

InternalMonoOperator

我们来通过 InternalMonoOperator 来理解一下上述的几个方法

InternalMonoOperator

InternalMonoOperator 构造方法

InternalMonoOperator 是所有 Mono 操作符的父类,在执行操作符动作之前都会先构建操作符对象,最终会调用到 InternalMonoOperator 构造方法。

  1. 入参 source 为当前操作符的上游,
  2. super(source) 内部实现是将上游赋值给成员变量 source
  3. 如果上游同样也是操作符,还会将其赋值给 optimizableOperator

subscribe

当最后一个操作符被订阅时会执行如下逻辑。看似很难读,其实和上面我们讲的是一样的,下面一起来分析下

public final void subscribe(CoreSubscriber<? super O> subscriber) {
	// 将当前对象赋值给 operator
	OptimizableOperator operator = this;
	try {
		while (true) {
			// 调用操作符的 subscribeOrReturn
			// 1. 返回 != null,我们认为 operator 已经对实际订阅者做了包装。所以继续调用下一个操作符(也就是当前操作符的上游)
			// 2. 如果返回 == null,则代表 operator 要自己处理上游的消费和下游的订阅,方法结束
			subscriber = operator.subscribeOrReturn(subscriber);
			if (subscriber == null) {
				// null means "I will subscribe myself", returning...
				return;
			}
			// 返回当前 operator 的下一个操作符(也就是返回的上游的操作符)
			OptimizableOperator newSource = operator.nextOptimizableSource();
			if (newSource == null) {
				// 如果所有 operator 都执行完了,那么可以直接向 Publisher 发起订阅了
				// 订阅结束后,退出当前方法
				operator.source().subscribe(subscriber);
				return;
			}
			// 存在下一个操作符,继续执行该逻辑
			operator = newSource;
		}
	}
	catch (Throwable e) {
		Operators.reportThrowInSubscribe(subscriber, e);
		return;
	}
}

在理解了 subscribe 的逻辑之后,我们在以后在阅读 Reactor 操作符的源码时,就可以清楚地知道只需要关注该操作符的 subscribeOrReturn 方法即可。

如果 subscribeOrReturn 仅是返回一个 subscriber,那么我们只需要关注其 Subscriber 的相关逻辑即可。如果返回的 null,则代表该操作符要自行消费上游然后向下游传递订阅,我们需要关注他的 Subscription 相关

最后

如果觉得我的文章对你有帮助,动动小手点下关注,你的支持是对我最大的帮助

Sentinel 热点参数限流原理

何为热点

热点即经常访问的数据。很多时候我们希望统计某个热点数据中访问频次最高的 Top K 数据,并对其访问进行限制,比如:

  • 商品 ID 为参数,统计一段时间内最常购买的商品 ID 并进行限制
  • 用户 ID 为参数,针对一段时间内频繁访问的用户 ID 进行限制

版本

本文基于 1.8.0

如何使用

  1. pom 中引入如下
        <dependency>
            <groupId>com.alibaba.csp</groupId>
            <artifactId>sentinel-core</artifactId>
            <version>${sentinel.version}</version>
        </dependency>
        <!-- 热点参数限流 -->
        <dependency>
            <groupId>com.alibaba.csp</groupId>
            <artifactId>sentinel-parameter-flow-control</artifactId>
            <version>${sentinel.version}</version>
        </dependency>
  1. 定义 ParamFlowRule
private static void loadRules() {
	ParamFlowRule rule = new ParamFlowRule(RESOURCE_KEY)
			.setParamIdx(0) // 指定当前 rule 对应的热点参数索引
			.setGrade(RuleConstant.FLOW_GRADE_QPS) // 限流的维度,该策略针对 QPS 限流
			.setDurationInSec(1) // 限流的单位时间
			.setCount(50) // 未使用指定热点参数时,该资源限流大小为50
			.setParamFlowItemList(new ArrayList<>());

	// item1 设置了对 goods_id = goods_uuid1 的限流,单位时间(DurationInSec)内只能访问10次
	ParamFlowItem item1 = new ParamFlowItem().setObject("goods_uuid1") // 热点参数 value
			.setClassType(String.class.getName()) // 热点参数数据类型
			.setCount(10); // 针对该value的限流值

	ParamFlowRuleManager.loadRules(Collections.singletonList(rule));
}

这里的配置属性后文讲源码的时候都会看到,所以要重点关注一下

  • Rule 本身可以定义一个限流阈值,每个热点参数也可以定义自己的限流阈值
  • 还可以为限流阀值设置一个单位时间
  1. 调用
try {
	// 调用限流
	entry = SphU.entry(RESOURCE_KEY, EntryType.IN, 1, hotParamValue);
	// 业务代码...

} catch (BlockException e) {
	// 当前请求被限流
	e.printStackTrace();
} finally {
	if (entry != null) {
		entry.exit(1, hotParamValue);
	}
}

完整 demo 参考:https://github.com/TavenYin/taven-springcloud-learning/blob/master/sentinel-example/src/main/java/com/github/taven/limit/param/SimpleParamFlowDemo.java

之前有用过 Sentinel 的同学的话其实很好理解。配置方面的话 Rule 属性有些不同,调用方面,需要添加上本次调用相关的参数

举个例子,我们配置了对商品 ID = 1 的限流规则,每次请求商品接口之前调用 Sentinel 的限流 API,指定 Resource 并传入当前要访问的商品 ID。
如果 Sentinel 能找到 Resource 对应的 Rule,则根据 Rule 进行限流。Rule 中如果找到 arg 对应的热点参数配置,则使用热点参数的阈值进行限流。找不到的话,则使用 Rule 中的阈值。

实现原理

Sentinel 整体采用了责任链的设计模式(类似 Servlet Filter),每次调用 SphU.entry 时,都会经历一系列功能插槽(slot chain)。不同的 Slot 职责不同,有的是负责收集信息,有的是负责根据不同的算法策略进行熔断限流操作,关于整体流程大家可以阅读下 官网 中对 Sentinel 工作流程的介绍。

ParamFlowSlot

关于热点参数限流的逻辑在 com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot

public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
		// ParamFlowManager 中没有对应的 Rule,则执行下一个Slot
        if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
            return;
        }

		// 限流检查
        checkFlow(resourceWrapper, count, args);
		// 执行下一个Slot
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
		// 执行下一个Slot
        fireExit(context, resourceWrapper, count, args);
    }

    void applyRealParamIdx(/*@NonNull*/ ParamFlowRule rule, int length) {
        int paramIdx = rule.getParamIdx();
        if (paramIdx < 0) {
            if (-paramIdx <= length) {
                rule.setParamIdx(length + paramIdx);
            } else {
                // Illegal index, give it a illegal positive value, latter rule checking will pass.
                rule.setParamIdx(-paramIdx);
            }
        }
    }

    void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
        if (args == null) {
            return;
        }
        if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
            return;
        }
		// 获取 resource 对应的全部 ParamFlowRule 
        List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());

        for (ParamFlowRule rule : rules) {
            applyRealParamIdx(rule, args.length);

            // 初始化该 Rule 需要的限流指标数据
            ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);
			// 如果不满足某个 Rule 则抛出异常,代表当前请求被限流
            if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
                String triggeredParam = "";
                if (args.length > rule.getParamIdx()) {
                    Object value = args[rule.getParamIdx()];
                    triggeredParam = String.valueOf(value);
                }
                throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
            }
        }
    }
}

ParamFlowSlot 中代码不多,也没做什么事。参考注释的话应该很好理解。咱们直接挑干的讲,来看下 ParamFlowChecker 中是如何实现限流的

ParamFlowChecker 数据结构

热点参数限流使用的算法为令牌桶算法,首先来看一下数据结构是如何存储的

public class ParameterMetric {

    /**
     * Format: (rule, (value, timeRecorder))
     *
     * @since 1.6.0
     */
    private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTimeCounters = new HashMap<>();
    /**
     * Format: (rule, (value, tokenCounter))
     *
     * @since 1.6.0
     */
    private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTokenCounter = new HashMap<>();
	
    private final Map<Integer, CacheMap<Object, AtomicInteger>> threadCountMap = new HashMap<>();
	
	// 省略...
	
}

Sentinel 中 Resource 代表当前要访问的资源(方法或者api接口),一个 Resource 可以对应多个 Rule,这些 Rule 可以是相同的 class。

现在再来看 ParameterMetric 的结构,每个 Resource 对应一个 ParameterMetric 对象,上述 CacheMap<Object, AtomicLong> 的 Key 代表热点参数的值,Value 则是对应的计数器。

所以这里数据结构的关系是这样的

  • 一个 Resource 有一个 ParameterMetric
  • 一个 ParameterMetric 统计了多个 Rule 所需要的限流指标数据
  • 每个 Rule 又可以配置多个热点参数

CacheMap 的默认实现,包装了 com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap 使用该类的主要原因是为了实现热点参数的 LRU

详细解释一下,这三个变量

  • ruleTimeCounters :记录令牌桶的最后添加时间,用于 QPS 限流
  • ruleTokenCounter :记录令牌桶的令牌数量,用于 QPS 限流
  • threadCountMap :用于线程级别限流,这个其实和令牌桶算法没有关系了,线程限流只是在 Rule 中定义了最大线程数,请求时判断一下当前的线程数是否大于最大线程,具体的应用在 ParamFlowChecker#passSingleValueCheck

实际使用 ParameterMetric 时,使用 ParameterMetricStorage 获取 Resource 对应的 ParameterMetric

public final class ParameterMetricStorage {
    // Format (Resource, ParameterMetric)
    private static final Map<String, ParameterMetric> metricsMap = new ConcurrentHashMap<>();
    // 省略相关代码 
}

ParamFlowChecker 执行逻辑

ParamFlowChecker 中 QPS 级限流支持两种策略

  • CONTROL_BEHAVIOR_RATE_LIMITER :请求速率限制,对应的方法ParamFlowChecker#passThrottleLocalCheck
  • DEFAULT :只要桶中还有令牌,就可以通过,对应的方法ParamFlowChecker#passDefaultLocalCheck

接下来我们将以 passDefaultLocalCheck 为例,进行分析。但是在这之前,先来捋一下,从 ParamFlowSlot#checkFlowParamFlowChecker#passDefaultLocalCheck 这中间都经历了什么,详见👇

// 伪代码,忽略了一些参数传递
checkFlow() {
	// if 没有对应的 rule,跳出 ParamFlowSlot 逻辑

	// if args == null,跳出 ParamFlowSlot 逻辑

	List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());

	rules.forEach(r -> {
	    // 初始化该 Rule 需要的限流指标数据
		ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);
		
		if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
			// 抛出限流异常
		}
	})

}

passCheck() {
	// 从 args 中获取本次限流需要使用的 value
	int paramIdx = rule.getParamIdx();
	Object value = args[paramIdx];
	
	// 根据 rule 判断是该请求使用集群限流还是本地限流
	if (rule.isClusterMode() && rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
		return passClusterCheck(resourceWrapper, rule, count, value);
	}

	return passLocalCheck(resourceWrapper, rule, count, value);
}

passLocalCheck() {
	// 如果 value 是 Collection 或者 Array
	// Sentinel 认为这一组数据都需要经过热点参数限流校验
    // 遍历所有值调用热点参数限流校验
	if (isCollectionOrArray(value)) {
		value.forEach(v -> {
			// 当数组中某个 value 无法通过限流校验时,return false 外部会抛出限流异常
			if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
				return false;
			}
		})
	}
}

passSingleValueCheck() {
	if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
		if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) {
			// 速率限制
			return passThrottleLocalCheck(resourceWrapper, rule, acquireCount, value);
		} else {
			// 默认限流
			return passDefaultLocalCheck(resourceWrapper, rule, acquireCount, value);
		}
	} else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) {
		// 线程级限流逻辑
	}
}

上面提到了一个集群限流,和上一篇中说到的集群限流实现原理是一样的,选出一台 Server 来做限流决策,所有客户端的限流请求都咨询 Server,由 Server 来决定。由于不是本文重点,就不多说了。

ParamFlowChecker 限流核心代码

铺垫了这么多,终于迎来了我们的主角 ParamFlowChecker#passDefaultLocalCheck,该方法中实现了简单的令牌桶算法,用于热点参数限流

static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
									 Object value) {
	// 根据 resource 获取 ParameterMetric
	ParameterMetric metric = getParameterMetric(resourceWrapper);
	// 根据 rule 从 metric 中获取当前 rule 的计数器
	CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
	CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);

	if (tokenCounters == null || timeCounters == null) {
		return true;
	}

	// Calculate max token count (threshold)
	Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
	long tokenCount = (long)rule.getCount();
	// 如果热点参数中包含当前 value,则使用热点参数配置的count,否则使用 rule 中定义的 count
	if (exclusionItems.contains(value)) {
		tokenCount = rule.getParsedHotItems().get(value);
	}

	if (tokenCount == 0) {
		return false;
	}

	long maxCount = tokenCount + rule.getBurstCount();
	// 当前申请的流量 和 最大流量比较
	if (acquireCount > maxCount) {
		return false;
	}

	while (true) {
		long currentTime = TimeUtil.currentTimeMillis();
		
		// 这里相当于对当前 value 对应的令牌桶进行初始化
		AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
		if (lastAddTokenTime == null) {
			// Token never added, just replenish the tokens and consume {@code acquireCount} immediately.
			tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
			return true;
		}

		// Calculate the time duration since last token was added.
		long passTime = currentTime - lastAddTokenTime.get();
		// A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed.
		if (passTime > rule.getDurationInSec() * 1000) {
			// 补充 token
			AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
			if (oldQps == null) {
				// Might not be accurate here.
				lastAddTokenTime.set(currentTime);
				return true;
			} else {
				long restQps = oldQps.get();
				// 每毫秒应该生成的 token = tokenCount / (rule.getDurationInSec() * 1000)
				// 再 * passTime 即等于应该补充的 token
				long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
				// 补充的 token 不会超过最大值
				long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
					: (restQps + toAddCount - acquireCount);

				if (newQps < 0) {
					return false;
				}
				if (oldQps.compareAndSet(restQps, newQps)) {
					lastAddTokenTime.set(currentTime);
					return true;
				}
				Thread.yield();
			}
		} else {
			// 直接操作计数器扣减即可
			AtomicLong oldQps = tokenCounters.get(value);
			if (oldQps != null) {
				long oldQpsValue = oldQps.get();
				if (oldQpsValue - acquireCount >= 0) {
					if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {
						return true;
					}
				} else {
					return false;
				}
			}
			Thread.yield();
		}
	}
}

令牌桶算法核心**如下图所示,结合这个图咱们再来理解理解代码

核心逻辑在 while 循环中,咱们直接挑干的讲

先回顾一下上面说过 tokenCounters 和 timeCounters,在默认限流实现中,这两个参数分别代表最后添加令牌时间,令牌剩余数量

while 逻辑:

  1. 首先如果当前 value 对应的令牌桶为空,则执行初始化
  2. 计算当前时间到上次添加 token 时间经历了多久,即 passTime = currentTime - lastAddTokenTime.get() 用于判断是否需要添加 token
    2.1 if (pass > rule 中设定的限流单位时间) ,则使用原子操作为令牌桶补充 token(具体补充 token 的逻辑详见上面代码注释)
    2.2 else 不需要补充 token,使用原子操作扣减令牌

可以看到关于 token 的操作全是使用原子操作(CAS),保证了线程安全。如果原子操作更新失败,则会继续执行。

速率限制的实现

再顺便叨咕下上面说过CONTROL_BEHAVIOR_RATE_LIMITER 速率限制策略是如何实现的,只简单说说思路,具体细节大家可以自己看下源码

该策略中,仅使用 timeCounters,该参数存储的数据变成了 lastPassTime(最后通过时间),所以这个实现和令牌桶也没啥关系了

新的请求到来时,首先根据 Rule 中定义时间范围,count 计算 costTime,代表每隔多久才能通过一个请求

long costTime = Math.round(1.0 * 1000 * acquireCount * rule.getDurationInSec() / tokenCount);

只有 lastPassTime + costTime <= currentTime ,请求才有可能成功通过,lastPassTime + costTime 过大会导致限流。

最后

如果觉得我的文章对你有帮助,动动小手点下关注,你的支持是对我最大的帮助

Oracle LogMiner 数据迁移实战

LogMiner 是什么

LogMiner 是Oracle官方提供的工具,可以解析 Redo log 和 Archived Redo log

LogMiner 可以做什么?

官方文档中列举了很多,大家可以自己去看下。

我们目前的项目在使用基于LogMiner 的 Debezium Oracle Connector 做数据迁移

Oracle LogMiner 数据迁移的原理是什么?

首先需要了解几个概念,这里简单介绍下

  • Redo log:Redo中记录了所有对数据块的更改,Oralce 要求至少有两个以上的Redo Log Group

  • Archived Redo log:当一个Redo Log 写满之后,会发生日志切换,数据的更改会记录到下一个Redo Log中(所以一定要有两个以上的Redo)。如果开启了归档模式,Oracle 会将写满的Redo Log 归档。

  • SCN (System Change Number):Oracle 内部逻辑时间戳

  • Flashback:通过闪回查询 SELECT ... AS OF SCN 可以查询Oracle某个时间点的全量数据


思路如下:

  1. 首先查询出一下当前的SCN
  2. 根据SCN 查询出这一时刻的全量数据
  3. 通过Logminer 指定Start_SCN,获取增量数据

安装与配置

想尝试却不太熟悉Oracle的同学,可以参考一下我整理的文档

小试牛刀

在准备好了环境之后,我们来开箱体验一下Logminer

logminer 用户登录 conn c##logminer/password

  1. 构建数据字典

    LogMiner使用数据字典将内部对象标识符和数据类型转换为正常字段和数据格式

      # 这是一条常规的SQL
       INSERT INTO HR.JOBS(JOB_ID, JOB_TITLE, MIN_SALARY, MAX_SALARY)  VALUES('IT_WT','Technical Writer', 4000, 11000);
      # 如果没有数据字典,数据会是这样的
      insert into "UNKNOWN"."OBJ# 45522"("COL 1","COL 2","COL 3","COL 4")     
      values (HEXTORAW('45465f4748'),HEXTORAW('546563686e6963616c20577269746572'),HEXTORAW('c229'),HEXTORAW('c3020b'));
    

    官方文档中提到三种方式:

    • 在线数据字典:当你可以访问创建Redo的源数据库并且表结构不会发生任何变动时。可以考虑使用在线数据字典。这是最简单有效的,也是Oracle的推荐选项
      由于在线数据字典永远存储的是最新的结构。如果发生了表结构变动,Logminer 捕获到旧版本的数据,SQL将会如上述代码块中那样

    • 提取数据字典到redo中:
      需要执行命令BEGIN DBMS_LOGMNR_D.BUILD (options => DBMS_LOGMNR_D.STORE_IN_REDO_LOGS); END;
      该操作会占用一定数据库资源

      LogMiner 在启动时会通过指定的数据字典选项维护一个内部数据字典,当启动LogMiner时指定 DBMS_LOGMNR.DDL_DICT_TRACKING ,LogMiner会自动捕获DDL来更新内部字典,这样即使发生了表结构变动时,也可以正确的解析DDL。注意:该选项不能和在线数据字典同时使用
      更多解释参考Oracle文档:https://docs.oracle.com/en/database/oracle/oracle-database/12.2/sutil/oracle-logminer-utility.html#GUID-56743517-A0C0-4CCD-9D20-2883AFB5683B

    • 提取数据字典到Flat File:Oracle维护该选项是为了兼容历史版本,本文并没有使用到该方式,不多做介绍

    这一步我选择在线数据字典,什么都不用做,直接进入下一步

  2. 添加日志文件

        # 查询目前的redol og
    SQL> select member from v$logfile;
    
    MEMBER
    --------------------------------------------------------------------------------
    /opt/oracle/oradata/ORCLCDB/redo03.log
    /opt/oracle/oradata/ORCLCDB/redo02.log
    /opt/oracle/oradata/ORCLCDB/redo01.log
    
         # 添加redo log
    SQL> EXECUTE DBMS_LOGMNR.ADD_LOGFILE( -
       LOGFILENAME => '/opt/oracle/oradata/ORCLCDB/redo03.log', -
       OPTIONS => DBMS_LOGMNR.NEW);
    EXECUTE DBMS_LOGMNR.ADD_LOGFILE( -
       LOGFILENAME => '/opt/oracle/oradata/ORCLCDB/redo02.log', -
       OPTIONS => DBMS_LOGMNR.ADDFILE);
    EXECUTE DBMS_LOGMNR.ADD_LOGFILE( -
       LOGFILENAME => '/opt/oracle/oradata/ORCLCDB/redo01.log', -
       OPTIONS => DBMS_LOGMNR.ADDFILE);
    
  3. START_LOGMNR

    # 使用在线数据字典进行log解析
    SQL> EXECUTE DBMS_LOGMNR.START_LOGMNR(-
    OPTIONS => DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG);
    

    然后执行一条INSERT

  4. 查询结果
    通过查询V$LOGMNR_CONTENTS 获取LogMiner捕获的结果。当执行该视图查询时,LogMiner会按照顺序解析Redo和Archived Log,所有执行时间会有一点慢

      SELECT OPERATION, SQL_REDO, SQL_UNDO
      FROM V$LOGMNR_CONTENTS
      WHERE table_name='TEST_TAB';

    结果如下,可以看到我们刚刚INSERT的SQL

    OPERATION    				SQL_REDO								SQL_UNDO
    
    INSERT				insert into "SCOTT"."TEST_TAB"					delete from "SCOTT"."TEST_TAB"  			
    					("ID","PRICE","CURRENT_DATE") 					where "ID" = 'BEC21E77FD10311AE053020011ACC051'
    					values 											and "PRICE" = '22' 
    					('BEC21E77FD10311AE053020011ACC051','22',NULL);	and "CURRENT_DATE" IS NULL and ROWID = 'AAAWbEAAKAAAACHAAC';

实战

我们已经知道了迁移的思路和Logminer如何使用,现在可以动手搞一个demo了。

由于篇幅问题,这里我只讨论思路和我的一些想法。完整代码参考👉 https://github.com/TavenYin/database-cdc/tree/master/oracle-logminer

  1. 整体思路
    相关实现思路参考自Debezium

image.png

需要解释一下第四步为什么,发生Redo发生切换时,需要重启Logminer流程,两点原因

  1. Redo Log 切换后,会生成新的归档,我们需要Add新的归档日志
  2. 长时间开启LogMiner会话,会导致PGA使用量一直上升无法释放,End LogMiner 可以解决这个问题。所以代码逻辑中需要找一个时机去重启LogMiner,而Redo 切换这个时间点确实也挺合适的。

写到这的时候,我突然有了一个疑问

我们刚刚已经说过了,只有在查询 V$LOGMNR_CONTENTS 时,LogMiner才会去解析Redo Log,然后动态的生成视图。

参考上图。如果在第四步和第六步之间,程序检查到没有RedoLog切换准备继续执行。突然插入了大量数据导致Current Redo Log 被覆盖(注意必须是已经被覆盖而不是切换)了,此时是不是我们再查询 V$LOGMNR_CONTENTS 岂不是会丢失一部分数据?

由于start_logminer时会指定,起始和结束SCN,所以即使下次执行时添加了新的Archived Log,由于SCN已经被跨过去了,所以一定不会读这部分数据

在我做了测试之后发现,如果情况真的如此极端,确实会这样。

那么Debezium为什么没有考虑这个问题呢?

个人理解,在生产环境通常Redo Log 不会频繁切换,并且一定会有多个Redo Group。这么短时间内被覆盖的情况几乎不可能发生。


  1. 处理 V$LOGMNR_CONTENTS 结果集
    最开始在看Debezium源码的时候,没仔细注意这个地方,在自己动手搞一遍之后,发现这个地方的逻辑有点麻烦

    V$LOGMNR_CONTENTS 每一行可能是事务的提交、回滚,DDL,DML

    上面提到了一个 TransactionalBuffer是什么?

    我们在读取 V$LOGMNR_CONTENTS 会发生如下图的情况,因为每次只从startScn 读取到 当前Scn。而这中间可能发生的情况是,事务并没有Commit,但是我们拿到了其中一部分的DML,我们并不能确定这些DML是不是要Commit,所以需要将这些**“一半”**的事务暂时缓存在内存中

其实在调用 DBMS_LOGMNR.START_LOGMNR 时,可以指定一个选项 COMMITTED_DATA_ONLY,仅读出已提交的事务。这样就不必要这么麻烦的处理结果集了。但是为什么不选择 COMMITTED_DATA_ONLY?使用该策略会一直等待事务提交才会响应客户端,这很容易造成 "Out of Memory",所以这个策略不适合我们的程序。


  1. 迁移进程宕机处理

    数据迁移必定是一个漫长的过程,如果在执行中遇到什么意外,导致Java进程挂了,那么一切都要从头开始吗?

    如果我们能确定某个SCN之前的所有记录都已经被处理了,那么下次重启时从这个SCN开始处理即可

    两处可以确定之前SCN已经被全部处理的地方,代码如下:

    a. 当前TransactionalBuffer中没有数据,代表END_SCN之前所有的事务都已经被提交了

    b. 提交事务时,如果当前要提交的事务的Start_SCN 早于TransactionalBuffer中的所有事务

  2. SQL解析
    如果你想将Oracle的数据同步到其他数据库(包含NoSQL)的话,最好的办法是将SQL解析成结构化的对象,让下游服务去消费这些对象。

    Debezium的做法,我还没抽出空研究。目前的解决方法是用com.alibaba.druid.sql.SQLUtils,这个类可以将SQL解析成结构化对象,我们再对这些对象进行一些处理,即可让下游服务消费了。

DEMO

运行效果如下

DEMO运行效果

GitHub 👉 https://github.com/TavenYin/database-cdc/tree/master/oracle-logminer

参考

权重随机(Weight random)算法详解

应用场景

  • 客户端负载均衡,例如 Nacos 提供的客户端负载均衡就是使用了该算法
  • 游戏抽奖(普通道具的权重很高,稀有道具的权重很低)

本文目标

Java 实现权重随机算法

算法详解

比如我们现在有三台 Server,权重分别为1,3,2。现在想对三台 Server 做负载均衡

Server1     Server2     Server3

 weight      weight      weight
   1           3          2

权重比例

  1. 我们算出每台 Server 的权重比例,权重比例 = 自己的权重 / 总权重
server1     server2     server3

 weight      weight      weight
   1           3          2

 radio       radio       radio
  1/6         3/6         2/6
  1. 根据权重比例计算覆盖区域
      server1               server2                  server3
         ^                     ^                        ^
    |---------||---------|---------|---------||---------|---------||
    0         1/6                            4/6                  6/6
               ^                              ^                    ^
          0.16666667                      0.66666667              1.0
  1. 根据权重负载均衡

    如步骤2所示,每个 server 都有自己的范围,把每一个格子作为单位来看的话

    • server1 (0,1]
    • server2 (1,4]
    • server3 (4,6]

    使用随机数函数,取 (0,6] 之间的随机数,根据随机数落在哪个范围决定如何选择。例如随机数为 2,处于 (1,4] 范围,那么就选择 server2。

    思路大概就是这样,落实到代码上,用一个数组 [0.16666667, 0.66666667, 1] 来表示这三个 server 的覆盖范围,使用 ThreadLocalRandom 或者 Random 获取 [0,1) 内的随机数。然后使用二分查找法快速定位随机数处于哪个区间

Java 实现

代码基本上与 com.alibaba.nacos.client.naming.utils.Chooser 一致,在可读性方面做了下优化。

import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

public class WeightRandom<T> {

    private final List<T> items = new ArrayList<>();
    private double[] weights;

    public WeightRandom(List<ItemWithWeight<T>> itemsWithWeight) {
        this.calWeights(itemsWithWeight);
    }

    /**
     * 计算权重,初始化或者重新定义权重时使用
     * 
     */
    public void calWeights(List<ItemWithWeight<T>> itemsWithWeight) {
        items.clear();

        // 计算权重总和
        double originWeightSum = 0;
        for (ItemWithWeight<T> itemWithWeight : itemsWithWeight) {
            double weight = itemWithWeight.getWeight();
            if (weight <= 0) {
                continue;
            }

            items.add(itemWithWeight.getItem());
            if (Double.isInfinite(weight)) {
                weight = 10000.0D;
            }
            if (Double.isNaN(weight)) {
                weight = 1.0D;
            }
            originWeightSum += weight;
        }

        // 计算每个item的实际权重比例
        double[] actualWeightRatios = new double[items.size()];
        int index = 0;
        for (ItemWithWeight<T> itemWithWeight : itemsWithWeight) {
            double weight = itemWithWeight.getWeight();
            if (weight <= 0) {
                continue;
            }
            actualWeightRatios[index++] = weight / originWeightSum;
        }

        // 计算每个item的权重范围
        // 权重范围起始位置
        weights = new double[items.size()];
        double weightRangeStartPos = 0;
        for (int i = 0; i < index; i++) {
            weights[i] = weightRangeStartPos + actualWeightRatios[i];
            weightRangeStartPos += actualWeightRatios[i];
        }
    }

    /**
     * 基于权重随机算法选择
     * 
     */
    public T choose() {
        double random = ThreadLocalRandom.current().nextDouble();
        int index = Arrays.binarySearch(weights, random);
        if (index < 0) {
            index = -index - 1;
        } else {
            return items.get(index);
        }

        if (index < weights.length && random < weights[index]) {
            return items.get(index);
        }

        // 通常不会走到这里,为了保证能得到正确的返回,这里随便返回一个
        return items.get(0);
    }

    public static class ItemWithWeight<T> {
        T item;
        double weight;

        public ItemWithWeight() {
        }

        public ItemWithWeight(T item, double weight) {
            this.item = item;
            this.weight = weight;
        }

        public T getItem() {
            return item;
        }

        public void setItem(T item) {
            this.item = item;
        }

        public double getWeight() {
            return weight;
        }

        public void setWeight(double weight) {
            this.weight = weight;
        }
    }

    public static void main(String[] args) {
        // for test
        int sampleCount = 1_000_000;

        ItemWithWeight<String> server1 = new ItemWithWeight<>("server1", 1.0);
        ItemWithWeight<String> server2 = new ItemWithWeight<>("server2", 3.0);
        ItemWithWeight<String> server3 = new ItemWithWeight<>("server3", 2.0);

        WeightRandom<String> weightRandom = new WeightRandom<>(Arrays.asList(server1, server2, server3));

        // 统计 (这里用 AtomicInteger 仅仅是因为写起来比较方便,这是一个单线程测试)
        Map<String, AtomicInteger> statistics = new HashMap<>();

        for (int i = 0; i < sampleCount; i++) {
            statistics
                    .computeIfAbsent(weightRandom.choose(), (k) -> new AtomicInteger())
                    .incrementAndGet();
        }

        statistics.forEach((k, v) -> {
            double hit = (double) v.get() / sampleCount;
            System.out.println(k + ", hit:" + hit);
        });
    }
}

这里重点说一下 Arrays.binarySearch(weights, random),这个 API 我之前没有用过导致我在读 Nacos 源码时,对这块的操作十分费解

来看一下 java API 文档对该方法返回值的解释

Returns:
index of the search key, if it is contained in the array; otherwise, (-(insertion point) - 1). The insertion point is defined as the point at which the key would be inserted into the array: the index of the first element greater than the key, or a.length if all elements in the array are less than the specified key. Note that this guarantees that the return value will be >= 0 if and only if the key is found.

解释下,首先该方法的作用是通过指定的 key 搜索数组。(前提条件是要保证数组的顺序是从小到大排序过的)

  • 如果数组中包含该 key,则返回对应的索引
  • 如果不包含该 key,则返回该 key 的 (-(insertion point)-1)

insertion point(插入点):该 key 应该在数组的哪个位置。举个例子,数组 [1,3,5],我的搜索 key 为 2,按照顺序排的话 2 应该在数组的 index = 1 的位置,所以此时 insertion point = 1。

(这里 jdk 将能查到 key 和 查不到 key 两种情况做了区分。为了将未找到的情况全部返回负数,所以做了 (-(insertion point)-1) 这样的操作)

看到这,我们就懂了,insertion point 就是我们需要的,现在我们用小学数学来推导一下如何计算 insertion point

// 小学数学推导一下 insertion point 如何计算
returnValue = (- (insertionPoint) - 1)
insertionPoint = (- (returnValue + 1) )

// 所以就有了上边代码中的
if (index < 0) {
	index = -index - 1;
}

参考

https://github.com/alibaba/nacos/blob/develop/client/src/main/java/com/alibaba/nacos/client/naming/utils/Chooser.java

深入理解 Dijkstra 算法实现原理

迪杰斯特拉(Dijkstra)算法是典型最短路径算法,用于计算一个节点到其他节点的最短路径。
它的主要特点是以起始点为中心向外层层扩展(广度优先搜索**),直到扩展到终点为止。

大概就是这样一个有权图,Dijkstra算法可以计算任意节点其他节点的最短路径

算法思路

  1. 指定一个节点,例如我们要计算 'A' 到其他节点的最短路径
  2. 引入两个集合(S、U),S集合包含已求出的最短路径的点(以及相应的最短长度),U集合包含未求出最短路径的点(以及A到该点的路径,注意 如上图所示,A->C由于没有直接相连 初始时为∞
  3. 初始化两个集合,S集合初始时 只有当前要计算的节点,A->A = 0
    U集合初始时为 A->B = 4, A->C = ∞, A->D = 2, A->E = ∞敲黑板!!!接下来要进行核心两步骤了
  4. 从U集合中找出路径最短的点,加入S集合,例如 A->D = 2
  5. 更新U集合路径,if ( 'D 到 B,C,E 的距离' + 'AD 距离' < 'A 到 B,C,E 的距离' ) 则更新U
  6. 循环执行 4、5 两步骤,直至遍历结束,得到A 到其他节点的最短路径

算法图解

1.选定A节点并初始化,如上述步骤3所示

2.执行上述 4、5两步骤,找出U集合中路径最短的节点D 加入S集合,并根据条件 if ( 'D 到 B,C,E 的距离' + 'AD 距离' < 'A 到 B,C,E 的距离' ) 来更新U集合

3.这时候 A->B, A->C 都为3,没关系。其实这时候他俩都是最短距离,如果从算法逻辑来讲的话,会先取到B点。而这个时候 if 条件变成了 if ( 'B 到 C,E 的距离' + 'AB 距离' < 'A 到 C,E 的距离' )如图所示这时候A->B距离 其实为 A->D->B

  1. 思路就是这样,往后就是大同小异了

  1. 算法结束

代码实现

public class Dijkstra {
	public static final int M = 10000; // 代表正无穷
	
	public static void main(String[] args) {
		// 二维数组每一行分别是 A、B、C、D、E 各点到其余点的距离, 
		// A -> A 距离为0, 常量M 为正无穷
		int[][] weight1 = {
		        {0,4,M,2,M}, 
		        {4,0,4,1,M}, 
		        {M,4,0,1,3}, 
		        {2,1,1,0,7},   
		        {M,M,3,7,0} 
		    };

		int start = 0;
		
		int[] shortPath = dijkstra(weight1, start);

		for (int i = 0; i < shortPath.length; i++)
			System.out.println("从" + start + "出发到" + i + "的最短距离为:" + shortPath[i]);
	}

	public static int[] dijkstra(int[][] weight, int start) {
		// 接受一个有向图的权重矩阵,和一个起点编号start(从0编号,顶点存在数组中)
		// 返回一个int[] 数组,表示从start到它的最短路径长度
		int n = weight.length; // 顶点个数
		int[] shortPath = new int[n]; // 保存start到其他各点的最短路径
		String[] path = new String[n]; // 保存start到其他各点最短路径的字符串表示
		for (int i = 0; i < n; i++)
			path[i] = new String(start + "-->" + i);
		int[] visited = new int[n]; // 标记当前该顶点的最短路径是否已经求出,1表示已求出

		// 初始化,第一个顶点已经求出
		shortPath[start] = 0;
		visited[start] = 1;

		for (int count = 1; count < n; count++) { // 要加入n-1个顶点
			int k = -1; // 选出一个距离初始顶点start最近的未标记顶点
			int dmin = Integer.MAX_VALUE;
			for (int i = 0; i < n; i++) {
				if (visited[i] == 0 && weight[start][i] < dmin) {
					dmin = weight[start][i];
					k = i;
				}
			}

			// 将新选出的顶点标记为已求出最短路径,且到start的最短路径就是dmin
			shortPath[k] = dmin;
			visited[k] = 1;

			// 以k为中间点,修正从start到未访问各点的距离
			for (int i = 0; i < n; i++) {
				//如果 '起始点到当前点距离' + '当前点到某点距离' < '起始点到某点距离', 则更新
				if (visited[i] == 0 && weight[start][k] + weight[k][i] < weight[start][i]) {
					weight[start][i] = weight[start][k] + weight[k][i];
					path[i] = path[k] + "-->" + i;
				}
			}
		}
		for (int i = 0; i < n; i++) {
			
			System.out.println("从" + start + "出发到" + i + "的最短路径为:" + path[i]);
		}
		System.out.println("=====================================");
		return shortPath;
	}
	
}

分布式事务 Seata AT模式原理与实战

Seata 是阿里开源的基于Java的分布式事务解决方案

AT,XA,TCC,Saga

Seata 提供四种模式解决分布式事务场景,AT,XA,TCC,Saga。简单叨咕叨咕我对这几种模式的理解

AT

image.png

这是Seata的一大特色,AT对业务代码完全无侵入性,使用非常简单,改造成本低。我们只需要关注自己的业务SQL,Seata会通过分析我们业务SQL,反向生成回滚数据

AT 包含两个阶段

  • 一阶段,所有参与事务的分支,本地事务Commit 业务数据和回滚日志(undoLog)
  • 二阶段,事务协调者根据所有分支的情况,决定本次全局事务是Commit 还是 Rollback(二阶段是完全异步)

XA

也是我们常说的二阶段提交,XA要求数据库本身提供对规范和协议的支持。XA用起来的话,也是对业务代码无侵入性的。

上述其他三种模式,都是属于补偿型,无法保证全局一致性。啥意思呢,例如刚刚说的AT模式,我们是可能读到这一次分布式事务的中间状态,而XA模式不会。

补偿型 事务处理机制构建在 事务资源(数据库) 之上(要么在中间件层面,要么在应用层面),事务资源 本身对分布式事务是无感知的,这也就导致了补偿型事务无法做到真正的 全局一致性 。
比如,一条库存记录,处在 补偿型 事务处理过程中,由 100 扣减为 50。此时,仓库管理员连接数据库,查询统计库存,就看到当前的 50。之后,事务因为意外回滚,库存会被补偿回滚为 100。显然,仓库管理员查询统计到的 50 就是 脏 数据。
如果是XA的话,中间态数据库存 50 由数据库本身保证,不会被仓库管理员读到(当然隔离级别需要 读已提交 以上)

但是全局一致性带来的结果就是数据的锁定(AT模式也是存在全局锁的,但是隔离级别无法保证,后边我们会详细说),例如全局事务中有一条update语句,其他事务想要更新同一条数据的话,只能等待全局事务结束

传统XA模式是存在一些问题的,Seata也是做了相关的优化,更多关于Seata XA的内容,传送门👉http://seata.io/zh-cn/blog/seata-xa-introduce.html

TCC

image.png

TCC 模式同样包含两个阶段

  • Try 阶段 :所有参与分布式事务的分支,对业务资源进行检查和预留
  • 二阶段 Confirm:所有分支的Try全部成功后,执行业务提交
  • 二阶段 Cancel:取消Try阶段预留的业务资源

对比AT或者XA模式来说,TCC模式需要我们自己抽象并实现Try,Confirm,Cancel三个接口,编码量会大一些,但是由于事务的每一个阶段都由开发人员自行实现。而且相较于AT模式来说,减少了SQL解析的过程,也没有全局锁的限制,所以TCC模式的性能是优于AT 、XA模式。
PS:果然简单和高效难以两全的

Saga

image.png

Saga 是长事务解决方案,每个参与者需要实现事务的正向操作和补偿操作。当参与者正向操作执行失败时,回滚本地事务的同时,会调用上一阶段的补偿操作,在业务失败时最终会使事务回到初始状态

Saga与TCC类似,同样没有全局锁。由于相比缺少锁定资源这一步,在某些适合的场景,Saga要比TCC实现起来更简单。
由于Saga和TCC都需要我们手动编码实现,所以在开发时我们需要参考一些设计上的规范,由于不是本文重点,这里就不多说了,可以参考分布式事务 Seata 及其三种模式详解

在我们了解完四种分布式事务的原理之后,我们回到本文重点AT模式

AT 如何使用

模拟需求:以下订单为例,在分布式的电商场景中,订单服务和库存服务可能是两个数据库

我们先来看看AT模式下的代码是什么样的,这里忽略了Seata的相关配置,只看业务部分

image.png

在需要开启分布式事务的方法上标记@GlobalTransactional,然后执行分别执行扣减库存和创建订单操作,事务的参与者可以是本地的数据源,或者RPC的远程调用(远程调用的话需要携带全局事务ID,也就是上图的xid)

AT 一阶段

之前说过AT模式分为两个阶段,第一阶段包括提交业务数据和回滚日志(undoLog),第一阶段具体流程如下图

image.png

GlobalTransactional 切面

标记@GlobalTransactional的方法通过AOP实现了,开启全局事务和提交全局事务两个操作,与Spring 事务机制类似,当 GlobalTransactionalInterceptor 在事务执行过程中捕获到Throwable时,会发起全局事务回滚

0.1 步骤中会生成一个全局事务ID

0.2 所有事务参与者执行结束后,一阶段事务提交

undoLog

我们先来看看 Seata undoLog 的结构

// 省略了相关方法
public class SQLUndoLog {
	// insert, update ...
    private SQLType sqlType;

    private String tableName;

    private TableRecords beforeImage;

    private TableRecords afterImage;
}

Seata 在执行业务SQL前后,会生成beforeImage和afterImage,在需要回滚时,根据SQLType,决定具体的回滚策略,例如SQLType=update时,将数据回滚到beforeImage的状态,如果SQLType=insert,则根据afterImage删除数据

如2.4所示,每条业务SQL,执行成功后,会为这条SQL生成LockKey,格式为tableName:PrimaryKey

注册分支事务

在3.1步骤注册分支事务时,client会把所有的LockKey 拼到一起作为全局锁发送给Seata-server。如果注册成功,写入undoLog,并提交本地事务,一阶段结束,等待二阶段反馈

如果当前有其他分支事务已经持有了相同的锁(即其他事务也在处理相同表的同一行),则client 注册事务分支失败。client会根据客户端定义的重发时间和重发次数进行不断的尝试,如果重试结束仍然没有获得锁,则一阶段失败,本地事务回滚。如果该全局事务存在已经注册成功分支事务,Seata-server 进行二阶段回滚

全局锁会在分支事务二阶段结束后释放

Seata 全局锁的设计是为了什么?
以扣减库存场景为例,TX1 完成库存扣减的一阶段,库存从100扣减为99,正在等待二阶段的通知。TX2也要扣减同一商品的库存,如果没有全局锁的限制,TX2库存从99扣减为98,这时如果TX1接收到回滚通知,进行回滚把库存从98回滚到100。因为没有全局锁,造成了脏写

AT 二阶段

二阶段是完全异步化的并且完全由Seata控制,Seata根据所有事务参与者的提交情况决定二阶段如何处理

  • 如果所有事务提交成功,则二阶段的任务就是删除一阶段生成 的undoLog,并释放全局锁
  • 如果部分事务参与者提交失败,则需要根据undoLog对已经注册的事务分支进行回滚,并释放全局锁

对Seata提出的疑问

至此我们已经初步了解了Seata的AT模式是如何实现的了

如果你也和我一样,仔细思考了上述过程,可能会提出一些问题,这边我列举一下我在学习Seata时,遇到的问题,以及我得出的结论

问题1. Seata如何做到无侵入的分析业务SQL生成undoLog,注册事务分支等操作?

Seata 代理了DataSource,我们可以通过在代码注入一个DataSource来验证我的说法,目前的DataSource 是 io.seata.rm.datasource.DataSourceProxy

image.png

所有的Java持久化框架,最终在操作数据库时都会通过DataSource接口获取Connection,通过Connection 实现对数据库的增删改查,事务控制。

image.png

Seata 通过代理Connection的方式,做到了无侵入的生成undoLog,注册事务分支,具体源码可以查看io.seata.rm.datasource.ConnectionProxy

问题2. ConnectionProxy 如何判断当前事务是全局事务,还是本地事务?

通过当前线程是否绑定了全局事务id,在进行全局事务之前,需要调用RootContext.bind(xid);

问题3. 全局事务并发更新

还是以下订单扣减库存的场景为例,如果TX1和TX2同时扣减product_id为1的库存,这时Seata会不会生成相同的beforeImage?

举个例子,TX1读库存为100,TX1扣减库存1,此时BeforeImage为100
紧接着 如果TX2读库存也为100,那么就有问题了,不管TX2扣减多少库存,如果TX1回滚那么相当于覆盖了TX2扣减的库存,出现了脏写

Seata是如何解决这个问题的?

源码位置:io.seata.rm.datasource.exec.AbstractDMLBaseExecutor::executeAutoCommitFalse
image.png

可以看到这里的逻辑和我上面画的图一致,证明我没有瞎说 😄

我们来看一下beforeImage(),这是一个抽象方法,看一下他的子类UpdateExecutor是如何实现的

image.png

通过Debug,可以看出Seata这边也是确实考虑了这个问题,直接简单而有效的解决了这个问题

回到我们的例子,由于SELECT FOR UPDATE的存在,TX2如果也想读同一条数据的话,只能等到TX1 提交事务后,才能读到。所以问题解决

问题4. 全局事务外的更新

我们现在可以确认在Seata的保证下,全局事务,不会造成数据的脏写,但是全局事务外会!

什么意思呢?

还以库存为例

  • 用户正在抢购,用户A完成了1阶段的库存扣减,这个时候库存为99。
  • 此时库存管理员上线了,他查了一下库存为99。嗯...太少了,我加100个,库存管理员把库存更新为200。
  • 而此时seata给用户A生成beforeImage为100,如果此时用户A的全局事务失败了,发生了回滚,再次将库存更新为100... 再次出现脏写

Seata 针对这个问题,提供了@GlobalLock注解,标记该注解时,会像全局事务一样进行SQL分析,竞争全局锁,就不会出现上述问题了

关于这个问题可以参考Seata的FAQ文档 http://seata.io/zh-cn/docs/overview/faq.html

问题5. @GlobalTransactional 和 @transactional 同时使用会怎么样

我们上文中已经说过了 @GlobalTransactional 的作用了,他是负责开启全局事务/提交事务1阶段,说白了@GlobalTransactional 只和Seata-server 交互,而 @transactional 管理的是本地数据库的事务,所以二者不发生冲突。

但是需要注意 @GlobalTransactional AOP 覆盖范围一定要大于 @transactional

问题6. 如果其中某一个事务分支超时未提交,会发生什么

这个我并没有看源码,而是通过跑demo,验证的

例如现在有A,B两个事务分支

  • A 正常提交,并向Seata注册分支成功
  • B 2分钟后提交事务,并向Seata发起注册

Seata的全局事务超时时间,默认是1分钟,Seata-server 在检测到有超时的全局事务时,会向所有已提交的分支,发起回滚。而超时提交的事务,向Seata-server发起分支注册时,响应结果为事务已超时,或者事务不存在,也会回滚本地事务

问题7. Seata-client 如何接收Seata-server发起的通知

Seata-client 包含了Netty服务,在启动时Netty会监听端口,并向Seata-server 发起注册。server中存储了client 的调用地址。

总结

我们学习了Seata的AT模式是如何工作的,可以看出Seata模式在开发上是非常简单的,但是Seata的背后为了维持分布式事务的数据一致性,做了大量的工作,AT模式非常适合现有的业务模型直接迁移。

但是他的缺点也很明显,性能并不是那么的优秀。例如我们刚刚看到的全局锁的问题,为了数据不会发生脏写,Seata牺牲了业务的并发能力。在非常要求性能的场景,可能还是需要考虑TCC,SAGA,可靠消息等方案

在使用Seata开发前,建议大家先去阅读一下FAQ文档,避免踩坑 https://seata.io/zh-cn/docs/overview/faq.html

DEMO

https://github.com/TavenYin/taven-springboot-learning/tree/master/springboot-seata

参考

Seata是什么 - http://seata.io/zh-cn/docs/overview/what-is-seata.html
Seata常见问题 - http://seata.io/zh-cn/docs/overview/faq.html
分布式事务中间件 Seata 的设计原理 - http://seata.io/zh-cn/blog/seata-at-mode-design.html
分布式事务 Seata 及其三种模式详解 - http://seata.io/zh-cn/blog/seata-at-tcc-saga.html
分布式事务如何实现?深入解读 Seata 的 XA 模式 - http://seata.io/zh-cn/blog/seata-xa-introduce.html

SpringCloud NamedContextFactory 原理与使用

最近在阅读 Ribbon 的源码,发现 SpringCloud 中 NamedContextFactory 这个类可以实现子容器。Ribbon 为每个 ServiceName 都拥有自己的 Spring Context 和 Bean 实例(不同服务之间的 LoadBalancer 和其依赖的 Bean 都是完全隔离的)。

这么做有什么好处呢

  • 子容器之间数据隔离。不同的 LoadBalancer 只管理自己的服务实例,明确自己的职责。
  • 子容器之间配置隔离。不同的 LoadBalancer 可以使用不同的配置。例如报表服务需要统计和查询大量数据,响应时间可能很慢。而会员服务逻辑相对简单,所以两个服务的响应超时时间可能要求不同。
  • 子容器之间 Bean 隔离。可以让子容器之间注册不同的 Bean。例如订单服务的 LoadBalancer 底层通过 Nacos 获取实例,会员服务的 LoadBalancer 底层通过 Eureka 获取实例。也可以让不同的 LoadBalancer 采用不同的算法

NamedContextFactory 简介

NamedContextFactory 可以创建一个子容器(或者说子上下文),每个子容器可以通过 Specification 定义 Bean。 移植自 spring-cloud-netflix FeignClientFactory 和 SpringClientFactory

上面是对于 NamedContextFactory 类注释的翻译。

接下来,我会使用 NamedContextFactory 实现一个 demo,便于各位理解。

我实在没想到什么特别好的场景。所以我仿照 Ribbon 实现一个 HttpClient,每个子容器的 HttpClient 生效不同的配置,创建不同的 Bean

子容器的定制

NamedContextFactory

下面来进入正题

子容器需要通过 NamedContextFactory 来创建。首先我们先继承一下该类实现一个自己的 Factory

@Component
public class NamedHttpClientFactory extends NamedContextFactory<NamedHttpClientSpec> {

    public NamedHttpClientFactory() {
        super(NamedHttpClientConfiguration.class, "namedHttpClient", "http.client.name");
    }

}

解释一下上述 super 构造方法三个参数的含义

  • 第一个参数,默认配置类。当使用 NamedHttpClientFactory 创建子容器时,NamedHttpClientConfiguration 一定会被加载
  • 第二个参数,我目前没发现有什么用,真的就是随便定义一个 name
  • 第三个参数,很重要。
    创建子容器时通常会提供子容器的容器 name。子容器中的 Environment 会被写入一条配置,http.client.name=容器name(也就是说,子容器可以通过读取配置 http.client.name 来获取容器名)

看到这可能还是很迷惑,这实际有什么用呢?以 Ribbon 为例,容器名就是 ServiceName,Ribbon 可以在配置文件中定制每个子容器的配置或者 Bean,配置如下

# 订单服务的超时时间为 3000
orderService.ribbon.ReadTimeout = 3000
# 指定 orderService 容器的 ServerList
orderService.ribbon.NIWSServerListClassName = com.netflix.loadbalancer.ConfigurationBasedServerList

当每个子容器都知道自己的容器名时,就可以找到自己对应的配置了

接下来看下,我的默认配置类都干了什么

public class NamedHttpClientConfiguration {

    @Value("${http.client.name}")
    private String httpClientName; // 1.

    @Bean
    @ConditionalOnMissingBean
    public ClientConfig clientConfig(Environment env) {
        return new ClientConfig(httpClientName, env); // 2.
    }

    @Bean
    @ConditionalOnMissingBean
    public NamedHttpClient namedHttpClient(ClientConfig clientConfig) {
        return new NamedHttpClient(httpClientName, clientConfig); // 3.
    }

}
  1. @Value("${http.client.name}"),结合上边讲的,这样可以读到当前子容器的 name
  2. ClientConfig,负责根据容器 name,加载属于自己的配置。代码比较简单就不贴出来了
  3. NamedHttpClient,简单的包装一下 HttpClient,会根据 ClientConfig 对 HttpClient 进行配置

NamedContextFactory.Specification

上面讲的是可以手动编程来定制子容器的 Bean,NamedContextFactory 也提供了定制子容器的接口 NamedContextFactory.Specification。

public class NamedHttpClientSpec implements NamedContextFactory.Specification {

    private final String name;
    private final Class<?>[] configuration;

    public NamedHttpClientSpec(String name, Class<?>[] configuration) {
        this.name = name;
        this.configuration = configuration;
    }

    @Override
    public String getName() {
        return name;
    }

    @Override
    public Class<?>[] getConfiguration() {
        return configuration;
    }
}

我们简单的实现一下该接口,然后通过 NamedHttpClientFactory#setConfigurations,将 Specification 赋值给 NamedHttpClientFactory。

创建子容器时,如果容器的 name 匹配了 Specification 的 name,则会加载 Specification 对应 Configuration 类。

题外话: @RibbonClient 也是通过 NamedContextFactory.Specification 实现的

Run 一下

讲到这,也许你还是没懂,没关系,建议 Debug 一下这个单元测试

public class NamedContextFactoryTest {

    private void initEnv(AnnotationConfigApplicationContext parent) {
        Map<String, Object> map = new HashMap<>();
        map.put("baidu.socketTimeout", 123);
        map.put("google.socketTimeout", 456);
        parent.getEnvironment()
                .getPropertySources()
                .addFirst(new MapPropertySource("test", map));
    }

    @Test
    public void test() {
        // 创建 parent context
        AnnotationConfigApplicationContext parent = new AnnotationConfigApplicationContext();
        // parent context 的 Bean,可以被子容器继承
        parent.register(ParentConfiguration.class);
        initEnv(parent);
        parent.refresh();

        // 容器 name = baidu 的 context 中会注册 TestConfiguration
        NamedHttpClientSpec spec = new NamedHttpClientSpec("baidu", new Class[]{TestConfiguration.class});

        NamedHttpClientFactory namedHttpClientFactory = new NamedHttpClientFactory();
        // SpringBoot 中无需手动设置,会自动注入 parent
        namedHttpClientFactory.setApplicationContext(parent);
        namedHttpClientFactory.setConfigurations(List.of(spec));

        // 准备工作完成,现在开始通过 NamedContextFactory get Bean
        ParentBean baiduParentBean = namedHttpClientFactory.getInstance("baidu", ParentBean.class);
        NamedHttpClient baidu = namedHttpClientFactory.getInstance("baidu", NamedHttpClient.class);
        TestBean baiduTestBean = namedHttpClientFactory.getInstance("baidu", TestBean.class);

        Assert.assertNotNull(baiduParentBean);
        Assert.assertEquals("baidu", baidu.getServiceName());
        Assert.assertEquals(123, baidu.getRequestConfig().getSocketTimeout());
        Assert.assertNotNull(baiduTestBean);

        ParentBean googleParentBean = namedHttpClientFactory.getInstance("google", ParentBean.class);
        NamedHttpClient google = namedHttpClientFactory.getInstance("google", NamedHttpClient.class);
        TestBean googleTestBean = namedHttpClientFactory.getInstance("google", TestBean.class);

        Assert.assertNotNull(googleParentBean);
        Assert.assertEquals("google", google.getServiceName());
        Assert.assertEquals(456, google.getRequestConfig().getSocketTimeout());
        Assert.assertNull(googleTestBean);
    }

    static class ParentConfiguration {
        @Bean
        public ParentBean parentBean() {
            return new ParentBean();
        }
    }

    static class TestConfiguration {
        @Bean
        public TestBean testBean() {
            return new TestBean();
        }
    }


    static class ParentBean {

    }

    static class TestBean {

    }

}

UT 完整运行参考👉
https://github.com/TavenYin/taven-springcloud-learning/blob/master/springcloud-alibaba-nacos/nacos-discovery/src/test/java/com/github/taven/NamedContextFactoryTest.java

Spring 项目中使用子容器参考 👉
https://github.com/TavenYin/taven-springcloud-learning/tree/master/springcloud-alibaba-nacos/nacos-discovery/src/main/java/com/github/taven/namedcontext

如果某个 Configuration 类,只需要子容器加载,那么你可以不添加 @configuration,这样就不会被 Spring 容器(父容器)加载了。

NamedContextFactory 源码分析

使用该类的入口通常是 getInstance 方法

	public <T> T getInstance(String name, Class<T> type) {
		// 1. 获取子容器
		AnnotationConfigApplicationContext context = getContext(name);
		try {  
            // 2. 从子容器中获取 Bean
			return context.getBean(type);
		}
		catch (NoSuchBeanDefinitionException e) {
			// ignore
		}
		return null;
	}

这个方法内部逻辑很简单

  1. 获取子容器(如果不存在的话,会创建)
  2. 从(子)容器中获取 Bean,这步就可理解为和常规 Spring 操作一样了,从容器中获取 Bean(子容器只是概念上的一个东西,实际 API 都是一样的)

所以下面我们重点看下 getContext 方法做了什么

public abstract class NamedContextFactory<C extends NamedContextFactory.Specification>
		implements DisposableBean, ApplicationContextAware {

	private Map<String, C> configurations = new ConcurrentHashMap<>(); // 1.
	
	// 省略其他成员变量
	
	protected AnnotationConfigApplicationContext createContext(String name) {
		AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
		if (this.configurations.containsKey(name)) { // 2.
			for (Class<?> configuration : this.configurations.get(name)
					.getConfiguration()) {
				context.register(configuration);
			}
		}
		for (Map.Entry<String, C> entry : this.configurations.entrySet()) {
			if (entry.getKey().startsWith("default.")) { // 3.
				for (Class<?> configuration : entry.getValue().getConfiguration()) {
					context.register(configuration);
				}
			}
		}
		context.register(PropertyPlaceholderAutoConfiguration.class,
				this.defaultConfigType); // 4. 
		context.getEnvironment().getPropertySources().addFirst(new MapPropertySource(
				this.propertySourceName,
				Collections.<String, Object>singletonMap(this.propertyName, name))); // 5.
		if (this.parent != null) {
			// Uses Environment from parent as well as beans
			context.setParent(this.parent);
			// jdk11 issue
			// https://github.com/spring-cloud/spring-cloud-netflix/issues/3101
			context.setClassLoader(this.parent.getClassLoader());
		}
		context.setDisplayName(generateDisplayName(name));
		context.refresh();
		return context;
	}

	// 省略其他方法
}	
  1. 该 Map 的 value 为 Specification 的实现(用于提供 Configuration),name 为容器名,用于定制每个子容器的配置
  2. 如果 name 匹配,则加载 Configuration
  3. 如果 Specification 的 name 以 default. 开头,则每个子容器创建时,都会加载这些配置
  4. 子容器中注册 PropertyPlaceholderAutoConfiguration,以及 defaultConfigType(PropertyPlaceholderAutoConfiguration 用于解析 Bean 或者 @value 中的占位符。defaultConfigType 是上文中提到过的,构造方法中提供的子容器默认配置类)
  5. 也是我们上文中说过的,子容器中写入一条配置。以上文为例,会在容器中写入一条 http.client.name=容器name
  6. 剩下的就是,设置父容器,以及初始化操作

最后

如果觉得我的文章对你有帮助,动动小手点下关注或者喜欢,你的支持是对我最大的帮助

电商技术:库存设计

前言

最近在解决一套老电商系统的库存"超卖"问题。一直以为超卖问题,最难解决的是库存扣减,实则不然,我们的系统在解决了库存扣减问题之后,还会一直有“超卖”现象?这一切的背后到底是道德的沦丧,还是人性的扭曲,欢迎收看本期走近科学

本文带你解决以下电商场景问题

  1. 保证库存线程安全的扣减
  2. 防止库存的多次扣减、回滚
  3. 超时未支付被取消的订单(取消会回滚库存), 如果收到了支付回调怎么办

如何线程安全的扣减库存

先来说说库存扣减的问题,这是我们原来老系统的逻辑,注意!这里是错误的示例

// 以下是伪代码,错误的示例
// 查询出Goods对象
$goods = selectGoodsById($id);
if ($goods->num - $order_num > 0) {
	// 计算出扣减后的库存
	$goods->num = $goods->num - $order_num;
	// 保存
	save($goods);
}

上述代码犯了大忌,并发情况会导致多个线程读到相同的库存数,然后扣减,然后保存到DB,下面我们来说下正确的姿势

正确的做法

利用MySQL update 会持有当前记录锁的特点,保证线程安全的扣减

SQL 示例:

update kucun set num = num - ? where id = ? and num - ? >= 0

我们的这条记录根据主键更新,当事务A update 这条记录时,会持有当前记录的锁,当事务A未提交时,其他想要更新这条记录的事务只能等待锁释放

关于MySQL update 锁的细节,本文不讨论,可以参考MySQL文档

https://dev.mysql.com/doc/refman/8.0/en/innodb-locks-set.html
https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html
https://dev.mysql.com/doc/refman/8.0/en/innodb-locking.html

虽然MySQL可以保证数据的准确性,但是大并发量场景下,大量的锁竞争,导致库存的扣减可能成为系统性能的瓶颈

使用 Redis 库存扣减

使用Redis的优势很多,单线程的文件事件处理器保证了并发下可以线程的安全扣减、回滚库存, 以及Redis高性能。

虽然Redis解决了线程安全和性能的问题,但是Redis并不能做到像MySQL那样一条SQL命令完成库存扣减,我们需要先读出已有库存,再和当前下单库存做一个判断是否可以库存扣减。所以最佳的实现方案是通过Redis 执行lua脚本,保证整个逻辑处理期间,不会有其他客户端插进来

    /**
     *
     * 扣减库存Lua脚本
     * 库存(stock)-1:表示不限库存
     * 库存(stock)0:表示没有库存
     * 库存(stock)大于0:表示剩余库存
     *
     * @params 库存key
     * @return
     *      -3:库存未初始化
     *      -2:库存不足
     *      -1:不限库存
     *      大于等于0:剩余库存(扣减之后剩余的库存)
     */
    const SUB_STOCK_LUA = "
        if (redis.call('exists', KEYS[1]) == 1) then
            local stock = tonumber(redis.call('get', KEYS[1]));
            local num = tonumber(ARGV[1]);   
            if (stock == -1) then
                return -1;
            end;
            
            if (stock >= num) then
                return redis.call('incrby', KEYS[1], 0 - num);
            end;
            
            return -2;
        end;
        
        return -3;
    ";

注意:
当对一个订单中的 good_list 扣减库存的时候,需要注意,当某一个商品库存扣减失败时,之前的扣减的商品库存需要回滚。这会涉及到对redis的多次操作,你可以把整体逻辑写到一个lua脚本中

使用Redis 做库存扣减会有一个问题(伪代码如下),Redis数据和MySQL数据并不能保证强一致性,因为Redis的数据相当于直接写进去了,如果在需要回滚的时候,Redis不可用了导致数据无法回滚,最终会造成MySQL没有写入订单数据,Redis却扣减了库存

try {
    $db->beginTransaction();
	
    $db->saveOrder();
	$redis->reduceStock();

	$db->commit();
		
} catch (Exception $e) {
    $db->rollback();
    $redis->rollbackStock();
}

这种情况并没有什么好的解决办法,这是一个几率非常小的故障,首先我们肯定要尽可能的保证Redis的高可用性,其次在发生这种情况后,我们要想办法恢复Redis中的数据,例如我们可以在整个逻辑之后,选择异步的方式(例如MQ)向MySQL中同步库存,当发生故障后,以MySQL数据为准恢复数据

所以Redis是一把双刃剑,提升性能的同时,也带来了问题

AliSQL

这是后来我在网上看到的方案,AliSQL 是阿里自研 MySQL 分支,AliSQL 针对并发修改同一记录的情况,使用数据库层面的缓冲队列,避免大量争锁的代价。感兴趣的同学可以试下(阿里云MySQL 8.0 集成了这一功能),如果AliSQL解决了性能问题的话,那么这个方案相比Redis要更好

关于库存多次扣减的问题

当订单的提交和库存的扣减同步进行的时候,不需要考虑这个问题。

举例:订单系统生成订单之后,通过MQ通知库存系统,库存系统异步扣减库存,这个时候库存系统可能会多次消费,这个时候就需要考虑这个问题了。

或者我们上面说的通过MQ同步MySQL库存也需要考虑可能发生多次扣减

解决方案如图,通过订单做为唯一索引保证流水记录的唯一性,从而保证只能有一次成功的扣减

image.png

库存回滚问题

多数博客对于超卖的讲解只在于库存的扣减,但是库存扣减安全了,真的就可以保证不超卖吗?我们的系统在解决了库存扣减问题后,还是出现成交订单 > 库存的问题,为此我也是绞尽脑汁,抓破了头

在对下单进行压力测试之后,我坚信下单不会出现超卖的问题,后来我怀疑问题出在了库存回滚,如果一个订单回滚了两次库存(取消超时未支付订单的线程和用户线程同时取消一个订单),同样也会出现超卖的现象。

解决方法:
和防止多次扣减一样,采用写入订单回滚流水的方式,个人认为这种方法比较加锁要好,数据有迹可循

超时未支付被取消的订单收到了支付回调

在解决了库存回滚问题之后,超卖问题还没有解决,最后通过日志定位到了这个问题。

问题描述:用户在系统即将自动取消订单的前一瞬间完成了支付,系统取消了该订单并回滚了库存,同时系统收到了该订单的支付回调,该订单的状态更改为已支付,因为不该出现的库存回滚导致了“超卖”

下面说下我们的解决方案,以微信支付为例

我们的系统在提交订单之后,会调用微信的统一下单接口,这时候微信收到了我们的商户订单号(微信已经生成订单),用户选择不支付。超时自动取消逻辑处理之前,先调用微信的关闭订单接口,如果关闭成功,则这个时候用户后续无法对该订单发起支付。如果返回订单已支付,则无需处理该订单,该订单会收到微信支付的回调

参考

https://www.jianshu.com/p/76bc0e963172
https://www.zhihu.com/question/268937734
https://pay.weixin.qq.com/wiki/doc/api/jsapi.php?chapter=9_3

如果觉得文章有帮助,欢迎点赞、转发、关注我的公众号,你的支持就是我最大的动力

浅析 Spring 事务传播实现原理

本文目标

  • 理解Spring事务管理核心接口
  • 理解Spring事务管理的核心逻辑
  • 理解事务的传播类型及其实现原理

版本

SpringBoot 2.3.3.RELEASE

什么是事务的传播?

Spring 除了封装了事务控制之外,还抽象出了 事务的传播 这个概念,事务的传播并不是关系型数据库所定义的,而是Spring在封装事务时做的增强扩展,可以通过@Transactional 指定事务的传播,具体类型如下

事务传播行为类型 说明
PROPAGATION_REQUIRED 如果当前没有事务,就新建一个事务,如果已经存在一个事务中,加入到这个事务中。Spring的默认事务传播类型
PROPAGATION_SUPPORTS 支持当前事务,如果当前没有事务,就以非事务方式执行。
PROPAGATION_MANDATORY 使用当前的事务,如果当前没有事务,就抛出异常。
PROPAGATION_REQUIRES_NEW 新建事务,如果当前存在事务,把当前事务挂起(暂停)。
PROPAGATION_NOT_SUPPORTED 以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
PROPAGATION_NEVER 以非事务方式执行,如果当前存在事务,则抛出异常。
PROPAGATION_NESTED 如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则执行与PROPAGATION_REQUIRED类似的操作。

举个栗子

以嵌套事务为例

@Service
public class DemoServiceImpl implements DemoService {
    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private DemoServiceImpl self;

    @Transactional
    @Override
    public void insertDB() {
        String sql = "INSERT INTO sys_user(`id`, `username`) VALUES (?, ?)";
        jdbcTemplate.update(sql, uuid(), "taven");

        try {
            // 内嵌事务将会回滚,而外部事务不会受到影响
            self.nested();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Transactional(propagation = Propagation.NESTED)
    @Override
    public void nested() {
        String sql = "INSERT INTO sys_user(`id`, `username`) VALUES (?, ?)";
        jdbcTemplate.update(sql, uuid(), "nested");

        throw new RuntimeException("rollback nested");

    }

    private String uuid() {
        return UUID.randomUUID().toString();
    }
}

上述代码中,nested()方法标记了事务传播类型为嵌套,如果nested()中抛出异常仅会回滚nested()方法中的sql,不会影响到insertDB()方法中已经执行的sql

注意:service 调用内部方法时,如果直接使用this调用,事务不会生效。因此使用this调用相当于跳过了外部的代理类,所以AOP不会生效,无法使用事务

思考

众所周知,Spring 事务是通过AOP实现的,如果是我们自己写一个AOP控制事务,该怎么做呢?

// 伪代码
public Object invokeWithinTransaction() {
	// 开启事务
	connection.beginTransaction();
	try {
		// 反射执行方法
		Object result = invoke();
		// 提交事务
		connection.commit();
		return result;
	} catch(Exception e) {
		// 发生异常时回滚
		connection.rollback();
		throw e;
	} 
	
}

在这个基础上,我们来思考一下如果是我们自己做的话,事务的传播该如何实现

PROPAGATION_REQUIRED为例,这个似乎很简单,我们判断一下当前是否有事务(可以考虑使用ThreadLocal存储已存在的事务对象),如果有事务,那么就不开启新的事务。反之,没有事务,我们就创建新的事务

如果事务是由当前切面开启的,则提交/回滚事务,反之不做处理

那么事务传播中描述的挂起(暂停)当前事务,和内嵌事务是如何实现的?

源码入手

要阅读事务传播相关的源码,我们先来了解下Spring 事务管理的核心接口与类

  1. TransactionDefinition
    该接口定义了事务的所有属性(隔离级别,传播类型,超时时间等等),我们日常开发中经常使用的 @Transactional 其实最终会被转化为 TransactionDefinition

  2. TransactionStatus
    事务的状态,以最常用的实现 DefaultTransactionStatus 为例,该类存储了当前的事务对象,savepoint,当前挂起的事务,是否完成,是否仅回滚等等

  3. TransactionManager
    这是一个空接口,直接继承他的 interface 有 PlatformTransactionManager(我们平时用的就是这个,默认的实现类DataSourceTransactionManager)以及
    ReactiveTransactionManager(响应式事务管理器,由于不是本文重点,我们不多说)

从上述两个接口来看,TransactionManager 的主要作用

  • 通过TransactionDefinition开启一个事务,返回TransactionStatus
  • 通过TransactionStatus 提交、回滚事务(实际开启事务的Connection通常存储在TransactionStatus中)
public interface PlatformTransactionManager extends TransactionManager {
	
	TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
			throws TransactionException;

	
	void commit(TransactionStatus status) throws TransactionException;

	
	void rollback(TransactionStatus status) throws TransactionException;

}
  1. TransactionInterceptor
    事务拦截器,事务AOP的核心类(支持响应式事务,编程式事务,以及我们常用的标准事务),由于篇幅原因,本文只讨论标准事务的相关实现

下面我们从事务逻辑的入口 TransactionInterceptor 入手,来看下Spring事务管理的核心逻辑以及事务传播的实现

TransactionInterceptor

TransactionInterceptor 实现了MethodInvocation(这是实现AOP的一种方式),其核心逻辑在父类TransactionAspectSupport 中,方法位置:TransactionInterceptor::invokeWithinTransaction

	protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
			final InvocationCallback invocation) throws Throwable {
		// If the transaction attribute is null, the method is non-transactional.
		TransactionAttributeSource tas = getTransactionAttributeSource();
		// 当前事务的属性 TransactionAttribute extends TransactionDefinition
        final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
        // 事务属性中可以定义当前使用哪个事务管理器
        // 如果没有定义就去Spring上下文找到一个可用的 TransactionManager
		final TransactionManager tm = determineTransactionManager(txAttr);

		// 省略了响应式事务的处理 ...
        
		PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
		final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

		if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
			// Standard transaction demarcation with getTransaction and commit/rollback calls.
			TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

			Object retVal;
			try {
				// This is an around advice: Invoke the next interceptor in the chain.
                // This will normally result in a target object being invoked.
                // 如果有下一个拦截器则执行,最终会执行到目标方法,也就是我们的业务代码
				retVal = invocation.proceedWithInvocation();
			}
			catch (Throwable ex) {
                // target invocation exception
                // 当捕获到异常时完成当前事务 (提交或者回滚)
				completeTransactionAfterThrowing(txInfo, ex);
				throw ex;
			}
			finally {
                cleanupTransactionInfo(txInfo);
			}

			if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
				// Set rollback-only in case of Vavr failure matching our rollback rules...
				TransactionStatus status = txInfo.getTransactionStatus();
				if (status != null && txAttr != null) {
					retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
				}
			}
            // 根据事务的状态提交或者回滚
			commitTransactionAfterReturning(txInfo);
			return retVal;
		}

		// 省略了编程式事务的处理 ...
	}

这里代码很多,根据注释的位置,我们可以把核心逻辑梳理出来

  1. 获取当前事务属性,事务管理器(以注解事务为例,这些都可以通过@Transactional来定义)
  2. createTransactionIfNecessary,判断是否有必要创建事务
  3. invocation.proceedWithInvocation 执行拦截器链,最终会执行到目标方法
  4. completeTransactionAfterThrowing当抛出异常后,完成这个事务,提交或者回滚,并抛出这个异常
  5. commitTransactionAfterReturning 从方法命名来看,这个方法会提交事务。
    但是深入源码中会发现,该方法中也包含回滚逻辑,具体行为会根据当前TransactionStatus的一些状态来决定(也就是说,我们也可以通过设置当前TransactionStatus,来控制事务回滚,并不一定只能通过抛出异常),详见AbstractPlatformTransactionManager::commit

我们继续,来看看createTransactionIfNecessary做了什么

TransactionAspectSupport::createTransactionIfNecessary

	protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
			@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

		// If no name specified, apply method identification as transaction name.
		if (txAttr != null && txAttr.getName() == null) {
			txAttr = new DelegatingTransactionAttribute(txAttr) {
				@Override
				public String getName() {
					return joinpointIdentification;
				}
			};
		}

		TransactionStatus status = null;
		if (txAttr != null) {
			if (tm != null) {
				// 通过事务管理器开启事务
				status = tm.getTransaction(txAttr);
			}
			else {
				if (logger.isDebugEnabled()) {
					logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
							"] because no transaction manager has been configured");
				}
			}
		}
		
		return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
	}

createTransactionIfNecessary中的核心逻辑

  1. 通过PlatformTransactionManager(事务管理器)开启事务
  2. prepareTransactionInfo 准备事务信息,这个具体做了什么我们稍后再讲

继续来看PlatformTransactionManager::getTransaction,该方法只有一个实现 AbstractPlatformTransactionManager::getTransaction

	public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
			throws TransactionException {

		// Use defaults if no transaction definition given.
		TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

		// 获取当前事务,该方法有继承 AbstractPlatformTransactionManager 的子类自行实现
		Object transaction = doGetTransaction();
		boolean debugEnabled = logger.isDebugEnabled();

		// 如果目前存在事务
		if (isExistingTransaction(transaction)) {
			// Existing transaction found -> check propagation behavior to find out how to behave.
			return handleExistingTransaction(def, transaction, debugEnabled);
		}

		// Check definition settings for new transaction.
		if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
			throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
		}

		// 传播类型PROPAGATION_MANDATORY, 要求当前必须有事务
		// No existing transaction found -> check propagation behavior to find out how to proceed.
		if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
			throw new IllegalTransactionStateException(
					"No existing transaction found for transaction marked with propagation 'mandatory'");
		}
		// PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, PROPAGATION_NESTED 不存在事务时创建事务
		else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
				def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
				def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
			SuspendedResourcesHolder suspendedResources = suspend(null);
			if (debugEnabled) {
				logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
			}
			try {
				// 开启事务
				return startTransaction(def, transaction, debugEnabled, suspendedResources);
			}
			catch (RuntimeException | Error ex) {
				resume(null, suspendedResources);
				throw ex;
			}
		}
		else {
			// Create "empty" transaction: no actual transaction, but potentially synchronization.
			if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
				logger.warn("Custom isolation level specified but no actual transaction initiated; " +
						"isolation level will effectively be ignored: " + def);
			}
			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
			return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
		}
	}

代码很多,重点关注注释部分即可

  1. doGetTransaction获取当前事务
  2. 如果存在事务,则调用handleExistingTransaction处理,这个我们稍后会讲到

接下来,会根据事务的传播决定是否开启事务

  1. 如果事务传播类型为PROPAGATION_MANDATORY,且不存在事务,则抛出异常
  2. 如果传播类型为 PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, PROPAGATION_NESTED,且当前不存在事务,则调用startTransaction创建事务
  3. 当不满足 3、4时,例如 PROPAGATION_NOT_SUPPORTED,此时会执行事务同步,但是不会创建真正的事务

Spring 事务同步在之前一篇博客中有讲到,传送门👉https://www.jianshu.com/p/7880d9a98a5f

Spring 如何管理当前的事务

接下来讲讲上面提到的doGetTransactionhandleExistingTransaction,这两个方法是由不同的TransactionManager自行实现的

我们以SpringBoot默认的TransactionManager,DataSourceTransactionManager为例

	@Override
	protected Object doGetTransaction() {
		DataSourceTransactionObject txObject = new DataSourceTransactionObject();
		txObject.setSavepointAllowed(isNestedTransactionAllowed());
		ConnectionHolder conHolder =
				(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
		txObject.setConnectionHolder(conHolder, false);
		return txObject;
	}

	@Override
	protected boolean isExistingTransaction(Object transaction) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
	}

结合 AbstractPlatformTransactionManager::getTransaction 一起来看,doGetTransaction 其实获取的是当前的Connection。
判断当前是否存在事务,是判断DataSourceTransactionObject 对象中是否包含connection,以及connection是否开启了事务。

我们继续来看下TransactionSynchronizationManager.getResource(obtainDataSource())获取当前connection的逻辑

TransactionSynchronizationManager::getResource

	private static final ThreadLocal<Map<Object, Object>> resources =
			new NamedThreadLocal<>("Transactional resources");
	
	@Nullable
	// TransactionSynchronizationManager::getResource
	public static Object getResource(Object key) {
		// DataSourceTransactionManager 调用该方法时,以数据源作为key
		
		// TransactionSynchronizationUtils::unwrapResourceIfNecessary 如果key为包装类,则获取被包装的对象
		// 我们可以忽略该逻辑
		Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
		Object value = doGetResource(actualKey);
		if (value != null && logger.isTraceEnabled()) {
			logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
					Thread.currentThread().getName() + "]");
		}
		return value;
	}

	/**
	 * Actually check the value of the resource that is bound for the given key.
	 */
	@Nullable
	private static Object doGetResource(Object actualKey) {
		Map<Object, Object> map = resources.get();
		if (map == null) {
			return null;
		}
		Object value = map.get(actualKey);
		// Transparently remove ResourceHolder that was marked as void...
		if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
			map.remove(actualKey);
			// Remove entire ThreadLocal if empty...
			if (map.isEmpty()) {
				resources.remove();
			}
			value = null;
		}
		return value;
	}

看到这里,我们能明白DataSourceTransactionManager是如何管理线程之间的Connection,ThreadLocal 中存储一个Map,key为数据源对象,value为该数据源在当前线程的Connection

image.png

DataSourceTransactionManager 在开启事务后,会调用TransactionSynchronizationManager::bindResource将指定数据源的Connection绑定到当前线程

AbstractPlatformTransactionManager::handleExistingTransaction

我们继续回头看,如果存在事务的情况,如何处理

	private TransactionStatus handleExistingTransaction(
			TransactionDefinition definition, Object transaction, boolean debugEnabled)
			throws TransactionException {

		// 如果事务的传播要求以非事务方式执行 抛出异常
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
			throw new IllegalTransactionStateException(
					"Existing transaction found for transaction marked with propagation 'never'");
		}

		// PROPAGATION_NOT_SUPPORTED 如果存在事务,则挂起当前事务,以非事务方式执行
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
			if (debugEnabled) {
				logger.debug("Suspending current transaction");
			}
			// 挂起当前事务
			Object suspendedResources = suspend(transaction);
			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
			// 构建一个无事务的TransactionStatus
			return prepareTransactionStatus(
					definition, null, false, newSynchronization, debugEnabled, suspendedResources);
		}

		// PROPAGATION_REQUIRES_NEW 如果存在事务,则挂起当前事务,新建一个事务
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
			if (debugEnabled) {
				logger.debug("Suspending current transaction, creating new transaction with name [" +
						definition.getName() + "]");
			}
			SuspendedResourcesHolder suspendedResources = suspend(transaction);
			try {
				return startTransaction(definition, transaction, debugEnabled, suspendedResources);
			}
			catch (RuntimeException | Error beginEx) {
				resumeAfterBeginException(transaction, suspendedResources, beginEx);
				throw beginEx;
			}
		}

		// PROPAGATION_NESTED 内嵌事务,就是我们开头举得例子
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
			if (!isNestedTransactionAllowed()) {
				throw new NestedTransactionNotSupportedException(
						"Transaction manager does not allow nested transactions by default - " +
						"specify 'nestedTransactionAllowed' property with value 'true'");
			}
			if (debugEnabled) {
				logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
			}
			// 非JTA事务管理器都是通过savePoint实现的内嵌事务
			// savePoint:关系型数据库中事务可以创建还原点,并且可以回滚到还原点
			if (useSavepointForNestedTransaction()) {
				// Create savepoint within existing Spring-managed transaction,
				// through the SavepointManager API implemented by TransactionStatus.
				// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
				DefaultTransactionStatus status =
						prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
				// 创建还原点
				status.createAndHoldSavepoint();
				return status;
			}
			else {
				// Nested transaction through nested begin and commit/rollback calls.
				// Usually only for JTA: Spring synchronization might get activated here
				// in case of a pre-existing JTA transaction.
				return startTransaction(definition, transaction, debugEnabled, null);
			}
		}

		// 如果执行到这一步传播类型一定是,PROPAGATION_SUPPORTS 或者 PROPAGATION_REQUIRED
		// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
		if (debugEnabled) {
			logger.debug("Participating in existing transaction");
		}
		
		// 校验目前方法中的事务定义和已存在的事务定义是否一致
		if (isValidateExistingTransaction()) {
			if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
				Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
				if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
					Constants isoConstants = DefaultTransactionDefinition.constants;
					throw new IllegalTransactionStateException("Participating transaction with definition [" +
							definition + "] specifies isolation level which is incompatible with existing transaction: " +
							(currentIsolationLevel != null ?
									isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
									"(unknown)"));
				}
			}
			if (!definition.isReadOnly()) {
				if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
					throw new IllegalTransactionStateException("Participating transaction with definition [" +
							definition + "] is not marked as read-only but existing transaction is");
				}
			}
		}
		boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
		// 构建一个TransactionStatus,但不开启事务
		return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
	}

这里代码很多,逻辑看上述注释即可。这里终于看到了期待已久的挂起事务和内嵌事务了,我们还是看一下DataSourceTransactionManager的实现

  • 挂起事务:通过TransactionSynchronizationManager::unbindResource 根据数据源获取当前的Connection,并在resource中移除该Connection。之后会将该Connection存储到TransactionStatus对象中
	// DataSourceTransactionManager::doSuspend
	@Override
	protected Object doSuspend(Object transaction) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		txObject.setConnectionHolder(null);
		return TransactionSynchronizationManager.unbindResource(obtainDataSource());
	}

在事务提交或者回滚后,调用 AbstractPlatformTransactionManager::cleanupAfterCompletion会将TransactionStatus 中缓存的Connection重新绑定到resource中

  • 内嵌事务:通过关系型数据库的savePoint实现,提交或回滚的时候会判断如果当前事务为savePoint则释放savePoint或者回滚到savePoint,具体逻辑参考AbstractPlatformTransactionManager::processRollbackAbstractPlatformTransactionManager::processCommit

至此,事务的传播源码分析结束

prepareTransactionInfo

上文留下了一个问题,prepareTransactionInfo 方法做了什么,我们先来看下TransactionInfo的结构

	protected static final class TransactionInfo {

		@Nullable
		private final PlatformTransactionManager transactionManager;

		@Nullable
		private final TransactionAttribute transactionAttribute;

		private final String joinpointIdentification;

		@Nullable
		private TransactionStatus transactionStatus;

		@Nullable
		private TransactionInfo oldTransactionInfo;
		
		// ...
	}

该类在Spring中的作用,是为了内部传递对象。ThreadLocal中存储了最新的TransactionInfo,通过当前TransactionInfo可以找到他的oldTransactionInfo。每次创建事务时会新建一个TransactionInfo(无论有没有真正的事务被创建)存储到ThreadLocal中,在每次事务结束后,会将当前ThreadLocal中的TransactionInfo重置为oldTransactionInfo,这样的结构形成了一个链表,使得Spring事务在逻辑上可以无限嵌套下去

如果觉得有收获,可以关注我的公众号,你的点赞和关注就是对我最大的支持

深入理解 Sentinel 中的限流算法

最近在学习 Sentinel,深入学习了源码之后分享一下心得

Sentinel 版本

1.8.0

固定窗口算法

先介绍一下最简单的限流算法

每个窗口都有一个计数器(counter)用于统计流量,如果 counter + 本次申请的请求数 > 预设的 QPS,则拒绝请求。

固定窗口很简单,但是也有很大的问题

假设我们规定 QPS 不能超过 100,如上图所示 r1 和 r2 两个时间点分别来了 60 个请求, QPS 已经大于 100 了。此时应该触发限流了,但是固定窗口算法傻傻的只关注自己窗口的流量,感知不到 QPS 已经超了

滑动窗口算法

该算法将单位时间切成了多个窗口,每次计算 QPS 时,计算 当前窗口 + 过去几个窗口 的流量总和,这样就避免了固定窗口的问题
(具体使用几个窗口,取决于窗口大小和单位时间大小。例如上图,每个窗口大小为 500 ms,以 1 s 为单位时间做限流,每次使用 current + last 即可)

算法实现细节思考

理解算法思路之后,接下来要思考如何实现这个算法了

  1. 首先我们需要有一个上图中的时间轴,来记录时间窗口,可以通过数组来实现这个时间轴。

  2. 时间轴有了,我们再来考虑一下时间窗口。

    每个时间窗口肯定要有一个线程安全的计数器以及当前窗口对应的时间

// 时间轴
List<Window> timeline = new ArrayList<>();
// 每个窗口的大小
int windowTime;

// 时间窗口
class Window {
    Timestamp startTime;
    AtomicInteger counter;
}

但是如果仔细一想,还是存在一些问题的

  • 由于时间是会一直增长的,那我们的数组怎么办?也要跟着时间无限的增大吗?

  • 旧的时间窗口(例如几秒之前的)在之后的计算不会再用到了,如何清理这些无用的窗口?

Sentinel 中滑动窗口算法如何实现的

带着上述的问题与思考来看下 Sentinel 中是如何实现的

LeapArray

Sentinel 中滑动窗口算法的核心类,首先来了解一下他的核心成员变量

public abstract class LeapArray<T> {
	// 要统计的单位时间大小,例如计算QPS时,为1000
    protected int intervalInMs;
	// 样本数量
    protected int sampleCount;
	// 窗口大小 该值 = intervalInMs / sampleCount
    protected int windowLengthInMs;
	
	// 存储时间窗口的数组
    protected final AtomicReferenceArray<WindowWrap<T>> array;
	
    public LeapArray(int sampleCount, int intervalInMs) {
        AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
        AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
        AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");

        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.sampleCount = sampleCount;
		
        this.array = new AtomicReferenceArray<>(sampleCount);
    }	
	
}

单机限流在统计 QPS 时,默认 sampleCount = 2,intervalInMs = 1000,windowLengthInMs = 500

LeapArray#calculateTimeIdx

大体思路相同,同样是利用一个数组实现时间轴,每个元素代表一个时间窗口

Sentinel 中 数组长度是固定的,通过方法 LeapArray#calculateTimeIdx确定时间戳在数组 中的位置 (找到时间戳对应的窗口位置)

怎么理解这个方法呢?

我们把数据带入进去,假设 windowLengthInMs = 500 ms (每个时间窗口大小是 500 ms)

如果 timestamp 从 0 开始的话,每个时间窗口为 [0,500) [500,1000) [1000,1500) ...

这时候先不考虑 timeId % array.length() ,也不考虑数组长度。假设当前 timeMillis = 601,将数值代入到 timeMillis / windowLengthInMs 其实就可以确定出当前的 timestamp 对应的时间窗口在数组中的位置了

由于数组长度是固定的,所以再加上求余数取模来确定时间窗在数组中的位置

LeapArray#currentWindow

先来看一下 Sentinel 中 Window 的结构,基本和我们上面想的一致,计数器使用了泛型,可以更灵活

public class WindowWrap<T> {

    /**
     * Time length of a single window bucket in milliseconds.
     */
    private final long windowLengthInMs;

    /**
     * Start timestamp of the window in milliseconds.
     */
    private long windowStart;

    /**
     * Statistic data.
     */
    private T value;

	// 省略。。。
}

继续说 currentWindow,该方法根据传入的 timestamp 找到 或者 创建 这个时间戳对应的 Window

这个方法源码中注释很多,我删除了部分注释

    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }

        int idx = calculateTimeIdx(timeMillis);
        // Calculate current bucket start time.
        long windowStart = calculateWindowStart(timeMillis);

        /*
         * Get bucket item at given time from the array.
         *
         * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
         * (2) Bucket is up-to-date, then just return the bucket.
         * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
         */
        while (true) {
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    // Successfully updated, return the created bucket.
                    return window;
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                return old;
            } else if (windowStart > old.windowStart()) {
                if (updateLock.tryLock()) {
                    try {
                        // Successfully get the update lock, now we reset the bucket.
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                // Should not go through here, as the provided time is already behind.
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }

方法逻辑分析如下:

首先要做的两件事

  • 计算 timestamp 在数组中的位置,就是我们上文说的 calculateTimeIdx
  • 计算 timestamp 的 windowStart(窗口开始时间),通过 timeMillis - timeMillis % windowLengthInMs,这个值在后边会用到

然后进入一个 while(true) 循环, 通过 WindowWrap<T> old = array.get(idx) 找出对应的窗口,接下来就是三种情况了

  1. old == null
    这个时候代表数组中还没有这个 window,创建这个 window 加入到数组中(由于此时可能会有多个线程同时添加数组元素,所以一定要保证线程安全,所以这里使用的数组为 AtomicReferenceArray),添加成功后返回新建的 window

  2. windowStart == old.windowStart()
    window 已经存在了,直接返回即可

  3. windowStart > old.windowStart()
    代表数组中的元素已经至少是 25s 之前的了,重置当前窗口的 windowStart 和 计数器,这个操作同样也是一个多线程操作,所以使用了 updateLock.tryLock()

仔细看了代码后,我提出了一个问题。我觉得这个地方并不能一定保证能锁住。会不会出现两个线程同时判断需要更新,由于一个线程很快执行成功并释放了锁,第二个线程也成功获取到 Lock,会执行多次 resetWindow。我认为需要再 tryLock 之后再判断一下执行条件,目前已经给 Sentinel 提交了 Issue

  1. windowStart < old.windowStart()
    通常情况下不会走到这个逻辑分支,上面源码的注释也是这样解释的

LeapArray#values

上文中提到过,计算流量时具体使用几个窗口,取决于窗口大小和单位时间大小

该方法的作用通过传入一个时间戳,找出本次计算所需的所有时间窗口

    public List<T> values(long timeMillis) {
        if (timeMillis < 0) {
            return new ArrayList<T>();
        }
        int size = array.length();
        List<T> result = new ArrayList<T>(size);

        for (int i = 0; i < size; i++) {
            WindowWrap<T> windowWrap = array.get(i);
            if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
                continue;
            }
            result.add(windowWrap.value());
        }
        return result;
    }

    public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
		// intervalInMs 在单机限流计算QPS时默认为 1000(ms)
        return time - windowWrap.windowStart() > intervalInMs;
    }

values 的逻辑没什么可说的,遍历数组将时间符合的窗口加入到 List 中

重点看一下 isWindowDeprecated 这个方法

还是像上面那样把数值带进去。每个窗口大小为 500 ms,例如 timestamp 为 1601,这个 timestamp 对应的 windowStart 为 1500,此时 (1601 - 1500 > 1000) = false 即这个窗口是有效的,再往前推算,上一个窗口 windowStart 为 1000 也是有效的。再往前推算,或者向后推算都是无效的窗口。

intervalInMs 我是这样理解的,以多长的时间段作为单位时间来限流。即可以以 1s 为一个时间段来做限流,也可以以 60s 为一个时间段来限流。

Sentinel 限流思路

在理解了 LeapArray#currentWindowLeapArray#values 方法的细节之后,其实我们就可以琢磨出限流的实现思路了

首先根据当前时间戳,找到对应的几个 window,根据 所有 window 中的流量总和 + 当前申请的流量数 决定能否通过

  • 如果不能通过,抛出异常
  • 如果能通过,则对应的窗口加上本次通过的流量数

Sentinel 限流实现

Sentinel 基本也是这个思路,只不过逻辑复杂一些,这里贴出几处代码,感兴趣的同学可以自己 debug 一下

Sentinel 限流检查

根据 Sentinel 文档中的解释,我们可以知道负责限流的类为 FlowSlotFlowSlot 会使用 FlowRuleChecker 来检查当前资源是否需要限流

FlowSlot#entry

FlowRuleChecker#checkFlow


根据 FlowRule 的设定来做限流检查,这中间我省略了几段代码,默认情况没有设置 ControlBehavior 会使用 DefaultController#canPass 做限流检查。如下图,通过判断 当前流量数 + 申请的数量 是否大于预设的数量,来决定是否限流

注:当使用 SphU.entry 时 prioritized = false,使用 SphU.entryWithPriority 时 prioritized = true。
node.tryOccupyNext 的含义:如果想占用未来的时间窗口令牌,需要等待多久(上图中的waitInMs)。
如果小于规定的超时时间,则记录正在等待的请求数,然后执行 sleep(waitInMs),外层捕获到 PriorityWaitException 会自己处理掉,然后执行用户逻辑,用户完全无感知。


通过上图 avgUsedTokens 可以看到,当 Rule 的 grade 为 FLOW_GRADE_QPS 时,会调用 node.pass()。这里我剧透一下具体的实现为 StatisticNode#passQps,如下图

  • rollingCounterInSecond.getWindowIntervalInSec() 计算 QPS 时为 1 秒
  • rollingCounterInSecond.pass() 计算 QPS 时,最多返回两个窗口的通过请求数(currentWindow + lastWindow)

rollingCounterInSecond#pass

首先先尝试是否需要创建当前的时间窗口,然后找到相关的窗口,计算流量总和。

Sentinel 请求记录

代码位置 StatisticSlot#entryfireEntry 会根据我们配置的规则进行检查(例如上述的限流)。

如果检查没有抛出异常,则记录线程数和申请的请求数(限流检查依赖的数据就是这里记录的)。

集群限流

集群限流有什么用

在没有集群限流之前,如果想把整个服务的 QPS 限制在某个值。举个例子现在某 Server 有十个实例,我们希望总 QPS 不超过 100,这时我们的做法是把每个实例的 QPS 设置为 10。

在理想情况下,这样做可以将 QPS 控制在 100。但是如果每台 Server 分配到的流量不均匀。这可能会导致总量在没达到 100 的时候,某些 Server 就开始限流了。

这种情况就需要 Sentinel 的集群限流出场了。

集群限流原理

由于篇幅限制,我们这里不讨论如何搭建集群限流,只是来说说 Sentinel 如何在这一基础上做的集群限流。

思路很简单,选出一个 Token Server。在开启集群限流后,所有的 Client 在需要限流时,询问 Token Server,Server 决定当前请求是否限流。具体的实现细节与单机限流略有不同,但是核心的算法还是使用的 LeapArray

这里也是给出几处源码位置,感兴趣的同学自行阅读一下

Client 端根据 Rule 决定本次使用本地限流还是集群限流,FlowRuleChecker#canPassCheck

Server 端,DefaultTokenService#requestToken

并发下限流的问题

在完整的阅读完单机和集群的限流代码之后,发现了一个问题,限流流程可以简化为如下

// 伪代码

// 最大QPS
int maxCount;
// 当前申请的流量数
int aquireCount;

int passQps = getPassQPS();

if (passQps + aquireCount <= maxCount) {
	addPass(aquireCount);
} else {
	// 限流处理
}

由于没有并发控制,并发场景下会出现,多个线程同时满足 passQps + aquireCount <= maxCount,然后增加流量统计,这样的话,没法保证一定将 QPS 控制在 maxCount,并发的情况下会出现实际流量超出预设 QPS 的情况。

这肯定不是个Bug。这里没有并发控制可能是出于性能考虑,在性能和准确度可以接受的情况下做了一个折中

所以在使用时,如果实际 QPS 高于预设值,可能是并发导致的

demo

Java SPI 实战

SPI 全称为 (Service Provider Interface) ,是JDK内置的一种服务提供发现机制,可以轻松实现面向服务的注册与发现,完成服务提供与使用的解耦,并且可以实现动态加载

SPI 能做什么

利用SPI机制,sdk的开发者可以为使用者提供扩展点,使用者无需修改源码,有点类似Spring @ConditionalOnMissingBean 的意思

动手实现一个SPI

例如我们要正在开发一个sdk其中有一个缓存的功能,但是用户很可能不想使用我们的缓存实现,用户想要自定义缓存的实现,此时使用spi就非常的合适了

新建一个maven工程命名为sdk

image.png

Cache 接口

import java.util.ServiceLoader;

public interface Cache {

    String getName();

    static Cache load() {
        // ServiceLoader 实现了 Iterable,可以加载到Cache接口的多个实现类
        ServiceLoader<Cache> cacheServiceLoader =  ServiceLoader.load(Cache.class);
        return cacheServiceLoader.iterator().next();
    }

}

ServiceLoader 是Java提供服务发现工具类,这是我们实现SPI的关键

CacheDefaultImpl

public class CacheDefaultImpl implements Cache {
    public String getName() {
        return "defaultImpl";
    }
}

除此之外,ServiceLoader 还需要在classpath:META-INF/services 下找到以该接口全名命名的文件,这里我们直接在resource 目录下创建META-INF/services/ com.github.tavenyin.Cache文件即可,文件中指定Cache的实现类

# 此处可以指定多个实现类
com.github.tavenyin.CacheDefaultImpl

Run

我们建立一个新的maven子工程,并引入sdk模块,执行测试代码

System.out.println(Cache.load().getName()) 
# 输出结果为 defaultImpl

使用者定制化

那么如果sdk的使用者不想使用我们的CacheDefaultImpl了怎么办,没关系使用者只需要覆盖 classpath:META-INF/services/com.github.tavenyin.Cache 就可以了 (使用者在同样在resource下创建即可覆盖)

我们再来运行一下测试代码,输出结果为 newImpl

image.png

ServiceLoader 实现原理

ServiceLoader 的实现原理还是比较简单的,试想一下,如果我们自己实现一个ServiceLoader,我们会怎么做?

  1. 通过指定的文件加载出所有的类名
  2. 通过反射构建这些对象

没错,ServiceLoader 就是这么做的,我们来简单看一下源码

入口 ServiceLoader::iterator::next

// Cached providers, in instantiation order
private LinkedHashMap<String,S> providers = new LinkedHashMap<>();

// The current lazy-lookup iterator
private LazyIterator lookupIterator;   

// ServiceLoader::iterator
public Iterator<S> iterator() {
    return new Iterator<S>() {

        Iterator<Map.Entry<String,S>> knownProviders
            = providers.entrySet().iterator();

        public boolean hasNext() {
            if (knownProviders.hasNext())
                return true;
            return lookupIterator.hasNext();
        }

        // ServiceLoader::iterator::next
        public S next() {
            if (knownProviders.hasNext())
                return knownProviders.next().getValue();
            return lookupIterator.next();
        }

        public void remove() {
            throw new UnsupportedOperationException();
        }

    };
}

从providers 初始为一个空的LinkedHashMap,我们无需关注,所以knownProviders::hasNext 一定返回false,我们直奔knownProviders::next

knownProviders::next 中核心逻辑在nextService() 中

private S nextService() {
	// hasNextService 中做了两件事
	// 1. 判断是否还有服务的提供者
	// 2. 通过 "META-INF/services/" + 接口全名 加载所有提供者ClassName
	if (!hasNextService())
		throw new NoSuchElementException();
	String cn = nextName;
	nextName = null;
	Class<?> c = null;
	try {
		// 通过ClassName 创建Class
		c = Class.forName(cn, false, loader);
	} catch (ClassNotFoundException x) {
		fail(service,
			 "Provider " + cn + " not found");
	}
	if (!service.isAssignableFrom(c)) {
		fail(service,
			 "Provider " + cn  + " not a subtype");
	}
	try {
		// 反射创建实现类实例
		S p = service.cast(c.newInstance());
		providers.put(cn, p);
		return p;
	} catch (Throwable x) {
		fail(service,
			 "Provider " + cn + " could not be instantiated",
			 x);
	}
	throw new Error();          // This cannot happen
}

与我们上述分析的实现过程一致,更多细节感兴趣的童鞋可自行阅读

ServiceLoader 如何实现动态加载

同一个 ServiceLoader 对象的话,不会重新加载META-INF/services/下的信息。如果我们需要动态加载的话,可以考虑每次重新创建新的ServiceLoader 对象,或者调用 ServiceLoader::reload

demo 地址

https://github.com/TavenYin/java-spi.git

如果觉得有收获,可以关注我的公众号【殷天文】,你的点赞和关注就是对我最大的支持

Kafka Connect 实战:入门

前提

首先你需要了解MQ / Kafka相关的知识

本文目标

了解 Kafka Connect 基本概念与功能

什么是Kafka Connect

Kafka Connect 是一款可扩展并且可靠地在 Apache Kafka 和其他系统之间进行数据传输的工具。 可以很简单的定义 connectors(连接器) 将大量数据迁入、迁出Kafka。

例如我现在想要把数据从MySQL迁移到ElasticSearch,为了保证高效和数据不会丢失,我们选择MQ作为中间件保存数据。这时候我们需要一个生产者线程,不断的从MySQL中读取数据并发送到MQ,还需要一个消费者线程消费MQ的数据写到ElasticSearch,这件事情似乎很简单,不需要任何框架。

但是如果我们想要保证生产者和消费者服务的高可用性,例如重启后生产者恢复到之前读取的位置,分布式部署并且节点宕机后将任务转移到其他节点。如果要加上这些的话,这件事就变得复杂起来了,而Kafka Connect 已经为我们造好这些轮子。

Kafka Connect 如何工作?

image.png

Kafka Connect 特性如下:

  • Kafka 连接器的通用框架:Kafka Connect 标准化了其他数据系统与Kafka的集成,从而简化了连接器的开发,部署和管理
  • 支持分布式模式和单机模式部署
  • Rest API:通过简单的Rest API管理连接器
  • 偏移量管理:针对Source和Sink都有相应的偏移量(Offset)管理方案,程序员无须关心Offset 的提交
  • 分布式模式可扩展的,支持故障转移

Kafka Connect Concepts

这里简单介绍下Kafka Connect 的概念与组成
更多细节请参考 👉 https://docs.confluent.io/platform/current/connect/concepts.html

Connectors

连接器,分为两种 Source(从源数据库拉取数据写入Kafka),Sink(从Kafka消费数据写入目标数据)

连接器其实并不参与实际的数据copy,连接器负责管理Task。连接器中定义了对应Task的类型,对外提供配置选项(用户创建连接器时需要提供对应的配置信息)。并且连接器还可以决定启动多少个Task线程。

用户可以通过Rest API 启停连接器,查看连接器状态

Confluent 已经提供了许多成熟的连接器,传送门👉 https://www.confluent.io/product/connectors/

Task

实际进行数据传输的单元,和连接器一样同样分为 Source和Sink

Task的配置和状态存储在Kafka的Topic中,config.storage.topicstatus.storage.topic。我们可以随时启动,停止任务,以提供弹性、可扩展的数据管道

Worker

刚刚我们讲的Connectors 和Task 属于逻辑单元,而Worker 是实际运行逻辑单元的进程,Worker 分为两种模式,单机模式和分布式模式

单机模式:比较简单,但是功能也受限,只有一些特殊的场景会使用到,例如收集主机的日志,通常来说更多的是使用分布式模式

分布式模式:为Kafka Connect提供了可扩展和故障转移。相同group.id的Worker,会自动组成集群。当新增Worker,或者有Worker挂掉时,集群会自动协调分配所有的Connector 和 Task(这个过程称为Rebalance)

Worker 集群

当使用Worker集群时,创建连接器,或者连接器Task数量变动时,都会触发Rebalance 以保证集群各个Worker节点负载均衡。但是当Task 进入Fail状态的时候并不会触发 Rebalance,只能通过Rest Api 对Task进行重启

Converters

Kafka Connect 通过 Converter 将数据在Kafka(字节数组)与Task(Object)之间进行转换

默认支持以下Converter

  • AvroConverter io.confluent.connect.avro.AvroConverter: 需要使用 Schema Registry
  • ProtobufConverter io.confluent.connect.protobuf.ProtobufConverter: 需要使用 Schema Registry
  • JsonSchemaConverter io.confluent.connect.json.JsonSchemaConverter: 需要使用 Schema Registry
  • JsonConverter org.apache.kafka.connect.json.JsonConverter (无需 Schema Registry): 转换为json结构
  • StringConverter org.apache.kafka.connect.storage.StringConverter: 简单的字符串格式
  • ByteArrayConverter org.apache.kafka.connect.converters.ByteArrayConverter: 不做任何转换

Converters 与 Connector 是解耦的,下图展示了在Kafka Connect中,Converter 在何时进行数据转换

image.png

Transforms

连接器可以通过配置Transform 实现对单个消息(对应代码中的Record)的转换和修改,可以配置多个Transform 组成一个。例如让所有消息的topic加一个前缀、sink无法消费source 写入的数据格式,这些场景都可以使用Transform 解决

Transform 如果配置在Source 则在Task之后执行,如果配置在Sink 则在Task之前执行

Dead Letter Queue

与其他MQ不同,Kafka 并没有死信队列这个功能。但是Kafka Connect提供了这一功能。

当Sink Task遇到无法处理的消息,会根据errors.tolerance配置项决定如何处理,默认情况下(errors.tolerance=none) Sink 遇到无法处理的记录会直接抛出异常,Task进入Fail 状态。开发人员需要根据Worker的错误日志解决问题,然后重启Task,才能继续消费数据

设置 errors.tolerance=all,Sink Task 会忽略所有的错误,继续处理。Worker中不会有任何错误日志。可以通过配置errors.deadletterqueue.topic.name = <dead-letter-topic-name> 让无法处理的消息路由到 Dead Letter Topic

快速上手

下面我来实战一下,如何使用Kafka Connect,我们先定一个小目标 将MySQL中的全量数据同步到Redis


  1. 新建文件 docker-compose.yaml
version: '3.7'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zk
    ports:
      - 2182:2181

  kafka:
    image: wurstmeister/kafka:2.13-2.7.0
    container_name: kafka
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 0
      # 宿主机ip
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.3.21:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    depends_on:
      - zookeeper

在终端上执行 docker-compose -f docker-compose.yaml up -d 启动docker容器

准备连接器,这里我是自己写了一个简单的连接器😄。下载地址:https://github.com/TavenYin/kafka-connect-example/blob/master/release/kafka-connector-example-bin.jar

# 将连接器上传到kafka 容器中
docker cp kafka-connector-example-bin.jar kafka:/opt/connectors
  1. 修改配置并启动Worker
#在配置文件末尾追加 plugin.path=/opt/connectors
vi /opt/kafka/config/connect-distributed.properties

# 启动Worker
bin/connect-distributed.sh -daemon config/connect-distributed.properties
  1. 准备MySQL

由于我宿主机里已经安装了MySQL,我就直接使用了,使用如下Sql创建表。创建之后随便造几条数据

CREATE TABLE `test_user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ;
  1. 创建连接器

新建 source.json

{
  "name" : "example-source",
  "config" : {
    "connector.class" : "com.github.taven.source.ExampleSourceConnector",
    "tasks.max" : "1",
    "database.url" : "jdbc:mysql://192.168.3.21:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=UTC&rewriteBatchedStatements=true",
    "database.username" : "root",
    "database.password" : "root",
    "database.tables" : "test_user"
  }
}

向Worker 发送请求,创建连接器
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json

source.json 中,有一些属性是Kafka Connect 提供的,例如上述文件中 name, connector.class, tasks.max,剩下的属性可以在开发Connector 时自定义。关于Kafka Connect Configuration 相关请阅读这里 👉 https://docs.confluent.io/platform/current/installation/configuration/connect/index.html

  1. 确认数据是否写入Kafka

首先查看一下Worker中的运行状态,如果Task的state = RUNNING,代表Task没有抛出任何异常,平稳运行

bash-4.4# curl -X GET localhost:8083/connectors/example-source/status
{"name":"example-source","connector":{"state":"RUNNING","worker_id":"172.21.0.3:8083"},
"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.21.0.3:8083"}],"type":"source"}

查看kafka 中Topic 是否创建

bash-4.4# bin/kafka-topics.sh --list --zookeeper zookeeper:2181
__consumer_offsets
connect-configs
connect-offsets
connect-status
test_user

这些Topic 都存储了什么?

  • __consumer_offsets: 记录所有Kafka Consumer Group的Offset
  • connect-configs: 存储连接器的配置,对应Connect 配置文件中config.storage.topic
  • connect-offsets: 存储Source 的Offset,对应Connect 配置文件中offset.storage.topic
  • connect-status: 连接器与Task的状态,对应Connect 配置文件中status.storage.topic

查看topic中数据,此时说明MySQL数据已经成功写入Kafka

bash-4.4# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_user --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"}],"optional":false,"name":"test_user"},"payload":{"id":1,"name":"yyyyyy"}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"}],"optional":false,"name":"test_user"},"payload":{"id":2,"name":"qqqq"}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"}],"optional":false,"name":"test_user"},"payload":{"id":3,"name":"eeee"}}

数据结构为Json,可以回顾一下上面我们修改的connect-distributed.properties,默认提供的Converter 为JsonConverter,所有的数据包含schema 和 payload 两项是因为配置文件中默认启动了key.converter.schemas.enable=truevalue.converter.schemas.enable=true两个选项

  1. 启动 Sink

新建sink.json

{
  "name" : "example-sink",
  "config" : {
    "connector.class" : "com.github.taven.sink.ExampleSinkConnector",
    "topics" : "test_user, test_order",
    "tasks.max" : "1",
    "redis.host" : "192.168.3.21",
    "redis.port" : "6379",
    "redis.password" : "",
    "redis.database" : "0"
  }
}

创建Sink Connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @sink.json

然后查看Sink Connector Status,这里我发现由于我的Redis端口只对localhost开发,所以这里我的Task Fail了,修改了Redis配置之后,重启Task curl -X POST localhost:8083/connectors/example-sink/tasks/0/restart

在确认了Sink Status 为RUNNING 后,可以确认下Redis中是否有数据

关于Kafka Connect Rest api 文档,请参考👉https://docs.confluent.io/platform/current/connect/references/restapi.html

  1. 如何查看Sink Offset消费情况

使用命令
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group connect-example-sink

下图代表 test_user topic 三条数据已经全部消费

Kafka Connect 高级功能

我们的小目标已经达成了。现在两个Task无事可做,正好借此机会我们来体验一下可扩展和故障转移

集群扩展

我启动了开发环境中的Kafka Connect Worker,根据官方文档所示通过注册同一个Kafka 并且使用相同的 group.id=connect-cluster 可以自动组成集群

启动我开发环境中的Kafka Connect,之后检查两个连接器状态

bash-4.4#  curl -X GET localhost:8083/connectors/example-source/status
{"name":"example-source","connector":{"state":"RUNNING","worker_id":"172.23.176.1:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.23.176.1:8083"}],"type":"source"}bash-4.4#

bash-4.4#  curl -X GET localhost:8083/connectors/example-sink/status
{"name":"example-sink","connector":{"state":"RUNNING","worker_id":"172.21.0.3:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.21.0.3:8083"}],"type":"sink"}

观察worker_id 可以发现,两个Connectors 已经分别运行在两个Worker上了

故障转移

此时我们通过kill pid结束docker中的Worker进程观察是否宕机之后自动转移,但是发现Task并没有转移到仅存的Worker中,Task 状态变为UNASSIGNED,这是为啥呢?难道是有什么操作错了?

在网上查阅了一番得知,Kafka Connect 的集群扩展与故障转移机制是通过Kafka Rebalance 协议实现的(Consumer也是该协议),当Worker节点宕机时间超过 scheduled.rebalance.max.delay.ms 时,Kafka才会将其踢出集群。踢出后将该节点的连接器和任务分配给其他Worker,scheduled.rebalance.max.delay.ms默认值为五分钟。

后来经测试发现,五分钟之后查看连接器信息,已经转移到存活的Worker节点了

本来还计划写一下如何开发连接器和Kafka Rebalance,但是这篇已经够长了,所以计划后续更新这两篇文章

Spring Cloud Gateway Filter 执行原理刨析

准备工作

需要了解响应式编程,推荐阅读

版本

Spring Cloud Gateway:2.2.3.RELEASE

本文目标

了解 Gateway Filter 内部执行原理

问题:

@Component
public class TestGlobalFilter implements GlobalFilter, Ordered {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        log.info("start");
        Mono<Void> mono = chain.filter(exchange);
        log.info("end")
        return mono;
    }

    @Override
    public int getOrder() {
        return 1;
    }
}

我编写了一个 TestGlobalFilter,下一个 Filter 的逻辑是输出日志 HelloWorld。日志中输出的顺序会是什么样?

正确答案是

start
end
HelloWorld

如果按照 Servlet 的开发**,调用 chain.filter 一定会立刻执行下一个 Filter,Gateway 为什么不可以呢?

因为 chain.filter 的返回值是 Mono,必须要有订阅者调用 subscribe 后才会执行发布者逻辑

DefaultGatewayFilterChain

我们来看下 DefaultGatewayFilterChain 的代码

DefaultGatewayFilterChain

DefaultGatewayFilterChain 返回的是一个 MonoDefer。其内部包含了调用下一个 Filter 的内部函数,那么这个逻辑怎样才能触发的呢?下面继续来看 MonoDefer 的源码

MonoDefer

MonoDefer

MonoDefer subscribe 逻辑如下

  • 调用 supplier.get(),执行内部函数的命令式代码,执行结束后,内部函数会返回一个 Mono
  • p.subscribe(actual); 订阅内部函数返回的 Mono

supplier.get() 抛出异常时,首先向订阅者传递一个空的 Subscription,然后再传递异常

Operators.error

MonoDefer 虽然也是发布者,但是他只是在真正的发布者和订阅者之间做一个承载的作用

过滤器链刨析

在理解了上述两个类之后,我们现在可以梳理一下 Gateway 过滤器链的执行逻辑了

虽然从 Gateway 接收到请求到过滤器链中间还会经历很多步骤,这里我们为了方便理解,直接把过滤器链的调用方,抽象为一个订阅者(因为最终过滤器链会返回一个 Publisher)

除此之外,再简化一下 Filter 的返回值。正常来说 Filter 可以返回任何响应式的发布者逻辑,我们这里简化为每个 Filter 都返回 chain.filter (将最简单的流程理解后,其实复杂的响应式返回也是大同小异)

  1. 订阅者请求 First Filter,这里首先会执行 filter 方法中所有的命令式的代码(响应式的代码并不会执行,因为 Mono 并没有被消费)

  2. 订阅者调用 Filter 返回的 MonoDefer 的 subscribe 方法。MonoDefer 被订阅时首先会执行内部函数。如果还有下一个过滤器,则执行并返回 nextFilter.filter,如果所有过滤器都已执行完毕则返回 Mono.empty(对应 MonoDefer 的 44 行)

  3. nextFilter.filter 先执行 filter 方法中所有的命令式的代码,然后返回 chain.filter

  4. First Filter 返回的 MonoDefer 内部会去订阅 nextFilter.filter 返回的 Mono(对应 MonoDefer 的 52 行)。Second MonoDefer(nextFilter.filter 的返回值)被订阅,接下来就是重复步骤 2 的逻辑,无限套娃下去直到所有 Filter 执行完毕...

下面用一张图来解释一下上面的逻辑

通过上述的图文讲解,我们可以看到响应式编程中一个过滤器链该怎么设计和实现

回到问题

回到最开始的问题,如果想在 Spring Cloud Gateway 中实现先执行过滤器链再执行某某操作,应该怎么写呢?

@Slf4j
@Component
public class LogFilter implements GlobalFilter, Ordered {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        log.info("hello");
        return chain.filter(exchange)
                .then(Mono.defer(() -> {
                    log.info("world");
                    return Mono.empty();
                }));
    }

    @Override
    public int getOrder() {
        return -9;
    }
}

Mono.then 的作用就是内部消费并忽略第一个 Mono(但是 Error 信号会被传递下去),然后入参的 Mono 作为生产者向下游传播数据。

忽略了 chain.filter 返回的 Mono 不会造成问题吗?当然不会,Gateway 的 Filter 链的订阅者并不需要我们传递什么数据,我们只需要将所有过滤器代码执行完即可

最后

如果觉得我的文章对你有帮助,动动小手点下喜欢和关注,你的支持是对我最大的帮助

响应式编程入门之 Project Reactor

本文目标

  • 理解响应式编程

前言

之前的「聊聊 IO 多路复用」中,我们理解了非阻塞 IO 的意义。但是 Spring MVC 并不能完美的应用非阻塞编程,于是 Spring 团队开发了 WebFlux,而 WebFlux 的基础正是本文要讲到的 Project Reactor(下文简称为 Reactor)

本文以 Reactor 为例带大家入门响应式编程

版本

	<dependency>
		<groupId>io.projectreactor</groupId>
		<artifactId>reactor-core</artifactId>
		<version>3.4.6</version>
	</dependency>

什么是 Reactor

Reactor 是 JVM 的非阻塞响应式编程基础,支持背压。 它直接与 Java 8 函数式 API 集成,特别是 CompletableFuture、Stream 和 Duration。 它提供了可组合的异步序列 API — Flux(用于 [N] 个元素)和 Mono(用于 [0|1] 个元素),并实现了 Reactive Streams 规范。
在 Reactor 的基础上还演化出了适合微服务架构的 Reactor Netty 。为 HTTP(包括 Websockets)、TCP 和 UDP 提供支持背压和响应式的网络引擎。

上面是对于官方文档的翻译。下面来说说我自己对 Reactor 和响应式编程的理解。

回想一下之前的非阻塞 IO 编程,例如我们现在要用非阻塞的方式调用一个远程服务,当远程接口数据可用时去做一些业务处理。这时候代码怎么写呢?我们需要提供一个回调函数,然后在响应就绪的时候,去调用我们的回调函数。

从逻辑上来看,这完全没有问题。但是如果我们的回调很复杂,代码看起来会是什么样呢?

// 以下案例来自 Reactor 官网
userService.getFavorites(userId, new Callback<List<String>>() { 
  public void onSuccess(List<String> list) { 
    if (list.isEmpty()) { 
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) { 
          UiUtils.submitOnUiThread(() -> { 
            list.stream()
                .limit(5)
                .forEach(uiList::show); 
            });
        }

        public void onError(Throwable error) { 
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() 
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId, 
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }

  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});

这个代码说实话已经有点回调地狱那味儿了,让一段不是很复杂的逻辑变得很难读了。但是如果用 Reactor 写呢?

// 以下案例来自 Reactor 官网
userService.getFavorites(userId) 
           .flatMap(favoriteService::getDetails) 
           .switchIfEmpty(suggestionService.getSuggestions()) 
           .take(5) 
           .publishOn(UiUtils.uiThreadScheduler()) 
           .subscribe(uiList::show, UiUtils::errorPopup);

可以看到,代码变得非常的简洁。唯一带来的困扰就是,我们不知道这些函数到底是啥意思 😂

响应式编程虽然有非常多的特性,但是它并不是什么神奇的技术,它也是建立在传统命令式编程的基础上。只不过它所提供的 API 以及规范更适合在非阻塞 IO 中使用。虽然在非阻塞 IO 框架中几乎只使用响应式编程(Vertx,WebFlux),只是因为这样做更合适,并不是说没了响应式编程,就玩不了非阻塞 IO 了。

响应式编程内幕

Reactor 实现了 org.reactivestreams 提供的 Java 响应式编程规范,我们只要了解 reactivestreams 中代码是如何运转的,再看 Reactor 相关的代码就容易多了。

下图展示了 reactivestreams 中的核心接口

reactivestreams 核心接口

  • Publisher:发布者

  • Subscriber:订阅者

  • Subscription:这个单词中文翻译为名词的订阅,在代码中它是发布者和订阅者之间的媒介

  • Processor:该接口继承了发布者和订阅者,可以理解为发布者和订阅者的中间操作(但是 Reactor 的中间操作并没有实现 Processor,在最新版本的 Reactor 中,Processor 的相关实现接口已经被弃用)

在了解了响应式编程的核心接口之后,我们来看下响应式编程是如何运作的

响应式编程执行逻辑

在 Reactor 中大部分实现都是按照上图的逻辑来执行的

  1. 首先是Subscriber(订阅者)主动订阅 Publisher(发布者),通过调用 Publisher 的 subscribe 方法
  2. Publisher 在向下游发送数据之前,会先调用 Subscriber 的 onSubscribe 方法,传递的参数为 Subscription(订阅媒介)
  3. Subscriber 通过 Subscription#request 来请求数据,或者 Subscription#cancel 来取消数据发布(这就是响应式编程中的背压,订阅者可以控制数据发布)
  4. Subscription 在接收到订阅者的调用后,通过 Subscriber#onNext 向下游订阅者传递数据。
  5. 在数据发布完成后,调用 Subscriber#onComplete 结束本次流,如果数据发布或者处理遇到错误会调用 Subscriber#onError

调用 Subscriber#onNext,onComplete,onError 这三个方法,可能是在 Publisher 中做的,也可能是在 Subscription 中做的,根据不同的场景有不同的实现方式,并没有什么严格的要求。可以认为 Publisher 和 Subscription 共同配合完成了数据发布

其实 Reactor 中 API 实现原理也都是这个套路,我这边也自己写了个例子便于让读者加深对响应式编程的理解

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * @author tianwen.yin
 */
public class SimpleReactiveStream {

    /**
     * 实现一个简单的响应式编程发布者
     * 逻辑:当订阅者发起订阅时,像下游发送一个 HelloWorld,发布逻辑由 SimpleSubscription 完成
     */
    static class SimplePublisher implements Publisher {
        @Override
        public void subscribe(Subscriber s) {
            // 2. Publisher 发布数据之前,调用 Subscriber 的 onSubscribe
            s.onSubscribe(new SimpleSubscription(data(), s));
        }

        private String data() {
            return "Hello World";
        }
    }

    static class SimpleSubscriber implements Subscriber {
        @Override
        public void onSubscribe(Subscription s) {
            // 3. Subscriber 通过 Subscription#request 来请求数据
            // 或者 Subscription#cancel 来取消数据发布
            s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Object o) {
            System.out.println(o);
        }

        @Override
        public void onError(Throwable t) {
            System.out.println("error");
        }

        @Override
        public void onComplete() {
            System.out.println("complete");
        }
    }

    static class SimpleSubscription implements Subscription {
        String data;
        Subscriber actual;
        boolean isCanceled;

        public SimpleSubscription(String data, Subscriber actual) {
            this.data = data;
            this.actual = actual;
        }

        @Override
        public void request(long n) {
            if (!isCanceled) {
                try {
                    // 4. Subscription 在接收到订阅者的调用后
                    // 通过 Subscriber#onNext 向下游订阅者传递数据
                    actual.onNext(data);
                    // 5. 在数据发布完成后,调用 Subscriber#onComplete 结束本次流
                    actual.onComplete();
                } catch (Exception e) {
                    // 如果数据发布或者处理遇到错误会调用 Subscriber#onError
                    actual.onError(e);
                }
            }
        }

        @Override
        public void cancel() {
            isCanceled = true;
        }
    }

    public static void main(String[] args) {
        // 1. Subscriber ”订阅“ Publisher
        new SimplePublisher().subscribe(new SimpleSubscriber());
    }

}

响应式编程**

响应式编程,就像装配一条流水线。Publisher 规定了数据如何生产,中间会有 Operators(操作符)对流水线的数据进行解析,校验,转换等等操作,最终处理好的数据流转到 Subscriber。

image.png

这条流水线还有一个特点。大部分情况下当 Publisher 的 subscribe 方法被调用之前,什么都不会发生。在被订阅之前我们只是在定义流水线该如何工作,直到真正有人需要的时候,流水线才会启动。

Reactor 中的 Operator

Operators 怎么理解呢?对于上游来说,Operators 像一个订阅者,而对于它的下游来说,它像一个发布者(我们上文说过了 Reactor 中的中间操作并没有实现 Processor 接口)

    Mono.just("hello")
            .map(a -> a + "world")
            .subscribe(System.out::println);

举个简单的例子,在上面的代码中,map 就是一个 Operator,它的实现思路是什么?来看下面的代码

    // 注意,这是我基于 Reactor API 实现的伪代码!
    public static class MonoMap implements Publisher {
        // 我们自定义的转换逻辑
        private Function mapper;
        // source 代表当前操作符的上游发布者
        private Publisher source;

        public MonoMap(Publisher source, Function mapper) {
            this.source = source;
            this.mapper = mapper;
        }

        @Override
        public void subscribe(Subscriber actual) {
            source.subscribe(new MonoMapSubscriber(mapper, actual));
        }
    }

    public static class MonoMapSubscriber implements Subscriber {
        // 我们自定义的转换逻辑
        private Function mapper;
        // 真正的下游
        private Subscriber actual;

        public MonoMapSubscriber(Function mapper, Subscriber actual) {
            this.mapper = mapper;
            this.actual = actual;
        }

        @Override
        public void onSubscribe(Subscription s) {
            actual.onSubscribe(s);
        }

        @Override
        public void onNext(Object o) {
            // 当上游数据发送过来时,先进行转换再发送给下游
            Object result = mapper.apply(o);
            actual.onNext(result);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }
    }

上述代码是我自己实现的一个伪代码,用于让大家理解操作符的实现思路,实际 Reactor 代码也是这个思路,只不过实现的更加巧妙和严谨

我们首先来分析一下 Mono.just("hello").map(a -> a + "world") 这句话

  1. 当执行到 Mono.just 时,会新建一个 MonoJust 对象作为当前的 Publisher。该发布者的逻辑是,当订阅时,向下游发送数据 "hello"

  2. 当执行到 map 方法时,会新建一个 MonoMap 对象替作为当前的 Publisher,MonoJust 成为了 MonoMap 中的一个属性 source(实际的上游)

    • 当 MonoMap 被订阅时,会先将它的下游 actual 做一层包装,也就是我们上面的 MonoMapSubscriber。然后去调用 source 的 subscribe 方法。上游发布数据时,MonoMapSubscriber 先对数据进行转换(我们上面的拼接字符串操作),然后再发送给 actual(它的下游)

    • 当 MonoMap 被再次转换时,MonoMap 就变成了下游操作符的 source...

最后通过一张图来总结一下

Operator 实现原理

Reactor 该如何学习

本文并没有介绍太多 Reactor 的细节,因为这些东西实在是太多了。我想聊聊我自己是如何学习 Reactor 的

如果你已经通过本文理解了响应式编程的核心接口是如何工作的了,那恭喜你已经迈向了成功的第一步了。接下来就是阅读官方文档,不断的练习和阅读 Reactor 的源码。源码追踪的方向已经很明确了,当我们想了解一个发布者的实现原理是什么,我就要去关注这个发布者的 subscribe 方法和 Subscription 都做了什么。想了解消费者的逻辑,就看它的 onNext,onComplete,onError。

最后

如果觉得我的文章对你有帮助,动动小手点下关注,你的支持是对我最大的帮助

Java ThreadLocal 实现原理

ThreadLocal 线程本地变量,算是Java开发中比较常用的API了,今天我们来一探究竟

使用场景

ThreadLocal 适用于每个线程需要自己独立的实例且该实例需要在多个方法中被使用,也就是变量在线程间隔离,而在同一线程共享的场景。例如管理Connection,我们希望每个线程只使用一个Connection实例,这个时候用ThreadLocal就很合适。

public class ThreadLocalDemo {
    private static final ThreadLocal<Object> threadLocal = new ThreadLocal<>();

    public static void main(String[] args) {
        threadLocal.set(new Object());
        someMethod();
    }

    static void someMethod() {
        // 获取在threadLocal中存储的对象
        threadLocal.get();
        
        // 清除ThreadLocal中数据
        threadLocal.remove();
    }
}

还有之前写过的一篇动态切换数据源 https://www.jianshu.com/p/0a485c965b8b,AOP 通过 ThreadLocal 保存当前线程需要访问的数据源的key,AbstractRoutingDataSource 再通过 ThreadLocal 中的数据切换到指定的数据源,对业务代码毫无入侵

原理

在我们了解了如何使用之后,来看下 ThreadLocal 是如何实现的

ThreadLocal.get()

我们从get方法来分析,可以看到方法中获取当前线程,并通过当前线程得到一个 ThreadLocalMap,我们可以暂时把这个ThreadLocalMap 理解为我们熟悉的HashMap,然后通过 this(当前ThreadLocal对象)作为key,从Map中获取Entry

图1

我们再来看下,ThreadLocalMap 以及ThreadLocalMap.Entry 中的核心成员变量,ThreadLocalMap 中实现了一个简单的hash表

图2

看到这里你可能还不是很清晰,结合下面这张图理解一下,每个线程(Thread对象)中有一个ThreadLocalMap,使线程之间的数据天然隔离,ThreadLocalMap 有一张hash表 Entry[],每个 Entry 中对应存储着一个ThreadLocal实例 - value,这样使得不同的ThreadLocal 对象之间也形成了隔离

图片来自网络

ThreadLocalMap 中的hash表

我们通过 ThreadLocalMap.set() 来了解下内部的hash表是如何实现的

图3

线性探测是指当发生hash冲突时,利用固定的算法寻找一定步长的下个位置(ThreadLocal中发生hash冲突时,index+1),依次判断,直至找到能够存放的位置

如果线程中操作了大量的 ThreadLocal 对象,势必会造成hash冲突,这是没有必要的性能开销,如果可以的话,我们可以只保留一个ThreadLocal对象

关于 ThreadLocal 的一些思考

  1. 为什么要使用弱引用

图3中,我们看到hash表中会出现 key == null的Entry,这是因为 ThreadLocalMap.Entry 的key (Entry 对ThreadLocal设置了弱引用,可以回顾一下图2)

弱引用的对象拥有更短暂的生命周期。在GC时,一旦发现了对象只具有弱引用,这个对象一定被回收

这么做的原因:如果ThreadLocal 对象需要被回收时(此时并没有调用ThreadLocal.remove),线程中的ThreadLocalMap 一直强引用着 ThreadLocal对象,这会让 ThreadLocal对象 以及对应的value对象内存无法释放,导致内存泄漏。这算是ThreadLocal的一种容错机制,这样做使得了ThreadLocal对象得到了回收,但是value的内存并没有释放,所以ThreadLocalMap 的get、set方法中都会去尝试清理ThreadLocal已经被回收的entry。

  1. 使用过后不及时remove会怎么样

很多博客中都强调了,ThreadLocal.remove的重要性。举个例子,我们新启了一个线程在这个线程中使用了ThreadLocal,我们并没有调用remove,这会导致存储的value对象一直没有办法被回收,直到线程被销毁

  1. 线程池中也需要remove吗

以web线程池为例,如果每次都在过滤器中操作同一个ThreadLocal.set,然后业务代码中get,似乎没什么问题。计算出的hash值都是一样的,槽位也是一样的会覆盖上一次的值。确实业务不会有问题,但是还是推荐大家在使用完之后remove,因为这样会让无用的value对象早点被回收,在很多java源码中都会看到,对一些不再使用的对象进行如下的help GC操作

object = null // help GC

所以我们也需要让无用的对象失去引用,帮助GC

  1. 综上所述

ThreadLocal 使用过后要及时remove,帮助JVM释放内存

参考

https://www.jianshu.com/p/98b68c97df9b

聊聊 Java GC 算法

Java 和 C++ 之间有一堵由内存动态分配和垃圾回收技术所围成的高墙,墙外面的人想进去,墙里面的人却想出来

今天来聊聊 Java GC(Garbage Collection,垃圾回收)中的常见算法

引用与GC的关系

正题开始前,先来了解一下 Java 中的引用。对象使用不同的引用类型,决定 GC 发生时是否会回收它

引用类型 | 特点

  • |-
    强引用(Strong Reference) | Java中的默认引用类型。例如Object obj = new Object(),只要强引用存在,对象永远不会被回收
    软引用(Soft Reference)| 内存足够时,软引用不会被回收。只有当系统要发生内存溢出时,才会被回收。适合用于缓存场景
    弱引用(Weak Reference)| 只要发生垃圾回收,弱引用的对象就会被回收
    虚引用(Phantom Reference)| 一个对象有虚引用的存在不会对生存时间都构成影响,也无法通过虚引用来获取对一个对象的真实引用。唯一的用处:能在对象被GC时收到系统通知

如何判断对象是否可以被回收?

在GC开始之前,首先要做的事就是确定哪些对象『活着』,哪些对象『已死』

常见的两种算法用于判断该对象是否可以被回收

  • 引用计数算法:每个对象中添加引用计数器。每当对象被引用,引用计数器就会加 1;每当引用失效,计数器就会减 1。当对象的引用计数器的值为 0 时,就说明该对象不再被引用,可以被回收了。强调一点,虽然引用计数算法的实现简单,判断效率也很高,但它存在着对象之间相互循环引用的问题(所以在后来的JVM版本中已经不采用这种算法了)。

  • 可达性分析算法:GC Roots 是该算法的基础,GC Roots 是所有对象的根对象。这些对象作为正常对象的起始点,在垃圾回收时,会从这些 GC Roots 开始向下搜索,当一个对象到 GC Roots 没有任何引用链相连时,就证明此对象不再被引用。目前 HotSpot 虚拟机采用的就是这种算法。

可以作为 GC Roots 的对象:

  • 虚拟机栈中引用的对象
  • 方法区中类静态属性引用的对象
  • 方法区中常量引用的对象
  • 本地方法栈中 JNI(Native 方法)引用的对象
  • Java 虚拟机中内部的引用
  • synchronized 持有的对象
  • JMXBean、JVMTI 中注册的回调、本地代码缓存

除此之外,不同的垃圾回收器可能还会加入一些『临时的』对象共同构建 GC Roots

基础的 GC 算法

标记-清除算法 (mark-sweep)

如下图所示,该算法分为『标记』和『清除』两个阶段

从 GC Roots 出发标记出存活的对象,然后遍历堆清除未被标记的对象

最基础的收集算法,标记-清除的效率中等,缺点也比较明显:会产生内存碎片

复制算法(copying)

将可用内存按容量划分为大小相等的两块,每次只使用其中的一块。当这一块内存用完,需要进行垃圾收集时,就将存活的对象复制到另一块上面,然后将第一块内存全部清除

相比较『标记-清除』不会有内存碎片的问题,但每次只使用一半的内存,内存利用率很低。当内存中存活的对象较多时,会进行大量的复制操作,效率会较低(对象的复制也是有成本的,需要复制的对象越多、越大,复制带来的代价也就越大)。适用于存活率低的情况

标记-整理算法(mark-compact)

和『标记-清除』算法有点类似,从 GC Roots 出发标记出存活的对象,然后是整理阶段,将存活移动到一端。整理阶段结束后我们可以知道一个临界点,另一端的内存空间就可以被重新分配了

该算法的优点:不像复制算法那样会浪费内存空间,也不会产生内存碎片。


写到这时,我很想知道『标记-整理』的效率如何。一番搜索后,发现该算法是这三种算法中效率最低的,因为『整理』这个过程会遍历整个堆三次,具体实现思路如下

『标记-整理』以 lisp2 算法为例,实现思路如下

  1. 标记阶段:首先从 GC Roots 出发,标记出所有存活的对象
  2. 设置 forwarding 指针:每个对象头中都有一个forwarding指针,指向该对象整理后的位置。遍历整个堆计算存活对象的 forwarding
  3. 更新指针:遍历整个堆,根据 forwarding 更新 GC Roots 指针以及所有存活对象的子对象的指针,将指针指向新的位置
  4. 移动对象:遍历整个堆,根据 forwarding 移动对象并且清除对象中的标记

聊聊我个人的理解,为什么2,3,4一定要遍历整个堆?都是针对存活对象的操作,直接遍历 GC Roots 不行吗?

  • 设置 forwarding 阶段:一定要顺序遍历整个堆。如上图所示,如果仅是遍历 GC Roots 的话,你没法知道A这个区域是否可以覆盖
  • 更新指针阶段:我觉得这个阶段仅遍历 GC Roots 的话,确实可行
  • 移动对象阶段:由于 GC Roots 指针已经全部指向新的位置了,只能遍历整个堆

写到这,我又想为什么一定要三个步骤搞这么复杂,直接移动对象不行吗?

根据上面的理解,一定要遍历整个堆来确定存活的对象该移动到哪。就现有的结构来看,如果直接移动,子对象的指针还好说可以处理,GC Roots 中的指针就没法更新了

关于lisp2 实现的更多细节可以参考下这篇Blog 『gc-标记整理算法的两种实现』

小结

总结下上述三种基础GC算法的优缺点

维度 标记-清除 标记-整理 复制算法
速度 中等 最慢 最快
时间开销 mark阶段与存活对象的数量成正比,sweep阶段与整堆大小成正比 mark阶段与存活对象的数量成正比,compact阶段与整堆大小成正比,与存活对象的大小成正比 与存活对象大小成正比
空间开销 少(但会堆积碎片) 少(不堆积碎片) 通常需要存活对象的2倍大小(不堆积碎片)
移动对象

分代回收理论

垃圾回收器都不会只选择一种算法,JVM根据对象存活周期的不同,将内存划分为几块。一般是把堆分为新生代和老年代,根据年代的特点来选择最佳的收集算法。

HotSpot 中大部分垃圾回收器都采用分代回收的**

  • 新生代:复制算法
  • 老年代:标记-清除 / 标记-整理 / 或者两者同时使用

堆大小=新生代+老年代(默认分别占堆空间为1/3、2/3),新生代又被分为Eden、from survivor (S0)、to survivor (S1),默认分配比例为 8:1:1

image.png

对象的分配

对象的分配通常在 Eden 中(需要大量连续内存空间的 Java 对象,如很长的字符串或数据可以直接进入老年代,由 -XX:PretenureSizeThreshold 决定)

新生代的回收

当 Eden 区满后,会触发 Young GC(新生代回收),复制 Eden 区和 S0 区中存活的对象到 S1 或者老年代(其中到达年龄的会被放入老年代,未到达年龄的放入 S1 区)

每经历一次 Young GC,survivor 区中对象年龄 +1

然后清空 Eden 区和 S0 区,交换 S0 与 S1 的名字

若存活对象大于 S1 区容量,则会被直接放入老年代。若打开了自适应(-XX:+AdaptiveSizePolicy),GC会自动重新调整新生代大小

老年代的回收

在发生 Full GC 或者 Old GC 时,会根据不同的垃圾回收器或者情况选择使用 标记-清除 / 标记-整理 来进行回收

除了 Young GC 之外,常见的还有

  • Full GC(新生代、老生代、元空间或永久代的回收)
  • Old GC(只有 CMS 有这个模式)
  • Mixed GC(只有 G1 有这个模式)

通常情况下 Full GC 的触发条件,当准备要触发一次 Young GC时,如果发现统计数据说之前 Young GC 的平均晋升大小比目前老年代剩余的空间大,则不会触发 Young GC 而是转为触发 Full GC

小结

由于新生代的特点是大多数对象都是「朝生夕死」的,存活率低,所以非常适合复制算法。而 survivor 区存在的意义是为了确保「朝生夕死」的对象不会轻易进入老年代,当对象的年龄满足(经历了多次 Young GC)才会进入老年代。

又到了提问环节,为什么分代回收中需要有两个 survivor 区,一个不行吗?

答案是不行。假设只有一个 survivor,Eden 回收后存活的对象进入了 survivor。那么 survivor 区可以被回收的对象该怎么处理?难道要用标记清除和标记整理?那可太没有必要了,所以划分出两个 survivor 区,将新生代的复制算法贯彻到底

参考

『深入理解Java虚拟机』
『极客时间 - Java性能调优实战』
『Java-GC 垃圾收集算法』
『gc-标记整理算法的两种实现』
『Mark-compact algorithm - Wikipedia』
『为什么新生代内存需要有两个Survivor区』
『Major GC和Full GC的区别是什么?触发条件呢? - RednaxelaFX的回答 - 知乎』
『关于JVM垃圾搜集算法(标记-整理算法与复制算法)的效率? - RednaxelaFX的回答 - 知乎』

Kafka Connect:引入了Fastjson后,Rest API响应为{}

前言

最近在学习Kafka Connect,写了个连接器的demo。在demo提交了几个版本之后,突然发现Kafka Connect Rest API 无法正常响应了,明明有正在运行的连接器,查询status,居然返回{}

问题分析

对 Rest API 进行debug后,确认是有数据的,但是数据返回不到客户端,很奇怪。因为我记得之前是好用的,所以我回滚了代码版本,逐一排查之后发现当引入Fastjson 依赖之后,会导致Connect Rest API 不可用

如果懒一点的话,到这里就已经结束了,直接删除Fastjson依赖,使用其他Json包。但是我很好奇,在我的理解里,Fastjson 这种库就是个工具包,如果我们程序没有主动调用的时候,是不会对我们产生任何影响的。

百度谷歌一通之后,一筹莫展之际,点开了Fastjson的源码包,在这里发现了Fastjson为JAXRS提供的SPI扩展

1

JAXRS:Java API for RESTful Web Services,JavaEE提供的Web服务接口。Jersey 实现了JAXRS,而Kafka Connect 引用了Jersey 。
SPI:Service Provider Interface ,是JDK内置的一种服务提供发现机制,可以参考我之前的博客 Java SPI 实战

打开javax.ws.rs.ext.MessageBodyWriter 文件,可以看到提供的实现类是com.alibaba.fastjson.support.jaxrs.FastJsonProvider,定位到FastJsonProviderwriteTo方法,该方法会把object写入到OutputStream中,看起来很靠谱,debug试一下

2

果然,说明Fastjson果然参与了Rest API的响应。为什么使用Fastjson就响应不了数据呢,看了下源码,这里要求被序列化的Bean必须标记Fastjson相关的注解,而实际的Bean使用的是Jackson的注解,所以Fastjson无法序列化数据。

接下来可以根据调用栈和全局搜索找一下,看看FastJsonProvider是在什么时机加载的,能否干掉他。

调用栈并没有找到什么有用的信息,通过全局搜索MessageBodyWriter找到了FastJsonProvider的加载位置,MessageBodyFactory::initialize

image.png

上图字面意思理解,使用 injectionManager (注入管理器),找到MessageBodyWriter的可用实现

这里的 customMbws size = 2,分别是FastJson和Jackson的实现。但是FastJson在前,而每次需要做JSON序列化的时候,会遍历writers,如果找到支持application/json的MessageBodyWriter则直接返回,所以每次使用的都是FastJson的实现。

至此已经明白了,为什么Fastjson 会影响Kafka Connect了,接下来就是想办法解决了


这个时候还是没有找到Fastjson是在哪加载的,在Fastjson的 wiki 中找到了些灵感,发现Fastjson 在Jersey 中并不是通过SPI的方式进行的扩展,而是通过FastJsonAutoDiscoverable,向Jersey 的 context中注册FastJsonProvider

4

最后,我们在java 进程启动时指定参数 -Dfastjson.auto.discoverable=false,禁用 FastJsonProvider

参考

https://github.com/alibaba/fastjson/wiki/Integrate-Fastjson-in-JAXRS

Opentracing 链路追踪实战

链路追踪的作用

当系统架构从单机转变为微服务后,我们的一次后端请求,可能历经了多个服务才最终响应到客户端。如果请求按照预期正确响应还好,万一在调用链的某一环节出现了问题,排查起来是很麻烦的。但是如果有链路追踪的话,就容易很多了。

可以通过链路埋点,记录请求链中所有重要的步骤,例如与哪些数据库做了交互,调用了哪些下游服务,下游服务又与哪些数据库做了交互,又调用了哪些下游服务...

下图是 jaeger UI 为例,每次链路追踪都会产生一个唯一的 TraceId,通过该 Id 可以查看请求链路的状态

  • 下游服务可以正常访问时

    下游服务可以正常访问

  • RestTemplate 无法访问下游服务时

    无法访问下游服务时

当有了链路追踪之后。我们可以清楚的看到问题出在哪。

什么是 Opentracing

Opentracing 制定了一套链路追踪的 API 规范,支持多种编程语言。虽然OpenTracing不是一个标准规范,但现在大多数链路跟踪系统都在尽量兼容OpenTracing

使用 Opentracing 时,还需要集成实现该规范的链路追踪系统,例如我们的项目正在使用 Jaeger,本文也同样以 Jaeger 为例

Opentracing 核心接口

  • Tracer:用于创建 Span,以及跨进程链路追踪

  • Span:Tracer 中的基本单元,上图中每一个蓝色或者黄色的块都是一个 Span。Span 包含的属性有 tag,log,SpanContext,BaggageItem

  • SpanContext:Span 上下文,用于跨进程传递 Span,以及数据共享

  • ScopeManager:用于获取当前上下文中 Span,或者将 Span 设置到当前上下文

  • Scope:与 ScopeManager 配合使用,我们在后面的例子中会说明

快速开始

首先,我们先部署一个 jaeger 服务。关于 jaeger 服务的更多细节,这里不多说了,各位读者可以自行去 jaeger 官网阅读

执行如下命令,启动 jaeger 服务
docker run -d --name jaeger -e COLLECTOR_ZIPKIN_HOST_PORT=:9411 -p 5775:5775/udp -p 6831:6831/udp -p 6832:6832/udp -p 5778:5778 -p 16686:16686 -p 14268:14268 -p 14250:14250 -p 9411:9411 jaegertracing/all-in-one:1.23

引入 maven 依赖

<!-- guava 依赖,与 jaeger 无关 -->
<dependency>
	<groupId>com.google.guava</groupId>
	<artifactId>guava</artifactId>
	<version>30.1.1-jre</version>
</dependency>

<dependency>
	<groupId>io.opentracing</groupId>
	<artifactId>opentracing-api</artifactId>
	<version>0.33.0</version>
</dependency>

<dependency>
	<groupId>io.jaegertracing</groupId>
	<artifactId>jaeger-client</artifactId>
	<version>1.6.0</version>
</dependency>

记录一个简单的链路

import com.google.common.collect.ImmutableMap;
import io.jaegertracing.Configuration;
import io.jaegertracing.internal.JaegerTracer;
import io.opentracing.Span;
import io.opentracing.Tracer;

import java.time.LocalDateTime;

public class GettingStarter {

    public static void main(String[] args) {
        // 指定服务名,初始化 Tracer
        Tracer tracer = initTracer("starter-service");
        // 指定 Span 的 operationName
        Span span = tracer.buildSpan("")
                // 指定当前 Span 的 Tag, key value 格式
                .withTag("env", "local")
                .start();

        span.setTag("system", "windows");

        // log 也是 key value 格式,默认 key 为 event
        span.log("create first Span");
        // 传入一个 Map
        span.log(ImmutableMap.of("currentTime", LocalDateTime.now().toString()));

        // 输出当前 traceId
        System.out.println(span.context().toTraceId());

        // 结束并上报 span
        span.finish();
    }

    public static JaegerTracer initTracer(String service) {
        Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);
        Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);
        Configuration config = new Configuration(service).withSampler(samplerConfig).withReporter(reporterConfig);
        return config.getTracer();
    }

}

jaeger UI 查询链路,访问 http://localhost:16686/search,右上角输入 traceId 搜索。结果如下

image.png

可以看到我们刚刚在代码中的 Tag 和 Log 都记录在链路上了

线程传递 Span

Span 之间是可以建立父子关系的,使用

Span parent= tracer.buildSpan("say-hello").start();
tracer.buildSpan("son")
                .asChildOf(parent)
                .start();

效果如图

image.png

parent Span 肯定不能一直在调用栈中传递下去,这对集成 opentracing 的程序来说侵入性太大了。回顾一下上面我们提到了一个 ScopeManager ,来看下其核心方法

  • activate(Span span):用于将当前 Span Set 到当前上下文中。上图中的注释也给出了该方法如何使用。返回值 Scope 字面翻译为范围 / 跨度。稍后我们找一个实现类具体分析一下

  • activeSpan():这个方法很好理解,从当前上下文中 Get Span

ThreadLocalScopeManager

Jaeger 中默认的 ScopeManager,该类由 opentracing 提供。

public class ThreadLocalScopeManager implements ScopeManager {
    final ThreadLocal<ThreadLocalScope> tlsScope = new ThreadLocal<ThreadLocalScope>();

    @Override
    public Scope activate(Span span) {
        return new ThreadLocalScope(this, span);
    }

    @Override
    public Span activeSpan() {
        ThreadLocalScope scope = tlsScope.get();
        return scope == null ? null : scope.span();
    }
}
public class ThreadLocalScope implements Scope {
    private final ThreadLocalScopeManager scopeManager;
    private final Span wrapped;
    private final ThreadLocalScope toRestore;

    ThreadLocalScope(ThreadLocalScopeManager scopeManager, Span wrapped) {
        this.scopeManager = scopeManager;
        this.wrapped = wrapped;
        this.toRestore = scopeManager.tlsScope.get();
        scopeManager.tlsScope.set(this);
    }

    @Override
    public void close() {
        if (scopeManager.tlsScope.get() != this) {
            // This shouldn't happen if users call methods in the expected order. Bail out.
            return;
        }

        scopeManager.tlsScope.set(toRestore);
    }

    Span span() {
        return wrapped;
    }
}

ThreadLocalScopeManager#activate:直接调用了new ThreadLocalScope。在 ThreadLocalScope 构造方法中,首先将从 ThreadLocal 中获取到目前上下文中的 ThreadLocalScope 赋值给 toRestore,然后将 this (ThreadLocalScope 对象) set 到 ThreadLocal。

ThreadLocalScope 中存储着当前的 Span。后续的代码如果想要获取 Span,只需要调用 ScopeManager#activeSpan 就可以(ScopeManager 可以在 Tracer 对象中拿到)

在执行 close 时(Scope 继承了 Closeable),将之前的 Span 重新放回到上下文中

线程传递 Span 演示

Span parentSpan = tracer.buildSpan("parentSpan").start();

try (Scope scope = tracer.activateSpan(parentSpan)) {
	xxxMethod();
} finally {
	parentSpan.finish();
}

public void xxxMethod() {
	// 这里并不需要手动从 ScopeManager 中取出上下文中的 Span
	// start 方法中已经做了
	// 如果 ScopeManager.activeSpan() != null 会自动调用 asChildOf
	tracer.buildSpan("sonSpan").start();
}

链路中数据共享

如果需要和链路的下游共享某些数据,使用如下方法

// 写
span.setBaggageItem("key", "value");

// 读
span.getBaggageItem("key");

只要保证在同一条链路中,即使下游 Span 在不同的进程,依然可以通过 getBaggageItem 读到数据

跨进程链路追踪

opentracing 中提供了实现跨进程追踪的规范

Tracer 接口中提供了如下两个方法

  • inject:将 SpanContext 注入到 Carrier 中,传递给下游的其他进程

  • extract:下游进程从 Carrier 中抽取出 SpanContext,用于创建下游的 Child Span

简单举个例子解释下,例如我们使用 Http 协议访问下游服务,inject 可以将 SpanContext 注入到 HttpHeaders 中。
下游服务再从 HttpHeaders 中按照链路中的约定取出有特殊标识的 header 来构建 SpanContext。这样一来就实现了链路的跨进程

再回到代码层面,通过接口方法的声明我们可以看出来,Format 决定了 Carrier 的类型。下面来看看实际代码中如何实现

跨进程链路追踪演示

public class TracingRestTemplateInterceptor implements ClientHttpRequestInterceptor {
    private static final String SPAN_URI = "uri";

    private final Tracer tracer;

    public TracingRestTemplateInterceptor(Tracer tracer) {
        this.tracer = tracer;
    }

    @Override
    public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {
        ClientHttpResponse httpResponse;
        // 为当前 RestTemplate 调用,创建一个 Span
        Span span = tracer.buildSpan("RestTemplate-RPC")
                .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
                .withTag(SPAN_URI, request.getURI().toString())
                .start();
        // 将当前 SpanContext 注入到 HttpHeaders
        tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS,
                new HttpHeadersCarrier(request.getHeaders()));

        try (Scope scope = tracer.activateSpan(span)) {
            httpResponse = execution.execute(request, body);
        } catch (Exception ex) {
            TracingError.handle(span, ex);
            throw ex;
        } finally {
            span.finish();
        }
        return httpResponse;
    }

}

TracingRestTemplateInterceptor 实现了 RestTemplate 的拦截器,用于在 Http 调用之前,将 SpanContext 注入到 HttpHeaders 中。

Format.Builtin.HTTP_HEADERS 决定了当前的 Carrier 类型必须 TextMap (源码中可以看到,这里我没有列出)

public class HttpHeadersCarrier implements TextMap {
    private final HttpHeaders httpHeaders;

    public HttpHeadersCarrier(HttpHeaders httpHeaders)  {
        this.httpHeaders = httpHeaders;
    }

    @Override
    public void put(String key, String value) {
        httpHeaders.add(key, value);
    }

    @Override
    public Iterator<Map.Entry<String, String>> iterator() {
        throw new UnsupportedOperationException("Should be used only with tracer#inject()");
    }
}

tracer.inject 内部会调用 TextMap 的 put 方法,这样就将 SpanContext 注入到 HttpHeaders 了。

下面再来看看下游怎么写

public class TracingFilter implements Filter {
    private final Tracer tracer;

    public TracingFilter(Tracer tracer) {
        this.tracer = tracer;
    }

    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
        HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
        HttpServletResponse httpResponse = (HttpServletResponse) servletResponse;
        // 通过 HttpHeader 构建 SpanContext
        SpanContext extractedContext = tracer.extract(Format.Builtin.HTTP_HEADERS,
                new HttpServletRequestExtractAdapter(httpRequest));

        String operationName = httpRequest.getMethod() + ":" + httpRequest.getRequestURI();
        Span span = tracer.buildSpan(operationName)
                .asChildOf(extractedContext)
                .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER)
                .start();

        httpResponse.setHeader("TraceId", span.context().toTraceId());

        try (Scope scope = tracer.activateSpan(span)) {
            filterChain.doFilter(servletRequest, servletResponse);
        } catch (Exception ex) {
            TracingError.handle(span, ex);
            throw ex;
        } finally {
            span.finish();
        }
    }
}

TracingFilter 实现了 Servlet Filter,每次请求访问到服务器时创建 Span,如果可以抽取到 SpanContext,则创建的是 Child Span

public class HttpServletRequestExtractAdapter implements TextMap {
    private final IdentityHashMap<String, String> headers;

    public HttpServletRequestExtractAdapter(HttpServletRequest httpServletRequest) {
        headers = servletHeadersToMap(httpServletRequest);
    }

    @Override
    public Iterator<Map.Entry<String, String>> iterator() {
        return headers.entrySet().iterator();
    }

    @Override
    public void put(String key, String value) {
        throw new UnsupportedOperationException("This class should be used only with Tracer.inject()!");
    }

    private IdentityHashMap<String, String> servletHeadersToMap(HttpServletRequest httpServletRequest) {
        IdentityHashMap<String, String> headersResult = new IdentityHashMap<>();

        Enumeration<String> headerNamesIt = httpServletRequest.getHeaderNames();
        while (headerNamesIt.hasMoreElements()) {
            String headerName = headerNamesIt.nextElement();

            Enumeration<String> valuesIt = httpServletRequest.getHeaders(headerName);
            while (valuesIt.hasMoreElements()) {
                // IdentityHashMap 判断两个 Key 相等的条件为 k1 == k2
                // 为了让两个相同的字符串同时存在,必须使用 new String
                headersResult.put(new String(headerName), valuesIt.nextElement());
            }

        }

        return headersResult;
    }

}

tracer.extract 内部会调用 HttpServletRequestExtractAdapter iterator 方法用于构建 SpanContext

如果你看完了这些还是对于跨进程链路追踪有疑惑的,可以下载一下我写的 Demo,通过 Debug 来更进一步了解

https://github.com/TavenYin/taven-springcloud-learning/tree/master/jaeger-mutilserver

Demo 中的代码参考了 opentracing 的实现,做了相应的简化,诸位可以放心食用

实际使用

opentracing 已经实现了一些常用 api 的链路埋点,在没有什么特殊需求的时候,我们可以直接使用这些代码。具体参考

Spring 源码解析:@RequestBody @ResponseBody 的来龙去脉

@requestbody@responsebody 是实际开发中很常用的两个注解,通常用来解析和响应JSON,用起来十分的方便,这两个注解的背后是如何实现的?

源码版本

SpringBoot 2.1.3.RELEASE

RequestResponseBodyMethodProcessor

Resolves method arguments annotated with @requestbody and handles return values from methods annotated with @responsebody by reading and writing to the body of the request or response with an HttpMessageConverter.
An @requestbody method argument is also validated if it is annotated with @javax.validation.Valid. In case of validation failure, MethodArgumentNotValidException is raised and results in an HTTP 400 response status code if DefaultHandlerExceptionResolver is configured.

简单来说,这个类用来解析@RequestBody的参数和处理 @ResponseBody返回值,通过 HttpMessageConverter 这个接口来实现。

如果@RequestBody标记的参数包含@Valid,还会对这个参数进行校验。

继承关系

image.png

HandlerMethodArgumentResolver 和 HandlerMethodReturnValueHandler 分别是Spring的参数处理器和返回值处理器

  • HandlerMethodArgumentResolver
public interface HandlerMethodArgumentResolver {

	boolean supportsParameter(MethodParameter parameter);

	Object resolveArgument(MethodParameter parameter, @Nullable ModelAndViewContainer mavContainer,
			NativeWebRequest webRequest, @Nullable WebDataBinderFactory binderFactory) throws Exception;

}

Spring的参数解析器接口,supportsParameter() 方法用于判断解析器是否支持当前Controller方法的参数,resolveArgument() 则是将Request解析为Controller方法对应的参数Bean

  • HandlerMethodReturnValueHandler
public interface HandlerMethodReturnValueHandler {

	boolean supportsReturnType(MethodParameter returnType);

	void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
			ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception;

}

同理这个接口将Controller方法返回的对象,封装为Response

我们在实际开发时,也可以实现这两个接口自定义自己的参数解析和响应处理,RequestResponseBodyMethodProcessor 实现了这两个接口,既做了参数解析器也做了响应处理器。

RequestResponseBodyMethodProcessor 源码分析

我们来看一下 RequestResponseBodyMethodProcessor 是如何工作的,以解析参数为例

  • resolveArgument
    @Override
	public boolean supportsParameter(MethodParameter parameter) {
		// 支持标记@RequestBody的参数
		return parameter.hasParameterAnnotation(RequestBody.class);
	}
	
    @Override
	public Object resolveArgument(MethodParameter parameter, @Nullable ModelAndViewContainer mavContainer,
			NativeWebRequest webRequest, @Nullable WebDataBinderFactory binderFactory) throws Exception {

		parameter = parameter.nestedIfOptional();
		// 通过HttpMessageConverters 将请求体, 封装为@RequestBody所标记的XXBean
		Object arg = readWithMessageConverters(webRequest, parameter, parameter.getNestedGenericParameterType());
		String name = Conventions.getVariableNameForParameter(parameter);

		if (binderFactory != null) {
			WebDataBinder binder = binderFactory.createBinder(webRequest, arg, name);
			if (arg != null) {
				// 如果存在@Valid 对参数进行校验
				validateIfApplicable(binder, parameter);
				if (binder.getBindingResult().hasErrors() && isBindExceptionRequired(binder, parameter)) {
					throw new MethodArgumentNotValidException(parameter, binder.getBindingResult());
				}
			}
			if (mavContainer != null) {
				mavContainer.addAttribute(BindingResult.MODEL_KEY_PREFIX + name, binder.getBindingResult());
			}
		}

		return adaptArgumentIfNecessary(arg, parameter);
	}
	
	@Override
	protected <T> Object readWithMessageConverters(NativeWebRequest webRequest, MethodParameter parameter,
			Type paramType) throws IOException, HttpMediaTypeNotSupportedException, HttpMessageNotReadableException {

		HttpServletRequest servletRequest = webRequest.getNativeRequest(HttpServletRequest.class);
		Assert.state(servletRequest != null, "No HttpServletRequest");
		ServletServerHttpRequest inputMessage = new ServletServerHttpRequest(servletRequest);

		Object arg = readWithMessageConverters(inputMessage, parameter, paramType);
		if (arg == null && checkRequired(parameter)) {
			throw new HttpMessageNotReadableException("Required request body is missing: " +
					parameter.getExecutable().toGenericString(), inputMessage);
		}
		return arg;
	}

作为参数解析器,RequestResponseBodyMethodProcessor 支持所有标记@requestbody的参数。在resolveArgument()方法中,通过调用readWithMessageConverters() 将 Request 转为对应 arg。我们来看一下 readWithMessageConverters() 到底做了什么

  • readWithMessageConverters
	protected <T> Object readWithMessageConverters(HttpInputMessage inputMessage, MethodParameter parameter,
			Type targetType) throws IOException, HttpMediaTypeNotSupportedException, HttpMessageNotReadableException {

		// 当前请求的contentType
		MediaType contentType;
		boolean noContentType = false;
		try {
			contentType = inputMessage.getHeaders().getContentType();
		}
		catch (InvalidMediaTypeException ex) {
			throw new HttpMediaTypeNotSupportedException(ex.getMessage());
		}
		if (contentType == null) {
			noContentType = true;
			contentType = MediaType.APPLICATION_OCTET_STREAM;
		}

		// Controller参数的Class
		Class<?> contextClass = parameter.getContainingClass();
		Class<T> targetClass = (targetType instanceof Class ? (Class<T>) targetType : null);
		if (targetClass == null) {
			ResolvableType resolvableType = ResolvableType.forMethodParameter(parameter);
			targetClass = (Class<T>) resolvableType.resolve();
		}
		
		// 当前请求方式
		HttpMethod httpMethod = (inputMessage instanceof HttpRequest ? ((HttpRequest) inputMessage).getMethod() : null);
		Object body = NO_VALUE;

		EmptyBodyCheckingHttpInputMessage message;
		try {
			message = new EmptyBodyCheckingHttpInputMessage(inputMessage);
			
			// 遍历所有的HttpMessageConverter,
			for (HttpMessageConverter<?> converter : this.messageConverters) {
				Class<HttpMessageConverter<?>> converterType = (Class<HttpMessageConverter<?>>) converter.getClass();
				GenericHttpMessageConverter<?> genericConverter =
						(converter instanceof GenericHttpMessageConverter ? (GenericHttpMessageConverter<?>) converter : null);
						
				// 如果当前的HttpMessageConverter可以解析对应的 class和contentType
				if (genericConverter != null ? genericConverter.canRead(targetType, contextClass, contentType) :
						(targetClass != null && converter.canRead(targetClass, contentType))) {
					if (message.hasBody()) {
						HttpInputMessage msgToUse =
								getAdvice().beforeBodyRead(message, parameter, targetType, converterType);
								
						// 将Http报文转换为对应的class
						body = (genericConverter != null ? genericConverter.read(targetType, contextClass, msgToUse) :
								((HttpMessageConverter<T>) converter).read(targetClass, msgToUse));
								
						body = getAdvice().afterBodyRead(body, msgToUse, parameter, targetType, converterType);
					}
					else {
						body = getAdvice().handleEmptyBody(null, message, parameter, targetType, converterType);
					}
					break;
				}
			}
		}
		catch (IOException ex) {
			throw new HttpMessageNotReadableException("I/O error while reading input message", ex, inputMessage);
		}

		if (body == NO_VALUE) {
			if (httpMethod == null || !SUPPORTED_METHODS.contains(httpMethod) ||
					(noContentType && !message.hasBody())) {
				return null;
			}
			throw new HttpMediaTypeNotSupportedException(contentType, this.allSupportedMediaTypes);
		}

		MediaType selectedContentType = contentType;
		Object theBody = body;
		LogFormatUtils.traceDebug(logger, traceOn -> {
			String formatted = LogFormatUtils.formatValue(theBody, !traceOn);
			return "Read \"" + selectedContentType + "\" to [" + formatted + "]";
		});

		return body;
	}

上述代码核心逻辑就是遍历当前解析中配置的所有 HttpMessageConverter,如果某个Converter可以解析当前的 contentType,就把转换工作交给他去进行。

之前做过将默认解析替换为fastjson,当时就是添加一个FastJson实现的HttpMessageConverter,但是那时候并不理解这么做是为了什么,现在才恍然大悟...

  • handleReturnValue

RequestResponseBodyMethodProcessor 的Response处理逻辑和解析逻辑类似,找到一个支持的HttpMessageConverter,把响应工作交给他,感兴趣的童鞋可以自己找下源码。

RequestResponseBodyMethodProcessor 是怎么被调用的

上面讲了 RequestResponseBodyMethodProcessor 做了参数解析和响应处理的工作,那么他在Spring框架中是怎么被调用的,我们来看一下

image.png

如图,RequestMappingHandlerAdapter 的resolvers(Request解析器)、handlers(Response处理器)还有 ExceptionHandlerExceptionResolver 的handlers 调用了 RequestResponseBodyMethodProcessor

RequestMappingHandlerAdapter

我们只分析一下 RequestMappingHandlerAdapter ,该类对所有标记 @RequestMapping的注解进行解析和响应

在WebMvcConfigurationSupport中,配置了该Bean,将其加入到Spring容器中,我们自定义的参数解析、响应解析、和HttpMessageConvert 通过上图的方法set到 RequestMappingHandlerAdapter 中。

	@Bean
	public RequestMappingHandlerAdapter requestMappingHandlerAdapter() {
		RequestMappingHandlerAdapter adapter = createRequestMappingHandlerAdapter();
		adapter.setContentNegotiationManager(mvcContentNegotiationManager());
		// 获取所有HttpMessageConverter,包括我们自定义的配置
		adapter.setMessageConverters(getMessageConverters());
		adapter.setWebBindingInitializer(getConfigurableWebBindingInitializer());
		// 自定义的参数解析器
		adapter.setCustomArgumentResolvers(getArgumentResolvers());
		// 自定义的响应处理器
		adapter.setCustomReturnValueHandlers(getReturnValueHandlers());

		if (jackson2Present) {
			adapter.setRequestBodyAdvice(Collections.singletonList(new JsonViewRequestBodyAdvice()));
			adapter.setResponseBodyAdvice(Collections.singletonList(new JsonViewResponseBodyAdvice()));
		}

		AsyncSupportConfigurer configurer = new AsyncSupportConfigurer();
		configureAsyncSupport(configurer);
		if (configurer.getTaskExecutor() != null) {
			adapter.setTaskExecutor(configurer.getTaskExecutor());
		}
		if (configurer.getTimeout() != null) {
			adapter.setAsyncRequestTimeout(configurer.getTimeout());
		}
		adapter.setCallableInterceptors(configurer.getCallableInterceptors());
		adapter.setDeferredResultInterceptors(configurer.getDeferredResultInterceptors());

		return adapter;
	}

继续说 RequestMappingHandlerAdapter ,getDefaultArgumentResolvers() 封装了SpringBoot中的默认参数解析器,其中就有我们的本节所讲的 RequestResponseBodyMethodProcessor ,在afterPropertiesSet() 方法中调用了该方法

    @Override
    public void afterPropertiesSet() {
        // Do this first, it may add ResponseBody advice beans
        initControllerAdviceCache();

        if (this.argumentResolvers == null) {
            List<HandlerMethodArgumentResolver> resolvers = getDefaultArgumentResolvers();
            this.argumentResolvers = new HandlerMethodArgumentResolverComposite().addResolvers(resolvers);
        }
        if (this.initBinderArgumentResolvers == null) {
            List<HandlerMethodArgumentResolver> resolvers = getDefaultInitBinderArgumentResolvers();
            this.initBinderArgumentResolvers = new HandlerMethodArgumentResolverComposite().addResolvers(resolvers);
        }
        if (this.returnValueHandlers == null) {
            List<HandlerMethodReturnValueHandler> handlers = getDefaultReturnValueHandlers();
            this.returnValueHandlers = new HandlerMethodReturnValueHandlerComposite().addHandlers(handlers);
        }
    }
    
	private List<HandlerMethodArgumentResolver> getDefaultArgumentResolvers() {
        List<HandlerMethodArgumentResolver> resolvers = new ArrayList<>();

        // Annotation-based argument resolution
        resolvers.add(new RequestParamMethodArgumentResolver(getBeanFactory(), false));
        resolvers.add(new RequestParamMapMethodArgumentResolver());
        resolvers.add(new PathVariableMethodArgumentResolver());
        resolvers.add(new PathVariableMapMethodArgumentResolver());
        resolvers.add(new MatrixVariableMethodArgumentResolver());
        resolvers.add(new MatrixVariableMapMethodArgumentResolver());
        resolvers.add(new ServletModelAttributeMethodProcessor(false));
        
        // 添加RequestResponseBodyMethodProcessor
        resolvers.add(new RequestResponseBodyMethodProcessor(getMessageConverters(), this.requestResponseBodyAdvice));
        
        // 省略,详见源码...

        return resolvers;
    }
    
    private List<HandlerMethodReturnValueHandler> getDefaultReturnValueHandlers() {
        List<HandlerMethodReturnValueHandler> handlers = new ArrayList<>();

        // Single-purpose return value types
        handlers.add(new ModelAndViewMethodReturnValueHandler());
        handlers.add(new ModelMethodProcessor());
        handlers.add(new ViewMethodReturnValueHandler());
        handlers.add(new ResponseBodyEmitterReturnValueHandler(getMessageConverters(),
                this.reactiveAdapterRegistry, this.taskExecutor, this.contentNegotiationManager));
        handlers.add(new StreamingResponseBodyReturnValueHandler());
        handlers.add(new HttpEntityMethodProcessor(getMessageConverters(),
                this.contentNegotiationManager, this.requestResponseBodyAdvice));
        handlers.add(new HttpHeadersReturnValueHandler());
        handlers.add(new CallableMethodReturnValueHandler());
        handlers.add(new DeferredResultMethodReturnValueHandler());
        handlers.add(new AsyncTaskMethodReturnValueHandler(this.beanFactory));

        // Annotation-based return value types
        handlers.add(new ModelAttributeMethodProcessor(false));
        
        // 添加RequestResponseBodyMethodProcessor
        handlers.add(new RequestResponseBodyMethodProcessor(getMessageConverters(),
                this.contentNegotiationManager, this.requestResponseBodyAdvice));

        // 省略,详见源码...

        return handlers;
    }

RequestResponseBodyMethodProcessor 何时被调用

上面铺垫了这么多,终于来了

RequestMappingHandlerAdapter 的 invokeHandlerMethod 中 构建了 invocableMethod 对象并将所有的解析器和处理器封装到该对象,通过invocableMethod.invokeAndHandle() 进行对请求的解析,对controller的调用,以及响应的处理

image.png

invocableMethod.invokeAndHandle() 中是怎么样实现的

	public void invokeAndHandle(ServletWebRequest webRequest, ModelAndViewContainer mavContainer,
			Object... providedArgs) throws Exception {
			
		// 参数解析,并反射调用controller方法,获取方法返回值
		Object returnValue = invokeForRequest(webRequest, mavContainer, providedArgs);
		
		// 下面就是对Response的处理
		setResponseStatus(webRequest);

		if (returnValue == null) {
			if (isRequestNotModified(webRequest) || getResponseStatus() != null || mavContainer.isRequestHandled()) {
				mavContainer.setRequestHandled(true);
				return;
			}
		}
		else if (StringUtils.hasText(getResponseStatusReason())) {
			mavContainer.setRequestHandled(true);
			return;
		}

		mavContainer.setRequestHandled(false);
		Assert.state(this.returnValueHandlers != null, "No return value handlers");
		try {
			this.returnValueHandlers.handleReturnValue(
					returnValue, getReturnValueType(returnValue), mavContainer, webRequest);
		}
		catch (Exception ex) {
			if (logger.isTraceEnabled()) {
				logger.trace(formatErrorForReturnValue(returnValue), ex);
			}
			throw ex;
		}
	}
	
	public Object invokeForRequest(NativeWebRequest request, @Nullable ModelAndViewContainer mavContainer,
			Object... providedArgs) throws Exception {
			
		// 调用参数解析器获取调用controller 所需的参数
		Object[] args = getMethodArgumentValues(request, mavContainer, providedArgs);
		if (logger.isTraceEnabled()) {
			logger.trace("Arguments: " + Arrays.toString(args));
		}
		
		// 反射调用 controller
		return doInvoke(args);
	}
	
	protected Object[] getMethodArgumentValues(NativeWebRequest request, @Nullable ModelAndViewContainer mavContainer,
			Object... providedArgs) throws Exception {

		if (ObjectUtils.isEmpty(getMethodParameters())) {
			return EMPTY_ARGS;
		}
		MethodParameter[] parameters = getMethodParameters();
		Object[] args = new Object[parameters.length];
		
		// 遍历解析参数
		for (int i = 0; i < parameters.length; i++) {
			MethodParameter parameter = parameters[i];
			parameter.initParameterNameDiscovery(this.parameterNameDiscoverer);
			args[i] = findProvidedArgument(parameter, providedArgs);
			if (args[i] != null) {
				continue;
			}
			
			// 这里的 resolvers 是一个封装了所有参数解析器的包装类,遍历所有解析器,如果不能找到支持当前参数的,抛出异常
			// 如果找到当前参数对应的解析器,则缓存起来,在下面的 resolvers.resolveArgument 时,直接使用
			if (!this.resolvers.supportsParameter(parameter)) {
				throw new IllegalStateException(formatArgumentError(parameter, "No suitable resolver"));
			}
			try {
				// 调用参数解析器
				args[i] = this.resolvers.resolveArgument(parameter, mavContainer, request, this.dataBinderFactory);
			}
			catch (Exception ex) {
				// Leave stack trace for later, exception may actually be resolved and handled..
				if (logger.isDebugEnabled()) {
					String error = ex.getMessage();
					if (error != null && !error.contains(parameter.getExecutable().toGenericString())) {
						logger.debug(formatArgumentError(parameter, error));
					}
				}
				throw ex;
			}
		}
		return args;
	}

invokeAndHandle() 里做了三件事

  1. 请求的封装(将请求封装为Controller中声明的参数)
  2. 使用封装之后的参数反射调用 Controller 方法
  3. 处理响应

梳理一下

请求封装时会遍历Controller中所声明的所有参数,为每一个参数找到对应的参数解析器(如果找不到则抛异常),然后对其进行解析。
参数解析器中又会根据当前请求的content-type找到对应HttpMessage转器,最终将Http请求解析为我们声明的参数。

当拿到了封装好的参数之后,通过反射调用Controller 方法

响应处理逻辑同上

一次Http请求经历了什么

回过头来再看,这时候我们发一个请求,在 RequestMappingHandlerAdapter 的 invokeHandlerMethod()中 debug一下,看一下线程栈是什么样的

简单画一张图来表示一下

END

欢迎各位给出意见和指正

Redisson 源码解析:如何利用Redis实现分布式可重入锁

正好最近研究了下Redisson的源码,和大家分享一下

前言

首先我们先回顾一下 Java 中的 ReentrantLock 是如何实现的?

这里我先简单介绍一下ReentrantLock 实现的思路

  • 锁标识:通过AQS的state变量作为锁标识,利用Java的CAS保证多线程竞争锁时的线程安全问题

  • 队列:未竞争到锁的线程进入AQS的队列并挂起,等待解锁时被唤醒(或者超时)

如何设计分布式可重入锁

首先锁标识,这个在Redis中很容易实现,可以用lock name 作为key,当前线程生成一个uuid,作为value,加上Redis 单线程模型,实现线程安全的锁竞争

这种方式在之前的博客里也提到过,可以参考下 Redis分布式锁的正确实现方式

但是如何基于Redis 做一个队列,像Java那样可以挂起唤醒线程呢?这点我在看源码之前一直没有想到...

那么Redisson 是如何做的呢?

答案:利用Redis的发布订阅,加上Java的Semaphore(信号量,不了解Semaphore的小伙伴可以Google一下)

Redisson 分布式锁实现思路

锁标识:Hash 数据结构,key 为锁的名字,filed 当前竞争锁成功线程的**"唯一标识"**,value 重入次数

队列:所有竞争锁失败的线程,会订阅当前锁的解锁事件,利用 Semaphore 实现线程的挂起和唤醒

源码分析

我们来看一下tryLock方法的源码

    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        // 尝试获取锁,返回null 代表获取锁成功,当获取锁失败时返回当前锁的释放时间
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        
        // 如果此时已经超过等待时间则获取锁失败
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        // 订阅解锁事件
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        // 等待订阅成功,成功后唤醒当前线程
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            // 再次判断一下是否超时
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        
            while (true) {
                long currentTime = System.currentTimeMillis();
                // 尝试获取锁
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    // 等待解锁消息,此处利用Semaphore,锁未释放时,permits=0,线程处于挂起状态
                    // 当发布解锁消息时,当前的Semaphore对象的release() permits=1
                    // 所有的客户端都会有一个线程被唤醒,去尝试竞争锁
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

tryAcquire(leaseTime, unit, threadId); 这个方法我们下面会分析,现在我们只需要知道这个方法是用来获取锁就可以了

这个时候我们已经可以理清Redisson可重入锁的思路了

  1. 获取锁
  2. 如果获取锁失败,订阅解锁事件
  3. 之后是一个无限循环
while(true) {
  // 尝试获取锁

  // 判断是否超时

  // 等待解锁消息释放信号量 
  //(此时每个Java客户端都可能会有多个线程被挂起,但是只有一个线程会被唤醒)

  // 判断是否超时
}

利用信号量,合理控制线程对锁的竞争,合理利用系统资源,可以说做的灰常的奈斯了

需要注意:
!await(subscribeFuture, time, TimeUnit.MILLISECONDS) ,这里很多博客都解释错了,这里并不是等待发布解锁消息,只要订阅事件成功后,就会往下执行,真正等待解锁消息的是 getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);

tryLockInnerAsync

tryAcquire 内部依靠 tryLockInnerAsync 来实现获取锁的逻辑,我们来看下源码

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  // 是否存在锁
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                       // 不存在则创建
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      // 设置过期时间
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      // 竞争锁成功 返回null
                      "return nil; " +
                  "end; " +
                   // 如果锁已经被当前线程获取
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                       // 重入次数加1
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  // 锁被其他线程获取,返回锁的过期时间
                  "return redis.call('pttl', KEYS[1]);",

                    // 下面三个参数分别为 KEYS[1], ARGV[1], ARGV[2]
                    // 即锁的name,锁释放时间,当前线程唯一标识
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

tryLockInnerAsync 中利用lua脚本 和 Redis 单线程的特点来实现锁的竞争

这里可以看到锁的结构,和我们上文所说的一样,Hash 数据结构,key 为锁的name,filed 当前竞争锁成功线程的"唯一标识",value 重入次数

unlockInnerAsync

接下来我们再来看解锁的核心代码

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                // 用锁的name和线程唯一标识去判断是否存在这样的键值对
                // 解铃还须系铃人,不存在则无权解锁,返回null
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                // 解锁逻辑
                // 冲入次数-1
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                // 如果大于0 代表当前线程重入锁多次无法解锁,更新锁的有效时间
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    // 解锁,删除key
                    "redis.call('del', KEYS[1]); " +
                    // 发布解锁消息
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                // KEYS[1],KEYS[2]
                // 锁的name,发布订阅的Channel
                Arrays.<Object>asList(getName(), getChannelName()), 
                // ARGV[1] ~ ARGV[3]
                // 解锁消息,释放时间,当前线程唯一标识
                LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

    }

发布解锁消息后,会调用到LockPubSub 的 onMessage,释放信号量,唤醒等待锁的线程

public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {

    public static final Long UNLOCK_MESSAGE = 0L;
    public static final Long READ_UNLOCK_MESSAGE = 1L;

    public LockPubSub(PublishSubscribeService service) {
        super(service);
    }
    
    @Override
    protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
        return new RedissonLockEntry(newPromise);
    }

    @Override
    protected void onMessage(RedissonLockEntry value, Long message) {
        if (message.equals(UNLOCK_MESSAGE)) {
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute != null) {
                runnableToExecute.run();
            }

            // 释放信号量
            value.getLatch().release();
        } else if (message.equals(READ_UNLOCK_MESSAGE)) {
            while (true) {
                Runnable runnableToExecute = value.getListeners().poll();
                if (runnableToExecute == null) {
                    break;
                }
                runnableToExecute.run();
            }

            value.getLatch().release(value.getLatch().getQueueLength());
        }
    }

}

参考

欢迎点赞、转发。你的支持就是对我最大的帮助

规则引擎 Drools 执行流程浅析

什么是规则引擎

规则引擎是处理复杂规则集合的引擎。通过输入一些基础事件,以推演或者归纳等方式,得到最终的执行结果。规则引擎的核心作用在于将复杂、易变的规则从系统中抽离出来,由灵活可变的规则来描述业务需求

Drools 简介

Drools 是 Java 编写的一款开源规则引擎。Drools 的核心算法基于 Rete。早些版本中,Drools 使用的是基于 Rete 二次开发的 ReteOO 算法。在 7.x 版本的 Drools 中,其内部算法已经改为使用 Phreak。Phreak 也是Drools 团队自研的算法,虽然网上关于该算法的资料很少,但是总体来说与 Rete 算法相似。阅读本文之前可以先了解下 Rete 算法

编写一个简单的规则

使用 Drools 需要我们将原有的代码抽象成:Rule(规则) + Fact(事实)

首先我们先来编写一个简单的 demo 用于后文的原理学习

  1. 引入 pom 依赖
<properties>
    <drools.version>7.62.0.Final</drools.version>
</properties>
//...
<dependency>
    <groupId>org.drools</groupId>
    <artifactId>drools-compiler</artifactId>
    <version>${drools.version}</version>
</dependency>

<dependency>
    <groupId>org.drools</groupId>
    <artifactId>drools-mvel</artifactId>
    <version>${drools.version}</version>
</dependency>
  1. resource 目录下新建 order.drl
// 包名用于逻辑上区分 rule
package com.example.drools.order;

import com.example.drools.demo.HelloDrools.Order
import com.example.drools.demo.HelloDrools.User
import java.util.ArrayList;

global java.util.List list

// 指定方言为 java
dialect  "java"

// 规则的组成包括,条件(when 部分)和动作(then 部分)
// 当满足 when 时,会执行 then 的逻辑
rule "order can pay"
    when
        // 要求插入的 fact 必须有一个 User 对象
        // 并且 Order fact 必须满足 price < $user.price
        $user: User()
        $order: Order(price < $user.price)
    then
        System.out.println("username:" + $user.getName() + ", order price:" + $order.getPrice());
end

rule "calculate member point"
    when
        $user: User(level > 0)
        $order: Order()
    then
        Double point = $user.getPoint();
        if ($user.getLevel() > 10) {
            $user.setPoint(point + $order.getPrice());
        } else {
            $user.setPoint(point + $order.getPrice() * 0.5);
        }
        System.out.println("previous point:" + point + ", present point:" + $user.getPoint());
end

rule "user age > 18"
    when
        $user: User(age > 18)
    then
        System.out.println("user age > 18");
end

resource 下新建 META-INF\kmodule.xml

<?xml version="1.0" encoding="UTF-8"?>
<kmodule xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://www.drools.org/xsd/kmodule">
</kmodule>
  1. java 代码部分
package com.example.drools.demo;

import lombok.Data;
import org.kie.api.KieServices;
import org.kie.api.runtime.KieContainer;
import org.kie.api.runtime.KieSession;


/**
 * @author tianwen.yin
 */
public class HelloDrools {

    public static void main(String[] args) {
        // 初始化
        KieServices kieServices = KieServices.Factory.get();
        KieContainer kieContainer = kieServices.newKieClasspathContainer();
        KieSession kieSession = kieContainer.newKieSession();
        // 构建 fact
        User user = new User();
        user.setName("taven");
        user.setPoint(10D);
        user.setLevel(5);
        user.setPrice(100D);
        user.setAge(19);

        Order order = new Order();
        order.setPrice(58D);
        // insert fact
        kieSession.insert(user);
        kieSession.insert(order);
        // 触发所有规则
        int fireCount = kieSession.fireAllRules();
        System.out.println("fireRuleCount:" + fireCount);
        kieSession.dispose();
    }

    @Data
    public static class Order {
        private Double price;
    }

    @Data
    public static class User {
        private String name;
        private Integer age;
        private Double price;
        private Integer level;
        private Double point;
    }

}
  1. 执行结果如下
username:taven, order price:58.0
previous point:10.0, present point:39.0
user age > 18
fireRuleCount:3

Drools 执行流程浅析

Drools 的使用看起来还是比较简单的,但是实际上真正落地使用还是需要详读官方文档的,不是本文重点,就不多赘述了。接下来我们进入正题,分析下执行流程

上述的图,是我结合源码总结的 Drools 执行流程图,最终目的就是根据插入的 fact 进行推演,如果能走到最后的 Terminal 节点则代表规则会被执行

我们先来了解一下上图中的所有节点

  • Object Type Node:简称 OTN,fact 会根据类型流转到不同的 OTN

  • Alpha Node:也被称为单输入节点,所有单对象的约束条件都会被构建为 Alpha 节点,例如 "age > 18","leve > 0"

  • Beta Node:双输入节点,不同对象之间的约束会被构建为 Beta 节点,例如 "order.price > user.price";当一个节点需要同时满足多个单对象约束时也是 Beta 节点;一个节点有超过两个条件约束时,会构建为多个 Beta 节点相连

    Beta 节点又分为 Join,Not,Exist 等,本文主要以 Join 节点为例进行说明。对于其他节点来说流程也是一样的,只不过某些具体细节的实现不同

    补充一张多 Beta 节点相连的图

  • LeftInputAdapterNode:左输入节点,这个节点的作用我最开始也很迷惑。后来在反复 Debug 后终于顿悟了,Beta 节点被设计成只存储右侧进入的 fact,左侧的数据来自 LeftInputAdapterNode 或者另一个 Beta 节点(可能理解不了,请继续往下读)

  • Rule Terminal:当一个 fact 流转到 Terminal 时,代表当前 fact 会触发该规则

  • 内存结构:关于 Drools 内存结构这块,与传统 RETE 算法不太一样,我也没有太仔细的研究这块,上图中只是把会存储 fact 的位置标识了出来

实际上 Drools 的源码非常复杂,其中包含的节点远不止提到的这些,我这里仅是基于 RETE 算法的核心内容来刨析下 Drools 原理

注:这里我补充下,Beta 节点的右侧分支,在进入到 Beta 之前,也是可以有 Alpha 节点的。并且当多个 rule 中包含相同条件时也会共用分支。改图和编 demo 实在太麻烦了

准备环节

  1. 在解析规则文件时,应该就已经创建了类似上图的节点关系(这个具体源码没有阅读)

  2. 上述示例中,kieSession.insert(user); 会将 fact 插入到 PropagationList

  3. 调用 kieSession.fireAllRules(); 后,进入到规则引擎核心环节

fireAllRules

字面意思已经很明显了,触发 Session 中的所有规则

flush 阶段

传播 PropagationList 中所有 fact,对照上图中 flush,OTN 下游的所有分支都会遍历访问

  1. 如果某条分支全部都是 Alpha 节点的话,可以直接传播到 Terminal 节点
  2. 如果 fact 流转到 LeftInputAdapterNode 的话,会将 fact 存储在 LeftInputAdapterNode 对应的内存中
  3. 如果 fact 流转到 Beta 节点右侧的话,会将 fact 存储在 Beta 节点的右侧

当分支走到 Alpha Terminal 节点时,构建一个 RuleAgendaItem 插入到 InternalAgendaGroup 中。这个动作代表当前规则需要进行下一个阶段 evaluateAndFire

Beta 节点的逻辑是,当所有的分支入口都存储了数据时,插入 InternalAgendaGroup(这句话可能不太好理解,当仅有一个 Beta 节点时,左右都存储了数据,就会插入 InternalAgendaGroup。如果是多个 Beta 节点相连的话,必须要满足第一个 Beta 的左右,以及下游所有 Beta 的右节点都有数据时才会插入 InternalAgendaGroup)。

evaluate(评估)

纯 Alpha 节点的分支,是没有这个步骤的

以 Beta 节点为例,evaluate 就是基于左右内存进行匹配,找到所有配对成功的数据放入一个集合,将这个集合继续带入到下一个节点,下个节点又可能是 Beta 节点或者 Terminal 节点。

  • 如果是 Beta 节点的话,则继续进行匹配,配对成功的集合带入到下一个节点...
  • 如果是 Terminal 节点的话,会将数据插入到 RuleExecutor 的 tupleList 中。这步又是啥意思呢,tupleList 的数据代表,这些数据会真正的代入到规则执行当中去(Alpha Terminal 也会执行这个操作)

Beta 节点这里还有一个细节,就是在进行左右配对的时候,并不只是遍历查找,而是在条件允许的情况下,Drools 在存储这些数据的时候会建立索引。上述示例的话,并没有建立索引,随便把条件改成 xx.a = yy.b 这种条件的话,就会建立索引。具体索引的实现也很简单,Drools 实现了一个类似 HashMap 的结构来管理索引,感兴趣的同学可以自己打个断点 debug 下。

断点 Class:PhreakJoinNode

注:上图中两个位置,只有一处会被执行

fire

这里会遍历 RuleExecutor 的 tupleList 执行这些规则。我们的规则文件在 Drools 运行时会被编译成字节码动态执行,具体这块具体用啥实现的没研究。

fire 阶段还有一个细节就是,我们的规则文件内部是可以调用 insert modify 这些函数的,这些 fact 同样会被插入到 PropagationList 中,内部也会再执行一次 PropagationList flush 操作。整个 fireAllRules 方法内部是一个循环,如果 fire 内部的 fact 命中了规则的话,在 fire 结束后还会继续执行 evaluateAndFire 直到全部触发完为止(所以在规则编写错误的情况下,Drools 可能进入死循环)

Conflict resolution

冲突解决简单来说就是,当我们知道了要执行的规则都有哪些时,还需要明确这些规则执行的顺序。

Drools 这里如何解决顺序问题的呢?回顾一下上面提到的 flush 阶段。RuleAgendaItem 插入到 InternalAgendaGroup 中这一步,InternalAgendaGroup 的默认实现为 AgendaGroupQueueImpl,AgendaGroupQueueImpl 中使用了 BinaryHeapQueue(二叉堆队列)来存储元素

通过二叉堆算法保证每次队列弹出优先级最高的规则,优先级的计算通过 PhreakConflictResolver 来完成

PhreakConflictResolver 从两个方面来判断优先级

  1. 规则是否声明 salience(salience 越大,优先级越高)
  2. 无法通过 salience 来计算的话,则通过规则 loadOrder 来决定优先级(规则在文件中越靠前则 loadOrder 就越高)

总结下

Drools 这种算法逻辑有什么好处呢?下述结论参考了 https://en.wikipedia.org/wiki/Rete_algorithm

  1. 通过共享节点来减少节点的冗余(如果多个 Rule 中有相同的条件,不会重复计算)

  2. fact 的变化,不需要完全重新评估,只需要进行增量评估(只需要对 fact 对应的 OTN 重新评估就可以)

  3. 支持高效的删除 fact (从 Drools 的角度来看这句话,fact 存储时会建立一个双向关联,也就是 fact 知道自己被哪些节点存储了,所以可以高效的删除)

本文介绍了 Drools 的执行流程,由于网上没有找到太多参考资料,大多数结论都是我自己总结出来的,如果有写错的地方欢迎各位指正。

最后

如果觉得我的文章对你有帮助,欢迎点赞,关注,转发。你的支持是对我最大的帮助

Spring RestTemplate 设置每次请求的 Timeout

前言

在实现这个功能之前,我也上网搜索了一下方案。大多数的解决方法都是定义多个 RestTemplate 设置不同的超时时间。有没有更好的方式呢?带着这个问题,我们一起来深入一下 RestTemplate 的源码

提示:本文包含了大量的源码分析,如果想直接看笔者是如何实现的,直接跳到最后的改造思路

版本

SpringBoot:2.3.4.RELEASE

RestTemplate

RestTemplate#doExecute

doExecute

RestTemplate 发送请求的方法,随便找一个最后都会走到上图的 doExecute。

从上图来看,这个方法做的就是这几件事

  1. createRequest
  2. 执行 RequestCallback
  3. 执行 Request
  4. 处理响应,将响应转换成用户声明的类型

RequestCallback 做了什么

  1. 根据 RestTemplate 中的定义 HttpMessageConverter 填充 Header Accept(支持的响应类型)
  2. 通过 HttpMessageConverter 转换 HttpBody

这里我们需要重点关注的是,createRequest 和 执行 Request 部分

createRequest

RestTemplate 中的 Request 是由 RequestFactory 完成创建。所以我们先来看下获取 RequestFactory 的逻辑

getRequestFactory

如果 RestTemplate 配置了 ClientHttpRequestInterceptor(拦截器)的话,则创建 InterceptingClientHttpRequestFactory,反之则直接获取 RequestFactory

  1. 我们可以通过 RestTemplate#setInterceptors 手动添加拦截器;
  2. 当使用 @LoadBalanced 标记 RestTemplate 时,RestTemplate 中也会被加入拦截器,具体原理可以参考 org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration

我们先来看下 InterceptingClientHttpRequestFactory 是什么逻辑

InterceptingClientHttpRequestFactory

InterceptingClientHttpRequestFactory#createRequest

createRequest 方法直接返回了 InterceptingClientHttpRequest,参考 doExecute 的逻辑,接下来会执行 InterceptingClientHttpRequest#execute,其内部会执行到 InterceptingRequestExecution#execute

InterceptingRequestExecution#execute

这里随便找一个拦截器的实现配合着来看

拦截器实现

逻辑梳理一下:

  1. InterceptingRequestExecution 会先去执行所有的拦截器
  2. 拦截器在执行完逻辑之后,再次 InterceptingRequestExecution#execute。InterceptingRequestExecution 再次调用下一个拦截器
  3. 在拦截器逻辑执行完之后,会去调用真正的 RequestFactory 创建请求,然后执行请求

在阅读完 InterceptingRequestExecution#execute 的代码之后,我们可以发现。这里仅仅是将 request 的 uri,method,header,body 复制到了 delegate 中。说明拦截器只能对这些属性进行处理,并不能在拦截器层面添加 timeout 的相关处理。

默认情况的 RequestFactory

默认情况下 RestTemplate 会使用 SimpleClientHttpRequestFactory 来创建请求,我们也可以在这个类中看到 setReadTimeout 方法。但是 SimpleClientHttpRequestFactory 并没有提供可以拓展的点,只能设置一个针对所有请求的超时时间。感兴趣的同学可以自己阅读下源码,这里就不贴出来了

HttpComponentsClientHttpRequestFactory

在阅读 HttpComponentsClientHttpRequestFactory 时,发现了可以扩展的地方。每次在创建 Request 的时候,都需要在 HttpContext 这个类中设置 RequestConfig,使用过 apache http client 的同学可能知道 RequestConfig 这个类,这个类包含了大量的属性可以定义请求的行为,这其中有一个属性 socketTimeout 正是我们所需要的。

这个类中我们可以扩展的地方就在 createHttpContext 方法中

默认情况下 createHttpContext 返回 null,然后会尝试从 HttpUriRequest 和 HttpClient 中获取 RequestConfig 赋值到 HttpContext 中。

createHttpContext 这个方法我们也来看一下

	@Nullable
	private BiFunction<HttpMethod, URI, HttpContext> httpContextFactory;

	/**
	 * Configure a factory to pre-create the {@link HttpContext} for each request.
	 * <p>This may be useful for example in mutual TLS authentication where a
	 * different {@code RestTemplate} for each client certificate such that
	 * all calls made through a given {@code RestTemplate} instance as associated
	 * for the same client identity. {@link HttpClientContext#setUserToken(Object)}
	 * can be used to specify a fixed user token for all requests.
	 * @param httpContextFactory the context factory to use
	 * @since 5.2.7
	 */
	public void setHttpContextFactory(BiFunction<HttpMethod, URI, HttpContext> httpContextFactory) {
		this.httpContextFactory = httpContextFactory;
	}

	/**
	 * Template methods that creates a {@link HttpContext} for the given HTTP method and URI.
	 * <p>The default implementation returns {@code null}.
	 * @param httpMethod the HTTP method
	 * @param uri the URI
	 * @return the http context
	 */
	@Nullable
	protected HttpContext createHttpContext(HttpMethod httpMethod, URI uri) {
		return (this.httpContextFactory != null ? this.httpContextFactory.apply(httpMethod, uri) : null);
	}

至此,已经很清晰了。我们可以通过调用 setHttpContextFactory 来改变 createHttpContext 的结果。

改造思路

我们可以开始进行改造了,思路如下

  1. 默认的超时时间等属性,我们可以通过 HttpComponentsClientHttpRequestFactory#setHttpClient 或者 HttpComponentsClientHttpRequestFactory#setReadTimeout 来决定
  2. 在需要自定义 RequsetConfig 的场景,将 RequsetConfig 存储在 ThreadLocal 中
  3. 我们自定义的 HttpContextFactory 在读取到 ThreadLocal 中的 RequsetConfig 后,会生成一个 HttpContext,其他情况返回 null(走原来的逻辑)

代码如下

public class CustomHttpContextFactory implements BiFunction<HttpMethod, URI, HttpContext> {
    @Override
    public HttpContext apply(HttpMethod httpMethod, URI uri) {
        RequestConfig requestConfig = RequestConfigHolder.get();
        if (requestConfig != null) {
            HttpContext context = HttpClientContext.create();
            context.setAttribute(HttpClientContext.REQUEST_CONFIG, requestConfig);
            return context;
        }
        return null;
    }
}
public class RequestConfigHolder {

    private static final ThreadLocal<RequestConfig> threadLocal = new ThreadLocal<>();

    public static void bind(RequestConfig requestConfig) {
        threadLocal.set(requestConfig);
    }

    public static RequestConfig get() {
        return threadLocal.get();
    }

    public static void clear() {
        threadLocal.remove();
    }
}

配置类

@Configuration
public class RestTemplateConfiguration {

    @Bean("customTimeoutRestTemplate")
    public RestTemplate customTimeout() {
        RestTemplate restTemplate = new RestTemplate();
        HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory();
        requestFactory.setHttpContextFactory(new CustomHttpContextFactory());
        requestFactory.setReadTimeout(3000);
        restTemplate.setRequestFactory(requestFactory);
        return restTemplate;
    }

}

使用案例

    @GetMapping("custom/setTimeout")
    public String customSetTimeout(Integer timeout) {
        RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(timeout).build();
        try {
            RequestConfigHolder.bind(requestConfig);
            customTimeoutRestTemplate.getForObject("https://www.baidu.com", String.class);
        } finally {
            RequestConfigHolder.clear();
        }
        return "OK";
    }

思路就是这样,可以将这个使用方式封装为 注解 + AOP,这样用起来会更简单。

Demo

本文完整 demo:https://github.com/TavenYin/taven-springboot-learning/tree/master/springboot-restTemplate

最后

如果觉得我的文章对你有帮助,动动小手点下关注或者喜欢,你的支持是对我最大的帮助

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.