Tornado 官方给的 websockets 聊天室示例只支持单进程，而生产环境一般是在 nginx 后面开启多个进程。
这里有个开源项目已经实现了用 Tornado+websockets+Redis+Chat+nginx 支持多进程的聊天室，但代码比较复杂，实现方法可以参考一下：https://github.com/nellessen/Tornado-Redis-Chat
代码实现
下面的代码比较简单
tornado_tyron.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import collections
import json
import logging
import threading
import weakref

import redis
import tornadoredis
from tornado import gen
from tornado import ioloop
from tornado import web
from tornado.options import define
from tornado.options import options
# Options registered with tornado.options, one (name, default, help) row per
# option; they are declared in the order listed.
_OPTION_TABLE = (
    ("pubsub_channel", "tyron_pubsub", "Redis pub/sub channel"),
    ("redis_hostname", "localhost", "Redis host address"),
    ("redis_port", 6379, "Redis host port"),
    ("redis_db", 0, "Redis host db"),
    ("webserver_port", 8080, "Webserver port"),
)
for _opt_name, _opt_default, _opt_help in _OPTION_TABLE:
    define(_opt_name, default=_opt_default, help=_opt_help)
class RedisSub(threading.Thread):
    """Thread that routes messages from one Redis pub/sub channel to callbacks.

    Every payload published on the Redis channel is expected to be a JSON
    object of the form::

        {'channel': ..., 'data': ...}

    ``channel`` selects the callbacks registered through :meth:`subscribe`
    and ``data`` is handed to each of them.  Callbacks are one-shot: they are
    removed from the queue as they are delivered.

    Callbacks are invoked on this thread (from :meth:`run`), not on the
    thread that registered them.
    """

    _logger = logging.getLogger(__name__)

    def __init__(self, pubsub_channel, redis_hostname, redis_port, redis_db,
                 redis_password=None):
        """Store the connection settings and subscribe to ``pubsub_channel``.

        Args:
            pubsub_channel: Name of the Redis pub/sub channel to listen on.
            redis_hostname: Redis server host.
            redis_port: Redis server port.
            redis_db: Redis database index.
            redis_password: Optional Redis password.
        """
        super(RedisSub, self).__init__()
        self.pubsub_channel = pubsub_channel
        self.redis_hostname = redis_hostname
        self.redis_port = redis_port
        self.redis_db = redis_db
        self.redis_password = redis_password
        # Logical channel name -> deque of pending one-shot callbacks.
        self.subscriptions = collections.defaultdict(collections.deque)
        self._init_redis()

    def _init_redis(self):
        """Open the Redis connection and subscribe to the pub/sub channel."""
        self.client = self.get_redis_connection()
        self.pubsub = self.client.pubsub()
        self.pubsub.subscribe(self.pubsub_channel)

    def get_redis_connection(self):
        """Return a new ``redis.Redis`` client built from the stored settings."""
        return redis.Redis(
            host=self.redis_hostname,
            port=self.redis_port,
            db=self.redis_db,
            password=self.redis_password,
        )

    def subscribe(self, channel, callback):
        """Queue ``callback`` for the next message on ``channel``.

        Args:
            channel: Logical channel name (the ``channel`` field of a message).
            callback: A callable taking the message data, or a
                ``weakref.ref`` to such a callable.
        """
        self.subscriptions[channel].append(callback)

    def decode_message(self, message):
        """Decode a raw JSON payload into a Python object."""
        return json.loads(message)

    def parse_message(self, message):
        """Return ``(channel, data)`` extracted from a pub/sub message dict.

        Raises:
            ValueError: The payload is not valid JSON.
            KeyError: The decoded object lacks ``channel`` or ``data``.
            TypeError: The decoded payload is not a mapping.
        """
        msg = self.decode_message(message['data'])
        return msg['channel'], msg['data']

    def notify(self, channel, data):
        """Deliver ``data`` to every callback queued for ``channel``.

        Callbacks are popped as they are called (most recently subscribed
        first).  Dead weak references are skipped.  An exception raised by
        one callback is logged and does not stop delivery to the others.
        """
        # Use .get() rather than indexing: indexing the defaultdict would
        # create (and keep) an empty deque for every channel that is published
        # to but never subscribed, growing the dict without bound.
        pending = self.subscriptions.get(channel)
        if pending is None:
            return
        while True:
            try:
                cb = pending.pop()
            except IndexError:
                break
            if isinstance(cb, weakref.ref):
                cb = cb()
            if cb is None:
                continue
            try:
                cb(data)
            except Exception:
                # Thread boundary: a failing subscriber must not take down
                # the listener or starve the remaining subscribers.
                self._logger.exception(
                    "Subscriber callback failed for channel %r", channel
                )

    def run(self):
        """Listen on the Redis channel forever and dispatch each message."""
        for message in self.pubsub.listen():
            if message['type'] != 'message':
                continue
            try:
                channel, data = self.parse_message(message)
            except (ValueError, KeyError, TypeError):
                # A single malformed payload must not kill the listener.
                self._logger.exception(
                    "Discarding malformed pub/sub message: %r",
                    message.get('data'),
                )
                continue
            self.notify(channel, data)
class SubscribeHandler(web.RequestHandler):
    """Long-polling endpoint: the request stays open until a message arrives.

    ``get``/``post`` register the handler itself (through a weak reference)
    as the callback for the channel named in the URL.  When the pub/sub
    listener calls the handler, the request is finished with the message
    data as the response body.
    """

    def __call__(self, chunk=None):
        """Finish the pending request with ``chunk`` as the response body.

        ``application.pubsub`` invokes this from its own thread, and Tornado
        request handlers are not thread-safe, so the actual ``finish`` call
        is handed to the IOLoop via ``add_callback`` instead of being run
        here directly.
        """
        # Fall back to the global loop if the handler is called without
        # having gone through get()/post() first.
        loop = getattr(self, '_reply_loop', None) or ioloop.IOLoop.instance()
        loop.add_callback(self.finish, chunk)

    @web.asynchronous
    def get(self, channel):
        """Subscribe this request to ``channel`` and leave it open."""
        # Capture the loop before subscribing: the listener thread may call
        # back as soon as the subscription is registered.
        self._reply_loop = ioloop.IOLoop.instance()
        self.application.pubsub.subscribe(
            channel=channel,
            callback=weakref.ref(self)
        )

    post = get
class RedisStore(web.RequestHandler):
    """Respond with the value stored in Redis under the key from the URL."""

    @web.asynchronous
    @gen.engine
    def get(self, key):
        """Fetch ``key`` from Redis and finish the request with its value."""
        client = tornadoredis.Client(
            connection_pool=self.application.connection_pool
        )
        value = yield gen.Task(client.get, key)
        # Hand the connection back to the shared pool explicitly; otherwise
        # it is only returned whenever the client object happens to be
        # collected, and the bounded pool can run dry under load.
        yield gen.Task(client.disconnect)
        self.finish(value)

    post = get
class HealthCheck(web.RequestHandler):
    """Liveness endpoint that always answers ``OK``."""

    def get(self, key=None):
        """Finish the request with the body ``OK``.

        ``key`` is optional: the ``/health/`` route has no capture group, so
        Tornado calls this method without a positional argument.
        """
        self.finish('OK')
def start_pubsub_thread():
    """Create, start and return the ``RedisSub`` listener thread.

    Connection settings come from the parsed ``options``.  The thread is
    marked as a daemon before it is started.
    """
    settings = {
        "pubsub_channel": options.pubsub_channel,
        "redis_hostname": options.redis_hostname,
        "redis_port": options.redis_port,
        "redis_db": options.redis_db,
    }
    listener = RedisSub(**settings)
    listener.daemon = True
    listener.start()
    return listener
def redis_store_connection_pool():
    """Build the ``tornadoredis`` connection pool used by ``RedisStore``.

    The pool targets the Redis host and port from ``options`` and is capped
    at 250 connections with ``wait_for_available`` enabled.
    """
    # NOTE(review): options.redis_db is not applied here, so the store
    # presumably uses the default database -- confirm this is intended.
    pool_settings = dict(
        host=options.redis_hostname,
        port=options.redis_port,
        max_connections=250,
        wait_for_available=True,
    )
    return tornadoredis.ConnectionPool(**pool_settings)
def main():
    """Parse command-line options, wire up the application and serve forever."""
    options.parse_command_line()

    # Order matters: the catch-all subscribe route must come last so that it
    # does not shadow the health and store endpoints.
    routes = [
        (r"/health/", HealthCheck),
        (r"/store/(.*)/", RedisStore),
        (r"/(.*)/", SubscribeHandler),
    ]
    app = web.Application(routes)

    # Shared resources the handlers reach through ``self.application``.
    app.pubsub = start_pubsub_thread()
    app.connection_pool = redis_store_connection_pool()

    app.listen(options.webserver_port)
    loop = ioloop.IOLoop.instance()
    loop.start()
# Start the server only when this file is executed as a script.
if __name__ == '__main__':
    main()
本文网址: https://pylist.com/topic/66.html 转摘请注明来源