Building a Consistent Hashing Ring¶
Authored by Greg Holt¶
This is a compilation of five posts I made earlier discussing how to build a consistent hashing ring. The posts seemed to be accessed quite frequently, so I’ve gathered them all here on one page for easier reading.
Part 1¶
“Consistent Hashing” is a term used to describe a process where data is distributed using a hashing algorithm to determine its location. Using only the hash of the id of the data you can determine exactly where that data should be. This mapping of hashes to locations is usually termed a “ring”.
Probably the simplest hash is just a modulus of the id. For instance, if all ids are numbers and you have two machines you wish to distribute data to, you could just put all odd numbered ids on one machine and even numbered ids on the other. Assuming you have a balanced number of odd and even numbered ids, and a balanced data size per id, your data would be balanced between the two machines.
Since data ids are often textual names and not numbers, like paths for files or URLs, it makes sense to use a “real” hashing algorithm to convert the names to numbers first. Using MD5 for instance, the hash of the name ‘mom.png’ is ‘4559a12e3e8da7c2186250c2f292e3af’ and the hash of ‘dad.png’ is ‘096edcc4107e9e18d6a03a43b3853bea’. Now, using the modulus, we can place ‘mom.jpg’ on the odd machine and ‘dad.png’ on the even one. Another benefit of using a hashing algorithm like MD5 is that the resulting hashes have a known even distribution, meaning your ids will be evenly distributed without worrying about keeping the id values themselves evenly distributed.
Here is a simple example of this in action:
from hashlib import md5
from struct import unpack_from
NODE_COUNT = 100
DATA_ID_COUNT = 10000000
node_counts = [0] * NODE_COUNT
for data_id in xrange(DATA_ID_COUNT):
data_id = str(data_id)
# This just pulls part of the hash out as an integer
hsh = unpack_from('>I', md5(data_id).digest())[0]
node_id = hsh % NODE_COUNT
node_counts[node_id] += 1
desired_count = DATA_ID_COUNT / NODE_COUNT
print '%d: Desired data ids per node' % desired_count
max_count = max(node_counts)
over = 100.0 * (max_count - desired_count) / desired_count
print '%d: Most data ids on one node, %.02f%% over' % \
(max_count, over)
min_count = min(node_counts)
under = 100.0 * (desired_count - min_count) / desired_count
print '%d: Least data ids on one node, %.02f%% under' % \
(min_count, under)
100000: Desired data ids per node
100695: Most data ids on one node, 0.69% over
99073: Least data ids on one node, 0.93% under
So that’s not bad at all; less than a percent over/under for distribution per node. In the next part of this series we’ll examine where modulus distribution causes problems and how to improve our ring to overcome them.
Part 2¶
In Part 1 of this series, we did a simple test of using the modulus of a hash to locate data. We saw very good distribution, but that’s only part of the story. Distributed systems not only need to distribute load, but they often also need to grow as more and more data is placed in it.
So let’s imagine we have a 100 node system up and running using our previous algorithm, but it’s starting to get full so we want to add another node. When we add that 101st node to our algorithm we notice that many ids now map to different nodes than they previously did. We’re going to have to shuffle a ton of data around our system to get it all into place again.
Let’s examine what’s happened on a much smaller scale: just 2 nodes again, node 0 gets even ids and node 1 gets odd ids. So data id 100 would map to node 0, data id 101 to node 1, data id 102 to node 0, etc. This is simply node = id % 2. Now we add a third node (node 2) for more space, so we want node = id % 3. So now data id 100 maps to node id 1, data id 101 to node 2, and data id 102 to node 0. So we have to move data for 2 of our 3 ids so they can be found again.
Let’s examine this at a larger scale:
from hashlib import md5
from struct import unpack_from
NODE_COUNT = 100
NEW_NODE_COUNT = 101
DATA_ID_COUNT = 10000000
moved_ids = 0
for data_id in xrange(DATA_ID_COUNT):
data_id = str(data_id)
hsh = unpack_from('>I', md5(str(data_id)).digest())[0]
node_id = hsh % NODE_COUNT
new_node_id = hsh % NEW_NODE_COUNT
if node_id != new_node_id:
moved_ids += 1
percent_moved = 100.0 * moved_ids / DATA_ID_COUNT
print '%d ids moved, %.02f%%' % (moved_ids, percent_moved)
9900989 ids moved, 99.01%
Wow, that’s severe. We’d have to shuffle around 99% of our data just to increase our capacity 1%! We need a new algorithm that combats this behavior.
This is where the “ring” really comes in. We can assign ranges of hashes directly to nodes and then use an algorithm that minimizes the changes to those ranges. Back to our small scale, let’s say our ids range from 0 to 999. We have two nodes and we’ll assign data ids 0–499 to node 0 and 500–999 to node 1. Later, when we add node 2, we can take half the data ids from node 0 and half from node 1, minimizing the amount of data that needs to move.
Let’s examine this at a larger scale:
from bisect import bisect_left
from hashlib import md5
from struct import unpack_from
NODE_COUNT = 100
NEW_NODE_COUNT = 101
DATA_ID_COUNT = 10000000
node_range_starts = []
for node_id in xrange(NODE_COUNT):
node_range_starts.append(DATA_ID_COUNT /
NODE_COUNT * node_id)
new_node_range_starts = []
for new_node_id in xrange(NEW_NODE_COUNT):
new_node_range_starts.append(DATA_ID_COUNT /
NEW_NODE_COUNT * new_node_id)
moved_ids = 0
for data_id in xrange(DATA_ID_COUNT):
data_id = str(data_id)
hsh = unpack_from('>I', md5(str(data_id)).digest())[0]
node_id = bisect_left(node_range_starts,
hsh % DATA_ID_COUNT) % NODE_COUNT
new_node_id = bisect_left(new_node_range_starts,
hsh % DATA_ID_COUNT) % NEW_NODE_COUNT
if node_id != new_node_id:
moved_ids += 1
percent_moved = 100.0 * moved_ids / DATA_ID_COUNT
print '%d ids moved, %.02f%%' % (moved_ids, percent_moved)
4901707 ids moved, 49.02%
Okay, that is better. But still, moving 50% of our data to add 1% capacity is not very good. If we examine what happened more closely we’ll see what is an “accordion effect”. We shrunk node 0’s range a bit to give to the new node, but that shifted all the other node’s ranges by the same amount.
We can minimize the change to a node’s assigned range by assigning several smaller ranges instead of the single broad range we were before. This can be done by creating “virtual nodes” for each node. So 100 nodes might have 1000 virtual nodes. Let’s examine how that might work.
from bisect import bisect_left
from hashlib import md5
from struct import unpack_from
NODE_COUNT = 100
DATA_ID_COUNT = 10000000
VNODE_COUNT = 1000
vnode_range_starts = []
vnode2node = []
for vnode_id in xrange(VNODE_COUNT):
vnode_range_starts.append(DATA_ID_COUNT /
VNODE_COUNT * vnode_id)
vnode2node.append(vnode_id % NODE_COUNT)
new_vnode2node = list(vnode2node)
new_node_id = NODE_COUNT
NEW_NODE_COUNT = NODE_COUNT + 1
vnodes_to_reassign = VNODE_COUNT / NEW_NODE_COUNT
while vnodes_to_reassign > 0:
for node_to_take_from in xrange(NODE_COUNT):
for vnode_id, node_id in enumerate(new_vnode2node):
if node_id == node_to_take_from:
new_vnode2node[vnode_id] = new_node_id
vnodes_to_reassign -= 1
break
if vnodes_to_reassign <= 0:
break
moved_ids = 0
for data_id in xrange(DATA_ID_COUNT):
data_id = str(data_id)
hsh = unpack_from('>I', md5(str(data_id)).digest())[0]
vnode_id = bisect_left(vnode_range_starts,
hsh % DATA_ID_COUNT) % VNODE_COUNT
node_id = vnode2node[vnode_id]
new_node_id = new_vnode2node[vnode_id]
if node_id != new_node_id:
moved_ids += 1
percent_moved = 100.0 * moved_ids / DATA_ID_COUNT
print '%d ids moved, %.02f%%' % (moved_ids, percent_moved)
90423 ids moved, 0.90%
There we go, we added 1% capacity and only moved 0.9% of existing data. The vnode_range_starts list seems a bit out of place though. Its values are calculated and never change for the lifetime of the cluster, so let’s optimize that out.
from bisect import bisect_left
from hashlib import md5
from struct import unpack_from
NODE_COUNT = 100
DATA_ID_COUNT = 10000000
VNODE_COUNT = 1000
vnode2node = []
for vnode_id in xrange(VNODE_COUNT):
vnode2node.append(vnode_id % NODE_COUNT)
new_vnode2node = list(vnode2node)
new_node_id = NODE_COUNT
vnodes_to_reassign = VNODE_COUNT / (NODE_COUNT + 1)
while vnodes_to_reassign > 0:
for node_to_take_from in xrange(NODE_COUNT):
for vnode_id, node_id in enumerate(vnode2node):
if node_id == node_to_take_from:
vnode2node[vnode_id] = new_node_id
vnodes_to_reassign -= 1
break
if vnodes_to_reassign <= 0:
break
moved_ids = 0
for data_id in xrange(DATA_ID_COUNT):
data_id = str(data_id)
hsh = unpack_from('>I', md5(str(data_id)).digest())[0]
vnode_id = hsh % VNODE_COUNT
node_id = vnode2node[vnode_id]
new_node_id = new_vnode2node[vnode_id]
if node_id != new_node_id:
moved_ids += 1
percent_moved = 100.0 * moved_ids / DATA_ID_COUNT
print '%d ids moved, %.02f%%' % (moved_ids, percent_moved)
89841 ids moved, 0.90%
There we go. In the next part of this series, will further examine the algorithm’s limitations and how to improve on it.
Part 3¶
In Part 2 of this series, we reached an algorithm that performed well even when adding new nodes to the cluster. We used 1000 virtual nodes that could be independently assigned to nodes, allowing us to minimize the amount of data moved when a node was added.
The number of virtual nodes puts a cap on how many real nodes you can have. For example, if you have 1000 virtual nodes and you try to add a 1001st real node, you can’t assign a virtual node to it without leaving another real node with no assignment, leaving you with just 1000 active real nodes still.
Unfortunately, the number of virtual nodes created at the beginning can never change for the life of the cluster without a lot of careful work. For example, you could double the virtual node count by splitting each existing virtual node in half and assigning both halves to the same real node. However, if the real node uses the virtual node’s id to optimally store the data (for example, all data might be stored in /[virtual node id]/[data id]) it would have to move data around locally to reflect the change. And it would have to resolve data using both the new and old locations while the moves were taking place, making atomic operations difficult or impossible.
Let’s continue with this assumption that changing the virtual node count is more work than it’s worth, but keep in mind that some applications might be fine with this.
The easiest way to deal with this limitation is to make the limit high enough that it won’t matter. For instance, if we decide our cluster will never exceed 60,000 real nodes, we can just make 60,000 virtual nodes.
Also, we should include in our calculations the relative size of our nodes. For instance, a year from now we might have real nodes that can handle twice the capacity of our current nodes. So we’d want to assign twice the virtual nodes to those future nodes, so maybe we should raise our virtual node estimate to 120,000.
A good rule to follow might be to calculate 100 virtual nodes to each real node at maximum capacity. This would allow you to alter the load on any given node by 1%, even at max capacity, which is pretty fine tuning. So now we’re at 6,000,000 virtual nodes for a max capacity cluster of 60,000 real nodes.
6 million virtual nodes seems like a lot, and it might seem like we’d use up way too much memory. But the only structure this affects is the virtual node to real node mapping. The base amount of memory required would be 6 million times 2 bytes (to store a real node id from 0 to 65,535). 12 megabytes of memory just isn’t that much to use these days.
Even with all the overhead of flexible data types, things aren’t that bad. I changed the code from the previous part in this series to have 60,000 real and 6,000,000 virtual nodes, changed the list to an array(‘H’), and python topped out at 27m of resident memory – and that includes two rings.
To change terminology a bit, we’re going to start calling these virtual nodes “partitions”. This will make it a bit easier to discern between the two types of nodes we’ve been talking about so far. Also, it makes sense to talk about partitions as they are really just unchanging sections of the hash space.
We’re also going to always keep the partition count a power of two. This makes it easy to just use bit manipulation on the hash to determine the partition rather than modulus. It isn’t much faster, but it is a little. So, here’s our updated ring code, using 8,388,608 (2 ** 23) partitions and 65,536 nodes. We’ve upped the sample data id set and checked the distribution to make sure we haven’t broken anything.
from array import array
from hashlib import md5
from struct import unpack_from
PARTITION_POWER = 23
PARTITION_SHIFT = 32 - PARTITION_POWER
NODE_COUNT = 65536
DATA_ID_COUNT = 100000000
part2node = array('H')
for part in xrange(2 ** PARTITION_POWER):
part2node.append(part % NODE_COUNT)
node_counts = [0] * NODE_COUNT
for data_id in xrange(DATA_ID_COUNT):
data_id = str(data_id)
part = unpack_from('>I',
md5(str(data_id)).digest())[0] >> PARTITION_SHIFT
node_id = part2node[part]
node_counts[node_id] += 1
desired_count = DATA_ID_COUNT / NODE_COUNT
print '%d: Desired data ids per node' % desired_count
max_count = max(node_counts)
over = 100.0 * (max_count - desired_count) / desired_count
print '%d: Most data ids on one node, %.02f%% over' % \
(max_count, over)
min_count = min(node_counts)
under = 100.0 * (desired_count - min_count) / desired_count
print '%d: Least data ids on one node, %.02f%% under' % \
(min_count, under)
1525: Desired data ids per node
1683: Most data ids on one node, 10.36% over
1360: Least data ids on one node, 10.82% under
Hmm. +–10% seems a bit high, but I reran with 65,536 partitions and 256 nodes and got +–0.4% so it’s just that our sample size (100m) is too small for our number of partitions (8m). It’ll take way too long to run experiments with an even larger sample size, so let’s reduce back down to these lesser numbers. (To be certain, I reran at the full version with a 10 billion data id sample set and got +–1%, but it took 6.5 hours to run.)
In the next part of this series, we’ll talk about how to increase the durability of our data in the cluster.
Part 4¶
In Part 3 of this series, we just further discussed partitions (virtual nodes) and cleaned up our code a bit based on that. Now, let’s talk about how to increase the durability and availability of our data in the cluster.
For many distributed data stores, durability is quite important. Either RAID arrays or individually distinct copies of data are required. While RAID will increase the durability, it does nothing to increase the availability – if the RAID machine crashes, the data may be safe but inaccessible until repairs are done. If we keep distinct copies of the data on different machines and a machine crashes, the other copies will still be available while we repair the broken machine.
An easy way to gain this multiple copy durability/availability is to just use multiple rings and groups of nodes. For instance, to achieve the industry standard of three copies, you’d split the nodes into three groups and each group would have its own ring and each would receive a copy of each data item. This can work well enough, but has the drawback that expanding capacity requires adding three nodes at a time and that losing one node essentially lowers capacity by three times that node’s capacity.
Instead, let’s use a different, but common, approach of meeting our requirements with a single ring. This can be done by walking the ring from the starting point and looking for additional distinct nodes. Here’s code that supports a variable number of replicas (set to 3 for testing):
from array import array
from hashlib import md5
from struct import unpack_from
REPLICAS = 3
PARTITION_POWER = 16
PARTITION_SHIFT = 32 - PARTITION_POWER
PARTITION_MAX = 2 ** PARTITION_POWER - 1
NODE_COUNT = 256
DATA_ID_COUNT = 10000000
part2node = array('H')
for part in xrange(2 ** PARTITION_POWER):
part2node.append(part % NODE_COUNT)
node_counts = [0] * NODE_COUNT
for data_id in xrange(DATA_ID_COUNT):
data_id = str(data_id)
part = unpack_from('>I',
md5(str(data_id)).digest())[0] >> PARTITION_SHIFT
node_ids = [part2node[part]]
node_counts[node_ids[0]] += 1
for replica in xrange(1, REPLICAS):
while part2node[part] in node_ids:
part += 1
if part > PARTITION_MAX:
part = 0
node_ids.append(part2node[part])
node_counts[node_ids[-1]] += 1
desired_count = DATA_ID_COUNT / NODE_COUNT * REPLICAS
print '%d: Desired data ids per node' % desired_count
max_count = max(node_counts)
over = 100.0 * (max_count - desired_count) / desired_count
print '%d: Most data ids on one node, %.02f%% over' % \
(max_count, over)
min_count = min(node_counts)
under = 100.0 * (desired_count - min_count) / desired_count
print '%d: Least data ids on one node, %.02f%% under' % \
(min_count, under)
117186: Desired data ids per node
118133: Most data ids on one node, 0.81% over
116093: Least data ids on one node, 0.93% under
That’s pretty good; less than 1% over/under. While this works well, there are a couple of problems.
First, because of how we’ve initially assigned the partitions to nodes, all the partitions for a given node have their extra copies on the same other two nodes. The problem here is that when a machine fails, the load on these other nodes will jump by that amount. It’d be better if we initially shuffled the partition assignment to distribute the failover load better.
The other problem is a bit harder to explain, but deals with physical separation of machines. Imagine you can only put 16 machines in a rack in your datacenter. The 256 nodes we’ve been using would fill 16 racks. With our current code, if a rack goes out (power problem, network issue, etc.) there is a good chance some data will have all three copies in that rack, becoming inaccessible. We can fix this shortcoming by adding the concept of zones to our nodes, and then ensuring that replicas are stored in distinct zones.
from array import array
from hashlib import md5
from random import shuffle
from struct import unpack_from
REPLICAS = 3
PARTITION_POWER = 16
PARTITION_SHIFT = 32 - PARTITION_POWER
PARTITION_MAX = 2 ** PARTITION_POWER - 1
NODE_COUNT = 256
ZONE_COUNT = 16
DATA_ID_COUNT = 10000000
node2zone = []
while len(node2zone) < NODE_COUNT:
zone = 0
while zone < ZONE_COUNT and len(node2zone) < NODE_COUNT:
node2zone.append(zone)
zone += 1
part2node = array('H')
for part in xrange(2 ** PARTITION_POWER):
part2node.append(part % NODE_COUNT)
shuffle(part2node)
node_counts = [0] * NODE_COUNT
zone_counts = [0] * ZONE_COUNT
for data_id in xrange(DATA_ID_COUNT):
data_id = str(data_id)
part = unpack_from('>I',
md5(str(data_id)).digest())[0] >> PARTITION_SHIFT
node_ids = [part2node[part]]
zones = [node2zone[node_ids[0]]]
node_counts[node_ids[0]] += 1
zone_counts[zones[0]] += 1
for replica in xrange(1, REPLICAS):
while part2node[part] in node_ids and \
node2zone[part2node[part]] in zones:
part += 1
if part > PARTITION_MAX:
part = 0
node_ids.append(part2node[part])
zones.append(node2zone[node_ids[-1]])
node_counts[node_ids[-1]] += 1
zone_counts[zones[-1]] += 1
desired_count = DATA_ID_COUNT / NODE_COUNT * REPLICAS
print '%d: Desired data ids per node' % desired_count
max_count = max(node_counts)
over = 100.0 * (max_count - desired_count) / desired_count
print '%d: Most data ids on one node, %.02f%% over' % \
(max_count, over)
min_count = min(node_counts)
under = 100.0 * (desired_count - min_count) / desired_count
print '%d: Least data ids on one node, %.02f%% under' % \
(min_count, under)
desired_count = DATA_ID_COUNT / ZONE_COUNT * REPLICAS
print '%d: Desired data ids per zone' % desired_count
max_count = max(zone_counts)
over = 100.0 * (max_count - desired_count) / desired_count
print '%d: Most data ids in one zone, %.02f%% over' % \
(max_count, over)
min_count = min(zone_counts)
under = 100.0 * (desired_count - min_count) / desired_count
print '%d: Least data ids in one zone, %.02f%% under' % \
(min_count, under)
117186: Desired data ids per node
118782: Most data ids on one node, 1.36% over
115632: Least data ids on one node, 1.33% under
1875000: Desired data ids per zone
1878533: Most data ids in one zone, 0.19% over
1869070: Least data ids in one zone, 0.32% under
So the shuffle and zone distinctions affected our distribution some, but still definitely good enough. This test took about 64 seconds to run on my machine.
There’s a completely alternate, and quite common, way of accomplishing these same requirements. This alternate method doesn’t use partitions at all, but instead just assigns anchors to the nodes within the hash space. Finding the first node for a given hash just involves walking this anchor ring for the next node, and finding additional nodes works similarly as before. To attain the equivalent of our virtual nodes, each real node is assigned multiple anchors.
from bisect import bisect_left
from hashlib import md5
from struct import unpack_from
REPLICAS = 3
NODE_COUNT = 256
ZONE_COUNT = 16
DATA_ID_COUNT = 10000000
VNODE_COUNT = 100
node2zone = []
while len(node2zone) < NODE_COUNT:
zone = 0
while zone < ZONE_COUNT and len(node2zone) < NODE_COUNT:
node2zone.append(zone)
zone += 1
hash2index = []
index2node = []
for node in xrange(NODE_COUNT):
for vnode in xrange(VNODE_COUNT):
hsh = unpack_from('>I', md5(str(node)).digest())[0]
index = bisect_left(hash2index, hsh)
if index > len(hash2index):
index = 0
hash2index.insert(index, hsh)
index2node.insert(index, node)
node_counts = [0] * NODE_COUNT
zone_counts = [0] * ZONE_COUNT
for data_id in xrange(DATA_ID_COUNT):
data_id = str(data_id)
hsh = unpack_from('>I', md5(str(data_id)).digest())[0]
index = bisect_left(hash2index, hsh)
if index >= len(hash2index):
index = 0
node_ids = [index2node[index]]
zones = [node2zone[node_ids[0]]]
node_counts[node_ids[0]] += 1
zone_counts[zones[0]] += 1
for replica in xrange(1, REPLICAS):
while index2node[index] in node_ids and \
node2zone[index2node[index]] in zones:
index += 1
if index >= len(hash2index):
index = 0
node_ids.append(index2node[index])
zones.append(node2zone[node_ids[-1]])
node_counts[node_ids[-1]] += 1
zone_counts[zones[-1]] += 1
desired_count = DATA_ID_COUNT / NODE_COUNT * REPLICAS
print '%d: Desired data ids per node' % desired_count
max_count = max(node_counts)
over = 100.0 * (max_count - desired_count) / desired_count
print '%d: Most data ids on one node, %.02f%% over' % \
(max_count, over)
min_count = min(node_counts)
under = 100.0 * (desired_count - min_count) / desired_count
print '%d: Least data ids on one node, %.02f%% under' % \
(min_count, under)
desired_count = DATA_ID_COUNT / ZONE_COUNT * REPLICAS
print '%d: Desired data ids per zone' % desired_count
max_count = max(zone_counts)
over = 100.0 * (max_count - desired_count) / desired_count
print '%d: Most data ids in one zone, %.02f%% over' % \
(max_count, over)
min_count = min(zone_counts)
under = 100.0 * (desired_count - min_count) / desired_count
print '%d: Least data ids in one zone, %.02f%% under' % \
(min_count, under)
117186: Desired data ids per node
351282: Most data ids on one node, 199.76% over
15965: Least data ids on one node, 86.38% under
1875000: Desired data ids per zone
2248496: Most data ids in one zone, 19.92% over
1378013: Least data ids in one zone, 26.51% under
This test took over 15 minutes to run! Unfortunately, this method also gives much less control over the distribution. To get better distribution, you have to add more virtual nodes, which eats up more memory and takes even more time to build the ring and perform distinct node lookups. The most common operation, data id lookup, can be improved (by predetermining each virtual nodes’ failover nodes, for instance) but it starts off so far behind our first approach that we’ll just stick with that.
In the next part of this series, we’ll start to wrap all this up into a useful Python module.
Part 5¶
In Part 4 of this series, we ended up with a multiple copy, distinctly zoned ring. Or at least the start of it. In this final part we’ll package the code up into a useable Python module and then add one last feature. First, let’s separate the ring itself from the building of the data for the ring and its testing.
from array import array
from hashlib import md5
from random import shuffle
from struct import unpack_from
from time import time
class Ring(object):
def __init__(self, nodes, part2node, replicas):
self.nodes = nodes
self.part2node = part2node
self.replicas = replicas
partition_power = 1
while 2 ** partition_power < len(part2node):
partition_power += 1
if len(part2node) != 2 ** partition_power:
raise Exception("part2node's length is not an "
"exact power of 2")
self.partition_shift = 32 - partition_power
def get_nodes(self, data_id):
data_id = str(data_id)
part = unpack_from('>I',
md5(data_id).digest())[0] >> self.partition_shift
node_ids = [self.part2node[part]]
zones = [self.nodes[node_ids[0]]]
for replica in xrange(1, self.replicas):
while self.part2node[part] in node_ids and \
self.nodes[self.part2node[part]] in zones:
part += 1
if part >= len(self.part2node):
part = 0
node_ids.append(self.part2node[part])
zones.append(self.nodes[node_ids[-1]])
return [self.nodes[n] for n in node_ids]
def build_ring(nodes, partition_power, replicas):
begin = time()
part2node = array('H')
for part in xrange(2 ** partition_power):
part2node.append(part % len(nodes))
shuffle(part2node)
ring = Ring(nodes, part2node, replicas)
print '%.02fs to build ring' % (time() - begin)
return ring
def test_ring(ring):
begin = time()
DATA_ID_COUNT = 10000000
node_counts = {}
zone_counts = {}
for data_id in xrange(DATA_ID_COUNT):
for node in ring.get_nodes(data_id):
node_counts[node['id']] = \
node_counts.get(node['id'], 0) + 1
zone_counts[node['zone']] = \
zone_counts.get(node['zone'], 0) + 1
print '%ds to test ring' % (time() - begin)
desired_count = \
DATA_ID_COUNT / len(ring.nodes) * REPLICAS
print '%d: Desired data ids per node' % desired_count
max_count = max(node_counts.itervalues())
over = \
100.0 * (max_count - desired_count) / desired_count
print '%d: Most data ids on one node, %.02f%% over' % \
(max_count, over)
min_count = min(node_counts.itervalues())
under = \
100.0 * (desired_count - min_count) / desired_count
print '%d: Least data ids on one node, %.02f%% under' % \
(min_count, under)
zone_count = \
len(set(n['zone'] for n in ring.nodes.itervalues()))
desired_count = \
DATA_ID_COUNT / zone_count * ring.replicas
print '%d: Desired data ids per zone' % desired_count
max_count = max(zone_counts.itervalues())
over = \
100.0 * (max_count - desired_count) / desired_count
print '%d: Most data ids in one zone, %.02f%% over' % \
(max_count, over)
min_count = min(zone_counts.itervalues())
under = \
100.0 * (desired_count - min_count) / desired_count
print '%d: Least data ids in one zone, %.02f%% under' % \
(min_count, under)
if __name__ == '__main__':
PARTITION_POWER = 16
REPLICAS = 3
NODE_COUNT = 256
ZONE_COUNT = 16
nodes = {}
while len(nodes) < NODE_COUNT:
zone = 0
while zone < ZONE_COUNT and len(nodes) < NODE_COUNT:
node_id = len(nodes)
nodes[node_id] = {'id': node_id, 'zone': zone}
zone += 1
ring = build_ring(nodes, PARTITION_POWER, REPLICAS)
test_ring(ring)
0.06s to build ring
82s to test ring
117186: Desired data ids per node
118773: Most data ids on one node, 1.35% over
115801: Least data ids on one node, 1.18% under
1875000: Desired data ids per zone
1878339: Most data ids in one zone, 0.18% over
1869914: Least data ids in one zone, 0.27% under
It takes a bit longer to test our ring, but that’s mostly because of the switch to dictionaries from arrays for various items. Having node dictionaries is nice because you can attach any node information you want directly there (ip addresses, tcp ports, drive paths, etc.). But we’re still on track for further testing; our distribution is still good.
Now, let’s add our one last feature to our ring: the concept of weights. Weights are useful because the nodes you add later in a ring’s life are likely to have more capacity than those you have at the outset. For this test, we’ll make half our nodes have twice the weight. We’ll have to change build_ring to give more partitions to the nodes with more weight and we’ll change test_ring to take into account these weights. Since we’ve changed so much I’ll just post the entire module again:
from array import array
from hashlib import md5
from random import shuffle
from struct import unpack_from
from time import time
class Ring(object):
def __init__(self, nodes, part2node, replicas):
self.nodes = nodes
self.part2node = part2node
self.replicas = replicas
partition_power = 1
while 2 ** partition_power < len(part2node):
partition_power += 1
if len(part2node) != 2 ** partition_power:
raise Exception("part2node's length is not an "
"exact power of 2")
self.partition_shift = 32 - partition_power
def get_nodes(self, data_id):
data_id = str(data_id)
part = unpack_from('>I',
md5(data_id).digest())[0] >> self.partition_shift
node_ids = [self.part2node[part]]
zones = [self.nodes[node_ids[0]]]
for replica in xrange(1, self.replicas):
while self.part2node[part] in node_ids and \
self.nodes[self.part2node[part]] in zones:
part += 1
if part >= len(self.part2node):
part = 0
node_ids.append(self.part2node[part])
zones.append(self.nodes[node_ids[-1]])
return [self.nodes[n] for n in node_ids]
def build_ring(nodes, partition_power, replicas):
begin = time()
parts = 2 ** partition_power
total_weight = \
float(sum(n['weight'] for n in nodes.itervalues()))
for node in nodes.itervalues():
node['desired_parts'] = \
parts / total_weight * node['weight']
part2node = array('H')
for part in xrange(2 ** partition_power):
for node in nodes.itervalues():
if node['desired_parts'] >= 1:
node['desired_parts'] -= 1
part2node.append(node['id'])
break
else:
for node in nodes.itervalues():
if node['desired_parts'] >= 0:
node['desired_parts'] -= 1
part2node.append(node['id'])
break
shuffle(part2node)
ring = Ring(nodes, part2node, replicas)
print '%.02fs to build ring' % (time() - begin)
return ring
def test_ring(ring):
begin = time()
DATA_ID_COUNT = 10000000
node_counts = {}
zone_counts = {}
for data_id in xrange(DATA_ID_COUNT):
for node in ring.get_nodes(data_id):
node_counts[node['id']] = \
node_counts.get(node['id'], 0) + 1
zone_counts[node['zone']] = \
zone_counts.get(node['zone'], 0) + 1
print '%ds to test ring' % (time() - begin)
total_weight = float(sum(n['weight'] for n in
ring.nodes.itervalues()))
max_over = 0
max_under = 0
for node in ring.nodes.itervalues():
desired = DATA_ID_COUNT * REPLICAS * \
node['weight'] / total_weight
diff = node_counts[node['id']] - desired
if diff > 0:
over = 100.0 * diff / desired
if over > max_over:
max_over = over
else:
under = 100.0 * (-diff) / desired
if under > max_under:
max_under = under
print '%.02f%% max node over' % max_over
print '%.02f%% max node under' % max_under
max_over = 0
max_under = 0
for zone in set(n['zone'] for n in
ring.nodes.itervalues()):
zone_weight = sum(n['weight'] for n in
ring.nodes.itervalues() if n['zone'] == zone)
desired = DATA_ID_COUNT * REPLICAS * \
zone_weight / total_weight
diff = zone_counts[zone] - desired
if diff > 0:
over = 100.0 * diff / desired
if over > max_over:
max_over = over
else:
under = 100.0 * (-diff) / desired
if under > max_under:
max_under = under
print '%.02f%% max zone over' % max_over
print '%.02f%% max zone under' % max_under
if __name__ == '__main__':
PARTITION_POWER = 16
REPLICAS = 3
NODE_COUNT = 256
ZONE_COUNT = 16
nodes = {}
while len(nodes) < NODE_COUNT:
zone = 0
while zone < ZONE_COUNT and len(nodes) < NODE_COUNT:
node_id = len(nodes)
nodes[node_id] = {'id': node_id, 'zone': zone,
'weight': 1.0 + (node_id % 2)}
zone += 1
ring = build_ring(nodes, PARTITION_POWER, REPLICAS)
test_ring(ring)
0.88s to build ring
86s to test ring
1.66% max over
1.46% max under
0.28% max zone over
0.23% max zone under
So things are still good, even though we have differently weighted nodes. I ran another test with this code using random weights from 1 to 100 and got over/under values for nodes of 7.35%/18.12% and zones of 0.24%/0.22%, still pretty good considering the crazy weight ranges.
Summary¶
Hopefully this series has been a good introduction to building a ring. This code is essentially how the OpenStack Swift ring works, except that Swift’s ring has lots of additional optimizations, such as storing each replica assignment separately, and lots of extra features for building, validating, and otherwise working with rings.