nsq.dart 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483
  1. library nsq;
  2. import 'dart:async';
  3. import 'dart:convert';
  4. import 'dart:io';
  5. import 'dart:typed_data';
  6. import 'package:audioplayers/audioplayers.dart';
  7. import 'package:bbyyy/beans/message_bean_entity.dart';
  8. import 'package:bbyyy/beans/new_order_bean_entity.dart';
  9. import 'package:bbyyy/beans/nsq_type_bean_entity.dart';
  10. import 'package:bbyyy/msgDB/my_msg_db.dart';
  11. import 'package:bbyyy/my_tools/const.dart';
  12. import 'package:bbyyy/my_tools/event_bus.dart';
  13. import 'package:bbyyy/my_tools/my_cookie.dart';
  14. import 'package:flutter_local_notifications/flutter_local_notifications.dart';
  15. import 'package:bbyyy/beans/system_information_bean_entity.dart';
  16. import '../main.dart';
  17. import 'package:bbyyy/my_tools/pop_up_queue.dart';
  // Former hard-coded endpoint, kept for reference:
  //const ServerIP = '127.0.0.1';
  //const ServerPort = 4150; // server port

  /// Subscription and identity settings of the NSQ connection.
  ///
  /// All four are assigned by [ConnectServer]. [Topic] and [Channel] are used
  /// for the SUB command, [ClientId] and [Hostname] for the IDENTIFY payload.
  String Topic, Channel, ClientId, Hostname;

  /// Callback invoked with every decoded [Message]; assigned by
  /// [ConnectServer].
  dynamic HandlerDataFunc;
  // 1. Message handling: the application's own message handler.

  /// Handles one NSQ [message]: decodes its JSON body and reacts to its `type`.
  ///
  /// * Shows a local notification for every type; for chat messages only when
  ///   the '聊天新消息提示音' preference is `true`.
  /// * Plays the income sound when the type is [notifyTypeIncome] and the
  ///   '收入进账提示音' preference is `true`.
  /// * New order: pushes the order to the front of the pop-up queue.
  /// * New chat message: stores it in the per-sender table and emits the
  ///   `hasNewMsg` / `hasNewMsgInMsgPage` events.
  /// * Successful payment: emits `alipayPaymentCallback`.
  /// * System message: prepends it to the stored history and queues a pop-up.
  ///
  /// Failures are logged and never propagate to the caller, so one bad
  /// message cannot break the socket listener.
  Future<void> handlerMessage(Message message) async {
    try {
      final bodyText = utf8.decode(message.Body);
      print('Got a message: $bodyText');
      print(
          '时间:${message.Timestamp}, Attempts: ${message.Attempts}, ID: ${utf8.decode(message.ID)}');

      // Decode the JSON once; every bean below is built from the same payload.
      final payload = json.decode(bodyText);
      final NsqTypeBeanEntity entity = NsqTypeBeanEntity().fromJson(payload);
      final type = entity.type;
      print(type);

      final prefs = MyCookie().prefs;
      // NOTE(review): getBool presumably returns null for a key that was never
      // written (as SharedPreferences does). Comparing with `true` keeps an
      // unset preference from throwing and aborting the whole message.
      if (type != notifyTypeNewMsg ||
          prefs.getBool('聊天新消息提示音') == true) {
        // Fire-and-forget, as before: showing the banner must not delay or
        // block the processing of the message itself.
        showNotification(appName, type);
      }
      if (type == notifyTypeIncome &&
          prefs.getBool('收入进账提示音') == true) {
        AudioCache().play('jifendaozhang.aac');
      }

      if (type == notifyTypeNewOrder) {
        final NewOrderBeanEntity order = NewOrderBeanEntity().fromJson(payload);
        PopUpQueue().nsqData.insert(0, order);
        PopUpQueue().showNext();
      } else if (type == notifyTypeNewMsg) {
        await _storeChatMessage(payload);
      } else if (type == notifyTypeOrderPaymentIsSuccessful) {
        EventBus().emit('alipayPaymentCallback');
      } else if (type == notifyTypeSysMsg) {
        await _recordSystemMessage(payload);
      }
    } catch (e, stack) {
      // Previously swallowed silently, which made dropped messages invisible.
      print('handlerMessage failed: $e\n$stack');
    }
  }

  // Persists a chat message in the sender's table and notifies listeners.
  Future<void> _storeChatMessage(dynamic payload) async {
    final MessageBeanEntity chat = MessageBeanEntity().fromJson(payload);
    final msgDB =
        MsgDB('table${MyCookie().getUID()}_${chat.content.senderUid}');
    if (!msgDB.isTableExits) {
      await msgDB.open();
    }
    await msgDB.addTableData([chat.content]);
    EventBus().emit('hasNewMsg', chat);
    EventBus().emit('hasNewMsgInMsgPage', chat);
  }

  // Prepends a system message to the stored history and queues its pop-up.
  Future<void> _recordSystemMessage(dynamic payload) async {
    final SystemInformationBeanEntity info =
        SystemInformationBeanEntity().fromJson(payload);
    final key = '${MyCookie().getUID()}系统消息';
    final stored = MyCookie().prefs.getString(key);

    // Newest entry first, followed by whatever was stored before.
    final history = <SystemInformationBeanEntity>[info];
    if (stored != null) {
      for (final element in json.decode(stored)) {
        history.add(SystemInformationBeanEntity().fromJson(element));
      }
    }
    await MyCookie().prefs.setString(key, json.encode(history));

    PopUpQueue().nsqData.insert(0, info);
    PopUpQueue().showNext();
  }
  // Frame types returned by the server. The values below must not be modified.
  const FrameTypeResponse = 0; // plain response, nothing to act on
  const FrameTypeError = 1; // error returned by the server
  const FrameTypeMessage = 2; // message pushed by the server (the main case)

  var byteSpace = utf8.encode(' '); // space separator
  var byteNewLine = utf8.encode('\n'); // line feed

  Socket socket; // the socket connection currently in use
  /// Opens a TCP connection to nsqd and subscribes to [topic] / [channel].
  ///
  /// Stores [handlerFunc], [topic], [channel], [clientId] and [hostname] in the
  /// library-level variables, connects to [serverIp]:[serverPort], sends the
  /// protocol magic followed by the handshake in [connectNsq], and finally
  /// listens for data with [dataHandler], [errorHandler] and [doneHandler].
  ///
  /// Returns immediately; the connection is established asynchronously. A
  /// failed connection attempt is logged instead of surfacing as an unhandled
  /// asynchronous error.
  void ConnectServer(handlerFunc,
      {String serverIp = '127.0.0.1',
      topic,
      channel,
      clientId,
      hostname,
      int serverPort = 4150}) {
    HandlerDataFunc = handlerFunc;
    Topic = topic;
    Channel = channel;
    ClientId = clientId;
    Hostname = hostname;

    Socket.connect(serverIp, serverPort).then((Socket sock) {
      // Check before the socket is used, not after.
      if (sock == null) {
        print('socket is null');
        return;
      }
      socket = sock;

      // The protocol magic is exactly four bytes: two spaces, 'V', '2'.
      sock.add(utf8.encode('  V2'));

      if (connectNsq(sock) == false) {
        print('连接Nsq失败');
      } else {
        print('连接Nsq成功');
      }
      sock.listen(dataHandler,
          onError: errorHandler, onDone: doneHandler, cancelOnError: false);
    }).catchError((error) {
      print('NSQ connection setup for $serverIp:$serverPort failed: $error');
    });
  }
  // Message object.

  /// Number of bytes in a message ID.
  const MsgIDLength = 16;

  /// A message received from the server, as filled in by [DecodeMessage].
  class Message {
    /// Message ID; starts out as [MsgIDLength] empty slots.
    List<dynamic> ID = List<dynamic>.filled(MsgIDLength, null);

    /// Message body bytes (the message text).
    List Body;

    /// Timestamp read from the message header.
    int Timestamp = 0;

    /// Delivery attempt counter read from the message header.
    int Attempts = 0;

    String NSQDAddress = '';
    int autoResponseDisabled;
    int responded;
  }
  /// Decodes the payload of a message frame into a [Message].
  ///
  /// Layout of [b], all big-endian: 8-byte timestamp, 2-byte attempts,
  /// [MsgIDLength]-byte message ID, then the body.
  ///
  /// Returns `null` when [b] is `null` or shorter than the fixed-size header.
  Message DecodeMessage(Uint8List b) {
    const headerLength = 10 + MsgIDLength;
    if (b == null || b.length < headerLength) {
      return null;
    }
    // View over the caller's bytes; honours the offset of a sub-view.
    final header = ByteData.view(b.buffer, b.offsetInBytes, b.lengthInBytes);
    return Message()
      ..Timestamp = header.getInt64(0, Endian.big)
      // Attempts is an unsigned 16-bit counter on the wire; reading it as a
      // signed value would turn counts above 32767 negative.
      ..Attempts = header.getUint16(8, Endian.big)
      ..ID = b.sublist(10, headerLength)
      ..Body = b.sublist(headerLength);
  }
  // Bytes received on the current connection that do not yet form a complete
  // frame.
  final List<int> _pendingFrameBytes = <int>[];

  // The socket whose data is currently buffered in [_pendingFrameBytes].
  Socket _pendingFrameSource;

  /// Socket data callback: splits the received bytes into frames and handles
  /// each complete frame.
  ///
  /// A frame is a 4-byte big-endian size followed by that many bytes. TCP
  /// delivers a byte stream, so one [connData] chunk may hold several frames or
  /// only part of one; incomplete data is kept until the rest arrives.
  void dataHandler(connData) {
    // After a reconnect [socket] is a different object; leftovers of the old
    // stream must not be prepended to the new one.
    if (!identical(_pendingFrameSource, socket)) {
      _pendingFrameBytes.clear();
      _pendingFrameSource = socket;
    }
    _pendingFrameBytes.addAll(connData);

    while (_pendingFrameBytes.length >= 4) {
      final frameSize = toInt32(_pendingFrameBytes.sublist(0, 4));
      if (frameSize < 0) {
        // The stream is out of sync; drop what is buffered.
        print(
            '解析数据出错: ${utf8.decode(_pendingFrameBytes, allowMalformed: true)}');
        _pendingFrameBytes.clear();
        return;
      }
      final frameEnd = 4 + frameSize;
      if (_pendingFrameBytes.length < frameEnd) {
        return; // Wait for the remainder of this frame.
      }
      final frame =
          Uint8List.fromList(_pendingFrameBytes.sublist(0, frameEnd));
      _pendingFrameBytes.removeRange(0, frameEnd);
      _dispatchFrame(frame);
    }
  }

  // Handles one complete frame (size prefix included).
  void _dispatchFrame(Uint8List frame) {
    final res = ReadUnpackedResponse(frame);
    final frameType = res[0];
    final data = res[1];
    if (frameType == -1) {
      print('解析数据出错: ${utf8.decode(frame, allowMalformed: true)}');
      return;
    }
    switch (frameType) {
      case FrameTypeResponse:
        final text = utf8.decode(data, allowMalformed: true);
        if (text == '_heartbeat_') {
          // Answer the heartbeat so the server keeps the connection open.
          print('心跳了一下❤️');
          NOP().writeTo(socket);
        } else {
          print('this is response: $text');
        }
        break;
      case FrameTypeMessage:
        final msg = DecodeMessage(data);
        if (msg == null) {
          print('msg是空的!');
          break;
        }
        HandlerDataFunc(msg);
        // Acknowledge the message right away; the handler is not awaited.
        Finish(msg.ID).writeTo(socket);
        break;
      case FrameTypeError:
        print('FrameTypeError: ${utf8.decode(data, allowMalformed: true)}');
        break;
      default:
        print('unknown frame type ${frameType}');
        break;
    }
  }
  /// Socket `onError` callback: logs [error] and starts [reconnection].
  ///
  /// [trace] is accepted to match the callback signature but is not used.
  void errorHandler(error, StackTrace trace) {
    const separator = '-=-=-=-=-=-=-=-=-=-=-=-=-=-=';
    print(separator);
    print(error);
    reconnection();
  }
  /// Seconds to wait after a failed reconnect attempt before trying again.
  int reconnectionTime = 5;

  /// Whether a reconnect attempt (or the wait before the next one) is in
  /// progress.
  bool reconnecting = false;

  /// Drops the current socket and connects to the server again.
  ///
  /// Does nothing while another attempt is in progress. On success it repeats
  /// the magic + [connectNsq] handshake and listens with the usual handlers.
  /// On failure it retries after [reconnectionTime] seconds rather than
  /// immediately, so an unreachable server is not hammered in a tight loop.
  ///
  /// NOTE(review): this always targets `MyCookie().server` on port 4150, not
  /// the address that was passed to [ConnectServer] — confirm that is intended.
  void reconnection() {
    if (reconnecting) {
      return;
    }
    reconnecting = true;
    print('-----------reconnection==============${DateTime.now().toString()}');

    // [socket] is still null when no connection has ever been established.
    socket?.destroy();

    Socket.connect(MyCookie().server, 4150, timeout: Duration(seconds: 5))
        .then((Socket sock) {
      if (sock == null) {
        print('socket is null');
        _retryReconnection();
        return;
      }
      socket = sock;
      // The protocol magic is exactly four bytes: two spaces, 'V', '2'.
      sock.add(utf8.encode('  V2'));

      if (connectNsq(sock) == false) {
        print('连接Nsq失败');
        _retryReconnection();
        return;
      }
      print('连接Nsq成功');
      reconnecting = false;
      sock.listen(dataHandler,
          onError: errorHandler, onDone: doneHandler, cancelOnError: false);
    }).catchError((onError) {
      print('---------------catchError\n$onError');
      _retryReconnection();
    });
  }

  // Schedules the next reconnect attempt; [reconnecting] stays true while
  // waiting so that socket callbacks cannot start a parallel attempt.
  void _retryReconnection() {
    Timer(Duration(seconds: reconnectionTime), () {
      reconnecting = false;
      reconnection();
    });
  }
  /// Socket `onDone` callback: logs the current [reconnecting] flag and starts
  /// [reconnection].
  void doneHandler() {
    final inProgress = reconnecting;
    print('-------------doneHandler----------$inProgress');
    reconnection();
  }
  /// Sends the protocol magic on [socketConn] and runs the [connectNsq]
  /// handshake.
  ///
  /// Returns the outcome of the handshake (previously `true` was returned even
  /// after the failure message had been printed).
  bool sendMagic(socketConn) {
    // The protocol magic is exactly four bytes: two spaces, 'V', '2'.
    socketConn.add(utf8.encode('  V2'));
    final failed = connectNsq(socketConn) == false;
    print(failed ? '连接Nsq失败' : '连接Nsq成功');
    return !failed;
  }
  /// Runs the handshake on [socketConn]: IDENTIFY, SUB for [Topic] / [Channel],
  /// NOP, then RDY with a count of 1.
  ///
  /// Logs the byte counts reported by the first two writes and always returns
  /// `true`.
  bool connectNsq(Socket socketConn) {
    // 1. Identify.
    final identifyBytes =
        identityToCmd(getDefaultIdentity()).writeTo(socketConn);
    print('写入字数:$identifyBytes');

    // 2. Subscribe.
    final subscribeBytes = Subscribe(Topic, Channel).writeTo(socketConn);
    print('订阅收到: n=$subscribeBytes');

    // 3. Send NOP.
    NOP().writeTo(socketConn);

    // 4. Send ready.
    sendRead(socketConn, 1);
    return true;
  }
  // Default identity of this client.

  /// The settings sent to the server with the IDENTIFY command.
  class Identity {
    /// Returns the identity as a JSON-encodable map, using the library-level
    /// [ClientId] and [Hostname].
    Map toJson() => <dynamic, dynamic>{
          'client_id': ClientId,
          'hostname': Hostname,
          'user_agent': 'go-nsq/1.0.8',
          //'short_id': 'xusanhuodeMacBook-Pro' // deprecated
          //'long_id': 'xusanhuodeMacBook-Pro.local' // deprecated
          'tls_v1': false,
          'deflate': false,
          'deflate_level': 6,
          'snappy': false,
          'feature_negotiation': true,
          'heartbeat_interval': 10000, // heartbeat interval in milliseconds
          'sample_rate': 0,
          'output_buffer_size': 16384,
          'output_buffer_timeout': 250,
          'max_msg_timeout': 90000,
          'msg_timeout': 2000,
        };
  }
  /// Returns a fresh [Identity] carrying the default settings.
  Identity getDefaultIdentity() => Identity();
  // Converts an identity into a command.

  /// Builds the IDENTIFY command whose body is the UTF-8 encoded JSON of [js].
  ///
  /// The encoded body is also printed for debugging.
  Commend identityToCmd(Identity js) {
    final payload = utf8.encode(jsonEncode(js));
    print('解析对象:$payload');
    return Commend()
      ..Name = utf8.encode('IDENTIFY')
      ..Body = payload;
  }
  // Send a NOP command when the client is idle.
  // If two _heartbeat_ responses go unanswered, nsqd times out and forcibly
  // closes the client connection.
  // The IDENTIFY command can be used to change or disable this behaviour.

  /// One protocol command: a name, optional parameters and an optional body.
  class Commend {
    /// Command name as bytes, e.g. the encoding of `NOP`.
    Uint8List Name;

    /// Parameters, each one a list of bytes (a two-dimensional array).
    List Params;

    /// Optional body; written after the command line with a length prefix.
    Uint8List Body;

    /// Returns the command line as text: the name followed by the
    /// space-separated parameters.
    String getString() {
      final name = utf8.decode(Name);
      // Commands such as NOP never set Params.
      if (Params == null || Params.isEmpty) {
        return name;
      }
      // Parameters are byte lists; decode them instead of printing the bytes.
      final parts = Params.map(
          (p) => p is List<int> ? utf8.decode(p, allowMalformed: true) : '$p');
      return '$name ${parts.join(utf8.decode(byteSpace))}';
    }

    /// Writes the command to [socketWrite] and returns the number of bytes
    /// written.
    ///
    /// Wire format: name, then each parameter preceded by a space, a line
    /// feed, and — when [Body] is set — a 4-byte big-endian length followed by
    /// the body. Falls back to the library-level [socket] when [socketWrite]
    /// is `null`.
    int writeTo(Socket socketWrite) {
      // The argument used to be shadowed by the global socket and ignored.
      final out = socketWrite ?? socket;
      var total = 0;

      // Command name.
      out.add(Name);
      total += Name.length;

      if (Params != null) {
        for (final param in Params) {
          out.add(byteSpace);
          total += byteSpace.length;
          out.add(param);
          total += param.length;
        }
      }

      // End of the command line.
      out.add(byteNewLine);
      total += byteNewLine.length;

      if (Body != null) {
        // Body length as a 4-byte big-endian integer.
        final sizePrefix = Uint8List(4);
        ByteData.view(sizePrefix.buffer).setUint32(0, Body.length, Endian.big);
        out.add(sizePrefix);
        total += sizePrefix.length;

        // Body content.
        out.add(Body);
        total += Body.length;
      }
      return total;
    }
  }
  /// Builds the parameterless NOP command.
  Commend NOP() => Commend()..Name = utf8.encode('NOP');
  // Ready.

  /// Builds the RDY command with [count] as its only parameter.
  Commend getReady(int count) {
    // Fixed-length, single-slot parameter list holding the encoded count.
    final args = List<dynamic>.filled(1, utf8.encode(count.toString()));
    return Commend()
      ..Name = utf8.encode('RDY')
      ..Params = args;
  }
  // Finish.

  /// Builds the FIN command for the message whose ID is [id].
  Commend Finish(Uint8List id) {
    // Fixed-length, single-slot parameter list holding the message ID.
    final args = List<dynamic>.filled(1, id);
    return Commend()
      ..Name = utf8.encode('FIN')
      ..Params = args;
  }
  // Subscribe.

  /// Builds the SUB command for [topic] and [chanel].
  Commend Subscribe(String topic, String chanel) {
    // Fixed-length parameter list: topic first, channel second.
    final args = List<dynamic>.from(
        [utf8.encode(topic), utf8.encode(chanel)],
        growable: false);
    return Commend()
      ..Name = utf8.encode('SUB')
      ..Params = args;
  }
  /// Writes the RDY command for [count] to [socketWrite] and logs the byte
  /// count reported by the write.
  void sendRead(socketWrite, count) {
    final written = getReady(count).writeTo(socketWrite);
    print('Ready返回:$written');
  }
  /// Extracts the payload of the first size-prefixed frame in [socketData].
  ///
  /// [socketData] starts with a 4-byte big-endian size followed by that many
  /// payload bytes. Returns the payload, or an empty list when [socketData] is
  /// `null`, shorter than the size prefix, announces a negative size, or holds
  /// fewer payload bytes than announced.
  Uint8List ReadResponse(Uint8List socketData) {
    if (socketData == null || socketData.length < 4) {
      print('response is shorter than the 4-byte size prefix');
      return Uint8List(0);
    }
    final msgSize = ByteData.view(socketData.buffer, socketData.offsetInBytes, 4)
        .getInt32(0, Endian.big);
    if (msgSize < 0) {
      print('response msg size is negative: ${msgSize}');
      // Must be a Uint8List: a plain List here fails the return-type cast.
      return Uint8List(0);
    }
    print('size=${msgSize}');

    final end = 4 + msgSize;
    if (socketData.length < end) {
      print('response is truncated: expected $end bytes, '
          'got ${socketData.length}');
      return Uint8List(0);
    }
    return socketData.sublist(4, end);
  }
  /// Splits a frame payload into `[frameType, data]`.
  ///
  /// The first four bytes of [response] are the big-endian frame type and the
  /// remaining bytes are the data. Returns `[-1, message]` when [response] has
  /// fewer than four bytes.
  List UnpackResponse(Uint8List response) {
    final tooShort = response.length < 4;
    return tooShort
        ? [-1, 'length of response is too small']
        : [
            toInt32(response.sublist(0, 4)),
            response.sublist(4, response.length)
          ];
  }
  /// Reads the frame in [socketData] and returns `[frameType, data]`.
  ///
  /// Returns `[-1, '']` when [ReadResponse] yields no payload.
  List ReadUnpackedResponse(Uint8List socketData) {
    final payload = ReadResponse(socketData);
    return payload.isEmpty ? [-1, ''] : UnpackResponse(payload);
  }
  /// Reads a signed big-endian 32-bit integer from the bytes in [list],
  /// starting at byte offset [index].
  int toInt32(List list, {int index = 0}) {
    final bytes = Uint8List.fromList(list);
    return ByteData.view(bytes.buffer).getInt32(index, Endian.big);
  }
  /// Reads a signed big-endian 16-bit integer from the bytes in [list],
  /// starting at byte offset [index].
  int toInt16(List list, {int index = 0}) {
    final bytes = Uint8List.fromList(list);
    return ByteData.view(bytes.buffer).getInt16(index, Endian.big);
  }
  /// Reads a signed big-endian 64-bit integer from the bytes in [list],
  /// starting at byte offset [index].
  int toInt64(List list, {int index = 0}) {
    final bytes = Uint8List.fromList(list);
    return ByteData.view(bytes.buffer).getInt64(index, Endian.big);
  }
  /// Shows a local notification with the given [title] and [body].
  ///
  /// [body] is also attached as the notification payload. The notification is
  /// always shown with id 0.
  Future<void> showNotification(String title, String body) async {
    const details = NotificationDetails(
      android: AndroidNotificationDetails(
        'your channel id',
        '梆梆鱼',
        'your channel description',
        importance: Importance.max,
        playSound: true,
        priority: Priority.high,
        ticker: 'ticker',
      ),
    );
    await flutterLocalNotificationsPlugin.show(0, title, body, details,
        payload: body);
  }