HDFSEventSink用于把数据从channel中拿出来(主动pull的形式)然后放到hdfs中,HDFSEventSink在启动时会启动两个线程池callTimeoutPool 和timedRollerPool ,callTimeoutPool 用于运行append/flush等操作hdfs的task(通过callWithTimeout方法调用,并实现timeout功能),用于运行翻转文件的计划任务timedRollerPool:

    callTimeoutPool = Executors.newFixedThreadPool(threadsPoolSize,            new ThreadFactoryBuilder().setNameFormat(timeoutName).build());    timedRollerPool = Executors.newScheduledThreadPool(rollTimerPoolSize,            new ThreadFactoryBuilder().setNameFormat(rollerName).build());

channel到sink的操作最终调用了sink的process方法(由SinkProcessor实现类调用),比如HDFSEventSink的process方法,每个process方法中都是一个事务,用来提供原子性操作,process方法调用Channel的take方法从Channel中取出Event,每个transaction中最多的Event数量由hdfs.batchSize设定,默认是100,对每一个Event有如下操作:

1.获取文件的完整路径和名称lookupPath
2.声明一个BucketWriter对象和HDFSWriter 对象,HDFSWriter由hdfs.fileType设定,负责实际数据的写入,BucketWriter可以理解成对hdfs文件和写入方法的封装,每个lookupPath对应一个BucketWriter对象,对应关系写入到sfWriters中(这里sfWriters是一个WriterLinkedHashMap对象,WriterLinkedHashMap是LinkedHashMap的子类(private static class WriterLinkedHashMap  extends LinkedHashMap<String, BucketWriter>),用来存放文件到BucketWriter的对应关系,在start方法中初始化:
this.sfWriters = new WriterLinkedHashMap( maxOpenFiles);
长度为hdfs.maxOpenFiles的设置,默认为5000,这个代表最多能打开的文件数量)
3.调用BucketWriter的append方法写入数据
4.当操作的Event数量达到hdfs.batchSize设定后,循环调用每个BucketWriter对象的flush方法,并提交transaction
5.如果出现异常则回滚事务
6.最后关闭transaction
process方法最后返回的是代表Sink状态的Status对象(BACKOFF或者READY),这个可以用于判断Sink的健康状态,比如failover的SinkProcessor就根据这个来判断Sink是否可以提供服务
主要方法分析:
1.构造函数声明一个HDFSWriterFactory对象
在后面会使用HDFSWriterFactory的getWriter方法会根据file类型返回对应的HDFSWriter实现类
2.configure
1)通过configure方法会根据Context设置各种参数项
比如:

inUseSuffix = context.getString( "hdfs.inUseSuffix", defaultInUseSuffix ); //正在写入的文件的后缀名,默认为".tmp"rollInterval = context.getLong( "hdfs.rollInterval", defaultRollInterval ); //文件翻转时间,默认30rollSize = context.getLong( "hdfs.rollSize", defaultRollSize ); //文件翻转大小,默认1024rollCount = context.getLong( "hdfs.rollCount", defaultRollCount ); //默认为10batchSize = context.getLong( "hdfs.batchSize", defaultBatchSize ); //默认为100idleTimeout = context.getInteger( "hdfs.idleTimeout", 0); //默认为String codecName = context.getString( "hdfs.codeC"); //压缩格式fileType = context.getString( "hdfs.fileType", defaultFileType ); //默认为HDFSWriterFactory.SequenceFileType,即sequencefilemaxOpenFiles = context.getInteger( "hdfs.maxOpenFiles", defaultMaxOpenFiles ); //默认为5000callTimeout = context.getLong( "hdfs.callTimeout", defaultCallTimeout ); //BucketWriter超时时间,默认为10000threadsPoolSize = context.getInteger( "hdfs.threadsPoolSize",        defaultThreadPoolSize); //操作append/open/close/flush任务的线程池大小,默认为10rollTimerPoolSize = context.getInteger( "hdfs.rollTimerPoolSize",        defaultRollTimerPoolSize); //文件翻转计时器线程池大小,默认为1tryCount = context.getInteger( "hdfs.closeTries", defaultTryCount ); //尝试close文件的此数(大于0)retryInterval = context.getLong( "hdfs.retryInterval", defaultRetryInterval); //间隔时间(大于0)

2)获取压缩格式

    if (codecName == null) { //如果hdfs.codeC没有设置      codeC = null; //则没有压缩功能      compType = CompressionType. NONE;     } else {      codeC = getCodec(codecName);  //调用getCodec方法获取压缩格式      // TODO : set proper compression type      compType = CompressionType. BLOCK; //压缩类型为BLOCK类型    }

3)hdfs文件翻转相关设置,在实例化BucketWriter对象时会用到

   needRounding = context.getBoolean( "hdfs.round", false );    if(needRounding) {      String unit = context.getString( "hdfs.roundUnit", "second" );      if (unit.equalsIgnoreCase( "hour")) {        this.roundUnit = Calendar.HOUR_OF_DAY;      } else if (unit.equalsIgnoreCase("minute" )) {        this.roundUnit = Calendar.MINUTE;      } else if (unit.equalsIgnoreCase("second" )){        this.roundUnit = Calendar.SECOND;      } else {        LOG.warn("Rounding unit is not valid, please set one of" +            "minute, hour, or second. Rounding will be disabled" );        needRounding = false ;      }      this. roundValue = context.getInteger("hdfs.roundValue" , 1);      if(roundUnit == Calendar. SECOND || roundUnit == Calendar.MINUTE){        Preconditions. checkArgument(roundValue > 0 && roundValue <= 60,            "Round value" +            "must be > 0 and <= 60");      } else if (roundUnit == Calendar.HOUR_OF_DAY){        Preconditions. checkArgument(roundValue > 0 && roundValue <= 24,            "Round value" +            "must be > 0 and <= 24");      }    }

4)最后初始化一个SinkCounter对象用来记录sink的性能数据

    if (sinkCounter == null) {      sinkCounter = new SinkCounter(getName());    }

3.start方法用来启动线程池等

  public void start() {    String timeoutName = "hdfs-" + getName() + "-call-runner-%d" ;    callTimeoutPool = Executors. newFixedThreadPool(threadsPoolSize,            new ThreadFactoryBuilder().setNameFormat(timeoutName).build());    String rollerName = "hdfs-" + getName() + "-roll-timer-%d" ;    timedRollerPool = Executors. newScheduledThreadPool(rollTimerPoolSize,            new ThreadFactoryBuilder().setNameFormat(rollerName).build());    this. sfWriters = new WriterLinkedHashMap(maxOpenFiles); //初始化WriterLinkedHashMap对象    sinkCounter.start();    super.start();  }

4.process方法,从channel中pull出数据并发送到hdfs中(每一个transaction中最多可以有batchSize条Event),获取对应的bucket,序列化数据并写入hdfs文件

  public Status process() throws EventDeliveryException {    Channel channel = getChannel(); //获取对应的channel    Transaction transaction = channel.getTransaction(); //获取Transaction 对象,提供事务功能    List
 writers = Lists. newArrayList();    transaction.begin(); //事务开始    try {      int txnEventCount = 0;      for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {//这里一个transaction存放的数据最多由hdfs.batchSize指定        Event event = channel.take(); //循环调用Channel的take方法获取Event        if (event == null) {          break;        }        // reconstruct the path name by substituting place holders        String realPath = BucketPath. escapeString(filePath, event.getHeaders(),            timeZone, needRounding, roundUnit , roundValue ); //设置文件路径        String realName = BucketPath. escapeString(fileName, event.getHeaders(),          timeZone, needRounding, roundUnit , roundValue ); //设置文件名称        String lookupPath = realPath + DIRECTORY_DELIMITER + realName; //完整的文件名称        BucketWriter bucketWriter = sfWriters.get(lookupPath);  //根据文件获取对应的BucketWriter对象        // we haven't seen this file yet, so open it and cache the handle        if (bucketWriter == null) {          HDFSWriter hdfsWriter = writerFactory.getWriter(fileType ); //根据文件类型获取HDFSWriter 对象          WriterCallback idleCallback = null;          if(idleTimeout != 0) {            idleCallback = new WriterCallback() {              @Override              public void run(String bucketPath) {                sfWriters.remove(bucketPath); //回调方法              }            };          }          bucketWriter = new BucketWriter(rollInterval , rollSize , rollCount ,              batchSize, context , realPath, realName, inUsePrefix, inUseSuffix,              suffix, codeC, compType, hdfsWriter, timedRollerPool,              proxyTicket, sinkCounter , idleTimeout , idleCallback, lookupPath); //实例化BucketWriter          sfWriters.put(lookupPath, bucketWriter); //这里sfWriters是一个WriterLinkedHashMap对象,WriterLinkedHashMap是LinkedHashMap的子类,用来存放文件到BucketWriter的对应关系,在start方法中初始化:this .sfWriters = new WriterLinkedHashMap(maxOpenFiles);大小为hdfs.maxOpenFiles的设置,默认为5000        }        // track the buckets getting written in this transaction        if (!writers.contains(bucketWriter)) { //List
 writers = Lists.newArrayList();          writers.add(bucketWriter);        }        // Write the data to HDFS        append(bucketWriter, event); //调用append方法写入Event数据      }      if (txnEventCount == 0) {        sinkCounter.incrementBatchEmptyCount();      } else if (txnEventCount == batchSize ) {        sinkCounter.incrementBatchCompleteCount();      } else {        sinkCounter.incrementBatchUnderflowCount();      }      // flush all pending buckets before committing the transaction      for (BucketWriter bucketWriter : writers) {        flush(bucketWriter); //调用flush方法      }      transaction.commit(); //事务提交      if (txnEventCount < 1) {        return Status.BACKOFF ;      } else {        sinkCounter.addToEventDrainSuccessCount(txnEventCount);        return Status.READY ;      }    } catch (IOException eIO) { //如果异常则回滚事务      transaction.rollback();      LOG.warn( "HDFS IO error", eIO);      return Status. BACKOFF;    } catch (Throwable th) {      transaction.rollback();      LOG.error( "process failed", th);      if (th instanceof Error) {        throw (Error) th;      } else {        throw new EventDeliveryException(th);      }    } finally {      transaction.close();    }  }

5.同时定义了几个操作BucketWriter的方法append,flush,close

 1) private void append(final BucketWriter bucketWriter, final Event event)          throws IOException, InterruptedException {    // Write the data to HDFS    callWithTimeout(new Callable
() { //注意这里使用callWithTimeout提供了调用的超时功能      public Void call() throws Exception {        bucketWriter.append(event); //调用BucketWriter.append方法写入Event数据        return null;      }    });  }2)flush-->BucketWriter.flush()3) close-->BucketWriter.close()