ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Semaphore使用及源码分析

2022-05-16 12:01:04  阅读:221  来源: 互联网

标签:分析 return Thread int current 源码 Semaphore new final


  • 简介
控制并发数量
使用场景:接口限流
  • 案例1
import java.util.concurrent.Semaphore;

public class SemaphoreDemo {

    public static void main(String[] args) {
        // 指定个数
        Semaphore semaphore = new Semaphore(8);

        // 创建10个线程
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                try {
                    semaphore.acquire();            // 获得信号后才能执行
                    System.out.println(Thread.currentThread().getName() + "开始执行");
                    Thread.sleep(5000L);        // 模拟该线程要执行复杂的操作
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();            // 释放信号
                }
            }).start();
        }
    }
    
}

# 控制台打印结果:最后线程8和线程9要过一会儿才打印,这是因为这2个线程被限制了,要等其他线程释放信号,才开始执行
Thread-0开始执行
Thread-6开始执行
Thread-2开始执行
Thread-7开始执行
Thread-5开始执行
Thread-4开始执行
Thread-1开始执行
Thread-3开始执行
Thread-9开始执行
Thread-8开始执行

Process finished with exit code 0
  • 源码分析
# 查看Semaphore类
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);      // 当我们执行信号量个数时,其实是new 的非公平实现
    }

# 查看NonfairSync方法
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);        // 调用super方法
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

# 查看super方法
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);      // 设置状态,上面传入的参数是8,则这里的状态是8
        }

        final int getPermits() {
            return getState();
        }

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

# 查看acquire方法
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

# 查看acquireSharedInterruptibly方法
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())          // 线程中断时,抛出异常
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)        // 执行tryAcquireShared方法
            doAcquireSharedInterruptibly(arg);
    }

# 查看tryAcquireShared方法
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

# 查看实现类

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {        // 执行死循环    
                int available = getState();      // 获取状态,即是个数
                int remaining = available - acquires;  // 总个数 - 1,acquires表示每次传入的1
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))      // 判断是否小于0,返回remaining
                    return remaining;
            }
        }

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

# 查看release方法
    public void release() {
        sync.releaseShared(1);
    }

# 查看releaseShared
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

# 查看tryReleaseShared方法
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

# 查看实现 

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();      // 获取状态,即是总个数
                int next = current + releases;    // 当前总个数 + 每次releases的值,每次releases的值为1;相当于他已经释放了,其他线程可以去获取
                if (next < current) // overflow      // 如果结果数小于之前总个数,抛出异常
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

标签:分析,return,Thread,int,current,源码,Semaphore,new,final
来源: https://www.cnblogs.com/chniny/p/16276451.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有