일시 중단 함수는 일반적인 함수를 더 일반화해 함수 본문의 원하는 지점에서 함수에 필요한 모든 런타임 문맥을 저장하고 함수 실행을 중단한 다음, 나중에 필요할 때 다시 실행을 계속 진행할 수 있게 한 것이다.
코틀린에서는 이런 함수에 suspend 라는 변경자를 붙인다.
suspendfunfoo() {println("${System.currentTimeMillis()} : Task started (${Thread.currentThread()})")delay(1000)println("${System.currentTimeMillis()} : Task finished (${Thread.currentThread()})")}suspendfunmain() {foo()println("${System.currentTimeMillis()} : Main (${Thread.currentThread()})")}
1720589906842 : Task started (Thread[main,5,main])
1720589907854 : Task finished (Thread[kotlinx.coroutines.DefaultExecutor,5,main])
1720589907855 : Main (Thread[kotlinx.coroutines.DefaultExecutor,5,main])
delay() 함수는 Thread.sleep() 과 비슷한 일을 한다. 하지만 현재 스레드를 블럭시키지 않고 자신을 호출한 함수를 일시 중단시키며 스레드를 다른 작업을 수행할 수 있게 풀어준다.
동시성 함수는 주로 공통적인 생명 주기와 문맥이 정해진 몇몇 작업이 정의된 구체적인 영역 안에서만 호출한다.
코루틴을 실행할 때 사용하는 여러 가지 함수를 코루틴 빌더(coroutine builder) 라고 부른다.
코루틴 빌더는 CoroutineScope 인스턴스의 확장 함수로 쓰인다. CoroutineScope에 대한 구현 중 가장 기본적인 것으로 GlobalScope 객체가 있다. GlobalScope 객체를 사용하면 독립적인 코루틴을 만들 수 있고, 이 코루틴은 자신만의 작업을 내포할 수 있다.
자주 사용하는 launch() , async() , runBlocking() 이라는 코루틴 빌더를 살펴보자.
1.2. 코루틴 빌더
launch() 함수는 코루틴을 시작하고, 코루틴을 실행 중인 작업의 상태를 추적하고 변경할 수 있는 Job 객체를 돌려준다. 이 함수는 CoroutineScope.() -> Unit 타입의 일시 중단 람다를 받는다.
import java.lang.System.currentTimeMillisimport kotlinx.coroutines.GlobalScopeimport kotlinx.coroutines.delayimport kotlinx.coroutines.launchfunmain() {val time =currentTimeMillis() GlobalScope.launch {delay(100)println("Task 1 finished in ${currentTimeMillis() - time} ms (${Thread.currentThread()})") } GlobalScope.launch {delay(100)println("Task 2 finished in ${currentTimeMillis() - time} ms (${Thread.currentThread()})") } GlobalScope.launch {delay(100)println("Task 3 finished in ${currentTimeMillis() - time} ms (${Thread.currentThread()})") } GlobalScope.launch {delay(100)println("Task 4 finished in ${currentTimeMillis() - time} ms (${Thread.currentThread()})") } GlobalScope.launch {delay(100)println("Task 5 finished in ${currentTimeMillis() - time} ms (${Thread.currentThread()})") } GlobalScope.launch {delay(100)println("Task 6 finished in ${currentTimeMillis() - time} ms (${Thread.currentThread()})") } Thread.sleep(200)}
Task 5 finished in 131 ms (Thread[DefaultDispatcher-worker-3,5,main])
Task 6 finished in 131 ms (Thread[DefaultDispatcher-worker-1,5,main])
Task 3 finished in 131 ms (Thread[DefaultDispatcher-worker-6,5,main])
Task 4 finished in 131 ms (Thread[DefaultDispatcher-worker-5,5,main])
Task 1 finished in 131 ms (Thread[DefaultDispatcher-worker-4,5,main])
Task 2 finished in 131 ms (Thread[DefaultDispatcher-worker-2,5,main])
DefaultDispatcher-worker-n : 스레드의 이름
DefaultDispatcher : 코루틴 디스패처 중 하나인 DefaultDispatcher에 의해 생성된 스레드임을 알 수 있음. 해당 디스패처는 주로 코어 수에 비례한 풀 크기를 가지며, CPU 집중적인 작업에 사용된다.
worker-n : DefaultDispatcher에서 생성된 스레드 중 하나이며, worker-n은 그 중 n 번째 스레드 임을 의미한다.
n : 스레드 우선순위
스레드 우선순위는 1(가장 낮음) 부터 10(가장 높음)까지 설정할 수 있으며, 5는 기본 우선순위 이다.
main : 스레드 그룹
이 스레드가 속한 스레그 그룹의 이름을 의미한다.
모든 작업이 동시에 끝났다는 점에서 모든 작업이 실제로 병렬적으로 실행됐다는 것을 알 수 있다. 다만 실행 순서가 보장되지 않으므로 다른 작업이 먼저 표시될 수 있다. 코루틴 라이브러리는 필요할 때 실행 순서를 강제할 수 있는 도구도 제공한다.
코루틴을 처리하는 스레드는 데몬 모드로 실행되기 때문에 main 스레드가 코루틴보다 빨리 끝나버리면 자동으로 실행이 종료된다.
코루틴은 스레드보다 훨씬 가볍다. 특히 코루틴은 유지해야 하는 상태가 더 간단하며 일시 중단되고 재개될 때 완전한 문맥 전환을 사용하지 않아도 되므로 엄청난 수의 코루틴을 충분히 동시에 실행할 수 있다.
launch() 빌더는 동시성 작업이 결과를 만들어내지 않는 경우 적합하다. 그러므로 결과가 필요한 경우에는 async() 라는 다른 빌더 함수를 사용해야 한다. 이 함수는 Deferred 의 인스턴스를 돌려주고, 이 인스턴스는 Job의 하위 타입으로 await() 메서드를 통해 계산 결과에 접근할 수 있게 해준다. await()는 계산이 완료되거나 계산 작업이 취소될 때까지 현재 코루틴을 일시 중단시킨다.
runBlocking() 내부의 코루틴은 메인 스레드에서 실행된 반면, launch()로 시작한 코루틴은 공유 풀에서 백그라운드 스레드를 할당받았음을 알 수 있다.
이런 블러킹 동작 때문에 runBlocking()을 다른 코루틴 안에서 사용하면 안된다. runBlocking()은 블러킹 호출과 넌블러킹 호출 사이의 다리 역할을 하기 위해 고안된 코루틴 빌더이므로, 테스트나 메인 함수에서 최상위 빌더로 사용하는 등의 경우에만 써야 한다.
1.3. 코루틴 영역과 구조적 동시성
지금까지 살펴본 예제 코루틴은 전역 영역(global scope)에서 실행됐다. 전역 영역이란 코루틴의 생명 주기가 전체 애플리케이션의 생명 주기에 의해서만 제약되는 영역이다.
동시성 작업 사이의 부모 자식 관계를 통해 실행 시간 제한이 가능하다. 어떤 코루틴을 다른 코루틴의 문맥에서 실행하면 후자가 전자의 부모가 된다. 이 경우 자식의 실행이 모두 끝나야 부모가 끝날 수 있도록 부모와 자식의 생명 주기가 연관된다.
이런 기능을 구조적 동시성(structured concurrency) 이라고 부르며, 지역 변수 영역 안에서 블럭이나 서브 루틴을 사용하는 경우와 구조적 동시성을 비교할 수 있다. 예제를 살펴보자.
funmain() {runBlocking {println("Parent task started : ${Thread.currentThread()}")launch {println("Task A started : ${Thread.currentThread()}")delay(200)println("Task A finished : ${Thread.currentThread()}") }launch {println("Task B started : ${Thread.currentThread()}")delay(200)println("Task B finished : ${Thread.currentThread()}") }delay(100)println("Parent task finished : ${Thread.currentThread()}") }println("Shutting down... : ${Thread.currentThread()}")}
Parent task started : Thread[main,5,main]
Task A started : Thread[main,5,main]
Task B started : Thread[main,5,main]
Parent task finished : Thread[main,5,main]
Task A finished : Thread[main,5,main]
Task B finished : Thread[main,5,main]
Shutting down... : Thread[main,5,main]
이 코드는 최상위 코루틴을 시작하고 현재 CoroutineScope 인스턴스 안에서 launch를 호출해 두 가지 자식 코루틴을 시작한다.
coroutineScope() 호출로 코드 블럭을 감싸면 커스텀 영역을 도입할 수도 있다. runBlocking()과 마찬가지로 coroutineScope() 호출은 람다의 결과를 반환하고, 자식들이 완료되기 전까지 실행이 완료되지 않는다.
funmain() {runBlocking {println("Parent task started : ${Thread.currentThread()}")coroutineScope {launch {println("Task A started : ${Thread.currentThread()}")delay(200)println("Task A finished : ${Thread.currentThread()}") }launch {println("Task B started : ${Thread.currentThread()}")delay(200)println("Task B finished : ${Thread.currentThread()}") } }println("Parent task finished : ${Thread.currentThread()}") }println("Shutting down... : ${Thread.currentThread()}")}
Parent task started : Thread[main,5,main]
Task A started : Thread[main,5,main]
Task B started : Thread[main,5,main]
Task A finished : Thread[main,5,main]
Task B finished : Thread[main,5,main]
Parent task finished : Thread[main,5,main]
Shutting down... : Thread[main,5,main]
coroutineScope()은 일시 중단 함수라 현재 스레드를 블럭시키지 않는다.
두 자식 코루틴 실행이 끝날 때까지 앞의 coroutineScope() 호출이 일시 중단되므로 Parent task finished 메시지가 마지막에 표시된다.
1.4. 코루틴 문맥
코루틴마다 CoroutineContext 인터페이스로 표현되는 문맥이 연관돼 있으며, 코루틴을 감싸는 변수 영역의 coroutineContext 프로퍼티를 통해 이 문맥에 접근할 수 있다. 문맥은 키-값 쌍으로 이뤄진 불변 컬렉션이며, 코루틴에서 사용할 수 있는 여러 가지 데이터가 들어 있다.
코루틴이 실행 중인 취소 가능한 작업을 표현하는 잡(job)
코루틴과 스레드의 연관을 제어하는 디스패처(dispatcher)
일반적으로 문맥은 CoroutineContext.Element 를 구현하는 아무 데이터나 저장할 수 있다. 특정 원소에 접근하려면 get() 메서드나 인덱스 연산자에 키를 넘겨야 한다.
funmain() {runBlocking {launch {println("Job.Key : ${Job.Key}")println("coroutineContext : $coroutineContext")println("Task is active : ${coroutineContext[Job.Key]!!.isActive} (${Thread.currentThread()})") } }}
Job.Key : kotlinx.coroutines.Job$Key@6325a3ee
coroutineContext : [StandaloneCoroutine{Active}@7b49cea0, BlockingEventLoop@887af79]
Task is active : true (Thread[main,5,main])
기본적으로 launch(), async() 등의 표준 코루틴 빌더에 의해 만들어지는 코루틴은 현재 문맥을 이어받는다.
필요하면 빌더 함수에 context 파라미터를 지정해서 새 문맥을 넘길 수도 있다. 새 문맥을 만들려면 두 문맥의 데이터를 합쳐주는 plus() 함수나 + 연산자를 사용하거나, 주어진 키에 해당하는 원소를 문맥에서 제거해주는 minusKey() 함수를 사용하면 된다.
Dispatchers.Default : 공유 스레드 풀로, 풀 크기는 디폴트로 사용 가능한 CPU 코어 수이거나 2다(둘 중 큰 값). 이 구현은 일반적으로 작업 성능이 주로 CPU 속도에 의해 결정되는 CPU 위주의 작업에 적합하다.
Dispatchers.IO : 스레드 풀 기반이며 디폴트 구현과 비슷하지만, 파일을 읽고 쓰는 것처럼 잠재적으로 블러킹될 수 있는 I/O를 많이 사용하는 작업에 최적화돼 있다. 이 디스패처는 스레드 풀을 디폴트 구현과 함께 공유하지만, 필요에 따라 스레드를 추가하거나 종료시켜준다.
Dispatchers.Main : UI 스레드에서만 배타적으로 작동하는 디스패처
디스패처를 명시적으로 지정하지 않으면 코루틴을 시작한 영역으로부터 디스패처가 자동으로 상속된다.
부모 코루틴이 자식과 똑같은 오류로 취소된다. 이로 인해 부모의 나머지 자식도 모두 취소된다.
자식들이 모두 취소되고 나면 부모는 예외를 코루틴 트리의 윗부분으로 전달한다.
전역 영역에 있는 코루틴에 도달할 때까지 이 과정이 반복된다.
그 후 예외가 CoroutineExceptionHanlder.Consider에 의해 처리된다.
funmain() {runBlocking {println("Root Start")launch {throwException("Error in task A")println("Task A completed") }launch {delay(1000)println("Task B completed") }delay(1000)println("Root Completed") }}
Root Start
Exception in thread "main" java.lang.Exception: Error in task A
at Test20Kt$main$1$1.invokeSuspend(Test20.kt:12)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:104)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:277)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:95)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:69)
at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:48)
at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
at Test20Kt.main(Test20.kt:8)
at Test20Kt.main(Test20.kt)
첫 번째 코루틴이 예외를 던져서 최상위 작업이 취소되고, 최상위의 자식인 두 작업도 취소된다.
커스텀 핸들러가 지정되지 않았기 때문에 Thread.uncaughtExceptionHandler에 등록된 디폴트 동작을 실행한다.
CoroutineExceptionHandler 는 현재 코루틴 문맥(CoroutineContext)과 던져진 예외를 인자로 전달받는다.
val handler =CoroutineExceptionHandler { _, exception ->println("Caught $exception") }
핸들러가 예외를 처리하도록 지정하려면 코루틴 문맥에 인스턴스를 넣어야 한다.
suspendfunmain() {val handler =CoroutineExceptionHandler { _, exception ->println("Caught $exception") } GlobalScope.launch(handler) {println("Root start")launch {throwException("Error in task A")println("Task A completed") }launch {delay(1000)println("Task B completed") }delay(1000)println("Root completed") }.join()}
Root start
Caught java.lang.Exception: Error in task A
CoroutineExceptionHandler는 전역 영역에서 실행된 코루틴에 대해서만 정의할 수 있고, CoroutineExceptionHandler가 정의된 코루틴의 자식에 대해서만 적용된다. 그러므로 runBlocking()을 그대로 사용하면 예외 핸들러가 동작하지 않는다.
예외를 처리하는 다른 방법은 던져진 예외를 저장했다가 예외가 발생한 계산에 대한 await() 호출을 받았을 때 다시 던지는 것이다.
suspendfunmain() {runBlocking {val deferredA =async {throwException("Error in task A")println("Task A completed") }val deferredB =async {println("Task B completed") } deferredA.await() deferredB.await()println("Root completed") }}
Exception in thread "main" java.lang.Exception: Error in task A
at Test23Kt$main$2$deferredA$1.invokeSuspend(Test23.kt:7)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:104)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:277)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:95)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:69)
at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:48)
at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
at Test23Kt.main(Test23.kt:5)
at Test23Kt$main$3.invoke(Test23.kt)
at Test23Kt$main$3.invoke(Test23.kt)
at kotlin.coroutines.intrinsics.IntrinsicsKt__IntrinsicsJvmKt$createCoroutineUnintercepted$$inlined$createCoroutineFromSuspendFunction$IntrinsicsKt__IntrinsicsJvmKt$1.invokeSuspend(IntrinsicsJvm.kt:270)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlin.coroutines.ContinuationKt.startCoroutine(Continuation.kt:115)
at kotlin.coroutines.jvm.internal.RunSuspendKt.runSuspend(RunSuspend.kt:19)
at Test23Kt.main(Test23.kt)
try-catch 블록으로 예외를 처리하려고 시도할 경우 어떤 일이 벌어지는지 확인해보자.
funmain() {runBlocking {val deferredA =async {throwException("Error in task A")println("Task A completed") }val deferredB =async {println("Task B completed") }try { deferredA.await() deferredB.await() } catch (e: Exception) {println("Caught $e") } }println("completed")}
Caught java.lang.Exception: Error in task A
Exception in thread "main" java.lang.Exception: Error in task A
at Test24Kt$main$1$deferredA$1.invokeSuspend(Test24.kt:7)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:104)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:277)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:95)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:69)
at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:48)
at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
at Test24Kt.main(Test24.kt:5)
at Test24Kt.main(Test24.kt)
catch가 동작하긴 하지만 프로그램은 여전히 예외와 함께 중단된다.
자식이 실패한 경우에는 부모를 취소시키기 위해 자동으로 예외를 다시 던지기 때문이다.
위와 같은 동작을 변경하려면 슈퍼바이저(supervisor) 잡을 사용해야 한다.
부모 코루틴을 슈퍼바이저로 변환하려면 coroutineScope() 대신 supervisorScope() 함수를 사용해 새로운 영역을 정의해야 한다.
가변 상태를 스레드 안전하게 공유하는 액터(actor)가 있다. 액터는 내부 상태와 다른 액터에게 메시지를 보내서 동시성 통신을 진행할 수 있는 수단을 제공하는 객체다. 액터는 자신에게 들어오는 메시지를 listen하고, 자신의 상태를 바꾸면서 메시지에 응답할 수 있으며, 다른 메시지를 보낼 수 있고, 새로운 액터를 시작할 수 있다. 액터 모델은 락 기반의 동기화와 관련한 여러 가지 문제로부터 자유로울 수 있다.
actor() 함수를 통해 액터를 만들 수 있다. 액터는 ActorScope를 만들며, 이 영역은 기본 코루틴 영역에 자신에게 들어오는 메시지에 접근할 수 있는 수신자 채널이 추가된 것이다.
다음 예제를 통해 은행 계좌 잔고를 유지하고 어떤 금액을 저축하거나 인출할 수 있는 액터를 살펴보자.