原创

JUC的学习记录


JUC并发学习

JUC (java.util.concurrent) 普通线程代码:Thread Runnable:没有返回值、效率比Callable相对较低 java默认有 2 个线程 main、GC

什么是并发

​ 多线程操作同一个资源 ​ 单核CPU 快速切换线程,模拟出来多线程并发

什么是并行

​ 多个线程同时执行 ​ 多核CPU,多个线程可以同时执行 ​ 查看CPU多少核

	public static void main(String[] args) {
        //获取CPU核数
        System.out.println(Runtime.getRuntime().availableProcessors());
    }

线程的状态

新生:NEW
运行:RUNNABLE
阻塞:BLOCKED
等待:WAITING
超时等待:TIMED_WAITING
终止:TERMINATED

wait/sleep区别

​ 1、来自不同的类 ​ wait => Object ​ sleep => Thread ​ 2、关于锁的释放 ​ wait:会释放锁 ​ sleep:不会释放锁 ​ 3、使用的范围不同 ​ wait:必须在同步代码块中 ​ sleep:可以在任何地方使用 ​ 4、是否需要捕获异常 ​ wait:不需要捕获异常 ​ sleep:需要捕获异常(可能超时等待) ​

实现使用锁的方式

​ 线程是一个单独的资源来,没有任何的附属操作

Synchronized 传统的方式

例子:

public class Test01 {

    public static void main(String args[]) {
        //并发,操作同一个资源类,把资源类丢入线程
        Ticket01 ticket01 = new Ticket01();
        //FunctionalInterface 是函数式接口   lambda表达式 (方法参数)->{代码内容}
        new Thread(() -> {
            for (int i=0;i<100;i++){
                ticket01.sale();
            }
        }, "A").start();
    
        new Thread(() -> {
            for (int i=0;i<100;i++){
                ticket01.sale();
            }
        }, "B").start();
    
        new Thread(() -> {
            for (int i=0;i<100;i++){
                ticket01.sale();
            }
        }, "C").start();
    
    }

}
//资源类 oop
class Ticket01 {
    private int number = 100;
    //synchronized 本质就是队列、锁
    public synchronized void sale() {
        if (number > 0) {
            System.out.println(Thread.currentThread().getName() + "卖出了" + (number--) + "张--还剩下" + number);
        }
    }
}

Lock锁方式

ReentrantLock 可重用锁(常用的) 就是普通的锁 内有公平锁(FairSync)和非公平锁(NonfairSync) FairSync: 十分公平,先来后到 NonfairSync:十分不公平,可以插队(默认) ReentrantReadWriteLock.ReadLock 读锁 ReentrantReadWriteLock.WriteLock 写锁

lock的使用

​ 1、new ReentrantLock() ​ 2、lock.lock(); 加锁 ​ 3、try{业务代码} 执行业务代码 ​ 4、finally{lock.unlock();} 解锁

例子:

public class Test02 {

    public static void main(String args[]) {
        //并发,操作同一个资源类,把资源类丢入线程
        Ticket02 ticket02 = new Ticket02();
        //FunctionalInterface 是函数式接口   lambda表达式 (方法参数)->{代码内容}
        new Thread(() -> {
            for (int i=0;i<35;i++){
                ticket02.sale();
            }
        }, "A").start();
    
        new Thread(() -> {
            for (int i=0;i<35;i++){
                ticket02.sale();
            }
        }, "B").start();
    
        new Thread(() -> {
            for (int i=0;i<40;i++){
                ticket02.sale();
            }
        }, "C").start();
    
    }

}
//资源类 oop
class Ticket02 {
    private int number = 100;
    //常用的普通的锁
    Lock lock = new ReentrantLock();

    //synchronized 本质就是队列、锁
    public  void sale() {
        //用之前枷锁 lock.lock()
        lock.lock();
        try {
            //业务代码
            if (number > 0) {
                System.out.println(Thread.currentThread().getName() + "卖出了" + (number--) + "张--还剩下" + number);
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            //用完之后解锁  lock.unlock()
            lock.unlock();
        }
    }

}

Synchronized 和 Lock的区别

1.Synchronized是关键字
  Lock 是一个java类
2.Synchronized 是无法判断获取锁的状态
  Lock 可以判断是否获取到了锁
3.Synchronized会自动释放锁
  Lock 需要手动释放锁  如果不释放锁就会死锁
4.Synchronized 线程1获得锁,线程2就会等待,如果线程1阻塞,线程2还会一直等待
  Lock 就不一定会等待下去  执行lock.tryLock()  就会尝试获取锁
5.Synchronized 是可重入锁,使用不可以中断的非公平锁
  Lock 也是可重入锁,但是可以判断锁,可以自己设置是否使用公平锁
6.Synchronized 适合锁少量的代码同步问题
  Lock 适合锁大量的代码同步

生产者和消费者问题

Synchronized的生产者消费者问题

Synchronized:wait(等待)、notify(通知)

public class Test01 {
    public static void main(String[] args) {
        Data data = new Data();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "A").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "B").start();
    }
}
//资源类
//生产者消费者   判断等待、业务、通知
class Data {
    private int num = 0;

    //+1
    public synchronized void increment() throws InterruptedException {
        if (num != 0) {
            //等待
            this.wait();
        }
        num++;
        System.out.println(Thread.currentThread().getName() + "执行完毕--" + num);
        //通知其他线程
        this.notifyAll();
    }
    
    //-1
    public synchronized void decrement() throws InterruptedException {
        if (num == 0) {
            //等待
            this.wait();
        }
        num--;
        System.out.println(Thread.currentThread().getName() + "执行完毕--" + num);
        //通知其他线程
        this.notifyAll();
    }

}

如果是多个线程 就会出现虚假唤醒问题,防止虚假唤醒应该使用while判断,而不是if

JUC版的生产者消费者的问题

Lock:
	Lock lock = new ReentrantLock();
	Condition notFull  = lock.newCondition(); 
	Condition notEmpty = lock.newCondition(); 
	notFull.await();   	->	Synchronized是wait
	notEmpty.signal();	->	Synchronized是notify
public class Test02 {
    public static void main(String[] args) {

        Data02 data02 = new Data02();
    
        new Thread(()->{
            for (int i=0;i<10;i++){
                try {
                    data02.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();
        new Thread(()->{
            for (int i=0;i<10;i++){
                try {
                    data02.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B").start();
        new Thread(()->{
            for (int i=0;i<10;i++){
                try {
                    data02.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"C").start();
        new Thread(()->{
            for (int i=0;i<10;i++){
                try {
                    data02.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"D").start();
    }

}
class Data02 {
    private int num = 0;
    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();

    //+1
    public void increment() throws InterruptedException {
        lock.lock();
        try {
            //业务代码
            while (num != 0) {
                //等待
                condition.await();
            }
            num++;
            System.out.println(Thread.currentThread().getName() + "执行完毕--" + num);
            //通知其他线程
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
    
    //-1
    public void decrement() throws InterruptedException {
        lock.lock();
        try {
            //业务代码
            while (num == 0) {
                //等待
                condition.await();
            }
            num--;
            System.out.println(Thread.currentThread().getName() + "执行完毕--" + num);
            //通知其他线程
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

}	

以上不能准确的依次执行,以下代码能依次的执行 A->B->C->A

public class Test3 {
    public static void main(String[] args) {
        Date03 date03 = new Date03();
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                date03.A();
            }
        }, "A").start();
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                date03.B();
            }
        }, "B").start();
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                date03.C();
            }
        }, "C").start();
    }
}
class Date03 {
    private Lock lock = new ReentrantLock();
    private Condition condition1 = lock.newCondition();
    private Condition condition2 = lock.newCondition();
    private Condition condition3 = lock.newCondition();
    private int num = 1;

    public void A() {
        lock.lock();
        try {
            //业务代码  判断->执行->通知
            while (num != 1) {
                //等待
                condition1.await();
            }
            num = 2;
            System.out.println(Thread.currentThread().getName() + "--执行了");
            //唤醒指定的
            condition2.signal();    //唤醒condition2
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    
    public void B() {
        lock.lock();
        try {
            //业务代码
            while (num != 2) {
                condition2.await();
            }
            num = 3;
            System.out.println(Thread.currentThread().getName() + "--执行了");
            condition3.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    
    public void C() {
        lock.lock();
        try {
            //业务代码
            while (num != 3) {
                condition3.await();
            }
            num = 1;
            System.out.println(Thread.currentThread().getName() + "--执行了");
            condition1.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

}

小结

两种区别

​ new: 锁的 this 具体的一个

​ static:所得 Class 唯一的一个模板

集合类不安全

​ List:并发下 list 是不安全的

​ 解决方案

        1.List<String> list = new Victor<>();
        2.List<String> list = Collections.synchronizedList(new ArrayList<>());
        3.List<String> list = new CopyOnWriteArrayList<>();

​ Set:和list一样 不安全

​ 解决方案

        1.Set<String> set = Collections.synchronizedSet(new HashSet<>());
        2.Set<String> set = new CopyOnWriteArraySet<>();

​ HashSet的底层:

			public HashSet() {
				map = new HashMap<>();
			}

​ add: set的本质就是map的key,因为map的key是无法重复的,所以set就是无序的

			public boolean add(E e) {
				return map.put(e, PRESENT)==null;
			}

​ Map: 不安全

​ 多并发下map不安全解决方案

        1.Map<String, Object> map = Collections.synchronizedMap(new HashMap<>());
        2.Map<String, Object> map = new ConcurrentHashMap<>();

Callbale

  1. 多线程的第三种创建方式
  2. 可以有返回值
  3. 可以抛出异常
  4. 方法不同

有泛型,传入的泛型的参数是方法的返回值

例子:

	class MyThread02 implements Callable<String> {
		@Override
		public String call() throws Exception {
			return null;
		}
	}
	new Thread() 不能直接传入Callable  需要适配器中转:
	MyThread02 myThread02 = new MyThread02();
	//适配类
	FutureTask futureTask = new FutureTask(myThread02);
    //将 futureTask 放入Thread
    new Thread(futureTask,"A").start();
    //获取 Callable 的返回值
	String str = (String) futureTask.get();

细节:

  1. 有缓存
  2. 结果可能需要等待,会阻塞

常用辅助类

1、CountDownLatch 减法计数器

​ 允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助

​ 例子:

		public static void main(String[] args) throws InterruptedException {
			//总数是 6  有必须要执行任务的时候再使用
			CountDownLatch countDownLatch = new CountDownLatch(6);
			//每次有线程调用 countDown() 数量减1
			for (int i = 1; i <=6 ; i++) {
				new Thread(()->{
					System.out.println(Thread.currentThread().getName()+"--走了");
					countDownLatch.countDown(); // countDownLatch的数量减1
				},String.valueOf(i)).start();
			}
			//假设计数器变为0,countDownLatch.await() 就会被唤醒,然后继续执行
			countDownLatch.await();//等待计数器归零,然后再向下执行
			System.out.println("关门");
		}
		countDownLatch.countDown();  数量减1
		countDownLatch.await(); 等待计数器归零,然后再向下执行

2、CyclicBarrier 加法计数器

public static void main(String[] args) {
			CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
				System.out.println("收集了7颗龙珠,召唤神龙");
			});
			for (int i = 1; i <=7 ; i++) {
				final int temp = i;
				new Thread(()->{
					System.out.println(Thread.currentThread().getName()+"---已经收集了"+temp+"颗龙珠");
					try {
						cyclicBarrier.await();
					} catch (InterruptedException e) {
						e.printStackTrace();
					} catch (BrokenBarrierException e) {
						e.printStackTrace();
					}
				},String.valueOf(i)).start();
			}
		}

3、Semaphore 操作信号量

public static void main(String[] args) {
  // 默认线程数量  例子 停车位  限流
  Semaphore semaphore = new Semaphore(3);
  for (int i = 1; i <=6 ; i++) {
        new Thread(()->{
            //acquire() 得到
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName()+"---抢到了停车位");
                Thread.sleep(3);
                System.out.println(Thread.currentThread().getName()+"---离开了车位");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                //释放的操作得放到finally内
                //release() 释放
                semaphore.release();
            }
        },String.valueOf(i)).start();
    }

}

原理:acquire() 获取 假设如果已经满了,就会等待,等待被释放为止

​ release() 释放 将当前的信号量释放 +1 然后唤醒等待的线程

​ 多个共享资源互斥的使用。并发限流,控制最大的线程数

读写锁 ReadWriteLock

​ 读可以被多线程同时读

​ 写的时候只能有一个线程去写

​ 独占锁(写锁) 一次只能被一个线程占用

​ 共享锁(读锁) 多个线程可以同时占用

​ 读 - 读 可以共存

​ 读 - 写 不能共存

​ 写 - 写 不能共存

	ReadWriteLock.writelock().lock()  加写入锁
	ReadWriteLock.readlock().lock()  加读取锁
	ReadWriteLock.writelock().unlock()  释放写入锁
	ReadWriteLock.readlock().unlock()  释放读取锁
public class ReadWriteLockTest2 {
    public static void main(String[] args) {
        MyCache2 myCache2 = new MyCache2();
        //执行写入
        for (int i = 1; i <=5 ; i++) {
            final int temp = i;
            new Thread(()->{
                myCache2.put(temp+"",temp+"");
            },String.valueOf(i)).start();
        }
        //执行读取
        for (int i = 1; i <=5 ; i++) {
            final int temp = i;
            new Thread(()->{
                myCache2.get(temp+"");
            },String.valueOf(i)).start();
        }
    }

}
/**

 * 自定义缓存  有锁
   */
   class MyCache2 {
   private volatile Map<String, Object> map = new HashMap<>();
   //读写锁  更加细粒度的控制
   private ReadWriteLock readWriteLock =  new ReentrantReadWriteLock();

   //存 写的过程  写入的时候 只希望同时只有一个线程写
   public void put(String key,Object value) {
       //加写入锁
       readWriteLock.writeLock().lock();
       try {
           //业务代码
           System.out.println(Thread.currentThread().getName()+"---执行的写操作---key"+key);
           map.put(key,value);
           System.out.println(Thread.currentThread().getName()+"---写入完毕");
       } catch (Exception e) {
           e.printStackTrace();
       } finally {
           //释放写入锁
           readWriteLock.writeLock().unlock();
       }
   }

   //取 读的过程  所有的线程都可以同时进行读取
   public void get(String key) {
       //加锁
       readWriteLock.readLock().lock();
       try {
           //业务代码
           System.out.println(Thread.currentThread().getName()+"---执行的读操作---key"+key);
           Object value = map.get(key);
           System.out.println(Thread.currentThread().getName()+"---读取完毕---value"+value);
       } catch (Exception e) {
           e.printStackTrace();
       } finally {
           //释放锁
           readWriteLock.readLock().unlock();
       }

   }

}	

阻塞队列

FIFO 先进先出	

不得不阻塞:

​	写入阻塞情况:

​		队列满了,就必须阻塞等待其他数据被取走

​	取出阻塞情况:

​		如果队列是空的,就必须阻塞等待生产

 BlockingQueue<E>

​	父类:Collection <E>, 

​	重要实现类:ArrayBlockingQueue 、LinkedBlockingQueue 、 SynchronousQueue

​	Collection

​		List  Set  					Queue

​					Deque  	BlockingQueue	AbstractQueue

​					双端队列	阻塞队列		非阻塞队列

​	使用队列:

​		添加、移除

四组API

1.抛出异常

​ 添加 .add() 移除 .remove() 返回队首 .element()

public static void test1() {
    // ()写队列的大小
    ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
    System.out.println(blockingQueue.add("a"));
    System.out.println(blockingQueue.add("b"));
    System.out.println(blockingQueue.add("c"));
    // 添加的数据 超过队列大小  就会抛出异常
    //System.out.println(blockingQueue.add("d"));

    System.out.println(blockingQueue.remove());
    System.out.println(blockingQueue.remove());
    System.out.println(blockingQueue.remove());
    //此时队列里已经没有元素了  就会抛出异常
    //System.out.println(blockingQueue.remove());
}

2.不会抛出异常 有返回值

​ 添加 .offer() 移除 .poll() 返回队首 .peek()

public static void test2() {
    ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);

    System.out.println(blockingQueue.offer("a"));
    System.out.println(blockingQueue.offer("b"));
    System.out.println(blockingQueue.offer("c"));
    //不会抛出异常  返回false
    System.out.println(blockingQueue.offer("d"));

    System.out.println(blockingQueue.poll());
    System.out.println(blockingQueue.poll());
    System.out.println(blockingQueue.poll());
    //不会抛出异常  取不到值,返回null
    System.out.println(blockingQueue.poll());
}

3.阻塞等待

​ 添加 .put() 移除 .take()

public static void test3() throws InterruptedException {
        ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3);
        //没有返回值
        blockingQueue.put("a");
        blockingQueue.put("b");
        blockingQueue.put("c");
        //队列没有位置了  会一直等待  一直阻塞
        //blockingQueue.put("d");
        System.out.println(blockingQueue.take());
   	 	System.out.println(blockingQueue.take());
    	System.out.println(blockingQueue.take());
    	//当没有第四个元素  就会一直等待  一直阻塞
    	//System.out.println(blockingQueue.take());
}

4.超时等待

​ 添加:offer("",时间,秒还是...) 移除:poll(时间,秒还是...)

public static void test4() throws InterruptedException {
    ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3);
	blockingQueue.offer("a");
    blockingQueue.offer("b");
    blockingQueue.offer("c");
    //如果队列满了,等待两秒,如果超时了,就退出
    blockingQueue.offer("d", 2, TimeUnit.SECONDS);

    System.out.println(blockingQueue.poll());
    System.out.println(blockingQueue.poll());
    System.out.println(blockingQueue.poll());
    //取值,等待两秒如果取不到就退出
    System.out.println(blockingQueue.poll(2,TimeUnit.SECONDS));
}

同步队列 SynchronousQueue

​ 没有容量 ​ 添加进一个元素,必须得等取出来之后才能再往里面放一个元素 ​ 添加 .put() 移除 .take()

线程池

重点

好处:

  1. 降低资源的消耗
  2. 提高响应的速度
  3. 方便管理 线程复用、可以控制最大并发数、管理线程

三大方法

单个线程

ExecutorService executorService = Executors.newSingleThreadExecutor();
			此时使用线程就用 executorService.execute()
			executorService.execute(()->{
                System.out.println(Thread.currentThread().getName()+"-----");
            });
			}	

​ 关闭线程池 executorService。shutdown();

​ 此时最高有一个线程在执行 创建固定的线程池的大小

	ExecutorService executorService = Executors.newFixedThreadPool(5);
			executorService.execute(()->{
                System.out.println(Thread.currentThread().getName()+"-----");
            });
			}	

​ 此时并发线程执行数量是自己初始化创建的个数,例如此时是5个 可伸缩的

	ExecutorService executorService = Executors.newCachedThreadPool(); 
			for (int i = 1; i <=10 ; i++) {
            //使用线程池创建线程
            executorService.execute(()->{
                System.out.println(Thread.currentThread().getName()+"-----");
            });
			}	

​ 此时有10个线程同时执行

七大参数

根据这三大方法分析,调用线程执行的都是 ThreadPoolExecutor()

public ThreadPoolExecutor(int corePoolSize,		核心线程池大小
                              int maximumPoolSize,		最大核心线程池大小
                              long keepAliveTime,		超时了没有被调用就会释放
                              TimeUnit unit,			超时单位
                              BlockingQueue<Runnable> workQueue,	阻塞队列
                              ThreadFactory threadFactory,			线程工厂:创建线程的,一般不用动
                              RejectedExecutionHandler handler 拒绝策略 )  {
			if (corePoolSize < 0 ||
				maximumPoolSize <= 0 ||
				maximumPoolSize < corePoolSize ||
				keepAliveTime < 0)
				throw new IllegalArgumentException();
			if (workQueue == null || threadFactory == null || handler == null)
				throw new NullPointerException();
			this.corePoolSize = corePoolSize;
			this.maximumPoolSize = maximumPoolSize;
			this.workQueue = workQueue;
			this.keepAliveTime = unit.toNanos(keepAliveTime);
			this.threadFactory = threadFactory;
			this.handler = handler;
		}

四种拒绝策略

最大承载 workQueue(队列)+ maximumPoolSize(最大线程池)

  • ​ new ThreadPoolExecutor.AbortPolicy()

​ 默认的 超过最大承载就会抛出异常

  • ​ new ThreadPoolExecutor.CallerRunsPolicy()

    ​ 到达最大承载,返回任务,不会抛出异常

  • ​ new ThreadPoolExecutor.DiscardOldestPolicy()

    ​ 到达最大承载,丢掉超出的任务,不会抛出异常

  • ​ new ThreadPoolExecutor.DiscardPolicy()

    ​ 到达最大承载,尝试和最开始执行的竞争,也不会抛出异常

线程池小结

最大线程该如何定义:

  1. CPU密集型

    ​ CPU的核数 可以让多条线程同时执行,并行效率最高

    ​ Runtime.getRuntime().availableProcessors() 获取CPU核数

  2. IO密集型

    ​ 判断程序中十分耗IO的线程,只要大于这个数就好

四大函数式接口

函数式接口:只有一个方法接口

​ 简化编程模型,在新版本的框架底层大量应用

​ 例如: foreach(消费者类的函数式接口)

​ 有一个输入参数,有一个输出

​ 只要是函数式接口,就能用lambda 表达式简化

Function:
		函数式接口:
			public class Test01 {
				public static void main(String[] args) {
			//        Function function= new Function<String,String>() {
			//            @Override           //传入参数String  返回类型 String
			//            public String apply(String str) {
			//                return str;
			//            }
			//        };

					Function function = (str)->{
						return str;
					};
				}
			}
Predicate:
		断定型接口: 只有一个参数,返回boolean
		public class Test02 {
			public static void main(String[] args) {
				//Predicate<T>  只有一个输入参数  boolean test(T t);  返回值是固定的 boolean
				//断定型接口,有一个参数,返回值是boolean
				// 判断字符串是否为空
		//        new Predicate<String>() {
		//            @Override
		//            public boolean test(String str) {
		//                if(str.isEmpty()){
		//                    return true;
		//                }
		//                return false;
		//            }
		//        };
				Predicate<String> predicate= (str)->{
					return str.isEmpty();
				};
			}
		}
Consumer:消费型接口
		public class Test03 {
			public static void main(String[] args) {
				//Consumer<T> 消费型接口 只有输入参数  void accept(T t);  没有返回值
		//        Consumer<String> consumer = new Consumer<String>() {
		//            @Override
		//            public void accept(String str) {
		//                System.out.println(str);
		//            }
		//        };
				Consumer<String> consumer = (str)->{
					System.out.println(str);
				};
			}
		}
Supplier:供给型接口
		public class Test04 {
			public static void main(String[] args) {
				// 供给型接口 Supplier<T>   T get();  没有参数  只有返回值
		//        Supplier<String> supplier = new Supplier<String>() {
		//            @Override
		//            public String get() {
		//                return null;
		//            }
		//        };
				Supplier<String> supplier = ()->{
					return null;
				};
			}
		}

Stream流式计算

集合、MySQL本质就是存储东西的

计算都应该交给流来操作

​ java.util.stream

​ AutoCloseable , BaseStream <T, Stream >

public class Test {
		public static void main(String[] args) {
			User user1 = new User(1, "a", 21);
			User user2 = new User(2, "b", 22);
			User user3 = new User(3, "c", 23);
			User user4 = new User(4, "d", 24);
			User user5 = new User(6, "e", 25);

			List<User> list = Arrays.asList(user1, user2, user3, user4, user5);

			list.stream().filter(user -> {
				//所有id是偶数的
				return user.getId() % 2 == 0;
			}).filter(user -> {
				//所有年龄大于23的
				return user.getAge() > 23;
			}).map(user -> {
				//所有用户的名字转成大写
				return user.getName().toUpperCase();
			}).sorted((u1, u2) -> {
				//按照倒序排列
				return u2.compareTo(u1);
				//只输出一个用户名
			}).limit(1).forEach(System.out::println);
		}
	}

ForkJoin:

​ 并行执行任务,提高效率,大数据量

​ 特点:工作窃取

/**
 * @Classname Test
 * @Description TODO
 * @Date 2020/8/29 14:11
 * fork join 使用
 * 1.fork join通过forkjoinpool来执行
 * 2.新建计算任务 forkjoinpool.execute(ForkJoinTask task)
 * 3. 计算类继承 ForkJoinTask
 */
public class ForkJoinTest extends RecursiveTask<Long> {

    private Long start;
    private Long end;

    //临界值
    private Long temp = 10000L;

    public ForkJoinTest(Long start, Long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if ((end - start) < temp) {
            Long sum = 0L;
            for (Long i = start; i < end; i++) {
                sum += i;
            }
            System.out.println(sum);
            return sum;
        } else {
            //分支合并计算 fork join
            long middle = (start + end) / 2; //中间值
            ForkJoinTest task1 = new ForkJoinTest(start, middle);
            task1.fork();//拆分任务,把任务压入线程队列
            ForkJoinTest task2 = new ForkJoinTest(middle + 1, end);
            task2.fork();
            long l = task1.join()+task2.join();
            return l;
        }
    }
}	
public class Test {

    public static void main(String[] args) {
        test3();
    }
	//普通方法
    public static void test1() {
        Long sum = 0L;
        long start = System.currentTimeMillis();
        for (Long i = 0L; i <10_0000_0000 ; i++) {
            sum+=i;
        }
        long end = System.currentTimeMillis();
        System.out.println("sum--"+sum+"  时间"+(end - start));
    }
    //fork join
    public static void test2() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();

        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTest forkJoinTest = new ForkJoinTest(0L, 10_0000_0000L);
        ForkJoinTask<Long> submit = forkJoinPool.submit(forkJoinTest);
        Long sum = submit.get();

        long end = System.currentTimeMillis();
        System.out.println("sum--"+sum+"时间"+(end - start));
    }
    //Stream 并行流
    public static void test3() {
        long start = System.currentTimeMillis();

        long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);

        long end = System.currentTimeMillis();
        System.out.println("sum--"+sum+"时间"+(end - start));
    }
}

异步回调

Future设计初衷:对将来的某个时间结果进行建模

​ java.util.concurrent.CompletableFuture

异步执行
	CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(()->{
            return "1024";
        });
	completableFuture.whenComplete((t,u)->{
	成功回调
            System.out.println(t+"---"+u);    //t 是正常的返回结果 u 是错误的信息
        }).exceptionally((e)->{
	失败回调
            System.out.println(e.getMessage());
            return "失败了";
        });	

JMM

​ java内存模型,是不存在的东西,一种约定,一个概念

  1. 线程解锁前 必须把共享变量 立刻 刷回主存
  2. 线程加锁前 必须读取主存中的最新值到工作内存中
  3. 加锁和解锁是同一把锁

​ 线程:工作内存,主内存

​ 8种操作

	lock 锁定
	unlock 解锁
	read  读取
	load  载入
	use   使用
	assign 赋值
	store  存储
	write  写入

Volatile

是java虚拟机提供的 轻量级的同步机制

​ 1.保证可见性

	//不加 volatile 程序就会陷入死循环
	// 加了volatile 就保证了可见性
	private volatile static int num = 0;
	public static void main(String[] args) {

		new Thread(()->{
			while (num==0){

			}
		}).start();

		try {
			TimeUnit.SECONDS.sleep(2);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		num = 1;
		System.out.println(num);
	}

2.不保证原子性

​ 原子性:不可分割

​ 线程A在执行任务的时候,是不能被打扰的,也不能被分割

public class Test2 {
			// 加了 volatile 也没用,volatile不保证原子性
			//private static int num = 0;
			//可以把int改成原子性的int->AtomicInteger  这样就保证了原子性
			private volatile static AtomicInteger num = new AtomicInteger();
			//如果加了 synchronized 可保证原子性  结果会正常
			public static void add() {
				//num++;
				//原子性的int 自增 1 方法
				num.getAndIncrement();
			}

			public static void main(String[] args) {

				for (int i = 1; i <= 5; i++) {
					new Thread(() -> {
						for (int j = 1; j <= 500; j++) {
							add();
						}
					}).start();
				}

				while (Thread.activeCount() > 2) {
					Thread.yield(); //礼让一下
				}

				System.out.println(Thread.currentThread().getName() + "----" + num);
			}
		}
	原子性的类是Unsafe类

3.禁止指令重排

​ 指令重排:自己写的程序,计算机并不是按照自己写的那样去执行的

​ 源代码-->编译器优化的重排-->指令并行也可能会重排-->内存系统也会重排-->执行

volatile:

  1. 保证特定的操作的执行顺序
  2. 可以保证某些变量的内存可见性

Volatile是可以保证可见性,不能保证原子性,由于内存屏障,可以保证能够避免指令重排的现象产生

后端
Java

评论