nsq.dart 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459
  1. library nsq;
  2. import 'dart:async';
  3. import 'dart:convert';
  4. import 'dart:io';
  5. import 'dart:typed_data';
  6. import 'package:audioplayers/audio_cache.dart';
  7. import 'package:bbyyy/beans/message_bean_entity.dart';
  8. import 'package:bbyyy/beans/new_order_bean_entity.dart';
  9. import 'package:bbyyy/beans/nsq_type_bean_entity.dart';
  10. import 'package:bbyyy/msgDB/my_msg_db.dart';
  11. import 'package:bbyyy/my_tools/const.dart';
  12. import 'package:bbyyy/my_tools/event_bus.dart';
  13. import 'package:bbyyy/my_tools/my_cookie.dart';
  14. import 'package:flutter_local_notifications/flutter_local_notifications.dart';
  15. import '../main.dart';
  16. //const ServerIP = '127.0.0.1';
  17. //const ServerPort = 4150; // 服务器端口号
  18. String Topic;
  19. String Channel;
  20. String ClientId;
  21. String Hostname;
  22. var HandlerDataFunc;
  23. // 1, 消息处理的代码函数,自定义消息处理函数
  24. Future<void> handlerMessage(Message message) async {
  25. // 在这里写你的消息处理逻辑
  26. print('Got a message: ${utf8.decode(message.Body)}');
  27. print(
  28. '时间:${message.Timestamp}, Attempts: ${message.Attempts}, ID: ${utf8.decode(message.ID)}');
  29. try {
  30. NsqTypeBeanEntity entity =
  31. NsqTypeBeanEntity().fromJson(json.decode(utf8.decode(message.Body)));
  32. print(entity.type);
  33. if (entity.type == notifyTypeNewMsg) {
  34. if (MyCookie().prefs.getBool('聊天新消息提示音')) {
  35. showNotification(appName, entity.type);
  36. }
  37. } else {
  38. showNotification(appName, entity.type);
  39. }
  40. if (entity.type == notifyTypeIncome) {
  41. if (MyCookie().prefs.getBool('收入进账提示音')) {
  42. AudioCache audioCache = AudioCache();
  43. audioCache.play('jifendaozhang.aac');
  44. }
  45. }
  46. if (entity.type == notifyTypeNewOrder) {
  47. NewOrderBeanEntity order =
  48. NewOrderBeanEntity().fromJson(json.decode(utf8.decode(message.Body)));
  49. EventBus().emit('hasNewOrder', order);
  50. } else if (entity.type == notifyTypeNewMsg) {
  51. MessageBeanEntity entity =
  52. MessageBeanEntity().fromJson(json.decode(utf8.decode(message.Body)));
  53. MsgDB msgDB =
  54. MsgDB('table${MyCookie().getUID()}_${entity.content.senderUid}');
  55. if (!msgDB.isTableExits) {
  56. await msgDB.open();
  57. }
  58. await msgDB.addTableData([entity.content]);
  59. EventBus().emit('hasNewMsg', entity);
  60. EventBus().emit('hasNewMsgInMsgPage', entity);
  61. } else if (entity.type == notifyTypeOrderPaymentIsSuccessful) {
  62. EventBus().emit('alipayPaymentCallback');
  63. }
  64. } catch (e) {}
  65. return;
  66. }
  67. // 服务端返回数据类型: 下面的数据不能修噶
  68. const int FrameTypeResponse = 0; // 无需相应
  69. const int FrameTypeError = 1; // 错误返回
  70. const int FrameTypeMessage = 2; // 服务器发来的消息,重点
  71. var byteSpace = utf8.encode(' '); // 空白符
  72. var byteNewLine = utf8.encode('\n'); // 换行符
  73. Socket socket; // 套接字连接
  74. void ConnectServer(handlerFunc,
  75. {String serverIp = '127.0.0.1',
  76. topic,
  77. channel,
  78. clientId,
  79. hostname,
  80. int serverPort = 4150}) {
  81. HandlerDataFunc = handlerFunc;
  82. Topic = topic;
  83. Channel = channel;
  84. ClientId = clientId;
  85. Hostname = hostname;
  86. Socket.connect(serverIp, serverPort).then((Socket sock) {
  87. socket = sock;
  88. sock.add(utf8.encode(' V2'));
  89. if (sock == null) {
  90. print('socket is null');
  91. return;
  92. }
  93. var isOk = connectNsq(sock);
  94. if (isOk == false) {
  95. print('连接Nsq失败');
  96. } else {
  97. print('连接Nsq成功');
  98. }
  99. sock.listen(dataHandler,
  100. onError: errorHandler, onDone: doneHandler, cancelOnError: false);
  101. });
  102. }
  103. // 消息对象
  104. const MsgIDLength = 16;
  105. class Message {
  106. var ID = List(MsgIDLength);
  107. List Body; // 消息文本
  108. int Timestamp = 0;
  109. int Attempts = 0;
  110. String NSQDAddress = '';
  111. int autoResponseDisabled;
  112. int responded;
  113. }
  114. Message DecodeMessage(Uint8List b) {
  115. var msg = Message();
  116. if (b.length < 10 + MsgIDLength) {
  117. return null;
  118. }
  119. msg.Timestamp = toInt64(b.sublist(0, 8));
  120. msg.Attempts = toInt16(b.sublist(8, 10));
  121. msg.ID = b.sublist(10, 10 + MsgIDLength);
  122. msg.Body = b.sublist(10 + MsgIDLength, b.length);
  123. return msg;
  124. }
  125. void dataHandler(connData) {
  126. var conn = connData;
  127. var res = ReadUnpackedResponse(conn);
  128. if (res[0] == -1) {
  129. print('解析数据出错: ${utf8.decode(conn)}');
  130. }
  131. var frameType = res[0];
  132. var data = res[1];
  133. if (frameType == FrameTypeResponse && utf8.decode(data) == '_heartbeat_') {
  134. print('心跳了一下❤️');
  135. NOP().writeTo(socket);
  136. } else {
  137. switch (frameType) {
  138. case FrameTypeResponse:
  139. print('this is response: ${utf8.decode(data)}');
  140. break;
  141. case FrameTypeMessage:
  142. var msg = DecodeMessage(data);
  143. if (msg == null) {
  144. print('msg是空的!');
  145. break;
  146. }
  147. HandlerDataFunc(msg);
  148. Finish(msg.ID).writeTo(socket);
  149. break;
  150. case FrameTypeError:
  151. print('FrameTypeError: ${data}');
  152. break;
  153. default:
  154. print('unknown frame type ${frameType}');
  155. break;
  156. }
  157. }
  158. }
  159. void errorHandler(error, StackTrace trace) {
  160. print('-=-=-=-=-=-=-=-=-=-=-=-=-=-=');
  161. print(error);
  162. reconnection();
  163. }
  164. int reconnectionTime = 5;
  165. bool reconnecting = false;
  166. void reconnection() {
  167. if (reconnecting) {
  168. return;
  169. } else {
  170. reconnecting = true;
  171. }
  172. print('-----------reconnection==============${DateTime.now().toString()}');
  173. socket.destroy();
  174. Socket.connect(MyCookie().server, 4150, timeout: Duration(seconds: 5))
  175. .then((Socket sock) {
  176. socket = sock;
  177. sock.add(utf8.encode(' V2'));
  178. if (sock == null) {
  179. print('socket is null');
  180. return;
  181. }
  182. var isOk = connectNsq(sock);
  183. if (isOk == false) {
  184. print('连接Nsq失败');
  185. reconnecting = false;
  186. reconnection();
  187. } else {
  188. print('连接Nsq成功');
  189. reconnecting = false;
  190. }
  191. sock.listen(dataHandler,
  192. onError: errorHandler, onDone: doneHandler, cancelOnError: false);
  193. }).catchError((onError) {
  194. print('---------------catchError\n$onError');
  195. reconnecting = false;
  196. reconnection();
  197. });
  198. }
  199. void doneHandler() {
  200. print('-------------doneHandler----------$reconnecting');
  201. reconnection();
  202. }
  203. bool sendMagic(socketConn) {
  204. socketConn.add(utf8.encode(' V2'));
  205. var isOk = connectNsq(socketConn);
  206. if (isOk == false) {
  207. print('连接Nsq失败');
  208. } else {
  209. print('连接Nsq成功');
  210. }
  211. return true;
  212. }
  213. bool connectNsq(Socket socketConn) {
  214. // 1, 连接
  215. var identity = getDefaultIdentity();
  216. var cmd = identityToCmd(identity);
  217. var n = cmd.writeTo(socketConn);
  218. print('写入字数:${n}');
  219. // 2, 订阅
  220. var subCmd = Subscribe(Topic, Channel);
  221. n = subCmd.writeTo(socketConn);
  222. print('订阅收到: n=${n}');
  223. // 3, 发送Nop
  224. n = NOP().writeTo(socketConn);
  225. // 4, 发送ready
  226. sendRead(socketConn, 1);
  227. return true;
  228. }
  229. // 获取默认的实体名
  230. class Identity {
  231. Map toJson() {
  232. var ci = {};
  233. ci['client_id'] = ClientId;
  234. ci['hostname'] = Hostname; //
  235. ci['user_agent'] = 'go-nsq/1.0.8';
  236. //ci['short_id'] = 'xusanhuodeMacBook-Pro' // deprecated
  237. //ci['long_id'] = 'xusanhuodeMacBook-Pro.local' // deprecated
  238. ci['tls_v1'] = false;
  239. ci['deflate'] = false;
  240. ci['deflate_level'] = 6;
  241. ci['snappy'] = false;
  242. ci['feature_negotiation'] = true;
  243. ci['heartbeat_interval'] = 10000; // 单位:毫秒,心跳间隔时间
  244. ci['sample_rate'] = 0;
  245. ci['output_buffer_size'] = 16384;
  246. ci['output_buffer_timeout'] = 250;
  247. ci['max_msg_timeout'] = 90000;
  248. ci['msg_timeout'] = 2000;
  249. return ci;
  250. }
  251. }
  252. Identity getDefaultIdentity() {
  253. var idt = Identity();
  254. return idt;
  255. }
  256. // s实体转commend
  257. Commend identityToCmd(Identity js) {
  258. var body = utf8.encode(jsonEncode(js));
  259. print('解析对象:$body');
  260. var c = Commend();
  261. c.Name = utf8.encode('IDENTIFY');
  262. c.Body = body;
  263. return c;
  264. }
  265. //如果客户端空闲,发送 NOP命令。
  266. //如果 2 个 _heartbeat_ 响应没有被应答, nsqd 将会超时,并且强制关闭客户端连接。
  267. //IDENTIFY 命令可以用来改变/禁用这个行为。
  268. class Commend {
  269. Uint8List Name;
  270. List Params; // 二维数组
  271. Uint8List Body;
  272. String getString() {
  273. if (Params.isNotEmpty) {
  274. return utf8.decode(Name) + ' ' + Params.join(utf8.decode(byteSpace));
  275. }
  276. return utf8.decode(Name);
  277. }
  278. int writeTo(Socket socketWrite) {
  279. var socketWrite = socket;
  280. var total = 0;
  281. var buf = Uint8List(4);
  282. // 写入名字
  283. socketWrite.add(Name);
  284. total += Name.length;
  285. if (Params != null) {
  286. Params.forEach((param) {
  287. socketWrite.add(byteSpace);
  288. total += byteSpace.length;
  289. socketWrite.add(param);
  290. total += param.length;
  291. });
  292. }
  293. // 写入行
  294. socketWrite.add(byteNewLine);
  295. total += byteNewLine.length;
  296. // 写入body
  297. if (Body != null) {
  298. var bufs = buf;
  299. var bLength = Body.length;
  300. bufs[0] = bLength >> 24;
  301. bufs[1] = bLength >> 16;
  302. bufs[2] = bLength >> 8;
  303. bufs[3] = bLength;
  304. // 写入内容长度
  305. socketWrite.add(bufs);
  306. total += bufs.length;
  307. // 写入内容
  308. socketWrite.add(Body);
  309. total += Body.length;
  310. }
  311. return total;
  312. }
  313. }
  314. Commend NOP() {
  315. var c = Commend();
  316. c.Name = utf8.encode('NOP');
  317. return c;
  318. }
  319. // 准备好
  320. Commend getReady(int count) {
  321. var c = Commend();
  322. var params = List(1);
  323. params[0] = utf8.encode(count.toString());
  324. c.Name = utf8.encode('RDY');
  325. c.Params = params;
  326. return c;
  327. }
  328. // 完成
  329. Commend Finish(Uint8List id) {
  330. var params = List(1);
  331. params[0] = id;
  332. var c = Commend();
  333. c.Name = utf8.encode('FIN');
  334. c.Params = params;
  335. return c;
  336. }
  337. // 订阅
  338. Commend Subscribe(String topic, String chanel) {
  339. var c = Commend();
  340. var params = List(2);
  341. params[0] = utf8.encode(topic);
  342. params[1] = utf8.encode(chanel);
  343. c.Name = utf8.encode('SUB');
  344. c.Params = params;
  345. return c;
  346. }
  347. void sendRead(socketWrite, count) {
  348. var cmd = getReady(count);
  349. var n = cmd.writeTo(socketWrite);
  350. print('Ready返回:${n}');
  351. }
  352. Uint8List ReadResponse(Uint8List socketData) {
  353. var sizeList = socketData.sublist(0, 4);
  354. var msgSize = toInt32(sizeList);
  355. if (msgSize < 0) {
  356. print('response msg size is negative: ${msgSize}');
  357. return List(0);
  358. } else {
  359. print('size=${msgSize}');
  360. }
  361. var buf = List(msgSize);
  362. buf = socketData.sublist(4, 4 + msgSize);
  363. return buf;
  364. }
  365. List UnpackResponse(Uint8List response) {
  366. if (response.length < 4) {
  367. return [-1, 'length of response is too small'];
  368. }
  369. return [
  370. toInt32(response.sublist(0, 4)),
  371. response.sublist(4, response.length)
  372. ];
  373. }
  374. List ReadUnpackedResponse(Uint8List socketData) {
  375. var resp = ReadResponse(socketData);
  376. if (resp.isEmpty) {
  377. return [-1, ''];
  378. }
  379. return UnpackResponse(resp);
  380. }
  381. int toInt32(List list, {int index = 0}) {
  382. var byteArray = Uint8List.fromList(list);
  383. var buffer = byteArray.buffer;
  384. var data = ByteData.view(buffer);
  385. var short = data.getInt32(index, Endian.big);
  386. return short;
  387. }
  388. int toInt16(List list, {int index = 0}) {
  389. var byteArray = Uint8List.fromList(list);
  390. var buffer = byteArray.buffer;
  391. var data = ByteData.view(buffer);
  392. var short = data.getInt16(index, Endian.big);
  393. return short;
  394. }
  395. int toInt64(List list, {int index = 0}) {
  396. var byteArray = Uint8List.fromList(list);
  397. var buffer = byteArray.buffer;
  398. var data = ByteData.view(buffer);
  399. var short = data.getInt64(index, Endian.big);
  400. return short;
  401. }
  402. Future<void> showNotification(String title, String body) async {
  403. const AndroidNotificationDetails androidPlatformChannelSpecifics =
  404. AndroidNotificationDetails(
  405. 'your channel id', '梆梆鱼', 'your channel description',
  406. importance: Importance.max,
  407. playSound: true,
  408. priority: Priority.high,
  409. ticker: 'ticker');
  410. const NotificationDetails platformChannelSpecifics =
  411. NotificationDetails(android: androidPlatformChannelSpecifics);
  412. await flutterLocalNotificationsPlugin
  413. .show(0, title, body, platformChannelSpecifics, payload: body);
  414. }