public class SumArray { private static class SumTask extends RecursiveTask<Integer>{
private final static int THRESHOLD = MakeArray.ARRAY_LENGTH/10; private int[] src; //表示我们要实际统计的数组 private int fromIndex;//开始统计的下标 private int toIndex;//统计到哪里结束的下标
public SumTask(int[] src, int fromIndex, int toIndex) { this.src = src; this.fromIndex = fromIndex; this.toIndex = toIndex; }
@Override protected Integer compute() { if(toIndex-fromIndex < THRESHOLD) { int count = 0; for(int i=fromIndex;i<=toIndex;i++) { //SleepTools.ms(1); count = count + src[i]; } return count; }else { //fromIndex....mid....toIndex //1...................70....100 int mid = (fromIndex+toIndex)/2; SumTask left = new SumTask(src,fromIndex,mid); SumTask right = new SumTask(src,mid+1,toIndex); invokeAll(left,right); // left.fork(); // right.fork(); return left.join()+right.join(); } } }
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool(); int[] src = MakeArray.makeArray();
SumTask innerFind = new SumTask(src,0,src.length-1);
long start = System.currentTimeMillis();
pool.invoke(innerFind);//同步调用 System.out.println("Task is Running.....");
System.out.println("The count is "+innerFind.join() +" spend time:"+(System.currentTimeMillis()-start)+"ms");
/**
 * Runs the given task and hands back its result once the task has completed.
 * An unchecked Exception or Error raised by the computation is rethrown as
 * the outcome of this call. Such rethrown exceptions behave like regular
 * exceptions but, where possible, carry stack traces (as shown for example by
 * {@code ex.printStackTrace()}) of both the current thread and the thread that
 * actually hit the exception; at minimum only the latter.
 *
 * @param task the task
 * @param <T> the type of the task's result
 * @return the task's result
 * @throws NullPointerException if the task is null
 * @throws RejectedExecutionException if the task cannot be
 *         scheduled for execution
 */
public <T> T invoke(ForkJoinTask<T> task) {
    if (task != null) {
        externalSubmit(task);
        // Block until the submitted task finishes, then return its result.
        return task.join();
    }
    throw new NullPointerException();
}
/**
 * Hands a ForkJoinTask over for execution and returns without joining it.
 *
 * @param task the task to submit
 * @param <T> the type of the task's result
 * @return the task
 * @throws NullPointerException if the task is null
 * @throws RejectedExecutionException if the task cannot be
 *         scheduled for execution
 */
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
    final ForkJoinTask<T> submitted = externalSubmit(task);
    return submitted;
}
下面是execute的实现:
/**
 * Schedules the given task for (asynchronous) execution; nothing is returned
 * and the task is not joined here.
 *
 * @param task the task
 * @throws NullPointerException if the task is null
 * @throws RejectedExecutionException if the task cannot be
 *         scheduled for execution
 */
public void execute(ForkJoinTask<?> task) {
    // Submission only: the caller does not wait for the task.
    externalSubmit(task);
}
public class UseCountDownLatch { static CountDownLatch latch = new CountDownLatch(6);
// 初始化线程(只有一步,有4个) private static class InitThread implements Runnable{
@Override public void run() { System.out.println("Thread_"+Thread.currentThread().getId() +" finish init work......"); latch.countDown();//初始化线程完成工作了,countDown方法只扣减一次; // We can add some tasks after the countDown is invoked for(int i =0;i<2;i++) { System.out.println("Thread_"+Thread.currentThread().getId() +" ........continue do its work"); } } } // 业务线程 private static class BusinessThread implements Runnable{ @Override public void run() { try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } for(int i =0;i<3;i++) { System.out.println("BusinessThread_"+Thread.currentThread().getId() +" start to do business-----"); } } }
public static void main(String[] args) throws InterruptedException { // 单独的初始化线程,初始化分为2步,需要扣减两次 new Thread(new Runnable() { @Override public void run() { SleepTools.ms(1); System.out.println("Thread_"+Thread.currentThread().getId() +" finish init work step 1st......"); latch.countDown();//每完成一步初始化工作,扣减一次 System.out.println("begin step 2nd......."); SleepTools.ms(1); System.out.println("Thread_"+Thread.currentThread().getId() +" finish init work step 2nd......"); latch.countDown();//每完成一步初始化工作,扣减一次 } }).start(); new Thread(new BusinessThread()).start(); // Start 3 new init thread for(int i=0;i<=3;i++){ Thread thread = new Thread(new InitThread()); thread.start(); }
latch.await(); System.out.println("Main do ites work........"); } }
/**
 * Demonstrates {@link CyclicBarrier}: five worker threads each record a result,
 * meet at the barrier, and a barrier action prints the collected results before
 * the workers carry on.
 */
public class UseCyclicBarrier {

    // Barrier for five parties; the barrier action runs once all five have arrived.
    private static CyclicBarrier barrier =
            new CyclicBarrier(5, new TaskAfterBarrierIsOpenThread());

    // Holds each worker's result, keyed by the worker's thread id as a string.
    private static ConcurrentHashMap<String, Long> resultMap = new ConcurrentHashMap<>();

    /**
     * Starts the five worker threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(new SubThread());
            thread.start();
        }
    }

    /** Barrier action: prints every recorded result once the barrier opens. */
    private static class TaskAfterBarrierIsOpenThread implements Runnable {

        @Override
        public void run() {
            StringBuilder result = new StringBuilder();
            for (Long value : resultMap.values()) {
                result.append('[').append(value).append(']');
            }
            System.out.println(" the result = " + result);
            System.out.println("do other business........");
        }
    }

    /** Worker: records its thread id as its result, then waits at the barrier. */
    private static class SubThread implements Runnable {

        @Override
        public void run() {
            long id = Thread.currentThread().getId(); // the thread's own "result"
            resultMap.put(Thread.currentThread().getId() + "", id);
            Random r = new Random(); // randomly decides whether this worker sleeps first
            try {
                if (r.nextBoolean()) {
                    Thread.sleep(2000 + id);
                    System.out.println("Thread_" + id + " ....do something ");
                }
                System.out.println(id + "....is await");
                barrier.await();
                Thread.sleep(1000 + id);
                System.out.println("Thread_" + id + " ....do its business ");
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever owns this thread.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (java.util.concurrent.BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}
/**
 * Demonstrates {@link Exchanger}: two threads each fill a set, swap sets at the
 * exchange point, and print what they received from the other thread.
 */
public class UseExchange {

    private static final Exchanger<Set<String>> exchange = new Exchanger<Set<String>>();

    /**
     * Starts the two exchanging threads; the first offers "1", "2", "3" and the
     * second offers "A", "B", "C".
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // First thread
        new Thread(new Runnable() {
            @Override
            public void run() {
                fillAndExchange("1", "2", "3");
            }
        }).start();

        // Second thread
        new Thread(new Runnable() {
            @Override
            public void run() {
                fillAndExchange("A", "B", "C");
            }
        }).start();
    }

    /**
     * Adds the given items to a fresh set (logging each one), exchanges that set
     * with the partner thread and prints every element of the set received.
     *
     * @param items the values this thread contributes to the exchange
     */
    private static void fillAndExchange(String... items) {
        Set<String> data = new HashSet<String>(); // container for this thread's data
        try {
            for (String item : items) {
                data.add(item);
                System.out.println(Thread.currentThread().getName()
                        + " Add " + item + " to the set");
            }
            // Hand our set over and receive the partner's set in return.
            Set<String> received = exchange.exchange(data);
            // Process the data obtained from the exchange.
            System.out.println(Thread.currentThread().getName()
                    + " print the data after exchange ");
            received.forEach(string -> System.out.println(
                    Thread.currentThread().getName() + " print" + string));
        } catch (InterruptedException e) {
            // Do not lose the interruption; restore the flag for the thread's owner.
            Thread.currentThread().interrupt();
        }
    }
}