2013년 1월 10일 목요일

Scala 병렬 프로그래밍

Scala 가 기본적으로 병렬처리를 지원해서 만들어봤습니다.

scala.collection.parallel._ 에 보시면 여러가지 병렬 프로그래밍을 지원하는 자료구조들이 있습니다. 
물론 java 의 ExecutorService를 이용하여 처리할 수 있습니다만, 우아하게 코딩량을 줄여서 처리할 수 있어서 좋습니다.

먼저, void 형 메소드인 ( => Unit) 형태의 메소드 블럭을 주어진 횟수만큼 병렬로 호출하는 코드를  보면, 아주 간단히 ParArray 를 생성하고, map 메소드를 호출하여 지정된 메소드를 수행하도록 합니다.

def runUnitAsParallel(count: Int)(block: => Unit) {
log.debug("멀티스레드 환경에서 메소드들을 [{}] 번 실행합니다.", count)
try {
val latch = new CountDownLatch(count)
val pc = collection.parallel.mutable.ParArray.iterate(0, count)(x => x)
pc.tasksupport = new ThreadPoolTaskSupport()
pc map {
_ => {
block
latch.countDown()
}
}
latch.await()
} catch {
case e: InterruptedException => log.warn("작업 중 interrupted 되었습니다.")
case e: Exception => log.error("작업 중에 예외가 발생했습니다.", e)
}
log.debug("멀티스레드 환경에서 메소드들을 [{}] 번 실행했습니다.", count)
}

핵심 코드는 상당히 간단하죠?
그럼 Java 로 비슷한 기능을 만든다면?

private static ExecutorService newExecutorService() {
return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
}
public static void runTasks(int count, final Runnable runnable) {
ExecutorService executor = newExecutorService();
try {
final CountDownLatch latch = new CountDownLatch(count);
for (int i = 0; i < count; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
runnable.run();
latch.countDown();
}
});
}
latch.await();
} catch (InterruptedException e) {
if (log.isErrorEnabled())
log.error("작업 수행 중 예외가 발생했습니다.", e);
throw new RuntimeException(e);
} finally {
executor.shutdown();
}
}
view raw TestTool.java hosted with ❤ by GitHub

차이점이라면 ExecutorService 를 직접 사용하느냐, 내부코드에서 해주느냐의 차이입니다만, scala 의 경우 collection에 filtering, streaming 등 상당히 많은 반복 작업들을 이미 구현해 놓아서, 쓰기도 쉽고, 코드도 간단명료해지네요^^

scala 로 만든 병렬 실행 테스트 코드를 보면,


private final val LowerBound: Int = 0
private final val UpperBound: Int = 100000
@Test
def runUnitAsParallelTest {
def runnable = {
(LowerBound to UpperBound).foreach {
i => Hero.findRoot(i)
}
log.debug("Unit: FindRoot({}) returns [{}]", UpperBound, Hero.findRoot(UpperBound))
Hero.findRoot(UpperBound)
}
val stopwatch: AutoStopwatch = new AutoStopwatch()
ParallelTestTool.runUnitAsParallel(100) { runnable; runnable }
stopwatch.close()
}
함수 호출이 아니라 코드 블럭을 정의한 것 같지요?
함수형 언어의 currying 을 이용하여, code block 을 아예 넘길 수 있는 것도 아주 좋은 장점이 되겠습니다.

요즘 Scala의 매력에 푹 빠져있는데, 시간이 나면 Scala 로 Hibernate 와 연계한 base class library 를 만들어야 겠습니다. (case class 를 보면 얼마나 생산성이 좋은지 알 수 있습니다)


댓글 없음: