To append dataframes extracted from CSVs to a database organised as a snowflake schema:

1. Extract the existing data from the snowflake schema.
2. Extract the new data from the external data source.
3. Combine the two datasets.
4. Transform the combination into a set of dimension and fact dataframes that match the snowflake schema.
5. Load the transformed dataframes into the database, overwriting the existing data.

For example, for a dataframe extracted from the external source with the following schema:

```python
StructType([
    StructField('customer_name', StringType()),
    StructField('campaign_name', StringType()),
])
```

the pipeline could look like this (`extract_from_external_source`, `extract_from_snowflake` and `load_snowflake` are left to whatever source and database you are using):

```python
from typing import Tuple

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import monotonically_increasing_id


def entrypoint(spark: SparkSession) -> None:
    extracted_customer_campaigns = extract_from_external_source(spark)

    existing_customers_dim, existing_campaigns_dim, existing_facts = (
        extract_from_snowflake(spark))

    combined_customer_campaigns = combine(existing_campaigns_dim,
                                          existing_customers_dim,
                                          existing_facts,
                                          extracted_customer_campaigns)

    new_campaigns_dim, new_customers_dim, new_facts = transform_to_snowflake(
        combined_customer_campaigns)

    load_snowflake(new_campaigns_dim, new_customers_dim, new_facts)


def combine(campaigns_dimension: DataFrame,
            customers_dimension: DataFrame,
            facts: DataFrame,
            extracted_customer_campaigns: DataFrame) -> DataFrame:
    # Denormalise the existing snowflake data back to
    # (customer_name, campaign_name) pairs so they can be unioned with the
    # newly extracted records.
    existing_customer_campaigns = facts.join(
        customers_dimension, on=['customer_id']).join(
            campaigns_dimension, on=['campaign_id']).select(
                'customer_name', 'campaign_name')

    combined_customer_campaigns = extracted_customer_campaigns.union(
        existing_customer_campaigns).distinct()

    return combined_customer_campaigns


def transform_to_snowflake(customer_campaigns: DataFrame
                           ) -> Tuple[DataFrame, DataFrame, DataFrame]:
    # Regenerate each dimension with fresh surrogate keys.
    customers_dim = customer_campaigns.select(
        'customer_name').distinct().withColumn(
            'customer_id', monotonically_increasing_id())

    campaigns_dim = customer_campaigns.select(
        'campaign_name').distinct().withColumn(
            'campaign_id', monotonically_increasing_id())

    # Resolve each (customer_name, campaign_name) pair to its surrogate keys.
    facts = (
        customer_campaigns.join(customers_dim, on=['customer_name']).join(
            campaigns_dim, on=['campaign_name']).select(
                'customer_id', 'campaign_id'))

    return campaigns_dim, customers_dim, facts
```

This is a simplistic, functional approach. It could perhaps be optimised by writing deltas instead of regenerating the snowflake keys for every ETL batch. Also, if a separate external CSV containing records to delete were provided, it could be extracted in the same way and then subtracted from the combined dataframe before the transform, so that those existing records are removed (a sketch of this follows below). Finally, the question only concerns appending to the tables; if a merge/upsert were needed, additional steps would have to be added manually, since Spark itself does not support it.
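As an illustration of that deletion step, here is a rough sketch built on the code above. `extract_deletions_from_external_source` is a hypothetical helper (not part of the original answer) that is assumed to read the deletion CSV into a dataframe with the same `(customer_name, campaign_name)` schema:

```python
def entrypoint_with_deletions(spark: SparkSession) -> None:
    extracted_customer_campaigns = extract_from_external_source(spark)
    # Hypothetical helper: returns the records to delete, with the same
    # (customer_name, campaign_name) schema as the extracted dataframe.
    records_to_delete = extract_deletions_from_external_source(spark)

    existing_customers_dim, existing_campaigns_dim, existing_facts = (
        extract_from_snowflake(spark))

    combined_customer_campaigns = combine(existing_campaigns_dim,
                                          existing_customers_dim,
                                          existing_facts,
                                          extracted_customer_campaigns)

    # Drop the records listed in the deletion CSV before the snowflake keys
    # are regenerated; subtract behaves like SQL EXCEPT DISTINCT.
    combined_customer_campaigns = combined_customer_campaigns.subtract(
        records_to_delete)

    new_campaigns_dim, new_customers_dim, new_facts = transform_to_snowflake(
        combined_customer_campaigns)

    load_snowflake(new_campaigns_dim, new_customers_dim, new_facts)
```

Because `subtract` only removes rows that match exactly, the deletion file only needs the natural-key columns already present in the combined dataframe.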
You could do something along the lines of the code below. I am assuming that your CSV has a structure similar to the one defined on df4, but that you probably do not have the customer_id, the product_id, or the IDs of their groups. If that is the case, you can calculate them with the row_number window function (to get sequential numbers), or with the monotonically_increasing_id function, as shown to create df5. This solution is mostly based on PySpark and SQL, so if you are familiar with a traditional DW it will be easier to follow.

```python
from pyspark.sql.functions import monotonically_increasing_id

# Creates the input data. Only a few rows, to show how it should work.
# The schema is defined on a single dataframe as:
#   customer_id       --> business key coming from the transactional system
#   customer_name     --> just an attribute, to show how it should behave
#   customer_group_id --> an id that would match the group_id on the snowflake
#                         schema, as the idea is to group customers into groups
#                         (just as a sample)
#   product_id        --> another future dimension on the model having a
#                         snowflake schema
#   product_group_id  --> group id for products, to group them into categories
df1 = spark.sql("""select 1 customer_id, 'test1' customer_name,
                          1 customer_group_id, 'group 1' customer_group_name,
                          1 product_id, 'product 1' product_name,
                          1 product_group_id, 'product group 1' product_group_name,
                          987.5 sales""")
df2 = spark.sql("""select 2 customer_id, 'test2' customer_name,
                          1 customer_group_id, 'group 1' customer_group_name,
                          7 product_id, 'product 7' product_name,
                          1 product_group_id, 'product group 1' product_group_name,
                          12345.5 sales""")
df3 = spark.sql("""select 2 customer_id, 'test2' customer_name,
                          1 customer_group_id, 'group 1' customer_group_name,
                          1 product_id, 'product 1' product_name,
                          1 product_group_id, 'product group 1' product_group_name,
                          2387.3 sales""")
df4 = df1.union(df2).union(df3)

# Added an id on the df to be able to calculate the rest of the surrogate keys
# for the dimensions
df5 = df4.withColumn("id", monotonically_increasing_id())

# Registered the dataframe to be able to query it using SQL
df5.createOrReplaceTempView("df")

# Now create the different dfs, following the structure of the DW schema
customer_group_df = spark.sql("""select customer_group_id, customer_group_name
                                 from df
                                 group by customer_group_id, customer_group_name""")

# I use row_number because the monotonically_increasing_id function returns
# non-sequential integers; if you are fine with that, it will be much faster.
# Another solution could be to use a uuid as the key (or another unique
# identifier provider), but that will depend on your requirements.
customer_df = spark.sql("""select row_number() over (order by customer_id, customer_name, customer_group_id) as surkey_customer,
                                  customer_id customer_bk,
                                  customer_name,
                                  customer_group_id
                           from df
                           group by customer_id, customer_name, customer_group_id""")

product_group_df = spark.sql("""select product_group_id, product_group_name
                                from df
                                group by product_group_id, product_group_name""")

product_df = spark.sql("""select row_number() over (order by product_id) as surkey_product,
                                 product_id product_bk,
                                 product_name,
                                 product_group_id
                          from df
                          group by product_id, product_name, product_group_id""")

customer_df.show()
product_df.show()
df5.show()

# You can save those dfs directly into your model in the RDBMS. As the target
# DB is not defined in the question, that code is not included here, but it
# would be done by calling the save method of the dataframe, pointing it to
# Hive or to a JDBC source where your DW model lives. More info at
# https://stackoverflow.com/questions/30664008/how-to-save-dataframe-directly-to-hive
# or, if the target is an RDBMS,
# https://stackoverflow.com/questions/46552161/write-dataframe-to-mysql-table-using-pyspark

# Now the tricky part is to calculate the surrogate keys of the fact table.
# The way to do it is to join those dfs back to the original dataframe. That
# can have performance issues, so please make sure that your data is properly
# distributed (find the best approach to redistribute your dataframes on the
# nodes, so that you reduce shuffling on the joins) when you run it.
customer_df.createOrReplaceTempView("customer_df")
product_df.createOrReplaceTempView("product_df")

fact_df = spark.sql("""select nvl(c.surkey_customer, -1) sk_customer,
                              nvl(p.surkey_product, -1) sk_product,
                              sales
                       from df d
                       left outer join customer_df c on d.customer_id = c.customer_bk
                       left outer join product_df p on d.product_id = p.product_bk""")
fact_df.show()

# You can write fact_df to your target fact table.
# Be aware that to populate the surrogate keys I am using nvl to assign the
# unknown member of the dimension. If you need that, the unknown member also
# has to be present on the dimension tables (customer and product, not the
# group tables).
```

As you can see, this solution uses a simple snowflake schema. However, if you have Slowly Changing Dimensions Type 2 or other kinds of dimensional modelling, the model can get more complex.

The output of that code is:

```
+---------------+-----------+-------------+-----------------+
|surkey_customer|customer_bk|customer_name|customer_group_id|
+---------------+-----------+-------------+-----------------+
|              1|          1|        test1|                1|
|              2|          2|        test2|                1|
+---------------+-----------+-------------+-----------------+

+--------------+----------+------------+----------------+
|surkey_product|product_bk|product_name|product_group_id|
+--------------+----------+------------+----------------+
|             1|         1|   product 1|               1|
|             2|         7|   product 7|               1|
+--------------+----------+------------+----------------+

+-----------+-------------+-----------------+-------------------+----------+------------+----------------+------------------+-------+-----------+
|customer_id|customer_name|customer_group_id|customer_group_name|product_id|product_name|product_group_id|product_group_name|  sales|         id|
+-----------+-------------+-----------------+-------------------+----------+------------+----------------+------------------+-------+-----------+
|          1|        test1|                1|            group 1|         1|   product 1|               1|   product group 1|  987.5|          0|
|          2|        test2|                1|            group 1|         7|   product 7|               1|   product group 1|12345.5| 8589934592|
|          2|        test2|                1|            group 1|         1|   product 1|               1|   product group 1| 2387.3|17179869184|
+-----------+-------------+-----------------+-------------------+----------+------------+----------------+------------------+-------+-----------+

+-----------+----------+-------+
|sk_customer|sk_product|  sales|
+-----------+----------+-------+
|          1|         1|  987.5|
|          2|         2|12345.5|
|          2|         1| 2387.3|
+-----------+----------+-------+
```

Hope this helps.
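For completeness, here is a minimal sketch of the write step that the answer leaves out. It assumes a JDBC-reachable target database; the URL, driver, credentials and table names below are placeholders, not values taken from the question:

```python
# Persist the dimensions and the fact table over JDBC.
# The connection details and table names are assumptions for illustration only.
jdbc_url = "jdbc:postgresql://dwhost:5432/dw"  # hypothetical target database
jdbc_props = {
    "user": "dw_user",                         # hypothetical credentials
    "password": "secret",
    "driver": "org.postgresql.Driver",
}

# Write the dimensions first, then the fact table, so that the surrogate keys
# referenced by the facts already exist in the target model.
customer_group_df.write.jdbc(jdbc_url, "customer_group_dim", mode="append", properties=jdbc_props)
customer_df.write.jdbc(jdbc_url, "customer_dim", mode="append", properties=jdbc_props)
product_group_df.write.jdbc(jdbc_url, "product_group_dim", mode="append", properties=jdbc_props)
product_df.write.jdbc(jdbc_url, "product_dim", mode="append", properties=jdbc_props)
fact_df.write.jdbc(jdbc_url, "sales_fact", mode="append", properties=jdbc_props)
```

If the target is Hive rather than an RDBMS, `df.write.saveAsTable(...)` would replace the `write.jdbc` calls, as described in the questions linked in the comments above.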