Saturday 2 April 2011

Error Handling in SOA 11g: Part 2

Please have a look at my previous post http://shrikworld.blogspot.com/2011/03/errorhandling-in-soa-11g.html before get started.There I have mentioned error queue in fault policy. The moto is that we can bind action ref="ora-errorQ" with below property set,

<Action id="ora-errorQ">
        <javaAction className="com.shrik.ErrorHospitalQueue"
                    defaultAction="ora-terminate"
                    propertySet="enqueue-properties">
          <returnValue value="REPLAY" ref="ora-terminate"/>
          <returnValue value="RETRHOW" ref="ora-rethrow-fault"/>
          <returnValue value="ABORT" ref="ora-terminate"/>
          <returnValue value="RETRY" ref="ora-retry"/>
          <returnValue value="MANUAL" ref="ora-human-intervention"/>
        </javaAction>
      </Action>

<propertySet name="enqueue-properties">
        <property name="aq.queueconnectionfactory">jms/shrikCF</property>
          <property name="aq.queue">jms/shrikQueue</property>
      </propertySet>

So first of all you need to create a jms queue and connection factory in weblogic server , in my case it is shrikQueue and shrikCF and change the enqueue-properties value accordingly.

Here is the code excerpts that help you to enqueue data into shrikQueue,

package com.shrik;


import com.collaxa.cube.engine.fp.BPELFaultRecoveryContextImpl;

import java.io.IOException;

import java.util.ArrayList;
import java.util.Map;
import java.util.UUID;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.jms.TextMessage;

import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import oracle.integration.platform.faulthandling.recovery.RejectedMsgRecoveryContext;
import oracle.integration.platform.faultpolicy.IFaultRecoveryContext;
import oracle.integration.platform.faultpolicy.IFaultRecoveryJavaClass;

public class ErrorHospitalQueue implements IFaultRecoveryJavaClass {

    private String queueCF;
    private String queueName;

    public ErrorHospitalQueue() {
        super();
    }


    public String handleFault(IFaultRecoveryContext iFaultRecoveryContext) {
        BPELFaultRecoveryContextImpl ctx =
            (BPELFaultRecoveryContextImpl)iFaultRecoveryContext;
        Map properties = iFaultRecoveryContext.getProperties();
        UUID uuid = UUID.randomUUID();

        ctx.addAuditTrailEntry("Enqueueing Data into shrikQueue...");

        ctx.addAuditTrailEntry(createEventPayload(ctx));
        ctx.addAuditTrailEntry((String)getParameterValue((ArrayList)properties.get("aq.queueconnectionfactory")));
        ctx.addAuditTrailEntry((String)getParameterValue((ArrayList)properties.get("aq.queue")));
        try {
            enqueueAqEvent(createEventPayload(ctx), uuid, properties, ctx);
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return "ora-terminate";
    }

    private String createEventPayload(IFaultRecoveryContext context) {

        String eventPayload =
            "<AdminFault xmlns=\"http://www.shrik.com/\">\n" +
            " <ecid>UNKNOWN_ECID</ecid>\n" +
            "</AdminFault>";

        if (context instanceof RejectedMsgRecoveryContext) {

            RejectedMsgRecoveryContext rejectedMessageContext =
                (RejectedMsgRecoveryContext)context;
            String ecid = null;
            if (rejectedMessageContext.getRejectedMessage() != null &&
                rejectedMessageContext.getRejectedMessage().getEcid() !=
                null) {
                ecid = rejectedMessageContext.getRejectedMessage().getEcid();
            } else if (rejectedMessageContext.getFault() != null &&
                       rejectedMessageContext.getFault().getECID() != null) {
                ecid = rejectedMessageContext.getFault().getECID();
            }
            eventPayload = eventPayload.replace("UNKNOWN_ECID", ecid);
        } else if (context instanceof BPELFaultRecoveryContextImpl) {
            BPELFaultRecoveryContextImpl bpelFaultRecoveryContextImpl =
                (BPELFaultRecoveryContextImpl)context;
            eventPayload =
                    eventPayload.replace("UNKNOWN_ECID", bpelFaultRecoveryContextImpl.getECID());

        }

        return eventPayload;
    }


    public void enqueueAqEvent(String input, UUID uuid, Map props,
                               BPELFaultRecoveryContextImpl ctx) throws JMSException,
                                                                        NamingException,
                                                                        IOException {

        Session session = null;
        MessageProducer publisher = null;
        TextMessage message = null;
        Context context = new InitialContext();
        QueueConnectionFactory connectionFactory =
            (QueueConnectionFactory)context.lookup((String)getParameterValue((ArrayList)props.get("aq.queueconnectionfactory")));
        Connection connection =
            (Connection)connectionFactory.createConnection();
        Queue errQueue =
            (Queue)context.lookup((String)getParameterValue((ArrayList)props.get("aq.queue")));
        session =
                (Session)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        publisher = session.createProducer(errQueue);

        message = session.createTextMessage(input);

        message.setJMSCorrelationID(uuid.toString());
        connection.start();
        publisher.send(message);
        connection.stop();
        connection.close();


    }

    private String getParameterValue(ArrayList parameterList) {
        String value = null;
        if (parameterList != null && parameterList.size() > 0)
            value = (String)parameterList.get(0);
        return value;
    }

    public void handleRetrySuccess(IFaultRecoveryContext iFaultRecoveryContext) {
        System.out.println("This is for retry success");
        handleFault(iFaultRecoveryContext);
    }

    public void setQueueCF(String queueCF) {
        this.queueCF = queueCF;
    }

    public String getQueueCF() {
        return queueCF;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    public String getQueueName() {
        return queueName;
    }
}

After updating the jar you will be able to see message with ecid got enqueued on error scenario, here is the sample message structure,

<AdminFault xmlns="http://www.shrik.com/">
<ecid>11d1def534ea1be0:-918321b:12f07773909:-8000-0000000000000c0d</ecid>
</AdminFault>

You can customize the message as per your need.

4 comments:

Safeeq said...

I am getting method not found error for

rejectedMessageContext.getFault().getECID()

getECID()function does not seem to exist. Could you please help me with this?

Thanks!

shrikworld said...

I tried with Jdev 11.1.1.4 , but in Jdev 11.1.1.5 u'll get the getEcid() like below,
rejectedMessageContext.getRejectedMessage().getEcid()

Safeeq said...

I am working with Jdev 11.1.1.4 only not the 5 version.

Also for enqueuing the runtime faults, will the remotefault also be captured as a runtime fault and written to the queue ? I assume thats what it should do. Please confirm.

One more question is regarding the Part 3 of this blog where we write console messages to track the progress of the queue. If we start soa server via the weblogic console, how do we see the System.out.Println console messages?

Thanks!

go on clcking said...

"addAuditTrailEntry" can you just give a brief on this please. I have tried to find it in Goole. I found that it is used to log the entries in AuditTrail but I didnt understand what an AuditTrail and why all the properties need be logged in it. Kindly give a brief when you find time.