# Copyright 2009-2010 ITA Software, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Oracle Queries""" import os import re import signal import cPickle from zope.interface import classProvides from twisted.internet import defer, error, process, protocol, reactor from twisted.python import failure from coil.struct import Struct try: from lxml import etree except ImportError: etree = None try: import cx_Oracle except ImportError: cx_Oracle = None from nagcat import errors, log, query class PickleReader(protocol.ProcessProtocol): def __init__(self, fd): self.fd = fd self.data = "" self.timedout = False self.deferred = defer.Deferred() def childDataReceived(self, fd, data): assert self.fd == fd self.data += data def timeout(self): self.timedout = True if self.transport.pid: try: os.kill(self.transport.pid, signal.SIGTERM) except OSError, ex: log.warn("Failed to send TERM to a subprocess: %s", ex) def processEnded(self, reason): if isinstance(reason.value, error.ProcessDone): try: self.deferred.callback(cPickle.loads(self.data)) except Exception: self.deferred.errback(failure.Failure()) elif (isinstance(reason.value, error.ProcessTerminated) and self.timedout): self.deferred.errback(errors.Failure(errors.TestCritical( "Timeout waiting for Oracle query to finish."))) else: self.deferred.errback(reason) class ForkIt(process.Process): def __init__(self, timeout, func, *args, **kwargs): readfd, writefd = os.pipe() self._write = os.fdopen(writefd, 'w') self._func = func self._args = args self._kwargs = kwargs proto = PickleReader(writefd) # Setup timeout call_id = reactor.callLater(timeout, proto.timeout) proto.deferred.addBoth(self._cancelTimeout, call_id) # Setup shutdown cleanup call_id = reactor.addSystemEventTrigger( 'after', 'shutdown', proto.timeout) proto.deferred.addBoth(self._cancelCleanup, call_id) process.Process.__init__(self, reactor, executable=None, args=None, environment=None, path=None, proto=proto, childFDs={0:0, 1:1, 2:2, writefd: writefd}) self.pipes[writefd] = self.processReaderFactory( reactor, self, writefd, readfd) self._write.close() def getResult(self): return self.proto.deferred def _cancelTimeout(self, result, call_id): if call_id.active(): call_id.cancel() return result def _cancelCleanup(self, result, call_id): reactor.removeSystemEventTrigger(call_id) return result def _execChild(self, *ignore, **kgnore): """Run our function instead of exec""" try: result = self._func(*self._args, **self._kwargs) except Exception: result = failure.Failure() cPickle.dump(result, self._write, -1) self._write.close() os._exit(0) def _resetSignalDisposition(self): """Reset non-standard signal handlers.""" for signalnum in xrange(1, signal.NSIG): if signal.getsignal(signalnum) not in (None, signal.SIG_DFL, signal.SIG_IGN): signal.signal(signalnum, signal.SIG_DFL) class _DBColumn: """describes the name and type of a column, to facilitate mapping the attributes of a DB column into XML attribs (taken by processing the contents of a cx_Oracle.Cursor.description)""" # taken from http://cx-oracle.sourceforge.net/html/cursor.html CX_VAR_COLUMNS = ('name', 'type', 'display_size', 'internal_size', 'precision', 'scale', 'null_ok') # used for ensuring the xml is sane CLEAN_NAME_RE = re.compile("[^\w]") def __init__(self, desc): """Take the decription of a cx_Oracle variable, and make it an actual object""" # these are the attributes that this item will contain for k,v in zip(self.CX_VAR_COLUMNS, desc): setattr(self, k, v) def element(self, value): """Factory for creating an Element for this column""" return self.single_element(value, self.name, self.type) @classmethod def single_element(cls, value, name, data_type): """Factory for creating a single Element""" name = cls.CLEAN_NAME_RE.sub("", name.lower()) # XML tags may not start with a digit if name[0].isdigit(): name = "_%s" % name elt = etree.Element(name, type=data_type.__name__) if value is not None: elt.text = str(value) return elt def __str__(self): return "<_DBColumn %s:%s>" % (self.name, self.type) def __repr__(self): return str(self) class OracleBase(query.Query): """Base query code for both SQL and PL/SQL queries. Subclasses must provide _start_oracle() """ def __init__(self, nagcat, conf): if not etree or not cx_Oracle: raise errors.InitError( "cx_Oracle and lxml are required for Oracle support.") super(OracleBase, self).__init__(nagcat, conf) for param in ('user', 'password', 'dsn'): if param not in conf: raise errors.ConfigError('%s is required but missing' % param) self.conf[param] = conf[param] def _start(self): proc = ForkIt(self.conf['timeout'], self._forked) return proc.getResult() def _forked(self): try: connection = cx_Oracle.connect( user=self.conf['user'], password=self.conf['password'], dsn=self.conf['dsn']) cursor = connection.cursor() try: return self._forked_query(cursor) finally: cursor.close() connection.close() except cx_Oracle.Error, ex: return errors.Failure(errors.TestCritical("Oracle query failed: %s" % ex)) def _forked_query(self, cursor): raise Exception("unimplemented") def _to_xml(self, cursor, root="queryresult"): """Convert a table to XML Elements example: select 1 as foo from dual 1 """ tree = etree.Element(root) # Return empty XML if this wasn't a SELECT if not isinstance(cursor.description, list): return tree columns = map(_DBColumn, cursor.description) for row in cursor: xmlrow = etree.Element('row') for col, val in zip(columns, row): xmlrow.append(col.element(val)) tree.append(xmlrow) return tree def _to_string(self, cursor): return etree.tostring(self._to_xml(cursor), pretty_print=True) class OracleSQL(OracleBase): """Execute a SQL query in Oracle, the result is formatted as XML""" classProvides(query.IQuery) name = "oracle_sql" def __init__(self, nagcat, conf): super(OracleSQL, self).__init__(nagcat, conf) self.conf['sql'] = conf.get('sql', "select 1 as data from dual") if 'parameters' in conf: parameters = conf['parameters'] elif 'binds' in conf: # binds is an alias parameters = conf['binds'] else: parameters = [] if isinstance(parameters, Struct): parameters.expand() parameters = parameters.dict() for key,value in parameters.iteritems(): if not isinstance(value, (str,int,long,float)): raise errors.ConfigError( "parameter %s is an invalid type" % key) elif isinstance(parameters, list): for key,value in enumerate(parameters): if not isinstance(value, (str,int,long,float)): raise errors.ConfigError( "parameter %s is an invalid type" % key) else: raise errors.ConfigError("parameters must be a list or struct") self.conf['parameters'] = parameters def _forked_query(self, cursor): cursor.execute(self.conf['sql'], self.conf['parameters']) return self._to_string(cursor) class OracleSQL2(OracleSQL): """Alias oraclesql to oracle_sql""" classProvides(query.IQuery) name = "oraclesql" def __init__(self, nagcat, conf): super(OracleSQL2, self).__init__(nagcat, conf) # So the scheduler's stats are correct self.name = "oracle_sql" class OraclePLSQL(OracleBase): """Execute a stored procedure in Oracle, the result is formatted as XML""" classProvides(query.IQuery) name = "oracle_plsql" def __init__(self, nagcat, conf): super(OraclePLSQL, self).__init__(nagcat, conf) self.conf['procedure'] = conf.get('procedure', None) if not self.conf['procedure']: raise errors.ConfigError(conf, "procedure is required") parameters = conf.get('parameters', None) self.conf['parameters'] = [] if not parameters or not isinstance(parameters, list): raise errors.ConfigError(conf, "parameters must be a list of lists") for param in parameters: self.conf['parameters'].append( self._check_params(conf, param)) def _check_params(self, conf, param): """Check that the parameter definition is valid""" if not isinstance(param, list): raise errors.ConfigError(conf, "parameters must be a list of lists") if len(param) != 3: raise errors.ConfigError(conf, ("%s must be a list of three elements: " "[ " % param)) param[0] = param[0].lower() if param[0] not in ('in', 'out'): raise errors.ConfigError(conf, "Invalid direction %s, must be 'in' or 'out'" % param[0]) if param[0] == "out": type_name = param[2].upper() type_class = getattr(cx_Oracle, type_name, None) if not type_class: raise errors.ConfigError(conf, "%s is not a valid Oracle type" % param[2]) param[2] = type_class return param def _build_params(self, cursor): params = [] for param in self.conf['parameters']: if param[0] == "in": params.append(param[2]) elif param[0] == "out": params.append(cursor.var(param[2])) else: assert 0 return params def _forked_query(self, cursor): result = cursor.callproc(self.conf['procedure'], self._build_params(cursor)) # Convert the 'out' parameters into XML. root = etree.Element('result') for i, param in enumerate(self.conf['parameters']): if param[0] != 'out': continue if isinstance(result[i], cx_Oracle.Cursor): tree = self._to_xml(result[i], param[1]) root.append(tree) else: item = _DBColumn.single_element( result[i], param[1], param[2]) root.append(item) return etree.tostring(root, pretty_print=True)