会不断更新!冲冲冲!跳转连接
https://blog.csdn.net/qq_35349982/category_10317485.html
分布式理论、架构设计(自定义RPC)
1.分布式架构
1.1什么是分布式系统
分布式系统,就是一个业务拆分成多个子业务,分布在不同的服务器节点,共同构成的系统称为分布式系统,同一个分布式系统中的服务器节点在空间部署上是可以随意分布的,这些服务器可能放在不同的机柜中,也可能在不同的机房中,甚至分布在不同的城市。
1.2分布式与集群的区别
集群:多个人在一起作同样的事 。
分布式 :多个人在一起作不同的事 。
1.3特点
(1)分布性
(2)对等性
(3)并发性
(4)缺乏全局时钟
(5)故障总是会发生
1.4 分布式系统面临的问题
通信异常
网络本身的不可靠性,因此每次网络通信都会伴随着网络不可用的风险
网络分区
整个系统的网络环境被切分成了若干个孤立的区域,分布式系统就会出现局部小集群,
节点故障
组成分布式系统的服务器节点出现的宕机或"僵死"现象
三态
分布式系统每一次请求与响应存在特有的“三态”概念,即成功、失败和超时。
分布式理论
1.一致性
- 分布式一致性
分布式数据一致性,指的是数据在多份副本中存储时,各副本中的数据是一致的。
- 副本一致性
- 强一致性
- 弱一致性
- 读写一致性
- 单调读一致性
- 因果一致性
- 最终一致性
2.CAP定理
CAP 理论含义是,一个分布式系统不可能同时满足一致性(C:Consistency),可用性(A: Availability)和分区容错性(P:Partition tolerance)这三个基本需求,最多只能同时满足其中的2个。
分布式一致性的特点?
1.由于存在数据库同步过程,写操作的响应会有一定的延迟
2.为了保定数据的一致性,对资源暂时锁定,待数据同步完成后释放锁定资源
3.如果请求数据同步失败的节点则会返回错误信息, 一定不会返回旧数据.
3.BASE 理论
BASE:全称:**Basically Available(基本可用),Soft state(软状态),和 Eventually consistent(最终一致性)**三个短语的缩写.
BASE是对CAP中一致性和可用性权衡的结果,BASE理论的核心思想是:即使无法做到强一致性,但每个应用都可以根据自身业务特点,采用适当的方式来使系统达到最终一致性。
4.分布式事务
1. 四大特性
- Atomicity(原子性)
- **Consistency(一致性)**
- **Isolation**(隔离性)
- Durablity(持久性)
5.一致性协议 2PC
2PC ( Two-Phase Commit缩写)即两阶段提交协议,是将整个事务流程分为两个阶段,准备阶段(Preparephase)、提交阶段(commit phase),2是指两个阶段,P是指准备阶段,C是指提交阶段。
6.一致性协议 3PC
3PC,全称 “three phase commit”,是 2PC 的改进版,将 2PC 的 “提交事务请求” 过程一分为二,共形成了由CanCommit、PreCommit和doCommit三个阶段组成的事务处理协议。
7.一致性算法 Paxos
Paxos相关概念
提案(Proposal)。最终要达成一致的value就在提案里
提案 (Proposal):Proposal信息包括提案编号 (Proposal ID) 和提议的值 (Value)
Client:客户端
客户端向分布式系统发出请求,并等待响应。例如,对分布式文件服务器中文件的写请求。
Proposer:提案发起者
提案者提倡客户请求,试图说服Acceptor对此达成一致,并在发生冲突时充当协调者以推动协议向前发展
Acceptor:决策者,可以批准提案
Acceptor可以接受(accept)提案;如果某个提案被选定(chosen),那么该提案里的value就被选定了
Learners:最终决策的学习者 学习者充当该协议的复制因素
8.一致性算法 Raft
http://thesecretlivesofdata.com/raft/ 动画演示
在Raft中,任何时候一个服务器都可以扮演下面的角色之一:
领导者(leader):处理客户端交互,日志复制等动作,一般一次只有一个领导者
候选者(candidate):候选者就是在选举过程中提名自己的实体,一旦选举成功,则成为领导者
跟随者(follower):类似选民,完全被动的角色,这样的服务器等待被通知投票
2.分布式系统设计策略
心跳检测高可用设计容错性负载均衡
3.分布式架构网络通信
3.1基本原理
在底层层面去看,网络通信需要做的就是将流从一台计算机传输到另外一台计算机,基于传输协议和网络IO来实现,其中传输协议比较出名的有tcp、udp等等,tcp、udp都是在基于Socket概念上为某类应用场景而扩展出的传输协议,网络IO,主要有bio、nio、aio三种方式,
3.2什么是RPC
RPC全称为remote procedure call,即远程过程调用。
借助RPC可以做到像本地调用一样调用远程服务,是一种进程间的通信方式
3.3RMI
Java RMI 指的是远程方法调用 (Remote Method Invocation),是java原生支持的远程调用 ,采用JRMP(JavaRemote Messageing protocol)作为通信协议,可以认为是纯java版本的分布式远程调用解决方案, RMI主要用于不同虚拟机之间的通信,这些虚拟机可以在不同的主机上、也可以在同一个主机上,这里的通信可以理解为一个虚拟机上的对象调用另一个虚拟机上对象的方法
3.4BIO,NIO,AIO
同步和异步
同步(synchronize)、异步(asychronize)是指应用程序和内核的交互而言的.
同步:指用户进程触发IO操作等待或者轮训的方式查看IO操作是否就绪。
异步:当一个异步进程调用发出之后,调用者不会立刻得到结果。而是在调用发出之后,被调用者通过状态、通知来通知调用者,或者通过回调函数来处理这个调用。
阻塞和非阻塞
阻塞和非阻塞是针对于进程访问数据的时候,根据IO操作的就绪状态来采取不同的方式
阻塞: 阻塞方式下读取和写入将一直等待,
非阻塞: 非阻塞方式下,读取和写入方法会理解返回一个状态值.
BIO 同步阻塞IO。B代表blocking
服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,当然可以通过线程池机制改善
NIO 同步非阻塞IO (non-blocking IO / new io)
服务器实现模式为一个请求一个通道,即客户端发送的连接请求都会注册到多路复用器上**,多路复用器轮询到连接有IO请求时才启动一个线程进行处理**
AIO 异步非阻塞IO。A代表asynchronize
**当有流可以读时,操作系统会将可以读的流传入read方法的缓冲区,并通知应用程序,对于写操作,OS将write方法的流写入完毕是操作系统会主动通知应用程序。**因此read和write都是异步 的,完成后会调用回调函数。
3.5Netty
1.概念
Netty 是由 JBOSS 提供一个异步的、 基于事件驱动的网络编程框架。
2.为什么使用Netty
NIO缺点
NIO 的类库和 API 繁杂,使用麻烦。你需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、
ByteBuffffer 等.
可靠性不强,开发工作量和难度都非常大
NIO 的 Bug。例如 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。
Netty优点
对各种传输协议提供统一的 API
高度可定制的线程模型——单线程、一个或多个线程池
更好的吞吐量,更低的等待延迟
更少的资源消耗
最小化不必要的内存拷贝
阻塞的例子
老张煮开水。 老张,水壶两把(普通水壶,简称水壶;会响的水壶,简称响水壶)。
1 老张把水壶放到火上,站立着等水开。(同步阻塞)
2 老张把水壶放到火上,去客厅看电视,时不时去厨房看看水开没有。(同步非阻塞)
3 老张把响水壶放到火上,立等水开。(异步阻塞)
4 老张把响水壶放到火上,去客厅看电视,水壶响之前不再去看它了,响了再去拿壶。(异步非阻塞)
3.Netty使用
1.执行逻辑
2.服务器端
public class NettyServer {
public static void main(String
[] args
) throws InterruptedException
{
NioEventLoopGroup bossGroup
= new NioEventLoopGroup();
NioEventLoopGroup workGroup
= new NioEventLoopGroup();
ServerBootstrap serverBootstrap
= new ServerBootstrap();
serverBootstrap
.group(bossGroup
,workGroup
)
.channel(NioServerSocketChannel
.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel nioSocketChannel
) throws Exception
{
ChannelPipeline pipeline
= nioSocketChannel
.pipeline();
pipeline
.addFirst(new StringEncoder());
pipeline
.addLast(new StringDecoder());
pipeline
.addLast(new SimpleChannelInboundHandler<String>() {
protected void channelRead0(ChannelHandlerContext channelHandlerContext
, String msg
) throws Exception
{
System
.out
.println(msg
);
}
});
}
});
ChannelFuture future
= serverBootstrap
.bind(9999).sync();
future
.channel().closeFuture().sync();
}
}
3.客户端
public class NettyClient {
public static void main(String
[] args
) throws InterruptedException
{
NioEventLoopGroup group
= new NioEventLoopGroup();
Bootstrap bootstrap
= new Bootstrap();
bootstrap
.group(group
)
.channel(NioSocketChannel
.class)
.handler(new ChannelInitializer<Channel>() {
protected void initChannel(Channel channel
) throws Exception
{
channel
.pipeline().addLast(new StringEncoder());
}
});
Channel channel
= bootstrap
.connect("127.0.0.1", 9999).channel();
while (true) {
channel
.writeAndFlush("hello server .. this is client ...");
Thread
.sleep(2000);
}
}
}
4.手写RPC
1.流程
客户端
创建代理对象
创建线程池对象
声明自定义处理器(UserClientHandler)
初始化客户端
初始化自定义处理器UserClientHandler创建连接池对象初始化客户端配置客户端属性
设置通道(NIO),设置协议TCP监听channel 并初始化
设置管道设置编码添加自定义处理器
设置服务器连接
给自定义处理器设置参数
线程处理处理call() 写操作,并返回结果
服务端
初始化服务器
设置两个线程池
配置引导类
设置通道为NIO创建监听channel
获取管道对象设置编码设置自定义处理器 4.绑定端口
##2.代码
1.序列化
//采用JSON的方式,定义JSONSerializer的实现类:(其他序列化方式,可以自行实现序列化接口)
public class JSONSerializer implements Serializer{
/**
* java对象转换为二进制
*
* @param object
* @return
*/
public byte[] serialize(Object object) throws IOException {
return JSON.toJSONBytes(object);
//return new byte[0];
}
/**
* 二进制转换成java对象
*
* @param clazz
* @param bytes
* @param <T>
* @return
*/
public <T> T deserialize(Class<T> clazz, byte[] bytes) throws IOException {
return JSON.parseObject(bytes, clazz);
//return null;
}
}
2.客户端
1.反射,初始化类
public class RPCConsumer {
private static ExecutorService executorService
=
Executors
.newFixedThreadPool(Runtime
.getRuntime().availableProcessors());
private static UserClientHandler userClientHandler
;
public static void initClient() throws InterruptedException
{
userClientHandler
= new UserClientHandler();
EventLoopGroup group
= new NioEventLoopGroup();
Bootstrap bootstrap
= new Bootstrap();
bootstrap
.group(group
)
.channel(NioSocketChannel
.class)
.option(ChannelOption
.TCP_NODELAY
,true)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel
) throws Exception
{
ChannelPipeline pipeline
= socketChannel
.pipeline();
pipeline
.addLast( new RpcEncoder(RpcRequest
.class, new JSONSerializer()));
pipeline
.addLast(new StringDecoder());
pipeline
.addLast(userClientHandler
);
}
});
bootstrap
.connect("127.0.0.1",8999).sync();
}
public static Object
createProxy(Class
<?> serviceClass
, final RpcRequest rpcRequest
){
return Proxy
.newProxyInstance(Thread
.currentThread().getContextClassLoader(),
new Class[]{serviceClass
}, new InvocationHandler() {
public Object
invoke(Object o
, Method method
, Object
[] objects
) throws Throwable
{
if(userClientHandler
== null
){
initClient();
}
userClientHandler
.setParam(rpcRequest
);
Object result
= executorService
.submit(userClientHandler
).get();
return result
;
}
});
}
}
2.自定义事件处理器
public class UserClientHandler extends ChannelInboundHandlerAdapter implements Callable {
private ChannelHandlerContext context
;
private String result
;
private RpcRequest rpcRequest
;
@Override
public void channelActive(ChannelHandlerContext ctx
) throws Exception
{
this.context
= ctx
;
}
@Override
public synchronized void channelRead(ChannelHandlerContext ctx
, Object msg
) throws Exception
{
result
= msg
.toString();
notify();
}
public synchronized Object
call() throws Exception
{
context
.writeAndFlush(rpcRequest
);
wait();
return result
;
}
public void setParam(RpcRequest param
){
this.rpcRequest
= param
;
}
}
3.服务端
1.获取扫描容器中的类
@Component
public class ApplicationContextUtil implements ApplicationContextAware{
private static ApplicationContext applicationContext
;
public void setApplicationContext(ApplicationContext applicationContext
) throws BeansException
{
System
.out
.println("applicationContext正在初始化");
this.applicationContext
=applicationContext
;
}
public static <T> T
getBean(Class
<T> clazz
){
if(applicationContext
==null
){
System
.out
.println("applicationContext是空的");
}else{
}
return applicationContext
.getBean(clazz
);
}
public static ApplicationContext
getApplicationContext(){
return applicationContext
;
}
}
2.自定义处理器
public class UserServiceHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx
, Object msg
) throws Exception
{
String split
= "{"+StrUtil
.split(msg
.toString(), "{")[1];
RpcRequest rpcRequest
= JSON
.parseObject(split
, RpcRequest
.class);
UserServiceImpl userServiceImpl
= (UserServiceImpl
)ApplicationContextUtil
.getBean(Class
.forName(rpcRequest
.getClassName()));
Class
<?> aClass
= Class
.forName(rpcRequest
.getClassName());
Method method
= aClass
.getMethod(rpcRequest
.getMethodName(), rpcRequest
.getParameterTypes());
Object invoke
= method
.invoke(userServiceImpl
, rpcRequest
.getParameters());
ctx
.writeAndFlush(invoke
);
}
}
3.服务器类
public class nettyServer {
public static void startServer(String ip
, int port
) throws InterruptedException
{
NioEventLoopGroup bossGroup
= new NioEventLoopGroup();
NioEventLoopGroup workGroup
= new NioEventLoopGroup();
ServerBootstrap serverBootstrap
= new ServerBootstrap();
serverBootstrap
.group(bossGroup
,workGroup
)
.channel(NioServerSocketChannel
.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel nioSocketChannel
) throws Exception
{
ChannelPipeline pipeline
= nioSocketChannel
.pipeline();
pipeline
.addLast(new StringEncoder());
pipeline
.addLast(new StringDecoder());
pipeline
.addLast(new UserServiceHandler());
}
});
serverBootstrap
.bind(8999).sync();
}
}
参考答案记录
编码类
public class RpcEncoder extends MessageToByteEncoder{
private Class
<?> clazz
;
private Serializer serializer
;
public RpcEncoder(Class
<?> clazz
, Serializer serializer
) {
this.clazz
= clazz
;
this.serializer
= serializer
;
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext
, Object msg
, ByteBuf byteBuf
) throws Exception
{
if (clazz
!= null
&& clazz
.isInstance(msg
)) {
byte[] bytes
= serializer
.serialize(msg
);
byteBuf
.writeInt(bytes
.length
);
byteBuf
.writeBytes(bytes
);
}
}
}
解码类
public class RpcDecoder extends ByteToMessageDecoder{
private Class
<?> clazz
;
private Serializer serializer
;
public RpcDecoder(Class
<?> clazz
, Serializer serializer
) {
this.clazz
= clazz
;
this.serializer
= serializer
;
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext
, ByteBuf byteBuf
, List
<Object> list
) throws Exception
{
if (byteBuf
.readableBytes() < 4) {
return;
}
byteBuf
.markReaderIndex();
int dataLength
= byteBuf
.readInt();
if (byteBuf
.readableBytes() < dataLength
) {
byteBuf
.resetReaderIndex();
return;
}
byte[] data
= new byte[dataLength
];
byteBuf
.readBytes(data
);
Object obj
= serializer
.deserialize(clazz
, data
);
list
.add(obj
);
}
}
客户端的动态代理类
public class RpcConsumer {
private static ExecutorService executor
= Executors
.newFixedThreadPool(Runtime
.getRuntime().availableProcessors());
private static UserClientHandler userClientHandler
;
public Object
createProxy(final Class
<?> serviceClass
){
return Proxy
.newProxyInstance(Thread
.currentThread().getContextClassLoader(), new Class<?>[]{serviceClass
}, new InvocationHandler() {
public Object
invoke(Object proxy
, Method method
, Object
[] args
) throws Throwable
{
if(userClientHandler
== null
){
initClient();
}
RpcRequest request
= new RpcRequest();
String requestId
= UUID
.randomUUID().toString();
System
.out
.println(requestId
);
String className
= method
.getDeclaringClass().getName();
String methodName
= method
.getName();
Class
<?>[] parameterTypes
= method
.getParameterTypes();
request
.setRequestId(requestId
);
request
.setClassName(className
);
request
.setMethodName(methodName
);
request
.setParameterTypes(parameterTypes
);
request
.setParameters(args
);
userClientHandler
.setPara(request
);
System
.out
.println(request
);
System
.out
.println("设置参数完成");
return executor
.submit(userClientHandler
).get();
}
});
}
public static void initClient() throws InterruptedException
{
userClientHandler
= new UserClientHandler();
EventLoopGroup group
= new NioEventLoopGroup();
Bootstrap bootstrap
= new Bootstrap();
bootstrap
.group(group
)
.channel(NioSocketChannel
.class)
.option(ChannelOption
.TCP_NODELAY
,true)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch
) throws Exception
{
ChannelPipeline pipeline
= ch
.pipeline();
pipeline
.addLast(new RpcEncoder(RpcRequest
.class, new JSONSerializer()));
pipeline
.addLast(new StringDecoder());
pipeline
.addLast(userClientHandler
);
}
});
bootstrap
.connect("127.0.0.1",8990).sync();
}
}
服务器的处理器类
@Component
public class UserServerHandler extends ChannelInboundHandlerAdapter implements ApplicationContextAware {
private static ApplicationContext applicationContext2
;
public void setApplicationContext(ApplicationContext applicationContext
) throws BeansException
{
UserServerHandler
.applicationContext2
= applicationContext
;
}
@Override
public void channelRead(ChannelHandlerContext ctx
, Object msg
) throws Exception
{
RpcRequest msg1
= (RpcRequest
) msg
;
Object handler
= handler(msg1
);
ctx
.writeAndFlush("success");
}
private Object
handler(RpcRequest request
) throws ClassNotFoundException
, InvocationTargetException
{
Class
<?> clazz
= Class
.forName(request
.getClassName());
Object serviceBean
= applicationContext2
.getBean(clazz
);
Class
<?> serviceClass
= serviceBean
.getClass();
String methodName
= request
.getMethodName();
Class
<?>[] parameterTypes
= request
.getParameterTypes();
Object
[] parameters
= request
.getParameters();
FastClass fastClass
= FastClass
.create(serviceClass
);
FastMethod fastMethod
= fastClass
.getMethod(methodName
, parameterTypes
);
return fastMethod
.invoke(serviceBean
, parameters
);
}
}
附录
1.Java多线程学习之wait、notify/notifyAll 详解
wait()、notify/notifyAll() 方法是Object的本地final方法,无法被重写。
wait()使当前线程阻塞,前提是 必须先获得锁
只有当 notify/notifyAll() 被执行时候,才会唤醒一个或多个正处于等待状态的线程,
单词
Consistency 一致性
Availability 可用性
Partition tolerance 分区容错性
Basically Available(基本可用)
Soft state(软状态)
Eventually consistent(最终一致性)
Proposer 提案发起者
Acceptor:决策者,可以批准提案