Skip to content

Commit

Permalink
logger calls moved to IO chain, other refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
sparkhi committed Feb 11, 2025
1 parent 651a345 commit c43b92b
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,15 @@ object DynamoFormatters {
val queuedAt = "queuedAt"
val sourceSystem = "sourceSystem"
val taskToken = "taskToken"
val executionName = "executionName"

private def validateProperty(av: DynamoValue, name: String) =
av.toAttributeValue.m().asScala.get(name).map(_.s()).map(Validated.Valid.apply).getOrElse(Validated.Invalid(name -> MissingProperty)).toValidatedNel

given filesTablePkFormat: Typeclass[FilesTablePrimaryKey] = new DynamoFormat[FilesTablePrimaryKey]:
override def read(av: DynamoValue): Either[DynamoReadError, FilesTablePrimaryKey] = {
val valueMap = av.toAttributeValue.m().asScala

def validateProperty(name: String) =
valueMap.get(name).map(_.s()).map(Validated.Valid.apply).getOrElse(Validated.Invalid(name -> MissingProperty)).toValidatedNel

(validateProperty(id), validateProperty(batchId))
(validateProperty(av, id), validateProperty(av, batchId))
.mapN { (id, batchId) =>
FilesTablePrimaryKey(FilesTablePartitionKey(UUID.fromString(id)), FilesTableSortKey(batchId))
}
Expand All @@ -123,12 +123,8 @@ object DynamoFormatters {

given queueTablePkFormat: Typeclass[IngestQueuePrimaryKey] = new DynamoFormat[IngestQueuePrimaryKey]:
override def read(av: DynamoValue): Either[DynamoReadError, IngestQueuePrimaryKey] = {
val valueMap = av.toAttributeValue.m().asScala

def validateProperty(name: String) =
valueMap.get(name).map(_.s()).map(Validated.Valid.apply).getOrElse(Validated.Invalid(name -> MissingProperty)).toValidatedNel

(validateProperty(sourceSystem), validateProperty(queuedAt))
(validateProperty(av, sourceSystem), validateProperty(av, queuedAt))
.mapN { (sourceSystem, queuedAt) =>
IngestQueuePrimaryKey(IngestQueuePartitionKey(sourceSystem), IngestQueueSortKey(Instant.parse(queuedAt)))
}
Expand Down
2 changes: 1 addition & 1 deletion scala/lambdas/ingest-flow-control/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ The configuration which governs how the flow of various tasks is controlled. A t
```

In the configuration shown above,
- `"maxConcurrency: 7"` indicates that there can be upto 7 ingest processes running at a time
- `"maxConcurrency: 8"` indicates that there can be upto 8 ingest processes running at a time
- Each source system is configured with its `systemName` (e.g. "TDR", "FCL" etc.)
- Each source system has a configuration of `reservedChannels` and `probability`
- `reservedChannels` means there is a reserved channel out of the `maxConcurrency` for that specific system.
Expand Down
Loading

0 comments on commit c43b92b

Please sign in to comment.