기본 콘텐츠로 건너뛰기

[코드로 보는 카프카] Producer: BufferPool

 카프카 프로듀서는 메시지를 전송할 때 ByteBuffer를 사용한다. ByteBuffer는 생성하는 쓰레드에서 큰 메모리 단위를 생성하거나 여러 버퍼에 할당된 메모리를 해제할 때 쓰레드는 기아가 되거나 데드락이 될 수 있다. 이런 문제 때문에 프로듀서는 BufferPool을 사용한다.  이 BufferPool은 충분한 메모리가 확보 될 때 까지 쓰레드를 기다리게 할 수 있고, 이미 생성된 ByteBuffer를 재사용 할 수 있으며, 제한된 메모리로 동작할 수 있게 한다.

실제 BufferPool.java는 메트릭 관련 코드도 있고 다른 변수들이 있어 예시보다는 쬐~금 복잡하지만 allocate와 deallocate 부분만 간단히 구현해보고 메모리를 제한하는 방법과 쓰레드를 처리하는 동작 방식에 대해 알아 두기로 한다.

코드를 보기 앞서 ByteBuffer.allocate와 ReentrantLock에 대해 먼저 알아보자.

ByteBuffer.allocate  vs ByteBuffer.allocateDirect


BufferPool은 allocate를 사용한다. 왜 allocateDirect를 사용하지 않는지는 여기에 설명 되어 있는데, 간단히 요약하면 아래 정도의 내용이 된다.
생명주기가 짧거나 자주 사용되지 않는 객체에는 다이렉트 버퍼를 사용하지 않아야 한다. 왜냐하면, 다이렉트 버퍼는 OS 종속적인 네이티브 코드를 사용하기 때문에 힙기반 버퍼보다 생성과 메모리 반환 비용이 높고 가비지 컬렉터의 영역 밖이라 메모리 누수가 있을 수 있다. 용량이 큰 다이렉트 버퍼를 빈번하게 할당하면 OutofMemorryError가 생길 수 있다.

그리고, FileChannel 에서 non-direct Buffer와 direct Buffer 속도비교 (FileChannel and non-direct buffer vs. FileChannel and direct buffer, 중국어! 코드와 그림만 보자)
버퍼가 256KB보다 작을땐 non-direct Buffer가 훨씬 빠르고, 256KB보다 클땐 direct Buffer가 약간 빠르다


ReentrantLockCondition


Condition에는 await 메소드가 있다. 조건이 만족스럽지 못할 때 현재 쓰레드를 멈추고 기다릴 때 사용한다. 지정된 시간 후 스스로 깨어날 수는 있지만 다른 쓰레드에서 signal이라는 메소드를 호출해 주어도 await 이후 로직을 수행 할 수 있다. 자바 Condition 문서에도 예제가 포함되어있지만,, 다른 예제를 만들어 보았다.  두개의 쓰레드가 아래(↓) 위(↑)  화살표쌍을 출력하는 코드다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class ShareBasic {

    int iterCount;
    Lock lock = new ReentrantLock();
    Condition condition;
    int interCount;

    // volatile: https://twitter.com/_freestrings/status/688963217784082432
    volatile boolean isUp;

    public ShareBasic(int iterCount) {
        this.iterCount = iterCount;
        condition = lock.newCondition();
    }

    void doUp() {
        lock.lock();
        try {
            while (!isUp) {
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            isUp = false;
            System.out.println("↑");
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    void doDown() throws InterruptedException {
        if (interCount++ == iterCount) {
            throw new InterruptedException();
        }

        lock.lock();
        try {
            while (isUp) {
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            isUp = true;
            System.out.print("↓");
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

}

동작방식은 아래와 같다.
  1. 9L: isUp은 기본 false
  2. 19L: 방향이 up이 아닐동안 condition.await 한다. 즉, 현재 쓰레드를 블럭 시킨다.
  3. 48L: 방향을 up으로 바꾸고 '↓'를 출력한뒤, condition.signal()을 호출해 현재 조건(condition) 이 만족 할 때까지 await중인 다른 쓰레드를 깨운다.
  4. 26L: await가 풀리고, 방향을 바꾼뒤 '↑'을 출력한다.
  5. 36L: 반복하다가 지정된 카운트에 도달하면 예외를 던진다


실행을 하려면 두개의 쓰레드가 각각 doUp()과 doDown()을 호출 하도록 만들어 주면 되는데,

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
int iterCount = 20;
ShareBasic share = new ShareBasic(iterCount);
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit((Runnable) () ->
        IntStream.range(0, iterCount).forEach(i -> share.doUp())
);
executorService.submit((Runnable) () -> {
    try {
        while (true) {
            share.doDown();
        }
    } catch (InterruptedException e) {
        executorService.shutdown();
    }
});

iterCount에 따라 다르겠지만 위아래 화살표 쌍이 한라인씩 보이게 된다.



아래는 카프카 프로듀서에서 사용하는 BufferPool이 ReentrantLock과 Condition을 어떻게 사용하고 있는지 그리고 여러개의 요청 쓰레드간 상태 흐름이 어떻게 되는지 살펴 보는 코드이다. 위 ShareBasic 예제와 비슷하지만 쓰레드별로 Condition을 보관하는 데크(Deque)가 추가 되어있다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class Share {
    ReentrantLock lock = new ReentrantLock();
    Deque<Condition> waiters = new ArrayDeque<>();
    AtomicInteger counter = new AtomicInteger(0);

    public void awaitTest(String name, String prefix) {
        lock.lock();
        try {
            int i = counter.incrementAndGet();
            Condition condition = lock.newCondition();
            waiters.addLast(condition);

            System.out.println(prefix + name + i);

            try {
                condition.await(1000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                System.out.println("###" + e.getMessage());
            }

            System.out.println(prefix + "\t" + name + i);

            Condition condition1 = waiters.removeFirst();
            if (condition != condition1) {
                throw new IllegalStateException(Thread.currentThread().getName() + ": Wooops");
            }

        } finally {
            lock.unlock();
        }
    }

    public void signalTest() {
        lock.lock();
        try {
            Condition condition = waiters.peekFirst();
            if (condition != null) {
                condition.signal();
            }
        } finally {
            lock.unlock();
        }

    }
}

코드 내용을 살펴 보면,
  1. 9L: awaitTest가 호출 될때마다 새로운 Condition을 생성하고 큐에 넣는다.
  2. 16L: 호출한 쓰레드가 기다릴 시간을 1초로 저정한다. (1초가 지나면 자동으로 블럭된 쓰레드가 블럭이 풀리면서 이후 로직을 수행하게 된다. 실제 BufferPool에선 자동으로 블럭이 풀리면 타임아웃 에러를 발생시키기 때문에 해당 쓰레드는 지정된 시간동안 ByteBuffer를 할당받지 못했다는 의미가 된다)
  3. 23L: 큐헤더에 있는 Condition을 하나 꺼낸다. 그리고 9L에서 생성된 Condition인지 비교하고 다르면 예외를 던진다. 블럭된 시간동안(await 동안) 다른 쓰레드에서 큐헤더의 Condition을 풀어주지 않으면 해당 조건이 성립된다.
  4. 36L: 큐헤더에 있는 Condition을 꺼내지는 않고 대기중인 쓰레드가 깨어날 수 있게 호출만 해준다.

위 코드는 여러 개의 요청 쓰레드가 블럭된 후 어떻게 깨어나며 쓰레드별로 타임아웃을 어떻게 만드는지가 포인트다. 그래서 두 개의 쓰레드로 awaitTest()를 호출하고 또 하나의 쓰레드는 siginalTest()를 호출해 블럭된 쓰레드를 깨워주는 방식으로 테스트할 수 있다. ShareBasic에서 처럼 쓰레드별 Condition으로 전달한 이름이 연속 출력되면 의도했던 결과가 된다.


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
Share share = new Share();
new AwaitTestThread("A", share, "").start();
new AwaitTestThread("B", share, "\t\t\t\t").start();

Timer timer = new Timer();
timer.schedule(new TimerTask() {
    @Override
    public void run() {
        share.signalTest();
    }
}, 100);

결과


SimpleBufferPool


자, 이제 SimpleBufferPool 을 살펴보자.


  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class BufferPool {

    // 풀링된 메모리를 제외하고 사용할 수 있는 메모리
    long availableMemory;

    // 기본 Buffer 크기
    int poolableSize;

    ReentrantLock lock = new ReentrantLock();

    // deallocate 될 때 ByteBuffer를 쌓아둔다.
    Deque<ByteBuffer> free = new ArrayDeque<>();

    // 메모리를 할당받으려고 대기중인 쓰레드를 깨우기 위한 Condition을 모아둔다.
    Deque<Condition> waiters = new ArrayDeque<>();

    BufferPool(long availableMemory, int poolableSize) {
        this.availableMemory = availableMemory;
        this.poolableSize = poolableSize;
    }

    ByteBuffer allocate(long maxTimeToBlock) throws
            TimeoutException,
            InterruptedException {

        this.lock.lock();

        try {
            // 1. 풀링된 버퍼가 있으면 꺼내 쓴다
            if (!this.free.isEmpty()) {
                return this.free.pollFirst();
            }

            // 2. 메모리를 할당 할 수 있으면
            // 풀링된 버퍼는 꺼내썼기 때문에 새로 ByteBuffer를 생성한다.
            if (canBeNewlyAllocated()) {
                freeUp(this.poolableSize);
                this.availableMemory -= poolableSize;

                // !
                lock.unlock();
                return ByteBuffer.allocate(poolableSize);
            }

            Condition moreMemory = this.lock.newCondition();
            waiters.addLast(moreMemory);

            // 3. 가용한 메모리가 없기 때문에 풀에 버퍼가 반납되거나
            // 이전 쓰레드에서 메모리를 확보해 주기를 기다린다.
            ByteBuffer buffer = blockAwaitUntilAvailable(
                                  moreMemory, 
                                  maxTimeToBlock
                                );

            Condition removed = this.waiters.removeFirst();
            if (moreMemory != removed) {
                throw new IllegalStateException(
                  "블럭시간 - moreMemory.awaite(..) 동안 deallocate 되지 않은 경우"
                );
            }

            // deallocate 시점이 아니더라도 가용메모리가 확보 되었거나
            // 그 사이 풀에 반납된 버퍼가 있을 수 있으니 바로 다음 쓰레드를 깨워준다.
            if (this.availableMemory > 0 || !this.free.isEmpty()) {
                if (!this.waiters.isEmpty())
                    this.waiters.peekFirst().signal();
            }


            lock.unlock();

            if (buffer == null)
                return ByteBuffer.allocate(poolableSize);
            else
                return buffer;

        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }

    }

    void deallocate(ByteBuffer buffer) {
        lock.lock();
        try {
            if (this.poolableSize == buffer.capacity()) {
                buffer.clear();
                this.free.add(buffer);
            } else {
                this.availableMemory += this.poolableSize;
            }
            System.out.println("Waiters: " + this.waiters.size());
            Condition moreMemory = this.waiters.peekFirst();
            if (moreMemory != null)
                moreMemory.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 가용한 메모리와 풀링된 버퍼를 모두 합해도 poolableSize 만큼 되지 않는 경우.
     * <p>
     * 현재 쓰레드를 블럭한뒤 deallocate 로 풀에 버퍼가 반납 되는 시점이나,
     * 이전 쓰레드가 버퍼 할당을 마치는 시점에 poolableSize가 확보 되었는지 체크한다.
     */
    private ByteBuffer blockAwaitUntilAvailable(
        Condition moreMemory, 
        long maxTimeToBlock
    ) throws InterruptedException, TimeoutException {

        int accumulated = 0;
        ByteBuffer buffer = null;

        while (accumulated < this.poolableSize) {

            System.out.println("Start block..");
            if (!moreMemory.await(
                    maxTimeToBlock, 
                    TimeUnit.MILLISECONDS)
            ) {
                throw new TimeoutException(
                  "지정된 시간동안 메모리를 할당하지 못함"
                );
            }

            if (accumulated == 0 && !this.free.isEmpty()) {
                // 그 사이 deallocate 되면서 버퍼가 풀에 반납되었을 수 있다.

                buffer = this.free.pollFirst();
                accumulated = poolableSize;
            } else {
                // 가용한 메모리 확보

                freeUp(poolableSize - accumulated);
                int got = (int) Math.min(
                    poolableSize - accumulated, 
                    this.availableMemory
                );

                this.availableMemory -= got;
                accumulated += got;
            }
        }

        return buffer;
    }

    /**
     * 사용하고 있는 메모리와 풀에 있는 버퍼크기의 총합이 기본 버퍼크기보다 크다면 
     * 새로 버퍼를 생성 할 수 있다.
     */
    private boolean canBeNewlyAllocated() {
        return this.availableMemory + this.free.size() * this.poolableSize >= this.poolableSize;
    }

    /**
     * 할당하려는 크기보다 작으면 pooling된 버퍼를 해제해서 가용한 메모리를 확보한다.
     */
    void freeUp(int size) {
        while (!this.free.isEmpty() && this.availableMemory < size) {
            this.availableMemory += this.free.pollLast().capacity();
        }
    }

}

짧게 만든다고 만들었는데,, 원래 코드와 차이도 적고 길다. 그래서 자세한? 주석을 넣었다. -:)

Condition.signal()을 호출하는 부분과 Condition.await(..) 부분을 중심으로 코드를 살펴보면 쓰레드 블러킹은 위에서 보여준 예제 흐름과 동일하다. 메모리 관리부분을 중심으로 살펴 보자.
  1. 156L: Deque<ByteBuffer> free.size() * poolableSize + availableMemory가 사용중인 메모리가 된다
  2. 162L: 할당해야 하는 즉, 할당하려는 메모리가 부족하면 풀링된(deallocate면 풀에 반납됨) ByteBuffer를 제거해 availableMemory를 재계산하는 로직이다. 이 freeUp 호출시점은 allocate 중에, 풀에 반납된 버퍼가 없거나, 더이상 할당 받을 메모리가 없어서 쓰레드가 가용한 메모리가 생길 때 까지 기다린 후 다시 availableMomory 재계산때가 호출시점이 된다.
  3. 4L: 사용할 수 있는 메모리
  4. 5L: 기본 할당 크기로 실제는 프로듀서의 batch.size 설정이다
  5. 56L: 쓰레드가 블럭된 동안 다른 쓰레드에서 블럭을 풀어주는게 아니라 타임아웃으로 블럭이 풀리면 해당 조건이 만족되는데, 타임아웃으로 블럭이 풀리면 예외가 발생하기 때문에 실제로는 발생하지 않아야 정상이다.

끝으로 BufferPool을 사용하는 코드를 살펴보자.
실제 BufferPool은 메시지 전송을 요청하는 쓰레드에서 BufferPool.allocate()하고 응답을 처리하는 쓰레드에서 BufferPool.deallocate() 한다.
1. 가용메모리가 10MB이고 10개 쓰레드가 1MB씩 할당 했으므로 정상.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
long availableMemory = 1000 * 1000 * 10;
int poolableSize = 1000 * 1000;
BufferPool bufferPool = new BufferPool(availableMemory, poolableSize);
IntStream.range(0, 10)
        .parallel()
        .forEach(i -> {
            try {
                bufferPool.allocate(1000);
            } catch (TimeoutException | InterruptedException e) {
                e.printStackTrace();
            }
        });

2. 가용메모리가 10MB이고 11개 쓰레드가 1MB씩 할당 했으므로 에러발생.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
long availableMemory = 1000 * 1000 * 10;
int poolableSize = 1000 * 1000;
BufferPool bufferPool = new BufferPool(availableMemory, poolableSize);
IntStream.range(0, 11)
        .parallel()
        .forEach(i -> {
            try {
                bufferPool.allocate(1000);
            } catch (TimeoutException | InterruptedException e) {
                e.printStackTrace();
            }
        });

3. 가용메모리가 10MB이고 11개 쓰레드가 1MB씩 할당 했지만, 타임아웃 전에 ByteBuffer를 하나 반납했기 때문에 정상.


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
long availableMemory = 1000 * 1000 * 10;
int poolableSize = 1000 * 1000;
BufferPool bufferPool = new BufferPool(availableMemory, poolableSize);
List<ByteBuffer> buffers = new ArrayList<>();

Timer timer = new Timer();
timer.schedule(new TimerTask() {
    @Override
    public void run() {
        System.out.println("Start deallocate");
        bufferPool.deallocate(buffers.get(0));
        timer.cancel();
    }
}, 2990);

IntStream.range(0, 11)
        .parallel()
        .forEach(i -> {
            try {
                buffers.add(bufferPool.allocate(3000));
            } catch (TimeoutException | InterruptedException e) {
                e.printStackTrace();
            }
        });

실제 BufferPoolTest.java 코드와는 SimpleBufferPool과 차이가 있지만 살펴 보고자 했던 부분이 쓰레드 처리와 메모리 제한에 관한 로직이라 테스트 코드로 적합하다고 봄..

결론


 카프카는 설정이 많아 설정이 실제 어떻게 적용되는지 궁금했었다.  쓰레드 동기화 처리가 일반적인 Object Pool과 어떻게 다른지, 메모리 사용량을 제한 한다는 것이 어떤 특별함이 있는지 궁금했었는데, 마침 관련 코드가 간단해서 쓰레드가 메모리를 할당을 기다릴 수 있고, 제한된 용량 만큼 사용할 수 있는지 파악하는 데 많은 도움이 되었다.

부가적으로 Locking과 ByteBuffer.allocate 관련 코드에 이해되지 않는 부분이 있어 테스트한 코드가 있다.  ByteBuffer.allocate() 전에 lock을 푸는 것과 이후에 푸는 것에 대한 차이점에 관한 것이다. #아몰랑~


전체 코드는 아래





댓글