spring-feign, an HTTP Client Tool: Design Document


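Feign is a lightweight REST client framework released by Netflix. It provides declarative service invocation: you declare an interface, configure it with a few annotations, and it is bound to an HTTP endpoint. You can then call the remote service as if it were a local method, without noticing that a remote call is taking place.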

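Feign supports several kinds of annotations, such as its own annotations and the JAX-RS ones, but unfortunately it does not support Spring MVC annotations natively, which is inconvenient for Spring users. Spring Cloud therefore wrapped Feign so that it integrates easily into Spring projects, added support for Spring MVC annotations, and simplified integration with load balancing, service registration and discovery, circuit breakers, and similar components. The integrated result is known as Spring Cloud Feign.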

https://github.com/shanhm1991/spring-feign

History of Feign

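  • June 2013: Netflix Feign released its first version, 1.0.0
  • March 2015: spring-cloud-starter-feign released version 1.0.0.RELEASE
  • July 2016: Netflix Feign released its last version, 8.18.0
  • 2016: Netflix donated Feign to the community, and it was renamed OpenFeign
  • July 2016: OpenFeign released its first version, 9.0.0, and has been maintained and released by the community ever since
  • September 2016: spring-cloud-starter-feign switched its dependency from Netflix Feign to OpenFeign and released 1.2.0.RELEASE
  • November 2017: the Spring Cloud team merged the two "feign starters", releasing the first version of spring-cloud-starter-openfeign, 1.4.0.RELEASE, alongside spring-cloud-starter-feign 1.4.0.RELEASE
  • May 2019: spring-cloud-starter-feign released its last rebadged version, 1.4.7.RELEASE, which ended the transition period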

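Implementation of Feign

Feign's implementation is a typical use of the dynamic proxy pattern. For background on dynamic proxies, see my earlier note: Design Patterns: Proxy Pattern

Here we want to implement a Feign of our own for remote service calls, one that is also convenient to use in standalone legacy projects and can be modified freely. The goal is simply to be able to replace HttpClient calls. We chose Netflix Feign 8.18.0 as the base, but rather than modifying its source directly, we depend on it and replace some of its implementation classes. Some class names in the class diagram below therefore differ, although the structure is essentially the same.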
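To make the dynamic proxy idea concrete before walking through the real classes, here is a minimal sketch that uses only the JDK. The `@Get` annotation, the `UserApi` interface, and the `create` method are hypothetical names invented for this illustration and are not part of Feign or spring-feign. Instead of sending anything over the network, the proxy just returns the request line it would send.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

final class MiniProxyDemo {

    /** Hypothetical annotation that binds a method to a GET path. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Get {
        String value();
    }

    /** Hypothetical remote API, declared as a plain interface. */
    interface UserApi {
        @Get("/users/{id}")
        String find(String id);
    }

    /** Creates a proxy whose methods return the request line they would send. */
    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> type, String baseUrl) {
        InvocationHandler handler = (proxy, method, args) -> {
            Get get = method.getAnnotation(Get.class);
            if (get == null) {
                throw new UnsupportedOperationException(method.getName());
            }
            String path = get.value();
            if (args != null && args.length > 0) {
                // Fill the first {placeholder} in the path with the first argument.
                int open = path.indexOf('{');
                int close = path.indexOf('}', open);
                if (open >= 0 && close > open) {
                    path = path.substring(0, open) + args[0] + path.substring(close + 1);
                }
            }
            return "GET " + baseUrl + path;
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }
}
```

Calling `MiniProxyDemo.create(MiniProxyDemo.UserApi.class, "http://localhost:8080").find("42")` yields `GET http://localhost:8080/users/42`. A real client would hand that request to an HTTP layer at this point, which is the part the rest of this document covers.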

Proxy Invocation

FeignImplement

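As the class diagram shows, the proxy instance is ultimately created by FeignImplement; the other components mostly exist to serve that purpose by doing the preparatory work.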

https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/invoke/FeignImplement.java
public class FeignImplement extends Feign {
private final FeignParseHandlersByName targetToHandlersByName;
private final InvocationHandlerFactory factory;

// ...

@SuppressWarnings("unchecked")
@Override
public <T> T newInstance(Target<T> target) {
Map<String, InvocationHandlerFactory.MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
Map<Method, InvocationHandlerFactory.MethodHandler> methodToHandler = new LinkedHashMap<>();
List<FeignMethodHandler> defaultMethodHandlers = new LinkedList<>();

for (Method method : target.type().getMethods()) {
if (method.getDeclaringClass() == Object.class) {
continue;
} else if(Util.isDefault(method)) { // proxy for interface default methods
FeignMethodHandler handler = new FeignMethodHandler(method);
defaultMethodHandlers.add(handler);
methodToHandler.put(method, handler);
} else {
methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
}
}

InvocationHandler handler = factory.create(target, methodToHandler); // creates the FeignInvocationHandler
T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(), new Class<?>[]{target.type()}, handler);

for(FeignMethodHandler defaultMethodHandler : defaultMethodHandlers) {
defaultMethodHandler.bindTo(proxy);
}
return proxy;
}
}
FeignInvocationHandler

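FeignInvocationHandler defines the proxy behavior and delegates the actual work to a MethodHandler. The Object methods (equals, hashCode, toString) are answered directly from the Target, which holds the proxied type, the url, and related information.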

https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/invoke/FeignInvocationHandler.java
public class FeignInvocationHandler implements InvocationHandler {

private final Target target;

private final Map<Method, InvocationHandlerFactory.MethodHandler> dispatch;

// ...

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if ("equals".equals(method.getName())) {
try {
Object otherHandler = args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
return equals(otherHandler);
} catch (IllegalArgumentException e) {
return false;
}
} else if ("hashCode".equals(method.getName())) {
return hashCode();
} else if ("toString".equals(method.getName())) {
return toString();
}
return dispatch.get(method).invoke(args);
}

// ...
}
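
The dispatch pattern above can be tried in isolation. The following is a minimal JDK-only sketch, not the spring-feign code: `Greeter`, `DispatchHandler`, and the use of `Function<Object[], Object>` in place of `MethodHandler` are assumptions made for the illustration. It differs from the listing in one detail: it recognizes the Object methods by declaring class rather than by name alone.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class DispatchDemo {

    /** Hypothetical API interface used only for this sketch. */
    interface Greeter {
        String hello(String name);
    }

    static final class DispatchHandler implements InvocationHandler {
        private final String target;
        private final Map<Method, Function<Object[], Object>> dispatch;

        DispatchHandler(String target, Map<Method, Function<Object[], Object>> dispatch) {
            this.target = target;
            this.dispatch = dispatch;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            // equals, hashCode and toString arrive with Object as declaring class.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "equals":
                        Object other = args[0];
                        return other != null
                                && Proxy.isProxyClass(other.getClass())
                                && Proxy.getInvocationHandler(other) == this;
                    case "hashCode":
                        return target.hashCode();
                    default: // toString
                        return "Proxy(" + target + ")";
                }
            }
            // Everything else is delegated to the handler registered for the method.
            return dispatch.get(method).apply(args);
        }
    }

    static Greeter create(String target) {
        Map<Method, Function<Object[], Object>> dispatch = new HashMap<>();
        try {
            dispatch.put(Greeter.class.getMethod("hello", String.class),
                    a -> "hello " + a[0] + " via " + target);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
        return (Greeter) Proxy.newProxyInstance(Greeter.class.getClassLoader(),
                new Class<?>[] {Greeter.class}, new DispatchHandler(target, dispatch));
    }
}
```

With `DispatchDemo.create("http://x")`, `hello("bob")` goes through the dispatch map, while `toString()`, `hashCode()`, and `equals()` never reach it.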
FeignSynchronousMethodHandler

The MethodHandler is where the actual HTTP call is made.

https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/invoke/FeignSynchronousMethodHandler.java
public class FeignSynchronousMethodHandler implements InvocationHandlerFactory.MethodHandler {

private final List<RequestInterceptor> requestInterceptors;

private final FeignTemplateFactory buildTemplateFromArgs;

@Override
public Object invoke(Object[] argv) throws Throwable {
RequestTemplate template = buildTemplateFromArgs.create(argv); // create a RequestTemplate instance from the MethodMetadata
org.springframework.feign.retryer.Retryer retryer = this.retryer.clone();
while (true) {
try {
return executeAndDecode(template);
} catch (RetryableException e) {
retryer.continueOrPropagate(e);
}
}
}

Object executeAndDecode(RequestTemplate template) throws Throwable {
Request request = targetRequest(template);
String url = request.url();

FeignTarget<?> feignTarget = (FeignTarget<?>)target;
String name = feignTarget.name();

Response response;
long start = System.nanoTime();
try {
RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
String threadName = Thread.currentThread().getName();
if(attributes == null && threadName != null && threadName.endsWith("-async")){
threadName = threadName.substring(0, threadName.length() - 6);
ConcurrentMap<String, RemoteChain> chainMap =
RemoteChain.ASYNC_CHAIN.computeIfAbsent(threadName, (key) -> new ConcurrentHashMap<>());

String realUrl = url;
int index = realUrl.indexOf("?");
if(index != -1){
realUrl = realUrl.substring(0, index);
}

RemoteChain chain = RemoteChain.newChain(false, name, realUrl, 0, 0, "?", null);
chain.setCount(new AtomicInteger(0)); // start from 0 when trying to put it in; the count is then incremented uniformly below
chain.setAsync(true);
RemoteChain effectChain = chainMap.computeIfAbsent(realUrl, (key) -> chain);
effectChain.increaseCount();
}
response = client.execute(request, options);
} catch (IOException e) {
long cost = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
RemoteChain.appendChain(false, name, url, cost, -1, "?", null);
throw new RemoteException(format("remote[%s] failed %sms %s ", -1, cost, url), e);
}

long cost = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
int status = response.status();
boolean shouldClose = true;
try {
// Response is Feign's description of an HTTP response; it can also be declared directly as the return type of an interface method
if (Response.class == metadata.returnType()) {
if (response.body() == null) {
return response;
}

if (response.body().length() == null || response.body().length() > MAX_RESPONSE_BUFFER_SIZE) {
shouldClose = false;
return response;
}

// Ensure the response body is disconnected
byte[] bodyData = Util.toByteArray(response.body().asInputStream());
return Response.create(status, response.reason(), response.headers(), bodyData);
}

if (status >= 200 && status < 300) {
return decoder.decode(response, metadata.returnType(), name, url, cost, status, logger);
}

RemoteChain.appendChain(false, name, url, cost, status, "?", null);
throw new RemoteException(format("remote[%s] %sms %s", status, cost, url));
} catch (Exception e) {
RemoteChain.appendChain(false, name, url, cost, -2, "?", null);
throw new RemoteException(format("remote[%s] failed %sms %s ", -2, cost, url), e);
} finally {
if (shouldClose) {
ensureClosed(response.body());
}
}
}

Request targetRequest(RequestTemplate template) {
for (RequestInterceptor interceptor : requestInterceptors) {
interceptor.apply(template); // request interceptors
}
return target.apply(new RequestTemplate(template)); // create the Request instance and set its url
}
}
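  • First, a RequestTemplate is created from the MethodMetadata. This is essentially a translation step: MethodMetadata describes a method (its parameters, return type, and annotations), while RequestTemplate describes an HTTP request (its method, url, and so on).

  • Next, the RequestInterceptors are applied. This is an interface for users to implement, giving them a chance to modify the RequestTemplate before the actual call, for example to add a token to the headers.

  • Then the Request instance is created and its url is set, which is done by FeignTarget. Thread safety needs attention here: the proxy instance may be shared globally and called from multiple threads, so each call passes in a new RequestTemplate instance that is copied during construction. This avoids thread-safety problems without any synchronization.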
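
The interceptor and copy-on-use steps can be sketched with plain JDK types. The `Template` class below is a hypothetical, heavily simplified stand-in for RequestTemplate, and interceptors are modelled as `Consumer<Template>`. The sketch copies first and then applies the interceptors, which is a simplification of the order in the listing above; the point it illustrates is only that the shared template is never mutated.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

final class TemplateDemo {

    /** Hypothetical, heavily simplified stand-in for a request template. */
    static final class Template {
        final String method;
        final StringBuilder url;
        final Map<String, String> headers = new LinkedHashMap<>();

        Template(String method, String path) {
            this.method = method;
            this.url = new StringBuilder(path);
        }

        /** Copy constructor: the copy shares no mutable state with the original. */
        Template(Template other) {
            this.method = other.method;
            this.url = new StringBuilder(other.url);
            this.headers.putAll(other.headers);
        }

        String render() {
            return method + " " + url + " " + headers;
        }
    }

    /** Works on a private copy, so concurrent callers can share one template safely. */
    static String send(Template shared, List<Consumer<Template>> interceptors, String baseUrl) {
        Template copy = new Template(shared);
        for (Consumer<Template> interceptor : interceptors) {
            interceptor.accept(copy); // e.g. add a token header
        }
        copy.url.insert(0, baseUrl);  // the target prepends its base url
        return copy.render();
    }
}
```

After `send` returns, the shared template still has its original path and no headers, so no synchronization is needed between threads that reuse it.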

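FeignTarget also relies on an interface, NameServiceChooser, so that in service-registry scenarios HTTP calls can be made by service name: the implementation only needs to translate the service name into a service address. The url may additionally contain Spring environment placeholders.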

https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/invoke/FeignTarget.java
public class FeignTarget<T> implements Target<T> {

@Override
public Request apply(RequestTemplate request) {
String prasedName = "";
if(StringUtils.hasText(name)){
NameServiceChooser serviceChooser = applicationContext.getBean(NameServiceChooser.class);
prasedName = serviceChooser.choose(name);
}

String prasedUrl = url;
if(StringUtils.hasText(url) && url.contains("${")){
prasedUrl = valueResolver.resolveStringValue(url);
}

String prased = prasedName + prasedUrl;
if (request.url().indexOf("http") != 0) {
request.insert(0, prased);
}
return request.request();
}
}
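
The same resolution logic can be exercised without Spring. In this sketch a `Function<String, String>` stands in for NameServiceChooser and a `Map` of properties with a small regex stands in for Spring's StringValueResolver; both substitutions, and all names in the class, are assumptions made for the illustration.

```java
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class TargetUrlDemo {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces every ${key} with its value from props; fails on unknown keys. */
    static String resolvePlaceholders(String text, Map<String, String> props) {
        Matcher matcher = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String value = props.get(matcher.group(1));
            if (value == null) {
                throw new IllegalArgumentException("unresolved placeholder: " + matcher.group(1));
            }
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    /** Builds the final url: chosen service address + resolved url + request path. */
    static String fullUrl(String name, String url, String path,
                          Function<String, String> chooser, Map<String, String> props) {
        if (path.startsWith("http")) {
            return path; // already absolute, leave untouched
        }
        String prefix = (name == null || name.trim().isEmpty()) ? "" : chooser.apply(name);
        String base = (url == null) ? "" : resolvePlaceholders(url, props);
        return prefix + base + path;
    }
}
```

For example, with a chooser that maps `user-service` to `http://10.0.0.1:8080` and a property `ctx=/app`, the path `/api/v1/users` under url `${ctx}` resolves to `http://10.0.0.1:8080/app/api/v1/users`.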

Request's toString method shows that it is a standard HTTP request.

feign.Request
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append(method).append(' ').append(url).append(" HTTP/1.1\n");
for (String field : headers.keySet()) {
for (String value : valuesOrEmpty(headers, field)) {
builder.append(field).append(": ").append(value).append('\n');
}
}
if (body != null) {
builder.append('\n').append(charset != null ? new String(body, charset) : "Binary data");
}
return builder.toString();
}

Similarly, Response is a standard HTTP response.

feign.Response
@Override
public String toString() {
StringBuilder builder = new StringBuilder("HTTP/1.1 ").append(status);
if (reason != null) builder.append(' ').append(reason);
builder.append('\n');
for (String field : headers.keySet()) {
for (String value : valuesOrEmpty(headers, field)) {
builder.append(field).append(": ").append(value).append('\n');
}
}
if (body != null) builder.append('\n').append(body);
return builder.toString();
}
Client

Finally, the constructed Request is handed to the Client to make the HTTP call. The Client uses it to build a real HttpURLConnection, connects, and sends and receives the messages.

feign.Client
public static class Default implements Client {

// ...

@Override
public Response execute(Request request, Options options) throws IOException {
HttpURLConnection connection = convertAndSend(request, options);
return convertResponse(connection);
}

HttpURLConnection convertAndSend(Request request, Options options) throws IOException {
final HttpURLConnection connection = (HttpURLConnection) new URL(request.url()).openConnection(); // create the connection instance
if (connection instanceof HttpsURLConnection) {
HttpsURLConnection sslCon = (HttpsURLConnection) connection;
if (sslContextFactory != null) {
sslCon.setSSLSocketFactory(sslContextFactory);
}
if (hostnameVerifier != null) {
sslCon.setHostnameVerifier(hostnameVerifier);
}
}
connection.setConnectTimeout(options.connectTimeoutMillis());
connection.setReadTimeout(options.readTimeoutMillis());
connection.setAllowUserInteraction(false);
connection.setInstanceFollowRedirects(true);
connection.setRequestMethod(request.method());

Collection<String> contentEncodingValues = request.headers().get(CONTENT_ENCODING);
boolean gzipEncodedRequest = contentEncodingValues != null && contentEncodingValues.contains(ENCODING_GZIP);
boolean deflateEncodedRequest = contentEncodingValues != null && contentEncodingValues.contains(ENCODING_DEFLATE);
boolean hasAcceptHeader = false;
Integer contentLength = null;
for (String field : request.headers().keySet()) {
if (field.equalsIgnoreCase("Accept")) {
hasAcceptHeader = true;
}
for (String value : request.headers().get(field)) {
if (field.equals(CONTENT_LENGTH)) {
if (!gzipEncodedRequest && !deflateEncodedRequest) {
contentLength = Integer.valueOf(value);
connection.addRequestProperty(field, value);
}
} else {
connection.addRequestProperty(field, value);
}
}
}
// Some servers choke on the default accept string.
if (!hasAcceptHeader) {
connection.addRequestProperty("Accept", "*/*");
}

if (request.body() != null) {
if (contentLength != null) {
connection.setFixedLengthStreamingMode(contentLength);
} else {
connection.setChunkedStreamingMode(8196);
}
connection.setDoOutput(true);
OutputStream out = connection.getOutputStream(); // implicitly opens the connection and returns the stream for the request body
if (gzipEncodedRequest) {
out = new GZIPOutputStream(out);
} else if (deflateEncodedRequest) {
out = new DeflaterOutputStream(out);
}
try {
out.write(request.body());
} finally {
try {
out.close();
} catch (IOException suppressed) { // NOPMD
}
}
}
return connection;
}
}
HttpURLConnection

At this point it is worth looking at how the JDK's native API establishes an HTTP connection and communicates.

final HttpURLConnection connection = (HttpURLConnection) new URL(request.url()).openConnection();

OutputStream out = connection.getOutputStream();
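  • First, the URL constructor parses the protocol and sets the URLStreamHandler for it, which is an abstract factory for the connections of the various protocols, as shown below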

java.net.URL
public URL(URL context, String spec, URLStreamHandler handler) throws MalformedURLException{
String original = spec;
int i, limit, c;
int start = 0;
String newProtocol = null;
boolean aRef=false;
boolean isRelative = false;

// Check for permission to specify a handler
if (handler != null) {
@SuppressWarnings("removal")
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
checkSpecifyHandler(sm);
}
}

try {
limit = spec.length();
while ((limit > 0) && (spec.charAt(limit - 1) <= ' ')) {
limit--; //eliminate trailing whitespace
}
while ((start < limit) && (spec.charAt(start) <= ' ')) {
start++; // eliminate leading whitespace
}

if (spec.regionMatches(true, start, "url:", 0, 4)) {
start += 4;
}
if (start < spec.length() && spec.charAt(start) == '#') {
/* we're assuming this is a ref relative to the context URL.
* This means protocols cannot start w/ '#', but we must parse
* ref URL's like: "hello:there" w/ a ':' in them.
*/
aRef=true;
}
for (i = start ; !aRef && (i < limit) && ((c = spec.charAt(i)) != '/') ; i++) {
if (c == ':') {
String s = toLowerCase(spec.substring(start, i));
if (isValidProtocol(s)) {
newProtocol = s;
start = i + 1;
}
break;
}
}

// Only use our context if the protocols match.
protocol = newProtocol;
if ((context != null) && ((newProtocol == null) || newProtocol.equalsIgnoreCase(context.protocol))) {
// inherit the protocol handler from the context
// if not specified to the constructor
if (handler == null) {
handler = context.handler;
}

// If the context is a hierarchical URL scheme and the spec
// contains a matching scheme then maintain backwards
// compatibility and treat it as if the spec didn't contain
// the scheme; see 5.2.3 of RFC2396
if (context.path != null && context.path.startsWith("/"))
newProtocol = null;

if (newProtocol == null) {
protocol = context.protocol;
authority = context.authority;
userInfo = context.userInfo;
host = context.host;
port = context.port;
file = context.file;
path = context.path;
isRelative = true;
}
}

if (protocol == null) {
throw new MalformedURLException("no protocol: "+original);
}

// Get the protocol handler if not specified or the protocol
// of the context could not be used
if (handler == null && (handler = getURLStreamHandler(protocol)) == null) {
throw new MalformedURLException("unknown protocol: "+protocol);
}
this.handler = handler;

i = spec.indexOf('#', start);
if (i >= 0) {
ref = spec.substring(i + 1, limit);
limit = i;
}

/*
* Handle special case inheritance of query and fragment
* implied by RFC2396 section 5.2.2.
*/
if (isRelative && start == limit) {
query = context.query;
if (ref == null) {
ref = context.ref;
}
}
handler.parseURL(this, spec, start, limit);
} catch(MalformedURLException e) {
throw e;
} catch(Exception e) {
MalformedURLException exception = new MalformedURLException(e.getMessage());
exception.initCause(e);
throw exception;
}
}
  • Then the URLStreamHandler that was set is used to create the corresponding URLConnection; the connection itself is only established later, by connect()
java.net.URL
public URLConnection openConnection() throws java.io.IOException {
return handler.openConnection(this);
}

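As the Javadoc says, though, this only creates a connection instance and does not actually connect. The connection is established only when connect() is called, either explicitly or implicitly by operations that need it, such as getOutputStream() or getInputStream().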

Returns a URLConnection instance that represents a connection to the remote object referred to by the URL.
A new instance of URLConnection is created every time when invoking the URLStreamHandler.openConnection(URL) method of the protocol handler for this URL.
It should be noted that a URLConnection instance does not establish the actual network connection on creation. This will happen only when calling URLConnection.connect().
If for the URL’s protocol (such as HTTP or JAR), there exists a public, specialized URLConnection subclass belonging to one of the following packages or one of their subpackages: java.lang, java.io, java.util, java.net, the connection returned will be of that subclass. For example, for HTTP an HttpURLConnection will be returned, and for JAR a JarURLConnection will be returned.
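
The lazy behavior described in the Javadoc can be observed with a small experiment. This is a sketch that assumes the JDK's built-in `com.sun.net.httpserver.HttpServer` is available and that loopback sockets are permitted; the class name and the `/ping` endpoint are made up for the illustration. It counts the requests the local server has seen right after `openConnection()` and again after `getResponseCode()`.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

final class LazyConnectDemo {

    /** Returns {requests seen after openConnection, requests seen after getResponseCode, status}. */
    static int[] run() {
        AtomicInteger hits = new AtomicInteger();
        try {
            // Local server on an ephemeral port that answers "pong" and counts requests.
            HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
            server.createContext("/ping", exchange -> {
                hits.incrementAndGet();
                byte[] body = "pong".getBytes(StandardCharsets.UTF_8);
                exchange.sendResponseHeaders(200, body.length);
                try (OutputStream out = exchange.getResponseBody()) {
                    out.write(body);
                }
            });
            server.start();
            try {
                URL url = new URL("http://127.0.0.1:" + server.getAddress().getPort() + "/ping");
                // Only creates the connection object; nothing is sent yet.
                HttpURLConnection connection = (HttpURLConnection) url.openConnection(Proxy.NO_PROXY);
                connection.setConnectTimeout(2000);
                connection.setReadTimeout(2000);
                int before = hits.get();
                // Implicitly connects, sends the request and reads the status line.
                int status = connection.getResponseCode();
                int after = hits.get();
                connection.disconnect();
                return new int[] {before, after, status};
            } finally {
                server.stop(0);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The first counter is taken before any network activity has happened, which is why the timeouts can still be set after `openConnection()`: they only take effect once the connection is actually opened.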

Now for the actual connect method. It first sets the state to connecting.

sun.net.www.protocol.http.HttpURLConnection
public void connect() throws IOException {
lock();
try {
connecting = true;
} finally {
unlock();
}
plainConnect();
}

Then it connects, after first checking whether it is already connected.

sun.net.www.protocol.http.HttpURLConnection
protected void plainConnect() throws IOException {
lock();
try {
if (connected) {
return;
}
} finally {
unlock();
}
SocketPermission p = URLtoSocketPermission(this.url);
if (p != null) {
try {
AccessController.doPrivilegedWithCombiner(
new PrivilegedExceptionAction<>() {
public Void run() throws IOException {
plainConnect0();
return null;
}
}, null, p
);
} catch (PrivilegedActionException e) {
throw (IOException) e.getException();
}
} else {
// run without additional permission
plainConnect0();
}
}

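plainConnect0 creates a new HttpClient instance and delegates the actual connection work to it. From here on, the work is essentially handed over to the networking toolkit provided by Sun.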

sun.net.www.http.HttpClient
protected HttpClient(URL url, Proxy p, int to) throws IOException {
proxy = (p == null) ? Proxy.NO_PROXY : p;
this.host = url.getHost();
this.url = url;
port = url.getPort();
if (port == -1) {
port = getDefaultPort();
}
setConnectTimeout(to);

capture = HttpCapture.getCapture(url);
openServer();
}

The network connection ultimately has to create a client Socket instance and connect it, and this is where connectTimeout and readTimeout are finally applied.

sun.net.NetworkClient
protected Socket serverSocket = null;

public void openServer(String server, int port) throws IOException, UnknownHostException {
if (serverSocket != null)
closeServer();
serverSocket = doConnect (server, port);
try {
serverOutput = new PrintStream(new BufferedOutputStream(serverSocket.getOutputStream()), true, encoding);
} catch (UnsupportedEncodingException e) {
throw new InternalError(encoding +"encoding not found", e);
}
serverInput = new BufferedInputStream(serverSocket.getInputStream());
}

protected Socket doConnect (String server, int port) throws IOException, UnknownHostException {
Socket s;
if (proxy != null) {
if (proxy.type() == Proxy.Type.SOCKS) {
s = AccessController.doPrivileged(
new PrivilegedAction<>() {
public Socket run() {
return new Socket(proxy);
}
});
} else if (proxy.type() == Proxy.Type.DIRECT) {
s = createSocket();
} else {
// Still connecting through a proxy
// server & port will be the proxy address and port
s = new Socket(Proxy.NO_PROXY);
}
} else {
s = createSocket();
}

// Instance specific timeouts do have priority, that means
// connectTimeout & readTimeout (-1 means not set)
// Then global default timeouts
// Then no timeout.
if (connectTimeout >= 0) {
s.connect(new InetSocketAddress(server, port), connectTimeout);
} else {
if (defaultConnectTimeout > 0) {
s.connect(new InetSocketAddress(server, port), defaultConnectTimeout);
} else {
s.connect(new InetSocketAddress(server, port));
}
}
if (readTimeout >= 0)
s.setSoTimeout(readTimeout);
else if (defaultSoTimeout > 0) {
s.setSoTimeout(defaultSoTimeout);
}
return s;
}
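
What the two timeouts mean at the socket level can be checked with a small JDK-only sketch (the class and method names are made up for the illustration). The connect timeout bounds `Socket.connect`, and the read timeout becomes `SO_TIMEOUT`, which bounds each blocking read. The local server below accepts the TCP connection through its backlog but never writes, so the read is expected to time out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

final class SocketTimeoutDemo {

    /** Returns true if reading from a silent local server times out as expected. */
    static boolean readTimesOut(int connectTimeoutMillis, int readTimeoutMillis) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getByName("127.0.0.1"));
             Socket client = new Socket()) {
            // Bounded by the connect timeout, like s.connect(address, connectTimeout) above.
            client.connect(new InetSocketAddress("127.0.0.1", server.getLocalPort()),
                    connectTimeoutMillis);
            // Bounded by the read timeout, like s.setSoTimeout(readTimeout) above.
            client.setSoTimeout(readTimeoutMillis);
            try {
                client.getInputStream().read(); // the server never sends anything
                return false;
            } catch (SocketTimeoutException expected) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

`SocketTimeoutDemo.readTimesOut(1000, 200)` connects immediately and then waits roughly 200 ms before the read gives up.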

Building the Proxy

FeignBuilder

As its name suggests, FeignBuilder is responsible for building the Feign instance: the various components it depends on are handed to FeignBuilder, which assembles them.

https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/invoke/FeignBuilder
public <T> T target(Class<T> apiType, String url, String name,
ApplicationContext applicationContext, StringValueResolver valueResolver, org.slf4j.Logger logger) {
return target(new FeignTarget<>(apiType, name, url, applicationContext, valueResolver), logger);
}

public <T> T target(Target<T> target, org.slf4j.Logger logger) {
return build(logger).newInstance(target);
}

public Feign build(org.slf4j.Logger logger) {
FeignMethodHandlerFactory methodHandlerFactory =
new FeignMethodHandlerFactory(client, retryer, requestInterceptors);
FeignParseHandlersByName parseHandlersByName =
new FeignParseHandlersByName(contract, options, encoder, decoder, methodHandlerFactory, logger);
return new FeignImplement(parseHandlersByName, invocationHandlerFactory);
}
FeignClient

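The current interface type, Class<T> apiType, is passed in at build time, so the configuration a connection needs, such as the url and timeouts, can be specified through an annotation.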

https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/annotation/FeignClient.java
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface FeignClient {

@AliasFor(attribute = "url")
String value() default "";

@AliasFor(attribute = "value")
String url() default "";

String name() default "";

Class<?> encoder() default EJacksonEncoder.class;

Class<?> decoder() default EJacksonDecoder.class;

int connectTimeoutMillis() default 60000;

int readTimeoutMillis() default 600000;

// ...
}

Then the annotation just needs to be read and parsed before being handed to FeignBuilder. The details of builder(feign) are not covered here.

https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/FeignManager.java
public static <T> T getCache(Class<T> clazz, String url) {
FeignClient feign = AnnotationUtils.getAnnotation(clazz, FeignClient.class);
Assert.notNull(feign, clazz + " is not a FeignClient");

String key = clazz.getName() + url;
T exist = (T) localFeigns.get(key);
if (exist != null) {
return exist;
}

Logger logger = LoggerFactory.getLogger(feign.logger());
T created = builder(feign).target(clazz, url, null, applicationContext, valueResolver, logger);
T previous = (T) localFeigns.putIfAbsent(key, created); // keep the instance already cached if another thread won the race
if (previous != null) {
return previous;
} else {
return created;
}
}
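
The read-the-annotation-then-cache idea can be sketched with the JDK alone. `@HttpApi`, `OrderApi`, and `Client` below are hypothetical stand-ins for @FeignClient, an annotated API interface, and the built proxy. The sketch uses `ConcurrentMap.computeIfAbsent`, which is one way to make the check-then-create step atomic; it is not necessarily how FeignManager does it.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class ClientCacheDemo {

    /** Hypothetical stand-in for an annotation carrying connection settings. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface HttpApi {
        String url() default "";
        int connectTimeoutMillis() default 60000;
    }

    @HttpApi(url = "http://localhost:8080", connectTimeoutMillis = 3000)
    interface OrderApi {
    }

    /** Stand-in for a built client; it only records what was read from the annotation. */
    static final class Client {
        final String description;

        Client(String description) {
            this.description = description;
        }
    }

    private static final ConcurrentMap<String, Client> CACHE = new ConcurrentHashMap<>();

    /** Returns the cached client for (type, url), building it at most once per key. */
    static Client getCached(Class<?> type, String url) {
        HttpApi api = type.getAnnotation(HttpApi.class);
        if (api == null) {
            throw new IllegalArgumentException(type.getName() + " is not annotated with @HttpApi");
        }
        String effectiveUrl = (url != null) ? url : api.url();
        return CACHE.computeIfAbsent(type.getName() + effectiveUrl,
                key -> new Client(type.getSimpleName() + " -> " + effectiveUrl
                        + " connectTimeout=" + api.connectTimeoutMillis() + "ms"));
    }
}
```

Two calls with the same type and url return the same instance, while a different url produces a separate cache entry, mirroring the `clazz.getName() + url` key above.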
FeignScan

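The above is a static factory method, but applications are very often built on Spring, so we want the clients to be loaded when the container initializes.

So we define an annotation that is meta-annotated with Spring's @Import and specifies the packages to scan, and then place it on a @Configuration class.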

https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/annotation/FeignScan.java
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(FeignBeanDefinitionRegistrar.class)
public @interface FeignScan {

String[] basePackages() default{};
}

The ImportBeanDefinitionRegistrar then scans for all classes annotated with @FeignClient, creates a BeanDefinition for each, and registers it with the Spring container.

https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/FeignBeanDefinitionRegistrar.java
public class FeignBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar {

@Override
public void registerBeanDefinitions(AnnotationMetadata meta, @NonNull BeanDefinitionRegistry registry) {
AnnotationAttributes attrs = AnnotationAttributes.fromMap(meta.getAnnotationAttributes(FeignScan.class.getName()));
if(attrs == null){
return;
}

String[] basePackages = (String[])attrs.get("basePackages");
for(String pack : basePackages){
Reflections reflections = new Reflections(pack);
for(Class<?> clazz : reflections.getTypesAnnotatedWith(FeignClient.class)){
FeignClient feign = AnnotationUtils.getAnnotation(clazz, FeignClient.class);
if(feign == null){
continue;
}

BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(clazz);

GenericBeanDefinition beanDefinition = (GenericBeanDefinition)builder.getBeanDefinition();
beanDefinition.setAutowireMode(GenericBeanDefinition.AUTOWIRE_BY_TYPE);
beanDefinition.getPropertyValues().add("feignClass", clazz);
beanDefinition.setBeanClass(FeignFactory.class);

registry.registerBeanDefinition(clazz.getSimpleName(), beanDefinition);
}
}
}
}

What is actually registered is a FactoryBean, the bean factory interface provided by Spring, and this is where the proxy instance is finally created.

https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/FeignFactory.java
public class FeignFactory<T> implements FactoryBean<T>, EmbeddedValueResolverAware, ApplicationContextAware {

private Class<T> feignClass;

// ... ...

@Override
public T getObject() {
FeignClient feign = AnnotationUtils.getAnnotation(feignClass, FeignClient.class);
assert feign != null;
org.slf4j.Logger logger = LoggerFactory.getLogger(feign.logger());

FeignBuilder builder = FeignManager.builder(feign);
return builder.target(feignClass, feign.url(), feign.name(), applicationContext, valueResolver, logger);
}

}

Call Chain: RemoteChain

When services call one another back and forth, we want to trace the calls and print them, aiming for output like the following.

Format: (* marks an asynchronous call)[successes/total HTTP status|business status total elapsed time service name] request url

├─ [1/1 200|200 13ms serviceA] http://{ip}:{port}/{context-path}/api/v1/xxx
│  └─ [1/1 200|200 8ms serviceB] http://{ip}:{port}/{context-path}/api/v1/xxx
│     └─ [1/1 200|200 4ms serviceC] http://{ip}:{port}/{context-path}/api/v1/xxx
├─ [2/2 200|200 6ms serviceD] http://{ip}:{port}/{context-path}/api/v1/xxx
└─ [1/1 200|200 22ms serviceE] http://{ip}:{port}/{context-path}/api/v1/xxx
   └─ *[2/2 200|? 18ms serviceF] http://{ip}:{port}/{context-path}/api/v1/xxx
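  • Printing the call chain

So we define RemoteChain with the following properties. To control how it is serialized to JSON, all the raw fields are excluded from serialization and getDetail builds the output string instead.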

https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/codec/RemoteChain.java
public class RemoteChain {

@JsonIgnore
@JSONField(serialize = false)
private String name;

@JsonIgnore
@JSONField(serialize = false)
private String url;

@JsonIgnore
@JSONField(serialize = false)
private AtomicInteger succs = new AtomicInteger(0);

@JsonIgnore
@JSONField(serialize = false)
private AtomicInteger count = new AtomicInteger(1);

@JsonIgnore
@JSONField(serialize = false)
private AtomicLong cost = new AtomicLong(0);

@JsonIgnore
@JSONField(serialize = false)
private boolean async;

@JsonIgnore
@JSONField(serialize = false)
private int httpCode;

@JsonIgnore
@JSONField(serialize = false)
private String code;

private String detail;

private List<RemoteChain> children;

public String getDetail(){
if(detail != null){
return detail;
}
StringBuilder builder = new StringBuilder();
if(async){
builder.append("*");
}
builder.append("[");
builder.append(succs).append("/").append(count);
builder.append(" ").append(httpCode).append("|").append(code);
builder.append(" ").append(cost).append("ms");
if(name != null){
builder.append(" ").append(name);
}
builder.append("]").append(" ").append(url);
return builder.toString();
}
}

A formatting method is also provided, similar to the tree command used to walk a file directory.

https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/codec/RemoteChain.java
public static void buildeTree(String prefix, List<RemoteChain> chains, StringBuilder builder) {
if (chains != null) {
for (int i = 0; i < chains.size(); i++) {
RemoteChain chain = chains.get(i);
String newPrefix = prefix + (i == chains.size() - 1 ? "└─ " : "├─ ");
builder.append(newPrefix).append(chain.getDetail());
if (chain.getChildren() != null) {
builder.append("\n");
buildeTree(prefix + (i == chains.size() - 1 ? " " : "│ "), chain.getChildren(), builder);
}
if(i < chains.size() - 1){
builder.append("\n");
}
}
}
}
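
The recursive prefix technique is easy to try on its own. The sketch below uses a hypothetical `Node` class instead of RemoteChain and ASCII connectors (`|--` and `` `-- ``) instead of box-drawing characters; unlike the method above, it also ends every line with a newline. Each level appends either a continuation bar or blank padding to the prefix, depending on whether the parent was the last sibling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TreePrintDemo {

    /** Hypothetical tree node: a label plus child nodes. */
    static final class Node {
        final String label;
        final List<Node> children;

        Node(String label, Node... children) {
            this.label = label;
            this.children = new ArrayList<>(Arrays.asList(children));
        }
    }

    /** Appends one line per node; the prefix carries the vertical bars of open ancestors. */
    static void buildTree(String prefix, List<Node> nodes, StringBuilder out) {
        for (int i = 0; i < nodes.size(); i++) {
            Node node = nodes.get(i);
            boolean last = (i == nodes.size() - 1);
            out.append(prefix).append(last ? "`-- " : "|-- ").append(node.label).append('\n');
            // Below a last sibling nothing continues, so pad with spaces instead of a bar.
            buildTree(prefix + (last ? "    " : "|   "), node.children, out);
        }
    }

    static String render(Node... roots) {
        StringBuilder out = new StringBuilder();
        buildTree("", Arrays.asList(roots), out);
        return out.toString();
    }
}
```

Rendering a root `A` with child `B` and grandchild `C`, followed by a second root `D`, produces four lines in which `B` and `C` are indented under `A` and a bar runs down to `D`.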

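Response Structure: Response

  • Recording the call chain

The idea: each service in the call chain records its own remote calls and returns them to its caller in the response. On receiving the response, the caller parses out the returned call records and splices them into its own.

Printing the complete call chain therefore requires following some conventions, for example that the response structure must contain the call chain data. We provide the response structure below: the Response constructor fetches the remote call records saved so far and assembles them into the response. For convenience, it also offers some static factory methods and an adapter for pagination.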

https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/codec/Response.java
public class Response<T> {

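/** Request id */
private String requestId;

/** Response description */
private String msg;

/** Response code */
private int code;

/** Response data */
private T data;

/** Call chain */
private List<RemoteChain> chains;

public Response(){
// on receipt, the response is deserialized from JSON automatically, which populates chains directly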
}

private Response(int code, T data, String msg){
this.code = code;
this.data = data;
this.msg = msg;

RemoteChainHolder holder = RemoteChain.CHAIN.get(); // synchronous remote call records of the current request
RemoteChain.CHAIN.remove();
if(holder != null){
this.chains = holder.getChains();
}

String threadName = Thread.currentThread().getName(); // asynchronous remote call records belonging to this request
Map<String, RemoteChain> chainMap = RemoteChain.ASYNC_CHAIN.remove(threadName);
if(chainMap != null){
if(this.chains == null){
this.chains = chainMap.values().stream().toList();
}else{
this.chains.addAll(chainMap.values());
}
}
}

private Response(int code, T data){
this(code, data, null);
}

public Response(ResponseCode responseCode){
this(responseCode.getCode(), null, responseCode.getDesc());
}

public Response(ResponseCode responseCode, T data){
this(responseCode.getCode(), data, responseCode.getDesc());
}

@Override
public String toString() {
return "{requestId=" + requestId + ", code=" + code + ", msg=" + msg + ", data=" + data + "}";
}

public static <V> Response<V> success(){
return new Response<>(ResponseCode.OK);
}

public static <V> Response<V> success(V data){
return new Response<>(ResponseCode.OK, data);
}

public static <V> Response<V> success(String msg, V data){
Response<V> response = new Response<>(ResponseCode.OK, data);
response.msg = msg;
return response;
}

public static <V> Response<V> error(){
return new Response<>(ResponseCode.INTERNAL_SERVER_ERROR);
}

public static <V> Response<V> error(ResponseCode responseCode){
return new Response<>(responseCode);
}

public static <V> Response<V> error(String msg){
Response<V> response = new Response<>(ResponseCode.INTERNAL_SERVER_ERROR);
response.msg = msg;
return response;
}

public static <V> Response<V> error(int code, String msg){
return new Response<>(code, null, msg);
}

public static <V> Response<V> error(ResponseCode responseCode, String msg){
Response<V> response = new Response<>(responseCode);
response.msg = msg;
return response;
}

public static <E> Response<Page<E>> page(List<E> list){
Response<Page<E>> response = new Response<>(ResponseCode.OK);
if(list == null){
list = new ArrayList<>();
}

if(list instanceof com.github.pagehelper.Page<E> page){
response.setData(new Page<>(page, page.getTotal()));
}else {
response.setData(new Page<>(list, list.size()));
}
return response;
}

public static class Page<E> {

/** Total count */
private int total;

/** List data */
private Collection<E> list;

public Page(){

}

public Page(Collection<E> list, int total){
this.list = list;
this.total = total;
}

public Page(Collection<E> list, long total){
this.list = list;
this.total = (int)total;
}

public int getTotal() {
return total;
}

public void setTotalRows(int total) {
this.total = total;
}

public Collection<E> getList() {
return list;
}

public void setList(Collection<E> list) {
this.list = list;
}

@Override
public String toString() {
return "{total=" + total + ", list=" + list + "}";
}
}
}

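Once the call data carried in a Response has been received, the next step is to splice it into the call chain. A ResponseDecoder is provided for Response to do this.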

https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/codec/Response.java
public Object decode(Response response, Type type, String name, String url, long cost, int httpCode, org.slf4j.Logger logger) throws Exception {
    if (response.body() == null) {
        return null;
    }

    Reader reader = response.body().asReader();
    if (!reader.markSupported()) {
        reader = new BufferedReader(reader, 1);
    }

    reader.mark(1); // Read the first byte to see if we have any data
    if (reader.read() == -1) {
        // Eagerly returning null avoids "No content to map due to end-of-input"
        return null;
    }
    reader.reset();

    org.springframework.feign.codec.Response<?> resp =
            mapper.readValue(reader, org.springframework.feign.codec.Response.class);
    if(ResponseCode.OK.getCode() != resp.getCode()){
        RemoteChain.appendChain(false, name, url, cost, httpCode, String.valueOf(resp.getCode()), resp.getChains());
        logger.info(">< remote {}|{} {}ms {}", httpCode, resp.getCode(), cost, url);
        throw new RemoteException(resp.getCode() + ", " + resp.getMsg());
    }

    if (void.class == type) {
        RemoteChain.appendChain(true, name, url, cost, httpCode, String.valueOf(resp.getCode()), resp.getChains());
        logger.info(">< remote {}|{} {}ms {}", httpCode, resp.getCode(), cost, url);
        return null;
    }

    logger.info(">< remote {}|{} {}ms {}", httpCode, resp.getCode(), cost, url);
    RemoteChain.appendChain(true, name, url, cost, httpCode, String.valueOf(resp.getCode()), resp.getChains());
    String data = mapper.writeValueAsString(resp.getData());
    return mapper.readValue(data, mapper.constructType(type));
}
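The empty-body check at the top of decode can be tried in isolation. Below is a minimal sketch that uses only the JDK; the class BodyPeek and the method nonEmptyOrNull are hypothetical names introduced for illustration and are not part of the project. It marks the reader, reads one character to see whether any data exists, and resets so that a later parser still sees the whole body.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;

// Hypothetical helper, not part of spring-feign: isolates the mark/read/reset
// check that decode performs before handing the body to the JSON mapper.
final class BodyPeek {

    private BodyPeek() {
    }

    /**
     * Returns a reader positioned at the start of the body,
     * or null when the body contains no characters at all.
     */
    static Reader nonEmptyOrNull(Reader reader) throws IOException {
        if (!reader.markSupported()) {
            // Wrap readers that cannot mark; a one-char buffer is enough for the peek
            reader = new BufferedReader(reader, 1);
        }
        reader.mark(1);
        if (reader.read() == -1) {
            // Empty body: the caller can return null instead of failing in the parser
            return null;
        }
        reader.reset();
        return reader;
    }

    public static void main(String[] args) throws IOException {
        Reader body = nonEmptyOrNull(new StringReader("{\"code\":200}"));
        System.out.println(body != null);
        System.out.println(nonEmptyOrNull(new StringReader("")) == null);
    }
}
```

Returning the (possibly wrapped) reader matters: when the original reader does not support mark, the consumed character lives in the wrapper's buffer, so later reads must go through the wrapper.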

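The splicing itself has to take a few things into account:

  • Only remote calls issued by a servlet thread, or by a child thread spawned for one of its asynchronous calls (by convention such thread names end with -async), are recorded;
  • Records are accumulated per remote url (with parameters stripped). A synchronous call is compared directly with the most recently recorded call; an asynchronous call is matched by thread name (with the suffix stripped) and url;
  • Records must be deleted properly. There are two deletion points:
    1. During splicing, when the servlet thread name is found to have changed (by convention every request has a unique requestId, which is set as the name of the servlet thread handling it). Here a later request cleans up on behalf of the earlier one;
    2. In the Response constructor. This assumes that constructing the Response is the last step of request handling and that no remote call happens after it. Here the current request cleans up after itself.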
https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/codec/RemoteChain.java
public static final ThreadLocal<RemoteChainHolder> CHAIN = new ThreadLocal<>();

// <threadName, <url, RemoteChain>>
public static final ConcurrentMap<String, ConcurrentMap<String, RemoteChain>> ASYNC_CHAIN = new ConcurrentHashMap<>();

public static void appendChain(boolean success, String name, String url, long cost, int httpCode, String code, List<RemoteChain> next){
    RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
    String threadName = Thread.currentThread().getName();
    if(attributes == null && (threadName == null || !threadName.endsWith("-async"))){
        // Ignore calls from anything other than a servlet thread or one of its child threads;
        // '-async' is the agreed suffix for async thread names
        return;
    }

    // Strip the query parameters from the url
    int index = url.indexOf("?");
    if(index != -1){
        url = url.substring(0, index);
    }

    if(attributes != null){
        // Servlet thread: record directly in the current ThreadLocal
        RemoteChainHolder remoteChainHolder = CHAIN.get();
        if(remoteChainHolder == null){
            remoteChainHolder = new RemoteChainHolder(threadName);
            CHAIN.set(remoteChainHolder);
            // ASYNC_CHAIN must not be cleared here, otherwise the first synchronous remote call
            // would wipe every async call already recorded for this request.
            // Without clearing, a request whose remote calls are all async would leave its
            // ASYNC_CHAIN entry behind, so Response also clears it as a fallback
        }else if(!remoteChainHolder.getHolderName().equals(threadName)){
            // The thread has been reused, so the previous request has already finished;
            // clear the async records left under the previous request's name
            ASYNC_CHAIN.remove(remoteChainHolder.getHolderName());
            remoteChainHolder = new RemoteChainHolder(threadName);
            CHAIN.set(remoteChainHolder);
        }

        ArrayList<RemoteChain> chainList = remoteChainHolder.getChains();
        if(chainList == null){
            chainList = new ArrayList<>();
            remoteChainHolder.setChains(chainList);
        }

        if(!chainList.isEmpty()){
            RemoteChain lastChain = chainList.get(chainList.size() - 1);
            // With parameters stripped, accumulate the count when the url matches
            // (this ignores the case where different parameters lead to different call chains)
            if(url.equals(lastChain.getUrl())){
                lastChain.increaseCount();
                lastChain.sumCost(cost);
                if(success){
                    lastChain.increaseSuccs();
                }else{
                    // On failure the codes are overwritten, so only the code of the last failure is visible
                    lastChain.setHttpCode(httpCode);
                    lastChain.setCode(code);
                }
                return;
            }
        }
        chainList.add(newChain(success, name, url, cost, httpCode, code, next));
    }else if(threadName.endsWith("-async")){ // By convention, async threads spawned for request handling use '-async' as the thread-name suffix
        // Servlet child thread: try to record into the global map (if no entry exists,
        // the servlet request may already have finished and been cleared, so ignore it)
        threadName = threadName.substring(0, threadName.length() - 6);
        Map<String, RemoteChain> chainMap = ASYNC_CHAIN.get(threadName);
        if(chainMap == null){
            return;
        }

        RemoteChain chain = chainMap.get(url);
        if(chain == null){
            return;
        }

        // The async call was already recorded when it started, so just accumulate here
        if(success){
            chain.increaseSuccs();
        }
        chain.sumCost(cost);
        chain.setHttpCode(httpCode);
        chain.setCode(code);
        chain.setChildren(next);
    }
}
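The naming conventions that appendChain relies on can be isolated into a few small functions. The following is only a sketch under stated assumptions: ChainKeys and its methods are hypothetical names, and the boolean hasRequestAttributes stands in for the check `RequestContextHolder.getRequestAttributes() != null`, so that the example runs without Spring.

```java
// Hypothetical helper, not part of spring-feign: the key conventions used
// when deciding whether and where to record a remote call.
final class ChainKeys {

    /** Agreed thread-name suffix for async child threads of a request. */
    static final String ASYNC_SUFFIX = "-async";

    private ChainKeys() {
    }

    /** Strips the query string so that calls accumulate per endpoint. */
    static String stripQuery(String url) {
        int index = url.indexOf('?');
        return index == -1 ? url : url.substring(0, index);
    }

    /** True when the thread name follows the async child convention. */
    static boolean isAsyncChild(String threadName) {
        return threadName != null && threadName.endsWith(ASYNC_SUFFIX);
    }

    /** Name of the owning request thread (the requestId by convention). */
    static String ownerName(String threadName) {
        return isAsyncChild(threadName)
                ? threadName.substring(0, threadName.length() - ASYNC_SUFFIX.length())
                : threadName;
    }

    /** Only servlet threads and their async children are recorded. */
    static boolean shouldRecord(boolean hasRequestAttributes, String threadName) {
        return hasRequestAttributes || isAsyncChild(threadName);
    }

    public static void main(String[] args) {
        System.out.println(stripQuery("http://svc/user/list?page=1"));
        System.out.println(ownerName("req-42-async"));
        System.out.println(shouldRecord(false, "pool-1-thread-3"));
    }
}
```

Deriving the suffix length from the constant avoids the hard-coded 6 and keeps the strip logic correct if the suffix convention ever changes.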

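For a synchronous call it is enough to record when the response arrives. An asynchronous call, however, has to be recorded when it is issued, because by the time its result arrives the corresponding request may already have finished and returned. When the asynchronous response arrives, we check whether the originating request is still being handled; if it has already finished, the result is simply discarded. Since several asynchronous calls may run concurrently on different threads, thread safety also has to be considered here.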

https://github.com/shanhm1991/spring-feign/blob/master/src/main/java/org/springframework/feign/invoke/FeignSynchronousMethodHandler.java
RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
String threadName = Thread.currentThread().getName();
if(attributes == null && threadName != null && threadName.endsWith("-async")){
    threadName = threadName.substring(0, threadName.length() - 6);
    ConcurrentMap<String, RemoteChain> chainMap =
            RemoteChain.ASYNC_CHAIN.computeIfAbsent(threadName, (key) -> new ConcurrentHashMap<>());

    String realUrl = url;
    int index = realUrl.indexOf("?");
    if(index != -1){
        realUrl = realUrl.substring(0, index);
    }

    RemoteChain chain = RemoteChain.newChain(false, name, realUrl, 0, 0, "?", null);
    chain.setCount(new AtomicInteger(0)); // Start the count at 0, try to put the chain in, then increment whichever chain ends up in the map
    chain.setAsync(true);
    RemoteChain effectChain = chainMap.computeIfAbsent(realUrl, (key) -> chain);
    effectChain.increaseCount();
}
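The life cycle of an asynchronous record (register when the call is issued, accumulate when the response arrives, discard when the owning request is gone) can be sketched with JDK classes only. This is an illustration under assumptions, not the project's implementation: AsyncChainRegistry, Record and the method names are hypothetical, and Record is a heavily simplified stand-in for RemoteChain.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch, not part of spring-feign.
final class AsyncChainRegistry {

    /** Simplified stand-in for RemoteChain: only thread-safe counters. */
    static final class Record {
        final AtomicInteger count = new AtomicInteger();
        final AtomicInteger success = new AtomicInteger();
        final AtomicLong cost = new AtomicLong();
    }

    // <owner thread name, <url, record>>
    private final ConcurrentMap<String, ConcurrentMap<String, Record>> chains = new ConcurrentHashMap<>();

    /** Called when an async call is issued: create the record if needed, then count the call. */
    void onStart(String owner, String url) {
        chains.computeIfAbsent(owner, key -> new ConcurrentHashMap<>())
                .computeIfAbsent(url, key -> new Record())
                .count.incrementAndGet();
    }

    /**
     * Called when an async response arrives.
     * Returns false when the owning request has already been cleared, in which case the result is dropped.
     */
    boolean onResponse(String owner, String url, boolean ok, long costMillis) {
        ConcurrentMap<String, Record> byUrl = chains.get(owner);
        if (byUrl == null) {
            return false;
        }
        Record record = byUrl.get(url);
        if (record == null) {
            return false;
        }
        if (ok) {
            record.success.incrementAndGet();
        }
        record.cost.addAndGet(costMillis);
        return true;
    }

    /** Called when request handling ends: later async responses for this owner are ignored. */
    void onRequestEnd(String owner) {
        chains.remove(owner);
    }

    /** Returns the record for the given owner and url, or null when none exists. */
    Record find(String owner, String url) {
        ConcurrentMap<String, Record> byUrl = chains.get(owner);
        return byUrl == null ? null : byUrl.get(url);
    }
}
```

Using computeIfAbsent for both map levels means two threads that issue the same call at the same time end up sharing one record, and the atomic counters keep the accumulation safe without explicit locking.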


References: