我正在使用python 2.7(不要问我为什么,我是承包商,我只是使用他们给我的东西)。
我正在尝试实现一个pyspark函数,该函数利用spark-bigquery 连接器提交一个利用 Spark SQL 数据源 API 的简单查询。
我正在经历最奇怪的事情;我编写了这个函数,并在我实际运行它时确认它确实适用于服务器。我想确保如果用户提供了一个不存在的表名,则会根据处理服务器返回的表名引发异常,我确实做到了(我知道这不是 TDD,但就这样吧)。然后我开始为它编写测试,显然我必须生成一个模拟异常,我做了如下操作:
模块/query_bq
from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
def submit_bq_query(spark, table, filter_string):
try:
df = spark.read.format('bigquery').option('table', table).option('filter', filter_string).load()
return df
except Py4JJavaError as e:
java_error_msg = str(e).split('\n')[1]
if "java.lang.RuntimeException" in java_error_msg and ("{} not found".format(table)) in java_error_msg:
raise Exception("RuntimeException: Table {} not found!".format(table))
正如我所说,这就像一个魅力。现在,它的测试看起来像这样:
模块/test_query_bq
import pytest
from mock import patch, mock
from py4j.java_gateway import GatewayProperty, GatewayClient, JavaObject
from py4j.protocol import Py4JJavaError
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import StructType
def mock_p4j_java_error_generator(msg):
gateway_property = GatewayProperty(auto_field="Mock", pool="Mock")
client = GatewayClient(gateway_property=gateway_property)
java_object = JavaObject("RunTimeError", client)
exception = Py4JJavaError(msg, java_exception=java_object)
return Exception(exception)
def test_exception_is_thrown_if_table_not_present():
# Given
mock_table_name = 'spark_bq_test.false_table_name'
mock_filter = "word is 'V'"
mock_errmsg = "Table {} not found".format(mock_table_name)
运行测试成功,但是当我尝试调试它时,只是为了跟踪执行,我注意到捕获到异常之后的代码:
永远达不到。尽管如此,测试仍然成功。
简而言之,在测试中应该模拟和抛出异常。它也被捕获,但未处理。测试的断言通过并且测试成功,就好像它没有被处理一样,但我从来没有检查过模拟异常的内部。再一次,让我注意到module/query_bq在服务器上工作得很好;当表不存在时,返回dataframes并处理异常就好了!这里的重点是测试。
我需要对module/query_bq中的异常处理部分做额外的事情,但我不能,因为我不知道发生了什么。谁能解释一下?
宝慕林4294392
相关分类