Flume: Problem Analysis and Solutions


Problem 1:

org.apache.flume.EventDeliveryException: Failed to send events
       at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:382)
       at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
       at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
       at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: 10.95.198.123, port: 44444 }: Failed to send batch
       at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:294)
       at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:366)
       ... 3 more
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: 10.95.198.123, port: 44444 }: Avro RPC call returned Status: FAILED
       at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:370)

Analysis:

Code analysis (the catch block in NettyAvroRpcClient.appendBatch):

    try {
      appendBatch(events, requestTimeout, TimeUnit.MILLISECONDS);
    } catch (Throwable t) {
      // we mark as no longer active without trying to clean up resources
      // client is required to call close() to clean up resources
      setState(ConnState.DEAD);
      if (t instanceof Error) {
        throw (Error) t;
      }
      if (t instanceof TimeoutException) {
        throw new EventDeliveryException(this + ": Failed to send event. " +
            "RPC request timed out after " + requestTimeout + " ms", t);
      }
      throw new EventDeliveryException(this + ": Failed to send batch", t);

    }

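The send failed because the receiving end did not accept the batch in time. The catch block marks the connection DEAD and wraps whatever appendBatch threw in an EventDeliveryException: a TimeoutException produces the "RPC request timed out after ... ms" message, and any other failure produces "Failed to send batch". In the trace above the innermost cause is "Avro RPC call returned Status: FAILED", meaning the remote Avro source reported that it could not accept the batch, which usually points to a slow or full downstream agent.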
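The wrapping behaviour of the catch block above can be reproduced with the JDK alone. The following is a minimal sketch, not Flume code: DeliveryException is a hypothetical stand-in for EventDeliveryException, sendBatch is an illustrative name, and a CompletableFuture plays the role of the pending RPC reply.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class RequestTimeoutSketch {
    /** Hypothetical stand-in for Flume's EventDeliveryException. */
    static class DeliveryException extends Exception {
        DeliveryException(String msg, Throwable cause) {
            super(msg, cause);
        }
    }

    /** Waits at most timeoutMs for the reply and wraps failures like the catch block does. */
    static String sendBatch(CompletableFuture<String> reply, long timeoutMs)
            throws DeliveryException {
        try {
            return reply.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException t) {
            throw new DeliveryException("Failed to send event. "
                    + "RPC request timed out after " + timeoutMs + " ms", t);
        } catch (InterruptedException t) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new DeliveryException("Failed to send batch", t);
        } catch (ExecutionException t) {
            throw new DeliveryException("Failed to send batch", t);
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulated slow receiver: the reply arrives after about 300 ms.
        CompletableFuture<String> slowReply = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "OK";
        });

        try {
            sendBatch(slowReply, 50); // timeout shorter than the receiver's latency
        } catch (DeliveryException e) {
            System.out.println(e.getMessage());
        }

        // With a generous timeout the same reply is received normally.
        System.out.println(sendBatch(slowReply, 2000));
    }
}
```

The same reply fails with a 50 ms budget and succeeds with a 2000 ms budget, which is the effect a larger request-timeout has when the downstream agent is merely slow rather than down.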

Solution:

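Increase request-timeout on the Avro sink. The default is 20 seconds (the value is given in milliseconds, so 20000). For example, a1.sinks.k1.request-timeout = 60000, where a1 and k1 are placeholder agent and sink names.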



Problem 2:

org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: woStoreSoftWDownloadC2}
       at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
       at org.apache.flume.source.ExecSource$ExecRunnable.flushEventBatch(ExecSource.java:376)
       at org.apache.flume.source.ExecSource$ExecRunnable.run(ExecSource.java:336)
       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
       at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
       at java.util.concurrent.FutureTask.run(FutureTask.java:166)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
       at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.flume.ChannelException: Space for commit to queue couldn't be acquired Sinks are likely not keeping up with sources, or the buffer size is too tight
       at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:128)
       at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
       at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
       ... 8 more

30 Mar 2014 10:16:00,960 ERROR [timedFlushExecService18-0] (org.apache.flume.source.ExecSource$ExecRunnable$1.run:322)  - Exception occured when processing event batch
org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: woStoreSoftWDownloadC2}
       at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
       at org.apache.flume.source.ExecSource$ExecRunnable.flushEventBatch(ExecSource.java:376)
       at org.apache.flume.source.ExecSource$ExecRunnable.access$100(ExecSource.java:249)
       at org.apache.flume.source.ExecSource$ExecRunnable$1.run(ExecSource.java:318)
       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
       at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)

        at 


Code analysis (MemoryChannel$MemoryTransaction.doCommit):

 protected void doCommit() throws InterruptedException {
      int remainingChange = takeList.size() - putList.size();
      if(remainingChange < 0) {
        if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
          throw new ChannelException("Space for commit to queue couldn't be acquired" +
              " Sinks are likely not keeping up with sources, or the buffer size is too tight");
        }
      }
      int puts = putList.size();
      int takes = takeList.size();
      synchronized(queueLock) {
        if(puts > 0 ) {
          while(!putList.isEmpty()) {
            if(!queue.offer(putList.removeFirst())) {
              throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
            }
          }
        }
        putList.clear();
        takeList.clear();
      }
      queueStored.release(puts);
      if(remainingChange > 0) {
        queueRemaining.release(remainingChange);
      }
      if (puts > 0) {
        channelCounter.addToEventPutSuccessCount(puts);
      }
      if (takes > 0) {
        channelCounter.addToEventTakeSuccessCount(takes);
      }


      channelCounter.setChannelSize(queue.size());
    }

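When a transaction puts more events than it takes, doCommit has to reserve that many free slots from queueRemaining. It waits up to keep-alive seconds for them. If the sinks have not drained enough events by then, tryAcquire returns false and the ChannelException above is thrown, so the events cannot be inserted into the channel.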
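The waiting behaviour comes from java.util.concurrent.Semaphore.tryAcquire with a timeout, which doCommit calls on queueRemaining. Below is a minimal JDK-only sketch of that mechanism, not Flume code: the class name, the reserve helper, and the use of milliseconds (to keep the demo short; the channel's keep-alive is in seconds) are assumptions made for illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class CommitSpaceSketch {
    /**
     * Tries to reserve `needed` free slots, waiting at most keepAliveMs.
     * Returns false if the slots did not become available in time, which is
     * the case where doCommit throws the ChannelException.
     */
    static boolean reserve(Semaphore queueRemaining, int needed, long keepAliveMs)
            throws InterruptedException {
        return queueRemaining.tryAcquire(needed, keepAliveMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        // 100 permits model a channel with capacity = 100 and nothing stored yet.
        Semaphore queueRemaining = new Semaphore(100);

        // The source fills the channel completely.
        System.out.println(reserve(queueRemaining, 100, 100)); // true

        // No sink is draining, so a further batch of 10 cannot get space.
        System.out.println(reserve(queueRemaining, 10, 100));  // false

        // A simulated sink frees 10 slots shortly afterwards.
        Thread sink = new Thread(() -> {
            try {
                Thread.sleep(30);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            queueRemaining.release(10);
        });
        sink.start();

        // With a longer wait the same batch now fits.
        System.out.println(reserve(queueRemaining, 10, 2000)); // true
        sink.join();
    }
}
```

A longer keep-alive only helps when the sinks do catch up within the wait. If the source is persistently faster than the sinks, a larger capacity just delays the same exception.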


Solution:

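Increase the memory channel's keep-alive (default 3 seconds), capacity (default 100), and transactionCapacity (default 100). For example, a1.channels.c1.capacity = 10000, where a1 and c1 are placeholder agent and channel names. transactionCapacity must not exceed capacity.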
