nsq.dart 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481
  1. library nsq;
  2. import 'dart:async';
  3. import 'dart:convert';
  4. import 'dart:io';
  5. import 'dart:typed_data';
  6. import 'package:audioplayers/audio_cache.dart';
  7. import 'package:bbyyy/beans/message_bean_entity.dart';
  8. import 'package:bbyyy/beans/new_order_bean_entity.dart';
  9. import 'package:bbyyy/beans/nsq_type_bean_entity.dart';
  10. import 'package:bbyyy/msgDB/my_msg_db.dart';
  11. import 'package:bbyyy/my_tools/const.dart';
  12. import 'package:bbyyy/my_tools/event_bus.dart';
  13. import 'package:bbyyy/my_tools/my_cookie.dart';
  14. import 'package:flutter_local_notifications/flutter_local_notifications.dart';
  15. import 'package:bbyyy/beans/system_information_bean_entity.dart';
  16. import '../main.dart';
  17. //const ServerIP = '127.0.0.1';
  18. //const ServerPort = 4150; // 服务器端口号
  19. String Topic;
  20. String Channel;
  21. String ClientId;
  22. String Hostname;
  23. var HandlerDataFunc;
  24. // 1, 消息处理的代码函数,自定义消息处理函数
  25. Future<void> handlerMessage(Message message) async {
  26. // 在这里写你的消息处理逻辑
  27. print('Got a message: ${utf8.decode(message.Body)}');
  28. print(
  29. '时间:${message.Timestamp}, Attempts: ${message.Attempts}, ID: ${utf8.decode(message.ID)}');
  30. try {
  31. NsqTypeBeanEntity entity =
  32. NsqTypeBeanEntity().fromJson(json.decode(utf8.decode(message.Body)));
  33. print(entity.type);
  34. if (entity.type == notifyTypeNewMsg) {
  35. if (MyCookie().prefs.getBool('聊天新消息提示音')) {
  36. showNotification(appName, entity.type);
  37. }
  38. } else {
  39. showNotification(appName, entity.type);
  40. }
  41. if (entity.type == notifyTypeIncome) {
  42. if (MyCookie().prefs.getBool('收入进账提示音')) {
  43. AudioCache audioCache = AudioCache();
  44. audioCache.play('jifendaozhang.aac');
  45. }
  46. }
  47. if (entity.type == notifyTypeNewOrder) {
  48. NewOrderBeanEntity order =
  49. NewOrderBeanEntity().fromJson(json.decode(utf8.decode(message.Body)));
  50. EventBus().emit('hasNewOrder', order);
  51. }
  52. else if (entity.type == notifyTypeNewMsg) {
  53. MessageBeanEntity entity =
  54. MessageBeanEntity().fromJson(json.decode(utf8.decode(message.Body)));
  55. MsgDB msgDB =
  56. MsgDB('table${MyCookie().getUID()}_${entity.content.senderUid}');
  57. if (!msgDB.isTableExits) {
  58. await msgDB.open();
  59. }
  60. await msgDB.addTableData([entity.content]);
  61. EventBus().emit('hasNewMsg', entity);
  62. EventBus().emit('hasNewMsgInMsgPage', entity);
  63. }
  64. else if (entity.type == notifyTypeOrderPaymentIsSuccessful) {
  65. EventBus().emit('alipayPaymentCallback');
  66. }
  67. else if(entity.type == notifyTypeSysMsg){
  68. print('$notifyTypeSysMsg---------------------------\n${utf8.decode(message.Body)}\n---------------------');
  69. SystemInformationBeanEntity entity = SystemInformationBeanEntity().fromJson(json.decode(utf8.decode(message.Body)));
  70. String systemInformation = MyCookie().prefs.getString('${MyCookie().getUID()}系统消息');
  71. if(systemInformation==null){
  72. List <SystemInformationBeanEntity> sI = [];
  73. sI.add(entity);
  74. MyCookie().prefs.setString('${MyCookie().getUID()}系统消息', json.encode(sI));
  75. }else{
  76. List <SystemInformationBeanEntity> sI = [];
  77. List sL = json.decode(systemInformation);
  78. sL.forEach((element) {
  79. sI.add(SystemInformationBeanEntity().fromJson(json.decode(json.encode(element))));
  80. });
  81. sI.insert(0, entity);
  82. MyCookie().prefs.setString('${MyCookie().getUID()}系统消息', json.encode(sI));
  83. }
  84. EventBus().emit('systemInformation',entity);
  85. }
  86. } catch (e) {}
  87. return;
  88. }
  89. // 服务端返回数据类型: 下面的数据不能修噶
  90. const int FrameTypeResponse = 0; // 无需相应
  91. const int FrameTypeError = 1; // 错误返回
  92. const int FrameTypeMessage = 2; // 服务器发来的消息,重点
  93. var byteSpace = utf8.encode(' '); // 空白符
  94. var byteNewLine = utf8.encode('\n'); // 换行符
  95. Socket socket; // 套接字连接
  96. void ConnectServer(handlerFunc,
  97. {String serverIp = '127.0.0.1',
  98. topic,
  99. channel,
  100. clientId,
  101. hostname,
  102. int serverPort = 4150}) {
  103. HandlerDataFunc = handlerFunc;
  104. Topic = topic;
  105. Channel = channel;
  106. ClientId = clientId;
  107. Hostname = hostname;
  108. Socket.connect(serverIp, serverPort).then((Socket sock) {
  109. socket = sock;
  110. sock.add(utf8.encode(' V2'));
  111. if (sock == null) {
  112. print('socket is null');
  113. return;
  114. }
  115. var isOk = connectNsq(sock);
  116. if (isOk == false) {
  117. print('连接Nsq失败');
  118. } else {
  119. print('连接Nsq成功');
  120. }
  121. sock.listen(dataHandler,
  122. onError: errorHandler, onDone: doneHandler, cancelOnError: false);
  123. });
  124. }
  125. // 消息对象
  126. const MsgIDLength = 16;
  127. class Message {
  128. var ID = List(MsgIDLength);
  129. List Body; // 消息文本
  130. int Timestamp = 0;
  131. int Attempts = 0;
  132. String NSQDAddress = '';
  133. int autoResponseDisabled;
  134. int responded;
  135. }
  136. Message DecodeMessage(Uint8List b) {
  137. var msg = Message();
  138. if (b.length < 10 + MsgIDLength) {
  139. return null;
  140. }
  141. msg.Timestamp = toInt64(b.sublist(0, 8));
  142. msg.Attempts = toInt16(b.sublist(8, 10));
  143. msg.ID = b.sublist(10, 10 + MsgIDLength);
  144. msg.Body = b.sublist(10 + MsgIDLength, b.length);
  145. return msg;
  146. }
  147. void dataHandler(connData) {
  148. var conn = connData;
  149. var res = ReadUnpackedResponse(conn);
  150. if (res[0] == -1) {
  151. print('解析数据出错: ${utf8.decode(conn)}');
  152. }
  153. var frameType = res[0];
  154. var data = res[1];
  155. if (frameType == FrameTypeResponse && utf8.decode(data) == '_heartbeat_') {
  156. print('心跳了一下❤️');
  157. NOP().writeTo(socket);
  158. } else {
  159. switch (frameType) {
  160. case FrameTypeResponse:
  161. print('this is response: ${utf8.decode(data)}');
  162. break;
  163. case FrameTypeMessage:
  164. var msg = DecodeMessage(data);
  165. if (msg == null) {
  166. print('msg是空的!');
  167. break;
  168. }
  169. HandlerDataFunc(msg);
  170. Finish(msg.ID).writeTo(socket);
  171. break;
  172. case FrameTypeError:
  173. print('FrameTypeError: ${data}');
  174. break;
  175. default:
  176. print('unknown frame type ${frameType}');
  177. break;
  178. }
  179. }
  180. }
  181. void errorHandler(error, StackTrace trace) {
  182. print('-=-=-=-=-=-=-=-=-=-=-=-=-=-=');
  183. print(error);
  184. reconnection();
  185. }
  186. int reconnectionTime = 5;
  187. bool reconnecting = false;
  188. void reconnection() {
  189. if (reconnecting) {
  190. return;
  191. } else {
  192. reconnecting = true;
  193. }
  194. print('-----------reconnection==============${DateTime.now().toString()}');
  195. socket.destroy();
  196. Socket.connect(MyCookie().server, 4150, timeout: Duration(seconds: 5))
  197. .then((Socket sock) {
  198. socket = sock;
  199. sock.add(utf8.encode(' V2'));
  200. if (sock == null) {
  201. print('socket is null');
  202. return;
  203. }
  204. var isOk = connectNsq(sock);
  205. if (isOk == false) {
  206. print('连接Nsq失败');
  207. reconnecting = false;
  208. reconnection();
  209. } else {
  210. print('连接Nsq成功');
  211. reconnecting = false;
  212. }
  213. sock.listen(dataHandler,
  214. onError: errorHandler, onDone: doneHandler, cancelOnError: false);
  215. }).catchError((onError) {
  216. print('---------------catchError\n$onError');
  217. reconnecting = false;
  218. reconnection();
  219. });
  220. }
  221. void doneHandler() {
  222. print('-------------doneHandler----------$reconnecting');
  223. reconnection();
  224. }
  225. bool sendMagic(socketConn) {
  226. socketConn.add(utf8.encode(' V2'));
  227. var isOk = connectNsq(socketConn);
  228. if (isOk == false) {
  229. print('连接Nsq失败');
  230. } else {
  231. print('连接Nsq成功');
  232. }
  233. return true;
  234. }
  235. bool connectNsq(Socket socketConn) {
  236. // 1, 连接
  237. var identity = getDefaultIdentity();
  238. var cmd = identityToCmd(identity);
  239. var n = cmd.writeTo(socketConn);
  240. print('写入字数:${n}');
  241. // 2, 订阅
  242. var subCmd = Subscribe(Topic, Channel);
  243. n = subCmd.writeTo(socketConn);
  244. print('订阅收到: n=${n}');
  245. // 3, 发送Nop
  246. n = NOP().writeTo(socketConn);
  247. // 4, 发送ready
  248. sendRead(socketConn, 1);
  249. return true;
  250. }
  251. // 获取默认的实体名
  252. class Identity {
  253. Map toJson() {
  254. var ci = {};
  255. ci['client_id'] = ClientId;
  256. ci['hostname'] = Hostname; //
  257. ci['user_agent'] = 'go-nsq/1.0.8';
  258. //ci['short_id'] = 'xusanhuodeMacBook-Pro' // deprecated
  259. //ci['long_id'] = 'xusanhuodeMacBook-Pro.local' // deprecated
  260. ci['tls_v1'] = false;
  261. ci['deflate'] = false;
  262. ci['deflate_level'] = 6;
  263. ci['snappy'] = false;
  264. ci['feature_negotiation'] = true;
  265. ci['heartbeat_interval'] = 10000; // 单位:毫秒,心跳间隔时间
  266. ci['sample_rate'] = 0;
  267. ci['output_buffer_size'] = 16384;
  268. ci['output_buffer_timeout'] = 250;
  269. ci['max_msg_timeout'] = 90000;
  270. ci['msg_timeout'] = 2000;
  271. return ci;
  272. }
  273. }
  274. Identity getDefaultIdentity() {
  275. var idt = Identity();
  276. return idt;
  277. }
  278. // s实体转commend
  279. Commend identityToCmd(Identity js) {
  280. var body = utf8.encode(jsonEncode(js));
  281. print('解析对象:$body');
  282. var c = Commend();
  283. c.Name = utf8.encode('IDENTIFY');
  284. c.Body = body;
  285. return c;
  286. }
  287. //如果客户端空闲,发送 NOP命令。
  288. //如果 2 个 _heartbeat_ 响应没有被应答, nsqd 将会超时,并且强制关闭客户端连接。
  289. //IDENTIFY 命令可以用来改变/禁用这个行为。
  290. class Commend {
  291. Uint8List Name;
  292. List Params; // 二维数组
  293. Uint8List Body;
  294. String getString() {
  295. if (Params.isNotEmpty) {
  296. return utf8.decode(Name) + ' ' + Params.join(utf8.decode(byteSpace));
  297. }
  298. return utf8.decode(Name);
  299. }
  300. int writeTo(Socket socketWrite) {
  301. var socketWrite = socket;
  302. var total = 0;
  303. var buf = Uint8List(4);
  304. // 写入名字
  305. socketWrite.add(Name);
  306. total += Name.length;
  307. if (Params != null) {
  308. Params.forEach((param) {
  309. socketWrite.add(byteSpace);
  310. total += byteSpace.length;
  311. socketWrite.add(param);
  312. total += param.length;
  313. });
  314. }
  315. // 写入行
  316. socketWrite.add(byteNewLine);
  317. total += byteNewLine.length;
  318. // 写入body
  319. if (Body != null) {
  320. var bufs = buf;
  321. var bLength = Body.length;
  322. bufs[0] = bLength >> 24;
  323. bufs[1] = bLength >> 16;
  324. bufs[2] = bLength >> 8;
  325. bufs[3] = bLength;
  326. // 写入内容长度
  327. socketWrite.add(bufs);
  328. total += bufs.length;
  329. // 写入内容
  330. socketWrite.add(Body);
  331. total += Body.length;
  332. }
  333. return total;
  334. }
  335. }
  336. Commend NOP() {
  337. var c = Commend();
  338. c.Name = utf8.encode('NOP');
  339. return c;
  340. }
  341. // 准备好
  342. Commend getReady(int count) {
  343. var c = Commend();
  344. var params = List(1);
  345. params[0] = utf8.encode(count.toString());
  346. c.Name = utf8.encode('RDY');
  347. c.Params = params;
  348. return c;
  349. }
  350. // 完成
  351. Commend Finish(Uint8List id) {
  352. var params = List(1);
  353. params[0] = id;
  354. var c = Commend();
  355. c.Name = utf8.encode('FIN');
  356. c.Params = params;
  357. return c;
  358. }
  359. // 订阅
  360. Commend Subscribe(String topic, String chanel) {
  361. var c = Commend();
  362. var params = List(2);
  363. params[0] = utf8.encode(topic);
  364. params[1] = utf8.encode(chanel);
  365. c.Name = utf8.encode('SUB');
  366. c.Params = params;
  367. return c;
  368. }
  369. void sendRead(socketWrite, count) {
  370. var cmd = getReady(count);
  371. var n = cmd.writeTo(socketWrite);
  372. print('Ready返回:${n}');
  373. }
  374. Uint8List ReadResponse(Uint8List socketData) {
  375. var sizeList = socketData.sublist(0, 4);
  376. var msgSize = toInt32(sizeList);
  377. if (msgSize < 0) {
  378. print('response msg size is negative: ${msgSize}');
  379. return List(0);
  380. } else {
  381. print('size=${msgSize}');
  382. }
  383. var buf = List(msgSize);
  384. buf = socketData.sublist(4, 4 + msgSize);
  385. return buf;
  386. }
  387. List UnpackResponse(Uint8List response) {
  388. if (response.length < 4) {
  389. return [-1, 'length of response is too small'];
  390. }
  391. return [
  392. toInt32(response.sublist(0, 4)),
  393. response.sublist(4, response.length)
  394. ];
  395. }
  396. List ReadUnpackedResponse(Uint8List socketData) {
  397. var resp = ReadResponse(socketData);
  398. if (resp.isEmpty) {
  399. return [-1, ''];
  400. }
  401. return UnpackResponse(resp);
  402. }
  403. int toInt32(List list, {int index = 0}) {
  404. var byteArray = Uint8List.fromList(list);
  405. var buffer = byteArray.buffer;
  406. var data = ByteData.view(buffer);
  407. var short = data.getInt32(index, Endian.big);
  408. return short;
  409. }
  410. int toInt16(List list, {int index = 0}) {
  411. var byteArray = Uint8List.fromList(list);
  412. var buffer = byteArray.buffer;
  413. var data = ByteData.view(buffer);
  414. var short = data.getInt16(index, Endian.big);
  415. return short;
  416. }
  417. int toInt64(List list, {int index = 0}) {
  418. var byteArray = Uint8List.fromList(list);
  419. var buffer = byteArray.buffer;
  420. var data = ByteData.view(buffer);
  421. var short = data.getInt64(index, Endian.big);
  422. return short;
  423. }
  424. Future<void> showNotification(String title, String body) async {
  425. const AndroidNotificationDetails androidPlatformChannelSpecifics =
  426. AndroidNotificationDetails(
  427. 'your channel id', '梆梆鱼', 'your channel description',
  428. importance: Importance.max,
  429. playSound: true,
  430. priority: Priority.high,
  431. ticker: 'ticker');
  432. const NotificationDetails platformChannelSpecifics =
  433. NotificationDetails(android: androidPlatformChannelSpecifics);
  434. await flutterLocalNotificationsPlugin
  435. .show(0, title, body, platformChannelSpecifics, payload: body);
  436. }