/*
 * Realtime relay server.
 *
 * - Serves index.html over HTTP on port 3000.
 * - Accepts Socket.IO connections and places each socket into rooms
 *   (origin, user, task, task progress, document).
 * - Listens on the Redis "events" channel and forwards every published
 *   message to the Socket.IO room named in the message.
 */
var app = require('express')();
var http = require('http').Server(app);
var io = require('socket.io')(http);
var cookie = require('cookie');
var redis = require("redis");
// `subscriber` is dedicated to pub/sub; `r` is used for regular commands (hgetall).
var subscriber = redis.createClient(12311);
var r = redis.createClient(12311);
var request = require('superagent');

app.get('/', function(req, res) {
  res.sendfile('index.html');
});

/**
 * Tell whether a superagent callback result is a usable HTTP 200 response.
 *
 * `res` can be undefined when the request fails before any response is
 * received, so it must be checked before reading `res.status`.
 *
 * @param {*} err - Error passed to the superagent `.end` callback, if any.
 * @param {Object} [res] - Response passed to the superagent `.end` callback.
 * @returns {boolean} true only when there is no error and the status is 200.
 */
function is_ok_response(err, res) {
  return !err && !!res && res.status === 200;
}

io.on('connection', function(socket) {
  // Group sockets by the Origin header of the handshake request.
  socket.join(socket.request.headers.origin);

  // cookie.parse requires a string; a handshake without a Cookie header
  // would otherwise throw here and take the whole process down.
  var sid = cookie.parse(socket.request.headers.cookie || '').sid;
  if (!sid) {
    return;
  }

  // Resolve the session id to a user and join that user's room.
  request.post('http://localhost:8000/api/method/frappe.async.get_user_info')
    .type('form')
    .send({
      sid: sid
    })
    .end(function(err, res) {
      if (!is_ok_response(err, res)) {
        return;
      }
      var info = res.body && res.body.message;
      if (!info || !info.user) {
        return;
      }
      socket.join(get_user_room(info.user));
    });

  socket.on('task_subscribe', function(task_id) {
    var room = 'task:' + task_id;
    socket.join(room);
  });

  socket.on('progress_subscribe', function(task_id) {
    var room = 'task_progress:' + task_id;
    socket.join(room);
    // Replay the lines already stored for this task to the new subscriber.
    send_existing_lines(task_id, socket);
  });

  socket.on('doc_subscribe', function(doctype, docname) {
    // Only join the document room when the backend answers with HTTP 200.
    request.post('http://localhost:8000/api/method/frappe.async.can_subscribe_doc')
      .type('form')
      .send({
        sid: sid,
        doctype: doctype,
        docname: docname
      })
      .end(function(err, res) {
        if (!is_ok_response(err, res)) {
          return;
        }
        socket.join(get_doc_room(doctype, docname));
      });
  });

  socket.on('doc_unsubscribe', function(doctype, docname) {
    var room = get_doc_room(doctype, docname);
    socket.leave(room);
  });
});

/**
 * Emit the lines stored in the Redis hash `task_log:<task_id>` to one socket
 * as a `task_progress` event.
 *
 * @param {string} task_id - Task identifier used to build the Redis key.
 * @param {Object} socket - Socket.IO socket that receives the event.
 */
function send_existing_lines(task_id, socket) {
  r.hgetall('task_log:' + task_id, function(err, lines) {
    if (err) {
      // Do not send a payload with undefined lines; report the failure instead.
      console.error('hgetall failed for task_log:' + task_id, err);
      return;
    }
    socket.emit('task_progress', {
      "task_id": task_id,
      "message": {
        "lines": lines
      }
    });
  });
}

// Forward every message published on the subscribed channel to the
// Socket.IO room named in the message.
subscriber.on("message", function(channel, message) {
  try {
    message = JSON.parse(message);
  } catch (e) {
    // A malformed publish must not crash the relay.
    console.error('ignoring malformed message on ' + channel, e);
    return;
  }
  if (!message || typeof message !== 'object') {
    return;
  }
  io.to(message.room).emit(message.event, message.message);
  console.log(message.room, message.event, message.message);
});

subscriber.subscribe("events");

http.listen(3000, function() {
  console.log('listening on *:3000');
});

/**
 * Build the room name for a document.
 *
 * The expression must start on the same line as `return`: a line break
 * directly after `return` makes the function return undefined.
 *
 * @param {string} doctype
 * @param {string} docname
 * @returns {string} 'doc:<doctype>/<docname>'
 */
function get_doc_room(doctype, docname) {
  return 'doc:' + doctype + '/' + docname;
}

/**
 * Build the room name for a user.
 *
 * @param {string} user
 * @returns {string} 'user:<user>'
 */
function get_user_room(user) {
  return 'user:' + user;
}