52ky 发表于 2022-5-6 16:06:22

PySpark:过滤掉非类型对象上的 RDD 元素失败

问题
我想过滤输出 RDD 的元素,其中字段“状态”是不等于“OK”。我从 HDFS 上的一组 CSV 文件创建一个 RDD,然后在尝试映射之前使用过滤器获取所需的结构:
import csv, StringIO   

files = "/hdfs_path/*.csv"

fields = ["time", "status"]

dial = "excel"

default = {'status': 'OK', 'time': '2014-01-0100:00:00'}

def loadRecord(line, fieldnames, dialect):
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames = fieldnames, dialect = dialect)
    try:
      line = reader.next()
      if line is None:
            return default
      else:
            return line
    except:
      return default

harmonics = sc.textFile(files) \
            .map(lambda x: loadRecord(x, fields, dial)) \
            .filter(lambda x: "OK" not in x['status'])
我可以在这个 RDD 上执行其他操作 - 例如另一个地图仅对某些字段等进行操作。但是,当我使用 filter 运行代码时,其中一个任务总是失败,并在 lambda 函数中出现异常:

筛选

我认为这意味着 lambda 收到了一个 'NoneType object is not iterable' ,所以我添加了代码来过滤以避免返回 None 。但是,我仍然遇到同样的错误。它确实适用于一个小样本数据集,但我的实际数据足够大,我不确定如何检测它的哪一部分可能导致问题。

欢迎任何意见!

回答
首先,使用 map(lambda x: loadRecord(x, fields, dial)) 重置 map(lambda x: (x, loadRecord(x, fields, dial))) 的空间 - 这会保留原始记录和解析后的记录.

其次,将 flatMap(test_function) 调用替换为 filter(),定义 test_function 如何测试输入,如果第二个传递的参数为 None(解析的记录),则返回第一个参数。

通过这种方式,您可以获得导致问题的输入行并在本地测试脚本。通常,我会将行 global default 添加为函数的第一行



页: [1]
查看完整版本: PySpark:过滤掉非类型对象上的 RDD 元素失败