--
-- Copyright (C) 2016 Meng Zhang @ Yottaa,Inc
-- Copyright (C) 2018 4mig4
-- Copyright (C) 2019 gsdenys
--

local c = require ('amqp.consts')
local frame = require ('amqp.frame')
local logger = require ('amqp.logger')

local bit = require('bit')
local band = bit.band
local bor = bit.bor
local lshift = bit.lshift
local rshift = bit.rshift

local format = string.format
local gmatch = string.gmatch
local min = math.min

local socket
local tcp
-- Only the pcall status is kept; the loaded cqueues module is not used here.
local use_cqueues = pcall(require, "cqueues")

local amqp = {}

-- cqueues support is switched off unconditionally, whatever the probe found.
use_cqueues = false

-- let ngx.socket take precedence to lua socket
-- (currently disabled: the ngx branch below is commented out)

--[[
if _G.ngx and _G.ngx.socket then
  logger.dbg("[socket] Unsing ngx socket.")
  socket = _G.ngx.socket
  tcp = socket.tcp
else
--]]
if use_cqueues == true then
  logger.dbg("[socket] Unsing cqueues socket.")
  socket = require('cqueues.socket')
  tcp = socket
else
  logger.dbg("[socket] Unsing lua socket.")
  socket = require("socket")
  tcp = socket.tcp
end

if use_cqueues == true then
  --- Write a string to the socket (cqueues flavour).
  -- The write is wrapped in pcall so a raised error becomes `nil, err`.
  -- @param str data to send
  -- @return the result of `xwrite`, or `false, err` when there is no
  --         socket, or `nil, err` when the write raised
  function amqp:send(str)
    if self.sock == nil then
      return false, 'sock is nil'
    end

    local ok, res = pcall(self.sock.xwrite, self.sock, str, 'bnf')
    if ok then
      return res
    end
    return nil, res or 'self.sock.xwrite failed'
  end

  --- Read from the socket (cqueues flavour).
  -- @param int read specification handed to `xread` unchanged
  -- @return the result of `xread`, or `false, err` when there is no
  --         socket, or `nil, err` when the read raised
  function amqp:receive(int)
    if self.sock == nil then
      return false, 'sock is nil'
    end

    local ok, res = pcall(self.sock.xread, self.sock, int)
    if ok then
      return res
    end
    return nil, res or 'self.sock.xread failed'
  end
else
  --- Write a string to the socket; thin wrapper around `sock:send`.
  -- @param str data to send
  function amqp:send(str)
    return self.sock:send(str)
  end

  --- Read from the socket; thin wrapper around `sock:receive`.
  -- @param int read specification handed to `sock:receive` unchanged
  function amqp:receive(int)
    return self.sock:receive(int)
  end
end

-- getopt(key, table, table, ..., value)
-- return the key's value from the first table that has it, or VALUE if none do
-- (the last argument is always treated as the fallback value; a stored
-- `false` counts as "has it", only `nil` is skipped)
local function _getopt(k,t,...)
  if select('#',...) == 0 then
    -- nothing left after `t`, so `t` is the fallback value itself
    return t
  elseif t[k] ~= nil then
    return t[k]
  else
    return _getopt(k,...)
  end
end

-- to check whether we have valid parameters to setup
-- Raises (via `error`) instead of returning nil, err.
local function mandatory_options(opts)
  if not opts then
    error("no opts provided")
  end

  if type(opts) ~= "table" then
    error("opts is not valid")
  end

  -- a missing role is treated the same as "consumer"
  if (opts.role == nil or opts.role == "consumer") and not opts.queue then
    error("as a consumer, queue is required")
  end
end

--
-- initialize the context
--
--- Create a new AMQP context.
-- @param opts option table; validated by `mandatory_options`, which raises
--        on invalid input
-- @return the context table (its metatable indexes `self`), or `nil, err`
--         when the socket could not be created
function amqp:new(opts)
  local sock, err
  local mt = { __index = self }

  mandatory_options(opts)

  if use_cqueues == true then
    -- with cqueues the module itself is kept; the real socket is created
    -- later in amqp:connect
    sock = tcp
  else
    sock, err = tcp()
  end

  if not sock then
    return nil, err
  end

  local ctx = {
    sock = sock,
    ssl_ctx = opts.ssl_ctx or nil,
    opts = opts,
    connection_state = c.state.CLOSED,
    channel_state = c.state.CLOSED,

    major = c.PROTOCOL_VERSION_MAJOR,
    minor = c.PROTOCOL_VERSION_MINOR,
    revision = c.PROTOCOL_VERSION_REVISION,

    frame_max = c.DEFAULT_FRAME_SIZE,
    channel_max = c.DEFAULT_MAX_CHANNELS,
    mechanism = c.MECHANISM_PLAIN
  }

  setmetatable(ctx,mt)
  return ctx
end

--- Upgrade the context's socket to TLS.
-- Three paths, tried in this order: a socket that offers `sslhandshake`
-- under ngx, a cqueues socket (`starttls`), and finally a LuaSec wrap of a
-- plain socket. Only the LuaSec path replaces `ctx.sock`.
-- @param ctx the AMQP context
-- @return a truthy value on success, or a falsy value plus an error
local function sslhandshake(ctx)
  local sock = ctx.sock
  local ssl_ctx = ctx.ssl_ctx
  local session
  local err

  -- The ngx socket selection above is commented out, so under ngx the
  -- socket is normally a LuaSocket object with no `sslhandshake` method.
  -- Only take this path when the socket really offers the method;
  -- otherwise fall through to LuaSec instead of raising.
  if _G.ngx and sock.sslhandshake then
    session, err = sock:sslhandshake()
    if not session then
      logger.error("[amqp:sslhandshake] SSL handshake failed: ", err)
    end
    return session, err -- return
  elseif use_cqueues == true then
    if ssl_ctx then
      session, err = sock:starttls(ssl_ctx)
    else
      session, err = sock:starttls()
    end
    if not session then
      logger.error("[amqp:sslhandshake] SSL handshake failed: ", err)
    end
    return session, err -- return
  end

  -- if none of the above then continue down
  local ssl = require("ssl") -- require library only for socket
  local default_params = {
    mode = "client",
    protocol = "any",
    verify = "none",
    options = {"all", "no_sslv2","no_sslv3"}
  }

  local wsock, wrap_err = ssl.wrap(sock, ssl_ctx or default_params)
  if not wsock then
    logger.error("[amqp:sslhandshake] Socket wrapping failed: ", wrap_err)
    return wsock, wrap_err -- return
  end

  logger.dbg("[amqp:sslhandshake] Wrapped socket")
  ctx.sock = wsock

  local ok, msg = ctx.sock:dohandshake()
  if not ok then
    logger.error("[amqp:sslhandshake] SSL handshake failed: ", msg)
  else
    logger.dbg("[amqp:sslhandshake] SSL handshake")
  end
  return ok, msg -- return
end

--
-- connect to the AMQP server (broker)
--
--- Connect the socket and, when `opts.ssl` is set, run the TLS handshake.
-- @param ... arguments forwarded unchanged to the socket's connect call
-- @return true (or the handshake result) on success, `nil, err` on failure
function amqp:connect(...)
  local sock = self.sock
  if not sock then
    return nil, "not initialized"
  end

  self._subscribed = false

  if use_cqueues == true then
    local s, err = sock.connect(...)
    if not s then
      logger.error("[amqp:connect] failed: ", err)
      return nil, err
    end
    self.sock = s
    -- configurable, defaults to 5000
    self.sock:settimeout(self.opts.connect_timeout or 5000)
  else
    -- configurable, defaults to 5000
    sock:settimeout(self.opts.connect_timeout or 5000)
    local ok, err = sock:connect(...)
    if not ok then
      logger.error("[amqp:connect] failed: ", err)
      return nil, err
    end
  end

  if self.opts.ssl then
    return sslhandshake(self)
  end
  return true
end

--
-- to close the socket
--
--- Close the underlying socket.
-- @return the result of `sock:close()`, or `nil, "not initialized"`
function amqp:close()
  local sock = self.sock
  if not sock then
    return nil, "not initialized"
  end
  return sock:close()
end

--- Build the platform string reported to the broker.
-- @return "<jit.os>_<jit.arch>" when the `jit` module loads, else "posix"
local function platform()
  local has_jit, jit = pcall(require, "jit")
  if has_jit then
    return jit.os .. "_" .. jit.arch
  end
  return "posix"
end

--
-- connection and channel
--

--- Send connection.start-ok with client properties and credentials.
-- User and password default to "guest"/"guest".
-- @return the result of `frame.wire_method_frame`
function amqp:connection_start_ok()
  local user = self.opts.user or "guest"
  local password = self.opts.password or "guest"
  local f = frame.new_method_frame(c.DEFAULT_CHANNEL,
                                   c.class.CONNECTION,
                                   c.method.connection.START_OK)
  f.method = {
    properties = {
      product = c.PRODUCT,
      version = c.VERSION,
      platform = platform(),
      copyright = c.COPYRIGHT,
      capabilities = {
        authentication_failure_close = true
      }
    },
    mechanism = self.mechanism,
    -- NUL-separated user and password
    response = format("\0%s\0%s", user, password),
    locale = c.LOCALE
  }

  return frame.wire_method_frame(self, f)
end

--- Send connection.tune-ok with the context's channel/frame limits.
-- @return the result of `frame.wire_method_frame`
function amqp:connection_tune_ok()
  local f = frame.new_method_frame(c.DEFAULT_CHANNEL,
                                   c.class.CONNECTION,
                                   c.method.connection.TUNE_OK)
  f.method = {
    no_wait = true,
    channel_max = self.channel_max or c.DEFAULT_MAX_CHANNELS,
    frame_max = self.frame_max or c.DEFAULT_FRAME_SIZE,
    heartbeat = self.opts.heartbeat or c.DEFAULT_HEARTBEAT
  }

  return frame.wire_method_frame(self,f)
end

--- Send connection.open for `opts.virtual_host` (default "/").
-- @return the result of `frame.wire_method_frame`
function amqp:connection_open()
  local f = frame.new_method_frame(c.DEFAULT_CHANNEL,
                                   c.class.CONNECTION,
                                   c.method.connection.OPEN)
  f.method = {
    virtual_host = self.opts.virtual_host or "/"
  }

  return frame.wire_method_frame(self,f)
end

--- Fill in defaults for a close reason.
-- @param ctx the AMQP context; `ctx.ongoing` supplies class/method ids
-- @param reason optional table with `reply_code` / `reply_text`
-- @return table with reply_code, reply_text, class_id and method_id
local function sanitize_close_reason(ctx, reason)
  reason = reason or {}
  local ongoing = ctx.ongoing or {}
  return {
    reply_code = reason.reply_code or c.err.CONNECTION_FORCED,
    reply_text = reason.reply_text or "",
    class_id = ongoing.class_id or 0,
    method_id = ongoing.method_id or 0
  }
end

--- Send connection.close.
-- @param reason optional close reason, see `sanitize_close_reason`
-- @return the result of `frame.wire_method_frame`
function amqp:connection_close(reason)
  local f = frame.new_method_frame(c.DEFAULT_CHANNEL,
                                   c.class.CONNECTION,
                                   c.method.connection.CLOSE)
  f.method = sanitize_close_reason(self, reason)
  return frame.wire_method_frame(self,f)
end

--- Send connection.close-ok.
-- Sent on `c.DEFAULT_CHANNEL`, like every other connection-class method in
-- this module (the previous code used the data channel here).
-- @return the result of `frame.wire_method_frame`
function amqp:connection_close_ok()
  local f = frame.new_method_frame(c.DEFAULT_CHANNEL,
                                   c.class.CONNECTION,
                                   c.method.connection.CLOSE_OK)
  return frame.wire_method_frame(self,f)
end

--- method to open an AMQP channel
-- Uses `opts.channel` (default 1) and stores the channel of the reply in
-- `self.channel`.
-- @param none implied self (context)
-- @return the reply of `frame.wire_method_frame` (its `channel` field is
--         the channel number), or a falsy value plus an error
function amqp:channel_open()
  local f = frame.new_method_frame(self.opts.channel or 1,
                                   c.class.CHANNEL,
                                   c.method.channel.OPEN)
  f.method = {
    no_wait = false
  }

  logger.dbg("[channel_open] wired a frame", "[class_id]: ", f.class_id, "[method_id]: ", f.method_id)

  local res, err = frame.wire_method_frame(self,f)
  if res then
    logger.dbg("[channel_open] channel: ", res.channel)
    self.channel = res.channel
  end
  return res, err
end

--- Send channel.close on the current channel.
-- @param reason optional close reason, see `sanitize_close_reason`
-- @return the result of `frame.wire_method_frame`
function amqp:channel_close(reason)
  local f = frame.new_method_frame(self.channel or c.DEFAULT_CHANNEL,
                                   c.class.CHANNEL,
                                   c.method.channel.CLOSE)
  f.method = sanitize_close_reason(self, reason)
  return frame.wire_method_frame(self,f)
end

--- Send channel.close-ok on the current channel (default 1).
-- @return the result of `frame.wire_method_frame`
function amqp:channel_close_ok()
  local f = frame.new_method_frame(self.channel or 1,
                                   c.class.CHANNEL,
                                   c.method.channel.CLOSE_OK)
  return frame.wire_method_frame(self,f)
end

--- check if protocol version matches
-- @param self (context)
-- @param major major version of the protocol
-- @param minor minor version of the protocol
-- @return true or false
local function is_version_acceptable(self,major,minor)
  return self.major == major and self.minor == minor
end

--- Check that the broker offers the mechanism this context uses.
-- @param self (context)
-- @param method the method table; `method.mechanism` is a
--        whitespace-separated list of names
-- @return true, or `nil, err`
local function is_mechanism_acceptable(self,method)
  local mechanism = method.mechanism
  if not mechanism then
    return nil, "broker does not support any mechanism"
  end

  for me in gmatch(mechanism, "%S+") do
    if me == self.mechanism then
      return true
    end
  end

  return nil, "mechanism does not match"
end

--- Verify the protocol version and auth mechanism announced by the broker.
-- @param method the method table handed over by `amqp:setup`
-- @return true, or `nil, err`
function amqp:verify_capabilities(method)
  if not is_version_acceptable(self, method.major, method.minor) then
    return nil, "protocol version does not match"
  end

  if not is_mechanism_acceptable(self,method) then
    return nil, "mechanism does not match"
  end
  return true
end

--- Lower the context's channel_max / frame_max to the broker's limits.
-- @param ctx the AMQP context
-- @param method method table carrying `channel_max` / `frame_max`;
--        ignored when nil
local function negotiate_connection_tune_params(ctx,method)
  if not method then
    return
  end

  if method.channel_max ~= nil and method.channel_max ~= 0 then
    -- 0 means no limit
    ctx.channel_max = min(ctx.channel_max, method.channel_max)
  end

  if method.frame_max ~= nil and method.frame_max ~= 0 then
    ctx.frame_max = min(ctx.frame_max, method.frame_max)
  end
end

--- Set both state fields of the context at once.
-- Note the argument order: channel state first, connection state second.
local function set_state(ctx, channel_state, connection_state)
  ctx.channel_state = channel_state
  ctx.connection_state = connection_state
end

--- setup connection to AMQP server
-- Runs, in order: protocol header, capability check, start-ok, tune
-- negotiation, tune-ok, connection.open and channel.open. The state fields
-- are switched to ESTABLISHED as each stage succeeds.
-- @param implied self (context)
-- @return true or nil
-- @error the error reported by the step that failed
function amqp:setup()
  local res
  local err
  local sock = self.sock

  if not sock then
    return nil, "not initialized"
  end

  -- configurable, defaults to 30000
  sock:settimeout(self.opts.read_timeout or 30000)

  res, err = frame.wire_protocol_header(self)
  if not res then
    logger.error("[amqp:setup] wire_protocol_header failed: ", err)
    return nil, err
  end

  if res.method then
    logger.dbg("[amqp:setup] connection_start: ",res.method)
    res, err = amqp.verify_capabilities(self, res.method)
    if not res then
      -- in order to close the socket without sending further data
      set_state(self,c.state.CLOSED, c.state.CLOSED)
      return nil, err
    end
  end

  res, err = amqp.connection_start_ok(self)
  if not res then
    logger.error("[amqp:setup] connection_start_ok failed: ", err)
    return nil, err
  end

  negotiate_connection_tune_params(self,res.method)

  res, err = amqp.connection_tune_ok(self)
  if not res then
    logger.error("[amqp:setup] connection_tune_ok failed: ", err)
    return nil, err
  end

  res, err = amqp.connection_open(self)
  if not res then
    logger.error("[amqp:setup] connection_open failed: ", err)
    return nil, err
  end

  self.connection_state = c.state.ESTABLISHED

  res, err = amqp.channel_open(self)
  if not res then
    logger.error("[amqp:setup] channel_open failed: ", err)
    return nil, err
  end

  self.channel_state = c.state.ESTABLISHED
  return true
end

--
-- close channel and connection if needed.
--
--- Close the channel and then the connection, depending on their state.
-- ESTABLISHED sends a close; CLOSE_WAIT answers with a close-ok. Failures
-- are only logged. Nothing is returned.
-- @param reason optional close reason passed to the close methods
function amqp:teardown(reason)
  local ok
  local err

  if self.channel_state == c.state.ESTABLISHED then
    ok, err = amqp.channel_close(self, reason)
    if not ok then
      logger.error("[channel_close] err: ", err)
    end
  elseif self.channel_state == c.state.CLOSE_WAIT then
    ok, err = amqp.channel_close_ok(self)
    if not ok then
      logger.error("[channel_close_ok] err: ", err)
    end
  end

  if self.connection_state == c.state.ESTABLISHED then
    ok, err = amqp.connection_close(self, reason)
    if not ok then
      logger.error("[connection_close] err: ", err)
    end
  elseif self.connection_state == c.state.CLOSE_WAIT then
    -- must be called on this context; `amqp:connection_close_ok()` would
    -- pass the module table as self
    ok, err = amqp.connection_close_ok(self)
    if not ok then
      logger.error("[connection_close_ok] err: ", err)
    end
  end
end

--
-- initialize the consumer
--
--- Declare the queue, bind it (unless `opts.exchange` is '') and start
-- consuming.
-- @return true, or `nil, err`
function amqp:prepare_to_consume()
  local res
  local err

  if self.channel_state ~= c.state.ESTABLISHED then
    return nil, "[prepare_to_consume] channel is not open"
  end

  res, err = amqp.queue_declare(self)
  if not res then
    logger.error("[prepare_to_consume] queue_declare failed: ", err)
    return nil, err
  end

  if self.opts.exchange ~= '' then
    res, err = amqp.queue_bind(self)
    if not res then
      logger.error("[prepare_to_consume] queue_bind failed: ", err)
      return nil, err
    end
  end

  res, err = amqp.basic_consume(self)
  if not res then
    logger.error("[prepare_to_consume] basic_consume failed: ", err)
    return nil, err
  end

  return true
end

--
-- conclude a heartbeat timeout
-- if and only if we see ctx.threshold or more failure heartbeats in the recent heartbeats ctx.window
--
-- `timeouts` is a bit field: bit i-1 set means the i-th most recent
-- heartbeat slot was a timeout. Window defaults to 5, threshold to 4.
local function timedout(ctx, timeouts)
  local window = ctx.window or 5
  local threshold = ctx.threshold or 4
  local counter = 0

  for i = 1, window do
    if band(rshift(timeouts,i-1),1) ~= 0 then
      counter = counter + 1
    end
  end

  return counter >= threshold
end

--- Public wrapper around the heartbeat timeout check.
-- @param timeouts bit field of recent heartbeat timeouts
-- @return true when the threshold is reached
function amqp:timedout(timeouts)
  return timedout(self, timeouts)
end

--- Tell whether the ngx worker is exiting.
-- @return a falsy value when `ngx` / `ngx.worker` is not available
local function exiting()
  return _G.ngx and _G.ngx.worker and _G.ngx.worker.exiting()
end

--
-- consumer
--
--- Acknowledge or refuse a delivery.
-- @param ok truthy sends ACK; otherwise NOACK when `opts.requeue` is set,
--        else REJECT
-- @param delivery_tag the tag of the delivery being answered
-- @return the result of `frame.wire_method_frame`
function amqp:basic_ack(ok, delivery_tag)
  local method = c.method.basic.REJECT
  if self.opts.requeue then
    method = c.method.basic.NOACK
  end

  local f = frame.new_method_frame(self.channel or 1,
                                   c.class.BASIC,
                                   ok and c.method.basic.ACK or method)

  f.method = {
    delivery_tag = delivery_tag,
    requeue = self.opts.requeue,
    multiple = self.opts.multiple,
    no_wait = true
  }

  return frame.wire_method_frame(self,f)
end

--- Read frames until the channel/connection closes or a fatal error occurs.
-- A delivery arrives as a method frame (basic.deliver), a header frame and
-- a body frame; the callback runs on the body frame and the delivery is
-- acknowledged unless `opts.no_ack` is set. Always tears down before
-- returning.
-- NOTE(review): each body frame is treated as a complete message; a body
-- split over several frames is not reassembled here.
-- @param callback optional function called with
--        `{ body, frame, properties }`; run under pcall
-- @return always `nil` plus the reason the loop ended (may be nil)
function amqp:consume_loop(callback)
  local ok
  local err
  local err0
  local f

  local hb = {
    last = os.time(),
    timeouts = 0
  }

  local f_deliver
  local f_header
  local status
  local breaker = false

  while true do
    f, err0 = frame.consume_frame(self)

    -- no frame: decide whether to give up, send a heartbeat, or retry
    while not f do
      if exiting() then
        err = "exiting"
        breaker = true
        break
      end

      -- in order to send the heartbeat,
      -- the very read op need be awaken up periodically, so the timeout is expected.
      if err0 ~= "timeout" then
        logger.error("[amqp:consume_loop]", err0 or "?")
      end

      if err0 == "closed" then
        err = err0
        set_state(self, c.state.CLOSED, c.state.CLOSED)
        logger.error("[amqp:consume_loop] socket closed")
        breaker = true
        break
      end

      if err0 == "wantread" then
        err = err0
        set_state(self, c.state.CLOSED, c.state.CLOSED)
        logger.error("[amqp:consume_loop] SSL socket needs to dohandshake again")
        breaker = true
        break
      end

      -- intended timeout?
      local now = os.time()
      if now - hb.last > c.DEFAULT_HEARTBEAT then
        logger.dbg("[amqp:consume_loop] timeouts inc. [ts]: ",now)
        -- shift a 1 into the history: one more missed heartbeat
        hb.timeouts = bor(lshift(hb.timeouts,1),1)
        hb.last = now
        ok, err0 = frame.wire_heartbeat(self)
        if not ok then
          logger.error("[heartbeat]","pong error: ", err0 or "?", "[ts]: ", hb.last)
        else
          logger.dbg("[heartbeat]","pong sent. [ts]: ",hb.last)
        end
      end

      if timedout(self,hb.timeouts) then
        err = "heartbeat timeout"
        logger.error("[amqp:consume_loop] timedout. [ts]: ", now)
        breaker = true
        break
      end

      logger.dbg("[amqp:consume_loop] continue consuming ", err0)
      f, err0 = frame.consume_frame(self)
    end

    if breaker then
      break
    end

    if f.type == c.frame.METHOD_FRAME then
      if f.class_id == c.class.CHANNEL then
        if f.method_id == c.method.channel.CLOSE then
          set_state(self, c.state.CLOSE_WAIT, self.connection_state)
          logger.dbg("[channel close method]", f.method.reply_code, f.method.reply_text)
          break
        end
      elseif f.class_id == c.class.CONNECTION then
        if f.method_id == c.method.connection.CLOSE then
          set_state(self, c.state.CLOSED, c.state.CLOSE_WAIT)
          logger.dbg("[connection close method]", f.method.reply_code, f.method.reply_text)
          break
        end
      elseif f.class_id == c.class.BASIC then
        if f.method_id == c.method.basic.DELIVER then
          f_deliver = f.method
          if f.method ~= nil then
            logger.dbg("[basic_deliver] ", f.method)
          end
        end
      end
    elseif f.type == c.frame.HEADER_FRAME then
      f_header = f
      logger.dbg(format("[header] class_id: %d weight: %d, body_size: %d",
                        f.class_id, f.weight, f.body_size))
      logger.dbg("[frame.properties]",f.properties)
    elseif f.type == c.frame.BODY_FRAME then
      if f_deliver and f_header then
        status = true
        if callback then
          status, err0 = pcall(callback, {
            body = f.body,
            frame = f_deliver,
            properties = f_header.properties
          })
          if not status then
            logger.error("calling callback failed: ", err0)
          end
        end
        if not self.opts.no_ack then
          -- ack
          self:basic_ack(status, f_deliver.delivery_tag)
        end
        f_deliver, f_header = nil, nil
      else
        -- Without the deliver/header pair there is nothing to hand to the
        -- callback or to acknowledge; indexing them would raise.
        logger.error("[amqp:consume_loop] body frame without preceding deliver/header frame")
      end
      logger.dbg("[body]", f.body)
    elseif f.type == c.frame.HEARTBEAT_FRAME then
      hb.last = os.time()
      logger.dbg("[heartbeat]","ping received. [ts]: ", hb.last)
      -- band with 0 always yields 0: the timeout history is cleared
      hb.timeouts = band(lshift(hb.timeouts,1),0)
      ok, err0 = frame.wire_heartbeat(self)
      if not ok then
        logger.error("[heartbeat]","pong error: ", err0 or "?", "[ts]: ", hb.last)
      else
        logger.dbg("[heartbeat]","pong sent. [ts]: ", hb.last)
      end
    end
  end

  self:teardown()
  -- return not err or err ~= "exiting", err
  return nil, err or err0
end

--- Set up the connection, prepare the consumer and run the consume loop
-- with `opts.callback`.
-- @return `nil, err` (see `consume_loop`; setup failures return early)
function amqp:consume()
  local ok
  local err

  ok, err = self:setup()
  if not ok then
    self:teardown()
    return nil, err
  end

  ok, err = self:prepare_to_consume()
  if not ok then
    self:teardown()
    return nil, err
  end

  return self:consume_loop(self.opts.callback)
end

--
-- get
--
--- Set up the connection, fetch a single message with basic.get and tear
-- down again.
-- The content is read whenever the reply is GET_OK. The reply's
-- message_count is the number of messages still queued, so it must not be
-- used to decide whether this reply carries a message.
-- @return the message body (nil when nothing was fetched) plus an error
--         value, which is nil on the normal path
function amqp:get()
  local ok, res, err

  ok, err = self:setup()
  if not ok then
    self:teardown()
    return nil, err
  end

  if self.channel_state ~= c.state.ESTABLISHED then
    self:teardown()
    return nil, "[amqp:get] channel is not open"
  end

  res, err = amqp.basic_get(self)
  if not res then
    -- err is passed as a separate argument: concatenation would raise
    -- on a nil error
    logger.error("[amqp:get] basic_get failed: ", err)
    self:teardown()
    return nil, err
  end

  local message
  if res.method_id == c.method.basic.GET_OK then
    local f, ferr = frame.consume_frame(self)
    if not f then
      logger.error("[amqp:get] reading content header failed: ", ferr or "?")
      self:teardown()
      return nil, ferr
    end

    local has_body = true
    if f.type == c.frame.HEADER_FRAME then
      logger.dbg(format("[header] class_id: %d weight: %d, body_size: %d",
                        f.class_id, f.weight, f.body_size))
      logger.dbg("[frame.properties]",f.properties)
      -- an empty message has no body frame to wait for
      has_body = f.body_size ~= 0
    end

    if has_body then
      f, ferr = frame.consume_frame(self)
      if not f then
        logger.error("[amqp:get] reading content body failed: ", ferr or "?")
        self:teardown()
        return nil, ferr
      end
      if f.type == c.frame.BODY_FRAME then
        message = f.body
      end
    else
      message = ""
    end
    logger.dbg("[message]", message)
  end

  self:teardown()
  return message, err
end

--
-- publisher
--
--- Publish one message: basic.publish, then the content header, then the
-- body.
-- @param payload the message body; its length (`#payload`) is sent in the
--        header
-- @param opts optional per-call options, see `basic_publish`
-- @param properties passed unchanged to `frame.wire_header_frame`
-- @return true, or `nil, err`
function amqp:publish(payload,opts,properties)
  local ok
  local err
  local size = #payload

  ok, err = amqp.basic_publish(self, opts)
  if not ok then
    logger.error("[amqp.publish] failed: ", err)
    return nil, err
  end

  ok, err = frame.wire_header_frame(self,size,properties)
  if not ok then
    logger.error("[amqp.publish] failed: ", err)
    return nil, err
  end

  ok, err = frame.wire_body_frame(self,payload)
  if not ok then
    logger.error("[amqp.publish] failed: ", err)
    return nil, err
  end

  return true
end

--
-- queue
--
--- Send queue.declare.
-- Each flag is taken from `opts`, then `self.opts`, then the default shown
-- in the code (auto_delete defaults to true, the others to false).
-- @param opts optional per-call options
-- @return the result of `frame.wire_method_frame`, or `nil, err`
function amqp:queue_declare(opts)
  opts = opts or {}

  if not opts.queue and not self.opts.queue then
    return nil, "[queue_declare] queue is not specified"
  end

  local f = frame.new_method_frame(self.channel or 1,
                                   c.class.QUEUE,
                                   c.method.queue.DECLARE)

  f.method = {
    queue = opts.queue or self.opts.queue,
    passive = _getopt('passive', opts, self.opts, false),
    durable = _getopt('durable', opts, self.opts, false),
    exclusive = _getopt('exclusive', opts, self.opts, false),
    auto_delete = _getopt('auto_delete', opts, self.opts, true),
    no_wait = _getopt('no_wait', opts, self.opts, false),
    arguments = _getopt('arguments', opts, self.opts, {})
  }

  return frame.wire_method_frame(self,f)
end

--- Send queue.bind.
-- @param opts optional per-call options (queue, exchange, routing_key,
--        no_wait); missing values fall back to `self.opts`
-- @return the result of `frame.wire_method_frame`, or `nil, err`
function amqp:queue_bind(opts)
  opts = opts or {}

  if not opts.queue and not self.opts.queue then
    return nil, "[queue_bind] queue is not specified"
  end

  local f = frame.new_method_frame(self.channel or 1,
                                   c.class.QUEUE,
                                   c.method.queue.BIND)

  f.method = {
    queue = opts.queue or self.opts.queue,
    exchange = opts.exchange or self.opts.exchange,
    routing_key = _getopt('routing_key', opts, self.opts, ""),
    no_wait = _getopt('no_wait', opts, self.opts, false)
  }

  return frame.wire_method_frame(self,f)
end

--- Send queue.unbind.
-- @param opts optional per-call options (queue, exchange, routing_key,
--        no_wait); missing values fall back to `self.opts`
-- @return the result of `frame.wire_method_frame`, or `nil, err`
function amqp:queue_unbind(opts)
  opts = opts or {}

  if not opts.queue and not self.opts.queue then
    return nil, "[queue_unbind] queue is not specified"
  end

  local f = frame.new_method_frame(self.channel or 1,
                                   c.class.QUEUE,
                                   c.method.queue.UNBIND)

  f.method = {
    queue = opts.queue or self.opts.queue,
    exchange = opts.exchange or self.opts.exchange,
    routing_key = _getopt('routing_key', opts, self.opts, ""),
    no_wait = _getopt('no_wait', opts, self.opts, false)
  }

  return frame.wire_method_frame(self,f)
end

--- Send queue.delete.
-- @param opts optional per-call options (queue, if_unused, if_empty,
--        no_wait); missing values fall back to `self.opts`
-- @return the result of `frame.wire_method_frame`, or `nil, err`
function amqp:queue_delete(opts)
  opts = opts or {}

  if not opts.queue and not self.opts.queue then
    return nil, "[queue_delete] queue is not specified"
  end

  local f = frame.new_method_frame(self.channel or 1,
                                   c.class.QUEUE,
                                   c.method.queue.DELETE)

  f.method = {
    queue = opts.queue or self.opts.queue,
    if_unused = _getopt('if_unused', opts, self.opts, false),
    if_empty = _getopt('if_empty', opts, self.opts, false),
    no_wait = _getopt('no_wait', opts, self.opts, false)
  }

  return frame.wire_method_frame(self,f)
end

--
-- exchange
--
--- Send exchange.declare.
-- The type comes from `opts.typ` only and defaults to "topic".
-- @param opts optional per-call options; flags fall back to `self.opts`
-- @return the result of `frame.wire_method_frame`
function amqp:exchange_declare(opts)
  opts = opts or {}

  local f = frame.new_method_frame(self.channel or 1,
                                   c.class.EXCHANGE,
                                   c.method.exchange.DECLARE)

  f.method = {
    exchange = opts.exchange or self.opts.exchange,
    typ = opts.typ or "topic",
    passive = _getopt('passive', opts, self.opts, false),
    durable = _getopt('durable', opts, self.opts, false),
    auto_delete = _getopt('auto_delete', opts, self.opts, false),
    internal = _getopt('internal', opts, self.opts, false),
    no_wait = _getopt('no_wait', opts, self.opts, false)
  }

  return frame.wire_method_frame(self,f)
end

--- Send exchange.bind.
-- @param opts required; must carry `source` and `destination`
-- @return the result of `frame.wire_method_frame`, or `nil, err`
function amqp:exchange_bind(opts)
  if not opts then
    return nil, "[exchange_bind] opts is required"
  end

  if not opts.source then
    return nil, "[exchange_bind] source is required"
  end

  if not opts.destination then
    return nil, "[exchange_bind] destination is required"
  end

  local f = frame.new_method_frame(self.channel or 1,
                                   c.class.EXCHANGE,
                                   c.method.exchange.BIND)

  f.method = {
    destination = opts.destination,
    source = opts.source,
    routing_key = _getopt('routing_key', opts, self.opts, ''),
    -- `self.opts`, not `self,opts`: the comma form skipped the
    -- context-wide no_wait setting
    no_wait = _getopt('no_wait', opts, self.opts, false)
  }

  return frame.wire_method_frame(self,f)
end

--- Send exchange.unbind.
-- @param opts required; must carry `source` and `destination`
-- @return the result of `frame.wire_method_frame`, or `nil, err`
function amqp:exchange_unbind(opts)
  if not opts then
    return nil, "[exchange_unbind] opts is required"
  end

  if not opts.source then
    return nil, "[exchange_unbind] source is required"
  end

  if not opts.destination then
    return nil, "[exchange_unbind] destination is required"
  end

  local f = frame.new_method_frame(self.channel or 1,
                                   c.class.EXCHANGE,
                                   c.method.exchange.UNBIND)

  f.method = {
    destination = opts.destination,
    source = opts.source,
    routing_key = _getopt('routing_key', opts, self.opts, ""),
    no_wait = _getopt('no_wait', opts, self.opts, false)
  }

  return frame.wire_method_frame(self,f)
end

--- Send exchange.delete.
-- `if_unused` defaults to true here.
-- @param opts optional per-call options; missing values fall back to
--        `self.opts`
-- @return the result of `frame.wire_method_frame`
function amqp:exchange_delete(opts)
  opts = opts or {}

  local f = frame.new_method_frame(self.channel or 1,
                                   c.class.EXCHANGE,
                                   c.method.exchange.DELETE)

  f.method = {
    exchange = opts.exchange or self.opts.exchange,
    if_unused = _getopt('if_unused', opts, self.opts, true),
    no_wait = _getopt('no_wait', opts, self.opts, false)
  }

  return frame.wire_method_frame(self,f)
end

--
-- basic
--
--- Send basic.consume.
-- `no_ack` defaults to true when neither option table sets it.
-- @param opts optional per-call options; missing values fall back to
--        `self.opts`
-- @return the result of `frame.wire_method_frame`, or `nil, err`
function amqp:basic_consume(opts)
  opts = opts or {}

  if not opts.queue and not self.opts.queue then
    return nil, "[basic_consume] queue is not specified"
  end

  local f = frame.new_method_frame(self.channel or 1,
                                   c.class.BASIC,
                                   c.method.basic.CONSUME)

  f.method = {
    queue = opts.queue or self.opts.queue,
    no_local = _getopt('no_local',opts, self.opts, false),
    no_ack = _getopt('no_ack', opts, self.opts, true),
    exclusive = _getopt('exclusive', opts, self.opts, false),
    no_wait = _getopt('no_wait', opts, self.opts, false)
  }

  return frame.wire_method_frame(self,f)
end

--- Send basic.publish (method frame only; see `amqp:publish` for the
-- header and body).
-- @param opts optional per-call options; missing values fall back to
--        `self.opts`
-- @return the result of `frame.wire_method_frame`
function amqp:basic_publish(opts)
  opts = opts or {}

  local f = frame.new_method_frame(self.channel or 1,
                                   c.class.BASIC,
                                   c.method.basic.PUBLISH)

  f.method = {
    no_wait = true,
    exchange = opts.exchange or self.opts.exchange,
    routing_key = _getopt('routing_key',opts, self.opts, ""),
    mandatory = _getopt('mandatory', opts, self.opts, false),
    immediate = _getopt('immediate', opts, self.opts, false)
  }

  return frame.wire_method_frame(self,f)
end

--- Send basic.get.
-- `no_ack` defaults to true when neither option table sets it.
-- @param opts optional per-call options; missing values fall back to
--        `self.opts`
-- @return the result of `frame.wire_method_frame`, or `nil, err`
function amqp:basic_get(opts)
  opts = opts or {}

  if not opts.queue and not self.opts.queue then
    return nil, "[basic_get] queue is not specified."
  end

  local f = frame.new_method_frame(self.channel or 1,
                                   c.class.BASIC,
                                   c.method.basic.GET)

  f.method = {
    queue = opts.queue or self.opts.queue,
    no_ack = _getopt('no_ack', opts, self.opts, true)
  }

  return frame.wire_method_frame(self,f)
end

return amqp