ASP源码.NET源码PHP源码JSP源码JAVA源码DELPHI源码PB源码VC源码VB源码Android源码
当前位置:首页 >> 数据库 >> MySql >> 104-storm 整合 kafka之保存MySQL数据库

104-storm 整合 kafka之保存MySQL数据库(1/3)

来源:网络整理     时间:2015-10-08     关键词:Exception,zookeeper

本篇文章主要介绍了"104-storm 整合 kafka之保存MySQL数据库",主要涉及到Exception,zookeeper方面的内容,对于MySql感兴趣的同学可以参考一下: 整合KafkaStorm,消息通过各种方式进入到Kafka消息中间件,比如通过使用Flume来收集的日志数据,然后暂由Kafka中的路由暂存,然后在由实时计算程...

整合Kafka+Storm,消息通过各种方式进入到Kafka消息中间件,比如通过使用Flume来收集的日志数据,然后暂由Kafka中的路由暂存,然后在由实时计算程序Storm做实时分析,这时候我们需要讲Storm中的Spout中读取Kafka中的消息,然后交由具体的Bolt组件分析处理。实际上在 apache-storm-0.9.3这个版本的Storm已经自带了一个集成Kafka的外部插件程序storm-kafka,可以直接使用,例如我使用的Maven依赖配置。1、配置Maven依赖包

<dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka_2.10</artifactId>
     <version>0.8.2.0</version>
     <exclusions>
               <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
               </exclusion>
     </exclusions>
</dependency>

<!-- kafka整合storm -->
<dependency>
     <groupId>org.apache.storm</groupId>
     <artifactId>storm-core</artifactId>
     <version>0.9.3</version>
     <scope>provided</scope>
     <exclusions>
          <exclusion>
               <groupId>org.slf4j</groupId>
               <artifactId>log4j-over-slf4j</artifactId>
          </exclusion>
          <exclusion>
               <groupId>org.slf4j</groupId>
               <artifactId>slf4j-api</artifactId>
          </exclusion>
     </exclusions>
</dependency>

<dependency>
     <groupId>org.apache.storm</groupId>
     <artifactId>storm-kafka</artifactId>
     <version>0.9.3</version>
</dependency>
storm程序能接收到数据,并进行处理,但是会发现数据被重复处理这是因为在bolt中没有对数据进行确认,需要调用ack或者fail方法, 修改完成之后即可。

2、编写Storm程序

package com.yun.storm;
import java.util.UUID;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

/**
* Storm读取Kafka消息中间件数据
*
* @author shenfl
*
*/
public class KafkaLogProcess {


     private static final String BOLT_ID = LogFilterBolt.class.getName();
     private static final String SPOUT_ID = KafkaSpout.class.getName();

     public static void main(String[] args) {
         
          TopologyBuilder builder = new TopologyBuilder();
          //表示kafka使用的zookeeper的地址
          String brokerZkStr = "192.168.2.20:2181";
          ZkHosts zkHosts = new ZkHosts(brokerZkStr);
          //表示的是kafak中存储数据的主题名称
          String topic = "mytopic";
          //指定zookeeper中的一个根目录,里面存储kafkaspout读取数据的位置等信息
          String zkRoot = "/kafkaspout";
          String id = UUID.randomUUID().toString();
          SpoutConfig spoutconf  = new SpoutConfig(zkHosts, topic, zkRoot, id);
          builder.setSpout(SPOUT_ID , new KafkaSpout(spoutconf));
          builder.setBolt(BOLT_ID,new  LogFilterBolt()).shuffleGrouping(SPOUT_ID);
         
          LocalCluster localCluster = new LocalCluster();
          localCluster.submitTopology(KafkaLogProcess.class.getSimpleName(), new Config(),builder.createTopology() );
     }
}

package com.yun.storm;

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

/**
* 处理来自KafkaSpout的tuple,并保存到数据库中
*
* @author shenfl
*
*/
public class LogFilterBolt extends BaseRichBolt {

     private OutputCollector collector;
     /**
     *
     */
     private static final long serialVersionUID = 1L;

     Pattern p = Pattern.compile("省公司鉴权接口url\\[(.*)]\\,响应时间\\[([0-9]+)\\],当前时间\\[([0-9]+)\\]");

     /**
     * 每个LogFilterBolt实例仅初始化一次
     */
     @Override
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
          this.collector = collector;
     }

     @Override
     public void execute(Tuple input) {
          try {
               // 接收KafkaSpout的数据
               byte[] bytes = input.getBinaryByField("bytes");
               String value = new String(bytes).replaceAll("[\n\r]", "");
               // 解析数据并入库
               Matcher m = p.matcher(value);
               if (m.find()) {
                    String url = m.group(1);
                    String usetime = m.group(2);
                    String currentTime = m.group(3);
                    System.out.println(url + "->" + usetime + "->" + currentTime);
               }
               this.collector.ack(input);
          } catch (Exception e) {
               this.collector.fail(input);
          }
     }

     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
     }
}


3、解析日志入库

3.1 引入Maven依赖包

<!-- mysql maven相关依赖 -->
<dependency>
     <groupId>commons-dbutils</groupId>
     <artifactId>commons-dbutils</artifactId>
     <version>1.6</version>
</dependency>
<dependency>
     <groupId>mysql</groupId>
     <artifactId>mysql-connector-java</artifactId>
     <version>5.1.29</version>
</dependency>


3.2 编写MyDbUtils工具类

(1)创建数据表

create database jfyun;

CREATE TABLE `log_info` (
   `id` int(10) NOT NULL AUTO_INCREMENT,
   `topdomain` varchar(100) COLLATE latin1_german1_ci DEFAULT NULL,
   `usetime` varchar(10) COLLATE latin1_german1_ci DEFAULT NULL,
   `time` datetime DEFAULT NULL,
   PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1803 DEFAULT CHARSET=latin1 COLLATE=latin1_german1_ci

(2)MyDbUtils的程序

相关图片

相关文章