本篇主要介绍如何通过Cassandra CQL访问Lindorm宽表引擎。

使用cqlsh访问Lindorm

cqlsh是Cassandra社区提供的Shell客户端,提供windows以及类Unix版本,该工具存在于Cassandra安装包内。用户可以在本地或阿里云ECS上下载Cassandra安装包,通过cqlsh工具可以完全兼容的访问Lindorm。

操作前请确认已完成SDK安装且已获取集群的连接地址。

  1. 下载和安装cqlsh。

    下载最新版本的Cassandra安装包然后解压,即可完成安装。例如:下载3.11.4版本(请先在官网上确认版本号),操作如下:

    $ wget http://mirror.bit.edu.cn/apache/cassandra/3.11.4/apache-cassandra-3.11.4-bin.tar.gz
    $ tar -zxf apache-cassandra-3.11.4-bin.tar.gz 
    $ cd apache-cassandra-3.11.4
  2. 启动cqlsh。

    使用Lindorm控制台查看CQL访问方式的地址和账户密码,其中端口默认是9042,然后使用如下命令连接Lindorm:

    bin/cqlsh $host $port -u $username -p $password
    说明 如果您需要经常连接到特定节点,您可以将节点的地址和端口信息保存到环境变量$CQLSH_HOST$CQLSH_PORT中。更多关于 cqlsh 命令支持的参数可以使用 bin/cqlsh -help
  3. 基本cqlsh命令。

    Lindorm现在支持多种cqlsh的访问命令:

    Documented shell commands:===========================
    CAPTURE  COPY  DESCRIBE  LOGIN   DESC  EXIT  HELP PAGING  SHOW
    CQL help topics:================
    CREATE_KEYSPACE        TEXT                ALTER_KEYSPACE           TIME       CREATE_ROLE
    DROP_USER              TIMESTAMP           ALTER_TABLE              CREATE_TABLE
    GRANT                  ALTER_USER          INSERT                   UPDATE   
    CREATE_USER            INSERT_JSON         USE                      ASCII
    DATE                   INT                 UUID                     BATCH
    DELETE                 JSON                BEGIN                    KEYWORDS        
    BLOB                   DROP_COLUMNFAMILY   LIST_PERMISSIONSBOOLEAN  LIST_ROLES      
    COUNTER                DROP_INDEX          LIST_USERS               DROP_KEYSPACE   
    PERMISSIONS            CREATE_COLUMNFAMILY REVOKE                   DROP_ROLE                 
    SELECT                 CREATE_INDEX        DROP_TABLE                SELECT_JSON
  4. 通过 cqlsh 创建 keyspace。

    Lindorm 兼容Cassandra中的keyspace和关系型数据库中的database概念比较类似,一个 keyspace 可以包含一个或多个 tables 或 column families。当您启动 cqlsh 时没有指定 keyspace,那么命令提示符为 cqlsh>,您可以使用 CREATE KEYSPACE 命令来创建 keyspace,具体如下:

    cqlsh> CREATE KEYSPACE test_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

    对于Lindorm用户来说CREATE KEYSPACE 概念中的strategy 、replication_factor和 Cassandra中的概念类似,但是replication_factor 默认都是2,上述建完KEYSPACE 后可以通过DESCRIBE KEYSPACE 来查看keyspace的信息。

    cqlsh> DESCRIBE KEYSPACE  test_keyspace;
    cqlsh> CREATE KEYSPACE test_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;

    现在您可以使用 USE 命令来切换到这个 keyspace :

    cqlsh> USE test_keyspace;
    cqlsh:test_keyspace>
  5. 通过 cqlsh 创建表。
    cqlsh> use test_keyspace;
    cqlsh:test_keyspace> CREATE TABLE test_user (first_name text , last_name text, PRIMARY KEY (first_name)) ;

    上述命令表示在 test_keyspace 下面创建了一张名为 test_user 的表。其中包含了 first_name 和 last_name 两个字段,类型都是 text,并且 first_name 是这张表的 PRIMARY KEY。当然,您也可以通过下述命令在 test_keyspace 里面建表。

    cqlsh> CREATE TABLE test_keyspace.test_user(first_name text , last_name text, PRIMARY KEY (first_name)) ;

    查看该表的schema信息:

    cqlsh:test_keyspace> DESCRIBE TABLE test_user;
    CREATE TABLE test_keyspace.test_user (
        first_name text PRIMARY KEY,
        last_name text
    ) WITH bloom_filter_fp_chance = 0.01    
        AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}    
        AND comment = ''    
        AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}    
        AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}    
        AND crc_check_chance = 1.0    
        AND dclocal_read_repair_chance = 0.1    
        AND default_time_to_live = 0    
        AND gc_grace_seconds = 864000    
        AND max_index_interval = 2048    
        AND memtable_flush_period_in_ms = 0    
        AND min_index_interval = 128    
        AND read_repair_chance = 0.0    
        AND speculative_retry = '99PERCENTILE';
    cqlsh:test_keyspace>

    DESCRIBE TABLE 命令会将建表语句以格式化的形式显示出来,除了您制定的设置,还包含了许多默认的设置。Lindorm表级别的配置和Cassandra有部分属性是不一样的,对于不一样的属性使用Cassandra CQL 默认设置展示。如下配置不具有Lindorm使用意义:

    crc_check_chance、gc_grace_seconds、read_repair_chance、speculative_retry、dclocal_read_repair_chance、crc_check_chance

    其他配置长期会和CQL属性完全兼容,现阶段如下属性完全兼容:

    compression:支持LZ4/SNAPPY/ZSTD
    default_time_to_live:支持表级别TTL
  6. 通过 cqlsh 读写数据。

    往表里面插入一些数据:

    cqlsh:test_keyspace> INSERT INTO test_user (first_name, last_name) VALUES ('test', 'LINDORM');
    cqlsh:test_keyspace> INSERT INTO test_user (first_name, last_name) VALUES ('Zhang', 'San');

    上述语句表示往 test_user 表中插入三条数据,其中最后一条数据只指定了 key,last_name 没有值。您可以使用 SELECT COUNT 语句查看数据是否插入成功,但是SELECT COUNT 不建议在海量数据上使用。

    cqlsh:test_keyspace> SELECT COUNT(*) FROM test_user; 
    
      count
    -------     
        2
    
    (1 rows)

    通过命令的输出查看已成功插入数据。您还可以使用下述命令查询这条数据:

    cqlsh:test_keyspace> SELECT * FROM test_user;
    
    first_name | last_name
    ------------+-----------
           test |    LINDORM
          Zhang |       San
    
    (3 rows)
    cqlsh:test_keyspace> SELECT * FROM test_user WHERE first_name='test';
    
    first_name | last_name
    ------------+-----------
           test |    LINDORM
    
    (1 rows)
  7. 删除列或行。

    使用 DELETE 命令删除一些列。例如,删除 last_name 列:

    cqlsh:test_keyspace> DELETE last_name FROM test_user WHERE first_name='test';
    cqlsh:test_keyspace> SELECT * FROM test_user WHERE first_name='test';
    
    first_name | last_name
    ------------+-----------
    (0 rows)

    使用 DELETE 命令删除一整行的数据:

    cqlsh:test_keyspace> DELETE FROM test_user WHERE first_name='test';
    cqlsh:test_keyspace> SELECT * FROM test_user WHERE first_name='test';
    
     first_name | last_name
    ------------+-----------
    
    (0 rows)
    cqlsh:test_keyspace>
  8. 清空或删除表。

    如果您需要清空一张表,您可以使用 TRUNCATE 命令或 DROP TABLE 命令,只有super 用户可以具备truncate 和 drop keyspace/table 权限,例如:

    cqlsh:test_keyspace> TRUNCATE test_user;
    cqlsh:test_keyspace> DROP TABLE test_user;

使用Cassandra CQL Java Driver 访问Lindorm

  1. 创建连接。
     String[] contactPoints = new String[]{
          "ip"//从控制台获取的连接地址
     };
      
     Cluster cluster = Cluster.builder()
          .addContactPoints(contactPoints)      // 填写账户名密码(控制台获取)
          .withAuthProvider(new PlainTextAuthProvider(username, password))
          .build();
     cluster.init();//初始化集群连接,会建立控制
     Session session = cluster.connect();//初始化连接session,不能每个请求创建一个Session。合理的应该是每个进程预先创建若干个。
  2. 使用Driver执行操作。
    • DDL操作
       // 创建keyspace,指定对应strategy, replication factor。
          session.execute(
                      "CREATE KEYSPACE IF NOT EXISTS testKeyspace WITH replication "
                              + "= {'class':'SimpleStrategy', 'replication_factor':1};");
      
       // 创建table,给table指定对应的primary key 以及cluster key 和regular key
          session.execute(
                      "CREATE TABLE IF NOT EXISTS testKeyspace.testTable ("
                              + "id int PRIMARY KEY,"
                              + "name text,"
                              + "age int,"
                              + "address text"
                              + ");");    
      
       //清空表
          session.execute("TRUNCATE TABLE testKeyspace.testTable;");
       //删除表
          session.execute("DROP TABLE testKeyspace.testTable ");
    • DML操作
          // 执行insert 操作
          session.execute(
                      "INSERT INTO testKeyspace.testTable (id, name, age, address) "
                              + "VALUES ("
                              + "1,"
                              + "'testname',"
                              + "11,"
                              + "'hangzhou');");
          // 执行select 操作,这里select * 表示获取所有列,也可以指定需要select 的列名获取对应列数据
          ResultSet res = session.execute(
                      "SELECT * FROM testKeyspace.testTable ;");
      
          // 如果想要获取每一列对应的数据,可以如下操作
          for (Row row : results)
          {
              int id = row.getInt("id");
              String name = row.getString("name");
              int age = row.getInt("age");
              String address = row.getString("address");
          }
      
          // 关闭Session
          session.close();
          // 关闭Cluster
          cluster.close();

使用Cassandra CQL 多语言 Driver 访问Lindorm

此处会介绍多语言Driver(Python、C++)通过Cassandra CQL 访问Lindorm的例子,其他访问方式可以参考社区文档

  • Cassandra CQL Python Driver访问Lindorm

    安装Datastax Python SDK库

    # 指定版本安装(建议安装3.x版本)
    pip install cassandra-driver==3.19.0
    # 安装最新版本
    pip install cassandra-driver
    # https://pypi.org/project/cassandra-driver/#history

    编写Python访问代码

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    import logging
    import sysfrom cassandra.cluster
    import Clusterfrom cassandra.auth 
    import PlainTextAuthProvider
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    cluster = Cluster(
        # 此处填写数据库连接点地址(公网或者内网的)
        contact_points=["ip"],
        # 填写账户名密码(如果忘记可以在 账号管理 处重置
        auth_provider=PlainTextAuthProvider("cassandra", "123456"))
    
    session = cluster.connect()
    # 建keyspace
    session.execute(
                    "CREATE KEYSPACE IF NOT EXISTS testKeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};");
    
    # 建table
    session.execute(
                    "CREATE TABLE IF NOT EXISTS testKeyspace.testTable (id int PRIMARY KEY, name text,age int,address text);");
    
    #执行写
    session.execute(
                    "INSERT INTO testKeyspace.testTable (id, name, age, address) VALUES ( 1, 'testname', 11, 'hangzhou');");
    
    #读操作
    rows = session.execute(
                    "SELECT * FROM testKeyspace.testTable ;");
    
    # 打印每行信息到控制台
    for row in rows:
        print("# row: {}".format(row))
    
    # 关闭Session
    session.shutdown()
    
    # 关闭
    Clustercluster.shutdown()
  • Cassandra CQL C++ Driver 访问Lindorm

    通过Datastax c++ 链接获取相关平台下的rpm包。

    编写访问代码

    CassFuture* connect_future = NULL;
    CassCluster* cluster = cass_cluster_new();
    CassSession* session = cass_session_new();
    char* hosts = "ip";//控制台获取ip
    
    //建立相关连接
    cass_cluster_set_contact_points(cluster, hosts);
    connect_future = cass_session_connect(session, cluster);
    
    //DDL操作
    if (cass_future_error_code(connect_future) == CASS_OK) {
        CassFuture* close_future = NULL;
    
        
        const char* query = "SELECT  name FROM testKeyspace.testTable ";
        CassStatement* statement = cass_statement_new(query, 0);
    
        CassFuture* result_future = cass_session_execute(session, statement);
    
        if (cass_future_error_code(result_future) == CASS_OK) {
          //获取相关的结果
          const CassResult* result = cass_future_get_result(result_future);
          const CassRow* row = cass_result_first_row(result);
    
          if (row) {
            const CassValue* value = cass_row_get_column_by_name(row, "name");
            //打印结果
            const char* name;
            size_t name_length;
            cass_value_get_string(value, &name, &name_length);
            printf("release_version: '%.*s'\n", (int)name_length, name);
          }
    
          cass_result_free(result);
        } else {
          //异常处理
          const char* message;
          size_t message_length;
          cass_future_error_message(result_future, &message, &message_length);
          fprintf(stderr, "Unable to run query: '%.*s'\n", (int)message_length, message);
        }
    
        cass_statement_free(statement);
        cass_future_free(result_future);
    
        //手工释放资源信息
        close_future = cass_session_close(session);
        cass_future_wait(close_future);
        cass_future_free(close_future);
      } else {
        //异常处理
        const char* message;
        size_t message_length;
        cass_future_error_message(connect_future, &message, &message_length);
        fprintf(stderr, "Unable to connect: '%.*s'\n", (int)message_length, message);
      }
    
      cass_future_free(connect_future);
      cass_cluster_free(cluster);
      cass_session_free(session);