Java Reinforcement 08 并发中的资源竞争

作者 柚爸

CSAPP里讲到并发的时候需要关注程序执行的逻辑流, 如果逻辑流有冲突, 就会造成不可预知的结果, 如果要操作资源, 要先定义好哪一片是临界区域, 在临界区域开头和结尾采取同步方式.

这次就要来看看Java里对于这个是如何使用的.

先记住Java 编程思想的 同步规则:

如果你正在写一个变量, 它可能接下来被另外一个线程读取, 或者正在读取一个上一次以及被另外一个线程写过的变量, 那么必须使用同步, 并且, 读写线程都必须用相同的监视器锁同步.

  1. 多线程竞争
  2. Lock对象
  3. 原子性 与 可视性
  4. 原子类
  5. 临界区
  6. 分隔存储

多线程竞争

Java编程思想这里给出了一个简单的测试条件, 即一个发生器, 不断的产生偶数(我们的意图), 然后用一个东西去检查这个发生器, 如果当前的数字是偶数, 就继续检查 .next()生成的数字.

按照我们的意图, 一个偶数的发生器肯定要一直产生偶数下去, 如果失败了, 就说明有问题. 用到的几个类如下. 这里为了解耦, 使用了一个抽象类:

//抽象类, 核心是next()方法, 以及一个标志该发生器对象是否出了问题的canceled布尔值
public abstract class IntGenerator {

    private volatile boolean canceled = false;

    public abstract int next();

    public void cancel() {
        canceled = true;
    }

    public boolean isCanceled() {
        return canceled;
    }

}
//偶数发生器
public class EvenGenerator extends IntGenerator {

    private int currentEvenValue = 0;

    //这个方法意图在于每次调用的时候, 产生0, 2, 4.....等偶数
    @Override
    public int next() {
        //自增1
        ++currentEvenValue;

        //自增1
        ++currentEvenValue;
        return currentEvenValue;
    }

}

最后是一个偶数检查器:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EvenChecker implements Runnable {

    private IntGenerator intGenerator;
    private final int id;

    public EvenChecker(IntGenerator intGenerator, int ident) {
        this.intGenerator = intGenerator;
        id = ident;
    }

    @Override
    public void run() {
        while (!intGenerator.isCanceled()) {
            int val = intGenerator.next();
            if (val % 2 != 0) {
                System.out.println(val + " not even!");
                intGenerator.cancel();
            }
        }
    }

    public static void test(IntGenerator gp, int count) {
        System.out.println("Press Ctrl+C to exit.");
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < count; i++) {
            executorService.execute(new EvenChecker(gp, i));
        }
        executorService.shutdown();
    }

    public static void test(IntGenerator gp) {
        test(gp, 10);
    }
}

这个偶数检查器按照我们的意图, 如果全部都是偶数, 应该会一直运行下去, 不会退出. 注意红色的部分, 使用了线程池.

运行之后会发现, 可能一开始不会出错, 但最终必然会出错, 这是因为自增不是原子性操作, 是两条指令. 在两个自增之间如果加上Thread.yield(), 会更早的出错, 因为那时候强制线程挂起.

在CSAPP里已经知道有两种办法解决这个问题, 一是每个线程操作互相不冲突的数据区域, 二是可以使用互斥锁, 也就是一个特殊的整型变量来保护临界区, 也就是两次自增的代码, 以保证两次自增完成之后.

针对上述代码的修改, 我们可以看到是10个线程对一个对象同时进行操作, 要保护的临界区域实际上就是两次自增代码, 必须一次性自增2才可以, 中间不允许打断. 当然, 实际的共享数据区域并不是next()方法, 而是currentEvenValue变量所在的内存区域. 只不过这个区域只能通过.next()方法修改, 所以.next()方法就是临界区域.

Java 针对锁, 对每个对象都提供了单一的锁, 称为监视器. 在一个对象上调用 synchronized 方法, 在这个线程从方法返回之前, 其他要调用类中的方法的线程都必须阻塞.

所以可以在方法上声明 synchronized :

    @Override
    public synchronized int next() {
        ++currentEvenValue;

        //即使加上这个会导致早点出现失败的语句, 也不会再失败了.
        Thread.yield();

        ++currentEvenValue;
1
        return currentEvenValue;
    }

注意, 在使用并发的时候, 一定要将域设置为private 然后通过方法访问, 这样才能起到好的效果.

锁的个数还可以累计, 如果一个线程拿到了锁, 然后调用了另外一个需要锁的方法, 那么锁的数量会累加, 每退出一个 synchronized 锁的数量会减少1. 只有获得锁的对象才能继续获得锁.

注意, 锁是跟着监视器对象走的, 如果多个线程操作同一个对象, 那么监视器对象就要设置成这个对象. 如果是对一个类的静态变量进行操作, 监视器对象就要设置成这个类才行.

此外注意, 如果操作共享变量的方法多与一个, 那必须全部 synchronized 才可以, 否则还是会出现问题.

synchronized 是隐式的锁, 还有显式的锁, 使用起来更加灵活, 显式的锁其实有点像信号量啊感觉.

Lock对象

显式的Lock对象包含在 java.util.concurrent.locks 中. 必须被显式的创建, 获取和释放.

用显示锁来重写上边的例子:

public class MutexEvenGenerator extends IntGenerator {

    private int currentEvenValue = 0;

    //可重入锁对象
    private Lock lock = new ReentrantLock();

    @Override
    public synchronized int next() {
        //上锁
        lock.lock();
        //必须要使用try语句, 以保证临界区执行完毕之后会解锁
        try {

            ++currentEvenValue;
            ++currentEvenValue;

        } finally {
        //finally子句中取消锁, 保证即使出现异常也恢复到正常状态
            lock.unlock();
        }
        return currentEvenValue;
    }

    public static void main(String[] args) {
        EvenChecker.test(new MutexEvenGenerator());
    }

}

显式的锁的好处是更加灵活, 而且可以尝试获取锁失败, 这是sync语句做不到的. lock.trylock()返回一个布尔值, 表示是否成功获取锁. 如果返回true, 则暗含已经lock.lock(), 之后在finally子句中 lock.unlock()即可:

trylock方法还支持传入时间参加的重载方式, 可以指定时间:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class AttempLocking {

    private ReentrantLock lock = new ReentrantLock();

    public void untimed() {
        //尝试获取锁
        boolean captured = lock.tryLock();

        try {
            System.out.println("tryLock():" + captured);

        }finally {
            if (captured) {
                lock.unlock();
            }
        }
    }

    public void timed() {
        boolean captured = false;

        try {
            captured = lock.tryLock(3, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        try{
            System.out.println("tryLock(2secons)");
        }
        finally {
            if (captured) {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        final AttempLocking al = new AttempLocking();
        //在main线程里上锁, 然后解锁, 正常
        al.untimed();
        al.timed();
        System.out.println("-------------");

        //匿名类直接启动一个新线程, 上锁
        new Thread(){
            {
                setDaemon(true);}

            public void run() {

                al.lock.lock();

                System.out.println("acquired");
            }
        }.start();
        //之后将控制权交给其他线程
        Thread.yield();

        //再启动一个线程调用timed()方法
        new Thread(){
            {
                setDaemon(true);}

            public void run() {

                al.timed();

            }
        }.start();
        Thread.yield();

        //再启动一个线程调用 untimed()方法
        new Thread(){
            {
                setDaemon(true);}

            public void run() {

                al.untimed();

            }
        }.start();
        Thread.yield();
    }
}

在一个线程里加上锁之后, 另外一个线程就拿不到锁了. 如果尝试超过时间, 就会返回结果.

原子性 与 可视性

知道了并发的基本概念和锁的基本概念, 就可以来了解原子性了, 如果一个操作是原子性的, 那么就没有必要对这个操作进行加锁.

但问题是如何控制原子性. JVM的原子性操作是:

读取和写入除了long和double之外的基本类型变量, 可以保证其当做不可分离的操作(底层指令就一条,执行了就改变了). long和double因为会被当成两个32位处理, 因此在指令中很可能不是一步.

使用volatile定义long和double变量, 也可以获得原子性.

可视性则是另外一个概念, 即一个线程的操作结果是否可以被其他线程看到. 在CSAPP中, 经常可以看到计算出的值被保存在寄存器中, 还没有写入内存中, 此时线程上下文被操作系统保存, 另外一个线程如果从内存中获取数据, 数据依然是没有改变过的数据. 如果线程需要的是改变之后的数据, 就会出错.

volatile 是确保可视性的声明, 也即读写内存的操作, 只要使用volatile标识的变量, 其值被写入之后, 一定会被下一个读取操作读出改变之后的值.

原子性和可视性是互相分离的. 一个方法中的临时变量可以是原子的, 但未必要对其他线程可视. 作为共享数据的变量如果有可视需求, 则必须声明为可视, 否则每个线程未必会及时更新该变量.

同步也会导致主存刷新, 如果一个临界区域由synchronized方法保护或者声明了锁, 则无需声明volatile.

所以使用volatile的唯一安全的情况, 就是类中只有一个可变的域, 同时被多个线程访问, 其中至少一个线程是写入的.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AtomicityTest implements Runnable {

    private int i = 0;

    public int getI() {
        return i;
    }

    private synchronized void evenIncrement() {
        i++;
        Thread.yield();
        i++;
    }

    @Override
    public void run() {
        while (true) {
            evenIncrement();
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();

        AtomicityTest atomicityTest = new AtomicityTest();
        //启动无限自增2的任务
        executorService.execute(atomicityTest);

        //新启动一个线程不断去读i的值, 读取到奇数就打印并终止
        new Thread(() -> {
            while (true) {
                int val = atomicityTest.getI();
                if (val % 2 != 0) {
                    System.out.println(val);
                    System.exit(0);
                }
            }
        }).start();

    }
}

看上去private synchronized void evenIncrement()这里似乎是没有问题的, 则每次产生的i应该都是偶数. 但实际上会发生取出一个奇数的问题.

实际上把这个例子与之前的例子比较一下就知道, getI()方法中的 return i 虽然是原子的, 但没有被sync所保护, 因此读取的i未必是正好自增的i, 可能是处于自增过程中的奇数的i.

这个问题还有可视性的问题, 到底i是否被写入了内存, 在运行的过程中不得而知. 这个例子需要把getI()方法也修改成同步的, 当然把i声明为volatile更好, 但是两个都使用了sync保护, 就没有必要了.

原子类

原子类是在基本类型外边包裹了一层, 会被解释成机器级的原子性操作, 比如改写一下之前的自增2的类:

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntegerTest implements Runnable {

    private AtomicInteger i = new AtomicInteger(0);

    public int getValue() {
        return i.get();
    }

    private void evenIncrement() {
        i.addAndGet(2);
    }

    @Override
    public void run() {
        while (true) {
            evenIncrement();
        }
    }
}

这其中的 evenIncrement() 全部都是操作AtomicInteger类, 没有其他的操作, 则可以将 evenIncrement() 方法看成是原子的, 无需使用 synchronized 来修饰.

原子类还有AtomicLong, AtomicReference 等, 只要记住有这个类就可以了. 但是由于原子类的使用比较严格, 因此使用的还是比较少的.

临界区

学过CSAPP知道, 临界区才是通用情况,保护部分代码, Java能够给方法加上synchronized修饰是语法便利.

实际上可以在方法内部采用 synchronized(Object){} 来为一段代码块标识为临界区.

Object是对象监视器, 也就是带有锁的对象, 由于多线程竞争都是在读写同一个对象, 通常可以设置成this, 如果明确的知道某个对象, 可以设置成那个对象.

如果知道一个非线程安全的类, 但是想要改写成线程安全的类, 在CSAPP中实际上是套一个函数, 在使用该功能的外部加上锁. 在Java 中也一样, 可以让一个对象持有非线程安全的对象, 然后套壳加锁.

如果 synchronized(Object){} 中的Object 是不同的, 则这些线程不会互相影响.

public class SyncObject {
    public static void main(String[] args) {
        final DualSynch dualSynch = new DualSynch();

        //起一个新线程执行f()
        new Thread() {
            @Override
            public void run() {
                dualSynch.f();
            }
        }.start();

        //Main线程中执行g()
        dualSynch.g();

    }
}

class DualSynch {
    private Object syncObject = new Object();

    //默认的synchronized采用本身对象
    public synchronized void f() {
        for (int i = 0; i < 5; i++) {
            System.out.println("f()");
            Thread.yield();
        }
    }

    //这个使用了另外一个对象监视器
    public void g() {
        synchronized (syncObject) {
            for (int i = 0; i < 5; i++) {
                System.out.println("g()");
                Thread.yield();
            }
        }
    }

}

注意其中的红色部分, 由于对象监视器不同, 所以两个函数虽然在同一个对象里, 也都使用也 synchroized, 但是实际上没有影响.

如果将syncObject改成this, 输出就会有所变化, f() 和 g() 一个执行完才会执行另外一个.

分割存储

如果让不同的线程操作不同的内存空间, 那就不会有竞争问题了. 可以使用一个ThreadLocal对象, 使用get()和set()来获取值, 返回和设置的都是与当前线程相关的值, 而不是被所有线程共享的值.

这个实际上感觉就是一个封了一层的语法糖.

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class Accessor implements Runnable {

    private final int id;

    public Accessor(int idn) {
        id = idn;
    }

    @Override
    public void run() {
        ThreadLocalVariableHolder.set(new Random().nextInt(100));
        while (!Thread.currentThread().isInterrupted()) {
            ThreadLocalVariableHolder.increment();
            System.out.println(this);
            Thread.yield();
        }
    }

    @Override
    public String toString() {
        return "#" + id + ": " + ThreadLocalVariableHolder.get();
    }
}

public class ThreadLocalVariableHolder {

    //一般ThreadLocal对象可以做成一个静态的,这里创建一个int类型的ThreadLocal对象
    private static ThreadLocal<Integer> value = new ThreadLocal<>();

    //自增1然后设置新的值, 这些值由于都互相独立, 所以根本不需要同步
    public static void increment() {
        value.set(value.get() + 1);
    }

    public static void set(int i) {
        value.set(i);
    }


    //获取值
    public static int get() {
        return value.get();
    }

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newCachedThreadPool();

        //启动10个线程, 自己读写自己的变量,互相不会冲突
        for (int i = 0; i < 3; i++) {
            executorService.submit(new Accessor(i));
        }
        //main线程3秒之后退出, 可以看到各个线程自增的数字互相不影响
        TimeUnit.MILLISECONDS.sleep(300);
        executorService.shutdown();
        System.exit(0);
    }
}

这段程序实际上是验证一个ThreadLocal对象可以同时为多个线程提供存取服务, 如果每个线程创建一个新对象, 那就不是竞争资源了.