DELPHI盒子
!实时搜索: 盒子论坛 | 注册用户 | 修改信息 | 退出
检举帖 | 全文检索 | 关闭广告 | 捐赠
技术论坛
 用户名
 密  码
自动登陆(30天有效)
忘了密码
≡技术区≡
DELPHI技术
lazarus/fpc/Free Pascal
移动应用开发
Web应用开发
数据库专区
报表专区
网络通讯
开源项目
论坛精华贴
≡发布区≡
发布代码
发布控件
文档资料
经典工具
≡事务区≡
网站意见
盒子之家
招聘应聘
信息交换
论坛信息
最新加入: monica9612
今日帖子: 0
在线用户: 8
导航: 论坛 -> DELPHI技术 斑竹:liumazi,sephil  
作者:
男 glwang (glwang) ★☆☆☆☆ -
盒子活跃会员
2019/8/8 15:18:03
标题:
36分!求CSDN下载积分大富翁帮忙下载:基于Delphi的MQTT协议实现(使用INDY无三方控件) 浏览:3486
加入我的收藏
楼主: 基于Delphi的MQTT协议实现(使用INDY无三方控件)      
基于Delphi的MQTT协议实现(使用INDY无三方控件)使用方法参考:
 http://blog.tdiot.cc/?p=10&preview=true

https://download.csdn.net/download/oyaowang123/9621489

感谢!!
----------------------------------------------
作者:
男 wang_80919 (Flying Wang) ★☆☆☆☆ -
普通会员
2019/8/8 15:20:12
1楼: 一堆人说 INDY 不好,你干嘛下载?
----------------------------------------------
(C)(P)Flying Wang
作者:
男 earthsbest (全能中间件) ▲▲▲▲△ -
普通会员
2019/8/8 15:39:49
2楼: 好像就是这个 https://github.com/pjde/delphi-mqtt
----------------------------------------------
Delphi4Linux Delphi三层/FireDAC 技术群:734515869 http://www.cnblogs.com/rtcmw
作者:
男 glwang (glwang) ★☆☆☆☆ -
盒子活跃会员
2019/8/8 17:37:28
3楼: 2楼 跟开源的那个不是一样的
----------------------------------------------
作者:
男 wang_80919 (Flying Wang) ★☆☆☆☆ -
普通会员
2019/8/8 17:49:05
4楼: 我的意思是,由于有很多人不喜欢 INDY,所以也就没几个人乐意下载。
----------------------------------------------
(C)(P)Flying Wang
作者:
男 54993699 (54993699) ★☆☆☆☆ -
普通会员
2019/8/9 8:38:34
5楼: 下下来了,不知道怎么给你, 就一个PAS,我直接粘了
unit utdMQTT;

interface

uses
  System.SysUtils, System.Classes,IdBaseComponent, IdComponent, IdTCPConnection, IdTCPClient,IdHashMessageDigest,IdGlobal,IdHash,
  System.StrUtils;

{
  使用请保留此处信息。
  基于indy的MQTT协议实现
  部分代码参考:http://jamie.op-i.net/blog/code/mqtt-client-library-for-delphi/
  改成可跨平台的版本,去掉三方控件,简化代码量。
  作者博客:http://blog.tdiot.cc/
  如有疑问可到博客留言
}
type
  TMQTTMessageType = (
          Reserved0,  //0  Reserved
          CONNECT, //  1  Client request to connect to Broker
          CONNACK, //  2  Connect Acknowledgment
          PUBLISH, //  3  Publish message
          PUBACK, //  4  Publish Acknowledgment
          PUBREC, //  5  Publish Received (assured delivery part 1)
          PUBREL, //  6  Publish Release (assured delivery part 2)
          PUBCOMP, //  7  Publish Complete (assured delivery part 3)
          SUBSCRIBE, //  8  Client Subscribe request
          SUBACK, //  9  Subscribe Acknowledgment
          UNSUBSCRIBE, // 10  Client Unsubscribe request
          UNSUBACK, // 11  Unsubscribe Acknowledgment
          PINGREQ, //  12  PING Request
          PINGRESP, //  13  PING Response
          DISCONNECT, // 14  Client is Disconnecting
          Reserved15 //  15
        );
  TRemainingLength = Array of Byte;   //剩余长度字段,可变,1到4字节
  TUTF8Text = Array of Byte;
type
  TMQTTMessage = Record
    FixedHeader: Byte;
    RL: TBytes;
    Data: TBytes;
  End;
{事件}
TConnAckEvent = procedure (Sender: TObject; sErrcode:string) of object;
TPublishEvent = procedure (Sender: TObject; topic, payload: string) of object;
TPingRespEvent = procedure (Sender: TObject) of object;
TSubAckEvent = procedure (Sender: TObject; MessageID: integer; GrantedQoS: integer) of object;
TUnSubAckEvent = procedure (Sender: TObject; MessageID: integer) of object;
type
  TClientHandleThread=class(TThread) //创建线程读取返回数据
  private
    { Private declarations }
    servercmd:integer;
    serverMsg:string;
    FIdTCPClient: TIdTCPClient;
    FCurrentData: TMQTTMessage;
    FConnAckEvent: TConnAckEvent;//连接返回事件
    FPublishEvent: TPublishEvent;//收到发布信息事件
    FPingRespEvent:TPingRespEvent;//收到心跳应答包
    FSubAckEvent:TSubAckEvent;//收到订阅应答包

    function BytesToStrLength(LengthBytes: TBytes): integer;
    function RemainingLengthToInt(RLBytes: TBytes): Integer;
    procedure AppendBytes(var DestArray: TBytes;const NewBytes: TBytes);
    procedure HandleInput;
  protected
    procedure Execute; override;
  public
    constructor Create(IdTCPClient: TIdTCPClient); overload;
    property OnConnAck : TConnAckEvent read FConnAckEvent write FConnAckEvent;
    property OnPublish : TPublishEvent read FPublishEvent write FPublishEvent;
    property OnPingRespEvent : TPingRespEvent read FPingRespEvent write FPingRespEvent;
    property OnSubAckEvent : TSubAckEvent read FSubAckEvent write FSubAckEvent;
  end;


type
  TTdMqtt = class(TComponent)
  private
    FActive: Boolean;
    FClientID: string;
    FUser,FPwd:string;
    FHost: string;
    FPort: Integer;
    FIdTCPClient: TIdTCPClient;
    FReadThread: TClientHandleThread;
    FConnAckEvent: TConnAckEvent;//连接返回事件
    FPublishEvent: TPublishEvent;//收到发布信息事件
    FPingRespEvent:TPingRespEvent;//收到心跳应答包
    FSubAckEvent:TSubAckEvent;//收到订阅应答包
    FMessageID: integer;
    function MD5(source:string):string;
    function GetMessageID: TBytes;
  protected
    function GetIsDesignTime: Boolean;
    function GetIsLoading: Boolean;
    property IsLoading: Boolean read GetIsLoading;
    property IsDesignTime: Boolean read GetIsDesignTime;
    procedure SetActive(AValue: Boolean);
    function FixedHeader(MessageType: TMQTTMessageType; Dup, Qos,Retain: Word): Byte; //固定头第一个字节
    procedure AppendArray(var Dest: TUTF8Text; Source: Array of Byte);
    function StrToBytes(str: String; perpendLength: boolean): TUTF8Text;
    function RemainingLength(x: Integer): TRemainingLength;     //固定头第二个字节,动态长度1-4
    function BuildCommand(FixedHeader: Byte; RemainL: TRemainingLength;  VariableHead: TBytes; Payload: Array of Byte): TBytes; //构建最终的发送数据包
    procedure CopyIntoArray(var DestArray: Array of Byte; SourceArray: Array of Byte; StartIndex: integer);

  public
    { Public declarations }
    destructor Destroy; override;
    constructor Create(AOwner: TComponent); override;
  published
    property ClientID:string read FClientID write FClientID;
    property Host:string read FHost write FHost;
    property Port:Integer read FPort write FPort;
    property Active: Boolean read FActive write SetActive default False;
    property User:string read FUser write FUser;
    property Pwd:string read FPwd write FPwd;

    procedure MQTTConnect;
    procedure MQTTPing;
    procedure MQTTPublish(sTopic,sPayload:string);
    procedure MQTTSubscribe(sTopic:string);
    property OnConnAck : TConnAckEvent read FConnAckEvent write FConnAckEvent;
    property OnPublish : TPublishEvent read FPublishEvent write FPublishEvent;
    property OnPingRespEvent : TPingRespEvent read FPingRespEvent write FPingRespEvent;
    property OnSubAckEvent : TSubAckEvent read FSubAckEvent write FSubAckEvent;

  end;
procedure Register;

implementation

procedure Register;
begin
  RegisterComponents('Tndsoft', [TTdMqtt]);
end;

{TClientHandleThread}
function TClientHandleThread.BytesToStrLength(LengthBytes: TBytes): integer;
begin
  Assert(Length(LengthBytes) = 2, 'UTF-8 Length Bytes preceeding the text must be 2 Bytes in Legnth');

  Result := 0;
  Result := LengthBytes[0] shl 8;
  Result := Result + LengthBytes[1];
end;

function TClientHandleThread.RemainingLengthToInt(RLBytes: TBytes): Integer;
var
  multi: integer;
  i: integer;
  digit: Byte;
begin
  multi := 1;
  i := 0;
  Result := 0;

  digit := RLBytes[i];
  repeat
    digit := RLBytes[i];
    Result := Result + (digit and 127) * multi;
    multi := multi * 128;
    Inc(i);
  until ((digit and 128) = 0);
end;

procedure TClientHandleThread.AppendBytes(var DestArray: TBytes;const NewBytes: TBytes);
var
  DestLen: Integer;
begin
  DestLen := Length(DestArray);
  SetLength(DestArray, DestLen + Length(NewBytes));
  Move(NewBytes, DestArray[DestLen], Length(NewBytes));
end;

 procedure TClientHandleThread.HandleInput;
var
  MessageType: Byte;
  DataLen: integer;
  QoS: integer;
  sErrCode:string;
  Topic: string;
  Payload: String;
  iTopicLen,iPayloadLen:Integer;
  ResponseVH: TBytes;
  ConnectReturn: Integer;
  i:Integer;
  DestArray:TUTF8Text;
begin
  if (FCurrentData.FixedHeader <> 0) then
    begin
      MessageType := FCurrentData.FixedHeader shr 4;

     if (MessageType = Ord(CONNACK)) then
        begin
          // Check if we were given a Connect Return Code.
          ConnectReturn := 0;
          // Any return code except 0 is an Error
          if ((Length(FCurrentData.Data) > 0) and (Length(FCurrentData.Data) < 4)) then
          begin
          ConnectReturn := FCurrentData.Data[1];
          case ConnectReturn of
          0:sErrCode:='连接已接受';
          1:sErrCode:='连接已拒绝,不支持的协议版本';
          2:sErrCode:='连接已拒绝,不合格的客户端标识符';
          3:sErrCode:='连接已拒绝,服务端不可用';
          4:sErrCode:='连接已拒绝,无效的用户名或密码';
          5:sErrCode:='连接已拒绝,未授权';
          end;
          end;
          if Assigned(OnConnAck) then OnConnAck(Self, sErrCode);
        end
      else
      if (MessageType = Ord(PUBLISH)) then
        begin
          // Read the Length Bytes
          iTopicLen := BytesToStrLength(Copy(FCurrentData.Data, 0, 2));
          // Get the Topic
          SetLength(DestArray,0);

          SetLength(DestArray,iTopicLen);
          Move(FCurrentData.Data[2], DestArray[0], iTopicLen);
          Topic:=TEncoding.ANSI.GetString(Tbytes(DestArray));//StringOf(Tbytes(DestArray));

          // Get the Payload

          //iPayloadLen:=(Length(FCurrentData.Data) - 2 - iTopicLen-2);  //此值也用数据中的长度值计算
          //iPayloadLen:= BytesToStrLength(Copy(FCurrentData.Data, iTopicLen+2, 2));
          {此处使用新方法获取长度。可变长度-2-iTopicLen}
          iPayloadLen:= RemainingLengthToInt(FCurrentData.RL)-2-iTopicLen;



          SetLength(DestArray,iPayloadLen);
          Move(FCurrentData.Data[2+iTopicLen], DestArray[0], iPayloadLen);
          Payload:=TEncoding.ANSI.GetString(Tbytes(DestArray));

          if Assigned(OnPublish) then OnPublish(Self, Topic,Payload);


//          fMQTTForDelphi.Memo1.Lines.Add(Format('接收到消息.主体:%s 内容:%s',[Topic,Payload]));
        end
      else
      if (MessageType = Ord(SUBACK)) then
        begin
          // Reading the Message ID
          ResponseVH := Copy(FCurrentData.Data, 0, 2);
          DataLen := BytesToStrLength(ResponseVH);
          // Next Read the Granted QoS
          QoS := 0;
          if (Length(FCurrentData.Data) - 2) > 0 then
          begin
          ResponseVH := Copy(FCurrentData.Data, 2, 1);
          QoS := ResponseVH[0];
          end;
          if Assigned(OnSubAckEvent) then OnSubAckEvent(Self, DataLen,QoS);
        end
      else
      if (MessageType = Ord(UNSUBACK)) then
        begin
          // Read the Message ID for the event handler
          ResponseVH := Copy(FCurrentData.Data, 0, 2);
          DataLen := BytesToStrLength(ResponseVH);
//          fMQTTForDelphi.Memo1.Lines.Add(Format('收到取消订阅包。长度:%s',[DataLen]))
//          if Assigned(OnUnSubAck) then OnUnSubAck(Self, DataLen);
        end
      else
      if (MessageType = Ord(PINGRESP)) then
      begin
         if Assigned(OnPingRespEvent) then OnPingRespEvent(Self);
      end;
    end;
end;

procedure TClientHandleThread.Execute;
var
  CurrentMessage:TMQTTMessage;
  vBuffer:TIdBytes;
  Buffer: TBytes;
  RLInt: Integer;
  I:Integer;
begin
  while not Terminated do
  begin
    if not FIdTCPClient.Connected then
      Terminate
    else
    try
      CurrentMessage.FixedHeader := 0;
      CurrentMessage.RL := nil;
      CurrentMessage.Data := nil;
      CurrentMessage.FixedHeader:=FIdTCPClient.IOHandler.ReadByte;  //读取FixedHdader
      //读取剩余长度-编码过
      SetLength(CurrentMessage.RL,1);
      SetLength(Buffer,1);
      CurrentMessage.RL[0]:=FIdTCPClient.IOHandler.ReadByte;  //读取剩余长度第一位
      for i := 1 to 3 do    //读取剩余长度其他位数,剩余长度为可变长度1-4.
      begin
        if (( CurrentMessage.RL[i - 1] and 128) <> 0) then
        begin
          Buffer[0] := FIdTCPClient.IOHandler.ReadByte;
          AppendBytes(CurrentMessage.RL, Buffer);
        end
        else
        Break;
      end;
      //解码剩余长度}
      RLInt := RemainingLengthToInt(CurrentMessage.RL);
      //将剩余长度的数据全部读出}
      if (RLInt > 0)  then
      begin
        SetLength(CurrentMessage.Data, 0);
        SetLength(CurrentMessage.Data, RLInt);
        FIdTCPClient.IOHandler.ReadBytes(vBuffer,RLInt,False);
        CurrentMessage.Data:=TBytes(vBuffer);
//        FPSocket^.RecvBufferEx(Pointer(CurrentMessage.Data), RLInt, 1000);
      end;
      FCurrentData := CurrentMessage;
      Synchronize(HandleInput);
    except

    end;
  end;
end;

constructor TClientHandleThread.Create(IdTCPClient: TIdTCPClient);
begin
  inherited Create;
  FIdTCPClient:=IdTCPClient;
end;

{TTDMQTT}

function TTdMqtt.MD5(source:string):string;
var
  MyMD5:TIdHashMessageDigest5;
  Digest:T4x4LongWordRecord;
  ciphertext:string;
begin
  Result := '';
  {$IFDEF VER150}
    try
      MyMD5 := TIdHashMessageDigest5.Create;
      Digest := MyMD5.HashValue(source);
      ciphertext := MyMD5.AsHex(Digest);
      ciphertext := UpperCase(ciphertext);
      Result := ciphertext;
    finally
      MyMD5.Free;
    end;
  {$ELSE}
    try
      MyMD5 := TIdHashMessageDigest5.Create;
      ciphertext := MyMD5.HashStringAsHex(source);
      ciphertext := UpperCase(ciphertext);
      Result := ciphertext;
    finally
      MyMD5.Free;
    end;
  {$ENDIF}
end;

function TTdMqtt.StrToBytes(str: String; perpendLength: boolean): TUTF8Text;
var
  i: integer;
  ordStr:SmallInt;
  buf:TBytes;
begin
  buf:=TEncoding.ANSI.GetBytes(str);
  if perpendLength then
  begin
    SetLength(Result, Length(buf) + 2);
    Result[0] := Length(buf) div 256;
    Result[1] := Length(buf) mod 256;
    Move(buf[0],Result[2],Length(buf));
  end
  else
  begin
    SetLength(Result, Length(buf));
    Move(buf[0],Result[0],Length(buf));
  end;
end;

procedure TTdMqtt.CopyIntoArray(var DestArray: Array of Byte; SourceArray: Array of Byte; StartIndex: integer);
begin
  Assert(StartIndex >= 0);
  Move(SourceArray[0], DestArray[StartIndex], Length(SourceArray));
end;

function TTdMqtt.BuildCommand(FixedHeader: Byte; RemainL: TRemainingLength;  VariableHead: TBytes; Payload: Array of Byte): TBytes; //构建最终的发送数据包
var
  iNextIndex: integer;
begin
  // Attach Fixed Header (1 byte)
  iNextIndex := 0;
  SetLength(Result, 1);
  Result[iNextIndex] := FixedHeader;

  // Attach RemainingLength (1-4 bytes)
  iNextIndex := Length(Result);
  SetLength(Result, Length(Result) + Length(RemainL));
  CopyIntoArray(Result, RemainL, iNextIndex);

  // Attach Variable Head
  iNextIndex := Length(Result);
  SetLength(Result, Length(Result) + Length(VariableHead));
  CopyIntoArray(Result, VariableHead, iNextIndex);

  // Attach Payload.
  iNextIndex := Length(Result);
  SetLength(Result, Length(Result) + Length(Payload));
  CopyIntoArray(Result, Payload, iNextIndex);
end;

function TTdMqtt.RemainingLength(x: Integer): TRemainingLength;     //固定头第二个字节,动态长度1-4
var
  byteindex: integer;
  digit: integer;
begin
  SetLength(Result, 1);
  byteindex := 0;
  while (x > 0) do
  begin
    digit := x mod 128;
    x := x div 128;
    if x > 0 then
    begin
      digit := digit or 128;
    end;
    Result[byteindex] := digit;
    if x > 0 then
    begin
      inc(byteindex);
      SetLength(Result, Length(Result) + 1);
    end;
  end;
end;

procedure TTdMqtt.AppendArray(var Dest: TUTF8Text; Source: Array of Byte);
var
  DestLen: Integer;
begin
  DestLen := Length(Dest);
  SetLength(Dest, DestLen + Length(Source));
  Move(Source, Dest[DestLen], Length(Source));
end;

function TTdMqtt.FixedHeader(MessageType: TMQTTMessageType; Dup, Qos,Retain: Word): Byte; //固定头第一个字节
begin
  Result := (Ord(MessageType) * 16) + (Dup * 8) + (Qos * 2) + (Retain * 1);
end;

function TTdMqtt.GetIsDesignTime: Boolean;
begin
  Result := (csDesigning in ComponentState);
end;
function TTdMqtt.GetIsLoading: Boolean;
begin
  Result := (csLoading in ComponentState);
end;

procedure TTdMqtt.MQTTConnect; //
var
  MqttData_FixedHead:Byte;       //固定头
  MqttData_VariableHeader:TBytes;//可变头
  MqttData_RemainingLength: TRemainingLength;//剩余长度
  Payload: TUTF8Text;
  SendData: TBytes;          //构件完成的最终需要发送的二进制数据
  function VariableHeader_Connect(KeepAlive: Word): TBytes;
  const
    MQTT_PROTOCOL = 'MQTT';
    MQTT_VERSION = 4;//V3.1.1协议级别为4.非3了
  var
    Qos, Retain: word;
    iByteIndex: integer;
    ProtoBytes: TUTF8Text;
    ConnectFlag:Byte;//连接标志
  begin
    SetLength(Result, 10);    //长度为10
    iByteIndex := 0;
    ProtoBytes := StrToBytes(MQTT_PROTOCOL, true);
    CopyIntoArray(Result, ProtoBytes, iByteIndex);     //协议名

    Inc(iByteIndex, Length(ProtoBytes));
    Result[iByteIndex] := MQTT_VERSION;          //版本号

    Inc(iByteIndex);
  //  asm
  //    mov ConnectFlag,11000010B //UserName,Pwd,CleanSession为1,其余均为0
  //  end;
    ConnectFlag:=194;

    Result[iByteIndex] := ConnectFlag;//连接标志

    Inc(iByteIndex);
    Result[iByteIndex] := 0;          //保持连接时间第一位

    Inc(iByteIndex);
    Result[iByteIndex] := KeepAlive; //保持连接时间第二位
  end;
begin
  MqttData_FixedHead:=FixedHeader(CONNECT,0,0,0);
  MqttData_VariableHeader:= VariableHeader_Connect(40);//构建可变头。参数单位秒,在此时间之内,客户端需要发送 PINGREQ 否则服务端将断开网络连接
  SetLength(Payload, 0);  //开始构建有效荷载,由于需要认证帐号密码,此处荷载内容为ID,USER,PWD
  AppendArray(Payload, StrToBytes(FClientID, true)); //id
  AppendArray(Payload, StrToBytes(FUser, true));     //user
  AppendArray(Payload, StrToBytes(FPwd, true));     //pwd
  MqttData_RemainingLength:=RemainingLength(Length(MqttData_VariableHeader) + Length(Payload));//计算剩余长度
  SendData:=BuildCommand(MqttData_FixedHead, MqttData_RemainingLength, MqttData_VariableHeader, Payload);  //组包
  FIdTCPClient.Socket.Write(TIdBytes(SendData));  //发送
end;

procedure TTdMqtt.MQTTPing;
var
  FH: Byte;
  RL: Byte;
  Data: TBytes;
begin
  SetLength(Data, 2);
  FH := FixedHeader(PINGREQ, 0, 0, 0);
  RL := 0;
  Data[0] := FH;
  Data[1] := RL;
  FIdTCPClient.Socket.Write(TIdBytes(Data));
end;


procedure TTdMqtt.MQTTPublish(sTopic,sPayload:string);
var
  Data: TBytes;
  FH: Byte;
  RL: TRemainingLength;
  VH: TBytes;
  Payload: TUTF8Text;
  function VariableHeader_Publish(topic: string): TBytes;
  var
    BytesTopic: TUTF8Text;
  begin
    BytesTopic := StrToBytes(Topic, true);
    SetLength(Result, Length(BytesTopic));
    CopyIntoArray(Result, BytesTopic, 0);
  end;
begin
  FH := FixedHeader(PUBLISH, 0, 0, 1);  //保留消息设为1;对于发布者不定期发送状态消息这个场景,保留消息很有用。新的订阅者将会收到最近的状态。
  VH := VariableHeader_Publish(sTopic);
  SetLength(Payload, 0);
  AppendArray(Payload, StrToBytes(sPayload, false)); //需要注意,此处长度的计算  。协议中:有效荷载长度:用固定包头中的剩余长度字段减去可变包头长度
  RL := RemainingLength(Length(VH) + Length(Payload));
  Data := BuildCommand(FH, RL, VH, Payload);
  FIdTCPClient.Socket.Write(TIdBytes(Data));
end;


function TTdMqtt.GetMessageID: TBytes;
begin
  Assert((Self.FMessageID > Low(Word)), 'Message ID too low');
  Assert((Self.FMessageID < High(Word)), 'Message ID has gotten too big');

  {  Self.FMessageID is initialised to 1 upon TMQTTClient.Create
  The Message ID is a 16-bit unsigned integer, which typically increases by exactly
  one from one message to the next, but is not required to do so.
  The two bytes of the Message ID are ordered as MSB, followed by LSB (big-endian).}
  SetLength(Result, 2);
  Result[0] := Hi(Self.FMessageID);
  Result[1] := Lo(Self.FMessageID);
  Inc(Self.FMessageID);
end;

procedure TTdMqtt.MQTTSubscribe(sTopic:string);
var
  Data: TBytes;
  FH: Byte;
  RL: TRemainingLength;
  VH: TBytes;
  Payload: TUTF8Text;
  function VariableHeader_Subscribe: TBytes;
  begin
    Result := GetMessageID;
  end;
begin
  FH := FixedHeader(SUBSCRIBE, 0, 1, 0);
  VH := VariableHeader_Subscribe;
  SetLength(Payload, 0);
  AppendArray(Payload, StrToBytes(sTopic, true));
  // Append a new Byte to Add the Requested QoS Level for that Topic
  SetLength(Payload, Length(Payload) + 1);
  // Always Append Requested QoS Level 0
  Payload[Length(Payload) - 1] := $0;
  RL := RemainingLength(Length(VH) + Length(Payload));
  Data := BuildCommand(FH, RL, VH, Payload);
  FIdTCPClient.Socket.Write(TIdBytes(Data));
end;


procedure TTdMqtt.SetActive(AValue: Boolean);
begin
  // At design time we just set the value and save it for run time.
  // During loading we ignore it till all other properties are set.
  // Loaded will recall it to toggle it
  if IsDesignTime or IsLoading then
  begin
    FActive := AValue;
  end
  else
  if FActive <> AValue
  then
  begin
    if AValue then
    begin
      try
        FIdTCPClient.Host:=FHost;
        FIdTCPClient.Port:=FPort;
        FIdTCPClient.ReadTimeout:=5000;
        FIdTCPClient.ConnectTimeout:=5000;
        FIdTCPClient.Connect;
        {发送MQTT连接}
        MQTTConnect;
        {创建读取线程}
        FReadThread:=TClientHandleThread.Create(FIdTCPClient);
        FReadThread.OnConnAck := Self.OnConnAck;
        FReadThread.OnPublish := Self.OnPublish;
        FReadThread.OnPingRespEvent:=Self.OnPingRespEvent;
        FReadThread.OnSubAckEvent:=Self.OnSubAckEvent;
      except
        FActive := True;
        SetActive(False); // allow descendants to clean up
        raise Exception.Create('连接失败。');
      end;
      FActive := True;
    end
    else
    begin
      // Must set to False here. Shutdown() implementations call property setters that check this
      FActive := False;
      FIdTCPClient.Disconnect;
    end;
  end;
end;

destructor TTdMqtt.Destroy;
begin
  FIdTCPClient.Free;
  inherited Destroy;
end;

constructor TTdMqtt.Create(AOwner: TComponent);
var
  Guid: TGUID;
begin
  FHost:='127.0.0.1';
  FPort:=1883;
  FUser:='ade';
  FPwd:='ade';
  FMessageID := 1;
  FIdTCPClient:=TIdTCPClient.Create(nil);
  CreateGUID(Guid);
  FClientID:='tdito.cc-'+LeftStr(MD5(GUIDToString(Guid)),16);
  inherited Create(AOwner);
end;


end.



{
  导出:
  host,port
  心跳时间

  sub()
  pub
  ClientID
}
----------------------------------------------
-
作者:
男 glwang (glwang) ★☆☆☆☆ -
盒子活跃会员
2019/8/9 9:14:40
6楼: 感谢楼上5楼的兄弟,花了这么多积分,其实可以上传到盒子ftp的,再次感谢!
----------------------------------------------
作者:
男 keymark (嬲) ▲▲▲△△ -
普通会员
2019/8/9 11:55:06
7楼: 可以传附件了的。
大小限制:4.90M,扩展名限制:gif|jpg|png|doc|zip|rar|chm|pdf|mid|mp3|asf|rm|swf|txt
这多年过去了依然不支持
。pas  当然也是好事。最少文件名不会乱。 压个包就是了。
----------------------------------------------
[alias]  co = clone --recurse-submodules  up = submodule update --init --recursiveupd = pullinfo = statusrest = reset --hard懒鬼提速https://www.cctry.com/>http://qalculate.github.io/downloads.htmlhttps://www.cctry.com/
作者:
男 dmzn (dmzn) ★☆☆☆☆ -
盒子活跃会员
2019/8/9 12:08:30
8楼: 测试了几个MQTT客户端,包括: sgcWebSocket-MQTT,TMS MQTT,QDAC MQTT
结论: 不稳定
后来用mosquitto库改了一个版本,相对稳定多了.
http://bbs.2ccc.com/topic.asp?topicid=566990
----------------------------------------------
生活愉快.
作者:
男 l_star (l.star) ★☆☆☆☆ -
普通会员
2019/8/9 13:11:53
9楼: https://github.com/pjde/delphi-mqtt

在生产环境中使用。
----------------------------------------------
-
作者:
男 glwang (glwang) ★☆☆☆☆ -
盒子活跃会员
2019/8/9 13:45:18
10楼: 楼上9楼用的啥版本Delphi,我用Delphi 10.3 编译不了。PS感谢8楼测试:sgcWebSocket-MQTT,TMS MQTT,QDAC MQTT。8楼的编译缺少文件:UManagerGroup, UThreadPool, ULibFun。
----------------------------------------------
作者:
男 l_star (l.star) ★☆☆☆☆ -
普通会员
2019/8/9 17:40:56
11楼: 7有源码自己改
----------------------------------------------
-
作者:
男 lsz100 (lsz) ★☆☆☆☆ -
盒子活跃会员
2019/12/11 10:19:53
12楼: 有源码的 原作者的有个问题 就是推送过来的订阅数据多128字节 就无法接收 我改了一下源码就可以了
----------------------------------------------
我为人人为我
作者:
男 lsz100 (lsz) ★☆☆☆☆ -
盒子活跃会员
2019/12/11 10:20:51
13楼: utdMQTT.pas 单元改一下就行了
----------------------------------------------
我为人人为我
作者:
男 lsz100 (lsz) ★☆☆☆☆ -
盒子活跃会员
2019/12/11 10:22:12
14楼: utdMQTT.paS 有心跳处理 用户密码这些都有 原来的就有一个128节以上的 无法接收问题
----------------------------------------------
我为人人为我
作者:
男 xjia (xjia) ★☆☆☆☆ -
盒子活跃会员
2019/12/11 10:40:01
15楼: to 楼上,你应该把解决方法写出来
----------------------------------------------
-
信息
登陆以后才能回复
Copyright © 2CCC.Com 盒子论坛 v3.0.1 版权所有 页面执行97.65625毫秒 RSS