em->getRepository(Import::class)->find($message->importId); if (!$import) { $this->logger->error('Import not found', ['importId' => $message->importId]); return; } $csvContent = $this->defaultStorage->read($import->getFilePath()); $tmpFile = tempnam(sys_get_temp_dir(), 'import_'); file_put_contents($tmpFile, $csvContent); try { $ltbxdMovies = $this->ltbxdGateway->parseFileFromPath($tmpFile); } finally { unlink($tmpFile); } $batch = array_slice($ltbxdMovies, $message->offset, $message->limit); $userId = $import->getUser()->getId(); $importId = $import->getId(); foreach ($batch as $ltbxdMovie) { try { $movie = $this->filmImporter->importFromLtbxdMovie($ltbxdMovie); if (!$movie) { $this->importRepository->incrementFailedFilms($import); continue; } $this->actorSyncer->syncActorsForMovie($movie); // Import awards for actors of this movie foreach ($movie->getActors() as $role) { $this->awardImporter->importForActor($role->getActor()); } $user = $this->em->getReference(\App\Entity\User::class, $userId); $existingLink = $this->em->getRepository(UserMovie::class)->findOneBy([ 'user' => $user, 'movie' => $movie, ]); if (!$existingLink) { $userMovie = new UserMovie(); $userMovie->setUser($user); $userMovie->setMovie($movie); $this->em->persist($userMovie); } $this->em->flush(); } catch (\Throwable $e) { $this->logger->warning('Failed to import film', [ 'film' => $ltbxdMovie->getName(), 'importId' => $importId, 'error' => $e->getMessage(), ]); $this->importRepository->incrementFailedFilms($import); } $this->em->clear(); $import = $this->em->getRepository(Import::class)->find($importId); } $processedBatches = $this->importRepository->incrementProcessedBatches($import); if ($processedBatches >= $import->getTotalBatches()) { // Refresh the entity to get updated failedFilms from DB $this->em->refresh($import); $import->setStatus(Import::STATUS_COMPLETED); $import->setCompletedAt(new \DateTimeImmutable()); $this->em->flush(); } } }