procedure Register; begin RegisterComponents('Tndsoft', [TTdMqtt]); end;
{TClientHandleThread} function TClientHandleThread.BytesToStrLength(LengthBytes: TBytes): integer; begin Assert(Length(LengthBytes) = 2, 'UTF-8 Length Bytes preceeding the text must be 2 Bytes in Legnth');
Result := 0; Result := LengthBytes[0] shl 8; Result := Result + LengthBytes[1]; end;
function TClientHandleThread.RemainingLengthToInt(RLBytes: TBytes): Integer; var multi: integer; i: integer; digit: Byte; begin multi := 1; i := 0; Result := 0;
digit := RLBytes[i]; repeat digit := RLBytes[i]; Result := Result + (digit and 127) * multi; multi := multi * 128; Inc(i); until ((digit and 128) = 0); end;
procedure TClientHandleThread.HandleInput; var MessageType: Byte; DataLen: integer; QoS: integer; sErrCode:string; Topic: string; Payload: String; iTopicLen,iPayloadLen:Integer; ResponseVH: TBytes; ConnectReturn: Integer; i:Integer; DestArray:TUTF8Text; begin if (FCurrentData.FixedHeader <> 0) then begin MessageType := FCurrentData.FixedHeader shr 4;
if (MessageType = Ord(CONNACK)) then begin // Check if we were given a Connect Return Code. ConnectReturn := 0; // Any return code except 0 is an Error if ((Length(FCurrentData.Data) > 0) and (Length(FCurrentData.Data) < 4)) then begin ConnectReturn := FCurrentData.Data[1]; case ConnectReturn of 0:sErrCode:='连接已接受'; 1:sErrCode:='连接已拒绝,不支持的协议版本'; 2:sErrCode:='连接已拒绝,不合格的客户端标识符'; 3:sErrCode:='连接已拒绝,服务端不可用'; 4:sErrCode:='连接已拒绝,无效的用户名或密码'; 5:sErrCode:='连接已拒绝,未授权'; end; end; if Assigned(OnConnAck) then OnConnAck(Self, sErrCode); end else if (MessageType = Ord(PUBLISH)) then begin // Read the Length Bytes iTopicLen := BytesToStrLength(Copy(FCurrentData.Data, 0, 2)); // Get the Topic SetLength(DestArray,0);
if Assigned(OnPublish) then OnPublish(Self, Topic,Payload);
// fMQTTForDelphi.Memo1.Lines.Add(Format('接收到消息.主体:%s 内容:%s',[Topic,Payload])); end else if (MessageType = Ord(SUBACK)) then begin // Reading the Message ID ResponseVH := Copy(FCurrentData.Data, 0, 2); DataLen := BytesToStrLength(ResponseVH); // Next Read the Granted QoS QoS := 0; if (Length(FCurrentData.Data) - 2) > 0 then begin ResponseVH := Copy(FCurrentData.Data, 2, 1); QoS := ResponseVH[0]; end; if Assigned(OnSubAckEvent) then OnSubAckEvent(Self, DataLen,QoS); end else if (MessageType = Ord(UNSUBACK)) then begin // Read the Message ID for the event handler ResponseVH := Copy(FCurrentData.Data, 0, 2); DataLen := BytesToStrLength(ResponseVH); // fMQTTForDelphi.Memo1.Lines.Add(Format('收到取消订阅包。长度:%s',[DataLen])) // if Assigned(OnUnSubAck) then OnUnSubAck(Self, DataLen); end else if (MessageType = Ord(PINGRESP)) then begin if Assigned(OnPingRespEvent) then OnPingRespEvent(Self); end; end; end;
procedure TClientHandleThread.Execute; var CurrentMessage:TMQTTMessage; vBuffer:TIdBytes; Buffer: TBytes; RLInt: Integer; I:Integer; begin while not Terminated do begin if not FIdTCPClient.Connected then Terminate else try CurrentMessage.FixedHeader := 0; CurrentMessage.RL := nil; CurrentMessage.Data := nil; CurrentMessage.FixedHeader:=FIdTCPClient.IOHandler.ReadByte; //读取FixedHdader //读取剩余长度-编码过 SetLength(CurrentMessage.RL,1); SetLength(Buffer,1); CurrentMessage.RL[0]:=FIdTCPClient.IOHandler.ReadByte; //读取剩余长度第一位 for i := 1 to 3 do //读取剩余长度其他位数,剩余长度为可变长度1-4. begin if (( CurrentMessage.RL[i - 1] and 128) <> 0) then begin Buffer[0] := FIdTCPClient.IOHandler.ReadByte; AppendBytes(CurrentMessage.RL, Buffer); end else Break; end; //解码剩余长度} RLInt := RemainingLengthToInt(CurrentMessage.RL); //将剩余长度的数据全部读出} if (RLInt > 0) then begin SetLength(CurrentMessage.Data, 0); SetLength(CurrentMessage.Data, RLInt); FIdTCPClient.IOHandler.ReadBytes(vBuffer,RLInt,False); CurrentMessage.Data:=TBytes(vBuffer); // FPSocket^.RecvBufferEx(Pointer(CurrentMessage.Data), RLInt, 1000); end; FCurrentData := CurrentMessage; Synchronize(HandleInput); except
end; end; end;
constructor TClientHandleThread.Create(IdTCPClient: TIdTCPClient); begin inherited Create; FIdTCPClient:=IdTCPClient; end;
{TTDMQTT}
function TTdMqtt.MD5(source:string):string; var MyMD5:TIdHashMessageDigest5; Digest:T4x4LongWordRecord; ciphertext:string; begin Result := ''; {$IFDEF VER150} try MyMD5 := TIdHashMessageDigest5.Create; Digest := MyMD5.HashValue(source); ciphertext := MyMD5.AsHex(Digest); ciphertext := UpperCase(ciphertext); Result := ciphertext; finally MyMD5.Free; end; {$ELSE} try MyMD5 := TIdHashMessageDigest5.Create; ciphertext := MyMD5.HashStringAsHex(source); ciphertext := UpperCase(ciphertext); Result := ciphertext; finally MyMD5.Free; end; {$ENDIF} end;
function TTdMqtt.StrToBytes(str: String; perpendLength: boolean): TUTF8Text; var i: integer; ordStr:SmallInt; buf:TBytes; begin buf:=TEncoding.ANSI.GetBytes(str); if perpendLength then begin SetLength(Result, Length(buf) + 2); Result[0] := Length(buf) div 256; Result[1] := Length(buf) mod 256; Move(buf[0],Result[2],Length(buf)); end else begin SetLength(Result, Length(buf)); Move(buf[0],Result[0],Length(buf)); end; end;
procedure TTdMqtt.CopyIntoArray(var DestArray: Array of Byte; SourceArray: Array of Byte; StartIndex: integer); begin Assert(StartIndex >= 0); Move(SourceArray[0], DestArray[StartIndex], Length(SourceArray)); end;
function TTdMqtt.BuildCommand(FixedHeader: Byte; RemainL: TRemainingLength; VariableHead: TBytes; Payload: Array of Byte): TBytes; //构建最终的发送数据包 var iNextIndex: integer; begin // Attach Fixed Header (1 byte) iNextIndex := 0; SetLength(Result, 1); Result[iNextIndex] := FixedHeader;
function TTdMqtt.RemainingLength(x: Integer): TRemainingLength; //固定头第二个字节,动态长度1-4 var byteindex: integer; digit: integer; begin SetLength(Result, 1); byteindex := 0; while (x > 0) do begin digit := x mod 128; x := x div 128; if x > 0 then begin digit := digit or 128; end; Result[byteindex] := digit; if x > 0 then begin inc(byteindex); SetLength(Result, Length(Result) + 1); end; end; end;
procedure TTdMqtt.AppendArray(var Dest: TUTF8Text; Source: Array of Byte); var DestLen: Integer; begin DestLen := Length(Dest); SetLength(Dest, DestLen + Length(Source)); Move(Source, Dest[DestLen], Length(Source)); end;
function TTdMqtt.FixedHeader(MessageType: TMQTTMessageType; Dup, Qos,Retain: Word): Byte; //固定头第一个字节 begin Result := (Ord(MessageType) * 16) + (Dup * 8) + (Qos * 2) + (Retain * 1); end;
function TTdMqtt.GetIsDesignTime: Boolean; begin Result := (csDesigning in ComponentState); end; function TTdMqtt.GetIsLoading: Boolean; begin Result := (csLoading in ComponentState); end;
function TTdMqtt.GetMessageID: TBytes; begin Assert((Self.FMessageID > Low(Word)), 'Message ID too low'); Assert((Self.FMessageID < High(Word)), 'Message ID has gotten too big');
{ Self.FMessageID is initialised to 1 upon TMQTTClient.Create The Message ID is a 16-bit unsigned integer, which typically increases by exactly one from one message to the next, but is not required to do so. The two bytes of the Message ID are ordered as MSB, followed by LSB (big-endian).} SetLength(Result, 2); Result[0] := Hi(Self.FMessageID); Result[1] := Lo(Self.FMessageID); Inc(Self.FMessageID); end;
procedure TTdMqtt.MQTTSubscribe(sTopic:string); var Data: TBytes; FH: Byte; RL: TRemainingLength; VH: TBytes; Payload: TUTF8Text; function VariableHeader_Subscribe: TBytes; begin Result := GetMessageID; end; begin FH := FixedHeader(SUBSCRIBE, 0, 1, 0); VH := VariableHeader_Subscribe; SetLength(Payload, 0); AppendArray(Payload, StrToBytes(sTopic, true)); // Append a new Byte to Add the Requested QoS Level for that Topic SetLength(Payload, Length(Payload) + 1); // Always Append Requested QoS Level 0 Payload[Length(Payload) - 1] := $0; RL := RemainingLength(Length(VH) + Length(Payload)); Data := BuildCommand(FH, RL, VH, Payload); FIdTCPClient.Socket.Write(TIdBytes(Data)); end;
procedure TTdMqtt.SetActive(AValue: Boolean); begin // At design time we just set the value and save it for run time. // During loading we ignore it till all other properties are set. // Loaded will recall it to toggle it if IsDesignTime or IsLoading then begin FActive := AValue; end else if FActive <> AValue then begin if AValue then begin try FIdTCPClient.Host:=FHost; FIdTCPClient.Port:=FPort; FIdTCPClient.ReadTimeout:=5000; FIdTCPClient.ConnectTimeout:=5000; FIdTCPClient.Connect; {发送MQTT连接} MQTTConnect; {创建读取线程} FReadThread:=TClientHandleThread.Create(FIdTCPClient); FReadThread.OnConnAck := Self.OnConnAck; FReadThread.OnPublish := Self.OnPublish; FReadThread.OnPingRespEvent:=Self.OnPingRespEvent; FReadThread.OnSubAckEvent:=Self.OnSubAckEvent; except FActive := True; SetActive(False); // allow descendants to clean up raise Exception.Create('连接失败。'); end; FActive := True; end else begin // Must set to False here. Shutdown() implementations call property setters that check this FActive := False; FIdTCPClient.Disconnect; end; end; end;
destructor TTdMqtt.Destroy; begin FIdTCPClient.Free; inherited Destroy; end;
constructor TTdMqtt.Create(AOwner: TComponent); var Guid: TGUID; begin FHost:='127.0.0.1'; FPort:=1883; FUser:='ade'; FPwd:='ade'; FMessageID := 1; FIdTCPClient:=TIdTCPClient.Create(nil); CreateGUID(Guid); FClientID:='tdito.cc-'+LeftStr(MD5(GUIDToString(Guid)),16); inherited Create(AOwner); end;