Feign是Netflix发布的一款轻量级Rest客户端框架,提供了一种声明式的服务调用组件,通过Feign只需声明一个接口并通过注解进行简单的配置即可实现对Http接口的绑定。然后就可以像调用本地方法一样来调用远程服务,而完全感觉不到是在进行远程调用。
Feign支持多种注解,比如Feign自带的注解以及JAX-RS注解,但遗憾的是Feign本身并不支持Spring mvc注解,这无疑会给广大Spring用户带来不便。于是Spring Cloud又对Feign进行了封装,使其方便集成到Spring项目中,并添加了对Spring mvc注解的支持,以及简化与负载均衡、服务注册发现、断路器等组件的集成,所以集成之后又叫Spring Cloud Feign
https://github.com/shanhm1991/spring-feign
Feign的历史
2013年6月,Netflix Feign发布了第一个版本:1.0.0
2015年3月,spring-cloud-starter-feign发布版本:1.0.0.RELEASE
2016年7月,Netflix Feign发布最后一个版本:8.18.0
2016年, Netflix将Feign捐献给社区,并改名为OpenFeign
2016年7月,OpenFeign发布首个版本:9.0.0,之后由社区维护一直持续发布
2016年9月,spring-cloud-starter-feign将依赖从Netflix Feign改为OpenFeign,并发布了1.2.0.RELEASE
2017年11月,Spring Cloud团队将两个”feign starter”进行合并,发布了spring-cloud-starter-openfeign的首个版本:1.4.0.RELEASE,以及spring-cloud-starter-feign的1.4.0.RELEASE版本
2019年5月,spring-cloud-starter-feign发布了最后一个换皮版本:1.4.7.RELEASE,至此,过渡期结束
Feign的实现 Feign的实现是典型的动态代理模式,关于动态代理可以参考之前的笔记:设计模式-代理模式
这里我们希望能按照自己的意愿来实现一个Feign进行远程服务调用,希望在一些独立的老项目场景中,也能方便使用,并可以自由修改。目标是能代替HttpClient调用就行,具体选择在Netflix Feign 8.18.0版本上进行改造,但是没有直接修改源码,而是将其作为依赖进行改造,修改了一些实现类,所以下面的类图中有些类名会不一样,不过结构基本是一致的。
代理调用 FeignImplement 从类图中也能看出,最终是由FeignImplement
创建的代理实例,而其他的组件类基本都是围绕这一目的而服务,做一些准备工作的
https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/invoke/ FeignImplement.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class FeignImplement extends Feign { private final FeignParseHandlersByName targetToHandlersByName; private final InvocationHandlerFactory factory; @SuppressWarnings ("unchecked" ) @Override public <T> T newInstance (Target<T> target) { Map<String, InvocationHandlerFactory.MethodHandler> nameToHandler = targetToHandlersByName.apply(target); Map<Method, InvocationHandlerFactory.MethodHandler> methodToHandler = new LinkedHashMap<>(); List<FeignMethodHandler> defaultMethodHandlers = new LinkedList<>(); for (Method method : target.type().getMethods()) { if (method.getDeclaringClass() == Object.class ) { continue ; } else if (Util.isDefault(method)) { FeignMethodHandler handler = new FeignMethodHandler(method); defaultMethodHandlers.add(handler); methodToHandler.put(method, handler); } else { methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method))); } } InvocationHandler handler = factory.create(target, methodToHandler); T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(), new Class<?>[]{target.type()}, handler); for (FeignMethodHandler defaultMethodHandler : defaultMethodHandlers) { defaultMethodHandler.bindTo(proxy); } return proxy; } }
FeignInvocationHandler FeignInvocationHandler
中定义了代理行为,然后将具体的操作又委托给了MethodHandler
,对于Object的几个方法则直接调用Target
,它持有被代理的类型信息,以及url地址等信息
https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/invoke/ FeignInvocationHandler.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class FeignInvocationHandler implements InvocationHandler { private final Target target; private final Map<Method, InvocationHandlerFactory.MethodHandler> dispatch; @Override public Object invoke (Object proxy, Method method, Object[] args) throws Throwable { if ("equals" .equals(method.getName())) { try { Object otherHandler = args.length > 0 && args[0 ] != null ? Proxy.getInvocationHandler(args[0 ]) : null ; return equals(otherHandler); } catch (IllegalArgumentException e) { return false ; } } else if ("hashCode" .equals(method.getName())) { return hashCode(); } else if ("toString" .equals(method.getName())) { return toString(); } return dispatch.get(method).invoke(args); } }
FeignSynchronousMethodHandler 到了MethodHandler
这里就是进行真正的Http调用了
https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/invoke/ FeignSynchronousMethodHandler.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 public class FeignSynchronousMethodHandler implements InvocationHandlerFactory .MethodHandler { private final List<RequestInterceptor> requestInterceptors; private final FeignTemplateFactory buildTemplateFromArgs; @Override public Object invoke (Object[] argv) throws Throwable { RequestTemplate template = buildTemplateFromArgs.create(argv); org.springframework.feign.retryer.Retryer retryer = this .retryer.clone(); while (true ) { try { return executeAndDecode(template); } catch (RetryableException e) { retryer.continueOrPropagate(e); } } } Object executeAndDecode (RequestTemplate template) throws Throwable { Request request = targetRequest(template); String url = request.url(); FeignTarget<?> feignTarget = (FeignTarget<?>)target; String name = feignTarget.name(); Response response; long start = System.nanoTime(); try { RequestAttributes attributes = RequestContextHolder.getRequestAttributes(); String threadName = Thread.currentThread().getName(); if (attributes == null && threadName != null && threadName.endsWith("-async" )){ threadName = threadName.substring(0 , threadName.length() - 6 ); ConcurrentMap<String, RemoteChain> chainMap = RemoteChain.ASYNC_CHAIN.computeIfAbsent(threadName, (key) -> new ConcurrentHashMap<>()); String realUrl = url; int index = realUrl.indexOf("?" ); if (index != -1 ){ realUrl = realUrl.substring(0 , index); } RemoteChain chain = RemoteChain.newChain(false , name, realUrl, 0 , 0 , "?" , null ); chain.setCount(new AtomicInteger(0 )); chain.setAsync(true ); RemoteChain effectChain = chainMap.computeIfAbsent(realUrl, (key) -> chain); effectChain.increaseCount(); } response = client.execute(request, options); } catch (IOException e) { long cost = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); RemoteChain.appendChain(false , name, url, cost, -1 , "?" , null ); throw new RemoteException(format("remote[%s] failed %sms %s " , -1 , cost, url), e); } long cost = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); int status = response.status(); boolean shouldClose = true ; try { if (Response.class == metadata.returnType()) { if (response.body() == null ) { return response; } if (response.body().length() == null || response.body().length() > MAX_RESPONSE_BUFFER_SIZE) { shouldClose = false ; return response; } byte [] bodyData = Util.toByteArray(response.body().asInputStream()); return Response.create(status, response.reason(), response.headers(), bodyData); } if (status >= 200 && status < 300 ) { return decoder.decode(response, metadata.returnType(), name, url, cost, status, logger); } RemoteChain.appendChain(false , name, url, cost, status, "?" , null ); throw new RemoteException(format("remote[%s] %sms %s" , status, cost, url)); } catch (Exception e) { RemoteChain.appendChain(false , name, url, cost, -2 , "?" , null ); throw new RemoteException(format("remote[%s] failed %sms %s " , -2 , cost, url), e); } finally { if (shouldClose) { ensureClosed(response.body()); } } } Request targetRequest (RequestTemplate template) { for (RequestInterceptor interceptor : requestInterceptors) { interceptor.apply(template); } return target.apply(new RequestTemplate(template)); } }
首先根据MethodMetadata
创建RequestTemplate
,这里相当于做了一个翻译转换的工作,MethodMetadata
是用来描述一个方法的特征,比如参数和返回类型,以及注解信息,而RequestTemplate
用来描述一个Http请求,比如请求方法,请求url等
然后是应用RequestInterceptor
,也就是定义了一个接口给用户实现,这样在真正调用只有提供了一个修改RequestTemplate
的机会,比如你可以在Header中添加Token信息
接着就是创建请求Request
实例,并设置url,这个创建操作由FeignTarget
来完成。但是需要注意线程安全问题,因为代理实例可能是全局共享并且被多线程调用,所以每次创建传入的都是一个新的RequestTemplate
实例,并在构造过程中进行Copy,这样避免了线程安全问题,而且不用施加同步。
这里提供了一个接口NameServiceChooser
,以便在使用服务注册的场景中,可以根据service name进行Http调用,只要自己在实现中将service name转化成服务地址就行,另外对于url的设置也提供了Spring环境变量的支持。
https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/invoke/ FeignTarget.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class FeignTarget <T > implements Target <T > { @Override public Request apply (RequestTemplate request) { String prasedName = "" ; if (StringUtils.hasText(name)){ NameServiceChooser serviceChooser = applicationContext.getBean(NameServiceChooser.class ) ; prasedName = serviceChooser.choose(name); } String prasedUrl = url; if (StringUtils.hasText(url) && url.contains("${" )){ prasedUrl = valueResolver.resolveStringValue(url); } String prased = prasedName + prasedUrl; if (request.url().indexOf("http" ) != 0 ) { request.insert(0 , prased); } return request.request(); } }
这里可以看一下Request的toString
方法,就知道它是一个标准的Http请求了
feign.Request 1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Override public String toString () { StringBuilder builder = new StringBuilder(); builder.append(method).append(' ' ).append(url).append(" HTTP/1.1\n" ); for (String field : headers.keySet()) { for (String value : valuesOrEmpty(headers, field)) { builder.append(field).append(": " ).append(value).append('\n' ); } } if (body != null ) { builder.append('\n' ).append(charset != null ? new String(body, charset) : "Binary data" ); } return builder.toString(); }
类似的Response也是一个标准的Http响应
feign.Response 1 2 3 4 5 6 7 8 9 10 11 12 13 @Override public String toString () { StringBuilder builder = new StringBuilder("HTTP/1.1 " ).append(status); if (reason != null ) builder.append(' ' ).append(reason); builder.append('\n' ); for (String field : headers.keySet()) { for (String value : valuesOrEmpty(headers, field)) { builder.append(field).append(": " ).append(value).append('\n' ); } } if (body != null ) builder.append('\n' ).append(body); return builder.toString(); }
Client 最终是将构造好的Request实例,交给Client进行Http调用,它会使用这个Request来构造一个真正的HttpURLConnection
进行连接,并发送和接收消息
feign.Client 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 public static class Default implements Client { @Override public Response execute (Request request, Options options) throws IOException { HttpURLConnection connection = convertAndSend(request, options); return convertResponse(connection); } HttpURLConnection convertAndSend (Request request, Options options) throws IOException { final HttpURLConnection connection = (HttpURLConnection) new URL(request.url()).openConnection(); if (connection instanceof HttpsURLConnection) { HttpsURLConnection sslCon = (HttpsURLConnection) connection; if (sslContextFactory != null ) { sslCon.setSSLSocketFactory(sslContextFactory); } if (hostnameVerifier != null ) { sslCon.setHostnameVerifier(hostnameVerifier); } } connection.setConnectTimeout(options.connectTimeoutMillis()); connection.setReadTimeout(options.readTimeoutMillis()); connection.setAllowUserInteraction(false ); connection.setInstanceFollowRedirects(true ); connection.setRequestMethod(request.method()); Collection<String> contentEncodingValues = request.headers().get(CONTENT_ENCODING); boolean gzipEncodedRequest = contentEncodingValues != null && contentEncodingValues.contains(ENCODING_GZIP); boolean deflateEncodedRequest = contentEncodingValues != null && contentEncodingValues.contains(ENCODING_DEFLATE); boolean hasAcceptHeader = false ; Integer contentLength = null ; for (String field : request.headers().keySet()) { if (field.equalsIgnoreCase("Accept" )) { hasAcceptHeader = true ; } for (String value : request.headers().get(field)) { if (field.equals(CONTENT_LENGTH)) { if (!gzipEncodedRequest && !deflateEncodedRequest) { contentLength = Integer.valueOf(value); connection.addRequestProperty(field, value); } } else { connection.addRequestProperty(field, value); } } } if (!hasAcceptHeader) { connection.addRequestProperty("Accept" , "*/*" ); } if (request.body() != null ) { if (contentLength != null ) { connection.setFixedLengthStreamingMode(contentLength); } else { connection.setChunkedStreamingMode(8196 ); } connection.setDoOutput(true ); OutputStream out = connection.getOutputStream(); if (gzipEncodedRequest) { out = new GZIPOutputStream(out); } else if (deflateEncodedRequest) { out = new DeflaterOutputStream(out); } try { out.write(request.body()); } finally { try { out.close(); } catch (IOException suppressed) { } } } return connection; } }
HttpURLConnection 看到这里,我们可以再看一下Jdk的原生Api,是如何建立Http连接并进行通信的
1 2 3 final HttpURLConnection connection = (HttpURLConnection) new URL(request.url()).openConnection();OutputStream out = connection.getOutputStream();
首先在URL的构造器中,会解析出协议protocol,以及设置协议对应的URLStreamHandler
,它是各种协议连接的一个抽象工厂,关系如下
java.net.URL 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 public URL (URL context, String spec, URLStreamHandler handler) throws MalformedURLException { String original = spec; int i, limit, c; int start = 0 ; String newProtocol = null ; boolean aRef=false ; boolean isRelative = false ; if (handler != null ) { @SuppressWarnings ("removal" ) SecurityManager sm = System.getSecurityManager(); if (sm != null ) { checkSpecifyHandler(sm); } } try { limit = spec.length(); while ((limit > 0 ) && (spec.charAt(limit - 1 ) <= ' ' )) { limit--; } while ((start < limit) && (spec.charAt(start) <= ' ' )) { start++; } if (spec.regionMatches(true , start, "url:" , 0 , 4 )) { start += 4 ; } if (start < spec.length() && spec.charAt(start) == '#' ) { aRef=true ; } for (i = start ; !aRef && (i < limit) && ((c = spec.charAt(i)) != '/' ) ; i++) { if (c == ':' ) { String s = toLowerCase(spec.substring(start, i)); if (isValidProtocol(s)) { newProtocol = s; start = i + 1 ; } break ; } } protocol = newProtocol; if ((context != null ) && ((newProtocol == null ) || newProtocol.equalsIgnoreCase(context.protocol))) { if (handler == null ) { handler = context.handler; } if (context.path != null && context.path.startsWith("/" )) newProtocol = null ; if (newProtocol == null ) { protocol = context.protocol; authority = context.authority; userInfo = context.userInfo; host = context.host; port = context.port; file = context.file; path = context.path; isRelative = true ; } } if (protocol == null ) { throw new MalformedURLException("no protocol: " +original); } if (handler == null && (handler = getURLStreamHandler(protocol)) == null ) { throw new MalformedURLException("unknown protocol: " +protocol); } this .handler = handler; i = spec.indexOf('#' , start); if (i >= 0 ) { ref = spec.substring(i + 1 , limit); limit = i; } if (isRelative && start == limit) { query = context.query; if (ref == null ) { ref = context.ref; } } handler.parseURL(this , spec, start, limit); } catch (MalformedURLException e) { throw e; } catch (Exception e) { MalformedURLException exception = new MalformedURLException(e.getMessage()); exception.initCause(e); throw exception; } }
然后就是使用设置的URLStreamHandler
来创建对应的URLConnection
了,必须调用connect()
才会进行连接
java.net.URL 1 2 3 public URLConnection openConnection () throws java.io.IOException { return handler.openConnection(this ); }
不过就如注释所说,这里只是创建了一个连接实例,并没有真正进行连接,必须主动调用connect()
才会进行连接
Returns a URLConnection instance that represents a connection to the remote object referred to by the URL. A new instance of URLConnection is created every time when invoking the URLStreamHandler.openConnection(URL) method of the protocol handler for this URL. It should be noted that a URLConnection instance does not establish the actual network connection on creation. This will happen only when calling URLConnection.connect(). If for the URL’s protocol (such as HTTP or JAR), there exists a public, specialized URLConnection subclass belonging to one of the following packages or one of their subpackages: java.lang, java.io, java.util, java.net, the connection returned will be of that subclass. For example, for HTTP an HttpURLConnection will be returned, and for JAR a JarURLConnection will be returned.
下面看下具体的连接方法,这里它先将状态置为连接中connecting
sun.net.www.protocol.http.HttpURLConnection 1 2 3 4 5 6 7 8 9 public void connect () throws IOException { lock(); try { connecting = true ; } finally { unlock(); } plainConnect(); }
然后再进行连接,当然连接之前会检查下是否已经connected
sun.net.www.protocol.http.HttpURLConnection 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 protected void plainConnect () throws IOException { lock(); try { if (connected) { return ; } } finally { unlock(); } SocketPermission p = URLtoSocketPermission(this .url); if (p != null ) { try { AccessController.doPrivilegedWithCombiner( new PrivilegedExceptionAction<>() { public Void run () throws IOException { plainConnect0(); return null ; } }, null , p ); } catch (PrivilegedActionException e) { throw (IOException) e.getException(); } } else { plainConnect0(); } }
plainConnect0
中会新建HttpClient实例,并将真正的建立连接工作委托给它,到这里基本就交给sun提供的网络工具包了
sun.net.www.http.HttpClient 1 2 3 4 5 6 7 8 9 10 11 12 13 protected HttpClient (URL url, Proxy p, int to) throws IOException { proxy = (p == null ) ? Proxy.NO_PROXY : p; this .host = url.getHost(); this .url = url; port = url.getPort(); if (port == -1 ) { port = getDefaultPort(); } setConnectTimeout(to); capture = HttpCapture.getCapture(url); openServer(); }
最终的网络连接肯定还是要创建客户端Socket实例,然后再进行连接,这里就可以看到connectTimeout
和readTimeout
的最终设置了
sun.net.NetworkClient 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 protected Socket serverSocket = null ;public void openServer (String server, int port) throws IOException, UnknownHostException { if (serverSocket != null ) closeServer(); serverSocket = doConnect (server, port); try { serverOutput = new PrintStream(new BufferedOutputStream(serverSocket.getOutputStream()), true , encoding); } catch (UnsupportedEncodingException e) { throw new InternalError(encoding +"encoding not found" , e); } serverInput = new BufferedInputStream(serverSocket.getInputStream()); } protected Socket doConnect (String server, int port) throws IOException, UnknownHostException { Socket s; if (proxy != null ) { if (proxy.type() == Proxy.Type.SOCKS) { s = AccessController.doPrivileged( new PrivilegedAction<>() { public Socket run () { return new Socket(proxy); } }); } else if (proxy.type() == Proxy.Type.DIRECT) { s = createSocket(); } else { s = new Socket(Proxy.NO_PROXY); } } else { s = createSocket(); } if (connectTimeout >= 0 ) { s.connect(new InetSocketAddress(server, port), connectTimeout); } else { if (defaultConnectTimeout > 0 ) { s.connect(new InetSocketAddress(server, port), defaultConnectTimeout); } else { s.connect(new InetSocketAddress(server, port)); } } if (readTimeout >= 0 ) s.setSoTimeout(readTimeout); else if (defaultSoTimeout > 0 ) { s.setSoTimeout(defaultSoTimeout); } return s; }
代理构建 FeignBuilder 如类名所表达的意思,FeignBuilder
负责Feign
实例的构建工作,可以将各种依赖的组件交给FeignBuilder
,然后由它来负责组装
https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/invoke/ FeignBuilder 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public <T> T target (Class<T> apiType, String url, String name, ApplicationContext applicationContext, StringValueResolver valueResolver, org.slf4j.Logger logger) { return target(new FeignTarget<>(apiType, name, url, applicationContext, valueResolver), logger); } public <T> T target (Target<T> target, org.slf4j.Logger logger) { return build(logger).newInstance(target); } public Feign build (org.slf4j.Logger logger) { FeignMethodHandlerFactory methodHandlerFactory = new FeignMethodHandlerFactory(client, retryer, requestInterceptors); FeignParseHandlersByName parseHandlersByName = new FeignParseHandlersByName(contract, options, encoder, decoder, methodHandlerFactory, logger); return new FeignImplement(parseHandlersByName, invocationHandlerFactory); }
FeignClient 在构建时会传入当前接口类型Class<T> apiType
,那么对于一些连接需要的配置参数我们就可以通过注解的形式来指定,比如地址url,超时等信息
https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/annotation/ FeignClient.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Target ({ElementType.TYPE})@Retention (RetentionPolicy.RUNTIME)@Documented public @interface FeignClient { @AliasFor (attribute = "url" ) String value () default "" ; @AliasFor (attribute = "value" ) String url () default "" ; String name () default "" ; Class<?> encoder() default EJacksonEncoder.class ; Class<?> decoder() default EJacksonDecoder.class ; int connectTimeoutMillis () default 60000 ; int readTimeoutMillis () default 600000 ; }
然后只要在交给FeignBuilder
之前,将注解信息读取解析出来就行,具体的builder(feign)
过程就不展开了
https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/ FeignManager.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static <T> T getCache (Class<T> clazz, String url) { FeignClient feign = AnnotationUtils.getAnnotation(clazz, FeignClient.class ) ; Assert.notNull(feign, clazz + " is not a FeignClient" ); String key = clazz.getName() + url; T exist = (T) localFeigns.get(key); if (exist != null ) { return exist; } Logger logger = LoggerFactory.getLogger(feign.logger()); T created = builder(feign).target(clazz, url, null , applicationContext, valueResolver, logger); T previous = (T) localFeigns.put(key, created); if (previous != null ) { return previous; } else { return created; } }
FeignScan 上面提供了一个静态工厂方法,但是很多时候我们都是基于Spring进行应用开发,所以希望能在容器初始化时进行加载
于是继承spring的注解@Import
,指定扫描的packages
,然后将其标记到@Configuration
的类型上
https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/annotation/ FeignScan.java 1 2 3 4 5 6 7 8 @Target ({ElementType.TYPE})@Retention (RetentionPolicy.RUNTIME)@Documented @Import (FeignBeanDefinitionRegistrar.class ) public @interface FeignScan { String[] basePackages() default {}; }
并在ImportBeanDefinitionRegistrar
中扫描所有添加了@FeignClient
的类,并创建BeanDefinition
注册到Spring容器中
https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/ FeignBeanDefinitionRegistrar.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class FeignBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar { @Override public void registerBeanDefinitions (AnnotationMetadata meta, @NonNull BeanDefinitionRegistry registry) { AnnotationAttributes attrs = AnnotationAttributes.fromMap(meta.getAnnotationAttributes(FeignScan.class .getName ())) ; if (attrs == null ){ return ; } String[] basePackages = (String[])attrs.get("basePackages" ); for (String pack : basePackages){ Reflections reflections = new Reflections(pack); for (Class<?> clazz : reflections.getTypesAnnotatedWith(FeignClient.class )) { FeignClient feign = AnnotationUtils.getAnnotation(clazz, FeignClient.class ) ; if (feign == null ){ continue ; } BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(clazz); GenericBeanDefinition beanDefinition = (GenericBeanDefinition)builder.getBeanDefinition(); beanDefinition.setAutowireMode(GenericBeanDefinition.AUTOWIRE_BY_TYPE); beanDefinition.getPropertyValues().add("feignClass" , clazz); beanDefinition.setBeanClass(FeignFactory.class ) ; registry.registerBeanDefinition(clazz.getSimpleName(), beanDefinition); } } } }
这里实际注册的是FactoryBean
,spring提供的一个Bean工厂接口,最终在这里进行代理实例的创建
https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/ FeignFactory.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class FeignFactory <T > implements FactoryBean <T >, EmbeddedValueResolverAware , ApplicationContextAware { private Class<T> feignClass; @Override public T getObject () { FeignClient feign = AnnotationUtils.getAnnotation(feignClass, FeignClient.class ) ; assert feign != null ; org.slf4j.Logger logger = LoggerFactory.getLogger(feign.logger()); FeignBuilder builder = FeignManager.builder(feign); return builder.target(feignClass, feign.url(), feign.name(), applicationContext, valueResolver, logger); } }
调用链RemoteChain 当服务之间调来调去时,我们希望能跟踪调用的过程,并打印出来,期望达到下面这样的效果
1 2 3 4 5 6 7 8 格式:(* 表示是否异步调用)[成功次数/总次数 Http状态码|业务状态码 总记耗时 服务名称] 请求地址 ├─ [1/1 200|200 13ms serviceA] http://{ip}:{port}/{context-path}/api/v1/xxx │ └─ [1/1 200|200 8ms serviceB] http://{ip}:{port}/{context-path}/api/v1/xxx │ └─ [1/1 200|200 4ms serviceC] http://{ip}:{port}/{context-path}/api/v1/xxx ├─ [2/2 200|200 6ms serviceD] http://{ip}:{port}/{context-path}/api/v1/xxx └─ [1/1 200|200 22ms serviceE] http://{ip}:{port}/{context-path}/api/v1/xxx └─ *[2/2 200|? 18ms serviceF] http://{ip}:{port}/{context-path}/api/v1/xxx
于是定义RemoteChain
以及如下属性,为了让消息按照自己的意愿来进行Json序列化,可以先屏蔽掉了所有字段,然后再重写getDetail
https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/codec/ RemoteChain.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 public class RemoteChain { @JsonIgnore @JSONField (serialize = false ) private String name; @JsonIgnore @JSONField (serialize = false ) private String url; @JsonIgnore @JSONField (serialize = false ) private AtomicInteger succs = new AtomicInteger(0 ); @JsonIgnore @JSONField (serialize = false ) private AtomicInteger count = new AtomicInteger(1 ); @JsonIgnore @JSONField (serialize = false ) private AtomicLong cost = new AtomicLong(0 ); @JsonIgnore @JSONField (serialize = false ) private boolean async; @JsonIgnore @JSONField (serialize = false ) private int httpCode; @JsonIgnore @JSONField (serialize = false ) private String code; private String detail; private List<RemoteChain> children; public String getDetail () { if (detail != null ){ return detail; } StringBuilder builder = new StringBuilder(); if (async){ builder.append("*" ); } builder.append("[" ); builder.append(succs).append("/" ).append(count); builder.append(" " ).append(httpCode).append("|" ).append(code); builder.append(" " ).append(cost).append("ms" ); if (name != null ){ builder.append(" " ).append(name); } builder.append("]" ).append(" " ).append(url); return builder.toString(); } }
另外提供一个格式打印方法,类似于遍历文件目录的命令Tree
https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/codec/ RemoteChain.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public static void buildeTree (String prefix, List<RemoteChain> chains, StringBuilder builder) { if (chains != null ) { for (int i = 0 ; i < chains.size(); i++) { RemoteChain chain = chains.get(i); String newPrefix = prefix + (i == chains.size() - 1 ? "└─ " : "├─ " ); builder.append(newPrefix).append(chain.getDetail()); if (chain.getChildren() != null ) { builder.append("\n" ); buildeTree(prefix + (i == chains.size() - 1 ? " " : "│ " ), chain.getChildren(), builder); } if (i < chains.size() - 1 ){ builder.append("\n" ); } } } }
响应结构Response
思路:对于调用链中的每个服务,都将自己的远程调用记录下来,然后在响应中返回给调用方。调用者收到响应后,解析出返回的调用记录进行拼接。
所以要完整的打印出调用链,就需要遵循一些约定,比如响应结构中需要保护调用链数据,所以我们提供了如下响应结构定义,在Response
构造器中,获取当前保存的远程调用记录并组装到响应数据中。另外考虑到方便,提供了一些静态工厂,以及分页场景适配。
https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/codec/ Response.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 public class Response <T > { private String requestId; private String msg; private int code; private T data; private List<RemoteChain> chains; public Response () { } private Response (int code, T data, String msg) { this .code = code; this .data = data; this .msg = msg; RemoteChainHolder holder = RemoteChain.CHAIN.get(); RemoteChain.CHAIN.remove(); if (holder != null ){ this .chains = holder.getChains(); } String threadName = Thread.currentThread().getName(); Map<String, RemoteChain> chainMap = RemoteChain.ASYNC_CHAIN.remove(threadName); if (chainMap != null ){ if (this .chains == null ){ this .chains = chainMap.values().stream().toList(); }else { this .chains.addAll(chainMap.values()); } } } private Response (int code, T data) { this (code, data, null ); } public Response (ResponseCode responseCode) { this (responseCode.getCode(), null , responseCode.getDesc()); } public Response (ResponseCode responseCode, T data) { this (responseCode.getCode(), data, responseCode.getDesc()); } @Override public String toString () { return "{requestId=" + requestId + ", code=" + code + ", msg=" + msg + ", data=" + data + "}" ; } public static <V> Response<V> success () { return new Response<>(ResponseCode.OK); } public static <V> Response<V> success (V data) { return new Response<>(ResponseCode.OK, data); } public static <V> Response<V> success (String msg, V data) { Response<V> response = new Response<>(ResponseCode.OK, data); response.msg = msg; return response; } public static <V> Response<V> error () { return new Response<>(ResponseCode.INTERNAL_SERVER_ERROR); } public static <V> Response<V> error (ResponseCode responseCode) { return new Response<>(responseCode); } public static <V> Response<V> error (String msg) { Response<V> response = new Response<>(ResponseCode.INTERNAL_SERVER_ERROR); response.msg = msg; return response; } public static <V> Response<V> error (int code, String msg) { return new Response<>(code, null , msg); } public static <V> Response<V> error (ResponseCode responseCode, String msg) { Response<V> response = new Response<>(responseCode); response.msg = msg; return response; } public static <E> Response<Page<E>> page(List<E> list){ Response<Page<E>> response = new Response<>(ResponseCode.OK); if (list == null ){ list = new ArrayList<>(); } if (list instanceof com.github.pagehelper.Page<E> page){ response.setData(new Page<>(page, page.getTotal())); }else { response.setData(new Page<>(list, list.size())); } return response; } public static class Page <E > { private int total; private Collection<E> list; public Page () { } public Page (Collection<E> list, int total) { this .list = list; this .total = total; } public Page (Collection<E> list, long total) { this .list = list; this .total = (int )total; } public int getTotal () { return total; } public void setTotalRows (int total) { this .total = total; } public Collection<E> getList () { return list; } public void setList (Collection<E> list) { this .list = list; } @Override public String toString () { return "{total=" + total + ", list=" + list + "}" ; } } }
在收到Response的调用数据之后,就是进行拼接了,这里针对Response提供了一个ResponseDecoder
https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/codec/ Response.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public Object decode (Response response, Type type, String name, String url, long cost, int httpCode, org.slf4j.Logger logger) throws Exception { if (response.body() == null ) { return null ; } Reader reader = response.body().asReader(); if (!reader.markSupported()) { reader = new BufferedReader(reader, 1 ); } reader.mark(1 ); if (reader.read() == -1 ) { return null ; } reader.reset(); org.springframework.feign.codec.Response<?> resp = mapper.readValue(reader, org.springframework.feign.codec.Response.class ) ; if (ResponseCode.OK.getCode() != resp.getCode()){ RemoteChain.appendChain(false , name, url, cost, httpCode, String.valueOf(resp.getCode()), resp.getChains()); logger.info(">< remote {}|{} {}ms {}" , httpCode, resp.getCode(), cost, url); throw new RemoteException(resp.getCode() + ", " + resp.getMsg()); } if (void .class == type) { RemoteChain.appendChain(true , name, url, cost, httpCode, String.valueOf(resp.getCode()), resp.getChains()); logger.info(">< remote {}|{} {}ms {}" , httpCode, resp.getCode(), cost, url); return null ; } logger.info(">< remote {}|{} {}ms {}" , httpCode, resp.getCode(), cost, url); RemoteChain.appendChain(true , name, url, cost, httpCode, String.valueOf(resp.getCode()), resp.getChains()); String data = mapper.writeValueAsString(resp.getData()); return mapper.readValue(data, mapper.constructType(type)); }
具体的拼接需要考虑一些问题
首先只对servlet线程或者因其异步调用而产生的子线程(约定线程名以-async作为后缀),发起的远程调用进行记录;
记录需要根据远程url(去除参数)进行累计,对于同步调用则直接取最新的一次调用进行比较,对于异步则要根据线程名(去除后缀)和url进行比较;
要处理好记录的删除,具体有两个删除时机: 1.在拼接时,发现servlet线程名称变了(约定每个请求都有一个唯一的requestId,并且会将它记为servlet处理线程的名称),这里是由后一个请求处理时帮助前面的请求进行清除; 2.在Response的构造器中,这里假设了Response的构造是请求处理的最后一个操作,在此之后不应在有远程调用,这里是由当前请求处理时自己清除;
https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/codec/ RemoteChain.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 public static final ThreadLocal<RemoteChainHolder> CHAIN = new ThreadLocal<>();public static final ConcurrentMap<String, ConcurrentMap<String, RemoteChain>> ASYNC_CHAIN = new ConcurrentHashMap<>();public static void appendChain (boolean success, String name, String url, long cost, int httpCode, String code, List<RemoteChain> next) { RequestAttributes attributes = RequestContextHolder.getRequestAttributes(); String threadName = Thread.currentThread().getName(); if (attributes == null && (threadName == null || !threadName.endsWith("-async" ))){ return ; } int index = url.indexOf("?" ); if (index != -1 ){ url = url.substring(0 , index); } if (attributes != null ){ RemoteChainHolder remoteChainHolder = CHAIN.get(); if (remoteChainHolder == null ){ remoteChainHolder = new RemoteChainHolder(threadName); CHAIN.set(remoteChainHolder); }else if (!remoteChainHolder.getHolderName().equals(threadName)){ remoteChainHolder = new RemoteChainHolder(threadName); CHAIN.set(remoteChainHolder); ASYNC_CHAIN.remove(threadName); } ArrayList<RemoteChain> chainList = remoteChainHolder.getChains(); if (chainList == null ){ chainList = new ArrayList<>(); remoteChainHolder.setChains(chainList); } if (!chainList.isEmpty()){ RemoteChain lastChain = chainList.get(chainList.size() - 1 ); if (url.equals(lastChain.getUrl())){ lastChain.increaseCount(); lastChain.sumCost(cost); if (success){ lastChain.increaseSuccs(); }else { lastChain.setHttpCode(httpCode); lastChain.setCode(code); } return ; } } chainList.add(newChain(success, name, url, cost, httpCode, code, next)); }else if (threadName.endsWith("-async" )){ threadName = threadName.substring(0 , threadName.length() - 6 ); Map<String, RemoteChain> chainMap = ASYNC_CHAIN.get(threadName); if (chainMap == null ){ return ; } RemoteChain chain = chainMap.get(url); if (chain == null ){ return ; } if (success){ chain.increaseSuccs(); } chain.sumCost(cost); chain.setHttpCode(httpCode); chain.setCode(code); chain.setChildren(next); } }
对于同步调用只要在响应时进行记录就行,而对于异步调用只能在发起时进行记录(有可能在收到异步调用的结果时,对应的请求处理已经结束并返回了),然后在收到异步响应时,检测下对应的同步请求处理是否还在,如果已经结束了,就直接丢弃。由于异步多线程可能存在多个调用并发的问题,所以这里也需要考虑下线程安全问题。
https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/invoke/ FeignSynchronousMethodHandler.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 RequestAttributes attributes = RequestContextHolder.getRequestAttributes(); String threadName = Thread.currentThread().getName(); if (attributes == null && threadName != null && threadName.endsWith("-async" )){ threadName = threadName.substring(0 , threadName.length() - 6 ); ConcurrentMap<String, RemoteChain> chainMap = RemoteChain.ASYNC_CHAIN.computeIfAbsent(threadName, (key) -> new ConcurrentHashMap<>()); String realUrl = url; int index = realUrl.indexOf("?" ); if (index != -1 ){ realUrl = realUrl.substring(0 , index); } RemoteChain chain = RemoteChain.newChain(false , name, realUrl, 0 , 0 , "?" , null ); chain.setCount(new AtomicInteger(0 )); chain.setAsync(true ); RemoteChain effectChain = chainMap.computeIfAbsent(realUrl, (key) -> chain); effectChain.increaseCount(); }
参考: