安装

Kudu提供了通过源码安装的方式,通过源码安装步骤比较繁琐,因此这里我们选择在CentOS 6操作系统上使用安装包进行安装。使用安装包也有两种安装方式:

  • 添加Kudu源之后使用yum进行安装;
  • 手动下载rpm包之后进行安装;

由于网络原因,我这里使用yum进行安装速度特别慢,所以最终我们选择使用rpm包进行手动安装。

我们从官方文档中提供的CentOS 6 Individual Packages中下载得到如下的安装包

kudu-1.4.0+cdh5.12.2+0-1.cdh5.12.2.p0.8.el6.x86_64.rpm
kudu-client-devel-1.4.0+cdh5.12.2+0-1.cdh5.12.2.p0.8.el6.x86_64.rpm
kudu-client0-1.4.0+cdh5.12.2+0-1.cdh5.12.2.p0.8.el6.x86_64.rpm
kudu-debuginfo-1.4.0+cdh5.12.2+0-1.cdh5.12.2.p0.8.el6.x86_64.rpm
kudu-master-1.4.0+cdh5.12.2+0-1.cdh5.12.2.p0.8.el6.x86_64.rpm
kudu-tserver-1.4.0+cdh5.12.2+0-1.cdh5.12.2.p0.8.el6.x86_64.rpm

上面的一共有六个包,分别为core、client、debug、master和tablet server包,其中debug包的安装是可选的。

下载了这些安装包之后我们使用 rpm -ivh PACKAGE_NAME 命令进行手动安装,如果在安装过程中提示缺少某些包,那么可以使用yum先将缺失的包进行安装,随后再安装相应的Kudu包。

启动

使用如下命令启动或停止Kudu进程

# 启动
$ service kudu-master start
$ service kudu-tserver start
# 停止
$ service kudu-master stop
$ service kudu-tserver stop

启动了kudu-master进程之后(运行master进程的机器CPU不应该少于3个核,否则会启动失败),可以使用浏览器访问对应主机的8051端口打开kudu-master的Web控制台页面,kudu-tserver (KuduTabletServer) 的Web控制台则位于8050端口。

启动了master和tablet-server之后我们就已经拥有了一个单节点的Kudu服务,你已经可以使用这个节点进行数据的增删改查操作了,下面我们再了解一下如何构建一个Kudu服务的集群。

首先我们使用三台负载并安装好Kudu,它们的IP地址如下

  • 172.21.3.39
  • 172.21.3.92
  • 172.21.3.177

之后在这三台负载的 /etc/kudu/conf/master.gflagfile 中新增master地址的配置

--master_addresses=172.21.4.192:7051,172.21.3.92:7051,172.21.3.177:7051
--fs_data_dirs=/home/kudu/data/master
--fs_wal_dir=/home/kudu/wal/master
--log_dir=/home/kudu/kudu/logs/master
--memory_limit_soft_percentage=85
--maintenance_manager_num_threads=4
--block_cache_capacity_mb=2048
--max_clock_sync_error_usec=20000000
--rpc-encryption=disabled
--rpc_authentication=disabled
--rpc_negotiation_timeout_ms=9000
--consensus_rpc_timeout_ms=60000

除了master的配置,还需要在 /etc/kudu/conf/tserver.gflagfile 中新增tablet-server的相关配置

--tserver_master_addrs=172.21.4.192:7051,172.21.3.92:7051,172.21.3.177:7051
--fs_data_dirs=/home/kudu/data/tserver
--fs_wal_dir=/home/kudu/wal/tserver
--log_dir=/home/kudu/kudu/logs/tserver
--maintenance_manager_num_threads=8
--memory_limit_soft_percentage=85
--block_cache_capacity_mb=2048
--memory_limit_hard_bytes=21000000000
--max_clock_sync_error_usec=20000000
--rpc-encryption=disabled
--rpc_authentication=disabled
--rpc_negotiation_timeout_ms=9000
--consensus_rpc_timeout_ms=60000
--tablet_history_max_age_sec=60

添加了相关的配置之后启动集群。

数据操作

在创建了集群之后我们使用Java语言来进行数据以及表的相关操作,首先我们通过maven引入相关的依赖jar包

<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-client</artifactId>
    <version>1.4.0</version>
</dependency>

之后我们使用Java实现建表、删表、数据写入和数据查询操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;

import java.util.ArrayList;
import java.util.List;

public class Test {

private static final String KUDU_MASTER = "172.21.4.192:7051";

public static void main(String[] args) {
System.out.println("-----------------------------------------------");
System.out.println("Will try to connect to Kudu master at " + KUDU_MASTER);
System.out.println("-----------------------------------------------");
String tableName = "java_sample-" + System.currentTimeMillis();
KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();

try {
// 建表
List<ColumnSchema> columns = new ArrayList<>(2);
columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build());
List<String> rangeKeys = new ArrayList<>();
rangeKeys.add("key");

Schema schema = new Schema(columns);

CreateTableOptions options = new CreateTableOptions();
options.setRangePartitionColumns(rangeKeys);
options.setNumReplicas(1);

client.createTable(tableName, schema, options);

// 插入数据
KuduTable table = client.openTable(tableName);
KuduSession session = client.newSession();
for (int i = 0; i < 3; i++) {
Insert insert = table.newInsert();
PartialRow row = insert.getRow();
row.addInt(0, i);
row.addString(1, "value " + i);
session.apply(insert);
}

// 查询数据
List<String> projectColumns = new ArrayList<>(1);
projectColumns.add("value");
KuduScanner scanner = client.newScannerBuilder(table).setProjectedColumnNames(projectColumns).build();
while (scanner.hasMoreRows()) {
RowResultIterator results = scanner.nextRows();
while (results.hasNext()) {
RowResult result = results.next();
System.out.println(result.getString(0));
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 删除表
client.deleteTable(tableName);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
client.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

}

与SparkSQL进行集成

Kudu与SparkSQL集群需要用到如下的依赖

<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-client</artifactId>
    <version>1.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-spark2_2.11</artifactId>
    <version>1.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.module</groupId>
    <artifactId>jackson-module-scala_2.11</artifactId>
    <version>2.8.8</version>
</dependency>

添加了依赖之后,我们可以参照之前Kudu的操作部分,创建表并写入数据,但是最后并不删除此表。之后在http://172.21.4.192:8051/tables中可以找到我们刚刚创建的表 java_sample-1584005217403,随后我们使用SparkSQL对该表进行查询,具体代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.HashMap;
import java.util.Map;

public class SparkTest {

public static void main(String[] args) {
Map<String, String> options = new HashMap<String, String>() {
{
put("kudu.table", "java_sample-1584005217403");
put("kudu.master", "172.21.4.192:7051");
}
};

SparkContext sc = new SparkContext("local", "test-spark-kudu", new SparkConf());
SparkSession sparkSession = SparkSession.builder().sparkContext(sc).getOrCreate();
sparkSession
.read()
.options(options)
.format("org.apache.kudu.spark.kudu")
.load()
.registerTempTable("table0");
Dataset<Row> result = sparkSession.sql("SELECT * FROM table0");
result.show();
}

}

执行以上代码得到结果如下

+---+-------+
|key|  value|
+---+-------+
|  0|value 0|
|  1|value 1|
|  2|value 2|
+---+-------+

参考

官方安装文档
Apache Kudu 初体验
https://github.com/apache/kudu/tree/master/examples/java/java-example