
Exercise: custom Flink sinks for MySQL and HBase

2022-04-08 21:31:06



 

MySQL sink: a RichSinkFunction that writes comma-separated records to MySQL over JDBC, binding the SQL parameters by reflecting on the declared fields of a bean class.

package sink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * @Description: custom MySQL sink
 * @Author: liuyuan
 * @Times : 2021/9/24 20:09
 */

// Custom sink: extend RichSinkFunction and write each record via JDBC
public class MySQLSink extends RichSinkFunction<String> {
    private transient Connection conn;
    private transient PreparedStatement pre;

    private final String database;
    private final String sql;
    private final Class<?> clazz;

    public MySQLSink(String database, String sql, Class<?> clazz) {
        this.database = database;
        this.sql = sql;
        this.clazz = clazz;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        conn = DriverManager.getConnection("jdbc:mysql://hadoop106:3306/" + database, "root", "root");
        conn.setAutoCommit(true);
        // prepare the statement once; it is reused for every record
        pre = conn.prepareStatement(sql);
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // the record is a comma-separated string whose fields line up with the
        // declared fields of the bean class and the ? placeholders of the SQL
        String[] split = value.split(",");
        Field[] declaredFields = clazz.getDeclaredFields();
        for (int i = 0; i < declaredFields.length; i++) {
            Class<?> type = declaredFields[i].getType();
            if (type == String.class) {
                pre.setString(i + 1, split[i]);
            } else if (type == Integer.class) {
                pre.setInt(i + 1, Integer.valueOf(split[i]));
            } else if (type == Double.class) {
                pre.setDouble(i + 1, Double.valueOf(split[i]));
            } else if (type == Long.class) {
                pre.setLong(i + 1, Long.valueOf(split[i]));
            }
        }
        pre.execute();
    }

    @Override
    public void close() throws Exception {
        if (pre != null) {
            pre.close();
        }
        if (conn != null) {
            conn.close();
        }
    }
}
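To try the sink out, build a small Flink job that emits comma-separated strings and attach MySQLSink to it. The sketch below is only an illustration: the Sku bean, the test database, the sku_info table and the sample records are assumptions, not part of the original exercise; the host and credentials are whatever the sink itself is configured with. The bean's field order must match both the order of the comma-separated values and the order of the ? placeholders in the INSERT statement.

// Minimal usage sketch (assumed bean, database and table names)
package sink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MySQLSinkDemo {

    // hypothetical bean: field order drives the parameter binding in MySQLSink
    public static class Sku {
        private String id;
        private String name;
        private Double price;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("1001,apple,3.5", "1002,banana,2.0")
           // assumed database "test" and table "sku_info" with three columns
           .addSink(new MySQLSink("test", "insert into sku_info values(?,?,?)", Sku.class));

        env.execute("mysql-sink-demo");
    }
}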

HBase sink: a RichSinkFunction that writes comma-separated records into an HBase table under the info column family, plus an HBase_Util helper class with common table/CRUD operations and Kafka/HBase connection presets.

package sink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.*;

public class HBaseSink extends RichSinkFunction<String> {
    private transient Connection connection;
    private Class<?> clazz;
    private String tableName;
    private String[] fieldsName;

    // read the declared field names of the bean class; they become the column qualifiers
    public static String[] getFieldNames(Class<?> clazz) {
        Field[] fields = clazz.getDeclaredFields();
        String[] fieldNames = new String[fields.length];
        for (int i = 0; i < fieldNames.length; i++) {
            fieldNames[i] = fields[i].getName();
        }
        return fieldNames;
    }

    public HBaseSink(Class<?> clazz, String tableName) {
        this.clazz = clazz;
        this.tableName = tableName;
        this.fieldsName = getFieldNames(clazz);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = HBase_Util.getConf();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // the record is a comma-separated string; the first field is used as the row key
        // (alternatively a random row key: UUID.randomUUID().toString().replaceAll("-", ""))
        String[] s1 = value.split(",");
        Table table = connection.getTable(TableName.valueOf(tableName));
        Put put = new Put(Bytes.toBytes(s1[0]));
        for (int i = 0; i < fieldsName.length; i++) {
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(fieldsName[i]), Bytes.toBytes(s1[i]));
        }
        table.put(put);
        table.close();
    }

    @Override
    public void close() throws Exception {
        connection.close();
    }

    public static class HBase_Util {
        static org.apache.hadoop.conf.Configuration con = HBaseConfiguration.create();
        static org.apache.hadoop.conf.Configuration conf = Propss.setConf(con);
        static Connection connection;
        static HBaseAdmin admin;
        static Table t;

        static {
            try {
                connection = ConnectionFactory.createConnection(conf);
                admin = (HBaseAdmin) connection.getAdmin();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        // build and return a standalone Connection
        public static Connection getConf() {
            // create the HBase configuration object
            org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
            // set the HBase/ZooKeeper connection properties
            conf.set("hbase.zookeeper.quorum", "hadoop106,hadoop107,hadoop108");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            Connection connection = null;
            // create the connection through the connection factory
            try {
                connection = ConnectionFactory.createConnection(conf);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return connection;
        }

        // create a table with the given column families
        public static void build_Table(String tableName, List<String> familyNames) throws Exception {
            TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
            for (String columnName : familyNames) {
                ColumnFamilyDescriptor info = ColumnFamilyDescriptorBuilder.of(Bytes.toBytes(columnName));
                builder.setColumnFamily(info);
            }
            TableDescriptor build = builder.build();
            admin.createTable(build);
            System.out.println("____build_done____");
        }

        // insert a single cell
        public static void insert_Row(String tableName, String row, String family, String qualifier, String value) throws Exception {
            t = connection.getTable(TableName.valueOf(tableName));
            Put put = new Put(Bytes.toBytes(row));
            Put put1 = put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value));
            t.put(put1);
            System.out.println("____insert_Row_done____");
        }

        // insert num rows with random row keys
        public static void insert_Batch(String tableName, String row, String family, String qualifier, String value, Integer num) throws Exception {
            t = connection.getTable(TableName.valueOf(tableName));
            List<Put> list = new ArrayList<>();
            for (int i = 0; i < num; i++) {
                String s = UUID.randomUUID().toString().replaceAll("-", "");
                Put puts = new Put(Bytes.toBytes(s));
                Put putss = puts.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value + i));
                list.add(putss);
            }
            t.put(list);
            System.out.println("____insert_Batch_done____");
        }

        // drop a table
        public static void drop_Table(String tableName) throws Exception {
            if (admin.tableExists(TableName.valueOf(tableName))) {
                admin.disableTable(TableName.valueOf(tableName));
                admin.deleteTable(TableName.valueOf(tableName));
                System.out.println("____drop_Table_done____");
            } else {
                System.out.println("____no_such_Table_found____");
            }
        }

        // delete a single row
        public static void delete_Row(String tableName, String row) throws Exception {
            t = connection.getTable(TableName.valueOf(tableName));
            Delete delete = new Delete(Bytes.toBytes(row));
            t.delete(delete);
            System.out.println("____delete_Row_done____");
        }

        // scan with a single-column value filter
        public static void scan_Filter(String tableName, String family, String qualifier, CompareOperator compare, byte[] value) throws Exception {
            t = connection.getTable(TableName.valueOf(tableName));
            SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes(family), Bytes.toBytes(qualifier), compare, value);
            Scan scan = new Scan();
            Scan scan1 = scan.setFilter(filter);
            ResultScanner scanner = t.getScanner(scan1);
            Iterator<Result> iterator = scanner.iterator();
            while (iterator.hasNext()) {
                Cell[] cells = iterator.next().rawCells();
                System.out.println("____" + new String(CellUtil.cloneRow(cells[0])) + "____");
                for (Cell cell : cells) {
                    System.out.print(new String(CellUtil.cloneRow(cell)));
                    System.out.print(" - ");
                    System.out.print(new String(CellUtil.cloneFamily(cell)));
                    System.out.print(" - ");
                    System.out.print(new String(CellUtil.cloneQualifier(cell)));
                    System.out.print(" - ");
                    System.out.println(new String(CellUtil.cloneValue(cell)));
                }
            }
            System.out.println("____scan_Filter_done____");
        }

        // get a single row
        public static void scan_Row(String tableName, String row) throws Exception {
            t = connection.getTable(TableName.valueOf(tableName));
            Get get = new Get(Bytes.toBytes(row));
            Result result = t.get(get);
            Cell[] cells = result.rawCells();
            for (Cell cell : cells) {
                System.out.print(new String(CellUtil.cloneRow(cell)));
                System.out.print(" - ");
                System.out.print(new String(CellUtil.cloneFamily(cell)));
                System.out.print(" - ");
                System.out.print(new String(CellUtil.cloneQualifier(cell)));
                System.out.print(" - ");
                System.out.println(new String(CellUtil.cloneValue(cell)));
            }
            System.out.println("____scan_Row_done____");
        }

        // scan a row-key range (start inclusive, stop exclusive)
        public static void scan_Rows(String tableName, String row1, String row2) throws Exception {
            t = connection.getTable(TableName.valueOf(tableName));
            Scan sc = new Scan(Bytes.toBytes(row1), Bytes.toBytes(row2));
            ResultScanner scanner = t.getScanner(sc);
            Iterator<Result> iterator = scanner.iterator();
            System.out.println("____start inclusive, stop exclusive____");
            while (iterator.hasNext()) {
                Result next = iterator.next();
                Cell[] cells = next.rawCells();
                System.out.println("____" + new String(CellUtil.cloneRow(cells[0])) + "____");
                for (Cell cell : cells) {
                    System.out.print(new String(CellUtil.cloneRow(cell)));
                    System.out.print(" - ");
                    System.out.print(new String(CellUtil.cloneFamily(cell)));
                    System.out.print(" - ");
                    System.out.print(new String(CellUtil.cloneQualifier(cell)));
                    System.out.print(" - ");
                    System.out.println(new String(CellUtil.cloneValue(cell)));
                }
            }
            System.out.println("____scan_Rows_done____");
        }
        // get all cells of one column family for a given row
        public static void get_value_by_family(String tableName, String row, String family) throws Exception {
            t = connection.getTable(TableName.valueOf(tableName));
            Get get = new Get(Bytes.toBytes(row));
            get.addFamily(Bytes.toBytes(family));
            Result result = t.get(get);
            Cell[] cells = result.rawCells();
            for (Cell cell : cells) {
                System.out.print(new String(CellUtil.cloneRow(cell)));
                System.out.print(" - ");
                System.out.print(new String(CellUtil.cloneFamily(cell)));
                System.out.print(" - ");
                System.out.print(new String(CellUtil.cloneQualifier(cell)));
                System.out.print(" - ");
                System.out.println(new String(CellUtil.cloneValue(cell)));
            }
            System.out.println("____get_value_by_family_done____");
        }

        // get a single cell for a given row, family and qualifier
        public static void get_value_by_qualifier(String tableName, String row, String family, String qualifier) throws Exception {
            t = connection.getTable(TableName.valueOf(tableName));
            Get get = new Get(Bytes.toBytes(row));
            get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
            Result result = t.get(get);
            Cell[] cells = result.rawCells();
            for (Cell cell : cells) {
                System.out.print(new String(CellUtil.cloneRow(cell)));
                System.out.print(" - ");
                System.out.print(new String(CellUtil.cloneFamily(cell)));
                System.out.print(" - ");
                System.out.print(new String(CellUtil.cloneQualifier(cell)));
                System.out.print(" - ");
                System.out.println(new String(CellUtil.cloneValue(cell)));
            }
            System.out.println("____get_value_by_qualifier_done____");
        }

        // full scan of a table
        public static void scan_All(String tableName) throws Exception {
            t = connection.getTable(TableName.valueOf(tableName));
            Scan sc = new Scan();
            ResultScanner scanner = t.getScanner(sc);
            Iterator<Result> iterator = scanner.iterator();
            while (iterator.hasNext()) {
                Result next = iterator.next();
                Cell[] cells = next.rawCells();
                System.out.println("____" + new String(CellUtil.cloneRow(cells[0])) + "____");
                for (Cell cell : cells) {
                    System.out.print(new String(CellUtil.cloneRow(cell)));
                    System.out.print(" - ");
                    System.out.print(new String(CellUtil.cloneFamily(cell)));
                    System.out.print(" - ");
                    System.out.print(new String(CellUtil.cloneQualifier(cell)));
                    System.out.print(" - ");
                    System.out.println(new String(CellUtil.cloneValue(cell)));
                }
            }
            System.out.println("____scan_All_done____");
        }

        // list all tables
        public static void list() throws Exception {
            TableName[] tableNames = admin.listTableNames();
            for (TableName tableName : tableNames) {
                System.out.println(tableName.toString());
            }
        }

        // describe one table's structure
        public static void desc_Table(String tableName) throws Exception {
            List<TableDescriptor> tableDescriptors = admin.listTableDescriptors();
            Iterator<TableDescriptor> iterator = tableDescriptors.iterator();
            while (iterator.hasNext()) {
                TableDescriptor next = iterator.next();
                if (next.getTableName().toString().equals(tableName)) {
                    System.out.println(next);
                }
            }
            System.out.println("____desc_Table_done____");
        }

        // close the connection
        public static void stop() throws Exception {
            connection.close();
            System.out.println("____connection_close_done____");
        }

        public static class Propss {
            public static Properties producer_Props = new Properties();
            public static Properties consumer_Props = new Properties();
            public static HashMap<String, Object> kafka_Consumer = new HashMap<>();
            public static HashMap<String, Object> kafka_Producer = new HashMap<>();

            public static org.apache.hadoop.conf.Configuration setConf(org.apache.hadoop.conf.Configuration conf) {
                conf.set("hbase.zookeeper.quorum", "hadoop106,hadoop107,hadoop108");
                conf.set("hbase.zookeeper.property.clientPort", "2181");
                return conf;
            }

            static {
                kafka_Producer.put("bootstrap.servers", "hadoop106:9092,hadoop107:9092,hadoop108:9092");
                // acks: 0 = success as soon as the message is sent; 1 = success once the leader has it;
                // all = success only after every replica has written the message
                kafka_Producer.put("acks", "all");
                // retry count
                kafka_Producer.put("retries", Integer.MAX_VALUE);
                // batch size in bytes
                kafka_Producer.put("batch.size", 16384);
                // how long to wait for a batch that is not yet full
                kafka_Producer.put("linger.ms", 1);
                // upper bound on the producer's memory buffer, default 32 MB
                kafka_Producer.put("buffer.memory", 33554432);
                // alternatively: org.apache.kafka.common.serialization.ByteArraySerializer for key/value
                kafka_Producer.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                kafka_Producer.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

                kafka_Consumer.put("bootstrap.servers", "hadoop106:9092,hadoop107:9092,hadoop108:9092");
                kafka_Consumer.put("group.id", "com-test");
                // read from the beginning of the topic
                kafka_Consumer.put("auto.offset.reset", "earliest");
                kafka_Consumer.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                kafka_Consumer.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

                producer_Props.setProperty("bootstrap.servers", "hadoop106:9092,hadoop107:9092,hadoop108:9092");
                producer_Props.setProperty("acks", "all");
                producer_Props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                producer_Props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                producer_Props.put("auto.offset.reset", "earliest");

                consumer_Props.setProperty("bootstrap.servers", "hadoop106:9092,hadoop107:9092,hadoop108:9092");
                consumer_Props.setProperty("group.id", "com-test");
                consumer_Props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                consumer_Props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                consumer_Props.put("auto.offset.reset", "earliest");
            }
        }

    }

}
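The HBase sink is attached to a job the same way: pass the bean class and the target table name, and the sink uses the first comma-separated field as the row key and the bean's field names as column qualifiers under the hard-coded info family. The sketch below is illustrative only: the student table, the Student bean and the sample records are assumptions, and the table (with an info column family) is expected to exist already.

// Minimal usage sketch (assumed bean and table names)
package sink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HBaseSinkDemo {

    // hypothetical bean: its field names become the column qualifiers under family "info"
    public static class Student {
        private String id;
        private String name;
        private String age;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // the first comma-separated field ("s001", "s002") becomes the row key
        env.fromElements("s001,tom,18", "s002,jerry,20")
           .addSink(new HBaseSink(Student.class, "student"));

        env.execute("hbase-sink-demo");
    }
}

The HBase_Util helper can also be driven directly, for example to create the table before running the job. The table, family, row and value names below are placeholders as well.

// Quick driver for the HBase_Util helpers (table and row names are made up for the example)
package sink;

import java.util.Arrays;

public class HBaseUtilDemo {
    public static void main(String[] args) throws Exception {
        HBaseSink.HBase_Util.build_Table("student", Arrays.asList("info"));
        HBaseSink.HBase_Util.insert_Row("student", "s001", "info", "name", "tom");
        HBaseSink.HBase_Util.scan_All("student");
        HBaseSink.HBase_Util.stop();
    }
}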

 

Source: https://www.cnblogs.com/chang09/p/16119922.html
