Tornado+websockets+Redis+Chat+nginx多进程聊天室

tornado 官方给的websockets 聊天室只支持但进程。而生产环境一般是在nginx 里开启多进程。

Tornado+websockets+Redis+Chat+nginx多进程聊天室

这里有个开源已经实现用Tornado+websockets+Redis+Chat+nginx 支持多进程的聊天室,但代码比较复杂,实现方法可以参考一下。 https://github.com/nellessen/Tornado-Redis-Chat

代码实现

下面的代码比较简单

tornado_tyron.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import collections
import json
import redis
import threading
from tornado import gen
from tornado import ioloop
from tornado import web
from tornado.options import define
from tornado.options import options
import tornadoredis
import weakref
 
 
define("pubsub_channel", default="tyron_pubsub", help="Redis pub/sub channel")
define("redis_hostname", default="localhost", help="Redis host address")
define("redis_port", default=6379, help="Redis host port")
define("redis_db", default=0, help="Redis host db")
define("webserver_port", default=8080, help="Webserver port")
 
 
class RedisSub(threading.Thread):
    """
    subscribes to a redis pubsub channel and routes
    messages to subscribers

    messages have this format
    {'channel': ..., 'data': ...}

    """
 
    def __init__(self, pubsub_channel, redis_hostname, redis_port, redis_db, redis_password=None):
        threading.Thread.__init__(self)
        self.pubsub_channel = pubsub_channel
        self.redis_hostname = redis_hostname
        self.redis_port = redis_port
        self.redis_db = redis_db
        self.redis_password = redis_password
        self.subscriptions = collections.defaultdict(collections.deque)
        self._init_redis()
 
    def _init_redis(self):
        self.client = self.get_redis_connection()
        self.pubsub = self.client.pubsub()
        self.pubsub.subscribe(self.pubsub_channel)
 
    def get_redis_connection(self):
        return redis.Redis(
            self.redis_hostname,
            self.redis_port,
            self.redis_db,
            self.redis_password
        )
 
    def subscribe(self, channel, callback):
        self.subscriptions[channel].append(callback)
 
    def decode_message(self, message):
        return json.loads(message)
 
    def parse_message(self, message):
        msg = self.decode_message(message['data'])
        return msg['channel'], msg['data']
 
    def notify(self, channel, data):
        while True:
            try:
                cb = self.subscriptions[channel].pop()
            except IndexError:
                break
            if isinstance(cb, (weakref.ref,)):
                cb = cb()
            if cb is not None:
                cb(data)
 
    def run(self):
        for message in self.pubsub.listen():
            if message['type'] != 'message':
                continue
            self.notify(*self.parse_message(message))
 
class SubscribeHandler(web.RequestHandler):
 
    def __call__(self, chunk=None):
        self.finish(chunk)
 
    @web.asynchronous
    def get(self, channel):
        self.application.pubsub.subscribe(
            channel=channel,
            callback=weakref.ref(self)
        )
 
    post = get
 
class RedisStore(web.RequestHandler):
 
    @web.asynchronous
    @gen.engine
    def get(self, key):
        client = tornadoredis.Client(
            connection_pool=self.application.connection_pool
        )
        value = yield gen.Task(client.get, key)
        self.finish(value)
 
    post = get
 
class HealthCheck(web.RequestHandler):
 
    def get(self, key):
        self.finish('OK')
 
def start_pubsub_thread():
    pubsub = RedisSub(
        pubsub_channel=options.pubsub_channel,
        redis_hostname=options.redis_hostname,
        redis_port=options.redis_port,
        redis_db=options.redis_db
    )
    pubsub.daemon = True
    pubsub.start()
    return pubsub
 
def redis_store_connection_pool():
    return tornadoredis.ConnectionPool(
        host=options.redis_hostname,
        port=options.redis_port,
        max_connections=250,
        wait_for_available=True
    )
 
def main():
    options.parse_command_line()
    application = web.Application([
        (r"/health/", HealthCheck),
        (r"/store/(.*)/", RedisStore),
        (r"/(.*)/", SubscribeHandler),
    ])
    application.pubsub = start_pubsub_thread()
    application.connection_pool = redis_store_connection_pool()
    application.listen(options.webserver_port)
    ioloop.IOLoop.instance().start()
 
if __name__ == '__main__':
    main()

本文网址: https://pylist.com/topic/66.html 转摘请注明来源

Suggested Topics

在128M的VPS上配置mysql+Tornado+Nginx笔记

最近 123systems http://goo.gl/2Q0X2 又推出一年$10的便宜 VPS,128M内存,可以用来学习。在这样的vps 上放一个博客或做反向代理绰绰有余,买下后尝试配一个mysql+Tornado+Nginx 环境。...

Nginx 和 Golang web 上抢先体验 QUIC

QUIC(Quick UDP Internet Connection)是谷歌推出的一套基于 UDP 的传输协议,它实现了 TCP + HTTPS + HTTP/2 的功能,目的是保证可靠性的同时降低网络延迟。QUIC 是使用 UDP 协议,可以与原来的 TCP 服务不冲突。...

Leave a Comment