# CompletableFuture
- 1. ์๋ฐ Concurrent ํ๋ก๊ทธ๋๋ฐ ์๊ฐ
- 2. Executors
- 3. Callable๊ณผ Future
- 4. CompletableFuture (1)
- 5. CompletableFuture (2)
# 1. ์๋ฐ Concurrent ํ๋ก๊ทธ๋๋ฐ ์๊ฐ
# 1-1. Concurrent ์ํํธ์จ์ด
๐ก ๋์์ ์ฌ๋ฌ ์์ ์ ํ ์ ์๋ ์ํํธ์จ์ด
# ๋์์ฑ๊ณผ ๋ณ๋ ฌ์ฑ์ ์ฐจ์ด
- ๋์์ฑ
- ํ๋์ ์ฐ๋ ๋๋ ํ๋ก์ธ์์ ์ฌ๋ฌ Task๋ฅผ ๊ด๋ฆฌํจ์ผ๋ก์จ, ๋ง์น ๋์์ ์์ ์ด ์ํ๋๊ณ ์๋ ๊ฒ์ฒ๋ผ ๋ณด์ด๋ ๊ฒ
- ๋ณ๋ ฌ์ฑ
- Task์ ์คํ์ ํ๋์จ์ด ์์ค์ผ๋ก ์คํ๋๋ฉฐ, ๊ฐ๊ฐ์ ์์ ๋ค์ด ๋ ๋ฆฝ์ ์
# 1-1-1. ์๋ฐ์์ ์ง์ํ๋ Concurrent ํ๋ก๊ทธ๋๋ฐ
- ๋ฉํฐ์ฐ๋ ๋
- ๋ฉํฐํ๋ก์ธ์ฑ(ProcessBuilder)
๋ฒ์ | ์ฌ์ฉ ๋ฐฉ๋ฒ |
---|---|
Java 5 ์ด์ | Runnable๊ณผ Thread๋ฅผ ์ด์ฉํ์ฌ ๊ตฌํ |
Java 5 | ExecutorService, Callable, Future |
Java 7 | Fork/Join ๊ทธ๋ฆฌ๊ณ RecursiveTask |
Java 8 | Stream, CompletableFuture |
Java 9 | ๋ถ์ฐ ๋น๋๊ธฐ ํ๋ก๊ทธ๋๋ฐ์ ๋ช ์์ ์ผ๋ก ์ง์ (๋ฐํ ๊ตฌ๋ ํ๋กํ ์ฝ ์ง์ Flow API) |
# 1-1-2. ์๋ฐ ๋ฉํฐ์ฐ๋ ๋ ํ๋ก๊ทธ๋๋ฐ
Thread/Runnable
public class App { public static void main(String[] args) { // ์ฒซ๋ฒ์งธ ๋ฐฉ๋ฒ MyThread myThread = new MyThread(); myThread.start(); // Anonymous class Thread runnable = new Thread(new Runnable() { @Override public void run() { System.out.println("Runnable Thread: " + Thread.currentThread().getName()); } }); // Lambda Expression Thread lambdaThread = new Thread(() -> System.out.println("Lambda Thread:" + Thread.currentThread().getName())); System.out.println("Hello: "+Thread.currentThread().getName()); } // Thread ์์ static class MyThread extends Thread { @Override public void run() { System.out.println("Thread: "+Thread.currentThread().getName()); } } } /* ์คํ๊ฒฐ๊ณผ Hello: main Thread: Thread-0 */
- ์ฝ๋์ ์คํ ์์๋ง ๋ด์๋ Thread๊ฐ ๋จผ์ ์ถ๋ ฅ๋์ผ ํ ๊ฒ ๊ฐ์ง๋ง, ์ค์ ๋ก ์คํํด๋ณด๋ฉด ๋ค๋ฅด๊ฒ ์ถ๋ ฅ๋ ๋๋ ์๋ค.
- ์ด๋ฅผ ํตํด Thread๋ ์์๋ฅผ ๋ณด์ฅํ์ง ์๋๋ค๋ ๊ฒ์ ์ ์ ์๋ค.
์ฌ๊ธฐ์ ๋ก์ปฌํด๋์ค๋ฅผ ์ด์ฉํ์ง๋ง, ์ต๋ช ํด๋์ค์ ๋๋คํํ์์ ์ด์ฉํด์๋ ์ ์ฉํ ์ ์๋ค.
# 1-1-3. ์ฃผ์๊ธฐ๋ฅ(method)
sleep(mills)
- ํ์ฌ ์ฐ๋ ๋ ์ฌ์ฐ๊ธฐ(๋ฉ์ถฐ๋๊ธฐ)
- ์ค๋ ๋๋ฅผ ๋๊ธฐ์ํ๋ก ๋ฉ์ถฐ์ ๋ค๋ฅธ ์ค๋ ๋๊ฐ ์ฒ๋ฆฌํ ์ ์๋๋ก ํจ.
- ํ์ง๋ง ๋ฝ์ ๋์ฃผ์ง ์๊ธฐ์ ์๋ชปํ๋ฉด ๋ฐ๋๋ฝ ์ํ์ ๊ฑธ๋ฆด ์ ์๋ค.
public static void main(String[] args) throws InterruptedException { // Lambda Expression Thread lambdaThread = new Thread(() -> { try { Thread.sleep(1000L); } catch(InterruptedException e){ System.out.println("interrupted!"); return; } System.out.println("Thread: "+Thread.currentThread().getName()); }); lambdaThread.start(); System.out.println("Hello: "+Thread.currentThread().getName()); }
Thread.sleep(1000L)
- Thread๋ฅผ startํ๋ฉด 1์ด(1000L)๋์ ๋ฉ์ถฐ์๊ณ ๊ทธ ๋์ ๋ค๋ฅธ ์ฐ๋ ๋๋ฅผ ์ํํ๊ธฐ ๋๋ฌธ์ Hello๊ฐ ํญ์ ์ฐ์ ์ถ๋ ฅ๋๋ค.
- ํ์ฌ ์ฐ๋ ๋ ์ฌ์ฐ๊ธฐ(๋ฉ์ถฐ๋๊ธฐ)
interrupt()
- ๋ค๋ฅธ ์ฐ๋ ๋๋ฅผ ๊นจ์ฐ๊ธฐ
InterruptException
์ ๋ฐ์- ์ด ์๋ฌ์ ๋ํ ํธ๋ค๋ง์ ๊ตฌํ ๊ฐ๋ฅ
public static void main(String[] args) throws InterruptedException { // Lambda Expression Thread lambdaThread = new Thread(() -> { try { Thread.sleep(3000L); } catch(InterruptedException e){ System.out.println("interrupted!"); return; } System.out.println("Thread: "+Thread.currentThread().getName()); }); lambdaThread.start(); lambdaThread.interrupt(); System.out.println("Hello: "+Thread.currentThread().getName()); }
lambdaThread.interrupt();
- lambdaThread์ interrupt()๋ฉ์๋๋ฅผ ํธ์ถํด lambdaThread๋ด์ InterruptedException ์ ๋ฐ์์ํจ๋ค.
- ๋ค๋ฅธ ์ฐ๋ ๋๋ฅผ ๊นจ์ฐ๊ธฐ
join()
- ๋ค๋ฅธ ์ฐ๋ ๋๊ฐ ๋๋ ๋๊น์ง ๊ธฐ๋ค๋ฆฐ๋ค.
public static void main(String[] args) throws InterruptedException { //Lambda Expression Thread lambdaThread = new Thread(() -> { try { Thread.sleep(3000L); } catch(InterruptedException e){ System.out.println("interrupted!"); return; } System.out.println("Thread: "+Thread.currentThread().getName()); }); lambdaThread.start(); lambdaThread.join(); System.out.println("Hello: "+Thread.currentThread().getName()); }
lambdaThread.join();
- lambdaThread์ join()๋ฉ์๋๋ฅผ ํธ์ถํ์ฌ lambdaThread๊ฐ ์ข ๋ฃ๋ ๋๊น์ง ๊ธฐ๋ค๋ฆฐ๋ค.
# 2. Executors
Runnable์ด๋ Thread์ ๊ฐ์ Low-level์ด ์๋ ๊ณ ์์ค(High-Level) Concurrency ํ๋ก๊ทธ๋๋ฐ
์ฐ๋ฆฌ๊ฐ Runnable๋ง ์ ์ํด์ ์ ๊ณตํด์ฃผ๋ฉด ์ค๋ ๋๋ฅผ ๋ง๋ค๊ณ , ๋ถํ์ํด์ง๋ฉด ์ข ๋ฃํ๊ณ ๊ด๋ฆฌํด์ฃผ๋ ์์ ๋ค์ ๋์ ํด์ฃผ๋ ํด๋์ค
# 2-1. Executors๊ฐ ํ๋ ์ผ
- ์ฐ๋ ๋ ๋ง๋ค๊ธฐ: ์ ํ๋ฆฌ์ผ์ด์ ์ด ์ฌ์ฉํ ์ฐ๋ ๋ ํ์ ๋ง๋ค์ด๊ด๋ฆฌํ๋ค.
- ์ฐ๋ ๋ ๊ด๋ฆฌ: ์ฐ๋ ๋ ์๋ช ์ฃผ๊ธฐ๋ฅผ ๊ด๋ฆฌํ๋ค.
- ์์ ์ฒ๋ฆฌ ๋ฐ ์คํ: ์ฐ๋ ๋๋ก ์คํํ ์์ ์ ์ ๊ณตํ ์ ์๋ API๋ฅผ ์ ๊ณตํ๋ค.
# 2-2. ์ฃผ์ ์ธํฐํ์ด์ค
Executor
: execute(Runnable)ExecutorService
: Executor๋ฅผ ์์ ๋ฐ์ ์ธํฐํ์ด์ค๋ก, Callable๋ ์คํ ๊ฐ๋ฅํ๋ฉฐ Executor๋ฅผ ์ข ๋ฃ ์ํค๊ฑฐ๋ ์ฌ๋ฌ Callable์ ๋์์ ์คํํ๋ ๋ฑ์ ๊ธฐ๋ฅ์ ์ ๊ณตํ๋ค.ScheduledExecutorService
: ExecutorService๋ฅผ ์์ ๋ฐ์ ์ธํฐํ์ด์ค๋ก ํน์ ์๊ฐ ์ดํ์ ๋๋ ์ฃผ๊ธฐ์ ์ผ๋ก ์์ ์คํํ ์ ์๋ค.
# 2-3. ์์
# 2-3-1. ๊ธฐ๋ณธ ์ฌ์ฉ ์์
public static void main(String[] args) throws InterruptedException {
// ExecutorService ์์ฑ
ExecutorService executorService = Executors.newSingleThreadExecutor();
// Legacy case
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("Thread: "+Thread.currentThread().getName());
}
});
// Lambda Expression
executorService.submit(()->{
System.out.println("Lambda Expression Thread: "+Thread.currentThread().getName());
});
executorService.shutdown(); // graceful shutdown
// executorService.shutdownNow(); //์ฆ์ ์ข
๋ฃ
}
# 2-3-2. 2๊ฐ์ Thread๋ฅผ ์ด์ฉํ์ฌ ์คํ
public class App {
private static Runnable getRunnable(String message) {
return () -> System.out.println(message + Thread.currentThread().getName());
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(getRunnable("Hello"));
executorService.submit(getRunnable("World"));
executorService.submit(getRunnable("The"));
executorService.submit(getRunnable("Java"));
executorService.submit(getRunnable("Lecture"));
executorService.submit(getRunnable("Concurrent"));
executorService.submit(getRunnable("Part"));
executorService.shutdown(); //graceful shutdown
}
}
/* ์คํ๊ฒฐ๊ณผ
Hellopool-1-thread-1
Worldpool-1-thread-2
Javapool-1-thread-2
Thepool-1-thread-1
Lecturepool-1-thread-2
Concurrentpool-1-thread-1
Partpool-1-thread-2
*/
Executors.newFixedThreadPool(2)
- ํด๋น ๋ฉ์๋๋ฅผ ํธ์ถํ๋ฉด ํด๋น ์์ญ์๋ ์ธ์๊ฐ์ผ๋ก ๋๊ฒจ์ค ์ซ์๋งํผ Thread๋ฅผ ๊ด๋ฆฌํ๋ค.
- ์ ์ฝ๋์์๋ 2๋ฅผ ์ธ์๊ฐ์ผ๋ก ๋๊ฒจ์คฌ๊ธฐ ๋๋ฌธ์ 2๊ฐ์ 2๊ฐ์ ์ฐ๋ ๋๋ฅผ ๊ด๋ฆฌํ๋ Thread Pool์ด ๋๋ ๋์ Blocking Queue์ ๋ฑ๋ก๋ ์์ ๋ค์ด ์ฐจ๋ก๋๋ก ๋์ํ๋ค.
ExecutorService.newSingleThreadScheduledExcutor();
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleAtFixedRate(getRunnable("hello"), 3, 1, TimeUnit.SECONDS);
- scheduleAtFixedRate(์คํ Runnable, ์์ ์ง์ฐ ์๊ฐ, ๋๋ ์ด, ํ๋ผ๋ฏธํฐ ์๊ฐ ๋จ์)
- ์ ์ฝ๋๋ Runnableํ์ ์ ๋ฐํํ๋ getRunnable() ๋ฉ์๋๋ฅผ ํ๋ก๊ทธ๋จ์ด ์์ ํ 3์ด ๋ค๋ถํฐ 1์ด๋ง๋ค ์ํํ๋ ์ฝ๋
# 2-4. Executor ์ ๋ฆฌ
supplyAsync
๋ฑ์ ๋ฉ์๋ ํธ์ถ์ ์ฐ๋ ๋ ํ์ ๋ช ์ํ์ง ์์ผ๋ฉด Java ForkJoinPool (opens new window) ์commonPool()
์ด ์ฌ์ฉ๋๋ค.- ๊ฐ๋ฐ์๊ฐ ์ฐ๋ ๋ ํ์ ์ ์ดํ ์ ์๋ค๋ ๊ฒ์ ๋์ค์ ๋ฌธ์ ๊ฐ ๋ ์ ์๋ค.
- ๋ฐ๋ผ์, ํญ์ Java ExecutorService (opens new window) ๋ฅผ ๋ช ์์ ์ผ๋ก ์ฌ์ฉํ์ฌ ์ฐ๋ ๋ ํ์ ์ง์ ํ๋๋ก ํ๋ค.
# 3. Callable๊ณผ Future
# 3-1. Callable
- Runnable๊ณผ ๊ฑฐ์ ์ ์ฌํ์ง๋ง ๋ฐํ ๊ฐ์ ๊ฐ์ง ์ ์๋ค.
# 3-2. Future
- ๋น๋๊ธฐ์ ์ธ ์์ ์ ํ์ฌ ์ํ๋ฅผ ์กฐํํ๊ฑฐ๋ ๊ฒฐ๊ณผ๋ฅผ ๊ฐ์ ธ์ฌ ์ ์๋ค.
# 3-3. ๋ฉ์๋ ์ดํด๋ณด๊ธฐ
# 3-3-1. ๊ฒฐ๊ณผ ๋ฐํ : get()
- ํด๋น ๋ฉ์๋๋
๋ธ๋กํน ์ฝ
์ด๊ธฐ์ ๋ฉ์๋ ํธ์ถ์์ ๋ถํฐ ์ฝ๋์คํ ์๋ฃ๊น์ง ๊ธฐ๋ค๋ฆฐ๋ค. - ํ์์์์ ์ค์ ํ ์ ์๋ค.
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () ->{
Thread.sleep(2000L);
return "Hello";
};
Future<String> submit = executorService.submit(hello);
System.out.println("Started!");
submit.get();// blocking
System.out.println("End");
executorService.shutdown();
/*
Started!
(2์ด ๋ค)
End
*/
# 3-3-2. ์์ ์ํ ํ์ธ - isDone()
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () ->{
Thread.sleep(2000L);
return "Hello";
};
Future<String> helloFuture = executorService.submit(hello);
System.out.println(helloFuture.isDone());
System.out.println("Started!");
helloFuture.get(); // blocking
System.out.println(helloFuture.isDone());
System.out.println("End");
executorService.shutdown();
/* ์คํ ๊ฒฐ๊ณผ
false
Started!
true
End
*/
# 3-3-3. ์์ ์ทจ์ : cancel()
- ์ธ์ ๊ฐ์ผ๋ก ํ์ฌ ์งํ์ค์ธ ์ฐ๋ ๋ interrupt ์ฌ๋ถ๋ฅผ ๊ฒฐ์ ํ๋ค.
- true ์ด๋ฉด ํ์ฌ ์งํ์ค์ธ ์ฐ๋ ๋๋ฅผ interruptํ๊ณ ๊ทธ๋ ์ง ์์ผ๋ฉด ํ์ฌ ์งํ์ค์ธ ์์ ์ด ๋๋ ๋๊น์ง ๊ธฐ๋ค๋ฆฐ๋ค.
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () ->{
Thread.sleep(2000L);
return "Hello";
};
Future<String> helloFuture = executorService.submit(hello);
System.out.println(helloFuture.isDone());
System.out.println("Started!");
helloFuture.cancel(false);
System.out.println(helloFuture.isDone());
System.out.println("End");
helloFuture.get();
executorService.shutdown();
/* ์คํ ๊ฒฐ๊ณผ
false
Started!
true
End
Exception in thread "main" java.util.concurrent.CancellationException...
*/
helloFuture.cancel(false)
- ํ์ฌ ์งํ์ค์ธ ์์ ์ ๊ธฐ๋ค๋ฆฐ ๋ค ์์ ์ ์ทจ์ํ๋ค.
- ์์
์ด ์ทจ์๋์ด ์ข
๋ฃ๋์๊ธฐ ๋๋ฌธ์ ์๋์
helloFuture.isDone()
์ true๊ฐ ๋ฐํ๋๋ฉฐ, ์ด๋ฏธ ์ทจ์ํ ์์ ์ get() ํธ์ถํ๋ ์์ ์๋CancellationException
์์ธ๊ฐ ๋ฐ์
# 3-3-4. ์ฌ๋ฌ ์์
๋์ ์คํ : invokeAll()
ExecutorService executorService = Executors.newSingleThreadExecutor();
LocalDateTime start = LocalDateTime.now();
Callable<String> hello = () ->{
Thread.sleep(2000L);
return "Hello";
};
Callable<String> java = () ->{
Thread.sleep(3000L);
return "java";
};
Callable<String> youngjun = () ->{
Thread.sleep(1000L);
return "youngjun";
};
List<Future<String>> futures =
executorService.invokeAll(Arrays.asList(hello, java, youngjun));
for (Future<String> future : futures) {
System.out.println(future.get());
}
LocalDateTime end = LocalDateTime.now();
Duration between = Duration.between(start, end);
System.out.println(between.getSeconds());
/*
Hello
java
youngjun
6
*/
invokeAll()
๋ฉ์๋๋ ํ์คํฌ๊ฐ ๋ชจ๋ ๋๋ ๋๊น์ง ๊ธฐ๋ค๋ ธ๋ค๊ฐ ๊ฐ๋ค์ ๋ฐํ- ์ฑ๊ธ ์ฐ๋ ๋์ด๊ธฐ ๋๋ฌธ์ 6์ด๊ฐ ์์๋๋ค.
- future list๋ฅผ ๋ฐํํ๊ณ ์ ๋ถ ๋๋ ๋๊น์ง holding๋๋ค.
- ๋๊ฒจ์ค Callable list๊ฐ ์ ์ ์ฒ๋ฆฌ๋๋ exception์ด ๋ฐ์ํ๋ ์๋ฃ๋ ๊ฒ์ผ๋ก ๋ณธ๋ค.
- ๋์์ค์ ์ ๋ฌ๋ฐ์ list๊ฐ ๋ณ๊ฒฝ๋๋ฉด ๊ฒฐ๊ณผ๋ฅผ ๋ณด์ฅํ์ง ์๋๋ค
- ๋๊ฒจ์ค list ์์๋๋ก ๊ฒฐ๊ณผ future๋ฅผ ๋ด์์ ๋๊ฒจ์ค๋ค.
๋ฐ์ํ ์ ์๋ Exception
- InterruptedException : ๋์์ด ์ข ๋ฃ๋์ง ์์(๋์์ค์ธ) task๊ฐ ์ทจ์ ๋์์ ๋
- NullPointerException : ํจ์์ param์ค null์ด ์์ ๋
- RejectedExecutionException : task๋ฅผ pool์ ๋ฃ์์ ์์ ๋.
๋ฐ๋ผ์ ๊ฐ์ฅ ๋จผ์ ์๋ฃ๋ ์์ ๋ง ๋ฐํํด๋ ๊ด์ฐฎ๋ค๋ฉด invokeAll์ ์ฐ๊ธฐ์๋ ์ฑ๋ฅ์ด ๋จ์ด์ง๋ค.
- ๊ทธ๋ด ๋ ์ฌ์ฉ ํ ์ ์๋ ๋ฉ์๋๊ฐ
invokeAny
์ด๋ค.
- ๊ทธ๋ด ๋ ์ฌ์ฉ ํ ์ ์๋ ๋ฉ์๋๊ฐ
# 3-3-5. ์ฌ๋ฌ ์์ ๋์ ์คํ : invokeAny()
- ๋์์ ์คํํ ์์ ์ค์ ์ ์ผ ์งง๊ฒ ๊ฑธ๋ฆฌ๋ ์์ ๋งํผ ์๊ฐ์ด ๊ฑธ๋ฆฐ๋ค.
- ๋ธ๋กํน ์ฝ์ด๋ค.
String result = executorService.invokeAny(Arrays.asList(hello, java, youngjun));
System.out.println("result = " + result);
/* ์คํ ๊ฒฐ๊ณผ
result = youngjun
*/
์ฃผ์ํ ์ ์, ์ฑ๊ธ ์ฐ๋ ๋๋ก ํ ๊ฒฝ์ฐ ๋จผ์ ๋ค์ด๊ฐ ์์๋๋ก ๋์ค๊ฒ ๋๋ค๋ ์ ์ด๋ค.
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () -> {
Thread.sleep(2000L);
return "Hello";
};
Callable<String> java = () -> {
Thread.sleep(3000L);
return "java";
};
Callable<String> youngjun = () -> {
Thread.sleep(1000L);
return "youngjun";
};
String s = executorService.invokeAny(Arrays.asList(hello, java, youngjun));
System.out.println(s);
executorService.shutdown();
}
/* ์คํ ๊ฒฐ๊ณผ
Hello
*/
# 4. CompletableFuture (1)
# 4-1. ๊ฐ์
์๋ฐ์์ ๋น๋๊ธฐ(Asynchronous)ํ๋ก๊ทธ๋๋ฐ์ ๊ฐ๋ฅํ๊ฒํ๋ ์ธํฐํ์ด์ค.
Future์ ์ ์ฝ์ฌํญ๋ค์ ํด๊ฒฐํ๋ค.
# 4-1-1. Future ์ ์ฝ
- ์์ธ ์ฒ๋ฆฌ์ฉ API๋ฅผ ์ ๊ณตํ์ง ์๋๋ค.
- ์ฌ๋ฌ Future๋ฅผ ์กฐํฉํ ์ ์๋ค. (ex: Event์ ๋ณด๋ฅผ ๋ฐ์ ๋ค์ Event์ ์ฐธ์ํ ํ์๋ชฉ๋ก์กฐํ)
- Future๋ฅผ ์ธ๋ถ์์ ์๋ฃ์ํฌ ์ ์๋ค. ์ทจ์ํ๊ฑฐ๋, get()์ ํ์์์์ ์ค์ ํ ์๋ ์๋ค.
- get()์ ํธ์ถํ๊ธฐ ์ ๊น์ง๋ future๋ฅผ ๋ค๋ฃฐ ์ ์๋ค.
ExecutorService executorService = Executors.newFixedThreadPool(4);
Future<String> future = executorService.submit(() -> "hello");
...//TODO
future.get();
...//TODO
- ์ฌ๊ธฐ์ future๋
blocking call
future๋ฅผ get()์ผ๋ก ๊ฐ์ ธ์ค๋ ๋์์๋ ๋ค๋ฅธ ์์ ๋ค์ ์ํ์ด ์๋๋ค๋ ์๋ฏธ์ด๊ณ ๊ทธ ๊ธฐ๊ฐ์ด ๊ธธ์ด์ง์๋ก ์ฑ๋ฅ์ ๋จ์ด์ง์ ๋ฐ์ ์๋ค.
# 4-2. CompletableFuture
Future์ CompletionStage๋ฅผ ๊ตฌํํ๋ ๊ตฌํ์ฒด
- ์์ธ์ฒ๋ฆฌ๋ฅผ ์ง์ํ๋ ๋ฉ์๋
- ์์์ ์์กด๊ด๊ณ๋ฅผ ๋ง๋ ์ค๋ ๋ ํ๋ก๊ทธ๋๋ฐ
- ์ฝ๋ฐฑ์ ์ง์ํ๊ธฐ๋ ํ๋ฉฐ ์ฌ๋ฌ ์ค๋ ๋๋ฅผ ํ๋๋ก ๋ฌถ์ด ์ฒ๋ฆฌํ๊ธฐ์๋ ์ฉ์ด
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>{...}
CompletableFuture<String> future = new CompletableFuture<>();
future.complete("youngjun");
System.out.println("future = " + future.get());
//-------------OR --------------
CompletableFuture<String> future = CompletableFuture.completedFuture("youngjun");
System.out.println("future = " + future.get());
# 4-2-1. ๋น๋๊ธฐ๋ก ์์ ์คํํ๊ธฐ
๋ฆฌํด๊ฐ์ด ์๋ ๊ฒฝ์ฐ: runAsync()
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println("Hello" + Thread.currentThread().getName()); }); future.get();
๋ฆฌํด๊ฐ์ด ์๋ ๊ฒฝ์ฐ: supplyAsync()
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello" + Thread.currentThread().getName()); return "Hello"; }); future.get();
์ํ๋ Executor(์ฐ๋ ๋ํ)๋ฅผ ์ฌ์ฉํด์ ์คํํ ์๋ ์๋ค. (๊ธฐ๋ณธ์ ForkJoinPool.commonPool())
# 4-2-2. ์ฝ๋ฐฑ ์ ๊ณตํ๊ธฐ
thenApply(Function)
๋ฆฌํด๊ฐ์ ๋ฐ์์ ๋ค๋ฅธ ๊ฐ์ผ๋ก ๋ฐ๊พธ๋ ์ฝ๋ฐฑ
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello" + Thread.currentThread().getName()); return "Hello"; }).thenApply((s)->{ System.out.println("content: "+s); System.out.println(Thread.currentThread().getName()); return "HelloAsync"; }); System.out.println(future.get());
- Javascript์ Promise ์ ์ ์ฌํ ํํ
supplyAsync
์ ๋๋คํํ์์์ ๋ฐํ๋ Hello๋ผ๋ ๊ฐ์ ์ฒด์ด๋๋ ๋ฉ์๋thenApply
์ ์ธ์๊ฐ์ผ๋ก ๋ค์ด๊ฐ๊ณ ์ฌ์ฉ ๊ฐ๋ฅ- ๊ทธ๋ฆฌ๊ณ ๋ ์ด์ ์ฒด์ด๋๋ ๋ฉ์๋๊ฐ ์๊ธฐ ๋๋ฌธ์ return๊ฐ์ธ HelloAsync๋ ๋ฐํ๋์ด future๋ก ๋ค์ด๊ฐ๊ณ get()์ํตํด ๋ฐ์ ์ ์๋ค.
thenAccept(Consumer)
๋ฆฌํด๊ฐ์ ๋ฐ์ ๋ ๋ค๋ฅธ ์์ ์ ์ํํ๋๋ฐ ๋ฐํ๊ฐ์ ์๋ ์ฝ๋ฐฑ (๋ฆฌํด x)
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello" + Thread.currentThread().getName()); return "Hello"; }).thenAccept((s)->{ System.out.println("content: "+s); System.out.println(Thread.currentThread().getName()); // return ์๋ค. }); future.get();
thenRun(Runnable)
๋ฆฌํด๊ฐ์ ๋ฐ์ง ์๊ณ ๋ค๋ฅธ ์์ ์ ์ํํ๋ ์ฝ๋ฐฑ
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello" + Thread.currentThread().getName()); return "hello"; }).thenRun(()->{ System.out.println(Thread.currentThread().getName()); }); future.get();
# ParallelStream vs. CompletableFuture
- from Java 8 in Action (opens new window)
- ParallelStream : I/O๊ฐ ํฌํจ๋์ง ์์ ๊ณ์ฐ ์ค์ฌ์ ๋์์ ์คํํ ๋๋ ์คํธ๋ฆผ ์ธํฐํ์ด์ค๊ฐ ๊ฐ์ฅ ๊ตฌํํ๊ธฐ ๊ฐ๋คํ๋ฉฐ ํจ์จ์ ์ผ ์ ์๋ค(๋ชจ๋ ์ค๋ ๋๊ฐ ๊ณ์ฐ ์์ ์ ์ํํ๋ ์ํฉ์์๋ ํ๋ก์ธ์ ์ฝ์ด ์ ์ด์์ ์ฐ๋ ๋๋ฅผ ๊ฐ์ง ํ์๊ฐ ์๋ค).
- CompletableFuture : ๋ฐ๋ฉด ์์
์ด I/O๋ฅผ ๊ธฐ๋ค๋ฆฌ๋ ์์
์ ๋ณ๋ ฌ๋ก ์คํํ ๋๋
CompletableFuture
๊ฐ ๋ ๋ง์ ์ ์ฐ์ฑ์ ์ ๊ณตํ๋ฉฐ ๋๊ธฐ/๊ณ์ฐ(W/C)์ ๋น์จ์ ์ ํฉํ ์ค๋ ๋ ์๋ฅผ ์ค์ ํ ์ ์๋ค. ํนํ ์คํธ๋ฆผ์ ๊ฒ์ผ๋ฅธ ํน์ฑ ๋๋ฌธ์ ์คํธ๋ฆผ์์ I/O๋ฅผ ์ค์ ๋ก ์ธ์ ์ฒ๋ฆฌํ ์ง ์์ธกํ๊ธฐ ์ด๋ ค์ด ๋ฌธ์ ๋ ์๋ค. - fahd.blog: Java 8: CompletableFuture vs Parallel Stream (opens new window)
# 5. CompletableFuture (2)
# 5-1. CompletableFuture ์กฐํฉ ๋ฉ์๋
# 5-1-1. thenCompose()
- ๋ ์์ ์ด ์๋ก ์ด์ด์ ์คํํ๋๋ก ์กฐํฉํ๋ฉฐ ์ฐ๊ด๋ future๊ฐ์ ๋ง์ด ์ฌ์ฉ
public class App {
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
});
CompletableFuture<String> future = helloFuture.thenCompose(App::getWorldFuture);
System.out.println(future.get());
}
private static CompletableFuture<String> getWorldFuture(String message) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("World " + Thread.currentThread().getName());
return message + " World";
});
}
}
/*
Hello ForkJoinPool.commonPool-worker-3
World ForkJoinPool.commonPool-worker-5
Hello World
*/
# 5-1-2. thenCombine()
- ๋ ์์ ์ ๋ ๋ฆฝ์ ์ผ๋ก ์คํํ๊ณ ๋ ๋ค ์ข ๋ฃ ํ์๋ ์ฝ๋ฐฑ ์คํ.
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("MicroSoft " + Thread.currentThread().getName());
return "MicroSoft";
});
CompletableFuture<String> appleFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Apple " + Thread.currentThread().getName());
return "Apple";
});
CompletableFuture<String> resultFuture = msFuture.thenCombine(appleFuture, (i, j) -> i + " " + j);
System.out.println(resultFuture.get());
}
# 5-1-3. allOf()
- ์ฌ๋ฌ ์์ ์ ๋ชจ๋ ์คํํ๊ณ ๋ชจ๋ ์์ ๊ฒฐ๊ณผ์ ์ฝ๋ฐฑ ์คํ
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("MicroSoft " + Thread.currentThread().getName());
return "MicroSoft";
});
CompletableFuture<String> appleFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Apple " + Thread.currentThread().getName());
return "Apple";
});
List<CompletableFuture<String>> futures = Arrays.asList(msFuture, appleFuture);
CompletableFuture<List<String>> results =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
results.get().forEach(System.out::println);
}
# 5-1-4. anyOf()
- ์ฌ๋ฌ ์์ ์ค ๊ฐ์ฅ ๋๋ ํ๋์ ๊ฒฐ๊ณผ๋ฅผ ์ฝ๋ฐฑ์ ๋๊ฒจ ์คํ
CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("MicroSoft " + Thread.currentThread().getName());
return "MicroSoft";
});
CompletableFuture<String> appleFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Apple " + Thread.currentThread().getName());
return "Apple";
});
CompletableFuture<Void> future = CompletableFuture.anyOf(msFuture, appleFuture).thenAccept(System.out::println);
future.get();
# 5-2. ์์ธ ์ฒ๋ฆฌ ๋ฉ์๋
# 5-2-1. exeptionally(Function)
boolean throwError = true;
CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
if (throwError) {
throw new IllegalArgumentException();
}
System.out.println("MicroSoft " + Thread.currentThread().getName());
return "MicroSoft";
}).exceptionally(ex->{
System.out.println(ex);
return "Error";
});
System.out.println(msFuture.get());
# 5-2-2. handle(BiFunction)
boolean throwError = false;
CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
if (throwError) {
throw new IllegalArgumentException();
}
System.out.println("MicroSoft " + Thread.currentThread().getName());
return "MicroSoft";
}).handle((result, ex)->{
if (Objects.nonNull(ex)) {
System.out.println(ex);
return "ERROR";
}
return result;
});
System.out.println(msFuture.get());
# 5-3. CompletableFuture ๋ฅผ ์ธ์ ์ธ ์ ์์๊น
- ์๋๋ microservices.io ์ฌ์ดํธ์์ ๊ฐ์ ธ์จ ์์ ์ด๋ค.
- CompletableFuture ๋ฅผ ์ฌ์ฉํ ๋งํ ๋ถ๋ถ์ API GATEWAY, Storefront WebApp ๋ผ๊ณ ํ ์ ์๋ค. ์ด์ ๋ Account, Inventory, Shipping Service ๋ DATA BASE ์ ์ฐ๋ํ๊ณ ์๊ธฐ ๋๋ฌธ์ด๋ค. java ํ๊ฒฝ์์ RDBMS ์ฐ๋์์ ์์ง Async ํ ์ธํฐํ์ด์ค ๋ฐฉ์์ ์ ๊ณตํ๊ณ ์์ง ์๊ธฐ ๋๋ฌธ์ Async ๊ฐ๋ฐ์ ํด์ ํฐ ํจ๊ณผ๋ฅผ ์ป๊ธฐ ๋ถ์กฑํ๋ค.
- ํ์ง๋ง GATEWAY ๋ฑ์ REST Service ์ ์ฐ๋ํ๊ธฐ ๋๋ฌธ์ Async ๋ฅผ ํตํ ์ฑ๋ฅ ํจ๊ณผ๋ฅผ ๋ณผ ์ ์๋ค. (Blocking Thread ๋ฌธ์ ํด๊ฒฐ ๋ฑ)
Reference
- https://www.hungrydiver.co.kr/bbs/detail/develop?id=2 (opens new window)
- https://kwonnam.pe.kr/wiki/java/8/completable_future (opens new window)
- https://huisam.tistory.com/entry/completableFuture (opens new window)
- https://tourspace.tistory.com/137 (opens new window)
- https://bbubbush.tistory.com/23 (opens new window)