手记

Lettuce同步命令源码分析

  Lettuce同步源码分析

    在上一篇分享中分享了单机模式异步连接创建过程Lettuce创建连接过程源码分析; 在本次分享内容主要介绍同步命令的处理过程.

Lettuce是基于Netty的Redis高级客户端,对于异步命令来说是天然的,那么lettuce中是如何处理同步命令的呢?实际上同步连接还是对异步命令的一次封装;下面我们就通过源码进行分析看看Lettuce中的具体实现.

   通过上一篇文章中可以知道在StatefulRedisConnectionImpl中创建 异步模式,同步模式以及响应式模式命令处理模式,那么我们就从 该处看起

?

public StatefulRedisConnectionImpl(RedisChannelWriter writer, RedisCodec<K, V> codec, Duration timeout) {     super(writer, timeout);     this.codec = codec;    //创建异步redis命令处理模式    this.async = newRedisAsyncCommandsImpl();    //创建redis命令同步处理模式    this.sync = newRedisSyncCommandsImpl();    //创建redis命令响应式处理模式    this.reactive = newRedisReactiveCommandsImpl();}

   通过这里似乎看不出同步处理模式同异步处理模式有什么关联,那么我们在深入进去看一下

?


protected RedisCommands<K, V> newRedisSyncCommandsImpl() {        return syncHandler(async(), RedisCommands.class, RedisClusterCommands.class);    }

  在这段代码中可以看到async(),这个就是redis命令异步处理模式,那么它是如何封装的呢?

?


protected <T> T syncHandler(Object asyncApi, Class<?>... interfaces) {        //对异步API创建调用处理器        FutureSyncInvocationHandler h = new FutureSyncInvocationHandler((StatefulConnection<?, ?>) this, asyncApi, interfaces);        //创建动态代理        return (T) Proxy.newProxyInstance(AbstractRedisClient.class.getClassLoader(), interfaces, h);    }

  通过上面对源码可以发现原来是对异步api创建了一个JDK动态代理;那么关键的逻辑还是在FutureSyncInvocationHandler中,对于动态代理的知识就不在展开了.

在invoke处理是在AbstractInvocationHandler中完成的,它将一些基本公用的抽象在了基类中,将特殊的实现延迟到子类中实现.

?


public final Object invoke(Object proxy, Method method, Object[] args) throws Throwable {       //如果参数为null则 将args设置为"{}"       if (args == null) {           args = NO_ARGS;       }       //如果参数长度为0同时方法名称为hashCode则直接返回hashCode       if (args.length == 0 && method.getName().equals("hashCode")) {           return hashCode();       }       //如果是equals       if (args.length == 1 && method.getName().equals("equals") && method.getParameterTypes()[0] == Object.class) {           Object arg = args[0];           if (arg == null) {               return false;           }           if (proxy == arg) {               return true;           }           return isProxyOfSameInterfaces(arg, proxy.getClass()) && equals(Proxy.getInvocationHandler(arg));       }       //如果是toString       if (args.length == 0 && method.getName().equals("toString")) {           return toString();       }       return handleInvocation(proxy, method, args);   }

  在FutureSyncInvocationHandler中实现了同步命令处理过程,其源码如下:

?


protected Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable {        try {           //获取当前method在asyncApi 中对应的方法           Method targetMethod = this.translator.get(method);           //调用异步接口           Object result = targetMethod.invoke(asyncApi, args);           //如果返回结果是RedisFuture类型           if (result instanceof RedisFuture<?>) {              //类型强转               RedisFuture<?> command = (RedisFuture<?>) result;                 //如果不是事务控制方法 同时还在事务中则返回null               if (isNonTxControlMethod(method.getName()) && isTransactionActive(connection)) {                   return null;               }               //是事务控制方法,或不在事务中则进行如下处理               //等待超时或取消               LettuceFutures.awaitOrCancel(command, connection.getTimeout().toNanos(), TimeUnit.NANOSECONDS);              //返回结果,这里处理不是很好 上一步中就可以直接返回了               return command.get();           }           //如果不是RedisFuture类型则直接返回           return result;       } catch (InvocationTargetException e) {           throw e.getTargetException();       }   }

  在上文中有一段是获取获取指定方法在delegate中对应方法的处理,下面就看看这个处理是如何实现的

?


/**     * 方法翻译器     */    protected static class MethodTranslator {         private final static WeakHashMap<Class<?>, MethodTranslator> TRANSLATOR_MAP = new WeakHashMap<>(32);                 //真实方法和代理类中方法映射表        private final Map<Method, Method> map;         private MethodTranslator(Class<?> delegate, Class<?>... methodSources) {             map = createMethodMap(delegate, methodSources);        }         /**         * 通过指定代理类,和目标类创建方法翻译器         */        public static MethodTranslator of(Class<?> delegate, Class<?>... methodSources) {            //同步代码块            synchronized (TRANSLATOR_MAP) {                //如果翻译器映射表中不存在delegate的翻译器则创建一个新的                return TRANSLATOR_MAP.computeIfAbsent(delegate, key -> new MethodTranslator(key, methodSources));            }        }         private Map<Method, Method> createMethodMap(Class<?> delegate, Class<?>[] methodSources) {             Map<Method, Method> map;            List<Method> methods = new ArrayList<>();            //遍历源类,找到所有public方法            for (Class<?> sourceClass : methodSources) {                methods.addAll(getMethods(sourceClass));            }             map = new HashMap<>(methods.size(), 1.0f);             //创建方法和代理类的方法的映射表            for (Method method : methods) {                 try {                    map.put(method, delegate.getMethod(method.getName(), method.getParameterTypes()));                } catch (NoSuchMethodException ignore) {                }            }            return map;        }       //获取目标方法中的所有方法        private Collection<? extends Method> getMethods(Class<?> sourceClass) {             //目标方法集合            Set<Method> result = new HashSet<>();             Class<?> searchType = sourceClass;            while (searchType != null && searchType != Object.class) {                 //将目标类中所有public方法添加到集合中                result.addAll(filterPublicMethods(Arrays.asList(sourceClass.getDeclaredMethods())));                //如果souceClass是接口类型                if (sourceClass.isInterface()) {                    //获取souceClass的所有接口                    Class<?>[] interfaces = sourceClass.getInterfaces();                    //遍历接口,将接口的public方法也添加到方法集合中                    for (Class<?> interfaceClass : interfaces) {                        result.addAll(getMethods(interfaceClass));                    }                     searchType = null;                } else {//如果不是接口则查找父类                     searchType = searchType.getSuperclass();                }            }             return result;        }         //获取给定方法集合中所有public方法        private Collection<? extends Method> filterPublicMethods(List<Method> methods) {            List<Method> result = new ArrayList<>(methods.size());             for (Method method : methods) {                if (Modifier.isPublic(method.getModifiers())) {                    result.add(method);                }            }             return result;        }         public Method get(Method key) {           //从方法映射表中获取目标方法            Method result = map.get(key);            //如果目标方法不为null则返回,否则抛出异常            if (result != null) {                return result;            }            throw new IllegalStateException("Cannot find source method " + key);        }    }}

  

原文出处

1人推荐
随时随地看视频
慕课网APP

热门评论


《深度剖析Lettuce源码与SpringBoot源码》,从5大方面来讲解:

1.redis的哈希槽原理分析

2.lettuce是如何基于《redis通信协议RESP》进行设计封装

3.lettuce如何读取 redis的拓扑结构图?

4.lettuce网络通信设计,底层netty连接redis

5.当网络异常时,lettuce如何采用netty解决?


具体见:

https://study.163.com/course/introduction/1210203966.htm?share=1&shareId=1016671292


查看全部评论