9.文件和记录格式以及Thrift服务

Streaming

  • Steaming提供了另一种处理数据的方式。hadoop streaming api会为外部进程开启一个IO管道,然后数据会被传给这个进程,其会从标准输入中读取数据,然后通过标准输出来写结果数据,最好返回给Streaming API job。

  • Hive提供多个语法来使用streaming,包括Map()、REDUCE()和TRANSFORM()。

使用MapReduce

# Hive中使用java编写的MapReduce JOB 
FROM ( 
 FROM src 
 MAP value, key 
 USING 'java -cp hive-contrib-0.9.0.jar 
  org.apache.hadoop.hive.contrib.mr.example.IdentityMapper' 
 AS k, v 
 CLUSTER BY k) map_output 
REDUCE k, v 
USING 'java -cp hive-contrib-0.9.0.jar 
 org.apache.hadoop.hive.contrib.mr.example.WordCountReduce' 
AS k, v; 
  • Map Task和Reduce Task分别使用GenericMR的map方法和reduce方法

计算cogroup

  • 如果对多个数据集进行JOIN连接处理,然后使用TRANSFORM进行处理。使用UNION ALL和CLUSTER BY,可以实现CO GROUP BY的常见操作。

  • Pig提供原生的COGROUP BY操作

FROM ( 
 FROM ( 
  FROM order_log ol 
  -- User Id, order Id, and timestamp: 
  SELECT ol.userid AS uid, ol.orderid AS id, av.ts AS ts 

  UNION ALL 

  FROM clicks_log cl 
  SELECT cl.userid AS uid, cl.id AS id, ac.ts AS ts 
 ) union_msgs 
SELECT union_msgs.uid, union_msgs.id, union_msgs.ts 
CLUSTER BY union_msgs.uid, union_msgs.ts) map 
INSERT OVERWRITE TABLE log_analysis 
SELECT TRANSFORM(map.uid, map.id, map.ts) USING 'reduce_script' 
AS (uid, id, ...); 

文件和记录格式

  • Hive中文件格式间具有明显差异,例如文件中记录的编码方式、记录格式以及记录中字节流的编码的方式。

  • Hive文本文件格式选择和记录格式对应的。

设置文件存储格式

  • Stored AS SEQUENCEFILE、ROW Format delimited、serde、inputformat、outputformat这些语法。

  • 例如 STORED AS SEQUENCEFILE等同于INPUTFORMAT 'org.apache.hadoop.mapred.SequenceFileInputFormat' OUTPUTFORMAT 'org.apache.hadoop.mapred.SequenceFileOutputFormat' .

文件格式

SequenceFile

  • 通过Stored AS SequenceFile指定,SequenceFile文件是含有键-值对的二进制文件。将Hive查询转换成MapReduce JOB时,对于指定的记录,其取决使用那些合适的键值对。

  • 其实Hadoop的一种给文件格式,因此和其他非Hadoop系统的工具无法共享。

  • SequenceFile支持块和记录级别压缩,优化磁盘利用率和IO,同时可以支持有边界的压缩。

RCfile

  • 大部分Hadoop和Hive存储都是行式存储,在大多数常见下是高效的。

    • 大多数的表具有的字段个数都不大(1到20字段)

    • 对文件按块进行压缩对于需要处理重复数据的情况比较高效,同时很多处理和调试工具都很好适应行式存储的数据。

  • 列式存储适合多字段时,并且大多数的查询只需要其中一小部分字段时,可以使用列式存储,这时候必须要扫描全部的行就可以拿到特定列的数据。

  • 列式存储的压缩式非常高效的,特别是在这列的数据具有低计数时(没有太多重复数据),一些列式存储不需要存储null的列。

  • RCFile是一个Hive列式存储文件格式。

  • 使用hive -service rcfilecat工具查看RCFile格式文件

hive --service rcfilecat /user/hive/warehouse/columntable/000000_0 

记录格式:SerDe

  • SerDe是序列化/反序列化的简写。一个SerDe包含将一条记录的非结构化字节转换为Hive可以使用的一条记录的过程。

  • 使用RegexSerDe处理Apache Web日志

CREATE TABLE serde_regex( 
 host STRING, 
 identity STRING, 
 user STRING, 
 time STRING, 
 request STRING, 
 status STRING, 
 size STRING, 
 referer STRING, 
 agent STRING) 
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe' 
WITH SERDEPROPERTIES ( 
 "input.regex" = "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) 
  ([^ \"]*|\"[^\"]*\") (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\") 
  ([^ \"]*|\"[^\"]*\"))?", 
 "output.format.string" = "%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s" 
) 
STORED AS TEXTFILE; 

JSON SerDe

CREATE EXTERNAL TABLE messages ( 
 msg_id    BIGINT, 
 tstamp    STRING, 
 text     STRING, 
 user_id    BIGINT, 
 user_name   STRING 
) 
ROW FORMAT SERDE "org.apache.hadoop.hive.contrib.serde2.JsonSerde" 
WITH SERDEPROPERTIES ( 
 "msg_id"="$.id", 
 "tstamp"="$.created_at", 
 "text"="$.text", 
 "user_id"="$.user.id", 
 "user_name"="$.user.name" 
) 
LOCATION '/data/messages'; 

Avro Hive SerDe

  • Avro是一个序列化系统,其主要特点是它是一个进化的模式驱动的二进制数据存储模式。

使用表属性信息定义Avro Schema

CREATE TABLE doctors 
ROW FORMAT 
SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' 
STORED AS 
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' 
TBLPROPERTIES ('avro.schema.literal'='{ 
 "namespace": "testing.hive.avro.serde", 
 "name": "doctors", 
 "type": "record", 
 "fields": [ 
  { 
   "name":"number", 
   "type":"int", 
   "doc":"Order of playing the role" 
  }, 
  { 
   "name":"first_name", 
   "type":"string", 
   "doc":"first name of actor playing role" 
  }, 
  { 
   "name":"last_name", 
   "type":"string", 
   "doc":"last name of actor playing role" 
  } 
 ] 
}'); 
  • 使用desc tablename可以查看avro定义的模式

从指定URL中定义Schema

CREATE TABLE doctors 
ROW FORMAT 
SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' 
STORED AS 
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' 
TBLPROPERTIES ('avro.schema.url'='hdfs:///test.schema''); 

进化的模式

  • avro如果需要减少一个字段,那么缺省值默认为null,增加字段也可以增加一个默认值

ObjectInspector

  • ObjectIbnspector的检查器来将记录转换成Hive可以访问的对象。

Hive的Thrift服务

  • Hive具有一个可选的组件叫做HiveServer或者hiveThrift,其运行通过指定端口访问Hive。

启动thrift server

hive --service hiveserver2 & 
# 查看thrift端口情况 
lsof -i tcp:10000 

配置Java使用Hive JDBC

  • 引入依赖

<dependency> 
    <groupId>org.apache.hive</groupId> 
    <artifactId>hive-jdbc</artifactId> 
    <version>2.3.2</version> 
</dependency> 
  • 测试

public class UseThriftHive { 
   public static void main(String[] args) throws SQLException { 
        Connection con = DriverManager.getConnection("jdbc:hive2://hadoop:10000"); 
        Statement stmt = con.createStatement(); 
        ResultSet resultSet = stmt.executeQuery("select * from test2"); 
        while (resultSet.next()) { 
            System.out.println(resultSet.getString(1) + "---" + resultSet.getString(2)); 
        } 
    } 
} 

最后更新于