SSDB 是一个新兴的数据库,特点是简单、高效,并且有不少 Python 客户端库。笔者比较之后选出了最理想的一个,但它仍有提速空间,这里仅作经验分享。
ssdb python 操作库比较
- ssdb-py https://github.com/wrongwaycn/ssdb-py (类似redis.py 较慢,但方便,省掉后期处理)
- pyssdb https://github.com/ifduyue/pyssdb (简单,支持连接池,推荐)
- ssdb.py https://github.com/hit9/ssdb.py (输出和上面一样,速度很快,但不支持连接池,推荐)
瓶颈
主要瓶颈在解析返回数据,ssdb-py 和 pyssdb 都使用socket 的 readline() 方法。可以引入spp_py 库来解析。
spp_py https://github.com/hit9/spp_py 是专门用来解析ssdb 报文的,速度很快。
安装 spp_py:
pip install spp
可以在ssdb-py 和 pyssdb 里引入spp_py。
以 ssdb-py 为例:
打开 /usr/local/lib/python2.7/dist-packages/ssdb/connection.py
在文件头部添加以下 import:
import spp
然后按如下方式修改 PythonParser 类:
class PythonParser(BaseParser):
    """Response parser that hands SSDB reply decoding to ``spp.Parser``.

    Raw data received from the socket is fed into an ``spp.Parser`` and
    complete responses are pulled back out of it, replacing the original
    line-by-line ``readline()`` parsing through the socket buffer.
    """

    encoding = None

    def __init__(self, socket_read_size):
        """Remember the read size; socket, buffer and parser are set on connect."""
        self.socket_read_size = socket_read_size
        self._sock = None
        self._buffer = None
        self.parser = None  # add: spp parser, created in on_connect()

    def on_connect(self, connection):
        """Called when the socket connects."""
        self._sock = connection._sock
        self._buffer = SocketBuffer(self._sock, self.socket_read_size)
        if connection.decode_responses:
            self.encoding = connection.encoding
        # add: one parser per connection so no stale data is carried over.
        self.parser = spp.Parser()

    def on_disconnect(self):
        """Called when the socket disconnects; releases every held resource."""
        if self._sock is not None:
            self._sock.close()
            self._sock = None
        if self._buffer is not None:
            self._buffer.close()
            self._buffer = None
        self.encoding = None
        # add: the parser only exists after on_connect(), so guard against
        # a disconnect on a parser that never connected.
        if self.parser is not None:
            self.parser.clear()
            self.parser = None

    def read_response(self):
        """Return the next complete response produced by the spp parser.

        Raises:
            socket.error: if the remote end closed the connection before a
                complete response was received.
        """
        # add
        parser = self.parser
        # An earlier recv() may already have delivered this response; check
        # before blocking on the socket again.
        # NOTE(review): assumes spp's get() returns None when no complete
        # response is buffered, as the original loop did — confirm.
        response = parser.get()
        while response is None:
            buf = self._sock.recv(4096)
            if not buf:
                # recv() returning no data means the peer closed the socket.
                self.on_disconnect()
                raise socket.error('Socket closed on remote end')
            parser.feed(buf)
            response = parser.get()
        return response

        # Original readline-based implementation, disabled in favour of the
        # spp parser above and kept here for reference only:
        #
        # try:
        #     lgt = int(self._buffer.readline())
        # except ValueError:
        #     raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
        # status = self._buffer.readline()
        # if status not in RES_STATUS or lgt!=len(status):
        #     raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
        # result = [status]
        # print 'status:', status
        # while True:
        #     lgt = self._buffer.readline()
        #     if lgt == '':
        #         break
        #     try:
        #         value = self._buffer.read(int(lgt))
        #     except ValueError:
        #         raise ConnectionError(RES_STATUS_MSG.ERROR)
        #     if isinstance(value, bytes) and self.encoding:
        #         value = value.decode(self.encoding)
        #     print 'value:', value
        #     result.append(value)
        # return result
提速效果
提速前每秒约 get 11111 次,提速后每秒约 get 14285 次。
提速约 29%(近三分之一);由于测试只 get 一个键,这部分提升完全来自解析环节。
对 pyssdb 做同样的改造:
import spp
class Connection(object):
    """A single TCP connection to an SSDB server.

    Requests are written in the length-prefixed SSDB wire format and
    replies are decoded with ``spp.Parser`` instead of line-by-line reads
    from the socket file object.
    """

    def __init__(self, host='127.0.0.1', port=8888, socket_timeout=None):
        """Record the connection settings; no socket is opened yet."""
        self.pid = os.getpid()
        self.host = host
        self.port = port
        self.socket_timeout = socket_timeout
        self._sock = None
        self._fp = None
        self._parser = None  # add: spp parser, created in connect()

    def connect(self):
        """Open the socket and create a fresh parser; no-op if already connected.

        Raises:
            socket.error: if the connection attempt fails.
        """
        if self._sock:
            return
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.settimeout(self.socket_timeout)
            sock.connect((self.host, self.port))
        except socket.error:
            # Do not leak the half-open socket when the connect fails.
            sock.close()
            raise
        self._sock = sock
        self._fp = sock.makefile('r')
        self._parser = spp.Parser()  # add

    def disconnect(self):
        """Drop parser state and close the socket; safe when not connected."""
        parser, fp, sock = self._parser, self._fp, self._sock
        self._sock = self._fp = self._parser = None  # edit
        # add: the parser only exists after connect(), so guard the clear().
        if parser is not None:
            parser.clear()
        for handle in (fp, sock):
            if handle is None:
                continue
            try:
                handle.close()
            except socket.error:
                # Best effort: the connection is being discarded anyway.
                pass

    close = disconnect

    def reconnect(self):
        """Tear the connection down and open it again."""
        self.disconnect()
        self.connect()

    def send(self, cmd, *args):
        """Encode ``cmd`` and its arguments and write them to the socket.

        The command ``'delete'`` is sent as ``'del'``. The command actually
        sent is stored in ``self.last_cmd`` so that ``recv()`` can decide how
        to shape the reply. Connects first if no socket is open.
        """
        if cmd == 'delete':
            cmd = 'del'
        self.last_cmd = cmd
        if self._sock is None:
            self.connect()
        # Every int argument is stringified (the original only handled a
        # trailing int, so an int anywhere else failed in len()).
        parts = [str(a) if isinstance(a, int) else a for a in (cmd, ) + args]
        # Wire format: "<length>\n<data>\n" per part, blank line terminator.
        buf = ''.join('%d\n%s\n' % (len(p), p) for p in parts) + '\n'
        self._sock.sendall(buf)

    def recv(self):
        """Read one reply and convert it according to the last command sent.

        Returns:
            ``None`` for a ``not_found`` status. For an ``ok`` status: the
            list of values for keys/list/scan/range/multi_*get commands; an
            ``int`` for a single value of set/del/incr/decr/size/rank style
            commands (and ``setx``/``zget``); the single value itself for
            other commands; ``True`` when the reply carries no values.

        Raises:
            socket.error: if the remote end closed the connection.
            error: for any other reply, carrying the reply values.
        """
        # add
        parser = self._parser
        # An earlier recv() may already have delivered this reply; check
        # before blocking on the socket again.
        # NOTE(review): assumes spp's get() returns None when no complete
        # reply is buffered, as the original loop did — confirm.
        response = parser.get()
        while response is None:
            buf = self._sock.recv(4096)
            if not buf:
                # recv() returning no data means the peer closed the socket.
                self.close()
                raise socket.error('Socket closed on remote end')
            parser.feed(buf)
            response = parser.get()
        cmd = self.last_cmd

        # Original file-object based reader, disabled in favour of the spp
        # parser above and kept here for reference only:
        #
        # cmd = self.last_cmd
        # ret = []
        # while True:
        #     line = self._fp.readline().rstrip('\n')
        #     if not line:
        #         break
        #     data = self._fp.read(int(line))
        #     self._fp.read(1) # discard '\n'
        #     ret.append(data)

        status, values = response[0], response[1:]
        if status == 'not_found':
            return None
        if status == 'ok':
            if cmd.endswith(('keys', 'list', 'scan', 'range')) or \
                    (cmd.startswith('multi_') and cmd.endswith('get')):
                return values
            if len(values) == 1:
                if cmd.endswith(('set', 'del', 'incr', 'decr',
                                 'size', 'rank')) or \
                        cmd in ('setx', 'zget'):
                    return int(values[0])
                return values[0]
            if not values:
                return True
        # Any other status, or an 'ok' reply with several values for a
        # command not listed above, is reported as an error.
        if values:
            raise error(*values)
        raise error('error')
本文网址: https://pylist.com/topic/76.html 转摘请注明来源