Along with re-writing the scripts, I also wanted to make them faster. The module connected to each remote server one at a time, so it took a very long time to finish processing the data.
I optimized the scripts with multithreading in Python, so that they connect to the remote hosts in parallel. I would also like to point out execute_values, a function from the psycopg2.extras module: when you need to insert multiple rows, you can pass it a list of rows directly, which saves you from writing a loop.
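psycopg2's execute_values needs a live Postgres server, so here is a minimal stand-in sketch that uses the standard-library sqlite3 module instead. It shows the same idea: a list of (databasename, serverid) tuples is expanded into one multi-row VALUES clause, with no loop of single INSERT statements. The table server_databases, the helper insert_values, and the use of INSERT OR IGNORE (SQLite's counterpart to Postgres' ON CONFLICT ... DO NOTHING) are assumptions for illustration only; with psycopg2 you would call execute_values(cursor, sql, rows) directly.

```python
import sqlite3


def insert_values(conn, rows):
    """Insert all rows with one multi-row INSERT (hypothetical helper)."""
    if not rows:
        return
    # One "(?, ?)" group per row, similar to how execute_values expands "%s"
    placeholders = ", ".join(["(?, ?)"] * len(rows))
    params = [value for row in rows for value in row]
    # INSERT OR IGNORE skips rows that violate the unique constraint,
    # much like Postgres' ON CONFLICT ... DO NOTHING
    conn.execute(
        "INSERT OR IGNORE INTO server_databases (databasename, serverid) VALUES "
        + placeholders,
        params,
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE server_databases (databasename TEXT, serverid INTEGER, "
    "UNIQUE (serverid, databasename))"
)
dblist = [("master", 1), ("tempdb", 1), ("sales", 2)]
insert_values(conn, dblist)
insert_values(conn, dblist)  # second run: every row is a duplicate and is ignored
print(conn.execute("SELECT COUNT(*) FROM server_databases").fetchone()[0])  # → 3
```

The unique constraint is what makes the "ignore duplicates" behaviour possible; the same is true for ON CONFLICT in Postgres.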
Before the demonstration, here are the numbers showing how much time this approach saved compared with the traditional, sequential method:
Without threads, the process took 45 seconds.
With threads, the same process took only 761 milliseconds, roughly a 60-fold improvement.
I am demonstrating one of my sets of scripts, the one responsible for getting the list of databases from the remote SQL Servers. Because the demo uses threading, it imports the threading module, maps the function to a Thread object, and finally starts the thread.
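To show why connecting in parallel helps, here is a self-contained sketch in which time.sleep stands in for the remote query (an assumption replacing real network latency; fetch_databases and the server names are hypothetical). Each call waits 0.2 seconds. Run one after another, the total grows with the number of servers; with one thread per server, the waits overlap. Note the join() calls: without them, the main thread would read the clock before the worker threads have finished.

```python
import threading
import time


def fetch_databases(server, results):
    """Hypothetical stand-in for querying one remote server."""
    time.sleep(0.2)  # simulated network/database latency
    results[server] = ["master", "tempdb"]


servers = ["sql01", "sql02", "sql03", "sql04", "sql05"]

# Sequential: connect to each server in turn
start = time.perf_counter()
sequential_results = {}
for server in servers:
    fetch_databases(server, sequential_results)
sequential_time = time.perf_counter() - start

# Threaded: one thread per server, all started, then all joined
start = time.perf_counter()
threaded_results = {}
threads = [threading.Thread(target=fetch_databases, args=(server, threaded_results))
           for server in servers]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait for every thread before stopping the clock
threaded_time = time.perf_counter() - start

print(f"sequential: {sequential_time:.2f}s, threaded: {threaded_time:.2f}s")
```

Threads pay off here because the work is I/O-bound: while one thread waits on the network, the others keep running, even under Python's GIL.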
File Name: refreshDatabases.py
Purpose: This is the main class to sync databases:
First, import the modules the script requires:
import json
import psycopg2
import psycopg2.extras as ex
import datetime
import sys
import threading
# The following modules were written separately so their objects can be reused:
import sqlRequestInJson
from pgdatabase import cursorfromconnectionpool as pgconnectionpool, Database as pgdatabase
from pgconnectionstring import connectionstring as pgconnectionstring
Next, get the list of IP addresses and ports of all remote hosts to connect to:
class getdata:
    def __init__(self):
        self.query = "select sl.ipaddress, sl.name, sl.serverid, sl.port from serverslist sl"

    def return_serverlist(self):
        # pgconnectionstring is a class written in a separate file; it keeps the user name and password out of this script
        cs = pgconnectionstring()
        pgdatabase.initialize(user=cs.user, password=cs.password, database=cs.database, host=cs.host)
        # Get the server list
        with pgconnectionpool() as cursor:
            cursor.execute(self.query)
            result = cursor.fetchall()
        return result
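The ipaddress column may hold SQL Server named-instance addresses such as host\instance. The thread function in the update class below keeps only the part before the backslash and passes the port separately. Here is a minimal sketch of that normalization as a standalone helper; the name host_only and the sample addresses are hypothetical.

```python
def host_only(address):
    """Return the host part of a 'host\\instance' address (hypothetical helper)."""
    # str.partition returns the whole string as the first element when there is no backslash
    return address.partition("\\")[0]


print(host_only("10.0.0.5\\SQLEXPRESS"))  # → 10.0.0.5
print(host_only("10.0.0.6"))             # → 10.0.0.6
```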
This is the main worker class, which collects the data and saves it to tables:
class update:
    # Empty initializer
    def __init__(self):
        pass

    # Runs an SQL script against Postgres; the context manager commits it
    def updateData(self, script):
        cs = pgconnectionstring()
        pgdatabase.initialize(user=cs.user, password=cs.password, database=cs.database, host=cs.host)
        with pgconnectionpool() as cursor:
            cursor.execute(script)

    # Calls return_serverlist() from the getdata class above to get the list of servers to connect to
    def returnRows(self):
        gd = getdata()
        return gd.return_serverlist()
    # Thread function: creates one thread per server to connect to
    def updateDatabasesThread(self, rows):
        threadlist = []
        for request in rows:
            # sqlRequestInJson connects by host and port, so drop a SQL Server instance name
            # ("host\instance") and keep only the host part; the port is passed separately
            ipaddr = request['ipaddress'].split('\\')[0]
            # threading.Thread takes the target function and the arguments each thread passes to it
            t = threading.Thread(target=self.updateDatabases,
                                 args=(request['serverid'], ipaddr, request['port']))
            threadlist.append(t)
        # Start all threads
        started = []
        for thread in threadlist:
            try:
                thread.start()
                started.append(thread)
            except Exception:
                print("{}".format(sys.exc_info()))
        # Wait for every started thread to finish; without join() the caller would
        # continue (and print its "Ended" time) while the threads are still running
        for thread in started:
            thread.join()
    # Gets the database names from one SQL Server and saves them into Postgres
    def updateDatabases(self, serverid, ipaddr, port):
        sqlreq = sqlRequestInJson.sqlrequests()
        # Query SQL Server; execsql returns {'result': rows, 'message': ...}
        result = sqlreq.execsql(ipaddr, "select name from sys.databases", port)
        # execsql reports failures as {'result': 'exception', 'message': ...} instead of raising
        if result['result'] == 'exception':
            print(result['message'])
            return
        cs = pgconnectionstring()
        pgdatabase.initialize(user=cs.user, password=cs.password, database=cs.database, host=cs.host)
        with pgconnectionpool() as cursor:
            dblist = []
            for row in result['result']:
                try:
                    # Collect (database name, serverid) tuples to pass to execute_values
                    dblist.append((row['name'], serverid))
                except Exception:
                    print("{}".format(sys.exc_info()))
            # execute_values expands the single %s into a multi-row VALUES list,
            # so all rows are inserted without writing a loop of INSERT statements
            ex.execute_values(cursor,
                              'insert into rtm.databases(databasename, serverid) values %s '
                              'on conflict(serverid, databasename) do nothing', dblist)
# Finally, put it all together
# Create the worker object
upd = update()
# Get the list of servers
rows = upd.returnRows()
# Start one thread per server
print("Started {}".format(datetime.datetime.utcnow()))
upd.updateDatabasesThread(rows)
print("Ended {}".format(datetime.datetime.utcnow()))
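One thread per server works well for a handful of hosts; with many servers you may want to cap the number of simultaneous connections and collect failures per server. Below is a sketch of the same fan-out using concurrent.futures.ThreadPoolExecutor from the standard library. This is a swapped-in technique, not what the script above uses; refresh_server is a hypothetical stand-in for updateDatabases, and the "unreachable" host simulates a server that cannot be reached.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def refresh_server(serverid, host, port):
    """Hypothetical stand-in for update.updateDatabases()."""
    if host == "unreachable":
        raise ConnectionError(f"cannot reach {host}:{port}")
    return serverid, ["master", "tempdb"]


rows = [
    {"serverid": 1, "ipaddress": "10.0.0.5\\SQLEXPRESS", "port": 1433},
    {"serverid": 2, "ipaddress": "unreachable", "port": 1433},
    {"serverid": 3, "ipaddress": "10.0.0.7", "port": 1433},
]

succeeded, failed = {}, {}
# At most 4 workers run at once; leaving the with block waits for all submitted tasks
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = {
        pool.submit(refresh_server, r["serverid"],
                    r["ipaddress"].partition("\\")[0], r["port"]): r["serverid"]
        for r in rows
    }
    for future in as_completed(futures):
        serverid = futures[future]
        try:
            _, databases = future.result()  # re-raises the worker's exception, if any
            succeeded[serverid] = databases
        except Exception as exc:
            failed[serverid] = str(exc)

print(sorted(succeeded), failed)
```

Because future.result() re-raises inside the main thread, one unreachable server is recorded in failed instead of being printed from inside a worker thread.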
As mentioned above, Python supports object-oriented programming, so I created multiple classes in separate files to re-use the code. These are the supporting Python class files used by the script above:
File Name: pgconnectionstring.py
Purpose: Keeps the credentials in a separate class so they can be used as properties in connection strings wherever needed.
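class connectionstring:
    def __init__(self):
        # Placeholder values; replace them with your own
        self.user = "user"
        self.password = "password"
        self.database = "dbname"
        # IP address or host name; psycopg2 takes the port as a separate 'port' argument (default 5432)
        self.host = "ip or hostname"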
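Keeping credentials in a class still leaves them in source code. One common alternative, sketched here and not what this post uses, is to read them from environment variables. The names PGUSER, PGPASSWORD, PGDATABASE, PGHOST and PGPORT follow libpq's environment-variable conventions (libpq, which psycopg2 is built on, also reads them itself); the fallback defaults and the optional environ argument are placeholders added for illustration.

```python
import os


class connectionstring:
    """Sketch: read Postgres credentials from the environment instead of source code."""

    def __init__(self, environ=None):
        env = os.environ if environ is None else environ
        # Variable names follow libpq's conventions; defaults are placeholders
        self.user = env.get("PGUSER", "user")
        self.password = env.get("PGPASSWORD", "")
        self.database = env.get("PGDATABASE", "dbname")
        self.host = env.get("PGHOST", "localhost")
        self.port = int(env.get("PGPORT", "5432"))


cs = connectionstring({"PGUSER": "etl", "PGPASSWORD": "secret", "PGHOST": "10.0.0.2"})
print(cs.user, cs.host, cs.port, cs.database)  # → etl 10.0.0.2 5432 dbname
```

Because the attribute names match the original class, calls such as pgdatabase.initialize(user=cs.user, password=cs.password, ...) would not need to change.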
File Name: pgdatabase.py
Purpose: This class returns data from Postgres as dictionary-style rows (DictCursor):
import psycopg2
import psycopg2.extras


class Database:
    # Despite its name, this holds a single psycopg2 connection, not a pool;
    # every initialize() call replaces it with a new connection
    connection_pool = None

    @classmethod
    def initialize(cls, **kwargs):
        cls.connection_pool = psycopg2.connect(**kwargs)

    @classmethod
    def get_connection(cls):
        return cls.connection_pool


class cursorfromconnectionpool:
    def __init__(self):
        self.connection = None
        self.cursor = None

    def __enter__(self):
        self.connection = Database.connection_pool
        # DictCursor rows can be read by column name, e.g. row['name']
        self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
        return self.cursor

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Close the cursor in both cases; roll back on error, otherwise commit
        self.cursor.close()
        if exc_val is not None:
            self.connection.rollback()
        else:
            self.connection.commit()
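The cursorfromconnectionpool class is a context manager: __exit__ commits when the with block succeeds and rolls back when it raises. The same pattern can be tried without a Postgres server; this sketch uses an in-memory sqlite3 connection, and the class name transaction_cursor is hypothetical.

```python
import sqlite3


class transaction_cursor:
    """Commit on success, roll back on error (same idea as cursorfromconnectionpool)."""

    def __init__(self, connection):
        self.connection = connection
        self.cursor = None

    def __enter__(self):
        self.cursor = self.connection.cursor()
        return self.cursor

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cursor.close()
        if exc_val is not None:
            self.connection.rollback()
        else:
            self.connection.commit()
        return False  # do not suppress the exception


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (x INTEGER)")
conn.commit()

with transaction_cursor(conn) as cur:
    cur.execute("INSERT INTO t VALUES (1)")  # committed on exit

try:
    with transaction_cursor(conn) as cur:
        cur.execute("INSERT INTO t VALUES (2)")
        raise RuntimeError("simulated failure")  # triggers rollback
except RuntimeError:
    pass

print(conn.execute("SELECT x FROM t").fetchall())  # → [(1,)]
```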
This demonstration involves multiple database servers, including Microsoft SQL Server and Postgres. To run it, you have to create the tables in one of the Postgres servers where the data will be stored. The server-list table must provide at least the columns used by this query: select sl.ipaddress, sl.name, sl.serverid, sl.port from serverslist sl. The target table rtm.databases also needs a unique constraint (or unique index) on (serverid, databasename), because Postgres rejects ON CONFLICT (serverid, databasename) when no matching unique constraint exists.
File Name: sqlRequestInJson.py
Purpose: This class returns data from SQL Server as dictionaries:
import sys

import pymssql


class sqlrequests:
    def execsql(self, server, script, port):
        connection = None
        try:
            connection = pymssql.connect(user='user', password='xxxx', database='mydb', host=server,
                                         as_dict=True, login_timeout=30, timeout=30, port=port)
            connection.autocommit(True)
            with connection.cursor() as cursor:
                cursor.execute(script)
                result = cursor.fetchall()
            # Instead of returning the rows directly, return a dict with the rows under 'result'
            # and a message under 'message', so the calling class can read any exception
            return {'result': result, 'message': 'nothing'}
        except Exception:
            e = "{}".format(sys.exc_info())
            e = e.replace("'", "*")
            # As mentioned above, return the exception to the calling class instead of crashing
            return {'result': 'exception', 'message': e}
        finally:
            # Close the connection on success and on failure
            if connection is not None:
                connection.close()
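execsql never raises; it reports failures as {'result': 'exception', 'message': ...}. Callers therefore need to check the 'result' value before iterating over it, otherwise they would loop over the characters of the string 'exception'. Here is a small sketch of the pattern with a hypothetical run_query that fails on demand, so no SQL Server is needed.

```python
import sys


def run_query(fail=False):
    """Hypothetical stand-in for sqlrequests.execsql()."""
    try:
        if fail:
            raise ConnectionError("login timeout")
        return {"result": [{"name": "master"}, {"name": "sales"}], "message": "nothing"}
    except Exception:
        # sys.exc_info()[1] is the exception object currently being handled
        return {"result": "exception", "message": "{}".format(sys.exc_info()[1])}


def database_names(response):
    # Check for the error marker before treating 'result' as a list of rows
    if response["result"] == "exception":
        print("query failed:", response["message"])
        return []
    return [row["name"] for row in response["result"]]


print(database_names(run_query()))           # → ['master', 'sales']
print(database_names(run_query(fail=True)))  # prints the error, then []
```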