Selectively Retrieving Task Results

Selective task output retrieval enables a Symphony client to request task results by specifying task IDs, in order to control the amount of memory consumed on the client host.

Scope


Operating system (client)

  • All platforms supported by Symphony

Limitations

  • This feature only applies to clients that receive results synchronously.

  • This feature is only available if the session is created with the FetchResultsDirectly session flag.

  • COM API is not supported.

  • Multi-threaded use of this feature is not recommended unless the client implements a synchronization scheme to prevent overlapping criteria from being specified in concurrently executing threads; otherwise the client may hang or falsely validate the task output retrieval.


About selective task result retrieval

Before learning how this feature can impact memory management on the client host, it might be helpful to know how results are processed in Symphony’s default model.

Default task output retrieval

In the default model, the Symphony Session Manager sends results to the client as soon as they are available. If the client makes a request for results before they arrive, the client code is blocked until the results arrive or the wait period expires. If the results arrive before the client makes a request, the results are queued (cached) in the local session’s result set. Since the delivery of results from the Session Manager is not in sync with the retrieval of results, it is possible that too many results could be queued in the API layer; this could cause the client to run out of memory and eventually terminate abnormally. One possible solution is to have the client request a specific number of results directly from the Session Manager; this feature is explained in On-demand results retrieval. This can help to manage memory usage on the client host but the requested results may not be the ones the client is really interested in.

Symphony offers another option, which enables a client to request specific tasks results via selection criteria. Using this feature, the client gets only the results it wants.

Selective task output retrieval

The purpose of selective task output retrieval is twofold: it controls the retrieval of results so that the client is not overloaded with results and runs out of memory, and it allows the client to retrieve only the results it is interested in. This is achieved by having the client retrieve data directly from the Session Manager instead of from the local result cache. To better understand the interaction between the client and the Session Manager, let’s look at the sequence of events.

  1. Session Manager gets the result from the service and defers dispatching it to the client.

  2. The client makes an API call to selectively retrieve task results from the Session Manager by supplying the task IDs for the results it wants to process.

  3. In the non-blocking model (zero timeout), the Session Manager returns all the results (that match the task IDs) available at the time of the API call. If no matching task IDs are found, no results are returned.

    In the blocking model (infinite timeout), the API call does not return until all the results with matching task IDs are available.

Note:

Task results returned to the client are not sorted. For example, if the client retrieves the output of tasks with IDs 1, 2, 3, the results may be returned in a different order.

Client API

Selective task results retrieval can only be achieved through the client API. To use this feature, the client application must perform the following sequence:

  1. Create a session using the FetchResultsDirectly flag to inform the API of the client’s intent to retrieve results directly from the Symphony Session Manager.

  2. Send the tasks.

  3. Retrieve results by specifying the associated task IDs in a selection filter.

Code samples for the blocking model

The following code samples demonstrate selective task result retrieval for the blocking model in each supported language. Refer to the API reference documentation in the Knowledge Center for more information.

//C++ sample
try
{
    ……
    // Set up session creation attributes and create the session
    attributes.setSessionFlags(Session::ReceiveSync|Session::FetchResultsDirectly);
    SessionPtr sesPtr = conPtr->createSession(attributes);
    //Create a task ID filter object
    TaskIdFilter filter;
        
    // Now we will send some messages to our service
    for (int taskCount = 0; taskCount < tasksToSend; taskCount++)
    {
        ……
        // send tasks
        TaskInputHandlePtr input = sesPtr->sendTaskInput(attrTask);
        // add specific task ID into task ID filter
        filter.addId(input->getId());
    }
        
    //Block until all filtered task outputs are ready.
    EnumItemsPtr enumOutput = sesPtr->fetchTaskOutput(filter);
    //handle the task outputs
    …….
}
//Java sample
try
{
    ……
    // Set up session creation attributes and create the session
    attributes.setSessionFlags(Session.RECEIVE_SYNC | Session.FETCH_RESULTS_DIRECTLY);
    session = connection.createSession(attributes);
    //Create a task ID filter object
    TaskIdFilter filter = new TaskIdFilter();
        
    // Now we will send some messages to our service
    for (int taskCount = 0; taskCount < tasksToSend; taskCount++)
    {
        ……
        // send tasks
        TaskInputHandle input = session.sendTaskInput(attrTask);
        // add specific task ID into task ID filter
        filter.addId(input.getId());
    }
        
    //Block until all filtered task outputs are ready.
    EnumItems enumOutput = session.fetchTaskOutput(filter);
    //handle the task outputs
    …….
}
//C# sample
try
{
    ……
    // Set up session creation attributes and create the session
    attributes.SessionFlags = SessionFlags.ReceiveSync | 
    SessionFlags.FetchResultsDirectly;
    session = connection.CreateSession(attributes);
    //Create a task ID filter object
    TaskIdFilter filter = new TaskIdFilter();
        
    // Now we will send some messages to our service
    for (int taskCount = 0; taskCount < tasksToSend; taskCount++)
    {
        ……
        // send tasks
        TaskInputHandle input = session.SendTaskInput(attrTask);
        // add specific task ID into task ID filter
        filter.AddId(input.Id);
    }
        
    //Block until all filtered task outputs are ready.
    EnumItems enumOutput = session.FetchTaskOutput(filter);
    //handle the task outputs
    …….
}

Code samples for the non-blocking model

The following code samples demonstrate selective task result retrieval for the non-blocking model in each supported language. Refer to the API reference documentation in the Knowledge Center for more information.

//C++ sample
try
{
    ……
    // Set up session creation attributes and create the session
    attributes.setSessionFlags(Session::ReceiveSync|Session::FetchResultsDirectly);
    SessionPtr sesPtr = conPtr->createSession(attributes);
    //Create a task ID filter object
    TaskIdFilter filter;
        
    // Now we will send some messages to our service
    for (int taskCount = 0; taskCount < tasksToSend; taskCount++)
    {
        ……
        // send tasks
        TaskInputHandlePtr input = sesPtr->sendTaskInput(attrTask);
        // add specific task ID into task ID filter
        filter.addId(input->getId());
    }
    while (!filter.isSatisfied()) 
    {
        EnumItemsPtr enumOutput = sesPtr->fetchTaskOutput(filter, 
        0 /* non-blocking */); 
        //handle the task output
        …….
        // Since the current thread is not blocked waiting for all results, we can do  
        // something else in between fetch attempts to make use of the current thread.
    }
} 
//Java sample
try
{
    ……
    // Set up session creation attributes and create the session
    attributes.setSessionFlags(Session.RECEIVE_SYNC | Session.FETCH_RESULTS_DIRECTLY);
    session = connection.createSession(attributes);
    //Create a task ID filter object
    TaskIdFilter filter = new TaskIdFilter();
        
    // Now we will send some messages to our service
    for (int taskCount = 0; taskCount < tasksToSend; taskCount++)
    {
        ……
        // send tasks
        TaskInputHandle input = session.sendTaskInput(attrTask);
        // add specific task ID into task ID filter
        filter.addId(input.getId());
    }
    while (!filter.isSatisfied())
    {
        EnumItems enumOutput = session.fetchTaskOutput(filter, 0 /* non-blocking */);
        //handle the task output
        …….
        // Since the current thread is not blocked waiting for all results, we can do  
        // something else in between fetch attempts to make use of the current thread.
    }
}
//C# sample
try
{
    ……
    // Set up session creation attributes and create the session
    attributes.SessionFlags = SessionFlags.ReceiveSync | 
    SessionFlags.FetchResultsDirectly;
    session = connection.CreateSession(attributes);
    //Create a task ID filter object
    TaskIdFilter filter = new TaskIdFilter();
        
    // Now we will send some messages to our service
    for (int taskCount = 0; taskCount < tasksToSend; taskCount++)
    {
        ……
        // send tasks
        TaskInputHandle input = session.SendTaskInput(attrTask);
        // add specific task ID into task ID filter
        filter.AddId(input.Id);
    }
        
    while (!filter.IsSatisfied)
    {
        EnumItems enumOutput = session.FetchTaskOutput(filter, 0 /* non-blocking */);
        //handle the task output
        …….
        // Since the current thread is not blocked waiting for all results, we can do  
        // something else in between fetch attempts to make use of the current thread.
    }
}