# CompletableFuture

# 1. ์ž๋ฐ” Concurrent ํ”„๋กœ๊ทธ๋ž˜๋ฐ ์†Œ๊ฐœ

# 1-1. Concurrent ์†Œํ”„ํŠธ์›จ์–ด

๐Ÿ’ก ๋™์‹œ์— ์—ฌ๋Ÿฌ ์ž‘์—…์„ ํ•  ์ˆ˜ ์žˆ๋Š” ์†Œํ”„ํŠธ์›จ์–ด

# ๋™์‹œ์„ฑ๊ณผ ๋ณ‘๋ ฌ์„ฑ์˜ ์ฐจ์ด

  • ๋™์‹œ์„ฑ
    • ํ•˜๋‚˜์˜ ์“ฐ๋ ˆ๋“œ๋‚˜ ํ”„๋กœ์„ธ์„œ์˜ ์—ฌ๋Ÿฌ Task๋ฅผ ๊ด€๋ฆฌํ•จ์œผ๋กœ์จ, ๋งˆ์น˜ ๋™์‹œ์— ์ž‘์—…์ด ์ˆ˜ํ–‰๋˜๊ณ  ์žˆ๋Š” ๊ฒƒ์ฒ˜๋Ÿผ ๋ณด์ด๋Š” ๊ฒƒ
  • ๋ณ‘๋ ฌ์„ฑ
    • Task์˜ ์‹คํ–‰์„ ํ•˜๋“œ์›จ์–ด ์ˆ˜์ค€์œผ๋กœ ์‹คํ–‰๋˜๋ฉฐ, ๊ฐ๊ฐ์˜ ์ž‘์—…๋“ค์ด ๋…๋ฆฝ์ ์ž„

# 1-1-1. ์ž๋ฐ”์—์„œ ์ง€์›ํ•˜๋Š” Concurrent ํ”„๋กœ๊ทธ๋ž˜๋ฐ

  • ๋ฉ€ํ‹ฐ์“ฐ๋ ˆ๋“œ
  • ๋ฉ€ํ‹ฐํ”„๋กœ์„ธ์‹ฑ(ProcessBuilder)
๋ฒ„์ „ ์‚ฌ์šฉ ๋ฐฉ๋ฒ•
Java 5 ์ด์ „ Runnable๊ณผ Thread๋ฅผ ์ด์šฉํ•˜์—ฌ ๊ตฌํ˜„
Java 5 ExecutorService, Callable, Future
Java 7 Fork/Join ๊ทธ๋ฆฌ๊ณ  RecursiveTask
Java 8 Stream, CompletableFuture
Java 9 ๋ถ„์‚ฐ ๋น„๋™๊ธฐ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์€ ๋ช…์‹œ์ ์œผ๋กœ ์ง€์› (๋ฐœํ–‰ ๊ตฌ๋… ํ”„๋กœํ† ์ฝœ ์ง€์› Flow API)

# 1-1-2. ์ž๋ฐ” ๋ฉ€ํ‹ฐ์“ฐ๋ ˆ๋“œ ํ”„๋กœ๊ทธ๋ž˜๋ฐ

  • Thread/Runnable

    public class App {
        public static void main(String[] args) {
    				// ์ฒซ๋ฒˆ์งธ ๋ฐฉ๋ฒ•
            MyThread myThread = new MyThread();
            myThread.start();
    
            // Anonymous class
            Thread runnable = new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("Runnable Thread: " + Thread.currentThread().getName());
                }
            });
    
            // Lambda Expression
            Thread lambdaThread = new Thread(() -> System.out.println("Lambda Thread:" + Thread.currentThread().getName()));
    
            System.out.println("Hello: "+Thread.currentThread().getName());
        }
    
        // Thread ์ƒ์†
        static class MyThread extends Thread {
            @Override
            public void run() {
                System.out.println("Thread: "+Thread.currentThread().getName());
            }
        }
    }
    
    /* ์‹คํ–‰๊ฒฐ๊ณผ
    Hello: main
    Thread: Thread-0
    */
    
    • ์ฝ”๋“œ์˜ ์‹คํ–‰ ์ˆœ์„œ๋งŒ ๋ด์„œ๋Š” Thread๊ฐ€ ๋จผ์ € ์ถœ๋ ฅ๋˜์•ผ ํ•  ๊ฒƒ ๊ฐ™์ง€๋งŒ, ์‹ค์ œ๋กœ ์‹คํ–‰ํ•ด๋ณด๋ฉด ๋‹ค๋ฅด๊ฒŒ ์ถœ๋ ฅ๋  ๋•Œ๋„ ์žˆ๋‹ค.
    • ์ด๋ฅผ ํ†ตํ•ด Thread๋Š” ์ˆœ์„œ๋ฅผ ๋ณด์žฅํ•˜์ง€ ์•Š๋Š”๋‹ค๋Š” ๊ฒƒ์„ ์•Œ ์ˆ˜ ์žˆ๋‹ค.

์—ฌ๊ธฐ์„  ๋กœ์ปฌํด๋ž˜์Šค๋ฅผ ์ด์šฉํ–ˆ์ง€๋งŒ, ์ต๋ช…ํด๋ž˜์Šค์™€ ๋žŒ๋‹คํ‘œํ˜„์‹์„ ์ด์šฉํ•ด์„œ๋„ ์ ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

# 1-1-3. ์ฃผ์š”๊ธฐ๋Šฅ(method)

  1. sleep(mills)

    • ํ˜„์žฌ ์“ฐ๋ ˆ๋“œ ์žฌ์šฐ๊ธฐ(๋ฉˆ์ถฐ๋‘๊ธฐ)
      • ์Šค๋ ˆ๋“œ๋ฅผ ๋Œ€๊ธฐ์ƒํƒœ๋กœ ๋ฉˆ์ถฐ์„œ ๋‹ค๋ฅธ ์Šค๋ ˆ๋“œ๊ฐ€ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋„๋ก ํ•จ.
      • ํ•˜์ง€๋งŒ ๋ฝ์„ ๋†”์ฃผ์ง„ ์•Š๊ธฐ์— ์ž˜๋ชปํ•˜๋ฉด ๋ฐ๋“œ๋ฝ ์ƒํƒœ์— ๊ฑธ๋ฆด ์ˆ˜ ์žˆ๋‹ค.
    public static void main(String[] args) throws InterruptedException {
        // Lambda Expression
        Thread lambdaThread = new Thread(() -> {
            try {
                Thread.sleep(1000L);
            } catch(InterruptedException e){
                System.out.println("interrupted!");
                return;
            }
            System.out.println("Thread: "+Thread.currentThread().getName());
        });
        lambdaThread.start();
    
        System.out.println("Hello: "+Thread.currentThread().getName());
    }
    

    Thread.sleep(1000L)

    • Thread๋ฅผ startํ•˜๋ฉด 1์ดˆ(1000L)๋™์•ˆ ๋ฉˆ์ถฐ์žˆ๊ณ  ๊ทธ ๋™์•ˆ ๋‹ค๋ฅธ ์“ฐ๋ ˆ๋“œ๋ฅผ ์ˆ˜ํ–‰ํ•˜๊ธฐ ๋•Œ๋ฌธ์— Hello๊ฐ€ ํ•ญ์ƒ ์šฐ์„  ์ถœ๋ ฅ๋œ๋‹ค.
  2. interrupt()

    • ๋‹ค๋ฅธ ์“ฐ๋ ˆ๋“œ๋ฅผ ๊นจ์šฐ๊ธฐ
      • InterruptException ์„ ๋ฐœ์ƒ
      • ์ด ์—๋Ÿฌ์— ๋Œ€ํ•œ ํ•ธ๋“ค๋ง์€ ๊ตฌํ˜„ ๊ฐ€๋Šฅ
    public static void main(String[] args) throws InterruptedException {
        // Lambda Expression
        Thread lambdaThread = new Thread(() -> {
            try {
                Thread.sleep(3000L);
            } catch(InterruptedException e){
                System.out.println("interrupted!");
                return;
            }
            System.out.println("Thread: "+Thread.currentThread().getName());
        });
        lambdaThread.start();
    
        lambdaThread.interrupt();
        System.out.println("Hello: "+Thread.currentThread().getName());
    }
    

    lambdaThread.interrupt();

    • lambdaThread์— interrupt()๋ฉ”์†Œ๋“œ๋ฅผ ํ˜ธ์ถœํ•ด lambdaThread๋‚ด์— InterruptedException ์„ ๋ฐœ์ƒ์‹œํ‚จ๋‹ค.
  3. join()

    • ๋‹ค๋ฅธ ์“ฐ๋ ˆ๋“œ๊ฐ€ ๋๋‚  ๋•Œ๊นŒ์ง€ ๊ธฐ๋‹ค๋ฆฐ๋‹ค.
    public static void main(String[] args) throws InterruptedException {
        //Lambda Expression
        Thread lambdaThread = new Thread(() -> {
            try {
                Thread.sleep(3000L);
            } catch(InterruptedException e){
                System.out.println("interrupted!");
                return;
            }
            System.out.println("Thread: "+Thread.currentThread().getName());
        });
        lambdaThread.start();
    
        lambdaThread.join();
        System.out.println("Hello: "+Thread.currentThread().getName());
    }
    

    lambdaThread.join();

    • lambdaThread์— join()๋ฉ”์†Œ๋“œ๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ lambdaThread๊ฐ€ ์ข…๋ฃŒ๋  ๋•Œ๊นŒ์ง€ ๊ธฐ๋‹ค๋ฆฐ๋‹ค.

# 2. Executors

Runnable์ด๋‚˜ Thread์™€ ๊ฐ™์€ Low-level์ด ์•„๋‹Œ ๊ณ  ์ˆ˜์ค€(High-Level) Concurrency ํ”„๋กœ๊ทธ๋ž˜๋ฐ

์šฐ๋ฆฌ๊ฐ€ Runnable๋งŒ ์ •์˜ํ•ด์„œ ์ œ๊ณตํ•ด์ฃผ๋ฉด ์Šค๋ ˆ๋“œ๋ฅผ ๋งŒ๋“ค๊ณ , ๋ถˆํ•„์š”ํ•ด์ง€๋ฉด ์ข…๋ฃŒํ•˜๊ณ  ๊ด€๋ฆฌํ•ด์ฃผ๋Š” ์ž‘์—…๋“ค์„ ๋Œ€์‹  ํ•ด์ฃผ๋Š” ํด๋ž˜์Šค

# 2-1. Executors๊ฐ€ ํ•˜๋Š” ์ผ

  • ์“ฐ๋ ˆ๋“œ ๋งŒ๋“ค๊ธฐ: ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์ด ์‚ฌ์šฉํ•  ์“ฐ๋ ˆ๋“œ ํ’€์„ ๋งŒ๋“ค์–ด๊ด€๋ฆฌํ•œ๋‹ค.
  • ์“ฐ๋ ˆ๋“œ ๊ด€๋ฆฌ: ์“ฐ๋ ˆ๋“œ ์ƒ๋ช… ์ฃผ๊ธฐ๋ฅผ ๊ด€๋ฆฌํ•œ๋‹ค.
  • ์ž‘์—… ์ฒ˜๋ฆฌ ๋ฐ ์‹คํ–‰: ์“ฐ๋ ˆ๋“œ๋กœ ์‹คํ–‰ํ•  ์ž‘์—…์„ ์ œ๊ณตํ•  ์ˆ˜ ์žˆ๋Š” API๋ฅผ ์ œ๊ณตํ•œ๋‹ค.

# 2-2. ์ฃผ์š” ์ธํ„ฐํŽ˜์ด์Šค

  • Executor: execute(Runnable)
  • ExecutorService: Executor๋ฅผ ์ƒ์† ๋ฐ›์€ ์ธํ„ฐํŽ˜์ด์Šค๋กœ, Callable๋„ ์‹คํ–‰ ๊ฐ€๋Šฅํ•˜๋ฉฐ Executor๋ฅผ ์ข…๋ฃŒ ์‹œํ‚ค๊ฑฐ๋‚˜ ์—ฌ๋Ÿฌ Callable์„ ๋™์‹œ์— ์‹คํ–‰ํ•˜๋Š” ๋“ฑ์˜ ๊ธฐ๋Šฅ์„ ์ œ๊ณตํ•œ๋‹ค.
  • ScheduledExecutorService: ExecutorService๋ฅผ ์ƒ์† ๋ฐ›์€ ์ธํ„ฐํŽ˜์ด์Šค๋กœ ํŠน์ • ์‹œ๊ฐ„ ์ดํ›„์— ๋˜๋Š” ์ฃผ๊ธฐ์ ์œผ๋กœ ์ž‘์—… ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ๋‹ค.

# 2-3. ์˜ˆ์ œ

# 2-3-1. ๊ธฐ๋ณธ ์‚ฌ์šฉ ์˜ˆ์ œ

public static void main(String[] args) throws InterruptedException {
		// ExecutorService ์ƒ์„ฑ
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    
    // Legacy case
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            System.out.println("Thread: "+Thread.currentThread().getName());
        }
    });
    // Lambda Expression
    executorService.submit(()->{
        System.out.println("Lambda Expression Thread: "+Thread.currentThread().getName());
    });

    executorService.shutdown(); // graceful shutdown
    // executorService.shutdownNow(); //์ฆ‰์‹œ ์ข…๋ฃŒ
}

# 2-3-2. 2๊ฐœ์˜ Thread๋ฅผ ์ด์šฉํ•˜์—ฌ ์‹คํ–‰

public class App {
    private static Runnable getRunnable(String message) {
        return () -> System.out.println(message + Thread.currentThread().getName());
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        executorService.submit(getRunnable("Hello"));
        executorService.submit(getRunnable("World"));
        executorService.submit(getRunnable("The"));
        executorService.submit(getRunnable("Java"));
        executorService.submit(getRunnable("Lecture"));
        executorService.submit(getRunnable("Concurrent"));
        executorService.submit(getRunnable("Part"));

        executorService.shutdown(); //graceful shutdown
    }
}
/* ์‹คํ–‰๊ฒฐ๊ณผ
Hellopool-1-thread-1
Worldpool-1-thread-2
Javapool-1-thread-2
Thepool-1-thread-1
Lecturepool-1-thread-2
Concurrentpool-1-thread-1
Partpool-1-thread-2
*/

Executors.newFixedThreadPool(2)

  • ํ•ด๋‹น ๋ฉ”์†Œ๋“œ๋ฅผ ํ˜ธ์ถœํ•˜๋ฉด ํ•ด๋‹น ์˜์—ญ์—๋Š” ์ธ์ž๊ฐ’์œผ๋กœ ๋„˜๊ฒจ์ค€ ์ˆซ์ž๋งŒํผ Thread๋ฅผ ๊ด€๋ฆฌํ•œ๋‹ค.
  • ์œ„ ์ฝ”๋“œ์—์„œ๋Š” 2๋ฅผ ์ธ์ž๊ฐ’์œผ๋กœ ๋„˜๊ฒจ์คฌ๊ธฐ ๋•Œ๋ฌธ์— 2๊ฐœ์˜ 2๊ฐœ์˜ ์“ฐ๋ ˆ๋“œ๋ฅผ ๊ด€๋ฆฌํ•˜๋Š” Thread Pool์ด ๋„๋Š” ๋™์•ˆ Blocking Queue์— ๋“ฑ๋ก๋œ ์ž‘์—…๋“ค์ด ์ฐจ๋ก€๋Œ€๋กœ ๋™์ž‘ํ•œ๋‹ค.

์ถœ์ฒ˜ : https://catsbi.oopy.io/50d5af24-a724-40a1-981b-a9f00ff521ad#638f51da-ae38-4574-baf5-d4f331ee6fbe (opens new window)

ExecutorService.newSingleThreadScheduledExcutor();

ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleAtFixedRate(getRunnable("hello"), 3, 1, TimeUnit.SECONDS);
  • scheduleAtFixedRate(์‹คํ–‰ Runnable, ์‹œ์ž‘ ์ง€์—ฐ ์‹œ๊ฐ„, ๋”œ๋ ˆ์ด, ํŒŒ๋ผ๋ฏธํ„ฐ ์‹œ๊ฐ„ ๋‹จ์œ„)
  • ์œ„ ์ฝ”๋“œ๋Š” Runnableํƒ€์ž…์„ ๋ฐ˜ํ™˜ํ•˜๋Š” getRunnable() ๋ฉ”์†Œ๋“œ๋ฅผ ํ”„๋กœ๊ทธ๋žจ์ด ์‹œ์ž‘ ํ›„ 3์ดˆ ๋’ค๋ถ€ํ„ฐ 1์ดˆ๋งˆ๋‹ค ์ˆ˜ํ–‰ํ•˜๋Š” ์ฝ”๋“œ

# 2-4. Executor ์ •๋ฆฌ

  • supplyAsync ๋“ฑ์˜ ๋ฉ”์†Œ๋“œ ํ˜ธ์ถœ์‹œ ์“ฐ๋ ˆ๋“œ ํ’€์„ ๋ช…์‹œํ•˜์ง€ ์•Š์œผ๋ฉด Java ForkJoinPool (opens new window) ์˜ commonPool() ์ด ์‚ฌ์šฉ๋œ๋‹ค.
  • ๊ฐœ๋ฐœ์ž๊ฐ€ ์“ฐ๋ ˆ๋“œ ํ’€์„ ์ œ์–ดํ•  ์ˆ˜ ์—†๋‹ค๋Š” ๊ฒƒ์€ ๋‚˜์ค‘์— ๋ฌธ์ œ๊ฐ€ ๋  ์ˆ˜ ์žˆ๋‹ค.
  • ๋”ฐ๋ผ์„œ, ํ•ญ์ƒ Java ExecutorService (opens new window) ๋ฅผ ๋ช…์‹œ์ ์œผ๋กœ ์‚ฌ์šฉํ•˜์—ฌ ์“ฐ๋ ˆ๋“œ ํ’€์„ ์ง€์ •ํ•˜๋„๋ก ํ•œ๋‹ค.

# 3. Callable๊ณผ Future

# 3-1. Callable

  • Runnable๊ณผ ๊ฑฐ์˜ ์œ ์‚ฌํ•˜์ง€๋งŒ ๋ฐ˜ํ™˜ ๊ฐ’์„ ๊ฐ€์งˆ ์ˆ˜ ์žˆ๋‹ค.

# 3-2. Future

  • ๋น„๋™๊ธฐ์ ์ธ ์ž‘์—…์˜ ํ˜„์žฌ ์ƒํƒœ๋ฅผ ์กฐํšŒํ•˜๊ฑฐ๋‚˜ ๊ฒฐ๊ณผ๋ฅผ ๊ฐ€์ ธ์˜ฌ ์ˆ˜ ์žˆ๋‹ค.

# 3-3. ๋ฉ”์†Œ๋“œ ์‚ดํŽด๋ณด๊ธฐ

# 3-3-1. ๊ฒฐ๊ณผ ๋ฐ˜ํ™˜ : get()

  • ํ•ด๋‹น ๋ฉ”์†Œ๋“œ๋Š” ๋ธ”๋กํ‚น ์ฝœ์ด๊ธฐ์— ๋ฉ”์†Œ๋“œ ํ˜ธ์ถœ์‹œ์ ๋ถ€ํ„ฐ ์ฝ”๋“œ์‹คํ–‰ ์™„๋ฃŒ๊นŒ์ง€ ๊ธฐ๋‹ค๋ฆฐ๋‹ค.
  • ํƒ€์ž„์•„์›ƒ์„ ์„ค์ •ํ•  ์ˆ˜ ์žˆ๋‹ค.
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () ->{
    Thread.sleep(2000L);
    return "Hello";
};
Future<String> submit = executorService.submit(hello);
System.out.println("Started!");

submit.get();// blocking

System.out.println("End");
executorService.shutdown();

/*
Started!
(2์ดˆ ๋’ค)
End
*/

# 3-3-2. ์ž‘์—… ์ƒํƒœ ํ™•์ธ - isDone()

ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () ->{
    Thread.sleep(2000L);
    return "Hello";
};
Future<String> helloFuture = executorService.submit(hello);
System.out.println(helloFuture.isDone());
System.out.println("Started!");

helloFuture.get(); // blocking

System.out.println(helloFuture.isDone());
System.out.println("End");
executorService.shutdown();

/* ์‹คํ–‰ ๊ฒฐ๊ณผ
false
Started!
true
End
*/

# 3-3-3. ์ž‘์—… ์ทจ์†Œ : cancel()

  • ์ธ์ž ๊ฐ’์œผ๋กœ ํ˜„์žฌ ์ง„ํ–‰์ค‘์ธ ์“ฐ๋ ˆ๋“œ interrupt ์—ฌ๋ถ€๋ฅผ ๊ฒฐ์ •ํ•œ๋‹ค.
  • true ์ด๋ฉด ํ˜„์žฌ ์ง„ํ–‰์ค‘์ธ ์“ฐ๋ ˆ๋“œ๋ฅผ interruptํ•˜๊ณ  ๊ทธ๋ ‡์ง€ ์•Š์œผ๋ฉด ํ˜„์žฌ ์ง„ํ–‰์ค‘์ธ ์ž‘์—…์ด ๋๋‚ ๋•Œ๊นŒ์ง€ ๊ธฐ๋‹ค๋ฆฐ๋‹ค.
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () ->{
    Thread.sleep(2000L);
    return "Hello";
};
Future<String> helloFuture = executorService.submit(hello);
System.out.println(helloFuture.isDone());
System.out.println("Started!");

helloFuture.cancel(false);

System.out.println(helloFuture.isDone());
System.out.println("End");

helloFuture.get();
executorService.shutdown();

/* ์‹คํ–‰ ๊ฒฐ๊ณผ
false
Started!
true
End
Exception in thread "main" java.util.concurrent.CancellationException...
*/

helloFuture.cancel(false)

  • ํ˜„์žฌ ์ง„ํ–‰์ค‘์ธ ์ž‘์—…์„ ๊ธฐ๋‹ค๋ฆฐ ๋’ค ์ž‘์—…์„ ์ทจ์†Œํ•œ๋‹ค.
  • ์ž‘์—…์ด ์ทจ์†Œ๋˜์–ด ์ข…๋ฃŒ๋˜์—ˆ๊ธฐ ๋•Œ๋ฌธ์— ์•„๋ž˜์— helloFuture.isDone()์€ true๊ฐ€ ๋ฐ˜ํ™˜๋˜๋ฉฐ, ์ด๋ฏธ ์ทจ์†Œํ•œ ์ž‘์—…์„ get() ํ˜ธ์ถœํ•˜๋Š” ์‹œ์ ์—๋Š” CancellationException ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒ

# 3-3-4. ์—ฌ๋Ÿฌ ์ž‘์—… ๋™์‹œ ์‹คํ–‰ : invokeAll()

ExecutorService executorService = Executors.newSingleThreadExecutor();
LocalDateTime start = LocalDateTime.now();
Callable<String> hello = () ->{
    Thread.sleep(2000L);
    return "Hello";
};
Callable<String> java = () ->{
    Thread.sleep(3000L);
    return "java";
};
Callable<String> youngjun = () ->{
    Thread.sleep(1000L);
    return "youngjun";
};

List<Future<String>> futures = 
        executorService.invokeAll(Arrays.asList(hello, java, youngjun));
for (Future<String> future : futures) {
    System.out.println(future.get());
}
LocalDateTime end = LocalDateTime.now();
Duration between = Duration.between(start, end);
System.out.println(between.getSeconds());

/*
Hello
java
youngjun
6
*/
  • invokeAll() ๋ฉ”์†Œ๋“œ๋Š” ํƒœ์Šคํฌ๊ฐ€ ๋ชจ๋‘ ๋๋‚ ๋•Œ๊นŒ์ง€ ๊ธฐ๋‹ค๋ ธ๋‹ค๊ฐ€ ๊ฐ’๋“ค์„ ๋ฐ˜ํ™˜

    • ์‹ฑ๊ธ€ ์“ฐ๋ ˆ๋“œ์ด๊ธฐ ๋•Œ๋ฌธ์— 6์ดˆ๊ฐ€ ์†Œ์š”๋œ๋‹ค.
    • future list๋ฅผ ๋ฐ˜ํ™˜ํ•˜๊ณ  ์ „๋ถ€ ๋๋‚ ๋•Œ๊นŒ์ง€ holding๋œ๋‹ค.
    • ๋„˜๊ฒจ์ค€ Callable list๊ฐ€ ์ •์ƒ ์ฒ˜๋ฆฌ๋˜๋“  exception์ด ๋ฐœ์ƒํ•˜๋“  ์™„๋ฃŒ๋œ ๊ฒƒ์œผ๋กœ ๋ณธ๋‹ค.
    • ๋™์ž‘์ค‘์— ์ „๋‹ฌ๋ฐ›์€ list๊ฐ€ ๋ณ€๊ฒฝ๋˜๋ฉด ๊ฒฐ๊ณผ๋ฅผ ๋ณด์žฅํ•˜์ง€ ์•Š๋Š”๋‹ค
    • ๋„˜๊ฒจ์ค€ list ์ˆœ์„œ๋Œ€๋กœ ๊ฒฐ๊ณผ future๋ฅผ ๋‹ด์•„์„œ ๋„˜๊ฒจ์ค€๋‹ค.

    ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ๋Š” Exception

    • InterruptedException : ๋™์ž‘์ด ์ข…๋ฃŒ๋˜์ง€ ์•Š์€(๋™์ž‘์ค‘์ธ) task๊ฐ€ ์ทจ์†Œ ๋˜์—ˆ์„ ๋•Œ
    • NullPointerException : ํ•จ์ˆ˜์˜ param์ค‘ null์ด ์žˆ์„ ๋•Œ
    • RejectedExecutionException : task๋ฅผ pool์— ๋„ฃ์„์ˆ˜ ์—†์„ ๋•Œ.
  • ๋”ฐ๋ผ์„œ ๊ฐ€์žฅ ๋จผ์ € ์™„๋ฃŒ๋œ ์ž‘์—…๋งŒ ๋ฐ˜ํ™˜ํ•ด๋„ ๊ดœ์ฐฎ๋‹ค๋ฉด invokeAll์„ ์“ฐ๊ธฐ์—๋Š” ์„ฑ๋Šฅ์ด ๋–จ์–ด์ง„๋‹ค.

    • ๊ทธ๋Ÿด ๋•Œ ์‚ฌ์šฉ ํ•  ์ˆ˜ ์žˆ๋Š” ๋ฉ”์†Œ๋“œ๊ฐ€ invokeAny ์ด๋‹ค.

# 3-3-5. ์—ฌ๋Ÿฌ ์ž‘์—… ๋™์‹œ ์‹คํ–‰ : invokeAny()

  • ๋™์‹œ์— ์‹คํ–‰ํ•œ ์ž‘์—… ์ค‘์— ์ œ์ผ ์งง๊ฒŒ ๊ฑธ๋ฆฌ๋Š” ์ž‘์—… ๋งŒํผ ์‹œ๊ฐ„์ด ๊ฑธ๋ฆฐ๋‹ค.
  • ๋ธ”๋กํ‚น ์ฝœ์ด๋‹ค.
String result = executorService.invokeAny(Arrays.asList(hello, java, youngjun));
System.out.println("result = " + result);

/* ์‹คํ–‰ ๊ฒฐ๊ณผ
result = youngjun
*/

์ฃผ์˜ํ•  ์ ์€, ์‹ฑ๊ธ€ ์“ฐ๋ ˆ๋“œ๋กœ ํ•  ๊ฒฝ์šฐ ๋จผ์ € ๋“ค์–ด๊ฐ„ ์ˆœ์„œ๋Œ€๋กœ ๋‚˜์˜ค๊ฒŒ ๋œ๋‹ค๋Š” ์ ์ด๋‹ค.

public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService executorService = Executors.newSingleThreadExecutor();

    Callable<String> hello = () -> {
        Thread.sleep(2000L);
        return "Hello";
    };

    Callable<String> java = () -> {
        Thread.sleep(3000L);
        return "java";
    };

    Callable<String> youngjun = () -> {
        Thread.sleep(1000L);
        return "youngjun";
    };

    String s = executorService.invokeAny(Arrays.asList(hello, java, youngjun));
    System.out.println(s);

    executorService.shutdown();
}

/* ์‹คํ–‰ ๊ฒฐ๊ณผ
Hello
*/

# 4. CompletableFuture (1)

# 4-1. ๊ฐœ์š”

์ž๋ฐ”์—์„œ ๋น„๋™๊ธฐ(Asynchronous)ํ”„๋กœ๊ทธ๋ž˜๋ฐ์„ ๊ฐ€๋Šฅํ•˜๊ฒŒํ•˜๋Š” ์ธํ„ฐํŽ˜์ด์Šค.
Future์˜ ์ œ์•ฝ์‚ฌํ•ญ๋“ค์„ ํ•ด๊ฒฐํ•œ๋‹ค.

# 4-1-1. Future ์ œ์•ฝ

  • ์˜ˆ์™ธ ์ฒ˜๋ฆฌ์šฉ API๋ฅผ ์ œ๊ณตํ•˜์ง€ ์•Š๋Š”๋‹ค.
  • ์—ฌ๋Ÿฌ Future๋ฅผ ์กฐํ•ฉํ•  ์ˆ˜ ์—†๋‹ค. (ex: Event์ •๋ณด๋ฅผ ๋ฐ›์•„ ๋‹ค์Œ Event์— ์ฐธ์„ํ•  ํšŒ์›๋ชฉ๋ก์กฐํšŒ)
  • Future๋ฅผ ์™ธ๋ถ€์—์„œ ์™„๋ฃŒ์‹œํ‚ฌ ์ˆ˜ ์—†๋‹ค. ์ทจ์†Œํ•˜๊ฑฐ๋‚˜, get()์— ํƒ€์ž„์•„์›ƒ์„ ์„ค์ •ํ•  ์ˆ˜๋Š” ์žˆ๋‹ค.
  • get()์„ ํ˜ธ์ถœํ•˜๊ธฐ ์ „๊นŒ์ง€๋Š” future๋ฅผ ๋‹ค๋ฃฐ ์ˆ˜ ์—†๋‹ค.
ExecutorService executorService = Executors.newFixedThreadPool(4);
Future<String> future = executorService.submit(() -> "hello");
...//TODO
future.get();
...//TODO
  • ์—ฌ๊ธฐ์„œ future๋Š” blocking call

future๋ฅผ get()์œผ๋กœ ๊ฐ€์ ธ์˜ค๋Š” ๋™์•ˆ์—๋Š” ๋‹ค๋ฅธ ์ž‘์—…๋“ค์˜ ์ˆ˜ํ–‰์ด ์•ˆ๋œ๋‹ค๋Š” ์˜๋ฏธ์ด๊ณ  ๊ทธ ๊ธฐ๊ฐ„์ด ๊ธธ์–ด์งˆ์ˆ˜๋ก ์„ฑ๋Šฅ์€ ๋–จ์–ด์งˆ์ˆ˜ ๋ฐ–์— ์—†๋‹ค.

# 4-2. CompletableFuture

Future์™€ CompletionStage๋ฅผ ๊ตฌํ˜„ํ•˜๋Š” ๊ตฌํ˜„์ฒด

  • ์˜ˆ์™ธ์ฒ˜๋ฆฌ๋ฅผ ์ง€์›ํ•˜๋Š” ๋ฉ”์„œ๋“œ
  • ์ˆœ์„œ์˜ ์˜์กด๊ด€๊ณ„๋ฅผ ๋งž๋Š” ์Šค๋ ˆ๋“œ ํ”„๋กœ๊ทธ๋ž˜๋ฐ
  • ์ฝœ๋ฐฑ์„ ์ง€์›ํ•˜๊ธฐ๋„ ํ•˜๋ฉฐ ์—ฌ๋Ÿฌ ์Šค๋ ˆ๋“œ๋ฅผ ํ•˜๋‚˜๋กœ ๋ฌถ์–ด ์ฒ˜๋ฆฌํ•˜๊ธฐ์—๋„ ์šฉ์ด

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>{...}

CompletableFuture<String> future = new CompletableFuture<>();
future.complete("youngjun");
System.out.println("future = " + future.get());
//-------------OR --------------
CompletableFuture<String> future = CompletableFuture.completedFuture("youngjun");
System.out.println("future = " + future.get());

# 4-2-1. ๋น„๋™๊ธฐ๋กœ ์ž‘์—… ์‹คํ–‰ํ•˜๊ธฐ

  • ๋ฆฌํ„ด๊ฐ’์ด ์—†๋Š” ๊ฒฝ์šฐ: runAsync()

    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        System.out.println("Hello" + Thread.currentThread().getName());
    });
    future.get();
    
  • ๋ฆฌํ„ด๊ฐ’์ด ์žˆ๋Š” ๊ฒฝ์šฐ: supplyAsync()

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hello" + Thread.currentThread().getName());
        return "Hello";
    });
    future.get();
    
  • ์›ํ•˜๋Š” Executor(์“ฐ๋ ˆ๋“œํ’€)๋ฅผ ์‚ฌ์šฉํ•ด์„œ ์‹คํ–‰ํ•  ์ˆ˜๋„ ์žˆ๋‹ค. (๊ธฐ๋ณธ์€ ForkJoinPool.commonPool())

# 4-2-2. ์ฝœ๋ฐฑ ์ œ๊ณตํ•˜๊ธฐ

  • thenApply(Function)

    ๋ฆฌํ„ด๊ฐ’์„ ๋ฐ›์•„์„œ ๋‹ค๋ฅธ ๊ฐ’์œผ๋กœ ๋ฐ”๊พธ๋Š” ์ฝœ๋ฐฑ

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hello" + Thread.currentThread().getName());
        return "Hello";
    }).thenApply((s)->{
        System.out.println("content: "+s);
        System.out.println(Thread.currentThread().getName());
        return "HelloAsync";
    });
    System.out.println(future.get());
    
    • Javascript์˜ Promise ์™€ ์œ ์‚ฌํ•œ ํ˜•ํƒœ
    • supplyAsync์˜ ๋žŒ๋‹คํ‘œํ˜„์‹์—์„œ ๋ฐ˜ํ™˜๋œ Hello๋ผ๋Š” ๊ฐ’์€ ์ฒด์ด๋‹๋œ ๋ฉ”์†Œ๋“œ thenApply์˜ ์ธ์ž๊ฐ’์œผ๋กœ ๋“ค์–ด๊ฐ€๊ณ  ์‚ฌ์šฉ ๊ฐ€๋Šฅ
    • ๊ทธ๋ฆฌ๊ณ  ๋” ์ด์ƒ ์ฒด์ด๋‹๋œ ๋ฉ”์†Œ๋“œ๊ฐ€ ์—†๊ธฐ ๋•Œ๋ฌธ์— return๊ฐ’์ธ HelloAsync๋Š” ๋ฐ˜ํ™˜๋˜์–ด future๋กœ ๋“ค์–ด๊ฐ€๊ณ  get()์„ํ†ตํ•ด ๋ฐ›์„ ์ˆ˜ ์žˆ๋‹ค.
  • thenAccept(Consumer)

    ๋ฆฌํ„ด๊ฐ’์„ ๋ฐ›์•„ ๋˜ ๋‹ค๋ฅธ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•˜๋Š”๋ฐ ๋ฐ˜ํ™˜๊ฐ’์€ ์—†๋Š” ์ฝœ๋ฐฑ (๋ฆฌํ„ด x)

    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hello" + Thread.currentThread().getName());
        return "Hello";
    }).thenAccept((s)->{
        System.out.println("content: "+s);
        System.out.println(Thread.currentThread().getName());
            // return ์—†๋‹ค.
    });
    future.get();
    
  • thenRun(Runnable)

    ๋ฆฌํ„ด๊ฐ’์„ ๋ฐ›์ง€ ์•Š๊ณ  ๋‹ค๋ฅธ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•˜๋Š” ์ฝœ๋ฐฑ

    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hello" + Thread.currentThread().getName());
        return "hello";
    }).thenRun(()->{
        System.out.println(Thread.currentThread().getName());
    });
    future.get();
    

# ParallelStream vs. CompletableFuture

  • from Java 8 in Action (opens new window)
  • ParallelStream : I/O๊ฐ€ ํฌํ•จ๋˜์ง€ ์•Š์€ ๊ณ„์‚ฐ ์ค‘์‹ฌ์˜ ๋™์ž‘์„ ์‹คํ–‰ํ•  ๋•Œ๋Š” ์ŠคํŠธ๋ฆผ ์ธํ„ฐํŽ˜์ด์Šค๊ฐ€ ๊ฐ€์žฅ ๊ตฌํ˜„ํ•˜๊ธฐ ๊ฐ„๋‹คํ•˜๋ฉฐ ํšจ์œจ์ ์ผ ์ˆ˜ ์žˆ๋‹ค(๋ชจ๋“  ์Šค๋ ˆ๋“œ๊ฐ€ ๊ณ„์‚ฐ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•˜๋Š” ์ƒํ™ฉ์—์„œ๋Š” ํ”„๋กœ์„ธ์„œ ์ฝ”์–ด ์ˆ˜ ์ด์ƒ์˜ ์“ฐ๋ ˆ๋“œ๋ฅผ ๊ฐ€์งˆ ํ•„์š”๊ฐ€ ์—†๋‹ค).
  • CompletableFuture : ๋ฐ˜๋ฉด ์ž‘์—…์ด I/O๋ฅผ ๊ธฐ๋‹ค๋ฆฌ๋Š” ์ž‘์—…์„ ๋ณ‘๋ ฌ๋กœ ์‹คํ–‰ํ•  ๋•Œ๋Š” CompletableFuture๊ฐ€ ๋” ๋งŽ์€ ์œ ์—ฐ์„ฑ์„ ์ œ๊ณตํ•˜๋ฉฐ ๋Œ€๊ธฐ/๊ณ„์‚ฐ(W/C)์˜ ๋น„์œจ์— ์ ํ•ฉํ•œ ์Šค๋ ˆ๋“œ ์ˆ˜๋ฅผ ์„ค์ •ํ•  ์ˆ˜ ์žˆ๋‹ค. ํŠนํžˆ ์ŠคํŠธ๋ฆผ์˜ ๊ฒŒ์œผ๋ฅธ ํŠน์„ฑ ๋•Œ๋ฌธ์— ์ŠคํŠธ๋ฆผ์—์„œ I/O๋ฅผ ์‹ค์ œ๋กœ ์–ธ์ œ ์ฒ˜๋ฆฌํ•  ์ง€ ์˜ˆ์ธกํ•˜๊ธฐ ์–ด๋ ค์šด ๋ฌธ์ œ๋„ ์žˆ๋‹ค.
  • fahd.blog: Java 8: CompletableFuture vs Parallel Stream (opens new window)

# 5. CompletableFuture (2)

# 5-1. CompletableFuture ์กฐํ•ฉ ๋ฉ”์†Œ๋“œ

# 5-1-1. thenCompose()

  • ๋‘ ์ž‘์—…์ด ์„œ๋กœ ์ด์–ด์„œ ์‹คํ–‰ํ•˜๋„๋ก ์กฐํ•ฉํ•˜๋ฉฐ ์—ฐ๊ด€๋œ future๊ฐ„์— ๋งŽ์ด ์‚ฌ์šฉ
public class App {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });

        CompletableFuture<String> future = helloFuture.thenCompose(App::getWorldFuture);
        System.out.println(future.get());

    }
    private static CompletableFuture<String> getWorldFuture(String message) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("World " + Thread.currentThread().getName());
            return message + " World";
        });
    }
}

/*
Hello ForkJoinPool.commonPool-worker-3
World ForkJoinPool.commonPool-worker-5
Hello World
*/

# 5-1-2. thenCombine()

  • ๋‘ ์ž‘์—…์„ ๋…๋ฆฝ์ ์œผ๋กœ ์‹คํ–‰ํ•˜๊ณ  ๋‘˜ ๋‹ค ์ข…๋ฃŒ ํ–ˆ์„๋•Œ ์ฝœ๋ฐฑ ์‹คํ–‰.
public static void main(String[] args) throws InterruptedException, ExecutionException {
    CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("MicroSoft " + Thread.currentThread().getName());
        return "MicroSoft";
    });

    CompletableFuture<String> appleFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("Apple " + Thread.currentThread().getName());
        return "Apple";
    });

    CompletableFuture<String> resultFuture = msFuture.thenCombine(appleFuture, (i, j) -> i + " " + j);
    
    System.out.println(resultFuture.get());
}

# 5-1-3. allOf()

  • ์—ฌ๋Ÿฌ ์ž‘์—…์„ ๋ชจ๋‘ ์‹คํ–‰ํ•˜๊ณ  ๋ชจ๋“  ์ž‘์—… ๊ฒฐ๊ณผ์— ์ฝœ๋ฐฑ ์‹คํ–‰
public static void main(String[] args) throws InterruptedException, ExecutionException {
    CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("MicroSoft " + Thread.currentThread().getName());
        return "MicroSoft";
    });

    CompletableFuture<String> appleFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("Apple " + Thread.currentThread().getName());
        return "Apple";
    });
    List<CompletableFuture<String>> futures = Arrays.asList(msFuture, appleFuture);

    CompletableFuture<List<String>> results =
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
                    .thenApply(v -> futures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList()));
    results.get().forEach(System.out::println);
}

# 5-1-4. anyOf()

  • ์—ฌ๋Ÿฌ ์ž‘์—… ์ค‘ ๊ฐ€์žฅ ๋๋‚œ ํ•˜๋‚˜์˜ ๊ฒฐ๊ณผ๋ฅผ ์ฝœ๋ฐฑ์— ๋„˜๊ฒจ ์‹คํ–‰
CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println("MicroSoft " + Thread.currentThread().getName());
    return "MicroSoft";
});

CompletableFuture<String> appleFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println("Apple " + Thread.currentThread().getName());
    return "Apple";
});

CompletableFuture<Void> future = CompletableFuture.anyOf(msFuture, appleFuture).thenAccept(System.out::println);
future.get();

# 5-2. ์˜ˆ์™ธ ์ฒ˜๋ฆฌ ๋ฉ”์†Œ๋“œ

# 5-2-1. exeptionally(Function)

boolean throwError = true;

CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
    if (throwError) {
        throw new IllegalArgumentException();
    }

    System.out.println("MicroSoft " + Thread.currentThread().getName());
    return "MicroSoft";
}).exceptionally(ex->{
    System.out.println(ex);
    return "Error";
});

System.out.println(msFuture.get());

# 5-2-2. handle(BiFunction)

boolean throwError = false;

CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
    if (throwError) {
        throw new IllegalArgumentException();
    }

    System.out.println("MicroSoft " + Thread.currentThread().getName());
    return "MicroSoft";
}).handle((result, ex)->{
    if (Objects.nonNull(ex)) {
        System.out.println(ex);
        return "ERROR";
    }
    return result;
});

System.out.println(msFuture.get());

# 5-3. CompletableFuture ๋ฅผ ์–ธ์ œ ์“ธ ์ˆ˜ ์žˆ์„๊นŒ

  • ์•„๋ž˜๋Š” microservices.io ์‚ฌ์ดํŠธ์—์„œ ๊ฐ€์ ธ์˜จ ์˜ˆ์ œ์ด๋‹ค.

image

  • CompletableFuture ๋ฅผ ์‚ฌ์šฉํ•  ๋งŒํ•œ ๋ถ€๋ถ„์€ API GATEWAY, Storefront WebApp ๋ผ๊ณ  ํ•  ์ˆ˜ ์žˆ๋‹ค. ์ด์œ ๋Š” Account, Inventory, Shipping Service ๋Š” DATA BASE ์™€ ์—ฐ๋™ํ•˜๊ณ  ์žˆ๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค. java ํ™˜๊ฒฝ์—์„œ RDBMS ์—ฐ๋™์‹œ์— ์•„์ง Async ํ•œ ์ธํ„ฐํŽ˜์ด์Šค ๋ฐฉ์‹์„ ์ œ๊ณตํ•˜๊ณ  ์žˆ์ง€ ์•Š๊ธฐ ๋•Œ๋ฌธ์— Async ๊ฐœ๋ฐœ์„ ํ•ด์„œ ํฐ ํšจ๊ณผ๋ฅผ ์–ป๊ธฐ ๋ถ€์กฑํ•˜๋‹ค.
  • ํ•˜์ง€๋งŒ GATEWAY ๋“ฑ์€ REST Service ์™€ ์—ฐ๋™ํ•˜๊ธฐ ๋•Œ๋ฌธ์— Async ๋ฅผ ํ†ตํ•œ ์„ฑ๋Šฅ ํšจ๊ณผ๋ฅผ ๋ณผ ์ˆ˜ ์žˆ๋‹ค. (Blocking Thread ๋ฌธ์ œ ํ•ด๊ฒฐ ๋“ฑ)

Reference

Last Updated: 6/18/2023, 2:13:15 PM