Esiry.com

Building a High-Concurrency Push Service with Golang WebSocket and Consistent Hashing

1 Scene Introduction
In web applications, business status often needs to be updated in real time. For example, a long-running background task may take tens of seconds from the moment the user triggers it in the browser until it completes. During that time, the front end has to query the back end every few seconds to check the task's progress. This is polling, and its drawback is the extra load it puts on the back-end service: as the number of concurrent users grows, that load multiplies. A common alternative is WebSocket, which starts as an HTTP/1.1 request and is then upgraded to a long-lived connection with the browser, so the server can push status updates as they happen.

2 Implementation Plan
This article implements a long-connection service in Golang that exposes two external interfaces: an HTTP-based REST interface for sending messages and a WebSocket-based interface for client access, as shown in the following figure.
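To make the message sending interface more concrete, here is a minimal sketch that uses only the standard library. The `/send` path, the `decodeMessage` and `sendHandler` names, and the status codes are assumptions made for illustration and are not taken from the original service; only the `user_id` and `message` JSON fields come from the article's code.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// Message mirrors the payload used by the comet service.
type Message struct {
	UserId  string `json:"user_id"`
	Message string `json:"message"`
}

// decodeMessage parses a JSON request body and rejects an empty user_id.
func decodeMessage(body []byte) (*Message, error) {
	var m Message
	if err := json.Unmarshal(body, &m); err != nil {
		return nil, err
	}
	if m.UserId == "" {
		return nil, errors.New("user_id is required")
	}
	return &m, nil
}

// sendHandler builds the handler for a hypothetical POST /send endpoint.
// Accepted messages are queued on the channel that the WebSocket side drains.
func sendHandler(queue chan<- *Message) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		// Cap the body size so one request cannot exhaust memory.
		body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20))
		if err != nil {
			http.Error(w, "cannot read body", http.StatusBadRequest)
			return
		}
		msg, err := decodeMessage(body)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		select {
		case queue <- msg:
			w.WriteHeader(http.StatusAccepted)
		default:
			// Queue is full: fail fast instead of blocking the HTTP request.
			http.Error(w, "queue full", http.StatusServiceUnavailable)
		}
	}
}

func main() {
	queue := make(chan *Message, 1000)
	// Exercise the handler in-process, without opening a real port.
	req := httptest.NewRequest(http.MethodPost, "/send",
		strings.NewReader(`{"user_id":"u1","message":"task done"}`))
	rec := httptest.NewRecorder()
	sendHandler(queue)(rec, req)

	msg := <-queue
	fmt.Println(rec.Code, msg.UserId, msg.Message) // 202 u1 task done
}
```

Rejecting the request when the queue is full is one possible design choice. The article's own code sends on the channel directly, which blocks the caller until there is room.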

To keep front-end integration simple, the front end never has to tell the server whether the client is offline, at any point between establishing the connection and the user closing the browser. Detection happens on the back end: the server sends each client a heartbeat at a fixed interval, and if a heartbeat fails, the client is removed, as shown below.

3 Golang Implementation Code
The comet service has two modules. The HTTP server is responsible for receiving messages, and the comet server is responsible for maintaining the WebSocket clients. Each client gets its own goroutine, which performs heartbeat detection for that client.
3.1 Core Module

package comet  
  
import (
    "encoding/json"
    "log"
    "time"

    "golang.org/x/net/websocket"
)
  
type HttpServer struct {  
    wsServer *WsServer  
}  
  
type WsServer struct {  
    Clients map[string][]*Client  
    AddCli  chan *Client  
    DelCli  chan *Client  
    Message chan *Message  
}  
  
type Client struct {  
    UserId    string  
    Timestamp int64  
    conn      *websocket.Conn  
    wsServer  *WsServer  
}  
  
type Message struct {  
    UserId  string `json:"user_id"`  
    Message string `json:"message"`  
}  
  
func NewWsServer() *WsServer {  
    return &WsServer{  
        make(map[string][]*Client),  
        make(chan *Client),  
        make(chan *Client),  
        make(chan *Message, 1000),  
    }  
}  
  
func NewHttpServer(wsServer *WsServer) *HttpServer {  
    return &HttpServer{wsServer}  
}  
  
// SendMessage queues a message received over HTTP for delivery by the WebSocket server.
func (httpServer *HttpServer) SendMessage(userId, message string) {
    log.Printf("message received, user_id: %s, message: %s", userId, message)
    httpServer.wsServer.Message <- &Message{userId, message}
}

// SendMessage writes the message to every connection registered for userId.
func (wsServer *WsServer) SendMessage(userId, message string) {
    clients := wsServer.Clients[userId]
    if len(clients) > 0 {
        for _, c := range clients {
            c.conn.Write([]byte(message))
        }
        log.Printf("message successfully sent to client, user_id: %s", userId)
    } else {
        log.Printf("client not found, user_id: %s", userId)
    }
}
  
func (wsServer *WsServer) addClient(c *Client) {  
    clients := wsServer.Clients[c.UserId]  
    wsServer.Clients[c.UserId] = append(clients, c)  
    log.Printf("a client added, userId: %s, timestamp: %d", c.UserId, c.Timestamp)  
}  
  
func (wsServer *WsServer) delClient(c *Client) {
    clients := wsServer.Clients[c.UserId]
    for i, client := range clients {
        if client.Timestamp == c.Timestamp {
            clients = append(clients[:i], clients[i+1:]...)
            break
        }
    }
    // Check the length after the removal: drop the map entry once the user's
    // last connection is gone, otherwise store the shortened slice back.
    if len(clients) == 0 {
        delete(wsServer.Clients, c.UserId)
    } else {
        wsServer.Clients[c.UserId] = clients
    }
    log.Printf("a client deleted, user_id: %s, timestamp: %d", c.UserId, c.Timestamp)
}
  
func (wsServer *WsServer) Start() {  
    for {  
        select {  
        case msg := <-wsServer.Message:  
            wsServer.SendMessage(msg.UserId, msg.Message)  
        case c := <-wsServer.AddCli:  
            wsServer.addClient(c)  
        case c := <-wsServer.DelCli:  
            wsServer.delClient(c)  
  
        }  
    }  
}  
  
func (c *Client) heartbeat() error {  
    millis := time.Now().UnixNano() / 1000000  
    heartbeat := struct {  
        Heartbeat int64 `json:"heartbeat"`  
    }{millis}  
    bytes, _ := json.Marshal(heartbeat)  
    _, err := c.conn.Write(bytes)  
    return err  
}  
  
func (c *Client) Listen() {  
    for {  
        err := c.heartbeat()  
        if nil != err {  
            log.Printf("client heartbeat error, user_id: %v, timestamp: %d, err: %s", c.UserId, c.Timestamp, err)  
            c.wsServer.DelCli <- c  
            return  
        }  
        time.Sleep(time.Second * 5)  
    }  
}  

3.2 Full code
https://github.com/olzhy/comet

4 Wrapping with Consistent Hashing
A single service can only support a limited number of simultaneous online connections, so a consistent hash layer is placed in front of it. With this layer, every connection established for a given user_id lands on the same back-end server, and a message sent to that user_id is routed to that same server. Multiple comet services can then be deployed as a cluster to support high-concurrency message push scenarios. As shown in the following figure, the outermost nginx serves the public domain name and exposes a wss-based interface for receiving messages and an HTTP-based interface for sending them. In the middle, haproxy forwards requests by consistent hashing on the user_id parameter, so all operations for the same user_id reach the same comet server. At the bottom, the comet servers can be scaled out horizontally to form a high-concurrency message push service.
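To show why consistent hashing keeps a user_id pinned to one server, here is a minimal sketch of a hash ring with virtual nodes. It is not HAProxy's implementation, which the article relies on in practice. The `Ring` type, the CRC32 hash, the replica count, and the `comet-N:8080` server names are all assumptions chosen for illustration.

```go
package main

import (
	"fmt"
	"hash/crc32"
	"sort"
	"strconv"
)

// Ring is a consistent hash ring in which each server owns several
// virtual nodes, which spreads users more evenly across servers.
type Ring struct {
	replicas int
	keys     []uint32          // sorted hashes of all virtual nodes
	owners   map[uint32]string // virtual node hash -> server
}

func NewRing(replicas int) *Ring {
	return &Ring{replicas: replicas, owners: make(map[uint32]string)}
}

// Add places the server's virtual nodes on the ring.
func (r *Ring) Add(server string) {
	for i := 0; i < r.replicas; i++ {
		h := crc32.ChecksumIEEE([]byte(server + "#" + strconv.Itoa(i)))
		if _, taken := r.owners[h]; taken {
			continue // hash collision: the earlier owner keeps the slot
		}
		r.keys = append(r.keys, h)
		r.owners[h] = server
	}
	sort.Slice(r.keys, func(i, j int) bool { return r.keys[i] < r.keys[j] })
}

// Remove takes all of the server's virtual nodes off the ring.
func (r *Ring) Remove(server string) {
	kept := r.keys[:0]
	for _, h := range r.keys {
		if r.owners[h] == server {
			delete(r.owners, h)
			continue
		}
		kept = append(kept, h)
	}
	r.keys = kept
}

// Get returns the server for userId: the owner of the first virtual
// node at or after the key's hash, wrapping around at the end.
func (r *Ring) Get(userId string) string {
	if len(r.keys) == 0 {
		return ""
	}
	h := crc32.ChecksumIEEE([]byte(userId))
	i := sort.Search(len(r.keys), func(i int) bool { return r.keys[i] >= h })
	if i == len(r.keys) {
		i = 0
	}
	return r.owners[r.keys[i]]
}

func main() {
	ring := NewRing(100)
	for _, s := range []string{"comet-1:8080", "comet-2:8080", "comet-3:8080"} {
		ring.Add(s)
	}
	before := make(map[string]string)
	for i := 0; i < 1000; i++ {
		u := "user-" + strconv.Itoa(i)
		before[u] = ring.Get(u)
	}

	// Take one server away; only its users should be reassigned.
	ring.Remove("comet-3:8080")
	moved := 0
	for u, s := range before {
		if s != "comet-3:8080" && ring.Get(u) != s {
			moved++
		}
	}
	fmt.Println("users moved between surviving servers:", moved) // 0
}
```

The property that matters for the push service is visible in `main`: removing a server only reassigns the users that were on it, so connections and messages for every other user_id keep reaching the same comet server.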
