本文主要为您介绍多元索引的并发导出数据接口。

接口

在表格存储的多元索引中,我们提供了Search接口,Search接口提供了全功能集,包括所有的查询功能,以及排序、统计聚合等分析能力,其结果会按照指定的顺序返回。

但是在有些场景下,我们需要完整的查询能力,但不关心整个结果集的顺序,只希望能以更快的速度将命中的数据全部返回,例如在对接计算系统Spark、Presto等场景,或者是一些圈选场景中。为了支持这种需求,我们提供了一套新的接口ParallelScan。

ParallelScan接口相对于Search接口,保留了所有的查询功能,但是舍弃了排序、统计聚合等分析功能,带来了5倍以上的性能提升,这样就可以轻松实现1分钟内上亿级别数据行的导出能力(导出能力可以水平扩展,不存在上限)。

在接口的实现中,单次请求的limit限制更宽松。目前Search的limit最大允许100行,但是ParallelScan最大允许是2000行,下一个版本会提高到5000行。同时支持多个线程一起并发请求,因此导出速度会极快。

多并发数据导出功能涉及以下两个接口。
  • ComputeSplits:获取当前ParallelScan单个请求的最大并发数。
  • ParallelScan:执行具体的数据导出功能。

5.6.0 及其以上版本的SDK开始支持ParallelScan功能。

特点

ParallelScan接口相对于Search接口,还有一些典型的特征,具体如下。

  • 结果稳定性

    ParallelScan任务是有状态的。在一个Session的请求中,扫描出的数据结果集是确定的,由发起第一次请求时的数据状态决定。发起第一次请求后又插入了新的数据或者修改了原有的数据不会对结果集造成影响。

  • 新增会话(Session)概念
    在ParallelScan系列接口中,我们新增了Session概念。使用SessionId能够保证获取到的结果集合是稳定的。具体流程如下。
    1. 通过ComputeSplits接口获取最大并发数和当前SessionID。
    2. 通过发起多个并发ParallelScan请求读取数据,请求中需要指定当前的SessionID和当前并发ID。

    ParallelScan支持不携带SessionID发起请求,方便用户在某些场景中不方便获取SessionID的场景。但是不使用SessionID可能会有极小的概率导致获取到的结果集合中有重复数据。

    如果ParallelScan过程中发生网络异常、线程异常、动态Schema修改、AB测试等场景,服务端会返回ErrorCod为 “OTSSessionExpired”,导致ParallelScan不可继续扫描数据,需要重新开始一个新的ParallelScan任务,从最开始重新拉取数据。

  • 最大并发数

    ParallelScan支持的单请求的最大并发数由ComputeSplits返回值确定,用户的数据越多,则支持的并发越大。单请求指同一个查询语句,例如查询city=“杭州”的结果,如果用Search接口,那么Search请求的返回值中会包括所有city=“杭州”的结果,但是如果用ParallelScan接口,且并发数如果是2,那么每个请求返回50%的结果,最后两个并发的结果集加在一起后才是完整的结果集。

  • 每次返回行数

    limit默认2000,最大可以到5000。超过5000后,limit的变化对性能基本无影响。

  • 性能

    ParallelScan单并发扫描数据的性能是Search的5倍,当增大并发时性能可以继续线性提高,例如8并发时仍可以继续提高4倍性能。

  • 成本

    因为ParallelScan请求对资源的消耗更少,在价格上会更加便宜,所以对于大数据量的导出类需求强烈建议使用ParallelScan接口。

  • 返回列

    只支持返回多元索引中已经建立了索引的列,不能返回多元索引中没有的列,可以支持的ReturnType是RETURN_ALL_INDEX或者RETURN_SPECIFIED,不支持RETURN_ALL。

    此外,也不支持返回数组、地理位置和嵌套字段。如果有三种数据的需求,请通过工单或者钉钉群反馈给我们,我们会提高优先级支持。

  • 限制项

    同时存在的ParallelScan任务数量有一定的限制,当前为10个,后期会根据客户需求继续调整。同一个SessionID且ScanQuery相同的多个并发任务视为一个任务。一个ParallelScan任务的生命周期定义为:开始时间是第一次发出ParallelScan请求,结束时间是翻页扫描完所有数据或者请求的token失效。

选择

  • 如果请求关心排序、统计聚合,或者是终端客户的直接查询请求,需要用search接口。
  • 如果请求不关心排序,只关心把所有符合条件的数据尽快返回,或者是计算系统(Spark、Presto等)拉取数据,可以考虑ParallelScan接口。

参数

参数 说明
tableName 主表名
indexName 多元索引名
scanQuery query 多元索引的Query语句。支持精确查询、模糊查询范围查询、地理位置查询、复杂的嵌套查询等,功能和Search接口一致。
limit 扫描数据的时候一次返回多少条数据。
maxParallel 最大并发数。请求支持的最大并发数由用户数据量决定。数据量越大则支持的并发数越多,每次任务前可以通过ComputeSplits API进行获取。
currentParallelId 当前并发ID。取值范围为 [0, maxParallel)。
token 用于翻页功能。ParallelScan请求结果中有下一次进行翻页的token,基于该token可以接着上一次的结果继续取数据。
aliveTime ParallelScan的当前任务有效时间,也是token的有效时间。建议使用默认值。如果在有效时间内没有发起下一次请求,则不能继续取数据。持续发起请求将会刷新token有效时间。
columnsToGet ParallelScan功能目前仅可以扫描多元索引中的数据,需要在创建索引时候勾选附加存储(store=true),且以下数据类型不支持进行ParallelScan:Array、Nested、GeoPoint。
sessionId 本次并发扫描数据任务的sessionId。创建session可以通过computeSplits API来创建,同时获得本次任务支持的最大并发数。

Java示例

  • 单并发扫描数据

    相对于多并发,单并发扫描数据的代码更简单,单并发代码无需关心currentParallelId和maxParallel参数。

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    import com.alicloud.openservices.tablestore.SyncClient;
    import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
    import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
    import com.alicloud.openservices.tablestore.model.Row;
    import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
    import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
    import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
    import com.alicloud.openservices.tablestore.model.search.ParallelScanResponse;
    import com.alicloud.openservices.tablestore.model.search.ScanQuery;
    import com.alicloud.openservices.tablestore.model.search.SearchRequest.ColumnsToGet;
    import com.alicloud.openservices.tablestore.model.search.query.MatchAllQuery;
    import com.alicloud.openservices.tablestore.model.search.query.Query;
    import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;
    
    public class Test {
    
        public static List<Row> scanQuery(final SyncClient client) {
            String tableName = "<TableName>";
            String indexName = "<IndexName>";
            //获取sessionId和本次请求支持的最大并发数。
            ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
            computeSplitsRequest.setTableName(tableName);
            computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
            ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
            byte[] sessionId = computeSplitsResponse.getSessionId();
            int splitsSize = computeSplitsResponse.getSplitsSize();
            /*
             *  创建并发扫数据请求。
             */
            ParallelScanRequest parallelScanRequest = new ParallelScanRequest();
            parallelScanRequest.setTableName(tableName);
            parallelScanRequest.setIndexName(indexName);
            ScanQuery scanQuery = new ScanQuery();
            //该query决定了扫描出的数据的范围,可构建嵌套的复杂的query。
            Query query = new MatchAllQuery();
            scanQuery.setQuery(query);
    
            //设置单次请求返回的行数。
            scanQuery.setLimit(2000);
            parallelScanRequest.setScanQuery(scanQuery);
            ColumnsToGet columnsToGet = new ColumnsToGet();
            columnsToGet.setColumns(Arrays.asList("col_1", "col_2"));
            parallelScanRequest.setColumnsToGet(columnsToGet);
            parallelScanRequest.setSessionId(sessionId);
    
            /*
             * builder模式构建ParallelScanRequest, 功能与上方一致。
             */
            ParallelScanRequest parallelScanRequestByBuilder = ParallelScanRequest.newBuilder()
                .tableName(tableName)
                .indexName(indexName)
                .scanQuery(ScanQuery.newBuilder()
                    .query(QueryBuilders.matchAll())
                    .limit(2000)
                    .build())
                .addColumnsToGet("col_1", "col_2")
                .sessionId(sessionId)
                .build();
            List<Row> result = new ArrayList<>();
    
            /*
             * 原生的API扫描数据。
             */
            {
                ParallelScanResponse parallelScanResponse = client.parallelScan(parallelScanRequest);
                //下次请求的ScanQuery的token。
                byte[] nextToken = parallelScanResponse.getNextToken();
                //取出数据。
                List<Row> rows = parallelScanResponse.getRows();
                result.addAll(rows);
                while (nextToken != null) {
                    //设置token。
                    parallelScanRequest.getScanQuery().setToken(nextToken);
                    //继续扫描数据。
                    parallelScanResponse = client.parallelScan(parallelScanRequest);
                    //取出数据。
                    rows = parallelScanResponse.getRows();
                    result.addAll(rows);
                    nextToken = parallelScanResponse.getNextToken();
                }
            }
    
            /*
             *  推荐方式。
             *  使用iterator方式扫描所有匹配数据。使用方式上更简单,速度和上述方法一致。
             */
            {
                RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
                while (iterator.hasNext()) {
                    Row row = iterator.next();
                    result.add(row);
                    //取出具体的值。
                    String col_1 = row.getLatestColumn("col_1").getValue().asString();
                    long col_2 = row.getLatestColumn("col_2").getValue().asLong();
                }
            }
    
            /*
             * 关于失败重试的问题(如果本函数的外部调用者有重试机制的话或者不需要考虑失败重试这种问题,可以忽略这部分内容)。
             * 为了保证可用性,遇到任何异常都推荐进行任务级别的重试,重新开始一个新的parallelScan任务。
             * 异常分为两种:
             * 1、服务端session异常: OTSSessionExpired。
             * 2、调用者客户端网络等异常。
             */
            try {
                //正常处理逻辑。
                {
                    RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
                    while (iterator.hasNext()) {
                        Row row = iterator.next();
                        //处理row,内存够大可直接放到list中。
                        result.add(row);
                    }
                }
            } catch (Exception ex) {
                //重试。
                {
                    result.clear();
                    RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
                    while (iterator.hasNext()) {
                        Row row = iterator.next();
                        //处理row,内存够大可直接放到list中。
                        result.add(row);
                    }
                }
            }
            return result;
        }
    }
  • 多线程并发扫描数据
    import java.util.ArrayList;
    import java.util.List;
    
    import com.alicloud.openservices.tablestore.SyncClient;
    import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
    import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
    import com.alicloud.openservices.tablestore.model.Row;
    import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
    import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
    import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
    import com.alicloud.openservices.tablestore.model.search.ScanQuery;
    import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;
    
    public class Test2 {
    
        public static void scanQueryWithMultiThread(final SyncClient client) throws InterruptedException {
            final String tableName = "yourTableName";
            final String indexName = "yourIndexName";
    
            //获取sessionId和本次请求支持的最大并发数。
            ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
            computeSplitsRequest.setTableName(tableName);
            computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
            ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
            final byte[] sessionId = computeSplitsResponse.getSessionId();
            final int maxParallel = computeSplitsResponse.getSplitsSize();
    
            /*
             *  为了一个函数实现多线程功能,这里构建一个内部类继承Thread来使用多线程。
             *  用户可构建一个正常的外部类,这样代码更有条理。
             */
            final class ThreadForScanQuery extends Thread {
                private int currentParallelId;
    
                private ThreadForScanQuery(int currentParallelId) {
                    this.currentParallelId = currentParallelId;
                    this.setName("ThreadForScanQuery:" + maxParallel + "-" + currentParallelId);  //设置线程名字
                }
    
                @Override
                public void run() {
                    try {
                        //正常处理逻辑。
                        {
                            ParallelScanRequest parallelScanRequest = ParallelScanRequest.newBuilder()
                                .tableName(tableName)
                                .indexName(indexName)
                                .scanQuery(ScanQuery.newBuilder()
                                    .query(QueryBuilders.range("col_long").lessThan(123)) //这里的query决定了取得什么数据
                                    .limit(2000)
                                    .currentParallelId(currentParallelId)
                                    .maxParallel(maxParallel)
                                    .build())
                                .addColumnsToGet("col_1", "col_2", "col_3")  //只能是多元索引中的字段, 或者使用下一行注释的代码返回所有索引中的数据。
                                //.returnAllColumnsFromIndex(true)
                                .sessionId(sessionId)
                                .build();
                            //使用Iterator形式获取所有数据。
                            RowIterator ltr = client.createParallelScanIterator(parallelScanRequest);
                            System.out.println("线程名字:" + this.getName());
                            long count = 0;
                            while (ltr.hasNext()) {
                                Row row = ltr.next();
                                //增加自定义的处理逻辑,或者将所有线程的row放在一个统一的list中。
                                count++;
                            }
                            System.out.println("线程名字:" + this.getName() + ", 该线程获取到的数据总数:" + count);
                        }
                    } catch (Exception ex) {
                        //如果有异常,此处需要考虑重试上面的正常处理逻辑。
                    }
                }
            }
    
            //多个线程同时执行, currentParallelId取值范围是[0, maxParallel)。
            List<ThreadForScanQuery> threadList = new ArrayList<ThreadForScanQuery>();
            for (int i = 0; i < maxParallel; i++) {
                ThreadForScanQuery thread = new ThreadForScanQuery(i);
                threadList.add(thread);
            }
    
            //同时启动。
            for (ThreadForScanQuery thread : threadList) {
                thread.start();
            }
    
            //主线程阻塞等待所有线程完成任务。
            for (ThreadForScanQuery thread : threadList) {
                thread.join();
            }
            System.out.println("all thread done!");
        }
    }