Hive Sorting
sort by
Per-reducer (partial) sorting: the output of each reducer is sorted, but there is no ordering across reducers.
```
#set the number of reducers to 2
0: jdbc:hive2://localhost:10000> set mapred.reduce.tasks=2;
0: jdbc:hive2://localhost:10000> select * from employee sort by salary;
+--------------+----------------+---------------+------------------+----------------------+--+
| employee.id  | employee.name  | employee.age  | employee.salary  | employee.department  |
+--------------+----------------+---------------+------------------+----------------------+--+
| 10001        | rajesh         | 29            | 50000            | BIGDATA              |
| 1            | aarti          | 28            | 55000            | HR                   |
| 2            | sakshi         | 22            | 60000            | HR                   |
| 10003        | dinesh         | 35            | 70000            | BIGDATA              |
| 3            | mahesh         | 25            | 25000            | HR                   |
| 10002        | rahul          | 23            | 250000           | BIGDATA              |
+--------------+----------------+---------------+------------------+----------------------+--+
```
order by
Global (total) sorting: each reducer's output is sorted and the reducers themselves are ordered relative to one another, so the results can be concatenated directly into one totally ordered sequence. (Note that Hive actually implements order by with a single reducer, so the reducer count set above does not take effect for this query.)
```
#set the number of reducers to 2
0: jdbc:hive2://localhost:10000> set mapred.reduce.tasks=2;
0: jdbc:hive2://localhost:10000> select * from employee order by salary;
+--------------+----------------+---------------+------------------+----------------------+--+
| employee.id  | employee.name  | employee.age  | employee.salary  | employee.department  |
+--------------+----------------+---------------+------------------+----------------------+--+
| 3            | mahesh         | 25            | 25000            | HR                   |
| 10001        | rajesh         | 29            | 50000            | BIGDATA              |
| 1            | aarti          | 28            | 55000            | HR                   |
| 2            | sakshi         | 22            | 60000            | HR                   |
| 10003        | dinesh         | 35            | 70000            | BIGDATA              |
| 10002        | rahul          | 23            | 250000           | BIGDATA              |
+--------------+----------------+---------------+------------------+----------------------+--+
```
distribute by
Partitioning only: distribute by controls which reducer each row is sent to (rows with the same distribute-by value land in the same reducer, hashed by default), but it guarantees no ordering at all, neither within a reducer's output nor across reducers.
```
#set the number of reducers to 2
0: jdbc:hive2://localhost:10000> set mapred.reduce.tasks=2;
0: jdbc:hive2://localhost:10000> select * from employee distribute by id;
+--------------+----------------+---------------+------------------+----------------------+--+
| employee.id  | employee.name  | employee.age  | employee.salary  | employee.department  |
+--------------+----------------+---------------+------------------+----------------------+--+
| 10002        | rahul          | 23            | 250000           | BIGDATA              |
| 2            | sakshi         | 22            | 60000            | HR                   |
| 10001        | rajesh         | 29            | 50000            | BIGDATA              |
| 10003        | dinesh         | 35            | 70000            | BIGDATA              |
| 1            | aarti          | 28            | 55000            | HR                   |
| 3            | mahesh         | 25            | 25000            | HR                   |
+--------------+----------------+---------------+------------------+----------------------+--+
```
distribute by & sort by
```
#set the number of reducers to 2
0: jdbc:hive2://localhost:10000> set mapred.reduce.tasks=2;
0: jdbc:hive2://localhost:10000> select * from employee distribute by id sort by salary;
+--------------+----------------+---------------+------------------+----------------------+--+
| employee.id  | employee.name  | employee.age  | employee.salary  | employee.department  |
+--------------+----------------+---------------+------------------+----------------------+--+
| 2            | sakshi         | 22            | 60000            | HR                   |
| 10002        | rahul          | 23            | 250000           | BIGDATA              |
| 3            | mahesh         | 25            | 25000            | HR                   |
| 10001        | rajesh         | 29            | 50000            | BIGDATA              |
| 1            | aarti          | 28            | 55000            | HR                   |
| 10003        | dinesh         | 35            | 70000            | BIGDATA              |
+--------------+----------------+---------------+------------------+----------------------+--+
```
cluster by
Shorthand for distribute by col sort by col on the same column; cluster by can only sort that column in ascending order.
```
#set the number of reducers to 2
0: jdbc:hive2://localhost:10000> set mapred.reduce.tasks=2;
0: jdbc:hive2://localhost:10000> select * from employee distribute by id sort by id;
+--------------+----------------+---------------+------------------+----------------------+--+
| employee.id  | employee.name  | employee.age  | employee.salary  | employee.department  |
+--------------+----------------+---------------+------------------+----------------------+--+
| 2            | sakshi         | 22            | 60000            | HR                   |
| 10002        | rahul          | 23            | 250000           | BIGDATA              |
| 1            | aarti          | 28            | 55000            | HR                   |
| 3            | mahesh         | 25            | 25000            | HR                   |
| 10001        | rajesh         | 29            | 50000            | BIGDATA              |
| 10003        | dinesh         | 35            | 70000            | BIGDATA              |
+--------------+----------------+---------------+------------------+----------------------+--+
```
```
#set the number of reducers to 2
0: jdbc:hive2://localhost:10000> set mapred.reduce.tasks=2;
0: jdbc:hive2://localhost:10000> select * from employee cluster by id;
+--------------+----------------+---------------+------------------+----------------------+--+
| employee.id  | employee.name  | employee.age  | employee.salary  | employee.department  |
+--------------+----------------+---------------+------------------+----------------------+--+
| 2            | sakshi         | 22            | 60000            | HR                   |
| 10002        | rahul          | 23            | 250000           | BIGDATA              |
| 1            | aarti          | 28            | 55000            | HR                   |
| 3            | mahesh         | 25            | 25000            | HR                   |
| 10001        | rajesh         | 29            | 50000            | BIGDATA              |
| 10003        | dinesh         | 35            | 70000            | BIGDATA              |
+--------------+----------------+---------------+------------------+----------------------+--+
```
How the four sorting clauses above are implemented in MapReduce
sort by
Implementing per-reducer (partial) sorting in MapReduce
```
-------------------- Map Phase ---------------------|------------------- Reduce Phase --------------------
 (map -> partition -> sort -> spill/merge)          |  (shuffle -> merge sort -> group -> reduce)
  map()   Partitioner   SortComparator              |   SortComparator    GroupComparator     reduce()
```
In fact MapReduce already performs per-reducer sorting by default; implement a Partitioner, SortComparator and GroupComparator for your own key type, and the framework automatically sorts each reducer's input accordingly (a minimal wiring sketch follows).
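For illustration, a minimal driver sketch of where these three hooks are registered on a job. The mapper/reducer classes and input/output paths are omitted, and Text with its built-in comparator stands in for a custom key type, so treat this as an assumption-laden outline rather than a complete job.

```
// Sketch: wiring the three per-reducer-sort hooks into a MapReduce job.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class PerReducerSortWiring {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "per-reducer sort");

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Which reducer a key goes to (hash by default, shown explicitly here).
        job.setPartitionerClass(HashPartitioner.class);
        // How keys are ordered within each reducer's input.
        job.setSortComparatorClass(Text.Comparator.class);
        // Which adjacent keys are grouped into a single reduce() call.
        job.setGroupingComparatorClass(Text.Comparator.class);

        // ... setMapperClass / setReducerClass / input & output paths ...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```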
order by
Implementing a total order in MapReduce
The first approach: after the normal (per-reducer sorted) job finishes, run a second MapReduce job over the first stage's reducer outputs with only one reducer, so everything is merged into a single sorted file. When the data volume is large, this puts enormous pressure on that single reducer.

The second approach: make the partitioning itself order-preserving. Rewrite the Partitioner as needed so that keys are assigned to reducers by key range, guaranteeing that one reducer's keys all precede the next reducer's; each reducer's output is then sorted as usual and the outputs concatenate into a total order. You can implement this yourself, but Hadoop already provides TotalOrderPartitioner, which samples the input to pick balanced range boundaries (using a trie internally for fast key lookup), as sketched below.
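A hedged driver sketch of the second approach using Hadoop's TotalOrderPartitioner and InputSampler. The input is assumed to be a SequenceFile whose keys are the sort keys (Text here), and the partition-file path is a hypothetical placeholder.

```
// Sketch: total ordering via range partitioning with TotalOrderPartitioner.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class TotalOrderDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "total order sort");
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setNumReduceTasks(4); // reducer i receives a contiguous key range

        // Range-based partitioner instead of the default hash partitioner.
        job.setPartitionerClass(TotalOrderPartitioner.class);
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
                new Path("/tmp/partitions.lst")); // hypothetical path

        // Sample ~1% of the input (up to 1000 samples from at most 10 splits)
        // to pick balanced range boundaries, then write the partition file.
        InputSampler.writePartitionFile(job,
                new InputSampler.RandomSampler<Text, Text>(0.01, 1000, 10));

        // ... set mapper/reducer and input/output paths, then submit ...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```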
distribute by
Implementing distribute by (partitioning into groups) in MapReduce
A custom Partitioner alone is enough: it decides which reducer (group) each key is sent to, exactly what distribute by does. A minimal sketch follows.
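For example, a minimal custom Partitioner that mirrors "distribute by department" on the employee example above; the key/value types and the field choice are illustrative assumptions.

```
// Sketch: all records with the same department land in the same reducer.
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class DepartmentPartitioner extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text department, Text employeeRecord, int numReduceTasks) {
        // Same hash trick the default partitioner uses: stable, non-negative bucket index.
        return (department.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
// Registered in the driver with: job.setPartitionerClass(DepartmentPartitioner.class);
```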
cluster by
Implementing cluster by in MapReduce
Implement a custom Partitioner, SortComparator and GroupComparator on the same column at the same time.
secondary sort
A multi-level (secondary) sort in MapReduce.
Define a composite key type (CompositeKey) and implement a custom Partitioner, SortComparator and GroupComparator for it as needed; a compact sketch follows.
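A compact, hedged sketch of the classic secondary-sort pattern, reusing the employee example (department as the natural key, salary as the secondary sort key). All class and field names here are illustrative, not from the original text.

```
// Sketch: secondary sort with a composite key (department, salary).
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;

public class SecondarySortExample {

    /** Composite key: natural key (department) + secondary key (salary). */
    public static class CompositeKey implements WritableComparable<CompositeKey> {
        String department;
        long salary;

        @Override public void write(DataOutput out) throws IOException {
            out.writeUTF(department);
            out.writeLong(salary);
        }
        @Override public void readFields(DataInput in) throws IOException {
            department = in.readUTF();
            salary = in.readLong();
        }
        @Override public int compareTo(CompositeKey o) { // full ordering
            int cmp = department.compareTo(o.department);
            return (cmp != 0) ? cmp : Long.compare(salary, o.salary);
        }
    }

    /** Partition on the natural key only, so one department never spans two reducers. */
    public static class NaturalKeyPartitioner extends Partitioner<CompositeKey, Object> {
        @Override public int getPartition(CompositeKey key, Object value, int numReduceTasks) {
            return (key.department.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    }

    /** Sort by department, then by salary descending (the "secondary" sort). */
    public static class FullKeyComparator extends WritableComparator {
        protected FullKeyComparator() { super(CompositeKey.class, true); }
        @Override public int compare(WritableComparable a, WritableComparable b) {
            CompositeKey k1 = (CompositeKey) a, k2 = (CompositeKey) b;
            int cmp = k1.department.compareTo(k2.department);
            return (cmp != 0) ? cmp : -Long.compare(k1.salary, k2.salary);
        }
    }

    /** Group only on department, so each reduce() sees one department's sorted values. */
    public static class NaturalKeyGroupingComparator extends WritableComparator {
        protected NaturalKeyGroupingComparator() { super(CompositeKey.class, true); }
        @Override public int compare(WritableComparable a, WritableComparable b) {
            return ((CompositeKey) a).department.compareTo(((CompositeKey) b).department);
        }
    }

    // Driver wiring: setPartitionerClass(NaturalKeyPartitioner.class),
    // setSortComparatorClass(FullKeyComparator.class),
    // setGroupingComparatorClass(NaturalKeyGroupingComparator.class).
}
```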
Implementing data joins in MapReduce
Reduce-side join (repartition join) case study
Join types (MySQL join types)
The four basic joins are listed below; there are actually three more variants, see the MySQL join documentation for details.
- inner join
SELECT <field_list> FROM TABLE_A A INNER JOIN TABLE_B B ON A.KEY = B.KEY
- left join
SELECT <field_list> FROM TABLE_A A LEFT JOIN TABLE_B B ON A.KEY = B.KEY
- right join
SELECT <field_list> FROM TABLE_A A RIGHT JOIN TABLE_B B ON A.KEY = B.KEY
- outer join
SELECT <field_list> FROM TABLE_A A FULL OUTER JOIN TABLE_B B ON A.KEY = B.KEY
Dataset
Two tables, donation and project, describing project donations: donation mainly holds donor information, and project mainly holds recipient (project) information.
- Donation attributes : id, project id, donor city, date, total
- Project attributes : id, school city, poverty level, primary subject
Procedure
Map phase
Map each input table and emit (outputKey, outputValue) = (joinKey, joinInfo), i.e. the join key plus whatever other fields are needed. The joinKey can be plain Text or a custom Writable (a serializable object), and the outputValue can likewise be plain Text or a Writable object; the choice affects read/write efficiency. Plain Text tends to be faster because it avoids the serialization/deserialization overhead of custom objects.
Reduce phase
Rows from different tables that share the same joinKey are partitioned to the same reduce partition. Inside the reducer the joinInfo of the two tables can then be joined, applying whichever join type is required.
Concretely, keep two in-memory lists, one per table, to hold the joinInfo values. Since joinInfo from both tables with the same joinKey is gathered into the same reducer's values list, you need to tell which table each value came from before joining; how to do this depends on the value type. With plain Text you add a tag in the map phase to mark the source table, whereas with custom Writable objects you can simply test with instanceof.
```
protected void reduce(Text projectId, Iterable<ObjectWritable> values, Context context)
        throws IOException, InterruptedException {

    // Clear data lists
    donationsList.clear();
    projectsList.clear();

    // Fill up data lists with selected fields
    for (ObjectWritable value : values) {
        Object object = value.get();

        if (object instanceof DonationWritable) {
            DonationWritable donation = (DonationWritable) object;
            String donationOutput = String.format("%s|%s|%s|%s|%.2f",
                    donation.donation_id, donation.project_id, donation.donor_city, donation.ddate, donation.total);
            donationsList.add(donationOutput);

        } else if (object instanceof ProjectWritable) {
            ProjectWritable project = (ProjectWritable) object;
            String projectOutput = String.format("%s|%s|%s|%s",
                    project.project_id, project.school_city, project.poverty_level, project.primary_focus_subject);
            projectsList.add(projectOutput);

        } else {
            String errorMsg = String.format("Object of class %s is neither a %s nor %s.",
                    object.getClass().getName(), ProjectWritable.class.getName(), DonationWritable.class.getName());
            throw new IOException(errorMsg);
        }
    }

    // Join data lists (example with FULL OUTER JOIN)
    if (!donationsList.isEmpty()) {

        for (String dontationStr : donationsList) {

            if (!projectsList.isEmpty()) {
                // Case 1 : Both LEFT and RIGHT sides of the join have values
                // Extra loop to write all combinations of (LEFT, RIGHT)
                // These are also the outputs of an INNER JOIN
                for (String projectStr : projectsList) {
                    donationOutput.set(dontationStr);
                    projectOutput.set(projectStr);
                    context.write(donationOutput, projectOutput);
                }
            } else {
                // Case 2 : LEFT side has values but RIGHT side doesn't.
                // Simply write (LEFT, null) to output for each value of LEFT.
                // These are also the outputs of a LEFT OUTER JOIN
                donationOutput.set(dontationStr);
                projectOutput.set(NULL_PROJECT_OUTPUT);
                context.write(donationOutput, projectOutput);
            }
        }

    } else {
        // Case 3 : LEFT side doesn't have values, but RIGHT side has values
        // Simply write (null, RIGHT) to output for each value of LEFT.
        // These are also the outputs of a RIGHT OUTER JOIN
        for (String projectStr : projectsList) {
            donationOutput.set(NULL_DONATION_OUTPUT);
            projectOutput.set(projectStr);
            context.write(donationOutput, projectOutput);
        }
    }
}
```
Optimization 1
Use a projection in the map phase to drop the fields that are not needed.
```
public static class DonationsMapper extends Mapper<Object, DonationWritable, Text, ObjectWritable> {

    private Text outputKey = new Text();
    private ObjectWritable outputValue = new ObjectWritable();

    @Override
    public void map(Object key, DonationWritable donation, Context context)
            throws IOException, InterruptedException {
        outputKey.set(donation.project_id);

        // Create new object with projected values
        outputValue.set(DonationProjection.makeProjection(donation));
        context.write(outputKey, outputValue);
    }
}

/**
 * Projects Mapper.
 *
 */
public static class ProjectsMapper extends Mapper<Object, ProjectWritable, Text, ObjectWritable> {

    private Text outputKey = new Text();
    private ObjectWritable outputValue = new ObjectWritable();

    @Override
    public void map(Object offset, ProjectWritable project, Context context)
            throws IOException, InterruptedException {
        outputKey.set(project.project_id);

        // Create new object with projected values
        outputValue.set(ProjectProjection.makeProjection(project));
        context.write(outputKey, outputValue);
    }
}
```
Optimization 2
Use plain Text values.
```
public static class JoinReducer extends Reducer<Text, Text, Text, Text> {

    private Text donationOutput = new Text();
    private Text projectOutput = new Text();
    private List<String> donationsList;
    private List<String> projectsList;

    @Override
    protected void reduce(Text projectId, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        // Clear data lists
        donationsList = new ArrayList<>();
        projectsList = new ArrayList<>();

        // Fill up data lists with selected fields
        for (Text value : values) {
            String textVal = value.toString();

            // Get first char which determines the type of data
            char type = textVal.charAt(0);

            // Remove the type flag "P|" or "D|" from the beginning to get original data content
            textVal = textVal.substring(2);

            if (type == 'D') {
                donationsList.add(textVal);
            } else if (type == 'P') {
                projectsList.add(textVal);
            } else {
                String errorMsg = String.format("Type is neither a D nor P.");
                throw new IOException(errorMsg);
            }
        }

        // Join data lists only if both sides exist (INNER JOIN)
        if (!donationsList.isEmpty() && !projectsList.isEmpty()) {

            // Write all combinations of (LEFT, RIGHT) values
            for (String dontationStr : donationsList) {
                for (String projectStr : projectsList) {
                    donationOutput.set(dontationStr);
                    projectOutput.set(projectStr);
                    context.write(donationOutput, projectOutput);
                }
            }
        }
    }
}
```
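The map side of this plain-Text variant is not shown above; what follows is a hedged sketch of what the two mappers could look like, tagging each value with "D|" or "P|" so the reducer can tell the tables apart. The class names are hypothetical; imports and the Writable input classes are assumed to match the earlier examples.

```
// Sketch: tagging mappers for the plain-Text repartition join.
public static class DonationsTextMapper extends Mapper<Object, DonationWritable, Text, Text> {
    private Text outputKey = new Text();
    private Text outputValue = new Text();

    @Override
    public void map(Object key, DonationWritable donation, Context context)
            throws IOException, InterruptedException {
        outputKey.set(donation.project_id);
        // Tag with "D|" so the reducer knows this value comes from the donations table.
        outputValue.set("D|" + String.format("%s|%s|%s|%s|%.2f",
                donation.donation_id, donation.project_id, donation.donor_city,
                donation.ddate, donation.total));
        context.write(outputKey, outputValue);
    }
}

public static class ProjectsTextMapper extends Mapper<Object, ProjectWritable, Text, Text> {
    private Text outputKey = new Text();
    private Text outputValue = new Text();

    @Override
    public void map(Object key, ProjectWritable project, Context context)
            throws IOException, InterruptedException {
        outputKey.set(project.project_id);
        // Tag with "P|" for the projects table.
        outputValue.set("P|" + String.format("%s|%s|%s|%s",
                project.project_id, project.school_city, project.poverty_level,
                project.primary_focus_subject));
        context.write(outputKey, outputValue);
    }
}
```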
Map-side join (replicated join) case study
Limitations
- The small table must fit into the map-side JVM's memory
- Only an inner join or a left join driven by the large table is possible: the small table is replicated to every mapper, so each mapper sees the complete small (right-hand) table, but the reverse does not hold, the small table never sees the complete large table.
Basic approach
Load the small table into every mapper's memory in setup(), storing it in a HashMap; inside map() it can then be used as the right-hand table of an inner join or left join. This version is based on Writable objects: the Writable values are read from a previously prepared SequenceFile.
```
public static class ReplicatedJoinMapper extends Mapper<Object, DonationWritable, Text, Text> {

    public static final String PROJECTS_FILENAME_CONF_KEY = "projects.filename";

    private Map<String, ProjectWritable> projectsCache = new HashMap<>();
    private Text outputKey = new Text();
    private Text outputValue = new Text();

    @Override
    public void setup(Context context) throws IOException, InterruptedException {

        boolean cacheOK = false;

        URI[] cacheFiles = context.getCacheFiles();
        final String distributedCacheFilename = context.getConfiguration().get(PROJECTS_FILENAME_CONF_KEY);

        Configuration conf = new Configuration();

        for (URI cacheFile : cacheFiles) {
            Path path = new Path(cacheFile);
            if (path.getName().equals(distributedCacheFilename)) {

                LOG.info("Starting to build cache from : " + cacheFile);

                try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
                    LOG.info("Compressed ? " + reader.isBlockCompressed());

                    Text tempKey = new Text();
                    ProjectWritable tempValue = new ProjectWritable();

                    while (reader.next(tempKey, tempValue)) {
                        // Clone the writable otherwise all map values will be the same reference to tempValue
                        ProjectWritable project = WritableUtils.clone(tempValue, conf);
                        projectsCache.put(tempKey.toString(), project);
                    }
                }

                LOG.info("Finished to build cache. Number of entries : " + projectsCache.size());
                cacheOK = true;
                break;
            }
        }

        if (!cacheOK) {
            LOG.error("Distributed cache file not found : " + distributedCacheFilename);
            throw new IOException("Distributed cache file not found : " + distributedCacheFilename);
        }
    }

    @Override
    public void map(Object key, DonationWritable donation, Context context)
            throws IOException, InterruptedException {

        ProjectWritable project = projectsCache.get(donation.project_id);

        // Ignore if the corresponding entry doesn't exist in the projects data (INNER JOIN)
        if (project == null) {
            return;
        }

        String donationOutput = String.format("%s|%s|%s|%s|%.2f",
                donation.donation_id, donation.project_id, donation.donor_city, donation.ddate, donation.total);

        String projectOutput = String.format("%s|%s|%d|%s",
                project.project_id, project.school_city, project.poverty_level, project.primary_focus_subject);

        outputKey.set(donationOutput);
        outputValue.set(projectOutput);
        context.write(outputKey, outputValue);
    }
}
```
Optimization 1
Use a projection to drop the fields that are not needed when building the in-memory cache.
```
/**
 * Smaller "Project" class used for projection.
 *
 * @author Nicomak
 *
 */
public static class ProjectProjection {

    public String project_id;
    public String school_city;
    public String poverty_level;
    public String primary_focus_subject;

    public ProjectProjection(ProjectWritable project) {
        this.project_id = project.project_id;
        this.school_city = project.school_city;
        this.poverty_level = project.poverty_level;
        this.primary_focus_subject = project.primary_focus_subject;
    }
}

public static class ReplicatedJoinMapper extends Mapper<Object, DonationWritable, Text, Text> {

    private Map<String, ProjectProjection> projectsCache = new HashMap<>();
    private Text outputKey = new Text();
    private Text outputValue = new Text();

    @Override
    public void setup(Context context) throws IOException, InterruptedException {

        boolean cacheOK = false;

        URI[] cacheFiles = context.getCacheFiles();
        final String distributedCacheFilename = context.getConfiguration().get(PROJECTS_FILENAME_CONF_KEY);

        Configuration conf = new Configuration();

        for (URI cacheFile : cacheFiles) {
            Path path = new Path(cacheFile);
            if (path.getName().equals(distributedCacheFilename)) {

                LOG.info("Starting to build cache from : " + cacheFile);

                try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
                    LOG.info("Compressed ? " + reader.isBlockCompressed());

                    Text tempKey = new Text();
                    ProjectWritable tempValue = new ProjectWritable();

                    while (reader.next(tempKey, tempValue)) {
                        // We are creating a smaller projection object here to save memory space.
                        ProjectProjection projection = new ProjectProjection(tempValue);
                        projectsCache.put(tempKey.toString(), projection);
                    }
                }

                LOG.info("Finished to build cache. Number of entries : " + projectsCache.size());
                cacheOK = true;
                break;
            }
        }

        if (!cacheOK) {
            LOG.error("Distributed cache file not found : " + distributedCacheFilename);
            throw new IOException("Distributed cache file not found : " + distributedCacheFilename);
        }
    }

    @Override
    public void map(Object key, DonationWritable donation, Context context)
            throws IOException, InterruptedException {

        ProjectProjection project = projectsCache.get(donation.project_id);

        // Ignore if the corresponding entry doesn't exist in the projects data (INNER JOIN)
        if (project == null) {
            return;
        }

        if (project != null) {
            String donationOutput = String.format("%s|%s|%s|%s|%.2f",
                    donation.donation_id, donation.project_id, donation.donor_city, donation.ddate, donation.total);

            String projectOutput = String.format("%s|%s|%s|%s",
                    project.project_id, project.school_city, project.poverty_level, project.primary_focus_subject);

            outputKey.set(donationOutput);
            outputValue.set(projectOutput);
            context.write(outputKey, outputValue);
        }
    }
}
```
Optimization 2
Use plain-text values.
```
public static class ReplicatedJoinMapper extends Mapper<Object, DonationWritable, Text, Text> {

    public static final String PROJECTS_FILENAME_CONF_KEY = "projects.filename";

    private Map<String, String> projectsCache = new HashMap<>();
    private Text outputKey = new Text();
    private Text outputValue = new Text();

    @Override
    public void setup(Context context) throws IOException, InterruptedException {

        boolean cacheOK = false;

        URI[] cacheFiles = context.getCacheFiles();
        final String distributedCacheFilename = context.getConfiguration().get(PROJECTS_FILENAME_CONF_KEY);

        Configuration conf = new Configuration();

        for (URI cacheFile : cacheFiles) {
            Path path = new Path(cacheFile);
            if (path.getName().equals(distributedCacheFilename)) {

                LOG.info("Starting to build cache from : " + cacheFile);

                try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
                    LOG.info("Compressed ? " + reader.isBlockCompressed());

                    Text tempKey = new Text();
                    ProjectWritable tempValue = new ProjectWritable();

                    while (reader.next(tempKey, tempValue)) {
                        // Serialize important value to a string containing pipe-separated values
                        String projectString = String.format("%s|%s|%s|%s",
                                tempValue.project_id, tempValue.school_city, tempValue.poverty_level, tempValue.primary_focus_subject);
                        projectsCache.put(tempKey.toString(), projectString);
                    }
                }

                LOG.info("Finished to build cache. Number of entries : " + projectsCache.size());
                cacheOK = true;
                break;
            }
        }

        if (!cacheOK) {
            LOG.error("Distributed cache file not found : " + distributedCacheFilename);
            throw new IOException("Distributed cache file not found : " + distributedCacheFilename);
        }
    }

    @Override
    public void map(Object key, DonationWritable donation, Context context)
            throws IOException, InterruptedException {

        String projectOutput = projectsCache.get(donation.project_id);

        // Ignore if the corresponding entry doesn't exist in the projects data (INNER JOIN)
        if (projectOutput == null) {
            return;
        }

        String donationOutput = String.format("%s|%s|%s|%s|%.2f",
                donation.donation_id, donation.project_id, donation.donor_city, donation.ddate, donation.total);

        outputKey.set(donationOutput);
        outputValue.set(projectOutput);
        context.write(outputKey, outputValue);
    }
}
```
Managed (internal) tables:
hive>create table it (name string , age string);
//At this point a data directory for table it is created under /user/hive/warehouse/ on HDFS
hive>load data inpath '/input/data' into table it;
//Then load HDFS data into the table: the data under /input/data on HDFS is moved under /user/hive/warehouse/, and nothing is left under /input/data
Dropping table it deletes both its data and its metadata, i.e. there is no data left under /user/hive/warehouse/.
Note in particular: load data moves the data, so once moved, the data under /input/data is gone.
External tables:
hive>create external table et (name string , age string);
//A table directory et is created under /user/hive/warehouse/ on HDFS
hive>load data inpath '/input/edata' into table et;
//Load HDFS data: the data under /input/edata/ on HDFS is moved to /user/hive/warehouse/et
After dropping this external table, the data under /user/hive/warehouse/et is not deleted, but the data under /input/edata/ was already gone after the load above. The data's location changed because loading data that already sits on HDFS moves it rather than copying it.
Partition columns ("virtual" columns, i.e. partitioned tables)
Hive has the notion of a "virtual" column: a column that does not physically exist in the table's data files and whose purpose is to partition the Hive table, which is extremely useful for data volumes that grow every day. To keep HiveQL efficient, it is strongly recommended to use this column as a filter in the where clause. Take web logs as an example: a table named web_log is created in Hive for the logs, with a virtual column logdate by which web_log partitions each day's log data. So whenever you select from web_log, remember to add a logdate condition after where, like this:
SELECT url FROM web_log WHERE logdate='20090603';
Querying without a partition filter scans the whole table, which is very slow and can even fail. A Hive table exists as a directory, and the partitions under it are stored as subdirectories, split by the values of the partition column. A minimal sketch of the pattern follows.
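For illustration, a minimal sketch of the DDL and query pattern; the non-partition columns and the load path are assumptions.

```
-- Sketch: a daily-partitioned web_log table (url/ip columns are illustrative)
create table web_log (url string, ip string)
partitioned by (logdate string);

-- Each partition is a subdirectory such as .../web_log/logdate=20090603/
load data inpath '/input/weblog/20090603' into table web_log partition (logdate='20090603');

-- Always filter on the partition column so only that subdirectory is scanned
select url from web_log where logdate = '20090603';
```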
A HiveQL example from requirement analysis
```
insert overwrite local directory '/TODB2_PATH/20150508/'
select substr(hour_id,9,2),
       sum(case when a.busi_type_id=15 then a.down_flow+a.up_flow else cast(0 as bigint) end),
       sum(case when a.busi_type_id=1  then a.down_flow+a.up_flow else cast(0 as bigint) end),
       sum(case when a.busi_type_id=4  then a.down_flow+a.up_flow else cast(0 as bigint) end),
       sum(case when a.busi_type_id=7  then a.down_flow+a.up_flow else cast(0 as bigint) end),
       sum(case when a.busi_type_id=3  then a.down_flow+a.up_flow else cast(0 as bigint) end),
       sum(case when a.busi_type_id=6  then a.down_flow+a.up_flow else cast(0 as bigint) end),
       sum(case when a.busi_type_id=5  then a.down_flow+a.up_flow else cast(0 as bigint) end),
       sum(case when a.busi_type_id=14 then a.down_flow+a.up_flow else cast(0 as bigint) end),
       sum(case when a.busi_type_id=17 then a.down_flow+a.up_flow else cast(0 as bigint) end),
       sum(case when a.busi_type_id=13 then a.down_flow+a.up_flow else cast(0 as bigint) end),
       sum(case when a.busi_type_id=12 then a.down_flow+a.up_flow else cast(0 as bigint) end),
       sum(case when a.busi_type_id=9  then a.down_flow+a.up_flow else cast(0 as bigint) end),
       sum(case when a.busi_type_id=2  then a.down_flow+a.up_flow else cast(0 as bigint) end),
       sum(case when a.busi_type_id=11 then a.down_flow+a.up_flow else cast(0 as bigint) end),
       sum(case when a.busi_type_id=8  then a.down_flow+a.up_flow else cast(0 as bigint) end),
       sum(case when a.busi_type_id=16 then a.down_flow+a.up_flow else cast(0 as bigint) end),
       sum(case when a.busi_type_id=10 then a.down_flow+a.up_flow else cast(0 as bigint) end),
       sum(case when a.busi_type_id in(18,19,20,-9) then a.down_flow+a.up_flow else cast(0 as bigint) end)
from dw_cal_gprs_bh_yyyymmdd a
where month_id=201504
group by substr(hour_id,9,2);
```
Hive data import and export
Exporting data from a Hive table falls into two broad categories: exporting to files, and exporting into another table.
2.1.1 Exporting Hive data to a filesystem
1. Use an HQL statement directly
insert overwrite [local] directory 'importedRecordNum/' select count(*) from ods01_am_score_reception_dm where month_id=201506 and day_id=20150616 ;
To specify the field delimiter, write:
```
hive> insert overwrite local directory './'
hive> row format delimited
hive> FIELDS TERMINATED BY '\t'
hive> select inserttime,importedrecordnum from tmp_imported_recordnum order by inserttime;
```
If the records contain map or collection columns, those delimiters need to be specified as well:
```
hive> insert overwrite local directory './'
hive> row format delimited
hive> FIELDS TERMINATED BY '\t'
hive> COLLECTION ITEMS TERMINATED BY ','
hive> MAP KEYS TERMINATED BY ':'
hive> select inserttime,importedrecordnum from tmp_imported_recordnum order by inserttime;
```
With local the target is a local-filesystem directory; without local it is an HDFS directory. Note that if you have several exports, do not write them to the same directory, or the earlier results will be overwritten; it is best to use a fresh directory every time, otherwise everything previously in it is lost. overwrite always replaces the existing contents; into does not overwrite, but into is only used for inserting into Hive tables, not directories. Also note that the exported data is compressed by default on this setup; set hive.exec.compress.output=false to turn compression off, as in the sketch below.
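A small hedged sketch combining the compression setting with the export syntax shown above; the target directory is a hypothetical placeholder.

```
-- Sketch: disable output compression before exporting, and use a fresh directory
set hive.exec.compress.output=false;

insert overwrite local directory '/tmp/export_20150616/'
row format delimited fields terminated by '\t'
select inserttime, importedrecordnum
from tmp_imported_recordnum;
```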
2. Use the hive command plus shell redirection
hive -e "use dc;select * from ods01_am_score_reception_dm where month_id=201506 and day_id=20150616" >> ./tmp.dat hive –f sql.d >>/home/ocdc/tmp.dat
2.1.2 Exporting Hive data into another table in the database
insert into table tmp_imported_recordnum select count(*) from ods01_am_score_reception_dm where month_id=201506 and day_id=20150616 ;
This raises the question of whether Hive supports inserting a single record at all. On the surface it supports inserting into a table, but the result of the select, whether it contains one record or many, is a result set, i.e. an intermediate table, so insert into table tablename select ... is not really inserting "one record". If you run insert into ... select against the same table repeatedly, each run generates a new file under that table's (partition) directory:
```
[ocdc@HBBDC-Interface-5 tmp_imported_recordnum]$ ls -rtl
total 0
-rw-r--r--. 1 ocdc nobody 44 Jun 19 09:55 000000_0.lzo_deflate
-rw-r--r--. 1 ocdc nobody 43 Jun 19 09:55 000000_0_copy_1.lzo_deflate
-rw-r--r--. 1 ocdc nobody 38 Jun 19 09:56 000000_0_copy_2.lzo_deflate
-rw-r--r--. 1 ocdc nobody 42 Jun 19 09:56 000000_0_copy_3.lzo_deflate
-rw-r--r--. 1 ocdc nobody 45 Jun 19 09:57 000000_0_copy_4.lzo_deflate
-rw-r--r--. 1 ocdc nobody 43 Jun 19 09:57 000000_0_copy_5.lzo_deflate
-rw-r--r--. 1 ocdc nobody 43 Jun 19 09:57 000000_0_copy_6.lzo_deflate
-rw-r--r--. 1 ocdc nobody 43 Jun 19 09:58 000000_0_copy_7.lzo_deflate
-rw-r--r--. 1 ocdc nobody 42 Jun 19 09:58 000000_0_copy_8.lzo_deflate
-rw-r--r--. 1 ocdc nobody 41 Jun 19 09:59 000000_0_copy_9.lzo_deflate
-rw-r--r--. 1 ocdc nobody 40 Jun 19 10:00 000000_0_copy_10.lzo_deflate
-rw-r--r--. 1 ocdc nobody 39 Jun 19 10:00 000000_0_copy_11.lzo_deflate
-rw-r--r--. 1 ocdc nobody 40 Jun 19 10:01 000000_0_copy_12.lzo_deflate
-rw-r--r--. 1 ocdc nobody 43 Jun 19 10:01 000000_0_copy_13.lzo_deflate
-rw-r--r--. 1 ocdc nobody 44 Jun 19 10:02 000000_0_copy_14.lzo_deflate
-rw-r--r--. 1 ocdc nobody 41 Jun 19 10:02 000000_0_copy_15.lzo_deflate
-rw-r--r--. 1 ocdc nobody 41 Jun 19 10:03 000000_0_copy_16.lzo_deflate
-rw-r--r--. 1 ocdc nobody 37 Jun 19 10:03 000000_0_copy_17.lzo_deflate
-rw-r--r--. 1 ocdc nobody 38 Jun 19 10:03 000000_0_copy_18.lzo_deflate
-rw-r--r--. 1 ocdc nobody 43 Jun 19 10:04 000000_0_copy_19.lzo_deflate
-rw-r--r--. 1 ocdc nobody 38 Jun 19 10:04 000000_0_copy_20.lzo_deflate
-rw-r--r--. 1 ocdc nobody 40 Jun 19 10:05 000000_0_copy_21.lzo_deflate
-rw-r--r--. 1 ocdc nobody 40 Jun 19 10:06 000000_0_copy_22.lzo_deflate
-rw-r--r--. 1 ocdc nobody 38 Jun 19 10:06 000000_0_copy_23.lzo_deflate
-rw-r--r--. 1 ocdc nobody 38 Jun 19 10:07 000000_0_copy_24.lzo_deflate
-rw-r--r--. 1 ocdc nobody 46 Jun 19 10:07 000000_0_copy_25.lzo_deflate
-rw-r--r--. 1 ocdc nobody 43 Jun 19 10:08 000000_0_copy_26.lzo_deflate
-rw-r--r--. 1 ocdc nobody 38 Jun 19 10:08 000000_0_copy_27.lzo_deflate
-rw-r--r--. 1 ocdc nobody 41 Jun 19 10:09 000000_0_copy_28.lzo_deflate
-rw-r--r--. 1 ocdc nobody 38 Jun 19 10:09 000000_0_copy_29.lzo_deflate
-rw-r--r--. 1 ocdc nobody 44 Jun 19 10:10 000000_0_copy_30.lzo_deflate
-rw-r--r--. 1 ocdc nobody 43 Jun 19 10:10 000000_0_copy_31.lzo_deflate
-rw-r--r--. 1 ocdc nobody 38 Jun 19 10:11 000000_0_copy_32.lzo_deflate
-rw-r--r--. 1 ocdc nobody 44 Jun 19 10:11 000000_0_copy_33.lzo_deflate
-rw-r--r--. 1 ocdc nobody 44 Jun 19 10:11 000000_0_copy_34.lzo_deflate
-rw-r--r--. 1 ocdc nobody 45 Jun 19 10:12 000000_0_copy_35.lzo_deflate
-rw-r--r--. 1 ocdc nobody 38 Jun 19 10:13 000000_0_copy_36.lzo_deflate
-rw-r--r--. 1 ocdc nobody 38 Jun 19 10:13 000000_0_copy_37.lzo_deflate
-rw-r--r--. 1 ocdc nobody 38 Jun 19 10:14 000000_0_copy_38.lzo_deflate
-rw-r--r--. 1 ocdc nobody 42 Jun 19 10:14 000000_0_copy_39.lzo_deflate
-rw-r--r--. 1 ocdc nobody 38 Jun 19 10:14 000000_0_copy_40.lzo_deflate
-rw-r--r--. 1 ocdc nobody 38 Jun 19 10:15 000000_0_copy_41.lzo_deflate
```
Although this approach can keep inserting into the same table, every insert just creates another file copy under the table's directory, and the rows returned by select * from tablename do not come back in insertion order but roughly in file-copy-name order, whatever Hive's default internal reading order happens to be. The result order is therefore effectively unpredictable and is certainly not the insertion order, because the records are not even stored in the same file.
This also shows that Hive, used this way, does not support randomly inserting a single record into the database: each insert's result is stored as its own file and records are never merged into one file, which is also why Hive does not support record-level update and delete.
How can these records be returned in order, i.e. in the order they were inserted? One solution is to add an insert-timestamp column or a monotonically increasing id column to the tmp_imported_recordnum table, and then sort the output with order by id (or order by the timestamp).
A small Hive statistics example
```
#Scheduled-task script: counts interface record loads; scheduled at 02:00 every day, runs for roughly 3 hours
#importRecordNum.sh
#version9.X
#Counts the number of records loaded through the interfaces; high-efficiency version with embedded HiveQL that makes full use of Hadoop's parallelism
#!/bin/bash

#This must be sourced: the cron shell environment differs from the current shell
source ~/.bashrc

opt_time_day=`date +%Y%m%d -d '-2 days'`
opt_time_month=`date +%Y%m`

echo -e "Start time\t`date +%c`">/home/ocdc/tianqi/import_record_statistics/hive${opt_time_day}.log
echo -e "Batch\t${opt_time_month}/${opt_time_day}">>/home/ocdc/tianqi/import_record_statistics/hive${opt_time_day}.log

hive -e "use dc;drop table tmp_imported_recordnum;CREATE TABLE tmp_imported_recordnum(
  importedrecordnum string,
  inserttime timestamp)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://hbcm/user/hive/ocdc/dc.db/tmp_imported_recordnum';"

for if_tablename in `cat /home/ocdc/tianqi/import_record_statistics/if_tablename.int`
do
  hive -e "use dc;insert into table tmp_imported_recordnum select count(*),unix_timestamp() from ${if_tablename} where month_id=${opt_time_month} and day_id=${opt_time_day};"
  echo -e "${if_tablename}\tcounted">>/home/ocdc/tianqi/import_record_statistics/hive${opt_time_day}.log
done

#Export the query result to the local filesystem
hive -e "use dc;select inserttime,importedrecordnum from tmp_imported_recordnum order by inserttime;" >>/home/ocdc/tianqi/import_record_statistics/imp${opt_time_day}.dat
hive -e "quit;"

echo -e "Finish time\t`date +%c`">>/home/ocdc/tianqi/import_record_statistics/hive${opt_time_day}.log
exit 0
```
The shell dispatches these hive commands very quickly, which puts Hive into a highly concurrent state; the large amount of intermediate/cache data filled up the disks and brought the cluster down. The workaround was to space these SQL executions further apart.
In Hive, a single SQL statement can involve n jobs, and by default those jobs run one after another. If the jobs have no dependencies on each other and can safely run concurrently, setting hive.exec.parallel=true makes them execute in parallel; by default up to 8 jobs may run concurrently (a minimal sketch follows).
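For reference, a minimal sketch of the relevant session settings; the second property shows the default value explicitly.

```
-- Sketch: allow independent stages (jobs) of one query to run concurrently
set hive.exec.parallel=true;
-- upper bound on how many independent jobs of one query may run at the same time (default 8)
set hive.exec.parallel.thread.number=8;
```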