2013년 5월 2일 목요일

redis 를 hibernate 2nd cache 로 사용하기 - part 2


hibernate-redis 제작 - part 1
hibernate-redis 제작 - part 2


redis 가 캐시 시스템으로 사용하기 좋다는 것은 여러 사례에서 알 수 있고, SNS 같은 경우에는 Timeline 정보를 캐싱하는데 자주 사용하지요.

이런 redis 를 hibernate 의 2nd cache  저장소로 사용하면 좋겠다고 생각해서, hibernate-redis 를 제작했습니다.

첫번째 버전은 제가 Redis에 대해 잘 몰라, 그냥 Spring Data Redis 를 사용하여, 구현했습니다.
개발하다보니 Jedis 만을 사용하여 개발하는 것이 더 가볍게 개발 할 수 있을 것 같아, Spring Data Redis 를 걷어내고, Jedis 만을 사용하여 개발했습니다.

hibernate-redis 는 현재 0.9.0 이고, 단위 테스트를 통과한 상태입니다.
아직 실전 테스트가 남아 있습니다^^

이번 글에서는 일반적인 캐시로서 Redis를 이용할 때, 직접 jedis 를 사용할 수도 있지만,  좀 더 쉽게 만든 JedisClient 에 대해 소개하기로 하겠습니다.

JedisClient.java

/*
* Copyright 2011-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hibernate.cache.redis.jedis;
import lombok.Getter;
import lombok.Setter;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.redis.serializer.BinaryRedisSerializer;
import org.hibernate.cache.redis.serializer.RedisSerializer;
import org.hibernate.cache.redis.serializer.SerializationTool;
import org.hibernate.cache.redis.util.CollectionUtil;
import org.hibernate.cache.spi.CacheKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Transaction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* RedisClient implements using Jedis library
* <p/>
* 참고 : https://github.com/xetorthio/jedis/wiki/AdvancedUsage
*
* @author 배성혁 ( sunghyouk.bae@gmail.com )
* @since 13. 4. 9 오후 10:20
*/
public class JedisClient {
private static final Logger log = LoggerFactory.getLogger(JedisClient.class);
private static final boolean isTraceEnabled = log.isTraceEnabled();
private static final boolean isDebugEnabled = log.isDebugEnabled();
public static final int DEFAULT_EXPIRY_IN_SECONDS = 120;
public static final String DEFAULT_REGION_NAME = "hibernate";
@Getter
private final String regionName;
@Getter
private final JedisPool jedisPool;
@Getter
@Setter
private int database;
@Getter
@Setter
private int expiryInSeconds;
@Getter
@Setter
private RedisSerializer keySerializer = new BinaryRedisSerializer<Object>();
@Getter
@Setter
private RedisSerializer valueSerializer = new BinaryRedisSerializer<Object>();
public JedisClient() {
this(DEFAULT_REGION_NAME, new JedisPool("localhost"));
}
public JedisClient(JedisPool jedisPool) {
this(DEFAULT_REGION_NAME, jedisPool);
}
public JedisClient(String regionName, JedisPool jedisPool) {
this(regionName, jedisPool, DEFAULT_EXPIRY_IN_SECONDS);
}
public JedisClient(String regionName, JedisPool jedisPool, int expiryInSeconds) {
log.debug("JedisClient created. regionName=[{}], jedisPool=[{}], expiryInSeconds=[{}]",
regionName, jedisPool, expiryInSeconds);
this.regionName = regionName;
this.jedisPool = jedisPool;
this.expiryInSeconds = expiryInSeconds;
}
/** 서버와의 통신 테스트, "PONG" 을 반환한다 */
public String ping() {
return run(new JedisCallback<String>() {
@Override
public String execute(Jedis jedis) {
return jedis.ping();
}
});
}
/** db size를 구합니다. */
public Long dbSize() {
return run(new JedisCallback<Long>() {
@Override
public Long execute(Jedis jedis) {
return jedis.dbSize();
}
});
}
/** 키에 해당하는 캐시 값이 존재하는지 확인합니다. */
public boolean exists(Object key) {
final byte[] rawRegion = rawRegion(key);
final byte[] rawKey = rawValue(key);
Long rank = run(new JedisCallback<Long>() {
@Override
public Long execute(Jedis jedis) {
return jedis.zrank(rawRegion, rawKey);
}
});
if (isTraceEnabled) log.trace("캐시 값이 존재하는지 확인합니다. key=[{}], exists=[{}]", key, (rank != null));
return rank != null;
}
/**
* 키에 해당하는 캐시 값을 구합니다.
*
* @param key 캐시 키
* @return 저장된 캐시 값, 없으면 null을 반환한다.
*/
public Object get(Object key) {
if (isTraceEnabled) log.trace("캐시 값을 조회합니다... key=[{}]", key);
final byte[] rawKey = rawKey(key);
byte[] rawValue = run(new JedisCallback<byte[]>() {
@Override
public byte[] execute(Jedis jedis) {
return jedis.get(rawKey);
}
});
return deserializeValue(rawValue);
}
/**
* 지정한 캐시 영역에 저장된 캐시의 키 집합을 반환합니다.
*
* @param regionName 캐시 영역명
* @return 캐시 영역에 저장된 모든 키 정보
*/
public Set<Object> keysInRegion(String regionName) {
if (isTraceEnabled) log.trace("영역에 해당하는 모든 키 값을 가져옵니다. regionName=[{}]", regionName);
final byte[] rawRegion = rawKey(regionName);
Set<byte[]> rawKeys = run(new JedisCallback<Set<byte[]>>() {
@Override
public Set<byte[]> execute(Jedis jedis) {
return jedis.zrange(rawRegion, 0, -1);
}
});
return deserializeKeys(rawKeys);
}
/**
* 지정한 키들의 값들을 한꺼번에 가져옵니다.
*
* @param keys 캐시 키 컬렉션
* @return 캐시 값의 컬렉션
*/
public List<Object> mget(Collection<? extends Object> keys) {
if (isTraceEnabled) log.trace("multi get... keys=[{}]", CollectionUtil.toString(keys));
if (keys == null || keys.size() == 0)
return new ArrayList<Object>();
final byte[][] rawKeys = rawKeys(keys);
List<byte[]> rawValues = run(new JedisCallback<List<byte[]>>() {
@Override
public List<byte[]> execute(Jedis jedis) {
return jedis.mget(rawKeys);
}
});
return deserializeValues(rawValues);
}
/**
* 캐시를 저장합니다.
*
* @param key 캐시 키
* @param value 캐시 값
*/
public final void set(Object key, Object value) {
set(key, value, -1);
}
/**
* 캐시를 저장합니다.
*
* @param key 캐시 키
* @param value 캐시 값
* @param timeoutInSeconds 유효 기간 (Seconds 단위)
*/
public final void set(Object key, Object value, long timeoutInSeconds) {
set(key, value, timeoutInSeconds, TimeUnit.SECONDS);
}
/**
* 캐시를 저장합니다.
*
* @param key 캐시 키
* @param value 캐시 값
* @param timeout 캐시 유효 시간
* @param unit 시간 단위 (기본은 seconds)
*/
protected void set(Object key, Object value, long timeout, TimeUnit unit) {
if (isTraceEnabled)
log.trace("캐시를 저장합니다... key=[{}], value=[{}]", key, value);
final byte[] rawKey = rawKey(key);
final byte[] rawValue = rawValue(value);
final byte[] rawRegion = rawRegion(key);
final int seconds = (int) unit.toSeconds(timeout);
runWithTx(new JedisTransactionalCallback() {
@Override
public void execute(Transaction tx) {
tx.set(rawKey, rawValue);
tx.zadd(rawRegion, 0, rawKey);
if (seconds > 0) {
tx.expire(rawKey, seconds);
tx.expire(rawRegion, seconds);
}
}
});
}
public void delete(Object key) {
del(key);
}
/** 지정된 키의 항목으로 삭제합니다. */
public void del(Object key) {
if (isTraceEnabled) log.trace("캐시를 삭제합니다. key=[{}]", key);
final byte[] rawKey = rawKey(key);
final byte[] rawRegion = rawRegion(key);
runWithTx(new JedisTransactionalCallback() {
@Override
public void execute(Transaction tx) {
tx.del(rawKey);
tx.zrem(rawRegion, rawKey);
}
});
}
/** 지정된 키의 항목으로 삭제합니다. */
public void mdel(Collection<? extends Object> keys) {
if (isTraceEnabled) log.trace("캐시를 삭제합니다. keys=[{}]", CollectionUtil.toString(keys));
if (keys == null || keys.size() == 0) return;
final byte[][] rawKeys = rawKeys(keys);
final byte[][] rawRegions = rawRegions(keys);
runWithTx(new JedisTransactionalCallback() {
@Override
public void execute(Transaction tx) {
tx.del(rawKeys);
for (int i = 0; i < rawKeys.length; i++)
tx.zrem(rawRegions[i], rawKeys[i]);
}
});
}
public void deleteRegion(final String regionName) throws CacheException {
log.info("Region 전체를 삭제합니다... regionName=[{}]", regionName);
// Redis에서 한 Transaction 하에서 get / set 을 동시에 실행 할 수 없습니다. (복합 함수인 setnx 같은 것을 제외하고)
//
try {
final byte[] rawRegion = rawRegion(regionName);
Set<byte[]> keySet = run(new JedisCallback<Set<byte[]>>() {
@Override
public Set<byte[]> execute(Jedis jedis) {
return jedis.zrange(rawRegion, 0, -1);
}
});
if (keySet.size() > 0) {
final byte[][] rawKeys = keySet.toArray(new byte[keySet.size()][]);
runWithTx(new JedisTransactionalCallback() {
@Override
public void execute(Transaction tx) {
tx.del(rawKeys);
tx.zremrangeByRank(rawRegion, 0, -1);
}
});
}
} catch (Throwable t) {
log.error("Region을 삭제하는데 실패했습니다.", t);
throw new CacheException(t);
}
}
public String flushDb() {
log.info("Redis DB 전체를 flush 합니다...");
return run(new JedisCallback<String>() {
@Override
public String execute(Jedis jedis) {
return jedis.flushDB();
}
});
}
/** 키를 byte[] 로 직렬화합니다 * */
@SuppressWarnings( "unchecked" )
private byte[] rawKey(Object key) {
return getKeySerializer().serialize(key);
}
@SuppressWarnings( "unchecked" )
private byte[][] rawKeys(Collection<? extends Object> keys) {
byte[][] rawKeys = new byte[keys.size()][];
int i = 0;
for (Object key : keys) {
rawKeys[i++] = getKeySerializer().serialize(key);
}
return rawKeys;
}
/** 키를 이용해 region 값을 직렬화합니다. */
@SuppressWarnings( "unchecked" )
private byte[] rawRegion(Object key) {
return getKeySerializer().serialize(getEntityName(key));
}
@SuppressWarnings( "unchecked" )
private byte[][] rawRegions(Collection<? extends Object> keys) {
byte[][] rawRegions = new byte[keys.size()][];
int i = 0;
for (Object key : keys) {
rawRegions[i++] = getKeySerializer().serialize(getEntityName(key));
}
return rawRegions;
}
private String getEntityName(Object key) {
if (key instanceof CacheKey)
return ((CacheKey) key).getEntityOrRoleName();
return regionName;
}
/** byte[] 를 key 값으로 역직렬화 합니다 */
private Object deserializeKey(byte[] rawKey) {
return getKeySerializer().deserialize(rawKey);
}
/** 캐시 값을 byte[]로 직렬화를 수행합니다. */
@SuppressWarnings( "unchecked" )
private byte[] rawValue(Object value) {
return getValueSerializer().serialize(value);
}
/** byte[] 를 역직렬화하여 원 객체로 변환합니다. */
private Object deserializeValue(byte[] rawValue) {
return getValueSerializer().deserialize(rawValue);
}
/**
* Redis 작업을 수행합니다.<br/>
* {@link redis.clients.jedis.JedisPool} 을 이용하여, {@link redis.clients.jedis.Jedis}를 풀링하여 사용하도록 합니다.
*/
private <T> T run(final JedisCallback<T> callback) {
final Jedis jedis = jedisPool.getResource();
try {
if (database != 0) jedis.select(database);
return callback.execute(jedis);
} catch (Throwable t) {
log.error("Redis 작업 중 예외가 발생했습니다.", t);
throw new RuntimeException(t);
} finally {
jedisPool.returnResource(jedis);
}
}
/**
* 복수의 작업을 하나의 Transaction 하에서 수행하도록 합니다.<br />
* {@link redis.clients.jedis.JedisPool} 을 이용하여, {@link redis.clients.jedis.Jedis}를 풀링하여 사용하도록 합니다.
*/
private List<Object> runWithTx(final JedisTransactionalCallback callback) {
final Jedis jedis = jedisPool.getResource();
try {
if (database != 0) jedis.select(database);
Transaction tx = jedis.multi();
callback.execute(tx);
return tx.exec();
} catch (Throwable t) {
log.error("Redis 작업 중 예외가 발생했습니다.", t);
throw new RuntimeException(t);
} finally {
jedisPool.returnResource(jedis);
}
}
/** Raw Key 값들을 역직렬화하여 Key Set을 반환합니다. */
@SuppressWarnings( "unchecked" )
private Set<Object> deserializeKeys(Set<byte[]> rawKeys) {
return SerializationTool.deserialize(rawKeys, getKeySerializer());
}
/** Raw Value 값들을 역직렬화하여 Value List를 반환합니다. */
@SuppressWarnings( "unchecked" )
private List<Object> deserializeValues(List<byte[]> rawValues) {
return SerializationTool.deserialize(rawValues, getValueSerializer());
}
}

JedisClient 를 굳이 따로 만든 이유는 JedisPool 사용과 Transactional 작업이 반복적인 코드라 Jedis 만 사용하게되면 코드가 많아질까봐 Spring Data Redis 와 유사하게 만들었습니다.

물론 Jedis 의 모든 메소드를 지원하지는 않습니다. 캐시로 사용할 때 필요한 기능만을 구현했습니다.

이를 바탕으로 일반 저장소로 사용 가능하도록 확장할 수 있을겁니다.
특히 ShardedJedis 를 사용하려면, 다른 방법을 사용해야 합니다. 이 방식은 대용량 캐시나 저장소로 사용할 때 고려해 봐야 겠습니다.

참고로 JedisPool 설정 정보도 도움이 될겁니다.


댓글 2개:

백일몽 :

이 프로젝트를 fork 해서 하이버네이트3 용으로 만들려고 하는데 단위테스트쪽이 4와 많이 달라서 좀 헤매고 있습니다.

Unknown :

hibernate 3 용이라면, hibernate-orm 3 에서 ehcache 용 코드 및 단위테스트를 참과하세요.
JedisClient 는 재사용 가능하고, 나머지는 hibernate 3용으로 porting 이 가능합니다.

hibernate 3 이하는 CacheProvider 가 단순해서 구현하기 쉬울 겁니다.