使用 PySpark 从 S3 消耗数据

你可以使用两种方法来使用 AWS S3 存储桶中的数据。

  1. 使用 sc.textFile(或 sc.wholeTextFiles)API:此 api 也可用于 HDFS 和本地文件系统。
aws_config = {}  # set your aws credential here
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", aws_config['aws.secret.access.key'])
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", aws_config['aws.secret.access.key'])
s3_keys = ['s3n/{bucket}/{key1}', 's3n/{bucket}/{key2}']
data_rdd = sc.wholeTextFiles(s3_keys)
  1. 使用自定义 API 读取它(说一个 boto 下载器):
def download_data_from_custom_api(key):
    # implement this function as per your understanding (if you're new, use [boto][1] api)
    # don't worry about multi-threading as each worker will have single thread executing your job
    return ''

s3_keys = ['s3n/{bucket}/{key1}', 's3n/{bucket}/{key2}']
# numSlices is the number of partitions. You'll have to set it according to your cluster configuration and performance requirement
key_rdd = sc.parallelize(s3_keys, numSlices=16) 

data_rdd = key_rdd.map(lambda key: (key, download_data_from_custom_api(key))

我建议使用方法 2,因为在使用方法 1 时,驱动程序会下载所有数据,而工作人员只需处理它。这有以下缺点:

  1. 随着数据大小的增加,内存不足。
  2. 你的工作人员将闲置,直到数据下载完毕