博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
tornado源码分析-iostream
阅读量:6977 次
发布时间:2019-06-27

本文共 4341 字,大约阅读时间需要 14 分钟。

tornado源码分析-iostream

1.iostream.py作用
用来异步读写文件,socket通信

2.使用示例

import tornado.ioloopimport tornado.iostreamimport socketdef send_request():    stream.write(b'GET / HTTP/1.1\r\nHost: www.sina.com.cn\r\nConnection: close\r\n\r\n')    stream.read_until(b"\r\n\r\n", on_headers)def on_headers(data):    headers = {}    for line in data.split(b"\r\n"):        parts = line.split(b":")        if len(parts) == 2:            headers[parts[0]] = parts[1]    stream.read_bytes(int(headers[b"Content-Length"]), on_body)def on_body(data):    print(data)    with open('sina.html', 'wb') as f:        f.write(data)    stream.close()    tornado.ioloop.IOLoop.current().stop()if __name__ == '__main__':    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)    stream = tornado.iostream.IOStream(client_socket)    stream.connect(("www.sina.com.cn", 80), send_request)    tornado.ioloop.IOLoop.current().start()

与传统socket通信的区别是,iostream对socket进行了包装,可以进行异步的读写。

3.iostream主要功能

class IOStream(BaseIOStream):    def read_until(self, delimiter, callback):        future = self._set_read_callback(callback)        self._read_delimiter = delimiter        self._read_max_bytes = max_bytes        try:            self._try_inline_read()        except:            ...    def read_bytes(self, num_bytes, callback, streaming_callback=None)        ...    def read_until_regex(self, regex, callback):        ...    def read_until_close(self, callback, streaming_callback=None):        ...    def write(self, data, callback=None):        ...        def _handle_events(self, fd, events):        ...    def _add_io_state(self, state):        ...        if self._state is None:            self._state = ioloop.IOLoop.ERROR | state            with stack_context.NullContext():                self.io_loop.add_handler(                    self.fileno(), self._handle_events, self._state)        elif not self._state & state:            self._state = self._state | state            self.io_loop.update_handler(self.fileno(), self._state)    def connect(self, address, callback=None, server_hostname=None):        ...    def _try_inline_read(self):        # See if we've already got the data from a previous read        self._run_streaming_callback()        pos = self._find_read_pos()        if pos is not None:            self._read_from_buffer(pos)            return        self._check_closed()        try:            pos = self._read_to_buffer_loop()        except Exception:           ...        if pos is not None:            self._read_from_buffer(pos)            return        if self.closed():            self._maybe_run_close_callback()        else:            self._add_io_state(ioloop.IOLoop.READ)        def _read_from_buffer(self, pos):        self._read_bytes = self._read_delimiter = self._read_regex = None        self._read_partial = False        self._run_read_callback(pos, False)    def _run_read_callback(self, size, streaming):        if streaming:            callback = self._streaming_callback        else:            callback = self._read_callback            self._read_callback = self._streaming_callback = None            if self._read_future is not None:                assert callback is None                future = self._read_future                self._read_future = None                future.set_result(self._consume(size))        if callback is not None:            assert (self._read_future is None) or streaming            self._run_callback(callback, self._consume(size))        else:            self._maybe_add_error_listener()    def _consume(self, loc):        ...

read_until:读到delimiter结束

read_bytes:读取num_bytes个字符后结束
read_until_regex:读取到正则匹配到后结束
read_until_close:读取到scoket关闭后结束
读流程:
self._run_streaming_callback()
字节流回调,待研究

future = self._set_read_callback(callback)

注册读完成回调事件

pos = self._find_read_pos()

读取位置

self._read_from_buffer(pos)

从缓存读取

pos = self._read_to_buffer_loop()

socket等待接收数据,放入缓存
self._read_from_buffer(pos)
从缓存读取

self._consume(size)

读缓冲器size个字节,从缓冲区删除并返回这些数据

_maybe_add_error_listener

没有可读数据,开启监听read事件,当read事件发生时再调用handle_read处理

_pending_callbacks

控制信号量,待研究

write:首先将data按数据块大小WRITE_BUFFER_CHUNK_SIZE分块写入write_buffer,

然后调用handle_write向socket发送数据

connect:建立非阻塞socket连接,注册ioloop可写事件

_add_io_state:为ioloop注册或更新READ,WRITE,ERROR事件,ioloop异步执行读socket,写socket操作等

handle_read:从socket读到缓冲区
handle_write:由缓冲区向socket写

转载地址:http://wnupl.baihongyu.com/

你可能感兴趣的文章
大叔也说并行和串行`性能提升N倍(N由操作系统位数和cpu核数决定)
查看>>
对ListenSocket 的研究(四)
查看>>
JQuery:JQuery 中的CSS()方法
查看>>
Linux内核跟踪之trace框架分析【转】
查看>>
内存分配器memblock【转】
查看>>
C# BackgroundWorker 详解
查看>>
IOS自定义表格UITableViewCell
查看>>
[Linux] ubuntu 格式化u盘
查看>>
一个COM示例程序
查看>>
通过改进算法来优化程序性能的真实案例(Ransac)
查看>>
head命令
查看>>
软件开发经验总结(一)细节决定软件的成败
查看>>
python tar.gz格式压缩、解压
查看>>
JNDI概述(转载)
查看>>
利用java反射机制 读取配置文件 实现动态类载入以及动态类型转换
查看>>
第 7 章 项目运作
查看>>
PYTHON黑帽编程1.5 使用WIRESHARK练习网络协议分析
查看>>
.NET平台开源项目速览(18)C#平台JSON实体类生成器JSON C# Class Generator
查看>>
C# 格式串(收藏)
查看>>
浅谈SQL Server中统计对于查询的影响
查看>>