commercetools Connector Extend (Customisation) Guidelines

Feature

Changed on: 31 Jan 2024

Overview

A Fluent Connector project typically has the structure defined below:
`com.fluentcommerce.connect.custom`

Any customisations/extensions can be done either under the custom folder or as a separate module (jar) altogether.

Regardless of which way one chooses to extend it, the key message is that any custom code should be under the `com.fluentcommerce.connect` package. This is due to how the FluentConnectorApplication is wired to work with Spring: in its current form, Spring only scans for components under `com.fluentcommerce.connect`.

During server start-up, the commercetools-connector loads all handlers found in the Spring context and binds them to the service they belong to. As part of this binding process, the commercetools-connector also determines which handler takes precedence over others of the same name. In other words, it is possible to override an existing handler by assigning a higher priority to the custom handler. By default, all handlers have a priority of zero, and a higher number means a higher priority. For example, priority = 100 takes precedence over priority = 1.

`@HandlerInfo(name = "CustomCategoryUpsert", priority = 100)`
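
The precedence rule described above can be illustrated with a small, self-contained sketch. It is not the connector's actual binding code: `HandlerPrioritySketch`, `Registered`, and `resolve` are hypothetical names, and keeping the first registration on a priority tie is an assumption made only for this example.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class HandlerPrioritySketch {

    /** Hypothetical stand-in for a handler discovered in the Spring context. */
    record Registered(String name, int priority, String implementation) {}

    /**
     * For each handler name, keeps the registration with the highest priority.
     * Assumption: on a tie, the registration seen first is kept.
     */
    static Map<String, Registered> resolve(final List<Registered> discovered) {
        return discovered.stream().collect(Collectors.toMap(
                Registered::name,
                r -> r,
                (current, candidate) -> candidate.priority() > current.priority() ? candidate : current));
    }

    public static void main(final String[] args) {
        final List<Registered> discovered = List.of(
                new Registered("CategoryUpsert", 0, "DefaultCategoryUpsertHandler"),
                new Registered("CategoryUpsert", 100, "CustomCategoryUpsertHandler"),
                new Registered("ProductUpsert", 0, "ProductUpsertHandler"));

        // The custom handler wins for "CategoryUpsert" because 100 > 0
        resolve(discovered).forEach((name, r) -> System.out.println(name + " -> " + r.implementation()));
    }
}
```

In this sketch, an override only takes effect when both handlers share the same name, which follows the "same name" wording of the paragraph above.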

Detailed Technical Description

Message Handlers

Message handlers are dedicated to processing messages pushed to the message queue of the commercetools-connector.

Error Handling

  • For errors where a retry won't make a difference, exit the method with an `UnprocessableMessageException`, and the message will be forwarded to the DLQ (dead letter queue).
  • For errors where the message should be retried, exit the method with a `HandlerException`. Note that each listener (SQS vs Kafka, for example) has a different implementation of retry.
  • If no exception is thrown, the message is removed from the queue, and it is assumed that the processing was done and no further action is required.
  • For messages that have more than one item to be processed (e.g., a message has a list of products to be processed), either of the following approaches is viable:
      • Any failure stops the processing, and when the message is retried, all items are reprocessed.
      • Failure to process one item does not abort the processing of the rest of the items. Failed items can be individually sent to the commercetools-connector as new messages for retry.
  • Messages that need to perform more than one action may require different approaches based on what kind of data is being processed. In this scenario, each operation should be idempotent, so if the message is retried or replayed, the outcome will not change.
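
The two approaches for multi-item messages can be sketched as follows. This is a minimal illustration, not connector code: the exception classes are local stand-ins declared only so the example compiles on its own, and `ItemProcessor`, `processAllOrFail`, and `processAndCollectFailures` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class MultiItemProcessingSketch {

    /** Local stand-ins for the connector exceptions (assumption: declared here for self-containment). */
    static class UnprocessableMessageException extends Exception {
        UnprocessableMessageException(final String message) { super(message); }
    }

    static class HandlerException extends Exception {
        HandlerException(final String message) { super(message); }
    }

    /** Hypothetical per-item operation; it should be idempotent so reprocessing is safe. */
    interface ItemProcessor<T> {
        void process(T item) throws Exception;
    }

    /** Approach 1: the first failure aborts processing, and the whole message is retried. */
    static <T> void processAllOrFail(final List<T> items, final ItemProcessor<T> processor)
            throws UnprocessableMessageException, HandlerException {
        if (items == null) {
            // A retry cannot fix a malformed message, so it should go to the DLQ
            throw new UnprocessableMessageException("Message has no item list");
        }
        for (final T item : items) {
            try {
                processor.process(item);
            } catch (final Exception e) {
                throw new HandlerException("Item failed, whole message will be retried: " + e.getMessage());
            }
        }
    }

    /** Approach 2: keep going on failure and return the failed items so they can be re-published. */
    static <T> List<T> processAndCollectFailures(final List<T> items, final ItemProcessor<T> processor) {
        final List<T> failed = new ArrayList<>();
        for (final T item : items) {
            try {
                processor.process(item);
            } catch (final Exception e) {
                failed.add(item);
            }
        }
        return failed;
    }

    public static void main(final String[] args) {
        final List<String> skus = List.of("SKU-1", "SKU-BAD", "SKU-3");
        final ItemProcessor<String> processor = sku -> {
            if (sku.endsWith("BAD")) {
                throw new IllegalStateException("cannot process " + sku);
            }
        };
        System.out.println("Items to re-publish: " + processAndCollectFailures(skus, processor));
    }
}
```

With the first approach, items processed before the failure are processed again on retry, which is why idempotent operations matter. With the second, only the failed items need to be sent back as new messages.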

Sample code and configuration

A message handler must extend `MessageHandler` and be annotated with `@HandlerInfo` and `@Component`. The `@Component` annotation makes the class a singleton Spring bean, and because it extends `MessageHandler`, the commercetools-connector can locate it and bind it to the Message Routing Service.

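import java.util.Optional;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
// Connector classes (MessageHandler, HandlerInfo, MessageHandlerContext, Event and the
// exception types) must also be imported from the connector libraries.

@Slf4j
@Component
@HandlerInfo(name = "ProductUpsert")
public class ProductUpsertHandler extends MessageHandler {

    @Override
    public void processMessage(final MessageHandlerContext context) throws UnprocessableMessageException, HandlerException {
        final Optional<MyPayload> payload = context.getMessagePayload(MyPayload.class);
        if (!payload.isPresent()) {
            // A missing payload cannot be fixed by a retry, so the message goes to the DLQ
            throw new UnprocessableMessageException("Unable to get message payload");
        }
        // logic to process a message (processProduct is a placeholder for your own logic)
        final Event productEvent = processProduct(payload.get());
        // send product to Fluent
        context.ofFluentContext().sendEvent(productEvent);
    }
}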


Every message handler must have a `route` configuration. Some handlers may work with a pair of `route-mapping` and `route`.

route: Contains the build time configuration of a handler - always required.

  • name: Name of the route.
  • handler: ID of the handler, for example, `ProductUpsert`.
  • props: Additional and optional configuration the handler can use when processing a message.

route-mapping: Used to determine a route to a handler when receiving external messages. This is usually used when receiving messages from external systems.

  • route: Name of the route - it must match the name defined above on the route name.
  • props: Any configuration necessary to drive the logic to determine a route based on the information received in a message.

When a message is routed to a message handler, the commercetools-connector looks at the message name and matches it against a route name. When a match is found, it takes the handler ID from that route and finds the class/bean that implements it, which then processes the message.

In the route-mapping example below, `props` has a property called `inclusion-filter`, which is a list of message types supported by the route `commercetools.connect.product.upsert`. If the message received has the type `ResourceUpdated`, the message can be routed to `commercetools.connect.product.upsert`. Receiving an external message and applying the logic to determine a route is always done by the listener; there is an example further down in the External Listeners (SQS, Kafka, etc) section.

# Route mappings are used to determine a route to a handler when receiving external messages
route-mapping:
  - route: "commercetools.connect.product.upsert"
    props:
      # any number of properties that can be used to determine if this route is adequate for the message received
      name: "product"
      inclusion-filter:
        - "ResourceUpdated"
        - "ResourceCreated"

# Handler routes
routes:
  - name: "commercetools.connect.product.upsert"
    handler: "ProductUpsert"
    props:
      # any number of properties can drive or assist the handler to process a message
      query: "ct-product.graphql"

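
The two lookups described in this section (listener picks a route from the route mappings, then the route name resolves to a handler ID) can be sketched as follows. This is only an illustration of the matching logic under stated assumptions: `RouteResolutionSketch`, `RouteMapping`, `Route`, `routeFor`, and `handlerFor` are hypothetical names, and picking the first matching mapping is an assumption rather than documented connector behaviour.

```java
import java.util.List;
import java.util.Optional;

final class RouteResolutionSketch {

    /** Hypothetical in-memory view of one route-mapping entry and its inclusion-filter. */
    record RouteMapping(String route, List<String> inclusionFilter) {}

    /** Hypothetical in-memory view of one route entry. */
    record Route(String name, String handler) {}

    /** Listener side: pick the first mapping whose inclusion-filter contains the message type. */
    static Optional<String> routeFor(final String messageType, final List<RouteMapping> mappings) {
        return mappings.stream()
                .filter(mapping -> mapping.inclusionFilter().contains(messageType))
                .map(RouteMapping::route)
                .findFirst();
    }

    /** Routing side: match the message name against the route name and return the handler ID. */
    static Optional<String> handlerFor(final String messageName, final List<Route> routes) {
        return routes.stream()
                .filter(route -> route.name().equals(messageName))
                .map(Route::handler)
                .findFirst();
    }

    public static void main(final String[] args) {
        final List<RouteMapping> mappings = List.of(new RouteMapping(
                "commercetools.connect.product.upsert", List.of("ResourceUpdated", "ResourceCreated")));
        final List<Route> routes = List.of(new Route("commercetools.connect.product.upsert", "ProductUpsert"));

        // A "ResourceUpdated" message resolves to a route, and the route resolves to a handler ID
        routeFor("ResourceUpdated", mappings)
                .flatMap(routeName -> handlerFor(routeName, routes))
                .ifPresent(handler -> System.out.println("Handler: " + handler));
    }
}
```

A message type that is not listed in any `inclusion-filter` yields an empty result in this sketch; what a real listener does in that case is up to its own logic.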

Job Handlers

When it is not possible to receive input/requests from systems (whether it’s Fluent OMS or other systems), jobs can be used to poll these systems, retrieve data, and have it processed. Note that full extracts should be avoided in favour of delta extracts. See commercetools Connector Configuration for more details on how to configure and run a job.

All jobs in the commercetools-connector share the same pre- and post-execution steps:

  • Ensures only one instance of the job runs at a time; a job cannot have multiple instances running simultaneously.
  • Retrieves the job configuration from Fluent settings.
  • Calculates a date range (from-to datetime) the job may use during its execution.
  • Executes the actual job.
  • Persists the calculated date range as part of the job configuration so that it can be used on the next execution.

Note that the commercetools-connector runs on UTC time.

import java.time.LocalDateTime;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; // assuming Apache Commons Lang 3
import org.springframework.stereotype.Component;
// Connector classes (JobHandler, HandlerInfo, JobHandlerContext, JobProperties and
// JobExecutionException) must also be imported from the connector libraries.

@Slf4j
@Component
@HandlerInfo(name = "BatchInventorySyncJob")
public class InventoryJobHandler extends JobHandler {

    @Override
    public void run(final JobHandlerContext jobHandlerContext) throws JobExecutionException {
        final JobProperties settings = jobHandlerContext.getJobSettings();
        // date range calculated by the connector before the job is executed
        final LocalDateTime start = settings.getStart();
        final LocalDateTime end = settings.getEnd();

        // get a runtime configuration property
        final String myProp = settings.getProp("my-property", StringUtils.EMPTY);

        // get a build-time (route) configuration property
        final int pageSize = jobHandlerContext.getRouteSettings().getProp("page-size", 100);

        // your job logic next...

        // poll external system
        // either add each item into the queue for async processing
        //     jobHandlerContext.getPublishService().publishMessage(message);
        // or process each received item here
    }
}

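
The shared pre- and post-execution steps listed earlier in this section (single running instance, date range carried over between executions, UTC time) can be sketched as below. This is a simplified model, not the connector's implementation: `JobWindowSketch`, `Window`, `nextWindow`, `runExclusively`, and the first-run lookback are all hypothetical, and the in-memory guard only covers a single JVM.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.concurrent.atomic.AtomicBoolean;

final class JobWindowSketch {

    /** Hypothetical from-to date range handed to a job execution. */
    record Window(LocalDateTime start, LocalDateTime end) {}

    private static final AtomicBoolean RUNNING = new AtomicBoolean(false);

    /**
     * The next window starts where the previous one ended and ends at the current UTC time.
     * Assumption: on the first run (no previous end) the window looks back by a fixed duration.
     */
    static Window nextWindow(final LocalDateTime previousEnd, final LocalDateTime nowUtc, final Duration firstRunLookback) {
        final LocalDateTime start = previousEnd != null ? previousEnd : nowUtc.minus(firstRunLookback);
        return new Window(start, nowUtc);
    }

    /** Runs the job only if no other execution is in progress; returns false when it was skipped. */
    static boolean runExclusively(final Runnable job) {
        if (!RUNNING.compareAndSet(false, true)) {
            return false;
        }
        try {
            job.run();
            return true;
        } finally {
            RUNNING.set(false);
        }
    }

    public static void main(final String[] args) {
        final LocalDateTime now = LocalDateTime.now(ZoneOffset.UTC);
        final Window first = nextWindow(null, now, Duration.ofHours(1));
        // The persisted end of the first run becomes the start of the next delta extract
        final Window second = nextWindow(first.end(), now.plusMinutes(15), Duration.ofHours(1));
        runExclusively(() -> System.out.println(first + " then " + second));
    }
}
```

Carrying the end of one window over as the start of the next is what keeps each execution a delta extract instead of a full extract.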

Every job needs a route, which is used to find the class that executes the job from a job execution request. The execution request contains the Fluent account and retailer it should run on and the name of the job, for example `fc.connect.batch-inventory-sync`. Like the message handler, the route can have optional properties for any additional configuration the job requires. Note that properties set at the route level are build-time configurations.

routes:
  - name: "fc.connect.batch-inventory-sync"
    handler: "BatchInventorySyncJob"
    props:
      page-size: 500


Configuration Handlers

Configuration handlers can be used when there is a need to react to configuration changes. The example below modifies the application log level based on the configured log level. Note the validation step, which determines whether the handler should be executed; in this example, it simply compares the current log level with the new log level. When they differ, it returns `true`, and the commercetools-connector executes the configuration handler.

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; // assuming Apache Commons Lang 3
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
// Connector classes (ConfigurationHandler, HandlerInfo, Configuration, ConfigurationTuple,
// FluentConnectConfiguration) and @NotNull must also be imported.

@Slf4j
@Component
@HandlerInfo(name = "log-level-update")
public class LogLevelConfigurationHandler extends ConfigurationHandler {
    private static final String DEFAULT_LEVEL = "INFO";
    private final FluentConnectConfiguration connectConfiguration;

    private Level currentLevel;

    @Autowired
    public LogLevelConfigurationHandler(final FluentConnectConfiguration connectConfiguration) {
        this.connectConfiguration = connectConfiguration;
    }

    @Override
    public boolean validate(@NotNull final Configuration configuration) {
        final var newLevel = getLogLevel(configuration);

        // The first call only records the level; later calls execute when the level has changed
        final var shouldExecute = currentLevel != null && newLevel.levelInt != currentLevel.levelInt;
        currentLevel = newLevel;
        return shouldExecute;
    }

    @Override
    public void run(final @NotNull Configuration configuration) {
        final LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
        final Logger logger = loggerContext.getLogger("com.fluentcommerce.connect");
        if (logger != null) {
            logger.setLevel(getLogLevel(configuration));
            log.info("Log level for com.fluentcommerce.connect set to [{}].", logger.getLevel());
        } else {
            log.error("Unable to detect logger context: com.fluentcommerce.connect.");
        }
    }

    // Reads the configured level, falling back to INFO when the value is missing or blank
    private Level getLogLevel(@NotNull final Configuration configuration) {
        final String key = connectConfiguration.getConfiguration().getLogLevelKey();
        final ConfigurationTuple newValue = configuration.getProperties().getOrDefault(key, ConfigurationTuple.ofReadOnly(key, DEFAULT_LEVEL));

        return StringUtils.isNotBlank(newValue.getValue()) ? Level.toLevel(newValue.getValue()) : Level.INFO;
    }
}


Receiving messages from external systems

There are a couple of different ways to receive messages or requests from external systems.

HTTP Requests

The commercetools-connector comes with Spring Web enabled, so opening up a new endpoint is as easy as creating a new REST controller. Please note that security is not enabled by default on the commercetools-connector, but it is possible to add the Spring Security library. If you do add web security, check whether your setup impacts the existing APIs. An OpenAPI page listing all existing endpoints is available at `/api/docs`.

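import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/sample")
public class SampleController {

    // MyService is a placeholder for your own service bean
    private final MyService myService;

    @Autowired
    public SampleController(final MyService myService) {
        this.myService = myService;
    }

    @PostMapping(consumes = "application/json")
    public ResponseEntity<String> receive(@RequestBody final SampleRequest sampleRequest) {
        try {
            myService.process(sampleRequest);
        } catch (final Exception e) {
            log.error("Error receiving request", e);
            return ResponseEntity.unprocessableEntity().build();
        }
        return ResponseEntity.ok().build();
    }

    // Lombok's @Value (not Spring's) makes this an immutable class with getters
    @Value
    public static class SampleRequest {
        String id;
        String name;
        //...
    }
}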


External Listeners (SQS, Kafka, etc)

The commercetools-connector has its own internal listeners, and although it is possible to extend them, this topic only covers external listeners. An external listener receives messages sent by external systems to the commercetools-connector, for example, listening for orders published by an e-commerce system. To create an external listener that subscribes to either a topic or a queue, extend the `ExternalListener` class, as the example below illustrates. When defining such a class, it is important to specify the kind of payload the listener works with in both the generic type and the `@ListenerInfo` annotation. The ID of the listener is not the topic/queue name but the configuration ID of the listener.

import java.util.Collections;
import java.util.Optional;
import java.util.UUID;

import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
// Connector classes (ExternalListener, ListenerInfo, ExternalMessage, ConnectSDKMessage,
// AccountReference, UnprocessableMessageException) must also be imported.

@Slf4j
@Component
@ListenerInfo(id = "my_queue_id", messageClass = SampleQueueListener.MyMessagePayload.class)
public class SampleQueueListener extends ExternalListener<SampleQueueListener.MyMessagePayload> {

    @Override
    public Optional<ConnectSDKMessage> receiveMessage(final ExternalMessage message) throws UnprocessableMessageException {
        final MyMessagePayload messagePayload = getMessageContent(message);

        final Optional<AccountReference> accountReference = getFluentAccountReference(Collections.singletonMap("website-id", messagePayload.getWebsiteId()));
        if (accountReference.isPresent()) {
            return Optional.ofNullable(getConnectSDKMessage(messagePayload, accountReference.get()));
        }

        log.error("Unable to retrieve a Fluent account for the message received [{}].", messagePayload.getId());
        throw new UnprocessableMessageException("Unable to retrieve a Fluent account for the message received.");
    }

    /**
     * Extract a subset of the payload, pre-process and enrich it, convert into a different format or simply use it as is.
     */
    private ConnectSDKMessage getConnectSDKMessage(final MyMessagePayload messagePayload,
                                                   final AccountReference accountReference) throws UnprocessableMessageException {
        // The example below is a pass-through
        try {
            return ConnectSDKMessage.builder()
                    .id(UUID.randomUUID())
                    .name(getMessageRoute(messagePayload))
                    .accountId(accountReference.getAccountId())
                    .retailerId(accountReference.getRetailerId())
                    .payload(ConnectSDKMessage.toJson(messagePayload))
                    .build();
        } catch (JsonProcessingException e) {
            throw new UnprocessableMessageException(e);
        }
    }

    /**
     * Apply your logic to determine the route name
     */
    private String getMessageRoute(final MyMessagePayload messagePayload) {
        return "my-route";
    }

    @Value
    public static class MyMessagePayload {
        String id;
        String websiteId;
        //...
    }
}


Each listener must have its own configuration section; the snippet below covers only the essentials. See commercetools Connector Configuration for all configuration options and detailed information about each property.

my_queue_id:
  name: "SQS_CONNECTOR"
  fluent-account: "DEV_ACCOUNT"
  type: sqs


Copyright © 2025 Fluent Retail Pty Ltd (trading as Fluent Commerce). All rights reserved. No materials on this docs.fluentcommerce.com site may be used in any way and/or for any purpose without prior written authorisation from Fluent Commerce. Current customers and partners shall use these materials strictly in accordance with the terms and conditions of their written agreements with Fluent Commerce or its affiliates.
