Balancing bullshits
Par Fred le vendredi 19 avril 2013, 14:22 - Développement - Lien permanent
After spreading bullshits, our Bullshit Company has been overflown by bullshit requests.
It's time to provide a bigger architecture than the one we used. Let's balance the Bullshit Generator.
For the impatients, this post will deal with several problems that a beginner could encounter or fear : exceptions handling and sequences
After dealing with millions of requests, our CBSG server was unable to answer to all of these.
A solution is to provide a load balancing CBSG.
To do this, we planned to dispatch several CBSG servants on different machines. These servants will have to register themselves to a CBSGBalancer.
Let's start with IDL.
We add the following at the end of our previous file:
// This exception is thrown when a client is asking for a CBSG //And no more are available exception NoMoreCBSGAvailable { }; //This exception is thrown when a servant tries to unregister //itself and that a client still uses it exception CBSGInUse { }; typedef sequence<CBSG> CBSGSeq; //Our load-balancing object interface CBSGBalancer{ //This method allows to register a new servant to put in the balancer void register(in CBSG newServant); //This method should be called when a servant is shut down void unregister(in CBSG servantToRemove) raises (CBSGInUse); //This method is for client wanting to use a CBSG CBSG reserve() raises (NoMoreCBSGAvailable); //This method allows a client to reserve several CBSG //Less than the parameter could be returned depending on the number available //The exception is thrown when none can be returned CBSGSeq reserveSeveral(in long numberToReserve) raises (NoMoreCBSGAvailable); //This method must be called by client to //release a previously reserved CBSG void release(in CBSG noMoreUsedCBSG); };
The IDL code is self-explanatory and reflects what we described before.
Take care of keeping your last *-impl.ad[sb] as the generation will overwrite them[1].
We generate Ada code from it like in the previous post:
fred@coruscant :~/cbsg/corba/Ada $ iac -i ../idl/cbsg.idl
Ok, now, we have to write the code of our balancer.
The Balancer
First of all, let's modify the spec :
--[snip...] function "=" (Left, Right : CorbaCBSG.CBSG.Ref) return Boolean; private package Ref_Vectors is new Ada.Containers.Vectors(Index_Type => Positive, Element_Type => CorbaCBSG.CBSG.Ref ); type Object is new PortableServer.Servant_Base with record Usable_Refs : Ref_Vectors.Vector; In_Use_Refs : Ref_Vectors.Vector; end record; --[snip...]
We will use a standard Ada Vector to keep our CBSG servants. As a consequence, we need a way to test equality on CBSG references.
As you can see, we use two vectors for this. This is easy, one for available servants and the other for busy ones.
Now, here's the implementation. First, the register method :
procedure register (Self : not null access Object; newServant : CorbaCBSG.CBSG.Ref) is begin Ref_Vectors.Append(Self.Usable_Refs, NewServant); Put_Line("A servant was registered"); Put_Line("It corbaloc is " & To_Standard_String(PolyORB.CORBA_P.CORBALOC.Object_To_Corbaloc(NewServant))); end register;
In fact, we will not receive CBSG servants directly but references to them[2].
We simply add the reference to our vector.
As in every quick and dirty code, we use standard output for logging :D
Off course, using Alog, GNATCOLL or whatever logging framework would be smarter.
After registering, we need unregistering
procedure unregister (Self : not null access Object; servantToRemove : CorbaCBSG.CBSG.Ref) is Index : Ref_Vectors.Cursor; begin -- first, test if present in usable index := Ref_Vectors.Find(Self.Usable_Refs, ServantToRemove); if (Index /= Ref_Vectors.No_Element) then Ref_Vectors.Delete(Self.Usable_Refs, Index); else raise CBSGInUse; end if; end unregister;
We simply remove from the vector where it's referenced... Well, this could be better as we can imagine that the servant which is unregistering would probably quit anyway but reliability is not part of this post
As you could have seen exceptions in CORBA are just plain old Ada exceptions. That's all !!!
function reserve (Self : not null access Object) return CorbaCBSG.CBSG.Ref is First_Cursor : Ref_Vectors.Cursor := Ref_Vectors.First(Self.Usable_Refs); Returned_Ref : CorbaCBSG.CBSG.Ref; begin if (First_Cursor /= Ref_Vectors.No_Element) then Returned_ref := Ref_Vectors.Element(First_Cursor); Put_Line("Sending " & To_Standard_String(PolyORB.CORBA_P.CORBALOC.Object_To_Corbaloc(Returned_Ref))); -- we put it in use Ref_Vectors.Append(Self.In_Use_Refs, Ref_Vectors.Element(First_Cursor)); -- we remove it from usable Ref_Vectors.Delete(Self.Usable_Refs, First_Cursor); else raise NoMoreCBSGAvailable; end if; return Returned_Ref; end reserve;
Now what about sequences we use in reserveSeveral?
Well, it's more complicated than in other languages but let's check.
In our case as sequence is a typedef in our package, the specification is in corbacbsg.ads
--snip package CBSG_Forward is new CORBA.Forward; --snip package IDL_SEQUENCE_CorbaCBSG_CBSG_Forward is new CORBA.Sequences.Unbounded (CorbaCBSG.CBSG_Forward.Ref); type CBSGSeq is new CorbaCBSG.IDL_SEQUENCE_CorbaCBSG_CBSG_Forward.Sequence; --snip
In fact, in PolyORB, sequences don't contain directly our object but an adapter object called Forward as explain on Khotar Labs blog. So we won't be able to return a reference array but an array of forward objects containing references.
As sequences are instanciations of CORBA.Sequences.Unbounded generic package which in fact are PolyORB.Sequences.Unbounded, all we need is in the polyorb-sequences-unbounded.ads file.
--snip function reserveSeveral (Self : not null access Object; NumberToReserve : CORBA.Long) return CorbaCBSG.CBSGSeq is use CorbaCBSG.CBSG.Convert_Forward; Returned_Seq : CorbaCBSG.CBSGSeq; begin for Index in 1..NumberToReserve loop begin Append(Returned_Seq, To_Forward( Reserve(Self) )); exception when NoMoreCBSGAvailable => if (Index = 1) then raise; else exit; end if; end; end loop; return Returned_Seq; end reserveSeveral; --snip
In order to convert our objects into forward objects, we need to use the To_Forward function. This function is defined as a package called Convert_Forward in the file representing the object to store. In our case, it will be in CorbaCBSG-cbsg.ads.
Once the object has been converted, we can put it in our sequence using Append... Easy
Well, this implementation is not really great as it can return fewer references than requested but it conforms to our spec in the IDL file
Finally, the release function is quite simple.
procedure release (Self : not null access Object; noMoreUsedCBSG : CorbaCBSG.CBSG.Ref) is In_Use_Cursor : Ref_Vectors.Cursor := Ref_Vectors.Find(Self.In_Use_Refs, noMoreUsedCBSG); begin -- we remove it from in use vector if present if (In_Use_Cursor /= Ref_Vectors.No_Element) then Ref_Vectors.Delete(Self.In_Use_Refs, In_Use_Cursor); Ref_Vectors.Append(Self.Usable_Refs, NoMoreUsedCBSG); end if; end release;
The last function to implement is the equality for our objects. As Corbaloc are unique, the test only deals with this :
function "=" (Left, Right : CorbaCBSG.CBSG.Ref) return Boolean is begin if (PolyORB.CORBA_P.CORBALOC.Object_To_Corbaloc(left) = PolyORB.CORBA_P.CORBALOC.Object_To_Corbaloc(right)) then return True; else return False; end if; end "=";
Our servant is implemented, so it's time to expose it :
with Ada.Exceptions; with Ada.Text_IO; use Ada.Text_IO; with CORBA.Impl; with CORBA.Object; with CORBA.ORB; with PortableServer.POA.Helper; with PortableServer.POAManager; with CorbaCBSG.CBSGBalancer.Impl; with PolyORB.CORBA_P.CORBALOC; -- allows to specify to PolyORB how to run with PolyORB.Setup.No_Tasking_Server; pragma Warnings (Off, PolyORB.Setup.No_Tasking_Server); procedure Balancer is begin declare -- We retrieve paramaters defined in CORBA standard such a InitialRef Argv : CORBA.ORB.Arg_List := CORBA.ORB.Command_Line_Arguments; begin -- We initialize our bus as ORB CORBA.ORB.Init (CORBA.ORB.To_CORBA_String ("ORB"), Argv); declare -- The PortableObjectAdapter is where we store our objects Root_POA : PortableServer.POA.Local_Ref; -- we declare a reference on it Ref : CORBA.Object.Ref; -- and its implementation Obj : constant CORBA.Impl.Object_Ptr := new CorbaCBSG.CBSGBalancer.Impl.Object; begin -- Get the root POA Root_POA := PortableServer.POA.Helper.To_Local_Ref (CORBA.ORB.Resolve_Initial_References (CORBA.ORB.To_CORBA_String ("RootPOA"))); -- Start it PortableServer.POAManager.Activate (PortableServer.POA.Get_The_POAManager (Root_POA)); -- Create a reference in the POA Ref := PortableServer.POA.Servant_To_Reference (Root_POA, PortableServer.Servant (Obj)); -- Display the IOR Put_Line ("'" & CORBA.To_Standard_String (CORBA.Object.Object_To_String (Ref)) & "'"); New_Line; -- and the corbaloc Put_Line ("'" & CORBA.To_Standard_String (PolyORB.CORBA_P.CORBALOC.Object_To_Corbaloc (Ref)) & "'"); -- Launch the server. CORBA.ORB.Run is supposed to never return, -- print a message if it does. CORBA.ORB.Run; Put_Line ("ORB main loop terminated!"); end; end; exception when E : others => Put_Line ("CBSG Balancer server raised " & Ada.Exceptions.Exception_Information (E)); raise; end Balancer;
There's nothing special to say about it, so we go on with the CBSG that will register to the balancer
The registering CBSG
In fact, our previously implemented CBSG is still usable without any modification. The only thing we have to do is to change the way we launch it.
This time, we will also need a reference to the balancer and we will need to register it.
The code is really simple.
with Ada.Command_Line; use Ada.Command_Line; with Ada.Exceptions; with Ada.Text_IO; use Ada.Text_IO; with CORBA.Impl; with CORBA.Object; with CORBA.ORB; with PortableServer.POA.Helper; with PortableServer.POAManager; with CorbaCBSG.CBSG.Helper; with CorbaCBSG.CBSG.Impl; with CorbaCBSG.CBSGBalancer; with PolyORB.CORBA_P.CORBALOC; -- Specify to PolyORB how to dea with tasks with PolyORB.Setup.No_Tasking_Server; pragma Warnings (Off, PolyORB.Setup.No_Tasking_Server); procedure Registering_Servant is begin declare -- Allows to get parameters as defined in Corba standard like InitialRef Argv : CORBA.ORB.Arg_List := CORBA.ORB.Command_Line_Arguments; begin -- Init our bus under ORB name CORBA.ORB.Init (CORBA.ORB.To_CORBA_String ("ORB"), Argv); declare -- PortableObjectAdapter is where we store our objects Root_POA : PortableServer.POA.Local_Ref; -- We declare a reference for our object Ref : CorbaCBSG.CBSG.Ref; -- and its implementation Obj : constant CORBA.Impl.Object_Ptr := new CorbaCBSG.CBSG.Impl.Object; -- we declare a reference to the balancer Balancer : CorbaCBSG.CBSGBalancer.Ref; begin CORBA.ORB.Initialize ("ORB"); if Argument_Count not in 1 .. 2 then Put_Line ("usage: registered_servant <IOR_string_from_server>"); return; end if; -- Get a reference on the distributed object through its IOR or corbaloc CORBA.ORB.String_To_Object (CORBA.To_CORBA_String (Ada.Command_Line.Argument (1)), Balancer); -- Check the reference is correct if CorbaCBSG.CBSGBalancer.Is_Nil(Balancer) then Put_Line ("main : cannot invoke on a nil reference"); return; end if; -- Get the root Root_POA := PortableServer.POA.Helper.To_Local_Ref (CORBA.ORB.Resolve_Initial_References (CORBA.ORB.To_CORBA_String ("RootPOA"))); -- Start our POA PortableServer.POAManager.Activate (PortableServer.POA.Get_The_POAManager (Root_POA)); -- We create a ref to expose it Ref := CorbaCBSG.CBSG.Helper.To_Ref(PortableServer.POA.Servant_To_Reference (Root_POA, PortableServer.Servant (Obj))); -- we display the IOR Put_Line ("'" & CORBA.To_Standard_String (CORBA.Object.Object_To_String (Ref)) & "'"); New_Line; -- and the CorbaLoc Put_Line ("'" & CORBA.To_Standard_String (PolyORB.CORBA_P.CORBALOC.Object_To_Corbaloc (Ref)) & "'"); -- Ask to register to the balancer CorbaCBSG.CBSGBalancer.Register(Balancer, Ref); -- Launch the server. CORBA.ORB.Run is supposed to never return, -- print a message if it does. CORBA.ORB.Run; Put_Line ("ORB main loop terminated!"); end; end; exception when E : others => Put_Line ("CBSG server raised " & Ada.Exceptions.Exception_Information (E)); raise; end Registering_Servant;
In this code, we launch our server with a parameter which is the IOR of our balancer. This parameter allows to get a reference on it in order to call the register function.
That's all for it.
Our Java client
This time, we will use a Java client to request bullshits.
import org.omg.CORBA.*; import CorbaCBSG.*; public class CBSGClient { public static void main(String[] argv) { //First, init the ORB ORB orb = ORB.init(argv, null); org.omg.CORBA.Object obj = orb.string_to_object(argv[0]); CBSGBalancer cbsgBalancer = CBSGBalancerHelper.narrow(obj); try { CBSG[] cbsgArray = cbsgBalancer.reserveSeveral(2); for (int i=0; i < 2; i++) { String returned_l = cbsgArray[i].createSentence(); System.out.println("The generator " + i + " said : " + returned_l); // We release it cbsgBalancer.release(cbsgArray[i]); } } catch (NoMoreCBSGAvailable ex) { System.out.println("Ouch !! We can't reserve one CBSG"); } } }
Simple !!
I won't describe here how Java deals with Corba through helpers, Operations' classes.
Ok, I forgot one thing and you could say :
What about using sequences in Ada ?
Ok, we will implement a client in Ada to show it
with Ada.Command_Line; use Ada.Command_Line; with Ada.Text_IO; use Ada.Text_IO; with CORBA.ORB; with CorbaCBSG.CBSGBalancer; with CorbaCBSG.CBSG; with PolyORB.CORBA_P.CORBALOC; with PolyORB.Setup.No_Tasking_Server; pragma Warnings (Off, PolyORB.Setup.No_Tasking_Server); Procedure Balanced_Client is Argv : CORBA.ORB.Arg_List := CORBA.ORB.Command_Line_Arguments; -- We declare a reference to the balancer Balancer : CorbaCBSG.CBSGBalancer.Ref; -- The sequence of CBSG to call Callees : CorbaCBSG.CBSGSeq; begin -- Init the ORB CORBA.ORB.Init (CORBA.ORB.To_CORBA_String ("ORB"), Argv); if Argument_Count not in 1 .. 2 then Put_Line ("usage: balanced_client <IOR_string_from_server>"); return; end if; -- Get our balancer through its corbaloc or IOR CORBA.ORB.String_To_Object (CORBA.To_CORBA_String (Ada.Command_Line.Argument (1)), Balancer); -- Check it's correct if CorbaCBSG.CBSGBalancer.Is_Nil(Balancer) then Put_Line ("main : cannot invoke on a nil reference"); return; end if; -- Ask several CBSG to the balancer Callees := CorbaCBSG.CBSGBalancer.ReserveSeveral(Balancer, 2); -- Call each one for Index in 1..CorbaCBSG.Length(Callees) loop declare The_Cbsg : CorbaCBSG.CBSG.Ref := CorbaCBSG.CBSG.Convert_Forward .From_Forward( CorbaCBSG.Get_Element(Callees, Index) ); begin -- Call the element Put_Line("The generator " & Positive'Image(Index) & " said : " & Corba.To_Standard_String(The_Cbsg.createSentence)); CorbaCBSG.CBSGBalancer.Release(Balancer, The_Cbsg); end; end loop; end Balanced_Client;
The most difficult part is the last one... But, it's not that diffcult.
You just have to remember that :
- Our sequence was declared inside the top-level part of the IDL
- Our sequence contains forward objects instead of reference directly
So, for the first point, all methods declared for sequences[3] are available in our top-level package.
For the second point, we will have to convert our forward objects to the ones we really want. This will be done with From_Forward function in the Convert_Forward package. As this conversion package deals with objects, it is declared in the object package and not at the top-level.
We now have a balancer, a CBSG able to register itself to the balancer and two clients.
It should work out-of-the-box if you run the programs on the same machine but if you want to run it across several machines[4], you will have to use InitialRef or equivalent parameters.
I can't be more precise as, even if I tested it, I've lost the configuration I used :D
Maybe, I'll update this post to provide you the correct parameters. Stay tuned