猿问

如何根据 rdd 中包含 pyspark 元组数组的第一个元素进行过滤?

我在从 rdd 中过滤元组列表时遇到问题。


示例business.json


{"business_id":"gnKjwL_1w79qoiV3IC_xQQ","state":"NC","postal_code":"28210","latitude":35.092564,"longitude":-80.859132,"stars":4.0},

{"business_id":"xvX2CttrVhyG2z1dFg_0xw","state":"AZ","postal_code":"85338","latitude":33.4556129678,"longitude":-112.3955963552,"stars":5.0}

from pyspark import SparkContext

sc = SparkContext.getOrCreate()


stars = "4.0"

input_business_lines = sc.textFile('data/business.json') \

    .map(lambda lines: json.loads(lines))


business_ids = input_business_lines \

    .map(lambda kv: (kv['business_id'], kv['stars'], kv['state'])) \

    .filter(lambda kv: kv[1] >= float(stars)).map(lambda kv: (kv[0], kv[2])).collect()


上面的代码返回每个元组具有的元组列表(first element = business_id , second element = state)


[('gnKjwL_1w79qoiV3IC_xQQ', 'NC'),('xvX2CttrVhyG2z1dFg_0xw', 'AZ'),...,('HhyxOkGAM07SRYtlQ4wMFQ', 'NC')]

到目前为止一切都很好。现在我需要与评论表进行联接,并希望使用评论的 rdd 过滤所有匹配的业务 id。如果那是一个数据框那就容易多了。但对于元组,我不确定我们该怎么做。


示例 review.json


{"review_id": "c-6aA9Bd7JxpmMroRoas9A", "user_id": "bK4Y_GZUoAUTXIrmeEUGYw", "business_id": "gnKjwL_1w79qoiV3IC_xQQ", "stars": 4.0, "text": "Went there Saturday noon they open at 12pm but people were waiting outside before 12pm so you can tell it should be a good place. Nice Katsu & Eel with rice. Many Japanese go there.", "date": "2014-07-13 20:28:18"},



幕布斯6054654
浏览 101回答 2
2回答

慕标琳琳

import jsonfrom pyspark import SparkContextif __name__ == '__main__':    input_review_json_path = "publicdata/review.json"    input_business_json_path = "publicdata/business.json"    output_csv_path = "outputs/user_state.csv"    stars = "4.0"    sc = SparkContext.getOrCreate()    input_business_lines = sc.textFile(input_business_json_path) \                             .map(lambda lines: json.loads(lines))    business_ids = input_business_lines \                        .map(lambda kv: (kv['business_id'], kv['stars'], kv['state'])) \                        .filter(lambda kv: kv[1] >= float(stars)).map(lambda kv: (kv[0], kv[2]))    input_review_lines = sc.textFile(input_review_json_path) \                            .map(lambda lines: json.loads(lines))    rew_ids_bus_ids = input_review_lines.map(lambda kv: (kv['business_id'], kv['user_id']))    finalRdd = business_ids.join(rew_ids_bus_ids).map(lambda kv: (kv[0], kv[1][0]))    review_rdd = finalRdd.collect()

慕侠2389804

你可以加入那些rdd。import jsonstars = 4.0input_business_lines = sc.textFile('test.json') \    .map(lambda lines: json.loads(lines))business_ids = input_business_lines \    .filter(lambda kv: kv['stars'] >= stars) \    .map(lambda kv: (kv['business_id'], kv['state']))print(business_ids.collect())input_review_lines = sc.textFile('test2.json') \    .map(lambda lines: json.loads(lines))rew_ids_bus_ids = input_review_lines \    .map(lambda kv: (kv['business_id'], kv['user_id']))joined = business_ids \    .join(rew_ids_bus_ids)print(joined.collect())# [('gnKjwL_1w79qoiV3IC_xQQ', 'NC'), ('xvX2CttrVhyG2z1dFg_0xw', 'AZ')]# [('gnKjwL_1w79qoiV3IC_xQQ', ('NC', 'bK4Y_GZUoAUTXIrmeEUGYw'))]
随时随地看视频慕课网APP

相关分类

Python
我要回答