ASP源码.NET源码PHP源码JSP源码JAVA源码DELPHI源码PB源码VC源码VB源码Android源码
当前位置:首页 >> 数据库 >> MySql >> 104-storm 整合 kafka之保存MySQL数据库

104-storm 整合 kafka之保存MySQL数据库(2/3)

来源:网络整理     时间:2015-10-08     关键词:Exception,zookeeper

本篇文章主要介绍了"104-storm 整合 kafka之保存MySQL数据库",主要涉及到Exception,zookeeper方面的内容,对于MySql感兴趣的同学可以参考一下: 整合KafkaStorm,消息通过各种方式进入到Kafka消息中间件,比如通过使用Flume来收集的日志数据,然后暂由Kafka中的路由暂存,然后在由实时计算程...

package com.yun.storm.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.commons.dbutils.BasicRowProcessor;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.ArrayListHandler;

public class MyDbUtils {
    
     private static String className = "com.mysql.jdbc.Driver";
     private static String url = "jdbc:mysql://192.168.2.20:3306/jfyun?useUnicode=true&characterEncoding=utf-8";
     private static String user = "root";
     private static String password = "123";
     private static QueryRunner queryRunner = new QueryRunner();

     public static final String INSERT_LOG = "insert into log_info(topdomain,usetime,time) values(?,?,?)";

     static{
          try {
               Class.forName(className);
          } catch (ClassNotFoundException e) {
               e.printStackTrace();
          }
     }
     public static void main(String[] args) throws Exception {
          String topdomain = "taobao.com";
          String usetime = "100";
          String currentTime="1444218216106";
          MyDbUtils.update(MyDbUtils.INSERT_LOG, topdomain,usetime,currentTime);
          update(INSERT_LOG,topdomain,usetime,MyDateUtils.formatDate2(new Date(Long.parseLong(currentTime))));
     }
     /**
     * @param conn
     * @throws SQLException
     */
     public static void update(String sql,Object... params) throws SQLException {
          Connection connection = getConnection();
          //更新数据
          queryRunner.update(connection,sql, params);
          connection.close();
     }
    
     public static List<String> executeQuerySql(String sql) {
         
          List<String> result = new ArrayList<String>();
          try {
               List<Object[]> requstList = queryRunner.query(getConnection(), sql,
                         new ArrayListHandler(new BasicRowProcessor() {
                              @Override
                              public <Object> List<Object> toBeanList(ResultSet rs,
                                        Class<Object> type) throws SQLException {
                                   return super.toBeanList(rs, type);
                              }
                         }));
               for (Object[] objects : requstList) {
                    result.add(objects[0].toString());
               }
          } catch (SQLException e) {
               e.printStackTrace();
          }
          return result;
     }
     /**
     * @throws SQLException
     *
     */
     public static Connection getConnection() throws SQLException {
          //获取mysql连接
          return DriverManager.getConnection(url, user, password);
     }
}

(3)修改storm程序

相关图片

相关文章