hbase

tech2024-06-16  82

day01

一、大纲形式

##一、Linux 1. 操作系统 2. 开源 3. 免费 4. 多用户 5. 多进程 6. 多线程 7. 性能稳定 8. 安全(权限的管理) ##二、Hadoop的核心模块之一HDFS 解决了大数据集如何存储的问题 ##三、Hadoop的核心模块之一Mapreduce 解决了分布式系统上的大数据集如何快速,高效的分析与计算的问题,是一个运行在hdfs上的并发的计算与分析框架 ##四、Hadoop的核心模块之一yarn 是一个资源(内存,cpu,磁盘)管理框架 ##五、Zookeeper 是一个为分布式应用程序提供协调服务的分布式框架。 ##六、hive 是一款数据仓库的工具,可以将HDFS上的具有结构化的文件映射(描述信息,也叫元数据,存储在MYSQL)成一张表。可以使用类sql语言进行管理。其实底层就是MR、SPARK、TEZ.

1、Linux

1. 概述 2. 常用指令 3. VI/VIM编辑工具 4. 网络配置(静态IP的设置)(重点) 5. 用户和权限管理(重点) 6. 软件包的安装 7. 虚拟机的克隆(学习期间 一定要会) 8. scp命令 9. 免密登录认证(原理,重点) 10. 时间同步(重点) 11. shell脚本(重点)

2、HDFS

1. 概述 --4V(重点) --google的三篇论文:《GFS》《MapReduce》《Bigtable》 2. 安装: -- 单节点: --1)本地模式 --2) 伪分布式:使用分布式文件系统,守护进程都是独立的(每一个进程都有自己的jvm) -- 多节点(重点) -- 完全分布式:使用分布式文件系统,守护进程是分布到各个节点的。也都是独立的进程。 3. 块的概念(重点) -- 合理的块的设计,都是为了解决稀缺的网络带宽和负载均衡问题 -- 块是hdfs的存储单元 -- 块的大小是相同的(除了最后一个块) -- 块的大小选择(参考的是寻址时间与传输时间的比例,认为在1:100的这个比例是最优) 4. hdfs的体系结构(重点) 5. hdfs的工作机制 -- 开机启动过程 -- 安全模式 -- 检查点机制 (重点) -- 心跳反馈(10分30秒)(重点) -- 网络拓扑 -- 机架感知(2.8.2是一个分水岭, 版本之前是 前两个副本一个机架, 版本之后是后两个副本在同一个机架)(重点) -- 动态的上下线 6. 读写流程(重中之重) 7. API -- 关心1: 此方法是静态的还是非静态的 -- 关心2: 此方法的形参类型 -- 关心3: 此方法的返回值类型

3、Mapreduce

1. 概述 -- 框架的思想(重点) 移动计算而非移动数据,分而治之,然后进行汇总处理。 数据的扭转: fetch 原始数据--><K1,V1>-->map函数--<K2,V2>---><K2,List<V2>>--->reduce函数---><K3,V3> 2. 入门案例 -- 分片机制(重点) inputSplit--->FileSplit(path,start,length,hosts) -- 分片的特点 3. 序列化机制 4. MapTask的流程(重点) 5. ReduceTask的流程(重点) 6. shuffle流程(重中之重) 7. yarn的job提交流程(重中之重) 8. 经典案例: --topN案例 --共同好友案例 --自定义输入格式案例 --Map-join --Reduce-join

4、Zookeeper

1. 简介与特点 --本身是一个分布式集群框架,一个leader,多个follower --适合安装在奇数台机器上,能正常服务的条件是,(n+1)/2台机器正常运行 允许(n-1)/2台机器宕机 -- 每个服务节点的数据一致(重点理解) 2. 安装: 集群模式(重点) 3. 数据模型:(重点) 类似Linux/Unix的文件系统的多层次的数据结构。每个节点称之为znode. znode可以记录子节点的名字和个数(相当于文件系统的目录),还可以存储1M以内的数据(相当于文件系统的文件).znode通过路径进行唯一标识。 每一个服务节点上都有这个数据模型,因此客户端连接上任务一个服务节点,数据都是一致的。 4. 选举制度 权重: epoch > zxid >serverid 5. 监听和通知机制(重点理解) 6. 应用场景(重点): -- 集群的管理,比如HA -- 配置文件的管理 -- 服务器的动态上下线感知 -- 分布式锁 -- 分布式队列

5、HIVE

1. 概念和体系结构,工作流程 -- 体系结构(重点) -- 工作流程(重点) 2. hive的安装 -- 内嵌模式: derby,只支持单session -- 本地模式: hive客户端指令会内置开启metastore服务项,自己连接自己,与mysql在哪一台机器上无关。 -- 远程模式: hive的服务项(hiveserver2或metastore)是单独开启的,然后供客户端指令去连接。 重点理解本地模式和远程模式, 实际生产环境中本地和远程用的一样多。 3. hive的库和表操作 -- 本地上,库和表都是hdfs上的一个目录 4. hive的表类型 -- 内部表 -- 外部表 5. hive的基本查询子句 -- left semi join(与exists的原理相同) -- map-side-join 6. 函数 -- 日期函数 -- 字符串函数 -- 数学函数 -- 开窗函数(重点) --1) 排名函数 --2) 聚合函数 --3) 序列函数 -- 自定义函数 7. serde 8. 分区与分桶(重点)

二、Hbase的讲解

1、Hbase的简介

1. hadoop base 缩写成Hbase -- 开源的,基于hdfs的分布式的,可扩展的,面向列式存储,非关系型(nosql->not only sql)的数据库 -- 数据可以有多个版本(历史版本) -- 提供了高可靠性 -- 本身是基于内存的(高性能,近似实时的访问) -- 起源于google的《bigtable》论文 2. 与hive的区别 -- hive是用于OLAP,提供类sql语言的分析与计算的框架,底层就是MR。 -- hbase是用于存储的,设计目录是想存储数十亿行X数百万列的大数据集

2、Hbase的表模型

2.1 关系型数据库的表模型(扩展)

关系型数据库的表模型:面向行式存储。在定义表结构时,需要提前定义好列名,列类型,列数目

缺点:

1. 一旦数据表中存储数据后,修改表结构变得特别困难。 2. 如果我们想扩展字段时,会对表结构产生影响。 3. 即使某一行中的某个字段没有赋值,也要使用null填充 4. 一旦涉及到多张表,因为数据表存在着复杂的关系,管理非常不方便。 5. 一旦面对海量数据的处理时,读写性能特别差,尤其在高并发这一块。

2.2 Hbase的表模型

1)Cell

hbase是面向列式存储的表模型,列指的是KV对,这个key就是column,v就是column对应的值。KV对被称之单元格,也就是cell。单元格有属于自己的版本号,其实就是一个时间戳。

2)rowkey

为了表示某些单元格是同一个事物的,所以引入了rowkey的概念。因此rowkey是不可以重复的,否则会出现覆盖情况。

3)column family(列族)

为了更好的方便管理单元格,以及有相同意义的单元格尽可能的汇聚到一起,所以引入了列族的概念。

4)region

region是hbase的物理存储模型,是整张表(数据量比较小的时候),或者是表的一部分(数据量比较大时,有多个region)

5)排序机制

由于hbase是优先基于内存存储的,因此内存中的数据进行排序,排序规则是字典排序(ascii升序) 排序方式: -- 先按照rowkey进行排序 -- 然后按照列族进行排序 -- 再按照key进行排序 -- 如果是多个版本,会按照时间戳进行降序排序

6)数据类型

hbase是不支持其他类型的维护的,底层就是byte[]类型

3、Hbase的体系结构

4、Hbase的安装

4.1 单机模式

步骤1)上传并解压,更名

[root@qianfeng01 ~]# tar -zxvf hbase-1.2.1-bin.tar.gz -C /usr/local/ [root@qianfeng01 ~]# cd /usr/local/ [root@qianfeng01 local]# mv hbase-1.2.1/ hbase

步骤2)配置环境变量并重新引导

[root@qianfeng01 local]# vim /etc/profile .........省略........ # hbase environment export HBASE_HOME=/usr/local/hbase export PATH=$HBASE_HOME/bin:$PATH [root@qianfeng01 local]# source /etc/profile

步骤3)修改hbase的环境脚本(hbase-env.sh)

[hadoop@qianfeng01 local]$ vim $HBASE_HOME/conf/hbase-env.sh #找到下面内容,解开注释,添加具体路径 # The java implementation to use. Java 1.7+ required. export JAVA_HOME=/usr/local/jdk # Tell HBase whether it should manage it's own instance of Zookeeper or not. # hbase内置zookeeper开启 export HBASE_MANAGES_ZK=true

步骤4)修改hbase的site.xml(自定义配置文件)

<configuration> <!-- 属性hbase.rootdir用于指定hbase产生的数据的存储位置 --> <property> <name>hbase.rootdir</name> <value>file:///usr/local/hbase/data</value> </property> <!-- hbase依赖于zookeeper,需要指定内置zookeeper的数据存储位置 --> <property> <name>hbase.zookeeper.property.dataDir</name> <value>/usr/local/hbase/zkdata</value> </property> </configuration>

步骤5)可以开心的玩了

start-hbase.sh hbase shell stop-hbase.sh

4.2 伪分布式模式

4.2.1 情况说明

hbase的伪分布式,其实就是指相关的守护进程都有,并且运行在同一个机器上,是独立的进程(每一个守护进程都有自己的JVM) -- hbase的伪分布式的数据的存储位置,可以是hdfs,也可以是本地文件系统。 -- hbase的伪分布式如果选择hdfs,针对于zookeeper来说,可以选择内置的,也可以我们自行安装的。 下面的演示是自行安装的zookeeper

步骤1)上传并解压,更名

[root@qianfeng01 ~]# tar -zxvf hbase-1.2.1-bin.tar.gz -C /usr/local/ [root@qianfeng01 ~]# cd /usr/local/ [root@qianfeng01 local]# mv hbase-1.2.1/ hbase

步骤2)配置环境变量并重新引导

[root@qianfeng01 local]# vim /etc/profile .........省略........ # hbase environment export HBASE_HOME=/usr/local/hbase export PATH=$HBASE_HOME/bin:$PATH [root@qianfeng01 local]# source /etc/profile

步骤3)修改hbase的环境脚本(hbase-env.sh)

[hadoop@qianfeng01 local]$ vim $HBASE_HOME/conf/hbase-env.sh #找到下面内容,解开注释,添加具体路径 # The java implementation to use. Java 1.7+ required. export JAVA_HOME=/usr/local/jdk # Tell HBase whether it should manage it's own instance of Zookeeper or not. # 由于要使用我们自行安装的zookeeper,所以要把内置的关闭掉 export HBASE_MANAGES_ZK=false

步骤4)修改hbase的site.xml(自定义配置文件)

<configuration> <!-- 属性hbase.rootdir用于指定hbase产生的数据的存储位置 --> <property> <name>hbase.rootdir</name> <value>hdfs://qianfeng01/hbase</value> </property> <!-- 开启用hbase集群模式 --> <property> <name>hbase.cluster.distributed</name> <value>true</value> </property> <!-- 指定hbase使用的zookeeper集群 --> <property> <name>hbase.zookeeper.quorum</name> <value>qianfeng01:2181,qianfeng02:2181,qianfeng03:2181</value> </property> <!--将属性hbase.unsafe.stream.capability.enforce 改为true --> <property> <name>hbase.unsafe.stream.capability.enforce</name> <value>true</value> </property> </configuration>

步骤5)可以开心的玩了

由于使用自行安装的zookeeper,所以再启动hbase服务进程时,应该提前开启zookeeper集群

zkServer.sh start <---三台一起开 start-hbase.sh hbase shell stop-hbase.sh

4.3 完全分布式模式

4.3.1 情况说明和布局安排
4.3.2 安装步骤

步骤1)上传并解压,更名

[root@qianfeng01 ~]# tar -zxvf hbase-1.2.1-bin.tar.gz -C /usr/local/ [root@qianfeng01 ~]# cd /usr/local/ [root@qianfeng01 local]# mv hbase-1.2.1/ hbase

步骤2)配置环境变量并重新引导

[root@qianfeng01 local]# vim /etc/profile .........省略........ # hbase environment export HBASE_HOME=/usr/local/hbase export PATH=$HBASE_HOME/bin:$PATH [root@qianfeng01 local]# source /etc/profile

步骤3)修改hbase的环境脚本(hbase-env.sh)

[hadoop@qianfeng01 local]$ vim $HBASE_HOME/conf/hbase-env.sh #找到下面内容,解开注释,添加具体路径 # The java implementation to use. Java 1.7+ required. export JAVA_HOME=/usr/local/jdk # Tell HBase whether it should manage it's own instance of Zookeeper or not. # 由于要使用我们自行安装的zookeeper,所以要把内置的关闭掉 export HBASE_MANAGES_ZK=false

步骤4)修改hbase的site.xml(自定义配置文件)

<configuration> <!-- 属性hbase.rootdir用于指定hbase产生的数据的存储位置 --> <property> <name>hbase.rootdir</name> <value>hdfs://qianfeng01/hbase</value> </property> <!-- 开启用hbase集群模式 --> <property> <name>hbase.cluster.distributed</name> <value>true</value> </property> <!-- 指定hbase使用的zookeeper集群 --> <property> <name>hbase.zookeeper.quorum</name> <value>qianfeng01:2181,qianfeng02:2181,qianfeng03:2181</value> </property> <!--将属性hbase.unsafe.stream.capability.enforce 改为true --> <property> <name>hbase.unsafe.stream.capability.enforce</name> <value>true</value> </property> </configuration>

步骤5)三台机器的免密登录认证要做好

步骤6)三台机器的时间一定要同步,不能超过30秒

步骤7)配置regionserver的布局

[root@qianfeng01 hbase]# vim conf/regionservers qianfeng01 qianfeng02 qianfeng03

步骤8) 配置备份的hmaster

[root@qianfeng01 hbase]# echo "qianfeng02" > ./conf/backup-masters

步骤9)将hadoop的core-site.xml和hdfs-site.xml拷贝到hbase的conf目录下

[root@qianfeng01 hbase]# cp ${HADOOP_HOME}/etc/hadoop/{core-site.xml,hdfs-site.xml} ./conf/

步骤10)分发到其他两台机器上

[root@qianfeng01 local]# scp -r hbase qianfeng02:/usr/local/ [root@qianfeng01 local]# scp -r hbase qianfeng03:/usr/local/ [root@qianfeng01 local]# scp /etc/profile qianfeng02:/etc/ [root@qianfeng01 local]# scp /etc/profile qianfeng03:/etc/ 最好去另外两台机器上,重新加载一下环境变量的文件

步骤11)启动hbase

hdfs和zookeeper一定要先启动, 然后再启动hbase的服务进程 启动后,要去webui界面查看一下,ip:16010 如果启动失败,应该去相关进程所在的机器上的hbase的家里的logs目录下查看响应的日志文件

day02


一 Regionserver的动态上下线(了解)

1.1 动态上线

1.1.1 原理

当我们启动一个新的regionserver机器时,会主动向zookeeper的某一个znode(/hbase/rs/)下创建一个临时节点,名字是代表自己的唯一标识。zookeeper会通知master,master收到信息后会进行维护新的regionserver,比如分region等等。

1.1.2 环境准备

准备1)搭建一台新机器:包括jdk,hadoop的环境,hdfs的动态上线。

hdfs的动态上线的步骤: ##1. 在hdfs-site.xml里添加属性 <property> <name>dfs.hosts</name> <value>/usr/local/hadoop/etc/hadoop/include</value> </property> ##2. 创建include文件,添加所有的datanode节点的主机名,包含要上线的 ##3. 使用管理命令,刷新节点 hdfs dfsadmin -refreshNodes ##4. 启动新机器上的datanode [root@qianfeng04 local]# hadoop-daemon.sh start datanode

准备2)准备hbase的环境

可以scp, 并且将/etc/profile也拷贝过去, 保证新机器上的hbase的配置要与hmaster上的一致。

1.1.3 上线步骤

##1. 启动新机器上的regionserver [root@qianfeng04 local]# hbase-daemon.sh start regionserver ##2. 检查webui上是否多了一个regionserver ##3. 为了下次启动时,带上新机器,因此要将新机器的主机名添加到regionservers文件中,要记得分发哦。

1.2 动态下线

1.2.1 原理

regionserver与zookeeper维护一个会话,在/hbase/rs下有一个代表自己的唯一标识znode。如果regionserver动态下线,那么这个会话会断开,zookeeper会删除此唯一标识znode。master也会与zookeeper时刻保证一个会话,也就是监听/hbase/rs下的znode的增删变量,当有动态下线的机器时,重新分配下线的regionserver上的region, 最后删除regionserver,停止服务。

1.2.2 下线步骤

##1.使用下线命令,下线一台机器,比如qianfeng04 [root@qianfeng01 local]# graceful_stop.sh qianfeng04 ##@ 为了下次启动时,不带上已经动态下线的机器,因此要将主机名从regionservers文件中删除,要记得分发哦。

二、hbase shell操作(熟悉)

1.namespace的DDL

##1. list_namespace ##2. create_namespace ##3. describe_namespace ##4. alter_namespace: 可以add/modify 也可以delete属性 ##5. list_namespace_tables ##6. drop_namespace: 不能删除非空的namespace

2.table的DDL

##1. create ##2. desc|describe ##3. alter: 可以增删列族,还可以修改列族的属性 ##4. list: 不能列出系统表 ##5. disable ##6. drop ##7. disable_all,enable_all,drop_all

3.table的CRUD(DML+DQL)

## 插入数据使用put ## 修改数据使用put ## 查询表数据scan: 浏览整个表,或者rowkey范围的所有行记录 ## 查询表数据get: 指定行进行查询 ## delete :删除单元格 ## deleteall: 删除指定行 ## count ## truncate ## exists

三、Hbase的API操作(重点)

0.准备工作

1. 创建maven项目:sz2002_hbase 2. 导入jar包(坐标) <dependencies> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.2.1</version> </dependency> </dependencies> 3. 导入log4j.properties 注意事项: 因为要访问zookeeper,必须使用域名(ip无效),因此要在windows里的hosts文件里配置映射关系 C:/windows/system32/drivers/etc/hosts

1.namespace的DDL

createNamespace() describeNamespace() alterNamespaceAddProperties() alterNamespaceDropProperties() listAllNamespace() listTablesOfSpecifiedNamespace() listAllTable() listTablesOfALLNamespace() deleteNamespace() package com.qf.hbase.api; import com.qf.hbase.util.HbaseUtil; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.Map; import java.util.Set; /** * 使用Hbase的客户端API接口来操纵namespace的DDL操作 */ public class NamespaceDDL { private Admin admin; @Before public void getAdmin() throws IOException { admin = HbaseUtil.getAdmin(); } @After public void closeAdmin(){ HbaseUtil.closeAdmin(admin); } /** * create_namespace 'myns2' * @throws IOException */ @Test public void createNamespace() throws IOException { /*//获取一个配置对象,构造器中的逻辑会读取默认的配置文件中的所有属性 Configuration conf = new Configuration(); // 将zookeeper的集群配置设置到配置对象上 conf.set("hbase.zookeeper.quorum","qianfeng01,qianfeng02,qianfeng03"); //使用连接工厂工具类的静态方法createConnection(Configuration conf)来获取连接对象 Connection connection = ConnectionFactory.createConnection(conf); //打印连接对象的地址 System.out.println(connection);*/ //获取一个命名空间描述器对象 NamespaceDescriptor.Builder builder = NamespaceDescriptor.create("myns2"); NamespaceDescriptor namespace = builder.build(); /* //获取可以执行DDL操作的客户端API----->Admin对象 Admin admin = HbaseUtil.getAdmin()*/ //调用admin的createNamespace方法将命名空间描述器对象发送到hbase中。其实就是提交操作 admin.createNamespace(namespace); //关闭操作 /* admin.close(); connection.close();*/ } /** * describe_namespace 'myns2' */ @Test public void describeNamespace() throws IOException { //使用admin客户端API 向hbase发送请求获取一个已经存在的命名空间的描述器对象 NamespaceDescriptor myns2 = admin.getNamespaceDescriptor("myns2"); //获取命名空间的属性信息,属性信息会封装到map集合中 Map<String, String> map = myns2.getConfiguration(); //遍历map集合 Set<String> keys = map.keySet(); for (String key : keys) { System.out.println(key+"="+map.get(key)); } } /** * alter_namespace 'myns2',{METHOD=>'set','authoer'=>'michael'} */ @Test public void alterNamespaceAddProperties() throws IOException { //使用admin客户端API 向hbase发送请求获取一个已经存在的命名空间的描述器对象 NamespaceDescriptor myns2 = admin.getNamespaceDescriptor("myns2"); //调用setConfiguration(String key,String value)设置额外的属性 myns2.setConfiguration("author","michael"); myns2.setConfiguration("time","2020-09-01"); myns2.setConfiguration("company","qf"); //将赋有新属性的命名空间描述器对象提交到hbase中 admin.modifyNamespace(myns2); } /** * alter_namespace 'myns2',{METHOD=>'unset',NAME=>'time'} */ @Test public void alterNamespaceDropProperties() throws IOException { //使用admin客户端API 向hbase发送请求获取一个已经存在的命名空间的描述器对象 NamespaceDescriptor myns2 = admin.getNamespaceDescriptor("myns2"); //调用removeConfiguration(String key)移除已经存在的属性 myns2.removeConfiguration("time"); //将描述器对象提交到hbase中 admin.modifyNamespace(myns2); } /** * list_namespace * @throws IOException */ @Test public void listAllNamespace() throws IOException { NamespaceDescriptor[] namespaceDescriptors = admin.listNamespaceDescriptors(); for (NamespaceDescriptor namespaceDescriptor : namespaceDescriptors) { System.out.println(namespaceDescriptor.getName()); } } /** * list_namespace 'myns1' */ @Test public void listTablesOfSpecifiedNamespace() throws IOException { TableName[] tables = admin.listTableNamesByNamespace("myns1"); for (TableName tableName : tables) { System.out.println(tableName); } } /** * list */ @Test public void listAllTable() throws IOException { TableName[] tables = admin.listTableNames(); for (TableName tableName : tables) { System.out.println(tableName); } } /** * 列出所有的命名空间下的所有的表 * @throws IOException */ @Test public void listTablesOfALLNamespace() throws IOException { NamespaceDescriptor[] namespaceDescriptors = admin.listNamespaceDescriptors(); for (NamespaceDescriptor namespaceDescriptor : namespaceDescriptors) { TableName[] tableNames = admin.listTableNamesByNamespace(namespaceDescriptor.getName()); for (TableName tableName : tableNames) { System.out.println(tableName.getNameAsString()); } } } /** * drop_namespace 'ns11' */ @Test public void deleteNamespace() throws IOException { admin.deleteNamespace("ns11"); } }

2.table的DDL

createTable() describeTable() alterTableProperties() alterTableAddFamily() alterTableDropFamily() dropTable() package com.qf.hbase.api; import com.qf.hbase.util.HbaseUtil; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.Map; public class TableDDL { private Admin admin; @Before public void getAdmin() throws IOException { admin = HbaseUtil.getAdmin(); } @After public void closeAdmin(){ HbaseUtil.closeAdmin(admin); } /** * create 'myns2:student','base_info' */ @Test public void createTable() throws IOException { //使用TableName来描述一个表名 TableName tableName = TableName.valueOf("myns2:student"); //获取一个表描述器对象 HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName); //获取一个列族描述器对象 HColumnDescriptor hColumnDescriptor = new HColumnDescriptor("base_info".getBytes()); //设置块大小为128KB hColumnDescriptor.setBlocksize(1024*64*2); //设置版本数为3 hColumnDescriptor.setVersions(1,3); //设置一个单元格的存活时间7天 hColumnDescriptor.setTimeToLive(3600*24*7); //将列族绑定到表上 hTableDescriptor.addFamily(hColumnDescriptor); //使用Admin将表提交到Hbase中 admin.createTable(hTableDescriptor); } /** * desc 'myns2:student' */ @Test public void describeTable() throws IOException { //使用Admin向hbase发送请求,获取已经存在的表的描述器对象 HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf("myns2:student")); //获取表描述器有哪些列族描述器对象 HColumnDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies(); for (HColumnDescriptor columnFamily : columnFamilies) { System.out.print("列族名:"+columnFamily.getNameAsString()+"\t"); System.out.print("块大小:"+columnFamily.getBlocksize()+"\t"); System.out.print("布隆过滤器:"+columnFamily.getBloomFilterType()+"\t"); System.out.print("版本数:"+columnFamily.getMaxVersions()+"\t"); System.out.println("存活时间:"+columnFamily.getTimeToLive()); } } /** * alter 'myns:student',{NAME=>'base_info',VERSIONS=>5} */ @Test public void alterTableProperties() throws IOException { //使用Admin向hbase发送请求,获取已经存在的表的描述器对象 HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf("myns2:student")); //获取指定的列族描述器,通过getFamily(byte[] name); HColumnDescriptor base_info = tableDescriptor.getFamily(Bytes.toBytes("base_info")); //重新设置存活时间 base_info.setTimeToLive(3600*24*5); //重新设置版本数量 base_info.setVersions(1,5); //将修改过属性的列族描述器和某一个表名关联,并提交到hbase中 admin.modifyColumn(TableName.valueOf("myns2:student"),base_info); } /** * alter 'myns:student','f2','f3','f4' */ @Test public void alterTableAddFamily() throws IOException { //使用TableName来描述一个表名对象 TableName tableName = TableName.valueOf("myns2:student"); //新创建三个列族描述器 HColumnDescriptor hColumnDescriptor1 = new HColumnDescriptor("f1"); hColumnDescriptor1.setVersions(1,3); HColumnDescriptor hColumnDescriptor2 = new HColumnDescriptor("f2"); hColumnDescriptor2.setTimeToLive(3600*3); HColumnDescriptor hColumnDescriptor3 = new HColumnDescriptor("f3"); hColumnDescriptor3.setBloomFilterType(BloomType.ROWCOL); //将新的列族描述器绑定到表描述器上 admin.addColumn(tableName,hColumnDescriptor1); admin.addColumn(tableName,hColumnDescriptor2); admin.addColumn(tableName,hColumnDescriptor3); } /** * alter 'myns2:student',{NAME=>'f3',METHOD=>'delete'} */ @Test public void alterTableDropFamily() throws IOException { //使用TableName来描述一个表名对象 TableName tableName = TableName.valueOf("myns2:student"); //描述一个列族名 byte[] columnName = "f3".getBytes(); admin.deleteColumn(tableName,columnName); } /** * drop 'myns2:student' */ @Test public void dropTable() throws IOException { //描述一个表名对象 TableName tableName = TableName.valueOf("myns2:student"); //使用Admin向hbase发送请求,判断表是否存在 if(admin.tableExists(tableName)){ //判断表是否已经禁用 if(!admin.isTableDisabled(tableName)){ //禁用表 admin.disableTable(tableName); } //删除表 admin.deleteTable(tableName); } } }

3.table的CRUD(DML+DQL)

putOneRowData() putBatchData() getOneRowData() ------------------------------------- getMultiRowData() scanMultiRowData() deletOneRowData() deleteMultiRowData() deleteOneCell() package com.qf.hbase.api; import com.qf.hbase.util.HbaseUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; /** * 练习Hbase的表的增删改查操作 * <p> * 要使用的API是Table对象 */ public class TableCRUD { private Table table; @Before public void getTable() throws IOException { table = HbaseUtil.getTable("myns2:student"); } @After public void closeTable() { HbaseUtil.closeTable(table); } /** * put 'myns2:student','rk00001','base_info:name','zhangsan' */ @Test public void putOneRowData() throws IOException { //获取put对象,同时指定rowkey Put put = new Put("rk02004".getBytes()); /* addColumn(byte[] family,byte[] columnName,byte[] value) family: 列族名 columnName: 列名也就是key的名字 value:key对应的值 */ put.addColumn("base_info".getBytes(), Bytes.toBytes("name"), "hanxin".getBytes()); put.addColumn("f1".getBytes(), Bytes.toBytes("province"), "广东".getBytes()); //使用Table对象将put对象提交到hbase中 table.put(put); } @Test public void putBatchData() throws IOException { //获取一个泛型为Put的线性表对象 List<Put> puts = new ArrayList<Put>(); for (int i = 2; i < 1010; i++) { String rowkey = ""; if (i < 10) { rowkey = "rk0000" + i; } else if (i < 100) { rowkey = "rk000" + i; } else if (i < 1000) { rowkey = "rk00" + i; } else { rowkey = "rk0" + i; } Put put = new Put(rowkey.getBytes()); put.addColumn("base_info".getBytes(), "name".getBytes(), Bytes.toBytes("zhaoyun" + i)); put.addColumn("base_info".getBytes(), "age".getBytes(), Bytes.toBytes((int) (Math.random() * 90) + 10 + "")); int num = (int) (Math.random() * 2); String gender = num == 0 ? "f" : "m"; put.addColumn("base_info".getBytes(), "gender".getBytes(), Bytes.toBytes(gender)); //将put添加到集合中 puts.add(put); //每300行一提交 if (i % 300 == 0) { table.put(puts); //提交后需要清空集合,否则会出现重复提交 puts.clear(); } } table.put(puts); } /** * 获取指定一行的所有的单元格 */ @Test public void getOneRowData() throws IOException { //获取Get对象,指定行号 Get get = new Get("rk00009".getBytes()); //使用table对象的get方法,获取指定行的数据,返回的数据 被封装到Result对象里 Result result = table.get(get); //强调:Result里是这一行的所有的单元格 // advance():相当于迭代器的hasNext方法:问 while(result.advance()){ //current():相当于迭代器的next方法,取 Cell cell = result.current(); //CellUtils工具类,提供了克隆方法,将Cell中的每一个部分单独克隆出来 System.out.print(new String(CellUtil.cloneRow(cell))+"\t"); System.out.print(new String(CellUtil.cloneFamily(cell))+"\t"); System.out.print(new String(CellUtil.cloneQualifier(cell))+"\t"); System.out.println(new String(CellUtil.cloneValue(cell))); } } /** * 获取多行的所有的单元格 */ @Test public void getMultiRowData() throws IOException { //获取一个泛型为Get的集合对象 List<Get> list = new ArrayList<Get>(); //获取多个Get对象 Get g1= new Get(Bytes.toBytes("rk00001")); Get g2= new Get(Bytes.toBytes("rk00002")); Get g3= new Get(Bytes.toBytes("rk00003")); Get g4= new Get(Bytes.toBytes("rk00004")); list.add(g1); list.add(g2); list.add(g3); list.add(g4); //调用table的get(list<Get> list),向Hbase发送查询请求 Result[] results = table.get(list); for (Result result : results) { HbaseUtil.printResult(result); } } /** * 使用Scan对象封装一个行范围,进行查询数据 * * * 如果rowkey的长度不统一,那么在scan查询时,可能会查询出来我们不想要的数据。 * 如何避免这种情况呢? * * 小技巧: * 因为排序时是按照ascii码进行字典升序排序 * rk00010 * rk0001010 * rk00011 * * 如果不想要rk0001010 那么必须要指定一个rowkey是小于等于它的rowkey, * 反过来说,如果要想rk00010,那么只需要指定一个稍微比之大一点点的rowkey即可。 * rk00010-->rk00010 只需要在其后拼接一个ascii中最小的字符即可 \000 * * * * 从上面的情况可知,在企业中,rowkey的长度要统一。 * */ @Test public void scanMultiRowData() throws IOException { /* 调用构造器Scan(byte[] startrow,byte[] stoprow) 需要指定开始rowkey,和结束rowkey. 但是左闭右开的情况 */ //查询rk00001 到rk00010的10行数据的内容 Scan scan = new Scan("rk00001".getBytes(),("rk00020"+"\000").getBytes()); //调用table的getScanner(Scan scan)方法获取所有的行数据,返回的是ResultScanner对象,本质依然是一个迭代器 ResultScanner scanner = table.getScanner(scan); //获取ResultScanner的迭代器形式 Iterator<Result> iterator = scanner.iterator(); //问:有没有下一个元素,如果返回的是true,指针会主动向下移动 while(iterator.hasNext()){ //取出当前的元素 Result result = iterator.next(); HbaseUtil.printResult(result); } } /** * 删除一行的所有的单元格 */ @Test public void deletOneRowData() throws IOException { //使用Delete类型的实例封装一个要删除的行号 Delete d1 = new Delete(Bytes.toBytes("rk0001010")); // 调用table的delete(Delete delete) table.delete(d1); } /** * 删除多行的数据 */ @Test public void deleteMultiRowData() throws IOException { //获取一个泛型是Delete的集合对象 List<Delete> list = new ArrayList<Delete>(); Delete d1 = new Delete(Bytes.toBytes("rk00010")); Delete d2 = new Delete(Bytes.toBytes("rk00011")); Delete d3 = new Delete(Bytes.toBytes("rk00012")); Delete d4 = new Delete(Bytes.toBytes("rk00013")); list.add(d1); list.add(d2); list.add(d3); list.add(d4); //调用table的delete(List<Delete> list) table.delete(list); } /** * 删除指定行中的某一个单元格 */ @Test public void deleteOneCell() throws IOException { //使用Delete类型的实例封装一个要删除的行号 Delete d1 = new Delete(Bytes.toBytes("rk00009")); //指定这一行中的gender单元格 d1.addColumn("base_info".getBytes(),"gender".getBytes()); //删除操作 table.delete(d1); } }

day03

一、API(续)

1.table的CRUD(DML+DQL)(续)

putOneRowData() putBatchData() getOneRowData() -------------------------------------续讲 getMultiRowData() scanMultiRowData() deletOneRowData() deleteMultiRowData() deleteOneCell()

2.hbase的过滤器(重点)

2.1 为什么要学习过滤器

因为之前学习的查询有局限性,只能查询指定行号的或者行范围的,或者是查询column叫什么名字的这些情况 不能查询单元格的值是什么,比如name=zhangsan or age = 23的这种需求。 过滤器是可以帮助我们实现上述需求。

2.2过滤器的分类

1. 列值(ColumnValue)过滤器 -- SingleColumnValueFilter : select * 2. 结构(Structural)过滤器 -- FilterList: 可以绑定多个过滤器(条件) 3. keyvalue元数据(metadata)过滤器 : 查询的都是符合指定条件的单元格数据,不是select * -- FamilyFilter -- QualifierFilter -- ColumnPrefixFilter -- MultipleColumnPrefixFilter -- ColumnRangeFilter 4. 行键(RowKey)过滤器 -- RowFilter : 查询效果是select * 5. 实用(utility)过滤器 -- FirstKeyOnlyFilter : 查询效果是每一行中的第一个单元格 6. 分页过滤器 -- PageFilter package com.qf.hbase.api; import com.qf.hbase.util.HbaseUtil; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.*; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.webapp.view.HtmlPage; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.Iterator; /** * 演示所有的过滤器的练习 */ public class FilterDemo { private Table table; @Before public void getTable() throws IOException { table = HbaseUtil.getTable("myns2:student"); } @After public void closeTable() { HbaseUtil.closeTable(table); } /** * 测试单列值过滤器的用法 * * 值得注意的地方: * 默认情况下,如果要查询的单元格不存在,那么认为是满足条件的,所以会返回。 * 但是在实际需求中,是不应该出现这种情况, * 问:那么如何避免这种情况的发生?????? * * 答:需要通过设置一个参数,将不存在此单元格的行数据过滤器 * * 过滤器会提供setFilterIfMissing(boolean flag)方法 * true:表示过滤掉,不返回 * false: 不过滤掉,返回,认为满足条件,默认就是false * * */ @Test public void testSingleColumnValueFilter() throws IOException { /** * SingleColumnValueFilter(final byte [] family, final byte [] qualifier,final CompareOp compareOp, final byte[] value) { * * 第一个参数:要指定一个查询的列族名 * 第二个参数:要指定一个列族下的单元格的key * 第三个参数:要传入一个比较符号的常量,比如>,>=,=,<,<=,<> * CompareFilter.CompareOp.NOT_EQUAL * CompareFilter.CompareOp.EQUAL * CompareFilter.CompareOp.GREATER * CompareFilter.CompareOp.GREATER_OR_EQUAL * CompareFilter.CompareOp.LESS * CompareFilter.CompareOp.LESS_OR_EQUAL * 第四个参数:要指定一个列族下的单元格的value */ //指定要查询的是所有的age是23的行记录 : select * from student where age = 23; SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter( "base_info".getBytes(),"age".getBytes(), CompareFilter.CompareOp.EQUAL,"23".getBytes()); //设置过滤字段的方法 singleColumnValueFilter.setFilterIfMissing(true); //因为不知道有多少行,应该使用Scan去查询整张表的数据 Scan scan = new Scan(); //绑定过滤器条件 scan.setFilter(singleColumnValueFilter); ResultScanner scanner = table.getScanner(scan); Iterator<Result> iterator = scanner.iterator(); while (iterator.hasNext()){ Result next = iterator.next(); HbaseUtil.printResult(next); } } /** * 查询 age>89 并且 gender = m的需求 */ @Test public void testFilterList() throws IOException { //使用单列值过滤器指定 age>89 SingleColumnValueFilter ageFilter = new SingleColumnValueFilter( "base_info".getBytes(), Bytes.toBytes("age"), CompareFilter.CompareOp.GREATER,"89".getBytes() ); //使用单列值过滤器指定 gender = m SingleColumnValueFilter genderFilter = new SingleColumnValueFilter( "base_info".getBytes(), Bytes.toBytes("gender"), CompareFilter.CompareOp.EQUAL,"m".getBytes() ); //缺失字段的设置 ageFilter.setFilterIfMissing(true); genderFilter.setFilterIfMissing(true); /** * FilterList.Operator.MUST_PASS_ALL 相当于 and * FilterList.Operator.MUST_PASS_ONE 相当于 or */ //获取一个结构过滤器对象 FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); //将单列值过滤器绑定到结构过滤器上 filterList.addFilter(ageFilter); filterList.addFilter(genderFilter); HbaseUtil.printScan(filterList,table); } /** * 四种比较器的使用 */ @Test public void testComparator() throws IOException { //写一个正则表达式,匹配字符串中有en字符的 // 第一种 "^en" : 匹配en开头的字符串 // 第二种 "en$" : 匹配en结尾的字符串 // 第三种 "en" :匹配字符串中有en字符的 // 第四种 "^en$" : 全匹配 //RegexStringComparator comparator = new RegexStringComparator("^en"); //SubstringComparator comparator = new SubstringComparator("en"); //与正则比较器的第三种情况一致。 //BinaryComparator comparator = new BinaryComparator("zhenji".getBytes()); // 与正则比较器的第四种一致 BinaryPrefixComparator comparator = new BinaryPrefixComparator("zhen".getBytes());//与正则比较器的第一种一致。 //创建一个单列值过滤器,查询 name like '%en%' SingleColumnValueFilter nameFilter = new SingleColumnValueFilter( "base_info".getBytes(),"name".getBytes(), CompareFilter.CompareOp.EQUAL,comparator ); //设置缺失字段 nameFilter.setFilterIfMissing(true); //查询 HbaseUtil.printScan(nameFilter,table); } /** * 列族过滤器:FamilyFilter * 构造器中需要传入比较器对象 * 返回值是要查询的列族下的所有的单元格, select score.math,score.english,score.chinese from ..... where 列族=名字 * @throws IOException */ @Test public void testFamilyFilter() throws IOException { //使用正则比较器 RegexStringComparator comparator = new RegexStringComparator("^f1"); FamilyFilter filter = new FamilyFilter(CompareFilter.CompareOp.EQUAL,comparator); HbaseUtil.printScan(filter,table); } /** * QualifierFilter: 列名过滤器 * 返回的结果是所有匹配到的列名的所有行的单元格数据 * @throws IOException */ @Test public void testQualifierFilter() throws IOException { //使用正则比较器 RegexStringComparator comparator = new RegexStringComparator("ender"); QualifierFilter filter = new QualifierFilter(CompareFilter.CompareOp.EQUAL,comparator); HbaseUtil.printScan(filter,table); } /** * ColumnPrefixFilter:列名前缀过滤器 * 返回的结果是所有匹配到的列名的所有行的单元格数据 * @throws IOException */ @Test public void testColumnPrefixFilter() throws IOException { //列名前缀过滤器 ColumnPrefixFilter filter = new ColumnPrefixFilter("gen".getBytes()); HbaseUtil.printScan(filter,table); } /** * MultipleColumnPrefixFilter:多列名前缀比较器 * 返回的是每行中所有的匹配到的单元格 * @throws IOException */ @Test public void testMultipleColumnPrefixFilter() throws IOException { byte[][] columns = new byte[][]{"gen".getBytes(),"name".getBytes()}; //获取一个多列名前缀比较器对象 MultipleColumnPrefixFilter filter = new MultipleColumnPrefixFilter(columns); HbaseUtil.printScan(filter,table); } /** * ColumnRangeFilter(final byte[] minColumn, boolean minColumnInclusive,final byte[] maxColumn, boolean maxColumnInclusive) * 列名范围过滤器: 两个列名(age和name)之间的范围. 由于底层使用的ascii码进行比较,所有age---->name是有一定范围的, * 比如 gender 就在age和name范围内 * minColumn: 指定两个列名中较小的列名 * minColumnInclusive: 返回的结果是否要包含最小的列名,true表示包含,false表示不包含 * maxColumn: 指定两个列名中较大的列名 * maxColumnInclusive: 返回的结果是否要包含最大的列名,true表示包含,false表示不包含 */ @Test public void testColumnRangeFilter() throws IOException { ColumnRangeFilter filter = new ColumnRangeFilter("age".getBytes(),true,"name".getBytes(),false); HbaseUtil.printScan(filter,table); } /** * RowFilter:行键过滤器 * 返回的是匹配到的所有的行里的单元格数据 * @throws IOException */ @Test public void testRowFilter() throws IOException { //使用二进制比较器 BinaryPrefixComparator comparator = new BinaryPrefixComparator("rk0001".getBytes()); RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL,comparator); HbaseUtil.printScan(filter,table); } /** * FirstKeyOnlyFilter: 第一个单元格过滤器 * 返回的是所有行的第一个单元格的数据 * @throws IOException */ @Test public void testFirstKeyOnlyFilter() throws IOException { FirstKeyOnlyFilter filter = new FirstKeyOnlyFilter(); HbaseUtil.printScan(filter,table); } @Test public void testPageFilter() throws IOException { // 每页10行,构造器中要指定10 PageFilter filter = new PageFilter(10); Scan scan = new Scan(); scan.setFilter(filter); String maxRow = ""; //定义一个计数器,用于统计当前的页号 int currentPage = 0; while(true){ //定义一个计数器 int count = 0; ResultScanner scanner = table.getScanner(scan); Iterator<Result> iterator = scanner.iterator(); System.out.println("--------------currentPage"+ (++currentPage) +"-----------------"); while(iterator.hasNext()){ Result result = iterator.next(); HbaseUtil.printResult(result); //取出当前行的行键 maxRow=new String(result.getRow()); //每打印一行就自动+1 count++; } System.out.println(""); //判断是否是最后一页 if(count<10){ break; } //为查询下一页做准备 scan.setStartRow((maxRow+"\000").getBytes()); } } }

3.hbase的比较器

--1. RegexStringComparator: 正则字符串比较器 --2. SubstringComparator: 子串比较器 --3. BinaryComparator : 二进制比较器 --4. BinaryPrefixComparator: 二进制前缀比较器

二、工作机制

2.1 寻址过程

客户端会发送访问请求,可能是一个查询操作,也可能是插入操作 查询操作:get 'myns2:student','rk01000'; 插入操作:put 't1','25','base_info:name','wangzhaojun' 从上述的操作中,可以看出需要找到具体表的具体的某一个region,如果是插入,就向对应的store的memstore里插入,如果是查询,就从需要查询的store里查询数据(memstore-写缓存,storefile,blockCache-读缓存)

访问过程分3步:

第1步:Client请求ZooKeeper获取hbase:meta表所在的RegionServer的地址。 第2步:Client 请求hbase:meta表所在的RegionServer获取访问的具体的表的region的RegionServer地址,Client会将hbase:meta表的相关信息cache下来,以便下一次快速访问。 第3步:Client请求数据所在的 RegionServer,获取所需要的数据。

2.2 存储机制

2.2.1 存储模型region

2.2.2 flush、compact、split

参考文档

day04

一、复习

一、API(重点) 1. table的crud --. get对象: 只能指定单行查询,但是可以使用List<Get>查询多行 查询效果: select * 的形式 --. scan对象: 默认情况下是全表查询: select * 的形式 也可以指定行范围进行查询: select * 的形式 也可以指定过滤器进行查询: 可能是select *的形式 也可能是select colName1,colName2.....where 的形式 2. 过滤器 --. 单列值过滤器: select * --. 结构过滤器: 可以绑定多个其他过滤器进行查询 --. keyvalue元数据过滤器: 返回指定要查询的单元格 --. 行键过滤器: 查询指定行的所有单元格 --. 实用过滤器(FirstKeyOnlyFilter): 查询的每行的第一个单元格 --. 分页过滤器(PageFilter): 可以进行一页固定多少条记录进行查询 3. 比较器 --. RegexStringComparator: 可以指定一个正则表达式的字符串进行匹配想要的数据 --. SubStringComparator: 可以指定一个字符串,查询包含此字符串的数据 --. BinaryComparator: 可以指定一个字符串的字节数组,查询必须是此字符串的数据 --. BinaryPrefixComparator: 可以指定一个字符串的字节数组,查询必须是以此字符串开头的数据 二、工作机制 1. 寻址流程 --. 客户端连接zookeeper,获取hbase:meta表的region的位置(regionserver的ip地址) --. 紧接着,客户端请求该位置,查询hbase:meta表的数据,获取要操作的表的region及其地址。 --. 然后,客户端就会访问该region的regionserver,然后处理(存取)数据。 2. 存储机制(region) --. region是hbase中的表或者是表的一部分,也就是基本的存储结构(存储模型,是内存中的java对象) --. region管理的数据除了在内存中有一部分外,剩下的全都是以文件(storefile)的形式存储在hdfs上。 --. region所要管理的数据可能由于单元格过多,或者是意义不同,分为列族进行管理,一个列族对应一个store(也是内存中的对象) --. 一个store就管理着相同意义的单元格,这些单元格在对应的memstore中存储,但是由于阈值,最终会flush成storefile。 --. storefile的个数越来越多,store(对象)管理着这些文件的索引信息。 --. storefile的个数太多的话,也不好管理,因此会合并机制,合并成一个文件 --. 如果一个storefile过大,region和regionserver的负载不均衡,因此会有切分机制,细节上是切分文件(从某一个rowkey开始切分,文件大小尽可能均分,也可能造成其他的文件的切分),宏观上是region的切分,因为region有自己的rowkey范围。 3. flush、compact、split、merge_region -- flush: 指的是memstore存储单元格时,达到自己的阈值(比如是128m,或者是1个小时,或者是整个regionserver的内存的40%),会进行flush,刷成storefile -- compact: 指的是storefile的数量达到阈值(3)时,会进行合并成一个storefile -- split: 宏观上指的就是region的切分

二、工作机制

2.1 region

为了更方便的管理表的region以及负载均衡,避免热点问题,所以region一般情况下会预切分。 就是在建表期间提前划分好每一个region的rowkey的范围 create 'ns1:student','base_info',SPLITS=>['RK250000','RK500000','RK750000'] 上述就是一种预切分的手段,需要预先估计要存储的真实数据的情况。

2.2 写流程(重点)

(1) Client通过Zookeeper的调度,向RegionServer发出写数据请求,在Region中写数据。 (2) 数据被写入Region的MemStore,直到MemStore达到预设阈值。 (3) MemStore中的数据被Flush成一个StoreFile。 (4) 随着StoreFile文件的不断增多,当其数量增长到一定阈值后,触发Compact合并操作,将多个StoreFile合并成一个StoreFile,同时进行版本合并和数据删除。 (5) StoreFiles通过不断的Compact合并操作,逐步形成越来越大的StoreFile。 (6) 单个StoreFile大小超过一定阈值后,触发Split操作,把当前Region Split成2个新的Region。父Region会下线,新Split出的2个子Region会被HMaster分配到相应的RegionServer上,使得原先1个Region的压力得以分流到2个Region上。

2.3 读流程(重点)

(1) Client 通过寻址流程,找到具体要查询的region。 (2) 由于Regionserver的内存分为MemStore(写存储)和BlockCache(读缓存)两部分。所以,读操作会先读取BlockCache里的数据,然后再读取MemStore里的数据,最后再通过索引找到可能有数据的storefile进行读取,从storefile中读到的数据会暂时缓冲到BlockCache中,为了下次的快速读取操作。

三、Hbase与hive的整合(了解)

3.1 说明

hbase是hadoop数据库,用于存储数据,虽然能提供近似实时的读写操作,但是依然不能从事OLTP的工作,OLAP也不合适。 HIVE底层是MR(MR是一个离线的分析计算框架),也决定了HIVE适合OLAP的工作。 因此Hbase与HIVE整合到一起,就可以满足存储和分析的工作了。 整合的目的:hbase中的表数据在hive中能看到,hive中的表数据在hbase中也能看到

3.2 Hive-to-Hbase

在hive中创建的表,一定要让hbase可以看到

create table if not exists student ( uid int, uname string, age int, sex string, province string, city string ) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with serdeproperties( "hbase.columns.mapping"=":key,base_info:name,base_info:age,base_info:gender,address_info:province,address_info:city" ) tblproperties( "hbase.table.name"="student1" );

向hive的student表中插入数据

insert into student values (1001,'zs',23,'f','广东','广州'); insert into student (uid,uname,city) values (1002,'lisi','杭州');

向hbase的student1表中插入数据

put 'student1','1003','base_info:name','wangwu' put 'student1','1003','address_info:city','changchun'

然后在hive中查询,在hbase中查询

3.3 Hbase-to-Hive

先在hbase中维护一张表 hbase> create_namespace 'ns1' hbase> create 'ns1:t1','f1','f2' put 'ns1:t1','rk00001','f1:name','zhaoyun' put 'ns1:t1','rk00001','f1:age',23 put 'ns1:t1','rk00001','f1:gender','m' put 'ns1:t1','rk00001','f2:math','100' put 'ns1:t1','rk00001','f2:english','10' put 'ns1:t1','rk00001','f2:chinese','90' put 'ns1:t1','rk00002','f1:name','zhenji' put 'ns1:t1','rk00002','f1:age',24 put 'ns1:t1','rk00002','f1:gender','f' put 'ns1:t1','rk00003','f1:name','貂蝉' put 'ns1:t1','rk00003','f1:age',24 put 'ns1:t1','rk00003','f1:gender','f'

2)在hive中创建一张表与之进行映射

create external table if not exists score_info ( uid string, uname string, age int, sex string, math string, chinese string, english string ) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with serdeproperties( "hbase.columns.mapping"="f1:name,f1:age,f1:gender,f2:math,f2:chinese,f2:english" ) tblproperties( "hbase.table.name"="ns1:t1" ); 在hive中查询一下数据

3.4 注意事项

1. hive和hbase在字段的映射关系上,是按照建表的字段顺序进行映射的,不是根据名称来的。 2. :key 是要映射成hbase的rowkey的。可以写出来,也可以不写,不写的话是默认提供的。 3. hbase先有表数据时,然后映射到hive中,那么hive的表必须是外部表(external)

四、布隆过滤器的原理(熟悉)

4.1 布隆过滤器的由来

1970年由Howard Bloom提出的一个二进制向量存储结构。用于判断一个元素是否在集合中,如果是“否”,那么一定不在集合中,如果是“是”,那么可能在集合中,牺牲了正确率来节省存储空间

4.2 布隆过滤器的应用场景

在爬虫爬取网页数据时,是根据网页链接来计算这个链接是否爬取过,使用布隆过滤器的存储结构,来判断这个网页链接是否没爬过,如果存储结构中经过计算,发现没有这个链接,那么这个链接一定没有爬过,那就爬。 如果经过计算,这个链接可能在存储结构中,那就意味着这个链接可能爬过,也可能没爬过,那么就不爬(大不了不要此网页的数据了)。

4.3 原理

布隆过滤器内部维护了一个64k大小的位数组,以及n个hashcode函数(每个hashcode函数的逻辑不同) 在初始化时,位数组里全都是0,假设有三个hashcode函数 hashCode1() int值-----是位数组的一个下标-----下标对应的元素存为1. hashCode2() int值-----是位数组的另一个下标-----下标对应的元素存为1. hashCode3() int值-----是位数组的另一个下标-----下标对应的元素存为1. 判断一个元素是否在一个集合中, 针对于这个元素分别调用三个函数,来计算出三个下标,如果三个下标上的位数组里的元素只要有一个是0,那么这个元素一定不存在集合中,如果三个下标的位数组的上的元素都是1,那么要判断的元素不一定在集合中。

4.4 在Hbase中的应用

当我们随机读get数据时,如果采用hbase的块索引机制,hbase会加载很多块文件。如果采用布隆过滤器后,它能够准确判断该HFile的所有数据块中,是否含有我们查询的数据,从而大大减少不必要的块加载,从而增加hbase集群的吞吐率。这里有几点细节: 1. 布隆过滤器的存储在哪? 对于hbase而言,当我们选择采用布隆过滤器之后,HBase会在生成StoreFile(HFile)时包含一份布隆过滤器结构的数据,称其为MetaBlock;MetaBlock与DataBlock(真实的KeyValue数据)一起由LRUBlockCache维护。所以,开启bloomfilter会有一定的存储及内存cache开销。但是在大多数情况下,这些负担相对于布隆过滤器带来的好处是可以接受的。 2. 采用布隆过滤器后,hbase如何get数据? 在读取数据时,hbase会首先在布隆过滤器中查询,根据布隆过滤器的结果,再在MemStore中查询,最后再在对应的HFile中查询。 3. 采用ROW还是ROWCOL布隆过滤器? 这取决于用户的使用模式。 如果用户只做行扫描,使用更加细粒度的行加列布隆过滤器不会有任何的帮助,这种场景就应该使用行级布隆过滤器。 当用户不能批量更新特定的一行,并且最后的使用存储文件都含有该行的一部分时,行加列级的布隆过滤器更加有用。 例如:ROW 使用场景假设有2个Hfile文件hf1和hf2, hf1包含kv1(r1 cf:q1 v)、kv2(r2 cf:q1 v) hf2包含kv3(r3 cf:q1 v)、kv4(r4 cf:q1 v) 如果设置了CF属性中的bloomfilter(布隆过滤器)为ROW,那么get(r1)时就会过滤hf2,get(r3)就会过滤hf1 。 ROWCOL使用场景假设有2个Hfile文件hf1和hf2, hf1包含kv1(r1 cf:q1 v)、kv2(r2 cf:q1 v) hf2包含kv3(r1 cf:q2 v)、kv4(r2 cf:q2 v) 如果设置了CF属性中的bloomfilter为ROW,无论get(r1,q1)还是get(r1,q2),都会读取hf1+hf2;而如果设置了CF属性中的bloomfilter为ROWCOL,那么get(r1,q1)就会过滤hf2,get(r1,q2)就会过滤hf1。 tip: ROW和ROWCOL只是名字上有联系,但是ROWCOL并不是ROW的扩展,也不能取代ROW

五、Rowkey的设计原则及其案例(重点)

5.1 热点问题

当大量的客户端要访问的数据集中在一个regionserver或者region里,那么这个regionserver或者这个region一定会不堪重负,造成性能上的影响,也可能影响到其他的region的访问。这种情况就是热点问题。因此在存储数据时,应该尽量散列存储行数据。

5.2 rowkey的重要性

因为hbase的存储机制,以及查询方式(通常是按照rowkey来查询,这样的性能非常高,而不是全表遍历),所以rowkey的设计显得格外重要。 比如在建表之处,我们就要考虑好,我们未来可能发生的查询操作,比如按照部门号查询,按照时间、地域性查询。那么就可以将这些信息设计到rowkey中,来达到更高的查询性能。

5.3 设计原则

--1. 唯一性原则 rowkey一定要遵守唯一性原则,如果不唯一,某一些单元格可能会被不小心覆盖掉 --2. 统一长度原则 rowkey的长度如果不统一,那么在查询时,返回的结果可能与想要的结果不一致。 如果rowkey的长度不统一,假如现在有5个Rowkey,分别是:"012", "0", "123", "234", "3"。因此底层是字典排序,所以排序的结果是: "0" "012" "123" "234" "3" 而我们想要的结果是如下的: "0" "3" "012" "123" "234" 由于是字典排序,做不到这样的效果。如果想要这样的效果,只能长度统一 "000" "003" "012" "123" "234" 建议在10个字节到100个字节以内,最好是16个字节,也就是8的倍数个字节。原则是能短绝对不要长。 --3. 散列原则 目的:为了尽量避开热点问题,应该使数据散列到不同的region中。 方式1: rowkey的reverse反转 130xxxx6666 130xxxx6789 131xxxx1234 131xxxx2234 159xxxx3234 188xxxx1111 手机号反转: 1234xxxx031 9876xxxx031 4321xxxx131 4322xxxx131 4323xxxx951 1111xxxx881 方式2:salting(加盐) abc001 abc002 abc003 abc004 ..... abc019 ..... 在设计存储时,随机a-g中的任意一个字符,作为前缀拼接到rowkey上 d-abc001 [,b][b,d][d,f][f,] a-abc002 g-abc003 f-abc004 d-abc005 ........ 方式3:hashcode散列/取模(也是加前缀) abc001 abc002 abc003 abc004 ..... abc019 --- 可以对这些rowkey调用hashcode函数或者是其他算法,比如md5等得到一个固定的字符串。 如abc001-->md5算法--->9bf049097142c168c38a94c626eddf3d-->取前四位作为rowkey的前缀 9bf0-abc001 7006-abc002 95e6-abc003 1ba5-abc010 如果是纯数字的rowkey 100001 100002 100003 100004 如果不想连续,而是散列存储,可以使用原始的rowkey对某一个数字比如4进行取模,结果作为前缀 1-100001 1-100005 2-100002 3-100003 0-100004

5.4 案例:多条件的rowkey设计

案例需求:

List<XXXX> find(starttime,endtime,filename,filetype,userid) -1. 文件创建时间区间 (比如从20120901到20120914期间创建的文件) -2. 文件名(“中国好声音”), -3. 分类(“综艺”), -4. 所有者(“浙江卫视”)。 设计rowkey时,可以是userid+createtime+fileid rowKey(userID 6 + time 8 + fileID 6) name category …. 00000120120902000001 00000120120904000002 00000120120906000003 00000120120908000004 00000120120910000005 00000120120914000007 00000220120912000006 00000220120916000008 00000320120918000009 00000420120920000010 真正应用时:传入一下参数 find(20120901,20120914,“中国好声音”,“综艺”,1){ 将形参拼接成想要的rowkey,传入到scan里 scan.setRowKeyRange(starttime,endtime) }

六、二级索引表的概念(熟悉)

rk0001 f1:name=zhangsan f1:age=23 的f2:province的值 rk0002 f1:name=lisi f1:age=23 rk0003 f1:name=wangwu f1:age=23 rk0004 f1:age=23 rk0005 f1:name=zhaoliu f1:gender=xxx ........................... rk0010 f1:name=zhangsan f1:gender=xxx f2:province rk0011 f1:name=zhangsan f1:gender=xxx 创建一张表: 存储 f1:name===xxxxx所有的行号 rowkey f1:name=zhangsan f1:rk1 rk0001 f1:rk2 rk0010 f1:rk3 rk0011 f1:name=lisi f1:rk rk0002 二级索引表的概念: 二级索引表存储的就是一些单元格与rowkey的映射关系。在查询原表时,为了不遍历整张表,可以通过二级索引表查询出要查询的行号,然后再去原表中通过行号查询对应的数据,这样就避免了全表扫描,提高了查询效率

七、协处理器(重要)

在hbase的低版本中,想要维护一张二级索引表是非常困难的。后来在高版本中,引入了协处理器的API。可以很轻松的完成二级索引表的建立。

还有就是求和,计数,排序等操作,低版本无法轻易完成,而引入的协处理器可以在server端进行并发运算,从而提高性能。

7.1 协处理器的分类

一、Observer: 相当于关系型数据库支持的触发器(重点) -- RegionObserver:针对Region的观察者,可以监听关于Region的操作 -- RegionServerObserver:针对RegionServer的观察者,可以监听关于RegionServer的操作 -- WALObserver:针对WAL的观察者,可以监听关于WAL的操作 -- MasterObserver:针对Master的观察者,可以监听关于Master的操作 二、EndPoint: 相当于关系型数据库支持的存储过程,提供的就是一个接口API。

7.2 两种类型的区别

- Observer 允许集群在正常的客户端操作过程中可以有不同的行为表现 - Observer 类似于 RDBMS 中的触发器,主要在服务端工作 - Observer 可以实现权限管理、优先级设置、监控、ddl 控制、二级索引等功能(重点) - Endpoint 允许扩展集群的能力,对客户端应用开放新的运算命令 - Endpoint 类似于 RDBMS 中的存储过程,主要在服务端工作 - Endpoint 可以实现 min、max、avg、sum、distinct、group by 等功能(重点)

7.3 协处理器的应用

需求:帮助"关注表"创建一个二级索引表"粉丝表"

表名:guanzhu rowkey cell cell liushuai-canglaoshi f1:from liushuai f1:to canglaoshi liushuai-bolaoshi f1:from liushuai f1:to bolaoshi ............... jiashuai-longzelaoshi f1:from jiashuai f1:to longzelaoshi ...... 二级索引表:fensi rowkey canglaoshi-liushuai f1:from liushuai f1:to canglaoshi canglaoshi-jiashuai f1:from jiashuai f1:to canglaoshi .......................... 需要提前创建好两张表 hbase(main):027:0> create 'guanzhu','f1' hbase(main):028:0> create 'fensi','f1'

步骤1:

1. 自定义类型,继承BaseRegionObserver 重写prePut方法,拦截put对象 2. 打包程序,上传到HDFS上 3. 将此协处理器挂载到关注表上 alter 'guanzhu',METHOD =>'table_att','coprocessor'=>'hdfs://qianfeng01:8020/jar/mycoprocessor.jar|com.qf.hbase.coprocessor.FensiObserver|1001|' 4. 进行测试 注意:在加载协处理器的时候,如果报 set hbase.table.sanity.checks to false等提示,需要在hbase-site.xml里配置以下属性,并分发到其他机器上,然后重启hbase服务 <property> <name>hbase.table.sanity.checks</name> <value>false</value> </property>

加载方式的介绍

1) 静态加载 通过修改 hbase-site.xml 这个文件来实现,启动全局 aggregation,能过操纵所有的表上 的数据。只需要添加如下代码: <property> <name>hbase.coprocessor.user.region.classes</name> <value>类全名</value> </property> 可以用”,”分割加载多个 class 2) 动态加载 只对特定的表生效。通过 HBase Shell 来实现。 1. 停用表  disable 'mytable' 2. 添加协处理器  alter 't_guanzhu',METHOD => 'table_att','coprocessor'=>'hdfs://supercluster/jar/mycoprocessor.jar|com.qf.hbase.coprocessor.MyIndexCoprocessor|1001|' 3. 启用表  enable 'mytable'

longzelaoshi …

二级索引表:fensi rowkey canglaoshi-liushuai f1:from liushuai f1:to canglaoshi canglaoshi-jiashuai f1:from jiashuai f1:to canglaoshi …

需要提前创建好两张表 hbase(main):027:0> create ‘guanzhu’,‘f1’ hbase(main):028:0> create ‘fensi’,‘f1’

步骤1: 自定义类型,继承BaseRegionObserver 重写prePut方法,拦截put对象打包程序,上传到HDFS上将此协处理器挂载到关注表上 alter ‘guanzhu’,METHOD =>‘table_att’,‘coprocessor’=>‘hdfs://qianfeng01:8020/jar/mycoprocessor.jar|com.qf.hbase.coprocessor.FensiObserver|1001|’进行测试

注意:在加载协处理器的时候,如果报 set hbase.table.sanity.checks to false等提示,需要在hbase-site.xml里配置以下属性,并分发到其他机器上,然后重启hbase服务 hbase.table.sanity.checks false

加载方式的介绍 静态加载 通过修改 hbase-site.xml 这个文件来实现,启动全局 aggregation,能过操纵所有的表上 的数据。只需要添加如下代码: hbase.coprocessor.user.region.classes 类全名

可以用”,”分割加载多个 class

动态加载 只对特定的表生效。通过 HBase Shell 来实现。 停用表  disable ‘mytable’添加协处理器  alter ‘t_guanzhu’,METHOD => ‘table_att’,‘coprocessor’=>‘hdfs://supercluster/jar/mycoprocessor.jar|com.qf.hbase.coprocessor.MyIndexCoprocessor|1001|’启用表  enable ‘mytable’ ## 八、优化参数的讲解
最新回复(0)