博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Dubbo总结(五)
阅读量:3958 次
发布时间:2019-05-24

本文共 15396 字,大约阅读时间需要 51 分钟。

一、RPC原理

 

一次完整的RPC调用流程(同步调用)如下

1、服务消费方(client)调用以本地调用方式调用服务

2、client stub 接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体

3、client stub 找到服务地址,并将消息发送给服务端

4、server stub收到消息后进行解码

5、server stub根据解码结果调用本地服务

6、执行并将结果返回给server stub

7、server stub将返回结果打包发送给client stub

8、client stub接收消息,并解码

9、服务消费方(client)得到最终结果

RPC框架的目标就是封装2-8步

二、Netty通信原理

 

Netty是一个基于NIO 异步事件驱动框架,

用于开发高性能协议服务端与客户端,极大简化了TCP和UDP套结字服务器网络编程

 

BIO:(堵塞式网络通信模型) 

Thread —read()—> InputStream ——> Socket

请求开启线程,线程进行读操作,转化成输入流的方式,进行Socket

缺点:当大量网络请求时,会造成系统性能下降和网络堵塞的情况

 

NIO:(非堵塞式网络通信)

Channel(Buffer)——> Register ——> Selector ——> Thread(one or more)

case 1:Connect

case 2:Accept

case 3:Read

case 4:Write

请求被放入Channel管道中,通过Register注册到Selector选择器上,

根据对应的请求状态(连接,读操作,写操作),开启线程执行操作

Dubbo原理

 

1、框架设计

/dev-guide/images/dubbo-framework.jpg

【图例说明】

  1. 图中左边淡蓝背景的为服务消费方使用的接口,右边淡绿色背景的为服务提供方使用的接口,位于中轴线上的为双方都用到的接口。
  2. 图中从下至上分为十层,各层均为单向依赖,右边的黑色箭头代表层之间的依赖关系,每一层都可以剥离上层被复用,其中,Service 和 Config 层为 API,其它各层均为 SPI。
  3. 图中绿色小块的为扩展接口,蓝色小块为实现类,图中只显示用于关联各层的实现类。
  4. 图中蓝色虚线为初始化过程,即启动时组装链,红色实线为方法调用过程,即运行时调时链,紫色三角箭头为继承,可以把子类看作父类的同一个节点,线上的文字为调用的方法。

【各层说明】

  • config 配置层:对外配置接口,以 ServiceConfigReferenceConfig 为中心,可以直接初始化配置类,也可以通过 spring 解析配置生成配置类
  • proxy 服务代理层:服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以 ServiceProxy 为中心,扩展接口为 ProxyFactory
  • registry 注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactoryRegistryRegistryService
  • cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 ClusterDirectoryRouterLoadBalance
  • monitor 监控层:RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为 MonitorFactoryMonitorMonitorService
  • protocol 远程调用层:封装 RPC 调用,以 InvocationResult 为中心,扩展接口为 ProtocolInvokerExporter
  • exchange 信息交换层:封装请求响应模式,同步转异步,以 RequestResponse 为中心,扩展接口为 ExchangerExchangeChannelExchangeClientExchangeServer
  • transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 ChannelTransporterClientServerCodec
  • serialize 数据序列化层:可复用的一些工具,扩展接口为 SerializationObjectInputObjectOutputThreadPool

【关系说明】

  1. 在 RPC 中,Protocol 是核心层,也就是只要有 Protocol + Invoker + Exporter 就可以完成非透明的 RPC 调用,然后在 Invoker 的主过程上 Filter 拦截点。
  2. 图中的 Consumer 和 Provider 是抽象概念,只是想让看图者更直观的了解哪些类分属于客户端与服务器端,不用 Client 和 Server 的原因是 Dubbo 在很多场景下都使用 Provider, Consumer, Registry, Monitor 划分逻辑拓普节点,保持统一概念。
  3. 而 Cluster 是外围概念,所以 Cluster 的目的是将多个 Invoker 伪装成一个 Invoker,这样其它人只要关注 Protocol 层 Invoker 即可,加上 Cluster 或者去掉 Cluster 对其它层都不会造成影响,因为只有一个提供者时,是不需要 Cluster 的。
  4. Proxy 层封装了所有接口的透明化代理,而在其它层都以 Invoker 为中心,只有到了暴露给用户使用时,才用 Proxy 将 Invoker 转成接口,或将接口实现转成 Invoker,也就是去掉 Proxy 层 RPC 是可以 Run 的,只是不那么透明,不那么看起来像调本地服务一样调远程服务。
  5. 而 Remoting 实现是 Dubbo 协议的实现,如果你选择 RMI 协议,整个 Remoting 都不会用上,Remoting 内部再划为 Transport 传输层和 Exchange 信息交换层,Transport 层只负责单向消息传输,是对 Mina, Netty, Grizzly 的抽象,它也可以扩展 UDP 传输,而 Exchange 层是在传输层之上封装了 Request-Response 语义。
  6. Registry 和 Monitor 实际上不算一层,而是一个独立的节点,只是为了全局概览,用层的方式画在一起。

 

2、启动解析加载配置信息

2.1、流程图

2.2、DubboNameSpaceHandler(创建dubbo标签解析器)

public class DubboNamespaceHandler extends NamespaceHandlerSupport {    public DubboNamespaceHandler() {    }    public void init() {        this.registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));        this.registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));        this.registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));        this.registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));        this.registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));        this.registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));        this.registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));        this.registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));        this.registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));        this.registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());    }    static {        Version.checkDuplicate(DubboNamespaceHandler.class);    }}

2.3、DubboBeanDefinitionParser (parse方法解析dubbo标签)

继承Spring的Bean定义分析器接口(BeanDefinitionParser )并实现parse方式

public interface BeanDefinitionParser {    @Nullable BeanDefinition parse(Element var1, ParserContext var2);}

DubboBeanDefinitionParser 代码太多,部分关键代码截图

 

3、服务暴露流程

3.1、流程图

3.2、ServiceBean(ServiceFactoryBean)

public class ServiceBean
extends ServiceConfig
implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener
, BeanNameAware

创建容器

继承了applicationlistener,当spring容器创建完毕后会回调,这个方法

相当于会回调如下这两个方法 

getProvider就是获取标签内对应的属性值,这个方法的作用就是将这些属性值保存

例如,provider信息,Application信息,Monitor信息等等

当IOC容器刷新后,回调这个方法,当这个事件不是延迟暴露,不是已暴露的,就执行export方法 

 3.3、Service.config

跟进export,来到Service.config,doexport()最后调用doExportUrls()方法

doExportUrls()

1、读取注册中心的URL地址

2、遍历protocol协议,调用doExportUrlsForlProtocol(协议,注册中心的URL)

 代理工厂获取执行器

Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

执行器的组成:getInvoker(ref(实现对象),infaceClass(接口),注册中心的URL地址)

private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

获取扩展的类加载器并且得到它的适配器:1、RegistryProtocol  2、DubboProtocol

基于Java的spi机制,可以得到当前要使用的Protocol,使用dubbo协议,DubboProtocol,要使用注册中心RegistryProtocol

3.4、RegistryProtocol

调用export方法@Override    public 
Exporter
export(final Invoker
originInvoker) throws RpcException { //export invoker 在本地暴露服务 final ExporterChangeableWrapper
exporter = doLocalExport(originInvoker); URL registryUrl = getRegistryUrl(originInvoker); //registry provider final Registry registry = getRegistry(originInvoker); final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); //to judge to delay publish whether or not boolean register = registedProviderUrl.getParameter("register", true); 提供者消费者注册表,注册提供者(提供者,注册中心URL,提供者的URL) ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl); if (register) { register(registryUrl, registedProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover. final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //Ensure that a new exporter instance is returned every time export return new DestroyableExporter
(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl); }

跟进ProviderConsumerRegTable的registerProvider方法

providerInvokers<127.0.0.1:20881,invoker>public static ConcurrentHashMap
> providerInvokers = new ConcurrentHashMap
>(); public static ConcurrentHashMap
> consumerInvokers = new ConcurrentHashMap
>(); public static void registerProvider(Invoker invoker, URL registryUrl, URL providerUrl) { ProviderInvokerWrapper wrapperInvoker = new ProviderInvokerWrapper(invoker, registryUrl, providerUrl); String serviceUniqueName = providerUrl.getServiceKey(); Set
invokers = providerInvokers.get(serviceUniqueName); if (invokers == null) { providerInvokers.putIfAbsent(serviceUniqueName, new ConcurrentHashSet
()); invokers = providerInvokers.get(serviceUniqueName); } invokers.add(wrapperInvoker); }

3.5、DubboProtocol

部分关键代码

@Override    public 
Exporter
export(Invoker
invoker) throws RpcException { URL url = invoker.getUrl(); // export service. 服务暴露的URL(service) String key = serviceKey(url); 将invoker包装成暴露器 DubboExporter
exporter = new DubboExporter
(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } 启动netty服务器监听20880端口 openServer(url); optimizeSerialization(url); return exporter; }

继续跟进openServer方法

private void openServer(URL url) {        // find server.        127.0.0.1:20881        String key = url.getAddress();        //client can export a service which's only for server to invoke        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);        if (isServer) {            ExchangeServer server = serverMap.get(key);            if (server == null) {                创建一个信息交换服务器                创建(启动)netty服务器,监听20881端口                    serverMap.put(key, createServer(url));            } else {                // server supports reset, use together with override                server.reset(url);            }        }    }

4、服务引用流程

4.1、流程图

4.2、ReferenceBean

public class ReferenceBean
extends ReferenceConfig
implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean

由于它实现了 FactoryBean,因此在Spring就会把容器中studentService注入

注入是通过调用getObject方法实现的,跟进 get() 方法

@Overridepublic Object getObject() throws Exception {    return get();}

4.3、ReferenceConfig

到了ReferenceConfig的 get() 方法,跟进 init() 方法

public synchronized T get() {    if (destroyed) {        throw new IllegalStateException("Already destroyed!");    }    if (ref == null) {        init();    }    return ref;}

发现 ReferenceConfig 的 init() 方法生成了一个代理对象

这个代理对象里是一个map,map里包含了 注册中心的地址,调用的方法,以及要调用的接口

跟进ReferenceConfig 的createProxy()方法

发现refprotocol调用refer方法后会获得invoker,执行器

注册中心的地址:urls.get(0) 

远程引用的接口:interfaceClass

4.4、跟进refer(),就到了protocol的refer()里

private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

获取扩展的类加载器并且得到它的适配器:1、RegistryProtocol  2、DubboProtocol

基于Java的spi机制,可以得到当前要使用的Protocol,使用dubbo协议,DubboProtocol,要使用注册中心RegistryProtocol

@SPI("dubbo")public interface Protocol

4.5、RegistryProtocol

于是,我们查看RegistryProtocol 的refer(注册中心地址,要引用的类型)方法

跟进dorefer()方法,发现消费者是在这里订阅服务的

4.6、DubboProtocol

订阅服务后, DubboProtocol执行了refer方法

在跟进getclients方法后,发现到了netty的底层,具体是创建了netty的客户端去监听这个127.0.0.1:20881(URL)地址

具有client属性的invoker被返回

DubboProtocol refer()方法执行完后,RegistryProtocol执行最后一步

在注册表里填入consumer的信息

将Consumer的URL地址,和对应的invoker执行器填入注册表 

5、服务调用流程

5.1、调用链,展开总设计图的红色调用链,如下:

/dev-guide/images/dubbo-extension.jpg

 

5.2、InvokerInvocationHandler

跟进invoke方法

@Override    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {        String methodName = method.getName();        Class
[] parameterTypes = method.getParameterTypes(); if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } return invoker.invoke(new RpcInvocation(method, args)).recreate(); }

5.3、AbstractClusterInvoker

@Override    public Result invoke(final Invocation invocation) throws RpcException {        checkWhetherDestroyed();        LoadBalance loadbalance = null;        从注册中心获取可执行的对应invoker列表        List
> invokers = list(invocation); if (invokers != null && !invokers.isEmpty()) { 获取负载均衡机制 loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); }

5.4、FailoverClusterInvoker

根据负载均衡策略,选择对应的invoker

5.5、DubboInvoker

@Override    protected Result doInvoke(final Invocation invocation) throws Throwable {        RpcInvocation inv = (RpcInvocation) invocation;        final String methodName = RpcUtils.getMethodName(invocation);        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());        inv.setAttachment(Constants.VERSION_KEY, version);                发送请求的客户端        ExchangeClient currentClient;        if (clients.length == 1) {            currentClient = clients[0];        } else {            currentClient = clients[index.getAndIncrement() % clients.length];        }        try {            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);            if (isOneway) {                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);                currentClient.send(inv, isSent);                RpcContext.getContext().setFuture(null);                return new RpcResult();            } else if (isAsync) {                ResponseFuture future = currentClient.request(inv, timeout);                RpcContext.getContext().setFuture(new FutureAdapter(future));                return new RpcResult();            } else {                RpcContext.getContext().setFuture(null);                客户端发起请求,并得到响应结果                    return (Result) currentClient.request(inv, timeout).get();            }        } catch (TimeoutException e) {            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);        } catch (RemotingException e) {            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);        }    }

 

转载地址:http://gqazi.baihongyu.com/

你可能感兴趣的文章
linux杀死进程详解
查看>>
字符串表示的IP地址与点分式表示的IP地址间的相互转化
查看>>
implicit declaration of function 这种警告问题的原因及解决方法
查看>>
utorrent如何处理占资源过大的问题
查看>>
<好文分享>妖怪和和尚过河问题
查看>>
uTP协议的前世今生(from wikipedia)
查看>>
uTP协议的前世今生(from wikipedia)
查看>>
utp的包头格式<2>
查看>>
开源搜索引擎的比较(收藏几个博客文章)最近要做搜索系统的研究方向
查看>>
asii码表
查看>>
<读书笔记>WebUsage Mining:Discovery and Applications of Usage Patterns from Web Data
查看>>
并查集(Disjoint Sets)
查看>>
在Linux下安装MATLAB
查看>>
readme
查看>>
微服务概念
查看>>
数据库分库分表
查看>>
hibernate inverse 和cascade讲解
查看>>
建模工具Rose的学习
查看>>
javascript ajax提出异步请求
查看>>
Hibernate 中的 QBC
查看>>