ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Lab8 Locks

2022-07-14 22:31:36  阅读:162  来源: 互联网

标签:hash struct lock Locks bcache Lab8 buf addr


Lab8 Locks

这个实验还是蛮难的...

1. Memory allocator(moderate)

1.1 题目

众所周知,在xv6中,我们申请的内存是在结构体struct kmem中申请的,struct kmem中的链表struct run *freelist存放了所有的空闲内存(以page为单位),在申请和释放内存的时候,可能会存在race condition(其详细解释在课程“lock”一节),为了避免race condition,需要一把锁来保护不变量,即kem中的struct spinlock lock,每当要添加元素到freelist或是从freelist删除元素时,都应该acquire这把锁。

但是这又出现问题了,由于频繁地申请和释放空间,所有相关的进程都在争用同一把锁,这降低了程序的效率。我们要做的就是解决这个问题,这也是这个lab的主题。

实验给了user/kalloctest.c,这个程序大概做的工作是频繁地申请kalloc和释放kfree内存。最后输出每个锁的争用次数以及总和。

1.2 分析

内存资源(freelist)是全局的,它是对所有CPU都是可见的,为了保护内存资源这个不变量,一个很简单的方法就是这些资源共用一把锁,哪个CPU要修改(申请或释放空间),它就要先获得这把锁。这也就导致了大量的锁争用,一个解决方法是分而治之,既然申请和释放内存是每个CPU的行为,我们可以把内存资源分散到每个CPU上去,即将内存资源分散为b1, b2, b3, ...bn等几块,每一块内存资源只能由对应的CPU访问。

这样看来,我们似乎连锁都不需要了,因为资源和CPU已经一一对应了,而每个CPU在任何一个时间都只能做一件事,它要么申请内存,要么释放内存,不可能既申请又释放,这样就不存在race condition,也就不需要锁了。

然而有两个因素(我想到是两个,不一定都对,且可能还有更多)来否定上述想法:

  1. 有两种形式concurrency,一种是CPU和CPU之间的并发,另一种是CPU和device之间的并发(详见课程“interrupt”一节),通过分而治之的方法可以排除CPU和CPU之间对同一内存资源的并发访问,但是依然存在后一种并发。虽然我并不知道driven中是否会访问内存,但是毕竟这是一个潜在的隐患。

  2. 可能存在这样一种情形:CPU0频繁的申请内存,而其他CPU基本不怎么访问内存,这就导致分配给CPU0的空间被耗尽而分配给其他CPU的空间还有很多剩余的情况,此时若CPU0再申请空间,仅仅报一个panic("out of memory")是不可取的,因为明明还存在空闲的内存,只不过在其他CPU的“管辖范围内”。对于这种情况,CPU0需要从其他CPU中偷取空闲内存,这也就导致了CPU对同一资源的并发访问。

综上,由于存在并发,所以每块内存资源都应该有一把锁。但这已远远减少了锁的争用。

1.3 实现

首先,舍弃原来的kmem,为每个CPU划定自己管辖的资源:

struct {
  struct spinlock lock;
  struct run *freelist;
 } kmem[NCPU];

修改kinit用以初始化

void
kinit()
{
  for (int i = 0; i < NCPU; i ++) {
    initlock(&kmem[i].lock, "kmem");
    // freerange(end, (void *)PHYSTOP); // freerange一遍就好了...
  }
  freerange(end, (void *)PHYSTOP);
}

注意本人被kinit坑害过,原因是freerange一次就行了,也就是说先将所有的内存都放到一个CPU上(不用担心这一点,因为根据上面的分析,资源是可以动态调整的)。

修改kfree,将要释放的内存放到当前的CPU上,采用头插法

void
kfree(void *pa)
{
  struct run *r;

  if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
    panic("kfree");

  // Fill with junk to catch dangling refs.
  memset(pa, 1, PGSIZE);

  r = (struct run*)pa;

  push_off();
  int id = cpuid();
  acquire(&kmem[id].lock);
  r->next = kmem[id].freelist;
  kmem[id].freelist = r;
  release(&kmem[id].lock);
  pop_off();
}

调用cpuid必须保证关中断,并且在整个使用周期都要保持中断关闭(具体我也不知道为什么,但似乎在xv6book的7.4节有讲)。这里说句题外话,从函数开/关中断的函数名看来,中断似乎是以栈的形式存在的,可能是因为存在acquire一把锁但没有release之前,又acquire另一半锁的情况...并且在sched函数中可以看到对nodeoff进行了检查,nodeoff表示depth of push_off nesting,即中断关闭的深度?扯远了...

接下来是kalloc函数,

void *
kalloc(void)
{
  struct run *r;

  push_off();
  int id = cpuid();

  acquire(&kmem[id].lock);
  r = kmem[id].freelist;
  if(r)
    kmem[id].freelist = r->next;
  else {
    int find = 0;
    for (int i = 0; i < NCPU && find == 0; i ++) {
      if (i != id) {
        acquire(&kmem[i].lock); // 获取另一个cpu的锁
        r = kmem[i].freelist;
        if (r) {
          kmem[i].freelist = r->next; // 从另一个cpu的list上取下来
          r->next = kmem[id].freelist; // 放到这个cpu的list的头部
          find = 1;
        }
        release(&kmem[i].lock);
      }
    }
  }
  release(&kmem[id].lock);
  pop_off();

  if(r)
    memset((char*)r, 5, PGSIZE); // fill with junk
  return (void*)r;
}

2. Buffer cache( hard )

2.1 题目

这题还是太难了...

这道题和前一题的主题是相同的,也是减少锁争用,但是这里的战线转向了buffer cache难度立竿见影。

首先简单地介绍一下背景:disk上是以block(有时候也称sector)为单位来保存数据的,我们知道对disk读写是很慢的,所以如果要对磁盘上的数据进行读写,一般是先将block放到buffer cache中,再进行操作。这一点在计算机中多有应用,比如cache和memory,mmap,还有我们经常用但可能不熟悉的I/O缓冲区

struct {
  struct spinlock lock;
  struct buf buf[NBUF];
  // Linked list of all buffers, through prev/next.
  // Sorted by how recently the buffer was used.
  // head.next is most recent, head.prev is least.
  struct buf head;
} bcache;

xv6中用bcache来表示buffer cache,如上所示。通过双向循环链表来组织所有的buf。源代码里有一个很天才的设计,即LRU算法的实现,但这不是我们讨论的重点bget函数寻找一个可行的buf,将一个block放到该buf中。由于上述行为可能是并发的,所以需要一把锁来保护不变量,即bcache.lock。这就导致了锁的争用。我们要做的就是减少锁的争用

2.2 分析

我们不能像上面那道题一样,通过CPU来划分资源,因为bcache缓冲区真正的在进程(以及CPU)之间共享(对于内存的分配,当CPU申请了某页内存后,在释放之前,该页内存就为某个进程所私有,一般来说其他进程不能访问该内存;然而buffer cache不同,某个block被放进了buf后,所有进程都可以访问它)。

好在有hint,它告诉了我们怎么做——使用hash表。我想这也是一种分而治之的思想。在原来的实现中,可以发现一个问题:为了找到一个buf,必须遍历整条链表,而为了遍历整条链表,必须获取bcache.lock,这是一把很大的锁,而锁越大,串行性越高。hash表将指定编号blockno(关键码key)的block直接映射到某一个hash bucket(桶),这就避免了获取大锁在遍历链表寻找的过程。每个hash bucket都有很多buf供映射进来的block使用,将block放到buf中,此时,我们只需要获取这个bucket的锁即可。

同上一题一样,可能会出现桶中buf不够的情形,这时候要从bucket中偷取一个buf,当然偷取的原则是LRU算法。此时需要获取被偷的bucket的锁以保护不变量。

2.3 实现

桶的数据结构:桶内通过链表来组织,需要一个头结点head;同时每个桶须有一把锁

#define NBUCKET 13

struct bucket {
  struct spinlock lock;
  struct buf head;
};

struct {
  struct spinlock lock;
  struct buf buf[NBUF];
  struct bucket hash_table[NBUCKET];
  // Linked list of all buffers, through prev/next.
  // Sorted by how recently the buffer was used.
  // head.next is most recent, head.prev is least.
  // struct buf head;
} bcache;

同时在buf中加入成员ticks,用来表示最后被访问的时间,可以以此实现LRU算法

struct buf {
  // 原有的成员
  uint ticks; // 时间戳ticks
};

将block放到buf的过程:

  1. 给定blockno,通过hash函数(除留余数法)来获取桶号addr

  2. 获取这个桶的锁

  3. 在桶内寻找,查看是否存在buf b满足b->dev == dev && b->blockno == blockno,若存在,则这个block已经被映射过了,此时增加其引用次数即可b->refcnt++,去第6步。若不存在,去第4步。

  4. 仍在这个桶内寻找,查看是否存在空闲的buf b,即b->refcnt == 0。这时需要通过LRU算法来寻找,通过时间戳ticks,找到时间戳最小的那个,也就是最早被访问的,若存在,映射此block,去第6步,否则去第5步

  5. 遍历其他所有的桶,对于每个桶,都要获取这个桶的锁,然后执行第4步,释放锁。如果能找到满足条件的buf,将这个结点从原来的桶移到addr号桶,这里主要涉及双向循环链表的插入和删除操作,然后映射该桶,去第6步;如果找不到这样的buf,panic。注意,偷取buf的时候,需要获取被偷取的桶的锁,而下面的代码是直接获取了大锁(bcache.lock),因为那时候我对这个lab还不是很熟悉,属于云里雾里的状态。理论上这两种方法都可以,因为都保护了不变量。但是小锁应该更好。

  6. 释放这个桶的锁,然后获取这个buf的睡眠锁acquiresleep

依据此逻辑写出的bget函数如下:

static struct buf*
bget(uint dev, uint blockno)
{
  struct buf *b;
  int addr = blockno % NBUCKET;

  acquire(&bcache.hash_table[addr].lock); // 对当前bucket上锁
  b = bcache.hash_table[addr].head.next;
  while (b != &bcache.hash_table[addr].head) {
    if (b->dev == dev && b->blockno == blockno) {
      b->refcnt ++;
      release(&bcache.hash_table[addr].lock);
      acquiresleep(&b->lock);
      return b;
    }
    b = b->next;
  }
  // printf("%d\n", 1);
  // 整条链都没有找到像样的,因此找这条链中找refcnt==0的结点
  // 原则:LRU(先不考虑将ticks最小的结点放在头部)
  int LRU = 65536;
  int found = 0;
  struct buf *tmp = bcache.hash_table[addr].head.next;
  while (tmp != &bcache.hash_table[addr].head) {
    if (tmp->refcnt == 0 && tmp->ticks < LRU) {
      LRU = tmp->ticks;
      found = 1;
      b = tmp;
    }
    tmp = tmp->next;
  }
  if (found) {
    b->dev = dev;
    b->blockno = blockno;
    b->valid = 0;
    b->refcnt = 1;
    release(&bcache.hash_table[addr].lock);
    acquiresleep(&b->lock);
    return b;
  }
  // 这条链中不存在,要在其他链中偷一个
  // 其他链中不可能存在dev和blockno都相等的结点
  // 所以只需要找refcnt==0,且满足LRU
  // 由于此时要对所有桶遍历,因此需要acquire(&bcache.lock)
  // 这种情况似乎可以和上面的合并
  int new_addr = addr;
  LRU = 65536;
  found = 0;
  acquire(&bcache.lock);
  for (int i = 0; i < NBUCKET; i ++) {
    if (i == addr)
      continue;
    acquire(&bcache.hash_table[i].lock);
    struct buf *tmp = bcache.hash_table[i].head.next;
    while (tmp != &bcache.hash_table[i].head) {
      if (tmp->refcnt == 0 && tmp->ticks < LRU) {
        LRU = tmp->ticks;
        found = 1;
        b = tmp;
        new_addr = i;
      }
      tmp = tmp->next;
    }
    release(&bcache.hash_table[i].lock);
  }
  release(&bcache.lock);

  if (found) {
    // 取下结点,并放到原来那条链上
    acquire(&bcache.hash_table[new_addr].lock);
    b->prev->next = b->next;
    b->next->prev = b->prev;
    release(&bcache.hash_table[new_addr].lock);

    b->next = bcache.hash_table[addr].head.next;
    b->prev = &bcache.hash_table[addr].head;
    bcache.hash_table[addr].head.next->prev = b;
    bcache.hash_table[addr].head.next = b;

    b->dev = dev;
    b->blockno = blockno;
    b->valid = 0;
    b->refcnt = 1;
    release(&bcache.hash_table[addr].lock);
    acquiresleep(&b->lock);
    return b;
  }
  panic("bget: no buffers");
}

当一个进程用完了buf以后,需要释放这个buf,这个过程比较简单。将buf的睡眠锁解除,然后减少对该buf的引用(refcnt)。注意要更新buf的时间戳。

brelse(struct buf *b)
{
  if(!holdingsleep(&b->lock))
    panic("brelse");

  releasesleep(&b->lock);

  int addr = b->blockno % NBUCKET;
  acquire(&bcache.hash_table[addr].lock);
  b->refcnt --;
  b->ticks = ticks;
  release(&bcache.hash_table[addr].lock);
}

最后修改一下bpinbunpin,虽然我不知道它俩是干嘛的,但是一看就知道需要修改,这一步别忘了。

void
bpin(struct buf *b) {
  int addr = b->blockno % NBUCKET;
  acquire(&bcache.hash_table[addr].lock);
  b->refcnt++;
  release(&bcache.hash_table[addr].lock);
}

void
bunpin(struct buf *b) {
  int addr = b->blockno % NBUCKET;
  acquire(&bcache.hash_table[addr].lock);
  b->refcnt--;
  release(&bcache.hash_table[addr].lock);
}

(通过这个实验我才发现自己数据结构的知识是如此薄弱...说实话,读了题目以后,这个hash表和bucket我很久都没弄懂...)

实验至此结束,按照如上代码可以通过所有测试,但是仍有几个问题:

  1. 对于桶数的选择,理论上来说,桶数的选择最好是不超过NBUF的最大素数,也就是29,但是用29会超时(这也是一个坑点,用了29死活过不了usertests,简直绝望,最后把29改成13就过了)。我想这可能是因为,选择不大于m的最大素数是为了尽可能减小冲突,然而这个问题里面不care冲突,恰恰相反,如果完美地避开了冲突,就说明每个桶只有1个或很少结点,可能造成频繁地从其他桶偷结点,从而拉低了效率。

  2. 对buf中时间戳的更新时机。是在bget的时候更新还是在brelse的时候更新?原来的实现是在brelse时实现LRU,但我发现改进后,bget和brelse都可以,似乎不影响。

  3. 可以有更好的解决方案,即:每个桶中都按buf的时间戳从小到大排列,这样便于LRU的实现,不过这么做似乎性能上并没有很大提升,并且实现起来有点繁琐。

番外

有关bio.c中的睡眠锁sleeplock。这个东西很牛*。

且看初始的bget函数和bread函数(即改进之前的版本):

static struct buf*
bget(uint dev, uint blockno)
{
  struct buf *b;

  acquire(&bcache.lock);

  // Is the block already cached?
  for(b = bcache.head.next; b != &bcache.head; b = b->next){
    if(b->dev == dev && b->blockno == blockno){
      b->refcnt++;
      release(&bcache.lock);
      acquiresleep(&b->lock);
      return b;
    }
  }

  // Not cached.
  // Recycle the least recently used (LRU) unused buffer.
  for(b = bcache.head.prev; b != &bcache.head; b = b->prev){
    if(b->refcnt == 0) {
      b->dev = dev;
      b->blockno = blockno;
      b->valid = 0;
      b->refcnt = 1;
      release(&bcache.lock);
      acquiresleep(&b->lock);
      return b;                                                                                                                                     }
  }
  panic("bget: no buffers");
}
struct buf*
bread(uint dev, uint blockno)
{
  struct buf *b;

  b = bget(dev, blockno);
  if(!b->valid) {
    virtio_disk_rw(b, 0);
    b->valid = 1;
  }
  return b;
}

里面可以发现一个奇怪函数:acquiresleep。对如果是获取b->lock,我们是可以理解的,因为这个缓存将会被返回,做下一步操作,所以要获取b->lock,那为什么要以这种奇怪的形式获取呢?因为有两点可以说明我们不能简单得获取自旋锁:

  1. 如果两个进程争用一把锁,而磁盘操作可能很耗时,一个进程获取了锁,需要用大量的时间来做磁盘操作,这时另一个进程会一直在那spin,尝试获取锁,这造成了CPU资源的浪费。

  2. 由于获取自旋锁的时候会关闭中断,这意味着如果我们只获取自旋锁,我们将收不到任何磁盘产生的中断信息,(磁盘也是设备,它与UART相似,会产生中断,虽然我不知道具体工作原理是什么),所以中断驱动程序就无法运行。

方案:大锁套小锁

一个进程等待这样一个事件的发生:大锁没有被锁住。如果大锁被锁住了(即该block cache的访问权限已经被别的进程抢走了),就sleep,直到那个进程释放大锁并wakeup其他进程。如果大锁没有被锁,置该锁为1(锁住状态),这样就消除了其他进程对该大锁的访问。由于不同进程对大锁的锁住与非锁住状态有并发访问,所以需要一个小锁来保护这个不变量。这与我在“sleep&wakeup”中讲到的sleep和wakeup思想是一致的。还有很绝妙的地方是,这么做恰好使得做磁盘操作时,中断处于打开状态,因为acquiresleep返回之前释放了小锁,同时大锁处于锁住状态。

下面来看看这种大锁套小锁——睡眠锁sleeplock:

struct sleeplock {
  uint locked;       // Is the lock held?
  struct spinlock lk; // spinlock protecting this sleep lock

  // For debugging:
  char *name;        // Name of lock.
  int pid;           // Process holding lock
};

上面说的大锁就是外层的sleeplock,小锁就是内层的spinlock。每个block cache中都有一个sleeplock。一个有意思的地方是,sleeplock中的locked成员并不是通过acquire或是说test_and_set来设置的,它仅仅是一个condition,而这个condition保证了避免race condition。

接下来是acquiresleep和releasesleep:

void
acquiresleep(struct sleeplock *lk)
{
  acquire(&lk->lk); // 保护大锁的不变量
  while (lk->locked) {
    sleep(lk, &lk->lk);
  }
  lk->locked = 1;
  lk->pid = myproc()->pid;
  release(&lk->lk);
}

void
releasesleep(struct sleeplock *lk)
{
  acquire(&lk->lk);
  lk->locked = 0;
  lk->pid = 0;
  wakeup(lk);
  release(&lk->lk);
}

标签:hash,struct,lock,Locks,bcache,Lab8,buf,addr
来源: https://www.cnblogs.com/123chen-jiahui/p/16479590.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有