Giter Site home page Giter Site logo

lihuanghe / smsgate Goto Github PK

View Code? Open in Web Editor NEW
853.0 69.0 425.0 6.33 MB

这是一个在netty4框架下实现的三网合一短信网关核心框架,支持(cmpp/smpp3.4/sgip1.2/smgp3) 短信协议解析,支持长短信合并和拆分,也支持wap短信和闪信。

License: Apache License 2.0

Java 100.00%
cmpp smpp netty4 sms sgip smgp

smsgate's Introduction

技术问题请加QQ群

qq 20180420170449

群名称:cmppGate短信
群 号:770738500

How To Use

<dependency>
  <groupId>com.chinamobile.cmos</groupId>
  <artifactId>sms-core</artifactId>
  <version>2.1.13.5</version>
</dependency>

常见问题

  • 纯客户端发送短信

    可以使用sms-client, 一个纯发送短信的客户端jar包,Api简单。【sgip协议用sms-client无法接收上行和状态报告】

    也可以参考htt2cmpp 实现将一个短信长连接协议封装成http接口。

    或者参考smsServer用SpringBoot实现一个能支持http,cmpp,sgip,smgp,smpp等多种协议的网关服务。

  • 没看懂如何发送短信?

    短信协议是tcp长连接,类似数据库连接,如jdbc-connection. 所以发送短信前必须要先有一个短信连接。因此你需要在程序启动时建立短信连接。参考demo里的client,调用manager.openEntity()方法,,调用manager.startConnectionCheckTask()开启断线重连。 然后就像调用其它库一样,在需要发送短信的地方,new 一个对应的Message,调用

    List< Future > f = ChannelUtil.syncWriteLongMsgToEntity([clientEntityId],message)方法发送,要判断f是否为Null,为Null表示发送失败,一条短信可能拆分成多条,因此返回List

  • 如何发送长短信?

    smsgate默认已经处理好长短信了,就像发送普通短信一样。长短信发送的时候,框架内部自动拆分成短短信分片发送(一般按67个汉字拆分)。

  • 如何发送闪信?

  //创建一个闪信对象,跟发送普通短信一样
  CmppSubmitRequestMessage msg = CmppSubmitRequestMessage.create(phone, "10690021", "");
  msg.setMsgContent(new SmsTextMessage("你好,我是闪信!",SmsAlphabet.UCS2,SmsMsgClass.CLASS_0));  //class0是闪信
  • 如何接收短信?

    如果你了解netty的handler,那么请看AbstractBusinessHandler的源码即可,这是一个netty的handler.

    如果你不了解netty, 你只需知道:

    当连接刚刚建立时[指登陆验证成功],smsgate会自动调用handler里的userEventTriggered方法,因此在此方法中可以开启一个Consumer去消费MQ里的消息发送到网络连接上;

    当对方发送任意一个消息给你时[包括request,response消息],smsgate会自动调用handler里的channelRead方法,因此可在此方法内接收消息并作处理业务,但避免作非常耗时的操作,会影响netty的处理效率,甚至完全耗完netty的io线程造成消息不响应。在channelRead方法里能获取接收到的消息对象,同时通过本Handler的 getEndpointEntity()方法,或者 ctx.channel().attr(GlobalConstance.entityPointKey).get();能够获取该消息的发送方账号实体Entity对象。

    当连接关闭时,smsgate会自动调用handler里的channelInactive方法,可在此方法中实现连接关闭后的一些清理操作。

  • 如何不改源码,实现修改框架默认的handler

    比如SGIP协议要设置NodeId;你需要这样做:

    1、写一个扩展的SgipClientEndpointEntity子类,如:MySgipClientEndpointEntity,重写buildConnector()方法

    2、再写一个SgipClientEndpointConnector子类,如:MySgipClientEndpointConnector,重写doinitPipeLine()方法

    3、最后再写一个SgipSessionLoginManager子类,如:MySgipSessionLoginManager,重写doLogin方法,实现登陆方法的重写,在方法里创建自己定义的实现。

    4、最后在openEntity通道里,new MySgipClientEndpointEntity就可以了

  • 使用 http 或者 socks 代理

    SmsGate支持HTTP、SOCKS代理以方便在使用代理访问服务器的情况。代理设置方式:

	// 无username 和 password 可写为  http://ipaddress:port
	client.setProxy("http://username:password@ipaddress:port");  //http代理
	client.setProxy("https://username:password@ipaddress:port");  //https代理
	client.setProxy("socks://username@ipaddress:port");  //socks4代理
	client.setProxy("socks4://username@ipaddress:port");  //socks4代理
	client.setProxy("socks5://username:password@ipaddress:port");  //socks5代理

  • 抓包,打印二进制的收发日志

    框架使用 entity.[EntityId]的loggerName打印该 EntityID上所有的收发记录。

    Debug 级别打印短信消息对象的toString内容。

    Trace 级别打印短信消息对象的二进制内容。

    如:针对cmppclientEntityId 通道的 logback.xml , log4j2.xml配置

    <logger name="entity.cmppclientEntityId" level ="debug" additivity="false">
	</logger>
如: log4j.properties 配置 
   log4j.logger.entity.cmppclientEntityId=debug
  • 在Java9以上版上运行
	在java9以上运行,启动java进程要增加以下参数:
	
--add-opens java.base/java.lang=ALL-UNNAMED
--add-opens java.base/java.math=ALL-UNNAMED  
--add-opens java.base/java.util=ALL-UNNAMED  
--add-opens java.base/java.util.concurrent=ALL-UNNAMED  
--add-opens java.base/java.net=ALL-UNNAMED   
--add-opens java.base/java.text=ALL-UNNAMED 
--add-exports java.base/sun.security.x509=ALL-UNNAMED 

新手指引

  • 先看doc目录下的CMPP接口协议V3.0.0.doc文档 (看不懂的到群里咨询)
  • 再看readme里的说明 (看不懂的到群里咨询)
  • 导入工程后,运行测试demo: TestCMPPEndPoint,学会配置账号密码等参数
  • 由于代码是基于netty网络框架,您有必要先有一些Netty的基础

开发短信网关的常见问题

  • 长短信拆分合并原理

短信支持长短信功能是在手机终端实现的,即:手机陆续收到多个短信片断后,会根据短信PDU里的前6个byte信息进行合并。最终在手机上显示为一条短信,但实际却是接收了多条短信(因此收多条的费用)。

因此,长短信在发送时要进行拆分。在开发短信网关时,由于要对短信内容进行校验,拼签名等处理,因此在接收到短信分片后,要进行合并成一条处理,之后发送时再拆分为多条(当然有可能始终只收到一个片断,造成永远无法合并成一条完整的短信)。

短信内容(PDU)字段的前6字节是长短信的协议头(其余内容才是短信文本),前3个字节固定是 0x050003,后3个字节用来做长短信合并的依据(类似IP包的分片)

1字节 包ID[最大255],
1字节 包总分片数
1字节 分片序号

如:45,03,01表示ID为45的第1个分片,总共3个分片。45,03,02表示ID为45的第2个分片,总共3个分片。 当手机收到完整的3个分片后,手机才进行合并显示。

  • 使用redis实现集群长短信合并

框架内部自带一个JVM内存缓存(Guava Cache)的LongMessageFrameCache类,用于保存未完成合并的短信片断。 但集群(多进程,多节点)部署服务时,有可能从不同的节点上(主机上)接收到同一个长短信的不同片断,此时框架默认的JVM内存缓存无法完成长短信合并。 为解决此问题,框架使用SPI机制加载LongMessageFrameCache的实现类,业务侧以SPI方式提供Redis版的LongMessageFrameProvider实现类。 为了让业务自制的LongMessageFrameProvider实现类生效, 要确保业务自制的LongMessageFrameProvider实现类 order() 大于0 。框架优先使用order最大的实现。

具体为:

  1. 打开该通道账号的配置 EndpointEntity.isRecvLongMsgOnMultiLink属性,用于标识该通道的长短信要使用集群部署的长短信合并能力(由于只有少量系统有此问题,不需要所有账号打开该特性,会影响合并性能)。

  2. 提供一个Redis 的合并实现类,可以参考测试包中的代码:RedisLongMessageFrameCache , RedisLongMessageFrameProvider

  • 网关服务前边有nginx,haproxy代理的时候如何获取真实的客户端IP?

首先感谢群友 狠人 提供了使用proxy protocol协议支持从代理服务器获取真实客户IP的思路。

针对ServerEndpoint ,通过设置setProxyProtocol(true) 开启proxy protocol协议开关。框架从channel上第一个消息(HAProxyMessage)获取真实的客户端IP后,设置到channel的 Attribute属性上。业务代码可以从 channel.attr(GlobalConstance.proxyProtocolKey) 获取该信息,从而拿到真实的客户IP. 该特性使得短信网关的集群部署架构更为灵活,比如:服务入口使用nginx,haproxy等代理,真实网关服务以集群的方式部署在后端,横向扩展。

  • 如何关联状态报告【即短信回执,以下都称为状态报告】和submit消息?

运宽商网关响应submitRequest消息时,你会收到submitResponse消息。在response里会有msgId。通过这个msgId跟之后收到的状态报告(reportMessage)里的msgId关联。

  • 如何记录每个消息的发送日志,并向我的客户发送状态报告【即短信回执,以下都称为状态报告】?

当接收到来源客户的submitRequest消息后,要回复response,注意此时要记录回复response时所使用的msgId,即你回复给来源客户的msgId

将消息转发给通道后,当接收到submitResponse后,通过response.getRequest()获取对应的request 。注意此时有两个msgID,一个是通道给你的msgID,一个是你给来源客户的。在数据库里记录相关信息(至少包括消息来源客户,消息出去的通道,两个msgId,消息详情)。之后在接收到状态报告后,通过通道给你的msgId更新消息状态报告里的msgId,并根据来源客户将状态报告回传给客户,注意回传reportMessage里的msgId要使用你给客户回复response时用的msgId. 详见流程图

  • 关于长短信类 LongSMSMessage 中 UniqueLongMsgId 的使用

由于cmpp,sgip等短信协议的异步化特点,框架默认实现长短信的拆分与合并,接收Sp发送的MT消息并匹配上游状态报告【即短信回执,以下都称为状态报告】时,由于缺少短信唯一标识,从Sp接收的短信和最终发送给运营商的短信之间没有关联标识, 造成状态报告回来时难以匹配,实现起来很复杂。为了解决cmpp协议的接收的短信与发送出去的短信关联问题,给长短信增加了这个UniqueLongMsgId。 对http协议接收的短信同样可以使用UniqueLongMsgId: 通过http接收的长短信对象在发送到cmpp协议的短信通道连接以前是没有UniqueLongMsgId的,发送以后框架会设置UniqueLongMsgId 的值 。因此可以在发送完成收到response后通过response.getRequest()获取Request对象从而拿到UniqueLongMsgId

UniqueLongMsgId 中 id 是唯一标识,即使在极短时间内收到相同手机号端口号的短信也能保持唯一性。该ID当短信从网络上接收到还未合并时进行设置,直到转发给运营商通道都不会变化,并且相同长短信的不同分片的ID也相同。

UniqueLongMsgId 除了 id 以外,还包含其它信息如:从消息从哪个通道账号Id提交的,从哪个IP端口提交的、长短信的分片ID、总分片数、分片序号以及消息序列号、时间戳。 在Test包里有一个模拟的匹配状态报告的测试用例用是用 UniqueLongMsgId 实现的,并且经过相同手机号、端口号在极限并发压力下的匹配测试,单JVM多线程安全。逻辑供参考: com.zx.sms.transgate.TestReportForward

  • 集群环境如何平均分配上游连接数?

网关平台通常会有多个服务节点,而对接的通道给的连接数通常不是服务节点数的整倍数,极端情况连接数小于服务节点数,这样如何平均分配连接数就成了一个问题。

这里介绍一个算法:通过在redis里记录 {全局的服务节点列表},来计算每个服务节点连几个tcp连接。

var curNodeIndex = getCurNodeIndexFromRedis(thisNode); // 当前节点在全局服务节点的排序号,{0,1,2,3,...}
var cntNode ; // 从Redis里获取的总的服务进程节点数
//所有短信通道,逐一计算每一个通道,在当前节点上最大允许的tcp连接数, 
allEntityPointList.foreach(e->{
	var curEntityIndex = getCurEntityIndex(e); //所有短信通道根据Id排序后,当前通道的排序号,{0,1,2,3,4,5,6,7,...}
	var curMaxChannel = e.getMaxChannel(); //当前通道全局允许的最大连接数
	
	//连接数不是服务节点数的整倍数,按服务节点数平均分配后一定会有余数, 按当前节点的排序号先后把余下的连接数分完。
	//但是服务节点排序号是固定不变的,这样排序号靠前的节点总是优先分到余下的连接数,造成全局通道总连接数分配不均,因此要结合"当前通道的排序号" 对 "服务节点排序号"进行位移
	//因此,当"服务节点"或者"全局通道账号"有任一个变化时,都会影响连接的分配。
	var shiftNodeIndex = (curNodeIndex + curEntityIndex) % cntNode;
	//平均分配后,余下的连接数
	var remainderChannel = curMaxChannel % cntNode;
	//平均分配连接数
	var hostMaxChannel = curMaxChannel / cntNode;
	//余数处理
	if(remainderChannel > 0 && shiftNodeIndex < remainderChannel){
		hostMaxChannel = hostMaxChannel + 1;
	}
	var hostChannel = getConnectionCountAtCurrentNode(e);   //当前通道在本节点上的连接数
	if(hostChannel < hostMaxChannel){
		openChannel(e); //新建一个连接
	}else{
		//关闭该通道超过数量的连接
		closeSomeChannel(e,hostMaxChannel - hostChannel);
	}
});

CMPPGate , SMPPGate , SGIPGate, SMGPGate

中移短信cmpp协议/smpp协议 netty实现编解码

这是一个在netty4框架下实现的cmpp3.0/cmpp2.0短信协议解析及网关端口管理。 代码copy了 [email protected] 基于netty3.7的cmpp协议解析 [email protected] 的代码

目前已支持发送和解析长文本短信拆分合并WapPush短信,以及彩信通知类型的短信。可以实现对彩信或者wap-push短信的拦截和加工处理。wap短信的解析使用 smsj 的短信库

cmpp协议已经跟华为,东软,亚信的短信网关都做过联调测试,兼容了不同厂家的错误和异常,如果跟网关通信出错,可以打开trace日志查看二进制数据。

因要与短信中心对接,新增了对SMPP协议的支持。

SMPP的协议解析代码是从 Twitter-SMPP 的代码 copy过来的。

新增对sgip协议(联通短信协议)的支持

sgip的协议解析代码是从 [email protected] 的代码 copy过来后改造的。

新增对smgp协议(电信短信协议)的支持

smgp的协议解析代码是从 SMS-China 的代码 copy过来后改造的。

支持发送彩信通知,WAP短信以及闪信(Flash Message):

性能测试

在48core,128G内存的物理服务器上测试协议解析效率:35K条/s, cpu使用率25%.

Build

执行mvn package . jdk1.6以上.

增加了业务处理API

业务层实现接口:BusinessHandlerInterface,或者继承AbstractBusinessHandler抽象类实现业务即可。 连接保活,消息重发,消息持久化,连接鉴权都已封装,不须要业务层再实现。

如何实现自己的Handler,比如按短短信计费

参考 CMPPChargingDemoTest 里的扩展位置

实体类说明

CMPP的连接端口

com.zx.sms.connect.manager.cmpp.CMPPEndpointEntity 表示一个Tcp连接的发起端,或者接收端。用来记录连接的IP.port,以及CMPP协议的用户名,密码,业务处理的ChannelHandler集合等其它端口参数。包含三个子类:

  1. com.zx.sms.connect.manager.cmpp.CMPPServerEndpointEntity 服务监听端口,包含一个List属性。 一个服务端口包含多个CMPPServerChildEndpointEntity端口

  2. com.zx.sms.connect.manager.cmpp.CMPPServerChildEndpointEntity 服务接收端口,包含CMPP连接用户名,密码,以及协议版本等信息

  3. com.zx.sms.connect.manager.cmpp.CMPPClientEndpointEntity 客户端端口,包含CMPP连接用户名,密码,以及协议版本,以及服务端IP.port. 用于连接服务端

端口连接器接口

com.zx.sms.connect.manager.EndpointConnector 负责一个端口的打开,关闭,查看当前连接数,新增连接,移除连接。每个端口的实体类都对应一个EndpointConnector.当CMPP连接建立完成,将连接加入连接器管理,并给pipeLine上挂载业务处理的ChannelHandler.

  1. com.zx.sms.connect.manager.cmpp.CMPPServerEndpointConnector 这个类的open()调用netty的ServerBootstrap.bind()开一个服务监听

  2. com.zx.sms.connect.manager.cmpp.CMPPServerChildEndpointConnector 用来收集CMPPServerChildEndpointEntity端口下的所有连接。它的open()方法为空.

  3. com.zx.sms.connect.manager.cmpp.CMPPClientEndpointConnector 这个类open()调用netty的Bootstrap.connect()发起一个TCP连接

端口管理器

com.zx.sms.connect.manager.EndpointManager 该类是单例模式,管理所有端口,并负责所有端口的打开,关闭,以及端口信息保存,以及连接断线重连。

CMPP协议的连接登陆管理

com.zx.sms.session.cmpp.SessionLoginManager 这是一个netty的ChannelHandler实现,主要负责CMPP连接的建立。当CMPP连接登陆成功、会话建立完成后,会调用EndpointConnector.addChannel(channel)方法,把连接加入连接器管理,连接器负责给channel的pipeline上挂载业务处理的Handler,最后触发 SessionState.Connect事件,通知业务处理Handler连接已建立成功。

CMPP的连接状态管理器

com.zx.sms.session.cmpp.SessionStateManager 这是一个netty的ChannelHandler实现。负责每个连接上CMPP消息的存储,短信重发,流量窗口控制,过期短信的处理

CMPP协议解析器

CMPP20MessageCodecAggregator [2.0协议] CMPPMessageCodecAggregator [这是3.0协议] 聚合了CMPP主要消息协议的解析,编码,长短信拆分,合并处理。

短信持久化存储实现 StoredMapFactory

使用BDB的StoreMap实现消息持久化,防止系统意外丢失短信。

程序启动处理流程

  1. 程序启动类 new 一个CMPPEndpointEntity的实体类并设置IP,port,用户名,密码,业务处理的Handler等参数,
  2. 程序启动类 调用EndpointManager.addEndpointEntity(endpoint)方法,将端口加入管理器
  3. 程序启动类 调用EndpointManager.openAll()或者EndpointManager.openEndpoint()方法打开端口。
  4. EndpointManager会调用EndpointEntity.buildConnector()创建一个端口连接器,并调用EndpointConnector.open()方法打开端口。
  5. 如果是CMPPClientEndpointEntity的话,就会向服务器发起TCP连接请求,如果是CMPPServerEndpointEntity则会在本机开启一个服务端口等客户端连接。
  6. TCP连接建立完成后。netty会调用EndpointConnector.initPipeLine()方法初始化PipeLine,把CMPP协议解析器,SessionLoginManager加到PipeLine里去,然后netty触发ChannelActive事件。
  7. 在SessionLoginManager类里,客户端收到ChannelActive事件后会发送一个CMPPConnnect消息,请求建立CMPP连接.
  8. 同样在SessionLoginManager.channelRead()方法里,服务端会收到CMPPConnnect消息,开始对用户名,密码进行鉴权,并给客户端返回鉴权结果。
  9. 鉴权通过后,SessionLoginManager调用EndpointConnector.addChannel(channel)方法,把channel加入ArrayList,并给pipeLine上挂载SessionStateManager和业务处理的ChannelHandler,如心跳处理,日志记录,长短信合并拆分处理类。
  10. EndpointConnector.addChannel(channel)完成后,SessionLoginManager调用ctx.fireUserEventTriggered()方法,触发 SessionState.Connect事件。

以上CMPP连接建立完成。

  1. 业务处理类收到SessionState.Connect事件,开始业务处理,如从MQ获取短信下发,或开启Consumer接收MQ推送的消息。
  2. SessionStateManager会拦截所有read()和write()的消息,进行消息持久化,消息重发,流量控制。

增加同步调用api

smsgate自开发以来,一直使用netty的异步发送消息,但实际使用场景中同步发送消息的更方便,或者能方便的取到response。因此增加一个同步调用的api。即:发送消息后等接收到对应的响应后才返回。 使用方法如下:

	//因为长短信要拆分,因此返回一个promiseList.每个拆分后的短信对应一个promise
	List<Promise> futures = ChannelUtil.syncWriteLongMsgToEntity("client",submitmessage);
	for(Promise  future: futures){
		//调用sync()方法,阻塞线程。等待接收response
		future.sync(); 
		//接收成功,如果失败可以获取失败原因,比如遇到连接突然中断错误等等
		if(future.isSuccess()){
			//打印收到的response消息
			logger.info("response:{}",future.get());
		}else{
			打印错误原因
			logger.error("response:{}",future.cause());
		}
	}

	//或者不阻塞进程,不调用sync()方法。
	List<Promise> promises = ChannelUtil.syncWriteLongMsgToEntity("client",submitmessage);
	for(Promise  promise: promises){
		//接收到response后回调Listener方法
		promise.addListener(new GenericFutureListener() {
			@Override
			public void operationComplete(Future future) throws Exception {
				//接收成功,如果失败可以获取失败原因,比如遇到连接突然中断错误等等
				if(future.isSuccess()){
					//打印收到的response消息
					logger.info("response:{}",future.get());
				}else{
					打印错误原因
					logger.error("response:{}",future.cause());
				}
			}
		});
	}

CMPP Api使用举例

public class TestCMPPEndPoint {
	private static final Logger logger = LoggerFactory.getLogger(TestCMPPEndPoint.class);

	@Test
	public void testCMPPEndpoint() throws Exception {
		ResourceLeakDetector.setLevel(Level.ADVANCED);
		final EndpointManager manager = EndpointManager.INS;

		CMPPServerEndpointEntity server = new CMPPServerEndpointEntity();
		server.setId("server");
		server.setHost("127.0.0.1");
		server.setPort(7890);
		server.setValid(true);
		//使用ssl加密数据流
		server.setUseSSL(false);

		CMPPServerChildEndpointEntity child = new CMPPServerChildEndpointEntity();
		child.setId("child");  //自定义通道账号ID,保持全局唯一
		child.setChartset(Charset.forName("utf-8"));
		child.setGroupName("test");   //自定义通道账号分组ID,用于对通道标识不同组,方便路由实现
		child.setUserName("901783");  //通道账号,可能和企业代码相同
		child.setPassword("ICP001");  //密码

		child.setValid(true);
		child.setVersion((short)0x30);   //协议版本号,48是3.0 协议,32是2.0协议

		child.setMaxChannels((short)4);
		child.setRetryWaitTimeSec((short)30);
		child.setMaxRetryCnt((short)3);
		child.setReSendFailMsg(true);
//		child.setWriteLimit(200);
//		child.setReadLimit(200);
		List<BusinessHandlerInterface> serverhandlers = new ArrayList<BusinessHandlerInterface>();
		serverhandlers.add(new CMPPMessageReceiveHandler()); //在这个handler里接收短信
		child.setBusinessHandlerSet(serverhandlers);
		server.addchild(child);
		
		manager.addEndpointEntity(server);
	
		CMPPClientEndpointEntity client = new CMPPClientEndpointEntity();
		client.setId("client");   //自定义通道账号ID,保持全局唯一
		client.setHost("127.0.0.1");
//		client.setLocalhost("127.0.0.1");
//		client.setLocalport(65521);
		client.setPort(7890);
		client.setChartset(Charset.forName("utf-8"));
		client.setGroupName("test"); //自定义通道账号分组ID,用于对通道标识不同组,方便路由实现
		client.setUserName("901783"); //通道账号,可能和企业代码相同
		client.setPassword("ICP001"); //密码
		client.setMsgSrc("902176");  //企业代码 ,可能和UserName相同
		client.setSpCode("10658762"); //服务代码,即显示到手机上的号码
		client.setMaxChannels((short)10);  //最大连接数
		client.setVersion((short)0x30);    //协议版本号
		client.setRetryWaitTimeSec((short)30);//发送request后 等待N秒后没有收到response,则重发消息
		client.setMaxRetryCnt((short)3);  // 发送消息的最大次数,如果为3,则表示连带第一次发送,再重试2次,一共发送3次
		client.setCloseWhenRetryFailed(false);  // 当发送消息次数达到最大(MaxRetryCnt)后 ,是否关闭连接。默认是 true 关闭连接
		client.setUseSSL(false);          //是否使用SSL加密连接,默认为false,不加密
		client.setWriteLimit(100);        //发送request消息的最大速度(单位条数)
		client.setReadLimit(100);         //接收request的最大速度(单位条数),当消息量超过一定限制后,消息将积压在TCP网络协议栈的接收缓冲区
		client.setWindow(16);         //设置发送消息的滑动窗口,滑动窗口默认为16,该大小根据网络时延不同,会影响发送速度
													
		//默认为false ,发送request是否保存在本地磁盘。如果为true,当进程关闭后,本地磁盘会保存未收到response的消息,当进程再次启动框架自动读取消息并发送。可能造成消息重复发送
		client.setReSendFailMsg(true);  
		
		client.setSupportLongmsg(SupportLongMessage.BOTH);  //是否支持长短信的自动拆分和拼接
		
		List<BusinessHandlerInterface> clienthandlers = new ArrayList<BusinessHandlerInterface>();
		clienthandlers.add( new CMPPSessionConnectedHandler(10000));  //在这个handler里发送短信
		client.setBusinessHandlerSet(clienthandlers);
		
		manager.addEndpointEntity(client);
		
		manager.openEndpoint(server);
		
		Thread.sleep(1000);
		for(int i=0;i<=child.getMaxChannels()+1;i++)
			manager.openEndpoint(client);

        System.out.println("start.....");
        
//		Thread.sleep(300000);
        LockSupport.park();
		EndpointManager.INS.close();
	}
}

SMPP Api使用举例

public class TestSMPPEndPoint {
	private static final Logger logger = LoggerFactory.getLogger(TestSMPPEndPoint.class);

	@Test
	public void testSMPPEndpoint() throws Exception {
	
		final EndpointManager manager = EndpointManager.INS;

		SMPPServerEndpointEntity server = new SMPPServerEndpointEntity();
		server.setId("smppserver");
		server.setHost("127.0.0.1");
		server.setPort(2776);
		server.setValid(true);
		//使用ssl加密数据流
		server.setUseSSL(false);
		
		SMPPServerChildEndpointEntity child = new SMPPServerChildEndpointEntity();
		child.setId("smppchild");
		child.setSystemId("901782");
		child.setPassword("ICP");

		child.setValid(true);
		child.setChannelType(ChannelType.DUPLEX);
		child.setMaxChannels((short)3);
		child.setRetryWaitTimeSec((short)30);
		child.setMaxRetryCnt((short)3);
		child.setReSendFailMsg(true);
		child.setIdleTimeSec((short)15);
//		child.setWriteLimit(200);
//		child.setReadLimit(200);
		List<BusinessHandlerInterface> serverhandlers = new ArrayList<BusinessHandlerInterface>();
		serverhandlers.add(new SMPPSessionConnectedHandler(10000));   
		child.setBusinessHandlerSet(serverhandlers);
		server.addchild(child);
		
		SMPPClientEndpointEntity client = new SMPPClientEndpointEntity();
		client.setId("smppclient");
		client.setHost("127.0.0.1");
		client.setPort(2776);
		client.setSystemId("901782");
		client.setPassword("ICP");
		client.setChannelType(ChannelType.DUPLEX);

		client.setMaxChannels((short)12);
		client.setRetryWaitTimeSec((short)100);
		client.setUseSSL(false);
		client.setReSendFailMsg(true);
//		client.setWriteLimit(200);
//		client.setReadLimit(200);
		client.setSupportLongmsg(SupportLongMessage.SEND);  //接收长短信时不自动合并
		List<BusinessHandlerInterface> clienthandlers = new ArrayList<BusinessHandlerInterface>();
		clienthandlers.add( new SMPPMessageReceiveHandler()); 
		client.setBusinessHandlerSet(clienthandlers);
		
		
		manager.addEndpointEntity(server);
		manager.addEndpointEntity(client);
		manager.openAll();
		manager.startConnectionCheckTask();
		Thread.sleep(1000);
		for(int i=0;i<child.getMaxChannels();i++)
			manager.openEndpoint(client);
		System.out.println("start.....");
		LockSupport.park();
		EndpointManager.INS.close();
	}
}
	

SGIP Api使用举例

public class TestSgipEndPoint {
	private static final Logger logger = LoggerFactory.getLogger(TestSgipEndPoint.class);

	@Test
	public void testsgipEndpoint() throws Exception {
		ResourceLeakDetector.setLevel(Level.ADVANCED);
		final EndpointManager manager = EndpointManager.INS;

		SgipServerEndpointEntity server = new SgipServerEndpointEntity();
		server.setId("sgipserver");
		server.setHost("127.0.0.1");
		server.setPort(8001);
		server.setValid(true);
		//使用ssl加密数据流
		server.setUseSSL(false);
		
		SgipServerChildEndpointEntity child = new SgipServerChildEndpointEntity();
		child.setId("sgipchild");
		child.setLoginName("333");
		child.setLoginPassowrd("0555");

		child.setValid(true);
		child.setChannelType(ChannelType.DUPLEX);
		child.setMaxChannels((short)3);
		child.setRetryWaitTimeSec((short)30);
		child.setMaxRetryCnt((short)3);
		child.setReSendFailMsg(false);
		child.setIdleTimeSec((short)30);
//		child.setWriteLimit(200);
//		child.setReadLimit(200);
		child.setSupportLongmsg(SupportLongMessage.SEND);  //接收长短信时不自动合并
		List<BusinessHandlerInterface> serverhandlers = new ArrayList<BusinessHandlerInterface>();
		
		serverhandlers.add(new SgipReportRequestMessageHandler());
		serverhandlers.add(new SGIPMessageReceiveHandler());   
		child.setBusinessHandlerSet(serverhandlers);
		server.addchild(child);
		
		manager.addEndpointEntity(server);
		
		
		SgipClientEndpointEntity client = new SgipClientEndpointEntity();
		client.setId("sgipclient");
		client.setHost("127.0.0.1");
		client.setPort(8001);
		client.setLoginName("333");
		client.setLoginPassowrd("0555");
		client.setChannelType(ChannelType.DUPLEX);

		client.setMaxChannels((short)10);
		client.setRetryWaitTimeSec((short)100);
		client.setUseSSL(false);
		client.setReSendFailMsg(true);
//		client.setWriteLimit(200);
//		client.setReadLimit(200);
		List<BusinessHandlerInterface> clienthandlers = new ArrayList<BusinessHandlerInterface>();
		clienthandlers.add(new SGIPSessionConnectedHandler(10000));
		client.setBusinessHandlerSet(clienthandlers);
		manager.addEndpointEntity(client);
		manager.openAll();
		Thread.sleep(1000);
		for(int i=0;i<child.getMaxChannels();i++)
			manager.openEndpoint(client);
		System.out.println("start.....");
      
        LockSupport.park();

		EndpointManager.INS.close();
	}
}

Demo 执行日志


11:31:52.842 [workGroup2] INFO  c.z.s.c.m.AbstractEndpointConnector - handlers is not shareable . clone it success. com.zx.sms.codec.smpp.SMPP2CMPPBusinessHandler@1d7059df
11:31:52.852 [workGroup1] INFO  c.z.s.c.m.AbstractEndpointConnector - handlers is not shareable . clone it success. com.zx.sms.codec.smpp.SMPP2CMPPBusinessHandler@75e134be
11:31:52.852 [workGroup1] INFO  c.z.s.c.m.AbstractEndpointConnector - handlers is not shareable . clone it success. com.zx.sms.handler.api.gate.SessionConnectedHandler@aa80b58
11:31:52.869 [workGroup1] INFO  c.z.s.s.AbstractSessionLoginManager - login in success on channel [id: 0xfdc7b81e, L:/127.0.0.1:11481 - R:/127.0.0.1:2776]
11:31:52.867 [workGroup2] INFO  c.z.s.s.AbstractSessionLoginManager - login in success on channel [id: 0x1fba3767, L:/127.0.0.1:2776 - R:/127.0.0.1:11481]
11:31:53.863 [busiWork-3] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:343,   speed : 343/s
11:31:54.872 [busiWork-1] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:1381,   speed : 1038/s
11:31:55.873 [busiWork-8] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:2704,   speed : 1323/s
11:31:56.875 [busiWork-2] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:4010,   speed : 1306/s
11:31:57.880 [busiWork-5] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:5416,   speed : 1406/s
11:31:58.881 [busiWork-7] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:7442,   speed : 2026/s
11:31:59.882 [busiWork-8] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:9581,   speed : 2139/s
11:32:00.883 [busiWork-2] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:12865,   speed : 3284/s
11:32:01.884 [busiWork-5] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:15937,   speed : 3072/s
11:32:02.886 [busiWork-5] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:19489,   speed : 3552/s
11:32:03.887 [busiWork-6] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:23065,   speed : 3576/s
11:32:04.888 [busiWork-2] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:26337,   speed : 3272/s

smsgate's People

Contributors

lihuanghe avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

smsgate's Issues

code

image

请问下这个是自己创建的线程池,还是使用netty的线程池

ChannelUtil.syncWriteLongMsgToEntity()同步接口,当client.setMaxChannels((short)2)时,会报错 receive ResponseMessage ,but not found related Request Msg

黄工,您好!您的项目集成netty以及封装很好的cmpp等接口,除了很大的应用价值外,我们也能从中学习到netty、异步及程序设计的很多东西,非常感谢!我遇到一个问题,想向您再请教下。我在跑TestCMPPEndPoint例子时,调整了客户端的通道数,client.setMaxChannels((short)2),结果发现发送阻塞,控制台有这样的警告 receive ResponseMessage ,but not found related Request Msg,跟了会代码,感觉是在 AbstractSessionStateManager.channelRead() 时,storeMap 得不到 SequenceId 对应的Msg,该怎么调整?再次感谢

SGIP 发送短信,没有特殊的情况报错,什么原因

io.netty.handler.codec.EncoderException: java.lang.NullPointerException
at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:107)
at io.netty.handler.codec.MessageToMessageCodec.write(MessageToMessageCodec.java:116)
at com.zx.sms.codec.sgip12.codec.SgipMessageCodecAggregator.write(SgipMessageCodecAggregator.java:50)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
at com.zx.sms.handler.MessageLogHandler.write(MessageLogHandler.java:49)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
at io.netty.handler.traffic.ChannelTrafficShapingHandler.submitWrite(ChannelTrafficShapingHandler.java:186)
at io.netty.handler.traffic.AbstractTrafficShapingHandler.write(AbstractTrafficShapingHandler.java:565)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:801)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794)
at com.zx.sms.session.AbstractSessionStateManager.safewrite(AbstractSessionStateManager.java:534)
at com.zx.sms.session.AbstractSessionStateManager.writeWithWindow(AbstractSessionStateManager.java:311)
at com.zx.sms.session.AbstractSessionStateManager.write(AbstractSessionStateManager.java:279)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:106)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
at com.zx.sms.handler.sgip.ReWriteNodeIdHandler.write(ReWriteNodeIdHandler.java:22)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:102)
at io.netty.handler.codec.MessageToMessageCodec.write(MessageToMessageCodec.java:116)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:112)
at io.netty.handler.codec.MessageToMessageCodec.write(MessageToMessageCodec.java:116)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:106)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.access$1700(AbstractChannelHandlerContext.java:38)
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1127)
at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1174)
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1098)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:495)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
at java.lang.Thread.run(Thread.java:662)

SGIP怎么发彩信短视频

SGIP怎么发彩信短视频
SmsMmsNotificationMessage 是不是通过这个类。
有没有参考demo,
非常感谢,这个项目对我的帮助很大了。

长连cmpp网关,会报connection reset by peer

作者您好,我也遇到了这样的错误,这个错误发生有概率,我目前也不好重现,有时候一天出现,有时候几天出现,错误日志如下:

16:04:39.777 [workGroup1] TRACE entity.client - [id: 0x92d969f2, L:/172.17.0.3:51331 - R:/183.230.96.94:17890] FLUSH
16:04:39.777 [workGroup1] TRACE entity.client - [id: 0x92d969f2, L:/172.17.0.3:51331 - R:/183.230.96.94:17890] FLUSH
16:04:39.825 [workGroup1] TRACE entity.client - [id: 0x92d969f2, L:/172.17.0.3:51331 - R:/183.230.96.94:17890] RECEIVED: 13B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 0d 80 00 00 08 1e 4e 22 3e 00          |.........N">.   |
+--------+-------------------------------------------------+----------------+
16:04:39.825 [workGroup1] DEBUG entity.client - Receive:DefaultMessage [packetType=CMPPACTIVETESTRESPONSE, header=DefaultHeader [commandId=2147483656, sequenceId=508437054], getClass()=class com.zx.sms.codec.cmpp.msg.CmppActiveTestResponseMessage]
16:05:09.825 [workGroup1] DEBUG entity.client - Send:DefaultMessage [packetType=CMPPACTIVETESTREQUEST, header=DefaultHeader [commandId=8, sequenceId=508437055], getClass()=class com.zx.sms.codec.cmpp.msg.CmppActiveTestRequestMessage]
16:05:09.825 [workGroup1] TRACE entity.client - [id: 0x92d969f2, L:/172.17.0.3:51331 - R:/183.230.96.94:17890] WRITE: 12B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 0c 00 00 00 08 1e 4e 22 3f             |.........N"?    |
+--------+-------------------------------------------------+----------------+
16:05:09.825 [workGroup1] TRACE entity.client - [id: 0x92d969f2, L:/172.17.0.3:51331 - R:/183.230.96.94:17890] FLUSH
16:05:09.825 [workGroup1] TRACE entity.client - [id: 0x92d969f2, L:/172.17.0.3:51331 - R:/183.230.96.94:17890] FLUSH
16:05:09.874 [workGroup1] TRACE entity.client - [id: 0x92d969f2, L:/172.17.0.3:51331 - R:/183.230.96.94:17890] EXCEPTION: java.io.IOException: Connection reset by peer
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
        at io.netty.buffer.UnpooledUnsafeDirectByteBuf.setBytes(UnpooledUnsafeDirectByteBuf.java:357)
        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:898)
        at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:485)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:399)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:371)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
        at java.lang.Thread.run(Thread.java:745)
16:05:09.875 [workGroup1] TRACE entity.client - [id: 0x92d969f2, L:/172.17.0.3:51331 ! R:/183.230.96.94:17890] INACTIVE
16:05:09.875 [workGroup1] TRACE entity.client - [id: 0x92d969f2, L:/172.17.0.3:51331 ! R:/183.230.96.94:17890] UNREGISTERED

期待您的答复。

运营商通道自动签名时长短信切割出错

有的渠道的Server端会自动在短信头带上‘【xx公司】’,尾部带上 ‘回复td退订’等标签,会导致长短信切割后的第一条短信加上签名的长度过长。
希望能支持解决这种情况。

Sgip 作为Server 报错

Sgip 作为Server 报错
2023-05-10 09:45:24.231 . WARN c.z.s.s.AbstractSessionLoginManager - session is not created. the entity is EndpointEntity [Id=sgip------------server, Desc=null, channelType=DUPLEX, host=null, port=579, maxChannels=0, valid=true, businessHandlerSet=null].channel remote is /xx.xx.xx.xx:45529
麻烦请问什么原因?

长时间连接到cmpp网关,会报Connection reset by peer

长时间连接到cmpp网关,会有异常
[workGroup-1-6] WARN c.t.s.h.c.CmppServerIdleStateHandler - CmppServerIdleStateHandler has exception.Connection reset by peer
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:853)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:240)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:115)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:514)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
at io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
at io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
at io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
at io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
at io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)

打包问题

打包后,没有包含resource下.properties和.xml文件
Caused by: java.lang.NullPointerException at com.zx.sms.config.PropertiesUtils.loadProperties(PropertiesUtils.java:66) at com.zx.sms.config.PropertiesUtils.<clinit>(PropertiesUtils.java:10) ... 26 more ![qq 20171106091242](https://user-images.githubusercontent.com/23043552/32975340-5a0255b6-cc41-11e7-9b5a-a932434617df.png)

在客户端测如何判断与服务器的连接状态

我看到 EndpointManager 的 openEndpoint 方法中有 conn.open() 这个方法会返回一个 ChannelFuture,但是 openEndpoint 并没有将这个对象返回,我希望在调用 openEndpoint 方法后判断是已经连接,要怎么作?

有关cmpp速度问题

黄工你好,我们在使用过程中,在局域网能接收的速度还是很快,但是把服务器放到阿里云上面后,变的超级慢?请问下是否cmpp参数没有设置正确?以下是服务端代码,
public static void start() throws Exception
{

	ResourceLeakDetector.setLevel(Level.ADVANCED);
	final EndpointManager manager = EndpointManager.INS;
	
	CMPPServerEndpointEntity server = new SohanCMPPServerEndpointEntity();
	server.setId(SERVER_ID);
	server.setHost("0.0.0.0");
	SohanConfig config = ApplicationContextUtil.getBean(SohanConfig.class);
	String port = config.getCmppServerRecivePort();
	if (StringUtils.isEmpty(port))
	{
		server.setPort(7890);
	} else
	{
		server.setPort(Integer.valueOf(port));
	}
	server.setValid(true);
	// 使用ssl加密数据流
	server.setUseSSL(false);
	
	manager.openEndpoint(server);
	manager.startConnectionCheckTask();
	
}

/*
 * 创建自己扩展的Entity
 */
private static class SohanCMPPServerEndpointEntity
		extends CMPPServerEndpointEntity
{
	
	@Override
	public EndpointEntity getChild(String userName)
	{
		
		SohanConfig config = ApplicationContextUtil.getBean(SohanConfig.class);
		// 如果当前是模拟环境,则进行模拟登录即可。模拟登录密码默认都是123456
		if(config.isSimulation()) {
			EndpointEntity child = simulationLogin(userName);
			super.addchild(child);
			return child;
		}
		// 依赖业务服务器的登录判断
		GrpcCmppService handler = ApplicationContextUtil
				.getBean(GrpcCmppService.class);
		LoginResponse resp = handler.loginBlocking(
				LoginRequest.newBuilder().setUserName(userName).build());
		if (resp != null)
		{
			SohanCMPPServerChildEndpointEntity child = new SohanCMPPServerChildEndpointEntity();
			child.setId(resp.getCmppSpId());
			child.setChartset(Charset.forName("utf-8"));
			child.setGroupName("test");
			child.setUserName(resp.getCmppSpId());
			child.setUser(resp);
			
			try
			{
				child.setPassword(AESUtil.decrypt2Str(
						resp.getCmppPwd(), AESUtil.aesKey));
			} catch (Exception e)
			{
				log.error("密码错误,账号:{}", resp.getCmppSpId());

// e.printStackTrace();
}

			child.setValid(true);
			child.setVersion((short) 0x20);
			
			child.setMaxChannels((short) 12);
			child.setRetryWaitTimeSec((short) 30);
			child.setMaxRetryCnt((short) 3);
			child.setReSendFailMsg(false);
			// 服务器的写,通常都是回执和上行短信,此时不限速,越快越好
			// child.setWriteLimit(1500);
			child.setReadLimit(3000);
			List<BusinessHandlerInterface> serverhandlers = new ArrayList<BusinessHandlerInterface>();
			serverhandlers.add(new MessageReceiveHandler());
			child.setBusinessHandlerSet(serverhandlers);
			return child;
		}
		return null;
	}
}
能否帮我抽空看下,在局域网内能接收很多,一上阿里云一直没有超过 30/s, 谢谢!

同一个entityId,不管它有多少个channel,AbstractSessionStateManager 的storeMap是同一个。但msgRetryMap是不同Channel有不的实例

一、代码解释
ConcurrentMap<Serializable, VersionObject> storedMap = null;
if (endpoint.isReSendFailMsg()) {
// 如果上次发送失败的消息要重发一次,则要创建持久化Map用于存储发送的message
storedMap = BDBStoredMapFactoryImpl.INS.buildMap(endpoint.getId(), "Session_" + endpoint.getId());
} else {
storedMap = new ConcurrentHashMap();
}

		logger.info("Channel added To Endpoint {} .totalCnt:{} ,remoteAddress: {}", endpoint, nowConnCnt + 1, ch.remoteAddress());

		if (nowConnCnt == 0 && endpoint.isReSendFailMsg()) {
			// 如果是第一个连接。要把上次发送失败的消息取出,再次发送一次
			ch.pipeline().addAfter(GlobalConstance.codecName, GlobalConstance.sessionHandler, createSessionManager(endpoint, storedMap, true));
		} else {
			ch.pipeline().addAfter(GlobalConstance.codecName, GlobalConstance.sessionHandler, createSessionManager(endpoint, storedMap, false));
		}

/**
 * 重发队列
 **/
private final ConcurrentHashMap<K, Entry> msgRetryMap = new ConcurrentHashMap<K, Entry>();

二、存在问题:
(1)storedMap相当于由entityId决定唯一,而msgRetryMap由不同的channle绑定。采用多个channel连接对端,比如channel1提交过去的,对端是channel2回复Response。此时,storedMap.get(seqId)可以查到seqId的Request,但msgRetryMap.get(seqId)为null,导致msgRetryMap.remove(seqId)失败,即重试Map还在。

(2)如果是多个channel来连接,这边提交时,不同的channel按自己的序号递增,则有可能产生ID相同。但storedMap.get(seqId)就可能出现重复。(除非业务控制好,不同channel,seqId是不会相同的)

received the same index

@Lihuanghe 你好,4连接数压测的时候出现如下问题,是什么原因导致的呢?

com.zx.sms.common.NotSupportedException: received the same index at com.zx.sms.codec.cmpp.wap.FrameHolder.merge(FrameHolder.java:103) ~[sms-core-2.1.12.1.jar:na] at com.zx.sms.codec.cmpp.wap.LongMessageFrameHolder.mergeFrameHolder(LongMessageFrameHolder.java:252) ~[sms-core-2.1.12.1.jar:na] at com.zx.sms.codec.cmpp.wap.LongMessageFrameHolder.putAndget(LongMessageFrameHolder.java:197) ~[sms-core-2.1.12.1.jar:na] at com.zx.sms.codec.cmpp.wap.AbstractLongMessageHandler.decode(AbstractLongMessageHandler.java:33) [sms-core-2.1.12.1.jar:na] at com.zx.sms.codec.cmpp.wap.AbstractLongMessageHandler.decode(AbstractLongMessageHandler.java:1) [sms-core-2.1.12.1.jar:na] at io.netty.handler.codec.MessageToMessageCodec$2.decode(MessageToMessageCodec.java:81) [netty-codec-4.1.52.Final.jar:4.1.52.Final] at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88) [netty-codec-4.1.52.Final.jar:4.1.52.Final] at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111) [netty-codec-4.1.52.Final.jar:4.1.52.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.52.Final.jar:4.1.52.Final]

jdk

为了支持jdk1.6,是不是不会支持jdk17了?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.