ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

【填坑之旅-hadoop-09】2.10.1 jdk1.8 Storm1.2.3 流式计算 nimbus ui supervisor topo spouts bolts tuple tas

2021-11-04 15:59:45  阅读:188  来源: 互联网

标签:jdk1.8 supervisor bolts tuple storm org apache import topology


storm 相关概念

介绍

在这里插入图片描述
在这里插入图片描述

Apache Storm 与任何排队系统和任何数据库系统集成。Apache Storm 的spout抽象使得集成新的排队系统变得容易。示例队列集成包括:
Kestrel
RabbitMQ / AMQP
Kafka
JMS
Amazon Kinesis
同样,将 Apache Storm 与数据库系统集成也很容易。只需像往常一样打开与数据库的连接并进行读/写。Apache Storm 将在必要时处理并行化、分区和故障重试。

基本概念

在这里插入图片描述

框架结构

在这里插入图片描述
spout tuple bolt stream 分发

在这里插入图片描述

###体系架构
在这里插入图片描述

kafka 和storm整合

在这里插入图片描述

storm中的 topologies

在这里插入图片描述

slots 与numworks

这个worker数受限于slot数量,一个worker消耗一个slot,当slot全部分配完,就不能再加载新的topology。slot数量是在 supervisor.slots.ports 中设置,每个端口提供一个slot,这样supervisor数量乘以ports数量,就是storm集群可以使用的worker数量。

Storm的并行度详解

Storm的并行度是非常重要的,通过提高并行度可以提高storm程序的计算能力。

那strom是如何提高并行度的呢?

Strom程序的执行是由多个supervisor共同执行的。supervisor运行的是topology中的spout/bolt task

task 是storm中进行计算的最小的运行单位,表示是spout或者bolt的运行实例。

程序执行的最大粒度的运行单位是进程,刚才说的task也是需要有进程来运行它的,在supervisor中,运行task的进程称为worker,

Supervisor节点上可以运行非常多的worker进程,一般在一个进程中是可以启动多个线程的,所以我们可以在worker中运行多个线程,这些线程称为executor,在executor中运行task。

这样的话就可以提高strom的计算能力。

总结一下:worker>executor>task

要想提高storm的并行度可以从三个方面来改造

worker(进程)>executor(线程)>task(实例)

增加work进程,增加executor线程,增加task实例

storm并行度配置详解(workers、executors、tasks的区别)

Workers (JVMs): 在一个节点上可以运行一个或多个独立的JVM 进程。一个Topology可以包含一个或多个worker(并行的跑在不同的machine上), 所以worker process就是执行一个topology的子集, 并且worker只能对应于一个topology;worker processes的数目, 可以通过配置文件和代码中配置, worker就是执行进程, 所以考虑并发的效果, 数目至少应该大亍machines的数目。

Executors (threads): 在一个worker JVM进程中运行着多个Java线程。一个executor线程可以执行一个或多个tasks.但一般默认每个executor只执行一个task。一个worker可用包含一个或多个executor, 每个component (spout或bolt)至少对应于一个executor, 所以可以说executor执行一个compenent的子集, 同时一个executor只能对应于一个component;executor的数目, component的并发线程数只能在代码中配置(通过setBolt和
setSpout的参数)。

Tasks(bolt/spout instances):Task就是具体的处理逻辑对象,每一个Spout和Bolt会被当作很多task在整个集群里面执行。每一个task对应到一个线程,而stream grouping则是定义怎么从一堆task发射tuple到另外一堆task。你可以调用TopologyBuilder.setSpout和TopBuilder.setBolt来设置并行度 — 也就是有多少个task,tasks的数目, 可以不配置, 默认和executor1:1, 也可以通过setNumTasks()配置。

hadoop业务的整体开发流程

在这里插入图片描述

Netty bio nio selector 阻塞/非阻塞 IO的通信方式

Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。
Netty 是一个广泛使用的 Java 网络编程框架(Netty 在 2011 年获得了Duke’s Choice Award,见https://www.java.net/dukeschoice/2011)。它活跃和成长于用户社区,像大型公司 Facebook 和 Instagram 以及流行 开源项目如 Infinispan, HornetQ, Vert.x, Apache Cassandra 和 Elasticsearch 等,都利用其强大的对于网络抽象的核心代码。在这里插入图片描述
在这里插入图片描述

Netty为什么并发高
Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,对比于BIO(Blocking I/O,阻塞IO),他的并发性能得到了很大提高,两张图让你了解BIO和NIO的区别:

stream groupings shuffle grouping 随机分组

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

hadoop与storm 对比

在这里插入图片描述
在这里插入图片描述

supervisor 上面运行的 worker进程

在这里插入图片描述

storm 安装笔记

1、安装一个zookeeper集群

2、上传storm的安装包,解压

3、修改配置文件storm.yaml

#所使用的zookeeper集群主机
storm.zookeeper.servers:
- “weekend05”
- “weekend06”
- “weekend07”

#nimbus所在的主机名
nimbus.host: “weekend05”

supervisor.slots.ports
-6701
-6702
-6703
-6704
-6705

启动storm
在nimbus主机上
nohup ./storm nimbus 1>/dev/null 2>&1 &
nohup ./storm ui 1>/dev/null 2>&1 &

在supervisor主机上
nohup ./storm supervisor 1>/dev/null 2>&1 &

storm的深入学习:
分布式共享锁的实现
事务topology的实现机制及开发模式
在具体场景中的跟其他框架的整合(flume/activeMQ/kafka(分布式的消息队列系统) /redis/hbase/mysql cluster)

zookeeper 伪分布式集群

zookeeper 伪分布式 集群
dataLogDir=/tmp/zookeeper/logs
clientPort=2181
autopurge.purgeInterval=24
admin.serverPort=2821
server.1=zk01:2888:3888
server.2=zk02:2888:3888
server.3=zk03:2888:3888

echo 1 >myid

./zkCli.sh -server master:2181

server.1=master:2881:3881
server.2=master:2882:3882
server.3=master:2883:3883

clientPort=2182
admin.serverPort=2822

clientPort=2183
admin.serverPort=2823

clientPort=2184
admin.serverPort=2824

master:2182,master:2183,master:2184

storm伪分布式集群,配置文件

指定zookeeper集群 地址
配置slots占用端口
在这里插入图片描述

主机1 storm.yaml

 storm.zookeeper.servers:
     - "master"

 storm.zookeeper.port: 2182
 
 nimbus.seeds: ["master"]
 
 supervisor.slots.ports:
 - 6701
 - 6702
 - 6703
 - 6704

 drpc.servers:
 - "master"

主机2 storm.yam

 storm.zookeeper.servers:
     - "master"

 storm.zookeeper.port: 2183
 
 nimbus.seeds: ["master"]
 
 supervisor.slots.ports:
 - 6711
 - 6712
 - 6713
 - 6714

 drpc.servers:
 - "master"

主机3 storm.yam

 storm.zookeeper.servers:
     - "master"

 storm.zookeeper.port: 2184
 
 nimbus.seeds: ["master"]
 
 supervisor.slots.ports:
 - 6721
 - 6722
 - 6723
 - 6724

 drpc.servers:
 - "master"

jps

[hadoop@master bin]$ jps
72547 core
72418 nimbus
111015 QuorumPeerMain
110345 QuorumPeerMain
111097 QuorumPeerMain
72859 Jps
72698 Supervisor
72847 config_value

http://192.168.25.129:8080/index.html

在这里插入图片描述

应用例子 demotopo.jar 在线把输入转换大写,同时添加时间

storm 流式计算
手机实时位置查询,消息推送
在这里插入图片描述

storm jar ~/demotopo.jar cn.itcast.stormdemo.TopoMain
./storm list
./storm kill demotopo

tail -f 4d10b156-1c7d-4a0e-965f-20148a03b20d | more

在这里插入图片描述

demo-RandomWordSpout.class

func nextTuple(){
SpoutOutputCollector.emit(new Values(godName));
}

func declareOutputFields(OutputFieldsDeclarer declarer){
declarer.declare(new Fields(“orignname”));
}

package cn.itcast.stormdemo;

import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class RandomWordSpout extends BaseRichSpout{

	private SpoutOutputCollector collector;
	
	//模拟一些数据
	String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"};
	
	//不断地往下一个组件发送tuple消息
	//这里面是该spout组件的核心逻辑
	@Override
	public void nextTuple() {

		//可以从kafka消息队列中拿到数据,简便起见,我们从words数组中随机挑选一个商品名发送出去
		Random random = new Random();
		int index = random.nextInt(words.length);
		
		//通过随机数拿到一个商品名
		String godName = words[index];
		
		
		//将商品名封装成tuple,发送消息给下一个组件
		collector.emit(new Values(godName));
		
		//每发送一个消息,休眠500ms
		Utils.sleep(500);
		
		
	}

	//初始化方法,在spout组件实例化时调用一次
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

		this.collector = collector;
		
		
	}

	//声明本spout组件发送出去的tuple中的数据的字段名
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {

		declarer.declare(new Fields("orignname"));
		
	}

}

demo-UpperBolt.class

func execute(Tuple tuple, BasicOutputCollector collector){
collector.emit(new Values(godName_upper));
}

func declareOutputFields(OutputFieldsDeclarer declarer){
declarer.declare(new Fields(“uppername”));
}

package cn.itcast.stormdemo;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class UpperBolt extends org.apache.storm.topology.base.BaseBasicBolt{

	
	//业务处理逻辑
	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		
		//先获取到上一个组件传递过来的数据,数据在tuple里面
		String godName = tuple.getString(0);
		
		//将商品名转换成大写
		String godName_upper = godName.toUpperCase();
		
		//将转换完成的商品名发送出去
		collector.emit(new Values(godName_upper));
		
	}

	
	
	//声明该bolt组件要发出去的tuple的字段
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		
		declarer.declare(new Fields("uppername"));
	}

}

demo-SuffixBolt.class

func prepare(){
new FileWriter()
}
func execute(Tuple tuple, BasicOutputCollector collector){
fileWriter.write(suffix_name);
fileWriter.write("\n");
fileWriter.flush();
}

package cn.itcast.stormdemo;

import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class SuffixBolt extends BaseBasicBolt{
	
	FileWriter fileWriter = null;
	
	
	//在bolt组件运行过程中只会被调用一次
	@Override
	public void prepare(Map stormConf, TopologyContext context) {

		try {
			fileWriter = new FileWriter("/home/hadoop/stormoutput/"+UUID.randomUUID());
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
		
	}
	
	
	
	//该bolt组件的核心处理逻辑
	//每收到一个tuple消息,就会被调用一次
	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {

		//先拿到上一个组件发送过来的商品名称
		String upper_name = tuple.getString(0);
		String suffix_name = upper_name + "_itisok";
		
		
		//为上一个组件发送过来的商品名称添加后缀
		
		try {
			fileWriter.write(suffix_name);
			fileWriter.write("\n");
			fileWriter.flush();
			
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
		
		
		
	}

	
	
	
	//本bolt已经不需要发送tuple消息到下一个组件,所以不需要再声明tuple的字段
	@Override
	public void declareOutputFields(OutputFieldsDeclarer arg0) {

		
	}

}

demo-TopoMain.class

package cn.itcast.stormdemo;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;

/**
 * 组织各个处理组件形成一个完整的处理流程,就是所谓的topology(类似于mapreduce程序中的job)
 * 并且将该topology提交给storm集群去运行,topology提交到集群后就将永无休止地运行,除非人为或者异常退出
 * @author duanhaitao@itcast.cn
 *
 */
public class TopoMain {

	
	public static void main(String[] args) throws Exception {
		
		TopologyBuilder builder = new TopologyBuilder();
		
		//将我们的spout组件设置到topology中去 
		//parallelism_hint :4  表示用4个excutor来执行这个组件
		//setNumTasks(8) 设置的是该组件执行时的并发task数量,也就意味着1个excutor会运行2个task
		builder.setSpout("randomspout", new RandomWordSpout(), 4).setNumTasks(8);
		
		//将大写转换bolt组件设置到topology,并且指定它接收randomspout组件的消息
		//.shuffleGrouping("randomspout")包含两层含义:
		//1、upperbolt组件接收的tuple消息一定来自于randomspout组件
		//2、randomspout组件和upperbolt组件的大量并发task实例之间收发消息时采用的分组策略是随机分组shuffleGrouping
		builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");
		
		//将添加后缀的bolt组件设置到topology,并且指定它接收upperbolt组件的消息
		builder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");
		
		//用builder来创建一个topology
		StormTopology demotop = builder.createTopology();
		
		
		//配置一些topology在集群中运行时的参数
		Config conf = new Config();
		//这里设置的是整个demotop所占用的槽位数,也就是worker的数量
		conf.setNumWorkers(4);
		conf.setDebug(true);
		conf.setNumAckers(0);
		
		
		//将这个topology提交给storm集群运行
		StormSubmitter.submitTopology("demotopo", conf, demotop);
		
	}
}

标签:jdk1.8,supervisor,bolts,tuple,storm,org,apache,import,topology
来源: https://blog.csdn.net/alwarse/article/details/121139490

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有