Solving Data Ingestion at Scale Using Argo Workflows

Challenges with Data Ingestion 

At Unbxd we process a huge volume of e-commerce catalog data for multiple sites to serve search results, with product counts varying from 5k to 50M. This is a multi-tenant architecture that involves periodic refreshes of the complete catalog and incremental updates on fields like price, inventory, etc. The frequency of such uploads varies from client to client and spreads across multiple regions. The objective is to handle both a site with a 5k-product catalog that updates hourly and a site with a 10M-product catalog that updates once a day, honoring the predefined SLAs in both cases while keeping the usage cost minimal.

A generic data ingestion pipeline for any search engine involves the following operations:

  1. A periodic full refresh of the catalog
  2. Frequent incremental updates on a set of fields, where the field size may vary significantly on every update

For an e-commerce search engine, where a product once added is either removed or receives only minor updates, the operations are:

  1. A periodic full refresh of the catalog, where the change in products is usually less than 10% of the whole catalog size
  2. Frequent incremental updates on a fixed set of fields like inventory size, price, etc.
  3. Deletion of out-of-stock products

Given this very specific requirement set and the unpredictable frequency of updates, the frequently chosen pipeline of Kafka -> Storm -> SolrCloud doesn't work very well. Distinguishing between clients of such varying sizes becomes tough, and it can increase the overall infrastructure cost with a lot of unused bandwidth during off-peak hours.

Transforming an Ingestion Request into a Workflow

We decided to treat every catalog ingestion request as a workflow. This gives us two major advantages: every request is independent of the others, and resources are used only when there is an upload event.

Of the various workflow management platforms out there, Argo checked all the boxes for us. We decided to skip other popular options like Airflow because Argo is a container-native workflow engine for Kubernetes, and since we already have most of our services on Kubernetes, maintenance was not an issue.

On top of this, we take advantage of AWS spot instances, which ensures we have enough capacity to launch any number of workflows at the same time.

Here’s a very basic view of our request flow:

  1. A request, when received on an app server, is identified by its region; a request for region-2 is bounced to the appropriate app server while a region-1 request is sent downstream. We make use of NATS to sync data across regions; find more details here.
  2. The app server uses the Argo Server APIs to launch the appropriate workflow, with configurations that in turn decide the scale of the workflow job and provide all sorts of metadata for the step execution (see the sketch after the diagram below).
  3. Every step of the workflow emits events that are processed by the app server to provide status updates on completion/failure of the workflow. Alerts are triggered on failures.
              +---------------------------------------------------------------+
              |                                                               |
              |                                                             +-+-+
 workflow     |                                                             |   |
 event        |                                                             |   | kafka queue
 consumed     |       app server (region 2)                                 |   |
              |                                                             +-+-+
              |      +-------------+                                          ^
              |      |             |                                          |
              |      +-------------+                                          |
              |                                     argo workflow engine      | workflow events
              |                                                               |
public lb     |                                   +----------------------------------+
              |       app server (region 1)       |                           |      |
 +------+     |                                   |  +-------------+          |      |
 |      |     |      +-------------+              |  | workflow 1  |          |      |
 |      +-----+----+ |             +--------------+  |             +----------+      |
 |      |            +-------------+              |  +-------------+                 |
 +------+                                         |                                  |
                   - translates requests to       |                                  |
                     workflow                     |                                  |
                   - consumes events from         +----------------------------------+
                     workflow to update status

 
A basic view of the request flow
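
As mentioned in step 2 above, the app server launches workflows through the Argo Server APIs. Below is a minimal, hypothetical sketch of such a call using Java's built-in HTTP client, assuming the public Argo Server endpoint for creating workflows (POST /api/v1/workflows/{namespace}); the host, namespace, token handling, and payload file are illustrative and not our actual setup.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch: submit a catalog-ingestion workflow to the Argo Server.
// The endpoint shape and the {"workflow": {...}} payload wrapper follow the
// public Argo Server REST API; every name used here is illustrative.
public class WorkflowSubmitter {
    public static void main(String[] args) throws Exception {
        // JSON equivalent of the Workflow manifest, wrapped as {"workflow": {...}}
        String payload = Files.readString(Path.of("catalog-ingestion-workflow.json"));

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://argo-server.example.com/api/v1/workflows/ingestion"))
                .header("Content-Type", "application/json")
                .header("Authorization", "Bearer " + System.getenv("ARGO_TOKEN"))
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("Argo Server responded with status " + response.statusCode());
    }
}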

Challenges

  1. Storage

Even though Kubernetes has revolutionized the way applications are deployed and maintained, it falls short on out-of-the-box storage solutions. If you’re on AWS (Amazon Web Services) like us, you get EBS and EFS as your two options. Our use case requires us to mount the same storage on multiple pods of a workflow at the same time, and the number of parallel mounts jumps in cases where the catalog file is huge and has to be split into multiple files for faster processing. EBS, as per Amazon, has an upper limit on allowed parallel mounts, so a pod requesting a mount would get stuck indefinitely until other pods release the resource.

Thankfully, CSI (Container Storage Interface) is a standard set of specifications adopted by Kubernetes, with a lot of driver plugins available. We make use of Amazon FSx for Lustre, which doesn’t have any limits on parallel mounts and provides fast storage. Details of the driver can be found at https://github.com/kubernetes-sigs/aws-fsx-csi-driver
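
For reference, here is a hedged sketch of how the shared volume might be declared with the FSx for Lustre CSI driver; the StorageClass parameters (subnet, security group) and the requested capacity are placeholders, not our production values. The resulting claim is what the workflow spec later mounts as fsx-claim.

kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: fsx-sc
provisioner: fsx.csi.aws.com              # provided by the aws-fsx-csi-driver
parameters:
  subnetId: subnet-0123456789abcdef0      # placeholder
  securityGroupIds: sg-0123456789abcdef0  # placeholder
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: fsx-claim                         # referenced by the workflow's volumes section
spec:
  accessModes:
    - ReadWriteMany                       # many workflow pods can mount it in parallel
  storageClassName: fsx-sc
  resources:
    requests:
      storage: 1200Gi                     # FSx for Lustre capacity comes in large fixed increments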

  2. Scaling Solr Indexing

While Solr provides a lot of search features out of the box, indexing still requires extensive configuration tweaks to ensure faster indexing while keeping search traffic unaffected.

The default setup works well for incremental or in-place updates and deletes. But in cases where a huge catalog has to be indexed into a SolrCloud cluster, it can significantly hit the search traffic. Adding to that, the performance requirements for indexing can exceed those of searching, and indexing is usually a one-time or periodic job.

To solve this, we chose not to index into the production Solr cluster at all. Instead, the workflow step responsible for indexing spawns an embedded Solr server to build the index locally and then imports the index into the production cluster. This offers us the following benefits (a minimal sketch follows the list):

  1. The only interaction with the production cluster happens when the output index is imported, which is just a directory copy
  2. Given the requirement, a pod of any size appropriate for indexing can be launched temporarily
  3. Parallel indexing into separate pods can maximize performance, and the outputs can later be merged into one index
  4. It allows us to tweak Lucene, which works underneath Solr, to index documents
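
A minimal sketch of this "index locally, then import" step, assuming SolrJ's EmbeddedSolrServer; the solr home path, core name, and fields are illustrative:

import java.nio.file.Paths;
import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
import org.apache.solr.common.SolrInputDocument;

// Hypothetical sketch: an embedded Solr core is built inside the indexer pod;
// its data directory is what later gets merged and copied into production.
public class LocalIndexer {
    public static void main(String[] args) throws Exception {
        // The solr home must contain a core named "catalog" with the production schema
        EmbeddedSolrServer solr =
                new EmbeddedSolrServer(Paths.get("/mnt/vol/solr-home"), "catalog");

        SolrInputDocument doc = new SolrInputDocument();
        doc.addField("id", "sku-123");
        doc.addField("title", "Running shoes");
        doc.addField("price", 49.99);

        solr.add(doc);
        solr.commit();  // flush the segments to the local index directory
        solr.close();

        // The index directory under the core's data/index path is then imported
        // into the production SolrCloud cluster as a directory copy.
    }
}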

The following are the Lucene properties we tweak, but YMMV:

  1. Index Merge Policy : Every batch of documents in Lucene is written as a segment, and segments are periodically merged by a scheduler that tries to merge similarly sized segments. The objective is to keep the segment count as low as possible, because a high segment count slows down search, which has to iterate over every segment to find matching documents.
    This merging is quite unnecessary while indexing, which is why we use NoMergePolicyFactory; merging of segments is an operation we do later in the workflow (a sketch follows the log sample below).
  2. RAM_BUFFER_SIZE : As the name suggests, you can opt for a higher memory buffer to ensure docs are written to the file system in desirable chunks. We figured that keeping a high value doesn’t always help, and you’ll have to arrive at a sweet spot by trial. Turning on infoStream is recommended while tweaking, as it gives detailed logs of what happens behind the scenes while Lucene indexes documents:
[DWPT][main]:new segment has 0 deleted docs
[DWPT][main]: new segment has no vectors; norms; docValues; prox; freqs
[DWPT][main]: flushedFiles=[_3.fdx, _3.nvd, _3_Lucene50_0.pos, _3_Lucene54_0.dvd, _3_Lucene50_0.doc, _3_Lucene50_0.tim, _3.nvm, _3.fnm, _3.fdt, _3_Lucene50_0.tip, _3_Lucene54_0.dvm]
[DWPT][main]: flushed codec=Lucene62
[DWPT][main]: flushed: segment=_3 ramUsed=53.245 MB newFlushedSize=19.837 MB docs/MB=504.115
[DWPT][main]: flush time 1508.68918 msec
[DW][main]: publishFlushedSegment seg-private updates=null
[IW][main]: publishFlushedSegment 
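
Since each indexer pod can produce its own local index (benefit 3 above) and merging is deferred, a later workflow step can combine them with Lucene's addIndexes and collapse the result. A minimal sketch, with illustrative paths:

import java.nio.file.Paths;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.FSDirectory;

// Hypothetical sketch: merge the indexes produced by parallel indexer pods into
// one index before importing it into the production cluster. Paths are examples.
public class IndexMerger {
    public static void main(String[] args) throws Exception {
        try (FSDirectory target = FSDirectory.open(Paths.get("/mnt/vol/merged-index"));
             FSDirectory part1 = FSDirectory.open(Paths.get("/mnt/vol/index-pod-1"));
             FSDirectory part2 = FSDirectory.open(Paths.get("/mnt/vol/index-pod-2"));
             IndexWriter writer = new IndexWriter(target, new IndexWriterConfig(new StandardAnalyzer()))) {

            writer.addIndexes(part1, part2);  // copy segments from the per-pod indexes
            writer.forceMerge(1);             // collapse into one segment, since merging was skipped while indexing
            writer.commit();
        }
    }
}
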
All these configurations can be put in solrconfig.xml under indexConfig:
1. ramBufferSizeMB
2. maxBufferedDocs
3. mergePolicyFactory
4. infoStream
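
For illustration, an indexConfig block with these settings could look like the snippet below; the numbers are examples to tune by trial, not our production values:

<indexConfig>
  <ramBufferSizeMB>512</ramBufferSizeMB>             <!-- arrive at this by trial -->
  <maxBufferedDocs>100000</maxBufferedDocs>          <!-- flush on doc count if the RAM limit is not hit first -->
  <mergePolicyFactory class="org.apache.solr.index.NoMergePolicyFactory"/>  <!-- defer merging to a later step -->
  <infoStream>true</infoStream>                      <!-- detailed Lucene flush/merge logs, as shown above -->
</indexConfig>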
 
  3. Scaling Solr product retrieval

While Solr offers both indexing and storing of docs, retrieval of documents after searching can take a toll on performance, which is why we chose to only index docs and retrieve just the unique ID on search. We have our own implementation of a product store which works with Aerospike and Dgraph underneath. The set of unique IDs retrieved from Solr is then stitched together with the docs from the product store.
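
A rough SolrJ sketch of this id-only retrieval; the collection URL, the uniqueId field, and the ProductStore/Product types are hypothetical stand-ins for our internal services:

import java.util.ArrayList;
import java.util.List;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;

// Sketch: Solr returns only the unique key, the full documents come from the
// product store and are stitched onto the search result.
public class SearchService {
    public List<Product> search(String userQuery, ProductStore store) throws Exception {
        try (HttpSolrClient solr =
                 new HttpSolrClient.Builder("http://solr.example.com:8983/solr/catalog").build()) {
            SolrQuery query = new SolrQuery(userQuery);
            query.setFields("uniqueId");  // ask Solr for the key only, not stored fields
            QueryResponse response = solr.query(query);

            List<String> ids = new ArrayList<>();
            for (SolrDocument doc : response.getResults()) {
                ids.add((String) doc.getFieldValue("uniqueId"));
            }
            // Stitch: full product documents come from the Aerospike/Dgraph-backed store
            return store.fetchByIds(ids);
        }
    }
}

// Hypothetical stand-ins for the internal product store API
interface ProductStore { List<Product> fetchByIds(List<String> ids); }
class Product { /* fields returned by the product store */ }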

Another big problem this solves is identifying the number of products that actually require changes. In a typical scenario, when a complete catalog is sent for ingestion, the change in product data is rarely more than 10% of the total catalog size. Once identified, that delta is the only change we push to Solr, while a full catalog indexing is done only when a configuration changes.
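
One possible way to identify that delta (a hedged sketch, not necessarily how our product store implements it) is to keep a content hash for every product and re-index only the products whose hash changed:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HexFormat;

// Illustrative delta detection: hash each product's canonical JSON and compare
// it with the hash stored alongside the product in the product store.
public class DeltaDetector {
    public static boolean hasChanged(String productJson, String previousHash) throws Exception {
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        String currentHash = HexFormat.of()
                .formatHex(digest.digest(productJson.getBytes(StandardCharsets.UTF_8)));
        return !currentHash.equals(previousHash);  // only changed products are re-indexed into Solr
    }
}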

Stitching it together!

Now that we’ve ironed out all the complications comes the most important part: stitching it all together.

Argo works with two resource kinds: the WorkflowTemplate and the Workflow.

 

A fully functional workflow specification looks like this:

#### Workflow Definition
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: catalog-ingestion-   # illustrative name
spec:
  entrypoint: steps ### --1--
  onExit: cleanup
  parallelism: 15 ### --2--
  volumes:
    - name: fsx-claim
      persistentVolumeClaim:
        claimName: fsx-claim
  templates:
    - name: cleanup
      steps:
        - - name: cleanup
            templateRef:
              name: cleanup
              template: cleanup-template
            arguments:
              parameters:
                - name: name
                  value: cleanup
                - name: id
                  value: 6131a465-318e-4ed4-825f-add951db6bc9
                                                                             
    - name: steps
      steps:
        - - name: step1
            templateRef:
              name: step-1
              template: step-1-template
            arguments:
              parameters:
                - name: name
                  value: step-1
                - name: id
                  value: 6131a465-318e-4ed4-825f-add951db6bc9
            withItems: ### --6--
              - file1.json
              - file2.json
        - - name: step-2
            templateRef:
              name: step-2
              template: step-2-template
            arguments:
              parameters:
                - name: name
                  value: step-2
                - name: id
                  value: 6131a465-318e-4ed4-825f-add951db6bc9  

#### Workflow Template Definition 
### --3--
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: cleanup
spec:
  templates:
    - name: cleanup-template
      retryStrategy: ### --5--
        limit: 3
      inputs:
        parameters:
          - name: name
          - name: id
      activeDeadlineSeconds: 1800 ### --8--
      container:
        image: /:
        imagePullPolicy: IfNotPresent
        resources:
          requests: ### --7--
            memory: 400Mi
            cpu: 500m
        env:
          - name: env-1
            value: value-1
        command: ["./cleanup"]
        args:
          - "--name={{inputs.parameters.name}}"
          - "--id={{inputs.parameters.id}}"
        volumeMounts:
          - name: fsx-claim
            mountPath: /mnt/vol/
 

For a smart workflow, it is very important to split your process into components that are independent of each other and can be retried internally, from the last known save point, in case of failure.

We chose to split ours into the following steps:

  1. Analyzer: Runs through the product catalog, checks for any format errors, and emits metadata that lets the next steps scale accordingly, like total product count, per-product size, etc.
  2. Enricher: Enriches the fields of the catalog using various microservices around the system specifically aimed at it. More detail is added to existing fields here to enhance the search experience.
  3. Indexer: Indexes data into the target stores. In our case we have Solr and the product store, which function as the search index and the product retrieval service respectively.
  4. Resource Manager: Decides, based on traffic stats and off-peak hours, how many resources to allocate to a tenant.

Argo Syntax

  1. onExit : https://github.com/argoproj/argo/blob/master/examples/template-on-exit.yaml
    Lets us configure a mandatory set of steps that will be executed on workflow completion, in cases of both success and failure
  2. parallelism : https://github.com/argoproj/argo/blob/master/examples/parallelism-limit.yaml
    Lets us configure how many parallel pods can be spawned within a workflow; this keeps an upper limit so that other workflow jobs do not get affected
  3. WorkflowTemplate : https://github.com/argoproj/argo/tree/master/examples/workflow-template
    The above example explains in detail how to configure them. For frequently used templates that take part in multiple workflows, you can create a library which can be referenced from the original workflow
  4. retryStrategy : https://github.com/argoproj/argo/blob/master/examples/retry-on-error.yaml
    You can configure a retry strategy for each step, defining the retry action on a failure/error
  5. withItems : https://github.com/argoproj/argo/blob/master/examples/loops-dag.yaml
    Lets a step loop over a list of items, spawning one instance of the step per item
  6. requests : You can configure pod-level requests for the CPU and memory to be allocated
  7. activeDeadlineSeconds : https://github.com/argoproj/argo/blob/master/examples/timeouts-workflow.yaml
    When specified, the pod will time out after the given number of seconds and the onExit steps will be executed

Conclusion

We are actively moving towards an architecture where any write operation on a live site has minimal effect on production traffic, while also ensuring isolation from other active sites. As a result, we have seen a 3x improvement in indexing time.

E-commerce is an ecosystem where immediately reflecting any change in data is just as important as 100% site availability and fast response times, and a workflow-driven architecture helps achieve this while making scaling up inherently feasible.
