Java Reinforcement 07 并发基础知识

作者 柚爸

并发的基础理论在看过CSAPP之后都不是事了, Java 使用的是在一个进程中的多线程技术, 而不是多任务操作系统中还可以使用的多进程系统. 这是为了让程序可以一次编写多次运行.

当然, 有着这样的限制, 也就不能像系统编程一样fork一个新进程来执行程序了.

  1. 任务
  2. 启动多线程任务
  3. 使用Executor
  4. 从任务中返回值
  5. 后台线程(守护线程)
  6. join

任务

一个线程就对应一个任务, 线程也是系统调用, 所以必须抽象成一个任务.

任务在Java 中必须实现Runnable 接口, 然后编写run()方法, run()方法内部就是想让这个线程去做的事情. 注意这里说的是类, 每一个实例化出来的对象, 都是一个独立的任务.

package thinkinginjava.learn.chapter21;

public class LiftOff implements Runnable {

    protected int countDown = 10;
    private static int taskCount = 0;
    private final int id = taskCount++;

    public LiftOff() {

    }

    public LiftOff(int countDown) {
        this.countDown = countDown;
    }

    public String status() {
        return "#" + id + "(" + (countDown > 0 ? countDown : "LiftOff!") + ").";
    }

    @Override
    public void run() {
        while (countDown-- > 0) {
            System.out.println(status());
            Thread.yield();
        }
    }
}

这里有几个常用技术: 用private final int id = taskCount++;来标识不同的对象.

通常run()会写成循环, 或者从run中返回, 由于run不能附带参数, 所以可以让其使用类中的内容.

Thread.yield()指的是对线程调度器的建议, 建议调度器让自己休眠, 让其他线程执行.

如果直接实例化一个对象然后调用run()方法, 这和调用一个普通方法一样, 不会有多线程效果:

public static void main(String[] args) {
    LiftOff liftOff = new LiftOff();

    liftOff.run();
}

这只是一个普通的顺序方法调用.

启动多线程任务

将Runnable 对象提交给一个Thread类, 才是启动了多线程任务, 方法如下:

public class BasicThreads {

    public static void main(String[] args) {
        Thread t = new Thread(new LiftOff());
        t.start();
        System.out.println("Waiting for LiftOff");

    }
}

Thread类的构造器需要一个Runnable对象, 然后会调用其中的run()方法执行任务, 此时就是在一个新的线程中执行任务了. 从上边这个输出可以看到, Waiting for LiftOff输出在任务过程结束之前.

这说明t.start()不会阻塞, 后边的打印语句立刻就跟着执行了.

使用Executor

java.util.concurrent 并发包中有一个Executor,可以用来方便的管理任务, 而不像上上边这样需要自己创建线程对象. 语义也更加直接:

public class CachedThreadPool {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();

        for (int i = 0; i < 100; i++) {
            executorService.execute(new LiftOff());
        }

        //阻止新任务提交
        executorService.shutdown();

    }
}

还可以指定线程池的数量:

ExecutorService executorService = Executors.newFixedThreadPool(10);

当然也有固定的一个线程的任务, 叫做Executors.newSingleThreadExecutor(), 如果用这个来执行上边的程序, 实际就是顺序执行所有程序,不再有并发效果, 这个固定一个线程的任务经常用来执行一些后台的工作, 而且很显然, 这个线程也无需同步锁.

从任务中返回值

run方法无参数而且必定没有返回值. 如果需要返回值, 就不需要实现Runnable, 而是实现Callable接口:

public class CallableDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();

        ArrayList<Future<String>> arrayList = new ArrayList<>();

        for (int i = 0; i < 1000; i++) {
            arrayList.add(executorService.submit(new TaskWithResult(i)));
        }

        for (Future<String> fs : arrayList) {
            try{
                System.out.println(fs.get());

            } catch (InterruptedException e) {
                e.printStackTrace();
                return;
            } catch (ExecutionException e) {
                e.printStackTrace();
            } finally {
                executorService.shutdown();
            }
        }
    }

}


class TaskWithResult implements Callable<String> {

    private int id;

    @Override
    public String call() throws Exception {
        return "result of TaskWithResult: " + System.currentTimeMillis() % 59;
    }

    public TaskWithResult(int i) {
        this.id = i;
    }

}

这个是套路用法, 外边要套一个参数化的Future对象, 然后使用get()从其中来获取结果, 如果获取不到, 会阻塞.

线程休眠可以使用Thread.sleep(), 异常是不能跨线程传播的.

后台线程(守护线程)

后台线程在所有前台线程执行完毕的时候, 就会被操作系统给砍了(整个进程也砍了). 要使用后台线程, 在 调用.start()之前, 将那个线程设置成后台:

Thread t = new Thread(new Task());
t.setDaemon(true);
t.start();

join

类似于CSAPP里底层的join , 当前线程会等待执行了join的线程结束再返回. 如果带上一个时间参数, 也可以在到时间之后返回, 如果要打断某个线程的sleep, 可以调用 interrupt() 方法, 这个时候需要try-catch子句.

public class Joining {
    public static void main(String[] args) {
        Sleeper sleepy = new Sleeper("Sleepy", 1500);
        Sleeper grumpy = new Sleeper("Grumpy", 1500);

        Joiner
//                dopey去等待sleepy, doc去等待Grumpy
                dopey = new Joiner("Dopey", sleepy),
                doc = new Joiner("Doc", grumpy);
        //打断睡眠中的Grumpy, 由于睡眠之后没有语句, 打断结束线程会结束, 等待该线程的线程也会结束
        grumpy.interrupt();

    }

}


class Sleeper extends Thread {
    private int duration;

    public Sleeper(String name, int sleepTime) {
        super(name);
        duration = sleepTime;
        start();
    }

    public void run() {
        try {
            sleep(duration);

        } catch (InterruptedException e) {
            System.out.println(getName() + " was interrupted." + "isInterrupted(): " + isInterrupted());
            return;
        }
        System.out.println(getName() + "has awakened");
    }
}

class Joiner extends Thread {
    private Sleeper sleeper;

    public Joiner(String name, Sleeper sleeper) {
        super(name);
        this.sleeper = sleeper;
        //开始运行此线程, 虚拟机会调用run()方法
        start();
    }

    public void run() {
        try {
            sleeper.join();
        } catch (InterruptedException e) {
            System.out.println("Interrupted");

        }
        System.out.println(getName() + " join completed");
    }
}