技术博客


  • 首页

  • 关于

  • 标签

  • 分类

  • 归档

  • 搜索

Apache flume

发表于 2018-01-17 | 分类于 大数据 | | 阅读次数:
字数统计: 575 | 阅读时长 ≈ 2

Apache flume

flume是分布式的日志收集系统,它将各个服务器中的数据收集起来并送到指定的地方去,比如说送到图中的HDFS,简单来说flume就是收集日志的。 ## 同类产品对比 Flume使用基于事务的数据传递方式来保证事件传递的可靠性。而logstash内部是没有persist queue,所以在异常情况下,是可能出现数据丢失的问题的。

具体可参考下面的文档。 ## event event将传输的数据进行封装,是flume传输数据的基本单位,如果是文本文件,通常是一行记录,event也是事务的基本单位。flume的核心是把数据从数据源(source)收集过来,在将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,flume在删除自己缓存的数据。

flume架构介绍

  • agent:本身是一个java进程,运行在日志收集节点—所谓日志收集节点就是服务器节点。 agent里面包含3个核心的组件:source—->channel—–>sink,类似生产者、仓库、消费者的架构。
  • source:source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据。
  • channel:source组件把数据收集来以后,临时存放在channel中。用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。
  • sink:sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定义。

于flume可以支持多级flume的agent,即flume可以前后相继,例如sink可以将数据写到下一个agent的source中,这样的话就可以连成串了,可以整体处理了。flume还支持扇入(fan-in)、扇出(fan-out)。所谓扇入就是source可以接受多个输入,所谓扇出就是sink可以将数据输出多个目的地destination中。

参考

  • Flume架构以及应用介绍
  • Flume日志采集系统——初体验(Logstash对比版)
  • Logstash,flume,sqoop比较

Siddhi

发表于 2018-01-17 | 分类于 大数据 | | 阅读次数:
字数统计: 483 | 阅读时长 ≈ 1

Siddhi

Siddhi是一个复杂事件流程引擎CEP(Complex Event Processing)。使用类SQL的语言描述事件流任务,可以很好的支撑开发一个可扩展的,可配置的流式任务执行引擎。性能管理系统之中,告警模块采用storm作为告警生成组件。传统设计之中,为了支持不同的告警规则类型,我们需要编写不同的业务逻辑代码,但是使用了Siddhi之后,我们只需要配置不同的流任务Siddhiql,即可以支持不同的告警业务。

Siddhi 能做什么?

  • 简单 ETL:使用类SQL
  • 基于 window 聚合:基于时间窗口
  • 多个流 Join
  • Pattern Query:Pattern allows event streams to be correlated over time and detect event patterns based on the order of event arrival.比如:在一天内,出现一次取现金额 < 100之后,同一张卡,再次出现取现金额 > 10000,则认为是诈骗。
  • Sequence Query:和 pattern 的区别是,pattern 的多个 event 之间可以是不连续的,但 sequence 的 events 之间必须是连续的。我们可以看个例子,用sequence 来发现股票价格的 peak:
1
2
3
4
5
from every e1=FilteredStockStream[price>20], 
e2=FilteredStockStream[((e2[last].price is null) and price>=e1.price) or ((not (e2[last].price is null)) and price>=e2[last].price)],
e3=FilteredStockStream[price<e2[last].price]
select e1.price as priceInitial, e2[last].price as pricePeak, e3.price as priceAfterPeak
insert into PeakStream ;

上面的查询的意思, e1,收到一条 event.price>20。 e2,后续收到的所有 events 的 price,都大于前一条 event。 e3,最终收到一条 event 的 price,小于前一条 event。 ok,我们发现了一个peak。

集成到 JStorm

我将 Siddhi core 封装成一个 Siddhi Bolt,这样可以在 JStorm 的 topology 中很灵活的,选择是否什么方案,可以部分统计用 brain,部分用 Siddhi,非常简单。

参考:

  • Siddhi初探(内含一个实例)
  • CEP简介
  • 让Storm插上CEP的翅膀 - Siddhi调研和集成

kafka

发表于 2018-01-17 | 分类于 大数据 | | 阅读次数:
字数统计: 347 | 阅读时长 ≈ 1

kafka

是一个分布式消息系统,主要用作数据管道和消息系统。

kafka的架构

如下图,由生产者向kafka集群生产消息,消费者从kafka集群订阅消息。

其中,kafka集群中的消息是按照主题(或者说Topic)来进行组成的。

  • 主题(Topic):一个主题类似新闻中的体育、娱乐、教育等分类概念,在实际工程中通常一个业务一个主题。
  • 分区(Partition):一个Topic中的消息数据按照多个分区组织,分区是kafka消息队列组织的最小单位,一个分区可以看做是一个FIFO(先进先出)队列;kafka分区是提高kafka性能的关键手段。

这张图在整体上对kafka集群进行了概要,途中kafka集群是由三台机器(Broker)组成,当然,实际情况可能更多。相应的有3个分区,Partition-0~Partition-2,图中能看到每个分区的数据备份了2份。kafka集群从前端应用程序(producer)生产消息,后端通过各种异构的消费者来订阅消息。kafka集群和各种异构的生产者、消费者都使用zookeeper集群来进行分布式协调管理和分布式状态管理、分布式锁服务的。

参考

  • Kafka入门学习(一)
  • Kafka topic常见命令解析

Apache Eagle

发表于 2018-01-17 | 分类于 大数据 | | 阅读次数:
字数统计: 3.3k | 阅读时长 ≈ 12

Apache Eagle

Apache Eagle是一个识别大数据平台上的安全和性能问题的开源解决方案。它主要用来即时监测敏感数据访问和恶意活动,并及时采取行动。除了数据活动管理,Eagle也可以用于节点异常检测,集群和作业性能分析。

功能介绍

主要的应用场景包括:监控Hadoop中的数据访问流量;检测非法入侵和违反安全规则的行为;检测并防止敏感数据丢失和访问;实现基于策略的实时检测和预警;实现基于用户行为模式的异常数据行为检测。

  • 检测和报警:Apache Eagle依赖于Apache Storm来进行数据活动和操作日志的流处理,并且可以执行基于策略的检测和报警。它提供多个API:作为基于Storm API上的一层抽象的流式处理API和policy engine provider API的抽象,它将WSO2的开源Siddhi CEP engine作为第一类对象。Siddhi CEP engine支持报警规则的热部署,并且警报可以使用属性过滤和基于窗口的规则(例如,在10分钟内三次以上的访问)来定义。
  • 集群和作业性能分析:通过处理YARN应用日志和对YARN中所有运行的作业进行快照分析来完成的。Eagle可以检测单个作业趋势、数据偏斜问题、故障原因和考虑所有运行的作业情况下评估集群的整体性能。
  • 基于机器学习的policy provider:Apache Eagle中还包括一个基于机器学习的policy provider。它从过去的用户行为中学习,来将数据访问分类为异常或者正常。这个机器学习policy provider评估在Apache Spark框架中离线训练的模型。

Eagle具有如下特点:

  • 高实时: 尽可能地确保能在亚秒级别时间内产生告警,一旦综合多种因素确订为危险操作,立即采取措施阻止非法行为。
  • 可伸缩
  • 简单易用
  • 用户Profile:Eagle 内置提供基于机器学习算法对Hadoop中用户行为习惯建立用户Profile的功能。我们提供多种默认的机器学习算法供你选择用于针对不同HDFS特征集进行建模,通过历史行为模型,Eagle可以实时地检测异常用户行为并产生预警。

基本概念

  • Site:A site can be considered as a physical data center. Big data platform e.g. Hadoop may be deployed to multiple data centers in an enterprise.
  • Application:An "Application" or "App" is composed of data integration, policies and insights for one data source.
  • Policy:A "Policy" defines the rule to alert. Policy can be simply a filter expression or a complex window based aggregation rules etc.
  • Alerts:An "Alert" is an real-time event detected with certain alert policy or correlation logic, with different severity levels like INFO/WARNING/DANGER.
  • Data Source:A "Data Source" is a monitoring target data. Eagle supports many data sources HDFS audit logs, Hive2 query, MapReduce job etc.
  • Stream:A "Stream" is the streaming data from a data source. Each data source has its own stream.

架构

从这个架构图,可以看到eagle会提供一些内置的应用,通过配置不同的policy,它们能够做很多eagle里面非常核心的事情。

比如一个大的集群里,我们可能希望能够探测一些恶意的操作以及误操作,类似于Hive、MR、Spark的Job,如果一个不相关的人,操作了其他应用独享的数据,这种情况应该及时的通知应用的负责人。security问题在多租户下面其实非常重要,像我们现在一个hadoop集群,不仅仅会提供一些离线的任务,其实这部分任务反而占比很少,大部分都是在线的Yarn任务以及HDFS服务,不同的应用会使用hadoop进行任务分发、状态存储、checkpoint等,但这些任务在文件系统上的隔离是不保证的。在生产环境里,如果某个在线任务的状态文件被其他应用误修改了,整个snapshot都会不可用,一般这样的情况会使当前批次整个回滚,甚至在极端情况下出现错误数据,影响应用的执行语义。

在alertEngine中,你可以添加一个policy,检查JobHistory或者logStash,如果一个Job触发了policy就会通知相关的责任人采取措施。对大集群的监控和报警实际是非常麻烦和耗时的,eagle很好的解决了这个问题。

policy的设置非常灵活,甚至可以用机器学习的方式,离线去train一个模型,然后再把这个模型实时的更新上去。具体的做法是在storm这一层去做模型匹配,比如一个用户通常的几个特征量,read、delete、changename等,会去在线查看实际值与正常值的偏差,超过一定程度就会被eagle认为是异常的。或者一个未授权的用户,做了一件从来没有做过的事情,也可以认为是异常的。

通过内置的应用我们还可以进行作业的异常分析:一个Job有多个stage,上一级stage处理完之后,通过group的方式向下一级节点发送数据,假设hash的对象是userid,那么很可能出现的情况是,下一级一共有十个节点,但是partition之后的数据95%都被分配到了同一个节点上。这个问题通常是很难解决的,但有了eagle,我们就可以及时的发现这种情况,并优化提高Job的性能。

异常分析的另一个好处就是节点分析。稍微大一点规模的Job可能就需要几十上百个节点来执行,通常过程中如果挂掉一个节点,触发FailOver机制重新调度是可以解决的。但如果某个节点上的job经常失败,那么这个节点就应该被识别出来进行处理,避免大量job的重新调度开销。

内置应用

上面这些功能需要底层架构给予支持。eagle底层的流计算引擎是基于storm的,我们知道在传统领域,比如OLAP、数据库这些,通常都会有一门DSL来方便大家使用。近来在流计算领域,一个比较火爆的概念就是我们能不能也创造一门DSL,类似于SQL,但数据却是流式的。比如Spark的StructureStreaming,Flink的TableApi都是因为这个而产生。在eagle上为了支持更方便的执行policy和动态更新,也是需要一个DSL。那么它的做法比较讨巧,相当于利用了Siddhi这个现成的CEP,集成到storm的框架里,利用Siddhi本身的SQL支持来实现storm的DSL。那么用户无论是自定义policy还是自己编写应用,都可以像下面这样写:

  • 定义流定义和查询,并将结果输出到另外一个流里面。
1
2
3
4
define stream TempStream (deviceID long, roomNo int, temp double);
from TempStream
select roomNo, temp * 9/5 + 32 as temp, F as scale, roomNo >= 100 and roomNo < 110 as isServerRoom
insert into RoomTempStream;
  • 多流Join和TimeWindow。
1
2
3
4
5
from TempStream[temp > 30.0]#window.time(1 min) as T 
join RegulatorStream[isOn == false]#window.length(1) as R
on T.roomNo == R.roomNo
select T.roomNo, T.temp, R.deviceID, start as action
insert into RegulatorActionStream ;
  • Pattern Query:这个比较能体现CEP的优势,在下面的查询中,->标示的是事件顺序,也就是说,这个语义实际上表达了同一张卡在一天内,出现一次取现金额 < 100后再次出现取现金额 > 10000的情况,并将其判断为fraud。这是传统SQL所不具备的,也可以说是专为流式计算而设计。
1
2
3
4
5
from every a1 = atmStatsStream[amountWithdrawed < 100]
-> b1 = atmStatsStream[amountWithdrawed > 10000 and a1.cardNo == b1.cardNo]
within 1 day
select a1.cardNo as cardNo, a1.cardHolderName as cardHolderName, b1.amountWithdrawed as amountWithdrawed, b1.location as location, b1.cardHolderMobile as cardHolderMobile
insert into possibleFraudStream;

元数据设计

在集群中可能有成百上千个节点,每个节点上GB甚至上TB的的日志文件,如果出现一个异常的访问点,我们希望能在毫秒级别上对其进行预警或者是拦截。然后我们知道storm有一个很大的缺陷是它本身逻辑定义完就固定了。按照以往,这种分布式stream逻辑定义完,想再修改系统,必须要把topology重启,生产环境下肯定不希望这样,牺牲实时性的代价太大了,所以eagle整个的结构是元数据驱动(Metadata Driven)。

-w679
-w679

从上图我们可以看出eagle的输入和输出其实是非常明确的,那么在元数据的定义上,因为下面的存储是基于Hbase,所以eagle做的非常的灵活。一般对于同一个类型的采集日志(例如某个metric),在RowKey上会采用一个固定的前缀,后面加上时间序列,这样在设计上就保证了分布性和同一个metric在数据上的连续性。

在生产场景下,可能一开始训练的Policy模型只有几个G的样本数据,但这个数据的增长是非常快的。那么我们不可能在一个月之后,还不去更换它。在eagle中这样的更新是很简单的。由于eagle的元数据驱动特性,engine会去监听元数据的变化。一旦metadata触发了alertEngine注册的listener,内部是可以通过ClassLoader动态部署的,比如动态的去更新storm里面的spout和bolt,这样整个更新过程可以在毫秒的级别就做完,相对来说,提高了几个数量级,并且这个过程是不会丢失数据的。

编写自定义应用

下面简单介绍一下如何编写一个自己的扩展应用:

  • 首先需要提供应用的Provider
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class ExampleApplicationProvider extends AbstractApplicationProvider {
private static final Logger LOG = LoggerFactory.getLogger(ExampleApplicationProvider.class);

@Override
public ExampleStormApplication getApplication() {
return new ExampleStormApplication();
}

@Override
public Optional getApplicationListener() {
return Optional.of(new ApplicationListener() {

@Inject ExampleEntityService entityService;
private ApplicationEntity application;

@Override
public void init(ApplicationEntity applicationEntity) {
this.application = applicationEntity;
entityService.getEntities();
}

@Override
public void afterInstall() {
LOG.info("afterInstall {}", this.application);
}

@Override
public void afterUninstall() {
LOG.info("afterUninstall {}", this.application);
}

@Override
public void beforeStart() {
LOG.info("beforeStart {}", this.application);
}

@Override
public void afterStop() {
LOG.info("afterStop {}", this.application);
}
});
}

@Override
protected void onRegister() {
bindToMemoryMetaStore(ExampleEntityService.class,ExampleEntityServiceMemoryImpl.class);
bind(ExampleCommonService.class,ExampleCommonServiceImpl.class);
}

这里需要注意的是,应用本身的Meta是需要指定存储方式的,这个例子里面我们简单指定为Memory的方式。当然,在生产环境一般可以换成Hbase。

  • 然后提供应用本身的逻辑
1
2
3
4
5
6
7
8
9
10
11
12
public class ExampleStormApplication extends StormApplication {
@Override
public StormTopology execute(Config config, StormEnvironment environment) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("metric_spout", environment.getStreamSource("SAMPLE_STREAM", config)
, config.getInt("spoutNum"));
builder.setBolt("sink_1", environment.getStreamSink("SAMPLE_STREAM_1", config)).fieldsGrouping("metric_spout",
new Fields("metric"));
builder.setBolt("sink_2", environment.getStreamSink("SAMPLE_STREAM_2", config)).fieldsGrouping("metric_spout",
new Fields("metric"));
return builder.createTopology();
}
  • 最后通过配置指定执行环境等参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
"application": {
"sink": {
"type": "org.apache.eagle.app.messaging.KafkaStreamSink",
"config": {
"kafkaBrokerHost": "",
"kafkaZkConnection": ""
}
},
"storm": {
"nimbusHost": "localhost"
"nimbusThriftPort": 6627
}
},

"appId": "unit_test_example_app"
"spoutNum": 3
"loaded": true
"mode": "LOCAL"
"dataSinkConfig": {
"topic": "test_topic",
"brokerList": "sandbox.hortonworks.com:6667",
"serializerClass": "kafka.serializer.StringEncoder",
"keySerializerClass": "kafka.serializer.StringEncoder"
}

这里要配置好source和sink,比如kafka的topic、broker。限于篇幅,这里略去了coordinator自身的配置。完成上面的代码和配置,也就完成了一个自定义的应用编写。

数据集成

数据集成使用Apache Kafka通过logstash forwarder 代理或通过log4j kafka appender来实现的。来自多个Hadoop守护进程(例如,namenode,datanode等)的日志条目被反馈到Kafka并由Storm处理。Eagle支持将数据资产分类为多个灵敏度类型。

数据持久化

Eagle支持使用Apache HBase和关系数据库持久化警报。警报可通过电子邮件、Kafka或存储在Eagle支持的存储中进行通知。你也可以开发自己的警报通知插件。

结语

从前面的介绍我们可以看出,整个eagle其实是一套整体的解决方案,这个方案更多的是在应用的层面上进行了许多创新性的使用和整合。但eagle的实时性、可扩展性不仅仅值得在hadoop集群中使用,里面的很多思想其实也是值得给想要搭建流式计算平台的同学进行参考和学习的。而对于底层框架的开发同学,其实eagle在算子层、API层、状态存储层做的许多事情正是很多应用开发者需要自己去做的事情,能不能给开发应用更多的支持,让开发更顺畅更快速,也是值得去思考一下的。

参考

  • Apache Eagle毕业成为顶级项目
  • Apache Eagle 陈浩——Apache+Eagle:架构演化和新特性
  • Apache Eagle 简介--分布式实时 Hadoop 数据安全方案
  • Apache Eagle——eBay开源分布式实时Hadoop数据安全方案
  • Getting Started Eagle
  • kafka eagle安装与使用
  • Kafka Eagle Reference Manual

storm学习

发表于 2018-01-17 | 分类于 大数据 | | 阅读次数:
字数统计: 3.9k | 阅读时长 ≈ 14

storm学习

原理和实例部分

流式框架对比

Hadoop只能处理适合进行批量计算的需求;Storm用来解决分布式流式计算系统。除此之外,流计算还有spark streaming和flink。对比:

-w389
-w389
  • 如果你想要的是一个允许增量计算的高速事件处理系统,Storm会是最佳选择。
  • 如果你必须有状态的计算,恰好一次的递送,并且不介意高延迟的话,那么可以考虑Spark Streaming,特别如果你还计划图形操作、机器学习或者访问SQL的话,Apache Spark的stack允许你将一些library与数据流相结合(Spark SQL,Mllib,GraphX),它们会提供便捷的一体化编程模型。尤其是数据流算法(例如:K均值流媒体)允许Spark实时决策的促进。
  • Flink支持增量迭代,具有对迭代自动优化的功能,在迭代式数据处理上,比Spark更突出,Flink基于每个事件一行一行地流式处理,真正的流式计算,流式计算跟Storm性能差不多,支持毫秒级计算,而Spark则只能支持秒级计算。

storm 主要概念

Storm采用的是Master-Slave结构,就是使用一个节点来管理整个集群的运行状态。Master节点被称为:Nimbus,Slave节点用来维护每台机器的状态,被称为Supervisor。

  • Nimbus的角色是只负责一些管理性的工作,它并不关心Worker之间的数据是如何传输的。
  • Supervisor的角色是听Nimbus的话,来启动并监控真正进行计算的Worker的进程。
  • Worker:运行在工作节点上面,被Supervisor守护进程创建的用来干活的JVM进程。一个Worker里面不会运行属于不同的topology的执行任务。
  • 拓扑(topology):在Storm中,先要设计一个用于实时计算的图状结构,我们称之为拓扑(topology)。这个拓扑将会被提交给集群,由集群中的主控节点(master node)分发代码,将任务分配给工作节点(worker node)执行。一个拓扑中包括spout和bolt两种角色。运行Topology:把代码以及所依赖的jar打进一个jar包,运行strom jar all-your-code.jar backtype.storm.MyTopology arg1 arg2。这个命令会运行主类:backtype.strom.MyTopology,参数是arg1, arg2。这个类的main函数定义这个topology并且把它提交给Nimbus。storm jar负责连接到nimbus并且上传jar文件。
  • 数据源节点Spout:发送消息,负责将数据流以tuple元组的形式发送出去。
  • 普通计算节点Bolt:负责转换这些数据流,在bolt中可以完成计算、过滤等操作,bolt自身也可以随机将数据发送给其他bolt。
  • 记录Tuples:由spout发射出的tuple是不可变数组,对应着固定的键值对。
  • tuple:storm使用tuple来作为它的数据模型。每个tuple是一堆值,每个值有一个名字,并且每个值可以是任何类型。Tuple本来应该是一个Key-Value的Map,由于各个组件间传递的tuple的字段名称已经事先定义好了,所以Tuple只需要按序填入各个Value,所以就是一个Value List。一个Tuple代表数据流中的一个基本的处理单元,例如一条cookie日志,它可以包含多个Field,每个Field表示一个属性。

  • Stream:一个没有边界的、源源不断的、连续的Tuple序列就组成了Stream。

一个简单的Topology

看一下storm-starter里面的ExclamationTopology:

1
2
3
4
TopologyBuilder builder =new TopologyBuilder();
builder.setSpout(1, new TestWordSpout(),10);
builder.setBolt(2, new ExclamationBolt(),3).shuffleGrouping(1);
builder.setBolt(3, new ExclamationBolt(),2).shuffleGrouping(2);
  • setSpout和setBolt的三个参数:指定的id、包含处理逻辑的对象(spout或者bolt)、并行度(可选)。
  • spout要实现IRichSpout的接口;bolt要实现IRichBolt接口。
  • 并行度表示集群里面需要多少个thread来一起执行这个节点;如果你忽略它,那么storm会分配一个线程来执行这个节点。
  • setBolt方法返回一个InputDeclarer对象,这个对象是用来定义Bolt的输入。
  • shuffleGrouping表示所有的tuple会被随机的分发给bolt的所有task。给task分发tuple的策略有很多种,后面会介绍:

这个Topology包含一个Spout和两个Bolt。这里第一个Bolt声明它要读取spout所发射的所有的tuple — 使用shuffle grouping。而第二个bolt声明它读取第一个bolt所发射的tuple。Spout发射单词,每个bolt在每个单词后面加个”!!!”。这三个节点被排成一条线: spout发射单词给第一个bolt, 第一个bolt然后把处理好的单词发射给第二个bolt。如果spout发射的单词是["bob"]和["john"], 那么第二个bolt会发射["bolt!!!!!!"]和["john!!!!!!"]出来。

如果想第二个bolt读取spout和第一个bolt所发射的所有的tuple, 那么应该这样定义第二个bolt:builder.setBolt(3,new ExclamationBolt(),5).shuffleGrouping(1).shuffleGrouping(2);

TestWordSpout从["nathan", "mike", "jackson", "golda", "bertels"]里面随机选择一个单词发射出来。TestWordSpout里面的nextTuple()方法是这样定义的:

1
2
3
4
5
6
7
public void nextTuple() {
Utils.sleep(100);
final String[] words=new String[]{"nathan","mike","jackson","golda","bertels"};
final Random rand =new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(newValues(word));
}

ExclamationBolt把”!!!”拼接到输入tuple后面。实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static class ExclamationBolt implements IRichBolt {
OutputCollector _collector;

public void prepare(Map conf, TopologyContext context, OutputCollector collector){
_collector = collector;
}

public void execute(Tuple tuple) {
_collector.emit(tuple,new Values(tuple.getString(0) +"!!!"));
_collector.ack(tuple);
}

public void cleanup() {}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(newFields("word"));
}
}
  • prepare方法提供给bolt一个Outputcollector用来发射tuple。Bolt可以在任何时候发射tuple: 在prepare, execute或者cleanup方法里面, 或者甚至在另一个线程里面异步发射。这里prepare方法只是简单地把OutputCollector作为一个类字段保存下来给后面execute方法使用。
  • execute方法从bolt的一个输入接收tuple(一个bolt可能有多个输入源). ExclamationBolt获取tuple的第一个字段,加上”!!!”之后再发射出去。如果一个bolt有多个输入源,可以通过调用Tuple#getSourceComponent方法来知道它是来自哪个输入源的。execute方法里面还有其它一些事情值得一提:输入tuple被作为emit方法的第一个参数,并且输入tuple在最后一行被ack。这些呢都是Storm可靠性API的一部分,后面会解释。
  • cleanup方法在bolt被关闭的时候调用, 它应该清理所有被打开的资源。但是集群不保证这个方法一定会被执行。比如执行task的机器down掉了,那么根本就没有办法来调用那个方法。cleanup设计的时候是被用来在local mode的时候才被调用(也就是说在一个进程里面模拟整个storm集群), 并且你想在关闭一些topology的时候避免资源泄漏。
  • declareOutputFields定义一个叫做”word”的字段的tuple。

storm的运行有两种模式: 本地模式和分布式模式。本地模式主要用于开发测试。

  • 本地模式:运行storm-starter里面的topology的时候,它们就是以本地模式运行的,可以看到topology里面的每一个组件在发射什么消息。
  • 分布式模式:storm由一堆机器组成。当提交topology给master的时候,同时也需要提交topology的代码。master负责分发代码,并且负责给topolgoy分配工作进程。如果一个工作进程挂掉了,master节点会重新分配到其它节点。

下面是以本地模式运行ExclamationTopology的代码:

1
2
3
4
5
6
7
8
Config conf =new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
LocalCluster cluster =new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();

首先, 这个代码定义通过定义一个LocalCluster对象来定义一个进程内的集群。提交topology给这个虚拟的集群和提交topology给分布式集群是一样的。通过调用submitTopology方法来提交topology,三个参数:要运行的topology的名字、配置对象、要运行的topology本身。

topology的名字用来唯一区别一个topology,可以用这个名字来kill这个topology。必须显式的杀掉一个topology, 否则它会一直运行。

Conf对象可以配置很多东西, 下面两个是最常见的:

  • TOPOLOGY_WORKERS(setNumWorkers) 定义集群分配多少个工作进程执行这个topology。topology里面的每个组件都需要线程来执行;每个组件通过setBolt和setSpout来指定需要的线程数;这些线程都运行在工作进程里面。每一个工作进程包含一些节点的一些工作线程。比如,如果你指定300个线程,60个进程, 那么每个工作进程里面要执行6个线程,而这6个线程可能属于不同的组件(Spout, Bolt)。你可以通过调整每个组件的并行度以及这些线程所在的进程数量来调整topology的性能。
  • TOPOLOGY_DEBUG(setDebug), 当它被设置成true的话, storm会记录下每个组件所发射的每条消息。这在本地环境调试topology很有用, 但是在线上这么做的话会影响性能的。

Topology的三个组件

运行中的Topology主要由以下三个组件组成的:Worker processes(进程)、Executors (threads)(线程)、Tasks。

-w400
-w400

举例:

1
2
3
4
5
6
7
8
Config conf =new Config();
conf.setNumWorkers(2);
TopologyBuilder builder =new TopologyBuilder();
builder.setSpout("blue-spout", new BlueSpout(),2);
builder.setBolt("green-bolt", new GreenBolt(),2)
.setNumTasks(4) //设置Task数量
.shuffleGrouping("blue-spout");
builder.setBolt("yellow-bolt",new ExclamationBolt(),6).shuffleGrouping("green-bolt");

对应的Worker processes(进程)、Executors (threads)(线程)、Tasks数量。指定了2个Worker。共2+2+6=10个Executor线程,每个Worker5个(图中未画出来)。绿色指定了Task数量为4,蓝色和黄色没有指定。

-w600
-w600

流分组策略(Stream grouping)

流分组策略告诉topology如何在两个组件之间发送tuple。spouts和bolts以很多task的形式在topology里面同步执行。如果从task的粒度来看一个运行的topology,它应该是这样的:

-w502
-w502

当Bolt A的一个task要发送一个tuple给Bolt B, 它应该发送给Bolt B的哪个task呢?下面是一些常用的 “路由选择” 机制:

  • ShuffleGrouping:随机选择一个Task来发送。
  • FiledGrouping:根据Tuple中Fields来做一致性hash,相同hash值的Tuple被发送到相同的Task。
  • AllGrouping:广播发送,将每一个Tuple发送到所有的Task。
  • GlobalGrouping:所有的Tuple会被发送到某个Bolt中的id最小的那个Task。
  • NoneGrouping:不关心Tuple发送给哪个Task来处理,等价于ShuffleGrouping。
  • DirectGrouping:直接将Tuple发送到指定的Task来处理。

使用其他的语言来定义Bolt

Bolt可以使用任何语言来定义。用其它语言定义的bolt会被当作子进程(subprocess)来执行, storm使用JSON消息通过stdin/stdout来和这些subprocess通信。这个通信协议是一个只有100行的库,storm团队给这些库开发了对应的Ruby, Python和Fancy版本。

可靠的消息处理

Storm允许用户在Spout中发射一个新的源Tuple时为其指定一个MessageId,这个MessageId可以是任意的Object对象。多个源Tuple可以共用同一个MessageId,表示这多个源Tuple对用户来说是同一个消息单元。Storm的可靠性是指Storm会告知用户,每一个消息单元是否在一个指定的时间内被完全处理。完全处理的意思是该MessageId绑定的源Tuple以及由该源Tuple衍生的所有Tuple,都经过了Topology中每一个应该到达的Bolt的处理。

在Spout中由message 1绑定的tuple1和tuple2分别经过bolt1和bolt2的处理,然后生成了两个新的Tuple,并最终流向了bolt3。当bolt3处理完之后,称message 1被完全处理了。

Storm中的每一个Topology中都包含有一个Acker组件。Acker组件的任务就是跟踪从Spout中流出的每一个messageId所绑定的Tuple树中的所有Tuple的处理情况。如果在用户设置的最大超时时间内这些Tuple没有被完全处理,那么Acker会告诉Spout该消息处理失败,相反则会告知Spout该消息处理成功。

Storm接口详解

IComponent接口

Spout和Bolt都是其Component。所以,Storm定义了一个名叫IComponent的总接口。IComponent的继承关系如下图所示:

绿色部分是我们最常用、比较简单的部分。红色部分是与事务相关。BaseComponent是Storm提供的“偷懒”的类。为什么这么说呢,它及其子类,都或多或少实现了其接口定义的部分方法。这样我们在用的时候,可以直接继承该类,而不是自己每次都写所有的方法。但值得一提的是,BaseXXX这种定义的类,它所实现的方法,都是空的,直接返回null。

Spout

-w535
-w535

各个接口说明:

  • open方法:是初始化动作。允许你在该spout初始化时做一些动作,传入了上下文,方便取上下文的一些数据。
  • close方法:在该spout关闭前执行,但是并不能得到保证其一定被执行。spout是作为task运行在worker内,在cluster模式下,supervisor会直接kill -9 woker的进程,这样它就无法执行了。而在本地模式下,只要不是kill -9, 如果是发送停止命令,是可以保证close的执行的。
  • activate和deactivate方法 :一个spout可以被暂时激活和关闭,这两个方法分别在对应的时刻被调用。
  • nextTuple方法:负责消息的接入,执行数据发射。是Spout中的最重要方法。
  • ack(Object)方法:传入的Object其实是一个id,唯一表示一个tuple。该方法是这个id所对应的tuple被成功处理后执行。
  • fail(Object)方法:同ack,只不过是tuple处理失败时执行。

如果继承了BaseRichSpout,就不用实现close、activate、deactivate、ack、fail和getComponentConfiguration方法,只关心最基本核心的部分。

总结:通常情况下(Shell和事务型的除外),实现一个Spout,可以直接实现接口IRichSpout,如果不想写多余的代码,可以直接继承BaseRichSpout。

Bolt

类图如下图所示:

-w577
-w577

  • prepare方法:IBolt继承了java.io.Serializable,我们在nimbus上提交了topology以后,创建出来的bolt会序列化后发送到具体执行的worker上去。worker在执行该Bolt时,会先调用prepare方法传入当前执行的上下文。
  • execute方法:接受一个tuple进行处理,并用prepare方法传入的OutputCollector的ack方法(表示成功)或fail(表示失败)来反馈处理结果。
  • cleanup方法:同ISpout的close方法,在关闭前调用。同样不保证其一定执行。

Bolt实现时一定要注意execute方法。为什么IBasicBolt并没有继承IBolt?Storm提供了IBasicBolt接口,其目的就是实现该接口的Bolt不用在代码中提供反馈结果了,Storm内部会自动反馈成功。如果你确实要反馈失败,可以抛出FailedException。

总结:通常情况下,实现一个Bolt,可以实现IRichBolt接口或继承BaseRichBolt,如果不想自己处理结果反馈,可以实现IBasicBolt接口或继承BaseBasicBolt,它实际上相当于自动做掉了prepare方法和collector.emit.ack(inputTuple);

部署

  • 本地打jar:mvn clean install -DskipTests=true,jar包会打到$HOME/.m2/repository目录
  • 为集群打包(包含其他依赖):mvn package-->target/storm-starter-{version}.jar
  • local模式执行:storm jar target/storm-starter-*.jar org.apache.storm.starter.ExclamationTopology -local
  • 集群模式执行,名称为production-topology:storm jar target/storm-starter-*.jar org.apache.storm.starter.RollingTopWords production-topology

参考

  • Storm入门学习随记
  • 流式大数据处理的三种框架:Storm,Spark和Flink
  • storm 入门原理介绍
  • 细细品味Storm_Storm简介及安装V1.1.pdf
  • storm的topology提交执行
  • Example Storm Topologies
1…345…8
DanielJyc

DanielJyc

数据驱动 Java 大数据 算法

40 日志
5 分类
28 标签
RSS
Links
  • Danieljyc blog
© 2014 — 2019 DanielJyc | Site words total count: 53k
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
京ICP备 - 19007489号