ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Tornado 文件下载Hash值不同 (阅读tornado源码记录)

2022-09-09 13:01:23  阅读:215  来源: 互联网

标签:Hash Tornado self write 源码 ._ future data chunk


版本信息

python3.7
tornado==4.3.0

问题描述: 多次下载同样的文件,每次文件的hash均不相同.

下载文件的示例接口:

import tornado
from tornado.concurrent import futures
from tornado.concurrent import run_on_executor
from tornado.web import RequestHandler

@run_on_executor
@tornado.web.asynchronous
class XXX_Handler(RequestHandler):
    executor = futures.ThreadPoolExecutor()

    def get():
        data = open("/data/xxx.csv",encoding="utf-8")
        chunk = data.read(65536)
        while chunk:
            self.write(chunk)
            self.flush()
            chunk = data.read(65536)
        data.close()
        self.set_status(200)
        return self.finish()        

推测可能与线程池有关,果真去掉@run_on_executor则正常.
好奇心驱使进行具体原因调查,后面涉及到Tornado相关源码的查看与记录。

  1. 通过RequestHandlerflush方法 检查到self.stream.write 中的 data每次的hash都是相同,继续向下调查。
        else:
            if callback is not None:
                self._write_callback = stack_context.wrap(callback)
            else:
                future = self._write_future = Future()
            data = b"\r\n".join(lines) + b"\r\n\r\n"
            if chunk:
                data += self._format_chunk(chunk)
            self._pending_write = self.stream.write(data)
            self._pending_write.add_done_callback(self._on_write_complete)
        return future
  1. 查看self.stream如何产生的, 这里就看到TCP server的class,来监听socket连接的请求,使用sock的文件描述记录下载, 分配一个handler来处理, 添加一个读事件(事件都是主线程来处理), 然后获取connection连接, 实例化IOStream 来处理消息的接收和响应
                stream = IOStream(connection, io_loop=self.io_loop,
                                  max_buffer_size=self.max_buffer_size,
                                  read_chunk_size=self.read_chunk_size)
            future = self.handle_stream(stream, address)
            if future is not None:
                self.io_loop.add_future(future, lambda f: f.result())
        except Exception:
            app_log.error("Error in connection callback", exc_info=True)

3.知道通过IOstream来传送数据,查看它的write方法.主要就是按照大小将上层传过来的数据切分到指定大小,
(1).通过self._handle_write来发送 self._write_buffer保存的chunk数据

        if not self._connecting:
            self._handle_write()
            if self._write_buffer:
                self._add_io_state(self.io_loop.WRITE)
            self._maybe_add_error_listener()
        return future

(2). 查看self._handle_write, self._handle_write是ThreadPoolExecutor的线程池来处理的,发生(Resource temporarily unavailable)错误失败, 会返回到上面的函数中添加写事件来处理(主线程) self._add_io_state(self.io_loop.WRITE)
如果发送错误比较多,这就导致出现多线程都在写self._write_buffer的问题, 会出现顺序错误或者重复数据的问题从而导致hash结果不同。

            except (socket.error, IOError, OSError) as e:
                if e.args[0] in _ERRNO_WOULDBLOCK:
                    self._write_buffer_frozen = True
                    break
                else:
                    if not self._is_connreset(e):
                        # Broken pipe errors are usually caused by connection
                        # reset, and its better to not log EPIPE errors to
                        # minimize log spam
                        gen_log.warning("Write error on %s: %s",
                                        self.fileno(), e)
                    self.close(exc_info=True)
                    return

测试使用硬核的方法,失败后去除掉 主线程重试的写事件,而是交给当前线程一直重试。。。这个测试可以获得正确的hash,不过最好是去掉@run_on_executor的使用.

标签:Hash,Tornado,self,write,源码,._,future,data,chunk
来源: https://www.cnblogs.com/dncey/p/16672474.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有