tornado源码分析-iostream
1. iostream.py 的作用
用来异步读写文件、进行 socket 通信。
2. 使用示例
# Callback-style IOStream example: fetch http://www.sina.com.cn/ and save
# the body to sina.html.
# NOTE: the ``callback`` arguments used here were removed in Tornado 6.0,
# so this example needs an older Tornado release.
import socket

import tornado.ioloop
import tornado.iostream


def send_request():
    """Send the GET request, then read up to the end of the headers.

    Runs as the ``connect`` callback; ``on_headers`` is invoked once the
    blank line terminating the response headers has been received.
    """
    stream.write(b'GET / HTTP/1.1\r\nHost: www.sina.com.cn\r\nConnection: close\r\n\r\n')
    stream.read_until(b"\r\n\r\n", on_headers)


def on_headers(data):
    """Parse the response headers and schedule the read of the body.

    Args:
        data: Raw bytes of the status line and headers, as delivered by
            ``read_until``.
    """
    headers = {}
    for line in data.split(b"\r\n"):
        # Split on the FIRST colon only: values such as dates or URLs
        # contain colons themselves.  The status line and blank lines
        # have no colon and are skipped.
        name, sep, value = line.partition(b":")
        if sep:
            # Header names are case-insensitive; normalise for lookup.
            headers[name.strip().lower()] = value.strip()

    length = headers.get(b"content-length")
    if length is not None:
        stream.read_bytes(int(length), on_body)
    else:
        # No Content-Length: the request asked for "Connection: close",
        # so read until the server closes the connection.
        stream.read_until_close(on_body)


def on_body(data):
    """Print and save the response body, then shut everything down.

    Args:
        data: Raw bytes of the response body.
    """
    print(data)
    with open('sina.html', 'wb') as f:
        f.write(data)
    stream.close()
    tornado.ioloop.IOLoop.current().stop()


if __name__ == '__main__':
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    stream = tornado.iostream.IOStream(client_socket)
    stream.connect(("www.sina.com.cn", 80), send_request)
    tornado.ioloop.IOLoop.current().start()
与传统socket通信的区别是,iostream对socket进行了包装,可以进行异步的读写。
3.iostream主要功能
class IOStream(BaseIOStream):
    # Abridged excerpt used for walkthrough purposes: a bare ``...`` marks
    # code that has been elided, so this class is not runnable as shown.

    def read_until(self, delimiter, callback, max_bytes=None):
        # ``max_bytes`` is assigned below, so it must be a parameter; it
        # defaults to None to stay compatible with two-argument callers.
        future = self._set_read_callback(callback)
        self._read_delimiter = delimiter
        self._read_max_bytes = max_bytes
        try:
            # Try to satisfy the read immediately.
            self._try_inline_read()
        except:
            ...  # error handling elided

    def read_bytes(self, num_bytes, callback, streaming_callback=None):
        ...

    def read_until_regex(self, regex, callback):
        ...

    def read_until_close(self, callback, streaming_callback=None):
        ...

    def write(self, data, callback=None):
        ...

    def _handle_events(self, fd, events):
        ...

    def _add_io_state(self, state):
        ...  # leading code elided
        if self._state is None:
            # First registration: always listen for ERROR as well as the
            # requested state, and install _handle_events as the handler.
            self._state = ioloop.IOLoop.ERROR | state
            with stack_context.NullContext():
                self.io_loop.add_handler(
                    self.fileno(), self._handle_events, self._state)
        elif not self._state & state:
            # Already registered, but not for this state: add it to the mask.
            self._state = self._state | state
            self.io_loop.update_handler(self.fileno(), self._state)

    def connect(self, address, callback=None, server_hostname=None):
        ...

    def _try_inline_read(self):
        # See if we've already got the data from a previous read
        self._run_streaming_callback()
        pos = self._find_read_pos()
        if pos is not None:
            self._read_from_buffer(pos)
            return
        self._check_closed()
        try:
            pos = self._read_to_buffer_loop()
        except Exception:
            ...  # error handling elided
        if pos is not None:
            self._read_from_buffer(pos)
            return
        # The read could not be satisfied now: either run the close
        # callback, or register for READ events and wait for more data.
        if self.closed():
            self._maybe_run_close_callback()
        else:
            self._add_io_state(ioloop.IOLoop.READ)

    def _read_from_buffer(self, pos):
        # Clear the pending read criteria, then hand the data to the callback.
        self._read_bytes = self._read_delimiter = self._read_regex = None
        self._read_partial = False
        self._run_read_callback(pos, False)

    def _run_read_callback(self, size, streaming):
        if streaming:
            callback = self._streaming_callback
        else:
            callback = self._read_callback
            self._read_callback = self._streaming_callback = None
            if self._read_future is not None:
                # Future-style read: resolve the future with the data.
                assert callback is None
                future = self._read_future
                self._read_future = None
                future.set_result(self._consume(size))
        if callback is not None:
            assert (self._read_future is None) or streaming
            self._run_callback(callback, self._consume(size))
        else:
            self._maybe_add_error_listener()

    def _consume(self, loc):
        ...
read_until:读到 delimiter 后结束
read_bytes:读取 num_bytes 个字节后结束
read_until_regex:读取到正则匹配的内容后结束
read_until_close:读取到 socket 关闭后结束
读流程:
self._run_streaming_callback():字节流回调,待研究
future = self._set_read_callback(callback):注册读完成回调事件
pos = self._find_read_pos():查找读取位置
self._read_from_buffer(pos):从缓存读取
pos = self._read_to_buffer_loop():socket 等待接收数据,放入缓存
self._read_from_buffer(pos):从缓存读取
self._consume(size):读取缓冲区中的 size 个字节,从缓冲区删除并返回这些数据
_maybe_add_error_listener:没有可读数据时,开启监听 read 事件,当 read 事件发生时再调用 handle_read 处理
_pending_callbacks:控制信号量,待研究
write:首先将 data 按数据块大小 WRITE_BUFFER_CHUNK_SIZE 分块写入 write_buffer,然后调用 handle_write 向 socket 发送数据
connect:建立非阻塞 socket 连接,注册 ioloop 可写事件
_add_io_state:为 ioloop 注册或更新 READ、WRITE、ERROR 事件,由 ioloop 异步执行读 socket、写 socket 等操作
handle_read:从 socket 读到缓冲区
handle_write:由缓冲区向 socket 写