设为首页收藏本站

                      LUPA开源社区

                       ?#19968;?#23494;码
                       注册
                      文章 帖子 博客
                      LUPA开源社区 首页 业界资讯 技术文摘 查看内容

                      Ignite与Spark都很强,那如果把它们整合起来会怎样?

                      2019-1-17 09:59| 发布者: joejoe0332| 查看: 578| 评论: 0|原作者: oschina|来自: oschina

                      摘要: 在前面的文章中,我们分别介绍了 Ignite 和 Spark 这两种技术,从功能上对两者进行了全面深入的对比。经过分析,可以得出这样一个结论:两者都很强大,但是差别很大,定位不同,因此会有不同的适用领域。 ...

                      前面的文章中,我们分别介绍了 Ignite 和 Spark 这两种技术,从功能上对两者进行了全面深入的对比。经过分析,可以得出这样一个结论:两者都很强大,但是差别很大,定位不同,因此会有不同的适用领域。

                      但是,这两种技术也?#24378;?#20197;互补的,那么它们互补适用于场景是什么呢?主要是这么几个方面:如果觉得 Spark 中的 SQL 等运行速度较慢,那么 Ignite 通过自己的方式提供了对 Spark 应用进行进一步加速的解决方案,这方面可选的解决方案并不多,推荐开发者考虑,另外就是数据和状态的共享,?#27604;?#36825;方面的解决方案也有很多,并不是一定要用 Ignite 实现。

                      Ignite 原生提供了对 Spark 的支持,本文主要探讨为何如何将 Ignite 和 Spark 进行集成。

                      1.将 Ignite 与 Spark 整合

                      整合这两种技术会为 Spark 应用带来若干明显的?#20040;Γ?/p>

                      • 通过避免大量的数据移动,获得真正可扩展的内存级性能;
                      • 提高 RDD、DataFrame 和 SQL 的性能;
                      • 在 Spark 作业之间更方便地共享状态和数据。

                      下图显示了如何整合这两种技术,并且标注了显著的优势: 

                      通过该图,可?#28304;?#25972;体架构的角度看到 Ignite 在整个 Spark 应用中的位置和作用。

                      Ignite 对 Spark 的支?#31181;?#35201;体现为两个方面,一个是 Ignite RDD,一个是 Ignite DataFrame。本文会首先聚焦于 Ignite RDD,之后再?#27493;?nbsp;Ignite DataFrame。

                      2.Ignite RDD

                      Ignite 提供了一个SparkRDD的实现,叫做IgniteRDD,这个实现可以在内存中跨 Spark 作业共享任?#38382;?#25454;和状态,IgniteRDD为 Ignite 中相同的内存数据提供了一个共享的、可变的视图,它可以跨多个不同的 Spark 作业、工作节点或者应用,相反,原生的 SparkRDD 无法在 Spark 作业或者应用之间进行共享。

                      IgniteRDD作为 Ignite 分布式缓存的视图,既可以在 Spark 作业执行进程中部署,也可以在 Spark 工作节点中部署,也可以在它自己的集群中部署。因此,根据预配置的部署模型,状态共享既可以只存在于一个 Spark 应用的生命周期内部(嵌入式模式),也可?#28304;?#22312;于 Spark 应用的外部(独立模式)。

                      Ignite 还可以帮助 Spark 应用提高 SQL 的性能,虽然 SparkSQL 支持丰富的 SQL 语法,但是它没有实现索引。从结果上来说,即使在普通较小的数据集上,Spark 查询也可能花费几分钟的时间,因为需要进行全表扫描。如果使用 Ignite,Spark 用户可以配置主索引和二级索引,这样可?#28304;?#26469;上千倍的性能提升。

                      2.1.IgniteRDD 示例

                      下面通过一些代码以及创建若干应用的方式,展示 IgniteRDD 带来的?#20040;Α?/p>

                      可以使用多种语言来访问 Ignite RDD,这对于有跨语言需求的团队来说有友好的,下边代码共包括两个简单的 Scala 应用和两个 Java 应用。此外,会从两个不同的环境运行应用:从终端运行 Scala 应用以及通过 IDE 运行 Java 应用。另外还会在 Java 应用中运行一些 SQL 查询。

                      对于 Scala 应用,一个应用会用于往 IgniteRDD 中写入数据,而另一个应用会执行部分过滤然后返回结果集。使用 Maven 将代码构建为一个 jar 文件后在终端窗口中执行这个程序,下面是详细的代码:

                      object RDDWriter extends App {
                      
                          val conf = new SparkConf().setAppName("RDDWriter")
                      
                          val sc = new SparkContext(conf)
                      
                          val ic = new IgniteContext(sc, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml")
                      
                          val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("sharedRDD")
                      
                          sharedRDD.savePairs(sc.parallelize(1 to 1000, 10).map(i => (i, i)))
                      
                          ic.close(true)
                      
                          sc.stop()
                      
                      }
                      
                      object RDDReader extends App {
                      
                          val conf = new SparkConf().setAppName("RDDReader")
                      
                          val sc = new SparkContext(conf)
                      
                          val ic = new IgniteContext(sc, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml")
                      
                          val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("sharedRDD")
                      
                          val greaterThanFiveHundred = sharedRDD.filter(_._2 > 500)
                      
                          println("The count is " + greaterThanFiveHundred.count())
                      
                          ic.close(true)
                      
                          sc.stop()
                      
                      }

                      在这个 Scala 的RDDWriter中,首先创建了包含应用名的SparkConf,之后基于这个配?#20040;?#24314;了SparkContext,最后,根据这个SparkContext创建一个IgniteContext。创建IgniteContext有很多种方法,本例中使用一个叫做example-shared-rdd.xml的 XML 文件,该文件会结合 Ignite 发行版然后根据需求进行预配置。显然,需要根据自己的环境修改路径(Ignite 主目录),之后指定 IgniteRDD 持有的整数值元组,最后,将从 1 到 1000 的整数值存入 IgniteRDD,数值的存储使用了 10 个 parallel 操作。

                      在这个 Scala 的RDDReader中,初始化和配置与 Scala RDDWriter相同,也会使用同一个 XML 配置文件,应用会执行部分过滤,然后关注存储了多少大于 500 的值,答案最后会输出。

                      关于IgniteContextIgniteRDD的更多信息,可以看 Ignite 的文档

                      要构建 jar 文件,可以使用下面的 maven 命令:

                      mvn clean install

                      接下来,看下 Java 代码,先写一个 Java 应用往IgniteRDD中写入多个记录,然后另一个应用会执行部分过滤然后返回结果集,下面是RDDWriter的代码细节:

                      public class RDDWriter {
                      
                          public static void main(String args[]) {
                      
                              SparkConf sparkConf = new SparkConf().setAppName("RDDWriter").setMaster("local").set("spark.executor.instances", "2");
                      
                              JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
                      
                              Logger.getRootLogger().setLevel(Level.OFF);
                      
                              Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);
                      
                              JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>(
                      
                                  sparkContext, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml", true);
                      
                              JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache("sharedRDD");
                      
                              List<Integer> data = new ArrayList<>(20);
                      
                              for (int i = 1001; i <= 1020; i++) {
                      
                                  data.add(i);
                      
                              }
                      
                              JavaRDD<Integer> javaRDD = sparkContext.<Integer>parallelize(data);
                      
                              sharedRDD.savePairs(javaRDD.<Integer, Integer>mapToPair(new PairFunction<Integer, Integer, Integer>() {
                      
                                  public Tuple2<Integer, Integer> call(Integer val) throws Exception {
                      
                                      return new Tuple2<Integer, Integer>(val, val);
                      
                                  }
                      
                              }));
                      
                              igniteContext.close(true);
                      
                              sparkContext.close();
                      
                          }
                      
                      }

                      在这个 Java 的RDDWriter中,首先创建了包含应用名和执行器数量的SparkConf,之后基于这个配?#20040;?#24314;了SparkContext,最后,根据这个SparkContext创建一个IgniteContext。最后,往 IgniteRDD 中添加了额外的 20 个值。

                      在这个 Java 的RDDReader中,初始化和配置与 Java RDDWriter相同,也会使用同一个 XML 配置文件,应用会执行部分过滤,然后关注存储了多少大于 500 的值,答案最后会输出,下面是 Java RDDReader的代码:

                      public class RDDReader {
                      
                          public static void main(String args[]) {
                      
                              SparkConf sparkConf = new SparkConf().setAppName("RDDReader").setMaster("local").set("spark.executor.instances", "2");
                      
                              JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
                      
                              Logger.getRootLogger().setLevel(Level.OFF);
                      
                              Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);
                      
                              JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>(
                      
                                  sparkContext, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml", true);
                      
                              JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache("sharedRDD");
                      
                              JavaPairRDD<Integer, Integer> greaterThanFiveHundred = sharedRDD.filter(new Function<Tuple2<Integer, Integer>, Boolean>() {
                      
                                  public Boolean call(Tuple2<Integer, Integer> tuple) throws Exception {
                      
                                      return tuple._2() > 500;
                      
                                  }
                      
                              });
                      
                              System.out.println("The count is " + greaterThanFiveHundred.count());
                      
                              System.out.println(">>> Executing SQL query over Ignite Shared RDD...");
                      
                              Dataset df = sharedRDD.sql("select _val from Integer where _val > 10 and _val < 100 limit 10");
                      
                              df.show();
                      
                              igniteContext.close(true);
                      
                              sparkContext.close();
                      
                          }
                      
                      }

                      到这里就可以?#28304;?#30721;进行测试了。

                      2.2.运行应用

                      在第一个终端窗口中,启动 Spark 的主节点,如下:

                      $SPARK_HOME/sbin/start-master.sh

                      在第二个终端窗口中,启动 Spark 工作节点,如下:

                      $SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker spark://ip:port

                      根据自己的环境,修改 IP 地址和端口号(ip:port)。

                      在第三个终端窗口中,启动一个 Ignite 节点,如下:

                      $IGNITE_HOME/bin/ignite.sh examples/config/spark/example-shared-rdd.xml

                      这里使用了之前讨论过的example-shared-rdd.xml文件。

                      在第四个终端窗口中,可以运行 Scala 版的 RDDWriter 应用,如下:

                      $SPARK_HOME/bin/spark-submit --class "com.gridgain.RDDWriter" --master spark://ip:port "/path_to_jar_file/ignite-spark-scala-1.0.jar"

                      根据自己的环境修改 IP 地址和端口(ip:port),以及 jar 文件的路径(/path_to_jar_file)。

                      会产生如下的输出:

                      The count is 500

                      这是期望的输出。

                      接下来,杀掉 Spark 的主节点和工作节点,而 Ignite 节点仍然在运行中并且IgniteRDD对于其它应用仍然可用,下面会使用 IDE 通过 Java 应用接入IgniteRDD

                      运行 Java 版RDDWriter会扩展之前存储于 IgniteRDD 中的记录列表,通过运行 Java 版RDDReader可以进行测试,它会产生如下的输出:

                      The count is 520

                      这也是期望的输出。

                      最后,SQL 查询会在IgniteRDD中执行一个 SELECT 语句,返回范围在 10 到 100 之间的最初 10 个值,输出如下:

                      结果正确。

                      3.IgniteDataframes

                      Spark 的 DataFrame API 为描述数据引入了模式的概念,Spark 通过表格的?#38382;?#36827;行模式的管理和数据的组织。

                      DataFrame 是一个组织为命名?#34892;问?#30340;分布式数据集,从概念上讲,DataFrame 等同于关系数据库中的表,并允许 Spark 使用 Catalyst 查询优化器来生成高效的查询执行计划。而 RDD 只?#24378;?#38598;?#33322;?#28857;分区化的元素集合。

                      Ignite 扩展了 DataFrames,简化了开发,改进了将 Ignite 作为 Spark 的内存存储时的数据访?#36866;?#38388;,?#20040;?#21253;括:

                      • 通过 Ignite ?#21015;?DataFrames 时,可以在 Spark 作业之间共享数据和状态;
                      • 通过优化 Spark 的查询执行计划加快 SparkSQL 查询,这些主要是通过 IgniteSQL 引擎的高级索引以及避免了 Ignite 和 Spark 之间的网络数据移动实现的。

                      3.1.IgniteDataframes 示例

                      下面通过一些代码以及搭建几个小程序的方式,了解如何通过 Ignite DataFrames 整合 Ignite 与 Spark。

                      一共会写两个 Java 的小应用,然后在 IDE 中运行,还会在这些 Java 应用中执行一些 SQL 查询。

                      一个 Java 应用会从 JSON 文件中读取一些数据,然后创建一个存储于 Ignite 的 DataFrame,这个 JSON 文件 Ignite 的发行版中已经提供,另一个 Java 应用会从 Ignite 的 DataFrame 中读取数据然后使用 SQL 进行查询。

                      下面是?#20174;?#29992;的代码:

                      public class DFWriter {
                      
                          private static final String CONFIG = "config/example-ignite.xml";
                      
                          public static void main(String args[]) {
                      
                              Ignite ignite = Ignition.start(CONFIG);
                      
                              SparkSession spark = SparkSession.builder().appName("DFWriter").master("local").config("spark.executor.instances", "2").getOrCreate();
                      
                              Logger.getRootLogger().setLevel(Level.OFF);
                      
                              Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);
                      
                              Dataset<Row> peopleDF = spark.read().json(
                      
                                  resolveIgnitePath("resources/people.json").getAbsolutePath());
                      
                              System.out.println("JSON file contents:");
                      
                              peopleDF.show();
                      
                              System.out.println("Writing DataFrame to Ignite.");
                      
                              peopleDF.write().format(IgniteDataFrameSettings.FORMAT_IGNITE()).option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG).option(IgniteDataFrameSettings.OPTION_TABLE(), "people").option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "id").option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "template=replicated").save();
                      
                              System.out.println("Done!");
                      
                              Ignition.stop(false);
                      
                          }
                      
                      }

                      DFWriter中,首先创建了SparkSession,它包含了应用名,之后会使用spark.read().json()读取 JSON 文件并且输出文件内容,下一步是将数据写入 Ignite 存储。下面是DFReader的代码:

                      public class DFReader {
                      
                          private static final String CONFIG = "config/example-ignite.xml";
                      
                          public static void main(String args[]) {
                      
                              Ignite ignite = Ignition.start(CONFIG);
                      
                              SparkSession spark = SparkSession.builder().appName("DFReader").master("local").config("spark.executor.instances", "2").getOrCreate();
                      
                              Logger.getRootLogger().setLevel(Level.OFF);
                      
                              Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);
                      
                              System.out.println("Reading data from Ignite table.");
                      
                              Dataset<Row> peopleDF = spark.read().format(IgniteDataFrameSettings.FORMAT_IGNITE()).option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG).option(IgniteDataFrameSettings.OPTION_TABLE(), "people").load();
                      
                              peopleDF.createOrReplaceTempView("people");
                      
                              Dataset<Row> sqlDF = spark.sql("SELECT * FROM people WHERE id > 0 AND id < 6");
                      
                              sqlDF.show();
                      
                              System.out.println("Done!");
                      
                              Ignition.stop(false);
                      
                          }
                      
                      }

                      DFReader中,初始化和配置与DFWriter相同,这个应用会执行一些过滤,需求是查找所有的 id > 0 以及 < 6 的人,然后输出结果。

                      在 IDE 中,通过下面的代码可以启动一个 Ignite 节点:

                      public class ExampleNodeStartup {
                      
                          public static void main(String[] args) throws IgniteException {
                      
                              Ignition.start("config/example-ignite.xml");
                      
                          }
                      
                      }

                      到此,就可以?#28304;?#30721;进行测试了。

                      3.2.运行应用

                      首先在 IDE 中启动一个 Ignite 节点,然后运行DFWriter应用,输出如下:

                      如果将上面的结果与 JSON 文件的内容进行对比,会显示两者是一致的,这也是期望的结果。

                      下一步运行DFReader,输出如下:

                      这也是期望的输出。

                      4.总结

                      通过本文,会发现 Ignite 与 Spark 的集成很简单,也可以看到如何从多个环境中使用多个编程语言轻松地访问IgniteRDD。可以对IgniteRDD进行数据的?#21015;矗?#24182;且即使 Spark 已经关闭状态也能通过 Ignite 得以保持,也看到了通过 Ignite 进行 DataFrame 的?#21015;礎?#35835;者可以轻松尝试一下。

                      如果想要这些示例的源代码,可?#28304;?a href="https://github.com/VeryFatBoy/ignite-and-spark-integration" target="_blank" rel="nofollow">这里下载。

                      作者

                      李玉珏,架构师,有丰富的架构设计和技术研发团队管理经验,社区技术翻译作者以及撰稿人,开源技术贡献者。Apache Ignite 技术中文文档翻译作者,长期在国内进行 Ignite 技术的推广/技术支持/咨询工作。


                      酷毙

                      雷人

                      鲜花

                      鸡蛋

                      漂亮
                      • 快毕业了,没工作经验,
                        找份工作好难啊?
                        赶紧去人才芯片公司磨练吧!!

                      最新评论

                      关于LUPA|人才芯片工程|人才招聘|LUPA?#29616;?/a>|LUPA教育|LUPA开源社区 ( 浙B2-20090187  

                      返回顶部
                      双色球中奖图片