Andy Niu Help
1.0.0.0
|
架构和设计
详细描述
变量说明
CComProcessorBase使用说明 |
1、start方式 start(0):PushMsg的时候,触发run()-->Heartbeat()-->DealMsg(msg) start(1):while死循环,定时触发 2、创建定时器 threadInit() 创建定时器,定时器属于当前线程 其它地方创建定时器,定时器属于主线程 3、在threadInit() 创建定时器,可以避免多线程竞争的问题,但是启动方式必须是start(0)
- 参见
dmu_sdk同步与异步 |
1、整个框架都是基于消息的,每个消息处理单元继承CComProcessorBase,CComProcessorBase启动一个线程处理消息。 启动线程有两种方式: start(0) 基于事件,push进去一个消息,调用notify,然后回调上来,处理消息。 start(1) while死循环,处理消息队列中的消息,防止cpu占用100%,没有消息sleep(5毫秒),有消息一直处理 2、事件驱动,SendMsg-->PushMsg-->notify 回调上来 特别注意:SendMsg的时候,检查方法形参的接收者是否为空,为空,设置为默认的接收者。 然后接收者Push消息进入自己的消息队列。 当前线程SendMsg,不是Push给自己(Push给自己没有意义),而是Push给其它的线程,其它的线程DealWithOneRequest 3、sdk异步的过程: 比如当前模块A关联outprocess,并设置outprocess的接收者是自己。 初始化设置异步回调,A模块线程SendMsg,关联outprocess,PushMsg给outprocess,触发outprocess线程回调上来。 outprocess处理消息DealWithOneRequest,调用底层库把消息打包发出去。 底层库线程收到消息,然后解析,onPDU回调上来,给outprocess,outprocess调用SendMsg,找到默认接收者模块A, Push给模块A,模块A回调上来,处理消息DealWithOneResponse 实际情况还要多一层关联,dmu_session关联sdk_manager,sdk_manager关联outprocess, 发请求,dmu_session模块SendMsg直接调用m_sdkManager模块SendMsg,m_sdkManager模块SendMsg直接Push给outprocess 收回复,outprocess的OnPDU上来,SendMsg时Push给SdkManager,SdkManager进行DealWithOneResponse时调用dmu_session的DealWithOneResponse 4、sdk同步的过程: 同步是对异步的封装,在发送消息和接收消息的两个口子,处理不一样。 发送消息:使用P操作(检查超时) 接收消息:设置异步回调,异步回调上来,进行V操作。然后发送消息线程的P操作进行下去。 注意:同步与异步,设置的异步回调不一样。同步设置的异步回调,回调上来,执行V操作。 5、SendMsg是对PushMsg的封装,对于请求消息,SendMsg的时候保存请求消息,为了和回复消息关联起来。
rpu2000T有关 |
1、rpu2000T对外(也可以理解对上)暴露口子uac,uac向上级uas注册。 2、uac下面是我们的IBP平台,包含cmu,dmu,vtdu等,uac登录IBP平台的cmu,连接dmu,vtdu等 3、IBP平台下面是uas,设备分为三种情况: 本级域设备,dmu直接登录 下级域设备,dmu通知uas去登录 还有一种设备,现在nvr上添加设备,然后nvr向uas注册,推送设备。 4、请求rtsp的流程,要经过uac,获取到rtsp之后,根据rtsp的地址直接拉码流,就不需要经过uac
一主一从__主从切换 |
1、通过hau心跳服务器,彼此通过udp,监听端口,和向对端发送信息。起始的时候,认为自己是slave, 如果一段时间没有收到对端的master报告,自己升级为master,同时定时通知对端,自己是master。 这里存在一个极端的情况:两个服务同时启动,都收不到对端的master报告, 然后同时升级为master,并向对端报告自己是master,这就出现,自己是master,又收到对端的master报告。 2、怎么解决这个问题? 3、增加一个比较项,也就是两个服务设置不同的等级。发送master报告的时候,带上自己的等级。 当服务自己是master,同时又收到对端的mster报告,比较自己的等级和对端的等级,等级高,自己还是master, 等级低,自己降为slave。这样保证两个master,其中一个自动降为slave
- 参见
一主多从__负载均衡 |
有个中心管理器,连接一组dmu,决定哪个是master,其他的是slave。决策机制是:同步去连接这一组dmu, 根据回复的先后顺序,进行排序,第一个为master,第一个挂了,第二个指定为master。
- 参见
为什么new出来pe协议,打包的时候调用dhtp协议的serialize方法 |
1、new ibpSocket的时候,创建peParser,然后peParser创建dhParser 2、dhParser在构造方法中,加载动态库new_dhtp_protocold.dll, 定位到方法 void* pFun = pMa->dllsym(fileName, "protocol_factory"); 方法原型为 DLL_EXPORT ibpProtocol* protocol_factory(t_int32 command, enumIbpCmdType etype) 放入集合vecICreate 3、发送数据,打包的时候调用dhParser::createCopyProtocol(ibpProtocol* protocol) obj = ((i_protocol_factory)m_vecICreate[i])(protocol->_cmd, protocol->GetType()); 根据协议的命令码和类型(请求、回复、通知)创建出dhtp协议 m_vecICreate[i] 就是dhtpProtocol.cpp中的方法protocol_factory 强转为方法i_protocol_factory,然后调用,传入参数(protocol->_cmd, protocol->GetType()) 4、在方法protocol_factory中, ibpProtocol* retPdu = NULL; const size_t count = sizeof(DHCP_ALL_PROTOCOL_UNITS)/sizeof(DHCP_OPERATION_UNIT); const DHCP_OPERATION_UNIT* opUnit; for(size_t i = 1; count > i; ++i) { opUnit = &DHCP_ALL_PROTOCOL_UNITS[i]; if(opUnit->_opType == command) { if(eIbpCmdType_Request == etype) retPdu = opUnit->_reqFactory(); else if(eIbpCmdType_Response == etype) retPdu = opUnit->_resFactory(); else if(eIbpCmdType_Notify == etype) retPdu = opUnit->_notifyFactory(); break; } } 在这里遍历数组DHCP_ALL_PROTOCOL_UNITS,数组DHCP_ALL_PROTOCOL_UNITS的元素是: struct DHCP_OPERATION_UNIT { enumIbpCmd _opType; const char* _opStr; FunDHCPProtocolRequestFatcory _reqFactory; FunDHCPProtocolResponseFatcory _resFactory; FunDHCPProtocolNotifyFatcory _notifyFactory; }; 后面三个字段都是模板方法,如下: template<class T> ibpProtocolRequset* dhcpProtocolRequestFatcory() { return new T; } 初始化DHCP_ALL_PROTOCOL_UNITS指定模板参数 {eIbpCmd_HeartBeat, "HeartBeat",dhcpProtocolRequestFatcory<dhcpHeartBeatRequest>,dhcpProtocolResponseFatcory<dhcpHeartBeatResponse>,NULL}, retPdu = opUnit->_reqFactory(); 就可以new出dhtp协议 5、从网络上收到数据,是如何解析的?如下: t_int32 dhParser::onPacket(prtlParserI *parser, prtlPeerI *peer, smartPrtlPacketI packet, void *arg) { //版本 //t_int32 packVersion = packet->getVersion(); //命令 stringI packMethod = ""; packet->getMethod(packMethod); //命令类型 请求 应答 回复 //t_int32 packType = packet->getPacketType(); //包序号 t_uint32 packSeq = 0; packet->getSequence(packSeq); //模块名称 stringI packMode = ""; packet->getModule(packMode); t_uint32 packSession = 0; packet->getSession(packSession); //包内容 int packBodyLen = 0; char* packBody = packet->getBody(packBodyLen); if (parser == NULL || peer == NULL) { LogErr(IBP_UTILS, "parser or peer is null cmd=[%s]", packMethod.c_str()); return E_FAIL; } LogDebug(IBP_UTILS, "onPacket method=[%s] netseq=[%d]", packMethod.c_str(), packSeq); ibpProtocol* protocol = NULL; try { char* buffer = NULL; rapidxml::xml_document<> obj; if((t_uint32)packBodyLen > m_nMaxLenBody) { delete[] m_pBodyBuffer; m_pBodyBuffer = new char[packBodyLen+1]; m_nMaxLenBody = packBodyLen+1; } t_int32 nCmd = -1; t_int32 nUnit = -1; for (size_t i = 0; i < m_vecINameToInt.size(); i++) { if(0 == ((i_cmd_str_to_int)m_vecINameToInt[i])(packMethod.c_str(), nCmd)) { break; } } for (size_t i = 0; i < m_vecINameToUnit.size(); i++) { if(0 == ((i_unit_str_to_int)m_vecINameToUnit[i])(packMode.c_str(), nUnit)) { break; } } for (size_t i = 0; i < m_vecICreate.size(); i++) { protocol = ((i_protocol_factory)m_vecICreate[i])(nCmd, (enumIbpCmdType)packet->getPacketType()); if(protocol) { //多个命令,共享协议体时,需要再次赋值 protocol->_cmd = nCmd; break; } } if(protocol != NULL) { protocol->setBody(packBody, packBodyLen); protocol->_sequence = packSeq; protocol->_unit = nUnit; protocol->_session = packSession; buffer = (char*)protocol->getBody(); if (buffer != NULL && packBodyLen > 0) { memset(m_pBodyBuffer, 0, m_nMaxLenBody); char* end = strstr(buffer + sizeof(XML_HEAD_RAPID_EX), XML_HEAD_RAPID_EX); if (end == NULL) { end = strstr(buffer + sizeof(XML_HEAD_RAPID_EX), XML_HEAD_RAPID_EXT); } if (end == NULL) { end = strstr(buffer + sizeof(XML_HEAD_RAPID_EX), XML_HEAD_RAPID_EXX); } if(end != NULL) { memcpy(m_pBodyBuffer, buffer, end - buffer); obj.parse<0>(m_pBodyBuffer); rapidxml::xml_node<>* pbody = obj.first_node(); if (pbody != NULL) { rapidxml::xml_attribute<> *head = pbody->first_attribute(IBP_TAG_TAKE_LEN); if(head != NULL) { protocol->_takeLen = atoi(head->value()); } } } else { memcpy(m_pBodyBuffer, buffer, packBodyLen); //如果下面没有xml直接调用xml.parse会出错 t_uint32 takeLen = 0; { char tag[64] = {0}; sprintf(tag, "%s=", IBP_TAG_TAKE_LEN); char* pTakenLen = strstr(m_pBodyBuffer, tag); if(pTakenLen != NULL) { pTakenLen += strlen(tag); if(pTakenLen) { char* pStart = pTakenLen + 1; char* pEnd = strstr(pStart, "\""); char szLen[32] = {0}; if(pStart != NULL && pEnd != NULL) { memcpy(szLen, pStart, pEnd - pStart); takeLen = atoi(szLen); } } } } if(takeLen == 0) { obj.parse<0>(m_pBodyBuffer); } else { char* tmpBuffer = obj.allocate_string(0,packBodyLen - takeLen + 1); memset(tmpBuffer, 0, packBodyLen - takeLen + 1); memcpy(tmpBuffer, m_pBodyBuffer, packBodyLen - takeLen); obj.parse<0>(tmpBuffer); } rapidxml::xml_node<>* pbody = obj.first_node(); if (pbody != NULL) { rapidxml::xml_attribute<> *head = pbody->first_attribute(IBP_TAG_TAKE_LEN); if(head != NULL) { protocol->_takeLen = atoi(head->value()); } } if(takeLen != protocol->_takeLen) { protocol->release(); protocol = NULL; onPacketError(parser, peer, packet, arg); LogErr(IBP_UTILS, "protocol takelen error! cmd=[%s]", packMethod.c_str()); return E_FAIL; } } } int ret = protocol->deserialize(obj); if(ret < 0) { protocol->release(); protocol = NULL; onPacketError(parser, peer, packet, arg); LogErr(IBP_UTILS, "protocol deserialize failed! cmd=[%s]", packMethod.c_str()); return E_FAIL; } } else { onPacketError(parser, peer, packet, arg); LogErr(IBP_UTILS, "create the protocol failed! cmd=[%s]", packMethod.c_str()); } if(m_pIbpProtocol != NULL && protocol != NULL) { protocol->_peer = (t_uint32)peer; protocol->_transId = packet->getTransId(); protocol->addRef(); m_pIbpProtocol->onPDU(parser, peer, protocol, arg); LogDebug(IBP_UTILS, "onPDU ok cmd=[%s]", packMethod.c_str()); protocol->release(); } obj.clear(); } catch (...) { if(protocol) { protocol->release(); } LogErr(IBP_UTILS, "exception occur! cmd=[%s]", packMethod.c_str()); return -1; } return E_OK; } 6、收到数据,首先解析出来命令码,然后和包的类型(请求、回复、通知),根据工厂方法创建出dhtp协议 protocol = ((i_protocol_factory)m_vecICreate[i])(nCmd, (enumIbpCmdType)packet->getPacketType()); 设置protocol->setBody(packBody, packBodyLen); 然后调用dhtp协议的解析接口int ret = protocol->deserialize(obj);
主备构架iptables设置 |
1、考虑下面的需求,两台设备,部署两套服务,主服务器主备热切换,业务服务器负载均衡。 2、每台设备都有两个网卡,内网网卡和外网网卡,设备之间通过内网通信,对外只暴露一个外网ip, 也就是说,只有一台设备的外网网卡激活。 3、比如外网ip 10.65.200.71,A设备内网192.168.1.71,业务服务监听地址为9820 B设备内网192.168.1.72,业务服务监听地址为9822,考虑iptables的配置。 4、现在假设A设备外网可用,外部Client通信地址10.65.200.100,主要修改内容: A<-->Client,直接使用外网卡通信 B--->Client,B的默认网关为192.168.1.71,A的iptables设置 src[192.168.1.72] dst[10.65.200.100],转化为 src[10.65.200.71] dst[10.65.200.100] Client--->B,通过端口区分,A的iptables设置,src[10.65.200.100] dst[10.65.200.71],端口为9822,转化为 src[10.65.200.71] dst[192.168.1.72]
- 参见
以scs为例说明网络包的接收和发送 |
1、dmu去监听scs,发送peLoginRequest,string dllName = "scs_deviced.dll"; 2、适配器管理器加载动态库scs_deviced.dll,创建scs_device 3、scs_device处理peLoginRequest,创建peOptSocketInfoRequest,加载pSocket->_dllNames.push_back("scs_protocold.dll"); 发给outprocess 4、outprocess处理peOptSocketInfoRequest,判断socket的flag typedef enum { eIbpNet_Invalidate = 0, eIbp_TCPClient, eIbp_TCPServer, eIbp_UDPClient, eIbp_UDPServer, }enumIbpNetType; 创建ibpSocket *pSession = new ibpSocket 创建m_peParser = new peParser(this, dllNames, protocolType); // dllNames就是scs_protocold.dll 创建m_protocolParser = new dhParser(ipbProtocol,dllNames); 5、outprocess关联一组ibpSocket,ibpSocket关联peParser,peParser关联prtlCallbackIbp【dhParser和SCSCmdParser都继承prtlCallbackIbp】 peParser创建prtlCallbackIbp,根据协议类型,如果内部协议,直接new dhParser(ipbProtocol,dllNames); 否者是裸的,动态加载方法create_parser 6、ibpSocket关联ibpNet,ibpNet关联prtlCallbackI【peParser继承prtlCallbackI】 ibpNet发送消息的时候,找到peParser,peParser找到SCSCmdParser,SCSCmdParser找到协议工厂,做个协议拷贝,然后打包,发出去。 从framework底层库收到数据,peParser执行onData,找到SCSCmdParser,然后解析。 7、协议打包出现问题,有可能是ibp_utils没有编译好。
原始请求消息的保存 |
1、当前模块转发请求,orgReqMsg是原始请求,reqMsg是新创建的请求。 msg中的协议体是复制指针,而msg是整体拷贝。为什么msg要整体拷贝? 2、因为转发请求,src和dst要发生变化,不能直接修改原始请求信息,否则找不到原始请求。 发送模块,发送一个请求,sequence+1,要保存原始请求,简单的做法是: SendMsg(reqMsg); saveOrgReqMsg(reqMsg._moduleSeq, orgReqMsg); // 发送成功再保存原始请求 收到回复ackMsg,ackMsg的_moduleSeq与reqMsg的_moduleSeq相同, 根据ackMsg或者reqMsg的_moduleSeq找到原始请求。 3、这里的设计不好,逻辑有点绕,把简单的问题搞的复杂。 PushOriginalMsg(orgReqMsg) 保存原始请求,返回Id,根据这个Id再找回原始请求。 这个Id赋值给reqMsg._moduleContext, 收到回复ackMsg,ackMsg的_moduleContext与reqMsg的_moduleContext相同, 根据ackMsg或者reqMsg的_moduleContext找到原始请求。 4、上面的第一种方法,发送成功再保存原始请求,而这里不行,要先保存原始请求,再发送。 为什么? 先发送,会保存请求消息【用于处理回复的时候,调用DealWithOneResponse(peInnerMsg& msgAck, peInnerMsg& msgReq)】 这个时候,保存到vector起来,整体拷贝,_moduleContext是0 后面再设置_moduleContext,已经没有用了,因为vector中保存的msg的_moduleContext还是0 因此,要先保存原始请求。 5、但是这存在问题,如果发送失败,会造成资源泄露。怎么解决这个问题? CheckOutOfDateRequest会定时清除。
发送消息的peer是怎么来的 |
1、msg发给outprocess,需要指定peer,outprocess再发出去。 2、outprocess首先根据peer找串口句柄,如果找到直接发出去,并return。 3、然后getPeerBySeq根据peer找到realpeer,然后getSessionFromPeer根据realpeer找出ibpSocket,ibpSocket发送数据 4、ibpSocket关联ibpNet,在ibpNet调用 m_dhcpParser->serialize(ibpPacket, peer); m_dhcpParser是协议解析基类,ibpPacket是数据报 5、outprocess在onConnect的时候保存peer与realpeer的映射关系, 6、outprocess在DealCreateSocket的时候, ibpSocket *pSession = new ibpSocket(); SocketInfo info; m_mapSocketInfo[pSession] = info; 在onConnect的时候,保存ibpSocket关联的一组realpeer it_map_SocketInfo it = m_mapSocketInfo.find(session); if(it != m_mapSocketInfo.end()) { it->second.insert(peer); }
如何确定业务服务器在哪个板卡上 |
1、以vmu为例说明,首先在主控板,查看cmu连接的vmu在哪个ip地址,如下: [root@localhost ~]# netstat -anp|grep 9930 tcp 0 0 192.168.1.164:59784 192.168.1.161:9930 ESTABLISHED 8595/./cmu_main 2、查看iptables的DNAT设置,确定端口的映射 [root@localhost ~]# service iptables status Table: nat Chain PREROUTING (policy ACCEPT) num target prot opt source destination 1 DNAT tcp -- 0.0.0.0/0 10.22.4.201 tcp dpt:1737 to:192.168.1.163:1733 2 DNAT tcp -- 0.0.0.0/0 10.22.4.201 tcp dpt:1736 to:192.168.1.161:1733 3、连接10.22.4.201的1736端口,即可进入192.168.1.161
底层库局部更新的办法 |
1、适配器管理器根据设备类型,创建不同的设备适配器,设备适配器对不同的设备sdk封装,设备sdk是动态库。 比如:dhdevice对dhsdk封装,hkdevice对hksdk封装。 2、这里存在问题,其他服务使用老的dhsdk,而vru需要使用新的dhsdk(提供视频浓缩的功能,访问ivs-s服务)进行视频浓缩。 新的dhsdk,头文件和库的内容都发生了变化,会导致生成的dhdevice动态库变化,怎么办? 3、不能影响老的dhdevice编译,因此增加编译配置选项为Ivss_Debug和Ivss_Release,根据新的dhsdk生成新的dhdevice 4、其他服务的编译不用修改,对于编译vru,需要修改的地方: a、编译vru之前,先对老的dhsdk头文件和库,做备份,重命名为xxx_bak b、并且已经生成dhdevice也要做备份,重命名为xxx_bak, c、然后拷贝新的dhsdk头文件和库,到对应目录,使用Ivss_xxx编译选项,生成新的dhdevice d、编译生成vru,将新生成的dhdevice拷贝vru目录中, e、另外vru的启动脚本加载dh_sdk中的dhsdk动态库,这里为了不影响其他服务的使用,需要把新的dhsdk库拷贝到vru目录中 f、上面的事情做完以后,还要恢复到原来的样子,也就是把xxx_bak都重命名回来,进行覆盖。 5、对于我本地调试,我可以直接覆盖,使用的dhsdk头文件和库,生成新的dhdevice,并使用
服务sdk发送消息的session是怎么来的 |
1、以Dmu为例说明,DMU_AC_Connect异步连接,产生一个dmuSession,dmuSession在基类SdkSessionBase自增Id m_sdkSessionSeq = SdkDataCenter::instance()->CreateSessionSeq(); 2、保存映射关系,如下: DmuSession* pDmuSs = new DmuSession(DmuSdkManager::instance()->m_sdkManager); SdkDataCenter::instance()->AddSession(pDmuSs); // 保存dmuSession与pDmuSs的映射 *dmusession = pDmuSs->GetSdkSessionSeq(); // 返回给客户端dmuSession 以后客户端请求,使用dmuSession,在sdk中找到pDmuSs,然后发送消息 3、pDmuSs发送消息,需要指定peer,这个peer就是连接成功,outprocess返回的peer protocol->_peer = m_peer; 4、pDmuSs发送消息,需要指定session,dmu服务根据这个session向cmu查询,这个session是否登陆过cmu protocol->_session = ss; 这个session是怎么来的? 5、客户端主动连接cmu之后,m_session = pResponse->_session;//其他服务用于鉴权 根据cmuSession获取pSs,然后获取session SdkSessionBase* pSs = SdkDataCenter::instance()->GetSession(m_cmuSession); ss = pSs->GetCheckSession(); 6、有一种情况,比如iis服务,不是主动连接cmu,而是cmu主动连接iis,这个用户把m_session直接设置为-1 //不使用CMU登陆时的异步连接专用 usersession 填写用户验证session t_int32 DMU_A_ConnectNotCmuHandle(t_uint32 usersession, char* ipinfo, char* svrdomid, t_int32 svrid, peInnerMsg& msg, t_uint32* dmusession); 7、上面的解决办法是投机的做法,更好的解决办法是: cmu连接iis之后,报告这个连接的session,然后iis连接dmu的时候,传递这个session
消息交互异步存在的问题 |
1、存在问题: 客户端在vru上配置录像计划,对两个通道录像。vru向vtdu请求两路的rtsp,收到rtsp的地址。 然后分别去连接rtsp,并且设置回调。连接两次,只收到一次回调连接成功,导致只有一路视频进行录像。 2、分析日志发现: 对于第二次的连接,在设置回调之前,就已经收到连接成功onConnect,这时候回调方法为NULL,导致回调没有执行。 3、怎么解决这个问题? 4、先设置回调,再去连接rtsp。 但是这里不行,因为必须先连接rtsp,获取到一个句柄,再根据这个句柄来设置回调。 5、换一种方法: 在onConnect的时候,检查回调方法,不为NULL,直接回调。为NULL,保存消息,等到设置回调的时候,再找到信息回调上去。 这还会存在问题,因为这里需要一个集合保存消息,为了同步,需要对集合加锁。 考虑:网络线程onConnect上来,这时候没有设置回调,试图去保存消息,对集合尝试加锁。 但是,但是,但是,这时候主线程设置了回调,尝试到集合中取出消息,先对集合加锁成功,找不到消息,没有回调成功,释放锁。 然后网络线程获取到锁,进去保存消息,但是这时候已经没有办法回调成功了。 6、怎么解决上面的问题? 问题的根源是:onConnect在设置回调之前。根本的解决办法是: connect是一个假动作,把句柄返回出来,然后设置回调,设置回调之后,再进行真正的connect操作。 真正的connect操作放在设置回调之后。
理解CComProcessorBase类 |
1、CComProcessorBase是底层库的基类,对网络库的封装,处理收到的数据,并发送数据。处理缓冲区收到的数据有两种方式: a、创建一个线程,while(1)循环,调用run(),sleep(5),run调用HeartBeat b、创建一个线程,调用run(),run调用HeartBeat,下一次再调用run,通过外部触发,比如发送一个信息,处理缓冲区收到的数据。 注意:HeartBeat从缓冲区取数据处理。可以一次取一条数据,也可以把当前缓冲区的数据全部取完处理。 2、上面两种方式的使用场景不同: 对于while循序,不管什么情况,定时处理缓冲区收到的数据,缺点是有些循环,调用run是没有必要的,因为缓冲区不可能有数据。 对于第二种方式,使用场景是,触发一下,从缓冲区取数据。比如,我知道如果我不发数据,缓冲区不可能收到数据, 没有必要while循环去处理缓冲区数据,因为不可能有数据。解决办法是:我发一次数据,期望收到数据。 过段时间,这个时候才调用run处理 缓冲区数据。
1、CComProcessorBase发送消息的时候,不是自己直接发送。策略是: a、通过SetReciver设置一个默认的接收者 b、SendMsg的时候,也指定一个接收者,如果没有指定接收者,使用默认的接收者 然后接收者直接PushMsg,放入自己的MsgPool 2、一个进程中,会有多个线程,也就是模块,模块之间会彼此发送消息。解决办法是: 创建一个Manager,维护所有的模块,这些模块向Manager注册,接收者都设置为Manager, 并且Manager定义为转发者,调用DispatchMsg。 这些模块发送消息,接收者是Manager,并且Manager是转发者,Manager并不把消息放到自己的MsgPool, 而是根据Dst,找到对应的模块,这个模块把消息Push到自己的MsgPool 3、CComProcessorBase处理消息有两种方式: a、定时处理,HeartBeat b、事件触发 4、定时处理的流程: 启动线程threadfuncNoEvent-->While循环(内部run,然后sleep),也就是死循环,定时run -->run-->Heartbeat-->从MsgPool取出消息-->DealMsg(msg) 也就是:定时从pool取出消息处理,sleep一会,继续 5、事件触发的流程: 启动线程threadfunc-->m_evnetNotify调用open-->然后设置回调m_evnetNotify.setCallback(event_notify_callback, args) 每次PushMsg(把消息Push到自己的MsgPool),都会调用m_evnetNotify.notify(),让它回调上来,回调上来的时候 -->run-->Heartbeat-->从MsgPool取出消息-->DealMsg(msg) 也就是:每次Push一个消息(包括请求,回复,通知消息),通知event让它回调。 回调的时候从pool取出消息处理,sleep一会,继续 6、事件触发的方式,为什么不直接处理?而要绕一个圈子:先放进去,通知回调,回调的时候再取出来处理? 为了接口统一化,其它模块只需要Push到当前模块就好了,后面是当前模块的处理。 7、之前vmu与vms交互,发现回调上来的序号不对,原因是: 使用了事件触发的模式,但是想vms发送消息,并没有PushMsg的过程,不能正确地触发回调上来。 重写了run,调用CComProcessorBase::Heartbeat(),并且调用m_cmdChannel->Heartbeat()等。
- 参见
理解rtsp的传输模式 |
1、传输码流方式有以下几种: enum RtpType { RTP_OVER_UDP, RTP_OVER_TCP, RTP_OVER_RTSP, OTHERS_OVER_TCP, OTHERS_OVER_UDP, }; RTP_OVER_UDP:9836端口的tcp连接走rtsp流程,沟通udp端口走码流 RTP_OVER_TCP:9836端口的tcp连接走rtsp流程,沟通tcp端口走码流 RTP_OVER_RTSP:9836端口的tcp连接走rtsp流程,同时码流也在这个连接上传输 也就是说,rtsp信令(setup,play,teardown)走在一个tcp连接上,码流可以新建一个udp连接,或者新建一个tcp连接, 或者码流和信令走同一个连接。 注:Others是指PS码流,PGPS是指对PS码流封装了一些头部信息,主要是一些属性。 2、客户端StartVideo的时候,指定传输协议enumTransProto _transProto; typedef enum { ST_TCP, ST_UDP, ST_RTSP }enumTransProto; 在创建RtspClientKit的时候,根据_transProto转化为RtpType 3、rstp分为服务端和客户端,vru存在一个问题: 在rtsp连接,onAccept以后,创建自增Id,然后new MediaSession用于发送码流。 这就导致一个问题,一个rtsp的9836连接,只能回放一路视频。 而vtdu从vru回放视频的时候,只建立一个rtsp连接,用于多路视频的回放,这就存在问题了。 4、怎么解决呢?
理解消息模式的线程处理 |
1、我们的程序是基于消息,内部流程都是异步的,只要把消息发给目标就好了。 2、什么要发消息给目标,而不是关联目标,直接调用目标的处理方法? 如果使用后者,目标直接处理可能会比较耗时,也就是说卡在这里,这就是同步处理了。 而发送消息给目标是异步的,当前模块只管发送请求给目标(不会卡主),目标处理完了,会发送回复,当前模块收到回复,然后处理。 也就是说当前模块不会卡主,但是目标收到的请求比较多,按顺序处理,会导致当前模块收到回复比较慢。 3、这里还有一个问题,发送消息给目标,如果当前模块关联目标,就会导致耦合性很高,因为彼此之间都会有关联,形成蜘蛛网。 解决办法是:使用中介者模式,每个模块向中介者注册,发送消息的时候,只要标识消息的目标(也就是接收者), 发给中介,中介进行转发。 这是在进程中的多个模块之间进行处理,如果通过网络发给对端,需要使用outprocess负责与外部进行交互。 一方面,消息接收者需要指定outprocess,同时需要指定消息中的协议的peer值,这个peer对应socket连接的句柄, 这样才能通过网络发出去。 4、服务内部都是异步的,客户端有时候需要调用同步接口,怎么处理? 使用PV操作。客户端调用同步接口,发送请求的时候,创建一个信号量,保存到map中,key是请求的seq,然后进行P操作。 另外有一个线程,处理回复,收到回复之后,把回复保存在map中,同时根据回复的seq(等于请求的seq)找到信号量,进行V操作。 在这个时候,上面P操作就可以进行下去,然后到map中找到回复,返回给客户端。 特备注意:这里的PV操作必须是两个线程,否者死锁。 如果是一个线程,当前的P操作卡住,等待V操作,而V操作代码在P操作代码下面,等待P操作的执行,形成死锁。 5、目标处理消息,要启动一个线程,启动线程有两种方式: a、基于event,先设置回调,每次pushMsg的时候,调用notify,然后就会回调上来。 b、使用while死循环,处理消息。 这两种启动方式在CComProcessorBase体现 // 启动线程,默认0启动EventManager 1启动while 调用 run virtual bool start(int iStartFlag = 0); 6、每个目标启动一个线程,存在问题: 目标很多,特别是存在设备的情况下,线程特别多。我们知道一个线程的开销还是比较大的 (在windows下线程栈默认是1M,Linux下线程栈默认是10M,可以使用ulimit -s设置)。 7、怎么解决上面的问题? 不能每个目标启动一个线程,而是创建一个线程,管理一组目标,由这个线程驱动,使用while死循环,遍历目标, 调用每个目标的heartbeat处理消息。也就是说,消息还是发给目标,但是目标不会主动处理消息,要靠外部线程驱动。 也就是说,目标处理消息,有两种驱动方式: a、自身驱动,线程个数比较多 b、外部驱动,一个线程管理一组目标。 这两种方式在CDeviceBase: public CComProcessorBase中体现 virtual t_int32 startDriver(void)=0; 自身驱动在startDriver中调用 ibpThreadBase::start(); 否者就是工作线程驱动。 8、对于外部线程驱动的流程是:调用适配器管理器的时候,会初始化登录线程和工作线程的个数。 添加一个设备,会分配给一个负载最小的登录线程。一个登录线程管理一组设备,对于登录失败的设备,sleep一会,下次继续登录。 对于登录成功的设备,从登录线程移除,放入到负载最小的工作线程。以后消息还是直接发给设备,但是设备不会主动的处理消息,而是由外部线程驱动。
- 参见
网络数据的打包解析 |
1、一个进程内的线程之间通信,没有经过网络传输数据,不需要打包解析。 2、如果通过网络传输数据,需要打包解析。TCP和UDP是一种流协议,没有报文边界的概念,网络上传输的都是字节流。 3、应用程序发送数据和接收数据都是直接对结构体操作,结构体提供打包和解析的接口。 发送数据:应用程序把结构体传给底层库,底层库调用结构体的打包接口,然后把字节流发出去。 接收数据:底层库收到数据,对数据进行解析,得到一个结构体,回调给应用程序。 4、底层库负责发送和接收数据,但是不知道怎么打包和解析,因此会关联一个抽象的解析器。 应用程序需要提供一个具体的解析器,让底层库来调用。 5、底层库关联peParser,peParser关联prtlCallbackIIbp* m_protocolParser; 而class dhParser: public prtlCallbackIIbp 在与外部通信的时候,创建socket的时候,指定协议库pSocketInfo->_dllNames.push_back("new_dhtp_protocold.dll"); 动态加载,根据协议库的dll名称,创建出m_protocolParser = new dhParser(ipbProtocol,dllNames); 6、收到数据,底层库处理Input 会执行 dhParser::onPacket --> int ret = protocol->deserialize(obj); 解析数据 发送数据,outprocess发送一个消息,ibpSocket::sendMsgPackage --> ibpNet::sendPackage --> copyPro->serialize(); 打包发出去。
1、ibpNet关联底层库,把peParser传递过去,peParser关联ptrlCallbackIIbp,ptrlCallbackIIbp是解析类的基类。 2、ptrlCallbackIIbp的子类有dhParser,ScsCmdParser,PioneerParser,解析器和协议库在一起。 通过接口create_parser 暴露出去 3、以ScsCmdParser为例,ScsCmdParser关联ibpProtocolI(也就是ibpSocket),ibpProtocolI暴露接口onPDU ibpProtocolI的子类有ibpSocket和ibpCom 4、outprocess关联一组ibpSocket,创建解析器的时候,把ibpSocket传递过去。 5、底层库收到数据,onData找到peParser,找到ScsCmdParser,网络数据解析完成,调用ibpSocket的onPDU 6、ibpSocket的onPDU回调给outprocess,outprocess把消息Push到自己的队列中,heartbeat来处理。 7、scsDevice继承CComProcessorBase,在虚方法DealWithOneRequest处理解析上来的消息。 8、注意:在ScsCmdParser,对于收上来的数据,每个连接(Peer)有一个自己的接收缓冲区。 当数据累积到一个完整的消息(消息头包含消息长度),解析,回调上去。
设备增删改的影响 |
1、存在问题: 在设备存在的情况下,配置了很多信息,比如上墙的设备,配置了预置位,预置位配置了测温对象,现在设备要删除, 已经配置好的信息怎么办? 2、根据实际的需求来处理。有些情况,不允许删除设备,先解除关联,才能删除。有些情况,把关系的信息一起删除。
- 参见
负载均衡 |
1、解决什么问题? 一个域(可以理解为区域)下面有很多设备,需要对这些设备管理,比如登录设备,控制设备,接收告,这些都是dmu服务的功能。 问题是设备很多,一个dmu管理不过来,需要一组dmu分布式部署,共同管理这些设备。这些dmu之间,构成一主多从负载均衡的关系, master由cmu来指定,slave去连接master。 存在两种情况: a、自上而下的控制命令,客户端发送控制命令给master,master找到负责的salve,把命令发给slave,再原路返回, 类似LVS的NAT模式(Network Address Translation,网络地址转换)。 b、自下而上的告警收集,客户端向master请求所有的slave,然后分别去连接,slave把告警发给客户端, 类似LVS的IP隧道模式(IP Tunneling,IP隧道模式)。 还有一种DR模式(Direct Routing,直接路由模式),和IP隧道模式类似,比IP隧道模式更为直接和底层,DR模式直接修改Mac地址。 再考虑级联的情况,dmu除了直接管理的当前域设备,还要管理下级域的设备。有多个下级域,每个下级域都有一组dmu服务。 举个现实的例子,一个省有一个省长,多个副省长。下面有多个市,每个市都有一个市长,多个副市长。 对于当前域,负载均衡的输入是当前域的一组dmu,和总的设备。总的设备包括:当前域的设备, 和各个下级域的一组dmu服务以及对应管理的设备。这里有分为两种情况: 对下级域的所有设备拿过来分,还是把下级域的dmu服务作为一个设备。考虑下级的一个dmu负责100个设备,上级两个dmu负载, 理论上应该是每个dmu负责50个。实际上没有必要,可以上级一个dmu负责整个下级dmu,另外一个dmu负责为空。 因为瓶颈在下级dmu,上面dmu负载均衡也没有特别的意义。 2、怎么解决? 3、不考虑权重,没有记忆功能,每个dmu应该负责的设备个数 (DevSize-1)/ServSize +1,直接分配。但是这样存在问题: 比如当前两个dmu,60个设备,负载后,d1[1-30],d2[31-60],现在多部署一个dmu,负载后 d1[1-20],d2[21-40],d3[41-60],但是这样会导致: d1登出10个设备 d2登出20个设备,同时登入10个设备 d3登入20个设备 性能不好,理论上应该是d1[1-20],d2[31-50],d3[21-30,50-60], 4、这就需要有记忆功能。思路是:第一个平均分配,以后每次负载都要参考上一次的负载均衡结果。分为以下情况: a、设备增加,计算出每个dmu应该负责多少设备,遍历dmu,不够的补充。 b、设备删除,去掉设备,计算出每个dmu应该负责多少设备,遍历dmu,逐个收集,然后再逐个补充。也就是,损有余,补不足。 c、dmu增加,计算出每个dmu应该负责多少设备,遍历dmu,逐个收集,然后再逐个补充。 d、dmu删除,负责的设备拿出来,再逐个补充。 5、上面没有考虑权重,有的设备只有一个通道,而有的设备可能有几十个通道。他们的权重不一样,必须考虑。 思路:应该负责多少个,当前负责多少个,找出一个最接近的补充上,继续。 考虑极端的情况,权重分别为:100,4,3,2,1 3个dmu负载。理想的结果是 d1[100], d2[4,1], d3[3,2] 按照上面的思路,应该负责37,结果是d1[100], d2[4,3,2,1], d3[] 但是这没有关系,因为瓶颈在d1,d2,d3分配的再均衡也没有意义。 6、考虑权重,再考虑记忆功能,和上面的情况类似。 7、负载均衡,由master调用,好了之后,告诉每个slave负责的设备,slave登入登出处理。
- 参见
负载均衡的两种场景 |
1、考虑dmu的负载均衡,也就是M个dmu去分N个设备。Master进行负载均衡,计算每个Slave负责的设备。 分好之后,通知每个Slave,然后Slave对负责的设备进行登录。 2、问题来了,当设备增删改的时候,怎么办? 设备的增删要重新负载均衡,Master重新负载均衡之后,再次通知Slave,Slave根据上次负责的设备和这一次负责的设备, 计算出应该登录哪些设备和应该登出哪些设备,进行处理。设备的修改,Master通知对应的Slave,Slave根据具体情况处理, 如果登录信息发生变化(比如ip、port、登录名和密码)需要登出老的,重新登录。 3、考虑vru的负载均衡,vru负责录像和回放。开始的时候是不知道要进行哪些录像和回放,是动态变化的。 处理策略是每次收到一个新的录像或者回放的请求,找出当前负载最小的vru,发给对应的vru。 4、这里存在问题,负载均衡的时候,需要知道每个vru的负载信息。怎么处理? 一种办法是:可以在Master端进行管理,发给对应Slave录像或者回放的时候,把Slave的负载加1。 当Slave完成录像或者回放的时候,对应Slave的负载减1. 另一种办法是:Slave端保存自己的负载信息,定时向Master汇报自己的负载信息,Master端进行更新。
连接和登录的区别 |
1、连接就是网络层建立tcp或者udp连接 2、登录是指在连接成功之后,输入用户名和密码
项目中大数据的处理 |
1、主要是实时数据和告警。以告警为例,自下而上收到的告警非常多,数据量非常大。怎么解决? 2、从源头上过滤,一段时间的重复告警过滤掉,过滤策略可以针对告警类型,也可以细化到具体的设备。 比如移动侦测告警,小区门口特别多,仓库里就很少。小区门口,10分钟内的告警过滤掉,仓库的内告警不过滤。 3、按照客户端的订阅,来上报给客户端。减少网络流量,进入页面订阅,离开取消。可以订阅具体的设备,也可以订阅组织。 4、告警的写入,缓存起来,批量写入。 5、历史告警会查询,因为查询条件肯定带着时间段。因此按时间段建立分区表,同时动态管理分区表,老的分区删除,建立新的分区。 6、实时数据的平均值,比如温度,求一天每个小时的平均问题,一个月每天的平均温度。如果直接来求,效能太差。解决办法是: 利用mysql的时间调度器,定时求出平均值放入平均值表中,以后直接查询平均值表。
- 参见
Copyright (c) 2015~2016, Andy Niu @All rights reserved. By Andy Niu Edit.