会不断更新!冲冲冲!跳转连接
https://blog.csdn.net/qq_35349982/category_10317485.html
Cluster模式潜在问题及解决方案、Web服务综合解决方案
1.一致性的哈希算法
1.1分布式与集群
分布式和集群是不⼀样的,分布式⼀定是集群,但是集群不⼀定是分布式
1.2Hash算法
1.1 顺序查找法
list
:List
[1,5,7,6,3,4,8]
for(int element
: list
) {
if(element
== n
) {
如果相等,说明n存在于数据集中
}}
1.2⼆分查找法
1.3 直接寻址法
定义⼀个数组,数组⻓度⼤于等于数据集⻓度,此处⻓度为9,数据1就存储在下标为1的位置,3就存储在下标为3的元素位置,,,依次类推。
这个时候,我想看下5存在与否,只需要判断list.get(5) array[5] 是否为空,如果为空,代表5不存在于数据集,如果不为空代表5在数据集当中,通过⼀次查找就达到了⽬的,时间复杂度为O(1)。这种⽅式叫做“直接寻址法”:
1.4 开放寻址法 | 拉链法
1.3⼀致性Hash算法
⾸先有⼀条直线,直线开头和结尾分别定为为1和2的32次⽅减1,这相当于⼀个地址,对于这样⼀条线,弯过来构成⼀个圆环形成闭环,这样的⼀个圆环称为hash环。
红节点 —服务器节点
绿人像 —请求访问
每⼀台服务器负责⼀段,⼀致性哈希算法对于节点的增减都只需重定位环空间中的⼀⼩部分数据,具有较好的容错性和可扩展性。
但是,⼀致性哈希算法在服务节点太少时,容易因为节点分部不均匀⽽造成数据倾斜问题。
例如系统中只有两台服务器,其环分布如下,节点2只能负责⾮常⼩的⼀段,⼤量的客户端请求落在了节点1上,这就是数据(请求)倾斜问题
解决方案:⼀致性哈希算法引⼊了虚拟节点机制,即对每⼀个服务节点计算多个哈希,每个计算结果位置都放置⼀个此服务节点,称为虚拟节点。
1.4手写Hash算法
1.流程口述
声明服务器的节点,计算hashCode然后存到SortedMap(上圈)中,(为解决数据倾斜问题,增加虚拟节点)声明客户端的节点请求到来时:那客户端的ip的hashCode值去服务器节点中访问,有则取,没有则取第一个
SortedMap
<Integer, String> integerStringSortedMap
= hashServerMap
.tailMap(clientHash
);
2.代码
import java
.util
.SortedMap
;
import java
.util
.TreeMap
;
import java
.util
.UUID
;
public class ConsistentHashWithVirtual {
public static void main(String
[] args
) {
String
[] tomcatServers
= new String[]{"123.111.0.0","123.101.3.1","111.20.35.2","123.98.26.3"};
SortedMap
<Integer,String> hashServerMap
= new TreeMap<>();
int virtaulCount
= 3;
for(String tomcatServer
: tomcatServers
) {
int serverHash
= Math
.abs(tomcatServer
.hashCode());
hashServerMap
.put(serverHash
,tomcatServer
);
for(int i
= 0; i
< virtaulCount
; i
++) {
int virtualHash
= Math
.abs((tomcatServer
+ "#" + i
).hashCode());
hashServerMap
.put(virtualHash
,"----由虚拟节点"+ i
+ "映射过来的请求:"+ tomcatServer
);
}
}
String
[] clients
= new String[]{"10.78.12.3","113.25.63.1","126.12.3.8"};
for(String client
: clients
) {
int clientHash
= Math
.abs(client
.hashCode());
SortedMap
<Integer, String> integerStringSortedMap
= hashServerMap
.tailMap(clientHash
);
if(integerStringSortedMap
.isEmpty()) {
Integer firstKey
= hashServerMap
.firstKey();
System
.out
.println("==========>>>>客户端:" + client
+ " 被路由到服务器:" + hashServerMap
.get(firstKey
));
}else{
Integer firstKey
= integerStringSortedMap
.firstKey();
System
.out
.println("==========>>>>客户端:" + client
+ " 被路由到服务器:" + hashServerMap
.get(firstKey
));
}
}
}
}
1.5nginx的负载均衡
1.ip_hash的问题
Nginx的IP_hash策略可以在客户端ip不变的情况下,将其发出的请求始终路由到同⼀个⽬标服务器上,实现会话粘滞,避免处理session共享问题
如果没有IP_hash策略,那么如何实现会话粘滞?
可以维护⼀张映射表,存储客户端IP或者sessionid与具体⽬标服务器的映射关系
<ip,tomcat1>
缺点
1)那么,在客户端很多的情况下,映射表⾮常⼤,浪费内存空间
2)客户端上下线,⽬标服务器上下线,都会导致重新维护映射表,映射表维护成本很⼤
2.ngx_http_upstream_consistent_hash模块
ngx_http_upstream_consistent_hash 模块是⼀个负载均衡器,使⽤⼀个内部⼀致性hash算法来选择合适的后端节点。
下载地址: https://github.com/replay/ngx_http_consistent_hash
配置策略
consistent_hash $remote_addr:可以根据客户端ip映射
consistent_hash $request_uri:根据客户端请求的uri映射
consistent_hash $args:根据客户端携带的参数进⾏映
安装模块
./configure —add-module
=/root/ngx_http_consistent_hash-master
make
make install
#负载均衡策略
upstream myServer{
#每个请求按照ip的hash结果分配,每⼀个客户端的请求会固定分配到同⼀个⽬标服务器处理,可以解决session问题
consistent_hash $request_uri;
#weight代表权重,默认每⼀个负载的服务器都为1,权重越⾼那么被分配的请求越多(⽤于服务器性能不均衡的场景)
server 47.95.1.96:8080 weight=1;
server 47.95.1.96:8081 weight=2;
}
proxy_pass http://myServer/;
2.时钟同步问题
1.场景
一个电商系统,三个服务器,三个订单,因为时钟不同步,造成三个系统的`订单时间`不一致
2.方案
2.1访问授时中心
ntpdate -u ntp.api.bz
2.2一个节点访问,其他节点同步
⼀个服务器节点可以访问互联⽹或者所有节点都不能够访问互联⽹
一个节点访问,其他节点同步
##修改/etc/ntp.conf⽂件
ntpdate -u ntp.api.bz
restrict 172.17.0.0 mask 255.255.255.0 nomodify notrap
server 127.127.1.0
fudge 127.127.1.0 stratum 10
service ntpd restart
chkconfig ntpd on
ntpdate 172.17.0.17
3.分布式ID解决⽅案
3.1 UUID
UUID 是指Universally Unique Identififier,翻译为中⽂是通⽤唯⼀识别码
public class MyTest {
public static void main(String
[] args
) {
System
.out
.println(java
.util
.UUID
.randomUUID().toString());
}
}
3.2 独⽴数据库的⾃增ID
场景:分表后两张表公用一个独立表的ID
创建一张表
DROP TABLE IF EXISTS `DISTRIBUTE_ID
`;
CREATE TABLE `DISTRIBUTE_ID
` (
`id
` bigint(32) NOT NULL AUTO_INCREMENT COMMENT '主键',
`createtime
` datetime DEFAULT NULL,
PRIMARY KEY (`id
`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
;
insert into
DISTRIBUTE_ID(createtime
) values(NOW());
select
LAST_INSERT_ID();
缺点:
需要代码连接到数据库才能获取到id,性能⽆法保障,
mysql数据库实例挂掉了,那么就⽆法获取分布式id了。
3.3 SnowFlake 雪花算法
3.4 借助Redis的Incr命令获取全局唯⼀ID
安装redis后
修改配置文件 redis.conf启动
protected-mode no
<dependency>
<groupId>redis.clients
</groupId>
<artifactId>jedis
</artifactId>
<version>2.9.0
</version>
</dependency>
Jedis jedis
= new Jedis("127.0.0.1",6379);
try {
long id
= jedis
.incr("id");
System
.out
.println("从redis中获取的分布式id为:" + id
);
} finally {
if (null
!= jedis
) {
jedis
.close();
} }
源码:
public Long
incr(String key
) {
this.checkIsInMultiOrPipeline();
this.client
.incr(key
);
return this.client
.getIntegerReply();
}
public long readLongCrLf() {
byte[] buf
= this.buf
;
this.ensureFill();
boolean isNeg
= buf
[this.count
] == 45;
if (isNeg
) {
++this.count
;
}
long value
= 0L
;
while(true) {
this.ensureFill();
int b
= buf
[this.count
++];
if (b
== 13) {
this.ensureFill();
if (buf
[this.count
++] != 10) {
throw new JedisConnectionException("Unexpected character!");
} else {
return isNeg
? -value
: value
;
}
}
value
= value
* 10L
+ (long)b
- 48L
;
}
}
4.分布式任务调度
4.1定时任务场景
订单审核,出库订单支付退款礼券同步物流信息推送短信推送日志监控
4.2什么是分布式任务调度
在分布式集群环境下的调度任务(同一个定时任务部署多分,只执行一个定时任务)分布式调度->定时任务的分布式->定时任务拆分(把一个大任务拆分成切分,同时执行)
4.3定时任务与消息队列的区别
共同点
异步处理
比如 注册,下单时间
应用解耦
不管定时任务作业还是MQ 都可以实现应用解耦
流量削峰
任何作业和MQ都可以用来抗流量
不同点
定时任务作业时时间驱动,MQ是时间驱动时间驱动是不可替代的,比如说金融系统是每日的利息结算,不是来一条做一条,还是批量时间去处理定时任务作业更倾向于批处理,MQ倾向于逐条处理
4.4 Quartz定时
<dependency>
<groupId>org.quartz-scheduler
</groupId>
<artifactId>quartz
</artifactId>
<version>2.3.2
</version>
</dependency>
public class QuartzMan {
public static Scheduler
createScheduler() throws SchedulerException
{
SchedulerFactory schedulerFactory
= new StdSchedulerFactory();
Scheduler scheduler
= schedulerFactory
.getScheduler();
return scheduler
;
}
public static JobDetail
createJob() {
JobBuilder jobBuilder
= JobBuilder
.newJob(DemoJob
.class);
jobBuilder
.withIdentity("jobName","myJob");
JobDetail jobDetail
= jobBuilder
.build();
return jobDetail
;
}
public static Trigger
createTrigger() {
CronTrigger cronTrigger
= TriggerBuilder
.newTrigger()
.withIdentity("triggerName","myTrigger")
.startNow()
.withSchedule(CronScheduleBuilder
.cronSchedule("*/2 * * * * ?")).build();
return cronTrigger
;
}
public static void main(String
[] args
) throws SchedulerException
{
Scheduler scheduler
= QuartzMan
.createScheduler();
JobDetail job
= QuartzMan
.createJob();
Trigger trigger
= QuartzMan
.createTrigger();
scheduler
.scheduleJob(job
,trigger
);
scheduler
.start();
}
}
4.5 Elastic-job
github地址:https://github.com/elasticjob
1.主要功能
分布式环境下,能够避免同一个任务多实例重复执行弹性扩容缩容,当集群中增加某一个实例,它能被选举且执行任务,减少一个实例,任务自动分配给其他实例任务失败时,转移给其他实例自动记录错过执行的作业支持并行调度,支持任务分片作业分片后,能后保证同一分片在分布式环境中仅执行一个
2.代码执行实例
分片 ====>shardingItemParameters(“0=bachelor,1=master,2=doctor”)
把一个任务分成多个task,每一个task交给一个具体的一个机器实例去处理Strategy策略定义这些分片项怎么去分配到机器,默认是平均分,分片和作业是通过一个注册中心协调的 扩容
分⽚项也是⼀个JOB配置,修改配置,重新分⽚,在下⼀次定时运⾏之前会重新调⽤分⽚算法,那么这个分⽚算法的结果就是:哪台机器运⾏哪⼀个⼀⽚,这个结果存储到zk中的,主节点会把分⽚给分好放到注册中⼼去,然后执⾏节点从注册中⼼获取信息(执⾏节点在定时任务开启的时候获取相应的分⽚)高可用,其他节点全部挂点了,剩下一个节点,则所有的节点都指向这个节点
<dependency>
<groupId>com.dangdang
</groupId>
<artifactId>elastic-job-lite-core
</artifactId>
<version>2.1.5
</version>
</dependency>
public static void main(String
[] args
) {
ZookeeperConfiguration zookeeperConfiguration
= new ZookeeperConfiguration("localhost:2181","data-archive-job");
CoordinatorRegistryCenter coordinatorRegistryCenter
= new ZookeeperRegistryCenter(zookeeperConfiguration
);
coordinatorRegistryCenter
.init();
JobCoreConfiguration jobCoreConfiguration
= JobCoreConfiguration
.newBuilder("archive-job", "*/2 * * * * ?", 3)
.shardingItemParameters("0=bachelor,1=master,2=doctor").build();
SimpleJobConfiguration simpleJobConfiguration
= new SimpleJobConfiguration(jobCoreConfiguration
,ArchivieJob
.class.getName());
JobScheduler jobScheduler
= new JobScheduler(coordinatorRegistryCenter
, LiteJobConfiguration
.newBuilder(simpleJobConfiguration
).overwrite(true).build());
jobScheduler
.init();
}
public class ArchivieJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext
) {
int shardingItem
= shardingContext
.getShardingItem();
System
.out
.println("=====>>>>当前分片:" + shardingItem
);
String shardingParameter
= shardingContext
.getShardingParameter();
String selectSql
= "select * from resume where state='未归档' and education='"+ shardingParameter
+"' limit 1";
List
<Map
<String, Object>> list
= JdbcUtil
.executeQuery(selectSql
);
if(list
== null
|| list
.size() ==0 ) {
System
.out
.println("数据已经处理完毕!!!!!!");
return;
}
Map
<String, Object> stringObjectMap
= list
.get(0);
long id
= (long) stringObjectMap
.get("id");
String name
= (String
) stringObjectMap
.get("name");
String education
= (String
) stringObjectMap
.get("education");
System
.out
.println("=======>>>>id:" + id
+ " name:" + name
+ " education:" + education
);
String updateSql
= "update resume set state='已归档' where id=?";
JdbcUtil
.executeUpdate(updateSql
,id
);
String insertSql
= "insert into resume_bak select * from resume where id=?";
JdbcUtil
.executeUpdate(insertSql
,id
);
}
}
分布式的理解
5.Session的共享
5.1 Session问题原因分析
Http是无状态协议
两种用于保持Http状态的技术,那就是Cookie和Session
JSESSIONID是一个Cookie,Servlet容器(tomcat,jetty)用来记录用户session
5.2 Session一致性的解决方案
nginx的ip_hash策略
同⼀个客户端IP的请求都会被路由到同⼀个⽬标服务器,也叫做会话粘滞
优点:
配置简单,不⼊侵应⽤,不需要额外修改代码
缺点:
服务器重启Session丢失
存在单点负载⾼的⻛险
单点故障问题
Session复制(不推荐)多个tomcat之间通过修改配置⽂件,达到Session之间的复制
Session共享(交代redis)
优点:
能适应各种负载均衡策略服务器重启或者当即不会造成Session丢失扩展能力强适合大集群数量使用
缺点
对应用有侵入性
5.3代码实现(Session共享)
<dependency>
<groupId>org.springframework.boot
</groupId>
<artifactId>spring-boot-starter-data-redis
</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.session
</groupId>
<artifactId>spring-session-data-redis
</artifactId>
</dependency>
spring.redis.database=0
spring.redis.host=127.0.0.1
spring.redis.port=6379
@SpringBootApplication
@EnableCaching
@EnableRedisHttpSession
public class LoginApplication extends SpringBootServletInitializer {
@Override
protected SpringApplicationBuilder
configure(SpringApplicationBuilder builder
) {
return builder
.sources(LoginApplication
.class);
}
public static void main(String
[] args
) {
SpringApplication
.run(LoginApplication
.class, args
);
}
}
5.4源码刨析
引入了RedisHttpSessionConfiguration类继承了SpringHttpSessionConfiguration类SessionRepositoryFilter类
@Import({RedisHttpSessionConfiguration
.class})
public @
interface EnableRedisHttpSession {
}
public class RedisHttpSessionConfiguration extends SpringHttpSessionConfiguration {
}
public class SpringHttpSessionConfiguration{
@Bean
public <S
extends Session> SessionRepositoryFilter
<? extends Session> springSessionRepositoryFilter(SessionRepository
<S> sessionRepository
) {
SessionRepositoryFilter
<S> sessionRepositoryFilter
= new SessionRepositoryFilter(sessionRepository
);
sessionRepositoryFilter
.setHttpSessionIdResolver(this.httpSessionIdResolver
);
return sessionRepositoryFilter
;
}
}
protected void doFilterInternal(HttpServletRequest request
, HttpServletResponse response
, FilterChain filterChain
) throws ServletException
, IOException
{
request
.setAttribute(SESSION_REPOSITORY_ATTR
, this.sessionRepository
);
SessionRepositoryFilter
<S>.SessionRepositoryRequestWrapper wrappedRequest
= new SessionRepositoryFilter.SessionRepositoryRequestWrapper(request
, response
);
SessionRepositoryFilter
.SessionRepositoryResponseWrapper wrappedResponse
= new SessionRepositoryFilter.SessionRepositoryResponseWrapper(wrappedRequest
, response
);
try {
filterChain
.doFilter(wrappedRequest
, wrappedResponse
);
} finally {
wrappedRequest
.commitSession();
}
}
private void commitSession() {
SessionRepositoryFilter
<S>.SessionRepositoryRequestWrapper
.HttpSessionWrapper wrappedSession
= this.getCurrentSession();
if (wrappedSession
== null
) {
if (this.isInvalidateClientSession()) {
SessionRepositoryFilter
.this.httpSessionIdResolver
.expireSession(this, this.response
);
}
} else {
S session
= wrappedSession
.getSession();
this.clearRequestedSessionCache();
SessionRepositoryFilter
.this.sessionRepository
.save(session
);
String sessionId
= session
.getId();
if (!this.isRequestedSessionIdValid() || !sessionId
.equals(this.getRequestedSessionId())) {
SessionRepositoryFilter
.this.httpSessionIdResolver
.setSessionId(this, this.response
, sessionId
);
}
}
}
附录
1.wrapper的概念
实现Wrapper的类需要:
实现某个接口依赖该接口的子类, 并通过接口方法访问子类对象的方法
public class MyArrays {
public List
<Point> asList(Point
[] points
) {
List
<Point> list
= new ArrayList<Point>();
for (int i
= 0; i
< points
.length
; i
++) {
list
.add(points
[i
]);
}
return new ArrayListWrapper(list
);
}
}
class ArrayListWrapper implements List<Point> {
private List
<Point> list
;
ArrayListWrapper(List
<Point> list
) {
this.list
= list
;
}
@Override
public int size() {
return list
.size();
}
@Override
public boolean add(Point e
) {
throw new UnsupportedOperationException("...");
}
@Override
public boolean remove(Object o
) {
throw new UnsupportedOperationException("...");
}
…………………………
}
单词
Cluster 群,集群
strict 严格
stratum 层
ensure 保证 Coordinator协调,班主任
elastic 弹性的