Cloudinary如何利用Apache Iceberg和AWS分析重塑其PB级流数据湖
重点内容
在这篇文章中,我们将探讨Cloudinary如何通过Apache Iceberg和AWS分析技术,成功转型其PB级流数据湖。关键要点如下:
Cloudinary通过利用Apache Iceberg简化数据管理,提升性能,降低成本,避免了供应商锁定。通过高效的数据管道处理来自AWS的流数据,Cloudinary能够支持复杂的分析案例。Cloudinary通过优化查询引擎选择,显著提高了查询性能和降低了成本。这篇文章由Cloudinary的Amit Gilad、Alex Dickman和Itay Takersman共同撰写。
全球各大企业和组织希望利用数据的力量,通过将数据置于每个决策过程的中心来做出更好的决策。基于数据的决策能够更有效地应对突发事件,推动创新,同时为客户提供更好的体验。然而,历史上,数据服务提供商控制着客户的数据。尽管在架构上存在存储和计算分离的潜力,但它们通常实际上是紧密结合的。这种结合使得供应商能够凭借对数据的掌控,扮演多种工作负载的主导角色,限制了可用工具和能力的范围。
数据技术的格局正在迅速发展,主要是由开源社区和Apache基金会的项目推动。这种演变的开源生态系统使客户能够完全控制数据存储、处理引擎和权限,显著扩展可用选项。该方法还鼓励供应商基于其提供的业务价值而非存储和计算的潜在融合进行竞争。这营造了一个以客户为中心的竞争环境,促使供应商通过独特的功能和服务满足客户的特定需求和偏好。
Cloudinary基础设施概览
Cloudinary的基础设施每日处理超过200亿个请求,每个请求都会生成事件日志。这些日志通过多条数据管道进行处理,每月存储PB级的数据。这些经过处理的数据会先存储在Amazon S3上,随后再装载到Snowflake Data Cloud。这些数据集为Cloudinary内部团队和数据科学小组提供重要资源,以支持详细的分析和高级用例。
直到最近,这些数据主要由自动化流程准备并聚合成结果表,仅供少数几支内部团队使用。Cloudinary面临着挑战,难以满足其他团队对在线、实时、低粒度动态使用的需求。让PB级数据可用于临时报告成为了一大困难,因为查询时间增加以及计算资源需求飙升。Cloudinary针对本次讨论的特定分析数据设定的保留期限为30天。然而,新的用例驱动了对更长保留期的需求,这将导致成本显著增加。
数据从Cloudinary日志提供商流入文件,并写入Amazon S3,同时通过推送到Amazon Simple Queue ServiceAmazon SQS的事件进行通知。这些SQS事件被运行在Amazon EMR Spark上的Spark应用程序摄取和解析。处理后的日志以Apache Parquet格式写回Amazon S3,然后通过Snowpipe自动加载到Snowflake表中。
Cloudinary为何选择Apache Iceberg
Apache Iceberg是一种为庞大分析工作负载设计的高性能表格格式。它将SQL表的可靠性和简便性引入大数据,使得Apache Spark、Trino、Apache Flink、Presto、Apache Hive和Impala等处理引擎能够安全地同时操作相同的表。
基于Apache Iceberg的解决方案提供完整的数据管理,具备简单的内建表优化能力,能够避免不同解决方案间的数据移动。在探索Apache Iceberg的各种控制和配置选项时,Cloudinary需要对数据进行适配,以使用AWS Glue Data Catalog,同时还需要将大量数据转移到存储在Amazon S3上的Apache Iceberg中。在这个阶段,他们意识到成本将显著降低,并且虽然这是在规划阶段的关键因素,但现在可以获得具体的数据。举个例子,Cloudinary能够以此前存储一个月数据的价格,存储六个月的数据。这一成本节省通过使用Amazon S3存储层以及改进的压缩Zstandard来实现,并且Parquet文件已按序排列。
由于Apache Iceberg得到AWS数据服务的良好支持,且Cloudinary已经在使用Amazon EMR上的Spark,因此他们能够整合写入数据目录,并启动额外的Spark集群以处理数据维护和整理。在继续研究Apache Iceberg的过程中,他们发现了一些有趣的性能指标。例如,在某些查询中,Athena的运行时间是Snowflake的2至4倍更快。
Apache Iceberg的集成
Apache Iceberg的集成在数据加载到Snowflake之前完成。数据使用Apache Parquet数据格式写入Iceberg表,同时使用AWS Glue作为数据目录。此外,运行在Amazon EMR上的Spark应用程序在后台处理Parquet文件的整理,以便通过Athena、EMR上运行的Trino和Snowflake等各种工具进行优化查询。
面临的挑战
Cloudinary在构建其PB级数据湖时面临诸多挑战,包括:
确定最佳表分区方案优化数据摄取解决小文件问题以改善查询性能成本效益地维护Apache Iceberg表选择合适的查询引擎在这一部分,我们将详细描述每一个挑战及实施的解决方案。为了测试性能和数据扫描量,主要使用Athena,因为它提供了易用、完全无服务器且具有成本效益的接口,无需设置基础设施。
确定最佳表分区方案
Apache Iceberg通过实现隐藏分区,使用户在分区时变得更加简单。用户不需要在查询时提供单独的分区过滤器,Iceberg表可以配置将常规列映射到分区键。用户无需维护分区列,甚至无需理解物理表布局,就能获得快速而精确的查询结果。
Iceberg提供了多种分区选项。例如,当对时间戳进行分区时,可以按年、月、日和小时分区。Iceberg跟踪列值与其分区之间的关系,而无需额外的列。Iceberg还支持根据身份、哈希桶或截断进行分类列值的分区。此外,Iceberg的分区操作友好,允许分区布局随时间演变,而不破坏预先编写的查询。
确定正确的分区方案对于处理大数据集至关重要,因为这影响查询性能和扫描的数据量。由于此次迁移是从Snowflake原生存储的现有表迁移到Iceberg,因此测试并提供具有相同或更好性能的解决方案至关重要。
这些测试得益于Apache Iceberg的以下特性:
隐藏分区分区转换分区演变这些特性允许修改表分区,并测试最佳策略而无需重写数据。
以下是一些测试过的分区策略:
PARTITIONED BY (days(day) customerid)PARTITIONED BY (days(day) hour(timestamp))PARTITIONED BY (days(day) bucket(N customerid))PARTITIONED BY (days(day))这些分区策略的结果在写入和查询时有显著不同。经过仔细分析结果,Cloudinary决定按天对数据进行分区并结合排序,这使他们能够在分区内对数据进行排序。
优化数据摄取
Cloudinary从其供应商处接收到各种格式和大小的数十亿事件,存储在Amazon S3中,这意味着每天处理和存储的数TB数据。
由于数据到达不一致,无法预测数据的到达速率和文件大小,因此必须找到一种方法,在保持高吞吐量的同时降低成本。

Cloudinary使用EventBridge将每个接收到的文件推送到Amazon SQS,然后使用在Amazon EMR上运行的Spark进行批处理,从而实现高吞吐量的数据摄取,并根据队列大小扩大集群规模,同时控制成本。
以下是使用Spark从Amazon SQS提取100条消息文件的示例代码:
scalavar client = AmazonSQSClientBuilderstandard()withRegion(useast1)build()var getMessageBatch Iterable[Message] = DistributedSQSReceiverclientreceiveMessage(new ReceiveMessageRequest()withQueueUrl(queueUrl)withMaxNumberOfMessages(10))getMessagesasScalasparkSessionsparkContextparallelize(10) map( =gt getMessageBatch) collect()flatMap(toList)toList
当处理特定分区前缀的高数据摄取速率时,Amazon S3可能会抑制请求并返回503状态码服务不可用。为了解决这个场景,Cloudinary使用了一种Iceberg表属性,即writeobjectstorageenabled,该属性将哈希前缀纳入存储的Amazon S3对象路径。这种方法被认为是有效的,有效地缓解了Amazon S3的抑制问题。
解决小文件问题,改善查询性能
在现代数据架构中,诸如Amazon EMR的流处理引擎通常用于将连续的数据流摄入数据湖,并采用Apache Iceberg。流向Iceberg表的摄取在两个方面可能面临挑战:
它产生许多小文件,导致查询计划时间延长,从而影响读取性能。数据聚类效果差,这会使文件修剪效果降低。通常,流处理过程中由于缺乏足够的新数据来生成优化的文件大小例如512MB,导致此类问题。由于分区是产生文件数量的关键因素,而Cloudinary的数据具有时间特性且大多数查询使用时间过滤器,因此决定从多个方面优化数据湖。
首先,Cloudinary设置了所有必要的配置,帮助在向表中追加数据时减少文件数量,设置writetargetfilesizebytes,定义默认目标文件大小。通过在Spark中设置sparksqlshufflepartitions,可以通过控制回拨操作中用于分区的数量来减少输出文件的数量,这影响数据如何在任务间分配,从而最小化在转换或聚合后生成的输出文件数量。
由于上述方法仅解决了小文件问题,但并没有完全消除Cloudinary使用Apache Iceberg的另一个能力,该能力可以使用Spark并行压缩数据文件,通过rewriteDataFiles操作。这一操作将小文件合并为更大的文件,以减少元数据开销,并尽量减少Amazon S3GetObjectAPI操作的使用。
在进行压缩时,Cloudinary需要选择应用三种提供的策略中的哪一种;每种策略都有其自身的优缺点:
Binpack 简单地将小文件重写为目标大小Sort 依据不同列对数据进行排序Zorder 一种将相关数据放置在同一组文件中的技术起初,Cloudinary评估了Binpack压缩策略。这个策略的速度最快,可以将小文件组合在一起以达到定义的目标文件大小,并且运行后观察到查询性能显著改善。
如前所述,数据是按天分区的,大多数查询集中在特定时间范围内。由于数据来自外部供应商,且有时会迟到,因此在运行压缩日期上的查询时,发现扫描了大量数据,因为特定时间范围可能存在于多个文件中。查询引擎Athena、Snowflake和运行在Amazon EMR上的Trino需要扫描整个分区以仅获取相关行。
为了进一步提高查询性能,Cloudinary决定将压缩过程更改为使用排序,因此现在数据按天分区并按requestedat操作发生的时间戳和客户ID排序。
这一策略在压缩时成本更高,因为需要对数据进行洗牌以进行排序。然而,在采用这种排序策略后,有两个观察结果:之前运行的相同查询现在扫描的数据减少了约50,且查询运行时间提高了30至50。
成本效益地维护Apache Iceberg表
维护Apache Iceberg表对于优化性能、降低存储成本以及确保数据完整性至关重要。Iceberg提供多种维护操作以保持表的良好状态。通过实施这些操作,Cloudinary能够以低成本有效地管理其Iceberg表。
过期快照
每次写入Iceberg表都会创建一个新的快照或版本,可用于时间旅行查询,或将表回滚到任何有效快照。
定期过期快照有助于删除不再需要的数据文件,并保持表元数据的大小较小。Cloudinary决定保留快照不超过7天,以便更好地排查故障和处理可能来自外部源且未能及时识别的损坏数据。
scalaSparkActionsget()expireSnapshots(iceTable)expireOlderThan(TimeUnitDAYStoMillis(7))execute()
删除旧元数据文件
白鲸加速器电脑版Iceberg使用JSON文件记录表元数据。表的每次变化都会生成一个新元数据文件以确保原子性。
默认情况下,旧元数据文件会被保留以便历史记录。经常提交的表,例如由流处理作业写入的表,可能需要定期清理元数据文件。
配置以下属性可以确保只保留最新的十个元数据文件,任何更老的文件都会被删除。
scalawritemetadatadeleteaftercommitenabled=truewritemetadatapreviousversionsmax=10
删除孤立文件
在Spark和其他分布式处理引擎中,当任务或作业失败时,可能会留下不在表元数据中的文件。此外,在某些情况下,标准的快照过期处理可能无法识别不再必要的文件并将其删除。
Apache Iceberg提供了deleteOrphanFiles操作,能够处理未引用的文件。此操作在数据和元数据目录中有大量文件时可能需要很长时间才能完成。如果元数据或数据文件未通过任何有效快照访问,则被视为孤立文件。通过使用Amazon S3的ListObjects操作生成实际文件的集合,这将使这个操作变得昂贵。建议定期运行此操作以避免增加存储使用量,但过于频繁的运行也可能抵消其成本效益。
以下图表展示了这一过程如何去除112 TB的存储。
重写清单文件
Apache Iceberg在其清单列表和清单文件中使用元数据以加速查询规划和修剪不必要的数据文件。元数据树中的清单会按照添加顺序自动压缩,从而使查询在写入模式与读取过滤器对齐时更快。
如果表的写入模式与查询读取过滤器模式不一致,则可以重写元数据以将数据文件重新组合到清单中,使用rewriteManifests。
尽管Cloudinary已有一个优化数据文件的压缩过程,但发现在某些情况下,清单文件也需要优化。结果显示,Cloudinary发现清单文件超过300个,且这些小文件的大小常小于8MB,且由于数据迟到,清单文件指向不同分区中的数据。这导致每个查询的查询规划时间长达12秒。
Cloudinary启动了一个单独的计划任务来执行rewriteManifests,运行后清单文件的数量减少到大约170个,从而实现了清单与查询过滤器基于分区间的更高一致性,查询规划时间提升至约4秒。
选择合适的查询引擎
作为Cloudinary探索的组成部分,旨在测试不同的查询引擎,他们最初列出了几个关键性能指标KPI来指导其选择,包括对Apache Iceberg的支持以及与现有数据源如MySQL和Snowflake的集成、便于一次性查询的Web界面和成本优化。根据这些标准,他们选择评估包括在Amazon EMR上运行的Trino、Athena和支持Apache Iceberg的Snowflake等多种解决方案。这种方法使得他们能够根据定义的KPI评估每种解决方案,为Cloudinary的需求提供全面了解。
Cloudinary打算评估的两个更量化的KPI是成本和性能。Cloudinary在早期过程中意识到,不同的查询和使用类型可能会从不同的运行时引擎中受益。他们决定专注于四个运行时引擎。
引擎详情Snowflake原生基于存储在Snowflake内的数据仓库支持Apache Iceberg的Snowflake基于存储在S3中的Apache Iceberg表的数据仓库Athena按需模式Amazon EMR Trino在八节点(m6g12xl)集群上的开源Trino段测试包括四种不同类型的查询,代表Cloudinary正在运行的不同生产工作负载。它们
发表评论