Flink Read/Write HBase: Writing

This post is a round-up of the ways to write data into HBase from Flink, meant as a quick reference when developing business code. It only covers the concrete write operations and does not cover setting up Flink.


Main approaches (4)

  • Via dataStream.addSink(new RichSinkFunction)
  • Via a custom HBaseUtil helper class
  • Via dataSet.output(new OutputFormat)
  • Via dataSet.output(new HadoopOutputFormat)

RichSinkFunction

wordCounts.addSink(new RichSinkFunction[(String, String)] {
  var conn: Connection = null
  var table: Table = null
  var mutator: BufferedMutator = null

  override def open(parameters: Configuration): Unit = {
    val tableName = TableName.valueOf(TABLE_NAME)
    val conf: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER)
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conn = ConnectionFactory.createConnection(conf)
    table = conn.getTable(tableName)
    // BufferedMutator with a 10 MB write buffer
    mutator = conn.getBufferedMutator(new BufferedMutatorParams(tableName).writeBufferSize(10 * 1024 * 1024))
  }

  override def invoke(value: (String, String), context: SinkFunction.Context[_]): Unit = {
    val time1 = System.currentTimeMillis()
    val put = new Put(Bytes.toBytes(value._1))
    // put.addColumn(Bytes.toBytes(TABLE_CF), Bytes.toBytes("count"), Bytes.toBytes(value._2.toString().replace("---", "")))
    put.addColumn(Bytes.toBytes(TABLE_CF), Bytes.toBytes("count"), Bytes.toBytes(value._2 + "---"))
    mutator.mutate(put)
    // flush so the record is written out immediately
    mutator.flush()
    val time2 = System.currentTimeMillis()
    // crude per-record timing
    println(time2 - time1)
  }

  override def close(): Unit = {
    try {
      if (table != null) table.close()
      if (mutator != null) mutator.close()
      if (conn != null) conn.close()
    } catch {
      case e: Exception => println(e.getMessage)
    }
  }
})
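
One caveat with the sink above: calling mutator.flush() inside every invoke() effectively bypasses the 10 MB write buffer, because each record triggers its own round trip to the RegionServer. A minimal sketch of a variant (my own adjustment, not from the original post) that lets the BufferedMutator batch the writes and only forces a flush when the sink closes:

override def invoke(value: (String, String), context: SinkFunction.Context[_]): Unit = {
  val put = new Put(Bytes.toBytes(value._1))
  put.addColumn(Bytes.toBytes(TABLE_CF), Bytes.toBytes("count"), Bytes.toBytes(value._2))
  // hand the Put to the BufferedMutator; it flushes on its own
  // once the configured write buffer fills up
  mutator.mutate(put)
}

override def close(): Unit = {
  // push out whatever is still sitting in the write buffer
  if (mutator != null) { mutator.flush(); mutator.close() }
  if (table != null) table.close()
  if (conn != null) conn.close()
}

The trade-off is durability: buffered records can be lost if the job fails before a flush, so if delivery guarantees matter, flushing from a checkpoint hook (e.g. CheckpointedFunction.snapshotState) is the usual compromise.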

HBaseUtil

object HbaseUtil {
  var conn: Connection = null
  var tables: HashMap[String, Table] = new HashMap[String, Table]

  def initConn() {
    if (conn == null || conn.isClosed) {
      println("---- Init Conn -----")
      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER)
      conf.set("hbase.zookeeper.property.clientPort", "2181")
      conn = ConnectionFactory.createConnection(conf)
    }
  }

  def getConn() = {
    initConn
    conn
  }

  def getTable(tablename: String) = {
    if (!getConn().getAdmin.tableExists(TableName.valueOf(tablename))) {
      conn.getAdmin.createTable(
        new HTableDescriptor(
          TableName.valueOf(tablename)
        ).addFamily(
          new HColumnDescriptor("info")
        ))
    }
    // cache the Table instance so it is only created once per table name
    tables.getOrElseUpdate(tablename, {
      initConn
      conn.getTable(TableName.valueOf(tablename))
    })
  }

  def put(tableName: String, p: Put) {
    getTable(tableName)
      .put(p)
  }

  def get(tableName: String, get: Get, cf: String, column: String) = {
    val r = getTable(tableName)
      .get(get)
    if (r != null && !r.isEmpty()) {
      new String(r.getValue(cf.getBytes, column.getBytes))
    } else null
  }

  // accepts settings from a configuration file
  /**
   * Used to establish an HBase connection directly
   *
   * @param properties
   * @return
   */
  def getConf(properties: Properties) = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", properties.getProperty("hbase.zookeeper.quorum"))
    conf.set("hbase.zookeeper.property.clientPort", properties.getProperty("hbase.zookeeper.property.clientPort"))
    conf.set("hbase.master", properties.getProperty("hbase.master"))
    conf
  }

  /**
   * Gets a connection
   *
   * @param conf
   * @return
   */
  def getConn(conf: Configuration) = {
    if (conn == null || conn.isClosed) {
      conn = ConnectionFactory.createConnection(conf)
    }
    conn
  }

  /**
   * Gets a table, creating it if it does not exist
   *
   * @param conn
   * @param tableName
   * @return
   */
  def getTable(conn: Connection, tableName: String) = {
    createTable(conn, tableName)
    conn.getTable(TableName.valueOf(tableName))
  }

  /**
   * Creates a table
   *
   * @param conn
   * @param tableName
   */
  def createTable(conn: Connection, tableName: String) = {
    if (!conn.getAdmin.tableExists(TableName.valueOf(tableName))) {
      val tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName))
      tableDescriptor.addFamily(new HColumnDescriptor("info".getBytes()))
      conn.getAdmin.createTable(tableDescriptor)
    }
  }

  /**
   * Submits a Put to the table
   *
   * @param conn
   * @param tableName
   * @param data
   */
  def putData(conn: Connection, tableName: String, data: Put) = {
    getTable(conn, tableName).put(data)
  }

  /**
   * Used when writing to the table directly in bulk
   *
   * @param conf
   * @param tableName
   * @return
   */
  def getNewJobConf(conf: Configuration, tableName: String) = {
    conf.set("hbase.defaults.for.version.skip", "true")
    conf.set(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.OUTPUT_TABLE, tableName)
    conf.setClass("mapreduce.job.outputformat.class", classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[String]], classOf[org.apache.hadoop.mapreduce.OutputFormat[String, Mutation]])
    new JobConf(conf)
  }
}

// main
// input data
wordCounts.map(x => {
  val put = new Put((x._1).getBytes)
  put.addColumn("info".getBytes, "count".getBytes, x._2.toString.getBytes)
  // output: write the record through the helper
  HbaseUtil.put("test.demo_", put)
})

OutputFormat

val tableOutputFormat = new OutputFormat[Tuple2[String, String]] {
  var conn: Connection = null

  override def configure(configuration: Configuration): Unit = {
  }

  override def open(i: Int, i1: Int): Unit = {
    val conf: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER)
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conn = ConnectionFactory.createConnection(conf)
  }

  override def writeRecord(it: Tuple2[String, String]): Unit = {
    val tableName = TableName.valueOf(TABLE_NAME)
    val put = new Put(Bytes.toBytes(it.f0))
    put.addColumn(Bytes.toBytes(TABLE_CF), Bytes.toBytes("count"), Bytes.toBytes(it.f1 + "小猪猪"))
    conn.getTable(tableName).put(put)
  }

  override def close(): Unit = {
    try {
      if (conn != null) conn.close()
    } catch {
      case e: Exception => println(e.getMessage)
    }
  }
}
// input data
val hbaseDs = env.createInput(tableInputFormat)
// output data
hbaseDs.output(tableOutputFormat)
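
One reason this OutputFormat writes slowly is visible in writeRecord: conn.getTable(...) builds a fresh Table object and issues a single Put for every record. A minimal sketch of a variant (my own adjustment, not from the original post) that resolves the Table once in open() and reuses it:

var table: Table = null

override def open(i: Int, i1: Int): Unit = {
  val conf: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER)
  conf.set("hbase.zookeeper.property.clientPort", "2181")
  conn = ConnectionFactory.createConnection(conf)
  // resolve the Table once per parallel task instead of once per record
  table = conn.getTable(TableName.valueOf(TABLE_NAME))
}

override def writeRecord(it: Tuple2[String, String]): Unit = {
  val put = new Put(Bytes.toBytes(it.f0))
  put.addColumn(Bytes.toBytes(TABLE_CF), Bytes.toBytes("count"), Bytes.toBytes(it.f1))
  table.put(put)
}

override def close(): Unit = {
  if (table != null) table.close()
  if (conn != null) conn.close()
}

Each record still costs one RPC; batching with a BufferedMutator, as in the RichSinkFunction variant above, would reduce that further.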

HadoopOutputFormat

// input data
val hbaseDs = env.createInput(tableInputFormat)
val hadoopOF = new HadoopOutputFormat[String, Mutation](new TableOutputFormat[String](), job)
println(hbaseDs.collect().length)
val ds = hbaseDs.map(x => {
  val put = new Put(x.f0.getBytes())
  put.addColumn(Bytes.toBytes(TABLE_CF), Bytes.toBytes("count"), Bytes.toBytes(x.f1 + "小猪猪"))
  // the key is ignored by TableOutputFormat; the Put is the Mutation that gets written
  (x.f0, put.asInstanceOf[Mutation])
})
// output data
ds.output(hadoopOF)
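
The snippet assumes a job object that already knows which HBase table to target; the original code does not show how it is built. One plausible setup (a sketch only; HBASE_ZOOKEEPER and TABLE_NAME are the same placeholders used above) is to put TableOutputFormat.OUTPUT_TABLE into a mapreduce Job configuration:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job

val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER)
conf.set("hbase.zookeeper.property.clientPort", "2181")
// tell TableOutputFormat which table the Mutations should be written to
conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME)
val job = Job.getInstance(conf)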

Pros and cons

  • Of the four approaches above, the first three are the most common; they are also what turns up most often when searching the web for writing HBase data from Flink.
  • In practice, however, all three of them write to HBase far more slowly than Spark's RDD.saveAsNewAPIHadoopDataset (see the sketch after this list).
  • The fourth approach was distilled from the pattern of having Flink convert the data into HFiles and then performing a Bulk Load.
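
For reference, the Spark write path used in that comparison looks roughly like this (a sketch, not code from this post; it assumes an rdd of (rowKey, count) pairs and a job configured with TableOutputFormat.OUTPUT_TABLE as in the HadoopOutputFormat section):

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

rdd.map { case (rowKey, count) =>
  val put = new Put(Bytes.toBytes(rowKey))
  put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(count.toString))
  // TableOutputFormat ignores the key, but the pair RDD still needs one
  (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put)
}.saveAsNewAPIHadoopDataset(job.getConfiguration)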

**If you have a better approach, feel free to get in touch.**