• ADADADADAD

    使用spark分析mysql慢日志[ mysql数据库 ]

    mysql数据库 时间:2024-12-25 09:58:04

    作者:文/会员上传

    简介:

    熟悉oracle的童鞋都知道,在oracle中,有很多视图记录着sql执行的各项指标,我们可以根据自己的需求编写相应脚本,从oracle中获取sql的性能开销。作为开源数据库,mysql不比oracle,分

    以下为本文的正文内容,内容仅供参考!本站为公益性网站,复制本文以及下载DOC文档全部免费。

    熟悉oracle的童鞋都知道,在oracle中,有很多视图记录着sql执行的各项指标,我们可以根据自己的需求编写相应脚本,从oracle中获取sql的性能开销。作为开源数据库,mysql不比oracle,分析慢sql只能通过slow.log。slow.log看起来不够直观,而且同一条慢sql执行多次的话就会在slow.log中被记录多次,可阅读性较差。
    最近,部门开发的数据库审计平台上线mysql审计模块,需要为客户提供一键化提取slow.log中慢sql的功能。由于本人之前研究过spark,在分析慢日志的文本结构后,使用scala语言,利用spark core相关技术,编写了能够去重slow.log中重复sql,并将按执行时间排序的top sql输入到hive表中的小程序。
    话不多说,上菜!

    开发环境:
    1、CentOS 6.5
    2、JDK 1.7
    3、Hadoop 2.4.1
    4、Hive 0.13
    5、Spark 1.5.1
    6、scala 2.11.4
    hadoop及spark集群环境的搭建方法就不多说了哈,网上资料很多,对大数据感兴趣的童鞋可以尝试搭建。

    step 1 使用scala ide for eclipse编写应用程序
    analyzeSlowLog.scala:

    package cn.spark.study.sqlimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport scala.util.matching.Regeximport scala.collection.mutable.ArrayBufferimport org.apache.spark.sql.types.StructTypeimport org.apache.spark.sql.types.StructFieldimport org.apache.spark.sql.types.StringTypeimport org.apache.spark.sql.types.DoubleTypeimport org.apache.spark.sql.SQLContextimport org.apache.spark.sql.Rowimport org.apache.spark.sql.hive.HiveContextobject SlowLogAnalyze {def main(args: Array[String]): Unit = {//创建SparkConf,SparkContext和HiveContextval conf=new SparkConf().setAppName("SlowLogAnalyze");val sc=new SparkContext(conf)val hiveContext=new HiveContext(sc)//读取hdfs文件,获取logRDDval logRDD=sc.textFile("hdfs://spark1:9000/files/slow.log", 5)//创建正则表达式,用来过滤slow.log中的无效信息val pattern1="# Time:".rval pattern2="# User@Host:".rval pattern3="SET timestamp=".r //对logRDD进行filter,过滤无效信息val filteredLogRDD=logRDD.filter { str => //正则返回的是option类型,只有Some和None两种类型if(pattern1.findFirstIn(str)!=None){false}else if(pattern2.findFirstIn(str)!=None){false}else if(pattern3.findFirstIn(str)!=None){false}else{true} }/** * 将filteredLogRDD转换为格式为(execute_time,sql_text)的tuple类RDD KV_RDD *///将filteredLogRDD转换为数组val logArray=filteredLogRDD.toArray()//定义正则表达式pattern,用于识别Query_timeval pattern="# Query_time:".r //定义数组KV_Array,用于存放循环映射后的tuple,tuple为(query_time所在行,sql_text)val KV_Array=ArrayBuffer[(String,String)]()for (i<-0 until logArray.length){ if(pattern.findFirstIn(logArray(i))!=None){ val key=logArray(i) var flag=truevar value="" if(i<logArray.length-1){ for(k<-i+1 until logArray.length if flag ){ if(pattern.findFirstIn(logArray(k))!=None){ flag=false }else{ value=value+logArray(k) } }} KV_Array+=((key,value)) } } //并行化集合获取KV_RDD val KV_RDD=sc.parallelize(KV_Array, 1) //执行map,将KV_RDD映射为(execute_time,sql_text)的tuple类RDD time_sql_RDD val sql_time_RDD=KV_RDD .map{tuple=> val timeSplit=tuple._1.split(" ") //注意这里是toDouble,不是toInt!!!!因为日志中的时间是Double类型!!!! (tuple._2,timeSplit(2).toDouble) } /*** 由于慢日志中保存了较多相同sql,需进行去重处理* 对相同的sql的execute_time取均值,最后输出unique的(sql_text,execute_time)*/ val groupBySqlRDD=sql_time_RDD.groupByKey() .map{tuple=> val timeArray=tuple._2.toArray var totalTime=0.0 for(i<-0 until timeArray.length){ totalTime=totalTime + timeArray(i) } val avgTime=totalTime/timeArray.length (tuple._1,avgTime) } val sortedRowRDD=groupBySqlRDD .map{tuple=>(tuple._2,tuple._1)} .sortByKey(false, 1) .map{tuple=>Row(tuple._2,tuple._1)} val top10Array=sortedRowRDD.take(10) val top10RDD=sc.parallelize(top10Array, 1) //将sortedRDD转换为dataframeval structType=new StructType(Array( StructField("sql_text",StringType,true), StructField("executed_time",DoubleType,true) ) ) val top10DF=hiveContext.createDataFrame(top10RDD, structType)hiveContext.sql("drop table if exists sql_top10") top10DF.saveAsTable("sql_top10")}}

    将代码打成jar包并上传至linux。
    step 2 编写执行脚本
    analyzeSlowLog.sh:

    /var/software/spark-1.5.1-bin-hadoop2.4/bin/spark-submit \--class cn.spark.study.sql.SlowLogAnalyze \--num-executors 3 \--driver-memory 100m \--executor-memory 100m \--executor-cores 3 \--files /var/software/hive/conf/hive-site.xml \--driver-class-path /var/software/hive/lib/mysql-connector-java-5.1.17.jar \/var/software/spark_study/scala/SlowLogAnalyze.jar

    step 3 执行analyzeSlowLog.sh,并进入hive查看分析结果:
    hive> show tables;
    OK
    daily_top3_keywords_uvs
    good_students
    sql_top10-- 这张表就是scala程序中定义的表名,程序运行时会在hive中创建
    student_infos
    student_scores
    Time taken: 0.042 seconds, Fetched: 5 row(s)

    查看sql_top10中的内容:
    这里由于长度限制,截断了sql文本,所以看起来部分sql是一样的,实际是两条不同的sql(where 条件不同)。
    hive> select substr(sql_text,1,50),executed_time from sql_top10;
    Total jobs = 1
    Launching Job 1 out of 1
    Number of reduce tasks is set to 0 since there's no reduce operator
    ...
    Execution completed successfully
    MapredLocal task succeeded
    OK
    select 'true' as QUERYID, ID_GARAG0.0252804
    select count() from pms_garage_vitri_info0.0048902
    select count(
    ) from information_schema.PROCESSLIS3.626E-4
    select 'true' as QUERYID, e_survey2.39E-4
    select 'true' as QUERYID, e_survey2.34E-4
    SELECTaccount_code AS um2.2360000000000001E-4
    select 'true' as QUERYID, e_survey2.19E-4
    select 'true' as QUERYID, e_survey2.18E-4
    select 'true' as QUERYID, e_survey2.15E-4
    SELECTaccount_code AS um2.1419999999999998E-4
    Time taken: 8.501 seconds, Fetched: 10 row(s)

    至此,对mysql slow.log的提取完毕!

    关于在mysql中创建相关视图的思考:
    hadoop和spark一般用于处理大数据,这里用来处理mysql的慢日志实在是大材小用。不过,要想在mysql中提供查看数据库top sql的v$Topsql视图,对slow.log的实时分析是必须的,此时,spark streaming便可派上用场。
    思路如下:
    1.编写crontab定时任务以定时拷贝slow.log至hdfs
    2.编写crontab定时任务以调用spark streaming程序分析hdfs上的最新slow.log ->通过jdbc将将top sql输出到对应mysql数据库中的v$Topsql视图中,并覆盖之前的数据。
    ps:在分析slow.log时,可在程序中executor,timestamp等字段(本文中并未提取这两个字段),以提供更详细的信息。

    使用spark分析mysql慢日志.docx

    将本文的Word文档下载到电脑

    推荐度:

    下载
    热门标签: sparkmysqlslow.log