Andy Niu �����ĵ�

Andy Niu

Andy Niu Help  1.0.0.0
架构和设计

变量

 为什么new出来pe协议,打包的时候调用dhtp协议的serialize方法
 
 理解消息模式的线程处理
 
 网络数据的打包解析
 
 以scs为例说明网络包的接收和发送
 
 连接和登录的区别
 
 底层库局部更新的办法
 
 主备构架iptables设置
 
 理解CComProcessorBase类
 
 CComProcessorBase使用说明
 
 一主多从__负载均衡
 
 一主一从__主从切换
 
 负载均衡
 
 项目中大数据的处理
 
 设备增删改的影响
 
 发送消息的peer是怎么来的
 
 服务sdk发送消息的session是怎么来的
 
 rpu2000T有关
 
 dmu_sdk同步与异步
 
 负载均衡的两种场景
 
 理解rtsp的传输模式
 
 消息交互异步存在的问题
 
 原始请求消息的保存
 
 如何确定业务服务器在哪个板卡上
 

详细描述

变量说明

CComProcessorBase使用说明
1、start方式
    start(0):PushMsg的时候,触发run()-->Heartbeat()-->DealMsg(msg)
    start(1):while死循环,定时触发
2、创建定时器
    threadInit() 创建定时器,定时器属于当前线程
    其它地方创建定时器,定时器属于主线程
3、在threadInit() 创建定时器,可以避免多线程竞争的问题,但是启动方式必须是start(0)
参见
dmu_sdk同步与异步
1、整个框架都是基于消息的,每个消息处理单元继承CComProcessorBase,CComProcessorBase启动一个线程处理消息。
    启动线程有两种方式:
    start(0) 基于事件,push进去一个消息,调用notify,然后回调上来,处理消息。
    start(1) while死循环,处理消息队列中的消息,防止cpu占用100%,没有消息sleep(5毫秒),有消息一直处理
2、事件驱动,SendMsg-->PushMsg-->notify 回调上来
    特别注意:SendMsg的时候,检查方法形参的接收者是否为空,为空,设置为默认的接收者。
    然后接收者Push消息进入自己的消息队列。
    当前线程SendMsg,不是Push给自己(Push给自己没有意义),而是Push给其它的线程,其它的线程DealWithOneRequest
3、sdk异步的过程:
    比如当前模块A关联outprocess,并设置outprocess的接收者是自己。
    初始化设置异步回调,A模块线程SendMsg,关联outprocess,PushMsg给outprocess,触发outprocess线程回调上来。
        outprocess处理消息DealWithOneRequest,调用底层库把消息打包发出去。
    底层库线程收到消息,然后解析,onPDU回调上来,给outprocess,outprocess调用SendMsg,找到默认接收者模块A,
        Push给模块A,模块A回调上来,处理消息DealWithOneResponse
    实际情况还要多一层关联,dmu_session关联sdk_manager,sdk_manager关联outprocess,
    发请求,dmu_session模块SendMsg直接调用m_sdkManager模块SendMsg,m_sdkManager模块SendMsg直接Push给outprocess
    收回复,outprocess的OnPDU上来,SendMsg时Push给SdkManager,SdkManager进行DealWithOneResponse时调用dmu_session的DealWithOneResponse
4、sdk同步的过程:
    同步是对异步的封装,在发送消息和接收消息的两个口子,处理不一样。
    发送消息:使用P操作(检查超时)
    接收消息:设置异步回调,异步回调上来,进行V操作。然后发送消息线程的P操作进行下去。
    注意:同步与异步,设置的异步回调不一样。同步设置的异步回调,回调上来,执行V操作。
5、SendMsg是对PushMsg的封装,对于请求消息,SendMsg的时候保存请求消息,为了和回复消息关联起来。
rpu2000T有关
1、rpu2000T对外(也可以理解对上)暴露口子uac,uac向上级uas注册。
2、uac下面是我们的IBP平台,包含cmu,dmu,vtdu等,uac登录IBP平台的cmu,连接dmu,vtdu等
3、IBP平台下面是uas,设备分为三种情况:
    本级域设备,dmu直接登录
    下级域设备,dmu通知uas去登录
    还有一种设备,现在nvr上添加设备,然后nvr向uas注册,推送设备。
4、请求rtsp的流程,要经过uac,获取到rtsp之后,根据rtsp的地址直接拉码流,就不需要经过uac
一主一从__主从切换
1、通过hau心跳服务器,彼此通过udp,监听端口,和向对端发送信息。起始的时候,认为自己是slave,
    如果一段时间没有收到对端的master报告,自己升级为master,同时定时通知对端,自己是master。
    这里存在一个极端的情况:两个服务同时启动,都收不到对端的master报告,
    然后同时升级为master,并向对端报告自己是master,这就出现,自己是master,又收到对端的master报告。
2、怎么解决这个问题?
3、增加一个比较项,也就是两个服务设置不同的等级。发送master报告的时候,带上自己的等级。
    当服务自己是master,同时又收到对端的mster报告,比较自己的等级和对端的等级,等级高,自己还是master,
    等级低,自己降为slave。这样保证两个master,其中一个自动降为slave
参见
一主多从__负载均衡
    有个中心管理器,连接一组dmu,决定哪个是master,其他的是slave。决策机制是:同步去连接这一组dmu,
    根据回复的先后顺序,进行排序,第一个为master,第一个挂了,第二个指定为master。
参见
为什么new出来pe协议,打包的时候调用dhtp协议的serialize方法
1、new ibpSocket的时候,创建peParser,然后peParser创建dhParser
2、dhParser在构造方法中,加载动态库new_dhtp_protocold.dll,
    定位到方法 void* pFun = pMa->dllsym(fileName, "protocol_factory");
    方法原型为 DLL_EXPORT ibpProtocol* protocol_factory(t_int32 command, enumIbpCmdType etype)
    放入集合vecICreate
3、发送数据,打包的时候调用dhParser::createCopyProtocol(ibpProtocol* protocol)
    obj = ((i_protocol_factory)m_vecICreate[i])(protocol->_cmd, protocol->GetType());
    根据协议的命令码和类型(请求、回复、通知)创建出dhtp协议
    m_vecICreate[i] 就是dhtpProtocol.cpp中的方法protocol_factory
    强转为方法i_protocol_factory,然后调用,传入参数(protocol->_cmd, protocol->GetType())
4、在方法protocol_factory中,
    ibpProtocol*   retPdu = NULL;
    const size_t count = sizeof(DHCP_ALL_PROTOCOL_UNITS)/sizeof(DHCP_OPERATION_UNIT);
    const DHCP_OPERATION_UNIT* opUnit;
    for(size_t i = 1; count > i; ++i)
    {
        opUnit = &DHCP_ALL_PROTOCOL_UNITS[i];
        if(opUnit->_opType == command)
        {
            if(eIbpCmdType_Request == etype)
                retPdu = opUnit->_reqFactory();
            else if(eIbpCmdType_Response == etype)
                retPdu = opUnit->_resFactory();
            else if(eIbpCmdType_Notify == etype)
                retPdu = opUnit->_notifyFactory();
            break;
        }
    }
    在这里遍历数组DHCP_ALL_PROTOCOL_UNITS,数组DHCP_ALL_PROTOCOL_UNITS的元素是:
    struct DHCP_OPERATION_UNIT
    {
        enumIbpCmd                          _opType;
        const char*                         _opStr;
        FunDHCPProtocolRequestFatcory       _reqFactory;
        FunDHCPProtocolResponseFatcory      _resFactory;
        FunDHCPProtocolNotifyFatcory        _notifyFactory;
    };
    后面三个字段都是模板方法,如下:
    template<class T>
    ibpProtocolRequset* dhcpProtocolRequestFatcory()
    {
        return new T;
    }
    初始化DHCP_ALL_PROTOCOL_UNITS指定模板参数
    {eIbpCmd_HeartBeat, "HeartBeat",dhcpProtocolRequestFatcory<dhcpHeartBeatRequest>,dhcpProtocolResponseFatcory<dhcpHeartBeatResponse>,NULL},
    retPdu = opUnit->_reqFactory(); 就可以new出dhtp协议
5、从网络上收到数据,是如何解析的?如下:
    t_int32 dhParser::onPacket(prtlParserI *parser, prtlPeerI *peer, smartPrtlPacketI packet, void *arg)
    {
        //版本
        //t_int32   packVersion = packet->getVersion();         
        //命令
        stringI packMethod = "";
        packet->getMethod(packMethod);
        //命令类型 请求 应答 回复
        //t_int32 packType = packet->getPacketType();
        //包序号
        t_uint32 packSeq = 0;
        packet->getSequence(packSeq);
        //模块名称
        stringI packMode = "";
        packet->getModule(packMode);
        t_uint32 packSession = 0;
        packet->getSession(packSession);
        //包内容
        int packBodyLen = 0;
        char* packBody = packet->getBody(packBodyLen);
    
        if (parser == NULL || peer == NULL)
        {
            LogErr(IBP_UTILS, "parser or peer is null cmd=[%s]", packMethod.c_str());
            return E_FAIL;
        }
        
        LogDebug(IBP_UTILS, "onPacket method=[%s] netseq=[%d]", packMethod.c_str(), packSeq);
    
        ibpProtocol* protocol = NULL;   
    
        try 
        {           
            char* buffer = NULL;
            rapidxml::xml_document<>  obj;
            if((t_uint32)packBodyLen > m_nMaxLenBody)
            {
                delete[] m_pBodyBuffer;
    
                m_pBodyBuffer = new char[packBodyLen+1];
                m_nMaxLenBody = packBodyLen+1;
            }
                
            t_int32 nCmd = -1;
            t_int32 nUnit = -1;
            for (size_t i = 0; i < m_vecINameToInt.size(); i++)
            {
                if(0 == ((i_cmd_str_to_int)m_vecINameToInt[i])(packMethod.c_str(), nCmd))
                {
                    break;
                }
            }
            for (size_t i = 0; i < m_vecINameToUnit.size(); i++)
            {
                if(0 == ((i_unit_str_to_int)m_vecINameToUnit[i])(packMode.c_str(), nUnit))
                {
                    break;
                }
            }
    
            for (size_t i = 0; i < m_vecICreate.size(); i++)
            {
                protocol = ((i_protocol_factory)m_vecICreate[i])(nCmd, (enumIbpCmdType)packet->getPacketType());
                if(protocol)
                {
                    //多个命令,共享协议体时,需要再次赋值
                    protocol->_cmd = nCmd;
                    break;
                }           
            }
            if(protocol != NULL)
            {
                protocol->setBody(packBody, packBodyLen);
                protocol->_sequence = packSeq;
                protocol->_unit = nUnit;
                protocol->_session = packSession;
    
                buffer = (char*)protocol->getBody();
                if (buffer != NULL && packBodyLen > 0)
                {
                    memset(m_pBodyBuffer, 0, m_nMaxLenBody);
                    char* end = strstr(buffer + sizeof(XML_HEAD_RAPID_EX), XML_HEAD_RAPID_EX);
                    if (end == NULL)
                    {
                        end = strstr(buffer + sizeof(XML_HEAD_RAPID_EX), XML_HEAD_RAPID_EXT);
                    }
    
                    if (end == NULL)
                    {
                        end = strstr(buffer + sizeof(XML_HEAD_RAPID_EX), XML_HEAD_RAPID_EXX);
                    }
    
                    if(end != NULL)
                    {
                        memcpy(m_pBodyBuffer, buffer, end - buffer);                
                        obj.parse<0>(m_pBodyBuffer);    
                        rapidxml::xml_node<>* pbody = obj.first_node();
                        if (pbody != NULL)
                        {
                            rapidxml::xml_attribute<>  *head = pbody->first_attribute(IBP_TAG_TAKE_LEN);
                            if(head != NULL)
                            {
                                protocol->_takeLen = atoi(head->value());
                            }
                        }
                    }
                    else
                    {
                        memcpy(m_pBodyBuffer, buffer, packBodyLen);
                        //如果下面没有xml直接调用xml.parse会出错
                        t_uint32 takeLen = 0;
                        {
                            char tag[64] = {0};
                            sprintf(tag, "%s=", IBP_TAG_TAKE_LEN);
                            char* pTakenLen = strstr(m_pBodyBuffer, tag);
                            if(pTakenLen != NULL)
                            {
                                pTakenLen += strlen(tag);
                                if(pTakenLen)
                                {
                                    char* pStart = pTakenLen + 1;
                                    char* pEnd = strstr(pStart, "\"");
                                    char szLen[32] = {0};
                                    if(pStart != NULL && pEnd != NULL)
                                    {
                                        memcpy(szLen, pStart, pEnd - pStart);
                                        takeLen = atoi(szLen);
                                    }                           
                                }
                            }
                        }
                        if(takeLen == 0)
                        {
                            obj.parse<0>(m_pBodyBuffer);
                        }
                        else
                        {
                            char* tmpBuffer = obj.allocate_string(0,packBodyLen - takeLen + 1);
                            memset(tmpBuffer, 0, packBodyLen - takeLen + 1);
                            memcpy(tmpBuffer, m_pBodyBuffer, packBodyLen - takeLen);
                            obj.parse<0>(tmpBuffer);
                        }               
    
                        rapidxml::xml_node<>* pbody = obj.first_node();
    
                        if (pbody != NULL)
                        {
                            rapidxml::xml_attribute<>  *head = pbody->first_attribute(IBP_TAG_TAKE_LEN);
    
                            if(head != NULL)
                            {
                                protocol->_takeLen = atoi(head->value());
                            }
                        }   
                        if(takeLen != protocol->_takeLen)
                        {
                            protocol->release();
                            protocol = NULL;
    
                            onPacketError(parser, peer, packet, arg);
    
                            LogErr(IBP_UTILS, "protocol takelen error! cmd=[%s]", packMethod.c_str());
                            return  E_FAIL;
                        }
                    }   
                }
                
                int ret = protocol->deserialize(obj);
                if(ret < 0)
                {
                    protocol->release();
                    protocol = NULL;
    
                    onPacketError(parser, peer, packet, arg);
    
                    LogErr(IBP_UTILS, "protocol deserialize failed! cmd=[%s]", packMethod.c_str());
                    return  E_FAIL;
                }
            }
            else
            {
                onPacketError(parser, peer, packet, arg);
                LogErr(IBP_UTILS, "create the protocol failed! cmd=[%s]", packMethod.c_str());
            }
    
            if(m_pIbpProtocol != NULL && protocol != NULL)
            {
                protocol->_peer = (t_uint32)peer;
                protocol->_transId = packet->getTransId();
                
                protocol->addRef();
                m_pIbpProtocol->onPDU(parser, peer, protocol, arg);
    
                LogDebug(IBP_UTILS, "onPDU ok cmd=[%s]", packMethod.c_str());
    
                protocol->release();
            }
            obj.clear();
        }
        catch (...)
        {
            if(protocol)
            {
                protocol->release();
            }
            LogErr(IBP_UTILS, "exception occur! cmd=[%s]", packMethod.c_str());
            return  -1;
        }
    
        return   E_OK;
    }
6、收到数据,首先解析出来命令码,然后和包的类型(请求、回复、通知),根据工厂方法创建出dhtp协议
    protocol = ((i_protocol_factory)m_vecICreate[i])(nCmd, (enumIbpCmdType)packet->getPacketType());
    设置protocol->setBody(packBody, packBodyLen); 然后调用dhtp协议的解析接口int ret = protocol->deserialize(obj);
主备构架iptables设置
1、考虑下面的需求,两台设备,部署两套服务,主服务器主备热切换,业务服务器负载均衡。
2、每台设备都有两个网卡,内网网卡和外网网卡,设备之间通过内网通信,对外只暴露一个外网ip,
    也就是说,只有一台设备的外网网卡激活。
3、比如外网ip 10.65.200.71,A设备内网192.168.1.71,业务服务监听地址为9820
    B设备内网192.168.1.72,业务服务监听地址为9822,考虑iptables的配置。
4、现在假设A设备外网可用,外部Client通信地址10.65.200.100,主要修改内容:
    A<-->Client,直接使用外网卡通信
    B--->Client,B的默认网关为192.168.1.71,A的iptables设置 src[192.168.1.72] dst[10.65.200.100],转化为
                src[10.65.200.71] dst[10.65.200.100]
    Client--->B,通过端口区分,A的iptables设置,src[10.65.200.100] dst[10.65.200.71],端口为9822,转化为
                src[10.65.200.71] dst[192.168.1.72]
参见
以scs为例说明网络包的接收和发送
1、dmu去监听scs,发送peLoginRequest,string dllName = "scs_deviced.dll";
2、适配器管理器加载动态库scs_deviced.dll,创建scs_device
3、scs_device处理peLoginRequest,创建peOptSocketInfoRequest,加载pSocket->_dllNames.push_back("scs_protocold.dll"); 发给outprocess
4、outprocess处理peOptSocketInfoRequest,判断socket的flag
    typedef enum
    {
        eIbpNet_Invalidate = 0,
        eIbp_TCPClient,
        eIbp_TCPServer,
        eIbp_UDPClient,
        eIbp_UDPServer,
    }enumIbpNetType;
    创建ibpSocket *pSession = new ibpSocket
    创建m_peParser = new peParser(this, dllNames, protocolType); // dllNames就是scs_protocold.dll
    创建m_protocolParser = new dhParser(ipbProtocol,dllNames);
5、outprocess关联一组ibpSocket,ibpSocket关联peParser,peParser关联prtlCallbackIbp【dhParser和SCSCmdParser都继承prtlCallbackIbp】
    peParser创建prtlCallbackIbp,根据协议类型,如果内部协议,直接new dhParser(ipbProtocol,dllNames);
    否者是裸的,动态加载方法create_parser
6、ibpSocket关联ibpNet,ibpNet关联prtlCallbackI【peParser继承prtlCallbackI】
    ibpNet发送消息的时候,找到peParser,peParser找到SCSCmdParser,SCSCmdParser找到协议工厂,做个协议拷贝,然后打包,发出去。
    从framework底层库收到数据,peParser执行onData,找到SCSCmdParser,然后解析。
7、协议打包出现问题,有可能是ibp_utils没有编译好。
原始请求消息的保存
1、当前模块转发请求,orgReqMsg是原始请求,reqMsg是新创建的请求。
    msg中的协议体是复制指针,而msg是整体拷贝。为什么msg要整体拷贝?
2、因为转发请求,src和dst要发生变化,不能直接修改原始请求信息,否则找不到原始请求。
    发送模块,发送一个请求,sequence+1,要保存原始请求,简单的做法是:
    SendMsg(reqMsg);
    saveOrgReqMsg(reqMsg._moduleSeq, orgReqMsg); // 发送成功再保存原始请求
    收到回复ackMsg,ackMsg的_moduleSeq与reqMsg的_moduleSeq相同,
    根据ackMsg或者reqMsg的_moduleSeq找到原始请求。
3、这里的设计不好,逻辑有点绕,把简单的问题搞的复杂。
    PushOriginalMsg(orgReqMsg) 保存原始请求,返回Id,根据这个Id再找回原始请求。
    这个Id赋值给reqMsg._moduleContext,
    收到回复ackMsg,ackMsg的_moduleContext与reqMsg的_moduleContext相同,
    根据ackMsg或者reqMsg的_moduleContext找到原始请求。
4、上面的第一种方法,发送成功再保存原始请求,而这里不行,要先保存原始请求,再发送。
    为什么?
    先发送,会保存请求消息【用于处理回复的时候,调用DealWithOneResponse(peInnerMsg& msgAck, peInnerMsg& msgReq)】
    这个时候,保存到vector起来,整体拷贝,_moduleContext是0
    后面再设置_moduleContext,已经没有用了,因为vector中保存的msg的_moduleContext还是0
    因此,要先保存原始请求。
5、但是这存在问题,如果发送失败,会造成资源泄露。怎么解决这个问题?
    CheckOutOfDateRequest会定时清除。
发送消息的peer是怎么来的
1、msg发给outprocess,需要指定peer,outprocess再发出去。
2、outprocess首先根据peer找串口句柄,如果找到直接发出去,并return。
3、然后getPeerBySeq根据peer找到realpeer,然后getSessionFromPeer根据realpeer找出ibpSocket,ibpSocket发送数据
4、ibpSocket关联ibpNet,在ibpNet调用 m_dhcpParser->serialize(ibpPacket, peer); 
    m_dhcpParser是协议解析基类,ibpPacket是数据报
5、outprocess在onConnect的时候保存peer与realpeer的映射关系,
6、outprocess在DealCreateSocket的时候,
    ibpSocket *pSession = new ibpSocket();
    SocketInfo info;
    m_mapSocketInfo[pSession] = info;
    在onConnect的时候,保存ibpSocket关联的一组realpeer
    it_map_SocketInfo it = m_mapSocketInfo.find(session);
    if(it != m_mapSocketInfo.end())
    {
        it->second.insert(peer);
    }
如何确定业务服务器在哪个板卡上
1、以vmu为例说明,首先在主控板,查看cmu连接的vmu在哪个ip地址,如下:
    [root@localhost ~]# netstat -anp|grep 9930
    tcp        0      0 192.168.1.164:59784         192.168.1.161:9930          ESTABLISHED 8595/./cmu_main 
2、查看iptables的DNAT设置,确定端口的映射
    [root@localhost ~]# service iptables status
    Table: nat
    Chain PREROUTING (policy ACCEPT)
    num  target     prot opt source               destination         
    1    DNAT       tcp  --  0.0.0.0/0            10.22.4.201         tcp dpt:1737 to:192.168.1.163:1733 
    2    DNAT       tcp  --  0.0.0.0/0            10.22.4.201         tcp dpt:1736 to:192.168.1.161:1733
3、连接10.22.4.201的1736端口,即可进入192.168.1.161
底层库局部更新的办法
1、适配器管理器根据设备类型,创建不同的设备适配器,设备适配器对不同的设备sdk封装,设备sdk是动态库。
    比如:dhdevice对dhsdk封装,hkdevice对hksdk封装。
2、这里存在问题,其他服务使用老的dhsdk,而vru需要使用新的dhsdk(提供视频浓缩的功能,访问ivs-s服务)进行视频浓缩。
    新的dhsdk,头文件和库的内容都发生了变化,会导致生成的dhdevice动态库变化,怎么办?
3、不能影响老的dhdevice编译,因此增加编译配置选项为Ivss_Debug和Ivss_Release,根据新的dhsdk生成新的dhdevice
4、其他服务的编译不用修改,对于编译vru,需要修改的地方:
    a、编译vru之前,先对老的dhsdk头文件和库,做备份,重命名为xxx_bak
    b、并且已经生成dhdevice也要做备份,重命名为xxx_bak,
    c、然后拷贝新的dhsdk头文件和库,到对应目录,使用Ivss_xxx编译选项,生成新的dhdevice
    d、编译生成vru,将新生成的dhdevice拷贝vru目录中,
    e、另外vru的启动脚本加载dh_sdk中的dhsdk动态库,这里为了不影响其他服务的使用,需要把新的dhsdk库拷贝到vru目录中
    f、上面的事情做完以后,还要恢复到原来的样子,也就是把xxx_bak都重命名回来,进行覆盖。
5、对于我本地调试,我可以直接覆盖,使用的dhsdk头文件和库,生成新的dhdevice,并使用
服务sdk发送消息的session是怎么来的
1、以Dmu为例说明,DMU_AC_Connect异步连接,产生一个dmuSession,dmuSession在基类SdkSessionBase自增Id
    m_sdkSessionSeq = SdkDataCenter::instance()->CreateSessionSeq();
2、保存映射关系,如下:
    DmuSession* pDmuSs = new DmuSession(DmuSdkManager::instance()->m_sdkManager);
    SdkDataCenter::instance()->AddSession(pDmuSs);  // 保存dmuSession与pDmuSs的映射
    *dmusession = pDmuSs->GetSdkSessionSeq();       // 返回给客户端dmuSession
    以后客户端请求,使用dmuSession,在sdk中找到pDmuSs,然后发送消息
3、pDmuSs发送消息,需要指定peer,这个peer就是连接成功,outprocess返回的peer
    protocol->_peer = m_peer;
4、pDmuSs发送消息,需要指定session,dmu服务根据这个session向cmu查询,这个session是否登陆过cmu
    protocol->_session = ss;
    这个session是怎么来的?
5、客户端主动连接cmu之后,m_session = pResponse->_session;//其他服务用于鉴权
    根据cmuSession获取pSs,然后获取session
    SdkSessionBase* pSs = SdkDataCenter::instance()->GetSession(m_cmuSession);
    ss = pSs->GetCheckSession();
6、有一种情况,比如iis服务,不是主动连接cmu,而是cmu主动连接iis,这个用户把m_session直接设置为-1
    //不使用CMU登陆时的异步连接专用 usersession 填写用户验证session
    t_int32 DMU_A_ConnectNotCmuHandle(t_uint32 usersession, char* ipinfo, char* svrdomid, t_int32 svrid, peInnerMsg& msg, t_uint32* dmusession);
7、上面的解决办法是投机的做法,更好的解决办法是:
    cmu连接iis之后,报告这个连接的session,然后iis连接dmu的时候,传递这个session
消息交互异步存在的问题
1、存在问题:
    客户端在vru上配置录像计划,对两个通道录像。vru向vtdu请求两路的rtsp,收到rtsp的地址。
    然后分别去连接rtsp,并且设置回调。连接两次,只收到一次回调连接成功,导致只有一路视频进行录像。
2、分析日志发现:
    对于第二次的连接,在设置回调之前,就已经收到连接成功onConnect,这时候回调方法为NULL,导致回调没有执行。
3、怎么解决这个问题?
4、先设置回调,再去连接rtsp。
    但是这里不行,因为必须先连接rtsp,获取到一个句柄,再根据这个句柄来设置回调。
5、换一种方法:
    在onConnect的时候,检查回调方法,不为NULL,直接回调。为NULL,保存消息,等到设置回调的时候,再找到信息回调上去。
    这还会存在问题,因为这里需要一个集合保存消息,为了同步,需要对集合加锁。
    考虑:网络线程onConnect上来,这时候没有设置回调,试图去保存消息,对集合尝试加锁。
    但是,但是,但是,这时候主线程设置了回调,尝试到集合中取出消息,先对集合加锁成功,找不到消息,没有回调成功,释放锁。
    然后网络线程获取到锁,进去保存消息,但是这时候已经没有办法回调成功了。
6、怎么解决上面的问题?
    问题的根源是:onConnect在设置回调之前。根本的解决办法是:
    connect是一个假动作,把句柄返回出来,然后设置回调,设置回调之后,再进行真正的connect操作。
    真正的connect操作放在设置回调之后。
理解CComProcessorBase类
1、CComProcessorBase是底层库的基类,对网络库的封装,处理收到的数据,并发送数据。处理缓冲区收到的数据有两种方式:
    a、创建一个线程,while(1)循环,调用run(),sleep(5),run调用HeartBeat
    b、创建一个线程,调用run(),run调用HeartBeat,下一次再调用run,通过外部触发,比如发送一个信息,处理缓冲区收到的数据。
    注意:HeartBeat从缓冲区取数据处理。可以一次取一条数据,也可以把当前缓冲区的数据全部取完处理。
2、上面两种方式的使用场景不同:
    对于while循序,不管什么情况,定时处理缓冲区收到的数据,缺点是有些循环,调用run是没有必要的,因为缓冲区不可能有数据。
    对于第二种方式,使用场景是,触发一下,从缓冲区取数据。比如,我知道如果我不发数据,缓冲区不可能收到数据,
    没有必要while循环去处理缓冲区数据,因为不可能有数据。解决办法是:我发一次数据,期望收到数据。
    过段时间,这个时候才调用run处理   缓冲区数据。
1、CComProcessorBase发送消息的时候,不是自己直接发送。策略是:
    a、通过SetReciver设置一个默认的接收者
    b、SendMsg的时候,也指定一个接收者,如果没有指定接收者,使用默认的接收者
    然后接收者直接PushMsg,放入自己的MsgPool
2、一个进程中,会有多个线程,也就是模块,模块之间会彼此发送消息。解决办法是:
    创建一个Manager,维护所有的模块,这些模块向Manager注册,接收者都设置为Manager,
    并且Manager定义为转发者,调用DispatchMsg。
    这些模块发送消息,接收者是Manager,并且Manager是转发者,Manager并不把消息放到自己的MsgPool,
    而是根据Dst,找到对应的模块,这个模块把消息Push到自己的MsgPool
3、CComProcessorBase处理消息有两种方式:
    a、定时处理,HeartBeat
    b、事件触发
4、定时处理的流程:
    启动线程threadfuncNoEvent-->While循环(内部run,然后sleep),也就是死循环,定时run
    -->run-->Heartbeat-->从MsgPool取出消息-->DealMsg(msg)
    也就是:定时从pool取出消息处理,sleep一会,继续
5、事件触发的流程:
    启动线程threadfunc-->m_evnetNotify调用open-->然后设置回调m_evnetNotify.setCallback(event_notify_callback, args)
    每次PushMsg(把消息Push到自己的MsgPool),都会调用m_evnetNotify.notify(),让它回调上来,回调上来的时候
    -->run-->Heartbeat-->从MsgPool取出消息-->DealMsg(msg)
    也就是:每次Push一个消息(包括请求,回复,通知消息),通知event让它回调。
    回调的时候从pool取出消息处理,sleep一会,继续
6、事件触发的方式,为什么不直接处理?而要绕一个圈子:先放进去,通知回调,回调的时候再取出来处理?
    为了接口统一化,其它模块只需要Push到当前模块就好了,后面是当前模块的处理。
7、之前vmu与vms交互,发现回调上来的序号不对,原因是:
    使用了事件触发的模式,但是想vms发送消息,并没有PushMsg的过程,不能正确地触发回调上来。
    重写了run,调用CComProcessorBase::Heartbeat(),并且调用m_cmdChannel->Heartbeat()等。
参见
理解rtsp的传输模式
1、传输码流方式有以下几种:
    enum RtpType
    {
        RTP_OVER_UDP,
        RTP_OVER_TCP,
        RTP_OVER_RTSP,
        OTHERS_OVER_TCP,
        OTHERS_OVER_UDP,
    };
    RTP_OVER_UDP:9836端口的tcp连接走rtsp流程,沟通udp端口走码流
    RTP_OVER_TCP:9836端口的tcp连接走rtsp流程,沟通tcp端口走码流
    RTP_OVER_RTSP:9836端口的tcp连接走rtsp流程,同时码流也在这个连接上传输
    也就是说,rtsp信令(setup,play,teardown)走在一个tcp连接上,码流可以新建一个udp连接,或者新建一个tcp连接,
    或者码流和信令走同一个连接。
    注:Others是指PS码流,PGPS是指对PS码流封装了一些头部信息,主要是一些属性。
2、客户端StartVideo的时候,指定传输协议enumTransProto    _transProto;
    typedef enum 
    {
        ST_TCP,
        ST_UDP,
        ST_RTSP
    }enumTransProto;
    在创建RtspClientKit的时候,根据_transProto转化为RtpType
3、rstp分为服务端和客户端,vru存在一个问题:
    在rtsp连接,onAccept以后,创建自增Id,然后new MediaSession用于发送码流。
    这就导致一个问题,一个rtsp的9836连接,只能回放一路视频。
    而vtdu从vru回放视频的时候,只建立一个rtsp连接,用于多路视频的回放,这就存在问题了。
4、怎么解决呢?
理解消息模式的线程处理
1、我们的程序是基于消息,内部流程都是异步的,只要把消息发给目标就好了。
2、什么要发消息给目标,而不是关联目标,直接调用目标的处理方法?
    如果使用后者,目标直接处理可能会比较耗时,也就是说卡在这里,这就是同步处理了。
    而发送消息给目标是异步的,当前模块只管发送请求给目标(不会卡主),目标处理完了,会发送回复,当前模块收到回复,然后处理。
    也就是说当前模块不会卡主,但是目标收到的请求比较多,按顺序处理,会导致当前模块收到回复比较慢。
3、这里还有一个问题,发送消息给目标,如果当前模块关联目标,就会导致耦合性很高,因为彼此之间都会有关联,形成蜘蛛网。
    解决办法是:使用中介者模式,每个模块向中介者注册,发送消息的时候,只要标识消息的目标(也就是接收者),
    发给中介,中介进行转发。
    这是在进程中的多个模块之间进行处理,如果通过网络发给对端,需要使用outprocess负责与外部进行交互。   
    一方面,消息接收者需要指定outprocess,同时需要指定消息中的协议的peer值,这个peer对应socket连接的句柄,
    这样才能通过网络发出去。
4、服务内部都是异步的,客户端有时候需要调用同步接口,怎么处理?
    使用PV操作。客户端调用同步接口,发送请求的时候,创建一个信号量,保存到map中,key是请求的seq,然后进行P操作。
    另外有一个线程,处理回复,收到回复之后,把回复保存在map中,同时根据回复的seq(等于请求的seq)找到信号量,进行V操作。
    在这个时候,上面P操作就可以进行下去,然后到map中找到回复,返回给客户端。
    特备注意:这里的PV操作必须是两个线程,否者死锁。
    如果是一个线程,当前的P操作卡住,等待V操作,而V操作代码在P操作代码下面,等待P操作的执行,形成死锁。
5、目标处理消息,要启动一个线程,启动线程有两种方式:
    a、基于event,先设置回调,每次pushMsg的时候,调用notify,然后就会回调上来。
    b、使用while死循环,处理消息。
    这两种启动方式在CComProcessorBase体现
    // 启动线程,默认0启动EventManager  1启动while 调用 run
    virtual bool    start(int iStartFlag = 0);
6、每个目标启动一个线程,存在问题:
    目标很多,特别是存在设备的情况下,线程特别多。我们知道一个线程的开销还是比较大的
    (在windows下线程栈默认是1M,Linux下线程栈默认是10M,可以使用ulimit -s设置)。
7、怎么解决上面的问题?
    不能每个目标启动一个线程,而是创建一个线程,管理一组目标,由这个线程驱动,使用while死循环,遍历目标,
    调用每个目标的heartbeat处理消息。也就是说,消息还是发给目标,但是目标不会主动处理消息,要靠外部线程驱动。
    也就是说,目标处理消息,有两种驱动方式:
    a、自身驱动,线程个数比较多
    b、外部驱动,一个线程管理一组目标。
    这两种方式在CDeviceBase: public CComProcessorBase中体现
    virtual t_int32 startDriver(void)=0;
    自身驱动在startDriver中调用 ibpThreadBase::start(); 否者就是工作线程驱动。
8、对于外部线程驱动的流程是:调用适配器管理器的时候,会初始化登录线程和工作线程的个数。
    添加一个设备,会分配给一个负载最小的登录线程。一个登录线程管理一组设备,对于登录失败的设备,sleep一会,下次继续登录。
    对于登录成功的设备,从登录线程移除,放入到负载最小的工作线程。以后消息还是直接发给设备,但是设备不会主动的处理消息,而是由外部线程驱动。
参见
网络数据的打包解析
1、一个进程内的线程之间通信,没有经过网络传输数据,不需要打包解析。
2、如果通过网络传输数据,需要打包解析。TCP和UDP是一种流协议,没有报文边界的概念,网络上传输的都是字节流。
3、应用程序发送数据和接收数据都是直接对结构体操作,结构体提供打包和解析的接口。
    发送数据:应用程序把结构体传给底层库,底层库调用结构体的打包接口,然后把字节流发出去。
    接收数据:底层库收到数据,对数据进行解析,得到一个结构体,回调给应用程序。
4、底层库负责发送和接收数据,但是不知道怎么打包和解析,因此会关联一个抽象的解析器。
    应用程序需要提供一个具体的解析器,让底层库来调用。
5、底层库关联peParser,peParser关联prtlCallbackIIbp*     m_protocolParser;
    而class dhParser: public prtlCallbackIIbp
    在与外部通信的时候,创建socket的时候,指定协议库pSocketInfo->_dllNames.push_back("new_dhtp_protocold.dll");
    动态加载,根据协议库的dll名称,创建出m_protocolParser = new dhParser(ipbProtocol,dllNames);
6、收到数据,底层库处理Input 会执行 dhParser::onPacket --> int ret = protocol->deserialize(obj); 解析数据
    发送数据,outprocess发送一个消息,ibpSocket::sendMsgPackage --> ibpNet::sendPackage --> copyPro->serialize(); 打包发出去。
1、ibpNet关联底层库,把peParser传递过去,peParser关联ptrlCallbackIIbp,ptrlCallbackIIbp是解析类的基类。
2、ptrlCallbackIIbp的子类有dhParser,ScsCmdParser,PioneerParser,解析器和协议库在一起。
    通过接口create_parser 暴露出去
3、以ScsCmdParser为例,ScsCmdParser关联ibpProtocolI(也就是ibpSocket),ibpProtocolI暴露接口onPDU
    ibpProtocolI的子类有ibpSocket和ibpCom
4、outprocess关联一组ibpSocket,创建解析器的时候,把ibpSocket传递过去。
5、底层库收到数据,onData找到peParser,找到ScsCmdParser,网络数据解析完成,调用ibpSocket的onPDU
6、ibpSocket的onPDU回调给outprocess,outprocess把消息Push到自己的队列中,heartbeat来处理。
7、scsDevice继承CComProcessorBase,在虚方法DealWithOneRequest处理解析上来的消息。
8、注意:在ScsCmdParser,对于收上来的数据,每个连接(Peer)有一个自己的接收缓冲区。
    当数据累积到一个完整的消息(消息头包含消息长度),解析,回调上去。
设备增删改的影响
1、存在问题:
    在设备存在的情况下,配置了很多信息,比如上墙的设备,配置了预置位,预置位配置了测温对象,现在设备要删除,
    已经配置好的信息怎么办?
2、根据实际的需求来处理。有些情况,不允许删除设备,先解除关联,才能删除。有些情况,把关系的信息一起删除。
参见
负载均衡
1、解决什么问题?
    一个域(可以理解为区域)下面有很多设备,需要对这些设备管理,比如登录设备,控制设备,接收告,这些都是dmu服务的功能。
    问题是设备很多,一个dmu管理不过来,需要一组dmu分布式部署,共同管理这些设备。这些dmu之间,构成一主多从负载均衡的关系,
    master由cmu来指定,slave去连接master。
    存在两种情况:
    a、自上而下的控制命令,客户端发送控制命令给master,master找到负责的salve,把命令发给slave,再原路返回,
        类似LVS的NAT模式(Network Address Translation,网络地址转换)。
    b、自下而上的告警收集,客户端向master请求所有的slave,然后分别去连接,slave把告警发给客户端,
        类似LVS的IP隧道模式(IP Tunneling,IP隧道模式)。
        还有一种DR模式(Direct Routing,直接路由模式),和IP隧道模式类似,比IP隧道模式更为直接和底层,DR模式直接修改Mac地址。
    再考虑级联的情况,dmu除了直接管理的当前域设备,还要管理下级域的设备。有多个下级域,每个下级域都有一组dmu服务。
    举个现实的例子,一个省有一个省长,多个副省长。下面有多个市,每个市都有一个市长,多个副市长。
    
    对于当前域,负载均衡的输入是当前域的一组dmu,和总的设备。总的设备包括:当前域的设备,
    和各个下级域的一组dmu服务以及对应管理的设备。这里有分为两种情况:
    对下级域的所有设备拿过来分,还是把下级域的dmu服务作为一个设备。考虑下级的一个dmu负责100个设备,上级两个dmu负载,
    理论上应该是每个dmu负责50个。实际上没有必要,可以上级一个dmu负责整个下级dmu,另外一个dmu负责为空。
    因为瓶颈在下级dmu,上面dmu负载均衡也没有特别的意义。
2、怎么解决?
3、不考虑权重,没有记忆功能,每个dmu应该负责的设备个数 (DevSize-1)/ServSize +1,直接分配。但是这样存在问题:
    比如当前两个dmu,60个设备,负载后,d1[1-30],d2[31-60],现在多部署一个dmu,负载后
    d1[1-20],d2[21-40],d3[41-60],但是这样会导致:
    d1登出10个设备
    d2登出20个设备,同时登入10个设备
    d3登入20个设备
    性能不好,理论上应该是d1[1-20],d2[31-50],d3[21-30,50-60],
4、这就需要有记忆功能。思路是:第一个平均分配,以后每次负载都要参考上一次的负载均衡结果。分为以下情况:
    a、设备增加,计算出每个dmu应该负责多少设备,遍历dmu,不够的补充。
    b、设备删除,去掉设备,计算出每个dmu应该负责多少设备,遍历dmu,逐个收集,然后再逐个补充。也就是,损有余,补不足。
    c、dmu增加,计算出每个dmu应该负责多少设备,遍历dmu,逐个收集,然后再逐个补充。
    d、dmu删除,负责的设备拿出来,再逐个补充。
5、上面没有考虑权重,有的设备只有一个通道,而有的设备可能有几十个通道。他们的权重不一样,必须考虑。
    思路:应该负责多少个,当前负责多少个,找出一个最接近的补充上,继续。
    考虑极端的情况,权重分别为:100,4,3,2,1 3个dmu负载。理想的结果是 d1[100], d2[4,1], d3[3,2]
    按照上面的思路,应该负责37,结果是d1[100], d2[4,3,2,1], d3[] 但是这没有关系,因为瓶颈在d1,d2,d3分配的再均衡也没有意义。
6、考虑权重,再考虑记忆功能,和上面的情况类似。
7、负载均衡,由master调用,好了之后,告诉每个slave负责的设备,slave登入登出处理。
参见
负载均衡的两种场景
1、考虑dmu的负载均衡,也就是M个dmu去分N个设备。Master进行负载均衡,计算每个Slave负责的设备。
    分好之后,通知每个Slave,然后Slave对负责的设备进行登录。
2、问题来了,当设备增删改的时候,怎么办?
    设备的增删要重新负载均衡,Master重新负载均衡之后,再次通知Slave,Slave根据上次负责的设备和这一次负责的设备,
    计算出应该登录哪些设备和应该登出哪些设备,进行处理。设备的修改,Master通知对应的Slave,Slave根据具体情况处理,
    如果登录信息发生变化(比如ip、port、登录名和密码)需要登出老的,重新登录。
3、考虑vru的负载均衡,vru负责录像和回放。开始的时候是不知道要进行哪些录像和回放,是动态变化的。
    处理策略是每次收到一个新的录像或者回放的请求,找出当前负载最小的vru,发给对应的vru。
4、这里存在问题,负载均衡的时候,需要知道每个vru的负载信息。怎么处理?
    一种办法是:可以在Master端进行管理,发给对应Slave录像或者回放的时候,把Slave的负载加1。
    当Slave完成录像或者回放的时候,对应Slave的负载减1.
    另一种办法是:Slave端保存自己的负载信息,定时向Master汇报自己的负载信息,Master端进行更新。
连接和登录的区别
1、连接就是网络层建立tcp或者udp连接
2、登录是指在连接成功之后,输入用户名和密码
项目中大数据的处理
1、主要是实时数据和告警。以告警为例,自下而上收到的告警非常多,数据量非常大。怎么解决?
2、从源头上过滤,一段时间的重复告警过滤掉,过滤策略可以针对告警类型,也可以细化到具体的设备。
    比如移动侦测告警,小区门口特别多,仓库里就很少。小区门口,10分钟内的告警过滤掉,仓库的内告警不过滤。
3、按照客户端的订阅,来上报给客户端。减少网络流量,进入页面订阅,离开取消。可以订阅具体的设备,也可以订阅组织。
4、告警的写入,缓存起来,批量写入。
5、历史告警会查询,因为查询条件肯定带着时间段。因此按时间段建立分区表,同时动态管理分区表,老的分区删除,建立新的分区。
6、实时数据的平均值,比如温度,求一天每个小时的平均问题,一个月每天的平均温度。如果直接来求,效能太差。解决办法是:
    利用mysql的时间调度器,定时求出平均值放入平均值表中,以后直接查询平均值表。
参见
Copyright (c) 2015~2016, Andy Niu @All rights reserved. By Andy Niu Edit.