如何将 PySpark 数据框插入到具有雪花模式的数据库中?

使用 PySpark 我正在计算一个数据框,如果这个数据库有一个雪花模式,我如何将这个数据框附加到我的数据库中?

如何指定拆分数据框的方式,以便将类似 CSV 的数据放入多个联合表中?

我的问题不是特定于 Pyspark 的,同样的问题也可以问到 pandas。



沧海一幻觉
浏览 122回答 2
2回答

有只小跳蛙

将从 CSV 中提取的数据帧附加到由雪花模式组成的数据库:从雪花模式中提取数据。从外部数据源中提取新数据。合并两个数据集。将组合转换为一组维度表和事实表以匹配雪花模式。将转换后的数据帧加载到数据库中,覆盖现有数据。例如,对于具有以下模式的数据框,从外部源中提取:StructType([StructField('customer_name', StringType()),            StructField('campaign_name', StringType())])def entrypoint(spark: SparkSession) -> None:  extracted_customer_campaigns = extract_from_external_source(spark)  existing_customers_dim, existing_campaigns_dim, existing_facts = (    extract_from_snowflake(spark))  combined_customer_campaigns = combine(existing_campaigns_dim,                                        existing_customers_dim,                                        existing_facts,                                        extracted_customer_campaigns)  new_campaigns_dim, new_customers_dim, new_facts = transform_to_snowflake(    combined_customer_campaigns)  load_snowflake(new_campaigns_dim, new_customers_dim, new_facts)def combine(campaigns_dimension: DataFrame,            customers_dimension: DataFrame,            facts: DataFrame,            extracted_customer_campaigns: DataFrame) -> DataFrame:  existing_customer_campaigns = facts.join(    customers_dimension,    on=['customer_id']).join(    campaigns_dimension, on=['campaign_id']).select('customer_name',                                                    'campaign_name')  combined_customer_campaigns = extracted_customer_campaigns.union(    existing_customer_campaigns).distinct()  return combined_customer_campaignsdef transform_to_snowflake(customer_campaigns: DataFrame) -> (    DataFrame, DataFrame):  customers_dim = customer_campaigns.select(    'customer_name').distinct().withColumn(    'customer_id', monotonically_increasing_id())  campaigns_dim = customer_campaigns.select(    'campaign_name').distinct().withColumn(    'campaign_id', monotonically_increasing_id())  facts = (    customer_campaigns.join(customers_dim,                            on=['customer_name']).join(      campaigns_dim, on=[        'campaign_name']).select('customer_id', 'campaign_id'))  return campaigns_dim, customers_dim, facts这是一种简单的功能方法。也许可以通过编写增量来优化,而不是为每个 ETL 批次重新生成雪花键。此外,如果提供了一个单独的外部 CSV 包含要删除的记录,则可以类似地提取它,然后在转换之前从组合数据框中减去,以删除那些现有记录。最后,问题仅涉及附加到表格。如果需要合并/更新插入,则需要手动添加其他步骤,因为Spark 本身不支持它。

跃然一笑

你可以像我在下面的代码中描述的那样做一些事情。我假设您的 csv 具有与 df4 上定义的类似结构。但我认为您可能没有 customer_id、product_id 及其组的 ID。如果是这种情况,您可以使用该 row_number 窗口函数(具有序列号)来计算它们,或者使用如图所示的 monotonically_increasing_id 函数来创建 df5这个解决方案主要是基于PySpark和SQL,所以如果你对传统的DW比较熟悉,就会更好理解。from pyspark.sql.functions import monotonically_increasing_id#Creates input data. Only to rows to show how it should work#The schema is defined on the single dataframe as # customer_id --> business key coming from transactional system# customer_name --> just an attribute to show how it should behave# customer_group_id --> an id that would match the group_id on the snowflake schema, as the idea is to group customers on groups (just as a sample)# product_id --> another future dimension on the model having a snowflake schema# product_group_id --> group id for products to group them on categoriesdf1 = spark.sql("""select 1 customer_id, 'test1' customer_name, 1 customer_group_id, 'group 1' customer_group_name,         1 product_id, 'product 1' product_name, 1 product_group_id, 'product group 1' product_group_name,        987.5 sales        """)df2 = spark.sql("""select 2 customer_id, 'test2' customer_name, 1 customer_group_id, 'group 1' customer_group_name,         7 product_id, 'product 7' product_name, 1 product_group_id, 'product group 1' product_group_name,        12345.5 sales        """)df3 = spark.sql("""select 2 customer_id, 'test2' customer_name, 1 customer_group_id, 'group 1' customer_group_name,         1 product_id, 'product 1' product_name, 1 product_group_id, 'product group 1' product_group_name,        2387.3 sales        """)df4 = df1.union(df2).union(df3)# Added an id on the df to be able to calculate the rest of the surrogate keys for dimensionsdf5 = df4.withColumn("id",  monotonically_increasing_id())# Registered dataframe to be able to query using SQLdf5.createOrReplaceTempView("df")# Now create different dfs as the structure of the DW schema iscustomer_group_df = spark.sql("""select customer_group_id, customer_group_name            from df group by customer_group_id, customer_group_name""")# I use the row_number because the monotonically increasing id function# returns non sequential integers, but if you are good with that, it will be much faster# Also another solution could be to use uuid as key (or other unique identifier providers)# but that will depend on your requirementscustomer_df = spark.sql("""select row_number() over (order by customer_id, customer_name, customer_group_id) as surkey_customer, customer_id customer_bk,             customer_name, customer_group_id            from df group by customer_id, customer_name, customer_group_id """)product_group_df =  spark.sql("""select product_group_id, product_group_name            from df group by product_group_id, product_group_name""")product_df =  spark.sql("""select row_number() over (order by product_id) as surkey_product, product_id product_bk,             product_name, product_group_id            from df group by product_id, product_name, product_group_id""")customer_df.show()product_df.show()df5.show()# You can save those dfs directly on your model in the RBMS. Sorry as you are not defining the target DB I am not writing the code, # but should be done calling the save method of the dataframe pointing to Hive or to a JDBC where your DW model is# You can find more info at https://stackoverflow.com/questions/30664008/how-to-save-dataframe-directly-to-hive or if # the target is a RDBMS https://stackoverflow.com/questions/46552161/write-dataframe-to-mysql-table-using-pyspark# Now the tricky part is to calculate the surrogate keys of the fact table. The way to do it is to join back those df# to the original dataframe. That can have performance issues, so please make sure that your data is # properly distributed (find the best approach to redistribute your dataframes on the nodes so that you reduce shuffling on the joins) # when you run customer_df.createOrReplaceTempView("customer_df")product_df.createOrReplaceTempView("product_df")fact_df = spark.sql("""    select nvl(c.surkey_customer, -1) sk_customer, nvl(p.surkey_product, -1) sk_product, sales    from        df d left outer join customer_df c on d.customer_id = c.customer_bk               left outer join product_df p on d.product_id = p.product_bk""").show()# You can write the fact_df to your target fact table# Be aware that to populate surrogate keys I am using nvl to assign the unknown member on the dimension. If you need# that it also has to be present on the dimension table (customer and product, not group tables)如您所见,此解决方案使用简单的雪花模式。但是,如果您有 Slowly Changing Dimensions Type 2 或其他类型的维度建模,模型可能会更复杂该代码的输出是+---------------+-----------+-------------+-----------------+|surkey_customer|customer_bk|customer_name|customer_group_id|+---------------+-----------+-------------+-----------------+|              1|          1|        test1|                1||              2|          2|        test2|                1|+---------------+-----------+-------------+-----------------++--------------+----------+------------+----------------+|surkey_product|product_bk|product_name|product_group_id|+--------------+----------+------------+----------------+|             1|         1|   product 1|               1||             2|         7|   product 7|               1|+--------------+----------+------------+----------------++-----------+-------------+-----------------+-------------------+----------+------------+----------------+------------------+-------+-----------+|customer_id|customer_name|customer_group_id|customer_group_name|product_id|product_name|product_group_id|product_group_name|  sales|         id|+-----------+-------------+-----------------+-------------------+----------+------------+----------------+------------------+-------+-----------+|          1|        test1|                1|            group 1|         1|   product 1|               1|   product group 1|  987.5|          0||          2|        test2|                1|            group 1|         7|   product 7|               1|   product group 1|12345.5| 8589934592||          2|        test2|                1|            group 1|         1|   product 1|               1|   product group 1| 2387.3|17179869184|+-----------+-------------+-----------------+-------------------+----------+------------+----------------+------------------+-------+-----------++-----------+----------+-------+|sk_customer|sk_product|  sales|+-----------+----------+-------+|          1|         1|  987.5||          2|         2|12345.5||          2|         1| 2387.3|+-----------+----------+-------+希望这可以帮助
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python