标签:Flume String org sink str import kudu public row
kudu中的flume sink代码路径:
https://github.com/apache/kudu/tree/master/java/kudu-flume-sink
kudu-flume-sink默认使用的producer是
org.apache.kudu.flume.sink.SimpleKuduOperationsProducer
/**
 * Creates a single Kudu INSERT whose payload column holds the raw, unparsed event body bytes.
 *
 * @param event the Flume event to convert
 * @return a one-element list containing the INSERT
 * @throws FlumeException if the INSERT cannot be created or populated
 */
public List<Operation> getOperations(Event event) throws FlumeException {
  Insert op;
  try {
    op = table.newInsert();
    op.getRow().addBinary(payloadColumn, event.getBody());
  } catch (Exception e) {
    throw new FlumeException("Failed to create Kudu Insert object", e);
  }
  return Collections.<Operation>singletonList(op);
}
即把消息体(event body)原样以二进制形式存放到一个payload列中。
如果想要支持json格式数据,需要二次开发
package com.cloudera.kudu; public class JsonKuduOperationsProducer implements KuduOperationsProducer {
网上已经有人共享出来代码:https://cloud.tencent.com/developer/article/1158194
但是以上代码有几个不方便的地方:1)不允许null;2)对时间类型支持不好;3)所有的值都必须是字符串,再根据kudu中的字段类型进行解析,因此生成数据时需要特别注意,否则就需要自行修改代码。
针对以上不便修改后代码如下:
JsonKuduOperationsProducer.java
package com.cloudera.kudu; import com.google.common.collect.Lists; import com.google.common.base.Preconditions; import org.json.JSONObject; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.FlumeException; import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.annotations.InterfaceStability; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.Type; import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.Operation; import org.apache.kudu.client.PartialRow; import org.apache.kudu.flume.sink.KuduOperationsProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.charset.Charset; import java.text.SimpleDateFormat; import java.util.List; import java.util.function.Function; @InterfaceAudience.Public @InterfaceStability.Evolving public class JsonKuduOperationsProducer implements KuduOperationsProducer { private static final Logger logger = LoggerFactory.getLogger(JsonKuduOperationsProducer.class); private static final String INSERT = "insert"; private static final String UPSERT = "upsert"; private static final List<String> validOperations = Lists.newArrayList(UPSERT, INSERT); public static final String ENCODING_PROP = "encoding"; public static final String DEFAULT_ENCODING = "utf-8"; public static final String OPERATION_PROP = "operation"; public static final String DEFAULT_OPERATION = UPSERT; public static final String SKIP_MISSING_COLUMN_PROP = "skipMissingColumn"; public static final boolean DEFAULT_SKIP_MISSING_COLUMN = false; public static final String SKIP_BAD_COLUMN_VALUE_PROP = "skipBadColumnValue"; public static final boolean DEFAULT_SKIP_BAD_COLUMN_VALUE = false; public static final String WARN_UNMATCHED_ROWS_PROP = "skipUnmatchedRows"; public static final boolean DEFAULT_WARN_UNMATCHED_ROWS = true; private KuduTable table; private Charset charset; private String operation; private boolean skipMissingColumn; private boolean 
skipBadColumnValue; private boolean warnUnmatchedRows; public JsonKuduOperationsProducer() { } @Override public void configure(Context context) { String charsetName = context.getString(ENCODING_PROP, DEFAULT_ENCODING); try { charset = Charset.forName(charsetName); } catch (IllegalArgumentException e) { throw new FlumeException( String.format("Invalid or unsupported charset %s", charsetName), e); } operation = context.getString(OPERATION_PROP, DEFAULT_OPERATION).toLowerCase(); Preconditions.checkArgument( validOperations.contains(operation), "Unrecognized operation '%s'", operation); skipMissingColumn = context.getBoolean(SKIP_MISSING_COLUMN_PROP, DEFAULT_SKIP_MISSING_COLUMN); skipBadColumnValue = context.getBoolean(SKIP_BAD_COLUMN_VALUE_PROP, DEFAULT_SKIP_BAD_COLUMN_VALUE); warnUnmatchedRows = context.getBoolean(WARN_UNMATCHED_ROWS_PROP, DEFAULT_WARN_UNMATCHED_ROWS); } @Override public void initialize(KuduTable table) { this.table = table; } @Override public List<Operation> getOperations(Event event) throws FlumeException { String raw = new String(event.getBody(), charset); logger.debug("get raw:" + raw); List<Operation> ops = Lists.newArrayList(); if(raw != null && !raw.isEmpty()) { JSONObject json = null; //just pass if it is not a json try { json = new JSONObject(raw); } catch (Exception e) { e.printStackTrace(); } if (json != null) { Schema schema = table.getSchema(); Operation op; switch (operation) { case UPSERT: op = table.newUpsert(); break; case INSERT: op = table.newInsert(); break; default: throw new FlumeException( String.format("Unrecognized operation type '%s' in getOperations(): " + "this should never happen!", operation)); } PartialRow row = op.getRow(); for (ColumnSchema col : schema.getColumns()) { try { coerceAndSet(json.get(col.getName()), col.getName(), col.getType(), row); } catch (NumberFormatException e) { String msg = String.format( "Raw value '%s' couldn't be parsed to type %s for column '%s'", raw, col.getType(), col.getName()); 
logOrThrow(skipBadColumnValue, msg, e); } catch (IllegalArgumentException e) { String msg = String.format( "Column '%s' has no matching group in '%s'", col.getName(), raw); logOrThrow(skipMissingColumn, msg, e); } catch (Exception e) { throw new FlumeException("Failed to create Kudu operation", e); } } ops.add(op); } } return ops; } protected <T> T getValue(T defaultValue, Object val, boolean compressException, Function<String, T> fromStr) { T result = defaultValue; try { if (val != null) { boolean isConverted = false; //handle: try to convert directly try { result = (T)val; isConverted = true; } catch (Exception e1) { // e1.printStackTrace(); } //handle: parse from string if (!isConverted) result = fromStr.apply(val.toString()); } } catch(Exception e) { if (compressException) e.printStackTrace(); else throw e; } return result; } private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private void coerceAndSet(Object rawVal, String colName, Type type, PartialRow row) throws NumberFormatException { switch (type) { case INT8: row.addByte(colName, this.getValue((byte)0, rawVal, this.skipBadColumnValue, (String str) -> Byte.parseByte(str))); break; case INT16: row.addShort(colName, this.getValue((short)0, rawVal, this.skipBadColumnValue, (String str) -> Short.parseShort(str))); break; case INT32: row.addInt(colName, this.getValue(0, rawVal, this.skipBadColumnValue, (String str) -> Integer.parseInt(str))); break; case INT64: row.addLong(colName, this.getValue(0l, rawVal, this.skipBadColumnValue, (String str) -> Long.parseLong(str))); break; case BINARY: row.addBinary(colName, rawVal == null ? new byte[0] : rawVal.toString().getBytes(charset)); break; case STRING: row.addString(colName, rawVal == null ? 
"" : rawVal.toString()); break; case BOOL: row.addBoolean(colName, this.getValue(false, rawVal, this.skipBadColumnValue, (String str) -> Boolean.parseBoolean(str))); break; case FLOAT: row.addFloat(colName, this.getValue(0f, rawVal, this.skipBadColumnValue, (String str) -> Float.parseFloat(str))); break; case DOUBLE: row.addDouble(colName, this.getValue(0d, rawVal, this.skipBadColumnValue, (String str) -> Double.parseDouble(str))); break; case UNIXTIME_MICROS: row.addLong(colName, this.getValue(0l, rawVal, this.skipBadColumnValue, (String str) -> { Long result = 0l; boolean isPatternOk =false; //handle: yyyy-MM-dd HH:mm:ss if (str.contains("-") && str.contains(":") && str.contains(" ")) { try { result = this.sdf.parse(str).getTime() * 1000; isPatternOk = true; } catch (Exception e) { // e.printStackTrace(); } } //handle: second, millisecond, microsecond if (!isPatternOk && (str.length() == 10 || str.length() == 13 || str.length() == 16)) { result = Long.parseLong(str); if (str.length() == 10) result *= 1000000; if (str.length() == 13) result *= 1000; } return result; })); break; default: logger.warn("got unknown type {} for column '{}'-- ignoring this column", type, colName); } } private void logOrThrow(boolean log, String msg, Exception e) throws FlumeException { if (log) { logger.warn(msg, e); } else { throw new FlumeException(msg, e); } } @Override public void close() { } }
修改要点:去掉了JsonStr2Map类,改由getValue和coerceAndSet配合完成类型转换:支持默认值,支持null,支持传入任意类型的值(自动适配处理);时间类型支持yyyy-MM-dd HH:mm:ss以及秒、毫秒、微秒共4种格式,并会自动将秒和毫秒转换为微秒(yyyy-MM-dd HH:mm:ss格式按JVM默认时区解析)。
打包放到$FLUME_HOME/lib下
标签:Flume,String,org,sink,str,import,kudu,public,row 来源: https://www.cnblogs.com/barneywill/p/10573221.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。