nsq.dart 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473
  1. library nsq;
  2. import 'dart:async';
  3. import 'dart:convert';
  4. import 'dart:io';
  5. import 'dart:typed_data';
  6. import 'package:audioplayers/audio_cache.dart';
  7. import 'package:bbyyy/beans/message_bean_entity.dart';
  8. import 'package:bbyyy/beans/new_order_bean_entity.dart';
  9. import 'package:bbyyy/beans/nsq_type_bean_entity.dart';
  10. import 'package:bbyyy/msgDB/my_msg_db.dart';
  11. import 'package:bbyyy/my_tools/const.dart';
  12. import 'package:bbyyy/my_tools/event_bus.dart';
  13. import 'package:bbyyy/my_tools/my_cookie.dart';
  14. import 'package:flutter_local_notifications/flutter_local_notifications.dart';
  15. import 'package:bbyyy/beans/system_information_bean_entity.dart';
  16. import '../main.dart';
  // const ServerIP = '127.0.0.1';
  // const ServerPort = 4150; // server port number

  /// NSQ topic to subscribe to; assigned by [ConnectServer] and read by
  /// [connectNsq] when it builds the SUB command.
  String Topic;

  /// NSQ channel to subscribe on; assigned by [ConnectServer] and read by
  /// [connectNsq] when it builds the SUB command.
  String Channel;

  /// Value sent as `client_id` in the IDENTIFY payload (see [Identity]).
  String ClientId;

  /// Value sent as `hostname` in the IDENTIFY payload (see [Identity]).
  String Hostname;

  /// Callback invoked by [dataHandler] with every decoded [Message]; assigned
  /// by [ConnectServer].
  dynamic HandlerDataFunc;
  /// Handles one NSQ [message]; this is the application's message callback.
  ///
  /// The body is decoded as UTF-8 JSON and parsed into an [NsqTypeBeanEntity].
  /// Its `type` decides which notification, sound, local storage update and
  /// [EventBus] event are triggered. Any failure while handling the message is
  /// logged and swallowed so that one bad message cannot break the caller.
  Future<void> handlerMessage(Message message) async {
    try {
      // Decode the body once and reuse it for logging and for every bean.
      final bodyText = utf8.decode(message.Body);
      print('Got a message: $bodyText');
      print(
          '时间:${message.Timestamp}, Attempts: ${message.Attempts}, ID: ${utf8.decode(message.ID)}');

      final payload = json.decode(bodyText);
      final NsqTypeBeanEntity envelope = NsqTypeBeanEntity().fromJson(payload);
      final type = envelope.type;
      print(type);

      final prefs = MyCookie().prefs;

      // Chat messages only raise a notification when the user enabled it;
      // every other type always does. `== true` keeps an unset (null)
      // preference from throwing and aborting the rest of the handling.
      if (type != notifyTypeNewMsg ||
          prefs.getBool('聊天新消息提示音') == true) {
        showNotification(appName, type);
      }

      if (type == notifyTypeIncome && prefs.getBool('收入进账提示音') == true) {
        AudioCache().play('jifendaozhang.aac');
      }

      if (type == notifyTypeNewOrder) {
        final NewOrderBeanEntity order = NewOrderBeanEntity().fromJson(payload);
        EventBus().emit('hasNewOrder', order);
      } else if (type == notifyTypeNewMsg) {
        final MessageBeanEntity chatMessage =
            MessageBeanEntity().fromJson(payload);
        // One table per (current user, sender) pair.
        final msgDB = MsgDB(
            'table${MyCookie().getUID()}_${chatMessage.content.senderUid}');
        if (!msgDB.isTableExits) {
          await msgDB.open();
        }
        await msgDB.addTableData([chatMessage.content]);
        EventBus().emit('hasNewMsg', chatMessage);
        EventBus().emit('hasNewMsgInMsgPage', chatMessage);
      } else if (type == notifyTypeOrderPaymentIsSuccessful) {
        EventBus().emit('alipayPaymentCallback');
      } else if (type == notifyTypeSysMsg) {
        print(
            '$notifyTypeSysMsg---------------------------\n$bodyText\n---------------------');
        final SystemInformationBeanEntity systemMessage =
            SystemInformationBeanEntity().fromJson(payload);
        final storageKey = '${MyCookie().getUID()}系统消息';
        final String stored = prefs.getString(storageKey);
        // New entries are prepended; '!@##@!' separates individual entries.
        final updated = stored == null
            ? systemMessage.content
            : '${systemMessage.content}!@##@!$stored';
        await prefs.setString(storageKey, updated);
        EventBus().emit('systemInformation', systemMessage);
      }
    } catch (error, stackTrace) {
      // Best effort: report the failure instead of hiding it.
      print('handlerMessage failed: $error\n$stackTrace');
    }
  }
  // Frame types returned by the server. These values are compared against the
  // frame type read from the wire in dataHandler and must not be changed.

  /// Plain response frame; nothing needs to be answered.
  const FrameTypeResponse = 0;

  /// Error frame.
  const FrameTypeError = 1;

  /// Message frame pushed by the server (the important one).
  const FrameTypeMessage = 2;

  /// UTF-8 bytes of a single space, used to separate command parameters.
  var byteSpace = utf8.encode(' ');

  /// UTF-8 bytes of a newline, used to terminate a command line.
  var byteNewLine = utf8.encode('\n');

  /// The current socket connection.
  Socket socket;
  /// Opens the initial connection to nsqd and starts listening for frames.
  ///
  /// Stores [handlerFunc], [topic], [channel], [clientId] and [hostname] in the
  /// library-level globals, connects to [serverIp]:[serverPort], sends the
  /// protocol magic, runs the handshake in [connectNsq] and finally registers
  /// [dataHandler], [errorHandler] and [doneHandler] on the socket.
  ///
  /// A failed connection attempt is logged; it does not start a reconnection
  /// cycle by itself.
  void ConnectServer(handlerFunc,
      {String serverIp = '127.0.0.1',
      topic,
      channel,
      clientId,
      hostname,
      int serverPort = 4150}) {
    HandlerDataFunc = handlerFunc;
    Topic = topic;
    Channel = channel;
    ClientId = clientId;
    Hostname = hostname;
    Socket.connect(serverIp, serverPort).then((Socket sock) {
      // Check before the socket is used, not after.
      if (sock == null) {
        print('socket is null');
        return;
      }
      socket = sock;
      // Protocol magic: exactly four bytes, two spaces followed by "V2".
      // Written as byte values so the leading spaces cannot get lost.
      sock.add(const <int>[0x20, 0x20, 0x56, 0x32]);
      var isOk = connectNsq(sock);
      if (isOk == false) {
        print('连接Nsq失败');
      } else {
        print('连接Nsq成功');
      }
      sock.listen(dataHandler,
          onError: errorHandler, onDone: doneHandler, cancelOnError: false);
    }).catchError((error) {
      // Without this handler a refused or unreachable server surfaces as an
      // unhandled asynchronous error.
      print('连接Nsq失败: $error');
    });
  }
  // Message object.

  /// Length in bytes of a message ID.
  const MsgIDLength = 16;

  /// A message decoded from a message frame (see [DecodeMessage]).
  class Message {
    /// Raw message ID, [MsgIDLength] bytes long.
    ///
    /// Typed as bytes because every consumer in this file needs bytes: it is
    /// passed to `Finish` and decoded with `utf8.decode`.
    Uint8List ID = Uint8List(MsgIDLength);

    /// Message payload bytes.
    List Body;

    /// Timestamp read from the first 8 bytes of the frame data.
    // NOTE(review): presumably nanoseconds since the epoch, as in the NSQ
    // protocol — confirm against the server.
    int Timestamp = 0;

    /// Delivery attempt counter read from bytes 8..10 of the frame data.
    int Attempts = 0;

    /// Not read or written anywhere in this file.
    String NSQDAddress = '';

    /// Not read or written anywhere in this file.
    int autoResponseDisabled;

    /// Not read or written anywhere in this file.
    int responded;
  }
  /// Decodes the data part of a message frame into a [Message].
  ///
  /// Layout of [b]: 8-byte timestamp, 2-byte attempts, [MsgIDLength]-byte ID,
  /// then the body. Returns `null` when [b] is too short to hold the fixed
  /// header.
  Message DecodeMessage(Uint8List b) {
    const headerLength = 10 + MsgIDLength;
    if (b.length < headerLength) {
      return null;
    }
    return Message()
      ..Timestamp = toInt64(b.sublist(0, 8))
      ..Attempts = toInt16(b.sublist(8, 10))
      ..ID = b.sublist(10, headerLength)
      ..Body = b.sublist(headerLength);
  }
  /// Bytes received from the socket that do not yet form a complete frame.
  final List<int> _pendingFrameBytes = <int>[];

  /// The socket whose bytes are currently buffered in [_pendingFrameBytes].
  Socket _pendingFrameSocket;

  /// Socket data callback: splits the incoming byte stream into frames and
  /// dispatches each complete frame.
  ///
  /// A single chunk ([connData]) may hold several frames or only part of one,
  /// so bytes are buffered until a whole frame
  /// (`4-byte size | 4-byte frame type | data`) is available.
  void dataHandler(connData) {
    // After a reconnect the global socket is a different object; leftovers
    // from the previous connection must not be glued onto the new stream.
    if (!identical(_pendingFrameSocket, socket)) {
      _pendingFrameBytes.clear();
      _pendingFrameSocket = socket;
    }
    _pendingFrameBytes.addAll(connData);

    while (_pendingFrameBytes.length >= 4) {
      final frameSize = toInt32(_pendingFrameBytes.sublist(0, 4));
      // A frame always contains at least the 4-byte frame type.
      if (frameSize < 4) {
        print('解析数据出错: invalid frame size $frameSize');
        // The stream cannot be resynchronised; drop what was buffered.
        _pendingFrameBytes.clear();
        return;
      }
      final frameEnd = 4 + frameSize;
      if (_pendingFrameBytes.length < frameEnd) {
        // Incomplete frame: wait for the next chunk.
        return;
      }
      final frame = Uint8List.fromList(_pendingFrameBytes.sublist(4, frameEnd));
      _pendingFrameBytes.removeRange(0, frameEnd);
      _dispatchFrame(toInt32(frame.sublist(0, 4)), frame.sublist(4));
    }
  }

  /// Handles one complete frame of type [frameType] carrying [data].
  void _dispatchFrame(int frameType, Uint8List data) {
    switch (frameType) {
      case FrameTypeResponse:
        final text = utf8.decode(data);
        if (text == '_heartbeat_') {
          // Heartbeats are answered with NOP to keep the connection alive.
          print('心跳了一下❤️');
          NOP().writeTo(socket);
        } else {
          print('this is response: $text');
        }
        break;
      case FrameTypeMessage:
        final msg = DecodeMessage(data);
        if (msg == null) {
          print('msg是空的!');
          break;
        }
        HandlerDataFunc(msg);
        // The message is acknowledged right away, without waiting for the
        // handler to complete.
        Finish(msg.ID).writeTo(socket);
        break;
      case FrameTypeError:
        print('FrameTypeError: ${data}');
        break;
      default:
        print('unknown frame type ${frameType}');
        break;
    }
  }
  /// Socket error callback: logs [error] together with its stack [trace] and
  /// starts a reconnection.
  void errorHandler(error, StackTrace trace) {
    print('-=-=-=-=-=-=-=-=-=-=-=-=-=-=');
    print(error);
    // The stack trace used to be dropped, which made failures hard to locate.
    if (trace != null) {
      print(trace);
    }
    reconnection();
  }
  /// Seconds to wait after a failed reconnection attempt before the next one.
  int reconnectionTime = 5;

  /// Whether a reconnection cycle is currently in progress.
  bool reconnecting = false;

  /// Starts a reconnection cycle unless one is already running.
  ///
  /// The cycle keeps retrying until a connection is established and the
  /// handshake has been sent.
  // NOTE(review): this always reconnects to MyCookie().server on port 4150,
  // regardless of the address that was passed to ConnectServer — confirm that
  // this is intended.
  void reconnection() {
    if (reconnecting) {
      return;
    }
    reconnecting = true;
    _reconnectUntilConnected();
  }

  /// Retries the connection, pausing [reconnectionTime] seconds between failed
  /// attempts, and clears [reconnecting] once a socket is up and listening.
  Future<void> _reconnectUntilConnected() async {
    while (true) {
      print(
          '-----------reconnection==============${DateTime.now().toString()}');
      try {
        // The socket is still null when the very first connect never succeeded.
        socket?.destroy();
        final sock = await Socket.connect(MyCookie().server, 4150,
            timeout: Duration(seconds: 5));
        socket = sock;
        // Protocol magic: exactly four bytes, two spaces followed by "V2".
        sock.add(const <int>[0x20, 0x20, 0x56, 0x32]);
        if (connectNsq(sock) == false) {
          print('连接Nsq失败');
          sock.destroy();
        } else {
          print('连接Nsq成功');
          sock.listen(dataHandler,
              onError: errorHandler, onDone: doneHandler, cancelOnError: false);
          reconnecting = false;
          return;
        }
      } catch (error) {
        print('---------------catchError\n$error');
      }
      // Back off instead of retrying in a tight loop while the server is down.
      await Future.delayed(Duration(seconds: reconnectionTime));
    }
  }
  /// Socket done callback: logs the current [reconnecting] state and asks for
  /// a reconnection.
  void doneHandler() {
    final banner = '-------------doneHandler----------$reconnecting';
    print(banner);
    reconnection();
  }
  /// Sends the protocol magic on [socketConn] and then runs the handshake in
  /// [connectNsq], logging whether the handshake reported success.
  ///
  /// Always returns `true`.
  bool sendMagic(socketConn) {
    // Protocol magic: exactly four bytes, two spaces followed by "V2".
    // Written as byte values so the leading spaces cannot get lost.
    socketConn.add(const <int>[0x20, 0x20, 0x56, 0x32]);
    final isOk = connectNsq(socketConn);
    if (isOk == false) {
      print('连接Nsq失败');
    } else {
      print('连接Nsq成功');
    }
    return true;
  }
  /// Sends the handshake commands on [socketConn]: IDENTIFY, SUB for
  /// [Topic]/[Channel], NOP and finally RDY with a count of 1.
  ///
  /// Always returns `true`; the printed numbers are the byte counts reported
  /// by `writeTo`.
  bool connectNsq(Socket socketConn) {
    // Step 1: identify this client.
    final identifyCmd = identityToCmd(getDefaultIdentity());
    var written = identifyCmd.writeTo(socketConn);
    print('写入字数:${written}');

    // Step 2: subscribe.
    written = Subscribe(Topic, Channel).writeTo(socketConn);
    print('订阅收到: n=${written}');

    // Step 3: send NOP.
    NOP().writeTo(socketConn);

    // Step 4: signal readiness for one message.
    sendRead(socketConn, 1);
    return true;
  }
  /// Client identity sent to the server as the body of the IDENTIFY command.
  class Identity {
    /// Builds the IDENTIFY payload from [ClientId], [Hostname] and fixed
    /// settings.
    ///
    /// The deprecated `short_id` and `long_id` fields are intentionally left
    /// out.
    Map toJson() => <dynamic, dynamic>{
          'client_id': ClientId,
          'hostname': Hostname,
          'user_agent': 'go-nsq/1.0.8',
          'tls_v1': false,
          'deflate': false,
          'deflate_level': 6,
          'snappy': false,
          'feature_negotiation': true,
          // Heartbeat interval, in milliseconds.
          'heartbeat_interval': 10000,
          'sample_rate': 0,
          'output_buffer_size': 16384,
          'output_buffer_timeout': 250,
          'max_msg_timeout': 90000,
          'msg_timeout': 2000,
        };
  }
  /// Returns a fresh default [Identity].
  Identity getDefaultIdentity() => Identity();
  /// Converts an identity into an IDENTIFY command whose body is the UTF-8
  /// encoded JSON form of [js].
  Commend identityToCmd(Identity js) {
    final payload = utf8.encode(jsonEncode(js));
    print('解析对象:$payload');
    return Commend()
      ..Name = utf8.encode('IDENTIFY')
      ..Body = payload;
  }
  // If the client is idle, send the NOP command.
  // If 2 _heartbeat_ responses go unanswered, nsqd times out and forcibly
  // closes the client connection.
  // The IDENTIFY command can be used to change or disable this behaviour.

  /// A protocol command: a name, optional parameters and an optional body.
  class Commend {
    /// Command name bytes, e.g. the UTF-8 bytes of `SUB`.
    Uint8List Name;

    /// Command parameters; each element is a list of bytes.
    List Params;

    /// Optional body, written after the command line with a 4-byte
    /// big-endian length prefix.
    Uint8List Body;

    /// Returns the command line as text: the name followed by the
    /// space-separated parameters, if any.
    String getString() {
      final name = utf8.decode(Name);
      // Params is null for parameterless commands such as NOP.
      if (Params == null || Params.isEmpty) {
        return name;
      }
      // Decode each parameter; joining the raw byte lists would print them
      // as "[49, 50]" instead of their text.
      final decoded = Params.map((param) => utf8.decode(param));
      return name + ' ' + decoded.join(utf8.decode(byteSpace));
    }

    /// Serialises this command and writes it to [socketWrite].
    ///
    /// Falls back to the library-level [socket] when [socketWrite] is null.
    /// Returns the number of bytes written.
    int writeTo(Socket socketWrite) {
      final target = socketWrite ?? socket;
      final frame = BytesBuilder();

      // Command line: name, then each parameter preceded by a space.
      frame.add(Name);
      if (Params != null) {
        for (final param in Params) {
          frame.add(byteSpace);
          frame.add(param);
        }
      }
      frame.add(byteNewLine);

      // Optional body: 4-byte big-endian length, then the bytes themselves.
      if (Body != null) {
        final sizePrefix = ByteData(4)..setUint32(0, Body.length, Endian.big);
        frame.add(sizePrefix.buffer.asUint8List());
        frame.add(Body);
      }

      // One write per command instead of many small ones.
      final bytes = frame.takeBytes();
      target.add(bytes);
      return bytes.length;
    }
  }
  /// Builds a NOP command (no parameters, no body).
  Commend NOP() => Commend()..Name = utf8.encode('NOP');
  /// Builds an RDY command announcing readiness for [count] messages.
  Commend getReady(int count) {
    // A list literal replaces the deprecated `List(n)` constructor.
    return Commend()
      ..Name = utf8.encode('RDY')
      ..Params = <dynamic>[utf8.encode(count.toString())];
  }
  /// Builds a FIN command that marks the message with the given [id] as
  /// finished.
  Commend Finish(Uint8List id) {
    // A list literal replaces the deprecated `List(n)` constructor.
    return Commend()
      ..Name = utf8.encode('FIN')
      ..Params = <dynamic>[id];
  }
  /// Builds a SUB command subscribing to [topic] on channel [chanel].
  Commend Subscribe(String topic, String chanel) {
    // A list literal replaces the deprecated `List(n)` constructor.
    return Commend()
      ..Name = utf8.encode('SUB')
      ..Params = <dynamic>[utf8.encode(topic), utf8.encode(chanel)];
  }
  /// Sends an RDY command for [count] messages via [socketWrite] and logs the
  /// number of bytes reported by `writeTo`.
  void sendRead(socketWrite, count) {
    final written = getReady(count).writeTo(socketWrite);
    print('Ready返回:${written}');
  }
  /// Extracts the first frame from [socketData].
  ///
  /// [socketData] must start with a 4-byte big-endian size followed by that
  /// many bytes. Returns those bytes, or an empty list when the size prefix is
  /// missing, the size is negative, or the data is shorter than announced.
  Uint8List ReadResponse(Uint8List socketData) {
    if (socketData.length < 4) {
      print('response is too short for a size prefix: ${socketData.length}');
      return Uint8List(0);
    }
    final msgSize =
        ByteData.view(socketData.buffer, socketData.offsetInBytes, 4)
            .getInt32(0, Endian.big);
    if (msgSize < 0) {
      print('response msg size is negative: ${msgSize}');
      // Must be a Uint8List: a plain List fails the declared return type.
      return Uint8List(0);
    }
    print('size=${msgSize}');
    final frameEnd = 4 + msgSize;
    if (socketData.length < frameEnd) {
      print('response is truncated: need $frameEnd bytes, '
          'got ${socketData.length}');
      return Uint8List(0);
    }
    return socketData.sublist(4, frameEnd);
  }
  /// Splits a frame into `[frameType, data]`.
  ///
  /// The first 4 bytes of [response] are read as a big-endian frame type and
  /// the remainder is the data. Returns `[-1, <error text>]` when [response]
  /// is shorter than 4 bytes.
  List UnpackResponse(Uint8List response) {
    final tooShort = response.length < 4;
    if (tooShort) {
      return [-1, 'length of response is too small'];
    }
    final frameType = toInt32(response.sublist(0, 4));
    final data = response.sublist(4);
    return [frameType, data];
  }
  /// Reads the first frame from [socketData] and splits it into
  /// `[frameType, data]`.
  ///
  /// Returns `[-1, '']` when [ReadResponse] yields no bytes.
  List ReadUnpackedResponse(Uint8List socketData) {
    final frame = ReadResponse(socketData);
    return frame.isEmpty ? [-1, ''] : UnpackResponse(frame);
  }
  /// Reads a signed big-endian 32-bit integer from [list], starting at byte
  /// offset [index]. [list] is copied into a fresh byte buffer first.
  int toInt32(List list, {int index = 0}) {
    final bytes = Uint8List.fromList(list);
    return bytes.buffer.asByteData().getInt32(index, Endian.big);
  }
  /// Reads a signed big-endian 16-bit integer from [list], starting at byte
  /// offset [index]. [list] is copied into a fresh byte buffer first.
  int toInt16(List list, {int index = 0}) {
    final bytes = Uint8List.fromList(list);
    return bytes.buffer.asByteData().getInt16(index, Endian.big);
  }
  /// Reads a signed big-endian 64-bit integer from [list], starting at byte
  /// offset [index]. [list] is copied into a fresh byte buffer first.
  int toInt64(List list, {int index = 0}) {
    final bytes = Uint8List.fromList(list);
    return bytes.buffer.asByteData().getInt64(index, Endian.big);
  }
  /// Shows a local notification with the given [title] and [body].
  ///
  /// Always uses notification id 0 and passes [body] as the payload as well.
  Future<void> showNotification(String title, String body) async {
    const details = NotificationDetails(
      android: AndroidNotificationDetails(
        'your channel id',
        '梆梆鱼',
        'your channel description',
        importance: Importance.max,
        playSound: true,
        priority: Priority.high,
        ticker: 'ticker',
      ),
    );
    await flutterLocalNotificationsPlugin.show(0, title, body, details,
        payload: body);
  }