Running a MapReduce Program in Cluster Mode: a Word Count Example


Running in local mode: https://blog.csdn.net/weixin_43614067/article/details/108386389. Submitting from the local machine to the cluster: https://blog.csdn.net/weixin_43614067/article/details/108401227. The map and reduce code is identical to the local-mode version.
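The Runner below references WCMapper and WCReducer from the local-mode article. For completeness, here is a minimal sketch of what those two classes conventionally look like; the field names and whitespace tokenization are assumptions, not taken from the original article.

// WCMapper.java -- emits (word, 1) for every whitespace-separated token
package com.bjsxt.wc;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String token : value.toString().split("\\s+")) {
            if (!token.isEmpty()) {
                word.set(token);
                context.write(word, ONE);
            }
        }
    }
}

// WCReducer.java (separate file) -- sums the 1s emitted for each word
package com.bjsxt.wc;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}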

Setting up a highly available YARN cluster

1. Configure hadoop-2.6.5/etc/hadoop/mapred-site.xml

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>

2. Configure hadoop-2.6.5/etc/hadoop/yarn-site.xml

<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <!-- Enable ResourceManager HA -->
    <property>
        <name>yarn.resourcemanager.ha.enabled</name>
        <value>true</value>
    </property>
    <!-- Declare the two ResourceManagers -->
    <property>
        <name>yarn.resourcemanager.cluster-id</name>
        <value>cluster-yarn1</value>
    </property>
    <property>
        <name>yarn.resourcemanager.ha.rm-ids</name>
        <value>rm1,rm2</value>
    </property>
    <property>
        <name>yarn.resourcemanager.hostname.rm1</name>
        <value>node001</value>
    </property>
    <property>
        <name>yarn.resourcemanager.hostname.rm2</name>
        <value>node002</value>
    </property>
    <!-- Address of the ZooKeeper ensemble -->
    <property>
        <name>yarn.resourcemanager.zk-address</name>
        <value>node002:2181,node003:2181,node004:2181</value>
    </property>
    <!-- Enable automatic recovery -->
    <property>
        <name>yarn.resourcemanager.recovery.enabled</name>
        <value>true</value>
    </property>
    <!-- Store ResourceManager state in the ZooKeeper ensemble -->
    <property>
        <name>yarn.resourcemanager.store.class</name>
        <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
    </property>
</configuration>

3. Sync this configuration to the other machines in the cluster.

4. Start YARN. This only starts the daemons driven from the current node; the second ResourceManager must be started manually on its own machine (see the command below).

sbin/start-yarn.sh
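In Hadoop 2.x, start-yarn.sh starts the ResourceManager only on the node where it is executed (plus the NodeManagers listed in the slaves file), so on the other ResourceManager node the daemon is typically started by hand, for example:

sbin/yarn-daemon.sh start resourcemanager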

5. Check that startup succeeded.

(1) Browse to node001:8088; if that ResourceManager is the standby, the web UI redirects automatically to the active one at node002:8088.

(2) Check with commands:

bin/yarn rmadmin -getServiceState rm1
active
bin/yarn rmadmin -getServiceState rm2
standby

Modify the Runner

package com.bjsxt.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WCRunner {
    public static void main(String[] args) throws Exception {
        // Create the configuration object
        Configuration conf = new Configuration();
        // Create the Job object
        Job job = Job.getInstance(conf, "wordCount");
        // Set the Mapper class
        job.setMapperClass(WCMapper.class);
        // Set the Reducer class
        job.setReducerClass(WCReducer.class);
        // Set the class used to locate the job jar
        job.setJarByClass(WCRunner.class);
        // Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Set the reduce output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        long startTime = System.currentTimeMillis();
        try {
            // Submit the job and wait for it to finish
            boolean b = job.waitForCompletion(true);
            if (b) {
                System.out.println("Word count finished!");
            }
        } finally {
            long endTime = System.currentTimeMillis();
            System.out.println("Job<" + job.getJobName() + "> succeeded: " + job.isSuccessful()
                    + "; start: " + startTime + "; end: " + endTime
                    + "; elapsed: " + (endTime - startTime) + "ms");
        }
    }
}
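As a side note, a common Hadoop idiom (not used in the original article) is to implement Tool and submit through ToolRunner, so that generic options such as -D key=value are parsed automatically before your code sees the arguments. A minimal sketch under that assumption:

package com.bjsxt.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical ToolRunner-based variant of WCRunner
public class WCTool extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "wordCount");
        job.setJarByClass(WCTool.class);
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner strips generic options (-D, -files, ...) before calling run()
        System.exit(ToolRunner.run(new Configuration(), new WCTool(), args));
    }
}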

After compiling and packaging, upload the jar to any directory on the server, e.g. /root/project/wordcount.jar. In HDFS, create the input directory and put the text file whose words are to be counted, e.g. test.txt, into it:

hdfs dfs -mkdir -p /wordcount/input
hdfs dfs -put test.txt /wordcount/input

Note: the output directory does not need to be created; MapReduce creates it itself, and the job will in fact fail if the path already exists.
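Because of this, reruns typically delete the output path first, either with hdfs dfs -rm -r /wordcount/output or programmatically in the Runner. A sketch of the latter, as a hypothetical addition to WCRunner before the job is submitted (not in the original code):

// Hypothetical addition to WCRunner, after conf is created;
// requires import org.apache.hadoop.fs.FileSystem;
FileSystem fs = FileSystem.get(conf);
Path out = new Path(args[1]);
if (fs.exists(out)) {
    fs.delete(out, true);  // true = delete recursively
}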

Run the wordcount jar

hadoop jar /root/project/wordcount.jar com/bjsxt/wc/WCRunner /wordcount/input /wordcount/output

wordcount.jar: the jar produced by compiling and packaging the project
com/bjsxt/wc/WCRunner: the fully qualified name of the Runner class (hadoop jar also accepts the dotted form com.bjsxt.wc.WCRunner)
/wordcount/input: the input path in HDFS
/wordcount/output: the output path in HDFS

Verify after the job completes

hdfs dfs -cat /wordcount/output/*
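Each reducer writes a part-r-NNNNN file of tab-separated word/count pairs, so for a hypothetical input the output would look something like the following (illustrative values only):

hello	3
mapreduce	1
world	2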

