数据库加密技术属于主动防御机制,可以防止明文存储引起的数据泄密、突破边界防护的外部黑客攻击以及来自内部高权限用户的数据窃取,从根本上解决数据库敏感数据泄漏问题。通过加密SDK在客户端加密的数据可以存储在关系数据库或非关系数据库中。本文为您介绍数据库敏感数据加密的使用场景、原理和示例。

使用场景

  • 数据库敏感数据被拖库后,避免因明文存储导致的数据泄露。

    通常情况下,数据库中的数据是以明文形式进行存储和使用的,一旦数据文件(或备份文件)丢失,可能引发严重的数据泄露问题。而在拖库攻击中,明文存储的数据对于攻击者同样没有任何秘密可言。此时您需要对数据进行加密,避免数据泄露。

  • 对高权限用户,数据库敏感数据加密可以防范内部窃取数据造成的数据泄露。

    数据库加密可以提供独立于数据库系统自身权限控制体系之外的增强权限控制的能力,由专用的加密系统为数据库中的敏感数据设置访问权限,从而有效限制数据库超级用户或其他高权限用户对敏感数据的访问行为,保障数据安全。

加密和解密原理

  • 加密原理
    1. 创建密钥。

      加密SDK向密钥管理服务KMS(Key Management Service)发送调用GenerateDataKey接口请求,申请一个数据密钥(Data Key)。KMS返回数据密钥以及数据密钥密文(Encrypted Data Key)。

    2. 加密并存储数据。
      1. 使用数据密钥对数据进行加密,得到加密结果,进行Base64编码。
      2. 将加密数据的Base64编码存储在数据库中。
  • 解密原理

    检索并解密数据。

    1. 从数据库读取密文。
    2. 将密文进行Base64解密,解析密文消息。加密SDK调用KMS的Decrypt接口将数据密钥进行解密,KMS返回数据密钥给本地加密客户端。
    3. 解密数据。加密SDK(Encryption SDK)使用数据密钥对数据密文进行解密,得到原始数据。

示例

加密SDK用于面向应用的数据库加密,密钥由KMS产生和管理。

通过以下Spring JPA和Python示例代码,可以实现对User表中email字段的写入加密和读取解密。示例中每个字段使用一个数据密钥,解密时设置了密钥缓存,对同一字段进行多次查询时会检索缓存中可用的数据密钥。

为保证字段能存储加密后的字段,需要对被加密字段的长度进行扩容,扩容比例为3倍。

  • 定义实体类
    @Entity
    public class User {
        @Id
        @GeneratedValue
        private Long id;
    
        private String name;
        private String email;
    
        // getters and setters ...
    }
  • 定义UserRepository类
    public interface UserRepository extends CrudRepository<User, Long> {
     }
  • 实现Spring JPA的AttributeConverter接口相关功能

    EncryptionConverter调用加密SDK的接口,获取数据密钥,对指定的数据进行加密、解密。

    @Converter
     public class EncryptionConverter implements AttributeConverter<String, String> {
         private static String ACCESS_KEY_ID = "<AccessKeyId>";
         private static String ACCESS_KEY_SECRET = "<AccessKeySecret>";
         private static String CMK_ARN = "acs:kms:RegionId:UserId:key/CmkId";
         private static AliyunConfig config;
         static {
             config = new AliyunConfig();
             config.withAccessKey(ACCESS_KEY_ID, ACCESS_KEY_SECRET);
         }
     
         @Override
         public String convertToDatabaseColumn(String plainText) {
             BaseDataKeyProvider dataKeyProvider = new DefaultDataKeyProvider(CMK_ARN);
             AliyunCrypto crypto = new AliyunCrypto(config);
     
             try {
                 CryptoResult<byte[]> encryptResult = crypto.encrypt(dataKeyProvider, plainText.getBytes(StandardCharsets.UTF_8), Collections.singletonMap("sample", "context"));
                 return Base64.getEncoder().encodeToString(encryptResult.getResult());
             } catch (InvalidAlgorithmException e) {
                 System.out.println("Failed.");
                 System.out.println("Error message: " + e.getMessage());
             }
             return null;
         }
     
         @Override
         public String convertToEntityAttribute(String cipherText) {
             BaseDataKeyProvider dataKeyProvider = new DefaultDataKeyProvider(CMK_ARN);
             AliyunCrypto crypto = new AliyunCrypto(config);
             // *** 设置缓存数据主密钥 ***
             CryptoKeyManager ckm = new CachingCryptoKeyManager(new LocalDataKeyMaterialCache())
             crypto.setCryptoKeyManager(ckm);
             try {
                 CryptoResult<byte[]> decryptResult = crypto.decrypt(dataKeyProvider, Base64.getDecoder().decode(cipherText));
                 return new String(decryptResult.getResult(), StandardCharsets.UTF_8);
             } catch (InvalidAlgorithmException | UnFoundDataKeyException e) {
                 e.printStackTrace();
             }
             return null;
         }
     }
  • 添加@Convert注解

    添加@Convert注解到需要加密的属性(数据库中的列)。

    @Entity
     public class User {
         @Id
         @GeneratedValue
         private Long id;
     
         private String name;
         @Convert(converter = EncryptionConverter.class)
         private String email;
         
         // getters and setters ...
     }
  • 实体定义
    class User(object):
         def get_name(self):
             return self.name
     
         def set_name(self, value):
             self.name = value
     
         def get_email(self):
             return self.email
     
         def set_email(self, value):
             self.email = value
  • 实现数据库操作函数
    def user_add(user):
         # 连接数据库。
         conn = db_connection()
         # 得到一个可以执行SQL语句并且将结果作为字典返回的游标。
         cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
         # 定义要执行的SQL语句。
         sql = 'insert into user(name, email) values(%s,%s);'
         # 执行SQL语句。
         cursor.execute(sql, [user.name, user.email])
         print("insert name: " + user.name)
         print("insert email: " + user.email)
         # 提交写操作。
         conn.commit()
         last_id = cursor.lastrowid
         # 关闭光标对象。
         cursor.close()
         # 关闭数据库连接。
         conn.close()
         return last_id
     
     
     def user_select_by_id(id):
         # 连接数据库。
         conn = db_connection()
         # 得到一个可以执行SQL语句并且将结果作为字典返回的游标。
         cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
         # 定义要执行的SQL语句。
         sql = 'select * from user where id = %s;'
         # 执行SQL语句。
         cursor.execute(sql, [id])
         result = cursor.fetchone()
         print("select result: " + str(result))
         user = User()
         user.__dict__ = result
         # 关闭光标对象。
         cursor.close()
         # 关闭数据库连接。
         conn.close()
         return user
  • 实现数据加密、解密装饰器(decorator)
    def enc_convert():
         def enc_(func):
             def wrapper(self, plain_text):
                 provider = DefaultDataKeyProvider(AES_KEY_ARN)
                 client = build_aliyun_crypto(False)
                 cipher_text, enc_material = client.encrypt(provider, plain_text.encode("utf-8"), ENCRYPTION_CONTEXT)
                 cipher_text_str = base64.standard_b64encode(cipher_text).decode("utf-8")
                 f = func(self, cipher_text_str)
                 return f
     
             return wrapper
     
         return enc_
     
     
     def dec_convert(column_name):
         def dec_(func):
             def wrapper(self):
                 user = self.__dict__
                 cipher_text = user.get(column_name)
                 cipher_text_bytes = base64.standard_b64decode(cipher_text.encode("utf-8"))
                 provider = DefaultDataKeyProvider(AES_KEY_ARN)
                 client = build_aliyun_crypto(False)
                 plain_text, dec_material = client.decrypt(provider, cipher_text_bytes)
                 user[column_name] = bytes.decode(plain_text)
                 result = User()
                 result.__dict__ = user
                 f = func(result)
                 return f
     
             return wrapper
     
         return dec_
  • 为需要保护的数据字段添加装饰器
    class User(object):
         def get_name(self):
             return self.name
     
         def set_name(self, value):
             self.name = value
     
         @dec_convert(column_name="email")
         def get_email(self):
             return self.email
     
         @enc_convert()
         def set_email(self, value):
             self.email = value

关于数据库敏感数据加密的更多示例,请参见Spring JPA示例Python示例