Jan 26, 2019

Python Multithreading and automation demonstration for DBAs

A few weeks ago, I started moving my SQL Server automation scripts from PowerShell to Python to add some data analysis capabilities. For background: I am responsible for hundreds of servers. I collect data from remote hosts worldwide and copy it to a central location, where I process it for monitoring, performance analysis, capacity planning and similar insights.

Besides rewriting the scripts, I also wanted to improve them. The module connected to the remote servers one at a time, so it took a very long time before the data processing could complete.

I decided to optimize my scripts by using multithreading in Python to connect to the remote hosts in parallel. I would also like to point out the execute_values function from the psycopg2.extras module. It accepts a list of row tuples and inserts them with multi-row VALUES statements, which saves you from writing a loop of single-row inserts.

Before the demonstration, here are the numbers showing how much time this approach saved, comparing the traditional sequential method with the threaded model:

Without threads, the process used to take 45 seconds.




With threads, the same process took only 761 milliseconds, roughly a 60-fold improvement.




Below I demonstrate one of my scripts, which gets the list of databases from remote SQL Servers. Because it uses threads, the script imports the threading module, passes the worker function to a Thread object and finally starts the thread.
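
The pattern is easier to see in isolation. The following is a minimal sketch in which the remote query is simulated with time.sleep. The names fake_collect, run_sequential and run_threaded are hypothetical and are not part of the scripts below. CPython threads help here because the work is I/O-bound (waiting on remote servers). CPU-bound work would not speed up this way because of the global interpreter lock. Note that the end time is read only after join() has returned for every thread. Reading the clock right after start() would only measure how long it took to launch the threads.

```python
import threading
import time

def fake_collect(server, results, lock):
    """Hypothetical stand-in for a remote query: sleeps to simulate network wait."""
    time.sleep(0.2)
    with lock:
        results[server] = "databases of " + server

def run_sequential(servers):
    results, lock = {}, threading.Lock()
    for server in servers:
        fake_collect(server, results, lock)
    return results

def run_threaded(servers):
    results, lock = {}, threading.Lock()
    # One Thread object per server: target is the function, args its arguments
    threads = [threading.Thread(target=fake_collect, args=(s, results, lock)) for s in servers]
    for t in threads:
        t.start()
    # Wait for every worker before returning the results
    for t in threads:
        t.join()
    return results

servers = ["srv{}".format(i) for i in range(5)]

start = time.perf_counter()
seq = run_sequential(servers)
seq_elapsed = time.perf_counter() - start

start = time.perf_counter()
thr = run_threaded(servers)
thr_elapsed = time.perf_counter() - start

print("sequential: {:.2f}s, threaded: {:.2f}s".format(seq_elapsed, thr_elapsed))
```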

File Name: refreshDatabases.py
Purpose: This is the main script that syncs the databases:

First, import the modules required by this script:


import json
import psycopg2
import psycopg2.extras as ex
import datetime
import sys
import threading
# The following modules were written separately so that their objects can be reused:
import sqlRequestInJson
from pgdatabase import cursorfromconnectionpool as pgconnectionpool, Database as pgdatabase
from pgconnectionstring import connectionstring as pgconnectionstring
Next, get the list of IP addresses and ports of all the remote hosts to connect to:
class getdata:
    def __init__(self):
        self.query = "select sl.ipaddress, sl.name, sl.serverid, sl.port from serverslist sl"

    def return_serverlist(self):
        # pgconnectionstring and pgdatabase are helper classes written in separate files;
        # they wrap psycopg2 so that the user name and password stay out of this script
        cs = pgconnectionstring()
        pgdatabase.initialize(user=cs.user, password=cs.password, database=cs.database, host=cs.host)
        # Get the list of servers
        with pgconnectionpool() as cursor:
            cursor.execute(self.query)
            result = cursor.fetchall()
        return result
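
Rows in serverslist can hold a SQL Server named-instance address such as host\instance. Before connecting, the thread function in the update class below strips the instance part and relies on the port number instead. As a small sketch (split_host is a hypothetical helper, and the sample addresses are made up), str.split does this in one step:

```python
def split_host(address):
    """Return only the host part of a SQL Server address.

    A named-instance address such as "10.0.0.5\\SQL2016" becomes "10.0.0.5";
    the instance is then reached through the port number stored in serverslist.
    """
    return address.split("\\", 1)[0]

print(split_host("10.0.0.5\\SQL2016"))  # 10.0.0.5
print(split_host("sqlhost01"))          # sqlhost01
```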

This is the main worker class, which collects the data and saves it in tables:
class update:
    # Empty initializer
    def __init__(self):
        pass

    # Runs a script against Postgres and commits it
    def updateData(self, script):
        cs = pgconnectionstring()
        pgdatabase.initialize(user=cs.user, password=cs.password, database=cs.database, host=cs.host)
        with pgconnectionpool() as cursor:
            cursor.execute(script)

    # Calls return_serverlist(), defined in the getdata class above, to get the list of servers to connect to
    def returnRows(self):
        gd = getdata()
        return gd.return_serverlist()

    # Thread function: creates one thread per server, starts them all and waits until they finish
    def updateDatabasesThread(self, rows):
        threadlist = []
        for request in rows:
            # Remove the "\instance" part of a named-instance address; pymssql connects
            # with the host name or IP address plus the port number instead
            ipaddress = request['ipaddress'].split('\\')[0]
            # threading.Thread takes the function to run (target) and the arguments to pass to it
            t = threading.Thread(target=self.updateDatabases, args=(request['serverid'], ipaddress, request['port']))
            threadlist.append(t)
        # Start the threads
        started = []
        for thread in threadlist:
            try:
                thread.start()
                started.append(thread)
            except Exception:
                print("{}".format(sys.exc_info()))
        # Wait for every started thread, so that the caller's "Ended" time is the real end time
        for thread in started:
            thread.join()

    # Gets the database names from SQL Server and saves them into Postgres
    def updateDatabases(self, serverid, ipaddr, port):
        sqlreq = sqlRequestInJson.sqlrequests()
        # Get the data from SQL Server
        result = sqlreq.execsql(ipaddr, "select name from sys.databases", port)
        # execsql() returns {'result': 'exception', 'message': ...} instead of raising on failure
        if result['result'] == 'exception':
            print("serverid {}: {}".format(serverid, result['message']))
            return
        # Reuses the Postgres connection opened in the main thread by returnRows()
        with pgconnectionpool() as cursor:
            # (database name, serverid) tuples to be passed to execute_values
            dblist = [(row['name'], serverid) for row in result['result']]
            # execute_values builds multi-row INSERT statements from the list (100 rows per
            # statement by default), so no insert loop is needed
            ex.execute_values(cursor, 'insert into rtm.databases(databasename, serverid) values %s on conflict(serverid, databasename) do nothing', dblist)

# Finally, put it all together
# Create the object
upd = update()
# Get the list of servers
rows = upd.returnRows()
# Start the threads and wait for them to finish
print("Started {}".format(datetime.datetime.utcnow()))
upd.updateDatabasesThread(rows)
print("Ended {}".format(datetime.datetime.utcnow()))
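
Starting one thread per server means hundreds of simultaneous connections when the server list is long. One possible alternative caps the number of concurrent threads and collects each server's result or exception. It is sketched below with the standard library's concurrent.futures.ThreadPoolExecutor. collect_databases and refresh_all are hypothetical names, and the server rows are made up. In the real script, the worker would be update.updateDatabases.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def collect_databases(serverid, ipaddr, port):
    """Hypothetical stand-in for update.updateDatabases: pretend to query one server."""
    time.sleep(0.1)
    if ipaddr == "unreachable":
        raise ConnectionError("cannot reach server {}".format(serverid))
    return ["master", "model", "msdb"]

def refresh_all(servers, max_workers=20):
    """Run collect_databases for every server, with at most max_workers threads at once."""
    succeeded, failed = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            pool.submit(collect_databases, s["serverid"], s["ipaddress"], s["port"]): s["serverid"]
            for s in servers
        }
        # as_completed yields each future as soon as its worker finishes
        for future in as_completed(futures):
            serverid = futures[future]
            try:
                succeeded[serverid] = future.result()
            except Exception as exc:  # one failing server must not stop the others
                failed[serverid] = str(exc)
    # Leaving the with-block waits for all submitted work, like join()
    return succeeded, failed

servers = [
    {"serverid": 1, "ipaddress": "10.0.0.5", "port": 1433},
    {"serverid": 2, "ipaddress": "unreachable", "port": 1433},
    {"serverid": 3, "ipaddress": "10.0.0.7", "port": 1433},
]
ok, errors = refresh_all(servers, max_workers=2)
print(ok)
print(errors)
```

A bounded pool also keeps the number of simultaneous Postgres and SQL Server connections predictable. This matters if a fixed-size connection pool is used on the Postgres side.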
         
As mentioned above, some classes were written separately. Python supports object-oriented programming, so I put reusable code into classes in separate files. These are the supporting class files used by the script above:

File Name: pgconnectionstring.py
Purpose: keeps the credentials in a separate class and exposes them as properties for connection strings wherever they are needed:


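class connectionstring:
    def __init__(self):
        # Placeholder values; replace them with your own settings
        self.user = "user"
        self.password = "password"
        self.database = "dbname"
        # Host name or IP address only; psycopg2 takes the port as a separate port= argument
        self.host = "hostname or IP address"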
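
If you would rather not keep the password in a .py file at all, one option is to read the same attributes from environment variables. The minimal sketch below uses the libpq variable names PGUSER, PGPASSWORD, PGDATABASE and PGHOST. The env parameter is a hypothetical addition that makes the class easy to test. The attribute names stay the same, so calls like pgdatabase.initialize(user=cs.user, ...) would not need to change.

```python
import os

class connectionstring:
    """Sketch: read the Postgres settings from environment variables instead of source code."""

    def __init__(self, env=None):
        # Default to the process environment; a plain dict can be passed for testing
        env = os.environ if env is None else env
        self.user = env.get("PGUSER", "")
        self.password = env.get("PGPASSWORD", "")
        self.database = env.get("PGDATABASE", "")
        self.host = env.get("PGHOST", "localhost")

# Example with explicit (made-up) values instead of the real environment
cs = connectionstring({"PGUSER": "monitor", "PGDATABASE": "inventory", "PGHOST": "10.0.0.10"})
print(cs.user, cs.database, cs.host)  # monitor inventory 10.0.0.10
```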

File Name: pgdatabase.py
Purpose: manages the Postgres connection and returns rows as dictionary-like objects (psycopg2 DictCursor):


import psycopg2
import psycopg2.extras

class Database:
    # Despite its name, this holds a single psycopg2 connection shared by all callers
    connection_pool = None

    @classmethod
    def initialize(cls, **kwargs):
        # Open the connection only once; later calls reuse it while it is still open
        if cls.connection_pool is None or cls.connection_pool.closed:
            cls.connection_pool = psycopg2.connect(**kwargs)

    @classmethod
    def get_connection(cls):
        return cls.connection_pool

class cursorfromconnectionpool:

    def __init__(self):
        self.connection = None
        self.cursor = None

    def __enter__(self):
        self.connection = Database.get_connection()
        # DictCursor lets rows be read by column name, e.g. row['ipaddress']
        self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
        return self.cursor

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Roll back on error, commit otherwise, and close the cursor in both cases
        try:
            if exc_val is not None:
                self.connection.rollback()
            else:
                self.connection.commit()
        finally:
            self.cursor.close()
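
The Database class above shares one connection among all threads. psycopg2 allows this, but the threads then share a single transaction, so a commit or rollback in one thread also affects the others' pending work. A per-thread alternative could borrow a separate connection for each unit of work from psycopg2.pool.ThreadedConnectionPool, which provides getconn() and putconn(conn). This is a sketch under those assumptions. PooledCursor is a hypothetical name, and the Fake* classes exist only so that it can run without a database. With a real pool it would be used as `with PooledCursor(pool, cursor_factory=psycopg2.extras.DictCursor) as cur:`. ThreadedConnectionPool raises PoolError when all maxconn connections are in use, so the number of worker threads should not exceed maxconn.

```python
class PooledCursor:
    """Borrow a connection from `pool` for one unit of work, then give it back.

    `pool` is any object with getconn() and putconn(conn) methods, such as
    psycopg2.pool.ThreadedConnectionPool.
    """

    def __init__(self, pool, **cursor_kwargs):
        self.pool = pool
        self.cursor_kwargs = cursor_kwargs
        self.connection = None
        self.cursor = None

    def __enter__(self):
        self.connection = self.pool.getconn()
        self.cursor = self.connection.cursor(**self.cursor_kwargs)
        return self.cursor

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            if exc_type is None:
                self.connection.commit()
            else:
                self.connection.rollback()
        finally:
            self.cursor.close()
            self.pool.putconn(self.connection)  # always return the connection
        return False  # let any exception propagate to the caller

# Minimal fake objects so the context manager can be exercised without a database
class FakeCursor:
    def __init__(self):
        self.closed = False
    def close(self):
        self.closed = True

class FakeConnection:
    def __init__(self):
        self.log = []
    def cursor(self, **kwargs):
        return FakeCursor()
    def commit(self):
        self.log.append("commit")
    def rollback(self):
        self.log.append("rollback")

class FakePool:
    def __init__(self):
        self.conn, self.returned = FakeConnection(), 0
    def getconn(self):
        return self.conn
    def putconn(self, conn):
        self.returned += 1

pool = FakePool()
with PooledCursor(pool) as cur:
    pass  # successful work -> commit
try:
    with PooledCursor(pool) as cur:
        raise ValueError("simulated failure")  # error -> rollback
except ValueError:
    pass
print(pool.conn.log, pool.returned)  # ['commit', 'rollback'] 2
```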
 
         
File Name: sqlRequestInJson.py
Purpose: runs a query on SQL Server and returns the rows as dictionaries, wrapped in a result/message dictionary:


import sys
import pymssql

class sqlrequests:
    def execsql(self, server, script, port):
        connection = None
        try:
            connection = pymssql.connect(user='user', password='xxxx', database='mydb', host=server,
                                         as_dict=True, login_timeout=30, timeout=30, port=port)
            connection.autocommit(True)
            with connection.cursor() as cursor:
                cursor.execute(script)
                result = cursor.fetchall()
            # Instead of returning the rows directly, return a dictionary with the rows in the
            # 'result' key and a message in the 'message' key, so the caller can detect failures
            return {'result': result, 'message': 'nothing'}
        except Exception:
            e = "{}".format(sys.exc_info())
            e = e.replace("'", "*")
            # As mentioned above, return the exception to the caller instead of crashing
            return {'result': 'exception', 'message': e}
        finally:
            # Close the connection on both the success and the failure path
            if connection is not None:
                connection.close()
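
Because execsql() reports a failure as the string 'exception' in the result key instead of raising, a caller that loops over response['result'] without checking would iterate over the characters of that string. The hypothetical helper below checks the marker first and builds the (databasename, serverid) tuples that execute_values expects. The sample responses are made up.

```python
def rows_for_insert(response, serverid):
    """Turn an execsql()-style response into (databasename, serverid) tuples.

    Returns an empty list when the response carries the 'exception' marker, so the
    caller can skip the insert and log response['message'] instead.
    """
    if response["result"] == "exception":
        return []
    return [(row["name"], serverid) for row in response["result"]]

ok = {"result": [{"name": "master"}, {"name": "sales"}], "message": "nothing"}
failed = {"result": "exception", "message": "login timeout (sample text)"}
print(rows_for_insert(ok, 7))      # [('master', 7), ('sales', 7)]
print(rows_for_insert(failed, 7))  # []
```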
This demonstration involves multiple database servers: Microsoft SQL Server instances as the sources and a Postgres server as the central repository. To run it, you need two tables in a Postgres database. The first is serverslist, with at least the columns read by this query: select sl.ipaddress, sl.name, sl.serverid, sl.port from serverslist sl. The second is rtm.databases, with the columns databasename and serverid and a unique constraint on (serverid, databasename), which the ON CONFLICT clause of the insert requires.
