本文介绍了在程序中通过AnalyticDB MySQL版Client高效写入数据的方法。

背景信息

AnalyticDB MySQL版Client旨在为用户提供一个高效、简单地插入数据到分析型数据库MySQL版的方法。您只需要通过接口将数据提交给AnalyticDB MySQL版Client,便可以直接插入数据到分析型数据库MySQL版中。使用AnalyticDB MySQL版 Client时,您无需关心分区聚合、连接池等问题,而且对数据实时写入也有更多的自主能力,不再强依赖DataHub等服务。

Maven repositories

通过Maven管理配置新的SDK版本,Maven的配置信息如下:

<dependency>
  <groupId>com.alibaba.cloud.analyticdb</groupId>
  <artifactId>adbclient</artifactId>
  <version>1.0.2</version>
</dependency>

接口列表

DatabaseConfig类
接口名 描述
setHost(String adbHost) 设置需要连接的AnalyticDB MySQL版的主机或域名。
setPort(int port) 设置需要连接的AnalyticDB MySQL版端口。
setDatabase(String database) 需要连接的AnalyticDB MySQL版库名。
setUser(String username) 设置连接AnalyticDB MySQL版使用的用户名,请填AccessKey ID。

如何获取AccessKey ID,请参见账号与权限管理

setPassword(String pwd) 设置连接AnalyticDB MySQL版使用的密码,请填AccessKey Secret。
setTable(List<String> table) 需要写入的表名List,建议小写。
setColumns(String tableName, List<String> columnList) 需要插入表的字段名,若是全字段插入,则使用columnList.add("*")即可。table列表中的所有表都需要设置小写的字段名,否则无法通过检查。
setIngnoreInsertError(boolean isIngnoreInsertError) 针对配置的所有表,设置是否忽略插入时遇到的error,默认为false。
setInsertIgnore(boolean insertIgnore) 针对配置的所有表,请根据业务场景判断是否使用insert ignore into语句。
setEmptyAsNull(boolean emptyAsNull) 针对配置的所有表,将empty数据设置为Null,默认为true。
setParallelNumber(int parallelNumber) 针对配置的所有表,设置写入AnalyticDB MySQL版时的并发线程数,默认为4。
setLogger(Logger logger) 设置client中使用的logger对象,此处使用slf4j.Logger
setRetryTimes(int retryTimes) 设置提交时写入AnalyticDB MySQL版出现异常时重试的次数,默认为0。
setRetryIntervalTime(long retryIntervalTime) 设置重试间隔的时间,单位是ms,默认为0。
setCommitSize(long commitSize) 设置自动提交的SQL长度(单位Byte),默认为32 KB,一般不建议设置。
setInsertWithColumnName(boolean insertWithColumnName) 设置在拼接INSERT SQL时,是否需要带字段名,默认为true。例如,insert into tableName (col1,col2) values ('value1','value2');如果为false,语句则类似insert into tableName values ('value1','value2');,此种情况下如果在表中加字段可能会导致写入失败。此设置只有在column列设置为*的情况下才会生效。
Row类
接口名 描述
setColumn(int index, Object value) 设置Row字段列表的值,必须确认字段的顺序。此种方式下,Row实例不可复用,每条数据必须使用单独的Row实例。
setColumnValues(List<Object> values) 直接将List格式数据行写入Row中。
updateColumn(int index, Object value) 更新Row字段列表的值。此方法中,Row实例可以复用,只需更新Row实例中的数据即可。
AdbClient类
接口名 描述
addRow(String tableName, Row row) / addRows(String tableName, List<Row> rows) 插入对应表的Row格式化的数据,即一条记录。数据会按照分区聚合的形式在SDK的内存中缓存,等待提交。如果SQL长度已满,则会在addRow或者addMap的时候做一次自动提交,然后将最新的数据新增进来。如果在自动提交时失败,调用方需要处理异常,并且会在异常中得到失败的数据列表。
addMap(String tableName, Map<String, String> dataMap) / addMaps(String tableName, List<Map<String, String>> dataMaps) 对应于addRow,支持map格式数据的写入。如果SQL长度已满,则会在addRow或者addMap时做一次自动提交,然后将最新的数据新增进来。如果在自动提交时失败,调用方需要处理此异常,并且会在异常中得到失败的数据列表。
addStrictMap(String table, Map<String, String> dataMap) / addStrictMaps(String tableName, List<Map<String, String>> dataMaps) 作用和addMap或addMaps类似,不同之处是传入的Map数据的key必须是表的字段。如果不是,会重新获取一次表结构信息,如果还没有改key的字段,则直接报错。其他功能与addMap或addMaps一致。如果没有动态表结构变更,不建议使用改接口,性能会有所损耗。
commit() 将缓存的数据进行提交,写入AnalyticDB MySQL版中。若提交失败,会把执行错误的语句放在异常中抛出,调用方需要对此异常进行处理。
TableInfo getTableInfo(String tableName) 获取对应表的结构信息。
List<ColumnInfo> getColumnInfo(String tableName) 获取对应表的字段列表信息,字段类是ColumnInfo,可以通过columnInfo.isNullable()获取该字段是否能为null。
Connection getConnection() throws SQLException 从客户端连接池获取mysql Connection连接信息,调用方可以使用获得的Connection做非插入操作,使用方式和JDBC的连接使用方式一致。
注意 使用结束后需要释放掉相应的资源(如ResultSet、Statement、Connection)。
ColumnInfo类
接口名 描述
boolean isNullable() 判断该字段是否能为null。
AdbClientException错误码
错误码名 错误码值 描述
SQL_LENGTH_LIMIT 100 SQL长度超过限制的长度,默认为32KB。
COMMIT_ERROR_DATA_LIST 101 提交中某些数据出现异常,会返回异常的数据,通过e.getErrData()即可获得异常数据List<String>。此错误码在addMap(s)addRow(s)和提交操作的时候都可能会发生,需要单独处理此错误码的异常。
COMMIT_ERROR_OTHER 102 Commit提交中的其他异常。
ADD_DATA_ERROR 103 新增数据过程中出现的异常。
CREATE_CONNECTION_ERROR 104 创建连接出现异常。
CLOSE_CONNECTION_ERROR 105 关闭连接出现异常。
CONFIG_ERROR 106 配置DatabaseConfig出现配置错误。
STOP_ERROR 107 停止实例时的报错。
OTHER 999 默认异常错误码。

示例代码

public class AdbClientUsage {
        public void demo() {
            DatabaseConfig databaseConfig = new DatabaseConfig();
            // 分析型数据库MySQL版的主机名或者URL
            databaseConfig.setHost("100.100.100.100");
            // 分析型数据库MySQL版的连接端口号
            databaseConfig.setPort(8888);
            // 分析型数据库MySQL版的所属账号的AccessKey ID
            databaseConfig.setUser("your db username");
            // 分析型数据库MySQL版的所属账号的AccessKey Secret
            databaseConfig.setPassword("your db password");
            databaseConfig.setDatabase("your db name");
            // 设置需要写入的表名列表
            List<String> tables = new ArrayList<String>();
            tables.add("your table name");
            tables.add("your table name 2");
            // 一旦new Client实例之后,表配置是不可修改的
            databaseConfig.setTable(tables);

            // 设置需要写入的表字段
            List<String> columns = new ArrayList<String>();
            columns.add("column1");
            columns.add("column2");
            // 如果是所有字段,字段列表使用columns.add("*")即可
            databaseConfig.setColumns("your table name", columns);
            databaseConfig.setColumns("your table name 2", Collections.singletonList("*"));

            // 如果出现插入失败,是否跳过
            //忽略插入失败可能导致数据丢失
            databaseConfig.setIgnoreInsertError(false);
              //如果该列的值为空,则直接设置为null即可。
            databaseConfig.setEmptyAsNull(true);
            // 使用insert ignore into方式进行插入
            databaseConfig.setInsertIgnore(true);
            // 提交时,写入分析型数据库MySQL版出现异常时重试的3次
            databaseConfig.setRetryTimes(3);
            // 重试间隔的时间为1s,单位是ms
            databaseConfig.setRetryIntervalTime(1000);
            // 初始化AdbClient,初始化实例之后,databaseConfig的配置信息无法再进行修改
            AdbClient adbClient = new AdbClient(databaseConfig);

            // 数据需要攒批,分多次添加,再提交
            for (int i = 0; i < 10; i++) {
                // Add row(s) to buffer. One instance for one record
                Row row = new Row(columns.size());
                // Set column
                // the column index must be same as the sequence of columns
                // the column value can be any type, internally it will be formatted according to column type
                row.setColumn(0, i); // Number value
                row.setColumn(1, "string value"); // String value
                // 如果SQL长度已满,则会在addRow或者addMap的时进行一次自动提交
                // 如果提交失败,则返回AdbClientException异常,错误码为COMMIT_ERROR_DATA_LIST
                adbClient.addRow("your table name", row);
            }
            Row row = new Row();
            row.setColumn(0, 10); // Number value
            row.setColumn(1, "2018-01-01 08:00:00"); //参数2的数据类型为 Date、Timestamp或者Time value
            adbClient.addRow("your table name 2", row);
            // Update column. Row实例可复用
            row.updateColumn(0, 11);
            row.updateColumn(1, "2018-01-02 08:00:00");
            adbClient.addRow("your table name 2", row);

            // 将map加入缓存中
            Map<String, String> rowMap = new HashMap<String, String>();
            rowMap.put("column1", "124");
            rowMap.put("column2", "string value");
            // 数据需要攒批,最好多次添加之后再进行提交
            adbClient.addMap("your table name", rowMap);

            //将缓存中的数据提交到分析型数据库MySQL版
            //数据成功提交到分析型数据库MySQL版之后,缓存中的数据会被清除
            try {
                adbClient.commit();
            } catch (Exception e) {
            } finally {
                adbClient.stop();
            }
        }
    }

注意事项

  • AnalyticDB MySQL版Client本身依赖druid(1.1.10)mysql-connector-java(5.1.45)commons-lang3(3.4)slf4j-api(1.7.25)slf4j-log4j12(1.7.25),如果在使用过程中,出现版本冲突,请检查这几个包的版本并解决冲突。
  • AnalyticDB MySQL版Client SDK是非线程安全的,如果多线程调用时,需要每个线程维护自己的Client对象。
    说明 强烈不建议多线程共用SDK实例,除了线程安全问题外,还容易使Client成为写入性能的瓶颈。
  • 数据必须在调用commit成功后才算成功写入AnalyticDB MySQL版
  • 针对Client抛出的异常,调用方要根据错误码的意义自行判断如何处理。如果是数据写入有问题,可以重复提交或者记录下有问题的数据后跳过。
  • 很多时候写入线程并不是越多越好,因为业务程序会涉及到攒数据的场景,所以对内存的消耗比较明显,业务调用方一定要多关注应用程序的GC情况。
  • 数据攒批数量不要太小,如果太小,分区聚合写意义就不大了。最好通过消息数*消息长度≈分区数*32KB大概得出攒批的消息数。
  • DatabaseConfig配置在new AdbClient成功之后是无法进行修改,所有配置项必须在Client对象初始化之前完成配置。