diff --git a/chat.go b/chat.go
deleted file mode 100644
index ee494e5..0000000
--- a/chat.go
+++ /dev/null
@@ -1,85 +0,0 @@
-package main
-
-import (
- "encoding/json"
- "io"
- "net/http"
-
- "github.com/Sirupsen/logrus"
- "github.com/gorilla/websocket"
- "github.com/pkg/errors"
-)
-
-var (
- upgrader = websocket.Upgrader{
- ReadBufferSize: 1024,
- WriteBufferSize: 1024,
- }
-)
-
-// message sent to us by the javascript client
-type message struct {
- Handle string `json:"handle"`
- Text string `json:"text"`
-}
-
-// validateMessage so that we know it's valid JSON and contains a Handle and
-// Text
-func validateMessage(data []byte) (message, error) {
- var msg message
-
- if err := json.Unmarshal(data, &msg); err != nil {
- return msg, errors.Wrap(err, "Unmarshaling message")
- }
-
- if msg.Handle == "" && msg.Text == "" {
- return msg, errors.New("Message has no Handle or Text")
- }
-
- return msg, nil
-}
-
-// handleWebsocket connection.
-func handleWebsocket(w http.ResponseWriter, r *http.Request) {
- if r.Method != "GET" {
- http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
- return
- }
-
- ws, err := upgrader.Upgrade(w, r, nil)
- if err != nil {
- m := "Unable to upgrade to websockets"
- log.WithField("err", err).Println(m)
- http.Error(w, m, http.StatusBadRequest)
- return
- }
-
- rr.register(ws)
-
- for {
- mt, data, err := ws.ReadMessage()
- l := log.WithFields(logrus.Fields{"mt": mt, "data": data, "err": err})
- if err != nil {
- if websocket.IsCloseError(err, websocket.CloseGoingAway) || err == io.EOF {
- l.Info("Websocket closed!")
- break
- }
- l.Error("Error reading websocket message")
- }
- switch mt {
- case websocket.TextMessage:
- msg, err := validateMessage(data)
- if err != nil {
- l.WithFields(logrus.Fields{"msg": msg, "err": err}).Error("Invalid Message")
- break
- }
- rw.publish(data)
- default:
- l.Warning("Unknown Message!")
- }
- }
-
- rr.deRegister(ws)
-
- ws.WriteMessage(websocket.CloseMessage, []byte{})
-}
diff --git a/chat/client.go b/chat/client.go
new file mode 100644
index 0000000..4c15ea0
--- /dev/null
+++ b/chat/client.go
@@ -0,0 +1,120 @@
+package chat
+
+import (
+ "encoding/json"
+ "io"
+ "net/http"
+ "time"
+
+ "github.com/Sirupsen/logrus"
+ "github.com/gorilla/websocket"
+ "github.com/pkg/errors"
+ "github.com/rmattam/go-websocket-chat-demo/redis"
+)
+
+var (
+ upgrader = websocket.Upgrader{
+ ReadBufferSize: 1024,
+ WriteBufferSize: 1024,
+ }
+ waitTimeout = time.Minute * 10
+ log = logrus.WithField("cmd", "go-websocket-chat-demo-chat")
+)
+
+//Hub struct
+type Hub struct {
+ channel string
+
+ receive redis.Receiver
+ write redis.Writer
+}
+
+// message sent to us by the javascript client
+type message struct {
+ Handle string `json:"handle"`
+ Text string `json:"text"`
+}
+
+// SubsribeRedis : Initialize the redis routines required for pub sub.
+func (c *Hub) SubsribeRedis(redisURL string, channel string) {
+ redisPool, err := redis.NewRedisPoolFromURL(redisURL)
+ if err != nil {
+ log.WithField("url", redisURL).Fatal("Unable to create Redis pool")
+ }
+ c.channel = channel
+ c.receive = redis.NewReceiver(redisPool, c.channel, ValidateRedisMessage)
+ c.write = redis.NewWriter(redisPool, c.channel)
+ go c.receive.Setup(redisURL)
+ go c.write.Setup(redisURL)
+}
+
+//ValidateRedisMessage validates incoming redis messages on the chat channel.
+func ValidateRedisMessage(data []byte) error {
+ _, e := validateMessage(data)
+ return e
+}
+
+// validateMessage so that we know it's valid JSON and contains a Handle and
+// Text
+func validateMessage(data []byte) (message, error) {
+ var msg message
+
+ if err := json.Unmarshal(data, &msg); err != nil {
+ return msg, errors.Wrap(err, "Unmarshaling message")
+ }
+
+ if msg.Handle == "" && msg.Text == "" {
+ return msg, errors.New("Message has no Handle or Text")
+ }
+
+ return msg, nil
+}
+
+// HandleWebsocket connection.
+func HandleWebsocket(c *Hub, w http.ResponseWriter, r *http.Request) {
+ if r.Method != "GET" {
+ http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ ws, err := upgrader.Upgrade(w, r, nil)
+ defer func() {
+ ws.Close()
+ }()
+
+ if err != nil {
+ m := "Unable to upgrade to websockets"
+ log.WithField("err", err).Println(m)
+ http.Error(w, m, http.StatusBadRequest)
+ return
+ }
+
+ c.receive.Register(ws)
+
+ for {
+ mt, data, err := ws.ReadMessage()
+ l := log.WithFields(logrus.Fields{"mt": mt, "err": err})
+ if err != nil {
+ if websocket.IsCloseError(err, websocket.CloseGoingAway) || err == io.EOF {
+ l.Info("Websocket closed!")
+ break
+ }
+ l.WithField("data", data).Error("Error reading websocket message")
+ }
+ switch mt {
+ case websocket.TextMessage:
+ msg, err := validateMessage(data)
+ if err != nil {
+ l.WithFields(logrus.Fields{"data": data, "msg": msg, "err": err}).Error("Invalid Message")
+ break
+ }
+ l.WithField("msg", msg).Info("message from client")
+ c.write.Publish(data)
+ default:
+ l.WithField("data", data).Warning("Unknown Message!")
+ }
+ }
+
+ c.receive.DeRegister(ws)
+ ws.WriteMessage(websocket.CloseMessage, []byte{})
+}
diff --git a/main.go b/main.go
index b9395dd..da47a4e 100644
--- a/main.go
+++ b/main.go
@@ -3,17 +3,12 @@ package main
import (
"net/http"
"os"
- "time"
"github.com/Sirupsen/logrus"
- "github.com/heroku/x/redis"
)
var (
- waitTimeout = time.Minute * 10
- log = logrus.WithField("cmd", "go-websocket-chat-demo")
- rr redisReceiver
- rw redisWriter
+ log = logrus.WithField("cmd", "go-websocket-chat-demo")
)
func main() {
@@ -26,44 +21,8 @@ func main() {
if redisURL == "" {
log.WithField("REDIS_URL", redisURL).Fatal("$REDIS_URL must be set")
}
- redisPool, err := redis.NewRedisPoolFromURL(redisURL)
- if err != nil {
- log.WithField("url", redisURL).Fatal("Unable to create Redis pool")
- }
-
- rr = newRedisReceiver(redisPool)
- rw = newRedisWriter(redisPool)
-
- go func() {
- for {
- waited, err := redis.WaitForAvailability(redisURL, waitTimeout, rr.wait)
- if !waited || err != nil {
- log.WithFields(logrus.Fields{"waitTimeout": waitTimeout, "err": err}).Fatal("Redis not available by timeout!")
- }
- rr.broadcast(availableMessage)
- err = rr.run()
- if err == nil {
- break
- }
- log.Error(err)
- }
- }()
-
- go func() {
- for {
- waited, err := redis.WaitForAvailability(redisURL, waitTimeout, nil)
- if !waited || err != nil {
- log.WithFields(logrus.Fields{"waitTimeout": waitTimeout, "err": err}).Fatal("Redis not available by timeout!")
- }
- err = rw.run()
- if err == nil {
- break
- }
- log.Error(err)
- }
- }()
- http.Handle("/", http.FileServer(http.Dir("./public")))
- http.HandleFunc("/ws", handleWebsocket)
+ router := NewRouter(redisURL)
+ http.Handle("/", router)
log.Println(http.ListenAndServe(":"+port, nil))
}
diff --git a/public/css/application.css b/public/css/application.css
index bc3fc2a..a9a8f31 100755
--- a/public/css/application.css
+++ b/public/css/application.css
@@ -2,3 +2,8 @@
overflow: auto;
height: 500px;
}
+
+#chat-text2 {
+ overflow: auto;
+ height: 500px;
+}
diff --git a/public/index.html b/public/index.html
index 3eccbda..9822461 100644
--- a/public/index.html
+++ b/public/index.html
@@ -24,10 +24,25 @@
Go Websocket Chat Demo
+
+
+
+
+
diff --git a/public/js/application.js b/public/js/application.js
index d4da5fc..ead3dd1 100755
--- a/public/js/application.js
+++ b/public/js/application.js
@@ -1,4 +1,4 @@
-var box = new ReconnectingWebSocket(location.protocol.replace("http","ws") + "//" + location.host + "/ws");
+var box = new ReconnectingWebSocket(location.protocol.replace("http","ws") + "//" + location.host + "/ws/chat1");
box.onmessage = function(message) {
var data = JSON.parse(message.data);
@@ -21,3 +21,27 @@ $("#input-form").on("submit", function(event) {
box.send(JSON.stringify({ handle: handle, text: text }));
$("#input-text")[0].value = "";
});
+
+
+var box2 = new ReconnectingWebSocket(location.protocol.replace("http","ws") + "//" + location.host + "/ws/chat2");
+box2.onmessage = function(message) {
+ var data = JSON.parse(message.data);
+ $("#chat-text2").append("" + $('').text(data.handle).html() + "
" + $('').text(data.text).html() + "
");
+ $("#chat-text2").stop().animate({
+ scrollTop: $('#chat-text2')[0].scrollHeight
+ }, 800);
+};
+
+box2.onclose = function(){
+ console.log('box2 closed');
+ this.box2 = new WebSocket(box2.url);
+
+};
+
+$("#input-form2").on("submit", function(event) {
+ event.preventDefault();
+ var handle = $("#input-handle2")[0].value;
+ var text = $("#input-text2")[0].value;
+ box2.send(JSON.stringify({ handle: handle, text: text }));
+ $("#input-text2")[0].value = "";
+});
diff --git a/redis.go b/redis.go
deleted file mode 100644
index 33f82c9..0000000
--- a/redis.go
+++ /dev/null
@@ -1,198 +0,0 @@
-package main
-
-import (
- "encoding/json"
- "fmt"
- "time"
-
- "github.com/Sirupsen/logrus"
- "github.com/garyburd/redigo/redis"
- "github.com/gorilla/websocket"
- "github.com/pkg/errors"
-)
-
-const (
- // Channel name to use with redis
- Channel = "chat"
-)
-
-var (
- waitingMessage, availableMessage []byte
- waitSleep = time.Second * 10
-)
-
-func init() {
- var err error
- waitingMessage, err = json.Marshal(message{
- Handle: "system",
- Text: "Waiting for redis to be available. Messaging won't work until redis is available",
- })
- if err != nil {
- panic(err)
- }
- availableMessage, err = json.Marshal(message{
- Handle: "system",
- Text: "Redis is now available & messaging is now possible",
- })
- if err != nil {
- panic(err)
- }
-}
-
-// redisReceiver receives messages from Redis and broadcasts them to all
-// registered websocket connections that are Registered.
-type redisReceiver struct {
- pool *redis.Pool
-
- messages chan []byte
- newConnections chan *websocket.Conn
- rmConnections chan *websocket.Conn
-}
-
-// newRedisReceiver creates a redisReceiver that will use the provided
-// rredis.Pool.
-func newRedisReceiver(pool *redis.Pool) redisReceiver {
- return redisReceiver{
- pool: pool,
- messages: make(chan []byte, 1000), // 1000 is arbitrary
- newConnections: make(chan *websocket.Conn),
- rmConnections: make(chan *websocket.Conn),
- }
-}
-
-func (rr *redisReceiver) wait(_ time.Time) error {
- rr.broadcast(waitingMessage)
- time.Sleep(waitSleep)
- return nil
-}
-
-// run receives pubsub messages from Redis after establishing a connection.
-// When a valid message is received it is broadcast to all connected websockets
-func (rr *redisReceiver) run() error {
- l := log.WithField("channel", Channel)
- conn := rr.pool.Get()
- defer conn.Close()
- psc := redis.PubSubConn{Conn: conn}
- psc.Subscribe(Channel)
- go rr.connHandler()
- for {
- switch v := psc.Receive().(type) {
- case redis.Message:
- l.WithField("message", string(v.Data)).Info("Redis Message Received")
- if _, err := validateMessage(v.Data); err != nil {
- l.WithField("err", err).Error("Error unmarshalling message from Redis")
- continue
- }
- rr.broadcast(v.Data)
- case redis.Subscription:
- l.WithFields(logrus.Fields{
- "kind": v.Kind,
- "count": v.Count,
- }).Println("Redis Subscription Received")
- case error:
- return errors.Wrap(v, "Error while subscribed to Redis channel")
- default:
- l.WithField("v", v).Info("Unknown Redis receive during subscription")
- }
- }
-}
-
-// broadcast the provided message to all connected websocket connections.
-// If an error occurs while writting a message to a websocket connection it is
-// closed and deregistered.
-func (rr *redisReceiver) broadcast(msg []byte) {
- rr.messages <- msg
-}
-
-// register the websocket connection with the receiver.
-func (rr *redisReceiver) register(conn *websocket.Conn) {
- rr.newConnections <- conn
-}
-
-// deRegister the connection by closing it and removing it from our list.
-func (rr *redisReceiver) deRegister(conn *websocket.Conn) {
- rr.rmConnections <- conn
-}
-
-func (rr *redisReceiver) connHandler() {
- conns := make([]*websocket.Conn, 0)
- for {
- select {
- case msg := <-rr.messages:
- for _, conn := range conns {
- if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
- log.WithFields(logrus.Fields{
- "data": msg,
- "err": err,
- "conn": conn,
- }).Error("Error writting data to connection! Closing and removing Connection")
- conns = removeConn(conns, conn)
- }
- }
- case conn := <-rr.newConnections:
- conns = append(conns, conn)
- case conn := <-rr.rmConnections:
- conns = removeConn(conns, conn)
- }
- }
-}
-
-func removeConn(conns []*websocket.Conn, remove *websocket.Conn) []*websocket.Conn {
- var i int
- var found bool
- for i = 0; i < len(conns); i++ {
- if conns[i] == remove {
- found = true
- break
- }
- }
- if !found {
- fmt.Printf("conns: %#v\nconn: %#v\n", conns, remove)
- panic("Conn not found")
- }
- copy(conns[i:], conns[i+1:]) // shift down
- conns[len(conns)-1] = nil // nil last element
- return conns[:len(conns)-1] // truncate slice
-}
-
-// redisWriter publishes messages to the Redis CHANNEL
-type redisWriter struct {
- pool *redis.Pool
- messages chan []byte
-}
-
-func newRedisWriter(pool *redis.Pool) redisWriter {
- return redisWriter{
- pool: pool,
- messages: make(chan []byte, 10000),
- }
-}
-
-// run the main redisWriter loop that publishes incoming messages to Redis.
-func (rw *redisWriter) run() error {
- conn := rw.pool.Get()
- defer conn.Close()
-
- for data := range rw.messages {
- if err := writeToRedis(conn, data); err != nil {
- rw.publish(data) // attempt to redeliver later
- return err
- }
- }
- return nil
-}
-
-func writeToRedis(conn redis.Conn, data []byte) error {
- if err := conn.Send("PUBLISH", Channel, data); err != nil {
- return errors.Wrap(err, "Unable to publish message to Redis")
- }
- if err := conn.Flush(); err != nil {
- return errors.Wrap(err, "Unable to flush published message to Redis")
- }
- return nil
-}
-
-// publish to Redis via channel.
-func (rw *redisWriter) publish(data []byte) {
- rw.messages <- data
-}
diff --git a/redis/helper.go b/redis/helper.go
new file mode 100644
index 0000000..69c1e3a
--- /dev/null
+++ b/redis/helper.go
@@ -0,0 +1,102 @@
+package redis
+
+import (
+ "net/url"
+ "time"
+
+ "github.com/garyburd/redigo/redis"
+)
+
+// WaitFunc to be executed occasionally by something that is waiting.
+// Should return an error to cancel the waiting
+// Should also sleep some amount of time to throttle connection attempts
+type WaitFunc func(time.Time, []byte) error
+
+// waitForAvailability of the redis server located at the provided url, timeout if the Duration passes before being able to connect
+func waitForAvailability(url string, d time.Duration, waitingMessage []byte, f WaitFunc) (bool, error) {
+ h, _, err := parseURL(url)
+ if err != nil {
+ return false, err
+ }
+ conn := make(chan struct{})
+ errs := make(chan error)
+ go func() {
+ for {
+ c, err := redis.Dial("tcp", h)
+ if err == nil {
+ c.Close()
+ conn <- struct{}{}
+ return
+ }
+ if f != nil {
+ err := f(time.Now(), waitingMessage)
+ if err != nil {
+ errs <- err
+ return
+ }
+ }
+ }
+ }()
+ select {
+ case err := <-errs:
+ return false, err
+ case <-conn:
+ return true, nil
+ case <-time.After(d):
+ return false, nil
+ }
+}
+
+// ParseURL in the form of redis://h:@ec2-23-23-129-214.compute-1.amazonaws.com:25219
+// and return the host and password
+func parseURL(us string) (string, string, error) {
+ u, err := url.Parse(us)
+ if err != nil {
+ return "", "", err
+ }
+ var password string
+ if u.User != nil {
+ password, _ = u.User.Password()
+ }
+ var host string
+ if u.Host == "" {
+ host = "localhost"
+ } else {
+ host = u.Host
+ }
+ return host, password, nil
+}
+
+// NewRedisPoolFromURL returns a new *redigo/redis.Pool configured for the supplied url
+// The url can include a password in the standard form and if so is used to AUTH against
+// the redis server
+func NewRedisPoolFromURL(url string) (*redis.Pool, error) {
+ h, p, err := parseURL(url)
+ if err != nil {
+ return nil, err
+ }
+ return &redis.Pool{
+ MaxIdle: 3,
+ IdleTimeout: 240 * time.Second,
+ Dial: func() (redis.Conn, error) {
+ c, err := redis.Dial("tcp", h)
+ if err != nil {
+ return nil, err
+ }
+ if p != "" {
+ if _, err := c.Do("AUTH", p); err != nil {
+ c.Close()
+ return nil, err
+ }
+ }
+ return c, err
+ },
+ TestOnBorrow: func(c redis.Conn, t time.Time) error {
+ if time.Since(t) < time.Minute {
+ return nil
+ }
+ _, err := c.Do("PING")
+ return err
+ },
+ }, nil
+}
diff --git a/redis/reader.go b/redis/reader.go
new file mode 100644
index 0000000..e8fe994
--- /dev/null
+++ b/redis/reader.go
@@ -0,0 +1,192 @@
+package redis
+
+import (
+ "encoding/json"
+ "fmt"
+ "time"
+
+ "github.com/Sirupsen/logrus"
+ "github.com/garyburd/redigo/redis"
+ "github.com/gorilla/websocket"
+ "github.com/pkg/errors"
+)
+
+var (
+ log = logrus.WithField("cmd", "go-websocket-chat-demo-redis")
+ waitSleep = time.Second * 10
+ waitTimeout = time.Minute * 10
+)
+
+// message sent to us by the javascript client
+type message struct {
+ Handle string `json:"handle"`
+ Text string `json:"text"`
+}
+
+// Receiver receives messages from Redis and broadcasts them to all
+// registered websocket connections that are Registered.
+type Receiver struct {
+ pool *redis.Pool
+
+ validateMessage func([]byte) error
+ channel string
+ messages chan []byte
+ newConnections chan *websocket.Conn
+ rmConnections chan *websocket.Conn
+}
+
+// NewReceiver creates a redisReceiver that will use the provided
+// rredis.Pool.
+func NewReceiver(pool *redis.Pool, channel string, validateMessage func([]byte) error) Receiver {
+ return Receiver{
+ pool: pool,
+ channel: channel,
+ validateMessage: validateMessage,
+ messages: make(chan []byte, 1000), // 1000 is arbitrary
+ newConnections: make(chan *websocket.Conn),
+ rmConnections: make(chan *websocket.Conn),
+ }
+}
+
+//Wait for redis availability
+func (rr *Receiver) Wait(_ time.Time, waitingMessage []byte) error {
+ rr.Broadcast(waitingMessage)
+ time.Sleep(waitSleep)
+ return nil
+}
+
+//Setup spawns redis receiver service
+func (rr *Receiver) Setup(redisURL string) {
+ for {
+ waitingMessage, _ := json.Marshal(message{
+ Handle: "redis",
+ Text: "Waiting for redis to be available. Messaging won't work until redis is available",
+ })
+ waited, err := waitForAvailability(redisURL, waitTimeout, waitingMessage, rr.Wait)
+ if !waited || err != nil {
+ log.WithFields(logrus.Fields{"waitTimeout": waitTimeout, "err": err}).Fatal("Redis not available by timeout!")
+ }
+ availableMessage, _ := json.Marshal(message{
+ Handle: "redis",
+ Text: "Redis is now available & messaging is now possible",
+ })
+ rr.Broadcast(availableMessage)
+ err = rr.Run()
+ if err == nil {
+ break
+ }
+ log.Error(err)
+ }
+}
+
+// Run receives pubsub messages from Redis after establishing a connection.
+// When a valid message is received it is broadcast to all connected websockets
+func (rr *Receiver) Run() error {
+ l := log.WithField("channel", rr.channel)
+ if rr.channel == "" {
+ return errors.New("Redis channel is not set")
+ }
+
+ conn := rr.pool.Get()
+ defer conn.Close()
+ psc := redis.PubSubConn{Conn: conn}
+ psc.Subscribe(rr.channel)
+ go rr.connHandler()
+ for {
+ switch v := psc.Receive().(type) {
+ case redis.Message:
+ l.WithField("message", string(v.Data)).Info("Redis Message Received")
+ if err := rr.validateMessage(v.Data); err != nil {
+ if status := validateStatusMessage(v.Data); status != nil {
+ l.WithField("err", err).Error("Error unmarshalling message from Redis")
+ continue
+ }
+ }
+ rr.Broadcast(v.Data)
+ case redis.Subscription:
+ l.WithFields(logrus.Fields{
+ "kind": v.Kind,
+ "count": v.Count,
+ }).Println("Redis Subscription Received")
+ case error:
+ return errors.Wrap(v, "Error while subscribed to Redis channel")
+ default:
+ l.WithField("v", v).Info("Unknown Redis receive during subscription")
+ }
+ }
+}
+
+// Broadcast the provided message to all connected websocket connections.
+// If an error occurs while writting a message to a websocket connection it is
+// closed and deregistered.
+func (rr *Receiver) Broadcast(msg []byte) {
+ rr.messages <- msg
+}
+
+// Register the websocket connection with the receiver.
+func (rr *Receiver) Register(conn *websocket.Conn) {
+ rr.newConnections <- conn
+}
+
+// DeRegister the connection by closing it and removing it from our list.
+func (rr *Receiver) DeRegister(conn *websocket.Conn) {
+ rr.rmConnections <- conn
+}
+
+func (rr *Receiver) connHandler() {
+ conns := make([]*websocket.Conn, 0)
+ defer func() {
+ for _, conn := range conns {
+ conn.Close()
+ }
+ }()
+
+ for {
+ select {
+ case msg := <-rr.messages:
+ for _, conn := range conns {
+ if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
+ log.WithFields(logrus.Fields{
+ "data": msg,
+ "err": err,
+ "conn": conn,
+ }).Error("Error writting data to connection! Closing and removing Connection")
+ conns = removeConn(conns, conn)
+ }
+ }
+ case conn := <-rr.newConnections:
+ conns = append(conns, conn)
+ case conn := <-rr.rmConnections:
+ conns = removeConn(conns, conn)
+ }
+ }
+}
+
+func removeConn(conns []*websocket.Conn, remove *websocket.Conn) []*websocket.Conn {
+ var i int
+ var found bool
+ for i = 0; i < len(conns); i++ {
+ if conns[i] == remove {
+ found = true
+ break
+ }
+ }
+ if !found {
+ fmt.Printf("conns: %#v\nconn: %#v\n", conns, remove)
+ panic("Conn not found")
+ }
+ copy(conns[i:], conns[i+1:]) // shift down
+ conns[len(conns)-1] = nil // nil last element
+ return conns[:len(conns)-1] // truncate slice
+}
+
+func validateStatusMessage(data []byte) error {
+ var msg message
+ if err := json.Unmarshal(data, &msg); err != nil {
+ return errors.Wrap(err, "Unmarshaling message")
+ }
+ if msg.Handle == "" && msg.Text == "" {
+ return errors.New("Message has no Handle or Text")
+ }
+ return nil
+}
diff --git a/redis/writer.go b/redis/writer.go
new file mode 100644
index 0000000..c438807
--- /dev/null
+++ b/redis/writer.go
@@ -0,0 +1,68 @@
+package redis
+
+import (
+ "github.com/Sirupsen/logrus"
+ "github.com/garyburd/redigo/redis"
+ "github.com/pkg/errors"
+)
+
+// Writer publishes messages to the Redis CHANNEL
+type Writer struct {
+ pool *redis.Pool
+
+ channel string
+ messages chan []byte
+}
+
+//NewWriter creates a new redis writer and returns it
+func NewWriter(pool *redis.Pool, channel string) Writer {
+ return Writer{
+ pool: pool,
+ channel: channel,
+ messages: make(chan []byte, 10000),
+ }
+}
+
+//Setup spawns the redis writer service
+func (rw *Writer) Setup(redisURL string) {
+ for {
+ waited, err := waitForAvailability(redisURL, waitTimeout, nil, nil)
+ if !waited || err != nil {
+ log.WithFields(logrus.Fields{"waitTimeout": waitTimeout, "err": err}).Fatal("Redis not available by timeout!")
+ }
+ err = rw.Run()
+ if err == nil {
+ break
+ }
+ log.Error(err)
+ }
+}
+
+// Run the main redisWriter loop that publishes incoming messages to Redis.
+func (rw *Writer) Run() error {
+ conn := rw.pool.Get()
+ defer conn.Close()
+
+ for data := range rw.messages {
+ if err := rw.writeToRedis(conn, data); err != nil {
+ rw.Publish(data) // attempt to redeliver later
+ return err
+ }
+ }
+ return nil
+}
+
+func (rw *Writer) writeToRedis(conn redis.Conn, data []byte) error {
+ if err := conn.Send("PUBLISH", rw.channel, data); err != nil {
+ return errors.Wrap(err, "Unable to publish message to Redis")
+ }
+ if err := conn.Flush(); err != nil {
+ return errors.Wrap(err, "Unable to flush published message to Redis")
+ }
+ return nil
+}
+
+// Publish to Redis via channel.
+func (rw *Writer) Publish(data []byte) {
+ rw.messages <- data
+}
diff --git a/routes.go b/routes.go
new file mode 100644
index 0000000..eeecf0a
--- /dev/null
+++ b/routes.go
@@ -0,0 +1,53 @@
+package main
+
+import (
+ "net/http"
+
+ "github.com/gorilla/mux"
+ "github.com/rmattam/go-websocket-chat-demo/chat"
+)
+
+// ChatSocket : struct to define the route parameters for the chat websocket.
+type ChatSocket struct {
+ Name string
+ Pattern string
+ Channel string
+}
+
+// ChatSockets : list of Route parameters for chat websocket configuration.
+type ChatSockets []ChatSocket
+
+// NewRouter : Initialize the router with the parameters provided.
+func NewRouter(redisURL string) *mux.Router {
+ r := mux.NewRouter().StrictSlash(true)
+
+ for _, route := range chatsockets {
+ route := route
+ Hub := &chat.Hub{}
+ Hub.SubsribeRedis(redisURL, route.Channel)
+ r.
+ Methods("GET").
+ Path(route.Pattern).
+ Name(route.Name).
+ HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ chat.HandleWebsocket(Hub, w, r)
+ })
+ }
+
+ //Setting up file servers
+ r.PathPrefix("/").Handler(http.StripPrefix("/", http.FileServer(http.Dir("./public"))))
+ return r
+}
+
+var chatsockets = ChatSockets{
+ ChatSocket{
+ Name: "Chat1 Websocket Endpoint",
+ Pattern: "/ws/chat1",
+ Channel: "chat1",
+ },
+ ChatSocket{
+ Name: "Chat2 Websocket Endpoint",
+ Pattern: "/ws/chat2",
+ Channel: "chat2",
+ },
+}