Imagine you are in a group project. The task is pretty complex :p and everyone is looking at each other, wondering who is going to take the lead. Does this situation sound familiar? I remember we used to settle it with a game like “Rock, Paper, Scissors” or “Eeny, meeny, miny, moe” (I never did this, but my sister did). Similarly, distributed systems have their own ways of picking a leader, and it’s just as cool! The difference is that instead of chanting rhymes or showing hand signs, they use something called a leader election algorithm.
Choosing a leader is important. In class we used to have a class monitor (nobody listened to them, but that’s a different discussion), in sports we have a team captain, and in the workplace we have team leads. The role of a leader is not to order people around but to manage and orchestrate different workflows so that teams work smoothly. And this is where I would like to introduce our distributed friend, ZooKeeper. It is a pretty reliable tool that helps us manage distributed systems with ease. BTW, ZooKeeper has traditionally been at the heart of Kafka as well (newer Kafka versions can run without it, using KRaft).
Let’s dive straight into the leader election algorithm and see what it does.
How Does Leader Election Work?
In a distributed system, multiple nodes (servers) work together to achieve a common goal. It sounds cool, but distributed communication is not easy. To keep things balanced and the communication between nodes smooth, one node is chosen as the leader. The leader makes critical decisions and coordinates tasks among the other nodes. The process of choosing a leader is known as Leader Election. It is important because it helps maintain order and reliability in the system.
Why do we need an algorithm to elect a leader? Can’t we choose one manually?
Imagine a situation where there are 5 nodes. Let’s call them Tom, Dick, Harry, John and Kamlesh. Say you choose Tom as the leader.
Everything is fine until the day Tom is on leave. In a distributed system, this is like the leader node crashing or being cut off from the others by a network partition. If the server named Tom crashed, nobody would be left to coordinate, and that would disrupt the entire system.
To handle this situation, leader election is done in an automated way.
But if you think about it closely, this is where our first problem originates. How would the other servers come to know that Tom is unavailable today? Let’s tackle this problem in the next section.
How will each node (server) know that the leader server has crashed?
The first step in organizing a leader election is to make sure that every other node is aware that the leader has crashed and is unavailable. Take a pause and think for yourself: how would you let the other nodes figure out on their own that the leader is missing?
The concept of heartbeat comes to mind.
A heartbeat is a simple and widely used way to tell whether a node is still available: the node periodically sends a small “I’m alive” message, and if those messages stop arriving for long enough, the node is presumed dead. It can be used in various ways.
In this case, specifically in my implementation, I have used ZooKeeper to handle this task. ZooKeeper tells the other nodes when a node has failed. This means every new node must first register itself with ZooKeeper.
Heartbeat Process
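The process is simple. Every node that registers with ZooKeeper opens a session, and the ZooKeeper client keeps that session alive by sending periodic heartbeats (pings) to the ZooKeeper server. If the server hears nothing from a node within the session timeout, which is configurable depending on your setup, it expires the session, removes the node’s registration, and delivers a NodeDeleted event to the nodes that were watching it. After receiving the event, the other nodes go through a re-election and choose a new leader node. This way the service keeps working reliably.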
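To make the timeout idea concrete, here is a minimal in-memory sketch of a heartbeat-based failure detector. It is not ZooKeeper code: the class name `HeartbeatMonitor`, its methods, and the 3-second timeout are all made up for illustration, and time is passed in explicitly so the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory failure detector; an illustration only, not the ZooKeeper API.
class HeartbeatMonitor {
    private final long timeoutMillis;
    // node name -> timestamp of the most recent heartbeat
    private final Map<String, Long> lastSeen = new HashMap<>();

    HeartbeatMonitor(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Called whenever a heartbeat from `node` arrives at time `nowMillis`.
    void heartbeat(String node, long nowMillis) {
        lastSeen.put(node, nowMillis);
    }

    // Removes and returns (sorted by name) every node that has been silent longer than the timeout.
    List<String> expire(long nowMillis) {
        List<String> dead = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastSeen.entrySet()) {
            if (nowMillis - entry.getValue() > timeoutMillis) {
                dead.add(entry.getKey());
            }
        }
        dead.forEach(lastSeen::remove);
        Collections.sort(dead);
        return dead;
    }

    public static void main(String[] args) {
        HeartbeatMonitor monitor = new HeartbeatMonitor(3000);
        monitor.heartbeat("Tom", 0);
        monitor.heartbeat("Dick", 2500);
        // At t = 4000 ms only Tom has been silent for more than 3 seconds.
        System.out.println("Expired: " + monitor.expire(4000));
    }
}
```

In the real setup you never write this yourself: the session timeout you pass when connecting plays the role of `timeoutMillis`, and ZooKeeper does the bookkeeping.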
Now, you may ask, “What if ZooKeeper fails?” Well, then my service will definitely be offline. But to be clear, ZooKeeper is not just a single server; it runs as a cluster of servers and already applies the usual distributed fail-safe mechanisms to make sure it’s reliable. But hey, what if a tornado or some other calamity wiped out the entire cluster? Well then, that’s the end. We can’t fight nature 😀
Now, I think we have come to a point where we know what ZooKeeper is. We have to take a small detour to understand a little bit about how nodes register themselves with ZooKeeper and what goes on internally.
ZooKeeper Design for Synchronization
ZooKeeper’s internal architecture is designed to handle coordination and synchronization efficiently. The main concept that you need to understand is ZooKeeper’s ZNodes.
There are mainly three kinds of ZNodes in ZooKeeper:
- Ephemeral Node
- Persistent Node
- Sequential Node (a flag that can be combined with either of the other two)
Going back to our example, let’s say a new server is introduced into the cluster. The first thing that server is designed to do is call ZooKeeper and register itself. When we say register, we mean create an ephemeral node in ZooKeeper.
What the heck is an Ephemeral Node?
Ephemeral Nodes are temporary nodes that a server creates in ZooKeeper within its session. The property of an ephemeral node is that it lasts only as long as that session is alive. In other words, as long as the server stays connected, its ephemeral node exists. If the session ends (for example, because the server crashes and stops sending heartbeats), the ephemeral node is automatically deleted.
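The “lives and dies with the session” rule can be modelled in a few lines. The sketch below is a toy, in-memory stand-in, not the ZooKeeper API; `EphemeralRegistry`, its method names, and the paths are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical in-memory model of session-bound (ephemeral) entries; not the ZooKeeper API.
class EphemeralRegistry {
    // path -> id of the session that owns it
    private final Map<String, Long> ownerByPath = new HashMap<>();

    // Registers `path` on behalf of a session; a path can only exist once.
    void createEphemeral(String path, long sessionId) {
        if (ownerByPath.putIfAbsent(path, sessionId) != null) {
            throw new IllegalStateException("node already exists: " + path);
        }
    }

    // Ending a session (clean close or timeout) deletes everything that session created.
    void closeSession(long sessionId) {
        ownerByPath.values().removeIf(owner -> owner == sessionId);
    }

    // Returns the surviving paths in sorted order.
    Set<String> paths() {
        return new TreeSet<>(ownerByPath.keySet());
    }

    public static void main(String[] args) {
        EphemeralRegistry registry = new EphemeralRegistry();
        registry.createEphemeral("/election/tom", 1L);
        registry.createEphemeral("/election/dick", 2L);
        registry.closeSession(1L); // Tom's session ends, so Tom's node goes with it
        System.out.println(registry.paths());
    }
}
```

A persistent node, by contrast, would simply not be tied to any session and would survive `closeSession`.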
Sequential Node
Another thing that will come in very handy is the Sequential Node.
A Sequential Node, as the name suggests, gets a monotonically increasing counter appended to its name, which ensures that each node has a unique identifier. So imagine 10 servers all call ZooKeeper at the same time to register themselves. ZooKeeper assigns each of them the next sequence number, so no two nodes end up with the same name. This is all taken care of by ZooKeeper (including any race conditions).
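Here is a rough sketch of what that naming looks like. ZooKeeper appends the counter as a zero-padded, 10-digit number, which is why plain string sorting later gives the same order as numeric sorting. The class `SequentialNamer` is a made-up local stand-in; the real counter lives inside ZooKeeper, not in your process.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for ZooKeeper's sequential naming; illustration only.
class SequentialNamer {
    // An atomic counter hands out each number exactly once, even under concurrent calls.
    private final AtomicInteger counter = new AtomicInteger();

    // Appends a zero-padded, 10-digit counter to the prefix.
    String next(String prefix) {
        return String.format("%s%010d", prefix, counter.getAndIncrement());
    }

    public static void main(String[] args) {
        SequentialNamer namer = new SequentialNamer();
        List<String> names = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            names.add(namer.next("/election/c_"));
        }
        Collections.sort(names); // lexicographic order matches creation order thanks to the padding
        names.forEach(System.out::println);
    }
}
```

Without the zero padding, `c_10` would sort before `c_2`, and the “smallest name wins” rule used later would pick the wrong node.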
Watchers
This is the last concept that we need to know for now, I promise.
ZooKeeper allows nodes to set watchers on various change events. If an event occurs (like a node deletion when a server crashes), ZooKeeper notifies the nodes that are watching for that event.
Here is a quick sneak peek at the code so you know what I mean by watchers. My server class registers itself as a Watcher in ZooKeeper, so it receives the events for whatever it watches and can choose how to act on them. In this code, I perform a re-election when a watched node is deleted, in order to choose a new leader (if the failed server was the leader).
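@Override
public void process(WatchedEvent event) {
    switch (event.getType()) {
        // Connection state changes arrive with event type None.
        case None -> {
            if (event.getState() == Event.KeeperState.SyncConnected) {
                System.out.println("Successfully Connected to zookeeper!");
            } else {
                // Wake up any thread that is waiting on the zooKeeper object.
                synchronized (zooKeeper) {
                    System.out.println("Disconnected event from zookeeper received!");
                    zooKeeper.notifyAll();
                }
            }
        }
        // A znode we were watching is gone: run the election again.
        case NodeDeleted -> {
            try {
                reelectLeader();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            } catch (KeeperException e) {
                System.err.println("Re-election failed: " + e.getMessage());
            }
        }
        default -> { } // other event types are not needed for this demo
    }
}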
Whenever a server’s session ends, ZooKeeper deletes its ephemeral node and sends the NodeDeleted event to the nodes that were watching it, so that each of them can do a re-election or decide what needs to be done.
There are many more events, but for the Leader Election demo we will only use the NodeDeleted event.
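One detail worth knowing: the standard watches you set through calls like exists() and getChildren() are one-time triggers. They fire once and are then gone, so a node that wants to keep watching has to set the watch again. The toy model below illustrates just that behaviour; `OneShotWatches` and its methods are hypothetical and not part of ZooKeeper.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory model of one-time watches; not the ZooKeeper API.
class OneShotWatches {
    // path -> callbacks waiting for that path to be deleted
    private final Map<String, List<Consumer<String>>> watchers = new HashMap<>();

    // Registers a callback for the next deletion of `path`.
    void watch(String path, Consumer<String> callback) {
        watchers.computeIfAbsent(path, p -> new ArrayList<>()).add(callback);
    }

    // Fires every watch on `path` exactly once, then forgets them.
    void fireDeleted(String path) {
        List<Consumer<String>> callbacks = watchers.remove(path);
        if (callbacks != null) {
            callbacks.forEach(callback -> callback.accept(path));
        }
    }

    public static void main(String[] args) {
        OneShotWatches watches = new OneShotWatches();
        watches.watch("/election/c_0000000000", path -> System.out.println("NodeDeleted: " + path));
        watches.fireDeleted("/election/c_0000000000"); // callback runs
        watches.fireDeleted("/election/c_0000000000"); // nothing happens, the watch is already used up
    }
}
```

This is why the election code passes a watcher again every time it calls exists() during a re-election.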
Great! Thanks for staying with me so far. Now, let’s look at the meaty part: the Leader Election Algorithm.
Leader Election Algorithm
Let’s dive into the leader election algorithm in distributed systems using ZooKeeper.
Connecting to ZooKeeper
First things first: each node (or server) needs to connect to ZooKeeper. Think of ZooKeeper as the central hub where all the action happens. Once a node connects, it’s ready to participate in the leader election process.
Once again, when I say connected, I mean the server has opened a session with ZooKeeper, in which it will create its ephemeral node.
Volunteering for Leadership
Every node that wants to be considered for leadership (that’s every node in the cluster) creates a special kind of node in ZooKeeper called an ephemeral sequential node. This is like putting your name on a list with a unique number attached to it, which ZooKeeper handles for us. If you recall from the previous section, a sequential node gets a monotonically increasing number assigned on creation. In our case, each node in the cluster ends up with its own unique number.
Here’s how I’ll be doing it in the demo implementation. The thing to look for is CreateMode.EPHEMERAL_SEQUENTIAL.
public void volunteerForLeadership() throws InterruptedException, KeeperException {
    String znodePrefix = ELECTION_NAMESPACE + "/c_";
    // EPHEMERAL_SEQUENTIAL: the znode disappears with our session and gets a unique, increasing suffix.
    String znodeFullPath = this.zooKeeper.create(znodePrefix, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    System.out.println("ZNode Name: " + znodeFullPath);
    // Keep only the child name, which is the form getChildren() returns later.
    this.currentZnodeName = znodeFullPath.replace(ELECTION_NAMESPACE + "/", "");
}
Announcing the Leader
Since each node has been assigned a sequence number, the simplest rule is to make the node with the smallest number the leader. But the main challenge is how each node would know which server has the smallest number, because in the end we want all nodes to know who the leader is.
The way I did it is simple: every node checks whether it holds the smallest number. If it does, it announces, “I’m the leader!” If it doesn’t, it acknowledges that the node with the smallest number is the leader.
ZooKeeper has an API that you can call to get all the child nodes under a given namespace. At the time of volunteering, each node creates an entry under a specific election namespace in ZooKeeper, so each node can ask for the list of children under that namespace. After getting the list of all nodes, we sort them by their unique sequence number, and then we know for sure the first one is the smallest.
Each node then checks whether its own entry is the smallest one. If it is, it announces itself as the leader; otherwise it reports that it is not the leader.
Here’s the code for that.
(Fun: There is a small race condition that I fixed in this code. Can you identify which lines handle it?)
public void reelectLeader() throws InterruptedException, KeeperException {
    Stat predecessorStat = null;
    String predecessorZnode = "";
    // Repeat until we are the leader or we hold a watch on a predecessor that still exists.
    while (predecessorStat == null) {
        // Read the children on every attempt so we always work with the current list.
        List<String> children = this.zooKeeper.getChildren(ELECTION_NAMESPACE, this);
        Collections.sort(children);
        String smallestZnode = children.get(0);
        if (smallestZnode.equals(this.currentZnodeName)) {
            System.out.println("I'm the leader!");
            return;
        } else {
            System.out.println("I'm not the leader!");
            // The predecessor is the znode just before ours in sorted order.
            int predecessorIndex = Collections.binarySearch(children, this.currentZnodeName) - 1;
            predecessorZnode = children.get(predecessorIndex);
            // exists() sets a watch and returns null if that znode is already gone; then we try again.
            predecessorStat = this.zooKeeper.exists(ELECTION_NAMESPACE + "/" + predecessorZnode, this);
        }
    }
    System.out.println("Watching znode: " + predecessorZnode);
    System.out.println();
}
In a real-world application, you would want to run some actual logic when a leader is elected, and not just announce it like I’m doing here 😀
I guess that’s understood.
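If you want to play with the decision logic without a running ZooKeeper, it can be pulled out into a plain function. This is only a sketch: `ElectionDecision` and `predecessorOf` are names I made up for illustration, and the list of children is passed in by hand instead of coming from getChildren().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical helper that isolates the "am I the leader?" decision; illustration only.
class ElectionDecision {
    // Returns the znode this node should watch, or empty if this node is the leader.
    static Optional<String> predecessorOf(List<String> children, String me) {
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted);
        int index = Collections.binarySearch(sorted, me);
        if (index < 0) {
            throw new IllegalArgumentException(me + " is not in the list of children");
        }
        // Index 0 holds the smallest sequence number, which makes that node the leader.
        return index == 0 ? Optional.empty() : Optional.of(sorted.get(index - 1));
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList("c_0000000002", "c_0000000000", "c_0000000001");
        for (String node : children) {
            Optional<String> predecessor = predecessorOf(children, node);
            System.out.println(node + (predecessor.isPresent()
                    ? " watches " + predecessor.get()
                    : " is the leader"));
        }
    }
}
```

Keeping the decision separate from the ZooKeeper calls also makes it easy to unit test.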
Watching for Changes
ZooKeeper watches over these nodes. If the leader node disconnects or fails (remember, it’s an ephemeral node), ZooKeeper notices that its session is gone, deletes the node, and notifies whoever was watching it. In the code above, each node watches the node just ahead of it in the sorted list, so the leader’s failure wakes up the node next in line. This is like a watchdog ensuring that the leadership is always up to date.
If the current leader goes offline, the node watching it runs reelectLeader(), looks at the sorted list again, and finds that it now holds the smallest number, so it becomes the new leader. This way, there’s always a leader to keep things running smoothly.
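Here is a small simulation of that failover, under the assumption that every node watches only its predecessor, as reelectLeader() does. `FailoverSimulation` is a hypothetical in-memory model; a sorted set stands in for the children of the election namespace.

```java
import java.util.TreeSet;

// Hypothetical in-memory model of leader failover; not the ZooKeeper API.
class FailoverSimulation {
    // Alive znodes, kept sorted by name (and therefore by sequence number).
    private final TreeSet<String> alive = new TreeSet<>();

    void join(String znode) {
        alive.add(znode);
    }

    // The leader is always the alive znode with the smallest name.
    String leader() {
        return alive.first();
    }

    // Removes a znode and returns the node that was watching it (its successor), or null if none.
    String crash(String znode) {
        String watcher = alive.higher(znode);
        alive.remove(znode);
        return watcher;
    }

    public static void main(String[] args) {
        FailoverSimulation cluster = new FailoverSimulation();
        cluster.join("c_0000000000");
        cluster.join("c_0000000001");
        cluster.join("c_0000000002");
        System.out.println("Leader: " + cluster.leader());
        String notified = cluster.crash("c_0000000000"); // the leader goes offline
        System.out.println("Notified: " + notified);
        System.out.println("New leader: " + cluster.leader());
    }
}
```

Only one node is woken up per failure here, which keeps the re-election cheap even when the cluster is large.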
Don’t believe me? Then watch this video:
I hope you enjoyed this article. I tried to simplify it as much as I can. Let me know your thoughts in the comments below.