diff --git a/.changelog/3833.txt b/.changelog/3833.txt new file mode 100644 index 0000000000..e9f5a096a5 --- /dev/null +++ b/.changelog/3833.txt @@ -0,0 +1,3 @@ +```release-note:enhancement +resource/cloudflare_queue: added support for create queue consumers +``` \ No newline at end of file diff --git a/internal/sdkv2provider/resource_cloudflare_queue.go b/internal/sdkv2provider/resource_cloudflare_queue.go index 9d3927b8c6..5bf5540fd7 100644 --- a/internal/sdkv2provider/resource_cloudflare_queue.go +++ b/internal/sdkv2provider/resource_cloudflare_queue.go @@ -46,6 +46,33 @@ func resourceCloudflareQueueCreate(ctx context.Context, d *schema.ResourceData, return diag.FromErr(fmt.Errorf("failed to find id in Create response; resource was empty")) } + if d.Get("consumers") != nil { + consumers := d.Get("consumers").([]interface{}) + for _, consumer := range consumers { + consumerMap := consumer.(map[string]interface{}) + settings := consumerMap["settings"].([]interface{})[0].(map[string]interface{}) + req := cloudflare.CreateQueueConsumerParams{ + QueueName: queueName, + Consumer: cloudflare.QueueConsumer{ + Service: "workers", + Name: consumerMap["script_name"].(string), + QueueName: queueName, + Environment: consumerMap["environment"].(string), + ScriptName: consumerMap["script_name"].(string), + Settings: cloudflare.QueueConsumerSettings{ + BatchSize: settings["batch_size"].(int), + MaxRetires: settings["max_retries"].(int), + MaxWaitTime: settings["max_wait_time_ms"].(int), + }, + }, + } + _, err := client.CreateQueueConsumer(ctx, cloudflare.AccountIdentifier(accountID), req) + if err != nil { + return diag.FromErr(errors.Wrap(err, "error creating workers queue consumer")) + } + } + } + d.SetId(r.ID) tflog.Info(ctx, fmt.Sprintf("Cloudflare Workers Queue ID: %s. Name: %s", d.Id(), queueName)) @@ -68,6 +95,26 @@ func resourceCloudflareQueueRead(ctx context.Context, d *schema.ResourceData, me if r.ID == queueID { queue = r d.Set("name", r.Name) + consumers := make([]map[string]interface{}, 0) + if len(r.Consumers) > 0 { + for _, consumer := range r.Consumers { + consumerMap := map[string]interface{}{ + "environment": consumer.Environment, + "queue_name": consumer.QueueName, + "script_name": consumer.ScriptName, + "settings": []map[string]interface{}{ + { + "batch_size": consumer.Settings.BatchSize, + "max_retries": consumer.Settings.MaxRetires, + "max_wait_time_ms": consumer.Settings.MaxWaitTime, + }, + }, + } + consumers = append(consumers, consumerMap) + } + println(fmt.Sprintf("consumers: %+v", consumers)) + d.Set("consumers", consumers) + } break } } diff --git a/internal/sdkv2provider/resource_cloudflare_queue_test.go b/internal/sdkv2provider/resource_cloudflare_queue_test.go index 7e4425784f..7574918460 100644 --- a/internal/sdkv2provider/resource_cloudflare_queue_test.go +++ b/internal/sdkv2provider/resource_cloudflare_queue_test.go @@ -80,6 +80,43 @@ func TestAccCloudflareQueue_Basic(t *testing.T) { }) } +func TestAccCloudflareQueue_Consumer(t *testing.T) { + t.Parallel() + var queue cloudflare.Queue + accountID := os.Getenv("CLOUDFLARE_ACCOUNT_ID") + rnd := generateRandomResourceName() + resourceName := "cloudflare_queue." + rnd + workerScriptName := "cloudflare_workers_script." + rnd + + resource.Test(t, resource.TestCase{ + PreCheck: func() { + testAccPreCheck(t) + }, + ProviderFactories: providerFactories, + CheckDestroy: testAccCloudflareQueueDestroy, + Steps: []resource.TestStep{ + + { + + Config: testAccCheckCloudflareQueueWConsumer(rnd, accountID), + Check: resource.ComposeTestCheckFunc( + testAccCheckCloudflareQueueExists(rnd, &queue), + resource.TestCheckResourceAttr(resourceName, "name", rnd), + resource.TestCheckResourceAttr(resourceName, "account_id", accountID), + resource.TestCheckResourceAttr(workerScriptName, "name", rnd), + ), + }, + + { + ImportState: true, + ImportStateVerify: true, + ResourceName: resourceName, + ImportStateIdPrefix: fmt.Sprintf("%s/", accountID), + }, + }, + }) +} + func testAccCloudflareQueueDestroy(s *terraform.State) error { client := testAccProvider.Meta().(*cloudflare.API) @@ -112,10 +149,35 @@ resource "cloudflare_queue" "%[1]s" { }`, rnd, accountID, name) } +func testAccCheckCloudflareQueueWConsumer(rnd, accountID string) string { + queueModuleContent := `export default { queue(batch, env) { return new Response('Hello world'); }, };` + return fmt.Sprintf(` + resource "cloudflare_workers_script" "%[1]s" { + account_id = "%[2]s" + name = "%[1]s" + content = "%[3]s" + module = true + } + + resource "cloudflare_queue" "%[1]s" { + account_id = "%[2]s" + name = "%[1]s" + consumers { + script_name = "%[1]s" + settings { + batch_size = 5 + } + } + depends_on = [cloudflare_workers_script.%[1]s] + }`, rnd, accountID, queueModuleContent) + +} + + func testAccCheckCloudflareQueueExists(name string, queue *cloudflare.Queue) resource.TestCheckFunc { return func(s *terraform.State) error { client := testAccProvider.Meta().(*cloudflare.API) - + rs, ok := s.RootModule().Resources["cloudflare_queue."+name] if !ok { return fmt.Errorf("not found: %s", name) @@ -137,3 +199,4 @@ func testAccCheckCloudflareQueueExists(name string, queue *cloudflare.Queue) res return fmt.Errorf("queue not found") } } + diff --git a/internal/sdkv2provider/schema_cloudflare_queue.go b/internal/sdkv2provider/schema_cloudflare_queue.go index 2ad61a3d7e..26f53e2a7d 100644 --- a/internal/sdkv2provider/schema_cloudflare_queue.go +++ b/internal/sdkv2provider/schema_cloudflare_queue.go @@ -17,5 +17,56 @@ func resourceCloudflareQueueSchema() map[string]*schema.Schema { Required: true, Description: "The name of the queue.", }, + "consumers": { + Type: schema.TypeList, + Optional: true, + Description: "Array of consumers.", + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "environment": { + Description: "Environment", + Type: schema.TypeString, + Optional: true, + }, + "queue_name": { + Description: "Queue name", + Type: schema.TypeString, + Optional: true, + }, + "script_name": { + Description: "script_name", + Type: schema.TypeString, + Required: true, + }, + "settings": { + Description: "Settings", + Type: schema.TypeList, + Required: true, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "batch_size": { + Description: "Batch size", + Type: schema.TypeInt, + Optional: true, + Default: 1, + }, + "max_retries": { + Description: "Max retries", + Type: schema.TypeInt, + Optional: true, + Default: 3, + }, + "max_wait_time_ms": { + Description: "Max wait time in milliseconds", + Type: schema.TypeInt, + Optional: true, + Default: 1000, + }, + }, + }, + }, + }, + }, + }, } }