Balancing bullshits

After spreading bullshits, our Bullshit Company has been overflown by bullshit requests.
It's time to provide a bigger architecture than the one we used. Let's balance the Bullshit Generator.

For the impatients, this post will deal with several problems that a beginner could encounter or fear : exceptions handling and sequences

After dealing with millions of requests, our CBSG server was unable to answer to all of these.
A solution is to provide a load balancing CBSG.
To do this, we planned to dispatch several CBSG servants on different machines. These servants will have to register themselves to a CBSGBalancer.

Let's start with IDL.
We add the following at the end of our previous file:

// This exception is thrown when a client is asking for a CBSG
  //And no more are available
  exception NoMoreCBSGAvailable
  {
  };
 
  //This exception is thrown when a servant tries to unregister
  //itself and that a client still uses it
  exception CBSGInUse
  {
  };
 
  typedef sequence<CBSG> CBSGSeq;
 
  //Our load-balancing object
  interface CBSGBalancer{
    //This method allows to register a new servant to put in the balancer
    void register(in CBSG newServant);
 
    //This method should be called when a servant is shut down
    void unregister(in CBSG servantToRemove) raises (CBSGInUse);
 
    //This method is for client wanting to use a CBSG
    CBSG reserve() raises (NoMoreCBSGAvailable);
 
    //This method allows a client to reserve several CBSG
    //Less than the parameter could be returned depending on the number available
    //The exception is thrown when none can be returned
    CBSGSeq reserveSeveral(in long numberToReserve) raises (NoMoreCBSGAvailable);
 
    //This method must be called by client to 
    //release a previously reserved CBSG
    void release(in CBSG noMoreUsedCBSG);
  };

The IDL code is self-explanatory and reflects what we described before.
Take care of keeping your last *-impl.ad[sb] as the generation will overwrite them[1].
We generate Ada code from it like in the previous post:

fred@coruscant :~/cbsg/corba/Ada $ iac -i ../idl/cbsg.idl


Ok, now, we have to write the code of our balancer.

The Balancer

First of all, let's modify the spec :

--[snip...]
   function "=" (Left, Right : CorbaCBSG.CBSG.Ref) return Boolean;
 
private
   package Ref_Vectors is new Ada.Containers.Vectors(Index_Type => Positive,
						     Element_Type => CorbaCBSG.CBSG.Ref
						    );
 
   type Object is
     new PortableServer.Servant_Base with record
	Usable_Refs : Ref_Vectors.Vector;
	In_Use_Refs : Ref_Vectors.Vector;
      end record;
--[snip...]

We will use a standard Ada Vector to keep our CBSG servants. As a consequence, we need a way to test equality on CBSG references.
As you can see, we use two vectors for this. This is easy, one for available servants and the other for busy ones.
Now, here's the implementation. First, the register method :

procedure register
     (Self : not null access Object;
      newServant : CorbaCBSG.CBSG.Ref)
   is
   begin
      Ref_Vectors.Append(Self.Usable_Refs, NewServant);
      Put_Line("A servant was registered");
      Put_Line("It corbaloc is " & To_Standard_String(PolyORB.CORBA_P.CORBALOC.Object_To_Corbaloc(NewServant)));
   end register;

In fact, we will not receive CBSG servants directly but references to them[2].
We simply add the reference to our vector.
As in every quick and dirty code, we use standard output for logging :D
Off course, using Alog, GNATCOLL or whatever logging framework would be smarter.
After registering, we need unregistering

procedure unregister
     (Self : not null access Object;
      servantToRemove : CorbaCBSG.CBSG.Ref)
   is
      Index : Ref_Vectors.Cursor;
   begin
      -- first, test if present in usable
      index := Ref_Vectors.Find(Self.Usable_Refs, ServantToRemove);
      if (Index /= Ref_Vectors.No_Element) then
	 Ref_Vectors.Delete(Self.Usable_Refs, Index);
      else
	 raise CBSGInUse;
      end if;
   end unregister;

We simply remove from the vector where it's referenced... Well, this could be better as we can imagine that the servant which is unregistering would probably quit anyway but reliability is not part of this post :)
As you could have seen exceptions in CORBA are just plain old Ada exceptions. That's all !!!

function reserve
     (Self : not null access Object)
     return CorbaCBSG.CBSG.Ref
   is
      First_Cursor : Ref_Vectors.Cursor := Ref_Vectors.First(Self.Usable_Refs);
      Returned_Ref : CorbaCBSG.CBSG.Ref;
   begin
      if (First_Cursor /= Ref_Vectors.No_Element) then
	 Returned_ref := Ref_Vectors.Element(First_Cursor);
	 Put_Line("Sending " & To_Standard_String(PolyORB.CORBA_P.CORBALOC.Object_To_Corbaloc(Returned_Ref)));
 
	 -- we put it in use
	 Ref_Vectors.Append(Self.In_Use_Refs, Ref_Vectors.Element(First_Cursor));
	 -- we remove it from usable
	 Ref_Vectors.Delete(Self.Usable_Refs, First_Cursor);
      else
	 raise NoMoreCBSGAvailable;
      end if;
      return Returned_Ref;
   end reserve;

Now what about sequences we use in reserveSeveral?
Well, it's more complicated than in other languages but let's check.
In our case as sequence is a typedef in our package, the specification is in corbacbsg.ads

--snip
   package CBSG_Forward is
     new CORBA.Forward;
 
--snip
   package IDL_SEQUENCE_CorbaCBSG_CBSG_Forward is
     new CORBA.Sequences.Unbounded
        (CorbaCBSG.CBSG_Forward.Ref);
 
   type CBSGSeq is
     new CorbaCBSG.IDL_SEQUENCE_CorbaCBSG_CBSG_Forward.Sequence;
--snip

In fact, in PolyORB, sequences don't contain directly our object but an adapter object called Forward as explain on Khotar Labs blog. So we won't be able to return a reference array but an array of forward objects containing references.
As sequences are instanciations of CORBA.Sequences.Unbounded generic package which in fact are PolyORB.Sequences.Unbounded, all we need is in the polyorb-sequences-unbounded.ads file.

--snip
   function reserveSeveral
     (Self : not null access Object;
      NumberToReserve : CORBA.Long)
     return CorbaCBSG.CBSGSeq
   is
      use CorbaCBSG.CBSG.Convert_Forward;
 
      Returned_Seq : CorbaCBSG.CBSGSeq;
   begin
      for Index in 1..NumberToReserve loop
	 begin
	    Append(Returned_Seq, 
		   To_Forward(
			      Reserve(Self)
			     ));
	 exception
	    when NoMoreCBSGAvailable =>
	       if (Index = 1) then
		  raise;
	       else
		  exit;
	       end if;
	 end;
      end loop;
 
      return Returned_Seq;
   end reserveSeveral;
--snip

In order to convert our objects into forward objects, we need to use the To_Forward function. This function is defined as a package called Convert_Forward in the file representing the object to store. In our case, it will be in CorbaCBSG-cbsg.ads.
Once the object has been converted, we can put it in our sequence using Append... Easy :)
Well, this implementation is not really great as it can return fewer references than requested but it conforms to our spec in the IDL file ;)

Finally, the release function is quite simple.

procedure release
     (Self : not null access Object;
      noMoreUsedCBSG : CorbaCBSG.CBSG.Ref)
   is
      In_Use_Cursor : Ref_Vectors.Cursor := Ref_Vectors.Find(Self.In_Use_Refs, noMoreUsedCBSG);
   begin
      -- we remove it from in use vector if present
      if (In_Use_Cursor /= Ref_Vectors.No_Element) then
	 Ref_Vectors.Delete(Self.In_Use_Refs, In_Use_Cursor);
	 Ref_Vectors.Append(Self.Usable_Refs, NoMoreUsedCBSG);
      end if;
   end release;


The last function to implement is the equality for our objects. As Corbaloc are unique, the test only deals with this :

function "=" (Left, Right : CorbaCBSG.CBSG.Ref) return Boolean
   is
   begin
      if (PolyORB.CORBA_P.CORBALOC.Object_To_Corbaloc(left) = PolyORB.CORBA_P.CORBALOC.Object_To_Corbaloc(right)) then
	 return True;
      else
	 return False;
      end if;
   end "=";


Our servant is implemented, so it's time to expose it :

with Ada.Exceptions;
with Ada.Text_IO; use Ada.Text_IO;
 
with CORBA.Impl;
with CORBA.Object;
with CORBA.ORB;
 
with PortableServer.POA.Helper;
with PortableServer.POAManager;
 
with CorbaCBSG.CBSGBalancer.Impl;
 
with PolyORB.CORBA_P.CORBALOC;
 
-- allows to specify to PolyORB how to run
with PolyORB.Setup.No_Tasking_Server;
pragma Warnings (Off, PolyORB.Setup.No_Tasking_Server);
 
procedure Balancer is
begin
 
   declare
      -- We retrieve paramaters defined in CORBA standard such a InitialRef
      Argv : CORBA.ORB.Arg_List := CORBA.ORB.Command_Line_Arguments;
 
   begin
      -- We initialize our bus as ORB
      CORBA.ORB.Init (CORBA.ORB.To_CORBA_String ("ORB"), Argv);
 
      declare
	 -- The PortableObjectAdapter is where we store our objects
         Root_POA : PortableServer.POA.Local_Ref;
 
	 -- we declare a reference on it
         Ref : CORBA.Object.Ref;
 
	 -- and its implementation
         Obj : constant CORBA.Impl.Object_Ptr := new CorbaCBSG.CBSGBalancer.Impl.Object;
 
      begin
 
	 -- Get the root POA
         Root_POA := PortableServer.POA.Helper.To_Local_Ref
           (CORBA.ORB.Resolve_Initial_References
	      (CORBA.ORB.To_CORBA_String ("RootPOA")));
 
	 -- Start it
         PortableServer.POAManager.Activate
           (PortableServer.POA.Get_The_POAManager (Root_POA));
 
	 -- Create a reference in the POA
         Ref := PortableServer.POA.Servant_To_Reference
           (Root_POA, PortableServer.Servant (Obj));
 
	 -- Display the IOR
         Put_Line
           ("'"
	      & CORBA.To_Standard_String (CORBA.Object.Object_To_String (Ref))
	      & "'");
         New_Line;
 
         --  and the corbaloc
         Put_Line
           ("'"
	      & CORBA.To_Standard_String
	      (PolyORB.CORBA_P.CORBALOC.Object_To_Corbaloc (Ref))
	      & "'");
 
         --  Launch the server. CORBA.ORB.Run is supposed to never return,
         --  print a message if it does.
 
         CORBA.ORB.Run;
 
         Put_Line ("ORB main loop terminated!");
      end;
   end;
exception
   when E : others =>
      Put_Line
        ("CBSG Balancer server raised " & Ada.Exceptions.Exception_Information (E));
      raise;
end Balancer;

There's nothing special to say about it, so we go on with the CBSG that will register to the balancer

The registering CBSG

In fact, our previously implemented CBSG is still usable without any modification. The only thing we have to do is to change the way we launch it.
This time, we will also need a reference to the balancer and we will need to register it.
The code is really simple.

with Ada.Command_Line; use Ada.Command_Line;
 
with Ada.Exceptions;
with Ada.Text_IO; use Ada.Text_IO;
 
with CORBA.Impl;
with CORBA.Object;
with CORBA.ORB;
 
with PortableServer.POA.Helper;
with PortableServer.POAManager;
 
with CorbaCBSG.CBSG.Helper;
with CorbaCBSG.CBSG.Impl;
with CorbaCBSG.CBSGBalancer;
 
with PolyORB.CORBA_P.CORBALOC;
 
-- Specify to PolyORB how to dea with tasks
with PolyORB.Setup.No_Tasking_Server;
pragma Warnings (Off, PolyORB.Setup.No_Tasking_Server);
 
procedure Registering_Servant is
begin
 
   declare
      -- Allows to get parameters as defined in Corba standard like InitialRef
      Argv : CORBA.ORB.Arg_List := CORBA.ORB.Command_Line_Arguments;
 
   begin
      -- Init our bus under ORB name
      CORBA.ORB.Init (CORBA.ORB.To_CORBA_String ("ORB"), Argv);
 
      declare
	 -- PortableObjectAdapter is where we store our objects
         Root_POA : PortableServer.POA.Local_Ref;
 
	 -- We declare a reference for our object
         Ref : CorbaCBSG.CBSG.Ref;
 
	 -- and its implementation
         Obj : constant CORBA.Impl.Object_Ptr := new CorbaCBSG.CBSG.Impl.Object;
 
	 -- we declare a reference to the balancer
	 Balancer : CorbaCBSG.CBSGBalancer.Ref;
 
      begin
	 CORBA.ORB.Initialize ("ORB");
	 if Argument_Count not in 1 .. 2 then
	    Put_Line
	      ("usage: registered_servant <IOR_string_from_server>");
	    return;
	 end if;
 
	 --  Get a reference on the distributed object through its IOR or corbaloc
	 CORBA.ORB.String_To_Object
	   (CORBA.To_CORBA_String (Ada.Command_Line.Argument (1)), Balancer);
 
	 -- Check the reference is correct
	 if CorbaCBSG.CBSGBalancer.Is_Nil(Balancer) then
	    Put_Line ("main : cannot invoke on a nil reference");
	    return;
	 end if; 
 
	 -- Get the root
	 Root_POA := PortableServer.POA.Helper.To_Local_Ref
	   (CORBA.ORB.Resolve_Initial_References
	      (CORBA.ORB.To_CORBA_String ("RootPOA")));
 
	 -- Start our POA
	 PortableServer.POAManager.Activate
	   (PortableServer.POA.Get_The_POAManager (Root_POA));
 
	 -- We create a ref to expose it
	 Ref := CorbaCBSG.CBSG.Helper.To_Ref(PortableServer.POA.Servant_To_Reference
					       (Root_POA, PortableServer.Servant (Obj)));
 
	 -- we display the IOR
	 Put_Line
	   ("'"
	      & CORBA.To_Standard_String (CORBA.Object.Object_To_String (Ref))
	      & "'");
	 New_Line;
 
	 --  and the CorbaLoc
	 Put_Line
	   ("'"
	      & CORBA.To_Standard_String
	      (PolyORB.CORBA_P.CORBALOC.Object_To_Corbaloc (Ref))
	      & "'");
 
	 -- Ask to register to the balancer
	 CorbaCBSG.CBSGBalancer.Register(Balancer, Ref);
 
	 --  Launch the server. CORBA.ORB.Run is supposed to never return,
	 --  print a message if it does.
 
	 CORBA.ORB.Run;
 
	 Put_Line ("ORB main loop terminated!");
      end;
   end;
exception
   when E : others =>
      Put_Line
        ("CBSG server raised " & Ada.Exceptions.Exception_Information (E));
      raise;
end Registering_Servant;

In this code, we launch our server with a parameter which is the IOR of our balancer. This parameter allows to get a reference on it in order to call the register function.
That's all for it.

Our Java client

This time, we will use a Java client to request bullshits.

import org.omg.CORBA.*;
import CorbaCBSG.*;
 
public class CBSGClient
{
    public static void main(String[] argv)
    {
	//First, init the ORB
	ORB orb = ORB.init(argv, null);
 
	org.omg.CORBA.Object obj = orb.string_to_object(argv[0]);
	CBSGBalancer cbsgBalancer = CBSGBalancerHelper.narrow(obj);
 
	try
	{
	    CBSG[] cbsgArray = cbsgBalancer.reserveSeveral(2);
 
	    for (int i=0; i < 2; i++)
	    {
		String returned_l = cbsgArray[i].createSentence();
		System.out.println("The generator " + i + " said : " + returned_l);
 
		// We release it
		cbsgBalancer.release(cbsgArray[i]);
	    }
	}
	catch (NoMoreCBSGAvailable ex)
	{
	    System.out.println("Ouch !! We can't reserve one CBSG");
	}
    }
}

Simple !!
I won't describe here how Java deals with Corba through helpers, Operations' classes.
Ok, I forgot one thing and you could say :

What about using sequences in Ada ?

Ok, we will implement a client in Ada to show it :)

with Ada.Command_Line; use Ada.Command_Line;
with Ada.Text_IO; use Ada.Text_IO;
 
with CORBA.ORB;
 
with CorbaCBSG.CBSGBalancer;
with CorbaCBSG.CBSG;
 
with PolyORB.CORBA_P.CORBALOC;
 
with PolyORB.Setup.No_Tasking_Server;
pragma Warnings (Off, PolyORB.Setup.No_Tasking_Server);
 
 
Procedure Balanced_Client is
 
   Argv : CORBA.ORB.Arg_List := CORBA.ORB.Command_Line_Arguments;
 
   -- We declare a reference to the balancer
   Balancer : CorbaCBSG.CBSGBalancer.Ref;
 
   -- The sequence of CBSG to call
   Callees : CorbaCBSG.CBSGSeq;
 
begin
   -- Init the ORB
   CORBA.ORB.Init (CORBA.ORB.To_CORBA_String ("ORB"), Argv);
 
   if Argument_Count not in 1 .. 2 then
      Put_Line
	("usage: balanced_client <IOR_string_from_server>");
      return;
   end if;
 
   -- Get our balancer through its corbaloc or IOR
   CORBA.ORB.String_To_Object
     (CORBA.To_CORBA_String (Ada.Command_Line.Argument (1)), Balancer);
 
   -- Check it's correct
   if CorbaCBSG.CBSGBalancer.Is_Nil(Balancer) then
      Put_Line ("main : cannot invoke on a nil reference");
      return;
   end if; 
 
   -- Ask several CBSG to the balancer
   Callees := CorbaCBSG.CBSGBalancer.ReserveSeveral(Balancer, 2);
 
   -- Call each one
   for Index in 1..CorbaCBSG.Length(Callees) loop
      declare
	 The_Cbsg : CorbaCBSG.CBSG.Ref := 
	   CorbaCBSG.CBSG.Convert_Forward
	   .From_Forward(
			 CorbaCBSG.Get_Element(Callees, Index)
			);
      begin
	 -- Call the element
	 Put_Line("The generator " & Positive'Image(Index) & " said : " & Corba.To_Standard_String(The_Cbsg.createSentence));
	 CorbaCBSG.CBSGBalancer.Release(Balancer, The_Cbsg);
      end;
   end loop;
 
end Balanced_Client;

The most difficult part is the last one... But, it's not that diffcult.
You just have to remember that :

  1. Our sequence was declared inside the top-level part of the IDL
  2. Our sequence contains forward objects instead of reference directly

So, for the first point, all methods declared for sequences[3] are available in our top-level package.
For the second point, we will have to convert our forward objects to the ones we really want. This will be done with From_Forward function in the Convert_Forward package. As this conversion package deals with objects, it is declared in the object package and not at the top-level.

We now have a balancer, a CBSG able to register itself to the balancer and two clients.
It should work out-of-the-box if you run the programs on the same machine but if you want to run it across several machines[4], you will have to use InitialRef or equivalent parameters.
I can't be more precise as, even if I tested it, I've lost the configuration I used :D
Maybe, I'll update this post to provide you the correct parameters. Stay tuned ;)

Notes

[1] Something I just forgot few minutes ago :D

[2] Using ValueType would be a different thing but it is not currently implemented in PolyORB

[3] in PolyORB.Sequences.Unbounded through Corba.Sequences.Unbounded

[4] Which must be the goal of such architecture :)

Ajouter un commentaire

Le code HTML est affiché comme du texte et les adresses web sont automatiquement transformées.

La discussion continue ailleurs