java 多线程基础
基础知识
/**
* 实现多线程的时候:
* 1、需要继承Thread类
* 2、必须要重写run方法,指的是核心执行的逻辑
* 3、线程在启动的时候,不要直接调用run方法,而是要通过start()来进行调用
* 4、每次运行相同的代码,出来的结果可能不一样,原因在于多线程谁先抢占资源无法进行人为控制
* 第二种实现方式:使用了代理设计模式
* 1、实现Runnable接口
* 2、重写run方法
* 3、创建Thread对象,将刚刚创建好的runnable的子类实现作为thread的构造参数
* 4、通过thread.start()进行启动
* 两种实现方式哪种用的比较多
* 推荐使用第二种方式,
* 1、java是单继承,将继承关系留给最需要的类
* 2、使用runnable接口之后不需要给共享变量添加static关键字,每次创建一个对象,作为共享对象即可
* 线程的生命周期:
* 1、新生状态:
* 当创建好当前线程对象之后,没有启动之前(调用start方法之前)
* ThreadDemo thread = new ThreadDemo()
* RunnableDemo run = new RunnableDemo()
* 2、就绪状态:准备开始执行,并没有执行,表示调用start方法之后
* 当对应的线程创建完成,且调用start方法之后,所有的线程会添加到一个就绪队列中,所有的线程同时去抢占cpu的资源
* 3、运行状态:当当前进程获取到cpu资源之后,就绪队列中的所有线程会去抢占cpu的资源,谁先抢占到谁先执行,在执行的过程中就叫做运行状态
* 抢占到cpu资源,执行代码逻辑开始
* 4、死亡状态:当运行中的线程正常执行完所有的代码逻辑或者因为异常情况导致程序结束叫做死亡状态
* 进入的方式:
* 1、正常运行完成且结束
* 2、人为中断执行,比如使用stop方法
* 3、程序抛出未捕获的异常
* 5、阻塞状态:在程序运行过程中,发生某些异常情况,导致当前线程无法再顺利执行下去,此时会进入阻塞状态,进入阻塞状态的原因消除之后,
* 所有的阻塞队列会再次进入到就绪状态中,随机抢占cpu的资源,等待执行
* 进入的方式:
* sleep方法
* 等待io资源
* join方法(代码中执行的逻辑)
*
* 注意:
* 在多线程的时候,可以实现唤醒和等待的过程,但是唤醒和等待操作的对应不是thread类
* 而是我们设置的共享对象或者共享变量
* 多线程并发访问的时候回出现数据安全问题:
* 解决方式:
* 1、同步代码块
* synchronized(共享资源、共享对象,需要是object的子类){具体执行的代码块}
* 2、同步方法
* 将核心的代码逻辑定义成一个方法,使用synchronized关键字进行修饰,此时不需要指定共享对象
*
*/
//获取当前线程对象
Thread thread = Thread.currentThread();
//获取当前线程的名称
System.out.println(thread.getName());
//获取线程的id
System.out.println(thread.getId());
//获取线程的优先级,在一般系统中范围是0-10的值,如果没有经过设置的话,就是默认值5,有些系统是0-100
System.out.println(thread.getPriority());
//设置线程池的优先级
/*
* 优先级越高一定越先执行吗?
* 不一定,只是优先执行的概率比较大而已
* */
thread.setPriority(6);
System.out.println(thread.getPriority());
//判断线程是否在活动
System.out.println(thread.isAlive());
// Waits for this thread to die.该线程强制执行
thread.join();
//睡一秒
Thread.sleep(1000);
//暂停一次
Thread.yield();
JUC(TODO)
阻塞队列
BlockingQueue(阻塞队列)
阻塞队列提供了可阻塞的入队和出对操作,如果队列满了,入队操作将阻塞直到有空间可用,如果队列空了,出队操作将阻塞直到有元素可用;
BlockingQueue
//生产者
public class Producer implements Runnable {
private BlockingQueue<Integer> blockingQueue;
private static int element = 0;
public Producer(BlockingQueue<Integer> blockingQueue) {
this.blockingQueue = blockingQueue;
}
public void run() {
try {
while (element < 20) {
System.out.println("将要放进去的元素是:" + element);
blockingQueue.put(element++);
}
} catch (Exception e) {
System.out.println("生产者在等待空闲空间的时候被打断了!");
e.printStackTrace();
}
System.out.println("生产者已经终止了生产过程!");
}
}
//消费者
public class Consumer implements Runnable {
private BlockingQueue<Integer> blockingQueue;
public Consumer(BlockingQueue<Integer> blockingQueue) {
this.blockingQueue = blockingQueue;
}
public void run() {
try {
while (true) {
System.out.println("取出来的元素是:" + blockingQueue.take());
}
} catch (Exception e) {
System.out.println("消费者在等待新产品的时候被打断了!");
e.printStackTrace();
}
}
}
//测试类
public class MainClass {
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(3, true);
Producer producerPut = new Producer(blockingQueue);
Consumer consumer = new Consumer(blockingQueue);
new Thread(producerPut).start();
new Thread(consumer).start();
}
}
PriorityBlockingQueue
public class Task implements Comparable<Task> {
private int id;
private String name;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public int compareTo(Task task) {
return this.id > task.id ? 1 : (this.id < task.id ? -1 : 0);
}
public String toString() {
return this.id + "," + this.name;
}
public class UsePriorityBlockingQueue {
public static void main(String[] args) throws Exception {
PriorityBlockingQueue<Task> q = new PriorityBlockingQueue<Task>();
Task t1 = new Task();
t1.setId(3);
t1.setName("id为3");
Task t2 = new Task();
t2.setId(4);
t2.setName("id为4");
Task t3 = new Task();
t3.setId(1);
t3.setName("id为1");
q.add(t1); //3
q.add(t2); //4
q.add(t3); //1
System.out.println("容器:" + q);
System.out.println(q.take().getId());
System.out.println("容器:" + q);
}
}
Synchronousqueue
public class SynchronousQueueExample {
static class SynchronousQueueProducer implements Runnable {
protected BlockingQueue<String> blockingQueue;
final Random random = new Random();
public SynchronousQueueProducer(BlockingQueue<String> queue) {
this.blockingQueue = queue;
}
@Override
public void run() {
while (true) {
try {
String data = UUID.randomUUID().toString();
System.out.println("Put: " + data);
blockingQueue.put(data);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class SynchronousQueueConsumer implements Runnable {
protected BlockingQueue<String> blockingQueue;
public SynchronousQueueConsumer(BlockingQueue<String> queue) {
this.blockingQueue = queue;
}
@Override
public void run() {
while (true) {
try {
String data = blockingQueue.take();
System.out.println(Thread.currentThread().getName()
+ " take(): " + data);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
final BlockingQueue<String> synchronousQueue = new SynchronousQueue<String>();
SynchronousQueueProducer queueProducer = new SynchronousQueueProducer(
synchronousQueue);
new Thread(queueProducer).start();
SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer(
synchronousQueue);
new Thread(queueConsumer1).start();
SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer(
synchronousQueue);
new Thread(queueConsumer2).start();
}
}
DelayQueue
public class DelayQueueTest {
public static void main(String[] args) {
DelayQueue<DelayTask> queue = new DelayQueue<>();
queue.add(new DelayTask("1", 1000L, TimeUnit.MILLISECONDS));
queue.add(new DelayTask("2", 2000L, TimeUnit.MILLISECONDS));
queue.add(new DelayTask("3", 3000L, TimeUnit.MILLISECONDS));
System.out.println("queue put done");
while(!queue.isEmpty()) {
try {
DelayTask task = queue.take();
System.out.println(task.name + ":" + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class DelayTask implements Delayed {
public String name;
public Long delayTime;
public TimeUnit delayTimeUnit;
public Long executeTime;//ms
DelayTask(String name, long delayTime, TimeUnit delayTimeUnit) {
this.name = name;
this.delayTime = delayTime;
this.delayTimeUnit = delayTimeUnit;
this.executeTime = System.currentTimeMillis() + delayTimeUnit.toMillis(delayTime);
}
@Override
public int compareTo(Delayed o) {
if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
return 1;
}else if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
return -1;
}
return 0;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
}
线程池
生命周期
- RUNNING 能接受提交的任务,并且能处理阻塞队列中的任务
- SHUTDOWN 关闭状态,不再接受新的提交任务,但可以处理阻塞队列中以保存的任务
- STOP 不能接受新的任务,也不处理队列中的任务,会中断正在处理任务的线程
- TIDYING 如果所有的任务都终止了,wokerCount(有效线程数)为 0,线程池进入该状态后会调用 terminated()方法进入 TERMINATED 状态
- TERMINATED 在 terminated()方法执行完后进入该状态,默认 terminated()方法中什么也没有做
常用方法
//需要一个用来多线程执行的类
public class Task implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" running");
}
}
//CacheThreadPoolDemo
public class CacheThreadPoolDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for(int i = 0;i<20;i++){
executorService.execute(new Task());
}
executorService.shutdown();
}
}
//FixedThreadPool
public class FixedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0 ;i<20;i++){
executorService.execute(new Task());
}
executorService.shutdown();
}
}
//ScheduledThreadPool
public class ScheduledThreadPoolDemo {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
System.out.println(System.currentTimeMillis());
scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
System.out.println("延迟三秒执行");
System.out.println(System.currentTimeMillis());
}
},3, TimeUnit.SECONDS);
scheduledExecutorService.shutdown();
}
}
//ScheduledThreadPool 周期执行
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
System.out.println(System.currentTimeMillis());
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("1------延迟一秒执行,每三秒执行一次");
System.out.println(System.currentTimeMillis());
}
},1,3, TimeUnit.SECONDS);
}
}
//SingleThreadPool
public class SingleThreadPoolDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for(int i = 0;i<20;i++){
executorService.execute(new Task());
}
executorService.shutdown();
}
}
ForkJoinPool 利用的是分而治之这个思想
//ForkJoinPool 随机数相加
public class ForJoinPollTask {
public static void main(String[] args) throws Exception {
int[] arr = new int[100];
Random random = new Random();
int total =0;
//初始化100个数组元素
for(int i=0,len = arr.length;i<len;i++){
int temp = random.nextInt(20);
//对数组元素赋值,并将数组元素的值添加到sum总和中
total += (arr[i]=temp);
}
System.out.println("初始化数组总和:"+total);
SumTask task = new SumTask(arr, 0, arr.length);
// 创建一个通用池,这个是jdk1.8提供的功能
ForkJoinPool pool = ForkJoinPool.commonPool();
Future<Integer> future = pool.submit(task); //提交分解的SumTask 任务
System.out.println("多线程执行结果:"+future.get());
pool.shutdown(); //关闭线程池
}
}
class SumTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 20; //每个小任务 最多只累加20个数
private int arry[];
private int start;
private int end;
/**
* Creates a new instance of SumTask.
* 累加从start到end的arry数组
* @param arry
* @param start
* @param end
*/
public SumTask(int[] arry, int start, int end) {
super();
this.arry = arry;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum =0;
//当end与start之间的差小于threshold时,开始进行实际的累加
if(end - start <THRESHOLD){
for(int i= start;i<end;i++){
System.out.println(Thread.currentThread().getName()+"的i值:"+arry[i]);
sum += arry[i];
}
return sum;
}else {//当end与start之间的差大于threshold,即要累加的数超过20个时候,将大任务分解成小任务
int middle = (start+ end)/2;
SumTask left = new SumTask(arry, start, middle);
SumTask right = new SumTask(arry, middle, end);
//并行执行两个 小任务
left.fork();
right.fork();
//把两个小任务累加的结果合并起来
return left.join()+right.join();
}
}
}
构造方法的详细说明(TODO)
拒绝策略
ThreadPoolExecutor.AbortPolicy
丢弃任务并跑出 RejectedExecutionException 异常
ThreadPoolExecutor.DiscardPolicy
丢弃任务,但不跑出异常
ThreadPoolExecutor.DiscardOldestPolicy
丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy
由调用线程处理该任务