SparkはORCファイルの読み取りに時間がかかりすぎ(パーティションの計算に時間がかかりすぎます)、単一のORCファイルのストライプの数が多すぎます。解決



Spark Takes Too Long Read Orc File



1.背景:

1日あたり7000のアップストリームファイルの数を制御します。各ファイルサイズは2億5600万、50億以上、orc形式未満です。各ファイルのストライプの数、約500、クエリコマンドを確認します。

hdfs fsck viewfs://hadoop/nn01/warehouse/…….db/……/partition_date=2017-11-11/part-06999 -files -blocks

ストライプの数を表示するコマンド:



hive --orcfiledump viewfs://hadoop/nn01/warehouse/…….db/table/partition_date=2017-11-11/part-06999 | less

2.問題が発生します:

orc形式のファイルがSparkSQLを介して読み取られ、Sparkジョブがパーティションに送信されてからタスクが計算されてから、間隔が長すぎます。

次のログを頻繁に印刷します。



17/11/11 03:52:01 INFO BlockManagerMasterEndpoint: Registering block manager gh-data-hdp-dn0640.---:11942 with 6.1 GB RAM, BlockManagerId(554, ----, 11942) 17/11/11 03:52:29 INFO DFSClient: Firstly choose dn: DatanodeInfoWithStorage[10.20.--.--:50010,DS-32f8aaa5-c6ce-48a9-a2b1-3b169df193b9,DISK], -- 17/11/11 03:52:29 INFO DFSClient: Firstly choose dn:

問題の抽象化:次の単純なSQLを実行すると、ジョブが送信された後にApplicationMaster(ドライバー)が起動し、ジョブタスクが遅延し、パーティションを計算できません。 SparkUIはDAUマップを表示できず、ステージ関連情報を表示できません。

SELECT * from table where partition_date=2017-11-11 limit 1

3.問題分析
予備分析:ドライバーはDataNodeのデータを読み取ります。 GCログを分析すると、次のことがわかります。ドライバーがDataNodeのデータ(orcファイルのヘッド情報)を読み取ったことを確認し、ドライバーが完全なGCを生成するようにします。

ソースコード追跡分析:orcファイルを読み取るスパークの戦略に関連していることがわかりました。



HiveConf.javaを確認し、SparkがデフォルトでHYBRID戦略を使用してorcファイルを読み取ることを確認します。

HIVE_ORC_SPLIT_STRATEGY('hive.exec.orc.split.strategy', 'HYBRID', new StringSet(new String[]{'HYBRID', 'BI', 'ETL'}), 'This is not a user level config. BI strategy is used when the requirement is to spend less time in split generation as opposed to query execution (split generation does not read or cache file footers). ETL strategy is used when spending little more time in split generation is acceptable (split generation reads and caches file footers). HYBRID chooses between the above strategies based on heuristics.'),

OrcInputFormat.javaファイルを確認し、HYBRIDセグメンテーション戦略コードが次のとおりであることを確認します。

public SplitStrategy call() throws IOException { final SplitStrategy splitStrategy AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, context.conf, context.transactionList) List deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories()) Path base = dirInfo.getBaseDirectory() List original = dirInfo.getOriginalFiles() boolean[] covered = new boolean[context.numBuckets] boolean isOriginal = base == null // if we have a base to work from if (base != null || !original.isEmpty()) { // find the base files (original or new style) List children = original if (base != null) { children = SHIMS.listLocatedStatus(fs, base, AcidUtils.hiddenFileFilter) } long totalFileSize = 0 for (FileStatus child : children) { totalFileSize += child.getLen() AcidOutputFormat.Options opts = AcidUtils.parseBaseBucketFilename (child.getPath(), context.conf) int b = opts.getBucket() // If the bucket is in the valid range, mark it as covered. // I wish Hive actually enforced bucketing all of the time. if (b >= 0 && b context.maxSize) { splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal, deltas, covered) } else { splitStrategy = new BISplitStrategy(context, fs, dir, children, isOriginal, deltas, covered) } break } } else { // no base, only deltas splitStrategy = new ACIDSplitStrategy(dir, context.numBuckets, deltas, covered) } return splitStrategy } }

ハイブリッド戦略:Spark Driverが起動すると、nameNodeに移動してメタデータを読み取り、合計ファイルサイズとファイル数に基づいてファイルの平均サイズを計算します。平均値がデフォルトの256Mより大きい場合、ETL戦略がトリガーされます。 ETL戦略は、DataNodeに移動して、orcファイルのヘッドおよびその他の情報を読み取ります。ストライプの数またはメタデータ情報が多すぎる場合、ドライバーはFUllGCを生成します。このとき、ドライバーの起動からタスクの実行までの間隔として表示されます。現象。

4.解決策:解決策:

スパーク1.6.2:

val hiveContext = new HiveContext(sc) // The default is 64M, which means that a stripe will be generated when the data volume is accumulated to 64M before compression. The corresponding hive.exec.orc.default.row.index.stride=10000 can control how many rows generate a stripe. // Adjust this parameter to control the number of stripes in a single file. If you do not configure too many stripe for a single file, it will affect downstream use. If the ETL segmentation strategy is configured or the ETL segmentation strategy is triggered by heuristics, it will be read by the Driver DataNode metadata is too large, which leads to frequent GC, which makes the calculation of Partition too long and unacceptable. hiveContext.setConf('hive.exec.orc.default.stripe.size','268435456') // There are three strategies in total {'HYBRID', 'BI', 'ETL'}), the default is 'HYBRID','This is not a user level config. BI strategy is used when the requirement is to spend less time in split generation as opposed to query execution (split generation does not read or cache file footers). ETL strategy is used when spending little more time in split generation is acceptable (split generation reads and caches file footers). HYBRID chooses between the above strategies based on heuristics.'), // If not configured, when the orc file size is larger than the average 256M estimated by the spark framework, the ETL strategy will be triggered, causing the Driver to take a lot of time to read the DataNode data split. hiveContext.setConf('hive.exec.orc.split.strategy', 'BI')

spark2.2.0:

// Create a SparkSession that supports Hive val sparkSession = SparkSession .builder() .appName('PvMvToBase') // The default is 64M, which means that a stripe will be generated when the data volume is accumulated to 64M before compression. The corresponding hive.exec.orc.default.row.index.stride=10000 can control how many rows generate a stripe. // Adjust this parameter to control the number of stripes in a single file. If you do not configure too many stripe for a single file, it will affect downstream use. If the ETL segmentation strategy is configured or the ETL segmentation strategy is triggered by heuristics, the Driver will read DataNode metadata is too large, which in turn leads to frequent GC, making the calculation of Partition too long and unacceptable. .config('hive.exec.orc.default.stripe.size', 268435456L) // There are three strategies in total {'HYBRID', 'BI', 'ETL'}), the default is 'HYBRID','This is not a user level config. BI strategy is used when the requirement is to spend less time in split generation as opposed to query execution (split generation does not read or cache file footers). ETL strategy is used when spending little more time in split generation is acceptable (split generation reads and caches file footers). HYBRID chooses between the above strategies based on heuristics.'), // If not configured, when the orc file size is larger than the average 256M estimated by the spark framework, the ETL strategy will be triggered, causing the Driver to take a lot of time to read the DataNode data split. .config('hive.exec.orc.split.strategy', 'BI') .enableHiveSupport() .getOrCreate()