UserScan的处理流程分析


UserScan的处理流程分析

前置说明

Userscan是通过clientcp中发起的scanner操作。

Scan中通过caching属性来返回可以返回多少条数据,每次进行next时。

通过batch属性来设置每次在rs端每次nextkv时,可读取多少个kv(在同一行的情况下)

在生成Scan实例时,最好是把familycolumn都设置上,这样能保证查询的最高效.

client端通过生成Scan实例,通过HTable下的如下方法得到ClientScanner实例

publicResultScannergetScanner(finalScan scan)

在生成的ClientScanner实例中的callable属性的值为生成的一个ScannerCallable实例。

并通过callable.prepare(tries!= 0);方法得到此scanstartkey所在的regionlocation.meta表中。

startkey对应的location中得到此locationHRegionInfo信息。

并设置ClientScanner.currentRegion的值为当前的region.也就是startkey所在的region.


通过ClientScanner.nextrs发起rpc调用操作。调用HRegionServer.scan

publicScanResponse scan(finalRpcControllercontroller,finalScanRequest request)



ClientScanner.next时,首先是发起openScanner操作,得到一个ScannerId

通过ScannerCallable.call方法:

if(scannerId== -1L) {

this.scannerId= openScanner();

} else{

openScanner方法:中发起一个scan操作,通过rpc调用rs.scan

ScanRequest request=

RequestConverter.buildScanRequest(

getLocation().getRegionInfo().getRegionName(),

this.scan,0, false);

try{

ScanResponse response= getStub().scan(null,request);

longid =response.getScannerId();

if(logScannerActivity){

LOG.info("Openscanner=" + id+ " for scan="+ scan.toString()

+ "on region " +getLocation().toString());

}

returnid;


HregionServer.scan中对openScanner的处理:

publicScanResponse scan(finalRpcControllercontroller,finalScanRequest request)

throwsServiceException {

Leases.Lease lease= null;

String scannerName= null;

........................................很多代码没有显示

requestCount.increment();


intttl = 0;

HRegion region= null;

RegionScannerscanner =null;

RegionScannerHolder rsh= null;

booleanmoreResults= true;

booleancloseScanner= false;

ScanResponse.Builder builder= ScanResponse.newBuilder();

if(request.hasCloseScanner()){

closeScanner= request.getCloseScanner();

}

introws = 1;

if(request.hasNumberOfRows()){

rows= request.getNumberOfRows();

}

if(request.hasScannerId()){

.................................很多代码没有显示

} else{

得到请求的HRegion实例,也就是startkey所在的HRegion

region= getRegion(request.getRegion());

ClientProtos.Scan protoScan= request.getScan();

booleanisLoadingCfsOnDemandSet= protoScan.hasLoadColumnFamiliesOnDemand();

Scan scan= ProtobufUtil.toScan(protoScan);

//if the request doesn't set this, get the default region setting.

if(!isLoadingCfsOnDemandSet){

scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());

}

scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);

如果scan没有设置family,region中所有的family当成scanfamily

region.prepareScanner(scan);

if(region.getCoprocessorHost()!= null){

scanner= region.getCoprocessorHost().preScannerOpen(scan);

}

if(scanner ==null){

执行HRegion.getScanner方法。生成HRegion.RegionScannerImpl方法

scanner= region.getScanner(scan);

}

if(region.getCoprocessorHost()!= null){

scanner= region.getCoprocessorHost().postScannerOpen(scan,scanner);

}

把生成的RegionScanner添加到scanners集合容器中。并设置scannerid(一个随机的值),

scannernamescanneridstring版本。添加过期监控处理,

通过hbase.client.scanner.timeout.period配置过期时间,默认值为60000ms

老版本通过hbase.regionserver.lease.period配置。

过期检查线程通过Leases完成。对scanner的过期处理通过一个

HregionServer.ScannerListener.leaseExpired实例来完成。


scannerId= addScanner(scanner,region);

scannerName= String.valueOf(scannerId);

ttl= this.scannerLeaseTimeoutPeriod;

}

............................................很多代码没有显示


Hregion.getScanner方法生成RegionScanner实例流程


publicRegionScannergetScanner(Scanscan)throwsIOException {

returngetScanner(scan,null);

}


层次的调用,此时传入的kvscannerlistnull

protectedRegionScannergetScanner(Scanscan,

List<KeyValueScanner>additionalScanners)throwsIOException {

startRegionOperation(Operation.SCAN);

try{

//Verify families are all valid

prepareScanner(scan);

if(scan.hasFamilies()){

for(byte[] family :scan.getFamilyMap().keySet()){

checkFamily(family);

}

}

returninstantiateRegionScanner(scan,additionalScanners);

}finally{

closeRegionOperation();

}

}


最终生成一个HRegion.RegionScannerImpl实例

protectedRegionScannerinstantiateRegionScanner(Scanscan,

List<KeyValueScanner>additionalScanners)throwsIOException {

returnnewRegionScannerImpl(scan,additionalScanners,this);

}


RegionScanner实例的生成构造方法:

RegionScannerImpl(Scanscan,List<KeyValueScanner>additionalScanners,HRegion region)

throwsIOException {


this.region= region;

this.maxResultSize= scan.getMaxResultSize();

if(scan.hasFilter()){

this.filter= newFilterWrapper(scan.getFilter());

} else{

this.filter= null;

}


this.batch= scan.getBatch();

if(Bytes.equals(scan.getStopRow(),HConstants.EMPTY_END_ROW)&& !scan.isGetScan()){

this.stopRow= null;

} else{

this.stopRow= scan.getStopRow();

}

//If we are doing a get, we want to be [startRow,endRow] normally

//it is [startRow,endRow) and if startRow=endRow we get nothing.

this.isScan= scan.isGetScan()? -1 : 0;


//synchronize on scannerReadPoints so that nobody calculates

//getSmallestReadPoint, before scannerReadPoints is updated.

IsolationLevelisolationLevel= scan.getIsolationLevel();

synchronized(scannerReadPoints){

if(isolationLevel== IsolationLevel.READ_UNCOMMITTED){

//This scan can read even uncommitted transactions

this.readPt= Long.MAX_VALUE;

MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);

} else{

this.readPt= MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);

}

scannerReadPoints.put(this,this.readPt);

}


//Here we separate all scanners into two lists - scanner that providedata required

//by the filter to operate (scanners list) and all others(joinedScanners list).

List<KeyValueScanner>scanners =newArrayList<KeyValueScanner>();

List<KeyValueScanner>joinedScanners= newArrayList<KeyValueScanner>();

if(additionalScanners!= null){

scanners.addAll(additionalScanners);

}

迭代每一个要进行scanstore。生成具体的StoreScanner实例。通常情况下joinedHead的值为null

for(Map.Entry<byte[],NavigableSet<byte[]>>entry :

scan.getFamilyMap().entrySet()){

Storestore =stores.get(entry.getKey());

生成StoreScanner实例。通过HStore.getScanner(scan,columns);

KeyValueScannerscanner =store.getScanner(scan,entry.getValue());

if(this.filter== null|| !scan.doLoadColumnFamiliesOnDemand()

||this.filter.isFamilyEssential(entry.getKey())){

scanners.add(scanner);

} else{

joinedScanners.add(scanner);

}

}

生成KeyValueHeap实例,把所有的storescanner的开始位置移动到startkey的位置并得到topStoreScanner,

this.storeHeap= newKeyValueHeap(scanners,comparator);

if(!joinedScanners.isEmpty()){

this.joinedHeap= newKeyValueHeap(joinedScanners,comparator);

}

}


得到StoreScanner实例的HStore.getScanner(scan,columns)方法

publicKeyValueScannergetScanner(Scanscan,

finalNavigableSet<byte[]> targetCols)throwsIOException {

lock.readLock().lock();

try{

KeyValueScannerscanner =null;

if(this.getCoprocessorHost()!= null){

scanner= this.getCoprocessorHost().preStoreScannerOpen(this,scan,targetCols);

}

if(scanner ==null){

scanner= newStoreScanner(this,getScanInfo(),scan,targetCols);

}

returnscanner;

}finally{

lock.readLock().unlock();

}

}

生成StoreScanner的构造方法:

publicStoreScanner(Storestore,ScanInfo scanInfo,Scan scan,finalNavigableSet<byte[]>columns)

throwsIOException {

this(store,scan.getCacheBlocks(),scan,columns,scanInfo.getTtl(),

scanInfo.getMinVersions());

如果设置有scan_raw_属性时,columns的值需要为null

if(columns !=null&& scan.isRaw()){

thrownewDoNotRetryIOException(

"Cannotspecify any column for a raw scan");

}

matcher= newScanQueryMatcher(scan,scanInfo,columns,

ScanType.USER_SCAN,Long.MAX_VALUE,HConstants.LATEST_TIMESTAMP,

oldestUnexpiredTS);

得到StoreFileScanner,StoreFileScanner中引用的StoreFile.Reader中引用HFileReaderV2,

HFileReaderV2的实例在StoreFile.Reader中如果已经存在,不会重新创建,这样会加快scanner的创建时间。

//Pass columns to try to filter out unnecessary StoreFiles.

List<KeyValueScanner>scanners =getScannersNoCompaction();


//Seek all scanners to the start of the Row (or if the exact matchingrow

//key does not exist, then to the start of the next matching Row).

//Always check bloom filter to optimize the top row seek for delete

//family marker.

if(explicitColumnQuery&& lazySeekEnabledGlobally){

for(KeyValueScannerscanner :scanners) {

scanner.requestSeek(matcher.getStartKey(),false,true);

}

}else{

if(!isParallelSeekEnabled){

for(KeyValueScannerscanner :scanners) {

scanner.seek(matcher.getStartKey());

}

} else{

parallelSeek(scanners,matcher.getStartKey());

}

}


//set storeLimit

this.storeLimit= scan.getMaxResultsPerColumnFamily();


//set rowOffset

this.storeOffset= scan.getRowOffsetPerColumnFamily();


//Combine all seekedscanners with a heap

heap= newKeyValueHeap(scanners,store.getComparator());

注册,如果有storefile更新时,把更新后的storefile添加到这个StoreScanner中来。

this.store.addChangedReaderObserver(this);

}


发起scanrpc操作

client端发起openScanner操作后,得到一个scannerId.此时发起scan操作。

通过ScannerCallable.call中发起call的操作,在scannerId不等于-1时,


Result [] rrs= null;

ScanRequest request= null;

try{

incRPCcallsMetrics();

request= RequestConverter.buildScanRequest(scannerId,caching,false,nextCallSeq);

ScanResponse response= null;

PayloadCarryingRpcControllercontroller= newPayloadCarryingRpcController();

try{

controller.setPriority(getTableName());

response= getStub().scan(controller,request);

...................................此处省去一些代码

nextCallSeq++;

longtimestamp =System.currentTimeMillis();

//Results are returned via controller

CellScannercellScanner= controller.cellScanner();

rrs= ResponseConverter.getResults(cellScanner,response);



HregionServer.scan方法中对scan时的处理流程:

得到scan中的caching属性的值,此值主要用来响应client返回的条数。如果一行数据包含多个kv,算一条

introws = 1;

if(request.hasNumberOfRows()){

rows= request.getNumberOfRows();

}

如果client传入的scannerId有值,也就是不等于-1时,表示不是openScanner操作,检查scannerid是否过期

if(request.hasScannerId()){

rsh= scanners.get(scannerName);

if(rsh ==null){

LOG.info("Clienttried to access missing scanner " +scannerName);

thrownewUnknownScannerException(

"Name:" + scannerName+ ", already closed?");

}

此处主要是检查region是否发生过split操作。如果是会出现NotServingRegionException操作。

scanner= rsh.s;

HRegionInfo hri= scanner.getRegionInfo();

region= getRegion(hri.getRegionName());

if(region !=rsh.r){ // Yes, should be the same instance

thrownewNotServingRegionException("Regionwas re-opened after the scanner"

+ scannerName+ " was created: "+ hri.getRegionNameAsString());

}

} else{

...................................此处省去一些生成Regionscanner的代码

}

表示有设置caching,如果是执行scan,此时的默认值为1,当前scan中设置有caching后,使用scan中设置的值

if(rows >0) {

//if nextCallSeq does not match throw Exception straight away. Thisneeds to be

//performed even before checking of Lease.

//See HBASE-5974

是否有配置nextCallSeq的值,第一次调用时,此值为0,每调用一次加一,client也一样,每调用一次加一。

if(request.hasNextCallSeq()){

if(rsh ==null){

rsh= scanners.get(scannerName);

}

if(rsh !=null){

if(request.getNextCallSeq()!= rsh.nextCallSeq){

thrownewOutOfOrderScannerNextException("ExpectednextCallSeq: " + rsh.nextCallSeq

+ "But the nextCallSeq got from client: "+ request.getNextCallSeq()+

";request=" +TextFormat.shortDebugString(request));

}

//Increment the nextCallSeq value which is the next expected fromclient.

rsh.nextCallSeq++;

}

}

try{

先从租约管理中移出此租约,防止查找时间大于过期时间而出现的超时

//Remove lease while its being processed in server; protects againstcase

//where processing of request takes > lease expiration time.

lease= leases.removeLease(scannerName);

生成要返回的条数的一个列表,scan.caching

List<Result>results =newArrayList<Result>(rows);

longcurrentScanResultSize= 0;


booleandone =false;

调用cppreScannernext,如果返回为true,表示不在执行scan操作。

//Call coprocessor. Get region info from scanner.

if(region !=null&& region.getCoprocessorHost()!= null){

Boolean bypass= region.getCoprocessorHost().preScannerNext(

scanner,results,rows);

if(!results.isEmpty()){

for(Result r :results) {

if(maxScannerResultSize< Long.MAX_VALUE){

for(Cellkv :r.rawCells()){

//TODO

currentScanResultSize+= KeyValueUtil.ensureKeyValue(kv).heapSize();

}

}

}

}

if(bypass !=null&& bypass.booleanValue()){

done= true;

}

}

执行scan操作。CppreScannerNext返回为false,或没有设置cp(主要是RegionObServer)

返回给client的最大size通过hbase.client.scanner.max.result.size配置,默认为long.maxvalue

如果scan也设置有maxResultSize,使用scan设置的值

if(!done) {

longmaxResultSize= scanner.getMaxResultSize();

if(maxResultSize<= 0) {

maxResultSize= maxScannerResultSize;

}

List<Cell>values =newArrayList<Cell>();

MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());

region.startRegionOperation(Operation.SCAN);

try{

inti = 0;

synchronized(scanner){

此处开始迭代,开始调用regionScanner(HRegion.RegionScannerImpl.nextRaw(List))进行查找,

迭代的长度为scan设置的caching的大小,如果执行RegionScanner.nextRaw(List)返回为false,时也会停止迭代

for(; i <rows

&&currentScanResultSize< maxResultSize;i++) {

返回的true表示还有数据,可以接着查询,否则表示此region中已经没有符合条件的数据了。

//Collect values to be returned here

booleanmoreRows =scanner.nextRaw(values);

if(!values.isEmpty()){

if(maxScannerResultSize< Long.MAX_VALUE){

for(Cellkv :values) {

currentScanResultSize+= KeyValueUtil.ensureKeyValue(kv).heapSize();

}

}

results.add(Result.create(values));

}

if(!moreRows){

break;

}

values.clear();

}

}

region.readRequestsCount.add(i);

} finally{

region.closeRegionOperation();

}


//coprocessor postNext hook

if(region !=null&& region.getCoprocessorHost()!= null){

region.getCoprocessorHost().postScannerNext(scanner,results,rows,true);

}

}

如果没有可以再查找的数据时,设置responsemoreResultsfalse

//If the scanner's filter - if any - is done with the scan

//and wants to tell the client to stop the scan. This is done bypassing

//a null result, and setting moreResults to false.

if(scanner.isFilterDone()&& results.isEmpty()){

moreResults= false;

results= null;

} else{

添加结果到response中,如果hbase.client.rpc.codec配置有codec的值,

默认取hbase.client.default.rpc.codec配置的值,默认为KeyValueCodec

如果上面说的codec配置不为null时,把results生成为一个iterator,并生成一个匿名的CallScanner实现类

设置到scan时传入的controller中。这样能提升查询数据的读取性能。

如果没有配置codec时,默认直接把results列表设置到response中,这样响应的数据可能会比较大。

addResults(builder,results,controller);

}

} finally{

重新把租约放入到租约检查管理器中,此租约主要来检查client多长时间没有发起过scan的操作。

//We're done. On way out re-add the above removed lease.

//Adding resets expiration time on lease.

if(scanners.containsKey(scannerName)){

if(lease !=null)leases.addLease(lease);

ttl= this.scannerLeaseTimeoutPeriod;

}

}

}


client端获取响应的数据:ScannerCallable.call方法中

rrs= ResponseConverter.getResults(cellScanner,response);


ResponseConverter.getResults方法的实现

publicstaticResult[] getResults(CellScannercellScanner,ScanResponse response)

throwsIOException {

if(response== null)returnnull;

//If cellscanner,then the number of Results to return is the count of elements in the

//cellsPerResult list. Otherwise, it is how many results are embeddedinside the response.

intnoOfResults= cellScanner!= null?

response.getCellsPerResultCount():response.getResultsCount();

Result[] results= newResult[noOfResults];

for(inti = 0; i< noOfResults;i++) {

cellScanner如果codec配置为有值时,在rs响应时会生成一个匿名的实现

if(cellScanner!= null){

......................................

intnoOfCells =response.getCellsPerResult(i);

List<Cell>cells = newArrayList<Cell>(noOfCells);

for(intj = 0; j< noOfCells;j++) {

try{

if(cellScanner.advance()== false){

.....................................

String msg= "Results sent from server="+ noOfResults+ ". But only got "+ i

+ "results completely at client. Resetting the scanner to scan again.";

LOG.error(msg);

thrownewDoNotRetryIOException(msg);

}

} catch(IOException ioe){

...........................................

LOG.error("Exceptionwhile reading cells from result."

+ "Resettingthe scanner to scan again.", ioe);

thrownewDoNotRetryIOException("Resettingthe scanner.", ioe);

}

cells.add(cellScanner.current());

}

results[i]= Result.create(cells);

} else{

否则,没有设置codec,直接从response中读取出来数据,

//Result is pure pb.

results[i]= ProtobufUtil.toResult(response.getResults(i));

}

}

returnresults;

}


ClientScanner.next方法中,如果还没有达到scancaching的值,(默认为1)也就是countdown的值还不等于0

,countdown的值为得到一个Result时减1,通过nextScanner重新得到下一个region,并发起连接去scan数据。


Do{

.........................此处省去一些代码。

if(values !=null&& values.length> 0) {

for(Result rs: values) {

cache.add(rs);

for(Cellkv :rs.rawCells()){

//TODOmake method in Cell or CellUtil

remainingResultSize-= KeyValueUtil.ensureKeyValue(kv).heapSize();

}

countdown--;

this.lastResult= rs;

}

}

}while(remainingResultSize> 0 && countdown> 0 && nextScanner(countdown,values ==null));


对于这种类型的查询操作,可以使用得到一个ClientScanner后,不执行close操作。

rstimeout前每次定期去从rs中拿一定量的数据下来。缓存到ClientScannercache中。

每次next时从cache中直接拿数据


Hregion.RegionScannerImpl.nextRaw(list)方法分析

RegionScannerImpl是对RegionScanner接口的实现。

Rsscan在执行时通过regionScanner.nextRaw(list)来获取数据。

通过regionScanner.isFilterDone来检查此region的查找是否完成。


调用nextRaw方法,此方法调用另一个重载方法,batchscan中设置的每次可查询最大的单行中的多少个kvkv个数

publicbooleannextRaw(List<Cell>outResults)

throwsIOException {

returnnextRaw(outResults,batch);

}


publicbooleannextRaw(List<Cell>outResults,intlimit)throwsIOException {

booleanreturnResult;

调用nextInternal方法。

if(outResults.isEmpty()){

//Usually outResults is empty. This is true when next is called

//to handle scan or get operation.

returnResult= nextInternal(outResults,limit);

} else{

List<Cell>tmpList =newArrayList<Cell>();

returnResult= nextInternal(tmpList,limit);

outResults.addAll(tmpList);

}

调用filter.reset方法,清空当前rowfilter的相关信息。

ResetFilters();

如果filter.filterAllRemaining()的返回值为true,时表示当前region的查找条件已经结束,不能在执行查找操作。

没有可以接着查找的需要,也就是没有更多要查找的行了。

if(isFilterDone()){

returnfalse;

}

................................此处省去一些代码

returnreturnResult;

}


nextInternal方法处理流程:

privatebooleannextInternal(List<Cell>results,intlimit)

throwsIOException {

if(!results.isEmpty()){

thrownewIllegalArgumentException("Firstparameter should be an empty list");

}

RpcCallContextrpcCall =RpcServer.getCurrentCall();

//The loop here is used only when at some point during the next wedetermine

//that due to effects of filters or otherwise, we have an empty row inthe result.

//Then we loop and try again. Otherwise, we must get out on the firstiteration via return,

//"true" if there's more data to read, "false" ifthere isn't (storeHeap is at a stop row,

//and joinedHeap has no more data to read for the last row (if set,joinedContinuationRow).

while(true){

if(rpcCall !=null){

//If a user specifies a too-restrictive or too-slow scanner, the

//client might time out and disconnect while the server side

//is still processing the request. We should abort aggressively

//in that case.

longafterTime =rpcCall.disconnectSince();

if(afterTime>= 0) {

thrownewCallerDisconnectedException(

"Abortingon region " +getRegionNameAsString()+ ", call "+

this+ " after "+ afterTime+ " ms, since "+

"callerdisconnected");

}

}

得到通过startkeyseek后当前最小的一个kv

//Let's see what we have in the storeHeap.

KeyValue current= this.storeHeap.peek();


byte[]currentRow= null;

intoffset = 0;

shortlength = 0;

if(current !=null){

currentRow= current.getBuffer();

offset= current.getRowOffset();

length= current.getRowLength();

}

检查是否到了stopkey,如果是,返回false,joinedContinuationRow是多个cf的关联查找,不用去管它

booleanstopRow =isStopRow(currentRow,offset,length);

//Check if we were getting data from the joinedHeap and hit the limit.

//If not, then it's main path - getting results from storeHeap.

if(joinedContinuationRow== null){

//First, check if we are at a stop row. If so, there are no moreresults.

if(stopRow) {

如果是stopRow,同时filter.hasFilterRow返回为true时,

可通过filterRowCells来检查要返回的kvlist,也可以用来修改要返回的kvlist

if(filter !=null&& filter.hasFilterRow()){

filter.filterRowCells(results);

}

returnfalse;

}

通过filter.filterRowkey来过滤检查key是否需要排除,如果是排除返回true,否则返回false

//Check if rowkey filter wants to exclude this row. If so, loop tonext.

//Technically, if we hit limits before on this row, we don't need thiscall.

if(filterRowKey(currentRow,offset,length)) {

如果rowkey是需要排除的rowkey,检查是否有下一行数据。如果没有下一行数据,返回flase,表示当前region查找结束

否则清空当前的results,重新进行查找

booleanmoreRows =nextRow(currentRow,offset,length);

if(!moreRows)returnfalse;

results.clear();

continue;

}

开始执行region下此scan需要的所有storeStoreScannernext进行查找,把查找的结果放到results列表中。

如果一行中包含有多个kv,现在查找这些kv达到传入的limit的大小的时候,返回kv_limit的一个空的kv

(查找的大小已经达到limit(batch)的一行最大scankv个数,返回kv_limit),

否则表示还没有查找到limitkv个数,但是当前row对应的所有达到条件的kv都已经查找完成,返回最后一个kv

返回的kv如果不是kv_limit,那么有可能是null或者是下一行的第一个kv.

KeyValue nextKv= populateResult(results,this.storeHeap,limit,currentRow,offset,

length);

如果达到limit的限制时,filter.hasFilterRow的值一定得是false,

否则会throw IncompatibleFilterException

如果达到limit的限制时,返回true,当前row的所有kv查找结束,返回true可以接着向下查找

提示:如果hbase一行数据中可能包含多个kv时,最好是在scan时设置batch的属性,否则会一直查找到所有的kv结束

//Ok, we are good, let's try to get some results from the main heap.

if(nextKv ==KV_LIMIT) {

if(this.filter!= null&& filter.hasFilterRow()){

thrownewIncompatibleFilterException(

"Filterwhose hasFilterRow() returns true is incompatible with scan withlimit!");

}

returntrue;// We hit the limit.

}

是否到结束行,从这一行代码中可以看出,stoprow是不包含的,因为nextKv肯定是下一行row中第一个kv的值

stopRow= nextKv ==null||

isStopRow(nextKv.getBuffer(),nextKv.getRowOffset(),nextKv.getRowLength());

//save that the row was empty before filters applied to it.

finalbooleanisEmptyRow= results.isEmpty();


如果是stopRow,同时filter.hasFilterRow返回为true时,

可通过filterRowCells来检查要返回的kvlist,也可以用来修改要返回的kvlist

//We have the part of the row necessary for filtering (all of it,usually).

//First filter with the filterRow(List).

if(filter !=null&& filter.hasFilterRow()){

filter.filterRowCells(results);

}

如果当前row的查找没有找到合法的kv,也就是results的列表没有值,检查是否还有下一行,

如果有,重新进行查找,否则表示当前region的查找最结尾处,不能再进行查找,返回fasle

if(isEmptyRow){

booleanmoreRows =nextRow(currentRow,offset,length);

if(!moreRows)returnfalse;

results.clear();

//This row was totally filtered out, if this is NOT the last row,

//we should continue on. Otherwise, nothing else to do.

if(!stopRow)continue;

returnfalse;

}


//Ok, we are done with storeHeap for this row.

//Now we may need to fetch additional, non-essential data into row.

//These values are not needed for filter to work, so we postpone their

//fetch to (possibly) reduce amount of data loads from disk.

if(this.joinedHeap!= null){

..................................进行关联查找的代码,不显示,也不分析

}

} else{

多个store进行关联查询,不分析,通常情况不会有

//Populating from the joined heap was stopped by limits, populate somemore.

populateFromJoinedHeap(results,limit);

}


//We may have just called populateFromJoinedMap and hit the limits. Ifthat is

//the case, we need to call it again on the next next() invocation.

if(joinedContinuationRow!= null){

returntrue;

}

如果这次的查找,results的结果为空,表示没有查找到结果,检查是否还有下一行数据,如果有重新进行查找,

否则返回false表示此region的查找结束

//Finally, we are done with both joinedHeap and storeHeap.

//Double check to prevent empty rows from appearing in result. It couldbe

//the case when SingleColumnValueExcludeFilter is used.

if(results.isEmpty()){

booleanmoreRows =nextRow(currentRow,offset,length);

if(!moreRows)returnfalse;

if(!stopRow)continue;

}

stoprow时,表示还可以有下一行的数据,也就是可以接着进行next操作。否则表示此region的查找结束

//We are done. Return the result.

return!stopRow;

}

}


UserScan时的ScanQueryMatcher.match方法处理

userscan时的ScanQueryMatchernewRegionScannerImpl(scan,additionalScanners,this);时生成。

在生成StoreScanner时通过如下代码生成matcher实例。


matcher= newScanQueryMatcher(scan,scanInfo,columns,

ScanType.USER_SCAN,Long.MAX_VALUE,HConstants.LATEST_TIMESTAMP,

oldestUnexpiredTS);


matcher.isUserScan的值此时为true.


publicMatchCodematch(KeyValuekv) throwsIOException {

检查当前region的查找是否结束。pageFilter就是通过控制此filter中的方法来检查是否需要

if(filter !=null&& filter.filterAllRemaining()){

returnMatchCode.DONE_SCAN;

}


byte[] bytes =kv.getBuffer();

intoffset =kv.getOffset();


intkeyLength =Bytes.toInt(bytes,offset,Bytes.SIZEOF_INT);

offset+= KeyValue.ROW_OFFSET;


intinitialOffset= offset;


shortrowLength =Bytes.toShort(bytes,offset,Bytes.SIZEOF_SHORT);

offset+= Bytes.SIZEOF_SHORT;

检查传入的kv是否是当前行的kv,也就是rowkey是否相同,如果当前的rowkey小于传入的rowkey

表示现在已经next到下一行了,返回DONE,表示当前行查找结束

intret =this.rowComparator.compareRows(row,this.rowOffset,this.rowLength,

bytes,offset,rowLength);

if(ret <=-1) {

returnMatchCode.DONE;

}elseif(ret >=1) {

如果当前的rowkey大于传入的rowkey,表示当前next出来的kv比现在的kv要小,执行nextrow操作。

//could optimize this, if necessary?

//Could also be called SEEK_TO_CURRENT_ROW, but this

//should be rare/never happens.

returnMatchCode.SEEK_NEXT_ROW;

}

是否跳过当前行的其它kv比较,这是一个优化项。

//optimize case.

if(this.stickyNextRow)

returnMatchCode.SEEK_NEXT_ROW;

如果当前行的所有要查找的(scan)column都查找完成了,其它的当前行中非要scankv

直接不比较,执行nextrow操作。

if(this.columns.done()){

stickyNextRow= true;

returnMatchCode.SEEK_NEXT_ROW;

}


//PassingrowLength

offset+= rowLength;


//Skippingfamily

bytefamilyLength= bytes[offset];

offset+= familyLength+ 1;


intqualLength= keyLength-

(offset- initialOffset)- KeyValue.TIMESTAMP_TYPE_SIZE;

检查当前KVTTL是否过期,如果过期,检查是否SCAN中还有下一个COLUMN,如果有返回SEEK_NEXT_COL

否则返回SEEK_NEXT_ROW

longtimestamp =Bytes.toLong(bytes,initialOffset+ keyLength- KeyValue.TIMESTAMP_TYPE_SIZE);

//check for early out based on timestampalone

if(columns.isDone(timestamp)){

returncolumns.getNextRowOrNextColumn(bytes,offset,qualLength);

}


/*

*The delete logic is pretty complicated now.

*This is corroborated by the following:

*1. The store might be instructed to keep deleted rows around.

*2. A scan can optionally see past a delete marker now.

*3. If deleted rows are kept, we have to find out when we can

* remove the delete markers.

*4. Family delete markers are always first (regardless of their TS)

*5. Delete markers should not be counted as version

*6. Delete markers affect puts of the *same* TS

*7. Delete marker need to be version counted together with puts

* they affect

*/

bytetype =bytes[initialOffset+ keyLength– 1];

如果当前KV是删除的KV

if(kv.isDelete()){

此处会进入。把删除的KV添加到DeleteTracker中,默认是ScanDeleteTracker

if(!keepDeletedCells){

//first ignore delete markers if the scanner can do so, and the

//range does not include the marker

//

//during flushes and compactionsalso ignore delete markers newer

//than the readpointof any open scanner, this prevents deleted

//rows that could still be seen by a scanner from being collected

booleanincludeDeleteMarker= seePastDeleteMarkers?

tr.withinTimeRange(timestamp):

tr.withinOrAfterTimeRange(timestamp);

if(includeDeleteMarker

&&kv.getMvccVersion()<= maxReadPointToTrackVersions){

this.deletes.add(bytes,offset,qualLength,timestamp,type);

}

//Can't early out now, because DelFam come before any other keys

}

此处的检查不会进入,userscan不保留删除的数据

if(retainDeletesInOutput

|| (!isUserScan&& (EnvironmentEdgeManager.currentTimeMillis()- timestamp)<= timeToPurgeDeletes)

|| kv.getMvccVersion()> maxReadPointToTrackVersions){

//always include or it is not time yet to check whether it is OK

//to purge deltesor not

if(!isUserScan){

//if this is not a user scan (compaction), we can filter thisdeletemarkerright here

//otherwise (i.e. a "raw" scan) we fall through to normalversion and timerangechecking

returnMatchCode.INCLUDE;

}

} elseif(keepDeletedCells){

if(timestamp< earliestPutTs){

//keeping delete rows, but there are no puts older than

//this delete in the store files.

returncolumns.getNextRowOrNextColumn(bytes,offset,qualLength);

}

//else: fall through and do version counting on the

//delete markers

} else{

returnMatchCode.SKIP;

}

//note the following next else if...

//delete marker are not subject to other delete markers

}elseif(!this.deletes.isEmpty()){

如果deleteTracker中不为空时,也就是当前行中有删除的KV,检查当前KV是否是删除的KV

提示:删除的KVcompare时,比正常的KV要小,所以在执行next操作时,deleteKV会先被查找出来。

如果是删除的KV,根据KV的删除类型,如果是版本被删除,返回SKIP

否则如果SCAN中还有下一个要SCANcolumn时,返回SEEK_NEXT_COL

否则表示当前行没有需要在进行查找的KV,返回SEEK_NEXT_ROW

DeleteResultdeleteResult= deletes.isDeleted(bytes,offset,qualLength,

timestamp);

switch(deleteResult){

caseFAMILY_DELETED:

caseCOLUMN_DELETED:

returncolumns.getNextRowOrNextColumn(bytes,offset,qualLength);

caseVERSION_DELETED:

caseFAMILY_VERSION_DELETED:

returnMatchCode.SKIP;

caseNOT_DELETED:

break;

default:

thrownewRuntimeException("UNEXPECTED");

}

}

检查KV的时间是否在SCAN要查找的时间范围内,

inttimestampComparison= tr.compare(timestamp);

如果大于SCAN的最大时间,返回SKIP

if(timestampComparison>= 1) {

returnMatchCode.SKIP;

}elseif(timestampComparison<= -1) {

如果小于SCAN的最小时间,如果SCAN中还有下一个要SCANcolumn时,返回SEEK_NEXT_COL

否则表示当前行没有需要在进行查找的KV,返回SEEK_NEXT_ROW

returncolumns.getNextRowOrNextColumn(bytes,offset,qualLength);

}

检查当前KVcolumn是否是SCAN中指定的column列表中包含的值,如果是INCLUDE

否则如果SCAN中还有下一个要SCANcolumn时,返回SEEK_NEXT_COL

否则表示当前行没有需要在进行查找的KV,返回SEEK_NEXT_ROW

//STEP 1: Check if the column is part of the requested columns

MatchCodecolChecker= columns.checkColumn(bytes,offset,qualLength,type);

如果columnSCAN中要查找的column之一

if(colChecker== MatchCode.INCLUDE){

ReturnCodefilterResponse= ReturnCode.SKIP;

//STEP 2: Yes, the column is part of the requested columns. Check iffilter is present

if(filter !=null){

执行filter.filterKeyValue操作。并返回filter过滤的结果

//STEP 3: Filter the key value and return if it filters out

filterResponse= filter.filterKeyValue(kv);

switch(filterResponse){

caseSKIP:

returnMatchCode.SKIP;

caseNEXT_COL:

如果SCAN中还有下一个要SCANcolumn时,返回SEEK_NEXT_COL

否则表示当前行没有需要在进行查找的KV,返回SEEK_NEXT_ROW

returncolumns.getNextRowOrNextColumn(bytes,offset,qualLength);

caseNEXT_ROW:

stickyNextRow= true;

returnMatchCode.SEEK_NEXT_ROW;

caseSEEK_NEXT_USING_HINT:

returnMatchCode.SEEK_NEXT_USING_HINT;

default:

//Itmeans it is either include or include and seek next

break;

}

}

/*

* STEP 4: Reaching this stepmeans the column is part of the requested columns and either

* the filter is null or thefilter has returned INCLUDE or INCLUDE_AND_NEXT_COL response.

* Now check the number ofversions needed. This method call returns SKIP, INCLUDE,

* INCLUDE_AND_SEEK_NEXT_ROW,INCLUDE_AND_SEEK_NEXT_COL.

*

* FilterResponse ColumnChecker Desired behavior

* INCLUDE SKIP row has already been included, SKIP.

* INCLUDE INCLUDE INCLUDE

* INCLUDE INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL

* INCLUDE INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW

* INCLUDE_AND_SEEK_NEXT_COL SKIP row has already been included, SKIP.

* INCLUDE_AND_SEEK_NEXT_COLINCLUDE INCLUDE_AND_SEEK_NEXT_COL

* INCLUDE_AND_SEEK_NEXT_COLINCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL

* INCLUDE_AND_SEEK_NEXT_COLINCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW

*

* In all the above scenarios, wereturn the column checker return value except for

* FilterResponse(INCLUDE_AND_SEEK_NEXT_COL) and ColumnChecker(INCLUDE)

*/


此处主要是检查KV的是否是SCAN的最大版本内,到这个地方,除非是KV超过了要SCAN的最大版本,或者KVTTL过期。

否则肯定是会包含此KV的值。


colChecker=

columns.checkVersions(bytes,offset,qualLength,timestamp,type,

kv.getMvccVersion()> maxReadPointToTrackVersions);

//Optimizewith stickyNextRow

stickyNextRow= colChecker== MatchCode.INCLUDE_AND_SEEK_NEXT_ROW? true: stickyNextRow;

return(filterResponse== ReturnCode.INCLUDE_AND_NEXT_COL&&

colChecker== MatchCode.INCLUDE)? MatchCode.INCLUDE_AND_SEEK_NEXT_COL

: colChecker;

}

stickyNextRow= (colChecker== MatchCode.SEEK_NEXT_ROW)? true

: stickyNextRow;

returncolChecker;

}


相关内容