코틀린 코루틴 한판 정리

코틀린의 등장 배경

프로세스는 별도 설정없이는 단일 스레드로 실행됩니다 (Single Thread Process), 실행 중에 File System IO작업, Network작업과 같이 Blocking 작업이 실행되면 대기하게 됩니다.
이를 해결하기 위해 멀티 스레드 프로그래밍이 등장했습니다, 작업간 독립성이 있는 경우 Parallel Processing 이 가능하기 떄문에 단일 스레드보다 더 높은 작업량을 처리할 수 있습니다.

멀티 스레드 프로그래밍도 만능은 아니고, 여러 단점이 존재합니다. Thread 생성 비용과 Thread간 context switching 비용이 존재합니다,
JVM에선 1개의 Thread 당 약 1MB의 Stack 메모리 영역을 차지합니다.
또한 I/O 작업과 같이 thread가 blocking 되는 일도 발생합니다.

thread blocking : thread가 아무것도 하지 않고, 대기하는 상태 (Mutex, Semaphore 로 인해 공유되는 자원에 접근할 수 있는 스레드가 제한되는 경우에도 발생함)]
예를 들어 아래의 상황에서도 blocking 될 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
fun main(args: Array<String>) {
val executorService = Executors.newFixedThreadPool(2)
val future = executorService.submit<String>{
Thread.sleep(1000L)
return@submit "작업완료!";
}

val result = future.get(); // thread blocking 됨!!
println(result)
executorService.shutdown()
}

thread blocking문제를 코루틴이라는 경량 쓰레드(lightweight thread)로 해결할 수 있는데요,
코루틴은 작업이 일시 중단되면 더 이상 쓰레드 사용이 필요없으므로 쓰레드를 다른 코루틴에게 양보합니다.
즉 마치 코루틴을 쓰레드에 땟다가 붙이면서 쓰레드가 쉬는 시간을 갖지 않도록 열심히 채찍질하는거죠..

그래서 코루틴의 정의란 ?

일시 중단이 가능한 ‘작업’입니다. thread에 땟다가 붙일 수가 있기 때문에 경량 쓰레드라고도 불리고 있습니다.

CouroutineDispatcher

CouroutineDispatcher는 코루틴을 thread로 보내어 실행시키는 역할을 합니다. 즉 코루틴의 실행을 관리하는 주체입니다.

  • CouroutineDispatcher에서 애플리케이션이 가지고 있는 Thread Pool 관리
  1. 1번 Thread에서 1번 코루틴 실행중
  2. 2번 코루틴이 들어오면, CouroutineDispatcher가 2번 코루틴을 2번 Thread에게 할당
  3. 1,2번 Thread 모두 점유중인 상태에서 3번 코루틴이 들어오면 작업 대기열에 대기
  4. 1번 코루틴이 작업이 완료되면 1번 Thread에 작업 대기열에서 대기중인 3번 코루틴이 실행됨.

CoroutineDispatcher 의 종류

  1. Confined Dispatcher (제한된 디스패처) : 사용할 수 있는 스레드나 스레드풀이 제한된 디스패처
  2. UnConfined Dispatcher (무제한 디스패처) : 사용할 수 있는 스레드나 스레드풀이 제한되지 않은 디스패처

CoroutineDispatcher는 코루틴 라이브러리에서 미리 정의해둔 목록이 존재합니다, 만약 직접생성하게되는 경우, 메모리를 낭비하게 될 수도 있습니다,,

  1. Dispatchers.IO : IO bound 작업
  2. Dispatchers.Default : CPU bound 작업

또한 코루틴 라이브러리는 스레드의 생성과 관리를 효율적으로 할 수 있도록 application 레벨의 공유 스레드풀을 제공합니다.

정리하면, CoroutineDispatcher 객체는 코루틴을 Thread로 보내 실행하는 객체이다. 코루틴을 작업 대기열에 적재한 후 사용이 가능한 스레드로 보내 실행한다.

코루틴 빌더함수

  • 코루틴을 생성하는데 사용하는 함수로 (runBlocking 함수, launch함수 모두 코루틴 빌더 함수입니다.) 모든 코루틴 빌더함수는 코루틴을 만들고, 코루틴을 추상화한 Job 객체를 생성합니다.
    Job 객체를 통해 코루틴의 상태를 추적하고 제어합니다.

Job 객체의 join 함수를 호출하면 join의 대상이 된 코루틴의 작업이 완료될떄까지 join을 호출한 코루틴이 일시 중단됩니다.

join, joinAll함수를 활용하면 복수의 코루틴에 대한 순차처리도 가능합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
runBlocking {
val convertImageJob1 :Job = launch(Dispatchers.Default) {
Thread.sleep(1000L)
println("[${Thread.currentThread().name}] 이미지1 변환 완료")
}
val convertImageJob2 :Job = launch(Dispatchers.Default) {
Thread.sleep(1000L)
println("[${Thread.currentThread().name}] 이미지2 변환 완료")
}

joinAll(convertImageJob1, convertImageJob2) // 두 코루틴이 종료될때까지 runBlocking 코루틴 중단

val uploadImageJob :Job = launch(Dispatchers.IO) {
println("[${Thread.currentThread().name}] 이미지1,2 업로드")
}
}

코루틴을 생성만 하고, 시작은 하지 않고 싶다면 start 파라미터에 지연 설정도 줄 수가 있습니다.

1
2
3
4
5
6
7
8
runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val lazyJob :Job = launch(start = CoroutineStart.LAZY){
println("[${getElapsedTime(startTime)}] 지연 실행") // 지연 코루틴은 명시적으로 실행하지 않으면 대기
}
delay(1000L)
lazyJob.start()
}

취소 또한 가능합니다, 단 cancel 함수나 cancelAndJoin 함수를 사용했다고 해서 코루틴이 즉시 취소되는 것은 아닙니다,
이들은 Job 객체 내부에 있는 취소확인용 플래그를 바꾸기만 하며,
코루틴이 이 플래그를 확인하는 시점에 비로소 취소됩니다.
코루틴이 취소를 확인하는 시점은 일반적으로 일시 중단 지점이나 코루틴이 실행을 대기하는 시점이며, 이 시점들이 없다면 코루틴이 취소되지 않습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val longJob :Job = launch(Dispatchers.Default){
repeat(10) {
repeatTime ->
delay(1000L)
println("[${getElapsedTime(startTime)}] 반복횟수 ${repeatTime}")
}
}
delay(3500L)
longJob.cancel()// 즉시 취소되는건 아니라 Job 객체 내부의 취소 확인용 플래그를 취소 요청됨으로 변경함으로써 코루틴이 취소되야 한다는것을 알린다.
}


만약 아래와 같이 코루틴이 취소flag값을 확인할 틈을 안주면 당연히 코루틴은 취소되지 않습니다.

1
2
3
4
5
6
7
8
9

runBlocking<Unit> {
val infiniteJob :Job = launch(Dispatchers.Default){
while (true){
println("작업중")
}
}
infiniteJob.cancelAndJoin()
}

이를 해결하기 위해서는 코루틴이 취소 플래그값을 체크할 시간을 주면 됩니다, 아래의 함수를 사용하거나 플래그값으로 분기처리할 수 있습니다.

  1. delay : 일시중단 함수(suspend fun)를 통해 코루틴을 일시 중단함, 해당 스레드가 blocking 되는건아니고 non-blocking으로 코루틴만 일시중단되고, thread는 다른 코루틴에 할당될 수 있음
  2. yield : 코루틴이 자신이 사용하는 스레드를 양보할 수 있다.
  3. CoroutineScope.isActive : 코루틴 활성화 여부 반환

코틀린의 상태

  1. 생성(New): 코루틴 빌더를 통해 코루틴을 생성하면 기본적으로 생성상태가 되며, 지연 코루틴이 아니라면 자동중으로 실행중 상태로 넘어간다.
  2. 실행중(Active): 지연 코루틴이 아닌 코루틴을 만들면 자동으로 실행 중 상태로 바뀐다.
  3. 실행 완료 중 : 부모 코루틴의 모든 코드가 실행되었지만, 자식 코루틴이 실행중인 경우일때, 부모 코루틴이 갖는 상태입니다.
  4. 실행 완료(Completed)
  5. 취소 중(Cancelling) : Job.cancel() 등을 통해 코루틴에 취소가 요청됐을 경우 취소 중 상태로 넘어간다. 아직 취소된 상태는 아니라서 코루틴은 계속해서 실행된다.
  6. 취소 완료 (Cancelled)

Async 와 Deferred

launch 코루틴 빌더를 통해 생성되는 코루틴은 기본적으로 작업 실행 후 결과를 반환하지 않습니다.

  • launch 코루틴 빌더 함수를 사용하면 결괏값이 없는 코루틴 객체인 Job이 반환

  • async 코루틴 빌더 함수를 사용하면 결괏값이 있는 코루틴 객체인 Deferred 반환

    • Deferred 객체는 미래의 어느 시점에 결과값이 반환될 수 있음을 표현하는 코루틴 객체입니다.
    • Deferred 객체는 결과값 수신 대기를 위해 await 함수를 제공합니다.
    • await 함수는 await 의 대상이 된 Deferred 코루틴이 실행 완료될떄까지 await 함수를 호출한 코루틴을 일시 중단합니다. (마치 Job 객체의 join과 매우 유사)
1
2
3
4
val networkDeferred :Deferred<String> = async(Dispatchers.IO) {
delay(1000L)
return@async "Dummy Response"
}
  • Deferred는 Job 인터페이스의 서브타입으로 즉, Job 객체의 일종입니다.
1
public interface Deferred<out T> : Job 

CoroutineContext

  • 코루틴을 실행하는 실행 환경을 설정하고, 관리하는 인터페이스입니다

주요 구성 요소

  1. CoroutineName : 코루틴의 이름 설정할 수 있습니다.
  2. CoroutineDispatcher : 코루틴을 쓰레드에 할당해서 실행합니다.
  3. Job: 코루틴의 추상체로 코루틴을 조작하는데 사용한다.
  4. CoroutineExceptionDispatcher : 코루틴에서 발생한 예외를 처리한다.

각 구성 요소는 K-V꼴로 관리된다, 따라서 Key에 따라 중복된 값은 허용되지 않습니다.
CoroutineContext는 다음과 같이 객체 간에 더하기 연산자(+)를 사용해 CoroutineContext 객체를 구성할 수 있습니다.

1
2
val coroutineContext = newSingleThreadContext("MyThread-Chansoo") + CoroutineName("MyCoroutine")
val newCoroutineContext = coroutineContext + newSingleThreadContext("MyThread-Overrided") // 중복된 키값을 가진 경우 덮어씌여집니다.

특정 key만 제거할 수도 있습니다.

1
2
val coroutineContext = CoroutineName("MyCoroutine1") + newSingleThreadContext("MyThread1")
val deletedCoroutineNameContext = coroutineContext.minusKey(coroutineName.key)

구조화된 동시성 (Structured Concurrency)

  • 부모 코루틴의 실행환경은 기본적으로 자식 코루틴에게 상속됩니다.

자식이 별도로 CoroutineContext를 설정하다면, 설정한 구성요소는 부모 CoroutineContext를 Overriding 하게 됩니다.
단, Job 객체는 항상 새롭게 생성합니다. 코루틴 제어에 Job객체가 필요한데, Job 객체를 부모 코루틴으로부터 상속받게 되면, 개별 코루틴의 제어가 어려워지기 때문이다.

1
2
3
4
5
6
7
8
val coroutineContext = newSingleThreadContext("MyThread") + CoroutineName("CoroutineA")
launch(coroutineContext) {
// 부모 코루틴
println("[${Thread.currentThread().name}] 부모 코루틴 실행")
launch {
println("[${Thread.currentThread().name}] 자식 코루틴 실행")
}
}
  • 부모 코루틴이 취소되면 자식 코루틴도 취소된다.
  • 부모 코루틴은 자식 코루틴이 완료될때까지 대기한다.
  • CoroutineScope를 사용해 코루틴이 실행되는 범위를 제한할 수 있다.

Comments