Reading and Writing HBase with Spark

Reading and writing purely through the HbaseUtil client API is not covered here; HbaseUtil only serves as a helper for configuration, connections and tables (its full code is listed at the end).

Reading from HBase

// Get the HBase configuration
val hbaseConf = HbaseUtil.getConf()
// For reading, the input table is configured via TableInputFormat.INPUT_TABLE
hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, tableName)
val admin = new HBaseAdmin(hbaseConf)
if (!admin.isTableAvailable(tableName)) {
  val tableDesc = new HTableDescriptor(TableName.valueOf(tableName))
  admin.createTable(tableDesc)
}

val hBaseRdd = sc.newAPIHadoopRDD(hbaseConf,
  classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])

val count = hBaseRdd.count()
hBaseRdd.foreach(x => {
  val result = x._2
  // Get the row key
  val key = Bytes.toString(result.getRow)
  // Get a cell value by column family and qualifier
  val name = Bytes.toString(result.getValue("cf".getBytes, "name".getBytes))
})
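Note that the foreach above runs on the executors, so nothing comes back to the driver. A minimal sketch (assuming the same "cf:name" column as above) of turning the results into a plain key/value RDD, plus an optional scan restriction that would have to be set on hbaseConf before newAPIHadoopRDD is called:

// Optional: only scan the needed column (must be set before newAPIHadoopRDD)
// hbaseConf.set(TableInputFormat.SCAN_COLUMNS, "cf:name")

// Map each Result into a (rowKey, name) tuple that can be collected or saved
val nameRdd = hBaseRdd.map { case (_, result) =>
  val key = Bytes.toString(result.getRow)
  val name = Bytes.toString(result.getValue("cf".getBytes, "name".getBytes))
  (key, name)
}
nameRdd.take(10).foreach(println)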

Writing to HBase

// Get the HBase configuration and connection
val hbaseConf = HbaseUtil.getConf()
val hbaseConn = HbaseUtil.getConn(hbaseConf)
messages.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    // Make sure the target table exists
    HbaseUtil.getTable(hbaseConn, TABLE_NAME)
    val jobConf = HbaseUtil.getNewJobConf(hbaseConf, TABLE_NAME)
    // Process the messages first
    rdd.map(data => {
      val rowKey = data._2.toString
      val put = new Put(rowKey.getBytes())
      put.addColumn(TABLE_CF.getBytes(), "count".getBytes(), "1".getBytes())
      // Convert to (Writable, Put) pairs and let Spark write them to HBase;
      // saveAsHadoopDataset works the same way with the old mapred API
      (new ImmutableBytesWritable(), put)
    }).saveAsNewAPIHadoopDataset(jobConf)
    // Then update the offsets
    km.updateZKOffsets(rdd)
  }
})
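As the comment above says, saveAsHadoopDataset works the same way through the old mapred API. A minimal sketch of that variant, assuming the same hbaseConf, TABLE_NAME and TABLE_CF as above:

import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf

// Old-API JobConf: set the output format and target table explicitly
val jobConf = new JobConf(hbaseConf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME)

rdd.map(data => {
  val put = new Put(data._2.toString.getBytes())
  put.addColumn(TABLE_CF.getBytes(), "count".getBytes(), "1".getBytes())
  (new ImmutableBytesWritable(), put)
}).saveAsHadoopDataset(jobConf)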

HbaseUtil

import java.util.Properties

import scala.collection.mutable.HashMap

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, Mutation, Put, Table}
import org.apache.hadoop.mapred.JobConf

object HbaseUtil {
  var conn: Connection = null
  // Cache of already opened tables, keyed by table name
  var tables: HashMap[String, Table] = new HashMap[String, Table]

  def initConn() {
    if (conn == null || conn.isClosed) {
      println("---- Init Conn -----")
      val conf = getConf()
      conn = ConnectionFactory.createConnection(conf)
    }
  }

  def getConn() = {
    initConn
    conn
  }

  def getConf() = {
    val conf = HBaseConfiguration.create()
    // HBASE_ZOOKEEPER is assumed to be defined elsewhere in the project
    conf.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER)
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf
  }

  def getTable(tablename: String) = {
    if (!getConn().getAdmin.tableExists(TableName.valueOf(tablename))) {
      conn.getAdmin.createTable(new HTableDescriptor(TableName.valueOf(tablename)).addFamily(new HColumnDescriptor("info")))
    }
    // getOrElseUpdate caches the Table handle so it is only opened once
    tables.getOrElseUpdate(tablename, {
      initConn
      conn.getTable(TableName.valueOf(tablename))
    })
  }

  def put(tableName: String, p: Put) {
    getTable(tableName)
      .put(p)
  }

  def get(tableName: String, get: Get, cf: String, column: String) = {
    val r = getTable(tableName)
      .get(get)
    if (r != null && !r.isEmpty()) {
      new String(r.getValue(cf.getBytes, column.getBytes))
    } else null
  }

  // Overloads that accept an external configuration
  /**
   * Builds an HBase configuration directly from a properties object.
   *
   * @param properties
   * @return
   */
  def getConf(properties: Properties) = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", properties.getProperty("hbase.zookeeper.quorum"))
    conf.set("hbase.zookeeper.property.clientPort", properties.getProperty("hbase.zookeeper.property.clientPort"))
    conf.set("hbase.master", properties.getProperty("hbase.master"))
    conf
  }

  /**
   * Gets a connection.
   *
   * @param conf
   * @return
   */
  def getConn(conf: Configuration) = {
    if (conn == null || conn.isClosed) {
      conn = ConnectionFactory.createConnection(conf)
    }
    conn
  }

  /**
   * Gets a table, creating it if it does not exist.
   *
   * @param conn
   * @param tableName
   * @return
   */
  def getTable(conn: Connection, tableName: String) = {
    createTable(conn, tableName)
    conn.getTable(TableName.valueOf(tableName))
  }

  /**
   * Creates a table.
   *
   * @param conn
   * @param tableName
   */
  def createTable(conn: Connection, tableName: String) = {
    if (!conn.getAdmin.tableExists(TableName.valueOf(tableName))) {
      val tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName))
      tableDescriptor.addFamily(new HColumnDescriptor("info".getBytes()))
      conn.getAdmin.createTable(tableDescriptor)
    }
  }

  /**
   * Writes a single row.
   *
   * @param conn
   * @param tableName
   * @param data
   */
  def putData(conn: Connection, tableName: String, data: Put) = {
    getTable(conn, tableName).put(data)
  }

  /**
   * Builds a JobConf for bulk writes to the table from Spark.
   *
   * @param conf
   * @param tableName
   * @return
   */
  def getNewJobConf(conf: Configuration, tableName: String) = {
    conf.set("hbase.defaults.for.version.skip", "true")
    conf.set(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.OUTPUT_TABLE, tableName)
    conf.setClass("mapreduce.job.outputformat.class",
      classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[String]],
      classOf[org.apache.hadoop.mapreduce.OutputFormat[String, Mutation]])
    new JobConf(conf)
  }
}
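To tie the pieces together, here is a minimal usage sketch of the helper. The file name "hbase.properties", the table "test_table" and the row/values are assumptions for illustration; the properties file is expected to contain the quorum, client port and master keys read by getConf(properties):

// Sketch: load properties, open a connection, write one cell and read it back
val props = new Properties()
props.load(new java.io.FileInputStream("hbase.properties")) // hypothetical file

val conf = HbaseUtil.getConf(props)
val conn = HbaseUtil.getConn(conf)

// Write a single row into the "info" family created by createTable
val put = new Put("row1".getBytes())
put.addColumn("info".getBytes(), "name".getBytes(), "spark".getBytes())
HbaseUtil.putData(conn, "test_table", put)

// Read the value back through the helper's get
val value = HbaseUtil.get("test_table", new Get("row1".getBytes()), "info", "name")
println(value)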