Hadoop集群间的hbase数据迁移


在日常的使用过程中,可能经常需要将一个集群中hbase的数据迁移到或者拷贝到另外一个集群中,这时候,可能会出很多问题

以下是我在处理的过程中的一些做法和处理方式。

前提,两个hbase的版本一直,否则可能出现不可预知的问题,造成数据迁移失败

当两个集群不能通讯的时候,可以先将数据所在集群中hbase的数据文件拷贝到本地

具体做法如下:

在Hadoop目录下执行如下命令,拷贝到本地文件。 

bin/hadoop fs -copyToLocal /hbase/tab_keywordflow /home/test/xiaochenbak 

然后你懂得,将文件拷贝到你需要的你需要迁移到的那个集群中,目录是你的表的目录,

如果这个集群中也有对应的表文件,那么删除掉,然后拷贝。

/bin/hadoop fs -rmr /hbase/tab_keywordflow

/bin/hadoop fs -copyFromLocal /home/other/xiaochenbak /hbase/tab_keywordflow 

此时的/home/other/xiaochenbak为你要迁移到数据的集群。 

重置该表在.META.表中的分区信息

bin/hbase org.jruby.Main /home/other/hbase/bin/add_table.rb /hbase/tab_keywordflow

/home/other/hbase/bin/add_table.rb为ruby脚本,可以执行,脚本内容如下:另存为add_table.rb即可 

  1. #  
  2. # Copyright 2009 The Apache Software Foundation  
  3. #  
  4. # Licensed to the Apache Software Foundation (ASF) under one  
  5. # or more contributor license agreements.  See the NOTICE file  
  6. # distributed with this work for additional information  
  7. # regarding copyright ownership.  The ASF licenses this file  
  8. # to you under the Apache License, Version 2.0 (the  
  9. "License"); you may not use this file except in compliance  
  10. # with the License.  You may obtain a copy of the License at  
  11. #  
  12. #     http://www.apache.org/licenses/LICENSE-2.0   
  13. #  
  14. # Unless required by applicable law or agreed to in writing, software  
  15. # distributed under the License is distributed on an "AS IS" BASIS,  
  16. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
  17. # See the License for the specific language governing permissions and  
  18. # limitations under the License.  
  19. #  
  20. # Script adds a table back to a running hbase.  
  21. # Currently only works on if table data is in place.  
  22. #  
  23. # To see usage for this script, run:  
  24. #  
  25. #  ${HBASE_HOME}/bin/hbase org.jruby.Main addtable.rb  
  26. #  
  27. include Java  
  28. import org.apache.hadoop.hbase.util.Bytes  
  29. import org.apache.hadoop.hbase.HConstants  
  30. import org.apache.hadoop.hbase.regionserver.HRegion  
  31. import org.apache.hadoop.hbase.HRegionInfo  
  32. import org.apache.hadoop.hbase.client.HTable  
  33. import org.apache.hadoop.hbase.client.Delete  
  34. import org.apache.hadoop.hbase.client.Put  
  35. import org.apache.hadoop.hbase.client.Scan  
  36. import org.apache.hadoop.hbase.HTableDescriptor  
  37. import org.apache.hadoop.hbase.HBaseConfiguration  
  38. import org.apache.hadoop.hbase.util.FSUtils  
  39. import org.apache.hadoop.hbase.util.Writables  
  40. import org.apache.hadoop.fs.Path  
  41. import org.apache.hadoop.fs.FileSystem  
  42. import org.apache.commons.logging.LogFactory  
  43.   
  44. # Name of this script  
  45. NAME = "add_table"  
  46.   
  47. # Print usage for this script  
  48. def usage  
  49.   puts 'Usage: %s.rb TABLE_DIR [alternate_tablename]' % NAME  
  50.   exit!  
  51. end  
  52.   
  53. # Get configuration to use.  
  54. c = HBaseConfiguration.new()  
  55.   
  56. # Set hadoop filesystem configuration using the hbase.rootdir.  
  57. # Otherwise, we'll always use localhost though the hbase.rootdir  
  58. # might be pointing at hdfs location.  
  59. c.set("fs.default.name", c.get(HConstants::HBASE_DIR))  
  60. fs = FileSystem.get(c)  
  61.   
  62. # Get a logger and a metautils instance.  
  63. LOG = LogFactory.getLog(NAME)  
  64.   
  65. # Check arguments  
  66. if ARGV.size < 1 || ARGV.size > 2  
  67.   usage  
  68. end  
  69.   
  70. # Get cmdline args.  
  71. srcdir = fs.makeQualified(Path.new(java.lang.String.new(ARGV[0])))  
  72.   
  73. if not fs.exists(srcdir)  
  74.   raise IOError.new("src dir " + srcdir.toString() + " doesn't exist!")  
  75. end  
  76.   
  77. # Get table name  
  78. tableName = nil  
  79. if ARGV.size > 1  
  80.   tableName = ARGV[1]  
  81.   raise IOError.new("Not supported yet")  
  82. elsif  
  83.   # If none provided use dirname  
  84.   tableName = srcdir.getName()  
  85. end  
  86. HTableDescriptor.isLegalTableName(tableName.to_java_bytes)  
  87.   
  88. # Figure locations under hbase.rootdir  
  89. # Move directories into place; be careful not to overwrite.  
  90. rootdir = FSUtils.getRootDir(c)  
  91. tableDir = fs.makeQualified(Path.new(rootdir, tableName))  
  92.   
  93. # If a directory currently in place, move it aside.  
  94. if srcdir.equals(tableDir)  
  95.   LOG.info("Source directory is in place under hbase.rootdir: " + srcdir.toString());  
  96. elsif fs.exists(tableDir)  
  97.   movedTableName = tableName + "." + java.lang.System.currentTimeMillis().to_s  
  98.   movedTableDir = Path.new(rootdir, java.lang.String.new(movedTableName))  
  99.   LOG.warn("Moving " + tableDir.toString() + " aside as " + movedTableDir.toString());  
  100.   raise IOError.new("Failed move of " + tableDir.toString()) unless fs.rename(tableDir, movedTableDir)  
  101.   LOG.info("Moving " + srcdir.toString() + " to " + tableDir.toString());  
  102.   raise IOError.new("Failed move of " + srcdir.toString()) unless fs.rename(srcdir, tableDir)  
  103. end  
  104.   
  105. # Clean mentions of table from .META.  
  106. # Scan the .META. and remove all lines that begin with tablename  
  107. LOG.info("Deleting mention of " + tableName + " from .META.")  
  108. metaTable = HTable.new(c, HConstants::META_TABLE_NAME)  
  109. tableNameMetaPrefix = tableName + HConstants::META_ROW_DELIMITER.chr  
  110. scan = Scan.new((tableNameMetaPrefix + HConstants::META_ROW_DELIMITER.chr).to_java_bytes)  
  111. scanner = metaTable.getScanner(scan)  
  112. # Use java.lang.String doing compares.  Ruby String is a bit odd.  
  113. tableNameStr = java.lang.String.new(tableName)  
  114. while (result = scanner.next())  
  115.   rowid = Bytes.toString(result.getRow())  
  116.   rowidStr = java.lang.String.new(rowid)  
  117.   if not rowidStr.startsWith(tableNameMetaPrefix)  
  118.     # Gone too far, break  
  119.     break  
  120.   end  
  121.   LOG.info("Deleting row from catalog: " + rowid);  
  122.   d = Delete.new(result.getRow())  
  123.   metaTable.delete(d)  
  124. end  
  125. scanner.close()  
  126.   
  127. # Now, walk the table and per region, add an entry  
  128. LOG.info("Walking " + srcdir.toString() + " adding regions to catalog table")  
  129. statuses = fs.listStatus(srcdir)  
  130. for status in statuses  
  131.   next unless status.isDir()  
  132.   next if status.getPath().getName() == "compaction.dir"  
  133.   regioninfofile =  Path.new(status.getPath(), HRegion::REGIONINFO_FILE)  
  134.   unless fs.exists(regioninfofile)  
  135.     LOG.warn("Missing .regioninfo: " + regioninfofile.toString())  
  136.     next  
  137.   end  
  138.   is = fs.open(regioninfofile)  
  139.   hri = HRegionInfo.new()  
  140.   hri.readFields(is)  
  141.   is.close()  
  142.   # TODO: Need to redo table descriptor with passed table name and then recalculate the region encoded names.  
  143.   p = Put.new(hri.getRegionName())  
  144.   p.add(HConstants::CATALOG_FAMILY, HConstants::REGIONINFO_QUALIFIER, Writables.getBytes(hri))  
  145.   metaTable.put(p)  
  146.   LOG.info("Added to catalog: " + hri.toString())  
  147. end  

好了,以上就是我的做法,如何集群键可以通信,那就更好办了,相信你懂得,scp

相关内容