ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

TinyKV lab4完成总结

2022-03-19 15:37:26  阅读:142  来源: 互联网

标签:总结 txn return err nil kvrpcpb req TinyKV lab4


lab4是实现一个基于Percolator模型的分布式事务。

lab4a

lab4a是介绍三种Column Family的作用和协作,分别包括CfDefault, CfWrite, CfLock。
CfDefault是暂存实际数据的cf, CfWrite是写入时间的记录,CfLock是该数据的锁。数据版本(事务)由一个全局递增的ts表示,ts越大越新,数据在数据库中的排序是先按Key由小到大再按ts由大到小。
实现除GetValue的方法直接创建一个结构体写入txn.writes就行。
GetValue比较复杂,因为读事务只能看到它之前已经提交的事务。首先我们要先看有没有CfWrite的记录,如果有,我们还要看这个写入是Delete还是Put,如果是Put,那么说明有这个值,然后我们再对这个写入时间直接在CfDefault这个列族中找数据即可。

// GetValue finds the value for key, valid at the start timestamp of this transaction.
// I.e., the most recent value committed before the start of this transaction.
func (txn *MvccTxn) GetValue(key []byte) ([]byte, error) {
	// Your Code Here (4A).
	iter := txn.Reader.IterCF(engine_util.CfWrite)
	defer iter.Close()
	iter.Seek(EncodeKey(key, txn.StartTS))
	for ; iter.Valid(); iter.Next() {
		uKey := DecodeUserKey(iter.Item().Key())
		if bytes.Equal(key, uKey) {
			break
		}
	}
	if !iter.Valid() {
		return nil, nil
	}
	v, err := iter.Item().Value()
	if err != nil {
		return nil, err
	}
	w, err := ParseWrite(v)
	if err != nil {
		return nil, err
	}
	if w.Kind != WriteKindPut {
		return nil, nil
	}
	return txn.Reader.GetCF(engine_util.CfDefault, EncodeKey(key, w.StartTS))
}

MostRecentWrite和CurrentWrite就是找某个key的最新版本和当前事务是否写入,后者注意比较ts就好。

// CurrentWrite searches for a write with this transaction's start timestamp. It returns a Write from the DB and that
// write's commit timestamp, or an error.
func (txn *MvccTxn) CurrentWrite(key []byte) (*Write, uint64, error) {
	// Your Code Here (4A).
	iter := txn.Reader.IterCF(engine_util.CfWrite)
	defer iter.Close()
	for ; iter.Valid(); iter.Next() {
		rawKey := iter.Item().Key()
		uKey := DecodeUserKey(rawKey)
		if bytes.Equal(key, uKey) {
			v, err := iter.Item().Value()
			if err != nil {
				return nil, 0, err
			}
			w, err := ParseWrite(v)
			if err != nil {
				return nil, 0, err
			}
			if w.StartTS == txn.StartTS {
				return w, decodeTimestamp(rawKey), nil
			}
		}
	}
	return nil, 0, nil
}
// MostRecentWrite finds the most recent write with the given key. It returns a Write from the DB and that
// write's commit timestamp, or an error.
func (txn *MvccTxn) MostRecentWrite(key []byte) (*Write, uint64, error) {
	// Your Code Here (4A).
	iter := txn.Reader.IterCF(engine_util.CfWrite)
	defer iter.Close()
	for ; iter.Valid(); iter.Next() {
		rawKey := iter.Item().Key()
		uKey := DecodeUserKey(rawKey)
		if bytes.Equal(key, uKey) {
			v, err := iter.Item().Value()
			if err != nil {
				return nil, 0, err
			}
			w, err := ParseWrite(v)
			if err != nil {
				return nil, 0, err
			}
			return w, decodeTimestamp(rawKey), nil
		}
	}
	return nil, 0, nil
}

lab4b

lab4b是实现2PC模型。2PC模型分为两个阶段,一个是PreWrite,一个是Commit。
PreWrite的过程就是检查写冲突和检查锁冲突,如果都不冲突,再上锁。
检查写冲突的过程就是看CfWrite中有没有比这个事务更新的事务在写入这个key,如果有,显然不允许更旧的版本修改更新的版本,出现写冲突。检查锁冲突的过程就是看有没有版本更老的锁,如果有,则说明这个key被更早的事务锁住了,就不能修改别的锁占用的key。如果都没有,我们对这个key上锁。
Commit的过程就是写入数据库和解锁。Commit写入数据库和解锁必须是原子性的,我们需要用系统提供的锁保证这一点,首先检查所有的key均已经上该版本的锁,如果没有,我们不能提交没有上锁的key。然后记录写入时间,最后写入数据和解锁,再释放系统提供的原子性的锁。如果Commit出现失败,我们可以让client在未来重试成功(因为此时锁一定在更后的版本)。

func (server *Server) KvPrewrite(_ context.Context, req *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) {
	// Your Code Here (4B).
	reader, err := server.storage.Reader(req.Context)
	if err != nil {
		if regionErr, ok := err.(*raft_storage.RegionError); ok {
			return &kvrpcpb.PrewriteResponse{RegionError: regionErr.RequestErr}, err
		}
		return nil, err
	}
	defer reader.Close()
	resp := &kvrpcpb.PrewriteResponse{}
	txn := mvcc.NewMvccTxn(reader, req.StartVersion)

	for _, mutation := range req.Mutations {
		w, ts, err := txn.MostRecentWrite(mutation.Key)
		if err != nil {
			if regionErr, ok := err.(*raft_storage.RegionError); ok {
				return &kvrpcpb.PrewriteResponse{RegionError: regionErr.RequestErr}, err
			}
			return nil, err
		}
		if w != nil && ts >= req.StartVersion {
			resp.Errors = append(resp.Errors, &kvrpcpb.KeyError{
				Conflict: &kvrpcpb.WriteConflict{
					StartTs:    req.StartVersion,
					ConflictTs: ts,
					Key:        mutation.Key,
					Primary:    req.PrimaryLock,
				},
			})
			continue
		}
		lock, err := txn.GetLock(mutation.Key)
		if err != nil {
			if regionErr, ok := err.(*raft_storage.RegionError); ok {
				return &kvrpcpb.PrewriteResponse{RegionError: regionErr.RequestErr}, err
			}
			return nil, err
		}
		if lock != nil && lock.Ts < req.StartVersion {
			resp.Errors = append(resp.Errors, &kvrpcpb.KeyError{
				Locked: &kvrpcpb.LockInfo{
					PrimaryLock: req.PrimaryLock,
					LockVersion: lock.Ts,
					Key:         mutation.Key,
					LockTtl:     lock.Ttl,
				},
			})
			continue
		}
		switch mutation.Op {
		case kvrpcpb.Op_Put:
			txn.PutValue(mutation.Key, mutation.Value)
			txn.PutLock(mutation.Key, &mvcc.Lock{
				Primary: req.PrimaryLock,
				Ts:      req.StartVersion,
				Ttl:     req.LockTtl,
				Kind:    mvcc.WriteKindPut,
			})
		case kvrpcpb.Op_Del:
			txn.DeleteValue(mutation.Key)
			txn.PutLock(mutation.Key, &mvcc.Lock{
				Primary: req.PrimaryLock,
				Ts:      req.StartVersion,
				Ttl:     req.LockTtl,
				Kind:    mvcc.WriteKindDelete,
			})
		}
	}
	if len(resp.Errors) != 0 {
		return resp, nil
	}
	err = server.storage.Write(req.Context, txn.Writes())
	if err != nil {
		if regionErr, ok := err.(*raft_storage.RegionError); ok {
			return &kvrpcpb.PrewriteResponse{RegionError: regionErr.RequestErr}, err
		}
		return nil, err
	}
	return resp, nil
}

func (server *Server) KvCommit(_ context.Context, req *kvrpcpb.CommitRequest) (*kvrpcpb.CommitResponse, error) {
	// Your Code Here (4B).
	reader, err := server.storage.Reader(req.Context)
	if err != nil {
		if regionErr, ok := err.(*raft_storage.RegionError); ok {
			return &kvrpcpb.CommitResponse{RegionError: regionErr.RequestErr}, err
		}
		return nil, err
	}
	defer reader.Close()
	txn := mvcc.NewMvccTxn(reader, req.StartVersion)
	server.Latches.WaitForLatches(req.Keys)
	defer server.Latches.ReleaseLatches(req.Keys)
	for _, key := range req.Keys {
		lock, err := txn.GetLock(key)
		if err != nil {
			if regionErr, ok := err.(*raft_storage.RegionError); ok {
				return &kvrpcpb.CommitResponse{RegionError: regionErr.RequestErr}, err
			}
			return nil, err
		}
		if lock == nil {
			return &kvrpcpb.CommitResponse{}, nil
		}
		if lock.Ts != req.StartVersion {
			return &kvrpcpb.CommitResponse{
				Error: &kvrpcpb.KeyError{
					Retryable: "true",
				},
			}, nil
		}
		txn.PutWrite(key, req.CommitVersion, &mvcc.Write{
			StartTS: req.StartVersion,
			Kind:    lock.Kind,
		})
		txn.DeleteLock(key)
	}
	err = server.storage.Write(req.Context, txn.Writes())
	if err != nil {
		if regionErr, ok := err.(*raft_storage.RegionError); ok {
			return &kvrpcpb.CommitResponse{RegionError: regionErr.RequestErr}, err
		}
		return nil, err
	}
	return &kvrpcpb.CommitResponse{}, nil
}

lab4c

lab4c主要实现失败之后是回滚还是继续提交。
checkTxnStatus函数检查事务状态和回滚失效的锁,事务状态直接通过CurrentWrite检查,如果不存在,说明没有写入,事务已经回滚,我们就回收锁,锁有可能是已经被清除了,或者是过期了,我们要针对这两种情况分别处理并写入回滚时间标记,如果存在,说明已经提交,不需要处理,在返回中填上写入版本即可。

func (txn *MvccTxn) RollBack(key []byte, Locked bool) {
	txn.PutWrite(key, txn.StartTS, &Write{
		StartTS: txn.StartTS,
		Kind:    WriteKindRollback,
	})
	txn.DeleteValue(key)
	if Locked {
		txn.DeleteLock(key)
	}
}
func (server *Server) KvCheckTxnStatus(_ context.Context, req *kvrpcpb.CheckTxnStatusRequest) (*kvrpcpb.CheckTxnStatusResponse, error) {
	// Your Code Here (4C).
	reader, err := server.storage.Reader(req.Context)
	if err != nil {
		if regionErr, ok := err.(*raft_storage.RegionError); ok {
			return &kvrpcpb.CheckTxnStatusResponse{RegionError: regionErr.RequestErr}, err
		}
		return nil, err
	}
	defer reader.Close()
	txn := mvcc.NewMvccTxn(reader, req.LockTs)
	w, ts, err := txn.CurrentWrite(req.PrimaryKey)
	if err != nil {
		if regionErr, ok := err.(*raft_storage.RegionError); ok {
			return &kvrpcpb.CheckTxnStatusResponse{RegionError: regionErr.RequestErr}, err
		}
		return nil, err
	}
	if w != nil {
		resp := &kvrpcpb.CheckTxnStatusResponse{}
		if w.Kind != mvcc.WriteKindRollback {
			resp.CommitVersion = ts
		}
		return resp, nil
	}
	lock, err := txn.GetLock(req.PrimaryKey)
	if err != nil {
		if regionErr, ok := err.(*raft_storage.RegionError); ok {
			return &kvrpcpb.CheckTxnStatusResponse{RegionError: regionErr.RequestErr}, err
		}
		return nil, err
	}
	resp := &kvrpcpb.CheckTxnStatusResponse{}
	if lock == nil {
		txn.RollBack(req.PrimaryKey, false)
		resp.LockTtl = 0
		resp.CommitVersion = 0
		resp.Action = kvrpcpb.Action_LockNotExistRollback
		if server.storage.Write(req.Context, txn.Writes()) != nil {
			if regionErr, ok := err.(*raft_storage.RegionError); ok {
				return &kvrpcpb.CheckTxnStatusResponse{RegionError: regionErr.RequestErr}, err
			}
			return nil, err
		}
		return resp, nil
	} else if mvcc.PhysicalTime(lock.Ts)+lock.Ttl < mvcc.PhysicalTime(req.CurrentTs) {
		txn.RollBack(req.PrimaryKey, true)
		resp.LockTtl = 0
		resp.CommitVersion = 0
		resp.Action = kvrpcpb.Action_TTLExpireRollback
		if server.storage.Write(req.Context, txn.Writes()) != nil {
			if regionErr, ok := err.(*raft_storage.RegionError); ok {
				return &kvrpcpb.CheckTxnStatusResponse{RegionError: regionErr.RequestErr}, err
			}
			return nil, err
		}
		return resp, nil
	}
	resp.LockTtl = lock.Ttl
	return resp, nil
}

batchRollback是回滚一批key的函数。如果发现某个key已经提交,则回滚失败,否则删除锁和kv。
resolveLock是根据CommitVersion对某个事务的key要么提交要么回滚。我们只需要检查锁的ts然后把满足条件的key放入一个数组然后判断CommitVersion即可。
一般来说,事务的发起者都是Client,多个Client有可能同时操作,这几个函数的关系是:一个Client进行PreWrite,这时因为有可能Client需要做其他操作或者已经宕机,或者有其他事务插队,所以执行CheckTxnStatus检查状态,然后得到CommitVersion,调用ResolveLock进行最后的提交或者回滚(作者自己的想法,欢迎指正)。

标签:总结,txn,return,err,nil,kvrpcpb,req,TinyKV,lab4
来源: https://www.cnblogs.com/mchxyz/p/16026636.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有