/// Minimal NSQ (V2 TCP protocol) consumer used to receive push events.
///
/// Call [ConnectServer] once with a message callback (normally
/// [handlerMessage]). The library identifies itself, subscribes to the given
/// topic/channel, answers heartbeats and reconnects when the socket drops.
library nsq;

import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';

import 'package:audioplayers/audio_cache.dart';
import 'package:bbyyy/beans/message_bean_entity.dart';
import 'package:bbyyy/beans/new_order_bean_entity.dart';
import 'package:bbyyy/beans/nsq_type_bean_entity.dart';
import 'package:bbyyy/beans/system_information_bean_entity.dart';
import 'package:bbyyy/msgDB/my_msg_db.dart';
import 'package:bbyyy/my_tools/const.dart';
import 'package:bbyyy/my_tools/event_bus.dart';
import 'package:bbyyy/my_tools/my_cookie.dart';
import 'package:flutter_local_notifications/flutter_local_notifications.dart';

import '../main.dart';

// const ServerIP = '127.0.0.1';
// const ServerPort = 4150; // server port

/// Topic sent with the SUB command; assigned by [ConnectServer].
String Topic;

/// Channel sent with the SUB command; assigned by [ConnectServer].
String Channel;

/// `client_id` reported in the IDENTIFY body; assigned by [ConnectServer].
String ClientId;

/// `hostname` reported in the IDENTIFY body; assigned by [ConnectServer].
String Hostname;

/// Callback invoked with every decoded [Message]; assigned by
/// [ConnectServer].
var HandlerDataFunc;

/// Protocol magic that must be the first bytes written on a new connection.
///
/// The NSQ TCP protocol requires exactly four bytes: two spaces, then `V2`.
const String _nsqMagic = '  V2';

/// Bytes received from the current socket that do not yet form a full frame.
final List<int> _rxBuffer = <int>[];

/// Whether the boolean preference [key] is switched on.
///
/// An unset preference (`null`) counts as off, so using the result in an `if`
/// can never throw.
bool _isPrefEnabled(String key) => MyCookie().prefs.getBool(key) == true;

/// Default message callback: shows notifications and routes the event.
///
/// The message body is JSON whose `type` selects the action:
/// * new order: emits `hasNewOrder`;
/// * new chat message: stores it in the per-conversation table and emits
///   `hasNewMsg` / `hasNewMsgInMsgPage`;
/// * successful payment: emits `alipayPaymentCallback`;
/// * system message: prepends it to the stored history and emits
///   `systemInformation`.
///
/// Failures are logged rather than rethrown, so the returned future always
/// completes normally.
Future handlerMessage(Message message) async {
  try {
    final text = utf8.decode(message.Body);
    print('Got a message: $text');
    print(
        '时间:${message.Timestamp}, Attempts: ${message.Attempts}, ID: ${utf8.decode(message.ID)}');

    // Decode once; each bean below is built from the same JSON object.
    final payload = json.decode(text);
    NsqTypeBeanEntity entity = NsqTypeBeanEntity().fromJson(payload);
    print(entity.type);

    // Chat messages notify only when the user enabled it; every other type
    // always notifies.
    if (entity.type != notifyTypeNewMsg || _isPrefEnabled('聊天新消息提示音')) {
      showNotification(appName, entity.type);
    }
    if (entity.type == notifyTypeIncome && _isPrefEnabled('收入进账提示音')) {
      AudioCache().play('jifendaozhang.aac');
    }

    if (entity.type == notifyTypeNewOrder) {
      NewOrderBeanEntity order = NewOrderBeanEntity().fromJson(payload);
      EventBus().emit('hasNewOrder', order);
    } else if (entity.type == notifyTypeNewMsg) {
      MessageBeanEntity chat = MessageBeanEntity().fromJson(payload);
      MsgDB msgDB =
          MsgDB('table${MyCookie().getUID()}_${chat.content.senderUid}');
      if (!msgDB.isTableExits) {
        await msgDB.open();
      }
      await msgDB.addTableData([chat.content]);
      EventBus().emit('hasNewMsg', chat);
      EventBus().emit('hasNewMsgInMsgPage', chat);
    } else if (entity.type == notifyTypeOrderPaymentIsSuccessful) {
      EventBus().emit('alipayPaymentCallback');
    } else if (entity.type == notifyTypeSysMsg) {
      print(
          '$notifyTypeSysMsg---------------------------\n$text\n---------------------');
      SystemInformationBeanEntity info =
          SystemInformationBeanEntity().fromJson(payload);
      final key = '${MyCookie().getUID()}系统消息';
      final String stored = MyCookie().prefs.getString(key);

      // Newest entry first, followed by whatever was stored before.
      final history = [info];
      if (stored != null) {
        final List previous = json.decode(stored);
        for (final element in previous) {
          history.add(SystemInformationBeanEntity().fromJson(element));
        }
      }
      MyCookie().prefs.setString(key, json.encode(history));
      EventBus().emit('systemInformation', info);
    }
  } catch (error, trace) {
    // Log instead of silently dropping the failure.
    print('handlerMessage failed: $error');
    print(trace);
  }
  return;
}

// Frame types sent by the server. These values are fixed by the protocol and
// must not be changed.
const int FrameTypeResponse = 0; // plain response
const int FrameTypeError = 1; // error response
const int FrameTypeMessage = 2; // a message pushed by the server

var byteSpace = utf8.encode(' '); // parameter separator
var byteNewLine = utf8.encode('\n'); // command terminator

/// The socket of the current NSQ connection.
Socket socket;

/// Connects to nsqd and starts consuming [topic] on [channel].
///
/// [handlerFunc] is stored in [HandlerDataFunc] and called with each decoded
/// [Message]. [clientId] and [hostname] are reported in the IDENTIFY body.
/// A failed initial connection is logged; it is not retried from here.
void ConnectServer(handlerFunc,
    {String serverIp = '127.0.0.1',
    topic,
    channel,
    clientId,
    hostname,
    int serverPort = 4150}) {
  HandlerDataFunc = handlerFunc;
  Topic = topic;
  Channel = channel;
  ClientId = clientId;
  Hostname = hostname;
  Socket.connect(serverIp, serverPort).then(_startSession).catchError((error) {
    print('连接Nsq失败');
    print(error);
  });
}

/// Makes [sock] the current connection, performs the handshake and listens.
///
/// Events from a socket that has since been replaced are ignored, so a late
/// `done`/`error` from an old connection cannot tear down the new one.
void _startSession(Socket sock) {
  if (sock == null) {
    print('socket is null');
    return;
  }
  final previous = socket;
  if (previous != null && !identical(previous, sock)) {
    previous.destroy();
  }
  socket = sock;
  _rxBuffer.clear(); // leftovers belong to the previous connection

  sock.add(utf8.encode(_nsqMagic));
  if (connectNsq(sock)) {
    print('连接Nsq成功');
  } else {
    print('连接Nsq失败');
  }

  sock.listen((data) {
    if (identical(sock, socket)) dataHandler(data);
  }, onError: (error, StackTrace trace) {
    if (identical(sock, socket)) errorHandler(error, trace);
  }, onDone: () {
    if (identical(sock, socket)) doneHandler();
  }, cancelOnError: false);
}

/// Length in bytes of a message ID.
const MsgIDLength = 16;

/// A message delivered by nsqd.
class Message {
  var ID = List(MsgIDLength);
  List Body; // message payload
  int Timestamp = 0;
  int Attempts = 0;
  String NSQDAddress = '';
  int autoResponseDisabled;
  int responded;
}

/// Decodes the data of a message frame into a [Message].
///
/// Layout: 8-byte big-endian timestamp, 2-byte big-endian unsigned attempt
/// count, [MsgIDLength]-byte ID, then the body. Returns `null` when [b] is
/// too short to contain the fixed-size header.
Message DecodeMessage(Uint8List b) {
  const headerLength = 10 + MsgIDLength;
  if (b == null || b.length < headerLength) {
    return null;
  }
  final header = ByteData.view(b.buffer, b.offsetInBytes, headerLength);
  return Message()
    ..Timestamp = header.getInt64(0, Endian.big)
    ..Attempts = header.getUint16(8, Endian.big)
    ..ID = b.sublist(10, headerLength)
    ..Body = b.sublist(headerLength);
}

/// Socket data callback: buffers [connData] and handles every complete frame.
///
/// A chunk may hold several frames or only part of one, so bytes are
/// accumulated and frames are cut out by their 4-byte big-endian size prefix.
void dataHandler(connData) {
  _rxBuffer.addAll(connData);
  while (_rxBuffer.length >= 4) {
    final frameSize = toInt32(_rxBuffer.sublist(0, 4));
    if (frameSize < 4) {
      // Every frame carries at least the 4-byte frame type; anything smaller
      // means the stream is out of sync.
      print('解析数据出错: invalid frame size $frameSize');
      _rxBuffer.clear();
      return;
    }
    final frameEnd = 4 + frameSize;
    if (_rxBuffer.length < frameEnd) {
      return; // wait for the rest of the frame
    }
    final frame = Uint8List.fromList(_rxBuffer.sublist(4, frameEnd));
    _rxBuffer.removeRange(0, frameEnd);
    _handleFrame(frame);
  }
}

/// Handles one frame (frame type followed by data, size prefix removed).
void _handleFrame(Uint8List frame) {
  final unpacked = UnpackResponse(frame);
  var frameType = unpacked[0];
  var data = unpacked[1];
  switch (frameType) {
    case FrameTypeResponse:
      final text = utf8.decode(data, allowMalformed: true);
      if (text == '_heartbeat_') {
        print('心跳了一下❤️');
        NOP().writeTo(socket);
      } else {
        print('this is response: $text');
      }
      break;
    case FrameTypeMessage:
      var msg = DecodeMessage(data);
      if (msg == null) {
        print('msg是空的!');
        break;
      }
      _dispatch(msg);
      // The message is acknowledged right away; the callback is not awaited.
      Finish(msg.ID).writeTo(socket);
      break;
    case FrameTypeError:
      print('FrameTypeError: ${utf8.decode(data, allowMalformed: true)}');
      break;
    default:
      print('unknown frame type ${frameType}');
      break;
  }
}

/// Calls [HandlerDataFunc] with [msg], logging anything it throws.
void _dispatch(Message msg) {
  if (HandlerDataFunc == null) {
    return;
  }
  try {
    final result = HandlerDataFunc(msg);
    if (result is Future) {
      result.catchError((error, StackTrace trace) {
        print('message handler failed: $error');
        print(trace);
      });
    }
  } catch (error, trace) {
    print('message handler failed: $error');
    print(trace);
  }
}

/// Socket error callback: logs [error] and starts reconnecting.
void errorHandler(error, StackTrace trace) {
  print('-=-=-=-=-=-=-=-=-=-=-=-=-=-=');
  print(error);
  reconnection();
}

/// Seconds to wait between failed reconnection attempts.
int reconnectionTime = 5;

/// Whether a reconnection attempt (or the wait before the next) is running.
bool reconnecting = false;

/// Drops the current socket and reconnects to `MyCookie().server` on 4150.
///
/// Only one attempt runs at a time. A failed attempt is retried after
/// [reconnectionTime] seconds.
void reconnection() {
  if (reconnecting) {
    return;
  }
  reconnecting = true;
  print('-----------reconnection==============${DateTime.now().toString()}');
  socket?.destroy(); // null when no connection was ever established

  // Future.sync turns a synchronous throw from connect into a future error so
  // it follows the same retry path.
  Future.sync(() => Socket.connect(MyCookie().server, 4150,
      timeout: Duration(seconds: 5))).then((Socket sock) {
    _startSession(sock);
    reconnecting = false;
  }).catchError((onError) {
    print('---------------catchError\n$onError');
    // Keep `reconnecting` set while waiting so other triggers do not start a
    // parallel attempt.
    Timer(Duration(seconds: reconnectionTime), () {
      reconnecting = false;
      reconnection();
    });
  });
}

/// Socket done callback: the connection closed, so reconnect.
void doneHandler() {
  print('-------------doneHandler----------$reconnecting');
  reconnection();
}

/// Writes the protocol magic to [socketConn] and runs [connectNsq].
///
/// Returns the result of [connectNsq].
bool sendMagic(socketConn) {
  socketConn.add(utf8.encode(_nsqMagic));
  var isOk = connectNsq(socketConn);
  if (isOk == false) {
    print('连接Nsq失败');
  } else {
    print('连接Nsq成功');
  }
  return isOk;
}

/// Sends the handshake commands on [socketConn]: IDENTIFY, SUB, NOP, RDY 1.
///
/// Always returns `true`; the server's replies arrive later through
/// [dataHandler].
bool connectNsq(Socket socketConn) {
  // 1. identify
  var n = identityToCmd(getDefaultIdentity()).writeTo(socketConn);
  print('写入字数:${n}');
  // 2. subscribe
  n = Subscribe(Topic, Channel).writeTo(socketConn);
  print('订阅收到: n=${n}');
  // 3. send NOP
  NOP().writeTo(socketConn);
  // 4. signal readiness for one message
  sendRead(socketConn, 1);
  return true;
}

/// Client settings serialised as the JSON body of the IDENTIFY command.
class Identity {
  /// Builds the IDENTIFY body from [ClientId], [Hostname] and fixed settings.
  Map toJson() {
    return <String, dynamic>{
      'client_id': ClientId,
      'hostname': Hostname,
      // 'user_agent': 'go-nsq/1.0.8',
      // 'short_id': 'xusanhuodeMacBook-Pro', // deprecated
      // 'long_id': 'xusanhuodeMacBook-Pro.local', // deprecated
      'tls_v1': false,
      'deflate': false,
      'deflate_level': 6,
      'snappy': false,
      'feature_negotiation': true,
      'heartbeat_interval': 10000, // milliseconds between heartbeats
      'sample_rate': 0,
      'output_buffer_size': 16384,
      'output_buffer_timeout': 250,
      'max_msg_timeout': 90000,
      'msg_timeout': 2000,
    };
  }
}

/// Returns the default [Identity].
Identity getDefaultIdentity() {
  return Identity();
}

/// Converts [js] into an IDENTIFY command carrying its JSON as the body.
Commend identityToCmd(Identity js) {
  var body = utf8.encode(jsonEncode(js));
  print('解析对象:$body');
  return Commend()
    ..Name = utf8.encode('IDENTIFY')
    ..Body = body;
}

// When the client is idle it answers heartbeats with NOP. If two
// `_heartbeat_` responses go unanswered, nsqd times out and forcibly closes
// the connection. The IDENTIFY command can change or disable this behaviour.

/// One protocol command: a name, optional parameters and an optional body.
class Commend {
  Uint8List Name;
  List Params; // list of byte lists, one per parameter
  Uint8List Body;

  /// Returns the command line as text, e.g. `RDY 1`, without the body.
  String getString() {
    final name = utf8.decode(Name);
    if (Params == null || Params.isEmpty) {
      return name;
    }
    final args =
        Params.map((param) => utf8.decode(param, allowMalformed: true));
    return '$name ${args.join(utf8.decode(byteSpace))}';
  }

  /// Writes this command to [socketWrite] and returns the byte count.
  ///
  /// Wire format: name, space-separated parameters, newline, then, when a
  /// body is present, its 4-byte big-endian length followed by the body.
  /// Falls back to the library-level [socket] when [socketWrite] is `null`.
  int writeTo(Socket socketWrite) {
    final target = socketWrite ?? socket;
    final out = BytesBuilder();
    out.add(Name);
    if (Params != null) {
      for (final param in Params) {
        out.add(byteSpace);
        out.add(param);
      }
    }
    out.add(byteNewLine);
    if (Body != null) {
      final size = ByteData(4)..setUint32(0, Body.length, Endian.big);
      out.add(size.buffer.asUint8List());
      out.add(Body);
    }
    // One add per command keeps the command contiguous on the wire.
    final bytes = out.takeBytes();
    target.add(bytes);
    return bytes.length;
  }
}

/// Builds a NOP command.
Commend NOP() {
  return Commend()..Name = utf8.encode('NOP');
}

/// Builds a RDY command announcing readiness for [count] messages.
Commend getReady(int count) {
  return Commend()
    ..Name = utf8.encode('RDY')
    ..Params = [utf8.encode(count.toString())];
}

/// Builds a FIN command acknowledging the message with the given [id].
Commend Finish(Uint8List id) {
  return Commend()
    ..Name = utf8.encode('FIN')
    ..Params = [id];
}

/// Builds a SUB command for [topic] and channel [chanel].
Commend Subscribe(String topic, String chanel) {
  return Commend()
    ..Name = utf8.encode('SUB')
    ..Params = [utf8.encode(topic), utf8.encode(chanel)];
}

/// Sends `RDY count` on [socketWrite].
void sendRead(socketWrite, count) {
  var n = getReady(count).writeTo(socketWrite);
  print('Ready返回:${n}');
}

/// Extracts the first frame (without its size prefix) from [socketData].
///
/// Returns an empty list when [socketData] is shorter than the 4-byte size
/// prefix, the size is negative, or the frame is not completely present.
Uint8List ReadResponse(Uint8List socketData) {
  if (socketData == null || socketData.length < 4) {
    print('response is shorter than the 4-byte size prefix');
    return Uint8List(0);
  }
  var msgSize = toInt32(socketData.sublist(0, 4));
  if (msgSize < 0) {
    print('response msg size is negative: ${msgSize}');
    return Uint8List(0);
  }
  print('size=${msgSize}');
  if (socketData.length < 4 + msgSize) {
    print('response is truncated: have ${socketData.length - 4} bytes');
    return Uint8List(0);
  }
  return socketData.sublist(4, 4 + msgSize);
}

/// Splits a frame into `[frameType, data]`.
///
/// Returns `[-1, message]` when [response] is too short to hold a frame type.
List UnpackResponse(Uint8List response) {
  if (response.length < 4) {
    return [-1, 'length of response is too small'];
  }
  return [
    toInt32(response.sublist(0, 4)),
    response.sublist(4, response.length)
  ];
}

/// Reads the first frame of [socketData] and returns `[frameType, data]`.
///
/// Returns `[-1, '']` when no complete frame can be read.
List ReadUnpackedResponse(Uint8List socketData) {
  var resp = ReadResponse(socketData);
  if (resp.isEmpty) {
    return [-1, ''];
  }
  return UnpackResponse(resp);
}

/// Copies [list] into a fresh buffer and wraps it for typed reads.
ByteData _byteDataOf(List list) =>
    ByteData.view(Uint8List.fromList(list).buffer);

/// Reads a big-endian signed 32-bit integer from [list] at byte [index].
int toInt32(List list, {int index = 0}) {
  return _byteDataOf(list).getInt32(index, Endian.big);
}

/// Reads a big-endian signed 16-bit integer from [list] at byte [index].
int toInt16(List list, {int index = 0}) {
  return _byteDataOf(list).getInt16(index, Endian.big);
}

/// Reads a big-endian signed 64-bit integer from [list] at byte [index].
int toInt64(List list, {int index = 0}) {
  return _byteDataOf(list).getInt64(index, Endian.big);
}

/// Shows a local notification with [title] and [body].
///
/// [body] is also passed as the notification payload.
Future showNotification(String title, String body) async {
  const AndroidNotificationDetails androidPlatformChannelSpecifics =
      AndroidNotificationDetails(
          'your channel id', '梆梆鱼', 'your channel description',
          importance: Importance.max,
          playSound: true,
          priority: Priority.high,
          ticker: 'ticker');
  const NotificationDetails platformChannelSpecifics =
      NotificationDetails(android: androidPlatformChannelSpecifics);
  await flutterLocalNotificationsPlugin
      .show(0, title, body, platformChannelSpecifics, payload: body);
}