Coroutine에서의 Dynamic Proxy

일시 중지 함수에 동적 프록시를 적용하는 방법

2024-08-20


이번 글은 코루틴(Coroutine) 환경에서의 동적 프록시(Dynamic Proxy)에 대한 글입니다.
이번에 Spring WebFlux 환경에서 Redis를 사용하다가 불편함을 겪었는데요.
Spring Data에서는 리액티브 환경에서의 RedisRepository를 지원하지 않아 엔티티마다 반복적인 코드를 작성해야 했습니다.
RefreshTokenRepository.kt
@Repository
class RefreshTokenRepository(
    private val redisTemplate: ReactiveRedisTemplate<String, String>
) {
    suspend fun findByKey(key: String): RefreshToken? =
        redisTemplate.opsForValue()
            .get(key)
            .awaitSingleOrNull()
            .let {
                RefreshToken(
                    id = key,
                    content = it
                )
            }
 
    suspend fun save(refreshToken: RefreshToken): RefreshToken =
        refreshToken.apply {
            redisTemplate.opsForValue()
                .set(key, content)
                .awaitSingle()
        }
 
    suspend fun save(refreshToken: RefreshToken, expire: Duration): RefreshToken =
        refreshToken.apply {
            redisTemplate.opsForValue()
                .set(key, content, expire)
                .awaitSingle()
        }
 
    suspend fun deleteByKey(key: String): Boolean =
        redisTemplate.opsForValue()
            .delete(key)
            .awaitSingle()
}
ReactiveRedisTemplate은 엔티티마다 위와 같은 코드를 계속해서 작성해야 하는데요.
특히 코루틴을 사용하는 경우에는 awaitSingle() 등의 확장 함수를 계속 호출해야 하는 불편함도 있습니다.

그래서 저는 기존의 Spring Data의 RedisRepository처럼 조회, 저장, 삭제 작업에 대해서 자동으로 구현을 해주는 기능을 구현하고 싶었습니다.

CoroutineRedisRepository

CoroutineRedisRepository.kt
@NoRepositoryBean
interface CoroutineRedisRepository<V, K> {
    suspend fun findByKey(key: K): V?
 
    suspend fun save(value: V): V
 
    suspend fun save(value: V, ttl: Duration): V
 
    suspend fun deleteByKey(key: String): Boolean
}
우선 조회, 저장, 삭제 작업에 대한 명세를 코루틴 기반으로 제공하는 인터페이스를 만들었는데요.
이제 RedisRepository처럼 해당 인터페이스를 상속받는 모든 인터페이스들을 자동으로 구현하도록 하면 됩니다.

ClassPathScanningCandidateComponentProvider

ReactiveRedisRepository를 상속받는 인터페이스들을 모두 찾아야 하는데요.
이렇게 특정 인터페이스나 클래스를 찾는데 사용하는 클래스로 ClassPathScanningCandidateComponentProvider가 있습니다.

ClassPathScanningCandidateComponentProvider는 특정 패키지 하위로부터 지정된 TypeFilter의 조건에 부합하는 클래스들을 조회합니다.
TypeFilter의 구현체 중 AssignableTypeFilter를 통해 특정 인터페이스를 상속받은 클래스를 찾을 수 있습니다.
CoroutineRedisRepositoryScanner.kt
class CoroutineRedisRepositoryScanner(
    private val registry: BeanDefinitionRegistry
) : ClassPathScanningCandidateComponentProvider(false) {
    init {
        addIncludeFilter(AssignableTypeFilter(CoroutineRedisRepository::class.java))
    }
 
    override fun isCandidateComponent(beanDefinition: AnnotatedBeanDefinition): Boolean =
        super.isCandidateComponent(beanDefinition) || beanDefinition.metadata.isInterface
 
    override fun findCandidateComponents(basePackage: String): MutableSet<BeanDefinition> =
        super.findCandidateComponents(basePackage)
            .onEach {
                val beanDefinition = it as GenericBeanDefinition
                val beanClassName = beanDefinition.beanClassName
                val factoryBeanDefinition = GenericBeanDefinition()
                    .apply {
                        setBeanClass(CoroutineRedisRepositoryFactoryBean::class.java)
                        constructorArgumentValues.addGenericArgumentValue(Class.forName(beanClassName))
                    }
 
                registry.registerBeanDefinition(beanClassName!!, factoryBeanDefinition)
            }
}
CoroutineRedisRepository를 상속받은 인터페이스들을 가져와 BeanDefinitionRegistry에 각 인터페이스의 구현체를 Bean으로 등록하는 CoroutineRedisRepositoryScanner를 구현했는데요.
원래 ClassPathScanningCandidateComponentProvider는 클래스만 찾을 수 있도록 설계되었기 때문에 isCandidateComponent()를 오버라이딩해 인터페이스도 찾을 수 있도록 구현했습니다.
실제 스캔을 수행하는 findCandidateComponents()에서는 BeanDefinition을 조작해 구현체를 FactoryBean을 통해 Bean으로 등록하는 작업을 수행합니다.
FactoryBean.java
public interface FactoryBean<T> {
    @Nullable
    T getObject() throws Exception;
 
    @Nullable
    Class<?> getObjectType();
 
    default boolean isSingleton() {
        return true;
    }
}
여기서 FactoryBeanprivate 생성자나 동적 프록시 등을 이유로 인스턴스화가 어려운 클래스를 Bean으로 등록할 수 있도록 해주는 클래스입니다.
실제로 Bean으로 등록되는 인스턴스는 getObject()가 반환하는 인스턴스입니다.

프록시 구현

제가 앞서 CoroutineRedisRepositoryScanner에서 CoroutineRedisRepositoryFactoryBean을 통해 구현체를 Bean으로 등록했는데요.
굳이 FactoryBean을 사용한 이유는 해당 Bean을 동적 프록시를 통해 구현했기 때문입니다.
클래스 정보를 미리 알아낼 수 없어 인스턴스화가 힘든 프록시는 FactoryBean을 통해 Bean으로 등록해야 합니다.
CoroutineRedisRepositoryFactoryBean.kt
class CoroutineRedisRepositoryFactoryBean<T : CoroutineRedisRepository<*, *>>(
    private val repositoryType: Class<T>
) : FactoryBean<T> {
    @Autowired
    private lateinit var redisTemplate: ReactiveRedisTemplate<String, String>
 
    @Autowired
    private lateinit var objectMapper: ObjectMapper
 
    private val entityType by lazy {
        (repositoryType.genericInterfaces[0] as ParameterizedType)
            .actualTypeArguments
            .first() as Class<*>
    }
 
    override fun getObject(): T =
        ProxyFactory.getProxy(
            repositoryType,
            CoroutineRedisRepositoryInterceptor(
                entityType = entityType,
                redisTemplate = redisTemplate,
                objectMapper = objectMapper
            )
        )
 
    override fun getObjectType(): Class<T> = repositoryType
}
CoroutineRedisRepositoryFactoryBean에서는 CoroutineRedisRepositoryScanner에서 addGenericArgumentValue()를 통해 전달 받은 인터페이스와 함께 ProxyFactory로 프록시를 구현하게 됩니다.
일반적으로 Spring에서는 인터페이스 상속 여부에 따라 JDK(Java Development Kit) 또는 CGLib(Code Generator Library)를 기반으로 프록시를 구현하게 되는데요.
이 과정을 추상화해 제공하는 클래스가 ProxyFactory입니다.

CoroutineRedisRepositoryInterceptor

ProxyFactory를 통해 구현한 프록시에 대한 메서드 호출은 MethodInterceptor에 위임됩니다.
MethodInterceptor.java
@FunctionalInterface
public interface MethodInterceptor extends Interceptor {
    @Nullable
    Object invoke(@Nonnull MethodInvocation invocation) throws Throwable;
}
문제는 MethodInterceptorinvoke()는 일시 중지 함수가 아니기에 코루틴을 사용할 수 없다는 점입니다.
외부에서 호출할 CoroutineRedisRepository의 메서드들은 모두 일시 중지 함수로 정의되어 있고 실제로 메서드 내에서 awaitSingle() 등의 일시 중지 함수를 호출해야 하는데요.
그렇다고 runBlocking()이나 GlobalScope를 사용하는 것은 논블로킹이나 컨텍스트 관리 관점에서 적절하지 않은 방법이라고 생각했습니다.
그래서 suspend 키워드 없이 코루틴을 사용할 수 있는 방법을 찾기로 했습니다.

Continuation

사실 코루틴의 작동 원리를 생각하면 어려운 문제는 아니었습니다.
CoroutineTest.kt
class CoroutineTest : AnnotationSpec() {
    @Test
    suspend fun test() {
        delay(1000)
        delay(1000)
    }
}
CoroutineTest.class
public final class CoroutineTest extends AnnotationSpec {
   @Test
   @Nullable
   public final Object test(@NotNull Continuation $completion) {
      Object $continuation;
      label27: {
         if ($completion instanceof <undefinedtype>) {
            $continuation = (<undefinedtype>)$completion;
            if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
               ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
               break label27;
            }
         }
 
         $continuation = new ContinuationImpl($completion) {
            // $FF: synthetic field
            Object result;
            int label;
 
            @Nullable
            public final Object invokeSuspend(@NotNull Object $result) {
               this.result = $result;
               this.label |= Integer.MIN_VALUE;
               return CoroutineTest.this.test((Continuation)this);
            }
         };
      }
 
      Object $result = ((<undefinedtype>)$continuation).result;
      Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
      switch (((<undefinedtype>)$continuation).label) {
         case 0:
            ResultKt.throwOnFailure($result);
            ((<undefinedtype>)$continuation).label = 1;
            if (DelayKt.delay(1000L, (Continuation)$continuation) == var4) {
               return var4;
            }
            break;
         case 1:
            ResultKt.throwOnFailure($result);
            break;
         case 2:
            ResultKt.throwOnFailure($result);
            return Unit.INSTANCE;
         default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
      }
 
      ((<undefinedtype>)$continuation).label = 2;
      if (DelayKt.delay(1000L, (Continuation)$continuation) == var4) {
         return var4;
      } else {
         return Unit.INSTANCE;
      }
   }
}
코루틴은 CPS(Continuation Passing Style)로 구현되어 있습니다.
이때, 코루틴은 label 등의 일시 중지 지점까지의 상태를 저장하는 Continuation을 State Machine으로 사용합니다.
실제로 바이트 코드를 보면 함수의 인자로 Continuation이 추가된 것을 확인할 수 있는데요.
COROUTINE_SUSPENDED이 반환되면 현재 루틴이 일시 중지되며, 이후 ContinuationresumeWith()가 호출되면 Continuation에 저장된 상태를 바탕으로 루틴이 재개됩니다.

이제 이러한 작동 방식을 suspend 키워드 없이 직접 구현하면 됩니다.
CoroutineRedisRepositoryProxy.kt
@NoRepositoryBean
class CoroutineRedisRepositoryProxy(
    private val entityType: Class<*>,
    private val redisTemplate: ReactiveRedisTemplate<String, String>,
    private val objectMapper: ObjectMapper
) : CoroutineRedisRepository<Any, String> {
    override suspend fun findByKey(key: String): Any? =
        redisTemplate.opsForValue()
            .get(key)
            .map { objectMapper.readValue(it, entityType) }
            .awaitSingleOrNull()
 
    override suspend fun save(value: Any): Any =
        redisTemplate.opsForValue()
            .set(value.getKey(), objectMapper.writeValueAsString(value))
            .thenReturn(value)
            .awaitSingle()
 
    override suspend fun save(value: Any, ttl: Duration): Any =
        redisTemplate.opsForValue()
            .set(value.getKey(), objectMapper.writeValueAsString(value), ttl.toJavaDuration())
            .thenReturn(value)
            .awaitSingle()
 
    override suspend fun deleteByKey(key: String): Boolean =
        redisTemplate.opsForValue()
            .delete(key)
            .awaitSingle()
 
    private fun Any.getKey() =
        entityType.declaredFields
            .first { it.isAnnotationPresent(Key::class.java) }
            .apply { setAccessible(true) }
            .get(this)
            .toString()
}
우선 CoroutineRedisRepository의 모든 메서드를 구현한 CoroutineRedisRepositoryProxy를 구현했습니다.
CoroutineUtil.kt
internal fun <T> Continuation<T>.coroutineScope(block: suspend () -> T): Any =
    with(CoroutineScope(context)) {
        launch {
            runCatching { block() }
                .run(::resumeWith)
        }
 
        COROUTINE_SUSPENDED
    }
그 다음, 일반 함수에서도 Continuation을 통해 코루틴을 사용할 수 있도록 하는 coroutineScope()를 구현했습니다.
ContinuationCoroutineContext를 통해 새로운 CoroutineScope를 만들고, 내부에서 코루틴을 수행하는 동시에 COROUTINE_SUSPENDED를 반환합니다.
코루틴이 끝나면 resumeWith()를 호출해 반환 값을 전달함과 동시에 함수를 재개시키도록 했습니다.
CoroutineRedisRepositoryInvocationHandler.kt
class CoroutineRedisRepositoryInterceptor(
    private val entityType: Class<*>,
    private val redisTemplate: ReactiveRedisTemplate<String, String>,
    private val objectMapper: ObjectMapper
) : MethodInterceptor {
    private val repositoryProxy = CoroutineRedisRepositoryProxy(
        entityType = entityType,
        redisTemplate = redisTemplate,
        objectMapper = objectMapper
    )
 
    override fun invoke(invocation: MethodInvocation): Any =
        with(invocation) {
            arguments.lastOrNull { it is Continuation<*> }
                ?.let {
                    (it as Continuation<Any?>)
                        .coroutineScope { coInvoke(method.kotlinFunction!!, (arguments - it).toList()) }
                }
                ?: invoke(method.kotlinFunction!!, arguments.toList())
        }
 
    private suspend fun coInvoke(function: KFunction<*>, parameters: List<*>): Any? =
        when (function.name) {
            "findByKey" -> repositoryProxy.findByKey(parameters[0].toString())
            "save" -> {
                val value = parameters[0] as Any
 
                when (parameters.size) {
                    1 -> repositoryProxy.save(value)
                    2 -> repositoryProxy.save(value, (parameters[1] as Long).milliseconds)
                    else -> throw UnsupportedOperationException()
                }
            }
            "deleteByKey" -> repositoryProxy.deleteByKey(parameters[0].toString())
            else -> throw UnsupportedOperationException()
        }
 
    private fun invoke(function: KFunction<*>, parameters: List<*>): Flow<*> =
        when (function.name) {
            else -> throw UnsupportedOperationException()
        }
}
 
이제 CoroutineRedisRepositoryInterceptor에서는 일시 중지 함수의 마지막 인자로 주어지는 Continuation으로 코루틴 스코프를 만들고, 해당 스코프 내에서 CoroutineRedisRepositoryProxy의 메서드를 호출하면 됩니다.

전처리

이제 지금까지 구현한 CoroutineRedisRepositoryFactoryBean을 Bean으로 등록하는 CoroutineRedisRepositoryScanner가 Bean의 생성이 시작되기 전에 수행되도록 해야 합니다.
이런 경우에는 Bean의 메타 정보를 읽은 직후에 호출되는 BeanFactoryPostProcessor를 활용하면 됩니다.
BeanFactoryPostProcessor.java
@FunctionalInterface
public interface BeanFactoryPostProcessor {
    void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException;
}
BeanFactoryPostProcessor에서는 ConfigurableListableBeanFactory를 통해 Bean에 대한 메타 정보인 BeanDefinition을 조회할 수 있는데요.
저는 새로운 BeanDefinition을 등록하는 것이기 때문에 BeanDefinition을 조회 뿐만 아니라 등록하고 삭제할 수 있는 BeanDefinitionRegistry을 사용할 것입니다.
DefaultListableBeanFactory.java
public class DefaultListableBeanFactory extends AbstractAutowireCapableBeanFactory implements ConfigurableListableBeanFactory, BeanDefinitionRegistry, Serializable {
    ...
}
ConfigurableListableBeanFactory의 구현체인 DefaultListableBeanFactoryBeanDefinitionRegistry 또한 상속받고 있기 때문에 형변환이 가능합니다.
CoroutineRedisRepositoryPostProcessor.kt
@Component
class CoroutineRedisRepositoryPostProcessor : BeanFactoryPostProcessor {
    override fun postProcessBeanFactory(beanFactory: ConfigurableListableBeanFactory) {
        CoroutineRedisRepositoryScanner(beanFactory as BeanDefinitionRegistry)
            .findCandidateComponents("com.familidge.infrastructure")
    }
}
앞서 CoroutineRedisRepositoryScannerfindCandidateComponents()에서 CoroutineRedisRepository의 구현체에 대한 BeanDefinitionBeanDefinitionRegistry에 등록하도록 구현했기 때문에
CoroutineRedisRepositoryPostProcessor는 단순히 CoroutineRedisRepositoryScanner에게 BeanDefinitionRegistry를 전달하고 findCandidateComponents()를 호출하기만 하면 됩니다.

테스트

RefreshTokenRepository.kt
@Repository
interface RefreshTokenRepository : CoroutineRedisRepository<RefreshToken, String>
2024-08-21 00:57:49.180 DEBUG [main] o.s.b.f.s.DefaultListableBeanFactory: Creating shared instance of singleton bean 'com.familidge.infrastructure.domain.user.repository.RefreshTokenRepository'
CoroutineRedisRepository를 상속받은 인터페이스가 Bean으로 등록되는 것을 확인할 수 있습니다.
RefreshTokenRepositoryTest.kt
@SpringBootTest
class RefreshTokenRepositoryTest : AnnotationSpec() {
    @Autowired
    private lateinit var refreshTokenRepository: RefreshTokenRepository
 
    @Test
    suspend fun test() {
        val token = RefreshToken(
            id = "test",
            content = "test"
        )
 
        refreshTokenRepository.save(token)
        refreshTokenRepository.findByKey("test").shouldNotBeNull() shouldBeEqual token
    }
}
실제로 메서드도 프록시에 의해 정상적으로 작동하는 것도 확인할 수 있습니다.