本文共 15396 字,大约阅读时间需要 51 分钟。
一、RPC原理
一次完整的RPC调用流程(同步调用)如下
1、服务消费方(client)调用以本地调用方式调用服务
2、client stub 接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体
3、client stub 找到服务地址,并将消息发送给服务端
4、server stub收到消息后进行解码
5、server stub根据解码结果调用本地服务
6、执行并将结果返回给server stub
7、server stub将返回结果打包发送给client stub
8、client stub接收消息,并解码
9、服务消费方(client)得到最终结果
RPC框架的目标就是封装2-8步
二、Netty通信原理
Netty是一个基于NIO 异步事件驱动框架,
用于开发高性能协议服务端与客户端,极大简化了TCP和UDP套结字服务器网络编程
BIO:(堵塞式网络通信模型)
Thread —read()—> InputStream ——> Socket
请求开启线程,线程进行读操作,转化成输入流的方式,进行Socket
缺点:当大量网络请求时,会造成系统性能下降和网络堵塞的情况
NIO:(非堵塞式网络通信)
Channel(Buffer)——> Register ——> Selector ——> Thread(one or more)
case 1:Connect
case 2:Accept
case 3:Read
case 4:Write
请求被放入Channel管道中,通过Register注册到Selector选择器上,
根据对应的请求状态(连接,读操作,写操作),开启线程执行操作
Dubbo原理
1、框架设计
【图例说明】
- 图中左边淡蓝背景的为服务消费方使用的接口,右边淡绿色背景的为服务提供方使用的接口,位于中轴线上的为双方都用到的接口。
- 图中从下至上分为十层,各层均为单向依赖,右边的黑色箭头代表层之间的依赖关系,每一层都可以剥离上层被复用,其中,Service 和 Config 层为 API,其它各层均为 SPI。
- 图中绿色小块的为扩展接口,蓝色小块为实现类,图中只显示用于关联各层的实现类。
- 图中蓝色虚线为初始化过程,即启动时组装链,红色实线为方法调用过程,即运行时调时链,紫色三角箭头为继承,可以把子类看作父类的同一个节点,线上的文字为调用的方法。
【各层说明】
- config 配置层:对外配置接口,以
ServiceConfig
,ReferenceConfig
为中心,可以直接初始化配置类,也可以通过 spring 解析配置生成配置类- proxy 服务代理层:服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以
ServiceProxy
为中心,扩展接口为ProxyFactory
- registry 注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为
RegistryFactory
,Registry
,RegistryService
- cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以
Invoker
为中心,扩展接口为Cluster
,Directory
,Router
,LoadBalance
- monitor 监控层:RPC 调用次数和调用时间监控,以
Statistics
为中心,扩展接口为MonitorFactory
,Monitor
,MonitorService
- protocol 远程调用层:封装 RPC 调用,以
Invocation
,Result
为中心,扩展接口为Protocol
,Invoker
,Exporter
- exchange 信息交换层:封装请求响应模式,同步转异步,以
Request
,Response
为中心,扩展接口为Exchanger
,ExchangeChannel
,ExchangeClient
,ExchangeServer
- transport 网络传输层:抽象 mina 和 netty 为统一接口,以
Message
为中心,扩展接口为Channel
,Transporter
,Client
,Server
,Codec
- serialize 数据序列化层:可复用的一些工具,扩展接口为
Serialization
,ObjectInput
,ObjectOutput
,ThreadPool
【关系说明】
- 在 RPC 中,Protocol 是核心层,也就是只要有 Protocol + Invoker + Exporter 就可以完成非透明的 RPC 调用,然后在 Invoker 的主过程上 Filter 拦截点。
- 图中的 Consumer 和 Provider 是抽象概念,只是想让看图者更直观的了解哪些类分属于客户端与服务器端,不用 Client 和 Server 的原因是 Dubbo 在很多场景下都使用 Provider, Consumer, Registry, Monitor 划分逻辑拓普节点,保持统一概念。
- 而 Cluster 是外围概念,所以 Cluster 的目的是将多个 Invoker 伪装成一个 Invoker,这样其它人只要关注 Protocol 层 Invoker 即可,加上 Cluster 或者去掉 Cluster 对其它层都不会造成影响,因为只有一个提供者时,是不需要 Cluster 的。
- Proxy 层封装了所有接口的透明化代理,而在其它层都以 Invoker 为中心,只有到了暴露给用户使用时,才用 Proxy 将 Invoker 转成接口,或将接口实现转成 Invoker,也就是去掉 Proxy 层 RPC 是可以 Run 的,只是不那么透明,不那么看起来像调本地服务一样调远程服务。
- 而 Remoting 实现是 Dubbo 协议的实现,如果你选择 RMI 协议,整个 Remoting 都不会用上,Remoting 内部再划为 Transport 传输层和 Exchange 信息交换层,Transport 层只负责单向消息传输,是对 Mina, Netty, Grizzly 的抽象,它也可以扩展 UDP 传输,而 Exchange 层是在传输层之上封装了 Request-Response 语义。
- Registry 和 Monitor 实际上不算一层,而是一个独立的节点,只是为了全局概览,用层的方式画在一起。
2、启动解析加载配置信息
2.1、流程图
2.2、DubboNameSpaceHandler(创建dubbo标签解析器)
public class DubboNamespaceHandler extends NamespaceHandlerSupport { public DubboNamespaceHandler() { } public void init() { this.registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true)); this.registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true)); this.registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true)); this.registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true)); this.registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true)); this.registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true)); this.registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true)); this.registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true)); this.registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); this.registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser()); } static { Version.checkDuplicate(DubboNamespaceHandler.class); }}2.3、DubboBeanDefinitionParser (parse方法解析dubbo标签)
继承Spring的Bean定义分析器接口(BeanDefinitionParser )并实现parse方式
public interface BeanDefinitionParser { @Nullable BeanDefinition parse(Element var1, ParserContext var2);}DubboBeanDefinitionParser 代码太多,部分关键代码截图
3、服务暴露流程
3.1、流程图
3.2、ServiceBean(ServiceFactoryBean)
public class ServiceBeanextends ServiceConfig implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener , BeanNameAware 创建容器
继承了applicationlistener,当spring容器创建完毕后会回调,这个方法
相当于会回调如下这两个方法
getProvider就是获取标签内对应的属性值,这个方法的作用就是将这些属性值保存
例如,provider信息,Application信息,Monitor信息等等
当IOC容器刷新后,回调这个方法,当这个事件不是延迟暴露,不是已暴露的,就执行export方法
3.3、Service.config
跟进export,来到Service.config,doexport()最后调用doExportUrls()方法
doExportUrls()
1、读取注册中心的URL地址
2、遍历protocol协议,调用doExportUrlsForlProtocol(协议,注册中心的URL)
代理工厂获取执行器
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
执行器的组成:getInvoker(ref(实现对象),infaceClass(接口),注册中心的URL地址)
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
获取扩展的类加载器并且得到它的适配器:1、RegistryProtocol 2、DubboProtocol
基于Java的spi机制,可以得到当前要使用的Protocol,使用dubbo协议,DubboProtocol,要使用注册中心RegistryProtocol
3.4、RegistryProtocol
调用export方法@Override publicExporter export(final Invoker originInvoker) throws RpcException { //export invoker 在本地暴露服务 final ExporterChangeableWrapper exporter = doLocalExport(originInvoker); URL registryUrl = getRegistryUrl(originInvoker); //registry provider final Registry registry = getRegistry(originInvoker); final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); //to judge to delay publish whether or not boolean register = registedProviderUrl.getParameter("register", true); 提供者消费者注册表,注册提供者(提供者,注册中心URL,提供者的URL) ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl); if (register) { register(registryUrl, registedProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover. final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //Ensure that a new exporter instance is returned every time export return new DestroyableExporter (exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl); } 跟进ProviderConsumerRegTable的registerProvider方法
providerInvokers<127.0.0.1:20881,invoker>public static ConcurrentHashMap> providerInvokers = new ConcurrentHashMap >(); public static ConcurrentHashMap > consumerInvokers = new ConcurrentHashMap >(); public static void registerProvider(Invoker invoker, URL registryUrl, URL providerUrl) { ProviderInvokerWrapper wrapperInvoker = new ProviderInvokerWrapper(invoker, registryUrl, providerUrl); String serviceUniqueName = providerUrl.getServiceKey(); Set invokers = providerInvokers.get(serviceUniqueName); if (invokers == null) { providerInvokers.putIfAbsent(serviceUniqueName, new ConcurrentHashSet ()); invokers = providerInvokers.get(serviceUniqueName); } invokers.add(wrapperInvoker); } 3.5、DubboProtocol
部分关键代码
@Override publicExporter export(Invoker invoker) throws RpcException { URL url = invoker.getUrl(); // export service. 服务暴露的URL(service) String key = serviceKey(url); 将invoker包装成暴露器 DubboExporter exporter = new DubboExporter (invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } 启动netty服务器监听20880端口 openServer(url); optimizeSerialization(url); return exporter; } 继续跟进openServer方法
private void openServer(URL url) { // find server. 127.0.0.1:20881 String key = url.getAddress(); //client can export a service which's only for server to invoke boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) { 创建一个信息交换服务器 创建(启动)netty服务器,监听20881端口 serverMap.put(key, createServer(url)); } else { // server supports reset, use together with override server.reset(url); } } }
4、服务引用流程
4.1、流程图
4.2、ReferenceBean
public class ReferenceBeanextends ReferenceConfig implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean 由于它实现了 FactoryBean,因此在Spring就会把容器中studentService注入
注入是通过调用getObject方法实现的,跟进 get() 方法
@Overridepublic Object getObject() throws Exception { return get();}4.3、ReferenceConfig
到了ReferenceConfig的 get() 方法,跟进 init() 方法
public synchronized T get() { if (destroyed) { throw new IllegalStateException("Already destroyed!"); } if (ref == null) { init(); } return ref;}发现 ReferenceConfig 的 init() 方法生成了一个代理对象
这个代理对象里是一个map,map里包含了 注册中心的地址,调用的方法,以及要调用的接口
跟进ReferenceConfig 的createProxy()方法
发现refprotocol调用refer方法后会获得invoker,执行器
注册中心的地址:urls.get(0)
远程引用的接口:interfaceClass
4.4、跟进refer(),就到了protocol的refer()里
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
获取扩展的类加载器并且得到它的适配器:1、RegistryProtocol 2、DubboProtocol
基于Java的spi机制,可以得到当前要使用的Protocol,使用dubbo协议,DubboProtocol,要使用注册中心RegistryProtocol
@SPI("dubbo")public interface Protocol4.5、RegistryProtocol
于是,我们查看RegistryProtocol 的refer(注册中心地址,要引用的类型)方法
跟进dorefer()方法,发现消费者是在这里订阅服务的
4.6、DubboProtocol
订阅服务后, DubboProtocol执行了refer方法
在跟进getclients方法后,发现到了netty的底层,具体是创建了netty的客户端去监听这个127.0.0.1:20881(URL)地址
具有client属性的invoker被返回
DubboProtocol refer()方法执行完后,RegistryProtocol执行最后一步
在注册表里填入consumer的信息
将Consumer的URL地址,和对应的invoker执行器填入注册表
5、服务调用流程
5.1、调用链,展开总设计图的红色调用链,如下:
5.2、InvokerInvocationHandler
跟进invoke方法
@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class [] parameterTypes = method.getParameterTypes(); if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } return invoker.invoke(new RpcInvocation(method, args)).recreate(); }5.3、AbstractClusterInvoker
@Override public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); LoadBalance loadbalance = null; 从注册中心获取可执行的对应invoker列表 List> invokers = list(invocation); if (invokers != null && !invokers.isEmpty()) { 获取负载均衡机制 loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); } 5.4、FailoverClusterInvoker
根据负载均衡策略,选择对应的invoker
5.5、DubboInvoker
@Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); 发送请求的客户端 ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout); RpcContext.getContext().setFuture(new FutureAdapter
转载地址:http://gqazi.baihongyu.com/