1. <ul id="0c1fb"></ul>

      <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
      <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区

      RELATED CONSULTING
      相關咨詢
      選擇下列產品馬上在線溝通
      服務時間:8:30-17:00
      你可能遇到了下面的問題
      關閉右側工具欄

      新聞中心

      這里有您想知道的互聯網營銷解決方案
      flume-ng怎么自定義攔截器

      本篇內容主要講解“flume-ng怎么自定義攔截器”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“flume-ng怎么自定義攔截器”吧!

      成都創新互聯公司長期為超過千家客戶提供的網站建設服務,團隊從業經驗10年,關注不同地域、不同群體,并針對不同對象提供差異化的產品和服務;打造開放共贏平臺,與合作伙伴共同營造健康的互聯網生態環境。為華州企業提供專業的成都網站制作、網站設計,華州網站改版等技術服務。擁有10多年豐富建站經驗和眾多成功案例,為您定制開發。

      代碼如下:

      package com.wy.flume.interceptor;
      
      import java.util.List;
      import java.util.Map;
      import java.util.regex.Matcher;
      import java.util.regex.Pattern;
      
      import org.apache.commons.lang.StringUtils;
      import org.apache.flume.Context;
      import org.apache.flume.Event;
      import org.apache.flume.interceptor.Interceptor;
      import org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer;
      import org.apache.flume.interceptor.RegexExtractorInterceptorSerializer;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      import com.google.common.base.Charsets;
      import com.google.common.base.Preconditions;
      import com.google.common.base.Throwables;
      import com.google.common.collect.Lists;
      
      public class RegexExtractorHeaderInterceptor implements Interceptor {
      
          static final String REGEX = "regex";
          static final String SERIALIZERS = "serializers";
          
          
          static final String EXTRACTOR_HEADER = "extractorHeader";  
          static final boolean DEFAULT_EXTRACTOR_HEADER = false;  
          static final String EXTRACTOR_HEADER_KEY = "extractorHeaderKey"; 
      
          private static final Logger logger = LoggerFactory
              .getLogger(RegexExtractorHeaderInterceptor.class);
      
          private final Pattern regex;
          private final List serializers;
      
          private final boolean extractorHeader;  
          private final String extractorHeaderKey;  
          
          private RegexExtractorHeaderInterceptor(Pattern regex,
              List serializers,boolean extractorHeader, String extractorHeaderKey) {
            this.regex = regex;
            this.serializers = serializers;
            
            this.extractorHeader = extractorHeader;
            this.extractorHeaderKey = extractorHeaderKey;
            
          }
      
          @Override
          public void initialize() {
            // NO-OP...
          }
      
          @Override
          public void close() {
            // NO-OP...
          }
      
          @Override
          public Event intercept(Event event) {
            String extractorHeaderVal;
            if (extractorHeader){
                
                extractorHeaderVal = event.getHeaders().get(extractorHeaderKey);
                
            }else{
                
                extractorHeaderVal = new String(event.getBody(),Charsets.UTF_8);
                
            }
            
            Matcher matcher = regex.matcher(extractorHeaderVal);
            Map headers = event.getHeaders();
            if (matcher.find()) {
              for (int group = 0, count = matcher.groupCount(); group < count; group++) {
                int groupIndex = group + 1;
                if (groupIndex > serializers.size()) {
                  if (logger.isDebugEnabled()) {
                    logger.debug("Skipping group {} to {} due to missing serializer",
                        group, count);
                  }
                  break;
                }
                NameAndSerializer serializer = serializers.get(group);
                if (logger.isDebugEnabled()) {
                  logger.debug("Serializing {} using {}", serializer.headerName,
                      serializer.serializer);
                }
                headers.put(serializer.headerName,
                    serializer.serializer.serialize(matcher.group(groupIndex)));
              }
            }
            return event;
          }
      
          @Override
          public List intercept(List events) {
            List intercepted = Lists.newArrayListWithCapacity(events.size());
            for (Event event : events) {
              Event interceptedEvent = intercept(event);
              if (interceptedEvent != null) {
                intercepted.add(interceptedEvent);
              }
            }
            return intercepted;
          }
      
          public static class Builder implements Interceptor.Builder {
      
            private Pattern regex;
            private List serializerList;
            
            private boolean extractorHeader;
            private String extractorHeaderKey;
            
            private final RegexExtractorInterceptorPassThroughSerializer defaultSerializer = new RegexExtractorInterceptorPassThroughSerializer();
            
      
            @Override
            public void configure(Context context) {
              String regexString = context.getString(REGEX);
              Preconditions.checkArgument(!StringUtils.isEmpty(regexString),
                  "Must supply a valid regex string");
              regex = Pattern.compile(regexString);
              regex.pattern();
              regex.matcher("").groupCount();
              configureSerializers(context);
              
              extractorHeader = context.getBoolean(EXTRACTOR_HEADER,DEFAULT_EXTRACTOR_HEADER);
              
              if (extractorHeader){
                  
                  extractorHeaderKey = context.getString(EXTRACTOR_HEADER_KEY);
                  Preconditions.checkArgument(!StringUtils.isEmpty(extractorHeaderKey),"header key must");
                  
              }
              
            }
      
            private void configureSerializers(Context context) {
              String serializerListStr = context.getString(SERIALIZERS);
              Preconditions.checkArgument(!StringUtils.isEmpty(serializerListStr),
                  "Must supply at least one name and serializer");
      
              String[] serializerNames = serializerListStr.split("\\s+");
      
              Context serializerContexts =
                  new Context(context.getSubProperties(SERIALIZERS + "."));
      
              serializerList = Lists.newArrayListWithCapacity(serializerNames.length);
              for(String serializerName : serializerNames) {
                Context serializerContext = new Context(
                    serializerContexts.getSubProperties(serializerName + "."));
                String type = serializerContext.getString("type", "DEFAULT");
                String name = serializerContext.getString("name");
                Preconditions.checkArgument(!StringUtils.isEmpty(name),
                    "Supplied name cannot be empty.");
      
                if("DEFAULT".equals(type)) {
                  serializerList.add(new NameAndSerializer(name, defaultSerializer));
                } else {
                  serializerList.add(new NameAndSerializer(name, getCustomSerializer(
                      type, serializerContext)));
                }
              }
            }
      
            private RegexExtractorInterceptorSerializer getCustomSerializer(
                String clazzName, Context context) {
              try {
                RegexExtractorInterceptorSerializer serializer = (RegexExtractorInterceptorSerializer) Class
                    .forName(clazzName).newInstance();
                serializer.configure(context);
                return serializer;
              } catch (Exception e) {
                logger.error("Could not instantiate event serializer.", e);
                Throwables.propagate(e);
              }
              return defaultSerializer;
            }
      
            @Override
            public Interceptor build() {
              Preconditions.checkArgument(regex != null,
                  "Regex pattern was misconfigured");
              Preconditions.checkArgument(serializerList.size() > 0,
                  "Must supply a valid group match id list");
              return new RegexExtractorHeaderInterceptor(regex, serializerList, extractorHeader, extractorHeaderKey);
            }
          }
      
          static class NameAndSerializer {
            private final String headerName;
            private final RegexExtractorInterceptorSerializer serializer;
      
            public NameAndSerializer(String headerName,
                RegexExtractorInterceptorSerializer serializer) {
              this.headerName = headerName;
              this.serializer = serializer;
            }
          }
        }

      應用配置:

      # Attach one interceptor, named i2, to source s1 of agent hdp2.
      hdp2.sources.s1.interceptors = i2
      # Flume instantiates interceptors through their nested Builder class.
      hdp2.sources.s1.interceptors.i2.type = com.wy.flume.interceptor.RegexExtractorHeaderInterceptor$Builder
      # Group 1: a run of non-underscore characters; group 2: the eight digits after the following '_'.
      # NOTE(review): the doubled backslash is presumably the properties-file escape for a single "\" - confirm.
      hdp2.sources.s1.interceptors.i2.regex = ([^_]+)_(\\d{8}).*
      # Match against a header value instead of the event body.
      hdp2.sources.s1.interceptors.i2.extractorHeader = true
      # Name of the header whose value is matched; presumably set upstream by the source - verify.
      hdp2.sources.s1.interceptors.i2.extractorHeaderKey = basename
      # Serializers are applied to capture groups in listed order: s1 -> group 1, s2 -> group 2.
      hdp2.sources.s1.interceptors.i2.serializers = s1 s2
      # Header that receives group 1.
      hdp2.sources.s1.interceptors.i2.serializers.s1.name = log_type
      # Header that receives group 2.
      hdp2.sources.s1.interceptors.i2.serializers.s2.name = log_day

      到此,相信大家對“flume-ng怎么自定義攔截器”有了更深的了解,不妨來實際操作一番吧!這里是創新互聯網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!


      本文名稱:flume-ng怎么自定義攔截器
      網站URL:http://ef60e0e.cn/article/gsjipg.html
      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区
      1. <ul id="0c1fb"></ul>

        <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
        <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

        宁阳县| 西乌珠穆沁旗| 平阳县| 改则县| 七台河市| 定安县| 高要市| 宿州市| 凤山县| 申扎县| 灯塔市| 邵阳市| 富裕县| 历史| 仁化县| 葵青区| 新兴县| 金阳县| 新宁县| 青龙| 江城| 麻江县| 益阳市| 运城市| 乌苏市| 郓城县| 毕节市| 德化县| 长沙市| 吉安县| 攀枝花市| 苗栗市| 吉隆县| 邻水| 简阳市| 偃师市| 大英县| 石渠县| 吉安市| 邹城市| 九寨沟县|