PySpark 模拟:异常测试成功但未处理异常

我正在使用python 2.7(不要问我为什么,我是承包商,我只是使用他们给我的东西)。


我正在尝试实现一个pyspark函数,该函数利用spark-bigquery 连接器提交一个利用 Spark SQL 数据源 API 的简单查询。


我正在经历最奇怪的事情;我编写了这个函数,并在我实际运行它时确认它确实适用于服务器。我想确保如果用户提供了一个不存在的表名,则会根据处理服务器返回的表名引发异常,我确实做到了(我知道这不是 TDD,但就这样吧)。然后我开始为它编写测试,显然我必须生成一个模拟异常,我做了如下操作:


模块/query_bq


from py4j.protocol import Py4JJavaError

from pyspark.sql import SparkSession


def submit_bq_query(spark, table, filter_string):

    try:

        df = spark.read.format('bigquery').option('table', table).option('filter', filter_string).load()

        return df

    except Py4JJavaError as e:

        java_error_msg = str(e).split('\n')[1]

        if "java.lang.RuntimeException" in java_error_msg and ("{} not found".format(table)) in java_error_msg:

            raise Exception("RuntimeException: Table {} not found!".format(table))

正如我所说,这就像一个魅力。现在,它的测试看起来像这样:


模块/test_query_bq


import pytest

from mock import patch, mock

from py4j.java_gateway import GatewayProperty, GatewayClient, JavaObject

from py4j.protocol import Py4JJavaError

from pyspark.sql.dataframe import DataFrame

from pyspark.sql.types import StructType



def mock_p4j_java_error_generator(msg):

    gateway_property = GatewayProperty(auto_field="Mock", pool="Mock")

    client = GatewayClient(gateway_property=gateway_property)

    java_object = JavaObject("RunTimeError", client)

    exception = Py4JJavaError(msg, java_exception=java_object)

    return Exception(exception)



def test_exception_is_thrown_if_table_not_present():


    # Given

    mock_table_name = 'spark_bq_test.false_table_name'

    mock_filter = "word is 'V'"

    mock_errmsg = "Table {} not found".format(mock_table_name)


运行测试成功,但是当我尝试调试它时,只是为了跟踪执行,我注意到捕获到异常之后的代码:


永远达不到。尽管如此,测试仍然成功。


简而言之,在测试中应该模拟和抛出异常。它也被捕获,但未处理。测试的断言通过并且测试成功,就好像它没有被处理一样,但我从来没有检查过模拟异常的内部。再一次,让我注意到module/query_bq在服务器上工作得很好;当表不存在时,返回dataframes并处理异常就好了!这里的重点是测试。


我需要对module/query_bq中的异常处理部分做额外的事情,但我不能,因为我不知道发生了什么。谁能解释一下?


梵蒂冈之花
浏览 119回答 1
1回答

宝慕林4294392

经过3天的挣扎,我把它整理出来了。主要问题是:我没有正确地模拟 spark.read 进程签名,并且;我没有正确实例化 Py4JJavaError 的模拟实例。以下是我如何做到这两点:.../utils/bigquery_util.pyimport loggingfrom py4j.protocol import Py4JJavaError, Py4JNetworkErrordef load_bq_table(spark, table, filter_string):&nbsp; &nbsp; tries = 3&nbsp; &nbsp; for i in range(tries):&nbsp; &nbsp; &nbsp; &nbsp; try:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; logging.info("SQL statement being executed...")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; df = get_df(spark, table, filter_string)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; logging.info("Table-ID: {}, Rows:{} x Cols:{}".format(table, df.count(), len(df.columns)))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; logging.debug("Table-ID: {}, Schema: {}".format(table, df.schema))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return df&nbsp; &nbsp; &nbsp; &nbsp; except Py4JJavaError as e:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; java_exception_str = get_exception_msg(e)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; is_runtime_exception = "java.lang.RuntimeException" in java_exception_str&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; table_not_found = ("{} not found".format(table)) in java_exception_str&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if is_runtime_exception and table_not_found:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; logging.error(java_exception_str)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; raise RuntimeError("Table {} not found!".format(table))&nbsp; &nbsp; &nbsp; &nbsp; except Py4JNetworkError as ne:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if i is tries-1:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; java_exception_str = ne.cause&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; runtime_error_str = "Error while trying to reach server... {}"&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; logging.error(java_exception_str)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; raise EnvironmentError(runtime_error_str.format(java_exception_str))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; continuedef get_exception_msg(e):&nbsp; &nbsp; return str(e.java_exception)def get_df(spark, table, filter_string):&nbsp; &nbsp; return (spark.read&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .format('bigquery')&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .option('table', table)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .option('filter', filter_string)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .load())至于测试:.../test/utils/test_bigquery_util.py&nbsp; &nbsp; import pytest&nbsp; &nbsp; from mock import patch, mock&nbsp; &nbsp; from <...>.utils.bigquery_util import load_bq_table&nbsp; &nbsp; from <...>.test.utils.mock_py4jerror import *&nbsp; &nbsp; def test_runtime_error_exception_is_thrown_if_table_not_present():&nbsp; &nbsp; &nbsp; &nbsp; # Given&nbsp; &nbsp; &nbsp; &nbsp; mock_table_name = 'spark_bq_test.false_table_name'&nbsp; &nbsp; &nbsp; &nbsp; mock_filter = "word is 'V'"&nbsp; &nbsp; &nbsp; &nbsp; # Mocking&nbsp; &nbsp; &nbsp; &nbsp; py4j_error_exception = get_mock_py4j_error_exception(get_mock_gateway_client(), mock_target_id="o123")&nbsp; &nbsp; &nbsp; &nbsp; mock_errmsg = "java.lang.RuntimeException: Table {} not found".format(mock_table_name)&nbsp; &nbsp; &nbsp; &nbsp; # When&nbsp; &nbsp; &nbsp; &nbsp; with mock.patch('red_agent.common.utils.bigquery_util.get_exception_msg', return_value=mock_errmsg):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; with mock.patch('red_agent.common.utils.bigquery_util.get_df', side_effect=py4j_error_exception):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; with pytest.raises(RuntimeError):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; mock_spark = mock.Mock()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; df = load_bq_table(mock_spark, mock_table_name, mock_filter)..最后用于模拟 Py4JJavaError: .../test/utils/mock_py4jerror.pyimport mockfrom py4j.protocol import Py4JJavaError, Py4JNetworkErrordef get_mock_gateway_client():&nbsp; &nbsp; mock_client = mock.Mock()&nbsp; &nbsp; mock_client.send_command.return_value = "0"&nbsp; &nbsp; mock_client.converters = []&nbsp; &nbsp; mock_client.is_connected.return_value = True&nbsp; &nbsp; mock_client.deque = mock.Mock()&nbsp; &nbsp; return mock_clientdef get_mock_java_object(mock_client, mock_target_id):&nbsp; &nbsp; mock_java_object = mock.Mock()&nbsp; &nbsp; mock_java_object._target_id = mock_target_id&nbsp; &nbsp; mock_java_object._gateway_client = mock_client&nbsp; &nbsp; return mock_java_objectdef get_mock_py4j_error_exception(mock_client, mock_target_id):&nbsp; &nbsp; mock_java_object = get_mock_java_object(mock_client, mock_target_id)&nbsp; &nbsp; mock_errmsg = "An error occurred while calling {}.load.".format(mock_target_id)&nbsp; &nbsp; return Py4JJavaError(mock_errmsg, java_exception=mock_java_object)def get_mock_py4j_network_exception(mock_target_id):&nbsp; &nbsp; mock_errmsg = "An error occurred while calling {}.load.".format(mock_target_id)&nbsp; &nbsp; return Py4JNetworkError(mock_errmsg)希望这会帮助某人...
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python