||
- library nsq;
- import 'dart:async';
- import 'dart:convert';
- import 'dart:io';
- import 'dart:typed_data';
- import 'package:audioplayers/audio_cache.dart';
- import 'package:bbyyy/beans/message_bean_entity.dart';
- import 'package:bbyyy/beans/new_order_bean_entity.dart';
- import 'package:bbyyy/beans/nsq_type_bean_entity.dart';
- import 'package:bbyyy/msgDB/my_msg_db.dart';
- import 'package:bbyyy/my_tools/const.dart';
- import 'package:bbyyy/my_tools/event_bus.dart';
- import 'package:bbyyy/my_tools/my_cookie.dart';
- import 'package:flutter_local_notifications/flutter_local_notifications.dart';
- import 'package:bbyyy/beans/system_information_bean_entity.dart';
- import '../main.dart';
// const ServerIP = '127.0.0.1';
// const ServerPort = 4150; // server port number

// Subscription settings sent to the server; all four are assigned by
// ConnectServer before the socket is opened.
String Topic, Channel, ClientId, Hostname;

// Callback invoked by dataHandler with every decoded Message.
dynamic HandlerDataFunc;
/// Handles one message received from NSQ (custom message-processing hook).
///
/// The body is decoded once as UTF-8 JSON and dispatched on its `type` field:
///
/// * every type except `notifyTypeNewMsg` raises a local notification;
///   `notifyTypeNewMsg` does so only when the chat-sound preference is on;
/// * `notifyTypeIncome` plays the income sound when that preference is on;
/// * `notifyTypeNewOrder` emits `hasNewOrder`;
/// * `notifyTypeNewMsg` stores the message in the per-sender table and emits
///   `hasNewMsg` / `hasNewMsgInMsgPage`;
/// * `notifyTypeOrderPaymentIsSuccessful` emits `alipayPaymentCallback`;
/// * `notifyTypeSysMsg` prepends the text to the stored system messages and
///   emits `systemInformation`.
///
/// Failures are logged rather than rethrown, so one bad message cannot break
/// the socket listener.
Future<void> handlerMessage(Message message) async {
  try {
    final bodyText = utf8.decode(message.Body);
    print('Got a message: $bodyText');
    print(
        '时间:${message.Timestamp}, Attempts: ${message.Attempts}, ID: ${utf8.decode(message.ID)}');

    // Decode the JSON once and reuse it for every bean built below.
    final payload = json.decode(bodyText);
    final NsqTypeBeanEntity header = NsqTypeBeanEntity().fromJson(payload);
    final type = header.type;
    print(type);

    final prefs = MyCookie().prefs;

    // `== true` makes a null result (presumably a preference that was never
    // written - TODO confirm) count as "off". Using the raw value as a
    // condition would throw and abort the rest of the handling.
    if (type != notifyTypeNewMsg || prefs.getBool('聊天新消息提示音') == true) {
      // Fire-and-forget: showing the notification must not delay handling.
      showNotification(appName, type);
    }

    if (type == notifyTypeIncome && prefs.getBool('收入进账提示音') == true) {
      AudioCache().play('jifendaozhang.aac');
    }

    if (type == notifyTypeNewOrder) {
      final NewOrderBeanEntity order = NewOrderBeanEntity().fromJson(payload);
      EventBus().emit('hasNewOrder', order);
    } else if (type == notifyTypeNewMsg) {
      final MessageBeanEntity chat = MessageBeanEntity().fromJson(payload);
      final msgDB =
          MsgDB('table${MyCookie().getUID()}_${chat.content.senderUid}');
      if (!msgDB.isTableExits) {
        await msgDB.open();
      }
      await msgDB.addTableData([chat.content]);
      EventBus().emit('hasNewMsg', chat);
      EventBus().emit('hasNewMsgInMsgPage', chat);
    } else if (type == notifyTypeOrderPaymentIsSuccessful) {
      EventBus().emit('alipayPaymentCallback');
    } else if (type == notifyTypeSysMsg) {
      print(
          '$notifyTypeSysMsg---------------------------\n$bodyText\n---------------------');
      final SystemInformationBeanEntity info =
          SystemInformationBeanEntity().fromJson(payload);
      final storageKey = '${MyCookie().getUID()}系统消息';
      final String previous = prefs.getString(storageKey);
      // Newest entry first; entries are separated by the '!@##@!' marker.
      final merged = previous == null
          ? info.content
          : '${info.content}!@##@!$previous';
      prefs.setString(storageKey, merged);
      EventBus().emit('systemInformation', info);
    }
  } catch (e, stackTrace) {
    // Still best-effort, but no longer silent: a dropped message is visible
    // in the log.
    print('handlerMessage failed: $e\n$stackTrace');
  }
}
// Frame types returned by the server. The values below must not be modified.
const FrameTypeResponse = 0; // response frame
const FrameTypeError = 1; // error frame
const FrameTypeMessage = 2; // message pushed by the server (the important one)

var byteSpace = utf8.encode(' '); // separator between a command and its params
var byteNewLine = utf8.encode('\n'); // terminator of the command line

Socket socket; // the current socket connection
/// Opens the connection to the NSQ server and starts consuming messages.
///
/// Stores [handlerFunc] and the subscription settings ([topic], [channel],
/// [clientId], [hostname]) in the library-level variables, connects to
/// [serverIp]:[serverPort], sends the protocol magic, runs [connectNsq] and
/// then listens on the socket with [dataHandler], [errorHandler] and
/// [doneHandler].
///
/// A failed connection attempt is logged; this function does not retry.
void ConnectServer(handlerFunc,
    {String serverIp = '127.0.0.1',
    topic,
    channel,
    clientId,
    hostname,
    int serverPort = 4150}) {
  HandlerDataFunc = handlerFunc;
  Topic = topic;
  Channel = channel;
  ClientId = clientId;
  Hostname = hostname;

  Socket.connect(serverIp, serverPort).then((Socket sock) {
    socket = sock;
    // Protocol magic: the four ASCII bytes "  V2" (space, space, 'V', '2').
    // Written as bytes so the two leading spaces cannot be lost.
    sock.add(const <int>[0x20, 0x20, 0x56, 0x32]);

    if (connectNsq(sock) == false) {
      print('连接Nsq失败');
    } else {
      print('连接Nsq成功');
    }
    sock.listen(dataHandler,
        onError: errorHandler, onDone: doneHandler, cancelOnError: false);
  }).catchError((error) {
    // Without this the failure would surface as an unhandled async error.
    print('连接Nsq失败: $error');
  });
}
/// Length in bytes of a message ID.
const MsgIDLength = 16;

/// A message received from the server, as produced by `DecodeMessage`.
class Message {
  /// Message ID; a fixed-length list of [MsgIDLength] entries until
  /// `DecodeMessage` replaces it with the bytes taken from the frame.
  // `List<dynamic>.filled` yields the same fixed-length, null-filled list as
  // the unnamed `List(n)` constructor, which newer SDKs no longer provide.
  var ID = List<dynamic>.filled(MsgIDLength, null);

  /// Raw message payload bytes.
  List Body;

  /// Timestamp read from the first 8 bytes of the frame.
  int Timestamp = 0;

  /// Delivery attempt counter read from the frame.
  int Attempts = 0;

  // The remaining fields are declared but not referenced in this file.
  String NSQDAddress = '';
  int autoResponseDisabled;
  int responded;
}
/// Decodes the payload of a message frame into a [Message].
///
/// Layout of [b], all integers big-endian:
/// bytes 0-7 timestamp (int64), bytes 8-9 attempts (uint16), the next
/// [MsgIDLength] bytes the message ID, and everything after that the body.
///
/// Returns `null` when [b] is too short to hold the fixed-size header.
Message DecodeMessage(Uint8List b) {
  const headerLength = 10 + MsgIDLength;
  if (b.length < headerLength) {
    return null;
  }

  // View the header in place instead of copying each field; offsetInBytes
  // keeps this correct when [b] is itself a view into a larger buffer.
  final header = ByteData.view(b.buffer, b.offsetInBytes, headerLength);

  final msg = Message();
  msg.Timestamp = header.getInt64(0, Endian.big);
  // Attempts is unsigned on the wire; a signed read would turn large counts
  // negative.
  msg.Attempts = header.getUint16(8, Endian.big);
  msg.ID = b.sublist(10, headerLength);
  msg.Body = b.sublist(headerLength);
  return msg;
}
/// Socket data callback: splits the incoming byte stream into frames and
/// dispatches each one.
///
/// A socket event is an arbitrary slice of the stream, so it may hold several
/// frames or only part of one. Incoming bytes are therefore appended to a
/// buffer, every complete frame (4-byte big-endian size, 4-byte frame type,
/// payload) is extracted, and any trailing partial frame is kept for the next
/// event.
void dataHandler(connData) {
  // Bytes buffered from a previous connection must not be mixed into the
  // stream of a new one.
  if (!identical(_unparsedBytesOwner, socket)) {
    _unparsedBytes = <int>[];
    _unparsedBytesOwner = socket;
  }
  _unparsedBytes.addAll(connData);

  final frames = <List>[];
  var offset = 0;
  while (_unparsedBytes.length - offset >= 4) {
    final size = _int32At(_unparsedBytes, offset);
    if (size < 4) {
      // A frame must at least contain its 4-byte type; the stream cannot be
      // re-synchronised after this, so drop what is buffered.
      print(
          '解析数据出错: ${utf8.decode(_unparsedBytes.sublist(offset), allowMalformed: true)}');
      offset = _unparsedBytes.length;
      break;
    }
    print('size=${size}');
    final frameEnd = offset + 4 + size;
    if (_unparsedBytes.length < frameEnd) {
      break; // Incomplete frame: wait for the rest.
    }
    frames.add([
      _int32At(_unparsedBytes, offset + 4),
      Uint8List.fromList(_unparsedBytes.sublist(offset + 8, frameEnd)),
    ]);
    offset = frameEnd;
  }

  // Trim the buffer before dispatching so a throwing handler cannot cause
  // the same frame to be parsed again on the next event.
  if (offset > 0) {
    _unparsedBytes = _unparsedBytes.sublist(offset);
  }
  for (final frame in frames) {
    _dispatchFrame(frame[0], frame[1]);
  }
}

/// Bytes received so far that do not yet form a complete frame.
List<int> _unparsedBytes = <int>[];

/// The socket whose bytes are currently held in [_unparsedBytes].
var _unparsedBytesOwner;

/// Reads a big-endian signed 32-bit integer from [bytes] starting at [at].
int _int32At(List<int> bytes, int at) => ((bytes[at] << 24) |
        (bytes[at + 1] << 16) |
        (bytes[at + 2] << 8) |
        bytes[at + 3])
    .toSigned(32);

/// Handles a single frame of type [frameType] with payload [data].
void _dispatchFrame(int frameType, Uint8List data) {
  if (frameType == FrameTypeResponse && utf8.decode(data) == '_heartbeat_') {
    // Answer the server heartbeat with a NOP.
    print('心跳了一下❤️');
    NOP().writeTo(socket);
    return;
  }
  switch (frameType) {
    case FrameTypeResponse:
      print('this is response: ${utf8.decode(data)}');
      break;
    case FrameTypeMessage:
      final msg = DecodeMessage(data);
      if (msg == null) {
        print('msg是空的!');
        break;
      }
      // The handler is not awaited; FIN is sent right after it is invoked.
      HandlerDataFunc(msg);
      Finish(msg.ID).writeTo(socket);
      break;
    case FrameTypeError:
      print('FrameTypeError: ${data}');
      break;
    default:
      print('unknown frame type ${frameType}');
      break;
  }
}
/// Socket `onError` callback: logs [error] and triggers [reconnection].
///
/// [trace] is accepted to match the callback signature but is not used.
void errorHandler(error, StackTrace trace) {
  const banner = '-=-=-=-=-=-=-=-=-=-=-=-=-=-=';
  print(banner);
  print(error);
  reconnection();
}
/// Seconds to wait after a failed attempt before trying to reconnect again.
int reconnectionTime = 5;

/// Whether a reconnection attempt, or the wait before the next one, is in
/// progress. Guards [reconnection] against overlapping runs.
bool reconnecting = false;

/// Drops the current socket and reconnects to `MyCookie().server` on port
/// 4150, repeating the handshake done by `ConnectServer`.
///
/// Does nothing while another attempt is in progress. After a failure the
/// next attempt is scheduled [reconnectionTime] seconds later instead of
/// immediately, so an unreachable server is not hammered in a tight loop.
void reconnection() {
  if (reconnecting) {
    return;
  }
  reconnecting = true;
  print('-----------reconnection==============${DateTime.now().toString()}');

  // `socket` is still null if no connection was ever established.
  socket?.destroy();

  Socket.connect(MyCookie().server, 4150, timeout: Duration(seconds: 5))
      .then((Socket sock) {
    socket = sock;
    // Protocol magic: the four ASCII bytes "  V2" (space, space, 'V', '2').
    sock.add(const <int>[0x20, 0x20, 0x56, 0x32]);

    if (connectNsq(sock) == false) {
      print('连接Nsq失败');
      // Route a failed handshake through the same retry path as a failed
      // connect.
      throw StateError('connectNsq failed');
    }
    print('连接Nsq成功');
    reconnecting = false;
    sock.listen(dataHandler,
        onError: errorHandler, onDone: doneHandler, cancelOnError: false);
  }).catchError((onError) {
    print('---------------catchError\n$onError');
    // Keep the guard set during the wait so error/done callbacks of the old
    // socket cannot start a parallel attempt.
    Future.delayed(Duration(seconds: reconnectionTime), () {
      reconnecting = false;
      reconnection();
    });
  });
}
/// Socket `onDone` callback: logs the current reconnect state and triggers
/// [reconnection].
void doneHandler() {
  final inProgress = reconnecting;
  print('-------------doneHandler----------$inProgress');
  reconnection();
}
/// Sends the protocol magic on [socketConn] and then runs [connectNsq].
///
/// Returns `false` only when [connectNsq] reports failure.
bool sendMagic(socketConn) {
  // Protocol magic: the four ASCII bytes "  V2" (space, space, 'V', '2').
  // Written as bytes so the two leading spaces cannot be lost.
  socketConn.add(const <int>[0x20, 0x20, 0x56, 0x32]);

  final isOk = connectNsq(socketConn);
  print(isOk == false ? '连接Nsq失败' : '连接Nsq成功');
  return isOk != false;
}
/// Writes the opening command sequence to [socketConn]: IDENTIFY, SUB for
/// [Topic]/[Channel], a NOP, and finally RDY with a count of 1.
///
/// Always returns `true`; server replies are not read here.
bool connectNsq(Socket socketConn) {
  // 1. Identify.
  final identifyBytes =
      identityToCmd(getDefaultIdentity()).writeTo(socketConn);
  print('写入字数:${identifyBytes}');

  // 2. Subscribe.
  final subscribeBytes = Subscribe(Topic, Channel).writeTo(socketConn);
  print('订阅收到: n=${subscribeBytes}');

  // 3. Send NOP.
  NOP().writeTo(socketConn);

  // 4. Send ready.
  sendRead(socketConn, 1);
  return true;
}
/// Client identity sent as the body of the IDENTIFY command.
class Identity {
  /// Builds the identity map; `client_id` and `hostname` come from the
  /// library-level [ClientId] and [Hostname].
  Map toJson() => <dynamic, dynamic>{
        'client_id': ClientId,
        'hostname': Hostname,
        'user_agent': 'go-nsq/1.0.8',
        // 'short_id': 'xusanhuodeMacBook-Pro' // deprecated
        // 'long_id': 'xusanhuodeMacBook-Pro.local' // deprecated
        'tls_v1': false,
        'deflate': false,
        'deflate_level': 6,
        'snappy': false,
        'feature_negotiation': true,
        'heartbeat_interval': 10000, // heartbeat interval, in milliseconds
        'sample_rate': 0,
        'output_buffer_size': 16384,
        'output_buffer_timeout': 250,
        'max_msg_timeout': 90000,
        'msg_timeout': 2000,
      };
}
/// Returns a fresh [Identity] carrying the default settings.
Identity getDefaultIdentity() => Identity();
/// Converts an [Identity] into the IDENTIFY command whose body is the
/// identity's JSON encoding.
Commend identityToCmd(Identity js) {
  final payload = utf8.encode(jsonEncode(js));
  print('解析对象:$payload');
  return Commend()
    ..Name = utf8.encode('IDENTIFY')
    ..Body = payload;
}
// If the client is idle, send a NOP command.
// If 2 _heartbeat_ responses go unanswered, nsqd times out and forcibly
// closes the client connection.
// The IDENTIFY command can be used to change or disable this behaviour.

/// A protocol command: a name, optional space-separated parameters and an
/// optional length-prefixed body.
class Commend {
  /// Command name bytes, e.g. `SUB`.
  Uint8List Name;

  /// Parameters, each one a list of bytes; may be null.
  List Params;

  /// Optional body; may be null.
  Uint8List Body;

  /// Returns the command line as text: the name followed by the decoded
  /// parameters, separated by single spaces.
  String getString() {
    final name = utf8.decode(Name);
    // Commands such as NOP never set Params.
    if (Params == null || Params.isEmpty) {
      return name;
    }
    // Each parameter is a byte list and has to be decoded; joining the raw
    // lists would print their numeric contents.
    final args = Params.map((param) => utf8.decode(param)).join(' ');
    return '$name $args';
  }

  /// Writes the command to [socketWrite] and returns the number of bytes
  /// written.
  ///
  /// Wire format: `Name`, then ` param` for every parameter, then `\n`, then
  /// (only when [Body] is set) a 4-byte big-endian body length followed by
  /// the body. Falls back to the library-level `socket` when [socketWrite]
  /// is null.
  int writeTo(Socket socketWrite) {
    final out = socketWrite ?? socket;
    var total = 0;

    out.add(Name);
    total += Name.length;

    if (Params != null) {
      for (final List<int> param in Params) {
        out.add(byteSpace);
        out.add(param);
        total += byteSpace.length + param.length;
      }
    }

    out.add(byteNewLine);
    total += byteNewLine.length;

    if (Body != null) {
      final sizePrefix = ByteData(4)..setUint32(0, Body.length, Endian.big);
      out.add(sizePrefix.buffer.asUint8List());
      out.add(Body);
      total += 4 + Body.length;
    }
    return total;
  }
}
/// Builds a NOP command (no parameters, no body).
Commend NOP() => Commend()..Name = utf8.encode('NOP');
/// Builds an RDY command whose single parameter is [count] in decimal text.
Commend getReady(int count) {
  // A list literal replaces the unnamed `List(n)` constructor, which newer
  // SDKs no longer provide.
  return Commend()
    ..Name = utf8.encode('RDY')
    ..Params = [utf8.encode(count.toString())];
}
/// Builds a FIN command for the message identified by [id].
Commend Finish(Uint8List id) {
  // A list literal replaces the unnamed `List(n)` constructor, which newer
  // SDKs no longer provide.
  return Commend()
    ..Name = utf8.encode('FIN')
    ..Params = [id];
}
/// Builds a SUB command for [topic] and [chanel] (the channel name).
Commend Subscribe(String topic, String chanel) {
  // A list literal replaces the unnamed `List(n)` constructor, which newer
  // SDKs no longer provide.
  return Commend()
    ..Name = utf8.encode('SUB')
    ..Params = [utf8.encode(topic), utf8.encode(chanel)];
}
/// Sends an RDY command carrying [count] on [socketWrite] and logs how many
/// bytes were written.
void sendRead(socketWrite, count) {
  final written = getReady(count).writeTo(socketWrite);
  print('Ready返回:${written}');
}
/// Extracts the first length-prefixed frame from [socketData].
///
/// [socketData] starts with a 4-byte big-endian size followed by that many
/// bytes. Returns those bytes, or an empty list when the input is shorter
/// than the size prefix, the size is negative, or fewer bytes are available
/// than the size announces.
Uint8List ReadResponse(Uint8List socketData) {
  if (socketData.length < 4) {
    print('response is shorter than its size prefix: ${socketData.length}');
    return Uint8List(0);
  }

  final msgSize =
      ByteData.view(socketData.buffer, socketData.offsetInBytes, 4)
          .getInt32(0, Endian.big);
  if (msgSize < 0) {
    print('response msg size is negative: ${msgSize}');
    // Must be a Uint8List: a plain List fails the return-type check at
    // runtime.
    return Uint8List(0);
  }
  print('size=${msgSize}');

  final end = 4 + msgSize;
  if (socketData.length < end) {
    print('response is truncated: have ${socketData.length}, need $end');
    return Uint8List(0);
  }
  return socketData.sublist(4, end);
}
/// Splits a frame into `[frameType, payload]`.
///
/// The first 4 bytes of [response] are read as a big-endian int32 frame type
/// and the rest is the payload. When [response] is shorter than 4 bytes the
/// result is `[-1, <error text>]`.
List UnpackResponse(Uint8List response) {
  const prefixLength = 4;
  if (response.length >= prefixLength) {
    final frameType = toInt32(response.sublist(0, prefixLength));
    final payload = response.sublist(prefixLength, response.length);
    return [frameType, payload];
  }
  return [-1, 'length of response is too small'];
}
/// Reads the first frame from [socketData] and unpacks it into
/// `[frameType, payload]`; yields `[-1, '']` when no frame could be read.
List ReadUnpackedResponse(Uint8List socketData) {
  final frame = ReadResponse(socketData);
  return frame.isNotEmpty ? UnpackResponse(frame) : [-1, ''];
}
/// Reads a big-endian signed 32-bit integer from [list], starting at byte
/// offset [index].
///
/// [list] must contain byte values; it is copied, so any list of ints is
/// accepted, not only typed data.
int toInt32(List list, {int index = 0}) {
  // `cast<int>()` makes the element type explicit; passing the raw `List`
  // relies on an implicit downcast that fails for a `List<dynamic>`.
  final bytes = Uint8List.fromList(list.cast<int>());
  return ByteData.view(bytes.buffer).getInt32(index, Endian.big);
}
/// Reads a big-endian signed 16-bit integer from [list], starting at byte
/// offset [index].
///
/// [list] must contain byte values; it is copied, so any list of ints is
/// accepted, not only typed data.
int toInt16(List list, {int index = 0}) {
  // `cast<int>()` makes the element type explicit; passing the raw `List`
  // relies on an implicit downcast that fails for a `List<dynamic>`.
  final bytes = Uint8List.fromList(list.cast<int>());
  return ByteData.view(bytes.buffer).getInt16(index, Endian.big);
}
/// Reads a big-endian signed 64-bit integer from [list], starting at byte
/// offset [index].
///
/// [list] must contain byte values; it is copied, so any list of ints is
/// accepted, not only typed data.
int toInt64(List list, {int index = 0}) {
  // `cast<int>()` makes the element type explicit; passing the raw `List`
  // relies on an implicit downcast that fails for a `List<dynamic>`.
  final bytes = Uint8List.fromList(list.cast<int>());
  return ByteData.view(bytes.buffer).getInt64(index, Endian.big);
}
/// Shows a local notification with the given [title] and [body].
///
/// Notification id 0 is used for every call, and [body] is also passed as
/// the notification payload.
Future<void> showNotification(String title, String body) async {
  const details = NotificationDetails(
    android: AndroidNotificationDetails(
      'your channel id',
      '梆梆鱼',
      'your channel description',
      importance: Importance.max,
      playSound: true,
      priority: Priority.high,
      ticker: 'ticker',
    ),
  );
  await flutterLocalNotificationsPlugin.show(0, title, body, details,
      payload: body);
}
|