Skip to content

Message Broker System

Message Broker System

PROBLEM STATEMENT

Write a program to implement/simulate a message-broker system. Messages are be send, randomly, between any four stations (processes). Message will contain some substantial message, the source and the destination identity. Message formats can have following properties:-
    a.Text can be only in uppercase or only in lowercase
    b.End of word delimiter is space, comma, period etc.
    c.End of a statement is a special symbol, new line symbol,            
      etc.
An independent database will be present which will contain records all the stations and their corresponding message formats. An independent message broker program will convert the message from source station in message format understood by the destination station.

DESCRIPTION

In message-queuing systems, conversions are handled by special nodes in a queuing network, known as message brokers.
A message broker acts as an application-level gateway in a message-queuing system.

A message broker can be as simple as a reformatter for messages.
E.g.: An incoming message contains a table from a database, in which records are separated by a special end-of-record delimiter and fields within a record have a known, fixed length. If the destination application expects a different delimiter between records, and also expects that fields have variable lengths, a message broker can be used to convert messages to the format expected by the destination.

SOURE CODE

MBSServer.java

import java.io.*;
import java.net.*;
import java.sql.*;

public class MBSServer
{

  public static void main(String args[])throws Exception
    {
                InetAddress lclhost;
        lclhost = InetAddress.getLocalHost();
        System.out.println(lclhost);
        MBSSer mbs=new MBSSer(lclhost);
        mbs.recPort(8002);
        while(true)
        {
           
            mbs.recData();
            mbs.sendData();   
        }
    }
}

class MBSSer
{
int sendport,recport;
InetAddress ip;
String datastr;
Connection con=null;
    MBSSer(InetAddress ip)
    {
       this.ip=ip;
    }   

    void sendPort(int sendport)
    {
      this.sendport=sendport;   
    }

     void recPort(int recport)
    {
       this.recport=recport;
        System.out.println(“PORT set”);
    }

     void recData()
    {
        try
            {
                byte[] buf=new byte[256];
                char SrcArr[] = new char[10];
                    char DestArr[] = new char[10];
String src=””,dest=””,srcdelim=””,srcnewline=””,srcstrcase=””,desdelim=””,desnewline=””,desstrcase=””;
                char de,se,snl,dnl;
                String txt=””;
                DatagramPacket dp;
                DatagramSocket ds;
                ds=new DatagramSocket(recport);
                dp=new DatagramPacket(buf,buf.length);
                ds.receive(dp);
                ds.close();
            txt=new String(dp.getData(),0,dp.getLength());
            txt.getChars(0,5,SrcArr,0);
            txt.getChars(6,11,DestArr,0);
            src = new String(SrcArr);
            dest = new String(DestArr);
            src = src.trim();
            dest = dest.trim();

System.out.println(“Data coming from ” + src + “Send Data to “+ dest);
            if(dest.equalsIgnoreCase(“udcs1”)==true)
                sendPort(9001);
   
            else if(dest.equalsIgnoreCase(“udcs2”)==true)
                sendPort(9003);   
           
            else if(dest.equalsIgnoreCase(“udcs3”)==true)
                sendPort(9005);   
            try{
            Class.forName(“sun.jdbc.odbc.JdbcOdbcDriver”);
             con=DriverManager.getConnection(“jdbc:odbc:MBS”,”scott”,”tiger”);
}
catch(SQLException seq)
{
  seq.printStackTrace();
}
            Statement stsrc = con.createStatement();
            Statement stdes = con.createStatement();
ResultSet rsrc = stsrc.executeQuery(“select * from dataformat where system ='” +src+ “‘”);
ResultSet rdes = stdes.executeQuery(“select * from dataformat where system = ‘”+dest+ “‘”);
           
            if(rsrc.next())
            {
                srcdelim = rsrc.getString(2);
                srcnewline = rsrc.getString(4);
                srcstrcase = rsrc.getString(3);
            }
           
            if(rdes.next())
            {
                desdelim =  rdes.getString(2);
                desnewline = rdes.getString(4);
                desstrcase = rdes.getString(3);
            }

            if(desstrcase.equalsIgnoreCase(“lower”))
                {
                    txt = txt.toUpperCase();
                }
            else
                {
                    txt = txt.toLowerCase();
                }
       
            if(desdelim.equalsIgnoreCase(“comma”))
                {
                     de=’;’;
                }
            else if(desdelim.equalsIgnoreCase(“colon”))
                {
                    de = ‘:’;
                }
            else
                {
                    de = ‘.’;
                }
           

            if(srcdelim.equalsIgnoreCase(“comma”))
                {
                     se=’;’;
                }
            else if(srcdelim.equalsIgnoreCase(“colon”))
                {
                    se = ‘:’;
                }
            else
                {
                    se = ‘.’;
                }

            if(desnewline.equalsIgnoreCase(“dollar”))
                {
                     dnl=’$’;
                }
            else if(desnewline.equalsIgnoreCase(“asterix”))
                {
                    dnl = ‘*’;
                }
            else
                {
                    dnl = ‘?’;
                }
           
       
            if(srcnewline.equalsIgnoreCase(“dollar”))
                {
                    snl=’$’;
                }
            else if(srcnewline.equalsIgnoreCase(“asterix”))
                {
                    snl = ‘*’;
                }
            else
                {
                    snl = ‘?’;
                }
           
            txt = txt.replace(se,de);
            txt = txt.replace(snl,dnl);
            System.out.println(txt);
            datastr = txt;
           
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
}

    void sendData() throws Exception
    {
        BufferedReader objBr;
        DatagramSocket sendDS;
        DatagramPacket sendDP;

        sendDS = new DatagramSocket(sendport);
        sendDP = new DatagramPacket (datastr.getBytes(),datastr.length(),ip,sendport-1000);
        sendDS.send(sendDP);
        sendDS.close();
    }
}

MBSClientOne.java

import java.io.*;
import java.net.*;

class  MBSClientOne
{
    public static void main(String[] args) throws Exception
    {
        String dataStr;
        BufferedReader objBr;
        InetAddress IAHost;
        Client objClnt;
        IAHost = InetAddress.getLocalHost();
        objClnt = new Client(IAHost);
        objClnt.setSendPort(9002);
        objClnt.setRecPort(8001);

        while(true)
        {
            System.out.println(“Do you want to send some data”);
            System.out.println(“Enter Yes or No”);
            objBr = new BufferedReader(new InputStreamReader(System.in));
            dataStr=objBr.readLine();

            if(dataStr.equalsIgnoreCase(“Yes”))
                objClnt.sendData();
else
                objClnt.recData();
        }
    }
}

class Client
{
    InetAddress IAHost;
    int sendPort,recPort;

    Client(InetAddress IAHost)
    {
        this.IAHost=IAHost;
    }

    void setSendPort(int sendPort)
    {
        this.sendPort=sendPort;
    }

    void setRecPort(int recPort)
    {
        this.recPort=recPort;
    }

    void sendData() throws Exception
    {
String dataStr=””;
        BufferedReader objBr;
        DatagramSocket sendDS;
        DatagramPacket sendDP;

        System.out.println(“Enter the data”);
        objBr = new BufferedReader(new InputStreamReader(System.in));
        dataStr=objBr.readLine();

        sendDS = new DatagramSocket(sendPort);
        sendDP = new DatagramPacket (dataStr.getBytes(),dataStr.length(),IAHost,sendPort-1000);
        sendDS.send(sendDP);
        sendDS.close();
    }
       
    void recData() throws Exception
    {
        byte bBuffer[] = new byte[256];
        String msgStr;
        DatagramSocket recDS;
        DatagramPacket recDP;
       
        recDS = new DatagramSocket(recPort);
        recDP = new DatagramPacket(bBuffer,bBuffer.length);
        recDS.receive(recDP);
        recDS.close();

        msgStr=new String(recDP.getData(),0,recDP.getLength());
        System.out.print(“The data is “);
        System.out.println(msgStr);
    }
}

MBSClientTwo.java

import java.io.*;
import java.net.*;

class MBSClientTwo
{
    public static void main(String args[])throws Exception
    {
        String datastr;
        BufferedReader inpt;
        InetAddress lclhost = InetAddress.getLocalHost();
        MBSClient cltwo = new MBSClient(lclhost);
        cltwo.setSendPort(9002);
        cltwo.setRecPort(8003);
       
        while(true)
        {
            System.out.println(“Do You want to send some data”);
            System.out.println(“Enter yes or no”);
            inpt = new BufferedReader(new InputStreamReader(System.in));
            datastr= inpt.readLine();
           
            if(datastr.equalsIgnoreCase(“yes”))
            {
                cltwo.sendData();
            }
            else
            {
                cltwo.recData();
            }
        }

    }
}

class MBSClient
    {
        InetAddress lclhost;
        int sendport,recport;

         MBSClient(InetAddress lclhost)
        {
            this.lclhost = lclhost;   
        }

        void setSendPort(int sendport)
        {
            this.sendport = sendport;
        }
           
        void setRecPort(int recport)
        {
            this.recport = recport;
        }
   
        void sendData() throws Exception
        {
            String str=””;
            BufferedReader bfr;
            DatagramSocket sendds;
            DatagramPacket senddp;
       
            bfr = new BufferedReader(new InputStreamReader(System.in));
            str = bfr.readLine();
           
            sendds = new DatagramSocket(sendport);
            senddp = new DatagramPacket(str.getBytes(),str.length(),lclhost,sendport-1000);
            sendds.send(senddp);
            sendds.close();
        }

        void recData() throws Exception
        {
            byte[] buf = new byte[256];
            String txt;
            DatagramSocket recds;
            DatagramPacket recdp;
   
            recds = new DatagramSocket(recport);
            recdp = new DatagramPacket(buf,buf.length);
            recds.receive(recdp);
            recds.close();

            txt = new String(recdp.getData(),0,recdp.getLength());
           
            System.out.println(“The data is ” +txt);
        }
}   
   

MBSClientThree.java

import java.io.*;
import java.net.*;

public class MBSClientThree
{
    public static void main(String args[])throws Exception
    {
       
        String datastr;
        BufferedReader inpt;
        InetAddress lclhost = InetAddress.getLocalHost();
        MBSClient clthree = new MBSClient(lclhost);
        clthree.setSendPort(9002);
        clthree.setRecPort(8005);
        while(true)
        {
        System.out.println(“Do u want to enter data?(Yes/NO)”);
        BufferedReader br=new         BufferedReader(new InputStreamReader(System.in));
            String ans=br.readLine();
            if(ans.equalsIgnoreCase(“YES”))
                {
                    clthree.sendData();
                }
            else
                clthree.recData();
       
        }
       
    }
}

class MBSClient
{
    int recport,sendport;
    InetAddress lclhost;

    MBSClient(InetAddress lclhost)
    {
        this.lclhost = lclhost;   
    }
    void setRecPort(int recport)
    {
        this.recport=recport;
    }

    void setSendPort(int sendport)
    {
        this.sendport=sendport;
    }

   
    void sendData()throws Exception
    {
        BufferedReader br;
        DatagramSocket ds;
        DatagramPacket dp;
       
        System.out.println(“Enter the data you want to send”);
        br=new BufferedReader(new InputStreamReader(System.in));
        String datastr=br.readLine();
        ds=new DatagramSocket(sendport);
        dp=new DatagramPacket(datastr.getBytes(),datastr.length(),lclhost,sendport-1000);
        ds.send(dp);
        ds.close();
    }

    void recData()throws Exception
    {
        byte buf[]=new byte[256];
        DatagramSocket ds;
        DatagramPacket dp;
        String msgstr=””;
        ds=new DatagramSocket(recport);
        dp=new DatagramPacket(buf,buf.length);
        ds.receive(dp);
        msgstr=new String(dp.getData(),0,dp.getLength());
        System.out.println(“The recd data is ” +msgstr);
        ds.close();
    }
       

OUTPUT :

MBSServer O/P

E:pracsDCpracsMBS_prac2>java MBSServer
PORT set
Data coming from udcs1Send Data to udcs2
udcs1 udcs2 hello;how r u*
Data coming from udcs2Send Data to udcs3
UDCS2 UDCS3 HII,WELCOME$

MBSClientOne O/P

E:pracsDCpracsMBS_prac2>java MBSClientOne
Do you want to send some data               
Enter Yes or No                             
yes                                     
Enter the data                             
udcs1 udcs2 hello:how r u?                  
Do you want to send some data               
Enter Yes or No                            
no      

MBSClientTwo O/P

E:pracsDCpracsMBS_prac2>java MBSClientTwo
Do You want to send some data
Enter yes or no
no
The recd data is udcs1 udcs2 hello;how r u*
Do You want to send some data
Enter yes or no
yes
Enter the data you want to send
udcs2 udcs3 HII,WELCOME*
Do You want to send some data
Enter yes or no  

MBSClientThree O/P

E:pracsDCpracsMBS_prac2>java MBSClientThree
Do u want to enter data?(Yes/NO)
no
The recd data is UDCS2 UDCS3 HII,WELCOME$
Do u want to enter data?(Yes/NO)
no                       

Leave a Reply

Your email address will not be published. Required fields are marked *

error: Content is protected !!