Sunday, December 15, 2019

Pull data from Azure Data Lake Gen 2 and write it to Azure SQL Data Warehouse using Spark and Scala

This article walks through pulling data from Azure Data Lake Gen 2 and writing it to Azure SQL Data Warehouse from an Azure Spark notebook, using Scala.

Prerequisites:

  1. An Azure subscription
  2. Azure SQL Data Warehouse
  3. Azure Blob Storage
  4. A blob container
  5. Azure Spark
  6. Azure Data Lake Gen 2 with a sample parquet file in a container
  7. A master key on the database

Step 0:

Create a master key in Azure SQL Data Warehouse

Open SQL Server Management Studio and connect using:
  • Azure SQL Data Warehouse server: the fully qualified name, suffixed with ".database.windows.net"
  • SQL authentication (username and password)
  • Azure SQL Data Warehouse username: suffixed with @yourAzureWareHouseServerName
  • Azure SQL Data Warehouse password

Once you are connected, select your data warehouse, open a new query window, and execute the SQL command below to create a master key in the database.

You need CONTROL permission on the database to execute this query.

CREATE MASTER KEY ENCRYPTION BY PASSWORD = '23987hxJ#KL95234nl0zBe';


Step 1:

Pull data into a data frame using Scala

Write the code below in a Spark notebook.
Note: First mount your Azure Data Lake Gen 2 container in your Spark environment. In my case the mount point is "/mnt/blobTestContainer/emp".


val pardf = spark.read.parquet("/mnt/blobTestContainer/emp") //load data frame from parquet file
pardf.createOrReplaceTempView("tempEmp") //create temporary view
val dfEmp = spark.sql("select * from tempEmp") //load data frame from sql view


After you run these notebook commands, the data frame "dfEmp" is ready.

Step 2:

Storage settings for writing data to Azure SQL Data Warehouse

Write the code below in the Spark notebook. These settings are required to write data.

val blobStorage = "xxxxxx.blob.core.windows.net" //This should be your fully qualified blob storage name

val blobContainer = "test" //The blob container is used as temporary file storage; the data warehouse then pulls the data from this temp storage

val blobAccessKey = "cH/xxxxxxxxxxxFBEd3878QPZM9EoYPrkSXXXXXXXXXXXX/eg+rh/IfjxxxxxxxxF6A3BA=="

val tempDir = "wasbs://" + blobContainer + "@" + blobStorage + "/tempDirs" //temp dir storage location

val acntInfo = "fs.azure.account.key."+ blobStorage

sc.hadoopConfiguration.set(acntInfo, blobAccessKey)

Step 3:

SQL Data Warehouse settings

Write the code below in the Spark notebook. Make sure the username is fully qualified, that is, suffixed with @yourAzureWareHouseServerName.

The data warehouse server name should be suffixed with ".database.windows.net".

val dwDatabase = "dwTestDB" //Data warehouse database name

val dwServer = "xxxxx.database.windows.net" //Data warehouse server name

val dwUser = "dwuser@dwserver" //User name for the database connection. You can get this from the Azure data warehouse properties

val dwPass = "XXXXX" //Password for the database connection. The username and password are set when you create the Azure data warehouse

val dwJdbcPort = "1433"

val dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30"

//Append the extra options by concatenation: "$dwJdbcExtraOptions" inside a plain string (no s prefix) is not interpolated
val sqlDwUrl = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser + ";password=" + dwPass + ";" + dwJdbcExtraOptions

val sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser + ";password=" + dwPass
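
The connection string in Step 3 is a list of semicolon-separated parts, so the assembly can be tried outside Spark. The block below is a minimal JavaScript sketch for illustration only, not part of the notebook: the function name buildSqlDwUrl and its parameter object are hypothetical, and the values are the placeholders used in this step.

```javascript
// Hypothetical helper: joins the JDBC parts from Step 3 with semicolons.
function buildSqlDwUrl({ server, port, database, user, password, extraOptions }) {
  const parts = [
    "jdbc:sqlserver://" + server + ":" + port,
    "database=" + database,
    "user=" + user,
    "password=" + password,
  ];
  // The extra options are already one semicolon-separated string; append only if given.
  if (extraOptions) {
    parts.push(extraOptions);
  }
  return parts.join(";");
}

// Placeholder values copied from the Scala settings.
const settings = {
  server: "xxxxx.database.windows.net",
  port: "1433",
  database: "dwTestDB",
  user: "dwuser@dwserver",
  password: "XXXXX",
};

const sqlDwUrlSmall = buildSqlDwUrl(settings);
const sqlDwUrl = buildSqlDwUrl({
  ...settings,
  extraOptions:
    "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30",
});

console.log(sqlDwUrlSmall);
console.log(sqlDwUrl);
```

Printing both values makes it easy to confirm that the extra options end up in the URL as literal key=value pairs.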

Step 4:

Code to write data to Azure SQL Data Warehouse

Use the code below to push data to the data warehouse. It writes the data frame "dfEmp" to the data warehouse: the data is staged in tempDir, the table "Emp" is created in the database from it, and the PolyBase connection underneath is set up automatically.

spark.conf.set("spark.sql.parquet.writeLegacyFormat","true")

dfEmp.write.format("com.databricks.spark.sqldw").option("url", sqlDwUrlSmall).option("dbtable","Emp").option("forward_spark_azure_storage_credentials","True").option("tempdir",tempDir).mode("overwrite").save()

After these five steps (Step 0 to Step 4), you can connect to your Azure data warehouse using SSMS (SQL Server Management Studio) and query the Emp table.

Final Code

With all the settings above in place, copy the code below into a Spark notebook and run it together.

val pardf = spark.read.parquet("/mnt/blobTestContainer/emp") //load data frame from parquet file

pardf.createOrReplaceTempView("tempEmp") //create temporary view

val dfEmp = spark.sql("select * from tempEmp") //load data frame from sql view

val blobStorage = "xxxxxx.blob.core.windows.net" //This should be your fully qualified blob storage name

val blobContainer = "test" //The blob container is used as temporary file storage; the data warehouse then pulls the data from this temp storage

val blobAccessKey = "cH/xxxxxxxxxxxFBEd3878QPZM9EoYPrkSXXXXXXXXXXXX/eg+rh/IfjxxxxxxxxF6A3BA=="

val tempDir = "wasbs://" + blobContainer + "@" + blobStorage + "/tempDirs" //temp dir storage location

val acntInfo = "fs.azure.account.key."+ blobStorage

sc.hadoopConfiguration.set(acntInfo, blobAccessKey)


val dwDatabase = "dwTestDB" //Data warehouse database name

val dwServer = "xxxxx.database.windows.net" //Data warehouse server name

val dwUser = "dwuser@dwserver" //User name for the database connection. You can get this from the Azure data warehouse properties

val dwPass = "XXXXX" //Password for the database connection. The username and password are set when you create the Azure data warehouse

val dwJdbcPort = "1433"

val dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30"

//Append the extra options by concatenation: "$dwJdbcExtraOptions" inside a plain string (no s prefix) is not interpolated
val sqlDwUrl = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser + ";password=" + dwPass + ";" + dwJdbcExtraOptions

val sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser + ";password=" + dwPass


spark.conf.set("spark.sql.parquet.writeLegacyFormat","true")

dfEmp.write.format("com.databricks.spark.sqldw").option("url", sqlDwUrlSmall).option("dbtable","Emp").option("forward_spark_azure_storage_credentials","True").option("tempdir",tempDir).mode("overwrite").save()

I hope this code helps you. Leave a comment if you need any help or have a suggestion.

Happy coding. 

Friday, October 4, 2019

Create your own custom ArrayList in C#

As per the Microsoft C# team:
Initial capacity of an ArrayList in .NET 1.0 is 16. In 2.0 it was 4, and now - with .NET 3.5 - the initial capacity has been lowered to 0. I don't have an explanation of why, though. When adding a first element to the list, the capacity will be set to 4. Thereafter, every time arraylist.Count equals arraylist.Capacity, the capacity will double.
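
The growth rule quoted above can be traced with a few lines of code. This is a minimal JavaScript sketch that only simulates the rule as stated (start at 0, jump to 4 on the first add, then double whenever the count reaches the capacity). It is not the .NET implementation, and the function name capacitiesAfterAdds is made up for the illustration.

```javascript
// Returns the capacity observed after each of `adds` consecutive Add calls,
// following the quoted rule: 0 -> 4 on the first Add, then doubling when full.
function capacitiesAfterAdds(adds) {
  let capacity = 0;
  let count = 0;
  const history = [];
  for (let i = 0; i < adds; i++) {
    if (count === capacity) {
      capacity = capacity === 0 ? 4 : capacity * 2;
    }
    count++;
    history.push(capacity);
  }
  return history;
}

console.log(capacitiesAfterAdds(9).join(" ")); // 4 4 4 4 8 8 8 8 16
```

The capacity changes only on the 1st, 5th and 9th add, which is why doubling keeps reallocations rare.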

The code below is just a reference for how a custom ArrayList can be created. In this example the array grows by one element at a time, and the initial size used in the example is 10.

using System;

class CArray
{
    private int[] arr;
    private int upper;        // index of the last allocated slot
    private int numElements;  // number of items actually stored

    public CArray(int size)
    {
        arr = new int[size];
        upper = size - 1;
        numElements = 0;
    }

    public void Insert(int item)
    {
        if (numElements == arr.Length) // array is full: reallocate
        {
            IncreaseArraySize();
            upper = arr.Length - 1;
        }

        arr[numElements] = item;
        numElements++;
    }

    public void DisplayElements()
    {
        // Print only the slots that hold inserted items
        for (int i = 0; i < numElements; i++)
        {
            Console.Write(arr[i] + " ");
            Console.WriteLine();
        }
    }

    public void Clear()
    {
        for (int i = 0; i <= upper; i++)
        {
            arr[i] = 0;
        }
        numElements = 0; // reset the count once, after zeroing the slots
    }

    public void IncreaseArraySize()
    {
        // Grow by one slot; Array.Resize copies the existing items
        Array.Resize(ref arr, arr.Length + 1);
    }
}

class Program
{
    static void Main(string[] args)
    {
        CArray nums = new CArray(10);

        Random rnd = new Random(100);

        for (int i = 0; i < 10; i++)
        {
            nums.Insert((int)(rnd.NextDouble() * 100));
        }

        nums.Insert(203); //Dynamic size allocation here
        nums.Insert(2);

        nums.DisplayElements();
    }
}

Note: This is not the exact implementation of ArrayList in C#; it is just a reference.
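
To see why growing by one slot is fine for a demo but costly at scale, the number of reallocations can be counted for both policies. This is a small JavaScript sketch and not a benchmark: countResizes, growByOne and doubleIt are names invented for the illustration, and it assumes a positive initial capacity.

```javascript
// Counts how many times the backing array is reallocated while inserting
// `inserts` items, starting from `initialCapacity`.
// `grow` maps the old capacity to the new one.
function countResizes(initialCapacity, inserts, grow) {
  let capacity = initialCapacity;
  let resizes = 0;
  for (let count = 0; count < inserts; count++) {
    if (count === capacity) {
      capacity = grow(capacity);
      resizes++;
    }
  }
  return resizes;
}

const growByOne = (capacity) => capacity + 1; // the policy used by IncreaseArraySize
const doubleIt = (capacity) => capacity * 2;  // the doubling policy

console.log(countResizes(10, 1000, growByOne)); // 990
console.log(countResizes(10, 1000, doubleIt));  // 7
```

Each reallocation copies the existing items, so 990 resizes for 1000 inserts means far more copying than 7.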

Tuesday, April 16, 2019

How to replace Kendo grid OData parameters using parameterMap

parameterMap: function (data, operations) {
    // Start from Kendo's default OData parameter map
    var paramMap = kendo.data.transports.odata.parameterMap(data);
    // Replace $inlinecount=allpages with $count=true
    if (paramMap.$inlinecount) {
        if (paramMap.$inlinecount === "allpages") {
            paramMap.$count = true;
        }
        delete paramMap.$inlinecount;
    }
    // Rename $take to $top
    if (paramMap.$take) {
        paramMap.$top = paramMap.$take;
        delete paramMap.$take;
    }
    // Rewrite substringof(value,field) as contains(field,value)
    if (paramMap.$filter) {
        paramMap.$filter = paramMap.$filter.replace(/substringof\((.+),(.*?)\)/, "contains($2,$1)");
    }
    return paramMap;
}
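
The same rewrite can be tried without Kendo by passing in a plain object. This is a minimal sketch: toODataV4Params is a hypothetical name, and the sample input is only an assumption about what the default parameter map might return. It uses the same regular expression as above, which handles a single substringof call per filter.

```javascript
// Standalone version of the rewrite. `params` is assumed to look like the
// object produced by the default OData parameter map.
function toODataV4Params(params) {
  const result = Object.assign({}, params); // copy so the input is not mutated
  if (result.$inlinecount) {
    if (result.$inlinecount === "allpages") {
      result.$count = true;
    }
    delete result.$inlinecount;
  }
  if (result.$take) {
    result.$top = result.$take;
    delete result.$take;
  }
  if (result.$filter) {
    // substringof(value,field) becomes contains(field,value)
    result.$filter = result.$filter.replace(/substringof\((.+),(.*?)\)/, "contains($2,$1)");
  }
  return result;
}

const mapped = toODataV4Params({
  $inlinecount: "allpages",
  $take: 20,
  $skip: 40,
  $filter: "substringof('san',Name)",
});

console.log(mapped.$filter); // contains(Name,'san')
console.log(mapped.$count, mapped.$top, mapped.$skip); // true 20 40
```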

Thursday, January 10, 2019

OData using ODataQueryOptions $count $select $expand $filter without using [EnableQuery] attribute

If you are using an OData controller without the [EnableQuery] attribute, the example below shows how to apply all of your OData query options.

OData query

/odata/GetTestData?$format=json&$filter=(Name+eq+'santosh'+AND+Age+eq+30)&$select=Name,Address&$top=50&$count=true
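
To check which system query options the request above carries, the URL can be parsed with the standard URL class. This JavaScript sketch is only for inspection: the host http://localhost is a placeholder, and only the path and query string come from the example.

```javascript
// Parse the sample request and collect its system query options.
const requestUrl = new URL(
  "/odata/GetTestData?$format=json&$filter=(Name+eq+'santosh'+AND+Age+eq+30)&$select=Name,Address&$top=50&$count=true",
  "http://localhost" // placeholder host
);

const options = {};
for (const [name, value] of requestUrl.searchParams) {
  options[name] = value; // "+" in the query string is decoded as a space
}

console.log(options.$filter); // (Name eq 'santosh' AND Age eq 30)
console.log(options.$select); // Name,Address
console.log(options.$top);    // 50
console.log(options.$count);  // true
```

On the server side these values roughly correspond to the Filter, SelectExpand, Top and Count members of ODataQueryOptions used in the steps that follow.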

Step 1:

Declare a temp variable and load the data from your repository as below (TestData stands in for your own entity type):

IQueryable<TestData> tempQuery = repo.GetTestData();
IQueryable<TestData> result = tempQuery;

Step 2:

Add the lines below to apply your query options. $orderby is applied before $skip and $top so that paging works on the sorted data.
For $count, call Count() after the filter but before $skip and $top, so that you get the count of all matching data rather than of one page.

// TestData stands in for your own entity type
if (opts.Filter != null){
    tempQuery = opts.Filter.ApplyTo(tempQuery, new ODataQuerySettings()) as IQueryable<TestData>;
}
if (opts.Count != null){
    Request.ODataProperties().TotalCount = tempQuery.Count();
}
if (opts.OrderBy != null){
    tempQuery = opts.OrderBy.ApplyTo(tempQuery, new ODataQuerySettings()) as IQueryable<TestData>;
}
if (opts.Skip != null){
    tempQuery = opts.Skip.ApplyTo(tempQuery, new ODataQuerySettings()) as IQueryable<TestData>;
}
if (opts.Top != null){
    tempQuery = opts.Top.ApplyTo(tempQuery, new ODataQuerySettings()) as IQueryable<TestData>;
}
if (opts.SelectExpand != null){
    Request.ODataProperties().SelectExpandClause = opts.SelectExpand.SelectExpandClause;
}
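
The order in which the options are applied matters. OData defines $count as the number of items that match $filter, ignoring $top and $skip, and paging only makes sense after sorting. The JavaScript sketch below shows that order on a plain array. applyQuery and the shape of its query argument are hypothetical stand-ins for the parsed options, and the sample rows are invented.

```javascript
// Plain-array sketch of the evaluation order: filter, count, order, skip, top.
function applyQuery(rows, query) {
  let result = rows;
  if (query.filter) {
    result = result.filter(query.filter);
  }
  const totalCount = result.length; // counted after filtering, before paging
  if (query.orderBy) {
    result = [...result].sort(query.orderBy); // copy so the input stays untouched
  }
  if (query.skip != null) {
    result = result.slice(query.skip);
  }
  if (query.top != null) {
    result = result.slice(0, query.top);
  }
  return { totalCount, value: result };
}

// Invented sample data
const rows = [
  { Name: "santosh", Age: 30 },
  { Name: "amit", Age: 25 },
  { Name: "santosh", Age: 41 },
  { Name: "ravi", Age: 30 },
];

const page = applyQuery(rows, {
  filter: (row) => row.Name === "santosh",
  orderBy: (a, b) => b.Age - a.Age, // oldest first
  skip: 0,
  top: 1,
});

console.log(page.totalCount);   // 2
console.log(page.value[0].Age); // 41
```

If top were applied before skip, a request for the second page (skip 1, top 1) would take one row and then skip it, returning nothing.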

Step 3:
Final full OData controller code

//Web API OData controller
//TestData stands in for your own entity type

[HttpGet]
[ODataRoute("GetTestData")]
public IQueryable<TestData> GetTestData(ODataQueryOptions opts)
{
    var repo = unitOfWork.TestRepository;

    IQueryable<TestData> tempQuery = repo.GetTestData();
    IQueryable<TestData> result = tempQuery;

    if (opts.Filter != null)
    {
        tempQuery = opts.Filter.ApplyTo(tempQuery, new ODataQuerySettings()) as IQueryable<TestData>;
    }
    //Count the matching rows after filtering but before paging
    if (opts.Count != null)
    {
        Request.ODataProperties().TotalCount = tempQuery.Count();
    }
    //Sort first, then skip, then take, so paging works on the sorted data
    if (opts.OrderBy != null)
    {
        tempQuery = opts.OrderBy.ApplyTo(tempQuery, new ODataQuerySettings()) as IQueryable<TestData>;
    }
    if (opts.Skip != null)
    {
        tempQuery = opts.Skip.ApplyTo(tempQuery, new ODataQuerySettings()) as IQueryable<TestData>;
    }
    if (opts.Top != null)
    {
        tempQuery = opts.Top.ApplyTo(tempQuery, new ODataQuerySettings()) as IQueryable<TestData>;
    }
    if (opts.SelectExpand != null)
    {
        Request.ODataProperties().SelectExpandClause = opts.SelectExpand.SelectExpandClause;
    }

    result = tempQuery.ToList().AsQueryable();

    return result;
}

Happy coding👍

Friday, January 4, 2019

OData $expand and $select

How to implement OData $expand and $select in Web API

You can download the full code from my GitHub repository and follow my YouTube channel for a detailed description.

Step 1:

Create a new Web API project in Visual Studio and add the NuGet package below to enable OData:
"Microsoft.AspNet.OData"

Step 2:

Create the model classes below: Product, Supplier, Category and ProductList
-----------------------------------------------------------------------------
using System.ComponentModel.DataAnnotations.Schema;

namespace ODataExpandAndSelect.Models
{
    public class Product
    {
        public int ID { get; set; }
        public string Name { get; set; }
        public decimal Price { get; set; }

        [ForeignKey("Category")]
        public int CategoryId { get; set; }
        public virtual Category Category { get; set; }

        [ForeignKey("Supplier")]
        public string SupplierId { get; set; }
        public virtual Supplier Supplier { get; set; }
    }
}
----------------------------------------------------------------------------------------
using System.ComponentModel.DataAnnotations;

namespace ODataExpandAndSelect.Models
{
    public class Supplier
    {
        [Key]
        public string Key { get; set; }
        public string Name { get; set; }
    }

}
----------------------------------------------------------------------------------------------------
using System.Collections.Generic;
using System.Linq;

namespace ODataExpandAndSelect.Models
{
    public class Category
    {
        public Category()
        {
            Products = new HashSet<Product>();
        }
        public int ID { get; set; }
        public string Name { get; set; }
        public virtual ICollection<Product> Products { get; set; }
    }
}
------------------------------------------------------------------------------------------
using System.Collections.Generic;
using System.Linq;

namespace ODataExpandAndSelect.Models
{
    public class ProductList
    {
        // Returns in-memory dummy data instead of querying a database
        public IQueryable<Product> getProducts()
        {
            List<Product> products = new List<Product>();

            Category c1 = new Category() { ID = 1, Name = "Category1", Products = products };
            Category c2 = new Category() { ID = 2, Name = "Category2", Products = products };
            Supplier s1 = new Supplier() { Key = "s1", Name = "Supplier1" };
            Supplier s2 = new Supplier() { Key = "s2", Name = "Supplier2" };

            // SupplierId matches the Key of the referenced Supplier
            Product p1 = new Product() { ID = 1, Category = c1, CategoryId = 1, Name = "product1", Price = 100.50M, Supplier = s1, SupplierId = "s1" };
            Product p2 = new Product() { ID = 2, Category = c2, CategoryId = 2, Name = "product2", Price = 200.50M, Supplier = s2, SupplierId = "s2" };

            products.Add(p1);
            products.Add(p2);

            return products.AsQueryable();
        }
    }
}
------------------------------------------------------------------------------------

Since I am not using a database in this example, I created the ProductList class to return some dummy data. You may use any database or Entity Framework as your requirements dictate.

Step 3:
In WebApiConfig.cs, make the changes below

using System.Linq;
using System.Web.Http;
using Microsoft.AspNet.OData.Batch;
using Microsoft.AspNet.OData.Builder;
using Microsoft.AspNet.OData.Extensions;
using Microsoft.OData.Edm;
using ODataExpandAndSelect.Models;

namespace ODataExpandAndSelect
{
    public static class WebApiConfig
    {
        private static IEdmModel GetEdmModel()
        {
            ODataConventionModelBuilder builder = new ODataConventionModelBuilder();
            builder.Namespace = "WebAPITest";
            builder.ContainerName = "DefaultContainer";
            builder.EntitySet<Product>("Product");
            builder.EntitySet<Category>("Category");
            builder.EntitySet<Supplier>("Supplier");
            var edmModel = builder.GetEdmModel();
            return edmModel;
        }
        public static void Register(HttpConfiguration config)
        {
            config.Count().Filter().OrderBy().Expand().Select().MaxTop(null); 
            config.MapODataServiceRoute("odata", null, GetEdmModel(), new DefaultODataBatchHandler(GlobalConfiguration.DefaultServer));
            config.EnsureInitialized();
        }
    }

}

Step 4:
Create a ProductController in the Controllers folder as below

using System.Linq;
using Microsoft.AspNet.OData;
using ODataExpandAndSelect.Models;

namespace ODataExpandAndSelect.Controllers
{
    public class ProductController : ODataController
    {
        [EnableQuery]
        public IQueryable<Product> Get()
        {
            ProductList list = new ProductList();
            var data = list.getProducts();
            return data;
        }
    }
}

Step 5:
In Global.asax.cs, make the changes below

using System.Web.Http;

namespace ODataExpandAndSelect
{
    public class WebApiApplication : System.Web.HttpApplication
    {
        protected void Application_Start()
        {
            GlobalConfiguration.Configure(WebApiConfig.Register);
        }
    }
}

Step 6:
Run your application on IIS Express and use a query like the one below

http://localhost:5197/Product?$expand=Category

You can expand more than one navigation property by separating them with commas, as in the query below
http://localhost:5197/Product?$expand=Category,Supplier

For $select you can use the query below
http://localhost:5197/Product?$select=Price
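
As a rough picture of what $select does to each returned entity, the JavaScript sketch below keeps only the requested properties. It is an illustration on plain objects and not how the OData library works: applySelect is a hypothetical name, the sample objects mirror the dummy products, and a real response also carries metadata such as @odata.context.

```javascript
// Keep only the properties named in a $select value such as "Name,Price".
function applySelect(items, select) {
  const names = select.split(",").map((name) => name.trim());
  return items.map((item) => {
    const projected = {};
    for (const name of names) {
      if (name in item) {
        projected[name] = item[name];
      }
    }
    return projected;
  });
}

// Sample objects mirroring the dummy products
const products = [
  { ID: 1, Name: "product1", Price: 100.5 },
  { ID: 2, Name: "product2", Price: 200.5 },
];

console.log(JSON.stringify(applySelect(products, "Price")));
// [{"Price":100.5},{"Price":200.5}]
```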





Thank you
Happy coding