設為首頁收藏本站

                      LUPA開源社區

                       找回密碼
                       注冊
                      文章 帖子 博客
                      LUPA開源社區 首頁 業界資訊 技術文摘 查看內容

                      Ignite與Spark都很強,那如果把它們整合起來會怎樣?

                      2019-1-17 09:59| 發布者: joejoe0332| 查看: 1958| 評論: 0|原作者: oschina|來自: oschina

                      摘要: 在前面的文章中,我們分別介紹了 Ignite 和 Spark 這兩種技術,從功能上對兩者進行了全面深入的對比。經過分析,可以得出這樣一個結論:兩者都很強大,但是差別很大,定位不同,因此會有不同的適用領域。 ...

                      前面的文章中,我們分別介紹了 Ignite 和 Spark 這兩種技術,從功能上對兩者進行了全面深入的對比。經過分析,可以得出這樣一個結論:兩者都很強大,但是差別很大,定位不同,因此會有不同的適用領域。

                      但是,這兩種技術也是可以互補的,那么它們互補適用于場景是什么呢?主要是這么幾個方面:如果覺得 Spark 中的 SQL 等運行速度較慢,那么 Ignite 通過自己的方式提供了對 Spark 應用進行進一步加速的解決方案,這方面可選的解決方案并不多,推薦開發者考慮,另外就是數據和狀態的共享,當然這方面的解決方案也有很多,并不是一定要用 Ignite 實現。

                      Ignite 原生提供了對 Spark 的支持,本文主要探討為何如何將 Ignite 和 Spark 進行集成。

                      1.將 Ignite 與 Spark 整合

                      整合這兩種技術會為 Spark 應用帶來若干明顯的好處:

                      • 通過避免大量的數據移動,獲得真正可擴展的內存級性能;
                      • 提高 RDD、DataFrame 和 SQL 的性能;
                      • 在 Spark 作業之間更方便地共享狀態和數據。

                      下圖顯示了如何整合這兩種技術,并且標注了顯著的優勢: 

                      通過該圖,可以從整體架構的角度看到 Ignite 在整個 Spark 應用中的位置和作用。

                      Ignite 對 Spark 的支持主要體現為兩個方面,一個是 Ignite RDD,一個是 Ignite DataFrame。本文會首先聚焦于 Ignite RDD,之后再講講 Ignite DataFrame。

                      2.Ignite RDD

                      Ignite 提供了一個SparkRDD的實現,叫做IgniteRDD,這個實現可以在內存中跨 Spark 作業共享任何數據和狀態,IgniteRDD為 Ignite 中相同的內存數據提供了一個共享的、可變的視圖,它可以跨多個不同的 Spark 作業、工作節點或者應用,相反,原生的 SparkRDD 無法在 Spark 作業或者應用之間進行共享。

                      IgniteRDD作為 Ignite 分布式緩存的視圖,既可以在 Spark 作業執行進程中部署,也可以在 Spark 工作節點中部署,也可以在它自己的集群中部署。因此,根據預配置的部署模型,狀態共享既可以只存在于一個 Spark 應用的生命周期內部(嵌入式模式),也可以存在于 Spark 應用的外部(獨立模式)。

                      Ignite 還可以幫助 Spark 應用提高 SQL 的性能,雖然 SparkSQL 支持豐富的 SQL 語法,但是它沒有實現索引。從結果上來說,即使在普通較小的數據集上,Spark 查詢也可能花費幾分鐘的時間,因為需要進行全表掃描。如果使用 Ignite,Spark 用戶可以配置主索引和二級索引,這樣可以帶來上千倍的性能提升。

                      2.1.IgniteRDD 示例

                      下面通過一些代碼以及創建若干應用的方式,展示 IgniteRDD 帶來的好處。

                      可以使用多種語言來訪問 Ignite RDD,這對于有跨語言需求的團隊來說有友好的,下邊代碼共包括兩個簡單的 Scala 應用和兩個 Java 應用。此外,會從兩個不同的環境運行應用:從終端運行 Scala 應用以及通過 IDE 運行 Java 應用。另外還會在 Java 應用中運行一些 SQL 查詢。

                      對于 Scala 應用,一個應用會用于往 IgniteRDD 中寫入數據,而另一個應用會執行部分過濾然后返回結果集。使用 Maven 將代碼構建為一個 jar 文件后在終端窗口中執行這個程序,下面是詳細的代碼:

                      object RDDWriter extends App {
                      
                          val conf = new SparkConf().setAppName("RDDWriter")
                      
                          val sc = new SparkContext(conf)
                      
                          val ic = new IgniteContext(sc, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml")
                      
                          val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("sharedRDD")
                      
                          sharedRDD.savePairs(sc.parallelize(1 to 1000, 10).map(i => (i, i)))
                      
                          ic.close(true)
                      
                          sc.stop()
                      
                      }
                      
                      object RDDReader extends App {
                      
                          val conf = new SparkConf().setAppName("RDDReader")
                      
                          val sc = new SparkContext(conf)
                      
                          val ic = new IgniteContext(sc, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml")
                      
                          val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("sharedRDD")
                      
                          val greaterThanFiveHundred = sharedRDD.filter(_._2 > 500)
                      
                          println("The count is " + greaterThanFiveHundred.count())
                      
                          ic.close(true)
                      
                          sc.stop()
                      
                      }

                      在這個 Scala 的RDDWriter中,首先創建了包含應用名的SparkConf,之后基于這個配置創建了SparkContext,最后,根據這個SparkContext創建一個IgniteContext。創建IgniteContext有很多種方法,本例中使用一個叫做example-shared-rdd.xml的 XML 文件,該文件會結合 Ignite 發行版然后根據需求進行預配置。顯然,需要根據自己的環境修改路徑(Ignite 主目錄),之后指定 IgniteRDD 持有的整數值元組,最后,將從 1 到 1000 的整數值存入 IgniteRDD,數值的存儲使用了 10 個 parallel 操作。

                      在這個 Scala 的RDDReader中,初始化和配置與 Scala RDDWriter相同,也會使用同一個 XML 配置文件,應用會執行部分過濾,然后關注存儲了多少大于 500 的值,答案最后會輸出。

                      關于IgniteContextIgniteRDD的更多信息,可以看 Ignite 的文檔

                      要構建 jar 文件,可以使用下面的 maven 命令:

                      mvn clean install

                      接下來,看下 Java 代碼,先寫一個 Java 應用往IgniteRDD中寫入多個記錄,然后另一個應用會執行部分過濾然后返回結果集,下面是RDDWriter的代碼細節:

                      public class RDDWriter {
                      
                          public static void main(String args[]) {
                      
                              SparkConf sparkConf = new SparkConf().setAppName("RDDWriter").setMaster("local").set("spark.executor.instances", "2");
                      
                              JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
                      
                              Logger.getRootLogger().setLevel(Level.OFF);
                      
                              Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);
                      
                              JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>(
                      
                                  sparkContext, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml", true);
                      
                              JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache("sharedRDD");
                      
                              List<Integer> data = new ArrayList<>(20);
                      
                              for (int i = 1001; i <= 1020; i++) {
                      
                                  data.add(i);
                      
                              }
                      
                              JavaRDD<Integer> javaRDD = sparkContext.<Integer>parallelize(data);
                      
                              sharedRDD.savePairs(javaRDD.<Integer, Integer>mapToPair(new PairFunction<Integer, Integer, Integer>() {
                      
                                  public Tuple2<Integer, Integer> call(Integer val) throws Exception {
                      
                                      return new Tuple2<Integer, Integer>(val, val);
                      
                                  }
                      
                              }));
                      
                              igniteContext.close(true);
                      
                              sparkContext.close();
                      
                          }
                      
                      }

                      在這個 Java 的RDDWriter中,首先創建了包含應用名和執行器數量的SparkConf,之后基于這個配置創建了SparkContext,最后,根據這個SparkContext創建一個IgniteContext。最后,往 IgniteRDD 中添加了額外的 20 個值。

                      在這個 Java 的RDDReader中,初始化和配置與 Java RDDWriter相同,也會使用同一個 XML 配置文件,應用會執行部分過濾,然后關注存儲了多少大于 500 的值,答案最后會輸出,下面是 Java RDDReader的代碼:

                      public class RDDReader {
                      
                          public static void main(String args[]) {
                      
                              SparkConf sparkConf = new SparkConf().setAppName("RDDReader").setMaster("local").set("spark.executor.instances", "2");
                      
                              JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
                      
                              Logger.getRootLogger().setLevel(Level.OFF);
                      
                              Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);
                      
                              JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>(
                      
                                  sparkContext, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml", true);
                      
                              JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache("sharedRDD");
                      
                              JavaPairRDD<Integer, Integer> greaterThanFiveHundred = sharedRDD.filter(new Function<Tuple2<Integer, Integer>, Boolean>() {
                      
                                  public Boolean call(Tuple2<Integer, Integer> tuple) throws Exception {
                      
                                      return tuple._2() > 500;
                      
                                  }
                      
                              });
                      
                              System.out.println("The count is " + greaterThanFiveHundred.count());
                      
                              System.out.println(">>> Executing SQL query over Ignite Shared RDD...");
                      
                              Dataset df = sharedRDD.sql("select _val from Integer where _val > 10 and _val < 100 limit 10");
                      
                              df.show();
                      
                              igniteContext.close(true);
                      
                              sparkContext.close();
                      
                          }
                      
                      }

                      到這里就可以對代碼進行測試了。

                      2.2.運行應用

                      在第一個終端窗口中,啟動 Spark 的主節點,如下:

                      $SPARK_HOME/sbin/start-master.sh

                      在第二個終端窗口中,啟動 Spark 工作節點,如下:

                      $SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker spark://ip:port

                      根據自己的環境,修改 IP 地址和端口號(ip:port)。

                      在第三個終端窗口中,啟動一個 Ignite 節點,如下:

                      $IGNITE_HOME/bin/ignite.sh examples/config/spark/example-shared-rdd.xml

                      這里使用了之前討論過的example-shared-rdd.xml文件。

                      在第四個終端窗口中,可以運行 Scala 版的 RDDWriter 應用,如下:

                      $SPARK_HOME/bin/spark-submit --class "com.gridgain.RDDWriter" --master spark://ip:port "/path_to_jar_file/ignite-spark-scala-1.0.jar"

                      根據自己的環境修改 IP 地址和端口(ip:port),以及 jar 文件的路徑(/path_to_jar_file)。

                      會產生如下的輸出:

                      The count is 500

                      這是期望的輸出。

                      接下來,殺掉 Spark 的主節點和工作節點,而 Ignite 節點仍然在運行中并且IgniteRDD對于其它應用仍然可用,下面會使用 IDE 通過 Java 應用接入IgniteRDD

                      運行 Java 版RDDWriter會擴展之前存儲于 IgniteRDD 中的記錄列表,通過運行 Java 版RDDReader可以進行測試,它會產生如下的輸出:

                      The count is 520

                      這也是期望的輸出。

                      最后,SQL 查詢會在IgniteRDD中執行一個 SELECT 語句,返回范圍在 10 到 100 之間的最初 10 個值,輸出如下:

                      結果正確。

                      3.IgniteDataframes

                      Spark 的 DataFrame API 為描述數據引入了模式的概念,Spark 通過表格的形式進行模式的管理和數據的組織。

                      DataFrame 是一個組織為命名列形式的分布式數據集,從概念上講,DataFrame 等同于關系數據庫中的表,并允許 Spark 使用 Catalyst 查詢優化器來生成高效的查詢執行計劃。而 RDD 只是跨集群節點分區化的元素集合。

                      Ignite 擴展了 DataFrames,簡化了開發,改進了將 Ignite 作為 Spark 的內存存儲時的數據訪問時間,好處包括:

                      • 通過 Ignite 讀寫 DataFrames 時,可以在 Spark 作業之間共享數據和狀態;
                      • 通過優化 Spark 的查詢執行計劃加快 SparkSQL 查詢,這些主要是通過 IgniteSQL 引擎的高級索引以及避免了 Ignite 和 Spark 之間的網絡數據移動實現的。

                      3.1.IgniteDataframes 示例

                      下面通過一些代碼以及搭建幾個小程序的方式,了解如何通過 Ignite DataFrames 整合 Ignite 與 Spark。

                      一共會寫兩個 Java 的小應用,然后在 IDE 中運行,還會在這些 Java 應用中執行一些 SQL 查詢。

                      一個 Java 應用會從 JSON 文件中讀取一些數據,然后創建一個存儲于 Ignite 的 DataFrame,這個 JSON 文件 Ignite 的發行版中已經提供,另一個 Java 應用會從 Ignite 的 DataFrame 中讀取數據然后使用 SQL 進行查詢。

                      下面是寫應用的代碼:

                      public class DFWriter {
                      
                          private static final String CONFIG = "config/example-ignite.xml";
                      
                          public static void main(String args[]) {
                      
                              Ignite ignite = Ignition.start(CONFIG);
                      
                              SparkSession spark = SparkSession.builder().appName("DFWriter").master("local").config("spark.executor.instances", "2").getOrCreate();
                      
                              Logger.getRootLogger().setLevel(Level.OFF);
                      
                              Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);
                      
                              Dataset<Row> peopleDF = spark.read().json(
                      
                                  resolveIgnitePath("resources/people.json").getAbsolutePath());
                      
                              System.out.println("JSON file contents:");
                      
                              peopleDF.show();
                      
                              System.out.println("Writing DataFrame to Ignite.");
                      
                              peopleDF.write().format(IgniteDataFrameSettings.FORMAT_IGNITE()).option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG).option(IgniteDataFrameSettings.OPTION_TABLE(), "people").option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "id").option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "template=replicated").save();
                      
                              System.out.println("Done!");
                      
                              Ignition.stop(false);
                      
                          }
                      
                      }

                      DFWriter中,首先創建了SparkSession,它包含了應用名,之后會使用spark.read().json()讀取 JSON 文件并且輸出文件內容,下一步是將數據寫入 Ignite 存儲。下面是DFReader的代碼:

                      public class DFReader {
                      
                          private static final String CONFIG = "config/example-ignite.xml";
                      
                          public static void main(String args[]) {
                      
                              Ignite ignite = Ignition.start(CONFIG);
                      
                              SparkSession spark = SparkSession.builder().appName("DFReader").master("local").config("spark.executor.instances", "2").getOrCreate();
                      
                              Logger.getRootLogger().setLevel(Level.OFF);
                      
                              Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);
                      
                              System.out.println("Reading data from Ignite table.");
                      
                              Dataset<Row> peopleDF = spark.read().format(IgniteDataFrameSettings.FORMAT_IGNITE()).option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG).option(IgniteDataFrameSettings.OPTION_TABLE(), "people").load();
                      
                              peopleDF.createOrReplaceTempView("people");
                      
                              Dataset<Row> sqlDF = spark.sql("SELECT * FROM people WHERE id > 0 AND id < 6");
                      
                              sqlDF.show();
                      
                              System.out.println("Done!");
                      
                              Ignition.stop(false);
                      
                          }
                      
                      }

                      DFReader中,初始化和配置與DFWriter相同,這個應用會執行一些過濾,需求是查找所有的 id > 0 以及 < 6 的人,然后輸出結果。

                      在 IDE 中,通過下面的代碼可以啟動一個 Ignite 節點:

                      public class ExampleNodeStartup {
                      
                          public static void main(String[] args) throws IgniteException {
                      
                              Ignition.start("config/example-ignite.xml");
                      
                          }
                      
                      }

                      到此,就可以對代碼進行測試了。

                      3.2.運行應用

                      首先在 IDE 中啟動一個 Ignite 節點,然后運行DFWriter應用,輸出如下:

                      如果將上面的結果與 JSON 文件的內容進行對比,會顯示兩者是一致的,這也是期望的結果。

                      下一步運行DFReader,輸出如下:

                      這也是期望的輸出。

                      4.總結

                      通過本文,會發現 Ignite 與 Spark 的集成很簡單,也可以看到如何從多個環境中使用多個編程語言輕松地訪問IgniteRDD。可以對IgniteRDD進行數據的讀寫,并且即使 Spark 已經關閉狀態也能通過 Ignite 得以保持,也看到了通過 Ignite 進行 DataFrame 的讀寫。讀者可以輕松嘗試一下。

                      如果想要這些示例的源代碼,可以從這里下載。

                      作者

                      李玉玨,架構師,有豐富的架構設計和技術研發團隊管理經驗,社區技術翻譯作者以及撰稿人,開源技術貢獻者。Apache Ignite 技術中文文檔翻譯作者,長期在國內進行 Ignite 技術的推廣/技術支持/咨詢工作。


                      酷斃

                      雷人

                      鮮花

                      雞蛋

                      漂亮
                      • 快畢業了,沒工作經驗,
                        找份工作好難啊?
                        趕緊去人才芯片公司磨練吧!!

                      最新評論

                      關于LUPA|人才芯片工程|人才招聘|LUPA認證|LUPA教育|LUPA開源社區 ( 浙B2-20090187  

                      返回頂部
                      双色球中奖图片

                                                              2019重庆时时停售 900彩票旧版 李逵劈鱼50元提现 超级体彩大乐透下载 重庆时时彩输死多少人 浙江十一选五走势图 山东时时 重庆时时彩冷热号分析 专家预测号 体彩七位数预测号