python

导航

RabbitMQ如何在python分布式爬虫中构建?

来源 :中华考试网 2020-11-26

  基于这种确认机制,可以在高可靠性和高数据要求情景中,避免数据抓取的遗漏和丢失。

  其设计思路应该是基于mq设计两个接口,一个用于URL的存放,一个用户URL的获取,同时基于Redis的URL去重,通过类似scrapy-redis 的调度使爬虫运行。

  主程序示例:

  import pika

  class RabbitMQBASE:

  def __new__(cls, *args, **kw):

  if not hasattr(cls, '_instance'):

  org = super(RabbitMQBASE, cls)

  cls._instance = org.__new__(cls)

  return cls._instance

  def __init__(self, use='root', pwd='111'):

  user_pwd = pika.PlainCredentials(use, pwd)

  self.s_conn = pika.BlockingConnection(

  pika.ConnectionParameters(host='1.1.1.1', heartbeat_interval=3600, credentials=user_pwd))

  def channel(self):

  return self.s_conn.channel()

  def close(self):

  """关闭连接"""

  self.s_conn.close()

  @staticmethod

  def callback(ch, method, properties, body):

  print(" [消费者] %r" % body)

  class RabbitMQ(RabbitMQBASE):

  """

  type_:交换机类型fanout、direct、topic

  exchange:交换机名字

  queue_name:队列名字,为空则随机命名

  exclusive:队列是否持久化False持久,True不持久

  key_list:消费者的交换机、队列绑定的关键词列表

  key:生产者路由的关键词

  no_ack:是否确认消息True不确定,False确定

  """

  def __init__(self, use='root', pwd='Kw7pGR4xDD1CsP*U', type_='direct', exchange='test',

  queue_name=None, exclusive=True, key_list=['test'], key='test', no_ack=True):

  RabbitMQBASE.__init__(self, use=use, pwd=pwd)

  self.type_ = type_

  self.exchange = exchange

  self.queue_name = queue_name

  self.exclusive = exclusive

  self.key = key

  self.key_list = key_list

  self.no_ack = no_ack

  def rabbit_get(self):

  """消费者"""

  channel = self.channel()

  channel.exchange_declare(exchange=self.exchange, exchange_type=self.type_)

  if self.queue_name == None:

  result = channel.queue_declare(exclusive=self.exclusive)

  self.queue_name = result.method.queue

  if self.type_ != 'fanout':

  for key in self.key_list:

  channel.queue_bind(exchange=self.exchange, # 将交换机、队列、关键字绑定

  queue=self.queue_name, routing_key=key)

  channel.basic_consume(RabbitMQBASE.callback, queue=self.queue_name, no_ack=self.no_ack)

  channel.start_consuming()

  def rabbit_put(self, message='hello word'):

  """生产者"""

  channel = self.channel()

  channel.exchange_declare(exchange=self.exchange, exchange_type=self.type_)

  if self.type_ == 'fanout':

  self.key = ""

  channel.basic_publish(exchange=self.exchange, routing_key=self.key, body=message)

  channel.close()

  小伙伴们可以尝试着运行上面的代码,虽然我们不一定能全部弄明白其中的原理,但是我们需要进行RabbitMQ基础的搭建操作。

分享到

您可能感兴趣的文章