Kudu Data Cold Backup Solution

Overview

Core Logic

  • Dynamically compute the first day of the current month and the first day of the previous month (both as yyyy-MM-dd), pull the data in that time range from Kudu, and write it into Hive with dynamic partitioning on the time-based partition column (see the date-range sketch after this list).

  • After the data is loaded into Hive, count the rows written to Hive within the time partition range and compare that count with the number of rows in Kudu for the same range; if the two counts differ, the cold backup fails.

  • If the load succeeds, a new range partition is appended to the Kudu table and the partition covering the backed-up time range is dropped. The two steps are treated as one consistent operation: either both succeed or both fail.
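A minimal sketch of the date-range step above, using java.time directly rather than the project's DateUtils helper (whose API is not shown on this page); the object and method names are illustrative:

import java.time.LocalDate
import java.time.format.DateTimeFormatter

object ColdBakDateRange {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd")

  // First day of the previous month (startDate) and first day of the current month (endDate),
  // i.e. the half-open [startDate, endDate) range that one monthly cold-backup run covers.
  def monthRange(today: LocalDate = LocalDate.now()): (String, String) = {
    val currentMonthFirst  = today.withDayOfMonth(1)
    val previousMonthFirst = currentMonthFirst.minusMonths(1)
    (previousMonthFirst.format(fmt), currentMonthFirst.format(fmt))
  }

  def main(args: Array[String]): Unit = {
    val (startDate, endDate) = monthRange(LocalDate.of(2021, 7, 15))
    println(s"startDate=$startDate endDate=$endDate") // startDate=2021-06-01 endDate=2021-07-01
  }
}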

Core Flow Diagram

Core Code

  /**
   * Unified handling of cold-backup data: back up one time range from Kudu to Hive,
   * verify the row counts, then roll the Kudu range partitions forward.
   * @param coldTableName cold-backup table name
   * @param rangeKey      range-partition key of the cold-backup table
   * @param startDate     start value of the range-partition key
   * @param endDate       end value of the range-partition key
   */
  override def handleColdBak(coldTableName: String, rangeKey: String, startDate: String, endDate: String): Unit = {
    super.handleColdBak(coldTableName, rangeKey, startDate, endDate)
    val kuduTmpView: String = s"ods_kudu_${coldTableName}_v"
    val coldJobSqlKey: String = s"cold_${coldTableName}_job"
    val coldDdlSqlKey: String = s"cold_${coldTableName}_ddl"
    val hiveTableName = s"""cold_${coldTableName}_t"""
    initColdHiveTable(coldDdlSqlKey, hiveTableName)
    kuduRepository.read(coldTableName).where($"deleted" equalTo Constants.NOT_DELETED)
      .createOrReplaceTempView(kuduTmpView)
    var coldJobSql: String = LoadConfigUtils.getConfig(coldJobSqlKey)
    if (StringUtils.isEmpty(startDate) || StringUtils.isEmpty(endDate)) {
      throw new RuntimeException("The partition key start date and end date must not be empty!")
    }
    val startTs: String = DateUtils.getTimestamp(startDate)
    val endTs: String = DateUtils.getTimestamp(endDate)
    coldJobSql = buildDateFilterSql(coldJobSql, startTs, "%startTs")
    coldJobSql = buildDateFilterSql(coldJobSql, endTs, "%endTs")

    val coldData: DataFrame = spark.sql(coldJobSql)

    val coldDataCount: Long = coldData.count()
    coldData.cache()
    val sinkDatabaseTableName: String = s"${Constants.COLD_BAK_DATABASE_NAME}.$hiveTableName"
    LOG.info(s"insert cold data to hiveTable:$sinkDatabaseTableName")
    hiveRepository.insert(coldData, SaveMode.Overwrite, sinkDatabaseTableName)

    val startTsLong: Long = startTs.toLong / 1000
    val endTsLong: Long = endTs.toLong / 1000
    val hiveColdDataCount: Long = hiveRepository.read(sinkDatabaseTableName)
      .where($"dt" >= from_unixtime(lit(startTsLong), "yyyyMMdd"))
      .where($"dt" < from_unixtime(lit(endTsLong), "yyyyMMdd")).count()
    if (coldDataCount != hiveColdDataCount) {
      throw new RuntimeException(String.format("The row count backed up to Hive does not match the Kudu cold data; please rerun this month's cold backup, startDate:%s endDate:%s", startDate,
        endDate))
    }
    LOG.info(s"startDate:${startDate} endDate:${endDate}备份数据:${coldDataCount}")
    coldData.unpersist()
    // Append a new range partition after the current last one
    val lastRangeKeyTs: Long = kuduUtils.getLastRangePartition(coldTableName, rangeKey)
    val nextMonthRangeKeyTs: String = DateUtils.getTimestampPlusMonth(lastRangeKeyTs, 1)
    try {
      val isAddPartition: Boolean = kuduUtils.alterRangePartition(isAdd = true, coldTableName, rangeKey,
        lastRangeKeyTs + "", nextMonthRangeKeyTs)
      if (!isAddPartition) {
        throw new RuntimeException("Failed to add the new Kudu range partition for hot data!")
      }
      // Drop the range partition that has just been backed up
      val isDeletePartition: Boolean = kuduUtils.alterRangePartition(isAdd = false, coldTableName, rangeKey, startTs,
        endTs)
      if (!isDeletePartition) {
        throw new RuntimeException("Failed to drop the Kudu range partition of the cold data!")
      }
    } catch {
      case e: Exception =>
        LOG.error("Error altering partition, lower:{} upper:{}", lastRangeKeyTs, nextMonthRangeKeyTs, e)
    }
  }

  /**
   * Build the date-filtered SQL by substituting a timestamp placeholder.
   *
   * @param sql  original SQL template
   * @param ts   timestamp in milliseconds
   * @param rule placeholder to replace (e.g. %startTs or %endTs)
   * @return the SQL with the placeholder replaced
   */
  private def buildDateFilterSql(sql: String, ts: String, rule: String): String = {
    sql.replace(rule, ts)
  }

  /**
   * Initialize the cold-backup Hive table if it does not exist yet.
   *
   * @param coldDdlSqlKey configuration key of the cold-backup DDL statement
   * @param hiveTableName cold-backup Hive table name
   */
  private def initColdHiveTable(coldDdlSqlKey: String, hiveTableName: String): Unit = {
    val coldDdlSql: String = LoadConfigUtils.getConfig(coldDdlSqlKey)
    LOG.info(s"create table ${hiveTableName}")
    hiveRepository.createTableIfAbsent(Constants.COLD_BAK_DATABASE_NAME, hiveTableName, coldDdlSql)
  }
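The job SQL is loaded from configuration under the cold_<table>_job key and carries %startTs / %endTs placeholders that buildDateFilterSql fills with millisecond timestamps. The template below is purely illustrative (the real statement lives in the project's configuration and is not reproduced on this page):

  // Hypothetical cold-backup job template with the two placeholders.
  val template: String =
    """SELECT * FROM ods_kudu_event_tmp_v
      | WHERE event_time >= %startTs
      |   AND event_time <  %endTs""".stripMargin

  // Substitute the millisecond timestamps, as handleColdBak does via buildDateFilterSql.
  val coldJobSql: String = template
    .replace("%startTs", "1622505600000") // 2021-06-01 00:00:00 UTC
    .replace("%endTs",   "1625097600000") // 2021-07-01 00:00:00 UTC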

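The kuduUtils.alterRangePartition helper used above is not shown on this page. As a rough sketch of what adding and dropping a range partition looks like with the plain Kudu Java client (master address, table name, and column name are placeholders, and the real helper may batch or validate differently):

import org.apache.kudu.client.{AlterTableOptions, KuduClient}

object AlterColdPartitionsSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative bounds in epoch milliseconds: drop [2021-06-01, 2021-07-01) UTC
    // and append [2021-08-01, 2021-09-01) UTC after the current last partition.
    val dropLower = 1622505600000L
    val dropUpper = 1625097600000L
    val addLower  = 1627776000000L
    val addUpper  = 1630454400000L

    val client = new KuduClient.KuduClientBuilder("kudu-master:7051").build()
    val schema = client.openTable("event_tmp").getSchema

    // Build a range-partition bound on the rangeKey column (assumed here to be a BIGINT epoch value).
    def bound(ts: Long) = {
      val row = schema.newPartialRow()
      row.addLong("event_time", ts)
      row
    }

    // Carrying both changes in one AlterTableOptions sends them to Kudu as a single alter request.
    val alter = new AlterTableOptions()
      .addRangePartition(bound(addLower), bound(addUpper))
      .dropRangePartition(bound(dropLower), bound(dropUpper))
    client.alterTable("event_tmp", alter)
    client.close()
  }
}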
Hot and Cold Views

View Creation

The view unions the hot data still served from Kudu through Impala (with the dt partition value derived from event_time) and the cold data archived in Hive, so downstream queries keep reading a single logical table:

create view ods_event_tmp_v as
    select id,
           tenant_id,
           event_id,
           event_time,
           create_time,
           update_time,
           attr1,
           attr2,
           attr3,
           attr4,
           attr5,
           attr6,
           attr7,
           attr8,
           attr9,
           attr10,
           attr11,
           attr12,
           attr13,
           attr14,
           attr15,
           attr16,
           attr17,
           attr18,
           attr19,
           attr20,
           attr21,
           attr22,
           attr23,
           attr24,
           attr25,
           attr26,
           attr27,
           attr28,
           attr29,
           attr30,
           attr31,
           attr32,
           attr33,
           attr34,
           attr35,
           attr36,
           attr37,
           attr38,
           attr39,
           attr40,
           attr41,
           attr42,
           attr43,
           attr44,
           attr45,
           attr46,
           attr47,
           attr48,
           attr49,
           attr50,
           attr51,
           attr52,
           attr53,
           attr54,
           attr55,
           attr56,
           attr57,
           attr58,
           attr59,
           attr60,
           attr61,
           attr62,
           attr63,
           attr64,
           attr65,
           attr66,
           attr67,
           attr68,
           attr69,
           attr70,
           attr71,
           attr72,
           attr73,
           attr74,
           attr75,
           attr76,
           attr77,
           attr78,
           attr79,
           attr80,
           attr81,
           attr82,
           attr83,
           attr84,
           attr85,
           attr86,
           attr87,
           attr88,
           attr89,
           attr90,
           attr91,
           attr92,
           attr93,
           attr94,
           attr95,
           attr96,
           attr97,
           attr98,
           attr99,
           attr100,
           deleted,
           delete_time,
           -- Impala interprets unix timestamps as UTC, so convert to the target timezone here before deriving the dt value
           FROM_TIMESTAMP(from_utc_timestamp(from_unixtime(CAST(event_time / 1000 AS BIGINT)), 'Asia/Shanghai'),'yyyyMMdd') as dt
    from cdp.event_tmp
    union all
    select id,
           tenant_id,
           event_id,
           event_time,
           create_time,
           update_time,
           attr1,
           attr2,
           attr3,
           attr4,
           attr5,
           attr6,
           attr7,
           attr8,
           attr9,
           attr10,
           attr11,
           attr12,
           attr13,
           attr14,
           attr15,
           attr16,
           attr17,
           attr18,
           attr19,
           attr20,
           attr21,
           attr22,
           attr23,
           attr24,
           attr25,
           attr26,
           attr27,
           attr28,
           attr29,
           attr30,
           attr31,
           attr32,
           attr33,
           attr34,
           attr35,
           attr36,
           attr37,
           attr38,
           attr39,
           attr40,
           attr41,
           attr42,
           attr43,
           attr44,
           attr45,
           attr46,
           attr47,
           attr48,
           attr49,
           attr50,
           attr51,
           attr52,
           attr53,
           attr54,
           attr55,
           attr56,
           attr57,
           attr58,
           attr59,
           attr60,
           attr61,
           attr62,
           attr63,
           attr64,
           attr65,
           attr66,
           attr67,
           attr68,
           attr69,
           attr70,
           attr71,
           attr72,
           attr73,
           attr74,
           attr75,
           attr76,
           attr77,
           attr78,
           attr79,
           attr80,
           attr81,
           attr82,
           attr83,
           attr84,
           attr85,
           attr86,
           attr87,
           attr88,
           attr89,
           attr90,
           attr91,
           attr92,
           attr93,
           attr94,
           attr95,
           attr96,
           attr97,
           attr98,
           attr99,
           attr100,
           deleted,
           delete_time,
           dt
    from cdp_cold_bak.cold_event_tmp_t;

Execution Plan Analysis

Run EXPLAIN against the view to check how the tenant_id and dt predicates are applied to the hot (Kudu) and cold (Hive) branches of the union:

explain SELECT * FROM ods_event_tmp_v where tenant_id='26615263' and dt='20210701'