猿问

使 Boto3 上传调用阻塞(单线程)

编辑:我最初的假设被证明部分错误。我在这里添加了一个冗长的答案,我邀请其他人对其进行压力测试和纠正。


我正在寻找一种以单线程方式利用 Boto3 S3 API 来模拟线程安全键值存储的方法。简而言之,我想使用调用线程而不是新线程来进行上传。


.upload_fileobj()据我所知, Boto3(或)中方法的默认行为.upload_file()是将任务启动到新线程并None立即返回。


从文档:


这是一种托管传输,如有必要,它将在多个线程中执行分段上传。


(如果我对此的理解首先是错误的,那么对此进行更正也会有所帮助。这是在 Boto3 1.9.134 中。)


>>> import io

>>> import boto3

>>> bucket = boto3.resource('s3').Bucket('my-bucket-name')

>>> buf = io.BytesIO(b"test")

>>> res = bucket.upload_fileobj(buf, 'testobj')

>>> res is None

True

现在,假设这buf不是一个短的 4 字节字符串,而是一个巨大的文本 blob,它将花费不可忽略的时间来完全上传。


我还使用此函数来检查具有给定键的对象是否存在:


def key_exists_in_bucket(bucket_obj, key: str) -> bool:

    try:

        bucket_obj.Object(key).load()

    except botocore.exceptions.ClientError:

        return False

    else:

        return True

如果对象按名称存在,我的意图是不重写该对象。


这里的竞争条件相当明显:异步启动上传,然后快速检查key_exists_in_bucket(),False如果对象仍在写入,则返回,然后不必要地再次写入它。


有没有办法确保bucket.upload_fileobj()由当前线程而不是在该方法范围内创建的新线程调用?


我意识到这会减慢速度。在这种情况下,我愿意牺牲速度。


慕桂英546537
浏览 299回答 3
3回答

沧海一幻觉

我认为,由于这个问题的答案和另一个类似问题的答案似乎直接冲突,所以最好直接使用pdb.概括boto3 默认情况下使用多个线程 (10)但是,它不是异步的,因为它在返回之前等待(加入)这些线程,而不是使用“即发即弃”技术因此,以这种方式,如果您尝试与来自多个客户端的 s3 存储桶通信,则读/写线程安全性就位。细节我在这里努力解决的一个方面是多个(子线程)并不意味着顶级方法本身是非阻塞的:如果调用线程开始上传到多个子线程,然后等待这些线程完成并返回,我敢说这仍然是一个阻塞电话。反过来asyncio说,如果方法调用是一个“即发即弃”的调用。使用threading,这实际上归结为是否x.join()曾经被调用过。这是取自 Victor Val 的初始代码,用于启动调试器:import ioimport pdbimport boto3# From dd if=/dev/zero of=100mb.txt&nbsp; bs=50M&nbsp; count=1buf = io.BytesIO(open('100mb.txt', 'rb').read())bucket = boto3.resource('s3').Bucket('test-threads')pdb.run("bucket.upload_fileobj(buf, '100mb')")此堆栈帧来自 Boto 1.9.134。现在跳入pdb:.upload_fileobj() 首先调用一个嵌套方法——还没有太多可看的。(Pdb) s--Call--> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(542)bucket_upload_fileobj()-> def bucket_upload_fileobj(self, Fileobj, Key, ExtraArgs=None,(Pdb) s(Pdb) l574&nbsp; &nbsp; &nbsp;575&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;:type Config: boto3.s3.transfer.TransferConfig576&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;:param Config: The transfer configuration to be used when performing the577&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;upload.578&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;"""579&nbsp; ->&nbsp; &nbsp; &nbsp;return self.meta.client.upload_fileobj(580&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Fileobj=Fileobj, Bucket=self.name, Key=Key, ExtraArgs=ExtraArgs,581&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Callback=Callback, Config=Config)582&nbsp; &nbsp; &nbsp;583&nbsp; &nbsp; &nbsp;584&nbsp;&nbsp;所以顶级方法确实返回了一些东西,但目前还不清楚那个东西最终会变成什么None。所以我们进入了那个。现在,.upload_fileobj()确实有一个config参数,默认情况下是 None :(Pdb) l 531526&nbsp; &nbsp; &nbsp;527&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;subscribers = None528&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;if Callback is not None:529&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;subscribers = [ProgressCallbackInvoker(Callback)]530&nbsp; &nbsp; &nbsp;531&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;config = Config532&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;if config is None:533&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;config = TransferConfig()534&nbsp; &nbsp; &nbsp;535&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;with create_transfer_manager(self, config) as manager:536&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;future = manager.upload(这意味着config成为默认值TransferConfig():use_threads-- 如果为 True,则执行 S3 传输时将使用线程。如果为 False,则不会使用线程来执行传输:所有逻辑都将在主线程中运行。max_concurrency-- 请求执行传输的最大线程数。如果 use_threads 设置为 False,则忽略提供的值,因为传输只会使用主线程。哇啦,他们在这里:(Pdb) unt 534> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(535)upload_fileobj()-> with create_transfer_manager(self, config) as manager:(Pdb) config<boto3.s3.transfer.TransferConfig object at 0x7f1790dc0cc0>(Pdb) config.use_threadsTrue(Pdb) config.max_concurrency10现在我们在调用堆栈中下降一个级别以使用TransferManager(上下文管理器)。此时,max_concurrency已被用作类似名称的参数max_request_concurrency:# https://github.com/boto/s3transfer/blob/2aead638c8385d8ae0b1756b2de17e8fad45fffa/s3transfer/manager.py#L223&nbsp; &nbsp; # The executor responsible for making S3 API transfer requests&nbsp; &nbsp; self._request_executor = BoundedExecutor(&nbsp; &nbsp; &nbsp; &nbsp; max_size=self._config.max_request_queue_size,&nbsp; &nbsp; &nbsp; &nbsp; max_num_threads=self._config.max_request_concurrency,&nbsp; &nbsp; &nbsp; &nbsp; tag_semaphores={&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; IN_MEMORY_UPLOAD_TAG: TaskSemaphore(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self._config.max_in_memory_upload_chunks),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self._config.max_in_memory_download_chunks)&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp; executor_cls=executor_cls&nbsp; &nbsp; )至少在这个 boto3 版本中,该类来自单独的库s3transfer。(Pdb) n> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(536)upload_fileobj()-> future = manager.upload((Pdb) manager<s3transfer.manager.TransferManager object at 0x7f178db437f0>(Pdb) manager._config<boto3.s3.transfer.TransferConfig object at 0x7f1790dc0cc0>(Pdb) manager._config.use_threadsTrue(Pdb) manager._config.max_concurrency10接下来,让我们进入manager.upload(). 这是该方法的全文:(Pdb) l 290, 303290&nbsp; ->&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;if extra_args is None:291&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;extra_args = {}292&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;if subscribers is None:293&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;subscribers = []294&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)295&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;call_args = CallArgs(296&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;fileobj=fileobj, bucket=bucket, key=key, extra_args=extra_args,297&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;subscribers=subscribers298&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;)299&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;extra_main_kwargs = {}300&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;if self._bandwidth_limiter:301&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter302&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;return self._submit_transfer(303&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;call_args, UploadSubmissionTask, extra_main_kwargs)(Pdb) unt 301> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(302)upload()-> return self._submit_transfer((Pdb) extra_main_kwargs{}(Pdb) UploadSubmissionTask<class 's3transfer.upload.UploadSubmissionTask'>(Pdb) call_args<s3transfer.utils.CallArgs object at 0x7f178db5a5f8>(Pdb) l 300, 5300&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;if self._bandwidth_limiter:301&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter302&nbsp; ->&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;return self._submit_transfer(303&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;call_args, UploadSubmissionTask, extra_main_kwargs)304&nbsp; &nbsp; &nbsp;305&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;def download(self, bucket, key, fileobj, extra_args=None,啊,太可爱了——所以我们至少需要再往下一层才能看到实际的底层上传。(Pdb) s> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(303)upload()-> call_args, UploadSubmissionTask, extra_main_kwargs)(Pdb) s--Call--> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(438)_submit_transfer()-> def _submit_transfer(self, call_args, submission_task_cls,(Pdb) s> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(440)_submit_transfer()-> if not extra_main_kwargs:(Pdb) l 440, 10440&nbsp; ->&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;if not extra_main_kwargs:441&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;extra_main_kwargs = {}442&nbsp; &nbsp; &nbsp;443&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;# Create a TransferFuture to return back to the user444&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;transfer_future, components = self._get_future_with_components(445&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;call_args)446&nbsp; &nbsp; &nbsp;447&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;# Add any provided done callbacks to the created transfer future448&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;# to be invoked on the transfer future being complete.449&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;for callback in get_callbacks(transfer_future, 'done'):450&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;components['coordinator'].add_done_callback(callback)好的,所以现在我们有一个TransferFuture, 定义在没有明确的证据表明线程已经被启动了,但是当涉及到期货s3transfer/futures.py 时,它肯定听起来像这样。(Pdb) l444&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;transfer_future, components = self._get_future_with_components(445&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;call_args)446&nbsp; &nbsp; &nbsp;447&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;# Add any provided done callbacks to the created transfer future448&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;# to be invoked on the transfer future being complete.449&nbsp; ->&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;for callback in get_callbacks(transfer_future, 'done'):450&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;components['coordinator'].add_done_callback(callback)451&nbsp; &nbsp; &nbsp;452&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;# Get the main kwargs needed to instantiate the submission task453&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;main_kwargs = self._get_submission_task_main_kwargs(454&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;transfer_future, extra_main_kwargs)(Pdb) transfer_future<s3transfer.futures.TransferFuture object at 0x7f178db5a780>下面的最后一行来自TransferCoordinator课堂,乍一看似乎很重要:class TransferCoordinator(object):&nbsp; &nbsp; """A helper class for managing TransferFuture"""&nbsp; &nbsp; def __init__(self, transfer_id=None):&nbsp; &nbsp; &nbsp; &nbsp; self.transfer_id = transfer_id&nbsp; &nbsp; &nbsp; &nbsp; self._status = 'not-started'&nbsp; &nbsp; &nbsp; &nbsp; self._result = None&nbsp; &nbsp; &nbsp; &nbsp; self._exception = None&nbsp; &nbsp; &nbsp; &nbsp; self._associated_futures = set()&nbsp; &nbsp; &nbsp; &nbsp; self._failure_cleanups = []&nbsp; &nbsp; &nbsp; &nbsp; self._done_callbacks = []&nbsp; &nbsp; &nbsp; &nbsp; self._done_event = threading.Event()&nbsp; # < ------ !!!!!!您通常会看到threading.Event 一个线程用于发出事件状态的信号,而其他线程可以等待该事件发生。TransferCoordinator是由 .使用的TransferFuture.result()。好的,从上面循环回来,我们现在在s3transfer.futures.BoundedExecutor它的max_num_threads属性:class BoundedExecutor(object):&nbsp; &nbsp; EXECUTOR_CLS = futures.ThreadPoolExecutor&nbsp; &nbsp; # ...&nbsp; &nbsp; def __init__(self, max_size, max_num_threads, tag_semaphores=None,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;executor_cls=None):&nbsp; &nbsp; self._max_num_threads = max_num_threads&nbsp; &nbsp; if executor_cls is None:&nbsp; &nbsp; &nbsp; &nbsp; executor_cls = self.EXECUTOR_CLS&nbsp; &nbsp; self._executor = executor_cls(max_workers=self._max_num_threads)这基本上相当于:from concurrent import futures_executor = futures.ThreadPoolExecutor(max_workers=10)但是仍然存在一个问题:这是一种“即发即弃”,还是调用实际上是在等待线程完成并返回?似乎是后者。 .result()来电self._done_event.wait(MAXINT)。# https://github.com/boto/s3transfer/blob/2aead638c8385d8ae0b1756b2de17e8fad45fffa/s3transfer/futures.py#L249def result(self):&nbsp; &nbsp; self._done_event.wait(MAXINT)&nbsp; &nbsp; # Once done waiting, raise an exception if present or return the&nbsp; &nbsp; # final result.&nbsp; &nbsp; if self._exception:&nbsp; &nbsp; &nbsp; &nbsp; raise self._exception&nbsp; &nbsp; return self._result最后,重新运行 Victor Val 的测试,这似乎证实了上述内容:>>> import boto3>>> import time>>> import io>>>&nbsp;>>> buf = io.BytesIO(open('100mb.txt', 'rb').read())>>>&nbsp;>>> bucket = boto3.resource('s3').Bucket('test-threads')>>> start = time.time()>>> print("starting to upload...")starting to upload...>>> bucket.upload_fileobj(buf, '100mb')>>> print("finished uploading")finished uploading>>> end = time.time()>>> print("time: {}".format(end-start))time: 2.6030001640319824(此示例在网络优化实例上运行时,此执行时间可能更短。但 2.5 秒仍然是一个明显的大块时间,并且根本不表示线程被启动并且没有等待。)最后,这是一个Callbackfor的示例.upload_fileobj()。它遵循文档中的示例。首先,一个小帮手可以有效地获取缓冲区的大小:def get_bufsize(buf, chunk=1024) -> int:&nbsp; &nbsp; start = buf.tell()&nbsp; &nbsp; try:&nbsp; &nbsp; &nbsp; &nbsp; size = 0&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; while True:&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; out = buf.read(chunk)&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if out:&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; size += chunk&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; else:&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break&nbsp; &nbsp; &nbsp; &nbsp; return size&nbsp; &nbsp; finally:&nbsp; &nbsp; &nbsp; &nbsp; buf.seek(start)类本身:import osimport sysimport threadingimport timeclass ProgressPercentage(object):&nbsp; &nbsp; def __init__(self, filename, buf):&nbsp; &nbsp; &nbsp; &nbsp; self._filename = filename&nbsp; &nbsp; &nbsp; &nbsp; self._size = float(get_bufsize(buf))&nbsp; &nbsp; &nbsp; &nbsp; self._seen_so_far = 0&nbsp; &nbsp; &nbsp; &nbsp; self._lock = threading.Lock()&nbsp; &nbsp; &nbsp; &nbsp; self.start = None&nbsp; &nbsp; def __call__(self, bytes_amount):&nbsp; &nbsp; &nbsp; &nbsp; with self._lock:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if not self.start:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self.start = time.monotonic()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self._seen_so_far += bytes_amount&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; percentage = (self._seen_so_far / self._size) * 100&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; sys.stdout.write(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "\r%s&nbsp; %s of %s&nbsp; (%.2f%% done, %.2fs elapsed\n" % (&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self._filename, self._seen_so_far, self._size,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; percentage, time.monotonic() - self.start))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # Use sys.stdout.flush() to update on one line&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # sys.stdout.flush()例子:In [19]: import io&nbsp;&nbsp; &nbsp; ...:&nbsp;&nbsp;&nbsp; &nbsp; ...: from boto3.session import Session&nbsp;&nbsp; &nbsp; ...:&nbsp;&nbsp;&nbsp; &nbsp; ...: s3 = Session().resource("s3")&nbsp;&nbsp; &nbsp; ...: bucket = s3.Bucket("test-threads")&nbsp;&nbsp; &nbsp; ...: buf = io.BytesIO(open('100mb.txt', 'rb').read())&nbsp;&nbsp; &nbsp; ...:&nbsp;&nbsp;&nbsp; &nbsp; ...: bucket.upload_fileobj(buf, 'mykey', Callback=ProgressPercentage("mykey", buf))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;mykey&nbsp; 262144 of 104857600.0&nbsp; (0.25% done, 0.00s elapsedmykey&nbsp; 524288 of 104857600.0&nbsp; (0.50% done, 0.00s elapsedmykey&nbsp; 786432 of 104857600.0&nbsp; (0.75% done, 0.01s elapsedmykey&nbsp; 1048576 of 104857600.0&nbsp; (1.00% done, 0.01s elapsedmykey&nbsp; 1310720 of 104857600.0&nbsp; (1.25% done, 0.01s elapsedmykey&nbsp; 1572864 of 104857600.0&nbsp; (1.50% done, 0.02s elapsed

吃鸡游戏

upload_fileobj接受一个 Config 参数。这是一个boto3.s3.transfer.TransferConfig对象,它又具有一个名为use_threads(默认为 true)的参数 - 如果为 True,则执行 S3 传输时将使用线程。如果为 False,则不会使用线程来执行传输:所有逻辑都将在主线程中运行。希望这对你有用。

慕尼黑8549860

测试该方法是否阻塞:我自己根据经验测试了这种行为。首先,我生成了一个 100MB 的文件:dd if=/dev/zero of=100mb.txt&nbsp; bs=100M&nbsp; count=1然后我尝试以与您相同的方式上传文件并测量所花费的时间:import boto3import timeimport iofile = open('100mb.txt', 'rb')buf = io.BytesIO(file.read())bucket = boto3.resource('s3').Bucket('testbucket')start = time.time()print("starting to upload...")bucket.upload_fileobj(buf, '100mb')print("finished uploading")end = time.time()print("time: {}".format(end-start))upload_fileobj() 方法完成并读取下一个 python 行(1gb 文件需要 50 秒)需要 8 秒以上,所以我假设这个方法是阻塞的。使用线程测试:使用多个线程时,即使使用选项 use_threads=False ,我也可以验证该方法是否同时支持多个传输。我开始上传一个 200mb 的文件,然后是一个 100mb 的文件,然后 100mb 的文件首先完成。这证实了TransferConfig中的并发与多部分传输有关。代码:import boto3import timeimport iofrom boto3.s3.transfer import TransferConfigimport threadingconfig = TransferConfig(use_threads=False)bucket = boto3.resource('s3').Bucket('testbucket')def upload(filename):&nbsp; &nbsp; &nbsp;file = open(filename, 'rb')&nbsp; &nbsp; &nbsp;buf = io.BytesIO(file.read())&nbsp; &nbsp; &nbsp;start = time.time()&nbsp; &nbsp; &nbsp;print("starting to upload file {}".format(filename))&nbsp; &nbsp; &nbsp;bucket.upload_fileobj(buf,filename,Config=config)&nbsp; &nbsp; &nbsp;end = time.time()&nbsp; &nbsp; &nbsp;print("finished uploading file {}. time: {}".format(filename,end-start))x1 = threading.Thread(target=upload, args=('200mb.txt',))x2 = threading.Thread(target=upload, args=('100mb.txt',))x1.start()time.sleep(2)x2.start()输出:开始上传文件 200mb.txt开始上传文件 100mb.txt完成上传文件 100mb.txt。时间:46.35254502296448完成上传文件200mb.txt。时间:61.70564889907837使用会话进行测试:如果您希望上传方法按照调用的顺序完成,这就是您所需要的。代码:import boto3import timeimport iofrom boto3.s3.transfer import TransferConfigimport threadingconfig = TransferConfig(use_threads=False)session = boto3.session.Session()s3 = session.resource('s3')bucket = s3.Bucket('testbucket')def upload(filename):&nbsp; &nbsp; &nbsp;file = open(filename, 'rb')&nbsp; &nbsp; &nbsp;buf = io.BytesIO(file.read())&nbsp; &nbsp; &nbsp;start = time.time()&nbsp; &nbsp; &nbsp;print("starting to upload file {}".format(filename))&nbsp; &nbsp; &nbsp;bucket.upload_fileobj(buf,filename)&nbsp; &nbsp; &nbsp;end = time.time()&nbsp; &nbsp; &nbsp;print("finished uploading file {}. time: {}".format(filename,end-start))x1 = threading.Thread(target=upload, args=('200mb.txt',))x2 = threading.Thread(target=upload, args=('100mb.txt',))x1.start()time.sleep(2)x2.start()输出:开始上传文件 200mb.txt开始上传文件 100mb.txt完成上传文件 200mb.txt。时间:46.62478971481323完成上传文件100mb.txt。时间:50.515950202941895我发现的一些资源:-这是在 SO 中提出的关于阻塞或非阻塞方法的问题。这不是决定性的,但那里可能有相关信息。- GitHub 上存在一个开放问题,允许在 boto3 中进行异步传输。- 还有像aioboto和aiobotocore这样的工具,专门用于允许从/到 s3 和其他 aws 服务的异步下载和上传。关于我之前的回答:您可以在此处阅读有关 boto3 中的文件传输配置的信息。特别是:传输操作使用线程来实现并发。可以通过将 use_threads 属性设置为 False 来禁用线程使用。最初我认为这与同时执行的多个传输有关。但是,阅读源代码时,使用TransferConfig时参数max_concurrency中的注释解释说并发不是指多次传输,而是指 “将发出请求以执行传输的线程数”。所以这是用来加速传输的东西。use_threads属性仅用于允许多部分传输中的并发性。
随时随地看视频慕课网APP

相关分类

Python
我要回答