Analysis of the compaction flow



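As with split, a compaction is started in one of two ways: by a client request, or by the check that runs after a flush.

In addition, the region server creates a CompactionChecker thread at startup. This thread periodically checks whether a compaction is needed.

The thread's interval is set with hbase.server.thread.wakefrequency. The default is 10*1000 ms.

What the CompactionChecker thread does:

It wakes up every hbase.server.thread.wakefrequency (10*1000 ms) and checks whether any region needs a compaction. If one does, it sends the compaction request to the compaction thread (CompactSplitThread).

The priority of the major compactions it requests is set with hbase.regionserver.compactionChecker.majorCompactPriority.

A larger priority value means less urgent. If this setting is absent, or its value is larger than the region's own compaction priority, the default priority is used. Otherwise the configured value is used.

Each Store reads hbase.server.compactchecker.interval.multiplier, which controls how often a compaction check is actually performed.

Its default is 1000. This is a multiplier, not a number of milliseconds.

The compactionChecker's effective check period is therefore wakefrequency * multiplier ms. With the defaults that is 10,000 ms * 1000 = 10,000,000 ms, or about 2.8 hours.

In other words, by default a store is checked once every 1000 runs of the thread.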

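a. The check requests a compaction when

the number of files in a store, minus the number being compacted (or already requested for compaction), is greater than or equal to

the value of hbase.hstore.compaction.min.

Older versions call this setting hbase.hstore.compactionThreshold. The default is 3.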
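The scheduling arithmetic and the minimum-file condition above fit in a few lines of Java. This is a minimal sketch that assumes the defaults quoted above. The class and method names are hypothetical, not HBase API. It reproduces only the arithmetic of CompactionChecker and needsCompaction(), not their threading.

```java
// Hypothetical sketch; names are illustrative, not HBase API.
final class CompactionCheckerSketch {
    static final long WAKE_FREQUENCY_MS = 10_000L; // hbase.server.thread.wakefrequency default
    static final long MULTIPLIER = 1000L;          // hbase.server.compactchecker.interval.multiplier default
    static final int MIN_FILES_TO_COMPACT = 3;     // hbase.hstore.compaction.min default

    /** Effective interval between two checks of the same store. */
    static long checkPeriodMillis(long wakeFrequencyMs, long multiplier) {
        return wakeFrequencyMs * multiplier;
    }

    /** The chore only looks at a store when the iteration counter is a multiple of the multiplier. */
    static boolean isCheckIteration(long iteration, long multiplier) {
        return iteration % multiplier == 0;
    }

    /** The counter wraps to 0 instead of overflowing. */
    static long nextIteration(long iteration) {
        return iteration == Long.MAX_VALUE ? 0 : iteration + 1;
    }

    /** needsCompaction(): eligible files (all files minus those compacting) >= compaction.min. */
    static boolean needsCompaction(int storeFiles, int compactingFiles, int minFilesToCompact) {
        return storeFiles - compactingFiles >= minFilesToCompact;
    }

    public static void main(String[] args) {
        System.out.println("check period (ms): " + checkPeriodMillis(WAKE_FREQUENCY_MS, MULTIPLIER));
        System.out.println("5 files, 2 compacting -> " + needsCompaction(5, 2, MIN_FILES_TO_COMPACT));
        System.out.println("4 files, 2 compacting -> " + needsCompaction(4, 2, MIN_FILES_TO_COMPACT));
    }
}
```

With the defaults, main prints a period of 10000000 ms. It then prints true for 5 files with 2 compacting (3 >= 3) and false for 4 files with 2 compacting.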

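b. Major compaction check

hbase.hregion.majorcompaction sets the major compaction period. The comment in the code says 24 hours (1000*60*60*24). In the version analysed here, however, the default is 1000*60*60*24*7, which is 7 days.

hbase.hregion.majorcompaction.jitter sets how far the period may drift. With a 24-hour period and a jitter of 0.2, the major compaction time can move by up to 4.8 hours in either direction. The default jitter in this version is 0.5.

b2. If (current time - major compaction period) > the timestamp of the store's oldest file, a major compaction may be needed:

b2.1 If the store has only one file and that file has reached the major compaction time, check whether its data has reached the TTL. A TTL of Integer.MAX_VALUE means no TTL is set. If the TTL has been reached, a major compaction is needed. If it has not, and the file is already the output of a major compaction, nothing is done.

b2.2 If there is more than one file and the major compaction time has been reached, a major compaction is needed.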
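Below is a rough Java sketch of the timing check in b. It assumes the jitter is applied as period plus or minus period*jitter, with the offset drawn from a seeded java.util.Random. The seed is a plain parameter here; how HBase chooses it is not modelled. FileInfo and all method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of the major-compaction timing check; names are illustrative.
final class MajorCompactionCheckSketch {
    static final long NO_TTL = Long.MAX_VALUE; // an unset TTL, after conversion

    static final class FileInfo {
        final long fileTimestamp;    // used to find the oldest file
        final long minCellTimestamp; // oldest cell in the file
        final boolean fromMajor;     // written by an earlier major compaction

        FileInfo(long fileTimestamp, long minCellTimestamp, boolean fromMajor) {
            this.fileTimestamp = fileTimestamp;
            this.minCellTimestamp = minCellTimestamp;
            this.fromMajor = fromMajor;
        }
    }

    /** period plus an offset in [-period*pct, +period*pct] taken from a seeded Random. */
    static long periodWithJitter(long periodMs, double jitterPct, long seed) {
        if (periodMs <= 0 || jitterPct <= 0) {
            return periodMs;
        }
        long jitter = Math.round(periodMs * jitterPct);
        double rnd = new Random(seed).nextDouble(); // in [0.0, 1.0)
        return periodMs + jitter - Math.round(2L * jitter * rnd);
    }

    /** b2: oldest file older than the period, with the single-file exception of b2.1. */
    static boolean isMajorCompaction(List<FileInfo> files, long periodMs, long ttlMs, long now) {
        if (files.isEmpty() || periodMs == 0) {
            return false;
        }
        long lowest = Long.MAX_VALUE;
        for (FileInfo f : files) {
            lowest = Math.min(lowest, f.fileTimestamp);
        }
        if (lowest <= 0 || lowest >= now - periodMs) {
            return false; // major compaction time not reached yet
        }
        if (files.size() > 1) {
            return true; // b2.2: several files
        }
        FileInfo only = files.get(0);
        boolean ttlReached = ttlMs != NO_TTL && now - only.minCellTimestamp >= ttlMs;
        // b2.1: a lone file that came from a major compaction is skipped unless its data expired
        return !only.fromMajor || ttlReached;
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        System.out.println("7 days, jitter 0.5 -> " + periodWithJitter(7 * day, 0.5, 42L) + " ms");
        List<FileInfo> two = Arrays.asList(new FileInfo(80 * day, 80 * day, false),
                                           new FileInfo(95 * day, 95 * day, false));
        System.out.println("two files, oldest 20 days old -> "
            + isMajorCompaction(two, 7 * day, NO_TTL, 100 * day));
    }
}
```

With a 7-day period and a jitter of 0.5, periodWithJitter always returns a value between 3.5 and 10.5 days. With 24 hours and 0.2, it stays within 4.8 hours of 24 hours.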


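Client-initiated compactRegion request

The client calls HBaseAdmin.compact. This opens an RPC connection to the region server and calls regionserver.compactRegion.

If a table name is passed in instead of a region name, the client iterates over all regions of that table and calls HRegionServer.compactRegion for each one.

On the server side, the client request is handled by HRegionServer.compactRegion: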

public CompactRegionResponse compactRegion(final RpcController controller,
    final CompactRegionRequest request) throws ServiceException {
  try {
    checkOpen();
    requestCount.increment();
    // Look up the requested HRegion instance in onlineRegions
    HRegion region = getRegion(request.getRegion());
    region.startRegionOperation(Operation.COMPACT_REGION);
    LOG.info("Compacting " + region.getRegionNameAsString());
    boolean major = false;
    byte[] family = null;
    Store store = null;
    // If the client request names a column family, get that family's HStore
    if (request.hasFamily()) {
      family = request.getFamily().toByteArray();
      store = region.getStore(family);
      if (store == null) {
        throw new ServiceException(new IOException("column family " + Bytes.toString(family) +
          " does not exist in region " + region.getRegionNameAsString()));
      }
    }
    // Check whether this is a major compaction request
    if (request.hasMajor()) {
      major = request.getMajor();
    }
    // For a major compaction, set the force-major flag on the store (or on every store of the region)
    if (major) {
      if (family != null) {
        store.triggerMajorCompaction();
      } else {
        region.triggerMajorCompaction();
      }
    }

    String familyLogMsg = (family != null) ? " for column family: " + Bytes.toString(family) : "";
    LOG.trace("User-triggered compaction requested for region " +
      region.getRegionNameAsString() + familyLogMsg);
    String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
    // Major or not, the request is then submitted through compactSplitThread.requestCompaction
    if (family != null) {
      compactSplitThread.requestCompaction(region, store, log,
        Store.PRIORITY_USER, null);
    } else {
      compactSplitThread.requestCompaction(region, log,
        Store.PRIORITY_USER, null);
    }
    return CompactRegionResponse.newBuilder().build();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
}


Non-major compaction flow

requestCompaction accepts either a store or a region.

If a region is passed in, it gets all the stores of that region and issues a compaction request for each store in turn.

Every non-major compaction request ends up going through the following method:

private synchronized CompactionRequest requestCompactionInternal(final HRegion r,
    final Store s,
    final String why, int priority, CompactionRequest request, boolean selectNow)


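Handling of a store-level compaction request

To disable compaction for an HBase table, set the COMPACTION_ENABLED attribute when creating the table. For such a table, and also when the server is stopping, requestCompactionInternal returns immediately.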

private synchronized CompactionRequest requestCompactionInternal(final HRegion r, final Store s,
    final String why, int priority, CompactionRequest request, boolean selectNow)
    throws IOException {
  if (this.server.isStopped()
      || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
    return null;
  }

  CompactionContext compaction = null;

  // For a user request, selectNow is true here. For requestSystemCompaction calls it is false.
  // In other words: when a system request starts the CompactSplitThread.CompactionRunner thread,
  // the CompactionContext passed in is null. For a user request, the files are selected here.
  if (selectNow) {
    // HStore.requestCompaction returns a CompactionContext and computes which store files to compact.
    // For a client request, request.priority is Store.PRIORITY_USER (user-initiated).
    // For a system request (e.g. one triggered after a flush), request.priority is
    // hbase.hstore.blockingStoreFiles minus the number of store files.
    // If that difference equals PRIORITY_USER, the priority becomes PRIORITY_USER + 1.
    // See "Creating the CompactionRequest instance".
    compaction = selectCompaction(r, s, priority, request);
    if (compaction == null) return null; // message logged inside
  }

  // We assume that most compactions are small. So, put system compactions into small
  // pool; we will do selection there, and move to large pool if necessary.
  long size = selectNow ? compaction.getRequest().getSize() : 0;

  // It appears that largeCompactions can never be chosen here:
  // - when selectNow is false, size is 0, which can never exceed
  //   hbase.regionserver.thread.compaction.throttle
  //   (default hbase.hstore.compaction.max * 2 * memstore flush size);
  // - when selectNow is true, !selectNow is false.
  // For system compactions, the size is checked again in the CompactionRunner thread.
  ThreadPoolExecutor pool = (!selectNow && s.throttleCompaction(size))
    ? largeCompactions : smallCompactions;

  // Create a CompactionRunner and run it on the pool (smallCompactions here);
  // see "Compaction execution flow"
  pool.execute(new CompactionRunner(s, r, compaction, pool));
  if (LOG.isDebugEnabled()) {
    String type = (pool == smallCompactions) ? "Small " : "Large ";
    LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
      + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
  }
  return selectNow ? compaction.getRequest() : null;
}
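The sketch below makes the pool-choice observation concrete. It evaluates the condition exactly as written, (!selectNow && throttleCompaction(size)), alongside the later re-check in CompactionRunner.run(). It makes two assumptions: that throttleCompaction(size) means size > throttle point, and that the memstore flush size is 128 MB when computing the default throttle point. Pool and the method names are hypothetical.

```java
// Hypothetical sketch of the pool choice; names are illustrative, not HBase API.
final class PoolChoiceSketch {
    enum Pool { SMALL, LARGE }

    /** Default throttle point described above: 2 * compaction.max * memstore flush size. */
    static long defaultThrottlePoint(int maxFilesToCompact, long memstoreFlushSize) {
        return 2L * maxFilesToCompact * memstoreFlushSize;
    }

    /** The condition as written in requestCompactionInternal. */
    static Pool chooseInRequest(boolean selectNow, long selectedSize, long throttlePoint) {
        long size = selectNow ? selectedSize : 0;
        boolean throttle = size > throttlePoint; // stands in for store.throttleCompaction(size)
        return (!selectNow && throttle) ? Pool.LARGE : Pool.SMALL;
    }

    /** The re-check CompactionRunner.run() does after selecting files for a system compaction. */
    static Pool chooseInRunner(long selectedSize, long throttlePoint) {
        return selectedSize > throttlePoint ? Pool.LARGE : Pool.SMALL;
    }

    public static void main(String[] args) {
        long throttle = defaultThrottlePoint(10, 128L * 1024 * 1024); // assumed 128 MB flush size
        long big = throttle + 1;
        for (boolean selectNow : new boolean[] {false, true}) {
            System.out.println("selectNow=" + selectNow + ", big request -> "
                + chooseInRequest(selectNow, big, throttle));
        }
        System.out.println("runner re-check, big selection -> " + chooseInRunner(big, throttle));
    }
}
```

chooseInRequest returns SMALL in both cases, whatever the size. chooseInRunner returns LARGE once the selected size exceeds the throttle point. That is why, on this path, only system compactions (which are re-checked in the runner) can end up in largeCompactions.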


Creating the CompactionRequest instance

HStore.requestCompaction selects the store files to compact and creates a CompactionContext:

public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
    throws IOException {
  // don't even select for compaction if writes are disabled
  if (!this.areWritesEnabled()) {
    return null;
  }

  // Creates a DefaultStoreEngine.DefaultCompactionContext (when the default store engine is used)
  CompactionContext compaction = storeEngine.createCompaction();
  this.lock.readLock().lock();
  try {
    synchronized (filesCompacting) {
      // First, see if coprocessor would want to override selection.
      if (this.getCoprocessorHost() != null) {
        List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
        boolean override = this.getCoprocessorHost().preCompactSelection(
          this, candidatesForCoproc, baseRequest);
        if (override) {
          // Coprocessor is overriding normal file selection.
          compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
        }
      }

      // Normal case - coprocessor is not overriding file selection.
      if (!compaction.hasSelection()) {
        // true for a compaction requested by a client; false for system-triggered ones,
        // such as a compaction after a flush
        boolean isUserCompaction = priority == Store.PRIORITY_USER;

        // About offPeakHours:
        // 1. hbase.offpeak.start.hour sets the hour at which the off-peak window starts, e.g. 1;
        // 2. hbase.offpeak.end.hour sets the hour at which it ends, e.g. 2.
        // When both are set, the window mainly changes the ratio used to decide whether a
        // file is too large to compact. A file's size is compared with the total size of the
        // next (hbase.hstore.compaction.max - 1) files; hbase.hstore.compaction.max defaults to 10.
        // Example: with 10 candidate files, file 0 (i = 0) is compared with files 1 through
        // i + max - 1 = 9. If it is larger than their total times the ratio, it is not compacted.
        // If 1 and 2 are not -1, start < end, and the current time is inside the window,
        // the ratio is hbase.hstore.compaction.ratio.offpeak (default 5.0f).
        // Otherwise it is hbase.hstore.compaction.ratio (default 1.2f).
        // The compareAndSet on offPeakCompactionTracker ensures that only one off-peak
        // compaction is claimed at a time.
        boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
          offPeakCompactionTracker.compareAndSet(false, true);
        try {
          // Calls select() on the DefaultStoreEngine.DefaultCompactionContext instance, which
          // returns true if there is a compaction request and false otherwise. For a detailed
          // analysis of compaction.select, see "Major compaction flow".
          // It ends up calling RatioBasedCompactionPolicy.selectCompaction, which builds a
          // CompactionRequest and stores it in the request field of the
          // DefaultStoreEngine.DefaultCompactionContext. Further down in this method, the
          // selected store files are added to HStore.filesCompacting.
          // The last argument, forceMajor, is true only when a major compaction was requested
          // and filesCompacting is empty. In every other case it is false.
          //
          // a. compaction.select gets the list of all store files in this store and passes it
          //    to compactionPolicy.selectCompaction.
          //
          // RatioBasedCompactionPolicy.selectCompaction proceeds as follows:
          // 1. Check whether (number of store files - number being compacted) is at least
          //    hbase.hstore.blockingStoreFiles (default 7). The result is the mayBeStuck flag.
          //    a. If filesCompacting (files being compacted) is not empty, compare
          //       (store files - files being compacted + 1) with blockingStoreFiles.
          //    b. If filesCompacting is empty, compare (store files - files being compacted).
          // 2. Remove the files in filesCompacting from the full store file list. What remains
          //    is the list of eligible store files (those not yet being compacted).
          // 3. If MIN_VERSIONS of the column family is not set (= 0), use the family's TTL.
          //    The TTL is configured in seconds; Integer.MAX_VALUE, -1 or no value means no TTL,
          //    which becomes Long.MAX_VALUE. If MIN_VERSIONS is set, Long.MAX_VALUE is used.
          // 4. Check for expired files when all of these hold: this is not a forced major
          //    compaction, hbase.store.delete.expired.storefile is true (default true), and a
          //    TTL is set. Collect the files from step 2 whose data has all expired.
          //    4.1 If any expired files exist, build a CompactionRequest for just those files
          //        and return it.
          //    4.2 Otherwise, drop files larger than hbase.hstore.compaction.max.size
          //        (default Long.MAX_VALUE), so that large files are kept out of small compactions.
          // 5. Check whether the store files need a major compaction:
          //    5.1 read the period, hbase.hregion.majorcompaction (default 1000*60*60*24*7);
          //    5.2 read hbase.hregion.majorcompaction.jitter (default 0.5f);
          //    5.3 compare the oldest store file's timestamp with the period adjusted by the
          //        jitter. With the defaults the threshold lies between 3.5 and 10.5 days.
          //        With a 24-hour period and a jitter of 0.2, it is 24 hours +/- 4.8 hours.
          //    5.4 if that time has not passed, no major compaction is needed;
          //    5.5 if it has passed, a major compaction is needed when:
          //        a. there is only one store file, and either its oldest data has passed the
          //           TTL or the file is not the output of an earlier major compaction; or
          //        b. there is more than one store file.
          // 6. Another condition, applied when 5 holds: the number of candidate store files
          //    must be below hbase.hstore.compaction.max (default 10).
          // 7. If both 5 and 6 hold, or the candidates include Reference files left by a split,
          //    the compaction is upgraded to major.
          // 8. If it was not upgraded to major, bulk-loaded files are removed from the list.
          // 9. Leave out oversized files. These are leading (oldest) files whose size exceeds
          //    the total size of the files after them times the ratio
          //    (see the offPeakHours note above).
          // 10. If files remain, check that their count reaches the minimum,
          //     hbase.hstore.compaction.min (default 3; hbase.hstore.compactionThreshold in
          //     older versions). If it does not, nothing is compacted.
          // 11. Unless this is a user-requested major compaction, files beyond
          //     hbase.hstore.compaction.max (default 10) are removed from the list.
          // 12. Build and return a CompactionRequest. isOffPeak is set to true only if the
          //     selection is non-empty, the compaction is not major, and the current time is
          //     inside the off-peak window. Otherwise it is false.
          compaction.select(this.filesCompacting, isUserCompaction,
            mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
        } catch (IOException e) {
          if (mayUseOffPeak) {
            offPeakCompactionTracker.set(false);
          }
          throw e;
        }
        assert compaction.hasSelection();
        if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
          // Compaction policy doesn't want to take advantage of off-peak.
          offPeakCompactionTracker.set(false);
        }
      }
      if (this.getCoprocessorHost() != null) {
        this.getCoprocessorHost().postCompactSelection(
          this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
      }

      // Selected files; see if we have a compaction with some custom base request.
      if (baseRequest != null) {
        // Update the request with what the system thinks the request should be;
        // its up to the request if it wants to listen.
        compaction.forceSelect(
          baseRequest.combineWith(compaction.getRequest()));
      }

      // Finally, we have the resulting files list. Check if we have any files at all.
      final Collection<StoreFile> selectedFiles = compaction.getRequest().getFiles();
      if (selectedFiles.isEmpty()) {
        return null;
      }

      // Update filesCompacting (check that we do not try to compact the same StoreFile twice).
      if (!Collections.disjoint(filesCompacting, selectedFiles)) {
        Preconditions.checkArgument(false, "%s overlaps with %s",
          selectedFiles, filesCompacting);
      }
      // Add the store files selected for this compaction to HStore.filesCompacting
      filesCompacting.addAll(selectedFiles);
      // Sort by store file seqId, ascending
      Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);

      // If we're enqueuing a major, clear the force flag.
      // If the number of selected files equals the number of store files in this store,
      // the compaction is promoted to major.
      boolean isMajor = selectedFiles.size() == this.getStorefilesCount();
      this.forceMajor = this.forceMajor && !isMajor;

      // Set common request properties.
      // Set priority, either override value supplied by caller or from store.
      compaction.getRequest().setPriority(
        (priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
      compaction.getRequest().setIsMajor(isMajor);
      compaction.getRequest().setDescription(
        getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
    }
  } finally {
    this.lock.readLock().unlock();
  }

  LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName() + ": Initiating "
    + (compaction.getRequest().isMajor() ? "major" : "minor") + " compaction");
  this.region.reportCompactionRequestStart(compaction.getRequest().isMajor());
  return compaction;
}
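The priority bookkeeping above reduces to plain integer arithmetic. This covers getCompactPriority(), the fallback when the caller passes NO_PRIORITY, and the promotion to major when every file is selected. The sketch assumes PRIORITY_USER = 1, as stated in this article, and NO_PRIORITY = Integer.MIN_VALUE (an assumed value for that constant). In the queue, a smaller value is served first. That is why run() treats a larger value as a "decreased" priority. The class and method names are hypothetical.

```java
// Hypothetical sketch of compaction request priorities; names are illustrative.
final class CompactPrioritySketch {
    static final int PRIORITY_USER = 1;
    static final int NO_PRIORITY = Integer.MIN_VALUE; // assumed value

    /** getCompactPriority(): blockingStoreFiles - storeFiles, never equal to PRIORITY_USER. */
    static int storeCompactPriority(int blockingStoreFiles, int storeFiles) {
        int priority = blockingStoreFiles - storeFiles;
        return priority == PRIORITY_USER ? PRIORITY_USER + 1 : priority;
    }

    /** Priority stamped on the request: the caller's value unless it is NO_PRIORITY. */
    static int requestPriority(int callerPriority, int blockingStoreFiles, int storeFiles) {
        return callerPriority != NO_PRIORITY
            ? callerPriority
            : storeCompactPriority(blockingStoreFiles, storeFiles);
    }

    /** A selection that covers every store file is treated as a major compaction. */
    static boolean isMajor(int selectedFiles, int storeFiles) {
        return selectedFiles == storeFiles;
    }

    public static void main(String[] args) {
        System.out.println("user request    -> " + requestPriority(PRIORITY_USER, 7, 3));
        System.out.println("system, 3 files -> " + requestPriority(NO_PRIORITY, 7, 3));
        System.out.println("system, 6 files -> " + requestPriority(NO_PRIORITY, 7, 6));
        System.out.println("system, 9 files -> " + requestPriority(NO_PRIORITY, 7, 9));
    }
}
```

With blockingStoreFiles = 7, main prints 1, 4, 2 and -2. Note the 6-file case: 7 - 6 = 1 collides with PRIORITY_USER, so it is bumped to 2. A store at or above the blocking count gets a value of 0 or below.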


Compaction execution flow

At execution time, a CompactSplitThread.CompactionRunner is created and run on the chosen thread pool.

Its run method works as follows:

public void run() {
  Preconditions.checkNotNull(server);
  if (server.isStopped()
      || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
    return;
  }
  // Common case - system compaction without a file selection. Select now.
  // compaction == null means a system compaction (not user-initiated): obtain a CompactionContext now
  if (this.compaction == null) {
    // When this runner is created, queuedPriority defaults to
    // hbase.hstore.blockingStoreFiles minus the number of store files
    // (if that difference is 1, 2 is used instead)
    int oldPriority = this.queuedPriority;
    // Re-read the current value of blockingStoreFiles minus the number of store files
    this.queuedPriority = this.store.getCompactPriority();
    // A larger value than before means store files have been removed, usually because another
    // compaction finished, so this store is now less urgent
    if (this.queuedPriority > oldPriority) {
      // Store priority decreased while we were in queue (due to some other compaction?),
      // requeue with new priority to avoid blocking potential higher priorities.
      // End this run and resubmit this runner to the pool with the new priority
      this.parent.execute(this);
      return;
    }
    try {
      // HStore.requestCompaction returns a CompactionContext and selects the store files to compact.
      // Here request.priority is hbase.hstore.blockingStoreFiles minus the number of store
      // files (system-initiated); if that equals PRIORITY_USER, PRIORITY_USER + 1 is used.
      // (For a client-initiated compaction, request.priority is Store.PRIORITY_USER instead.)
      // See "Creating the CompactionRequest instance".
      this.compaction = selectCompaction(this.region, this.store, queuedPriority, null);
    } catch (IOException ex) {
      LOG.error("Compaction selection failed " + this, ex);
      server.checkFileSystem();
      return;
    }
    if (this.compaction == null) return; // nothing to do
    // Now see if we are in correct pool for the size; if not, go to the correct one.
    // We might end up waiting for a while, so cancel the selection.
    assert this.compaction.hasSelection();
    // This is where the size check that had no effect in requestCompactionInternal is done.
    // compaction.getRequest().getSize() is the total size of the store files in this compaction.
    // If it exceeds hbase.regionserver.thread.compaction.throttle
    // (default hbase.hstore.compaction.max * 2 * memstore flush size), largeCompactions is used;
    // otherwise smallCompactions.
    ThreadPoolExecutor pool = store.throttleCompaction(
      compaction.getRequest().getSize()) ? largeCompactions : smallCompactions;
    // If the pool chosen now differs from the one this runner was queued on:
    // cancel the selection, resubmit this runner to the new pool, and end the current run
    if (this.parent != pool) {
      this.store.cancelRequestedCompaction(this.compaction);
      this.compaction = null;
      this.parent = pool;
      this.parent.execute(this);
      return;
    }
  }
  // Finally we can compact something.
  assert this.compaction != null;

  this.compaction.getRequest().beforeExecute();
  try {
    // Note: please don't put single-compaction logic here;
    //       put it into region/store/etc. This is CST logic.
    long start = EnvironmentEdgeManager.currentTimeMillis();
    // HRegion.compact calls HStore.compact with the CompactionContext.
    // It returns true if the compaction succeeded and false otherwise.
    boolean completed = region.compact(compaction, store);
    long now = EnvironmentEdgeManager.currentTimeMillis();
    LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
      this + "; duration=" + StringUtils.formatTimeDiff(now, start));
    if (completed) {
      // If the store still has at least hbase.hstore.blockingStoreFiles (default 7) files,
      // getCompactPriority() returns a value <= 0. More compaction is needed, so another
      // compaction request is issued.
      // degenerate case: blocked regions require recursive enqueues
      if (store.getCompactPriority() <= 0) {
        requestSystemCompaction(region, store, "Recursive enqueue");
      } else {
        // The store file count is now within the configured range, so no further compaction
        // is needed. Check whether the region should split and, if so, request a split.
        // Split conditions:
        // a. hbase.regionserver.regionSplitLimit (default Integer.MAX_VALUE) is greater than
        //    the number of online regions on this region server;
        // b. in addition to a, hbase.hstore.blockingStoreFiles minus the number of store files
        //    is at least Store.PRIORITY_USER (1);
        // c. the table is neither the meta table nor the namespace table. For the other
        //    conditions, see the split analysis.
        // see if the compaction has caused us to exceed max region size
        requestSplit(region);
      }
    }
  } catch (IOException ex) {
    IOException remoteEx = RemoteExceptionHandler.checkIOException(ex);
    LOG.error("Compaction failed " + this, remoteEx);
    if (remoteEx != ex) {
      LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
    }
    server.checkFileSystem();
  } catch (Exception ex) {
    LOG.error("Compaction failed " + this, ex);
    server.checkFileSystem();
  } finally {
    LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
  }
  this.compaction.getRequest().afterExecute();
}
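The decision taken after a successful compaction can be modelled as below. This is a simplification. The region's priority is assumed to be the minimum of its stores' priorities. CHECK_SPLIT only means that a split point is looked for. The meta/namespace exclusion and the size checks from the split analysis are not modelled. All names are hypothetical.

```java
// Hypothetical sketch of the post-compaction decision in CompactionRunner.run().
final class AfterCompactionSketch {
    enum NextStep { RECURSIVE_COMPACTION, CHECK_SPLIT, NONE }

    static final int PRIORITY_USER = 1;

    /** Assumption: a region's priority is the minimum over its stores' priorities. */
    static int regionPriority(int[] storePriorities) {
        int min = Integer.MAX_VALUE;
        for (int p : storePriorities) {
            min = Math.min(min, p);
        }
        return min;
    }

    static NextStep afterCompaction(int compactedStorePriority, int[] storePriorities,
                                    int regionSplitLimit, int onlineRegions) {
        if (compactedStorePriority <= 0) {
            return NextStep.RECURSIVE_COMPACTION; // still at or above blockingStoreFiles
        }
        boolean belowSplitLimit = regionSplitLimit > onlineRegions;
        boolean notBlocked = regionPriority(storePriorities) >= PRIORITY_USER;
        // CHECK_SPLIT only means a split point is looked for; none may be found.
        return belowSplitLimit && notBlocked ? NextStep.CHECK_SPLIT : NextStep.NONE;
    }

    public static void main(String[] args) {
        int[] stores = {4, 2};
        System.out.println(afterCompaction(0, new int[] {0, 5}, Integer.MAX_VALUE, 10));
        System.out.println(afterCompaction(4, stores, Integer.MAX_VALUE, 10));
        System.out.println(afterCompaction(4, stores, 10, 10));
    }
}
```

main prints RECURSIVE_COMPACTION, CHECK_SPLIT and NONE. The last case is NONE because the split limit (10) is not greater than the number of online regions (10).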


HStore.compact flow:


public List<StoreFile> compact(CompactionContext compaction) throws IOException {
  assert compaction != null && compaction.hasSelection();
  CompactionRequest cr = compaction.getRequest();
  // The list of store files to compact
  Collection<StoreFile> filesToCompact = cr.getFiles();
  assert !filesToCompact.isEmpty();
  synchronized (filesCompacting) {
    // sanity check: we're compacting files that this store knows about
    // TODO: change this to LOG.error() after more debugging
    Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
  }

  // Ready to go. Have list of files to compact.
  LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
    + this + " of " + this.getRegionInfo().getRegionNameAsString()
    + " into tmpdir=" + fs.getTempDir() + ", totalSize="
    + StringUtils.humanReadableInt(cr.getSize()));

  long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
  List<StoreFile> sfs = null;
  try {
    // Run the compaction: merge the selected store files into a new store file under the
    // store's .tmp directory. DefaultCompactor.compact builds a StoreFileScanner for each input
    // file and wraps them in a StoreScanner. If the compaction was promoted to major,
    // ScanType is COMPACT_DROP_DELETES; otherwise it is COMPACT_RETAIN_DELETES.
    // The scan used by compaction is covered in a later analysis of the scan flow.
    // Commence the compaction.
    List<Path> newFiles = compaction.compact();

    // If hbase.hstore.compaction.complete is false, the new files are only opened with a
    // reader (to check that they are usable) and returned. They are not moved into the store.
    // TODO: get rid of this!
    if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
      LOG.warn("hbase.hstore.compaction.complete is set to false");
      sfs = new ArrayList<StoreFile>();
      for (Path newFile : newFiles) {
        // Create storefile around what we wrote with a reader on it.
        StoreFile sf = createStoreFileAndReader(newFile);
        sf.closeReader(true);
        sfs.add(sf);
      }
      return sfs;
    }
    // Move the new store files into the column family directory and return them,
    // with their readers already open.
    // Do the steps necessary to complete the compaction.
    sfs = moveCompatedFilesIntoPlace(cr, newFiles);

    // Write a record describing this compaction to the WAL
    writeCompactionWalRecord(filesToCompact, sfs);

    // Remove the compacted files from the store's storefiles list, add the new store files,
    // and re-sort the list by seqId. (The storefiles list holds the readers that scans use to
    // query the store.) The compacted files are also removed from HStore.filesCompacting.
    replaceStoreFiles(filesToCompact, sfs);

    // Archive this store's compacted store files in HDFS and update the store size.
    // At this point the store will use new files for all new scanners.
    completeCompaction(filesToCompact); // Archive old files & update store size.
  } finally {
    // Remove this request's files from HStore.filesCompacting. After a successful compaction
    // there is nothing left to remove. After a failure, this releases the files that were not compacted.
    finishCompactionRequest(cr);
  }
  logCompactionEndMessage(cr, sfs, compactionStartTime);
  return sfs;
}
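The list bookkeeping done by replaceStoreFiles can be sketched with plain collections. It drops the compacted inputs, adds the outputs, re-sorts by seqId, and releases the inputs from filesCompacting. FileRef and replace are hypothetical stand-ins for StoreFile and the real method. The seqId given to the output in main is arbitrary.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the store-file list update; not HBase code.
final class ReplaceStoreFilesSketch {
    static final class FileRef {
        final String name;
        final long seqId;

        FileRef(String name, long seqId) {
            this.name = name;
            this.seqId = seqId;
        }

        @Override
        public String toString() {
            return name + "(seqId=" + seqId + ")";
        }
    }

    static List<FileRef> replace(List<FileRef> storeFiles, Collection<FileRef> compacted,
                                 Collection<FileRef> results, Set<FileRef> filesCompacting) {
        List<FileRef> updated = new ArrayList<>(storeFiles);
        updated.removeAll(compacted);                                // drop the compacted inputs
        updated.addAll(results);                                     // add the new output file(s)
        updated.sort(Comparator.comparingLong((FileRef f) -> f.seqId)); // keep seqId order
        filesCompacting.removeAll(compacted);                        // no longer in progress
        return updated;
    }

    public static void main(String[] args) {
        FileRef a = new FileRef("a", 1), b = new FileRef("b", 2);
        FileRef c = new FileRef("c", 3), d = new FileRef("d", 4);
        FileRef merged = new FileRef("merged", 3); // seqId chosen arbitrarily for the example
        Set<FileRef> compacting = new HashSet<>(Arrays.asList(a, b, c));
        List<FileRef> after = replace(Arrays.asList(a, b, c, d), Arrays.asList(a, b, c),
            Arrays.asList(merged), compacting);
        System.out.println(after + ", still compacting: " + compacting);
    }
}
```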



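Major compaction flow

Like requestCompaction, a major compaction can be triggered for a single store or for a whole region.

If a region is passed in, triggerMajorCompaction is called on every store of that region.

HStore.triggerMajorCompaction simply sets the store's forceMajor flag to true: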

public void triggerMajorCompaction() {
  this.forceMajor = true;
}


Once forceMajor is set, requestCompaction is called directly, the same way as for any other request:

if (family != null) {
  compactSplitThread.requestCompaction(region, store, log,
    Store.PRIORITY_USER, null);
} else {
  compactSplitThread.requestCompaction(region, log,
    Store.PRIORITY_USER, null);
}
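A single boolean is enough to model the life cycle of the forceMajor flag. It is set by triggerMajorCompaction, passed to select as an argument, and reset after selection. The sketch uses hypothetical names and ignores locking and the coprocessor hooks:

```java
// Hypothetical sketch of the forceMajor flag's life cycle in HStore; not HBase code.
final class ForceMajorFlagSketch {
    private boolean forceMajor;        // models HStore.forceMajor
    private final int storeFileCount;  // files currently in the store (fixed here)

    ForceMajorFlagSketch(int storeFileCount) {
        this.storeFileCount = storeFileCount;
    }

    /** HStore.triggerMajorCompaction(): only raises the flag. */
    void triggerMajorCompaction() {
        forceMajor = true;
    }

    /** Value passed to compaction.select(...) as its last argument. */
    boolean forceMajorArgument(int filesCompactingCount) {
        return forceMajor && filesCompactingCount == 0;
    }

    /** After selection: a request covering every file is major, and that clears the flag. */
    boolean afterSelection(int selectedCount) {
        boolean isMajor = selectedCount == storeFileCount;
        forceMajor = forceMajor && !isMajor;
        return isMajor;
    }

    boolean isForceMajor() {
        return forceMajor;
    }

    public static void main(String[] args) {
        ForceMajorFlagSketch store = new ForceMajorFlagSketch(5);
        store.triggerMajorCompaction();
        System.out.println("argument while 2 files compact: " + store.forceMajorArgument(2));
        System.out.println("argument when idle: " + store.forceMajorArgument(0));
        store.afterSelection(3);
        System.out.println("flag after partial selection: " + store.isForceMajor());
        store.afterSelection(5);
        System.out.println("flag after full selection: " + store.isForceMajor());
    }
}
```

The flag survives a selection that does not cover every file. A later request therefore still tries to force the major compaction.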

From here on, requestCompaction works in essentially the same way as for a non-major compaction:

CompactSplitThread.requestCompaction --> requestCompactionInternal --> selectCompaction

--> HStore.requestCompaction(priority, request), which returns the CompactionContext.

The relevant code:

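// Whether this compaction was initiated by a user
boolean isUserCompaction = priority == Store.PRIORITY_USER;

// isOffPeakHour() below returns true when:
// a. hbase.offpeak.start.hour is not -1 (a value between 0 and 23);
// b. hbase.offpeak.end.hour is not -1 (a value between 0 and 23) and is greater than the value in a;
// c. the hour of the current time lies between the values in a and b.
// Otherwise it returns false. The compareAndSet on offPeakCompactionTracker also ensures
// that only one off-peak compaction is claimed at a time.
boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
  offPeakCompactionTracker.compareAndSet(false, true);
try {
  // Here the last argument is true: a major compaction was requested and no other
  // compaction is running on this store
  compaction.select(this.filesCompacting, isUserCompaction,
    mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
} catch (IOException e) {
  if (mayUseOffPeak) {
    offPeakCompactionTracker.set(false);
  }
  throw e;
}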
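The following sketches how mayUseOffPeak is computed. It follows conditions a to c above literally. The real OffPeakHours class may handle more cases than modelled here, and treating the end hour as exclusive is an assumption. An AtomicBoolean plays the role of offPeakCompactionTracker. The class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the off-peak decision; follows conditions a-c above.
final class OffPeakSketch {
    /** a-c: both hours set (0-23), end greater than start, current hour in [start, end). */
    static boolean isOffPeakHour(int startHour, int endHour, int currentHour) {
        if (startHour < 0 || startHour > 23 || endHour < 0 || endHour > 23) {
            return false; // -1 (unset) or out of range
        }
        if (endHour <= startHour) {
            return false; // condition b
        }
        return startHour <= currentHour && currentHour < endHour; // end hour exclusive (assumption)
    }

    /** Ratio used by the size test: off-peak 5.0, otherwise 1.2 (defaults quoted above). */
    static double selectionRatio(boolean offPeak) {
        return offPeak ? 5.0 : 1.2;
    }

    /** Mirrors offPeakHours.isOffPeakHour() && tracker.compareAndSet(false, true). */
    static boolean mayUseOffPeak(boolean offPeakHour, AtomicBoolean tracker) {
        return offPeakHour && tracker.compareAndSet(false, true);
    }

    public static void main(String[] args) {
        AtomicBoolean tracker = new AtomicBoolean(false);
        boolean hour = isOffPeakHour(1, 2, 1);
        System.out.println("first request:  " + mayUseOffPeak(hour, tracker));
        System.out.println("second request: " + mayUseOffPeak(hour, tracker));
        tracker.set(false); // released when the first compaction finishes or does not use off-peak
        System.out.println("after release:  " + mayUseOffPeak(hour, tracker));
    }
}
```

main prints true, false, true. The second request cannot claim the tracker until the first releases it.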


In the code above, compaction.select is, by default, DefaultStoreEngine.DefaultCompactionContext.select:


public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
    boolean mayUseOffPeak, boolean forceMajor) throws IOException {
  // Call RatioBasedCompactionPolicy.selectCompaction to get a CompactionRequest
  // and store it in the request field of this compaction instance
  request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(),
    filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor);
  return request != null;
}


How RatioBasedCompactionPolicy.selectCompaction works:


public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
    final List<StoreFile> filesCompacting, final boolean isUserCompaction,
    final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
  // Preliminary compaction subject to filters
  ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
  // Stuck and not compacting enough (estimate). It is not guaranteed that we will be
  // able to compact more if stuck and compacting, because ratio policy excludes some
  // non-compacting files from consideration during compaction (see getCurrentEligibleFiles).
  int futureFiles = filesCompacting.isEmpty() ? 0 : 1;

  // Is (store files - files already being compacted, + 1 if a compaction is in progress)
  // at least the blocking file count?
  // The blocking file count is hbase.hstore.blockingStoreFiles, default 7.
  boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
    >= storeConfigInfo.getBlockingFileCount();

  // Eligible store files: all store files except those already being compacted
  candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
  LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
    filesCompacting.size() + " compacting, " + candidateSelection.size() +
    " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");

  // Get the TTL, which is set with the column family's TTL attribute.
  // Integer.MAX_VALUE, -1 or no value means no TTL.
  // The TTL only takes effect when MIN_VERSIONS is not set. It is configured in seconds.
  // If these checks pass, the TTL (in ms) is returned; otherwise Long.MAX_VALUE.
  long cfTtl = this.storeConfigInfo.getStoreFileTtl();

  // Expired-file shortcut. It applies when this is not a forced major compaction, a TTL is
  // set, and hbase.store.delete.expired.storefile is true (default true). Collect the
  // TTL-expired files among those not currently being compacted. If there are any, return a
  // CompactionRequest for just those files and stop here.
  if (!forceMajor) {
    // If there are expired files, only select them so that compaction deletes them
    if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
      ArrayList<StoreFile> expiredSelection = selectExpiredStoreFiles(
        candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
      if (expiredSelection != null) {
        return new CompactionRequest(expiredSelection);
      }
    }

    // Not a major compaction: starting from the oldest file, skip the leading run of
    // non-Reference store files (Reference files are left by a split) that are larger than
    // hbase.hstore.compaction.max.size, the maximum store file size for compaction
    candidateSelection = skipLargeFiles(candidateSelection);
  }

  // Force a major compaction if this is a user-requested major compaction,
  // or if we do not have too many files to compact and this was requested
  // as a major compaction.
  // Or, if there are any references among the candidates.
  //
  // The major compaction check is made up of the following parts:
  //
  // (forceMajor && isUserCompaction)
  // a. true if a user requested the compaction, asked for a major compaction, and no other
  //    store file of this store is being compacted.
  //
  // ((forceMajor || isMajorCompaction(candidateSelection))
  //     && (candidateSelection.size() < comConf.getMaxFilesToCompact()))
  // b. three conditions: the first (b1) or the second (b2) must hold, and the third (b3) must hold.
  //
  // forceMajor
  // b1. a major compaction was requested and no other store file of this store is being compacted.
  //
  // isMajorCompaction(candidateSelection)
  // b2. or the following checks pass:
  // b2.1 the eligible store file with the oldest modification time has reached the major
  //      compaction interval;
  // b2.2 if there is only one eligible store file, its oldest cell has reached the TTL
  //      and the file has reached the major compaction interval;
  // b2.3 if there are several eligible store files, the oldest has reached the major
  //      compaction interval;
  // b2.4 put the other way round: the oldest file may have reached the interval and still
  //      not be major-compacted. That happens when it is the only eligible file, was itself
  //      produced by a major compaction (StoreFile.majorCompaction == true), and the TTL is
  //      unset or not yet reached.
  // The interval is set with hbase.hregion.majorcompaction, and the jitter around it with
  // hbase.hregion.majorcompaction.jitter. For example, with a 24-hour interval and a jitter
  // of 0.2f, the window is 24 hours +/- 4.8 hours (20%).
  //
  // (candidateSelection.size() < comConf.getMaxFilesToCompact())
  // b3. the number of eligible store files is below the maximum, hbase.hstore.compaction.max
  //     (default 10).
  //
  // StoreUtils.hasReferences(candidateSelection)
  // c. the store file list contains Reference files (left by a split).
  boolean majorCompaction = (
    (forceMajor && isUserCompaction)
    || ((forceMajor || isMajorCompaction(candidateSelection))
        && (candidateSelection.size() < comConf.getMaxFilesToCompact()))
    || StoreUtils.hasReferences(candidateSelection)
  );
  // Not a major compaction
  if (!majorCompaction) {
    // we're doing a minor compaction, let's see what files are applicable
    // Remove bulk-loaded store files from the eligible list
    candidateSelection = filterBulk(candidateSelection);

    // Leave out the disproportionately large store files at the head (oldest end) of the
    // eligible list. Which files count as too large:
    // a. Each file is compared with the combined size of the next
    //    (hbase.hstore.compaction.max - 1) files; max defaults to 10, so for i = 0 that is
    //    files 1 through i + max - 1 = 9. The file is skipped if it is larger than that sum
    //    times the ratio. Skipping stops at the first file that passes the test.
    //    The ratio is hbase.hstore.compaction.ratio.offpeak (default 5.0f) inside the
    //    off-peak window, and hbase.hstore.compaction.ratio (default 1.2f) otherwise.
    //    The window is defined by hbase.offpeak.start.hour and hbase.offpeak.end.hour
    //    (both not -1, start < end, current hour inside).
    // b. A file is only skipped if it is also larger than hbase.hstore.compaction.min.size
    //    (default: the memstore flush size). Smaller files always stay in the selection.
    // c. Suppose the test leaves no file selected and mayBeStuck is true (store files minus
    //    files being compacted is at least hbase.hstore.blockingStoreFiles, default 7). Then
    //    only the newest hbase.hstore.compaction.min files are kept (default 3, must be at
    //    least 2; hbase.hstore.compactionThreshold in older versions).
    candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);

    // If fewer files remain than the configured minimum, clear the selection
    candidateSelection = checkMinFilesCriteria(candidateSelection);
  }
  // Unless this is a user-requested major compaction, drop files beyond hbase.hstore.compaction.max
  candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
  // Build the CompactionRequest
  CompactionRequest result = new CompactionRequest(candidateSelection);
  // Mark the request as off-peak only if the selection is non-empty, the compaction is not
  // major, and off-peak may be used (off-peak hours are configured and the current time is
  // inside them)
  result.setOffPeak(!candidateSelection.isEmpty() && !majorCompaction && mayUseOffPeak);
  return result;
}
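The file-selection arithmetic above can be reproduced on plain arrays of sizes and timestamps ordered oldest first. That covers TTL expiry, the ratio test with its sliding window of hbase.hstore.compaction.max - 1 files, the minimum-count check, and the maximum-count cap. This is a sketch under the defaults quoted in this article. It leaves out the mayBeStuck fallback, bulk-load filtering and Reference handling. Every name in it is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the minor-compaction selection arithmetic; not HBase code.
// Arrays are ordered oldest file first, like the store file list.
final class RatioSelectionSketch {
    static final int MIN_FILES = 3;            // hbase.hstore.compaction.min
    static final int MAX_FILES = 10;           // hbase.hstore.compaction.max
    static final double RATIO = 1.2;           // hbase.hstore.compaction.ratio
    static final double OFF_PEAK_RATIO = 5.0;  // hbase.hstore.compaction.ratio.offpeak

    /** Indices of files whose newest cell is older than now - ttl, i.e. fully expired. */
    static List<Integer> selectExpired(long[] maxTimestamps, long now, long ttlMs) {
        List<Integer> expired = new ArrayList<>();
        if (ttlMs == Long.MAX_VALUE) {
            return expired; // no TTL configured
        }
        for (int i = 0; i < maxTimestamps.length; i++) {
            if (maxTimestamps[i] < now - ttlMs) {
                expired.add(i);
            }
        }
        return expired;
    }

    /** Number of leading (oldest) files that the ratio test leaves out. */
    static int ratioStart(long[] sizes, double ratio, long minCompactSize) {
        int n = sizes.length;
        // sumSize[i] = sizes[i] + ... + sizes[i + MAX_FILES - 2] (at most MAX_FILES - 1 files)
        long[] sumSize = new long[n];
        for (int i = n - 1; i >= 0; --i) {
            int tooFar = i + MAX_FILES - 1;
            sumSize[i] = sizes[i]
                + (i + 1 < n ? sumSize[i + 1] : 0)
                - (tooFar < n ? sizes[tooFar] : 0);
        }
        int start = 0;
        // MIN_FILES >= 2 keeps start + 1 inside the array while the loop runs.
        while (n - start >= MIN_FILES
                && sizes[start] > Math.max(minCompactSize, (long) (sumSize[start + 1] * ratio))) {
            start++;
        }
        return start;
    }

    /** Ratio test, then the minimum-count check, then the cap at MAX_FILES. */
    static List<Integer> selectMinor(long[] sizes, boolean offPeak, long minCompactSize) {
        int start = ratioStart(sizes, offPeak ? OFF_PEAK_RATIO : RATIO, minCompactSize);
        List<Integer> selected = new ArrayList<>();
        for (int i = start; i < sizes.length; i++) {
            selected.add(i);
        }
        if (selected.size() < MIN_FILES) {
            selected.clear();                                            // checkMinFilesCriteria
        } else if (selected.size() > MAX_FILES) {
            selected = new ArrayList<>(selected.subList(0, MAX_FILES));  // removeExcessFiles
        }
        return selected;
    }

    public static void main(String[] args) {
        long[] sizes = {100, 10, 10, 10};
        System.out.println("normal:   " + selectMinor(sizes, false, 0));
        System.out.println("off-peak: " + selectMinor(sizes, true, 0));
        System.out.println("expired:  " + selectExpired(new long[] {100, 500, 900}, 1000, 300));
    }
}
```

For sizes {100, 10, 10, 10}, the normal ratio (1.2) skips the first file because 100 > 1.2 * 30. The off-peak ratio (5.0) keeps it because 100 <= 5.0 * 30. main prints [1, 2, 3] and then [0, 1, 2, 3], which shows the practical effect of an off-peak window.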


How the compaction itself is executed is described under "Compaction execution flow" in the non-major compaction section.


Compaction after a flush

Compactions after a flush are triggered from MemStoreFlusher.FlushHandler.run:

once flushRegion completes, a compaction is requested.

CompactSplitThread.requestSystemCompaction --> requestCompactionInternal(region)

public synchronized void requestSystemCompaction(
    final HRegion r, final String why) throws IOException {
  requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false);
}


CompactSplitThread.requestCompactionInternal(Region)-->requestCompactionInternal(Store)

private List<CompactionRequest> requestCompactionInternal(final HRegion r, final String why,
    int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow) throws IOException {
  // not a special compaction request, so make our own list
  List<CompactionRequest> ret = null;
  if (requests == null) {
    ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
    for (Store s : r.getStores().values()) {
      // Issue a compaction for each store in turn with priority = Store.NO_PRIORITY;
      // see the non-major compaction flow
      CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow);
      if (selectNow) ret.add(cr);
    }
  } else {
    Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
    ret = new ArrayList<CompactionRequest>(requests.size());
    for (Pair<CompactionRequest, Store> pair : requests) {
      ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst()));
    }
  }
  return ret;
}



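Compaction flow of the periodic thread

The periodic checks are performed by HRegionServer.CompactionChecker. Its settings and check conditions are described at the beginning of this article.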



protected void chore() {
  for (HRegion r : this.instance.onlineRegions.values()) {
    if (r == null)
      continue;
    for (Store s : r.getStores().values()) {
      try {
        long multiplier = s.getCompactionCheckMultiplier();
        assert multiplier > 0;
        if (iteration % multiplier != 0) continue;
        // System compaction check: needed when the number of store files minus the number
        // being compacted is at least hbase.hstore.compaction.min
        if (s.needsCompaction()) {
          // Queue a compaction. Will recognize if major is needed.
          // Issue a system compaction; see "Compaction after a flush"
          this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
            + " requests compaction");
        } else if (s.isMajorCompaction()) {
          // isMajorCompaction() applies the b2 conditions from the selectCompaction analysis
          if (majorCompactPriority == DEFAULT_PRIORITY
              || majorCompactPriority > r.getCompactPriority()) {
            // Issue requestCompaction; see note A below
            this.instance.compactSplitThread.requestCompaction(r, s, getName()
              + " requests major compaction; use default priority", null);
          } else {
            // Issue requestCompaction; see note B below
            this.instance.compactSplitThread.requestCompaction(r, s, getName()
              + " requests major compaction; use configured priority",
              this.majorCompactPriority, null);
          }
        }
      } catch (IOException e) {
        LOG.warn("Failed major compaction check on " + r, e);
      }
    }
  }
  iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
}


Note A:

CompactSplitThread.requestCompaction -->

requestCompaction(r, s, why, Store.NO_PRIORITY, request);

--> requestCompactionInternal(r, s, why, priority, request, true); selectNow is set to true here

Note B:

CompactSplitThread.requestCompaction -->

requestCompactionInternal(r, s, why, priority, request, true); selectNow is set to true here
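Notes A and B differ only in the priority handed down. The sketch below models that choice. It assumes DEFAULT_PRIORITY is Integer.MAX_VALUE and NO_PRIORITY is Integer.MIN_VALUE; both values are assumptions. The constant names mirror those in the code above, and the class is hypothetical.

```java
// Hypothetical sketch of the priority choice in CompactionChecker.chore() (notes A and B).
final class MajorPrioritySketch {
    static final int DEFAULT_PRIORITY = Integer.MAX_VALUE; // assumed "not configured" value
    static final int NO_PRIORITY = Integer.MIN_VALUE;      // assumed value of Store.NO_PRIORITY

    /** Priority the chore passes to requestCompaction for a periodic major compaction. */
    static int priorityPassed(int majorCompactPriority, int regionCompactPriority) {
        if (majorCompactPriority == DEFAULT_PRIORITY
                || majorCompactPriority > regionCompactPriority) {
            return NO_PRIORITY;          // note A: the store computes its own priority later
        }
        return majorCompactPriority;     // note B: the configured priority is used as-is
    }

    /** Priority on the request once HStore.requestCompaction resolves NO_PRIORITY. */
    static int effectivePriority(int majorCompactPriority, int regionCompactPriority,
                                 int storeCompactPriority) {
        int passed = priorityPassed(majorCompactPriority, regionCompactPriority);
        return passed == NO_PRIORITY ? storeCompactPriority : passed;
    }

    public static void main(String[] args) {
        System.out.println("not configured -> " + effectivePriority(DEFAULT_PRIORITY, 5, 5));
        System.out.println("configured 10  -> " + effectivePriority(10, 5, 5));
        System.out.println("configured 3   -> " + effectivePriority(3, 5, 5));
    }
}
```

A configured value is only honoured when it is more urgent (smaller) than the region's current priority. Otherwise the store falls back to its own getCompactPriority().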

