Distributed System - MIT 6.824 Raft Lab 4A

Lab 4A build up a Shard Master System based on Raft.

Lab 4A

Shard storage system

A sharded storage system stores data a subset of data in different groups for performance. Each of the groups is only responsible a subset of data, and thus total system throughput increases in proportion to the number of groups. Groups is a set of computers that replicate each other. Each of the groups is managed by raft or other concensus algorithm.

A new group could be added to the shard storage system if the load is two heavy and a existing group could be deleted from the system if it is down. This is called configuration change, i.e., at each configuration, there are a set of groups. Thus, at each configuration, shards are allocated to those groups. Whenever configuration changes, shards need to be reallocated to new groups. This is managed by a seperate server called shard master. Considering reliability and availiablity, shard master is not only one server, but a set of servers that is based on Raft. Lab 4A is to build a raft based shard master.

Shard and Raft

Function of shard master

Shard master manages configuration change, each of the shard acquires shard master for configuration change and shard reallocation. Therefore, shard master should support Joint new groups, group leaving, query of configuration change and move shard between groups.

The implementation of those function is similar to Key/Value server in Lab 3. The master received RPC call, and check duplication, and then send to Raft for agreement. If it is commited, raft send the operation to server again, and the server replys to the client.

The difference is shard master doesn’t need to support snapshot.

Rebalance of shards

This is the only challenge in this lab. When a new group is joined or a group leaves, how should the shard be reallocated to each group.

The simplest way is to reallocate each of the shard to the groups by simply taking mod of the shard number to the total number of groups. But this will result in rearrangment of every shard, which waster time to moving shards.

Consistent Hashing. I implemented this approach and it is not able to give me balanced allocation. I think this is because the number of shards is relatively small ? I will check it back later.

The other brutal way is: When a new group joining, calculate the average number of shards that each group should have. Loop through all groups, and move the shard from the group with maximum number of shard to the new group, until the new group get average number of shards.

When a group leaving, Loop through all groups, and move a shard from leaving group to the group with minimum number of shards, until the leaving group is cleared.

Code below:

 // brutal force rebalance shards
    func (sm *ShardMaster) rebalanceShards(nextCfg *Config, opType string, gid int) {

        if len(nextCfg.Groups) < 1 {
            return
        }

        // extract shards for each gid in the new config
        gidShardsMap := make(map[int][]int)

        for xgid := range nextCfg.Groups {
            gidShardsMap[xgid] = []int{}
        }
        for i := 0; i < len(nextCfg.Shards); i++ {
            val, _ := gidShardsMap[nextCfg.Shards[i]]
            val = append(val, i)
            gidShardsMap[nextCfg.Shards[i]] = val
        }

        // average shards per gid
        avg := len(nextCfg.Shards) / len(nextCfg.Groups)

        switch opType {
        case JOIN:
            jShards := []int{}
            // new gid has at least average number of shards
            for i := 0; i < avg; i++ {

                maxShard := 0
                maxGid := 0
                // find the gid with max number of shards
                for xgid, shards := range gidShardsMap {
                    if len(shards) > maxShard {
                        maxShard = len(shards)
                        maxGid = xgid
                    }
                }
                // remove one shard from that Gid, and put it to the new Gid
                mShards := gidShardsMap[maxGid]
                jShards = append(jShards, mShards[len(mShards)-1])
                mShards = mShards[0 : len(mShards)-1]
                gidShardsMap[maxGid] = mShards
                // continue until the new Gid has at least average number of shards
            }

            gidShardsMap[gid] = jShards

        case LEAVE:
            leaveShards := gidShardsMap[gid]
            delete(gidShardsMap, gid)
            for _, shard := range leaveShards {
                min := len(nextCfg.Shards) + 1
                minGid := 0
                // find the gid with min number of shards
                for xgid, xshards := range gidShardsMap {
                    if len(xshards) < min {
                        min = len(xshards)
                        minGid = xgid
                    }
                }
                // move one shard from leaving Shards to the Gid with min number of shards
                mShards := gidShardsMap[minGid]
                mShards = append(mShards, shard)
                gidShardsMap[minGid] = mShards
                // continue until all the shards in leaving group are assigned
            }
        }

        // assign gid to each shard
        for gid, shardList := range gidShardsMap {
            for _, shard := range shardList {
                nextCfg.Shards[shard] = gid
            }
        }

    }

Service update

// update Configuration based Operation from Raft based on the OpType
func (sm *ShardMaster) updateSeviceState(op Op) {

	sm.requestRecord[op.ClientId] = op.OpId
	if op.OpType == QUERY {
		return
	}
	currentConfig := sm.configs[len(sm.configs)-1]
	nextConfig := sm.createNextConfig(currentConfig)

	switch op.OpType {

	case JOIN:

		for gid, servers := range op.JoinServers {

			newServers := make([]string, len(servers))
			copy(newServers, servers)
			nextConfig.Groups[gid] = newServers
			sm.rebalanceShards(&nextConfig, op.OpType, gid)
		}

	case LEAVE:

		for _, gid := range op.LeaveGIDs {
			delete(nextConfig.Groups, gid)
			sm.rebalanceShards(&nextConfig, op.OpType, gid)
		}

	case MOVE:

		for i := 0; i < NShards; i++ {
			nextConfig.Shards[i] = currentConfig.Shards[i]
		}
		nextConfig.Shards[op.MoveShard] = op.MoveGID
	}

	sm.configs = append(sm.configs, nextConfig)
}

Conclusion

  1. This is relatively simple if you finished Lab 3.
  2. Reblance shard has to be as average as possible, otherwise, it will not pass.
Written on April 25, 2021