High availability (HA) exists primarily to eliminate single points of failure in distributed systems, and Flink, as a distributed system, has to solve the same problem. Flink's HA support is built around the HighAvailabilityServices interface, defined as follows:
```java
public interface HighAvailabilityServices
        extends ClientHighAvailabilityServices, GloballyCleanableResource {

    // resource manager
    LeaderElectionService getResourceManagerLeaderElectionService();
    LeaderRetrievalService getResourceManagerLeaderRetriever();

    // job manager
    LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);
}
```
As the interface shows, HighAvailabilityServices is organized around two interfaces: LeaderElectionService and LeaderRetrievalService.

Leader Election and Address Monitoring
1. LeaderElectionService
LeaderElectionService lets a component take part in leader election; Flink components such as the ResourceManager and JobManager all use this interface.
```java
public interface LeaderElectionService {

    void start(LeaderContender contender) throws Exception;

    void stop() throws Exception;

    void confirmLeadership(UUID leaderSessionID, String leaderAddress);

    boolean hasLeadership(@Nonnull UUID leaderSessionId);
}
```
To use a LeaderElectionService, a component first calls LeaderElectionService.start(LeaderContender). The LeaderContender can be thought of as a set of callbacks: when the component gains or loses leadership, the corresponding method is invoked.
```java
public interface LeaderContender {

    void grantLeadership(UUID leaderSessionID);

    void revokeLeadership();

    void handleError(Exception exception);
}
```
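To make this contract concrete, here is a minimal, self-contained sketch of the callback lifecycle. LeaderContenderDemo and its nested ToyLeaderElectionService are hypothetical stand-ins invented for illustration, not Flink classes; a real service is backed by ZooKeeper or Kubernetes rather than granting leadership immediately.

```java
import java.util.UUID;

public class LeaderContenderDemo {

    // Reduced stand-in for Flink's LeaderContender (hypothetical, for illustration).
    interface LeaderContender {
        void grantLeadership(UUID leaderSessionID);
        void revokeLeadership();
        void handleError(Exception exception);
    }

    // A toy single-process "election" that grants leadership to its only
    // contender as soon as it starts; real implementations are backed by
    // ZooKeeper (e.g. a LeaderLatch) or Kubernetes.
    static class ToyLeaderElectionService {
        private LeaderContender contender;
        private UUID currentSessionId;

        void start(LeaderContender contender) {
            this.contender = contender;
            this.currentSessionId = UUID.randomUUID();
            contender.grantLeadership(currentSessionId);   // election won
        }

        void stop() {
            if (contender != null) {
                contender.revokeLeadership();              // leadership lost
            }
        }
    }

    // Records the callbacks a contender receives over its lifecycle.
    public static String lifecycle() {
        StringBuilder log = new StringBuilder();
        ToyLeaderElectionService service = new ToyLeaderElectionService();
        service.start(new LeaderContender() {
            public void grantLeadership(UUID id) { log.append("granted;"); }
            public void revokeLeadership() { log.append("revoked;"); }
            public void handleError(Exception e) { log.append("error;"); }
        });
        service.stop();
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(lifecycle());   // granted;revoked;
    }
}
```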
2. LeaderRetrievalService
LeaderRetrievalService provides the ability to watch a component's leader address for changes. Its interface is defined as follows:
```java
public interface LeaderRetrievalService {

    void start(LeaderRetrievalListener listener) throws Exception;

    void stop() throws Exception;
}
```
To use a LeaderRetrievalService, a component first calls LeaderRetrievalService.start(LeaderRetrievalListener); the LeaderRetrievalListener is the callback class invoked after the leader address changes.
```java
public interface LeaderRetrievalListener {

    void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID);

    void handleError(Exception exception);
}
```
- When the leader address of the watched component changes, the LeaderRetrievalListener's notifyLeaderAddress method is called.
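This notification flow can be sketched with a self-contained toy. LeaderRetrievalDemo and its nested ToyLeaderRetrievalService are hypothetical, invented for illustration; in Flink's ZooKeeper implementation the notification is triggered by a znode change rather than a direct method call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class LeaderRetrievalDemo {

    // Reduced stand-in for Flink's LeaderRetrievalListener (hypothetical).
    interface LeaderRetrievalListener {
        void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID);
        void handleError(Exception exception);
    }

    // A toy retrieval service that pushes address changes to its listener;
    // the real ZooKeeper-backed driver triggers this from a TreeCache event.
    static class ToyLeaderRetrievalService {
        private LeaderRetrievalListener listener;

        void start(LeaderRetrievalListener listener) {
            this.listener = listener;
        }

        // Simulates the leader address changing (e.g. after a failover).
        void leaderChanged(String address, UUID sessionId) {
            listener.notifyLeaderAddress(address, sessionId);
        }
    }

    public static List<String> observedAddresses() {
        List<String> seen = new ArrayList<>();
        ToyLeaderRetrievalService service = new ToyLeaderRetrievalService();
        service.start(new LeaderRetrievalListener() {
            public void notifyLeaderAddress(String addr, UUID id) { seen.add(addr); }
            public void handleError(Exception e) { }
        });
        service.leaderChanged("akka.tcp://flink@host-1:6123", UUID.randomUUID());
        service.leaderChanged("akka.tcp://flink@host-2:6123", UUID.randomUUID());
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observedAddresses());
    }
}
```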
Zookeeper HA
This section walks through Flink's high-availability logic in ZooKeeper mode. In Flink, components such as the ResourceManager, Dispatcher, and JobManager actually run inside the same process, and each of them needs HA functionality such as leader election.
There are two ways to provide it: create a dedicated HighAvailabilityServices instance per component, each driving its own ZooKeeper election primitives such as LeaderLatch; or create one HighAvailabilityServices instance shared by every component in the process, so that all components share a single underlying ZooKeeper client and election.
Flink takes the second approach, implemented by the ZooKeeperMultipleComponentLeaderElectionHaServices class. Its LeaderElectionService and LeaderRetrievalService factory methods look like this:
```java
/* /flink
 *      +/cluster_id_1/leader/latch
 *      |             |      /resource_manager/connection_info
 *      |             |      /dispatcher/connection_info
 *      |             |      /rest_server/connection_info
 *      |             |      /job-id-1/connection_info
 *      |             |      /job-id-2/connection_info
 *      |             |
 *      |             +jobgraphs/job-id-1
 *      |             |         /job-id-2
 *      |             +jobs/job-id-1/checkpoints/latest
 *      |             |             |           /latest-1
 *      |             |             |           /latest-2
 *      |             |             /checkpoint_id_counter
 */
public class ZooKeeperMultipleComponentLeaderElectionHaServices extends AbstractZooKeeperHaServices {

    @Override
    protected LeaderRetrievalService createLeaderRetrievalService(String leaderPath) {
        return ZooKeeperUtils.createLeaderRetrievalService(
                leaderNamespacedCuratorFramework, leaderPath, configuration);
    }

    @Override
    protected LeaderElectionService createLeaderElectionService(String leaderName) {
        return new DefaultLeaderElectionService(
                getOrInitializeSingleLeaderElectionService().createDriverFactory(leaderName));
    }
}
```
1. LeaderElectionService
When a LeaderElectionService is created for a component (identified by leaderName), ZooKeeperMultipleComponentLeaderElectionHaServices lazily creates a single DefaultMultipleComponentLeaderElectionService, shared by all components, as an intermediate proxy. The proxy maintains the mapping from each component to its LeaderElectionEventHandler.
```java
private MultipleComponentLeaderElectionService getOrInitializeSingleLeaderElectionService() {
    synchronized (lock) {
        if (multipleComponentLeaderElectionService == null) {
            try {
                multipleComponentLeaderElectionService =
                        new DefaultMultipleComponentLeaderElectionService(
                                fatalErrorHandler,
                                new ZooKeeperMultipleComponentLeaderElectionDriverFactory(
                                        leaderNamespacedCuratorFramework));
            } catch (Exception e) {
                throw new FlinkRuntimeException(
                        String.format(
                                "Could not initialize the %s",
                                DefaultMultipleComponentLeaderElectionService.class
                                        .getSimpleName()),
                        e);
            }
        }
        return multipleComponentLeaderElectionService;
    }
}
```
(1). start(LeaderContender)
As noted above, using a LeaderElectionService begins with a call to LeaderElectionService.start(LeaderContender). In this shared multi-component setup, start() simply registers the component and its LeaderElectionEventHandler. The registration can be seen in the constructor of MultipleComponentLeaderElectionDriverAdapter:
```java
MultipleComponentLeaderElectionDriverAdapter(
        String componentId,
        MultipleComponentLeaderElectionService multipleComponentLeaderElectionService,
        LeaderElectionEventHandler leaderElectionEventHandler) {
    // Register the mapping from the component to its LeaderElectionEventHandler
    // (which is in fact the DefaultLeaderElectionService).
    multipleComponentLeaderElectionService.registerLeaderElectionEventHandler(
            this.componentId, leaderElectionEventHandler);
}
```
The LeaderElectionEventHandler above is in fact the DefaultLeaderElectionService, which implements the LeaderElectionEventHandler interface.
(2). ZooKeeper interaction
DefaultMultipleComponentLeaderElectionService is paired with a ZooKeeperMultipleComponentLeaderElectionDriver that performs the actual interaction with ZooKeeper. It does two main things: leader election via a LeaderLatch, and watching the connection-info nodes via a TreeCache:
```java
public ZooKeeperMultipleComponentLeaderElectionDriver(
        CuratorFramework curatorFramework,
        MultipleComponentLeaderElectionDriver.Listener leaderElectionListener)
        throws Exception {
    this.leaderLatch = new LeaderLatch(curatorFramework, ZooKeeperUtils.getLeaderLatchPath());
    this.treeCache =
            TreeCache.newBuilder(curatorFramework, "/")
                    .setCacheData(true)
                    .setCreateParentNodes(false)
                    .setSelector(
                            new ZooKeeperMultipleComponentLeaderElectionDriver
                                    .ConnectionInfoNodeSelector())
                    .setExecutor(Executors.newDirectExecutorService())
                    .build();
    treeCache
            .getListenable()
            .addListener(
                    (client, event) -> {
                        switch (event.getType()) {
                            case NODE_ADDED:
                            case NODE_UPDATED:
                                Preconditions.checkNotNull(
                                        event.getData(),
                                        "The ZooKeeper event data must not be null.");
                                handleChangedLeaderInformation(event.getData());
                                break;
                            case NODE_REMOVED:
                                Preconditions.checkNotNull(
                                        event.getData(),
                                        "The ZooKeeper event data must not be null.");
                                handleRemovedLeaderInformation(event.getData().getPath());
                                break;
                        }
                    });
    leaderLatch.addListener(this);
    curatorFramework.getConnectionStateListenable().addListener(listener);
    leaderLatch.start();
    treeCache.start();
}
```
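The TreeCache wiring can be approximated with a toy model. ToyTreeCacheDemo is invented for illustration and uses no Curator APIs: writing or deleting a node fires the same changed/removed distinction that the driver's switch statement handles.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ToyTreeCacheDemo {

    // Hypothetical listener mirroring the driver's event handling:
    // NODE_ADDED / NODE_UPDATED map to onChanged, NODE_REMOVED to onRemoved.
    interface Listener {
        void onChanged(String path, String data);
        void onRemoved(String path);
    }

    // A toy in-memory "tree" that notifies its listener on every mutation,
    // the way Curator's TreeCache notifies the election driver.
    static class ToyTreeCache {
        private final Map<String, String> nodes = new LinkedHashMap<>();
        private final Listener listener;

        ToyTreeCache(Listener listener) { this.listener = listener; }

        void put(String path, String data) {
            nodes.put(path, data);
            listener.onChanged(path, data);
        }

        void remove(String path) {
            nodes.remove(path);
            listener.onRemoved(path);
        }
    }

    public static List<String> events() {
        List<String> log = new ArrayList<>();
        ToyTreeCache cache = new ToyTreeCache(new Listener() {
            public void onChanged(String path, String data) { log.add("changed:" + path); }
            public void onRemoved(String path) { log.add("removed:" + path); }
        });
        cache.put("/resource_manager/connection_info", "host-1:6123");
        cache.remove("/resource_manager/connection_info");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(events());
    }
}
```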
(3). ZooKeeper callbacks
The implementation of DefaultMultipleComponentLeaderElectionService shows how those events are handled:
```java
public class DefaultMultipleComponentLeaderElectionService
        implements MultipleComponentLeaderElectionService,
                MultipleComponentLeaderElectionDriver.Listener {

    private final MultipleComponentLeaderElectionDriver multipleComponentLeaderElectionDriver;

    // Each LeaderElectionEventHandler here is the DefaultLeaderElectionService
    // returned by createLeaderElectionService; it implements the
    // LeaderElectionEventHandler interface.
    private final Map<String, LeaderElectionEventHandler> leaderElectionEventHandlers;

    DefaultMultipleComponentLeaderElectionService(
            FatalErrorHandler fatalErrorHandler,
            MultipleComponentLeaderElectionDriverFactory multipleComponentLeaderElectionDriverFactory,
            ExecutorService leadershipOperationExecutor)
            throws Exception {
        // Create the ZooKeeperMultipleComponentLeaderElectionDriver that talks to ZooKeeper:
        // 1. LeaderLatch: leader election
        // 2. TreeCache: watching node changes
        multipleComponentLeaderElectionDriver =
                multipleComponentLeaderElectionDriverFactory.create(this);
    }

    public void publishLeaderInformation(String componentId, LeaderInformation leaderInformation) {
        // Write the connection_info back to ZooKeeper, so that
        // LeaderRetrievalService instances can watch the leader address.
        multipleComponentLeaderElectionDriver.publishLeaderInformation(
                componentId, leaderInformation);
        // ...
    }

    public void registerLeaderElectionEventHandler(
            String componentId, LeaderElectionEventHandler leaderElectionEventHandler) {
        // Map each component to its callback handler.
        leaderElectionEventHandlers.put(componentId, leaderElectionEventHandler);
        // ...
    }

    // 1. Callback on winning the election.
    public void isLeader() {
        // All components share the process, so winning the election
        // means every component becomes leader.
        final UUID newLeaderSessionId = UUID.randomUUID();
        synchronized (lock) {
            currentLeaderSessionId = newLeaderSessionId;
            forEachLeaderElectionEventHandler(
                    leaderElectionEventHandler ->
                            leaderElectionEventHandler.onGrantLeadership(newLeaderSessionId));
        }
    }

    // 2. Callback on component node changes.
    public void notifyLeaderInformationChange(
            String componentId, LeaderInformation leaderInformation) {
        synchronized (lock) {
            final LeaderElectionEventHandler leaderElectionEventHandler =
                    leaderElectionEventHandlers.get(componentId);
            if (leaderElectionEventHandler != null) {
                sendLeaderInformationChange(leaderElectionEventHandler, leaderInformation);
            }
        }
    }

    private void sendLeaderInformationChange(
            LeaderElectionEventHandler leaderElectionEventHandler,
            LeaderInformation leaderInformation) {
        leadershipOperationExecutor.execute(
                () -> leaderElectionEventHandler.onLeaderInformationChange(leaderInformation));
    }
}
```
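The fan-out performed by isLeader() can be sketched as a self-contained toy. MultiplexDemo, MultiplexedElection, and the one-method handler interface below are hypothetical, invented for illustration: a single election victory for the process grants leadership to every registered component.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

public class MultiplexDemo {

    // Reduced one-method stand-in for Flink's LeaderElectionEventHandler.
    interface LeaderElectionEventHandler {
        void onGrantLeadership(UUID leaderSessionID);
    }

    // Sketch of the multiplexing idea: one underlying election (e.g. a single
    // ZooKeeper LeaderLatch) fans the "is leader" event out to every
    // registered component, since all components live in the same process.
    static class MultiplexedElection {
        private final Map<String, LeaderElectionEventHandler> handlers = new LinkedHashMap<>();

        void register(String componentId, LeaderElectionEventHandler handler) {
            handlers.put(componentId, handler);
        }

        // Called once when the process wins the election.
        void isLeader() {
            UUID sessionId = UUID.randomUUID();
            handlers.values().forEach(h -> h.onGrantLeadership(sessionId));
        }
    }

    public static int grantedComponents() {
        int[] count = {0};
        MultiplexedElection election = new MultiplexedElection();
        election.register("resource_manager", id -> count[0]++);
        election.register("dispatcher", id -> count[0]++);
        election.register("rest_server", id -> count[0]++);
        election.isLeader();   // one election event, three component callbacks
        return count[0];
    }

    public static void main(String[] args) {
        System.out.println(grantedComponents());   // 3
    }
}
```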
2. LeaderRetrievalService
The ZooKeeper implementation of LeaderRetrievalService is considerably simpler. The overall structure mirrors the LeaderElectionService side, but without the DefaultMultipleComponentLeaderElectionService middle layer: a ZooKeeperLeaderRetrievalDriver is created directly to talk to ZooKeeper and dispatch event callbacks.
```java
public void start(LeaderRetrievalListener listener) throws Exception {
    synchronized (lock) {
        leaderListener = listener;
        leaderRetrievalDriver =
                leaderRetrievalDriverFactory.createLeaderRetrievalDriver(
                        this, new LeaderRetrievalFatalErrorHandler());
        running = true;
    }
}
```
The ZooKeeperLeaderRetrievalDriver interacts with ZooKeeper, mainly to watch for leader address changes:
```java
public ZooKeeperLeaderRetrievalDriver(
        CuratorFramework client, String path,
        LeaderRetrievalEventHandler leaderRetrievalEventHandler,
        LeaderInformationClearancePolicy leaderInformationClearancePolicy,
        FatalErrorHandler fatalErrorHandler) throws Exception {
    // Watch the /connection_info node for leader address changes.
    this.cache =
            ZooKeeperUtils.createTreeCache(
                    client,
                    connectionInformationPath,
                    this::retrieveLeaderInformationFromZooKeeper);
    cache.start();
    running = true;
}
```
3. Interaction Diagram