package com.neusoft.hbase;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.math.BigInteger;
import java.text.MessageFormat;
import java.util.*;
/**
 * Basic HBase operations.
 * @author zifangsky
 * @date 2018/7/3
 * @since 1.0.0
 */
public class HBaseService {
/** Logger shared by all instances of this service. */
private static final Logger log = LoggerFactory.getLogger(HBaseService.class);
/** HBase client configuration passed to the constructor. */
private Configuration conf = null;
/** Connection created from the configuration in the constructor; remains {@code null} if creation fails. */
private Connection connection = null;
/**
 * Creates the service and opens an HBase connection from the given configuration.
 * <p>
 * A connection failure is logged rather than thrown; in that case the
 * {@code connection} field is left {@code null}.
 *
 * @param conf HBase client configuration used to create the connection
 */
public HBaseService(Configuration conf) {
    this.conf = conf;
    try {
        connection = ConnectionFactory.createConnection(conf);
    } catch (IOException e) {
        // Pass the exception along so the root cause and stack trace reach the log.
        log.error("获取HBase连接失败", e);
    }
}
/**
 * Creates a table with the given column families unless it already exists.
 * @author zifangsky
 * @date 2018/7/3 17:50
 * @since 1.0.0
 * @param tableName name of the table
 * @param columnFamily names of the column families to add to the new table
 * @return {@code true} if the table already existed or was created;
 *         {@code false} if an {@link IOException} occurred
 */
public boolean creatTable(String tableName, String[] columnFamily) {
    Admin admin = null;
    try {
        admin = connection.getAdmin();
        TableName name = TableName.valueOf(tableName);
        if (admin.tableExists(name)) {
            log.debug("table Exists!");
            return true;
        }
        HTableDescriptor descriptor = new HTableDescriptor(name);
        for (String family : columnFamily) {
            descriptor.addFamily(new HColumnDescriptor(family));
        }
        admin.createTable(descriptor);
        return true;
    } catch (IOException e) {
        log.error(MessageFormat.format("创建表{0}失败",tableName),e);
        return false;
    } finally {
        // Runs on every exit path, including the early returns above.
        close(admin,null,null);
    }
}
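/**
 * Creates a table with pre-split regions.
 * (The implementation below is currently commented out.)
 * @param tableName table name
 * @param columnFamily collection of column family names
 * @param splitKeys split keys that define the pre-split regions
 * @return whether the table was created successfully
 */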
/* public boolean createTableBySplitKeys(String tableName, List<String> columnFamily, byte[][] splitKeys) {
Admin admin = null;
try {
if (StringUtils.isBlank(tableName) || columnFamily == null
|| columnFamily.size() == 0) {
log.error("===Parameters tableName|columnFamily should not be null,Please check!===");
return false;
}
admin = connection.getAdmin();
if (admin.tableExists(TableName.valueOf(tableName))) {
return true;
} else {
List<ColumnFamilyDescriptor> familyDescriptors = new ArrayList<>(columnFamily.size());
columnFamily.forEach(cf -> {
familyDescriptors.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build());
});
TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
.setColumnFamilies(familyDescriptors)
.build();
//指定splitkeys
admin.createTable(tableDescriptor,splitKeys);
log.info("===Create Table " + tableName
+ " Success!columnFamily:" + columnFamily.toString()
+ "===");
}
} catch (IOException e) {
log.error("",e);
return false;
}finally {
close(admin,null,null);
}
return true;
}
*/
/**
 * Converts the given keys into region split keys, sorted in ascending
 * byte order.
 * <p>
 * Duplicate keys are collapsed, so the returned array may be shorter than
 * {@code keys} and never contains {@code null} entries.
 *
 * @param keys split keys as strings; when {@code null}, the nine defaults
 *             {@code "1|"} through {@code "9|"} are used (ten regions)
 * @return the distinct split keys as byte arrays, in ascending order
 */
public byte[][] getSplitKeys(String[] keys){
    String[] source = keys;
    if (source == null) {
        // Nine split points give ten regions by default.
        source = new String[] { "1|", "2|", "3|", "4|",
                "5|", "6|", "7|", "8|", "9|" };
    }
    // The TreeSet sorts ascending and drops duplicates.
    TreeSet<byte[]> rows = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    for (String key : source) {
        rows.add(Bytes.toBytes(key));
    }
    // Size the result from the set, not the input, so duplicates cannot leave null slots.
    return rows.toArray(new byte[rows.size()][]);
}
/**
 * Computes evenly spaced split keys between two hexadecimal row keys.
 * <p>
 * The range {@code endKey - startKey} is divided (integer division) into
 * {@code numRegions} steps; the i-th split key is
 * {@code startKey + step * (i + 1)}. Each key is rendered as lowercase hex,
 * zero-padded to at least 16 digits, and encoded as bytes.
 *
 * @param startKey   lowest row key, as a hexadecimal string
 * @param endKey     highest row key, as a hexadecimal string
 * @param numRegions number of regions wanted
 * @return {@code numRegions - 1} split keys in ascending order
 * @throws NumberFormatException      if a key is not valid hexadecimal
 * @throws NegativeArraySizeException if {@code numRegions} is less than 1
 */
public static byte[][] getHexSplits(String startKey, String endKey, int numRegions) {
    byte[][] splits = new byte[numRegions - 1][];
    BigInteger lowestKey = new BigInteger(startKey, 16);
    BigInteger highestKey = new BigInteger(endKey, 16);
    BigInteger regionIncrement = highestKey.subtract(lowestKey)
            .divide(BigInteger.valueOf(numRegions));
    BigInteger key = lowestKey;
    for (int i = 0; i < splits.length; i++) {
        key = key.add(regionIncrement);
        // Explicit charset so the bytes do not depend on the platform default encoding.
        splits[i] = String.format("%016x", key)
                .getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }
    return splits;
}
/**
 * Looks up a table handle on the shared connection.
 * @param tableName name of the table
 * @return the {@link Table} for {@code tableName}
 * @throws IOException if the connection cannot provide the table
 */
private Table getTable(String tableName) throws IOException {
    TableName name = TableName.valueOf(tableName);
    return connection.getTable(name);
}
/**
 * Lists every table together with its column family descriptors.
 * <p>
 * Each entry is a map with two keys: {@code "name"} (the table name) and
 * {@code "family"} (the string form of each column family descriptor,
 * joined by {@code "&&"}).
 * <p>
 * If an {@link IOException} occurs it is logged, and the entries collected
 * so far are returned.
 *
 * @return one map per table; empty if nothing could be read
 */
public List<Map<String,String>> getAllTableNames(){
    List<Map<String,String>> result = new ArrayList<>();
    Admin admin = null;
    try {
        admin = connection.getAdmin();
        for (TableName tableName : admin.listTableNames()) {
            Map<String,String> map = new HashMap<>();
            map.put("name", tableName.getNameAsString());
            StringBuilder cf = new StringBuilder();
            // Close each Table handle once its descriptor has been read, so it is not leaked.
            try (Table table = getTable(tableName.getNameAsString())) {
                for (HColumnDescriptor family : table.getTableDescriptor().getColumnFamilies()) {
                    if (StringUtils.isNotBlank(cf)) {
                        cf.append("&&");
                    }
                    cf.append(family.toString());
                }
            }
            map.put("family", cf.toString());
            result.add(map);
        }
    } catch (IOException e) {
        log.error("获取所有表的表名失败",e);
    } finally {
        close(admin,null,null);
    }
    return result;
}
/**
 * Scans the whole table by passing an unrestricted {@link Scan} to {@code queryData}.
 * @author zifangsky
 * @date 2018/7/3 18:21
 * @since 1.0.0
 * @param tableName name of the table
 * @return whatever {@code queryData} returns for a scan with no restrictions
 */
public Map<String,Map<String,String>> getResultScanner(String tableName){
    return this.queryData(tableName, new Scan());
}
/**
* 根据startRowKey和stopRowKey遍历查询指定表中的所有数据
* @author zifangsky
* @date 2018/7/4 18:21
* @since 1.0.0
* @param tableName 表名
* @param startRowKey 起始rowKey
* @param stopRowKey 结束rowKey
* @return java.util