websocket连接推送更新数据

一、项目应用场景:

不断更新展示设备状态数据,所以有2种做法:轮训和长连接推送
2B67A1EF-BCC0-42C2-AD3F-63594E91FCEA.png

二、项目页面:

C81FC140-B1EF-4790-8705-3BEF76C0F1F2.png

三、设计方案

这里选择的设计方案是websocket长连接
用一个SingleThreadScheduledExecutor()创建单线程周期性更新设备状态数据,更新完的数据放到缓存中,然后在推送map中查找当前在线的连接,推送缓存中最新的数据。在连接下线时从map中移除,如果所有连接都断开,关闭更新线程。executeMap存放多种查询条件对应的更新线程。
这样做的好处是更新和推送分开处理,只需要一个更新线程更新数据。全部断开时不用一直写入缓存。
CC23C1AA-F0CA-468F-9213-247BC384721E.png

四、开发代码

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.kaicom.mes.monitor.service.EquipmentStatusService;
import com.kaicom.mes.util.GsonUtil;

import io.swagger.annotations.Api;

/**
 * @Author: BillYu
 * @Description:
 * @Date: Created in 09:57 2019-07-03.
 */
@ServerEndpoint(value = "/websocket/equipmentStatus")
@Component
@Api(description = "设备状态")
public class EquipmentStatusWebsocket {

    private static final Logger logger = LoggerFactory.getLogger(EquipmentStatusWebsocket.class);

    /**
     * 这里使用静态,让 service 属于类
     */
    private static EquipmentStatusService statusService;

    /**
     * 注入的时候,给类的 service 注入
     */
    @Autowired
    public void setStatusService(EquipmentStatusService statusService) {
        EquipmentStatusWebsocket.statusService = statusService;
    }

    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
    private static CopyOnWriteArraySet<EquipmentStatusWebsocket> webSocketSet = new CopyOnWriteArraySet<>();

    private static ConcurrentHashMap<String, ScheduledExecutorService> executeMap = new ConcurrentHashMap<>();

    private static ConcurrentHashMap<String, ScheduledExecutorService> noticeMap = new ConcurrentHashMap<>();


    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;


    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session) {

        this.session = session;

        //加入set中
        webSocketSet.add(this);
        logger.info("有新连接加入!当前在线人数为" + getOnlineCount());
        try {
            sendMessage("已打开长连接");
            String key = session.getQueryString();
            //双重锁检查
            if (executeMap.get(key) == null) {
                synchronized (EquipmentStatusWebsocket.class) {
                    if (executeMap.get(key) == null) {
//                       //更新线程
                        ScheduledExecutorService updateService = Executors.newSingleThreadScheduledExecutor();
                        String workshop = session.getRequestParameterMap().get("workshop").get(0);
                        String equipmentName = session.getRequestParameterMap().get("equipmentName").get(0);
                        String sessionId = session.getId();
                        updateService.scheduleAtFixedRate(new Runnable() {
                            @Override
                            public void run() {
                                logger.info(Thread.currentThread()+"update ...");
                                try {
                                    statusService.getEquipmentStatusStatistics(workshop,equipmentName );
                                } catch (Exception e) {
                                    logger.error(e.getMessage());
                                }
                            }
                        }, 0, 10, TimeUnit.SECONDS);
                        executeMap.put(key, updateService);
                    }
                }
            }

            //推送线程

            String workshop = session.getRequestParameterMap().get("workshop").get(0);
            String location = session.getRequestParameterMap().get("equipmentName").get(0);

            ScheduledExecutorService pushService = Executors.newSingleThreadScheduledExecutor();
                        pushService.scheduleAtFixedRate(new Runnable() {

                            @Override
                            public void run() {
                                logger.info("send msg");
                                try {
                                        sendMessage(statusService.fetchCacheStatus(workshop,location));
                                } catch (IOException e) {
                                    logger.error(e.getMessage());
                                }
                            }
                        }, 0, 10, TimeUnit.SECONDS);
            noticeMap.put(session.getId(),pushService);




        } catch (IOException e) {
            logger.error("IO异常");
        }
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        //从set中删除
        webSocketSet.remove(this);
        logger.info("有一连接关闭!当前在线人数为" + getOnlineCount());
        //清除消息线程
        if(noticeMap.get(session.getId())!=null){
            noticeMap.get(session.getId()).shutdownNow();
            noticeMap.remove(session.getId());
        }
        //清除更新数据线程
        synchronized (EquipmentStatusWebsocket.class) {
            Boolean needRemove = true;
            for (EquipmentStatusWebsocket statusSocket : webSocketSet) {
                //是否有同条件的查询
                if (session.getQueryString().equals(statusSocket.session.getQueryString())) {
                    needRemove = false;
                    break;
                }
            }
            if (needRemove) {
                executeMap.get(session.getQueryString()).shutdownNow();
                executeMap.remove(session.getQueryString());
            }
        }


    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        logger.info("来自客户端[" + session.getUserProperties().get("ClientIP") + "]的消息:" + message);

    }

    public Boolean sendClientMessage(String sessionId, String message) {
        boolean isSend = false;
        for (EquipmentStatusWebsocket statusWebsocket : webSocketSet) {
            if (statusWebsocket.session.getId().equals(sessionId)) {
                try {
                    statusWebsocket.sendMessage(message);
                    isSend = true;
                } catch (IOException e) {
                    logger.error("推送异常:" + e.getMessage() + "sessionId:" + sessionId + "message:" + message);
                    isSend = false;
                }
            }
        }
        return isSend;
    }

    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }

    /**
     * 群发自定义消息
     */
    public static void sendInfo(String message) throws IOException {

        for (EquipmentStatusWebsocket statusWebsocket : webSocketSet) {
            try {
                statusWebsocket.sendMessage(message);

            } catch (IOException e) {
                logger.error("推送异常:" + e.getMessage() + "message:" + message);
                continue;
            }
        }
    }

    public static int getOnlineCount() {
        return webSocketSet.size();
    }

}

//这里分为 更新线程和推送线程

发表新评论