Jan 28, 2019

Demo - Azure Logic Apps and Integration Services



What is Azure Integration Services?


Integration between various business processes (services or applications) has become a common requirement in modern technology. Solution providers often address it by embedding the integration logic directly in code such as C# or Java. Besides the integration itself, developers then have to take care of communication between services, monitoring, and security. A good example is an ERP or order system integrated with a vendor application.
This entire process looks simple on paper, though it may not be that simple to implement: developing a solution from scratch needs planning, coding, triage, and testing, and eventually time and money.
Many application vendors nowadays expose APIs for their data and services, making it possible to integrate with their business processes. The purpose of an integration service is to take advantage of these APIs and tie processes together.
Azure Integration Services does exactly that. With it, APIs (REST/SOAP), Service Bus (message queues), or events can be used to set up business workflows using Logic Apps, thereby integrating applications. The implementation can be broken down into the following technologies:
  • Logic Apps
  • APIs Management
  • Service Bus
  • Event Grid
  • Data Gateway
  • On-Premises Application
Logic Apps

Logic Apps implement the business process, or workflow, as a series of actions. Think of a CRM system: on any update in the CRM you may want to send an email, or access an on-premises SQL Server and update it. You can define system-to-system processes, user-to-system processes, and so on. Note that Logic Apps are a serverless platform, i.e. you don't need to provision a VM (infrastructure).
API Management
Most modern application vendors expose some of their APIs externally (REST and SOAP) so that external applications can use their data or services. These APIs can be made available through the API Management service in the Azure portal.
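
For illustration, here is a minimal sketch (in Python, using the requests package; the endpoint URL and subscription key are placeholders I made up) of consuming an API published through API Management, which authenticates callers via a subscription key header:

import requests

# Call an API published through Azure API Management; the URL and key are placeholders
response = requests.get(
    "https://contoso.azure-api.net/orders/v1/orders",
    headers={"Ocp-Apim-Subscription-Key": "<your-subscription-key>"})
print(response.status_code)
print(response.json())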

Service Bus
Service Bus is a trigger and an important part of application integration. One way to set up communication between applications is APIs, a synchronous method of communication; where APIs cannot be made available, an asynchronous approach may be required. Service Bus provides this asynchronous way of communication: it is essentially an MSMQ-style message queuing service.
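
As a rough sketch (using the azure-servicebus Python package; the connection string and queue name are placeholders, not from a real setup), a producer drops a message on a queue and the consumer picks it up whenever it is ready:

from azure.servicebus import ServiceBusClient, ServiceBusMessage

CONNECTION_STRING = "Endpoint=sb://..."  # copied from the Azure portal; placeholder here

with ServiceBusClient.from_connection_string(CONNECTION_STRING) as client:
    # Sender side: the producing application enqueues a message and moves on
    with client.get_queue_sender("orders") as sender:
        sender.send_messages(ServiceBusMessage('{"orderid": 1}'))
    # Receiver side: the consuming application processes messages when it is ready
    with client.get_queue_receiver("orders", max_wait_time=5) as receiver:
        for message in receiver:
            print(str(message))
            receiver.complete_message(message)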
Event Grid
Event Grid is also a trigger. Instead of the application polling to check whether a new message has been received, which is not very efficient, the receiver can be notified via an event. An event subscription can be set up to monitor the queue, and when a message arrives it invokes an event handler.
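
As a sketch (using the azure-eventgrid Python package; the topic endpoint, key, and event fields below are placeholders), publishing a custom event to an Event Grid topic looks like this; Event Grid then pushes the event to any subscribed handler, such as a Logic App:

from azure.eventgrid import EventGridPublisherClient, EventGridEvent
from azure.core.credentials import AzureKeyCredential

# The topic endpoint and access key are placeholders for a hypothetical topic
client = EventGridPublisherClient(
    "https://mytopic.westus2-1.eventgrid.azure.net/api/events",
    AzureKeyCredential("<topic-access-key>"))

# Publish a custom event; subscribed handlers receive it without polling
client.send(EventGridEvent(
    subject="neworder/1",
    event_type="Contoso.Orders.Created",
    data={"orderid": 1},
    data_version="1.0"))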

Data Gateway

The data gateway is a tool installed on an individual on-premises host to connect an on-premises service or application with Azure Integration Services; this is also called a hybrid solution.

On-Premises Application

The on-premises application is the part of the business process that needs to be integrated; it could be, for example, a BizTalk Server connecting to other sources on Azure through a Logic App.

The picture below shows the high-level architecture of Azure Integration Services.


There are different scenarios where APIs, events, or Service Bus can be used to integrate. I would like to give a quick demo of how Dropbox and Azure SQL Database can be integrated. The purpose of the demo is to create a Logic App that continuously monitors whether a new file has been added to Dropbox; whenever one is, an entry is made in an Azure SQL database.

This demo requires the following prerequisites:

  • A Dropbox account
  • An Azure SQL database
  • The connection string of the Azure SQL database
First, create a Dropbox account if you don't have one already.

Since this is a common scenario, I am skipping the walkthrough of creating the Dropbox account.

Create Azure SQL Database

  • If you already have an Azure SQL database, you can skip this step
  • Log in to portal.azure.com
  • Click on Create a resource and search for the SQL Database resource



  • After selecting the SQL Database resource, click Create




  • Enter the SQL database details and click Create to submit the deployment request

Modify Firewall Settings of the Azure SQL Database
  • Once the database is created, modify the database firewall and add the client IP of your network so that you can connect to the database using SSMS from your machine.
Get Connection Details of the SQL Database and Create a Table to Save New File Records
  • Now get the connection details of the newly created database. Go to your resources, select the database resource to open its properties, and copy the connection details from there.


  • Once you connect to this database from SSMS, create a table in this database with the following specs; a minimal sketch is shown below
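
At minimum, the table needs the two columns the Logic App will write, FileName and Date. Here is a sketch (using pymssql, which also appears later on this blog; the table name, column sizes, server name, and credentials are placeholders of my own):

import pymssql

# Create the demo table; "dbo.NewFiles" and the column sizes are assumptions
connection = pymssql.connect(user='user', password='xxxx', database='mydb',
                             host='yourserver.database.windows.net')
with connection.cursor() as cursor:
    cursor.execute("""
        create table dbo.NewFiles (
            FileName nvarchar(400) not null,
            Date     datetime not null)""")
connection.commit()
connection.close()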


Design Logic App
  • Now start creating the Logic App: select the Logic App resource from the resource list and click Create.
  • Enter the basic details of the Logic App (name, subscription, resource group, and location) and submit the deployment request

  • This should open the Logic App Designer. In the designer, under Choose an action, search for Dropbox and, under Triggers, select When a file is created



  • It will ask you to authenticate your Dropbox account and to select the root folder.
  • After completing the Dropbox configuration, add a new step for SQL Server


  • Add the SQL Server connection details
  • Next, select the table that was created earlier

  • Next, select the FileName and Date parameters.
  • Enter the parameter details



  • That should be all; finally, review all the configuration
  • Add a file to the root folder



Verify that the Logic App is working as designed
  • Run the Logic App manually; it will also run on its own per the schedule
  • Now if you go back to SQL and look at the table contents, it should have the new file names and the UTC date each file was observed.


That is all: you now have an Azure Logic App monitoring your Dropbox and adding an entry in SQL whenever a new file is found.

Jan 26, 2019

Python Multithreading and automation demonstration for DBAs

A few weeks ago, I started moving my SQL Server automation scripts from PowerShell to Python to add some data analysis capabilities. For background: I am responsible for hundreds of servers, collecting data from remote hosts worldwide into a centralized location, where it is processed for useful insights such as monitoring, performance analysis, and capacity planning.

Along with rewriting the scripts, the motive was also to improve them: the old module connected to each remote server one at a time in a loop, so it used to take a very long time to complete the data processing.

I decided to optimize my scripts by taking advantage of multithreading in Python, connecting to the remote hosts in parallel. I would also like to point out a function from the psycopg2.extras module called execute_values: it takes a list of rows directly, so when there are multiple rows to insert it saves you from writing a loop of individual statements.
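
As a standalone illustration (the table matches the demo schema used later; the connection values are placeholders), execute_values takes the cursor, a statement with a single values %s placeholder, and the list of row tuples:

import psycopg2
import psycopg2.extras as ex

connection = psycopg2.connect(user='user', password='password',
                              database='dbname', host='host')
with connection.cursor() as cursor:
    # One tuple per row; execute_values expands the list into a single multi-row INSERT
    rows = [('master', 1), ('tempdb', 1), ('msdb', 1)]
    ex.execute_values(
        cursor,
        'insert into rtm.databases(databasename, serverid) values %s',
        rows)
connection.commit()
connection.close()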

Before the demonstration, I would like to show in numbers how much time this approach saved, comparing the traditional method with the threaded model:

Without threads, the process used to take 45 seconds.




With threads, the same process took only 761 milliseconds, roughly a 60x improvement.




I am demonstrating one of my scripts, which is responsible for getting the list of databases from remote SQL Servers. Since the demo uses threading, we need to import the threading module, map the worker function to a Thread object, and finally start the thread.

File Name: refreshDatabases.py
Purpose: This is the main script that syncs the databases.

First, import the modules required for this script:


import json
import psycopg2
import psycopg2.extras as ex
import datetime
import sys
import threading
# Following modules are manually written separately just for the purpose of reusing objects: 
import sqlRequestInJson
from pgdatabase import cursorfromconnectionpool as pgconnectionpool, Database as pgdatabase
from pgconnectionstring import connectionstring as pgconnectionstring
Next, get the list of IP addresses and ports of the remote hosts to connect to.
class getdata:
    def __init__(self):
        self.query = "select sl.ipaddress, sl.name, sl.serverid, sl.port from serverslist sl"

    def return_serverlist(self):
        # These calls use psycopg2 through classes written in separate files;
        # this also keeps the user name and password out of this script
        cs = pgconnectionstring()
        pgdatabase.initialize(user=cs.user, password=cs.password, database=cs.database, host=cs.host)
        # Get the servers list
        with pgconnectionpool() as cursor:
            cursor.execute(self.query)
            result = cursor.fetchall()
            return result

This is the main working class that collects the data and saves it in the tables:
class update:
    # Empty initializer
    def __init__(self):
        pass

    # Execute a statement against Postgres
    def updateData(self, script):
        cs = pgconnectionstring()
        pgdatabase.initialize(user=cs.user, password=cs.password, database=cs.database, host=cs.host)
        with pgconnectionpool() as cursor:
            cursor.execute(script)

    # Call return_serverlist (defined in the class above) to get the list of servers to connect to
    def returnRows(self):
        gd = getdata()
        return gd.return_serverlist()

    # Thread dispatcher: creates one thread per server to be connected to
    def updateDatabasesThread(self, rows):
        # Empty thread list
        threadlist = []
        for request in rows:
            lst = list(request['ipaddress'])
            # Strip the instance name (the part after "\") from the server address;
            # the port number is provided separately for the connection
            if lst.count('\\') > 0:
                lst = lst[0:request['ipaddress'].find('\\')]
                request['ipaddress'] = ''.join(lst)
            # threading.Thread takes the target function and the arguments to be used by each thread
            t = threading.Thread(target=self.updateDatabases, args=(request['serverid'], request['ipaddress'], request['port'],))
            # Add the thread to the list
            threadlist.append(t)
        # Start the threads
        for thread in threadlist:
            try:
                thread.start()
            except:
                e = "{}".format(sys.exc_info())
                print(e)
    # Get the database names from a SQL Server and save them into Postgres
    def updateDatabases(self, serverid, ipaddr, port):
        sqlreq = sqlRequestInJson.sqlrequests()
        # Get data from SQL Server; execsql returns a dict with the rows under the 'result' key
        result = sqlreq.execsql(ipaddr, "select name from sys.databases", port)
        cs = pgconnectionstring()
        pgdatabase.initialize(user=cs.user, password=cs.password, database=cs.database, host=cs.host)
        with pgconnectionpool() as cursor:
            dblist = []
            for row in result['result']:
                try:
                    values = (row['name'], serverid)
                    # Append (databasename, serverid) to the list to be used later with execute_values
                    dblist.append(values)
                except:
                    e = "{}".format(sys.exc_info())
                    print(e)
            # Insert into Postgres without a loop: execute_values expands the list into a multi-row INSERT
            ex.execute_values(cursor, 'insert into rtm.databases(databasename, serverid) values %s on conflict(serverid, databasename) DO NOTHING', dblist)
# Finally, put it all together
# Create an object from the class
upd = update()
# Get the list of servers
rows = upd.returnRows()
# Start the threads
print("Started {}".format(str(datetime.datetime.utcnow())))
upd.updateDatabasesThread(rows)
print("Ended {}".format(str(datetime.datetime.utcnow())))
As mentioned above, Python is an object-oriented scripting language, so I created multiple classes in separate files to reuse the code. These are the supporting Python files used by the script above:

File Name: pgconnectionstring.py
Purpose: Keep the credentials in a separate class and use them as connection string properties wherever needed:


class connectionstring:
    def __init__(self):
        self.user="user"
        self.password="password"
        self.database="dbname"
        self.host="ip\hostname with port"

File Name: pgdatabase.py
Purpose: These classes return the data from Postgres in dictionary format:


from psycopg2 import pool
from psycopg2 import extras
import psycopg2

class Database:
    # Despite the name, this currently holds a single shared connection;
    # it could be swapped for a psycopg2 connection pool later
    connection_pool = None

    @classmethod
    def initialize(cls, **kwargs):
        cls.connection_pool = psycopg2.connect(**kwargs)

    @classmethod
    def get_connection(cls):
        return cls.connection_pool

class cursorfromconnectionpool:

    def __init__(self):
        self.connection = None

    def __enter__(self):
        self.connection = Database.get_connection()
        # DictCursor allows rows to be accessed by column name
        self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
        return self.cursor

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_val is not None:
            self.connection.rollback()
        else:
            self.cursor.close()
            self.connection.commit()
 
         
File Name: sqlRequestInJson.py
Purpose: This class returns the data from SQL Server in dictionary format:


import pymssql
import sys

class sqlrequests:
    def execsql(self, server, script, port):
        try:
            connection = pymssql.connect(user='user', password='xxxx', database='mydb', host=server, as_dict=True, login_timeout=30, timeout=30, port=port)
            connection.autocommit(True)
            with connection.cursor() as cursor:
                cursor.execute(script)
                result = cursor.fetchall()
            connection.close()
            # Instead of returning the rows directly, return JSON-style data with the rows
            # under the 'result' key and a message under the 'message' key; this lets the
            # calling class read any exception
            return {'result': result, 'message': 'nothing'}
        except:
            e = "{}".format(sys.exc_info())
            e = e.replace("'", "*")
            # As mentioned in the comment above, return the exception to the calling
            # class instead of simply crashing
            return {'result': 'exception', 'message': e}
This demonstration involves multiple database servers, including Microsoft SQL Server and Postgres. To execute it, you will have to create two tables in the Postgres server: serverslist, which must have at least the columns referenced by the query "select sl.ipaddress, sl.name, sl.serverid, sl.port from serverslist sl", and rtm.databases, where the collected data is stored (it needs a unique constraint on (serverid, databasename) for the ON CONFLICT clause to work).
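
Here is a sketch that creates both tables (the exact DDL is an assumption; the column types are inferred from how the script uses them):

import psycopg2

# Connection values mirror pgconnectionstring.py; replace with your own
connection = psycopg2.connect(user='user', password='password',
                              database='dbname', host='host')
with connection.cursor() as cursor:
    # Source table: the list of SQL Servers the script connects to
    cursor.execute("""
        create table if not exists serverslist (
            serverid  serial primary key,
            name      text not null,
            ipaddress text not null,
            port      integer not null)""")
    # Destination table: the unique constraint backs the script's ON CONFLICT clause
    cursor.execute("create schema if not exists rtm")
    cursor.execute("""
        create table if not exists rtm.databases (
            databasename text not null,
            serverid     integer not null,
            unique (serverid, databasename))""")
connection.commit()
connection.close()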
