Skip to content

Commit

Permalink
Update indexing process (#2345)
Browse files Browse the repository at this point in the history
  • Loading branch information
Fosol authored Nov 26, 2024
1 parent be975d1 commit a6f23c9
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 9 deletions.
19 changes: 19 additions & 0 deletions api/net/Areas/Kafka/Controllers/ProducerController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,5 +122,24 @@ public async Task<IActionResult> SendEventAsync([FromBody] EventScheduleRequestM
StatusCode = 201
};
}

/// <summary>
/// Add the index request to the folder queue.
/// </summary>
/// <param name="model"></param>
/// <returns></returns>
[HttpPost("folder")]
[Produces(MediaTypeNames.Application.Json)]
[ProducesResponseType(typeof(DeliveryResultModel<IndexRequestModel>), (int)HttpStatusCode.Created)]
[ProducesResponseType(typeof(ErrorResponseModel), (int)HttpStatusCode.BadRequest)]
[SwaggerOperation(Tags = new[] { "Kafka" })]
public async Task<IActionResult> SendToFolderAsync([FromBody] IndexRequestModel model)
{
var result = (await _producer.SendMessageAsync(_kafkaOptions.FolderTopic, model)) ?? throw new InvalidOperationException("An unknown error occurred when publishing message to Kafka");
return new JsonResult(new DeliveryResultModel<IndexRequestModel>(result))
{
StatusCode = 201
};
}
#endregion
}
5 changes: 5 additions & 0 deletions api/net/Config/KafkaOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,10 @@ public class KafkaOptions
/// get/set - The Kafka topic name to request FFmpeg processes.
/// </summary>
public string FFmpegTopic { get; set; } = "";

/// <summary>
/// get/set - The Kafka topic name to add content to folders.
/// </summary>
public string FolderTopic { get; set; } = "";
#endregion
}
2 changes: 1 addition & 1 deletion api/net/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
"EventTopic": "event-schedule",
"HubTopic": "hub",
"FFmpegTopic": "ffmpeg",
"FolderTopic": "folder",
"Producer": {
"ClientId": "API",
"BootstrapServers": "kafka-broker-0.kafka-headless:9092,kafka-broker-1.kafka-headless:9092,kafka-broker-2.kafka-headless:9092",
Expand Down Expand Up @@ -112,5 +113,4 @@
"EmailEnabled": true,
"EmailAuthorized": true
}

}
19 changes: 15 additions & 4 deletions libs/net/services/Helpers/ApiService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,17 @@ public async Task<T> HandleConcurrencyAsync<T>(Func<Task<T>> callbackDelegate)
var url = this.Options.ApiUrl.Append($"kafka/producers/event");
return await RetryRequestAsync(async () => await this.OpenClient.PostAsync<API.Areas.Kafka.Models.DeliveryResultModel<TNO.Kafka.Models.EventScheduleRequestModel>>(url, JsonContent.Create(request)));
}

/// <summary>
/// Send indexing message to folder topic in Kafka.
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public async Task<API.Areas.Kafka.Models.DeliveryResultModel<TNO.Kafka.Models.IndexRequestModel>?> SendMessageAsync(TNO.Kafka.Models.IndexRequestModel request)
{
var url = this.Options.ApiUrl.Append($"kafka/producers/folder");
return await RetryRequestAsync(async () => await this.OpenClient.PostAsync<API.Areas.Kafka.Models.DeliveryResultModel<TNO.Kafka.Models.IndexRequestModel>>(url, JsonContent.Create(request)));
}
#endregion

#region Lookup Methods
Expand Down Expand Up @@ -353,7 +364,7 @@ public async Task<HttpResponseMessage> GetSourcesResponseAsync()
public async Task<HttpResponseMessage> GetIngestsResponseAsync()
{
var url = this.Options.ApiUrl.Append($"services/ingests");
return await RetryRequestAsync(async () => await this.OpenClient.GetAsync(url));
return await RetryRequestAsync(async () => await this.OpenClient.GetAsync(url));
}
public async Task<HttpResponseMessage> GetIngestsResponseWithEtagAsync(string etag)
{
Expand Down Expand Up @@ -1078,19 +1089,19 @@ public async Task<HttpResponseMessage> AddContentToFolderAsync(long contentId, i
var response = await GetSettingsResponseAsync();
return await GetResponseDataAsync<IEnumerable<API.Areas.Services.Models.Setting.SettingModel>?>(response);
}

public async Task<HttpResponseMessage> GetSettingsResponseAsync()
{
var url = this.Options.ApiUrl.Append($"services/settings");
return await RetryRequestAsync(async () => await this.OpenClient.GetAsync(url));
}

public async Task<HttpResponseMessage> GetSettingsResponseWithEtagAsync(string etag)
{
var url = this.Options.ApiUrl.Append($"services/settings");
return await RetryRequestAsync(async () => await this.OpenClient.GetAsync(url, etag));
}

#endregion

#endregion
Expand Down
7 changes: 7 additions & 0 deletions libs/net/services/Helpers/IApiService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ public interface IApiService
/// <param name="request"></param>
/// <returns></returns>
Task<API.Areas.Kafka.Models.DeliveryResultModel<TNO.Kafka.Models.EventScheduleRequestModel>?> SendMessageAsync(TNO.Kafka.Models.EventScheduleRequestModel request);

/// <summary>
/// Send indexing message to folder topic in Kafka.
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
Task<API.Areas.Kafka.Models.DeliveryResultModel<TNO.Kafka.Models.IndexRequestModel>?> SendMessageAsync(TNO.Kafka.Models.IndexRequestModel request);
#endregion

#region Lookups
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ metadata:
data:
KAFKA_CLIENT_ID: FolderCollection
MAX_FAIL_LIMIT: "5"
TOPICS: index
TOPICS: folder
CHES_EMAIL_ENABLED: "true"
CHES_EMAIL_AUTHORIZED: "true"
IGNORE_CONTENT_PUBLISHED_BEFORE_OFFSET: "7"
2 changes: 1 addition & 1 deletion services/net/folder-collection/FolderCollectionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ private async Task ProcessRequestAsync(IndexRequestModel request)

// TODO: Review how we can cache filters so that we do not need to request them every time we index content.
var folders = await this.Api.GetFoldersWithFiltersAsync() ?? Array.Empty<API.Areas.Services.Models.Folder.FolderModel>();
var activeFolders = folders.Where(f => f.Filter != null && f.Filter?.IsEnabled == true);
var activeFolders = folders.Where(f => f.Filter != null && f.Filter.IsEnabled == true);

if (activeFolders.Any())
this.Logger.LogDebug("Content being processed by folder filters. Content ID: {contentId}", content.Id);
Expand Down
2 changes: 1 addition & 1 deletion services/net/folder-collection/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"MaxFailLimit": 5,
"ApiUrl": "http://api:8080",
"TimeZone": "Pacific Standard Time",
"Topics": "index",
"Topics": "folder",
"SendEmailOnFailure": true
},
"Elastic": {
Expand Down
4 changes: 3 additions & 1 deletion services/net/indexing/IndexingManager.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
using System;
using Confluent.Kafka;
using Elastic.Clients.Elasticsearch;
using Elastic.Transport;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using TNO.API.Areas.Services.Models.Content;
using TNO.Ches;
using TNO.Ches.Configuration;
Expand Down Expand Up @@ -313,6 +313,8 @@ private async Task ProcessIndexRequestAsync(ConsumeResult<string, IndexRequestMo
this.Logger.LogWarning("Content does not exists. Content ID: {ContentId}", result.Message.Value.ContentId);
}
}
// Indexing is completed, pass the baton to the folder process.
await this.Api.SendMessageAsync(model);
this.Logger.LogDebug($"ProcessIndexRequestAsync:END:{result.Message.Key}");
}

Expand Down

0 comments on commit a6f23c9

Please sign in to comment.