如何将 sklearn 管道转换为 pyspark 管道?

我们有一个机器学习分类器模型,使用 pandas 数据框和标准 sklearn 管道(StandardScaler、RandomForestClassifier、GridSearchCV 等)进行训练。我们正在开发 Databricks,并希望使用 Spark 提供的并行计算功能将该管道扩展到大型数据集。

将 sklearn 管道转换为并行计算的最快方法是什么?(我们可以根据需要轻松地在 pandas 和 Spark DF 之间切换。)

就上下文而言,我们的选择似乎是:

  1. 使用MLLib重写管道(耗时)

  2. 使用 sklearn-spark 桥接库

对于选项 2,Spark-Sklearn 似乎已被弃用,但 Databricks建议我们使用 joblibspark。然而,这在 Databricks 上引发了一个例外:

from sklearn import svm, datasets

from sklearn.model_selection import GridSearchCV

from joblibspark import register_spark

from sklearn.utils import parallel_backend

register_spark() # register spark backend


iris = datasets.load_iris()

parameters = {'kernel':('linear', 'rbf'), 'C':[1, 10]}

svr = svm.SVC(gamma='auto')


clf = GridSearchCV(svr, parameters, cv=5)

with parallel_backend('spark', n_jobs=3):

    clf.fit(iris.data, iris.target)

提高


py4j.security.Py4JSecurityException: Method public int org.apache.spark.SparkContext.maxNumConcurrentTasks() is not whitelisted on class class org.apache.spark.SparkContext


拉风的咖菲猫
浏览 106回答 2
2回答

MM们

必要的要求是:Python 3.6+pyspark>=2.4scikit-learn>=0.21joblib>=0.14我无法在运行 Python 3.7.5、Spark 3.0.0、scikit-learn 0.22.1 和 joblib 0.14.1 的社区 Databricks 集群中重现您的问题:import sysimport sklearnimport joblibspark.version# '3.0.0'sys.version# '3.7.5 (default, Nov  7 2019, 10:50:52) \n[GCC 8.3.0]'sklearn.__version__# '0.22.1'joblib.__version__# '0.14.1'通过上述设置,您的代码片段可以顺利运行,并确实生成一个分类器,clf如下所示:GridSearchCV(cv=5, error_score=nan,             estimator=SVC(C=1.0, break_ties=False, cache_size=200,                           class_weight=None, coef0=0.0,                           decision_function_shape='ovr', degree=3,                           gamma='auto', kernel='rbf', max_iter=-1,                           probability=False, random_state=None, shrinking=True,                           tol=0.001, verbose=False),             iid='deprecated', n_jobs=None,             param_grid={'C': [1, 10], 'kernel': ('linear', 'rbf')},             pre_dispatch='2*n_jobs', refit=True, return_train_score=False,             scoring=None, verbose=0)这里的替代示例也是如此:from sklearn.utils import parallel_backendfrom sklearn.model_selection import cross_val_scorefrom sklearn import datasetsfrom sklearn import svmfrom joblibspark import register_sparkregister_spark() # register spark backendiris = datasets.load_iris()clf = svm.SVC(kernel='linear', C=1)with parallel_backend('spark', n_jobs=3):  scores = cross_val_score(clf, iris.data, iris.target, cv=5)print(scores)给予[0.96666667 1.         0.96666667 0.96666667 1.        ]

12345678_0001

这个答案对于标准 Spark / Databricks 设置来说应该是正确的,因此考虑到我的问题的措辞/对其他读者的潜在有用性,我已经接受了它发现我们案例中的问题后,贡献一个单独的“答案”:Databricks 支持人员建议我们案例中的问题是由于我们使用特殊类型的集群(在 AWS 上启用了凭证直通的高并发性)。grid.fit() 没有被列入此类集群的白名单,Databricks 建议他们需要向工程团队提出将其列入白名单。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python