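High availability (HA) mainly addresses single points of failure in distributed systems. Flink is a distributed system too, so it has to deal with the same problem.

In Flink, HA is implemented through the HighAvailabilityServices interface. Its definition, abridged here to the ResourceManager and JobManager methods, is shown below: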

public interface HighAvailabilityServices
        extends ClientHighAvailabilityServices, GloballyCleanableResource {
    // resource manager
    LeaderElectionService getResourceManagerLeaderElectionService();
    LeaderRetrievalService getResourceManagerLeaderRetriever();

    // job manager
    LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);
}

As the HighAvailabilityServices interface shows, it revolves around two types, LeaderElectionService and LeaderRetrievalService:

  • LeaderElectionService: runs the leader election for a component

  • LeaderRetrievalService: watches for changes of a component's leader address


Leader election and address monitoring

1. LeaderElectionService

LeaderElectionService lets a component take part in leader election. Flink components such as ResourceManager and JobManager all use this interface.

public interface LeaderElectionService {
    void start(LeaderContender contender) throws Exception;

    void stop() throws Exception;

    void confirmLeadership(UUID leaderSessionID, String leaderAddress);

    boolean hasLeadership(@Nonnull UUID leaderSessionId);
}

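To use LeaderElectionService, a component first calls LeaderElectionService.start(LeaderContender). LeaderContender can be thought of as a set of callbacks: when the component wins the election or loses leadership, the corresponding method is invoked.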

public interface LeaderContender {

    void grantLeadership(UUID leaderSessionID);

    void revokeLeadership();

    void handleError(Exception exception);
}
  • When the LeaderElectionService wins the leader election, it calls LeaderContender's grantLeadership method

  • When a component is no longer the leader, LeaderContender's revokeLeadership method is called
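The callback flow above can be sketched with a small in-memory model. This is only an illustration under stated assumptions: SketchContender, SketchElectionService and SketchResourceManager are hypothetical stand-ins, not Flink classes, and the simulateElectionWon and simulateElectionLost methods take the place of a real backend such as ZooKeeper.

```java
import java.util.UUID;

class LeaderContenderSketch {
    public static void main(String[] args) {
        SketchElectionService service = new SketchElectionService();
        SketchResourceManager rm = new SketchResourceManager(service, "rm-host:6123");
        service.start(rm);
        service.simulateElectionWon();
        System.out.println("leader? " + service.hasLeadership(rm.currentSessionId));
        service.simulateElectionLost();
        System.out.println("session after revoke: " + rm.currentSessionId);
    }
}

// Simplified stand-in for LeaderContender (not Flink's real interface).
interface SketchContender {
    void grantLeadership(UUID leaderSessionID);
    void revokeLeadership();
}

// Hypothetical in-memory election service; the simulate* methods play the role of the backend.
class SketchElectionService {
    private SketchContender contender;
    private UUID issuedSessionId;    // session handed out with the current grant, null if not leader
    private String confirmedAddress; // address the leader confirmed for that session

    void start(SketchContender contender) {
        this.contender = contender;
    }

    void simulateElectionWon() {
        issuedSessionId = UUID.randomUUID();
        contender.grantLeadership(issuedSessionId);
    }

    void simulateElectionLost() {
        issuedSessionId = null;
        confirmedAddress = null;
        contender.revokeLeadership();
    }

    void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
        // Ignore confirmations that belong to an outdated session.
        if (hasLeadership(leaderSessionID)) {
            confirmedAddress = leaderAddress;
        }
    }

    boolean hasLeadership(UUID leaderSessionId) {
        return leaderSessionId != null && leaderSessionId.equals(issuedSessionId);
    }

    String confirmedAddress() {
        return confirmedAddress;
    }
}

// A component that reacts to the two callbacks.
class SketchResourceManager implements SketchContender {
    private final SketchElectionService electionService;
    private final String address;
    UUID currentSessionId; // null while this instance is not the leader

    SketchResourceManager(SketchElectionService electionService, String address) {
        this.electionService = electionService;
        this.address = address;
    }

    @Override
    public void grantLeadership(UUID leaderSessionID) {
        currentSessionId = leaderSessionID;
        // Confirming publishes the address so that others can find the new leader.
        electionService.confirmLeadership(leaderSessionID, address);
    }

    @Override
    public void revokeLeadership() {
        currentSessionId = null;
    }
}
```

The contender only stores the session id and confirms its address. Everything about how the election is decided stays behind the service, which is what allows the backend to be swapped.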

2. LeaderRetrievalService

LeaderRetrievalService watches for changes of a component's leader address. Its interface is defined as follows:

public interface LeaderRetrievalService {
    void start(LeaderRetrievalListener listener) throws Exception;

    void stop() throws Exception;
}

To use LeaderRetrievalService, the caller first invokes LeaderRetrievalService.start(LeaderRetrievalListener). LeaderRetrievalListener is the callback that fires after the leader address has changed.

public interface LeaderRetrievalListener {

    void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID);

    void handleError(Exception exception);
}
  • When the leader address of the watched component changes, LeaderRetrievalListener's notifyLeaderAddress method is called
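A listener typically just remembers the latest leader address. The following is a minimal sketch under that assumption: SketchRetrievalListener, SketchRetrievalService and LeaderAddressTracker are hypothetical names, and simulateLeaderChange stands in for a notification from the real backend. Because both callback arguments are nullable in the interface above, the sketch treats a null address or session id as "no leader known".

```java
import java.util.Optional;
import java.util.UUID;

class LeaderRetrievalSketch {
    public static void main(String[] args) {
        SketchRetrievalService service = new SketchRetrievalService();
        LeaderAddressTracker tracker = new LeaderAddressTracker();
        service.start(tracker);
        service.simulateLeaderChange("host-a:6123", UUID.randomUUID());
        System.out.println(tracker.currentLeader());
        service.simulateLeaderChange(null, null);
        System.out.println(tracker.currentLeader());
    }
}

// Simplified stand-in for LeaderRetrievalListener (not Flink's real interface).
interface SketchRetrievalListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID); // both may be null
}

// Hypothetical in-memory retrieval service.
class SketchRetrievalService {
    private SketchRetrievalListener listener;

    void start(SketchRetrievalListener listener) {
        this.listener = listener;
    }

    void stop() {
        listener = null;
    }

    // Simulates the backend reporting a new (or cleared) leader.
    void simulateLeaderChange(String leaderAddress, UUID leaderSessionID) {
        if (listener != null) {
            listener.notifyLeaderAddress(leaderAddress, leaderSessionID);
        }
    }
}

// Remembers the most recently announced leader address.
class LeaderAddressTracker implements SketchRetrievalListener {
    private volatile String currentLeaderAddress;

    @Override
    public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
        // A null address or session id means that no leader is currently known.
        currentLeaderAddress =
                (leaderAddress == null || leaderSessionID == null) ? null : leaderAddress;
    }

    Optional<String> currentLeader() {
        return Optional.ofNullable(currentLeaderAddress);
    }
}
```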

Zookeeper HA

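This section uses ZooKeeper mode to walk through Flink's high-availability logic. In Flink, components such as ResourceManager, Dispatcher and JobManager actually run inside the same process, and each of them relies on HA features such as leader election.

There are two ways to provide HA in this situation. One is to create a separate HighAvailabilityServices for every component, each driving its own ZooKeeper election logic such as a LeaderLatch. The other is to create a single HighAvailabilityServices for all components in the process, so that they share the same underlying ZooKeeper logic.

Flink takes the second approach, implemented by the ZooKeeperMultipleComponentLeaderElectionHaServices class. Its LeaderElectionService and LeaderRetrievalService are created as follows: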

 /* /flink
 *      +/cluster_id_1/leader/latch
 *      |            |       /resource_manager/connection_info
 *      |            |       /dispatcher/connection_info
 *      |            |       /rest_server/connection_info
 *      |            |       /job-id-1/connection_info
 *      |            |       /job-id-2/connection_info
 *      |            |
 *      |            |
 *      |            +jobgraphs/job-id-1
 *      |            |         /job-id-2
 *      |            +jobs/job-id-1/checkpoints/latest
 *      |                 |                    /latest-1
 *      |                 |                    /latest-2
 *      |                 |        /checkpoint_id_counter
 *
 */
 public class ZooKeeperMultipleComponentLeaderElectionHaServices extends AbstractZooKeeperHaServices {
    @Override
    protected LeaderRetrievalService createLeaderRetrievalService(String leaderPath) {
        return ZooKeeperUtils.createLeaderRetrievalService(
                leaderNamespacedCuratorFramework, leaderPath, configuration);
    }
    
    @Override
    protected LeaderElectionService createLeaderElectionService(String leaderName) {
        return new DefaultLeaderElectionService(
                getOrInitializeSingleLeaderElectionService().createDriverFactory(leaderName));
    }
}
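To make the node layout in the comment above concrete, here is a small sketch that builds the two kinds of paths under the leader subtree. HaPathSketch and its methods are hypothetical helpers written for illustration only. They are not part of Flink's ZooKeeperUtils, and the root and cluster id values are taken from the layout comment.

```java
class HaPathSketch {
    // Path of the single latch node shared by all components of one cluster.
    static String leaderLatchPath(String root, String clusterId) {
        return root + "/" + clusterId + "/leader/latch";
    }

    // Path of the node that stores the leader address of one component.
    static String connectionInfoPath(String root, String clusterId, String componentId) {
        return root + "/" + clusterId + "/leader/" + componentId + "/connection_info";
    }

    public static void main(String[] args) {
        System.out.println(leaderLatchPath("/flink", "cluster_id_1"));
        System.out.println(connectionInfoPath("/flink", "cluster_id_1", "resource_manager"));
        System.out.println(connectionInfoPath("/flink", "cluster_id_1", "job-id-1"));
    }
}
```

There is exactly one latch per cluster but one connection_info node per component, which matches the idea of a shared election with per-component leader addresses.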

1. LeaderElectionService

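When creating a LeaderElectionService for a component (leaderName), ZooKeeperMultipleComponentLeaderElectionHaServices lazily creates one DefaultMultipleComponentLeaderElectionService that is shared by all components and acts as an intermediate proxy. The proxy keeps a mapping from each component to its LeaderElectionEventHandler.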

private MultipleComponentLeaderElectionService getOrInitializeSingleLeaderElectionService() {
    synchronized (lock) {
        if (multipleComponentLeaderElectionService == null) {
            try {
                multipleComponentLeaderElectionService =
                        new DefaultMultipleComponentLeaderElectionService(
                                fatalErrorHandler,
                                new ZooKeeperMultipleComponentLeaderElectionDriverFactory(
                                        leaderNamespacedCuratorFramework));
            } catch (Exception e) {
                throw new FlinkRuntimeException(
                        String.format(
                                "Could not initialize the %s",
                                DefaultMultipleComponentLeaderElectionService.class
                                        .getSimpleName()),
                        e);
            }
        }

        return multipleComponentLeaderElectionService;
    }
}

(1). start(LeaderContender)

As noted above, using a LeaderElectionService starts with a call to LeaderElectionService.start(LeaderContender). In the shared multi-component setup, start simply registers the component together with its LeaderElectionEventHandler. The call chain is:

  • DefaultLeaderElectionService.start(LeaderContender)

  • leaderElectionDriverFactory.createLeaderElectionDriver, which creates a MultipleComponentLeaderElectionDriverAdapter

The registration happens in the constructor of MultipleComponentLeaderElectionDriverAdapter:

MultipleComponentLeaderElectionDriverAdapter(
        String componentId,
        MultipleComponentLeaderElectionService multipleComponentLeaderElectionService,
        LeaderElectionEventHandler leaderElectionEventHandler) {
    
    // Register the mapping from the component to its LeaderElectionEventHandler (i.e. the DefaultLeaderElectionService)
    multipleComponentLeaderElectionService.registerLeaderElectionEventHandler(
            this.componentId, leaderElectionEventHandler);
}

The LeaderElectionEventHandler above is in fact the DefaultLeaderElectionService, which implements the LeaderElectionEventHandler interface.

(2). ZooKeeper interaction

Internally, DefaultMultipleComponentLeaderElectionService owns a ZooKeeperMultipleComponentLeaderElectionDriver that talks to ZooKeeper. It does two things:

  • LeaderLatch: leader election. Once the election is won, DefaultMultipleComponentLeaderElectionService.isLeader is called back

  • TreeCache: node change watching. When a node changes, DefaultMultipleComponentLeaderElectionService.notifyLeaderInformationChange is called back

public ZooKeeperMultipleComponentLeaderElectionDriver(
        CuratorFramework curatorFramework,
        MultipleComponentLeaderElectionDriver.Listener leaderElectionListener)
        throws Exception {
    this.leaderLatch = new LeaderLatch(curatorFramework, ZooKeeperUtils.getLeaderLatchPath());
    this.treeCache =
            TreeCache.newBuilder(curatorFramework, "/")
                    .setCacheData(true)
                    .setCreateParentNodes(false)
                    .setSelector(
                            new ZooKeeperMultipleComponentLeaderElectionDriver
                                    .ConnectionInfoNodeSelector())
                    .setExecutor(Executors.newDirectExecutorService())
                    .build();
    treeCache
            .getListenable()
            .addListener(
                    (client, event) -> {
                        switch (event.getType()) {
                            case NODE_ADDED:
                            case NODE_UPDATED:
                                Preconditions.checkNotNull(
                                        event.getData(),
                                        "The ZooKeeper event data must not be null.");
                                handleChangedLeaderInformation(event.getData());
                                break;
                            case NODE_REMOVED:
                                Preconditions.checkNotNull(
                                        event.getData(),
                                        "The ZooKeeper event data must not be null.");
                                handleRemovedLeaderInformation(event.getData().getPath());
                                break;
                        }
                    });

    leaderLatch.addListener(this);
    curatorFramework.getConnectionStateListenable().addListener(listener);
    leaderLatch.start();
    treeCache.start();
}
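The TreeCache listener above has to work out which component a changed node belongs to before it can notify anyone. A rough sketch of that mapping follows. ConnectionInfoPathSketch and componentIdOf are hypothetical names, and the sketch assumes paths relative to the leader namespace that end in /connection_info, as in the layout shown earlier. It does not reproduce Flink's actual selector or path utilities.

```java
import java.util.Optional;

class ConnectionInfoPathSketch {
    private static final String SUFFIX = "/connection_info";

    // Returns the component id for a connection_info node, or empty for any other node.
    static Optional<String> componentIdOf(String path) {
        if (path == null || !path.endsWith(SUFFIX)) {
            return Optional.empty();
        }
        String parent = path.substring(0, path.length() - SUFFIX.length());
        // The component id is the last segment of the parent path.
        String componentId = parent.substring(parent.lastIndexOf('/') + 1);
        return componentId.isEmpty() ? Optional.empty() : Optional.of(componentId);
    }

    public static void main(String[] args) {
        System.out.println(componentIdOf("/resource_manager/connection_info"));
        System.out.println(componentIdOf("/latch"));
    }
}
```

Events for nodes such as the latch itself map to an empty result and can simply be ignored by the caller.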

(3). ZooKeeper callbacks

Here is the implementation of the DefaultMultipleComponentLeaderElectionService class:

public class DefaultMultipleComponentLeaderElectionService
        implements MultipleComponentLeaderElectionService, MultipleComponentLeaderElectionDriver.Listener {
    private final MultipleComponentLeaderElectionDriver multipleComponentLeaderElectionDriver;
    
    // The LeaderElectionEventHandler here is the DefaultLeaderElectionService returned by
    // createLeaderElectionService, which implements the LeaderElectionEventHandler interface.
    private final Map<String, LeaderElectionEventHandler> leaderElectionEventHandlers;

    DefaultMultipleComponentLeaderElectionService(
            FatalErrorHandler fatalErrorHandler,
            MultipleComponentLeaderElectionDriverFactory multipleComponentLeaderElectionDriverFactory,
            ExecutorService leadershipOperationExecutor)
            throws Exception {
        // Create the ZooKeeperMultipleComponentLeaderElectionDriver that talks to ZooKeeper:
        // 1. LeaderLatch: leader election
        // 2. TreeCache: watching node changes
        multipleComponentLeaderElectionDriver =
                multipleComponentLeaderElectionDriverFactory.create(this);
    }
    
    public void publishLeaderInformation(String componentId, LeaderInformation leaderInformation) {
        // Write the component's connection_info back to ZooKeeper so that
        // LeaderRetrievalService can watch the leader address
        multipleComponentLeaderElectionDriver.publishLeaderInformation(
                componentId, leaderInformation);
        //...
    }

    public void registerLeaderElectionEventHandler(
            String componentId, LeaderElectionEventHandler leaderElectionEventHandler) {

        // Map the component to its callback handler
        leaderElectionEventHandlers.put(componentId, leaderElectionEventHandler);
        // ...
    }
    
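    // 1. Callback invoked when the leader election is won
    public void isLeader() {
        // All components live in the same process, so winning the election makes every component the leader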
        final UUID newLeaderSessionId = UUID.randomUUID();
        synchronized (lock) {
            currentLeaderSessionId = UUID.randomUUID();

            forEachLeaderElectionEventHandler(
                    leaderElectionEventHandler ->
                            leaderElectionEventHandler.onGrantLeadership(newLeaderSessionId));
        }
    }

    // 2. Callback invoked when a component's node changes
    public void notifyLeaderInformationChange(
            String componentId, LeaderInformation leaderInformation) {
        synchronized (lock) {
            final LeaderElectionEventHandler leaderElectionEventHandler =
                    leaderElectionEventHandlers.get(componentId);

            if (leaderElectionEventHandler != null) {
                sendLeaderInformationChange(leaderElectionEventHandler, leaderInformation);
            }
        }
    }

    private void sendLeaderInformationChange(
            LeaderElectionEventHandler leaderElectionEventHandler,
            LeaderInformation leaderInformation) {
        leadershipOperationExecutor.execute(
                () -> leaderElectionEventHandler.onLeaderInformationChange(leaderInformation));
    }
}
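The fan-out performed by this shared service can be reduced to a few lines. The sketch below is a simplified model, not Flink's implementation: SketchEventHandler, SharedElectionSketch and RecordingHandler are hypothetical, callbacks run synchronously instead of on an executor, and granting leadership right away to a handler that registers after the election was won is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

class SharedElectionSketch {
    private final Object lock = new Object();
    private final Map<String, SketchEventHandler> handlers = new HashMap<>();
    private UUID currentSessionId; // null while this process is not the leader

    void register(String componentId, SketchEventHandler handler) {
        synchronized (lock) {
            if (handlers.putIfAbsent(componentId, handler) != null) {
                throw new IllegalArgumentException("Duplicate component: " + componentId);
            }
            // Sketch assumption: late registrations see the leadership that is already held.
            if (currentSessionId != null) {
                handler.onGrantLeadership(currentSessionId);
            }
        }
    }

    // One election result is fanned out to every registered component.
    void isLeader() {
        synchronized (lock) {
            currentSessionId = UUID.randomUUID();
            for (SketchEventHandler handler : handlers.values()) {
                handler.onGrantLeadership(currentSessionId);
            }
        }
    }

    // A node change is routed only to the component it belongs to.
    void notifyLeaderInformationChange(String componentId, String leaderAddress) {
        synchronized (lock) {
            SketchEventHandler handler = handlers.get(componentId);
            if (handler != null) {
                handler.onLeaderInformationChange(leaderAddress);
            }
        }
    }

    public static void main(String[] args) {
        SharedElectionSketch service = new SharedElectionSketch();
        RecordingHandler dispatcher = new RecordingHandler();
        service.register("dispatcher", dispatcher);
        service.isLeader();
        service.notifyLeaderInformationChange("dispatcher", "host-a:6123");
        System.out.println(dispatcher.grantedSessionId + " " + dispatcher.lastAddress);
    }
}

// Simplified stand-in for LeaderElectionEventHandler (not Flink's real interface).
interface SketchEventHandler {
    void onGrantLeadership(UUID sessionId);
    void onLeaderInformationChange(String leaderAddress);
}

// Records the last values it was called with.
class RecordingHandler implements SketchEventHandler {
    UUID grantedSessionId;
    String lastAddress;

    @Override
    public void onGrantLeadership(UUID sessionId) {
        grantedSessionId = sessionId;
    }

    @Override
    public void onLeaderInformationChange(String leaderAddress) {
        lastAddress = leaderAddress;
    }
}
```

Note that this sketch hands the same session id to every handler and also stores it as the current one.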

2. LeaderRetrievalService

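By comparison, the ZooKeeper implementation of LeaderRetrievalService is much simpler. Its overall logic resembles that of LeaderElectionService, but there is no DefaultMultipleComponentLeaderElectionService intermediate layer: it directly creates a ZooKeeperLeaderRetrievalDriver to talk to ZooKeeper and dispatches the event callbacks.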

public void start(LeaderRetrievalListener listener) throws Exception {

    synchronized (lock) {
        leaderListener = listener;
        leaderRetrievalDriver =
                leaderRetrievalDriverFactory.createLeaderRetrievalDriver(
                        this, new LeaderRetrievalFatalErrorHandler());
        running = true;
    }
}

The ZooKeeperLeaderRetrievalDriver interacts with ZooKeeper, mainly to watch for address changes:

public ZooKeeperLeaderRetrievalDriver(
        CuratorFramework client,  String path,
        LeaderRetrievalEventHandler leaderRetrievalEventHandler,
        LeaderInformationClearancePolicy leaderInformationClearancePolicy,
        FatalErrorHandler fatalErrorHandler) throws Exception {
    // Watch the /connection_info node for address changes
    this.cache =
            ZooKeeperUtils.createTreeCache(
                    client,
                    connectionInformationPath,
                    this::retrieveLeaderInformationFromZooKeeper);

    cache.start();

    running = true;
}


3. Interaction diagram