rabbitmq安裝部署
生產(chǎn)者(producter)隊(duì)列消息的產(chǎn)生者,復(fù)制生產(chǎn)消息,并將消息傳入隊(duì)列生產(chǎn)者代碼:
(相關(guān)資料圖)
import pikaimport jsoncredentials = pika.PlainCredentials("admin","admin")#mq用戶名和密碼,用于認(rèn)證#虛擬隊(duì)列需要指定參數(shù)virtual_host,如果是默認(rèn)的可以不填connection = pika.BlockingConnection(pika.ConnectionParameters(host="10.0.0.24",port=5672,virtual_host="/",credentials=credentials))channel = connection.channel()# 創(chuàng)建一個(gè)AMQP信道#聲明隊(duì)列,并設(shè)置durable為True,為了避免rabbitMq-server掛掉數(shù)據(jù)丟失,將durable設(shè)為Truechannel.queue_declare(queue="1",durable=True)for i in range(10): # 創(chuàng)建10個(gè)q message = json.dumps({"OrderId":"1000%s"%i}) # exchange表示交換器,可以精確的指定消息應(yīng)該發(fā)到哪個(gè)隊(duì)列中,route_key設(shè)置隊(duì)列的名稱,body表示發(fā)送的內(nèi)容 channel.basic_publish(exchange="",routing_key="1",body=message) print(message)connection.close()操作前
通過pika生命一個(gè)認(rèn)證用的憑證,然后用pika創(chuàng)建rabbitmq的塊連接,再用上面的連接創(chuàng)建一個(gè)AMQP信道 。創(chuàng)建消息隊(duì)列的連接時(shí),需要指定ip,斷開,虛擬主機(jī),憑證。
然后根據(jù)上面的信道,聲明一個(gè)隊(duì)列,
我們可以看到,下面信道點(diǎn)隊(duì)列聲明里的queue參數(shù)值就隊(duì)列的名字。這里是遍歷0到9,然后打印了下消息,這里的生成的消息,是json序列化后的數(shù)據(jù)。然后將數(shù)據(jù)作為i,信道點(diǎn)基礎(chǔ)發(fā)布的body參數(shù)的值。上面信道點(diǎn)隊(duì)列聲明是創(chuàng)建一個(gè)隊(duì)列,隊(duì)列名字是’1‘,下面我們用信道點(diǎn)基本發(fā)布,是將我們創(chuàng)建的消息體發(fā)送到隊(duì)列中,路由_key就是指定隊(duì)列名稱,指定發(fā)布消息到哪個(gè)隊(duì)列,消息是作為body的參數(shù),
最后,需要將這個(gè)消息隊(duì)列的連接關(guān)閉。
我們通過頁面可以看到,已經(jīng)創(chuàng)建好了這個(gè)隊(duì)列,隊(duì)列名字為1,并且已經(jīng)通過遍歷生成的10個(gè)消息,調(diào)用十次信道點(diǎn)基礎(chǔ)發(fā)布方法,將這十個(gè)產(chǎn)生的消息發(fā)布到消息隊(duì)列中
我們可以再看下,可以看到我們創(chuàng)建的消息的具體內(nèi)容。
消費(fèi)者(consumer):隊(duì)列消息的接收者,扶著接收并處理消息隊(duì)列中的消息
import pikacredentials = pika.PlainCredentials("admin","admin")connection = pika.BlockingConnection(pika.ConnectionParameters( host="10.0.0.24", port=5672, virtual_host="/", credentials=credentials))channel = connection.channel()#聲明消息隊(duì)列,消息在這個(gè)隊(duì)列中傳遞,如果不存在,則創(chuàng)建隊(duì)列channel.queue_declare(queue="1",durable=True)# 定義一個(gè)回調(diào)函數(shù)來處理消息隊(duì)列中消息,這里是打印出來def callback(ch,method,properties,body): ch.basic_ack(delivery_tag=method.delivery_tag) print(body.decode())#告訴rabbitmq,用callback來接收消息channel.basic_consume("1",callback)#開始接收信息,并進(jìn)入阻塞狀態(tài),隊(duì)列里有信息才會調(diào)用callback進(jìn)行處理channel.start_consuming()
獲取消息,創(chuàng)建憑證,連接,信道,然后什么一下隊(duì)列。指定我們要獲取哪個(gè)隊(duì)列中的消息,如果沒有這個(gè)隊(duì)列,就會創(chuàng)建這個(gè)隊(duì)列,存在,那么后面使用這個(gè)信道,就會從這個(gè)隊(duì)列中獲取數(shù)據(jù)。信道是通過rabbitmq的連接對象來生成的,連接對象中放了連接用的憑證。所以,信道點(diǎn)基礎(chǔ)消費(fèi)方法,指定是哪個(gè)消息隊(duì)列,那么就會從這個(gè)隊(duì)列中獲取消息。然后傳參回調(diào)函數(shù)。而回調(diào)函數(shù)中,
我們可以看到,基礎(chǔ)消費(fèi)方法里面有消息回調(diào),就是上面我們自定義的回調(diào)函數(shù)
這個(gè)方法定義了回調(diào)函數(shù)的寫法。第一個(gè)參數(shù)是信道
第二個(gè)參數(shù)是方法,第三個(gè)參數(shù)是屬性,第四個(gè)是body,這些不用管,只需要按如下格式,就可以從body,做個(gè)解碼,就將信道點(diǎn)基礎(chǔ)消費(fèi)中指定的隊(duì)列中的消息,取出來了,我們是用回調(diào)函數(shù)來接收消息,當(dāng)需要獲取消息的時(shí)候,就需要執(zhí)行信道點(diǎn)開始消費(fèi)的方法。這里好像是遍歷隊(duì)列一個(gè)一個(gè)的將消息獲取出來。那么怎樣實(shí)現(xiàn),實(shí)時(shí)監(jiān)聽消息,實(shí)時(shí)消費(fèi)呢
RabbitMq持久化MQ默認(rèn)建立的臨時(shí)的queue和exchange,如果不聲明持久化,一旦rabbitmq掛掉,queue,exchange將會全部丟失,所以我們一般在創(chuàng)建queue或者exchange的時(shí)候會聲明持久化
# 聲明消息隊(duì)列,消息將在這個(gè)隊(duì)列傳遞,如不存在,則創(chuàng)建。durable = True 代表消息隊(duì)列持久化存儲,F(xiàn)alse 非持久化存儲result = channel.queue_declare(queue = "python-test",durable = True)
使用True
重啟消息隊(duì)列服務(wù)
消息隊(duì)列還在,但是消息被清空了
當(dāng)我改為false的時(shí)候,因?yàn)殛?duì)列1已經(jīng)存在,并且是Tue聲明的,所以這里就報(bào)錯(cuò)了
我們設(shè)置為false,然后聲明一個(gè)不存在的隊(duì)列2
創(chuàng)建好了隊(duì)列,并且10個(gè)消息
重啟一下消息隊(duì)列服務(wù)
剛剛上面創(chuàng)建的隊(duì)列2已經(jīng)不存在,這已經(jīng)不是消息被清空了,而是隊(duì)列直接被清除了
也就是這個(gè)Ture,是保留隊(duì)列用的,持久化隊(duì)列的。
channel.queue_declare(queue="2",durable=True)
# 聲明exchange,由exchange指定消息在哪個(gè)隊(duì)列傳遞,如不存在,則創(chuàng)建.durable = True 代表exchange持久化存儲,F(xiàn)alse 非持久化存儲channel.exchange_declare(exchange = "python-test", durable = True)
注意:如果已存在一個(gè)非持久化的queue或exchange,執(zhí)行上述代碼會報(bào)錯(cuò),因?yàn)楫?dāng)前狀態(tài)不能更該queue 或 exchange存儲屬性,需要刪除重建,如果queue和exchange中一個(gè)聲明了持久化,另一個(gè)沒有聲明持久化,則不允許綁定
我們在1處改了,但是在2處沒有修改。結(jié)果有問題。
隊(duì)列2不存在,所以沒有將消息放進(jìn)去
而exchange這里,沒有寫將消息推送到聲明的python-test里面,所以里面也沒有消息
這次是聲明的exchange,并且將消息推送到python-test里面
還是沒有看到有東西呀
我們這里發(fā)布個(gè)消息,可以看到,是需要路由的
加上路由,再次執(zhí)行程序
由于隊(duì)列2 不存在,好像還是不行
我在這里給它bind一個(gè)路由
感覺還是沒有弄明白,先放棄了
原來是如下方式呀。
首先,在python-test2里面,
給exchange綁定隊(duì)列1和2
1和2目前的消息數(shù)量
我往路由1里面push一個(gè)消息
push成功
然后再看隊(duì)列1里面,可以看到多了一條剛剛push的消息
接下來用程序?qū)崿F(xiàn),聲明exchange,然后發(fā)布方法不變,發(fā)布到exchage中,因?yàn)橐呀?jīng)綁定了兩個(gè)路由了,這里指定路由key,根據(jù)路由key,可以將消息push到對應(yīng)的隊(duì)列中去
我們可以看到,之前是頁面點(diǎn)擊push了一條,上面程序push了十條到exchange,現(xiàn)在這個(gè)隊(duì)列就有11條數(shù)據(jù)??墒沁@個(gè)exchange和隊(duì)列的綁定,是我自己在頁面上綁定的,這個(gè)應(yīng)該不合理。以后有時(shí)間看下,怎么用程序綁定。
我們可以看到,應(yīng)該是程序中缺少使用這個(gè)綁定方法吧
雖然exchange和queue都聲明了持久化,但如果消息只存在內(nèi)存里,rabbitmq重啟后,內(nèi)存里的東西還是會丟失,所以必須聲明消息也是持久化,從內(nèi)存轉(zhuǎn)存到到硬盤
# 向隊(duì)列插入數(shù)值 routing_key是隊(duì)列名。delivery_mode = 2 聲明消息在隊(duì)列中持久化,delivery_mod = 1 消息非持久化channel.basic_publish(exchange = "",routing_key = "python-test",body = message, properties=pika.BasicProperties(delivery_mode = 2))
我們這里先重啟一下rabbitmq,把之前的寫入隊(duì)列的消息清空
不過我們看到,這里已經(jīng)有持久化存儲的消息了,之前好像是頁面點(diǎn)擊推送的消息
總共一條,持久化1條。持久化的,即使重啟服務(wù),消息也不會丟失
我們再去推送一條
可以看到剛剛推送的這條也是持久化存儲的
我們在發(fā)布的方法里面,添加屬性發(fā)布的模式是2,
剛才是2條持久化的,現(xiàn)在新增10條數(shù)據(jù),且是持久化的消息
如果改成1
可以看到,剛剛新增了10條消息,但是這10條消息沒有持久化。
消費(fèi)者(consume)調(diào)用callback函數(shù)時(shí),會存在處理消息失敗的風(fēng)險(xiǎn),如果處理失敗,則消息會丟失,但是也可以選擇消費(fèi)者處理失敗時(shí),將消息回退給rabbitmq,重新再被消費(fèi)者消費(fèi),這個(gè)時(shí)候需要設(shè)置確認(rèn)標(biāo)識。
channel.basic_consume(callback,queue = "python-test",# no_ack 設(shè)置成 False,在調(diào)用callback函數(shù)時(shí),未收到確認(rèn)標(biāo)識,消息會重回隊(duì)列。True,無論調(diào)用callback成功與否,消息都被消費(fèi)掉 no_ack = False)
目前隊(duì)列2中有10條沒有持久化的,有12條持久化的消息
執(zhí)行消費(fèi)程序
再看隊(duì)列2中,可以看到之前12條持久化和10條沒有持久化的消息數(shù)據(jù)都已經(jīng)被消費(fèi)了。我們可以看到消費(fèi)者這里,多了一個(gè)消費(fèi)者。消費(fèi)者有個(gè)tag,還有ack的確認(rèn)。在詳情那里,也可以看到 消費(fèi)者數(shù)量是1
我們push了一條消息,但是沒有發(fā)現(xiàn)推送到隊(duì)列中,難道是因?yàn)殛?duì)列綁定exchange的原因?
push的時(shí)候,有個(gè)持久化的選擇,發(fā)現(xiàn)還是沒有push進(jìn)去
在exchange這里push了,
發(fā)現(xiàn)隊(duì)列1有數(shù)據(jù),2沒有消息
往路由key這里發(fā)送多次消息
還是沒有,難道上面都是失敗的發(fā)送嘛
我們再看消費(fèi)者程序,我們看到運(yùn)行程序之后,這個(gè)程序一直沒有退出,處于監(jiān)聽狀態(tài),正如我們在隊(duì)列中看到的那樣,有個(gè)消費(fèi)者是up狀態(tài),也就是這個(gè)消費(fèi)者一直在監(jiān)聽我們上面的那個(gè)隊(duì)列,程序并沒有退出。因此,我們上面在頁面push的sss之類的消息,都被這個(gè)消費(fèi)者消費(fèi)掉了,因此沒有看到新增的消息。
我們將上面的消費(fèi)者程序停掉之后,就可以看到隊(duì)列下面已經(jīng)顯示沒有消費(fèi)者了,然后再推送消息的時(shí)候,頁面選擇持久化,
我們可以看到,推送的消息,是持久化的。由上面的學(xué)習(xí),了解到,消息是否持久化,好像是取決于生產(chǎn)者的設(shè)置,而不是說消息沒有持久化,我給它用命令持久化一下,至于是否可以用命令持久化一下,本來不需要持久化的消息,暫且不考慮。
在上一章中,我們創(chuàng)建了一個(gè)工作隊(duì)列,工作隊(duì)列模式的設(shè)想是每一條消息只會被轉(zhuǎn)發(fā)給一個(gè)消費(fèi)者。本章將會講解完全不一樣的場景: 我們會把一個(gè)消息轉(zhuǎn)發(fā)給多個(gè)消費(fèi)者,這種模式稱之為發(fā)布-訂閱模式。RabbitMq消息模式的核心思想是:一個(gè)生產(chǎn)者并不會直接往一個(gè)隊(duì)列中發(fā)送消息,事實(shí)上,生產(chǎn)者根本不知道它發(fā)送的消息將被轉(zhuǎn)發(fā)到哪些隊(duì)列。實(shí)際上,生產(chǎn)者只能把消息發(fā)送給一個(gè)exchange,exchange只做一件簡單的事情:一方面它們接收從生產(chǎn)者發(fā)送過來的消息,另一方面,它們把接收到的消息推送給隊(duì)列。一個(gè)exchage必須清楚地知道如何處理一條消息. rabbitmq的發(fā)布與訂閱要借助交換機(jī)(Exchange)的原理實(shí)現(xiàn):
Exchange 一共有三種工作模式:fanout, direct, topicd
這種模式下,傳遞到exchange的消息將會==轉(zhuǎn)發(fā)到所有于其綁定的queue上
不需要指定routing_key,即使指定了也是無效的。需要提前將exchange和queue綁定,一個(gè)exchange可以綁定多個(gè)queue,一個(gè)queue可以綁定多個(gè)exchange。需要先啟動訂閱者,此模式下的隊(duì)列是consume隨機(jī)生成的,發(fā)布者僅僅發(fā)布消息到exchange,由exchange轉(zhuǎn)消息至queue。exchange交換器首先我們創(chuàng)建一個(gè)fanout類型的交換器,我們稱之為:python-test:
channel.exchange_declare(exchange = "python-test",durable = True, exchange_type="fanout")
廣播模式交換器很簡單,從字面意思也能理解,它其實(shí)就是把接收到的消息推送給所有它知道的隊(duì)列。? 想查看當(dāng)前系統(tǒng)中有多少個(gè)exchange,可以從控制臺查看
可以看到有很多以amq.*開頭的交換器,以及(AMQP default)默認(rèn)交換器,這些是默認(rèn)創(chuàng)建的交換器。? 在前面,我們并不知道交換器的存在,但是依然可以將消息發(fā)送到隊(duì)列中,那其實(shí)并不是因?yàn)槲覀兛梢圆皇褂媒粨Q器,實(shí)際上是我們使用了默認(rèn)的交換器(我們通過指定交換器為字字符串:""),回顧一下我們之前是如何發(fā)送消息的:
channel.basic_publish(exchange="",routing_key="1",body=message)
第一個(gè)參數(shù)是交換器的名字,空字符串表示它是一個(gè)默認(rèn)或無命名的交換器,消息將會由指定的路由鍵(第二個(gè)參數(shù),routingKey,后面會講)轉(zhuǎn)發(fā)到隊(duì)列。? 你可能會有疑問:既然exchange可以指定為空字符串(""),那么可否指定為null?? ? 答案是:不能!
通過跟蹤發(fā)布消息的代碼,在AMQImpl類中的Publish()方面中,可以看到,不光是exchange不能為null,同時(shí)routingKey路由鍵也不能為null,否則會拋出異常:
在前面的例子中,我們使用的隊(duì)列都是有具體的隊(duì)列名,創(chuàng)建命名隊(duì)列是很必要的,因?yàn)槲覀冃枰獙⑾M(fèi)者指向同一名字的隊(duì)列。因此,要想在生產(chǎn)者和消費(fèi)者中間共享隊(duì)列就必須要使用命名隊(duì)列。
import pikaimport jsoncredentials = pika.PlainCredentials("admin", "admin") # mq用戶名和密碼# 虛擬隊(duì)列需要指定參數(shù) virtual_host,如果是默認(rèn)的可以不填。connection = pika.BlockingConnection(pika.ConnectionParameters(host = "10.0.0.24",port = 5672,virtual_host = "/",credentials = credentials))channel=connection.channel()# 聲明exchange,由exchange指定消息在哪個(gè)隊(duì)列傳遞,如不存在,則創(chuàng)建。durable = True 代表exchange持久化存儲,F(xiàn)alse 非持久化存儲channel.exchange_declare(exchange = "python-test",durable = True, exchange_type="fanout")for i in range(10): message=json.dumps({"OrderId":"1000%s"%i})# 向隊(duì)列插入數(shù)值 routing_key是隊(duì)列名。delivery_mode = 2 聲明消息在隊(duì)列中持久化,delivery_mod = 1 消息非持久化。routing_key 不需要配置 channel.basic_publish(exchange = "python-test",routing_key = "",body = message, properties=pika.BasicProperties(delivery_mode = 2)) print(message)connection.close()
import pikacredentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters(host = "10.0.0.24",port = 5672,virtual_host = "/",credentials = credentials))channel = connection.channel()# 創(chuàng)建臨時(shí)隊(duì)列,隊(duì)列名傳空字符,consumer關(guān)閉后,隊(duì)列自動刪除result = channel.queue_declare("4")# 聲明exchange,由exchange指定消息在哪個(gè)隊(duì)列傳遞,如不存在,則創(chuàng)建。durable = True 代表exchange持久化存儲,F(xiàn)alse 非持久化存儲channel.exchange_declare(exchange = "python-test",durable = True, exchange_type="fanout")# 綁定exchange和隊(duì)列 exchange 使我們能夠確切地指定消息應(yīng)該到哪個(gè)隊(duì)列去channel.queue_bind(exchange = "python-test",queue = "4")# 定義一個(gè)回調(diào)函數(shù)來處理消息隊(duì)列中的消息,這里是打印出來def callback(ch, method, properties, body): ch.basic_ack(delivery_tag = method.delivery_tag) print(body.decode())channel.basic_consume(result.method.queue,callback,# 設(shè)置成 False,在調(diào)用callback函數(shù)時(shí),未收到確認(rèn)標(biāo)識,消息會重回隊(duì)列。True,無論調(diào)用callback成功與否,消息都被消費(fèi)掉 auto_ack = False)channel.start_consuming()
import pikacredentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters(host = "10.0.0.24",port = 5672,virtual_host = "/",credentials = credentials))channel = connection.channel()# 創(chuàng)建臨時(shí)隊(duì)列,隊(duì)列名傳空字符,consumer關(guān)閉后,隊(duì)列自動刪除result = channel.queue_declare("2",durable=True)# 聲明exchange,由exchange指定消息在哪個(gè)隊(duì)列傳遞,如不存在,則創(chuàng)建。durable = True 代表exchange持久化存儲,F(xiàn)alse 非持久化存儲channel.exchange_declare(exchange = "python-test",durable = True, exchange_type="fanout")# 綁定exchange和隊(duì)列 exchange 使我們能夠確切地指定消息應(yīng)該到哪個(gè)隊(duì)列去channel.queue_bind(exchange = "python-test",queue = "2")# 定義一個(gè)回調(diào)函數(shù)來處理消息隊(duì)列中的消息,這里是打印出來def callback(ch, method, properties, body): ch.basic_ack(delivery_tag = method.delivery_tag) print(body.decode())channel.basic_consume(result.method.queue,callback,# 設(shè)置成 False,在調(diào)用callback函數(shù)時(shí),未收到確認(rèn)標(biāo)識,消息會重回隊(duì)列。True,無論調(diào)用callback成功與否,消息都被消費(fèi)掉 auto_ack = False)channel.start_consuming()
當(dāng)前的隊(duì)列如下
發(fā)布消息,exchange類型不對
下面這就是直連類型
進(jìn)去之后把找個(gè)已經(jīng)存在的exchange刪除了,這個(gè)暫時(shí)沒用
發(fā)布,這里也沒有指的路由key
可以看到新建的exchange類型是fanout
因?yàn)闆]有綁定隊(duì)列,所以程序推送的消息,好像是丟失了
開啟訂閱者1,聲明隊(duì)列4并綁定到前面創(chuàng)建的python-test這個(gè)exchange。
查看,隊(duì)列4已經(jīng)創(chuàng)建
有個(gè)消費(fèi)者正連接著4
并且訂閱者1聲明的隊(duì)列,也跟指定的exchange已經(jīng)綁定了,路由key,默認(rèn)就是用的隊(duì)列名稱
pika.exceptions.ChannelClosedByBroker: (406, "PRECONDITION_FAILED - inequivalent arg "durable" for queue "2" in vhost "/": received "false" but current is "true"")
開啟訂閱者2,但是報(bào)錯(cuò)了,因?yàn)殛?duì)列2已經(jīng)存在了,并且是Ture,是持久化的,而這里信道點(diǎn)隊(duì)列聲明2,是沒有指定那個(gè)參數(shù),那就是默認(rèn)是Flase,非持久化的隊(duì)列,重啟下服務(wù)這個(gè)隊(duì)列就不存在了。因此保持了。我們先將這個(gè)已經(jīng)存在的隊(duì)列刪除,然后重新聲明一下吧,或者是直接給它加個(gè)持久化的參數(shù)也行
加上之后,就能正常開啟這個(gè)訂閱者2了
我們創(chuàng)建的4,是非持久化的隊(duì)列,這里這個(gè)d的標(biāo)記,可能就是durable參數(shù),是否持久化隊(duì)列的意思吧
我們重新執(zhí)行一次發(fā)布者程序,發(fā)布者并沒有指定路由key,只是指定了exchange,而訂閱者1和2程序里面,都是有綁定這個(gè)exchange的
我們可以看到,訂閱者1獲取到了發(fā)布到這個(gè)exchage的消息
訂閱者2也獲取到了發(fā)布到這個(gè)exchage的消息
再來看下這個(gè)exchange的情況
它對應(yīng)的兩個(gè)隊(duì)列
隊(duì)列2有個(gè)消費(fèi)者
隊(duì)列4也有個(gè)消費(fèi)者,這兩個(gè)消費(fèi)者各自對應(yīng)一個(gè)隊(duì)列,每個(gè)消費(fèi)者請求過來是的端口不同,消費(fèi)者tag不同。兩個(gè)隊(duì)列中的消息,都被訂閱者程序獲取并打印在pycharm上進(jìn)行消費(fèi)了,因此,隊(duì)列中也就沒有數(shù)據(jù)了。
難道,一個(gè)隊(duì)列,就是一個(gè)訂閱者嗎?當(dāng)發(fā)布者發(fā)布消息的時(shí)候,難道是基礎(chǔ)發(fā)布方法里面,指定exchange,不指定路由key,這樣就會將生產(chǎn)者生產(chǎn)的消息,發(fā)送給所有綁定這個(gè)exchange的隊(duì)列嗎,而訂閱者和隊(duì)列一一對應(yīng),然后每個(gè)訂閱者就從自己對應(yīng)的隊(duì)列中將這個(gè)消息消費(fèi)掉嗎?
把兩個(gè)訂閱者,都停止掉,查看目前這兩個(gè)隊(duì)列,都是沒有消息的。
我執(zhí)行發(fā)布者程序,發(fā)布消息,指定exchange,不指定路由key。
我們可以看到,這種情況下,的確是將消息發(fā)布給所有綁定這個(gè)exchange的隊(duì)列了,如下,2和4隊(duì)列都綁定了,所以都接收到了十條消息。
我們發(fā)布消息的參數(shù),指定消息是持久化的,因?yàn)殛?duì)列2是個(gè)持久化的隊(duì)列,因此,進(jìn)入隊(duì)列2的消息也是持久化的
由于聲明隊(duì)列4,不是持久化的隊(duì)列,因此,即使發(fā)布消息時(shí),指定消息是持久化的,但是實(shí)際上這個(gè)消息也是沒有在這個(gè)非持久化的隊(duì)列中進(jìn)行持久化,也只是臨時(shí)的罷了。
我開啟訂閱者1
訂閱者1對應(yīng)著隊(duì)列4,隊(duì)列4的消息已經(jīng)被消費(fèi)了,已經(jīng)在上圖中打印出來了。
開啟訂閱者2
訂閱者2對應(yīng)的隊(duì)列是2,也將消息消費(fèi)掉了,并在訂閱者2程序中打印了出來
如果,隊(duì)列或者消息是臨時(shí)的,消費(fèi)者還沒消費(fèi)的消息,因?yàn)橹貑⒎?wù),那么就會丟失消息,消費(fèi)者應(yīng)該就消費(fèi)不到那個(gè)丟失的消息了。
這種工作模式的原理是消息發(fā)送至exchange,exchange根據(jù)**路由鍵(routing_key)**轉(zhuǎn)發(fā)到相對應(yīng)的queue上。
import pikaimport jsoncredentials = pika.PlainCredentials("admin", "admin") # mq用戶名和密碼# 虛擬隊(duì)列需要指定參數(shù) virtual_host,如果是默認(rèn)的可以不填。connection = pika.BlockingConnection(pika.ConnectionParameters(host = "10.0.0.24",port = 5672,virtual_host = "/",credentials = credentials))channel=connection.channel()# 聲明exchange,由exchange指定消息在哪個(gè)隊(duì)列傳遞,如不存在,則創(chuàng)建。durable = True 代表exchange持久化存儲,F(xiàn)alse 非持久化存儲channel.exchange_declare(exchange = "python-test",durable = True, exchange_type="direct")for i in range(10): message=json.dumps({"OrderId":"1000%s"%i})# 指定 routing_key。delivery_mode = 2 聲明消息在隊(duì)列中持久化,delivery_mod = 1 消息非持久化 channel.basic_publish(exchange = "python-test",routing_key = "OrderId",body = message, properties=pika.BasicProperties(delivery_mode = 2)) print(message)connection.close()
import pikacredentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters(host = "10.0.0.24",port = 5672,virtual_host = "/",credentials = credentials))channel = connection.channel()# 創(chuàng)建臨時(shí)隊(duì)列,隊(duì)列名傳空字符,consumer關(guān)閉后,隊(duì)列自動刪除result = channel.queue_declare("",exclusive=True)# 聲明exchange,由exchange指定消息在哪個(gè)隊(duì)列傳遞,如不存在,則創(chuàng)建。durable = True 代表exchange持久化存儲,F(xiàn)alse 非持久化存儲channel.exchange_declare(exchange = "python-test",durable = True, exchange_type="direct")# 綁定exchange和隊(duì)列 exchange 使我們能夠確切地指定消息應(yīng)該到哪個(gè)隊(duì)列去channel.queue_bind(exchange = "python-test",queue = result.method.queue,routing_key="OrderId")# 定義一個(gè)回調(diào)函數(shù)來處理消息隊(duì)列中的消息,這里是打印出來def callback(ch, method, properties, body): ch.basic_ack(delivery_tag = method.delivery_tag) print(body.decode())#channel.basic_qos(prefetch_count=1)# 告訴rabbitmq,用callback來接受消息channel.basic_consume(result.method.queue,callback,# 設(shè)置成 False,在調(diào)用callback函數(shù)時(shí),未收到確認(rèn)標(biāo)識,消息會重回隊(duì)列。True,無論調(diào)用callback成功與否,消息都被消費(fèi)掉 auto_ack = False)channel.start_consuming()
將之前測試用的exchanges刪除,隊(duì)列也刪除
使用direct類型的exchange,發(fā)布消息
沒有隊(duì)列生成
開啟消費(fèi)者程序,exchange聲明的類型是direct,隊(duì)列綁定exchange,指定路由key,這個(gè)路由key,并沒有這個(gè)名字的隊(duì)列
開啟上面的消費(fèi)者程序之后,就生成了一個(gè)隊(duì)列。這個(gè)生成的隊(duì)列,進(jìn)入可以看到是有消費(fèi)者在監(jiān)聽這個(gè)隊(duì)列的。這個(gè)隊(duì)列,以上面命名的路由key,來綁定了前面定義的exchange。
我們進(jìn)入這個(gè)exchange查看下,路由key,定向到某個(gè)隊(duì)列
我們看下發(fā)布消息的程序,就是exchange聲明里面,定義了direct方式,而基礎(chǔ)發(fā)布方法里面,就指定發(fā)布到上面定義的exchange,然后指定路由key為之前執(zhí)行消費(fèi)者程序時(shí),隨機(jī)生成名字的隊(duì)列,綁定exchange時(shí)使用的路由key。這樣,我們發(fā)布消息的時(shí)候,發(fā)布給exchange,就會根據(jù)路由key,然后找到對應(yīng)的隊(duì)列,將消息推送到這個(gè)隊(duì)列中。
由于我們的訂閱者,一直在監(jiān)聽,當(dāng)上面發(fā)布消息到隊(duì)列中后,訂閱者就從exchange下根據(jù)路由key,找到對應(yīng)的隊(duì)列,然后將隊(duì)列中的消息消費(fèi),打印到pycharm上,
這種模式和第二種差不多,exchange也是通過路由鍵routing_key來轉(zhuǎn)發(fā)消息到指定的queue。不同之處在于:**routing_key使用正則表達(dá)式支持模糊匹配,**但匹配規(guī)則又與常規(guī)正則表達(dá)式不同,比如"#"是匹配全部,“*”是匹配一個(gè)詞。舉例:routing_key =“#orderid#”,意思是將消息轉(zhuǎn)發(fā)至所有 routing_key 包含 “orderid” 字符的隊(duì)列中。代碼和模式二 類似,
我們用上面的代碼改 一下,再復(fù)制處兩個(gè)訂閱者,只需要修改下路由key為帶2的 帶3的數(shù)字就可以
我們再改一下
我們看頁面,可以看到又多了兩個(gè)隊(duì)列了
可以看到這個(gè)exchange對應(yīng)三個(gè)隊(duì)列,路由key都是帶有OrderId,
我們將路由key,改為匹配的方式,然后發(fā)布消息
演示失敗
參考鏈接:https://blog.csdn.net/weixin_45144837/article/details/104335115
標(biāo)簽: