阿里云API网关对外提供双向通信能力,官方提供的SDK的语言有限,有些语言需要用户自己去开发SDK,本文档把SDK的实现细节描述出来,供用户在其他语言上实现SDK时参考。API网关官方提供的Android和Objective-C的SDK,是完整支持双向通信的,开发同学也可以参考Android和iOS的Objective-C结合本文档来实现目标语言的SDK。

关于API网关提供的双向通信的使用流程请参见:双向通信使用指南

1. 通信协议及相关命令字

客户端和API网关之间通过WebSocket通道进行通信,通信类型分两种:调用API和发送命令。在通道上制定了一些简单的通信规则:

1.1 API调用格式转换

客户端调用API,请求和应答是纯Json格式的字符串,也就是把HTTP请求对象按照Json的语法格式化后传输,就是将下面这个对象转化成Json格式:

public class WebSocketApiRequest {
    String method;
    String host;
    String path;
    Map<String , String> querys;
    Map<String, List<String>> headers;
    int isBase64 = 0;
    String body;
}

下面是一个示例,首先展示一个标准的HTTP请求报文:

POST /http2test/test?param1=test HTTP/1.1  
host: apihangzhou444.foundai.com  
x-ca-seq:0 
x-ca-key:12344133
ca_version:1
content-type:application/x-www-form-urlencoded; charset=utf-8
x-ca-timestamp:1525872629832
date:Wed, 09 May 2018 13:30:29 GMT+00:00
x-ca-signature:kYjdIuCnCrxx+EyLMTTx5NiXxqvfTTHBQAe68tv33Tw=
user-agent:ALIYUN-ANDROID-DEMO
x-ca-nonce:c9f15cbf-f4ac-4a6c-b54d-f51abf4b5b44
x-ca-signature-headers:x-ca-seq,x-ca-key,x-ca-timestamp,x-ca-nonce
content-length:33

username=xiaoming&password=123456789  

WebSocket在通信的时候会将这个HTTP请求报文格式化为下面的字符串格式进行传输:

{
	"headers": {
		"accept": ["application/json; charset=utf-8"],
		"host": ["apihangzhou444.foundai.com"],
		"x-ca-seq": ["0"],
		"x-ca-key": ["12344133"],
		"ca_version": ["1"],
		"content-type": ["application/x-www-form-urlencoded; charset=utf-8"],
		"x-ca-timestamp": ["1525872629832"],
		"date": ["Wed, 09 May 2018 13:30:29 GMT+00:00"],
		"x-ca-signature": ["kYjdIuCnCrxx+EyLMTTx5NiXxqvfTTHBQAe68tv33Tw="],
		"user-agent": ["ALIYUN-ANDROID-DEMO"],
		"x-ca-nonce": ["c9f15cbf-f4ac-4a6c-b54d-f51abf4b5b44"],
		"x-ca-signature-headers": ["x-ca-seq,x-ca-key,x-ca-timestamp,x-ca-nonce"]
	},
	"host": "apihangzhou444.foundai.com",
	"isBase64": 0,
	"method": "POST",
	"path": "/http2test/test",
	"querys": {"param1":"test"},
	"body":"username=xiaoming&password=123456789"
}

1.2 双向通信命令

在双向通信过程中,除了正常的API调用,还有一系列定制的命令:

1.2.1 客户端注册

命令字:RG  
含义:在API网关注册长连接,携带DeviceId
命令类型:请求  
发送端:客户端  
格式:RG#DeviceId  
示例:RG#ffd3234343dae324342@12344133

命令字:RO  
含义:DeviceId注册成功时,API网关返回成功,并将连接唯一标识和心跳间隔配置返回
命令类型:应答  
发送端:API网关  
格式:RO#ConnectionCredential#keepAliveInterval  
示例:RO#1534692949977#25000

失败命令字:RF
含义:API网关返回注册失败应答
命令类型:应答
发送端:API网关
格式:RF#ErrorMessage
示例:RF#ServerError

1.2.2 客户端保持心跳

命令字:H1
含义:客户端心跳请求信令
命令类型:请求
发送端:客户端
没有其他参数,直接发送命令字

命令字:HO
含义:API网关返回心跳保持成功应答信令
命令类型:应答
发送端:API网关
格式:HO#ConnectionCredential
示例:HO#ffd3234343dae324342

1.2.3 下行通知

命令字:NF
含义:API网关发送下行通知请求
命令类型:请求
发送端:API网关
格式为NF#MESSAGE
示例:NF#HELLO WORLD!

命令字:NO
含义:客户端返回接收下行通知应答
命令类型:应答
发送端:客户端
没有其他参数,直接发送命令字

1.2.4 其他信令

命令字:OS
含义:客户端请求量达到API网关流控阈值,API网关会给客户端发送这个命令,需要客户端主动断掉连接,主动重连。主动重连将不会影响用户体验。否则API网关不久后会主动断链长连接。
命令类型:请求
发送端:API网关
没有其他参数,直接发送命令字

命令字:CR
含义:连接达到长连接生命周期,API网关会给客户端发送这个命令,需要客户端主动断掉连接,主动重连。主动重连将不会影响用户体验。否则API网关不久后会主动断链长连接。
命令类型:请求
发送端:API网关
没有其他参数,直接发送命令字

1.2.5 设备唯一标识

DeviceId唯一标记客户端的长连接,格式为UUID@AppKey,下面是建议的JAVA实现。所有DeviceId在APP维度必须是唯一的。也就是同一个APP,不允许出现重复的DeviceId,否则在注册的时候会报错。

String deviceId = UUID.randomUUID().toString().replace("-" , "")+ “@” + appKey;

下面是一个示例:ffd3234343dae324342@12344133。

2. 通信流程

下图是SDK和API网关之间交互流程图:

从上图中我们可以看到,SDK主要需要完成一下工作:

  • WebSocket连接建立(包括协议协商)
  • 发送DeviceId注册请求(RG)
  • 调用注册API
  • 启动保持心跳线程,定期保持心跳(H1)
  • 调用其他API
  • 接收API网关发送的通知
  • 调用注销API请求

除了正常流程,SDK还需要处理以下异常流程:

  • 断线重连
  • 触及流控

2.1 WebSocket连接建立(包括协议协商)

API网关的双向通信是基于WebSocket实现的,客户端和API网关之间的连接建立过程是标准的WebSocket连接建立流程。WebSocket连接建立过程请参考WebSocket协议定义

一般的语言都会有WebSocket协议客户端开源实现,比如Android用的就是OkHttp的WebSocket开源实现。

2.2 发送DeviceId注册请求(RG)

客户端在WebSocket连接建立之后,需要马上向API网关发送一条注册请求,请求中需要携带本机DeviceId;

API网关在收到注册信令后,会在接入层注册一条通信长连接,并且使用DeviceId标识这条长连接。API网关会给客户端返回注册成功的应答。

具体信令请参见1.2.1中的信令描述。

2.3 调用注册API

客户端在发送完注册信令后,需要马上发送一个注册API的请求。注册API需要在API网关提前定义好,具体的定义方法请参见《双向通信使用指南》文档的第二章。

API请求的发送格式请参见本文的1.1节。在发送注册API请求的时候,需要注意以下两点:

  1. 在API网关对RG信令正确应答后再发送;
  2. 需要增加一个标识注册API的头:x-ca-websocket_api_type:REGISTER

2.4 启动保持心跳线程,定期保持心跳(H1)

客户端需要每25秒给API网关发送一个心跳请求,API网关在接收到心跳请求后会更新用户的在线时间并且给出一个心跳处理成功的应答。

具体信令请参见1.2.2中的信令描述。

2.5 调用其他API

客户端在Websocket长连接建立好之后,随时可以给API网关发送普通的API请求,发送格式请参见本文的1.1节。

2.6 接收API网关发送的通知

客户端在注册API发送成功,并且收到API网关的200应答后,就可以正式接收API网关发送过来的下行通知了,下行通知的格式非常简单,请参见本文的1.2.3。

2.7 调用注销API请求

客户端在用户退出登录之前,需要发送一个注销API的请求。注册API需要在API网关提前定义好,具体的定义方法请参见《双向通信使用指南》文档的第二章。

API请求的发送格式请参见本文的1.1节。在发送注册API请求的时候,需要注意一点: 需要增加一个标识注册API的头:x-ca-websocket_api_type:UNREGISTER。

2.8 处理API网关发送过来的断线重连信令

每条长连接都会存在一个生命周期,API网关的长连接生命周期在处理完2000个API请求后结束。API网关会在处理完1500个请求的时候,给客户端发送一条要求客户端主动断线重连的信令(CR),具体信令格式参见本文1.2.4。API网关会在长连接的请求数积累到2000个请求的时候,删除这条长连接。

客户端在收到断线重连这两种信令后,需要暂时停止接收上层API发送请求,并且在发送完已经接收到的API请求后,重新连接API网关,重新发送注册信令和注册API,完成断线重连的过程。建议在第一次发送注册API的时候,将注册API的请求对象缓存在本地,每次断线重连的时候直接使用即可。

2.9 处理API网关发送过来的流控限制信令

API网关有流控限制,客户端的请求QPS如果超过流控限制会触发API网关的流控,客户端会收到API网关发送的触发流控阈值的信令(OS),具体信令格式请参见本文1.2.4节。这个时候客户端应该控制报文的发送速度。如果客户端不理会此信令,并且让QPS持续增长,API网关会删除这条长连接。

3. 总结

3.1 流程总结

画一张图总结一下支持API网关双向通信的SDK需要做的事情:

注:普通API的请求应答报文格式和注册API的请求应答报文格式是一样的,故没有示例。

3.2 代码参考

目前API网关的Android和Objective-C已经具备双向通信能力,并且严格按照前两节规范实现的。下面我们使用Android的代码来逐步过第二章中提到相关流程的实现。

3.2.1 WebSocket连接建立(包括协议协商)

//使用绑定的域名加8080端口作为连接地址
String websocketUrl = "ws:" + yourdomain + ":8080";

//新建一个client对象
OkHttpClient client = new OkHttpClient.Builder()
                .readTimeout(params.getReadTimeout(), TimeUnit.MILLISECONDS)
                .writeTimeout(params.getWriteTimeout(), TimeUnit.MILLISECONDS)
                .connectTimeout(params.getConnectionTimeout(), TimeUnit.MILLISECONDS)
                .build();
                
//建立WebSocket连接需要的HttpRequest请求
Request connectRequest = new Request.Builder().url(websocketUrl).build();

//连接建立好后的事件监听类
webSocketListener = new WebSocketListener() {
	......
}

//建立长连接
client.newWebSocket(connectRequest, webSocketListener);

3.2.2 发送DeviceId注册请求/心跳请求

	String deviceId = generateDeviceId();
	webSocketListener = new WebSocketListener() {
                @Override
                //连接连接好后,回调本方法
                public void onOpen(WebSocket webSocket, Response response) {
                		//发送注册信令
                    String registerCommand = SdkConstant.CLOUDAPI_COMMAND_REGISTER_REQUEST + "#" + deviceId;
                    webSocketRef.getObj().send(registerCommand);
                }

                @Override
                //收到API网关的数据后,调用本方法
                public void onMessage(WebSocket webSocket, String text) {
                    if(null  == text || "".equalsIgnoreCase(text)) {
                        return;
                    }
                    //注册成功
                    else if(text.length() > 2 && text.startsWith(SdkConstant.CLOUDAPI_COMMAND_REGISTER_SUCCESS_RESPONSE)){
                        //解析API网关注册成功应答
                        String responseObject[] = text.split("#");
                        connectionCredential = responseObject[1];
                        //从API网关应答中获取心跳时间间隔
                        heartBeatInterval = Integer.parseInt(responseObject[2]);

                        }
							 
						//启动心跳线程,发送心跳信令
                        if (null != heartBeatManager) {
                            heartBeatManager.stop();
                        }
                        heartBeatManager = new HeartBeatManager(instance, heartBeatInterval);
                        heartbeatThread = new Thread(heartBeatManager);
                        heartbeatThread.start();
                        return;
                    }
                    //注册失败
                    else if(text.length() > 2 && text.startsWith(SdkConstant.CLOUDAPI_COMMAND_REGISTER_FAIL_REQUEST)){
                        
                        String responseObject[] = text.split("#");
                        errorMessage.setObj(responseObject[1]);
						//停止发送心跳
                        if (null != heartBeatManager) {
                            heartBeatManager.stop();
                        }
                        return;
                }
            };
        }

    private String generateDeviceId(){
        return UUID.randomUUID().toString().replace("-" , "").substring(0 , 8);
    }			

3.2.3 发送API请求:注册API、普通API、注销API

protected void sendAsyncRequest(final ApiRequest apiRequest , final ApiCallback apiCallback){
        checkIsInit();
		//判断连接是否建立
        synchronized (connectionLock) {
            if (null != connectLatch.getObj() && connectLatch.getObj().getCount() == 1) {
                try {
                    connectLatch.getObj().await(10, TimeUnit.SECONDS);
                } catch (InterruptedException ex) {
                    throw new SdkException("WebSocket connect server failed ", ex);
                } finally {
                    connectLatch.setObj(null);
                }
            }

            if (status == WebSocketConnectStatus.LOST_CONNECTION) {
                apiCallback.onFailure(apiRequest, new SdkException("WebSocket conection lost , connecting"));
                return;
            }

			//针对注册、注销类API,做特殊处理
            if (WebSocketApiType.COMMON != apiRequest.getWebSocketApiType()) {
                if(!preSendWebsocketCommandApi(apiRequest , apiCallback)) {
                    return;
                }
            }

            Integer seqNumber = seq.getAndIncrement();
            apiRequest.addHeader(SdkConstant.CLOUDAPI_X_CA_SEQ, seqNumber.toString());
            callbackManager.add(seqNumber, new ApiContext(apiCallback, apiRequest));
            
            //生成需要发送的字符串
            String request = buildRequest(apiRequest);
            webSocketRef.getObj().send(request);
        }

    }
    
    private boolean preSendWebsocketCommandApi(final ApiRequest apiRequest , final ApiCallback apiCallback){
        //注册类API,需要判断注册信令是否完成
        if(WebSocketApiType.REGISTER == apiRequest.getWebSocketApiType()) {
            try {
                if (null != registerLatch.getObj() && !registerLatch.getObj().await(10, TimeUnit.SECONDS)) {
                    Thread.sleep(5000);
                    close();
                    apiCallback.onFailure(apiRequest, new SdkException("WebSocket conection lost , connecting"));
                    return false;
                }
            } catch (InterruptedException ex) {
                throw new SdkException("WebSocket register failed ", ex);
            } finally {
                registerLatch.setObj(null);
            }

            if (!registerCommandSuccess.getObj()) {
                apiCallback.onFailure(null, new SdkException("Register Comand return error :" + errorMessage.getObj()));
                return false;
            }

			//记录注册API,用于断线重连
            lastRegisterReqeust = apiRequest.duplicate();
            lastRegisterCallback = apiCallback;

        }
		//增加注册、注销API的标识头
        apiRequest.addHeader(SdkConstant.CLOUDAPI_X_CA_WEBSOCKET_API_TYPE, apiRequest.getWebSocketApiType().toString());


        return true;
    }
    
    
     private String buildRequest(ApiRequest apiRequest){
        apiRequest.setHost(host);
        apiRequest.setScheme(scheme);
        ApiRequestMaker.make(apiRequest , appKey , appSecret);


        WebSocketApiRequest webSocketApiRequest = new WebSocketApiRequest();
        webSocketApiRequest.setHost(host);
        webSocketApiRequest.setPath(apiRequest.getPath());
        webSocketApiRequest.setMethod(apiRequest.getMethod().getValue());
        webSocketApiRequest.setQuerys(apiRequest.getQuerys());
        webSocketApiRequest.setHeaders(apiRequest.getHeaders());
        webSocketApiRequest.setIsBase64(apiRequest.isBase64BodyViaWebsocket() == true ? 1 : 0);
        MediaType bodyType = MediaType.parse(apiRequest.getFirstHeaderValue(HttpConstant.CLOUDAPI_HTTP_HEADER_CONTENT_TYPE));

        if(null != apiRequest.getFormParams() && apiRequest.getFormParams().size() > 0){
            webSocketApiRequest.setBody(HttpCommonUtil.buildParamString(apiRequest.getFormParams()));
        }else if(null != apiRequest.getBody()){
            webSocketApiRequest.setBody(new String(apiRequest.getBody() , bodyType.charset(SdkConstant.CLOUDAPI_ENCODING)));
        }

        if(apiRequest.isBase64BodyViaWebsocket()){
            webSocketApiRequest.setBody(new String(Base64.encode(apiRequest.getBody() , Base64.DEFAULT) , bodyType.charset(SdkConstant.CLOUDAPI_ENCODING)));
        }

        return JSON.toJSONString(webSocketApiRequest);
}		

3.2.4 接收API网关发送的通知

	ApiWebSocketListner apiWebSocketListner = params.getApiWebSocketListner();
	webSocketListener = new WebSocketListener() {
		......

                @Override
                //收到API网关的数据后,调用本方法
                public void onMessage(WebSocket webSocket, String text) {
                     String message  = text.substring(3);
                     apiWebSocketListner.onNotify(message);
                     if(status == WebSocketConnectStatus.CONNECTED && webSocketRef.getObj() != null){
                            webSocketRef.getObj().send(SdkConstant.CLOUDAPI_COMMAND_NOTIFY_RESPONSE);
                     }
                     return ;
             	}

     ......
	}

具体代码请下载Android SDK后,在src文件夹中可以找到。