Ganos Spark模块允许用户基于Apache Spark分布式系统进行大规模的地理信息数据处理与分析。它基于Spark环境提供了一系列的接口进行数据加载、分析和保存。Ganos Spark提供了不同级别的数据分析模型,最基础的是GeometryRDD模型,用来实现Ganos数据中SimpleFeature与Spark中RDD模型的之间的转换。在GeometryRDD基础上,Ganos Spark基于SparkSQL设计了一系列用于空间数据表达的UDT与UDF或UDAF,允许用户使用类似SQL结构化查询语言进行数据的查询与分析。Ganos Spark整体框架如下:

1. 获取Ganos Spark 工具包

首先请从此链接获取GanosSparkSDK开发包:Ganos Spark驱动

在工程目录的pom文件中增加依赖:
<!-- Ganos Spark -->
<dependency>
    <groupId>com.aliyun.ganos</groupId>
    <artifactId>ganos-spark-runtime</artifactId>
    <version>1.0-SNAPSHOT</version>
    <scope>system</scope>
    <systemPath>../ganos-spark-runtime-1.0-SNAPSHOT.jar</systemPath>
</dependency>

<!-- Spark -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-yarn_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
     <dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.18.Final</version>
</dependency>

2 使用Ganos Spark查询HBase Ganos

开发环境配置完成后,用户可参考下面实例通过Ganos Spark连接HBase Ganos并查询数据:

package com.aliyun.ganos

import com.aliyun.ganos.spark.GanosSparkKryoRegistrator
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object GanosSparkDemo {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("com").setLevel(Level.ERROR)

    //指定HBase连接参数,POINT为Catalog名称
    val params = Map(
      "hbase.catalog" -> "POINT",
      "hbase.zookeepers" -> "zookeeper地址",
      "geotools" -> "true")

    //初始化SparkSession
    val sparkSession = SparkSession.builder
      .appName("Simple Application")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.crossJoin.enabled", "true")
      .config("spark.kryo.registrator", classOf[GanosSparkKryoRegistrator].getName)
      .master("local[*]")
      .getOrCreate()

    //加载AIS数据源
    val dataFrame = sparkSession.read
      .format("ganos")
      .options(params)
      .option("ganos.feature", "AIS")
      .load()

    //查询全部数据
    dataFrame.createOrReplaceTempView("ais")
    val r=sparkSession.sql("SELECT * FROM ais")
    r.show()

    //时空查询
    val r1=sparkSession.sql("SELECT * FROM ais WHERE st_contains(st_makeBBOX(70.00000,11.00000,75.00000,14.00000), geom)")
    r1.show()

    //将查询结果反写入HBase Ganos
    r1.write.format("ganos").options(params).option("ganos.feature", "result").save()
  }
}
			

运行结果如下:

关于Ganos Spark支持的空间操作函数,用户可参考:Ganos Spark函数

3. 在Jupyter中使用Ganos Spark

Ganos Spark提供了相关工具包允许用户在Jupyter环境下查询数据并进行可视化。

首先下载Ganos Spark Leaflet工具包:Leaflet工具

进入控制台,进行如下操作:

(1) 安装Jupyter:
$ pip install --upgrade jupyter
$ pip3 install --upgrade jupyter
(2) 配置SPARK_HOME环境变量,通过toree添加名为“Ganos Spark Test“的 kernel,并启动jupyter:
$ jars="ganos-spark-runtime-1.0-SNAPSHOT.jar,ganos-spark-jupyter-leaflet-1.0-SNAPSHOT.jar"
$ jupyter toree install --replace --user --kernel_name "Ganos Spark Test" --spark_home=${SPARK_HOME} --spark_opts="--master localhost[*] --jars $jars"
$ jupyter notebook

服务器启动成功后,用户可在浏览器中访问:http://localhost:8888进入Jupyter控制台创建Ganos Spark Test会话。

(3) 加载HBase Ganos数据

a) 创建Spark会话:

b) 通过Spark SQL查询HBase Ganos数据:

c) 在Leaflet中展示数据:

用户可以下载完整notebook文档测试:GanosSpark测试.ipynb