Annotation use-site targets

사용 지점 대상 Annotation

  • 코틀린에서는 소스코드에서 한 선언을 컴파일한 결과가 여러 자바 선언과 대응되는 경우가 많다. 따라서 여러 자바 선언에서 각각 Annotation을 붙여야 하는 경우가 존재한다.

예를 들면 코틀린의 property는 기본적으로 Java의 property + Getter method 선언과 대응된다.
만약 var 타입이라면 Setter method선언까지 대응되는 것이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Kotlin Code
class Test(var value:String?)

// Java Code
final class Test {

String value;

public String getValue(){
return this.value;
}

public void setValue(String value){
this.value = value;
}
}

따라서 코틀린에서 특정 프로그래밍 요소에 Annotation을 붙일떄는 사용 지점 대상 선언으로 어떤 프로그래밍 요소에 Annotation을 붙일 것인지 명시할 수 있다.

사용 지점 대상은 @적용대상:Annotation명으로 아래와 같이 사용한다.

1
2
// 사용 지점 대상  : 적용 Annotation 명 
@get:Rule

위 코드의 뜻은 @Rule Annotation을 Getter 에 적용하라는 뜻이다.

Read more

코루틴(Coroutines)

코루틴이란?

  • 비선점형 (협력형) 멀티 태스킹 (non-preemptive multitasking)으로 실행을 일시 중단(suspend) 하고 재개(resume) 할 수 있는 여러 진입 지점을 허용한다.

  • 서로 협력해서 실행을 주고받으면서 작동하는 여러 서브루틴을 말한다.

  • 일반적인 서브루틴은 오직 한 가지 진입 지점만을 가진다. 함수를 호출하는 부분이며, 그때마다 활성 레코드 (activation record)가 스택에 할당되면서 서브루틴 내부의 로컬 변수등이 초기화 된다. 또한 서브루틴에서 반환되고 나면 활성 레코드가 스택에서 사라지기 떄문에 모든 상태를 잃어버린다.

1
2
3
4
5
6
7
8
9
fun main() {
//main routine
val extension = getFileExtension("cs.txt")
}

fun getFileExtension(fileName:String) :String{
// subroutine
return fileName.substringAfter(".")
}
  • 반면 코루틴은 실행을 일시 중단하고 진입 지점을 허용한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
fun log(msg:String , self : Any?) = println("Current Thread : ${Thread.currentThread().name} / this : $self :$msg")


fun main() {
log("main routine started",null)
yieldExample()
log("main routine ended",null)
}


fun yieldExample(){
runBlocking{ //내부 코루틴이 모두 끝난뒤 반환
launch {
log("1",this)
yield() //
log("3",this)
yield()
log("5",this)
}
log("after first launch",this)
launch {
log("2",this)
yield()
log("4",this)
yield()
log("6",this)
}
log("after second launch",this)
}

}

실행로그

1
2
3
4
5
6
7
8
9
10
Current Thread : main / this : null :main routine started
Current Thread : main / this : BlockingCoroutine{Active}@33e5ccce :after first launch
Current Thread : main / this : BlockingCoroutine{Active}@33e5ccce :after second launch
Current Thread : main / this : StandaloneCoroutine{Active}@2ac1fdc4 :1
Current Thread : main / this : StandaloneCoroutine{Active}@3ecf72fd :2
Current Thread : main / this : StandaloneCoroutine{Active}@2ac1fdc4 :3
Current Thread : main / this : StandaloneCoroutine{Active}@3ecf72fd :4
Current Thread : main / this : StandaloneCoroutine{Active}@2ac1fdc4 :5
Current Thread : main / this : StandaloneCoroutine{Active}@3ecf72fd :6
Current Thread : main / this : null :main routine ended

위 코드를 분석하기 전에 각각 함수가 하는 역할을 정리하면 다음과 같다.

  • runBlocking : coroutine builder 로서 내부 코르틴이 모두 끝난 다음에 반환된다.
  • launch : coroutine builder로서 , 넘겨받은 코드 블록으로 새로운 코르틴을 생성하고 실행시켜준다.
  • yield : 해당 코르틴이 실행권을 양보하고, 실행 위치를 기억하고, 다음 호출때는 해당 위치부터 다시 실행한다.

위 코르틴에서 1,3,5를 출력하는 코르틴과 2,4,6를 출력하는 코르틴이 서로 실행권을 양보해가면서 실행된다. 한가지 유의할점은 마치 병렬적으로 실행되는 것처럼 보이지만 다른 쓰레드가 아니라 하나의 쓰레드에서 수행된다는 점이다. 따라서 Context Switching 도 발생하지 않는다.

  • Launch coroutine Builder는 Job 객체를 반환한다. Job은 N개 이상의 coroutines의 동작을 제어할 수도 있으며, 하나의 coroutines 동작을 제어할수도 있다.
1
2
3
4
5
6
7
8
9
10
suspend fun main()  = coroutineScope {
// Job 객체는 하나이상의 Coroutine 의 동작을 제어할 수 있다.
val job : Job = launch {
delay(1000L)
println("World!")
}
println("Hello,")
job.join()
println("Done.")
}

Job

코루틴의 Job 객체는 코루틴의 상태를 가지고 있다.

1
2
3
4
5
6
7
8
9
10
11
                                      wait children
+-----+ start +--------+ complete +-------------+ finish +-----------+
| New | -----> | Active | ---------> | Completing | -------> | Completed |
+-----+ +--------+ +-------------+ +-----------+
| cancel / fail |
| +----------------+
| |
V V
+------------+ finish +-----------+
| Cancelling | --------------------------------> | Cancelled |
+------------+ +-----------+
  • start : job을 실행하고, 호출시 코루틴이 동작중이면 true, 준비 및 완료 상태면 false를 반환된다.
  • join : job을 실행하고, Job의 동작이 완료될떄까지 job을 호출한 코루틴을 일시중단한다.
  • cancel : 현재 코루틴에 종료를 유도하고, 대기하지 않는다.
  • cancelAndJoin : 현재 코루틴을 즉시 종료하라는 신호를 보낸후 대기한다.
  • cancelChildren : 하위 자식 코루틴을 종료시킨다.

Async

  • Async는 사실상 Launch와 같은일을 수행하는데, 차이점은 Launch는 Job객체를 반환하는 반면 , Async는 Deffered를 반환한다.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public interface Deferred<out T> : Job {

    public suspend fun await(): T

    public val onAwait: SelectClause1<T>

    @ExperimentalCoroutinesApi
    public fun getCompleted(): T

    @ExperimentalCoroutinesApi
    public fun getCompletionExceptionOrNull(): Throwable?
    }

  • Deffered는 Job을 상속한 클래스로서, 타입 파라미터가 있는 제너릭 타입이며, Job과 다르게 await 함수가 정의되어 있다.

  • Deffered의 타입 파라미터는 Deffered 코루틴이 계산 후 돌려주는 값의 타입이다. 즉 Job은 Deffered<Unit>라고 생각할수도 있다.

정리하면 async는 코드 블록을 비동기로 실행 할 수 있고, async가 반환하는 Deffered의 await를 사용해서 코루틴이 결과 값을 내놓을떄까지 기다렸다가 결과값을 받아올 수 있다.

이떄 비동기로 실행할떄 제공되는 코루틴 컨텍스트에 따라 하나의 Thread안에서 제어만 왔다 갔다 할수도 있고, 여러 Thread를 사용할 수도 있다.

1
2
3
4
5
6
7
8
9
10
11
fun sumAll(){
runBlocking {
val d1 = async { delay(1000L); 1 }
println("after d1")
val d2 = async { delay(2000L); 2 }
println("after d2")
val d3 = async { delay(3000L); 3 }
println("after d3")
println("1+2+3 = ${d1.await()+d2.await()+d3.await()}")
}
}

실행로그를 보면 다음과 같다.

1
2
3
4
after d1
after d2
after d3
1+2+3 = 6 // 코루틴이 결과값을 내놓을떄까지 기다렸다가 결과값을 받아온다.

만약 위 코드를 직렬화해서 실행하면 최소 6초의 시간이 걸리겠지만, async로 비동기적으로 실행하면 3초가량이 걸리며 더군다나 위 코드는 별개의 thread가 아니라 main thread 단일 thread로 실행되어 이와 같은 성능상 이점을 얻을수 있다.

특히 이와 같은 상황에서 코루틴이 장점을 가지는 부분은 I/O로 인한 장시간 대기 , CPU 코어수가 작아 동시에 병렬적으로 실행 가능한 쓰레드 개수 한정된 상황 이다.

코루틴 컨텍스트

  • Launch , Async 등은 모두 CoroutineScope의 확장함수로 실제로 CoroutineScope는 CoroutineContext 필드를 이런 확장함수 내부에서 사용하기 위한 매개체 역할을 수행한다. 원한다면 launch,aync 확장함수에 CoroutineContext를 넘길수도 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

// --------------- launch ---------------

/**
* Launches a new coroutine without blocking the current thread and returns a reference to the coroutine as a [Job].
* The coroutine is cancelled when the resulting job is [cancelled][Job.cancel].
*
* The coroutine context is inherited from a [CoroutineScope]. Additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
* with a corresponding [context] element.
*
* By default, the coroutine is immediately scheduled for execution.
* Other start options can be specified via `start` parameter. See [CoroutineStart] for details.
**/
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
//...
}

// --------------- async ---------------

/**
* Creates a coroutine and returns its future result as an implementation of [Deferred].
* The running coroutine is cancelled when the resulting deferred is [cancelled][Job.cancel].
* The resulting coroutine has a key difference compared with similar primitives in other languages
* and frameworks: it cancels the parent job (or outer scope) on failure to enforce *structured concurrency* paradigm.
* To change that behaviour, supervising parent ([SupervisorJob] or [supervisorScope]) can be used.
*
* Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
* with corresponding [context] element.
*
* By default, the coroutine is immediately scheduled for execution.
* Other options can be specified via `start` parameter. See [CoroutineStart] for details.
*

*/
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
//...
}

그렇다면 CoroutineContext가 하는 역할은 무엇일까?

  • 코루틴이 실행중인 여러 작업과 디스패처를 저장하는 일종의 맵으로 이 CoroutineContext를 사용해 다음에 실행할 작업을 선정하고, 어떻게 Thread에 배정할지에 대한 방법을 결정한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
fun context(){
runBlocking {
launch {
// 부모 컨텍스트를 사용
println("use parent context :${getThreadName()}")
}
launch(Dispatchers.Unconfined) {
// 특정 Thread에 종속되지 않고, Main Thread 를 사용
println("use main thread :${getThreadName()}")
}
launch(Dispatchers.Default){
println("use default dispatcher :${getThreadName()}")
}
launch(newSingleThreadContext("MyOwnThread")){
// 직접 만든 새로운 Thread 사용
println("use thread that i created : ${getThreadName()}")
}
}
}

실행로그를 보면 같은 launch 확장함수를 사용한다고 하더라도 실행되는 CoroutineContext에 따라 다른 Thread상에서 코루틴이 실행됨을 확인할 수 있다.

1
2
3
4
use main thread :main
use default dispatcher :DefaultDispatcher-worker-1
use parent context :main
use thread that i created : MyOwnThread

Coroutine Builder 와 Suspending Function

  • 앞선 Launch , Async , runBlocking , CoroutineScope모두 코루틴 빌더라고 , 새로운 코루틴을 만들어주는 함수이다.

  • delay , yield 함수는 일시중단 함수로 이외에도 다른 일시중단 함수들이 존재한다.

    • withContext: 다른 컨텍스트로 코루틴 전환
    • withTimeOut : 일정 시간내 코루틴이 실행되지 않으면 예외 발생
    • withTimeOutOrNull : 일정 시간내 코루틴이 실행되지 않으면 null 반환
    • awaitAll : 모든 작업의 성공을 대기 , 만약 하나라도 예외 발생시 실패처리
    • joinAll : 모든 작업이 종료될떄까지 현재 작업 대기
Read more

Kotlin 고차 함수안에서 흐름 제어

람다 안에서의 return 문

  • Java와 다르게 Kotlin에서는 람다안에서 return 을 사용하면 람다로부터만 반환되는게 아니라, 그 람다를 호출하는 함수가 실행을 끝내고 반환된다.

  • 이를 Non-Local Return 이라고 부른다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// java
List.of("a","b","c").forEach((item)-> {
if (item.equals("a")){
return;
}
// b,c에 대해서도 실행됨.
});
// kotlin
listOf("a","b","c").forEach {
if (it.equals("a")){
return;
}
println(it)
// a에서 종료됨
}

Non-Local Return 이 적용되는 상황

  • 람다 안의 return 문이 바깥쪽 블록의 함수를 반환시킬 수 있는 상황은 람다를 인자로 받는 함수가 인라인 함수인 경우에만 가능하다. 즉 위의 forEach 함수는 인라인이기에 Non-local return 이 가능한 것이다.

Label을 사용한 Local return

  • 람다식안에서 람다의 실행을 끝내고 람다를 호출했던 코드의 실행을 이어서 실행하기 위해서는 Local Return을 사용하면 된다.

  • Non-Local Return 과 구분하기 위해서 Local Return에는 레이블을 추가해야 한다.

1
2
3
4
5
6
7
8
9
10
fun lookForBob(people : List<Person>){
people.forEach label@{
if (it.name == "Bob"){
println("found Bob!")
return@label
}
}
// Local Return을 사용하면 람다가 종료되고 람다 아래의 코드가 실행된다.
println("end of function")
}
  • 또는 인라인 함수의 이름을 label로 사용하여도 위의 코드와 동일하다.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    fun lookForBob(people : List<Person>){
    people.forEach {
    if (it.name == "Bob"){
    println("found Bob!")
    return@forEach
    }
    }
    println("end of function")
    }

Anonymous Function 을 사용한 깔끔한 Local Return

  • 앞선 Local Return 방식은 레이블을 통해 구현하여, 조건 분기에 따라 여러번 Return 문을 기입해야 할떄는 반환문이 장황해 질 수 있다.

  • Anonymous Function은 (익명,무명함수) 코드 블록을 함수에 넘길때 사용할 수 있는 방법 중에 하나로 일반 함수와 차이점은 함수 이름과 파라미터 타입을 생략 가능하다는 점이다.

1
2
3
4
5
6
7
8
9
fun lookForBob(people : List<Person>){
people.forEach (fun(person){
if (person.name == "Bob"){
println("found Bob!")
return
}
println("end of anonymous function")
})

  • 기본적으로 익명함수도 반환타입을 기입해줘야 하지만, 함수 표현식 (expression body)를 바로 쓰는 경우에는 반환타입 생략이 가능하다.

정리

  • Inline 함수의 경우 람다안의 return문이 바깥쪽 block의 함수를 반환시키는 Non-Local Return을 사용할 수 있다.

  • Anonymous Function을 활용하면 람다를 대체해서 Local Return을 깔끔하게 Label 사용없이 작성 가능하다.

Read more

Execution Plan 이란?

Execution Plan

  • 사용자가 실행한 sql을 어떤 방식으로 처리할 것인지에 대한 실행 계획으로 Optimizer에 의해 만들어짐.

  1. SQL 파싱과정을 통해 SQL에 문법적인 오류 없는지 검사
  2. 해당 SQL을 이전에 실행한 적이 있는지 SHARED POOL 메모리 검사. 만약 실행 기록이 있다면 그대로 실행 (SOFT PARSING)
  3. 만약 실행기록이 없다면 새로 실행 계획을 세운다. (HARD PARSING)

접근 경로 (Access Path)

(참조 : https://docs.oracle.com/database/121/TGSQL/tgsql_optop.htm#TGSQL228)

  • 실행 계획에서 Optimizer게 데이터를 읽을때 데이터를 접근하는 방식
  1. FULL TABLE SCAN : 테이블 전체 데이터를 읽어 조건에 맞는 데이터 추출
  2. ROWID SCAN : ROWID를 기준으로 데이터를 추출
  3. INDEX SCAN: 인덱스 사용하여 추출

무조건 TABLE SCAN 보다 INDEX SCAN이 우월한가?

  • 사용용도에 따라 다름. 배치 처리를 위해 전체 데이터를 읽어들여 통계자료 생성시에는 TABLE SCAN이 유리

INDEX SCAN의 종류

  1. INDEX UNIQUE SCAN : 한건이하의 ROWID를 반환하는 방식
  2. INDEX RANGE SCAN : 한건이상의 필요한 데이터가 포함된 일정범위의 인덱스 블록을 오름차순으로 접근
  3. INDEX SKIP SCAN
Read more

Apache Kafka Consumer 동작 방식

Consumer

  • producer가 전송한 데이터는 kafka broker에 적재되며, consumer가 하는 역할은 broker로부터 data를 가져와 필요한 처리를 수행한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class SimpleConsumer {

private final static Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);

private final static String TOPIC_NAME = "test";
// Consumer Group
private final static String GROUP_ID = "test-group";

public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ,KafkaConst.SERVER_ADDR);
configs.put(ConsumerConfig.GROUP_ID_CONFIG,GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));

while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
logger.info("record :{}" , records);
for (ConsumerRecord<String, String> record : records) {
logger.info("{}",record);
}
}
}
}
  • Consumer Group 을 통해 Consumer의 목적을 구분한다. 동일 기능을 수행하는 app인 경우 하나의 Group으로 묶어서 관리하며, Consumer Group을 기준으로 Consumer Offset을 관리한다. 따라서 subscribe method를 사용해 특정 토픽을 구독하는 경우에 Consumer Group을 선언해야 한다.
  • Consumer Group을 선언하지 않으면 어떤 그룹에도 속하지 않는 Consumer로 동작하게 된다.
  • Consumer는 poll method를 호출하여 데이터를 가져와 처리한다. 이떄 Duration 타입의 인자값을 받는데, 이 값은 Consumer 버퍼에 데이터를 가져오는 타임아웃 간격을 뜻한다.

파티션 할당 Consumer

  • Consumer 운영시 위와 같이 subscribe 형태로 특정 토픽을 구독하는 형태로 사용하는것 외에도 파티션까지 명시적으로 선언할 수 있다.
    1
    consumer.assign(TOPIC_NAME,PARITITON_NUM);
    이때는 Consumer가 특정 Topic의 특정 파티션에 직접적으로 할당됨으로 rebalancing 과정이 안일어나는데 rebalancing에 대한 설명은 아래에 정리하였다.

Consumer 운영 방식

  1. 1개 이상의 Consumer로 이루어진 Consumer Group을 운영한다. 이떄 Consumer들은 Topic의 1개 이상의 파티션들에 할당되어 데이터를 가져갈 수 있다.

이떄 1개의 파티션은 최대 1개의 Consumer에 할당가능한 반면 1개의 Consumer는 여러 개의 Partition에 할당될 수 있다. 따라서 Consumer Group의 Consumer개수는 가져가고자 하는 Topic의 파티션 개수보다 같거나 작아야 한다.

물론 Consumer개수가 파티션의 개수보다 많게 설정할 수는 있겠지만, 파티션은 1개의 Consumer까지만 가질 수 있기 때문에 놀고 있는 Consumer는 Thread만 차지하고 아무 데이터도 처리하지 않음으로 불필요하다.

Consumer Group은 다른 Consumer Group과 격리되는 특징을 가지고 있다. 따라서 Kafka Producer가 보낸 데이터를 각기 다른 역할을 하는 Consumer Group끼리 영향을 받지 않게 처리할 수 있다는 장점을 가진다.

Rebalancing

  • Rebalancing : Consumer Group으로 이루어진 Consumer 들 중 일부 Consumer에 장애가 발생하면 , 장애가 발생한 Consumer에 할당된 partition은 장애가 발생하지 않은 Consumer에 소유권이 넘어가는 것

  • Consumer가 추가되거나 장애발생으로 제외되는 상황에서 발생한다.

  • Rebalancing 발생 시 Consumer들이 Topic의 데이터를 읽을 수 없기 떄문에 빈번히 Rebalancing이 일어나는 상황은 피해야 한다.

  • Broker중 한 대가 Group Coordinator(그룹 조정자)로서, Rebalancing을 발동시키는 역할을 한다.

  • Rebalancing 직전에 데이터를 커밋하지 않아서, consumer가 처리했던 데이터의 오프셋이 기록되지 않고, 또 다시 데이터를 중복처리하는 경우가 생길수도 있다. org.apache.kafka.clients.consumer.ConsumerRebalanceListener 는 rebalancing 직후 , 직전에 호출되는 method를 가지고 있다.

1
2
3
4
5
6
7
8
9
10
public class RebalanceListener implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// partition 할당 완료시 호출되는 메소드
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// partition 할당이전에 호출되는 메소드로 offset commit 하는 로직이 주로 들어간다.
}
}

Commit

  • Consumer는 Broker로부터 Data를 어디까지 가져갔는지 Commit을 통해 기록한다.

  • 특정 Topic의 Partition을 어떤 Consumer Group 이 몇 번쨰까지 가져갔는지 Broker 내부에서 사용되는 내부 토픽에 기록된다.

  • offset commit은 Consumer Application에서 명시적/비명시적으로 수행가능한데, default는 poll method가 수행될때 일정간격마다 offset을 commit하도록 설정되어 있다. 이를 비명시 오프셋 커밋이라고 부른다.
1
2
enable.auto.commit=true
auto.commit.interval.ms=설정된시간값 // 설정시간값이후에 그 시점까지 읽은 레코드의 오프셋을 커밋한다.
  • 비명시 오프셋 커밋의 장점은 poll method 실행시 auto.commit.interval.ms 설정값 이후면 오프셋을 자동으로 커밋해주어, 코드상 수정이 필요없지만 반대로 단점은 poll method 실행 이후에 rebalancing이나 consumer 장애발생시에 오프셋이 커밋되지 않아, 데이터 중복이나 유실이 일어날 수 있는 가능성이 있다.

  • 명시적으로 오프셋 커밋을 수행하려면 commitSync method를 통해 poll method를 통해 반환된 record의 가장 마지막 오프셋을 기준으로 커밋을 수행하면 된다. commitSync method는 동기적으로 broker에게 응답을 기다리지만 이를 비동기적으로 수행하고 싶다면 commitAsync method를 실행하면 된다.

동기 오프셋 커밋

1
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
  • Broker로 Commit을 요청한 이후에 Commit이 완료될떄까지 기다린다. 따라서 비동기 오프셋 커밋보다 데이터 처리량은 떨어진다.

가장 최근에 받아온 레코드의 오프셋을 커밋하는 경우

1
2
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
consumer.commitSync(); // 동기 오프셋커밋 (가장 마지막 레코드의 오프셋을 커밋함)

개별 레코드 단위로 커밋하는 경우로, topic,partition,offset등의 정보를 담은 map을 파라미터로 넘겨준다. 이때 offset은 넘겨준값부터 레코드를 넘겨주기 떄문에 + 1 한 값을 넣는다.

1
2
3
4
5
6
7
8
9
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();

for (ConsumerRecord<String, String> record : records) {
currentOffset.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1,null));
consumer.commitSync(currentOffset);
}

비동기 오프셋 커밋

  • Broker로 Commit을 요청한 이후에 응답을 기다리지 않고, 데이터를 처리한다.
1
consumer.commitAsync();
  • consumer.commitAsync method는 콜백 인터페이스를 제공하며 , 비동기요청이 완료되었을때 수행할 행동을 지정할 수 있다.
1
2
3
4
5
6
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
// commitAsync 응답을 받아서 처리하는 callback method
}
});

Fetcher instance

1
consumer.poll(Durtaion.ofSeconds(1));

consumer는 poll method를 통해 record를 반환받지만, 실제로 이 method가 실행될떄 Broker cluser로부터 데이터를 가져오는 것이 아니라, Consumer Application 실행시점에 내부에서 미리 Fetcher Instance가 생성되어 poll method 호출전에 record를 미리 내부 Queue로 가져온다.

따라서 poll method는 Fetcher instace의 queue에 있는 record를 반환받는 것이다.

Read more

Apache Kafka Producer 동작 방식

Kafka Producer

  • Producer Application은 Kafka Broker에 전송할 데이터를 선언하고, Broker의 Topic내 특정 Partition에 Data를 전송한다.

  • Producer는 Leader Partition을 가지는 Broker와 직접 통신하며, Follower Partition으로 데이터 복제는 이 Leader Partition으로부터 데이터가 복제된다.

  • Kafka Producer 객체가 send() method를 호출하면 , Partitioner가 Topic의 어떤 Partition으로 데이터를 전송할지 결정한다. Partitioner를 별도로 설정하지 않으면 DefaultPartitioner가 동작한다.

  • Partitioner에 의해 Topic의 어떤 Partition으로 보낼지가 결정되면 Accumulator에 데이터를 Buffer에 쌓아놓고 배치 전송한다.

Partitioner 종류

UniformStickyPartitioner , RoundRobinPartitioner가 있는데 둘 다 KEY가 존재하는 경우에는 KEY의 해시값을 통해 partition을 매칭시킨다. KEY가 존재하지 않을떄 동작이 다르다.

  • UniformStickyPartitioner : Accumulator에서 데이터가 배치로 모두 묶일떄까지 대기하였다가, 모두 동일한 파티션에 전송
  • RoundRobinPartitioner : 데이터가 들어오는 대로 Partition을 Round-Robin 방식으로 순회하며 전송한다.

성능적인 측면에서는 UniformStickyPartitioner가 낮은 리소스 사용률,높은 처리량을 가진다.

Kafka Producer Application 예시

먼저 kafka-client dependency를 추가한다. 여기 예제에서는 빌드 및 의존성 관리 도구로서 Gradle을 사용하였다.

1
2
3
4
5
// build.gradle 
dependencies {
implementation 'org.apache.kafka:kafka-clients:2.5.0'
implementation 'org.slf4j:slf4j-simple:1.7.30' // for logging
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class SimpleProducerApp {

private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);

// Broker 의 토픽이름
private final static String TOPIC_NAME = "test";

// Broker Ip 주소와 Port 번호
private final static String SERVER = "BROKER_PUBLIC_IP_ADDR:9092";


public static void main(String[] args) {

// Kafka Producer 객체를 생성하기 위한 필수 옵션
Properties config = new Properties();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,SERVER);
// Message 키와 값을 직렬화하기 위한 직렬화 클래스
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Producer 객체 생성
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(config);

String testMessage = "testMessage";
// Broker로 보낼 데이터 생성 . 키를 별도로 지정하지 않으면 null이 들어가며, Key와 Value는 당연히 직렬화 클래스와 타입이 동일해야 한다.
ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC_NAME,testMessage);
// 배치 전송 (즉각적인 전송이 아니라, 배치단위로 묶어서 전송한다. )
producer.send(record);
logger.info("record : {}",record);
// producer 내부 버퍼의 레코드 배치를 Broker로 전송한다.
producer.flush();
producer.close();
}
}

실제로 로그에 찍힌 내용을 보면 정상적으로 Broker에 Record가 송신된것을 확인할 수 있다.

추가로 send method의 return 값은 Future 객체로 원한다면 동기적으로 record를 보낸 데이터를 가져올 수 있다.

1
2
3
4
ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC_NAME,testKey,testMessage);
Future<RecordMetadata> result = producer.send(record);
RecordMetadata recordMetadata = result.get();
logger.info("recordMetadata : {}",recordMetadata); // recordMetadata : test-0@4 0번 파티션의 4번 오프셋에 저장됨

동기로 데이터를 확인할 경우, Producer 서버는 데이터 응답 전까지 대기하는 단점이 있다. 이를 커버하기 위해서 CallBack 인터페이스도 제공된다.

1
2
3
4
5
6
7
8
9
10
11
12
public class ProducerCallBack implements Callback {

private static final Logger logger = LoggerFactory.getLogger(ProducerCallBack.class);
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null){
logger.error(exception.getMessage());
return;
}
logger.info(metadata.toString());
}
}

CallBack 인터페이스 구현체를 하기와 같이 등록해서 비동기로 결과를 받아 처리할 수 있다. 다만 비동기로 받아올 경우에는 당연히 데이터 순서는 보장되지 않는다, 따라서 데이터 순서가 중요한 경우에는 동기로 처리해야 한다.

1
producer.send(record,new ProducerCallBack()); // INFO ProducerCallBack - test-0@5
Read more

Apache Kafka

Kafka 의 역할

  • 각각의 application끼리 연결하여 데이터 처리하는 방식이 아니라 중앙에서 처리할 수 있도록 데이터 스트림을 관리해주는 프레임워크

  • application끼리 직접적으로 데이터를 처리하는 방식이 아니라, 카프카가 중앙에 배치됨으로써, 소스 application과 타켓 application 간의 의존도를 약화시킨다. 어떤 타켓 application으로 데이터를 보내든 kafka를 통해 송신한다.

  • 데이터를 보내는 application을 Producer , 데이터를 받는 application을 Consumer 라고 한다.

  • Kafka는 대개 3대 이상의 서버 (Kafka가 운영중인 서버를 broker라고도 부른다.)에서 분산 운영해서 데이터를 중복 저장함으로 고가용성을 제공한다.

Kafka의 장점

  • Producer가 Broker로 데이터를 송신할때, Consumer가 Broker로부터 데이터를 수신할때, 묶어서 전송한다. 즉 네트워크 통신 횟수를 최소화 한다.

  • 동일 데이터를 Kafka Broker에 여러개를 분산 저장하고, 병렬 처리도 가능하다.

  • 들어오는 데이터에 따라 Scale-In 또는 Scale-Out이 가능하다.

  • 영속성 : 데이터를 메모리에 저장하지 않고, Broker내 File System에 저장한다. 따라서 FailOver되더라도 데이터가 소실되지 않는다.

  • 고가용성 : 3개 이상의 서버로 운영되는 Kafka cluster를 통해서 일부 서버에 장얘가 생기더라도 무중단 서비스 제공이 가능하다.

Read more

PL/SQL - 동적 SQL

동적 SQL이란?

  • 실행 시점 , 즉 런타임에 SQL문장이 달라지는 경우로 대표적으로 검색시, WHERE 절이 동적으로 추가되는 예시가 있다.

  • 그 외에도 PL/SQL 블록 내에서 DDL 문을 실행하는 경우, 또는 PL/SQL 블록 내에서 ALTER SYSTEM/SESSION 명령어를 실행하여 세션별 파라미터 (ex) NLS_LANG : 언어 정보) 를 설정하는 경우에도 동적 SQL을 사용할 수 있다.

  • PL/SQL에서 동적 SQL을 처리하는 방법은 2가지가 있다. 두 방법 모두 SQL문장 자체를 문자열 형태로 조합하여 실행한다.

  1. Native Dynamic SQL (원시동적 SQL , NDS)
  2. DBMS_SQL 시스템 패키지

NDS

  • EXECUTE IMMEDIATE 문 : 가장 기본적인 동적 SQL 실행 형태이다.

  • 구문은 다음과 같이 사용한다. INTO 다음에 결과값을 매핑할 OUT변수 , 동적으로 Binding 될 매개변수를 USING 절에 명시한다.

1
2
3
EXECUTE IMMEDIATE 'SQL문자열'
INTO OUT변수 ...
USING 매개변수 ...

EXECUTE IMMEDIATE 실행 예제

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
DECLARE 
/*출력값을 매핑할 변수*/
vn_emp_id employees.employee_id%TYPE;
vs_emp_name employees.emp_name%TYPE;
vs_job_id employees.job_id%TYPE;

vs_sql VARCHAR2(1000);

/*Bind 변수 설정*/
vs_job employees.job_id%TYPE :='SA_REP';
vn_sal employees.salary%TYPE := 7000;
vn_manager employees.manager_id%TYPE := 148;
BEGIN
vs_sql := '
SELECT employee_id , emp_name , job_id
FROM employees
WHERE job_id = :a
AND salary < :b
AND manager_id = :c
';

EXECUTE IMMEDIATE vs_sql
/*출력값을 매핑할 매개변수*/
INTO vn_emp_id , vs_emp_name , vs_job_id
/*조건전에 Binding 될 Bind 변수*/
USING vs_job , vn_sal , vn_manager;

DBMS_OUTPUT.PUT_LINE('emp_id : ' || vn_emp_id);
DBMS_OUTPUT.PUT_LINE('emp_name: ' || vs_emp_name);
DBMS_OUTPUT.PUT_LINE('job_id : ' || vs_job_id);
END;

Bind 변수

  • Bind 변수를 사용하여 조건문을 런타임에 만들 수 있다.

  • 상수를 사용하는 경우에는 오라클이 매번 실행 계획을 세운다. 만약 Bind 변수를 사용하는 경우에는 sql 구문이 변경되지 않으므로, 이전에 세웠던 실행 계획을 활용해 처리한다. 즉 성능적으로 더 빨라진다. 따라서 항상 Bind 변수를 사용하는게 좋다.

  • 일반 SQL문에서는 순서와 타입으로만 매핑된다. 즉 이름은 가독성에만 영향을 끼치고 실제 실행은 USING절에 오는 순서와, 타입 기반으로 실행된다. ( 프로시저로 매핑할떄는 변수 이름까지 맞춰주어야 한다. )

1
2
3
4
WHERE test_condition1 = :바인드변수명1 
AND test_condition2 = :바인드변수명2
..
USING 바인드변수1, 바인드변수2;
  • 프로시저안에서 DDL문 실행은 불가능하지만, 동적 SQL을 사용하면 실행이 가능하다. (ALTER SESSION 명령문과 같이 세션 파라미터를 변경하고자 할때도 마찬가지이다. )
Read more

Transaction 과 격리 수준

Transaction 격리성

  • Transaction은 ACID 라 하는 원자성 (Atomicity) , 일관성 (Consistency) , 격리성 (Isolation) , 지속성 (Durability) 을 보장해야 한다.
  • 이중에 Transaction 격리 수준에 관련된 ACID 특성인 격리성만 간략하게 정리하면 , 동시에 실행되는 트랜잭션이 서로에게 영향을 미치도록 격리한다는 뜻이다.

왜 격리 수준을 나누어서 관리하는가?

  • 격리성이란 트랜잭션이 서로에게 영향을 미치지 않도록 해야 한다는 성질인데, 이를 완벽하게 100% 보장하려면 동시성과 관련된 성능 저하가 야기된다. 예를 들면 모든 트랜잭션이 순차적으로 실행되고 끝나야만 이를 보장할 수 있다.

ANSI 표준 Transaction 격리 수준

  • ANSI 표준에서는 트랜잭션 격리 수준을 4단계로 나누어서 정의하고 있다.
  1. READ UNCOMMITED (커밋되지 않은 읽기)
  2. READ COMMITTED (커밋된 읽기)
  3. REPEATABLE READ (반복 가능한 읽기)
  4. SERIALIZABLE (직렬화 가능)

(1->4으로 갈수록 격리 수준이 높아지고, 동시성은 떨어진다. )

READ UNCOMMITED

  • 커밋하지 않은 데이터를 읽을 수 있다.
  • Transaction A가 Transaction B 가 커밋하기 전에 수정한 데이터를 조회할 수 있다.
  • 만약 Transaction B가 데이터를 수정하고 롤백을 해버렸는데, Transaction A는 이 ROLLBACK한 데이터를 참조하고 있는 경우 데이터 정합성 문제가 발생할 수 있다. (Dirty Read)

READ COMMITED

  • 커밋한 데이터만 읽을 수 있다.
  • Dirty Read가 발생하지 않는다.
  • 만약 Transaction A 가 데이터를 수정하고 Commit하면 , Transaction B가 Transaction A가 변경하기 전의 데이터를 읽고, Transaction A가 변경하고 나서 커밋한 뒤에 데이터를 다시 읽었을때, 값이 다르다. 즉 Tranascation B가 실행되는 도중에 Transaction A 가 값을 변경하고 커밋해버리면 다시 읽었을떄 값이 달라지는 문제이다. (NON-REPEATABLE READ)
  • 보통 데이터베이스는 READ COMMITED 격리 수준을 기본으로 제공한다.

REPEATABLE READ

  • 동일 Trnasaction 내에서 한번 조회한 데이터를 다시 조회할때도 동일한 값이 조회되는 격리 수준이다. 즉 NON-REPEATABLE READ가 발생하지 않는다.
  • 만약 Transaction B가 특정 결과집합을 조회하고 나서, Transaction A가 데이터를 insert 하고 커밋하면 , Transaction B가 결과집합을 다시 조회하였을때, 데이터 값이 변경되진 않지만 추가된 상태를 읽는다. (PHANTOM READ)

SERIALIZABLE

  • 가장 엄격한 Transaction 격리 수준으로 Dirty Read , NON-REPEATABLE READ , PHANTOM READ 어떠한 문제도 발생하지 않는다.

Oracle Transaction Isolation Level

  • read - only mode의 경우에는 serializable isolation level과 유사하나 , SYS 유저가 아닌 경우에는 데이터 변경을 허용하지 않는다. 즉 일반 유저는 serializable isolation level 에서 데이터 변경까지 허용이 불가능한 격리 수준이 가장 높은 모드이다.
Read more

PL/SQL - Package

패키지

  • 논리적 연관성이 있는 PL/SQL 타입 , 변수 , 상수 , 서브 프로그램 , 커서 , 예외 등의 항목을 묶어놓은 객체
  • 컴파일 과정을 거쳐 DB 에 저장된다.
  • 다른 프로그램에서 참조 가능하다. (WAR , JAR -> DB 패키지 호출 , DB 패키지내에서 다른 DB 패키지의 프로시저 호출 )
  • 패키지의 하위 서브 프로그램 (Ex) 프로시저 ) 를 호출하면 해당 패키지 전체가 메모리에 올라간다.

패키지 구조

  • 패키지는 선언부 (스펙) 과 본문 (바디) 두 부분으로 구성된다. 패키지는 선언부만 작성하고 컴파일하여 DB에 저장할 수 있다. 즉 본문은 나중에 작성하여도 된다.

패키지 선언부 구문

1
2
3
4
5
6
7
8
9
10
11
12
CREATE OR REPLACE PACKAGE 패키지명 IS 
TYPE_ 구문;
상수명 CONSTANT 상수_타입;
예외명 EXCEPTION
변수명 변수_타입;
커서 구문;

FUNCTION 함수명(매개변수 IN 매개변수_타입 )
RETURN 반환타입;

FUNCTION 프로시저명(매개변수 [IN,OUT,INOUT] 매개변수_타입);
END 패키지명;
  • 패키지 선언부는 데이터와 서브 프로그램 (프로시저,함수) 영역으로 나눌 수 있다.

패키지 본문 구문

  • 패키지 바디는 선언부에 정의한 서브 프로그램 명세에 대한 구현체이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CREATE OR REPLACE PACKAGE BODY 패키지명 IS 
TYPE_ 구문;
상수명 CONSTANT 상수_타입;
커서 구문;

FUNCTION 함수명 (매개변수 IN 매개변수_타입)
RETURN 반환타입 IS 반환타입명
BEGIN
/* 함수 로직 */
END 함수명;

PROCEDURE 프로시저명 (매개변수 [IN,OUT,INOUT])
IS
BEGIN
/* 프로시저 로직 */
END 프로시저명;
END 패키지명;
Read more