导航:
论坛 -> DELPHI技术
斑竹:liumazi,sephil
作者:
2019/4/20 10:43:04
标题:
请教一个实时数据接收处理,多线程的问题
浏览:2291
加入我的收藏
楼主:
目标:实时通过idhttp收传感器数据,并处理 thread1: 通过idhttp.get('http:/.....',mem); 从http服务器收数据到mem里,由于实时数据不会停下来,也不存在结束,因此会一直运行 thread2 每个300毫秒,看看mem收的数据长度如大于1000个字节,则将收到的数据拷贝,同时把mem清空,为了实现线程之间的同步,避免mem混乱,在thread2中用Self.Synchronize(self.Copy_Http_Rec);来处理上面的功能,只拷贝mem和清空mem。 目前出现的问题,运行几个小时后不稳定,假死?反正是不收数了,请教在thread2运行Self.Synchronize(self.Copy_Http_Rec)的过程中,thread1的收数据线程会获取时间片运行吗?可能的原因是什么。 另也用过挂起和恢复thread1,也不行,甚至效果更差。
----------------------------------------------
-
作者:
joman (joman)
▲▲▲▲▲
-
普通会员
2019/4/20 11:15:19
1楼:
变量存取的同时应该加锁控制,或者缓存另外一个变量,处理完了再回复,防止并发情况
----------------------------------------------
DelphiWeb开发方案(开源):https://gitee.com/pearroom/DelphiWebMVC
作者:
2019/4/20 11:29:42
2楼:
把Self.Synchronize方式改为临界区(criticalsection)方式试试,凡是mem的读写,都必须通过临界区来限制。不管是主线程、两个子线程,只要是访问mem,必须通过临界区。
----------------------------------------------
-
作者:
2019/4/20 11:30:11
3楼:
没看到协商,只看到处理。
----------------------------------------------
[alias] co = clone --recurse-submodules up = submodule update --init --recursiveupd = pullinfo = statusrest = reset --hard懒鬼提速https://www.cctry.com/ >http://qalculate.github.io/downloads.htmlhttps://www.cctry.com/
作者:
2019/4/20 11:39:11
4楼:
一个reader一个writer,要加什么锁?直接做无锁队列就可以。 另外,thread里不要涉及到ui操作。 再有,indy改成Delphi自带的httprequest看看
----------------------------------------------
--
作者:
2019/4/20 11:40:31
4楼:
Self.Synchronize是一种比较慢的方式,如果你还想用这种方式,那么在线程1中,写mem的动作,也要用Self.Synchronize方式。
----------------------------------------------
-
作者:
2019/4/20 12:40:57
5楼:
感谢这么多网友热心回复。 有个问题没讲透彻。 thread1: 通过idhttp.get('http:/.....',mem); 从http服务器收数据到mem里,由于实时数据不会停下来,也不存在结束,因此会一直运行。 这样理解,传感器的数据假设每10、20毫秒打包数据,通过http服务器吐出来,idhttp实施接收,其get函数不能结束,否则要再次认证,握手,会丢数的。 idhttp.get收数是实时的,但不是任何时刻都在收,收到一个包后,可能间隔10或20毫秒才会来新的数据。 目标是希望能在这个没有数据的间隔内,及时把收的数据拷贝出来,清空mem。 由于idttp.get根本不会结束,无法用临界区或者互斥来处理,也不能用Synchronize 来idhttp.get,这会导致thread2根本没机会获取时间片来运行。
----------------------------------------------
-
作者:
2019/4/20 12:52:47
6楼:
这个 mem 谁建立的? thread1? 为啥不用tcp 。。。。 !!? idhttp.get 里面是个死循环???
----------------------------------------------
[alias] co = clone --recurse-submodules up = submodule update --init --recursiveupd = pullinfo = statusrest = reset --hard懒鬼提速https://www.cctry.com/ >http://qalculate.github.io/downloads.htmlhttps://www.cctry.com/
作者:
2019/4/20 12:58:10
7楼:
mem,全局的, 以前用socket,握手后每次收一定长度数据,处理。 仪器升级,改http吐数据了 idhttp.get不是死循环,是一个网址,但一直吐数,根本不会到结尾。
----------------------------------------------
-
作者:
2019/4/20 13:01:14
8楼:
可以理解为idhttp.get,这个函数永远不会返回,不会结束,除非关闭程序。
----------------------------------------------
-
作者:
2019/4/20 13:04:42
9楼:
idhttp.get,好像不能指定收多长的数据 同时再次get的时候,还有3次握手,中间可能有数据包
----------------------------------------------
-
作者:
gmxyb (gmxyb)
★☆☆☆☆
-
普通会员
2019/4/20 13:30:13
10楼:
如果 idhttp.get 永远不会结束,那我也建议不要用 idhttp 用 socket 直接连接更好,熟悉 http 协议的话并不难 这样你会有机会处理每次接收的数据
----------------------------------------------
-
作者:
2019/4/20 14:52:35
11楼:
10楼的方法,估计能行。
----------------------------------------------
-
作者:
2019/4/20 14:57:22
12楼:
上一版socket,连续运行3个月以上都没问题。 得研究一下http协议了,没深入学习过,能否简要说一下思路。
----------------------------------------------
-
作者:
janker (janker)
★☆☆☆☆
-
盒子活跃会员
2019/4/20 16:46:36
13楼:
考虑下双缓冲?
----------------------------------------------
-
作者:
2019/4/20 19:18:59
14楼:
idHttp有一个OnWork事件,你看一下能不能利用它 另外,idHTTP.get函数最终是调用TIdCustomHTTP.DoRequest函数,这个函数是可以override的
----------------------------------------------
-
作者:
2019/4/21 9:53:05
15楼:
这个是TIdCustomHTTP.DoRequest函数 procedure TIdCustomHTTP.DoRequest(const AMethod: TIdHTTPMethod; AURL: string; ASource, AResponseContent: TStream; AIgnoreReplies: array of Int16); 其中有段是这样: // Workaround for servers wich respond with 100 Continue on GET and HEAD // This workaround is just for temporary use until we have final HTTP 1.1 // realisation. HTTP 1.1 is ongoing because of all the buggy and conflicting servers. repeat Response.ResponseText := InternalReadLn; FHTTPProto.RetrieveHeaders(MaxHeaderLines); ProcessCookies(Request, Response); until Response.ResponseCode <> 100; 感觉应该每次收到的数据在这个循环里。 另这个函数是 protected的,里面调用的函数还有一些也是 protected,单独再定义一个类,从idhttp继承,重载DoRequest,会遇到很多protected的麻烦,再求解决方案。
----------------------------------------------
-
作者:
2019/4/21 10:47:40
16楼:
下面的程序,使用了idHttp的OnWorkBegin、OnWork、OnWorkEnd这三个事件,使用MemoryStream接收数据,在接收数据过程中把接收到的数据拷贝到FileStream中,并清空MemoryStream,再接收下一组数据: unit Unit1; interface uses Winapi.Windows, Winapi.Messages, System.SysUtils, System.Variants, System.Classes, Vcl.Graphics, Vcl.Controls, Vcl.Forms, Vcl.Dialogs, Vcl.StdCtrls, IdBaseComponent, IdComponent, IdTCPConnection, IdTCPClient, IdHTTP, IdIOHandler; type TForm1 = class(TForm) IdHTTP1: TIdHTTP; Button1: TButton; procedure Button1Click(Sender: TObject); procedure IdHTTP1WorkBegin(ASender: TObject; AWorkMode: TWorkMode; AWorkCountMax: Int64); procedure IdHTTP1Work(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64); procedure IdHTTP1WorkEnd(ASender: TObject; AWorkMode: TWorkMode); private LastTime: TDateTime; MemoryStream: TMemoryStream; FileStream: TFileStream; Started: Boolean; public { Public declarations } end; var Form1: TForm1; implementation {$R *.dfm} procedure TForm1.Button1Click(Sender: TObject); begin FileStream:= TFileStream.Create('D:\Chrome_Installer.exe', fmCreate); MemoryStream := TMemoryStream.Create; Started := False; LastTime := Now; IdHTTP1.Get('http://tech.down.sina.com.cn/20190304/372031d7/72.0.3626.121_chrome_installer.exe?fn=&ssig=ocvsq7uVE4&Expires=1555893993&KID=sae,230kw3wk15&ip=1555814793,223.73.84.14', MemoryStream); FileStream.Free; MemoryStream.Free; ShowMessage('Download Completed.'); end; procedure TForm1.IdHTTP1Work(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64); var Count: Int64; CurTime: TDateTime; begin if Started then begin CurTime := Now; if (CurTime - LastTime) >= (3.472222222222E-6) then //300毫秒 begin LastTime := CurTime; Count := MemoryStream.Position; if Count >= 1000 then //如果大于1000字节则拷贝出去 begin MemoryStream.Position := 0; FileStream.CopyFrom(MemoryStream, Count); MemoryStream.Size := 0; end; end; end; end; procedure TForm1.IdHTTP1WorkBegin(ASender: TObject; AWorkMode: TWorkMode; AWorkCountMax: Int64); begin Started := True; end; procedure TForm1.IdHTTP1WorkEnd(ASender: TObject; AWorkMode: TWorkMode); var Count: Int64; begin Count := MemoryStream.Position; if Count > 0 then begin MemoryStream.Position := 0; FileStream.CopyFrom(MemoryStream, Count); end; end; end.
----------------------------------------------
-
作者:
2019/4/21 10:53:09
17楼:
用上面的方法,你就可以把对filestream(或Buffer)的操作,放在临界区了
----------------------------------------------
-
作者:
2019/4/21 19:30:54
18楼:
非常感谢楼上,问题最根本是get是接收实时数据 根本不会结束。 我仔细想了一下,idhttp怕是解决不了。 上面有个朋友建议我用socket解决,当时觉得不是好的方案,已到高级的应用层了,为啥还要回到传输层。 现在回想,确实自己是半桶水的水平,要解决这个问题,必须降到传输层,用socket模拟http,用线程实时处理socket收到的数据。网上看了一下,也有很多类似需求是这么干的,而且难度确实不大,难者不会 会者不难吧。 非常感谢,交流学习,又又很多收获。
----------------------------------------------
-
作者:
2019/4/21 19:34:59
19楼:
暮然回首,自己已是四星级用户了,在盒子泡了十多年了,每次有问题,盒子值得信赖
----------------------------------------------
-
作者:
2019/4/21 20:02:01
20楼:
上面里程序,完全能满足不断接收实时数据的要求。如果你认为不行,理由是什么?
----------------------------------------------
-
作者:
2019/4/21 20:37:19
21楼:
没严格测试是否可行。 到在我的需求里,由于get一直运行,不能在主线程。只能放在线程里,让其一直运行。线程里idhttp好像不能触发onwork,上午实验一下,最少不是实时触发,就算触发,在线程运行,收到的数据还是得不到保护,线程执行先后顺序可能是未知的。 可能是需求不同,我需求的实时性非常高。一百多个传感器,每秒大约三百多个数据包,连续不断。目前还在开发阶段,应用阶段传感器可能几千个,甚至更多。 当然可测试一下。稳定运行三个月才能算可行。
----------------------------------------------
-
作者:
2019/4/21 20:47:49
22楼:
每个包数据为256字节,数据不大,实时性高。
----------------------------------------------
-
作者:
2019/4/21 20:56:39
23楼:
其实你的思路跟我提问时的思路基本一样的,只不过你放在主线程,我全部放在线程里,反正放在线程里,绝对是不行的。
----------------------------------------------
-
作者:
2019/4/21 21:20:28
22楼:
(1)idHttpy因为在线程中运行就不能触发OnWork事件的问题是不存在的 (2)OnWork事件肯定不是每收到1Byte就触发一次,但事件间隔都在10ms的左右 (3)在线程中收到数据得不到保护?可以用临界区来保护 (4)线程执行先后顺序未知,是指同一线程中先后顺序未知,还是不同线程之间的先后顺序未知? (5)100多个传感器,每个传感器开一个线程就好,每秒大约三百多个数据包,小case
----------------------------------------------
-
作者:
2019/4/21 21:31:57
24楼:
“反正放在线程里,绝对是不行的。”---原因是什么?作为一个开发人员,总不能说反正就是这样吧?
----------------------------------------------
-
作者:
2019/4/21 21:35:54
24楼:
线程1,运行get,由于不返回,怎么用临界区来保护收到的数据流? 由于不返回,是否需要另开线程来处理onwork ? 这么多的onwork,效率估计很难保证。 你说的第一点,我还得来测试。
----------------------------------------------
-
作者:
2019/4/21 21:44:14
25楼:
歪个楼 if (CurTime - LastTime) >= (3.472222222222E-6) then //300毫秒 begin LastTime := CurTime; 个人认为 if (CurTime >= LastTime) then //300毫秒 begin LastTime := CurTime +300毫秒; //个人觉得这样写比较好??有啥建议么?
----------------------------------------------
[alias] co = clone --recurse-submodules up = submodule update --init --recursiveupd = pullinfo = statusrest = reset --hard懒鬼提速https://www.cctry.com/ >http://qalculate.github.io/downloads.htmlhttps://www.cctry.com/
作者:
2019/4/21 21:52:58
25楼:
OnWork事件,是在下载过程中不断产出的,每次从网卡里面读取一次数据就会产生一次,不是下载完毕才产生,你的程序不停地下载,程序就一直会产生OnWork事件,下载完毕产生的是OnWorkEnd事件。 OnWork影不影响效率,不只取决于OnWork数量的多少,还要看Onwork事件里面要完成的工作量,如果你在OnWork事件里面做了一些很耗时的事情,当然影响效率。
----------------------------------------------
-
作者:
2019/4/21 22:02:39
26楼:
我的程序,主要是要演示怎么在连续不断的下载中把数据拷贝出来。至于怎么在线程里面实现,对于你这样一个在这个论坛里面混了十几年的人来说,还是问题吗?
----------------------------------------------
-
作者:
2019/4/21 22:29:52
26楼:
交流的很充分,也很有收货。回过头来看我遇到的问题,以及我的解决方案, 说白了就是线程1GET,线程2类似干onwork的事情,为了保护数据流,我用了同步函数,让主线程去拷贝,清空已收到的数据流。 在同步函数执行中,线程1仍有机会获取时间片,数据流无法严格得到保护。同时get函数不返回,无法用临界区来保护。 所以我才武断认为当前的方案恐怕很难解决,想到退到传输层,用SOCKET来解决。
----------------------------------------------
-
作者:
2019/4/21 22:32:06
27楼:
三人行,必有我师。
----------------------------------------------
-
作者:
2019/4/22 2:30:17
28楼:
楼主,你没把意思表达清楚。可能你不熟悉 HTTP 协议和用法。 首先,传统的 HTTP 的用法:无状态模式。传统的 HTTP,一次访问就是客户端向服务器端发起一次 TCP 连接,请求数据。数据请求完成,连接断开,不会保持连接的。 现在呢,HTTP 多了一个【保持连接】的设置。这时候,一个 HTTP 请求,就一直保持连接,不会停止。这个其实是不应该 HTTP 有的功能非要在 HTTP 上解决才产生出来的应用场景。IT 行业经常干这种蠢事。 再说传统。传统的编程,是 UNIX 的编程。LINUX 也是学的 UNIX 的编程。传统的 UNIX 的编程,大多数时候,都用【阻塞】模式,也就是【同步】模式。其实就是代码一行一行单步走。这种编程模式,代码简单易读。网络通信很慢,一次数据要收完或者发完,可能要好久。程序就停在这里,等执行完这行发数据代码或者收数据代码,才继续往下执行。在 UNIX 的世界,不单单是网络通信,很多地方都是采用阻塞模式编程。这是 UNIX 的哲学:简单。 到了 WINDOWS 底下,古代的 WINDOWS -- 真正大众用起来的版本是 Windows3.1,那时候没有多线程,是所谓抢先式多任务,谁抢到 CPU 谁就可以占住不放。一个网络访问阻塞了代码不继续往下走,界面就会冻结。WINDOWS 于是搞了一个【异步】编程模式,你执行一行【读TCP】的代码,没读到数据程序就继续往下走了,也就是【非阻塞】模式。然后你要写一个回调函数,注册给 WINDOWS,WINDOWS 底层收到数据后,调你的回调函数,把收到的数据丢给你。这种异步编程模式,代码比较麻烦,不易读,不友好。 所以,Indy 的作者,把 WINDOWS 程序员习惯的非阻塞异步编程的 SOCKET 代码,封装为阻塞式同步编程。这样,使用 INDY 的时候,对网络的操作就很简单了。 明白了这些革命道理,你就会知道: S := IdHttp1.Get(URL); 这句话的意义:GET 一个网页,GET 完成就返回,没 GET 完成,比如网络慢,它就阻塞在那里。因此,它的意义就是:做一件事,做完才返回。 这样一来,代码好简单: S := IdHTTP1.Get(URL); Memo1.Lines.Add(S); 两句话搞定。 如果是异步编程,你得另外写一个函数,在那个函数里面写 Memo1.Lines.Add();还要把那个函数注册到有关地方去。 OK,在一个 HTTP 保持连接,服务器端不停地往客户端发送数据的场景下,上面的阻塞编程,就永远阻塞了,只要这个 HTTP 的 TCP 连接不断开,估计 IndHTTP.Get 就不会返回。因为它这个 GET 一直做不完! 这个时候,你想要自己不停地从里面读数据,咋办?上面有人说了,你用它的 OnWork 事件。然后我看到有人不理解如何用 OnWork 事件。这就涉及到我老人家常说的,你写的代码,你一定要知道是哪个线程在执行它。 比如我写了一行代码: IdHttp1.Get(URL); 这行代码是哪个线程在执行它?这得看这行代码所在地函数,是被哪个线程去调用啦。假设是按钮的事件去调用它,那就是主线程在执行它。假设是其它某个线程在调用它....这个 OnWork 事件,就是调用 Get 方法的线程引发出来的。因此你在 OnWork 事件里面写代码,就是被那个线程去调用的。 因此,假设你在 OnWork 里面写了一行代码:Label1.Caption := AWorkCount.ToString; 显示读到多少数据量,然后呢,你的 Get 方法是主线程调用的,没问题。如果你的 GET 方法是一个其它线程调用的,那么,你的【其它】线程就去操作了 Label1.Caption 这个界面元素,可能会成功,但可能就会卡死。 根据 Delphi 的编程要求,非主线程里面要去操作界面元素,必须要做线程同步。这里的所谓线程同步,其实是同步到主线程,让主线程去做。具体如何做,也就一句话:TThreading.Syncxxxxxxx。 扯了这么多,话说回来,楼主的这种应用场景,直接用 TIdTCPClient 更直接。 我老人家试过用 TIdTCPClient 去连接 WEB SERVER,自己构造 HTTP 命令,其实也就是一些字符串,发送给 WEB SERVER,然后再接收来自 WEB SERVER 的内容,一点问题都没有。 用 TIdTCPClient 去访问一个 HTTPS 的网站,也完全木有问题。 既然是 TCP 了,如何读数据就是你自己随意操控的了。比用 HTTP 控件,随意得多。因为,Indy 的 HTTP 控件,它的设计目标就是一次收完才返回的,不是设计来用于楼主你要的场景的。 多说几句关于阻塞的话题: 既然 Indy 的控件都是阻塞模式,那么,如果主线程去调用,然后网络又慢,就卡死在那里,把主界面都冻结,显然不好嘛。 那就把你的读写网络的业务逻辑代码写好后,用一个线程去调用它嘛。 D7 的时候,就要自己去写一个 TThread 的子类。 现在的新时代的 Delphi 嘛,就简单了,用它的【异步编程】,也就是一句话: TTask.Run(在这里放入你的阻塞时间很长的函数); 搞定。
----------------------------------------------
-
作者:
2019/4/22 6:40:03
29楼:
顶楼上,我辈楷模!
----------------------------------------------
Delphi威武!千秋万代,一统江湖!Delphi威武!千秋万代,一统江湖!Delphi威武!千秋万代,一统江湖!Delphi威武!千秋万代,一统江湖!Delphi威武!千秋万代,一统江湖!Delphi威武!千秋万代,一统江湖!Delphi威武!千秋万代,一统江湖!我去WC吐一会儿去!
作者:
2019/4/22 16:19:03
30楼:
解决楼主的需求,从技术角度看可以有很多种实现方案,只能说楼主思路还不够开阔没有多个方案中优选最合适的解决方案。
----------------------------------------------
-
作者:
2019/4/22 16:21:05
31楼:
如果开始方向选择错了,那结果会为了这个错误各种买单,反而不如另辟蹊径从根本上解决问题。
----------------------------------------------
-
作者:
2019/4/22 21:19:32
32楼:
在大家的帮助下,经过一天的努力,最后的解决方案是 通过socket,发送http报文,模拟http 用户认证通过Cookie报文, 后面数据流就全部通过socket来传输,死循环收数据,每次收到数据处理,用临界区进行保护, 目前运行五个小时了很稳定,无一数据包错误,应已达到我的需求。 感谢各位网友的热心回复,尤其是28楼,应是花了很长时间回复,也传播了很多知识,您们的方案应该也可行,时间关系没去一一测试。本人主要做数据的处理、检测、后期成像、产品产出等,岁月也不饶人,对新控件也不想再去深入研究了,抓住核心业务的智能化、信息化亦能立足。 运行本个月后再来聊聊是否稳定。
----------------------------------------------
-
作者:
gmxyb (gmxyb)
★☆☆☆☆
-
普通会员
2019/4/23 13:43:54
33楼:
楼主的这个场景,其实是一个“畸形的”http请求 无限长的响应流,中间需要自己去切割和处理成数据包,可以看做是一种 简化的websocket 协议 不是说用idhttp不能处理,而是不太适合。
----------------------------------------------
-
作者:
2019/4/24 15:10:20
34楼:
感觉楼主这个应用像websocket呀
----------------------------------------------
-
作者:
2019/5/11 13:38:10
35楼:
学习了。 我写的: delphi10.3fmx架构实现的一维码,二维码2合一扫描程序源码,年终大促销。源码可以在Windows,Mac,iOS和安卓系统编译运行。所有代码,包括识别算法都是delphi代码实现的。可以直接用摄像头扫描识别各种常见一二维码,也可以选择一张图片来识别。与微信不一样,微信没有网络是识别不了的,我们这个识别时不需要服务端程序支持,直接本程序识别即可。非常实用,几乎所有APP都需要集成该功能。扫描界面可以任意定制二次开发,随时可以变成您自己风格的专业扫描模块。 与其他实现方式不同,这个不需要借助于安卓和iOS自己的任何库和代码,直接delphi代码实现。使用时不需要针对各种平台部署一堆库文件,非常方便,直接编译调用即可。 识别算法Google官方提供,稳定有保证。 https://item.taobao.com/item.htm?id=544459109539 直接拍下留下QQ号即可。
----------------------------------------------
Delphi 的移动程序开发,是您不可再错失的机遇:http://blog.163.com/you888@188/blog/static/6723961920169319529515/
作者:
2019/5/12 2:44:09
36楼:
28楼正解。 你的应用就好像HTTP协议传送 MJPEG 视频流一样,永远不会停歇。 建议你使用RTC的HttpClient或TcpClient,再配合一个环形缓冲区,那个效率真的太爽啦。 1920x1080 1280x720等JPG 视频流数据,一点问题都没有。视频不卡顿起码每秒30帧哦。
----------------------------------------------
-
作者:
2019/5/12 11:13:35
37楼:
顶 28 楼。
----------------------------------------------
(C)(P)Flying Wang
作者:
2019/5/13 21:26:53
38楼:
给28楼回贴点赞!
----------------------------------------------
学无止境
作者:
2019/5/15 9:13:51
39楼:
好贴,俺顶。。。
----------------------------------------------
kittyapp