amqp/init.lua
2025-12-06 13:20:29 +04:00

1061 lines
26 KiB
Lua
Executable File

--
-- Copyright (C) 2016 Meng Zhang @ Yottaa,Inc
-- Copyright (C) 2018 4mig4
-- Copyright (C) 2019 gsdenys
--
local c = require ('amqp.consts')
local frame = require ('amqp.frame')
local logger = require ('amqp.logger')
local bit = require('bit')
local band = bit.band
local bor = bit.bor
local lshift = bit.lshift
local rshift = bit.rshift
local format = string.format
local gmatch = string.gmatch
local min = math.min
local socket
local tcp
local use_cqueues, lfs = pcall(require,"cqueues")
local amqp = {}
use_cqueues = false
-- let ngx.socket take precedence to lua socket
--[[
if _G.ngx and _G.ngx.socket then
logger.dbg("[socket] Unsing ngx socket.")
socket = _G.ngx.socket
tcp = socket.tcp
else
--]]
if use_cqueues == true then
logger.dbg("[socket] Unsing cqueues socket.")
socket = require('cqueues.socket')
tcp = socket
else
logger.dbg("[socket] Unsing lua socket.")
socket = require("socket")
tcp = socket.tcp
end
if use_cqueues == true then
function amqp:send(str)
if self.sock == nil then
return false, 'sock is nil'
else
local ok, err = pcall(self.sock.xwrite, self.sock, str, 'bnf')
if not ok then
if err then
return nil, err
else
return nil, 'self.sock.xwrite failed'
end
else
return err
end
end
end
function amqp:receive(int)
if self.sock == nil then
return false, 'sock is nil'
else
local ok, err = pcall(self.sock.xread, self.sock, int)
if not ok then
if err then
return nil, err
else
return nil, 'self.sock.xread failed'
end
else
return err
end
end
end
else
function amqp:send(str) return self.sock:send(str) end
function amqp:receive(int) return self.sock:receive(int) end
end
-- getopt(key, table, table, ..., value)
-- return the key's value from the first table that has it, or VALUE if none do
local function _getopt(k,t,...)
if select('#',...)==0 then
return t
elseif t[k]~=nil then
return t[k]
else
return _getopt(k,...)
end
end
-- to check whether we have valid parameters to setup
local function mandatory_options(opts)
if not opts then
error("no opts provided")
end
if type(opts) ~= "table" then
error("opts is not valid")
end
if (opts.role == nil or opts.role == "consumer") and not opts.queue then
error("as a consumer, queue is required")
end
end
--
-- initialize the context
--
function amqp:new(opts)
local sock, err
local mt = { __index = self }
mandatory_options(opts)
if use_cqueues == true then
sock = tcp
else
sock, err = tcp()
end
if not sock then
return nil, err
end
local ctx = {
sock = sock,
ssl_ctx = opts.ssl_ctx or nil,
opts = opts,
connection_state = c.state.CLOSED,
channel_state = c.state.CLOSED,
major = c.PROTOCOL_VERSION_MAJOR,
minor = c.PROTOCOL_VERSION_MINOR,
revision = c.PROTOCOL_VERSION_REVISION,
frame_max = c.DEFAULT_FRAME_SIZE,
channel_max = c.DEFAULT_MAX_CHANNELS,
mechanism = c.MECHANISM_PLAIN
}
setmetatable(ctx,mt)
return ctx
end
local function sslhandshake(ctx)
local sock = ctx.sock
local ssl_ctx = ctx.ssl_ctx
local session
local err
local errno
if _G.ngx then
session, err = sock:sslhandshake()
if not session then
logger.error("[amqp:sslhandshake] SSL handshake failed: ", err)
end
return session, err -- return
elseif use_cqueues == true then
if ssl_ctx then
session, err, errno = sock:starttls(ssl_ctx)
else
session, err, errno = sock:starttls()
end
if not session then
logger.error("[amqp:sslhandshake] SSL handshake failed: ", err)
end
return session, err -- return
end
-- if none of the above then continue down
local ssl = require("ssl") -- require library only for socket
local default_params = {
mode = "client",
protocol = "any",
verify = "none",
options = {"all", "no_sslv2","no_sslv3"}
}
local wsock, err = ssl.wrap(sock, ssl_ctx or default_params)
if not wsock then
logger.error("[amqp:sslhandshake] Socket wrapping failed: ", err)
return wsock, err -- return
else
logger.dbg("[amqp:sslhandshake] Wrapped socket")
ctx.sock = wsock
end
local ok, msg = ctx.sock:dohandshake()
if not ok then
logger.error("[amqp:sslhandshake] SSL handshake failed: ", msg)
else
logger.dbg("[amqp:sslhandshake] SSL handshake")
end
return ok, msg -- return
end
-- connect to the AMQP server (broker)
--
function amqp:connect(...)
local sock = self.sock
if not sock then
return nil, "not initialized"
end
self._subscribed = false
if use_cqueues == true then
local s, err = sock.connect(...)
if not s then
logger.error("[amqp:connect] failed: ", err)
return nil, err
else
self.sock = s
self.sock:settimeout(self.opts.connect_timeout or 5000) -- configurable but 5 seconds timeout
end
else
sock:settimeout(self.opts.connect_timeout or 5000) -- configurable but 5 seconds timeout
local ok, err = sock:connect(...)
if not ok then
logger.error("[amqp:connect] failed: ", err)
return nil, err
end
end
if self.opts.ssl then
return sslhandshake(self)
end
return true
end
-- to close the socket
--
function amqp:close()
local sock = self.sock
if not sock then
return nil, "not initialized"
end
return sock:close()
end
local function platform()
local has_jit, jit = pcall(require, "jit")
if has_jit then
return jit.os .. "_" .. jit.arch
end
return "posix"
end
-- connection and channel
--
function amqp:connection_start_ok()
local user = self.opts.user or "guest"
local password = self.opts.password or "guest"
local f = frame.new_method_frame(c.DEFAULT_CHANNEL,
c.class.CONNECTION,
c.method.connection.START_OK)
f.method = {
properties = {
product = c.PRODUCT,
version = c.VERSION,
platform = platform(),
copyright = c.COPYRIGHT,
capabilities = {
authentication_failure_close = true
}
},
mechanism = self.mechanism,
response = format("\0%s\0%s", user, password),
locale = c.LOCALE
}
return frame.wire_method_frame(self, f)
end
function amqp:connection_tune_ok()
local f = frame.new_method_frame(c.DEFAULT_CHANNEL,
c.class.CONNECTION,
c.method.connection.TUNE_OK)
f.method = {
no_wait = true,
channel_max = self.channel_max or c.DEFAULT_MAX_CHANNELS,
frame_max = self.frame_max or c.DEFAULT_FRAME_SIZE,
heartbeat = self.opts.heartbeat or c.DEFAULT_HEARTBEAT
}
return frame.wire_method_frame(self,f)
end
function amqp:connection_open()
local f = frame.new_method_frame(c.DEFAULT_CHANNEL,
c.class.CONNECTION,
c.method.connection.OPEN)
f.method = {
virtual_host = self.opts.virtual_host or "/"
}
return frame.wire_method_frame(self,f)
end
local function sanitize_close_reason(ctx, reason)
reason = reason or {}
local ongoing = ctx.ongoing or {}
return {
reply_code = reason.reply_code or c.err.CONNECTION_FORCED,
reply_text = reason.reply_text or "",
class_id = ongoing.class_id or 0,
method_id = ongoing.method_id or 0
}
end
function amqp:connection_close(reason)
local f = frame.new_method_frame(c.DEFAULT_CHANNEL,
c.class.CONNECTION,
c.method.connection.CLOSE)
f.method = sanitize_close_reason(self, reason)
return frame.wire_method_frame(self,f)
end
function amqp:connection_close_ok()
local f = frame.new_method_frame(self.channel or 1,
c.class.CONNECTION,
c.method.connection.CLOSE_OK)
return frame.wire_method_frame(self,f)
end
--- method to open an AMQP channel
-- @param none implied self (context)
-- @return the channel number
function amqp:channel_open()
local f = frame.new_method_frame(self.opts.channel or 1,
c.class.CHANNEL,
c.method.channel.OPEN)
f.method = { no_wait = false }
logger.dbg("[channel_open] wired a frame", "[class_id]: ", f.class_id, "[method_id]: ", f.method_id)
local res, err = frame.wire_method_frame(self,f)
if res then
logger.dbg("[channel_open] channel: ", res.channel)
self.channel = res.channel
end
return res, err
end
function amqp:channel_close(reason)
local f = frame.new_method_frame(self.channel or c.DEFAULT_CHANNEL, c.class.CHANNEL, c.method.channel.CLOSE)
f.method = sanitize_close_reason(self, reason)
return frame.wire_method_frame(self,f)
end
function amqp:channel_close_ok()
local f = frame.new_method_frame(self.channel or 1, c.class.CHANNEL, c.method.channel.CLOSE_OK)
return frame.wire_method_frame(self,f)
end
--- check if protocol version matches
-- @param self (context)
-- @param major major version of the protocol
-- @param minor minor version of the protocol
-- @return true or false
local function is_version_acceptable(self,major,minor)
return self.major == major and self.minor == minor
end
local function is_mechanism_acceptable(self,method)
local mechanism = method.mechanism
if not mechanism then
return nil, "broker does not support any mechanism"
end
for me in gmatch(mechanism, "%S+") do
if me == self.mechanism then
return true
end
end
return nil, "mechanism does not match"
end
function amqp:verify_capabilities(method)
if not is_version_acceptable(self, method.major, method.minor) then
return nil, "protocol version does not match"
end
if not is_mechanism_acceptable(self,method) then
return nil, "mechanism does not match"
end
return true
end
local function negotiate_connection_tune_params(ctx,method)
if not method then
return
end
if method.channel_max ~= nil and method.channel_max ~= 0 then
-- 0 means no limit
ctx.channel_max = min(ctx.channel_max, method.channel_max)
end
if method.frame_max ~= nil and method.frame_max ~= 0 then
ctx.frame_max = min(ctx.frame_max, method.frame_max)
end
end
local function set_state(ctx, channel_state, connection_state)
ctx.channel_state = channel_state
ctx.connection_state = connection_state
end
--- setup connection to AMQP server
-- @param implied self (context)
-- @return true or nil
-- @error TBD
function amqp:setup()
local res
local err
local sock = self.sock
if not sock then
return nil, "not initialized"
end
-- configurable but 30 seconds read timeout
sock:settimeout(self.opts.read_timeout or 30000)
res, err = frame.wire_protocol_header(self)
if not res then
logger.error("[amqp:setup] wire_protocol_header failed: ", err)
return nil, err
end
if res.method then
logger.dbg("[amqp:setup] connection_start: ",res.method)
res, err = amqp.verify_capabilities(self, res.method)
if not res then
-- in order to close the socket without sending futher data
set_state(self,c.state.CLOSED, c.state.CLOSED)
return nil, err
end
end
res, err = amqp.connection_start_ok(self)
if not res then
logger.error("[amqp:setup] connection_start_ok failed: ", err)
return nil, err
end
negotiate_connection_tune_params(self,res.method)
res, err = amqp.connection_tune_ok(self)
if not res then
logger.error("[amqp:setup] connection_tune_ok failed: ", err)
return nil, err
end
res, err = amqp.connection_open(self)
if not res then
logger.error("[amqp:setup] connection_open failed: ", err)
return nil, err
end
self.connection_state = c.state.ESTABLISHED
res, err = amqp.channel_open(self)
if not res then
logger.error("[amqp:setup] channel_open failed: ", err)
return nil, err
end
self.channel_state = c.state.ESTABLISHED
return true
end
--
-- close channel and connection if needed.
--
function amqp:teardown(reason)
local ok
local err
if self.channel_state == c.state.ESTABLISHED then
ok, err = amqp.channel_close(self, reason)
if not ok then
logger.error("[channel_close] err: ", err)
end
elseif self.channel_state == c.state.CLOSE_WAIT then
ok, err = amqp.channel_close_ok(self)
if not ok then
logger.error("[channel_close_ok] err: ", err)
end
end
if self.connection_state == c.state.ESTABLISHED then
ok, err = amqp.connection_close(self, reason)
if not ok then
logger.error("[connection_close] err: ", err)
end
elseif self.connection_state == c.state.CLOSE_WAIT then
ok, err = amqp:connection_close_ok()
if not ok then
logger.error("[connection_close_ok] err: ", err)
end
end
end
--
-- initialize the consumer
--
function amqp:prepare_to_consume()
local res
local err
if self.channel_state ~= c.state.ESTABLISHED then
return nil, "[prepare_to_consume] channel is not open"
end
res, err = amqp.queue_declare(self)
if not res then
logger.error("[prepare_to_consume] queue_declare failed: ", err)
return nil, err
end
if self.opts.exchange ~= '' then
res, err = amqp.queue_bind(self)
if not res then
logger.error("[prepare_to_consume] queue_bind failed: ", err)
return nil, err
end
end
res, err = amqp.basic_consume(self)
if not res then
logger.error("[prepare_to_consume] basic_consume failed: ", err)
return nil, err
end
return true
end
--
-- conclude a heartbeat timeout
-- if and only if we see ctx.threshold or more failure heartbeats in the recent heartbeats ctx.window
--
local function timedout(ctx, timeouts)
local window = ctx.window or 5
local threshold = ctx.threshold or 4
local counter = 0
for i = 1, window do
if band(rshift(timeouts,i-1),1) ~= 0 then
counter = counter + 1
end
end
return counter >= threshold
end
function amqp:timedout(timeouts)
return timedout(self, timeouts)
end
local function exiting()
return _G.ngx and _G.ngx.worker and _G.ngx.worker.exiting()
end
--
-- consumer
--
function amqp:basic_ack(ok, delivery_tag)
local method = c.method.basic.REJECT
if self.opts.requeue then method = c.method.basic.NOACK end
local f = frame.new_method_frame(self.channel or 1,
c.class.BASIC,
ok and c.method.basic.ACK or method)
f.method = {
delivery_tag = delivery_tag,
requeue = self.opts.requeue,
multiple = self.opts.multiple,
no_wait = true
}
return frame.wire_method_frame(self,f)
end
function amqp:consume_loop(callback)
local ok
local err
local err0
local f
local hb = { last = os.time(), timeouts = 0 }
local f_deliver
local f_header
local status
local flag = true
local breaker = false
while true do
f, err0 = frame.consume_frame(self)
while not f do
if exiting() then
err = "exiting"
breaker = true
break
end
-- in order to send the heartbeat,
-- the very read op need be awaken up periodically, so the timeout is expected.
if err0 ~= "timeout" then
logger.error("[amqp:consume_loop]", err0 or "?")
end
if err0 == "closed" then
err = err0
set_state(self, c.state.CLOSED, c.state.CLOSED)
logger.error("[amqp:consume_loop] socket closed")
breaker = true
break
end
if err0 == "wantread" then
err = err0
set_state(self, c.state.CLOSED, c.state.CLOSED)
logger.error("[amqp:consume_loop] SSL socket needs to dohandshake again")
breaker = true
break
end
-- intented timeout?
local now = os.time()
if now - hb.last > c.DEFAULT_HEARTBEAT then
logger.dbg("[amqp:consume_loop] timeouts inc. [ts]: ",now)
hb.timeouts = bor(lshift(hb.timeouts,1),1)
hb.last = now
ok, err0 = frame.wire_heartbeat(self)
if not ok then
logger.error("[heartbeat]","pong error: ", err0 or "?", "[ts]: ", hb.last)
else
logger.dbg("[heartbeat]","pong sent. [ts]: ",hb.last)
end
end
if timedout(self,hb.timeouts) then
err = "heartbeat timeout"
logger.error("[amqp:consume_loop] timedout. [ts]: ", now)
breaker = true
break
end --if end
logger.dbg("[amqp:consume_loop] continue consuming ", err0)
f, err0 = frame.consume_frame(self)
end
if breaker then break end
if f.type == c.frame.METHOD_FRAME then
if f.class_id == c.class.CHANNEL then
if f.method_id == c.method.channel.CLOSE then
set_state(self, c.state.CLOSE_WAIT, self.connection_state)
logger.dbg("[channel close method]", f.method.reply_code, f.method.reply_text)
break
end
elseif f.class_id == c.class.CONNECTION then
if f.method_id == c.method.connection.CLOSE then
set_state(self, c.state.CLOSED, c.state.CLOSE_WAIT)
logger.dbg("[connection close method]", f.method.reply_code, f.method.reply_text)
break
end
elseif f.class_id == c.class.BASIC then
if f.method_id == c.method.basic.DELIVER then
f_deliver = f.method
if f.method ~= nil then
logger.dbg("[basic_deliver] ", f.method)
end
end
end
elseif f.type == c.frame.HEADER_FRAME then
f_header = f
logger.dbg(format("[header] class_id: %d weight: %d, body_size: %d",
f.class_id, f.weight, f.body_size))
logger.dbg("[frame.properties]",f.properties)
elseif f.type == c.frame.BODY_FRAME then
status = true
if callback then
status, err0 = pcall(callback, {
body = f.body,
frame = f_deliver,
properties = f_header.properties
})
if not status then
logger.error("calling callback failed: ", err0)
end
end
if not self.opts.no_ack then
-- ack
self:basic_ack(status, f_deliver.delivery_tag)
end
f_deliver, f_header = nil, nil
logger.dbg("[body]", f.body)
elseif f.type == c.frame.HEARTBEAT_FRAME then
hb.last = os.time()
logger.dbg("[heartbeat]","ping received. [ts]: ", hb.last)
hb.timeouts = band(lshift(hb.timeouts,1),0)
ok, err0 = frame.wire_heartbeat(self)
if not ok then
logger.error("[heartbeat]","pong error: ", err0 or "?", "[ts]: ", hb.last)
else
logger.dbg("[heartbeat]","pong sent. [ts]: ", hb.last)
end
end
end
self:teardown()
-- return not err or err ~= "exiting", err
return nil, err or err0
end
function amqp:consume()
local ok
local err
ok, err = self:setup()
if not ok then
self:teardown()
return nil, err
end
ok, err = self:prepare_to_consume()
if not ok then
self:teardown()
return nil, err
end
return self:consume_loop(self.opts.callback)
end
--
-- get
--
function amqp:get()
local ok, res, err
ok, err = self:setup()
if not ok then
self:teardown()
return nil, err
end
if self.channel_state ~= c.state.ESTABLISHED then
self:teardown()
return nil, err
end
res, err = amqp.basic_get(self)
if not res then
logger.error("[amqp:get] basic_get failed: " .. err)
return nil, err
end
local message
if res.method_id == c.method.basic.GET_OK and res.method.message_count > 0 then
local f = frame.consume_frame(self)
if f.type == c.frame.HEADER_FRAME then
logger.dbg(format("[header] class_id: %d weight: %d, body_size: %d",
f.class_id, f.weight, f.body_size))
logger.dbg("[frame.properties]",f.properties)
end
f = frame.consume_frame(self)
if f.type == c.frame.BODY_FRAME then
message = f.body
end
logger.dbg("[message]", message)
end
self:teardown()
return message, err
end
--
-- publisher
--
function amqp:publish(payload,opts,properties)
local ok
local err
local size = #payload
ok, err = amqp.basic_publish(self, opts)
if not ok then
logger.error("[amqp.publish] failed: ", err)
return nil, err
end
ok, err = frame.wire_header_frame(self,size,properties)
if not ok then
logger.error("[amqp.publish] failed: ", err)
return nil, err
end
ok, err = frame.wire_body_frame(self,payload)
if not ok then
logger.error("[amqp.publish] failed: ", err)
return nil, err
end
return true
end
--
-- queue
--
function amqp:queue_declare(opts)
opts = opts or {}
if not opts.queue and not self.opts.queue then
return nil, "[queue_declare] queue is not specified"
end
local f = frame.new_method_frame(self.channel or 1,
c.class.QUEUE,
c.method.queue.DECLARE)
f.method = {
queue = opts.queue or self.opts.queue,
passive = _getopt('passive', opts, self.opts, false),
durable = _getopt('durable', opts, self.opts, false),
exclusive = _getopt('exclusive', opts, self.opts, false),
auto_delete = _getopt('auto_delete', opts, self.opts, true),
no_wait = _getopt('no_wait', opts, self.opts, false),
arguments = _getopt('arguments', opts, self.opts, {})
}
return frame.wire_method_frame(self,f)
end
function amqp:queue_bind(opts)
opts = opts or {}
if not opts.queue and not self.opts.queue then
return nil, "[queue_bind] queue is not specified"
end
local f = frame.new_method_frame(self.channel or 1,
c.class.QUEUE,
c.method.queue.BIND)
f.method = {
queue = opts.queue or self.opts.queue,
exchange = opts.exchange or self.opts.exchange,
routing_key = _getopt('routing_key', opts, self.opts, ""),
no_wait = _getopt('no_wait', opts, self.opts, false)
}
return frame.wire_method_frame(self,f)
end
function amqp:queue_unbind(opts)
opts = opts or {}
if not opts.queue and not self.opts.queue then
return nil, "[queue_unbind] queue is not specified"
end
local f = frame.new_method_frame(self.channel or 1,
c.class.QUEUE,
c.method.queue.UNBIND)
f.method = {
queue = opts.queue or self.opts.queue,
exchange = opts.exchange or self.opts.exchange,
routing_key = _getopt('routing_key', opts, self.opts, ""),
no_wait = _getopt('no_wait', opts, self.opts, false)
}
return frame.wire_method_frame(self,f)
end
function amqp:queue_delete(opts)
opts = opts or {}
if not opts.queue and not self.opts.queue then
return nil, "[queue_delete] queue is not specified"
end
local f = frame.new_method_frame(self.channel or 1,
c.class.QUEUE,
c.method.queue.DELETE)
f.method = {
queue = opts.queue or self.opts.queue,
if_unused = _getopt('if_unused', opts, self.opts, false),
if_empty = _getopt('if_empty', opts, self.opts, false),
no_wait = _getopt('no_wait', opts, self.opts, false)
}
return frame.wire_method_frame(self,f)
end
--
-- exchange
--
function amqp:exchange_declare(opts)
opts = opts or {}
local f = frame.new_method_frame(self.channel or 1,
c.class.EXCHANGE,
c.method.exchange.DECLARE)
f.method = {
exchange = opts.exchange or self.opts.exchange,
typ = opts.typ or "topic",
passive = _getopt('passive', opts, self.opts, false),
durable = _getopt('durable', opts, self.opts, false),
auto_delete = _getopt('auto_delete', opts, self.opts, false),
internal = _getopt('internal', opts, self.opts, false),
no_wait = _getopt('no_wait', opts, self.opts, false)
}
return frame.wire_method_frame(self,f)
end
function amqp:exchange_bind(opts)
if not opts then
return nil, "[exchange_bind] opts is required"
end
if not opts.source then
return nil, "[exchange_bind] source is required"
end
if not opts.destination then
return nil, "[exchange_bind] destination is required"
end
local f = frame.new_method_frame(self.channel or 1,
c.class.EXCHANGE,
c.method.exchange.BIND)
f.method = {
destination = opts.destination,
source = opts.source,
routing_key = _getopt('routing_key', opts, self.opts, ''),
no_wait = _getopt('no_wait', opts, self,opts, false)
}
return frame.wire_method_frame(self,f)
end
function amqp:exchange_unbind(opts)
if not opts then
return nil, "[exchange_unbind] opts is required"
end
if not opts.source then
return nil, "[exchange_unbind] source is required"
end
if not opts.destination then
return nil, "[exchange_unbind] destination is required"
end
local f = frame.new_method_frame(self.channel or 1,
c.class.EXCHANGE,
c.method.exchange.UNBIND)
f.method = {
destination = opts.destination,
source = opts.source,
routing_key = _getopt('routing_key', opts, self.opts, ""),
no_wait = _getopt('no_wait', opts, self.opts, false)
}
return frame.wire_method_frame(self,f)
end
function amqp:exchange_delete(opts)
opts = opts or {}
local f = frame.new_method_frame(self.channel or 1,
c.class.EXCHANGE,
c.method.exchange.DELETE)
f.method = {
exchange = opts.exchange or self.opts.exchange,
if_unused = _getopt('if_unused', opts, self.opts, true),
no_wait = _getopt('no_wait', opts, self.opts, false)
}
return frame.wire_method_frame(self,f)
end
--
-- basic
--
function amqp:basic_consume(opts)
opts = opts or {}
if not opts.queue and not self.opts.queue then
return nil, "[basic_consume] queue is not specified"
end
local f = frame.new_method_frame(self.channel or 1,
c.class.BASIC,
c.method.basic.CONSUME)
f.method = {
queue = opts.queue or self.opts.queue,
no_local = _getopt('no_local',opts, self.opts, false),
no_ack = _getopt('no_ack', opts, self.opts, true),
exclusive = _getopt('exclusive', opts, self.opts, false),
no_wait = _getopt('no_wait', opts, self.opts, false)
}
return frame.wire_method_frame(self,f)
end
function amqp:basic_publish(opts)
opts = opts or {}
local f = frame.new_method_frame(self.channel or 1,
c.class.BASIC,
c.method.basic.PUBLISH)
f.method = {
no_wait = true,
exchange = opts.exchange or self.opts.exchange,
routing_key = _getopt('routing_key',opts, self.opts, ""),
mandatory = _getopt('mandatory', opts, self.opts, false),
immediate = _getopt('immediate', opts, self.opts, false)
}
return frame.wire_method_frame(self,f)
end
function amqp:basic_get(opts)
opts = opts or {}
if not opts.queue and not self.opts.queue then
return nil, "[basic_get] queue is not specified."
end
local f = frame.new_method_frame(self.channel or 1,
c.class.BASIC,
c.method.basic.GET)
f.method = {
queue = opts.queue or self.opts.queue,
no_ack = _getopt('no_ack', opts, self.opts, true)
}
return frame.wire_method_frame(self,f)
end
return amqp